token_display_formatter.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. """
  2. Price formatting utilities with exchange-specific precision handling.
  3. """
  4. import logging
  5. import math
  6. from typing import Dict, Optional, Any
  7. from functools import lru_cache
  8. logger = logging.getLogger(__name__)
  9. def normalize_token_case(token: str) -> str:
  10. """
  11. Normalize token case: if any characters are already uppercase, keep as-is.
  12. Otherwise, convert to uppercase. This handles mixed-case tokens like kPEPE, kBONK.
  13. """
  14. if any(c.isupper() for c in token):
  15. return token
  16. else:
  17. return token.upper()
  18. class TokenDisplayFormatter:
  19. """Handles price and amount formatting with proper decimal precision from exchange data."""
  20. def __init__(self, trading_engine=None):
  21. """
  22. Initialize price formatter.
  23. Args:
  24. trading_engine: TradingEngine instance for accessing exchange data
  25. """
  26. self.trading_engine = trading_engine
  27. # Cache structure: {token: {'price_decimals': X, 'amount_decimals': Y}}
  28. self._precision_cache: Dict[str, Dict[str, int]] = {}
  29. self._markets_cache: Optional[Dict[str, Any]] = None
  30. async def _load_markets_data(self) -> Dict[str, Any]:
  31. """Load markets data with caching."""
  32. if self._markets_cache is None and self.trading_engine:
  33. try:
  34. markets = await self.trading_engine.client.get_markets()
  35. if markets:
  36. self._markets_cache = markets
  37. logger.info(f"📊 Loaded {len(markets)} markets for precision data")
  38. else:
  39. logger.warning("⚠️ Could not load markets data from exchange")
  40. self._markets_cache = {}
  41. except Exception as e:
  42. logger.error(f"❌ Error loading markets data: {e}")
  43. self._markets_cache = {}
  44. return self._markets_cache or {}
  45. async def _fetch_and_cache_precisions(self, token: str) -> Optional[Dict[str, int]]:
  46. """
  47. Fetches price and amount precisions for a token from market data and caches them.
  48. Returns the cached dict {'price_decimals': X, 'amount_decimals': Y} or None if not found.
  49. """
  50. normalized_token = normalize_token_case(token)
  51. if normalized_token in self._precision_cache:
  52. return self._precision_cache[normalized_token]
  53. markets = await self._load_markets_data()
  54. if not markets:
  55. logger.debug(f"No markets data available for {normalized_token}, cannot fetch precisions.")
  56. return None
  57. # Search for the market symbol in a more streamlined way
  58. symbol_variants = [
  59. f"{normalized_token}/USDC:USDC",
  60. f"{normalized_token}/USDC",
  61. token # Direct match for fully qualified symbols
  62. ]
  63. market_info = None
  64. for symbol in symbol_variants:
  65. if symbol in markets:
  66. market_info = markets[symbol]
  67. break
  68. if not market_info:
  69. logger.warning(f"Market info not found for {normalized_token} or its variants.")
  70. return None
  71. precision_info = market_info.get('precision', {})
  72. price_precision = precision_info.get('price')
  73. amount_precision = precision_info.get('amount')
  74. price_decimals = self._get_default_price_decimals_for_token(normalized_token)
  75. if price_precision and price_precision > 0:
  76. price_decimals = int(-math.log10(price_precision))
  77. amount_decimals = 6 # Default amount precision
  78. if amount_precision and amount_precision > 0:
  79. amount_decimals = int(-math.log10(amount_precision))
  80. self._precision_cache[normalized_token] = {
  81. 'price_decimals': price_decimals,
  82. 'amount_decimals': amount_decimals
  83. }
  84. logger.debug(f"📊 Cached precisions for {normalized_token}: price {price_decimals}, amount {amount_decimals}")
  85. return self._precision_cache[normalized_token]
  86. async def get_token_price_decimal_places(self, token: str) -> int:
  87. """
  88. Get the number of decimal places for a token's price.
  89. """
  90. normalized_token = normalize_token_case(token)
  91. precisions = self._precision_cache.get(normalized_token)
  92. if not precisions:
  93. precisions = await self._fetch_and_cache_precisions(normalized_token)
  94. if precisions:
  95. return precisions['price_decimals']
  96. # Fallback to smart default if fetching failed completely
  97. return self._get_default_price_decimals_for_token(normalized_token)
  98. async def get_token_amount_decimal_places(self, token: str) -> int:
  99. """
  100. Get the number of decimal places for a token's amount (quantity).
  101. """
  102. normalized_token = normalize_token_case(token)
  103. precisions = self._precision_cache.get(normalized_token)
  104. if not precisions:
  105. precisions = await self._fetch_and_cache_precisions(normalized_token)
  106. if precisions:
  107. return precisions['amount_decimals']
  108. # Fallback if fetching failed - consider a sensible default for amounts
  109. logger.warning(f"Amount precision not found for {normalized_token}, defaulting to 6.")
  110. return 6 # Default amount precision
  111. def _get_default_price_decimals_for_token(self, token: str) -> int:
  112. """Get smart default price decimal places based on token characteristics."""
  113. token_upper = token.upper()
  114. # Define decimal places for different token categories
  115. token_decimals = {
  116. 2: ['BTC', 'ETH', 'BNB', 'SOL', 'ADA', 'DOT', 'AVAX', 'MATIC', 'LINK'],
  117. 4: ['DOGE', 'XRP', 'LTC', 'BCH', 'ETC', 'FIL', 'AAVE', 'UNI'],
  118. 6: ['PEPE', 'SHIB', 'FLOKI', 'BONK', 'WIF']
  119. }
  120. # Check for meme tokens first, as they might be substrings
  121. for decimals, tokens in sorted(token_decimals.items(), reverse=True):
  122. if any(t in token_upper for t in tokens):
  123. return decimals
  124. # Check for major tokens
  125. if token_upper in token_decimals[2]:
  126. return 2
  127. return 4 # Default for other tokens
  128. async def format_price(self, price: float, token: str = None) -> str:
  129. """
  130. Format a price with appropriate decimal places.
  131. """
  132. if price is None:
  133. return "N/A" # Handle None price gracefully
  134. try:
  135. if token:
  136. decimal_places = await self.get_token_price_decimal_places(token)
  137. else:
  138. # Smart default based on price magnitude
  139. if price == 0:
  140. decimal_places = 2
  141. elif abs(price) >= 1000:
  142. decimal_places = 2
  143. elif abs(price) >= 1:
  144. decimal_places = 3
  145. elif abs(price) >= 0.01:
  146. decimal_places = 4
  147. else:
  148. decimal_places = 6
  149. return f"{price:,.{decimal_places}f}"
  150. except Exception as e:
  151. logger.error(f"❌ Error formatting price {price} for {token}: {e}")
  152. try:
  153. return f"{price:,.2f}"
  154. except Exception:
  155. return str(price)
  156. async def format_price_with_symbol(self, price: float, token: str = None) -> str:
  157. """
  158. Format a price with currency symbol and appropriate decimal places.
  159. """
  160. formatted_price = await self.format_price(price, token)
  161. return f"${formatted_price}"
  162. async def format_amount(self, amount: float, token: str) -> str:
  163. """
  164. Format an amount (quantity) with appropriate decimal places for the given token.
  165. """
  166. try:
  167. decimal_places = await self.get_token_amount_decimal_places(token)
  168. return f"{amount:,.{decimal_places}f}"
  169. except Exception as e:
  170. logger.error(f"❌ Error formatting amount {amount} for {token}: {e}")
  171. # Fallback, ensuring a reasonable number of decimals for an amount
  172. return f"{amount:,.6f}"
  173. def clear_cache(self):
  174. """Clear the precision cache."""
  175. self._precision_cache.clear()
  176. self._markets_cache = None
  177. logger.info("🗑️ Cleared price formatter cache")
  178. # Global formatter instance
  179. _global_formatter: Optional[TokenDisplayFormatter] = None
  180. def set_global_trading_engine(trading_engine):
  181. """Set the trading engine for the global formatter."""
  182. global _global_formatter
  183. if _global_formatter is None or _global_formatter.trading_engine is None:
  184. _global_formatter = TokenDisplayFormatter(trading_engine)
  185. logger.info("Global TokenDisplayFormatter initialized with trading engine.")
  186. elif trading_engine is not _global_formatter.trading_engine:
  187. _global_formatter.trading_engine = trading_engine
  188. _global_formatter.clear_cache() # Clear cache if engine changes
  189. logger.info("Global TokenDisplayFormatter updated with a new trading engine and cache cleared.")
  190. def get_formatter() -> TokenDisplayFormatter:
  191. """Get the global formatter instance. Ensures trading_engine is set if possible."""
  192. global _global_formatter
  193. if _global_formatter is None:
  194. # Attempt to create a basic formatter.
  195. # It's better if set_global_trading_engine is called explicitly during app setup.
  196. logger.warning("TokenDisplayFormatter.get_formatter() called before trading_engine was set globally. Creating instance without it. Precision data may be limited to defaults.")
  197. _global_formatter = TokenDisplayFormatter()
  198. return _global_formatter
  199. async def format_price(price: float, token: str = None) -> str:
  200. """Convenience function to format price using global formatter."""
  201. return await get_formatter().format_price(price, token)
  202. async def format_price_with_symbol(price: float, token: str = None) -> str:
  203. """Convenience function to format price with $ symbol using global formatter."""
  204. return await get_formatter().format_price_with_symbol(price, token)
  205. async def format_amount(amount: float, token: str) -> str:
  206. """Convenience function to format amount using global formatter."""
  207. return await get_formatter().format_amount(amount, token)