123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- """
- Price formatting utilities with exchange-specific precision handling.
- """
- import logging
- import math
- from typing import Dict, Optional, Any
- from functools import lru_cache
- logger = logging.getLogger(__name__)
- def _normalize_token_case(token: str) -> str:
- """
- Normalize token case: if any characters are already uppercase, keep as-is.
- Otherwise, convert to uppercase. This handles mixed-case tokens like kPEPE, kBONK.
- """
- # Check if any character is already uppercase
- if any(c.isupper() for c in token):
- return token # Keep original case for mixed-case tokens
- else:
- return token.upper() # Convert to uppercase for all-lowercase input
class PriceFormatter:
    """Format prices using exchange-supplied price precision when available.

    Precision is looked up per token from the markets mapping returned by
    ``trading_engine.client.get_markets()``. When no matching market (or no
    trading engine) is available, heuristics based on the token symbol or on
    the magnitude of the price are used instead.
    """

    # Symbols that get 2 decimals when the exchange offers no precision data.
    _HIGH_VALUE_TOKENS = frozenset(
        {'BTC', 'ETH', 'BNB', 'SOL', 'ADA', 'DOT', 'AVAX', 'MATIC', 'LINK'}
    )
    # Symbols that get 4 decimals when the exchange offers no precision data.
    _MID_RANGE_TOKENS = frozenset(
        {'DOGE', 'XRP', 'LTC', 'BCH', 'ETC', 'FIL', 'AAVE', 'UNI'}
    )
    # Substrings that mark a symbol as a low-priced token (6 decimals).
    _MEME_MARKERS = ('PEPE', 'SHIB', 'DOGE', 'FLOKI', 'BONK', 'WIF')

    def __init__(self, trading_engine=None):
        """
        Initialize price formatter.

        Args:
            trading_engine: Object exposing ``client.get_markets()``; may be
                None, in which case only heuristic defaults are used.
        """
        self.trading_engine = trading_engine
        # token -> decimal places, filled lazily by get_token_decimal_places().
        self._precision_cache: Dict[str, int] = {}
        # None means "not loaded yet"; {} means "load attempted and failed".
        self._markets_cache: Optional[Dict[str, Any]] = None

    def _load_markets_data(self) -> Dict[str, Any]:
        """Return the markets mapping, fetching it once and caching the result.

        A failed or empty fetch is cached as an empty dict, so the exchange
        is not queried again until clear_cache() is called.
        """
        if self._markets_cache is None and self.trading_engine:
            try:
                markets = self.trading_engine.client.get_markets()
                if markets:
                    self._markets_cache = markets
                    logger.info(f"📊 Loaded {len(markets)} markets for precision data")
                else:
                    logger.warning("⚠️ Could not load markets data from exchange")
                    self._markets_cache = {}
            except Exception as e:
                logger.error(f"❌ Error loading markets data: {e}")
                self._markets_cache = {}

        return self._markets_cache or {}

    @staticmethod
    def _precision_to_decimals(price_precision: Any) -> Optional[int]:
        """Convert a tick size (e.g. 0.01, 0.5, "0.001") to a decimal count.

        Returns None when the value is missing, non-numeric, non-finite or
        not strictly positive, so callers can move on to another market.
        NOTE(review): assumes the exchange reports price precision as a tick
        size rather than as a number of decimal places - confirm.
        """
        # Local import keeps this class self-contained; decimal is stdlib.
        from decimal import Decimal, InvalidOperation

        try:
            # Going through str() uses the shortest float repr, so 0.05
            # becomes Decimal('0.05') rather than its binary expansion.
            tick = Decimal(str(price_precision))
        except (InvalidOperation, ValueError, TypeError):
            return None

        if not tick.is_finite() or tick <= 0:
            return None

        # After normalize() the exponent is minus the number of significant
        # fractional digits (0.05 -> -2); tick sizes >= 1 clamp to 0.
        exponent = tick.normalize().as_tuple().exponent
        return max(0, -exponent)

    def get_token_decimal_places(self, token: str) -> int:
        """
        Get the number of decimal places for a token price based on exchange precision.

        Args:
            token: Token symbol (e.g., 'BTC', 'PEPE')

        Returns:
            Number of decimal places for price formatting. Falls back to a
            symbol-based default when no market matches, and to 2 when no
            markets data is available or an error occurs.
        """
        cached = self._precision_cache.get(token)
        if cached is not None:
            return cached

        # Default decimal places for fallback
        default_decimals = 2

        try:
            markets = self._load_markets_data()
            if not markets:
                # Deliberately not cached: markets may load after clear_cache().
                logger.debug(f"No markets data available for {token}, using default {default_decimals} decimals")
                return default_decimals

            # Try different symbol formats
            possible_symbols = [
                f"{token}/USDC:USDC",
                f"{token}/USDC",
                f"k{token}/USDC:USDC",  # For some tokens like kPEPE
                f"H{token}/USDC",  # For some tokens like HPEPE
            ]

            for symbol in possible_symbols:
                market_info = markets.get(symbol)
                if not market_info:
                    continue

                # 'precision' may be absent or None; treat both as empty.
                precision_info = market_info.get('precision') or {}
                price_precision = precision_info.get('price')

                decimal_places = self._precision_to_decimals(price_precision)
                if decimal_places is None:
                    continue

                self._precision_cache[token] = decimal_places
                logger.debug(f"📊 {token} precision: {price_precision} -> {decimal_places} decimal places")
                return decimal_places

            # If no matching symbol found, use smart defaults based on token type
            decimal_places = self._get_default_decimals_for_token(token)
            self._precision_cache[token] = decimal_places
            logger.debug(f"💡 {token} using default precision: {decimal_places} decimal places")
            return decimal_places

        except Exception as e:
            logger.error(f"❌ Error getting precision for {token}: {e}")
            self._precision_cache[token] = default_decimals
            return default_decimals

    def _get_default_decimals_for_token(self, token: str) -> int:
        """Get default decimal places based on the token symbol alone."""
        token = _normalize_token_case(token)

        # High-value tokens (usually need fewer decimals)
        if token in self._HIGH_VALUE_TOKENS:
            return 2

        # Mid-range tokens
        if token in self._MID_RANGE_TOKENS:
            return 4

        # Meme/micro-cap tokens (usually need more decimals); substring match
        # so prefixed symbols such as kPEPE are covered too.
        if any(marker in token for marker in self._MEME_MARKERS):
            return 6

        # Default for unknown tokens
        return 4

    @staticmethod
    def _decimals_for_magnitude(price: float) -> int:
        """Pick decimal places from the absolute size of the price."""
        # abs() so negative values (e.g. price differences) get the same
        # precision as their positive counterpart.
        magnitude = abs(price)
        if magnitude >= 1000:
            return 2
        if magnitude >= 1:
            return 3
        if magnitude >= 0.01:
            return 4
        if magnitude >= 0.0001:
            return 6
        return 8

    def format_price(self, price: float, token: Optional[str] = None) -> str:
        """
        Format a price with appropriate decimal places.

        Args:
            price: Price value to format
            token: Token symbol for precision lookup (optional). When omitted
                or empty, precision is chosen from the price magnitude.

        Returns:
            Formatted price string with thousands separators. If formatting
            fails, a 2-decimal rendering is attempted, and as a last resort
            ``str(price)`` is returned; this method does not raise.
        """
        try:
            if token:
                decimal_places = self.get_token_decimal_places(token)
            else:
                decimal_places = self._decimals_for_magnitude(price)

            # Format with proper decimal places and thousand separators
            return f"{price:,.{decimal_places}f}"

        except Exception as e:
            logger.error(f"❌ Error formatting price {price} for {token}: {e}")
            try:
                return f"{float(price):,.2f}"  # Fallback to 2 decimals
            except (TypeError, ValueError):
                # Non-numeric input (e.g. None): return something printable
                # instead of raising from inside the error handler.
                return str(price)

    def format_price_with_symbol(self, price: float, token: Optional[str] = None) -> str:
        """
        Format a price with currency symbol and appropriate decimal places.

        Args:
            price: Price value to format
            token: Token symbol for precision lookup (optional)

        Returns:
            Formatted price string with $ symbol
        """
        formatted_price = self.format_price(price, token)
        return f"${formatted_price}"

    def clear_cache(self):
        """Clear the precision cache and force markets to be reloaded."""
        self._precision_cache.clear()
        self._markets_cache = None
        logger.info("🗑️ Cleared price formatter cache")
# Module-wide PriceFormatter shared by the convenience functions below.
# Starts as None; assigned by set_global_trading_engine() or, lazily, by
# get_formatter().
_global_formatter: Optional[PriceFormatter] = None
def set_global_trading_engine(trading_engine):
    """Replace the global formatter with a new one bound to ``trading_engine``.

    A fresh PriceFormatter is always created, so any precision or markets
    data cached on the previous global formatter is discarded with it.
    """
    global _global_formatter
    replacement = PriceFormatter(trading_engine)
    _global_formatter = replacement
def get_formatter() -> PriceFormatter:
    """Return the global formatter, creating an engine-less one on first use."""
    global _global_formatter
    if _global_formatter is not None:
        return _global_formatter

    # Nothing configured yet: fall back to a formatter without a trading engine.
    _global_formatter = PriceFormatter()
    return _global_formatter
def format_price(price: float, token: str = None) -> str:
    """Format ``price`` via the global formatter (see PriceFormatter.format_price)."""
    formatter = get_formatter()
    return formatter.format_price(price, token)
def format_price_with_symbol(price: float, token: str = None) -> str:
    """Format ``price`` with a leading $ symbol via the global formatter."""
    formatter = get_formatter()
    return formatter.format_price_with_symbol(price, token)
|