123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- """
- Price formatting utilities with exchange-specific precision handling.
- """
- import logging
- import math
- from typing import Dict, Optional, Any
- from functools import lru_cache
- logger = logging.getLogger(__name__)
- def normalize_token_case(token: str) -> str:
- """
- Normalize token case: if any characters are already uppercase, keep as-is.
- Otherwise, convert to uppercase. This handles mixed-case tokens like kPEPE, kBONK.
- """
- if any(c.isupper() for c in token):
- return token
- else:
- return token.upper()
- class TokenDisplayFormatter:
- """Handles price and amount formatting with proper decimal precision from exchange data."""
- def __init__(self, trading_engine=None):
- """
- Initialize price formatter.
- Args:
- trading_engine: TradingEngine instance for accessing exchange data
- """
- self.trading_engine = trading_engine
- # Cache structure: {token: {'price_decimals': X, 'amount_decimals': Y}}
- self._precision_cache: Dict[str, Dict[str, int]] = {}
- self._markets_cache: Optional[Dict[str, Any]] = None
- async def _load_markets_data(self) -> Dict[str, Any]:
- """Load markets data with caching."""
- if self._markets_cache is None and self.trading_engine:
- try:
- markets = await self.trading_engine.client.get_markets()
- if markets:
- self._markets_cache = markets
- logger.info(f"📊 Loaded {len(markets)} markets for precision data")
- else:
- logger.warning("⚠️ Could not load markets data from exchange")
- self._markets_cache = {}
- except Exception as e:
- logger.error(f"❌ Error loading markets data: {e}")
- self._markets_cache = {}
- return self._markets_cache or {}
- async def _fetch_and_cache_precisions(self, token: str) -> Optional[Dict[str, int]]:
- """
- Fetches price and amount precisions for a token from market data and caches them.
- Returns the cached dict {'price_decimals': X, 'amount_decimals': Y} or None if not found.
- """
- normalized_token = normalize_token_case(token)
- if normalized_token in self._precision_cache:
- return self._precision_cache[normalized_token]
- markets = await self._load_markets_data()
- if not markets:
- logger.debug(f"No markets data available for {normalized_token}, cannot fetch precisions.")
- return None
- # Search for the market symbol in a more streamlined way
- symbol_variants = [
- f"{normalized_token}/USDC:USDC",
- f"{normalized_token}/USDC",
- token # Direct match for fully qualified symbols
- ]
-
- market_info = None
- for symbol in symbol_variants:
- if symbol in markets:
- market_info = markets[symbol]
- break
-
- if not market_info:
- logger.warning(f"Market info not found for {normalized_token} or its variants.")
- return None
- precision_info = market_info.get('precision', {})
- price_precision = precision_info.get('price')
- amount_precision = precision_info.get('amount')
- price_decimals = self._get_default_price_decimals_for_token(normalized_token)
- if price_precision and price_precision > 0:
- price_decimals = int(-math.log10(price_precision))
- amount_decimals = 6 # Default amount precision
- if amount_precision and amount_precision > 0:
- amount_decimals = int(-math.log10(amount_precision))
-
- self._precision_cache[normalized_token] = {
- 'price_decimals': price_decimals,
- 'amount_decimals': amount_decimals
- }
- logger.debug(f"📊 Cached precisions for {normalized_token}: price {price_decimals}, amount {amount_decimals}")
- return self._precision_cache[normalized_token]
- async def get_token_price_decimal_places(self, token: str) -> int:
- """
- Get the number of decimal places for a token's price.
- """
- normalized_token = normalize_token_case(token)
- precisions = self._precision_cache.get(normalized_token)
- if not precisions:
- precisions = await self._fetch_and_cache_precisions(normalized_token)
- if precisions:
- return precisions['price_decimals']
-
- # Fallback to smart default if fetching failed completely
- return self._get_default_price_decimals_for_token(normalized_token)
- async def get_token_amount_decimal_places(self, token: str) -> int:
- """
- Get the number of decimal places for a token's amount (quantity).
- """
- normalized_token = normalize_token_case(token)
- precisions = self._precision_cache.get(normalized_token)
- if not precisions:
- precisions = await self._fetch_and_cache_precisions(normalized_token)
-
- if precisions:
- return precisions['amount_decimals']
-
- # Fallback if fetching failed - consider a sensible default for amounts
- logger.warning(f"Amount precision not found for {normalized_token}, defaulting to 6.")
- return 6 # Default amount precision
- def _get_default_price_decimals_for_token(self, token: str) -> int:
- """Get smart default price decimal places based on token characteristics."""
- token_upper = token.upper()
- # Define decimal places for different token categories
- token_decimals = {
- 2: ['BTC', 'ETH', 'BNB', 'SOL', 'ADA', 'DOT', 'AVAX', 'MATIC', 'LINK'],
- 4: ['DOGE', 'XRP', 'LTC', 'BCH', 'ETC', 'FIL', 'AAVE', 'UNI'],
- 6: ['PEPE', 'SHIB', 'FLOKI', 'BONK', 'WIF']
- }
- # Check for meme tokens first, as they might be substrings
- for decimals, tokens in sorted(token_decimals.items(), reverse=True):
- if any(t in token_upper for t in tokens):
- return decimals
-
- # Check for major tokens
- if token_upper in token_decimals[2]:
- return 2
-
- return 4 # Default for other tokens
- async def format_price(self, price: float, token: str = None) -> str:
- """
- Format a price with appropriate decimal places.
- """
- if price is None:
- return "N/A" # Handle None price gracefully
- try:
- if token:
- decimal_places = await self.get_token_price_decimal_places(token)
- else:
- # Smart default based on price magnitude
- if price == 0:
- decimal_places = 2
- elif abs(price) >= 1000:
- decimal_places = 2
- elif abs(price) >= 1:
- decimal_places = 3
- elif abs(price) >= 0.01:
- decimal_places = 4
- else:
- decimal_places = 6
-
- return f"{price:,.{decimal_places}f}"
- except Exception as e:
- logger.error(f"❌ Error formatting price {price} for {token}: {e}")
- try:
- return f"{price:,.2f}"
- except Exception:
- return str(price)
- async def format_price_with_symbol(self, price: float, token: str = None) -> str:
- """
- Format a price with currency symbol and appropriate decimal places.
- """
- formatted_price = await self.format_price(price, token)
- return f"${formatted_price}"
- async def format_amount(self, amount: float, token: str) -> str:
- """
- Format an amount (quantity) with appropriate decimal places for the given token.
- """
- try:
- decimal_places = await self.get_token_amount_decimal_places(token)
- return f"{amount:,.{decimal_places}f}"
- except Exception as e:
- logger.error(f"❌ Error formatting amount {amount} for {token}: {e}")
- # Fallback, ensuring a reasonable number of decimals for an amount
- return f"{amount:,.6f}"
- def clear_cache(self):
- """Clear the precision cache."""
- self._precision_cache.clear()
- self._markets_cache = None
- logger.info("🗑️ Cleared price formatter cache")
- # Global formatter instance
- _global_formatter: Optional[TokenDisplayFormatter] = None
- def set_global_trading_engine(trading_engine):
- """Set the trading engine for the global formatter."""
- global _global_formatter
- if _global_formatter is None or _global_formatter.trading_engine is None:
- _global_formatter = TokenDisplayFormatter(trading_engine)
- logger.info("Global TokenDisplayFormatter initialized with trading engine.")
- elif trading_engine is not _global_formatter.trading_engine:
- _global_formatter.trading_engine = trading_engine
- _global_formatter.clear_cache() # Clear cache if engine changes
- logger.info("Global TokenDisplayFormatter updated with a new trading engine and cache cleared.")
- def get_formatter() -> TokenDisplayFormatter:
- """Get the global formatter instance. Ensures trading_engine is set if possible."""
- global _global_formatter
- if _global_formatter is None:
- # Attempt to create a basic formatter.
- # It's better if set_global_trading_engine is called explicitly during app setup.
- logger.warning("TokenDisplayFormatter.get_formatter() called before trading_engine was set globally. Creating instance without it. Precision data may be limited to defaults.")
- _global_formatter = TokenDisplayFormatter()
- return _global_formatter
- async def format_price(price: float, token: str = None) -> str:
- """Convenience function to format price using global formatter."""
- return await get_formatter().format_price(price, token)
- async def format_price_with_symbol(price: float, token: str = None) -> str:
- """Convenience function to format price with $ symbol using global formatter."""
- return await get_formatter().format_price_with_symbol(price, token)
- async def format_amount(amount: float, token: str) -> str:
- """Convenience function to format amount using global formatter."""
- return await get_formatter().format_amount(amount, token)
|