import json
import logging
import re
from typing import Any, Dict, List, Optional, Tuple

from hyperliquid import HyperliquidSync

from src.config.config import Config

# Use existing logger setup (will be configured by main application)
logger = logging.getLogger(__name__)


class HyperliquidClient:
    """Wrapper class for Hyperliquid API client with enhanced functionality.

    The wrapper builds a CCXT-style configuration from ``Config``, creates a
    ``HyperliquidSync`` client when credentials are available, and exposes
    convenience methods that log failures instead of raising.

    Read-only methods return ``None`` on failure. Order-related methods
    return a ``(result, error_message)`` tuple in which exactly one element
    is ``None``.
    """

    # Config keys whose values are credentials/identifiers and must never be
    # written to the log in full.
    _SENSITIVE_KEYS = ('apiKey', 'privateKey', 'walletAddress', 'secret')

    def __init__(self, use_testnet: Optional[bool] = None):
        """
        Initialize the Hyperliquid client with CCXT-style configuration.

        Args:
            use_testnet: Whether to use testnet (default: from Config.HYPERLIQUID_TESTNET)

        Raises:
            Exception: Whatever ``HyperliquidSync`` raises during construction
                is logged and re-raised.
        """
        # Use config value if not explicitly provided
        if use_testnet is None:
            use_testnet = Config.HYPERLIQUID_TESTNET
        self.use_testnet = use_testnet

        # Get CCXT-style configuration
        self.config = Config.get_hyperliquid_config()

        # Override testnet setting if provided
        if use_testnet is not None:
            self.config['testnet'] = use_testnet
            self.config['sandbox'] = use_testnet

        # Ensure proper CCXT format
        # Hyperliquid CCXT expects: apiKey=API_generator_key, walletAddress=wallet_address
        if not self.config.get('apiKey') and Config.HYPERLIQUID_SECRET_KEY:
            self.config['apiKey'] = Config.HYPERLIQUID_SECRET_KEY  # API generator key
        if not self.config.get('walletAddress') and Config.HYPERLIQUID_WALLET_ADDRESS:
            self.config['walletAddress'] = Config.HYPERLIQUID_WALLET_ADDRESS  # Wallet address
        if not self.config.get('secret') and Config.HYPERLIQUID_WALLET_ADDRESS:
            self.config['secret'] = Config.HYPERLIQUID_WALLET_ADDRESS  # Wallet address as secret too

        # Initialize clients
        self.sync_client = None
        self.async_client = None

        if not (self.config.get('privateKey') or self.config.get('apiKey')):
            logger.warning("⚠️ No private key provided - client will have limited functionality")
            return

        try:
            # Log configuration (safely)
            logger.info(f"🔧 Initializing Hyperliquid client with config: {self._safe_config_log()}")

            # The same key is used for both apiKey and privateKey. Fall back to
            # the configured privateKey when no apiKey exists, so a
            # privateKey-only configuration does not yield empty credentials.
            credential = self.config.get('apiKey') or self.config.get('privateKey')

            # Initialize with Hyperliquid-specific CCXT format
            ccxt_config = {
                'apiKey': credential,
                'privateKey': credential,  # Same as apiKey for Hyperliquid
                'testnet': self.config.get('testnet', False),
                'sandbox': self.config.get('sandbox', False),
            }

            # Add walletAddress - this is required by CCXT Hyperliquid
            if self.config.get('walletAddress'):
                ccxt_config['walletAddress'] = self.config['walletAddress']

            # Add secret if available
            if self.config.get('secret'):
                ccxt_config['secret'] = self.config['secret']

            logger.info(f"📋 Using CCXT config structure: {self._safe_ccxt_config_log(ccxt_config)}")

            # Initialize with the proper CCXT format
            self.sync_client = HyperliquidSync(ccxt_config)

            logger.info("✅ Hyperliquid client initialized successfully")
            logger.info(f"🌐 Network: {'Testnet' if use_testnet else '🚨 MAINNET 🚨'}")

            # Test the connection
            self._test_connection()

        except Exception as e:
            logger.error(f"❌ Failed to initialize Hyperliquid client: {e}")
            logger.error(f"💡 Config used: {self._safe_config_log()}")
            raise

    @classmethod
    def _mask_sensitive(cls, config: Dict[str, Any]) -> Dict[str, Any]:
        """Return a shallow copy of ``config`` with sensitive values truncated.

        Every non-empty value stored under one of ``_SENSITIVE_KEYS`` is
        replaced by its first eight characters followed by ``...``.
        """
        masked = dict(config)
        for key in cls._SENSITIVE_KEYS:
            value = masked.get(key)
            if value:
                masked[key] = f"{str(value)[:8]}..."
        return masked

    @staticmethod
    def _user_params() -> Dict[str, Any]:
        """Build the ``user`` request parameter from the configured wallet.

        Returns:
            ``{'user': <address>}`` with a ``0x`` prefix added when missing,
            or an empty dict when no wallet address is configured.
        """
        wallet_address = Config.HYPERLIQUID_WALLET_ADDRESS
        if not wallet_address:
            return {}
        if not wallet_address.startswith('0x'):
            wallet_address = f"0x{wallet_address}"
        return {'user': wallet_address}

    def _safe_config_log(self) -> Dict[str, Any]:
        """Return config with sensitive data masked for logging."""
        return self._mask_sensitive(self.config)

    def _safe_ccxt_config_log(self, config: dict) -> dict:
        """Return CCXT config with sensitive data masked for logging."""
        return self._mask_sensitive(config)

    def _test_connection(self):
        """Test the connection to verify credentials.

        A failure is only logged as a warning; it never propagates.
        """
        try:
            # Try to fetch balance to test authentication
            # Use the same logic as get_balance for consistency
            self.sync_client.fetch_balance(params=self._user_params())
            logger.info("🔗 Connection test successful")
        except Exception as e:
            logger.warning(f"⚠️ Connection test failed: {e}")
            logger.warning("💡 This might be normal if you have no positions/balance")

    def _extract_error_message(self, exception_obj: Exception) -> str:
        """Extracts a more specific error message from a Hyperliquid exception.

        Resolution order:
            1. The ``error`` field of the first entry in
               ``response.data.statuses`` of a JSON payload embedded in the
               exception text.
            2. A non-empty string ``message`` attribute on the exception.
            3. The exception text (without a leading ``"hyperliquid "``
               prefix, cut at the first comma), truncated to 150 characters.
        """
        error_str = str(exception_obj)

        # Attempt to parse the JSON-like structure in the error string
        # Example: hyperliquid {"status":"ok","response":{"type":"order","data":{"statuses":[{"error":"Insufficient margin..."}]}}}
        try:
            # Look for the start of the JSON part
            json_match = re.search(r'{\s*"status":.*}', error_str)
            if json_match:
                error_data = json.loads(json_match.group(0))
                statuses = None
                if isinstance(error_data, dict):
                    response = error_data.get('response')
                    if isinstance(response, dict):
                        data = response.get('data')
                        if isinstance(data, dict):
                            statuses = data.get('statuses')
                if isinstance(statuses, list) and statuses:
                    first_status = statuses[0]
                    if isinstance(first_status, dict) and 'error' in first_status:
                        return str(first_status['error'])  # Return the specific error
        except (json.JSONDecodeError, AttributeError, TypeError, IndexError) as parse_error:
            logger.debug(f"Could not parse detailed Hyperliquid error from string '{error_str}': {parse_error}")

        # Fallback: Check for common CCXT error types if the above fails or if it's a CCXT error
        # (ccxt.base.errors.InsufficientFunds, ccxt.base.errors.ExchangeError etc.)
        # These often have a message attribute or a more direct string representation.
        message = getattr(exception_obj, 'message', None)
        if isinstance(message, str) and message:
            return message

        # Generic fallback to the first 150 chars of the exception string
        # Avoid returning the full "hyperliquid {..." string if parsing failed.
        prefix_to_remove = "hyperliquid "
        if error_str.startswith(prefix_to_remove):
            return error_str[len(prefix_to_remove):].split(',')[0][:150]  # Get a cleaner part
        return error_str[:150]

    def get_balance(self) -> Optional[Dict[str, Any]]:
        """Get account balance.

        Returns:
            The result of ``fetch_balance``, or None if the client is not
            initialized or the request fails.
        """
        # Bound before the try block so the except handler can always log it.
        params: Dict[str, Any] = {}
        try:
            if not self.sync_client:
                logger.error("❌ Client not initialized")
                return None

            # For Hyperliquid, we need to pass the wallet address as the
            # 'user' parameter (0x-prefixed).
            params = self._user_params()

            logger.debug(f"🔍 Fetching balance with params: {params}")
            balance = self.sync_client.fetch_balance(params=params)
            logger.info("✅ Successfully fetched balance")
            return balance
        except Exception as e:
            error_message = self._extract_error_message(e)
            logger.error(f"❌ Error fetching balance: {error_message} (Full exception: {e})")
            logger.debug(f"💡 Attempted with params: {params}")
            return None

    def get_balance_alternative(self) -> Optional[Dict[str, Any]]:
        """Alternative balance fetching method trying different approaches.

        Each candidate parameter set is tried in order; the first successful
        ``fetch_balance`` result is returned.

        Returns:
            The balance from the first approach that succeeds, or None if the
            client is not initialized or every approach fails.
        """
        try:
            if not self.sync_client:
                logger.error("❌ Client not initialized")
                return None

            wallet = Config.HYPERLIQUID_WALLET_ADDRESS
            prefixed_wallet = f"0x{wallet}" if wallet and not wallet.startswith('0x') else wallet

            # Try different approaches for balance fetching
            approaches = [
                # Approach 1: No params (original)
                {},
                # Approach 2: Wallet address as user
                {'user': wallet},
                # Approach 3: Wallet address with 0x prefix
                {'user': prefixed_wallet},
                # Approach 4: Empty user
                {'user': ''},
            ]

            for i, params in enumerate(approaches, 1):
                try:
                    logger.info(f"🔍 Trying approach {i}: {params}")
                    balance = self.sync_client.fetch_balance(params=params)
                    logger.info(f"✅ Approach {i} successful!")
                    return balance
                except Exception as e:
                    logger.warning(f"⚠️ Approach {i} failed: {e}")

            logger.error("❌ All approaches failed")
            return None

        except Exception as e:
            error_message = self._extract_error_message(e)
            logger.error(f"❌ Error in alternative balance fetch: {error_message} (Full exception: {e})")
            return None

    def get_positions(self, symbol: Optional[str] = None) -> Optional[List[Dict[str, Any]]]:
        """Get current positions.

        Args:
            symbol: Restrict the query to this symbol; all symbols when omitted.

        Returns:
            The result of ``fetch_positions``, or None on failure.
        """
        params: Dict[str, Any] = {}
        try:
            if not self.sync_client:
                logger.error("❌ Client not initialized")
                return None

            # Add user parameter for Hyperliquid CCXT compatibility
            params = self._user_params()

            logger.debug(f"🔍 Fetching positions with params: {params}")
            positions = self.sync_client.fetch_positions([symbol] if symbol else None, params=params)
            logger.info(f"✅ Successfully fetched positions for {symbol or 'all symbols'}")
            return positions
        except Exception as e:
            error_message = self._extract_error_message(e)
            logger.error(f"❌ Error fetching positions: {error_message} (Full exception: {e})")
            logger.debug(f"💡 Attempted with params: {params}")
            return None

    def get_market_data(self, symbol: str) -> Optional[Dict[str, Any]]:
        """Get market data for a symbol, including OHLCV for high/low.

        Returns:
            A dict with keys ``ticker``, ``orderbook`` and ``symbol``, or None
            on failure. When a daily candle is available, the ticker's
            ``high`` and ``low`` are overwritten with the candle's values.
        """
        try:
            if not self.sync_client:
                logger.error("❌ Client not initialized")
                return None

            ticker = self.sync_client.fetch_ticker(symbol)
            orderbook = self.sync_client.fetch_order_book(symbol)

            # Fetch a single '1d' candle to get high/low
            ohlcv = self.sync_client.fetch_ohlcv(symbol, '1d', limit=1)
            if ohlcv:
                # CCXT OHLCV format: [timestamp, open, high, low, close, volume]
                last_day_candle = ohlcv[0]
                ticker['high'] = last_day_candle[2]
                ticker['low'] = last_day_candle[3]

            market_data = {
                'ticker': ticker,
                'orderbook': orderbook,
                'symbol': symbol
            }

            logger.info(f"✅ Successfully fetched market data for {symbol}")
            return market_data
        except Exception as e:
            error_message = self._extract_error_message(e)
            logger.error(f"❌ Error fetching market data for {symbol}: {error_message} (Full exception: {e})")
            return None

    def get_candle_data(self, symbol: str, timeframe: str = '1h', limit: int = 100, since: Optional[int] = None) -> Optional[List[List]]:
        """
        Get OHLCV candle data for a symbol.

        Args:
            symbol: Trading symbol (e.g., 'BTC/USDC:USDC')
            timeframe: Timeframe for candles ('1m', '5m', '15m', '1h', '4h', '1d', etc.)
            limit: Maximum number of candles to return (default: 100)
            since: Timestamp in milliseconds to start from (optional)

        Returns:
            List of OHLCV candles in format: [[timestamp, open, high, low, close, volume], ...]
            Returns None if error occurs
        """
        try:
            if not self.sync_client:
                logger.error("❌ Client not initialized")
                return None

            logger.debug(f"🕯️ Fetching {limit} candles for {symbol} ({timeframe})")

            # Only forward 'since' when the caller supplied it.
            fetch_kwargs: Dict[str, Any] = {'limit': limit, 'params': {}}
            if since is not None:
                fetch_kwargs['since'] = since
            candles = self.sync_client.fetch_ohlcv(symbol, timeframe, **fetch_kwargs)

            if candles:
                logger.info(f"✅ Successfully fetched {len(candles)} candles for {symbol} ({timeframe})")
                logger.debug(f"📊 Candle data range: {candles[0][0]} to {candles[-1][0]} (timestamps)")
            else:
                logger.warning(f"⚠️ No candle data returned for {symbol} ({timeframe})")

            return candles

        except Exception as e:
            error_message = self._extract_error_message(e)
            logger.error(f"❌ Error fetching candle data for {symbol} ({timeframe}): {error_message} (Full exception: {e})")
            return None

    def get_candle_data_formatted(self, symbol: str, timeframe: str = '1h', limit: int = 100, since: Optional[int] = None) -> Optional[List[Dict[str, Any]]]:
        """
        Get OHLCV candle data for a symbol in a formatted dictionary structure.

        Args:
            symbol: Trading symbol (e.g., 'BTC/USDC:USDC')
            timeframe: Timeframe for candles ('1m', '5m', '15m', '1h', '4h', '1d', etc.)
            limit: Maximum number of candles to return (default: 100)
            since: Timestamp in milliseconds to start from (optional)

        Returns:
            List of candle dictionaries with keys: timestamp, open, high, low, close, volume
            Returns None if error occurs or if no candles were returned.
            Candles with fewer than six fields are skipped.
        """
        try:
            # Get raw candle data
            raw_candles = self.get_candle_data(symbol, timeframe, limit, since)

            if not raw_candles:
                return None

            # Format candles into dictionaries
            field_names = ('timestamp', 'open', 'high', 'low', 'close', 'volume')
            formatted_candles = [
                dict(zip(field_names, candle[:6]))
                for candle in raw_candles
                if len(candle) >= 6  # Ensure we have all OHLCV data
            ]

            logger.info(f"✅ Successfully formatted {len(formatted_candles)} candles for {symbol} ({timeframe})")
            return formatted_candles

        except Exception as e:
            error_message = self._extract_error_message(e)
            logger.error(f"❌ Error formatting candle data for {symbol} ({timeframe}): {error_message} (Full exception: {e})")
            return None

    def place_limit_order(self, symbol: str, side: str, amount: float, price: float, params: Optional[Dict] = None) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
        """Place a limit order.

        Args:
            symbol: Trading symbol
            side: 'buy' or 'sell'
            amount: Order amount
            price: Limit price
            params: Additional parameters forwarded to the exchange call.
                The dict is copied and never modified.

        Returns:
            ``(order, None)`` on success, ``(None, error_message)`` on failure.
        """
        try:
            if not self.sync_client:
                logger.error("❌ Client not initialized")
                return None, "Client not initialized"

            # Copy so that adding marginMode does not leak into the caller's dict.
            order_params = dict(params) if params else {}

            # Add margin mode from config
            if Config.HYPERLIQUID_MARGIN_MODE:
                order_params['marginMode'] = Config.HYPERLIQUID_MARGIN_MODE.lower()

            logger.info(f"Placing limit order: {side} {amount} {symbol} @ {price} with params {order_params}")
            order = self.sync_client.create_limit_order(symbol, side, amount, price, params=order_params)

            logger.info(f"✅ Limit order placed successfully: {order}")
            return order, None
        except Exception as e:
            error_message = self._extract_error_message(e)
            logger.error(f"❌ Error placing limit order: {error_message}")
            return None, error_message

    def place_market_order(self, symbol: str, side: str, amount: float, params: Optional[Dict] = None) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
        """Place a market order.

        The current ``last`` ticker price is fetched and shifted by 0.5%
        (up for buys, down for sells); that price is passed to the exchange
        call as slippage protection.

        Args:
            symbol: Trading symbol
            side: 'buy' or 'sell'
            amount: Order amount
            params: Additional parameters forwarded to the exchange call.
                The dict is copied and never modified.

        Returns:
            ``(order, None)`` on success, ``(None, error_message)`` on failure.
        """
        try:
            if not self.sync_client:
                logger.error("❌ Client not initialized")
                return None, "Client not initialized"

            # Get current market price for slippage calculation
            ticker = self.sync_client.fetch_ticker(symbol)
            if not ticker or not ticker.get('last'):
                error_msg = f"Could not fetch current price for {symbol} to calculate slippage."
                logger.error(f"❌ {error_msg}")
                return None, error_msg

            current_price = ticker['last']
            slippage_percent = 0.5  # 0.5% slippage
            direction = 1 if side == 'buy' else -1
            slippage_price = current_price * (1 + direction * slippage_percent / 100)

            # Copy so that adding marginMode does not leak into the caller's dict.
            order_params = dict(params) if params else {}

            # Add margin mode from config
            if Config.HYPERLIQUID_MARGIN_MODE:
                order_params['marginMode'] = Config.HYPERLIQUID_MARGIN_MODE.lower()

            # Hyperliquid requires a price for market orders for slippage protection.
            # This must be passed as the 'price' argument, not within 'params'.
            logger.info(f"Placing market order: {side} {amount} {symbol} with slippage price {slippage_price} and params {order_params}")

            # Use create_market_order for market orders, passing the slippage price explicitly.
            order = self.sync_client.create_market_order(symbol, side, amount, price=slippage_price, params=order_params)

            logger.info(f"✅ Market order placed successfully: {order}")
            return order, None
        except Exception as e:
            error_message = self._extract_error_message(e)
            logger.error(f"❌ Error placing market order: {error_message}")
            return None, error_message

    def get_open_orders(self, symbol: Optional[str] = None) -> Optional[List[Dict[str, Any]]]:
        """Get all open orders for a symbol or all symbols.

        Returns:
            The result of ``fetch_open_orders``, or None on failure.
        """
        params: Dict[str, Any] = {}
        try:
            if not self.sync_client:
                logger.error("❌ Client not initialized")
                return None

            # Add user parameter for Hyperliquid CCXT compatibility
            params = self._user_params()

            logger.debug(f"🔍 Fetching open orders with params: {params}")
            orders = self.sync_client.fetch_open_orders(symbol, params=params)
            logger.info(f"✅ Successfully fetched open orders for {symbol or 'all symbols'}")
            return orders
        except Exception as e:
            error_message = self._extract_error_message(e)
            logger.error(f"❌ Error fetching open orders: {error_message} (Full exception: {e})")
            logger.debug(f"💡 Attempted with params: {params}")
            return None

    def cancel_order(self, order_id: str, symbol: str, params: Optional[Dict] = None) -> bool:
        """Cancel an order with CCXT-style parameters.

        Returns:
            True if the cancel call completed without raising, False otherwise.
        """
        try:
            if not self.sync_client:
                logger.error("❌ Client not initialized")
                return False

            self.sync_client.cancel_order(order_id, symbol, params=params or {})
            logger.info(f"✅ Successfully cancelled order {order_id}")
            return True
        except Exception as e:
            error_message = self._extract_error_message(e)
            logger.error(f"❌ Error cancelling order {order_id}: {error_message} (Full exception: {e})")
            return False

    def get_recent_trades(self, symbol: str, limit: int = 10) -> Optional[List[Dict[str, Any]]]:
        """Get recent trades for a symbol.

        Returns:
            The result of ``fetch_trades``, or None on failure.
        """
        try:
            if not self.sync_client:
                logger.error("❌ Client not initialized")
                return None

            trades = self.sync_client.fetch_trades(symbol, limit=limit)
            logger.info(f"✅ Successfully fetched {len(trades)} recent trades for {symbol}")
            return trades
        except Exception as e:
            error_message = self._extract_error_message(e)
            logger.error(f"❌ Error fetching recent trades for {symbol}: {error_message} (Full exception: {e})")
            return None

    def get_trading_fee(self, symbol: str) -> Optional[Dict[str, Any]]:
        """Get trading fee for a symbol.

        Returns:
            The result of ``fetch_trading_fee``, or None on failure.
        """
        try:
            if not self.sync_client:
                logger.error("❌ Client not initialized")
                return None

            fee = self.sync_client.fetch_trading_fee(symbol)
            logger.info(f"✅ Successfully fetched trading fee for {symbol}")
            return fee
        except Exception as e:
            error_message = self._extract_error_message(e)
            logger.error(f"❌ Error fetching trading fee for {symbol}: {error_message} (Full exception: {e})")
            return None

    async def get_markets(self) -> Optional[Dict[str, Any]]:
        """Get available markets/symbols.

        NOTE: This is declared ``async`` (callers must ``await`` it) although
        the body calls the synchronous client. The signature is kept as-is
        for compatibility with existing callers.

        Returns:
            The result of ``load_markets``, or None on failure.
        """
        try:
            if not self.sync_client:
                logger.error("❌ Client not initialized")
                return None

            markets = self.sync_client.load_markets()
            logger.info(f"✅ Successfully loaded {len(markets)} markets")
            return markets
        except Exception as e:
            error_message = self._extract_error_message(e)
            logger.error(f"❌ Error loading markets: {error_message} (Full exception: {e})")
            return None

    def place_stop_loss_order(self, symbol: str, side: str, amount: float, stop_price_arg: float, params: Optional[Dict] = None) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
        """
        Place a stop loss order (as a stop-market order).
        Returns a tuple: (order_object, error_message_string).
        Error_message_string is None on success. Order_object is None on failure.

        Args:
            symbol: Trading symbol (e.g., 'BTC/USDC:USDC')
            side: 'buy' or 'sell' (side of the order to be placed when stop is triggered)
            amount: Order amount
            stop_price_arg: The price at which the stop loss triggers
            params: Additional parameters merged into the request. A 'trigger'
                entry in ``params`` is ignored: the stop-loss trigger built
                here always takes precedence.
        """
        try:
            if not self.sync_client:
                logger.error("❌ Client not initialized")
                return None, "Client not initialized"

            # Construct parameters for a trigger order (stop-market)
            # The main order type is 'market', triggered at stop_price_arg.
            trigger_params = {
                'trigger': {
                    'triggerPx': str(stop_price_arg),  # The stop price
                    'isMarket': True,  # Execute as a market order when triggered
                    'tpsl': 'sl'  # Indicate it's a stop loss
                }
            }

            # Merge with any incoming params. trigger_params is applied last so
            # the SL trigger definition cannot be overwritten by the caller.
            order_params = {**(params or {}), **trigger_params}

            logger.info(f"🛑 Placing STOP-MARKET order: {side} {amount} {symbol} with trigger @ ${stop_price_arg:.4f}")

            # Pass stop_price_arg as the price parameter for slippage calculation, as required by the exchange for market orders.
            order = self.sync_client.create_order(symbol, 'market', side, amount, stop_price_arg, order_params)

            logger.info(f"✅ Stop-market order placed successfully: {order}")
            return order, None
        except Exception as e:
            error_message = self._extract_error_message(e)
            logger.error(f"❌ Error placing stop-market order: {error_message} (Full exception: {e})")
            return None, error_message

    def place_take_profit_order(self, symbol: str, side: str, amount: float, take_profit_price_arg: float, params: Optional[Dict] = None) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
        """
        Place a take profit order (as a limit order, or can be adapted to trigger like SL).
        Currently places a limit order. For true TP trigger, needs similar logic to SL.
        Returns a tuple: (order_object, error_message_string).

        Args:
            symbol: Trading symbol
            side: 'buy' or 'sell'
            amount: Order amount
            take_profit_price_arg: The price for the take profit order
            params: Additional parameters
        """
        # TODO: Implement actual take profit trigger logic (similar to stop_loss_order if needed)
        # For now, it remains a limit order as per previous implementation,
        # but could be changed to a trigger order: isMarket=False, tpsl='tp'.
        logger.warning("⚠️ place_take_profit_order currently places a LIMIT order. For triggered TP, it needs updating similar to SL.")
        try:
            if not self.sync_client:
                logger.error("❌ Client not initialized")
                return None, "Client not initialized"

            order_params = params or {}

            logger.info(f"🎯 Placing take profit (LIMIT) order: {side} {amount} {symbol} @ ${take_profit_price_arg}")

            order = self.sync_client.create_limit_order(symbol, side, amount, take_profit_price_arg, params=order_params)

            logger.info(f"✅ Successfully placed take profit (LIMIT) order for {amount} {symbol} at ${take_profit_price_arg}")
            logger.debug(f"📄 Take profit order details: {order}")

            return order, None
        except Exception as e:
            error_message = self._extract_error_message(e)
            logger.error(f"❌ Error placing take profit (LIMIT) order: {error_message} (Full exception: {e})")
            return None, error_message

    def get_recent_fills(self, limit: int = 100) -> Optional[List[Dict[str, Any]]]:
        """
        Get recent fills/trades for the account.

        The account-wide ``fetch_my_trades`` call is tried first. If it raises
        ``AttributeError``, fills are collected per symbol for up to five
        symbols taken from the current positions, merged, and sorted newest
        first.

        Args:
            limit: Maximum number of fills to return

        Returns:
            List of recent fills/trades or None if error
        """
        params: Dict[str, Any] = {}
        try:
            if not self.sync_client:
                logger.error("❌ Client not initialized")
                return None

            # Add user parameter for Hyperliquid CCXT compatibility
            params = self._user_params()

            logger.debug(f"🔍 Fetching recent fills with params: {params}")

            try:
                # Option 1: Try fetch_my_trades if available
                fills = self.sync_client.fetch_my_trades(None, limit=limit, params=params)
                logger.info(f"✅ Successfully fetched {len(fills)} recent fills")
                return fills
            except AttributeError:
                # Option 2: If fetch_my_trades not available, try alternative approach
                logger.debug("fetch_my_trades not available, trying alternative approach")

            # Get positions to determine active symbols
            positions = self.get_positions()
            if not positions:
                logger.info("No positions found, no recent fills to fetch")
                return []

            # Sorted for a deterministic choice; limited to 5 symbols to avoid
            # too many requests.
            symbols = sorted({pos.get('symbol') for pos in positions if pos.get('symbol')})[:5]

            # Split the budget over the symbols actually queried, never
            # requesting fewer than one fill per symbol.
            per_symbol_limit = max(1, limit // len(symbols)) if symbols else limit

            all_fills: List[Dict[str, Any]] = []
            for symbol in symbols:
                try:
                    symbol_trades = self.sync_client.fetch_my_trades(symbol, limit=per_symbol_limit, params=params)
                    if symbol_trades:
                        all_fills.extend(symbol_trades)
                except Exception as e:
                    logger.warning(f"Could not fetch trades for {symbol}: {e}")

            # Sort by timestamp (newest first). Missing/None timestamps sort as
            # 0 so mixed entries never raise a TypeError during comparison.
            all_fills.sort(key=lambda fill: fill.get('timestamp') or 0, reverse=True)

            # Return only the requested limit
            result = all_fills[:limit]
            logger.info(f"✅ Successfully fetched {len(result)} recent fills from {len(symbols)} symbols")
            return result

        except Exception as e:
            error_message = self._extract_error_message(e)
            logger.error(f"❌ Error fetching recent fills: {error_message} (Full exception: {e})")
            logger.debug(f"💡 Attempted with params: {params}")
            return None

    def set_leverage(self, leverage: int, symbol: str, params: Optional[Dict] = None) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
        """Set leverage for a symbol.

        Returns:
            ``(response, None)`` on success, ``(None, error_message)`` on failure.
        """
        try:
            if not self.sync_client:
                logger.error("❌ Client not initialized")
                return None, "Client not initialized"

            # CCXT's setLeverage usually requires the symbol.
            # Normalize a missing params to {} (same as fetch_leverage) so the
            # underlying client never receives None.
            response = self.sync_client.setLeverage(leverage, symbol, params=params or {})
            logger.info(f"✅ Successfully set leverage to {leverage}x for {symbol}.")
            return response, None
        except Exception as e:
            error_message = self._extract_error_message(e)
            logger.error(f"❌ Error setting leverage for {symbol}: {error_message}")
            return None, error_message

    def fetch_leverage(self, symbol: str, params: Optional[Dict] = None) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
        """Fetch leverage for a symbol.

        Returns:
            ``(leverage_data, None)`` on success, ``(None, error_message)`` on failure.
        """
        try:
            if not self.sync_client:
                logger.error("❌ Client not initialized")
                return None, "Client not initialized"

            # CCXT's fetchLeverage usually requires the symbol
            leverage_data = self.sync_client.fetchLeverage(symbol, params=params or {})
            logger.info(f"✅ Successfully fetched leverage for {symbol}.")
            return leverage_data, None
        except Exception as e:
            error_message = self._extract_error_message(e)
            logger.error(f"❌ Error fetching leverage for {symbol}: {error_message}")
            return None, error_message