""" Price formatting utilities with exchange-specific precision handling. """ import logging import math from typing import Dict, Optional, Any from functools import lru_cache logger = logging.getLogger(__name__) def normalize_token_case(token: str) -> str: """ Normalize token case: if any characters are already uppercase, keep as-is. Otherwise, convert to uppercase. This handles mixed-case tokens like kPEPE, kBONK. """ if any(c.isupper() for c in token): return token else: return token.upper() class TokenDisplayFormatter: """Handles price and amount formatting with proper decimal precision from exchange data.""" def __init__(self, trading_engine=None): """ Initialize price formatter. Args: trading_engine: TradingEngine instance for accessing exchange data """ self.trading_engine = trading_engine # Cache structure: {token: {'price_decimals': X, 'amount_decimals': Y}} self._precision_cache: Dict[str, Dict[str, int]] = {} self._markets_cache: Optional[Dict[str, Any]] = None async def _load_markets_data(self) -> Dict[str, Any]: """Load markets data with caching.""" if self._markets_cache is None and self.trading_engine: try: markets = await self.trading_engine.client.get_markets() if markets: self._markets_cache = markets logger.info(f"📊 Loaded {len(markets)} markets for precision data") else: logger.warning("⚠️ Could not load markets data from exchange") self._markets_cache = {} except Exception as e: logger.error(f"❌ Error loading markets data: {e}") self._markets_cache = {} return self._markets_cache or {} async def _fetch_and_cache_precisions(self, token: str) -> Optional[Dict[str, int]]: """ Fetches price and amount precisions for a token from market data and caches them. Returns the cached dict {'price_decimals': X, 'amount_decimals': Y} or None if not found. """ normalized_token = normalize_token_case(token) if normalized_token in self._precision_cache: return self._precision_cache[normalized_token] markets = await self._load_markets_data() if not markets: logger.debug(f"No markets data available for {normalized_token}, cannot fetch precisions.") return None # Search for the market symbol in a more streamlined way symbol_variants = [ f"{normalized_token}/USDC:USDC", f"{normalized_token}/USDC", token # Direct match for fully qualified symbols ] market_info = None for symbol in symbol_variants: if symbol in markets: market_info = markets[symbol] break if not market_info: logger.warning(f"Market info not found for {normalized_token} or its variants.") return None precision_info = market_info.get('precision', {}) price_precision = precision_info.get('price') amount_precision = precision_info.get('amount') price_decimals = self._get_default_price_decimals_for_token(normalized_token) if price_precision and price_precision > 0: price_decimals = int(-math.log10(price_precision)) amount_decimals = 6 # Default amount precision if amount_precision and amount_precision > 0: amount_decimals = int(-math.log10(amount_precision)) self._precision_cache[normalized_token] = { 'price_decimals': price_decimals, 'amount_decimals': amount_decimals } logger.debug(f"📊 Cached precisions for {normalized_token}: price {price_decimals}, amount {amount_decimals}") return self._precision_cache[normalized_token] async def get_token_price_decimal_places(self, token: str) -> int: """ Get the number of decimal places for a token's price. """ normalized_token = normalize_token_case(token) precisions = self._precision_cache.get(normalized_token) if not precisions: precisions = await self._fetch_and_cache_precisions(normalized_token) if precisions: return precisions['price_decimals'] # Fallback to smart default if fetching failed completely return self._get_default_price_decimals_for_token(normalized_token) async def get_token_amount_decimal_places(self, token: str) -> int: """ Get the number of decimal places for a token's amount (quantity). """ normalized_token = normalize_token_case(token) precisions = self._precision_cache.get(normalized_token) if not precisions: precisions = await self._fetch_and_cache_precisions(normalized_token) if precisions: return precisions['amount_decimals'] # Fallback if fetching failed - consider a sensible default for amounts logger.warning(f"Amount precision not found for {normalized_token}, defaulting to 6.") return 6 # Default amount precision def _get_default_price_decimals_for_token(self, token: str) -> int: """Get smart default price decimal places based on token characteristics.""" token_upper = token.upper() # Define decimal places for different token categories token_decimals = { 2: ['BTC', 'ETH', 'BNB', 'SOL', 'ADA', 'DOT', 'AVAX', 'MATIC', 'LINK'], 4: ['DOGE', 'XRP', 'LTC', 'BCH', 'ETC', 'FIL', 'AAVE', 'UNI'], 6: ['PEPE', 'SHIB', 'FLOKI', 'BONK', 'WIF'] } # Check for meme tokens first, as they might be substrings for decimals, tokens in sorted(token_decimals.items(), reverse=True): if any(t in token_upper for t in tokens): return decimals # Check for major tokens if token_upper in token_decimals[2]: return 2 return 4 # Default for other tokens async def format_price(self, price: float, token: str = None) -> str: """ Format a price with appropriate decimal places. """ if price is None: return "N/A" # Handle None price gracefully try: if token: decimal_places = await self.get_token_price_decimal_places(token) else: # Smart default based on price magnitude if price == 0: decimal_places = 2 elif abs(price) >= 1000: decimal_places = 2 elif abs(price) >= 1: decimal_places = 3 elif abs(price) >= 0.01: decimal_places = 4 else: decimal_places = 6 return f"{price:,.{decimal_places}f}" except Exception as e: logger.error(f"❌ Error formatting price {price} for {token}: {e}") try: return f"{price:,.2f}" except Exception: return str(price) async def format_price_with_symbol(self, price: float, token: str = None) -> str: """ Format a price with currency symbol and appropriate decimal places. """ formatted_price = await self.format_price(price, token) return f"${formatted_price}" async def format_amount(self, amount: float, token: str) -> str: """ Format an amount (quantity) with appropriate decimal places for the given token. """ try: decimal_places = await self.get_token_amount_decimal_places(token) return f"{amount:,.{decimal_places}f}" except Exception as e: logger.error(f"❌ Error formatting amount {amount} for {token}: {e}") # Fallback, ensuring a reasonable number of decimals for an amount return f"{amount:,.6f}" def clear_cache(self): """Clear the precision cache.""" self._precision_cache.clear() self._markets_cache = None logger.info("🗑️ Cleared price formatter cache") # Global formatter instance _global_formatter: Optional[TokenDisplayFormatter] = None def set_global_trading_engine(trading_engine): """Set the trading engine for the global formatter.""" global _global_formatter if _global_formatter is None or _global_formatter.trading_engine is None: _global_formatter = TokenDisplayFormatter(trading_engine) logger.info("Global TokenDisplayFormatter initialized with trading engine.") elif trading_engine is not _global_formatter.trading_engine: _global_formatter.trading_engine = trading_engine _global_formatter.clear_cache() # Clear cache if engine changes logger.info("Global TokenDisplayFormatter updated with a new trading engine and cache cleared.") def get_formatter() -> TokenDisplayFormatter: """Get the global formatter instance. Ensures trading_engine is set if possible.""" global _global_formatter if _global_formatter is None: # Attempt to create a basic formatter. # It's better if set_global_trading_engine is called explicitly during app setup. logger.warning("TokenDisplayFormatter.get_formatter() called before trading_engine was set globally. Creating instance without it. Precision data may be limited to defaults.") _global_formatter = TokenDisplayFormatter() return _global_formatter async def format_price(price: float, token: str = None) -> str: """Convenience function to format price using global formatter.""" return await get_formatter().format_price(price, token) async def format_price_with_symbol(price: float, token: str = None) -> str: """Convenience function to format price with $ symbol using global formatter.""" return await get_formatter().format_price_with_symbol(price, token) async def format_amount(amount: float, token: str) -> str: """Convenience function to format amount using global formatter.""" return await get_formatter().format_amount(amount, token)