price_formatter.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. """
  2. Price formatting utilities with exchange-specific precision handling.
  3. """
  4. import logging
  5. import math
  6. from typing import Dict, Optional, Any
  7. from functools import lru_cache
  8. logger = logging.getLogger(__name__)
  9. def _normalize_token_case(token: str) -> str:
  10. """
  11. Normalize token case: if any characters are already uppercase, keep as-is.
  12. Otherwise, convert to uppercase. This handles mixed-case tokens like kPEPE, kBONK.
  13. """
  14. # Check if any character is already uppercase
  15. if any(c.isupper() for c in token):
  16. return token # Keep original case for mixed-case tokens
  17. else:
  18. return token.upper() # Convert to uppercase for all-lowercase input
  19. class PriceFormatter:
  20. """Handles price formatting with proper decimal precision from exchange data."""
  21. def __init__(self, trading_engine=None):
  22. """
  23. Initialize price formatter.
  24. Args:
  25. trading_engine: TradingEngine instance for accessing exchange data
  26. """
  27. self.trading_engine = trading_engine
  28. self._precision_cache: Dict[str, int] = {}
  29. self._markets_cache: Optional[Dict[str, Any]] = None
  30. def _load_markets_data(self) -> Dict[str, Any]:
  31. """Load markets data with caching."""
  32. if self._markets_cache is None and self.trading_engine:
  33. try:
  34. markets = self.trading_engine.client.get_markets()
  35. if markets:
  36. self._markets_cache = markets
  37. logger.info(f"📊 Loaded {len(markets)} markets for precision data")
  38. else:
  39. logger.warning("⚠️ Could not load markets data from exchange")
  40. self._markets_cache = {}
  41. except Exception as e:
  42. logger.error(f"❌ Error loading markets data: {e}")
  43. self._markets_cache = {}
  44. return self._markets_cache or {}
  45. def get_token_decimal_places(self, token: str) -> int:
  46. """
  47. Get the number of decimal places for a token price based on exchange precision.
  48. Args:
  49. token: Token symbol (e.g., 'BTC', 'PEPE')
  50. Returns:
  51. Number of decimal places for price formatting
  52. """
  53. # Check cache first
  54. if token in self._precision_cache:
  55. return self._precision_cache[token]
  56. # Default decimal places for fallback
  57. default_decimals = 2
  58. try:
  59. markets = self._load_markets_data()
  60. if not markets:
  61. logger.debug(f"No markets data available for {token}, using default {default_decimals} decimals")
  62. return default_decimals
  63. # Try different symbol formats
  64. possible_symbols = [
  65. f"{token}/USDC:USDC",
  66. f"{token}/USDC",
  67. f"k{token}/USDC:USDC", # For some tokens like kPEPE
  68. f"H{token}/USDC", # For some tokens like HPEPE
  69. ]
  70. for symbol in possible_symbols:
  71. if symbol in markets:
  72. market_info = markets[symbol]
  73. precision_info = market_info.get('precision', {})
  74. price_precision = precision_info.get('price')
  75. if price_precision and price_precision > 0:
  76. # Convert precision to decimal places
  77. # precision 0.01 -> 2 decimals, 0.001 -> 3 decimals, etc.
  78. decimal_places = int(-math.log10(price_precision))
  79. # Cache the result
  80. self._precision_cache[token] = decimal_places
  81. logger.debug(f"📊 {token} precision: {price_precision} -> {decimal_places} decimal places")
  82. return decimal_places
  83. # If no matching symbol found, use smart defaults based on token type
  84. decimal_places = self._get_default_decimals_for_token(token)
  85. self._precision_cache[token] = decimal_places
  86. logger.debug(f"💡 {token} using default precision: {decimal_places} decimal places")
  87. return decimal_places
  88. except Exception as e:
  89. logger.error(f"❌ Error getting precision for {token}: {e}")
  90. self._precision_cache[token] = default_decimals
  91. return default_decimals
  92. def _get_default_decimals_for_token(self, token: str) -> int:
  93. """Get smart default decimal places based on token characteristics."""
  94. token = _normalize_token_case(token)
  95. # High-value tokens (usually need fewer decimals)
  96. if token in ['BTC', 'ETH', 'BNB', 'SOL', 'ADA', 'DOT', 'AVAX', 'MATIC', 'LINK']:
  97. return 2
  98. # Mid-range tokens
  99. if token in ['DOGE', 'XRP', 'LTC', 'BCH', 'ETC', 'FIL', 'AAVE', 'UNI']:
  100. return 4
  101. # Meme/micro-cap tokens (usually need more decimals)
  102. if any(meme in token for meme in ['PEPE', 'SHIB', 'DOGE', 'FLOKI', 'BONK', 'WIF']):
  103. return 6
  104. # Default for unknown tokens
  105. return 4
  106. def format_price(self, price: float, token: str = None) -> str:
  107. """
  108. Format a price with appropriate decimal places.
  109. Args:
  110. price: Price value to format
  111. token: Token symbol for precision lookup (optional)
  112. Returns:
  113. Formatted price string
  114. """
  115. try:
  116. if token:
  117. decimal_places = self.get_token_decimal_places(token)
  118. else:
  119. # Use smart default based on price magnitude
  120. if price >= 1000:
  121. decimal_places = 2
  122. elif price >= 1:
  123. decimal_places = 3
  124. elif price >= 0.01:
  125. decimal_places = 4
  126. elif price >= 0.0001:
  127. decimal_places = 6
  128. else:
  129. decimal_places = 8
  130. # Format with proper decimal places and thousand separators
  131. formatted = f"{price:,.{decimal_places}f}"
  132. return formatted
  133. except Exception as e:
  134. logger.error(f"❌ Error formatting price {price} for {token}: {e}")
  135. return f"{price:,.2f}" # Fallback to 2 decimals
  136. def format_price_with_symbol(self, price: float, token: str = None) -> str:
  137. """
  138. Format a price with currency symbol and appropriate decimal places.
  139. Args:
  140. price: Price value to format
  141. token: Token symbol for precision lookup (optional)
  142. Returns:
  143. Formatted price string with $ symbol
  144. """
  145. formatted_price = self.format_price(price, token)
  146. return f"${formatted_price}"
  147. def clear_cache(self):
  148. """Clear the precision cache."""
  149. self._precision_cache.clear()
  150. self._markets_cache = None
  151. logger.info("🗑️ Cleared price formatter cache")
  152. # Global formatter instance
  153. _global_formatter: Optional[PriceFormatter] = None
  154. def set_global_trading_engine(trading_engine):
  155. """Set the trading engine for the global formatter."""
  156. global _global_formatter
  157. _global_formatter = PriceFormatter(trading_engine)
  158. def get_formatter() -> PriceFormatter:
  159. """Get the global formatter instance."""
  160. global _global_formatter
  161. if _global_formatter is None:
  162. _global_formatter = PriceFormatter()
  163. return _global_formatter
  164. def format_price(price: float, token: str = None) -> str:
  165. """Convenience function to format price using global formatter."""
  166. return get_formatter().format_price(price, token)
  167. def format_price_with_symbol(price: float, token: str = None) -> str:
  168. """Convenience function to format price with $ symbol using global formatter."""
  169. return get_formatter().format_price_with_symbol(price, token)