"""
Copy Trading Monitor - Tracks and copies trades from a target trader on Hyperliquid
"""

import logging
import time
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
import aiohttp
import json
from decimal import Decimal, ROUND_DOWN

from ..config.config import Config
from ..clients.hyperliquid_client import HyperliquidClient
from ..notifications.notification_manager import NotificationManager
from .copy_trading_state import CopyTradingStateManager


@dataclass
class TraderPosition:
    """Represents a position held by the target trader"""
    coin: str
    size: float  # absolute size; direction is carried by `side`
    side: str  # 'long' or 'short'
    entry_price: float
    leverage: float
    position_value: float
    unrealized_pnl: float
    margin_used: float
    timestamp: int  # local wall-clock time of the snapshot, in milliseconds


@dataclass
class CopyTrade:
    """Represents a trade to be copied"""
    coin: str
    # '<verb>_<side>' where verb is open/add/reduce/close and side is long/short
    action: str
    size: float
    leverage: float
    original_trade_hash: str  # locally generated ID: "<coin>_<action>_<timestamp>"
    target_trader_address: str
    timestamp: int


class CopyTradingMonitor:
    """Monitor and copy trades from a target trader"""

    def __init__(self, client: HyperliquidClient, notification_manager: NotificationManager):
        """Read the copy-trading configuration and restore persisted state.

        Args:
            client: Exchange client used for balances, market data, positions
                and order placement.
            notification_manager: Sink for user-facing status messages.
        """
        self.client = client
        self.notification_manager = notification_manager
        self.config = Config()
        self.logger = logging.getLogger(__name__)

        # Configuration
        self.enabled = self.config.COPY_TRADING_ENABLED
        self.target_address = self.config.COPY_TRADING_TARGET_ADDRESS
        self.portfolio_percentage = self.config.COPY_TRADING_PORTFOLIO_PERCENTAGE
        self.copy_mode = self.config.COPY_TRADING_MODE
        self.max_leverage = self.config.COPY_TRADING_MAX_LEVERAGE
        self.min_position_size = self.config.COPY_TRADING_MIN_POSITION_SIZE
        self.execution_delay = self.config.COPY_TRADING_EXECUTION_DELAY
        self.notifications_enabled = self.config.COPY_TRADING_NOTIFICATIONS

        # State management for persistence and tracking
        self.state_manager = CopyTradingStateManager()

        # Persisted state can switch the monitor on even when the config flag
        # is off, but only if a target address is configured.
        if self.state_manager.is_enabled() and self.target_address:
            self.enabled = True

        # State tracking (legacy, kept for compatibility)
        self.target_positions: Dict[str, TraderPosition] = {}
        self.our_positions: Dict[str, Any] = {}
        self.last_check_time = 0
        self.pending_trades: List[CopyTrade] = []

        # API endpoints
        self.info_url = "https://api.hyperliquid.xyz/info"

        self.logger.info(f"Copy Trading Monitor initialized - Target: {self.target_address}")
        self.logger.info("📊 Configuration:")
        self.logger.info(f" - Enabled: {self.enabled}")
        self.logger.info(f" - Target Address: {self.target_address}")
        self.logger.info(f" - Portfolio %: {self.portfolio_percentage:.1%}")
        self.logger.info(f" - Copy Mode: {self.copy_mode}")
        self.logger.info(f" - Max Leverage: {self.max_leverage}x")
        self.logger.info(f" - Min Position Size: ${self.min_position_size:.2f}")
        self.logger.info(f" - Execution Delay: {self.execution_delay}s")
        self.logger.info(f" - Notifications: {self.notifications_enabled}")

        # Load previous session info if available
        session_info = self.state_manager.get_session_info()
        if session_info['start_time']:
            self.logger.info(f"📅 Previous session started: {session_info['start_time']}")
            self.logger.info(f"📊 Tracked positions: {session_info['tracked_positions_count']}")
            self.logger.info(f"🔄 Copied trades: {session_info['copied_trades_count']}")

    async def start_monitoring(self):
        """Start the copy trading monitoring loop.

        Returns immediately when copy trading is disabled or no target address
        is configured. Otherwise initializes tracking, sends a startup
        notification, performs an initial sync and then runs `monitor_cycle`
        until either this monitor or the state manager is disabled. Any
        exception escaping the setup or loop is logged and disables the monitor.
        """
        if not self.enabled:
            self.logger.info("Copy trading is disabled")
            return

        if not self.target_address:
            self.logger.error("No target trader address configured")
            return

        self.logger.info(f"Starting copy trading monitor for {self.target_address}")

        try:
            # Start state tracking (using async version to prevent blocking)
            await self.state_manager.start_copy_trading_async(self.target_address)

            # Get current target positions for initialization (with timeout)
            try:
                current_positions = await asyncio.wait_for(
                    self.get_target_positions(),
                    timeout=15.0  # 15 second timeout for initialization
                )
            except asyncio.TimeoutError:
                self.logger.warning("Timeout during initialization - will retry in monitoring loop")
                current_positions = None
            except Exception as e:
                self.logger.error(f"Error during initialization: {e}")
                current_positions = None

            # An empty snapshot is falsy as well, so a target with no open
            # positions takes the same branch as a failed fetch.
            if current_positions:
                # Check if this is a fresh start or resuming
                if not self.state_manager.get_tracked_positions():
                    # Fresh start - initialize tracking but don't copy existing positions
                    self.logger.info("🆕 Fresh start - initializing with existing positions (won't copy)")
                    await self.state_manager.initialize_tracked_positions_async(current_positions)

                    startup_message = (
                        f"🔄 Copy Trading Started (Fresh)\n"
                        f"Target: {self.target_address[:10]}...\n"
                        f"Portfolio Allocation: {self.portfolio_percentage:.1%}\n"
                        f"Mode: {self.copy_mode}\n"
                        f"Max Leverage: {self.max_leverage}x\n\n"
                        f"📊 Found {len(current_positions)} existing positions\n"
                        f"⚠️ Will only copy NEW trades from now on"
                    )
                else:
                    # Resuming - continue from where we left off
                    tracked_count = len(self.state_manager.get_tracked_positions())
                    self.logger.info(f"▶️ Resuming session - {tracked_count} positions tracked")

                    startup_message = (
                        f"▶️ Copy Trading Resumed\n"
                        f"Target: {self.target_address[:10]}...\n"
                        f"Portfolio Allocation: {self.portfolio_percentage:.1%}\n"
                        f"Mode: {self.copy_mode}\n"
                        f"Max Leverage: {self.max_leverage}x\n\n"
                        f"📊 Resuming with {tracked_count} tracked positions"
                    )
            else:
                startup_message = (
                    f"🔄 Copy Trading Started\n"
                    f"Target: {self.target_address[:10]}...\n"
                    f"Portfolio Allocation: {self.portfolio_percentage:.1%}\n"
                    f"Mode: {self.copy_mode}\n"
                    f"Max Leverage: {self.max_leverage}x\n\n"
                    f"⚠️ Could not access target trader positions during startup"
                )

            # Send startup notification
            if self.notifications_enabled:
                try:
                    await asyncio.wait_for(
                        self.notification_manager.send_generic_notification(startup_message),
                        timeout=5.0
                    )
                except Exception as e:
                    self.logger.error(f"Error sending startup notification: {e}")

            # Initial sync; failures are logged and do not prevent the loop from starting
            try:
                await asyncio.wait_for(self.sync_positions(), timeout=10.0)
            except Exception as e:
                self.logger.error(f"Error during initial sync: {e}")

            # Start monitoring loop
            while self.enabled and self.state_manager.is_enabled():
                try:
                    await self.monitor_cycle()
                    await asyncio.sleep(30)  # Check every 30 seconds
                except Exception as e:
                    self.logger.error(f"Error in copy trading monitor cycle: {e}")
                    await asyncio.sleep(60)  # Wait longer on error

        except Exception as e:
            self.logger.error(f"Fatal error in copy trading monitor: {e}")
            self.enabled = False

    async def monitor_cycle(self):
        """Run a single monitoring cycle.

        Fetches the target's positions, derives the trades implied by the
        difference to the tracked state, executes each trade with its own
        timeout and records the check time. Every failure is logged and
        swallowed so the caller's loop keeps running.
        """
        try:
            # Get target trader's current positions with timeout
            try:
                new_positions = await asyncio.wait_for(
                    self.get_target_positions(),
                    timeout=15.0  # 15 second timeout
                )
            except asyncio.TimeoutError:
                self.logger.warning("Timeout getting target positions - skipping this cycle")
                return
            except Exception as e:
                self.logger.error(f"Error getting target positions: {e}")
                return

            if new_positions is None:
                return

            # Compare with previous positions to detect changes
            try:
                position_changes = await self.detect_position_changes(new_positions)
            except Exception as e:
                self.logger.error(f"Error detecting position changes: {e}")
                return

            # Publish the fresh snapshot BEFORE executing trades.
            # calculate_position_size() reads self.target_positions in
            # PROPORTIONAL mode; with the old snapshot a newly opened coin is
            # never found and sizing always falls back to the fixed cap.
            self.target_positions = new_positions

            # Execute any detected trades
            for trade in position_changes:
                try:
                    await asyncio.wait_for(
                        self.execute_copy_trade(trade),
                        timeout=30.0  # 30 second timeout per trade
                    )
                except asyncio.TimeoutError:
                    self.logger.error(f"Timeout executing copy trade for {trade.coin}")
                except Exception as e:
                    self.logger.error(f"Error executing copy trade for {trade.coin}: {e}")

            # Update last check timestamp (async version)
            await self.state_manager.update_last_check_async()

        except Exception as e:
            self.logger.error(f"Error in monitor cycle: {e}")

    async def get_target_positions(self) -> Optional[Dict[str, TraderPosition]]:
        """Get current positions of target trader.

        Posts a `clearinghouseState` query for the target address to
        `self.info_url` and converts every `oneWay` asset position into a
        `TraderPosition`, keyed by coin.

        Returns:
            Mapping of coin to position (possibly empty), or None when the
            request fails, times out, returns a non-200 status or cannot be
            parsed.
        """
        try:
            payload = {
                "type": "clearinghouseState",
                "user": self.target_address
            }

            # Use timeout to prevent blocking
            timeout = aiohttp.ClientTimeout(total=10.0)  # 10 second timeout
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(self.info_url, json=payload) as response:
                    if response.status != 200:
                        self.logger.error(f"Failed to get target positions: {response.status}")
                        return None

                    data = await response.json()
                    positions = {}

                    # Parse asset positions
                    for asset_pos in data.get('assetPositions', []):
                        if asset_pos.get('type') != 'oneWay':
                            continue

                        pos = asset_pos['position']
                        coin = pos['coin']
                        size = float(pos['szi'])  # signed: the sign selects long/short below

                        if abs(size) < 0.001:  # Skip dust positions
                            continue

                        side = 'long' if size > 0 else 'short'

                        positions[coin] = TraderPosition(
                            coin=coin,
                            size=abs(size),
                            side=side,
                            entry_price=float(pos['entryPx']),
                            leverage=float(pos['leverage']['value']),
                            position_value=float(pos['positionValue']),
                            unrealized_pnl=float(pos['unrealizedPnl']),
                            margin_used=float(pos['marginUsed']),
                            timestamp=int(time.time() * 1000)
                        )

                    return positions

        except asyncio.TimeoutError:
            self.logger.warning("Timeout getting target positions - will retry next cycle")
            return None
        except Exception as e:
            self.logger.error(f"Error getting target positions: {e}")
            return None

    async def detect_position_changes(self, new_positions: Dict[str, TraderPosition]) -> List[CopyTrade]:
        """Detect changes in target trader's positions using state manager.

        Produces `open_*`/`add_*` trades for positions the state manager says
        should be copied, `reduce_*` trades for reductions, and `close_*`
        trades for tracked coins missing from `new_positions`. The tracked
        state is updated (or the coin removed) as a side effect.

        NOTE(review): trade IDs embed a millisecond wall-clock timestamp taken
        when the snapshot/ID is created, so the `has_copied_trade` checks here
        will practically never match an ID from an earlier cycle.

        Args:
            new_positions: Latest snapshot of the target's positions by coin.

        Returns:
            Trades to execute, in detection order.
        """
        trades = []

        # Check for new positions and position increases
        for coin, new_pos in new_positions.items():
            position_data = {
                'size': new_pos.size,
                'side': new_pos.side,
                'entry_price': new_pos.entry_price,
                'leverage': new_pos.leverage
            }

            # Check if this is a new position we should copy
            if self.state_manager.should_copy_position(coin, position_data):
                tracked_pos = self.state_manager.get_tracked_positions().get(coin)

                if tracked_pos is None:
                    # Completely new position
                    action = f"open_{new_pos.side}"
                    copy_size = new_pos.size
                    self.logger.info(f"🆕 Detected NEW position: {action} {copy_size} {coin} at {new_pos.leverage}x")
                else:
                    # Position increase
                    size_increase = new_pos.size - tracked_pos['size']
                    action = f"add_{new_pos.side}"
                    copy_size = size_increase
                    self.logger.info(f"📈 Detected position increase: {action} {size_increase} {coin}")

                # Create trade to copy
                trade_id = f"{coin}_{action}_{new_pos.timestamp}"
                if not self.state_manager.has_copied_trade(trade_id):
                    trades.append(CopyTrade(
                        coin=coin,
                        action=action,
                        size=copy_size,
                        leverage=new_pos.leverage,
                        original_trade_hash=trade_id,
                        target_trader_address=self.target_address,
                        timestamp=new_pos.timestamp
                    ))

            # Check for position reductions
            elif self.state_manager.is_position_reduction(coin, position_data):
                tracked_pos = self.state_manager.get_tracked_positions()[coin]
                size_decrease = tracked_pos['size'] - new_pos.size
                action = f"reduce_{new_pos.side}"

                trade_id = f"{coin}_{action}_{new_pos.timestamp}"
                if not self.state_manager.has_copied_trade(trade_id):
                    trades.append(CopyTrade(
                        coin=coin,
                        action=action,
                        size=size_decrease,
                        leverage=new_pos.leverage,
                        original_trade_hash=trade_id,
                        target_trader_address=self.target_address,
                        timestamp=new_pos.timestamp
                    ))
                    self.logger.info(f"📉 Detected position decrease: {action} {size_decrease} {coin}")

            # Update tracking regardless
            await self.state_manager.update_tracked_position_async(coin, position_data)

        # Check for closed positions (exits)
        tracked_positions = self.state_manager.get_tracked_positions()
        # Iterate over a copy of the keys: coins are removed from tracking inside the loop
        for coin in list(tracked_positions.keys()):
            if coin not in new_positions:
                # Position fully closed
                tracked_pos = tracked_positions[coin]
                action = f"close_{tracked_pos['side']}"

                trade_id = f"{coin}_{action}_{int(time.time() * 1000)}"
                if not self.state_manager.has_copied_trade(trade_id):
                    trades.append(CopyTrade(
                        coin=coin,
                        action=action,
                        size=tracked_pos['size'],
                        leverage=tracked_pos['leverage'],
                        original_trade_hash=trade_id,
                        target_trader_address=self.target_address,
                        timestamp=int(time.time() * 1000)
                    ))
                    self.logger.info(f"❌ Detected position closure: {action} {tracked_pos['size']} {coin}")

                # Remove from tracking
                await self.state_manager.remove_tracked_position_async(coin)

        # Update last check time (already updated in monitor_cycle, so skip here)

        return trades

    async def execute_copy_trade(self, trade: CopyTrade):
        """Execute a copy trade.

        Looks up the current price, sizes the position, waits
        `self.execution_delay` seconds, places the order and optionally sends
        a notification. The trade is recorded as copied on every outcome
        handled here (skip, failure, success, error) so it is not retried.

        Args:
            trade: The trade derived from a change in the target's positions.
        """
        try:
            # Check if we've already copied this trade
            if self.state_manager.has_copied_trade(trade.original_trade_hash):
                self.logger.debug(f"Skipping already copied trade: {trade.original_trade_hash}")
                return

            # Get current price for the asset
            symbol = f"{trade.coin}/USDC:USDC"
            try:
                market_data = await asyncio.to_thread(self.client.get_market_data, symbol)
                if not market_data or not market_data.get('ticker'):
                    self.logger.error(f"❌ Could not get market data for {trade.coin}")
                    await self.state_manager.add_copied_trade_async(trade.original_trade_hash)
                    return

                current_price = float(market_data['ticker'].get('last', 0))
                if current_price <= 0:
                    self.logger.error(f"❌ Invalid price for {trade.coin}: {current_price}")
                    await self.state_manager.add_copied_trade_async(trade.original_trade_hash)
                    return

            except Exception as e:
                self.logger.error(f"❌ Error getting price for {trade.coin}: {e}")
                await self.state_manager.add_copied_trade_async(trade.original_trade_hash)
                return

            # Apply leverage limit
            leverage = min(trade.leverage, self.max_leverage)

            # Calculate our position size with proper leverage handling
            position_calc = await self.calculate_position_size(trade, current_price, leverage)

            if position_calc['margin_to_use'] < self.min_position_size:
                self.logger.info(f"Skipping {trade.coin} trade - margin too small: ${position_calc['margin_to_use']:.2f} (min: ${self.min_position_size:.2f})")
                # Still mark as copied to avoid retrying
                await self.state_manager.add_copied_trade_async(trade.original_trade_hash)
                return

            # Add execution delay
            await asyncio.sleep(self.execution_delay)

            # Execute the trade
            success = await self._execute_hyperliquid_trade(trade, position_calc, leverage)

            # Mark trade as copied (whether successful or not to avoid retrying)
            await self.state_manager.add_copied_trade_async(trade.original_trade_hash)

            # Send notification
            if self.notifications_enabled:
                status = "✅ SUCCESS" if success else "❌ FAILED"

                await self.notification_manager.send_generic_notification(
                    f"🔄 Copy Trade {status}\n"
                    f"Action: {trade.action}\n"
                    f"Asset: {trade.coin}\n"
                    f"💳 Margin Used: ${position_calc['margin_to_use']:.2f}\n"
                    f"🏦 Position Value: ${position_calc['position_value']:.2f}\n"
                    f"🪙 Token Amount: {position_calc['token_amount']:.6f}\n"
                    f"⚖️ Leverage: {leverage}x\n"
                    f"Target: {trade.target_trader_address[:10]}...\n"
                    f"Trade ID: {trade.original_trade_hash[:16]}..."
                )

        except Exception as e:
            self.logger.error(f"Error executing copy trade for {trade.coin}: {e}")

            # Mark as copied even on error to avoid infinite retries
            await self.state_manager.add_copied_trade_async(trade.original_trade_hash)

            if self.notifications_enabled:
                await self.notification_manager.send_generic_notification(
                    f"❌ Copy Trade Error\n"
                    f"Asset: {trade.coin}\n"
                    f"Action: {trade.action}\n"
                    f"Error: {str(e)[:100]}\n"
                    f"Trade ID: {trade.original_trade_hash[:16]}..."
                )

    async def calculate_position_size(self, trade: CopyTrade, current_price: float, leverage: float) -> Dict[str, float]:
        """
        Calculate our position size based on the copy trading mode.
        Returns margin allocation and converts to actual token amount.

        In 'FIXED' mode (and for unknown modes) the margin is our balance times
        `self.portfolio_percentage`. In 'PROPORTIONAL' mode the target's
        margin share for the coin is applied to our balance and capped at that
        same fixed amount; if the target balance or the target position is
        unavailable the fixed amount is used.

        Args:
            trade: The copy trade to execute
            current_price: Current price of the asset
            leverage: Leverage to use for the trade

        Returns:
            Dict with 'margin_to_use', 'position_value', 'token_amount'
        """
        try:
            # Get our current account balance
            our_balance = await self.get_our_account_balance()

            self.logger.info(f"📊 Our account balance: ${our_balance:.2f}")
            self.logger.info(f"🎯 Portfolio percentage: {self.portfolio_percentage:.1%}")
            self.logger.info(f"📈 Copy mode: {self.copy_mode}")
            self.logger.info(f"💰 Current {trade.coin} price: ${current_price:.4f}")
            self.logger.info(f"⚖️ Leverage to use: {leverage:.1f}x")

            # Calculate the MARGIN we want to allocate (risk-based)
            margin_to_use = 0.0

            if self.copy_mode == 'FIXED':
                # Fixed percentage of our account as margin
                margin_to_use = our_balance * self.portfolio_percentage
                self.logger.info(f"🔢 FIXED mode - margin to allocate: ${margin_to_use:.2f}")

            elif self.copy_mode == 'PROPORTIONAL':
                # Get target trader's account balance
                target_balance = await self.get_target_account_balance()
                self.logger.info(f"🎯 Target balance: ${target_balance:.2f}")

                if target_balance <= 0:
                    margin_to_use = our_balance * self.portfolio_percentage
                    self.logger.info(f"🔢 PROPORTIONAL mode (fallback) - margin: ${margin_to_use:.2f}")
                else:
                    # Calculate target trader's margin percentage
                    target_pos = self.target_positions.get(trade.coin)
                    if not target_pos:
                        margin_to_use = our_balance * self.portfolio_percentage
                        self.logger.info(f"🔢 PROPORTIONAL mode (no target pos) - margin: ${margin_to_use:.2f}")
                    else:
                        target_margin_percentage = target_pos.margin_used / target_balance
                        self.logger.info(f"📊 Target margin percentage: {target_margin_percentage:.1%}")

                        # Apply same margin percentage to our account
                        our_proportional_margin = our_balance * target_margin_percentage

                        # Cap at our portfolio percentage limit
                        max_margin = our_balance * self.portfolio_percentage
                        margin_to_use = min(our_proportional_margin, max_margin)

                        self.logger.info(f"🔢 PROPORTIONAL mode - uncapped: ${our_proportional_margin:.2f}, max: ${max_margin:.2f}, final: ${margin_to_use:.2f}")
            else:
                margin_to_use = our_balance * self.portfolio_percentage
                self.logger.info(f"🔢 Unknown mode (fallback) - margin: ${margin_to_use:.2f}")

            # Calculate position value with leverage
            position_value = margin_to_use * leverage

            # Calculate token amount based on current price
            token_amount = position_value / current_price

            result = {
                'margin_to_use': margin_to_use,
                'position_value': position_value,
                'token_amount': token_amount
            }

            self.logger.info("📊 Position calculation:")
            self.logger.info(f" 💳 Margin to use: ${margin_to_use:.2f}")
            self.logger.info(f" 🏦 Position value (with {leverage:.1f}x): ${position_value:.2f}")
            self.logger.info(f" 🪙 Token amount: {token_amount:.6f} {trade.coin}")

            return result

        except Exception as e:
            self.logger.error(f"Error calculating position size: {e}")
            # Fallback to fixed percentage
            our_balance = await self.get_our_account_balance()
            fallback_margin = our_balance * self.portfolio_percentage
            fallback_value = fallback_margin * leverage
            fallback_tokens = fallback_value / current_price

            result = {
                'margin_to_use': fallback_margin,
                'position_value': fallback_value,
                'token_amount': fallback_tokens
            }

            self.logger.info(f"🔢 Error fallback - margin: ${fallback_margin:.2f}, tokens: {fallback_tokens:.6f}")
            return result

    async def get_our_account_balance(self) -> float:
        """Get our account balance.

        Returns:
            The total USDC balance reported by the client, or 0.0 when no
            balance info is returned, no positive USDC total is present, or
            the lookup raises.
        """
        try:
            # The client call is synchronous; run it in a worker thread
            balance_info = await asyncio.to_thread(self.client.get_balance)
            self.logger.info(f"🔍 Raw balance info: {balance_info}")

            if balance_info:
                # Use the same approach as the /balance command
                usdc_total = 0.0
                usdc_free = 0.0
                usdc_used = 0.0

                if 'USDC' in balance_info.get('total', {}):
                    usdc_total = float(balance_info['total']['USDC'])
                    usdc_free = float(balance_info.get('free', {}).get('USDC', 0))
                    usdc_used = float(balance_info.get('used', {}).get('USDC', 0))

                self.logger.info(f"💰 USDC Balance - Total: ${usdc_total:.2f}, Free: ${usdc_free:.2f}, Used: ${usdc_used:.2f}")

                if usdc_total > 0:
                    self.logger.info(f"📊 Using total USDC balance: ${usdc_total:.2f}")
                    return usdc_total
                else:
                    self.logger.warning(f"⚠️ No USDC balance found - raw response: {balance_info}")
                    return 0.0
            else:
                self.logger.warning("⚠️ No balance info returned")
                return 0.0

        except Exception as e:
            self.logger.error(f"Error getting our account balance: {e}")
            return 0.0

    async def get_target_account_balance(self) -> float:
        """Get target trader's account balance.

        Returns:
            `marginSummary.accountValue` from the target's `clearinghouseState`
            response, or 0.0 on a non-200 status, timeout or any other error.
        """
        try:
            payload = {
                "type": "clearinghouseState",
                "user": self.target_address
            }

            # Use timeout to prevent blocking
            timeout = aiohttp.ClientTimeout(total=10.0)  # 10 second timeout
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(self.info_url, json=payload) as response:
                    if response.status == 200:
                        data = await response.json()
                        return float(data.get('marginSummary', {}).get('accountValue', 0))
                    else:
                        return 0.0

        except asyncio.TimeoutError:
            self.logger.warning("Timeout getting target account balance")
            return 0.0
        except Exception as e:
            self.logger.error(f"Error getting target account balance: {e}")
            return 0.0

    async def _execute_hyperliquid_trade(self, trade: CopyTrade, position_calc: Dict[str, float], leverage: float) -> bool:
        """Execute trade on Hyperliquid.

        All actions are sent as market orders through the client, which is
        called in a worker thread.

        Args:
            trade: The trade to execute; `trade.action` selects the branch.
            position_calc: Output of `calculate_position_size`.
            leverage: Leverage used for sizing (only logged here).

        Returns:
            True when the client returned a truthy result without an error,
            False otherwise (including unknown actions and exceptions).
        """
        try:
            # Opening/adding to a long buys and to a short sells; closing or
            # reducing does the opposite.
            increases_exposure = 'open' in trade.action or 'add' in trade.action
            is_long = 'long' in trade.action
            is_buy = is_long == increases_exposure
            side = 'buy' if is_buy else 'sell'

            # Extract values from position calculation
            token_amount = position_calc['token_amount']
            margin_used = position_calc['margin_to_use']
            position_value = position_calc['position_value']

            self.logger.info(f"🔄 Executing {trade.action} for {trade.coin}:")
            self.logger.info(f" 📊 Side: {side}")
            self.logger.info(f" 🪙 Token Amount: {token_amount:.6f} {trade.coin}")
            self.logger.info(f" 💳 Margin: ${margin_used:.2f}")
            self.logger.info(f" 🏦 Position Value: ${position_value:.2f}")
            self.logger.info(f" ⚖️ Leverage: {leverage}x")

            symbol = f"{trade.coin}/USDC:USDC"

            # For position opening/closing/modifying - all use market orders
            if increases_exposure:
                # Open new position or add to existing position
                result, error = await asyncio.to_thread(
                    self.client.place_market_order,
                    symbol=symbol,
                    side=side,
                    amount=token_amount
                )
                if error:
                    self.logger.error(f"❌ Market order failed: {error}")
                    return False

            elif 'close' in trade.action:
                # Close existing position - we need to place an opposite market order
                # Get current position to determine the exact size to close
                positions = await asyncio.to_thread(self.client.get_positions, symbol=symbol)

                if not positions:
                    self.logger.warning(f"⚠️ No position found for {trade.coin} to close")
                    return False

                # Find the position to close
                position_to_close = None
                for pos in positions:
                    if pos.get('symbol') == symbol and float(pos.get('contracts', 0)) != 0:
                        position_to_close = pos
                        break

                if not position_to_close:
                    self.logger.warning(f"⚠️ No open position found for {trade.coin}")
                    return False

                # The close direction comes from OUR actual position, not from
                # the action string, so it stays correct if the two disagree.
                contracts = float(position_to_close.get('contracts', 0))
                current_side = 'long' if contracts > 0 else 'short'
                close_side = 'sell' if current_side == 'long' else 'buy'
                close_size = abs(contracts)

                self.logger.info(f"📉 Closing {current_side} position: {close_side} {close_size} {trade.coin}")

                result, error = await asyncio.to_thread(
                    self.client.place_market_order,
                    symbol=symbol,
                    side=close_side,
                    amount=close_size
                )
                if error:
                    self.logger.error(f"❌ Close order failed: {error}")
                    return False

            elif 'reduce' in trade.action:
                # Reduce existing position
                # NOTE(review): the amount is the margin-derived token amount,
                # not a share of our position scaled from the target's
                # reduction - confirm this is intended.
                reduce_side = 'sell' if is_long else 'buy'
                result, error = await asyncio.to_thread(
                    self.client.place_market_order,
                    symbol=symbol,
                    side=reduce_side,
                    amount=token_amount
                )
                if error:
                    self.logger.error(f"❌ Reduce order failed: {error}")
                    return False
            else:
                self.logger.error(f"❌ Unknown trade action: {trade.action}")
                return False

            # Check if result indicates success
            if result:
                self.logger.info(f"✅ Successfully executed copy trade: {trade.action}")
                self.logger.info(f" 🪙 {token_amount:.6f} {trade.coin} (${position_value:.2f} value, ${margin_used:.2f} margin)")
                return True
            else:
                self.logger.error("❌ Failed to execute copy trade - no result returned")
                return False

        except Exception as e:
            self.logger.error(f"❌ Error executing Hyperliquid trade: {e}")
            return False

    async def sync_positions(self):
        """Sync our current positions with tracking.

        Refreshes `self.our_positions` (keyed by symbol) from the client and
        `self.target_positions` from the target's current snapshot. Errors are
        logged and swallowed.
        """
        try:
            # The client call is synchronous; run it in a worker thread so it
            # neither blocks the event loop nor defeats the caller's timeout.
            positions = await asyncio.to_thread(self.client.get_positions)
            if positions:
                self.our_positions = {pos['symbol']: pos for pos in positions}
            else:
                self.our_positions = {}

            # Get target positions for initial sync
            self.target_positions = await self.get_target_positions() or {}

            self.logger.info(f"Synced positions - Target: {len(self.target_positions)}, Ours: {len(self.our_positions)}")

        except Exception as e:
            self.logger.error(f"Error syncing positions: {e}")

    async def stop_monitoring(self):
        """Stop copy trading monitoring.

        Disables the monitor, tells the state manager to stop and, when
        notifications are enabled, sends a summary of the session.
        """
        self.enabled = False

        await self.state_manager.stop_copy_trading_async()

        self.logger.info("Copy trading monitor stopped")

        if self.notifications_enabled:
            session_info = self.state_manager.get_session_info()
            duration_str = ""
            if session_info['session_duration_seconds']:
                duration_hours = session_info['session_duration_seconds'] / 3600
                duration_str = f"\nSession duration: {duration_hours:.1f} hours"

            await self.notification_manager.send_generic_notification(
                f"🛑 Copy Trading Stopped\n"
                f"📊 Tracked positions: {session_info['tracked_positions_count']}\n"
                f"🔄 Copied trades: {session_info['copied_trades_count']}"
                + duration_str +
                f"\n\n💾 State saved - can resume later"
            )

    async def test_balance_fetching(self) -> Dict[str, Any]:
        """Test balance fetching and position sizing for debugging purposes.

        Fetches both balances and sizes a mock SOL long at a hard-coded price;
        no order is placed.

        Returns:
            A dict of the inputs and computed sizing values, or
            `{'error': <message>}` if anything raises.
        """
        try:
            our_balance = await self.get_our_account_balance()
            target_balance = await self.get_target_account_balance()

            # Test with a mock trade for SOL
            test_price = 150.0  # Mock SOL price
            test_leverage = min(10.0, self.max_leverage)  # Test leverage

            mock_trade = CopyTrade(
                coin="SOL",
                action="open_long",
                size=100,
                leverage=test_leverage,
                original_trade_hash="test_hash",
                target_trader_address=self.target_address or "test_address",
                timestamp=int(time.time() * 1000)
            )

            # Calculate position with leverage
            position_calc = await self.calculate_position_size(mock_trade, test_price, test_leverage)

            result = {
                'our_balance': our_balance,
                'target_balance': target_balance,
                'portfolio_percentage': self.portfolio_percentage,
                'test_price': test_price,
                'test_leverage': test_leverage,
                'margin_to_use': position_calc['margin_to_use'],
                'position_value': position_calc['position_value'],
                'token_amount': position_calc['token_amount'],
                'min_position_size': self.min_position_size,
                'would_execute': position_calc['margin_to_use'] >= self.min_position_size,
                'config_enabled': self.enabled,
                'state_enabled': self.state_manager.is_enabled()
            }

            self.logger.info("🧪 Balance & Leverage test results:")
            self.logger.info(f" 💰 Our balance: ${our_balance:.2f}")
            self.logger.info(f" 🎯 Target balance: ${target_balance:.2f}")
            self.logger.info(f" 📊 Portfolio allocation: {self.portfolio_percentage:.1%}")
            self.logger.info(f" ⚖️ Test leverage: {test_leverage:.1f}x")
            self.logger.info(f" 💳 Margin to use: ${position_calc['margin_to_use']:.2f}")
            self.logger.info(f" 🏦 Position value: ${position_calc['position_value']:.2f}")
            self.logger.info(f" 🪙 Token amount: {position_calc['token_amount']:.6f} SOL")
            self.logger.info(f" ✅ Would execute: {position_calc['margin_to_use'] >= self.min_position_size}")

            return result

        except Exception as e:
            self.logger.error(f"❌ Error in balance test: {e}")
            return {'error': str(e)}

    def get_status(self) -> Dict[str, Any]:
        """Get current copy trading status.

        Returns:
            A dict combining the configuration, the sizes of the in-memory
            position snapshots and the state manager's session info.
            'enabled' is True only when both this monitor and the state
            manager are enabled.
        """
        session_info = self.state_manager.get_session_info()

        duration_seconds = session_info['session_duration_seconds']
        duration_hours = duration_seconds / 3600 if duration_seconds else None

        return {
            'enabled': self.enabled and self.state_manager.is_enabled(),
            'target_address': self.target_address,
            'portfolio_percentage': self.portfolio_percentage,
            'copy_mode': self.copy_mode,
            'max_leverage': self.max_leverage,
            'target_positions': len(self.target_positions),
            'our_positions': len(self.our_positions),
            'tracked_positions': session_info['tracked_positions_count'],
            'copied_trades': session_info['copied_trades_count'],
            'session_start_time': session_info['start_time'],
            'session_duration_hours': duration_hours,
            'last_check': session_info['last_check_time']
        }