""" Copy Trading Monitor - Tracks and copies trades from a target trader on Hyperliquid """ import logging import time import asyncio from datetime import datetime, timedelta from typing import Dict, List, Optional, Any from dataclasses import dataclass import aiohttp import json from decimal import Decimal, ROUND_DOWN from ..config.config import Config from ..clients.hyperliquid_client import HyperliquidClient from ..notifications.notification_manager import NotificationManager from .copy_trading_state import CopyTradingStateManager @dataclass class TraderPosition: """Represents a position held by the target trader""" coin: str size: float side: str # 'long' or 'short' entry_price: float leverage: float position_value: float unrealized_pnl: float margin_used: float timestamp: int @dataclass class CopyTrade: """Represents a trade to be copied""" coin: str action: str # 'open_long', 'open_short', 'close_long', 'close_short' size: float leverage: float original_trade_hash: str target_trader_address: str timestamp: int class CopyTradingMonitor: """Monitor and copy trades from a target trader""" def __init__(self, client: HyperliquidClient, notification_manager: NotificationManager): self.client = client self.notification_manager = notification_manager self.config = Config() self.logger = logging.getLogger(__name__) # Configuration self.enabled = self.config.COPY_TRADING_ENABLED self.target_address = self.config.COPY_TRADING_TARGET_ADDRESS self.portfolio_percentage = self.config.COPY_TRADING_PORTFOLIO_PERCENTAGE self.copy_mode = self.config.COPY_TRADING_MODE self.max_leverage = self.config.COPY_TRADING_MAX_LEVERAGE self.min_position_size = self.config.COPY_TRADING_MIN_POSITION_SIZE self.execution_delay = self.config.COPY_TRADING_EXECUTION_DELAY self.notifications_enabled = self.config.COPY_TRADING_NOTIFICATIONS # State management for persistence and tracking self.state_manager = CopyTradingStateManager() # Override enabled status from state if different from config if self.state_manager.is_enabled() and self.target_address: self.enabled = True # State tracking (legacy, kept for compatibility) self.target_positions: Dict[str, TraderPosition] = {} self.our_positions: Dict[str, Any] = {} self.last_check_time = 0 self.pending_trades: List[CopyTrade] = [] # API endpoints self.info_url = "https://api.hyperliquid.xyz/info" self.logger.info(f"Copy Trading Monitor initialized - Target: {self.target_address}") # Load previous session info if available session_info = self.state_manager.get_session_info() if session_info['start_time']: self.logger.info(f"šŸ“… Previous session started: {session_info['start_time']}") self.logger.info(f"šŸ“Š Tracked positions: {session_info['tracked_positions_count']}") self.logger.info(f"šŸ”„ Copied trades: {session_info['copied_trades_count']}") async def start_monitoring(self): """Start the copy trading monitoring loop""" if not self.enabled: self.logger.info("Copy trading is disabled") return if not self.target_address: self.logger.error("No target trader address configured") return self.logger.info(f"Starting copy trading monitor for {self.target_address}") # Start state tracking self.state_manager.start_copy_trading(self.target_address) # Get current target positions for initialization current_positions = await self.get_target_positions() if current_positions: # Check if this is a fresh start or resuming if not self.state_manager.get_tracked_positions(): # Fresh start - initialize tracking but don't copy existing positions self.logger.info("šŸ†• Fresh start - initializing with existing positions (won't copy)") self.state_manager.initialize_tracked_positions(current_positions) startup_message = ( f"šŸ”„ Copy Trading Started (Fresh)\n" f"Target: {self.target_address[:10]}...\n" f"Portfolio Allocation: {self.portfolio_percentage:.1%}\n" f"Mode: {self.copy_mode}\n" f"Max Leverage: {self.max_leverage}x\n\n" f"šŸ“Š Found {len(current_positions)} existing positions\n" f"āš ļø Will only copy NEW trades from now on" ) else: # Resuming - continue from where we left off tracked_count = len(self.state_manager.get_tracked_positions()) self.logger.info(f"ā–¶ļø Resuming session - {tracked_count} positions tracked") startup_message = ( f"ā–¶ļø Copy Trading Resumed\n" f"Target: {self.target_address[:10]}...\n" f"Portfolio Allocation: {self.portfolio_percentage:.1%}\n" f"Mode: {self.copy_mode}\n" f"Max Leverage: {self.max_leverage}x\n\n" f"šŸ“Š Resuming with {tracked_count} tracked positions" ) else: startup_message = ( f"šŸ”„ Copy Trading Started\n" f"Target: {self.target_address[:10]}...\n" f"Portfolio Allocation: {self.portfolio_percentage:.1%}\n" f"Mode: {self.copy_mode}\n" f"Max Leverage: {self.max_leverage}x\n\n" f"āš ļø Could not access target trader positions" ) # Send startup notification if self.notifications_enabled: await self.notification_manager.send_message(startup_message) # Initial sync await self.sync_positions() # Start monitoring loop while self.enabled and self.state_manager.is_enabled(): try: await self.monitor_cycle() await asyncio.sleep(30) # Check every 30 seconds except Exception as e: self.logger.error(f"Error in copy trading monitor cycle: {e}") await asyncio.sleep(60) # Wait longer on error async def monitor_cycle(self): """Single monitoring cycle""" try: # Get target trader's current positions new_positions = await self.get_target_positions() if new_positions is None: return # Compare with previous positions to detect changes position_changes = self.detect_position_changes(new_positions) # Execute any detected trades for trade in position_changes: await self.execute_copy_trade(trade) # Update our tracking self.target_positions = new_positions except Exception as e: self.logger.error(f"Error in monitor cycle: {e}") async def get_target_positions(self) -> Optional[Dict[str, TraderPosition]]: """Get current positions of target trader""" try: payload = { "type": "clearinghouseState", "user": self.target_address } async with aiohttp.ClientSession() as session: async with session.post(self.info_url, json=payload) as response: if response.status != 200: self.logger.error(f"Failed to get target positions: {response.status}") return None data = await response.json() positions = {} # Parse asset positions for asset_pos in data.get('assetPositions', []): if asset_pos.get('type') == 'oneWay': pos = asset_pos['position'] coin = pos['coin'] size = float(pos['szi']) if abs(size) < 0.001: # Skip dust positions continue side = 'long' if size > 0 else 'short' positions[coin] = TraderPosition( coin=coin, size=abs(size), side=side, entry_price=float(pos['entryPx']), leverage=float(pos['leverage']['value']), position_value=float(pos['positionValue']), unrealized_pnl=float(pos['unrealizedPnl']), margin_used=float(pos['marginUsed']), timestamp=int(time.time() * 1000) ) return positions except Exception as e: self.logger.error(f"Error getting target positions: {e}") return None def detect_position_changes(self, new_positions: Dict[str, TraderPosition]) -> List[CopyTrade]: """Detect changes in target trader's positions using state manager""" trades = [] # Check for new positions and position increases for coin, new_pos in new_positions.items(): position_data = { 'size': new_pos.size, 'side': new_pos.side, 'entry_price': new_pos.entry_price, 'leverage': new_pos.leverage } # Check if this is a new position we should copy if self.state_manager.should_copy_position(coin, position_data): tracked_pos = self.state_manager.get_tracked_positions().get(coin) if tracked_pos is None: # Completely new position action = f"open_{new_pos.side}" copy_size = new_pos.size self.logger.info(f"šŸ†• Detected NEW position: {action} {copy_size} {coin} at {new_pos.leverage}x") else: # Position increase size_increase = new_pos.size - tracked_pos['size'] action = f"add_{new_pos.side}" copy_size = size_increase self.logger.info(f"šŸ“ˆ Detected position increase: {action} {size_increase} {coin}") # Create trade to copy trade_id = f"{coin}_{action}_{new_pos.timestamp}" if not self.state_manager.has_copied_trade(trade_id): trades.append(CopyTrade( coin=coin, action=action, size=copy_size, leverage=new_pos.leverage, original_trade_hash=trade_id, target_trader_address=self.target_address, timestamp=new_pos.timestamp )) # Check for position reductions elif self.state_manager.is_position_reduction(coin, position_data): tracked_pos = self.state_manager.get_tracked_positions()[coin] size_decrease = tracked_pos['size'] - new_pos.size action = f"reduce_{new_pos.side}" trade_id = f"{coin}_{action}_{new_pos.timestamp}" if not self.state_manager.has_copied_trade(trade_id): trades.append(CopyTrade( coin=coin, action=action, size=size_decrease, leverage=new_pos.leverage, original_trade_hash=trade_id, target_trader_address=self.target_address, timestamp=new_pos.timestamp )) self.logger.info(f"šŸ“‰ Detected position decrease: {action} {size_decrease} {coin}") # Update tracking regardless self.state_manager.update_tracked_position(coin, position_data) # Check for closed positions (exits) tracked_positions = self.state_manager.get_tracked_positions() for coin in list(tracked_positions.keys()): if coin not in new_positions: # Position fully closed tracked_pos = tracked_positions[coin] action = f"close_{tracked_pos['side']}" trade_id = f"{coin}_{action}_{int(time.time() * 1000)}" if not self.state_manager.has_copied_trade(trade_id): trades.append(CopyTrade( coin=coin, action=action, size=tracked_pos['size'], leverage=tracked_pos['leverage'], original_trade_hash=trade_id, target_trader_address=self.target_address, timestamp=int(time.time() * 1000) )) self.logger.info(f"āŒ Detected position closure: {action} {tracked_pos['size']} {coin}") # Remove from tracking self.state_manager.remove_tracked_position(coin) # Update last check time self.state_manager.update_last_check() return trades async def execute_copy_trade(self, trade: CopyTrade): """Execute a copy trade""" try: # Check if we've already copied this trade if self.state_manager.has_copied_trade(trade.original_trade_hash): self.logger.debug(f"Skipping already copied trade: {trade.original_trade_hash}") return # Calculate our position size our_size = await self.calculate_position_size(trade) if our_size < self.min_position_size: self.logger.info(f"Skipping {trade.coin} trade - size too small: ${our_size:.2f}") # Still mark as copied to avoid retrying self.state_manager.add_copied_trade(trade.original_trade_hash) return # Apply leverage limit leverage = min(trade.leverage, self.max_leverage) # Add execution delay await asyncio.sleep(self.execution_delay) # Execute the trade success = await self._execute_hyperliquid_trade(trade, our_size, leverage) # Mark trade as copied (whether successful or not to avoid retrying) self.state_manager.add_copied_trade(trade.original_trade_hash) # Send notification if self.notifications_enabled: status = "āœ… SUCCESS" if success else "āŒ FAILED" await self.notification_manager.send_message( f"šŸ”„ Copy Trade {status}\n" f"Action: {trade.action}\n" f"Asset: {trade.coin}\n" f"Size: ${our_size:.2f}\n" f"Leverage: {leverage}x\n" f"Target: {trade.target_trader_address[:10]}...\n" f"Trade ID: {trade.original_trade_hash[:16]}..." ) except Exception as e: self.logger.error(f"Error executing copy trade for {trade.coin}: {e}") # Mark as copied even on error to avoid infinite retries self.state_manager.add_copied_trade(trade.original_trade_hash) if self.notifications_enabled: await self.notification_manager.send_message( f"āŒ Copy Trade Error\n" f"Asset: {trade.coin}\n" f"Action: {trade.action}\n" f"Error: {str(e)[:100]}\n" f"Trade ID: {trade.original_trade_hash[:16]}..." ) async def calculate_position_size(self, trade: CopyTrade) -> float: """Calculate our position size based on the copy trading mode""" try: # Get our current account balance our_balance = await self.get_our_account_balance() if self.copy_mode == 'FIXED': # Fixed percentage of our account return our_balance * self.portfolio_percentage elif self.copy_mode == 'PROPORTIONAL': # Get target trader's account balance target_balance = await self.get_target_account_balance() if target_balance <= 0: return our_balance * self.portfolio_percentage # Calculate target trader's position percentage target_pos = self.target_positions.get(trade.coin) if not target_pos: return our_balance * self.portfolio_percentage target_position_percentage = target_pos.margin_used / target_balance # Apply same percentage to our account our_position_size = our_balance * target_position_percentage # Cap at our portfolio percentage limit max_size = our_balance * self.portfolio_percentage return min(our_position_size, max_size) else: return our_balance * self.portfolio_percentage except Exception as e: self.logger.error(f"Error calculating position size: {e}") # Fallback to fixed percentage our_balance = await self.get_our_account_balance() return our_balance * self.portfolio_percentage async def get_our_account_balance(self) -> float: """Get our account balance""" try: balance_info = self.client.get_balance() if balance_info: return float(balance_info.get('accountValue', 0)) else: return 0.0 except Exception as e: self.logger.error(f"Error getting our account balance: {e}") return 0.0 async def get_target_account_balance(self) -> float: """Get target trader's account balance""" try: payload = { "type": "clearinghouseState", "user": self.target_address } async with aiohttp.ClientSession() as session: async with session.post(self.info_url, json=payload) as response: if response.status == 200: data = await response.json() return float(data.get('marginSummary', {}).get('accountValue', 0)) else: return 0.0 except Exception as e: self.logger.error(f"Error getting target account balance: {e}") return 0.0 async def _execute_hyperliquid_trade(self, trade: CopyTrade, size: float, leverage: float) -> bool: """Execute trade on Hyperliquid""" try: # Determine if this is a buy or sell order is_buy = 'long' in trade.action or ('close' in trade.action and 'short' in trade.action) # For position opening/closing if 'open' in trade.action: # Open new position result = await self.client.place_order( symbol=trade.coin, side='buy' if is_buy else 'sell', size=size, leverage=leverage, order_type='market' ) elif 'close' in trade.action: # Close existing position result = await self.client.close_position( symbol=trade.coin, size=size ) elif 'add' in trade.action: # Add to existing position result = await self.client.place_order( symbol=trade.coin, side='buy' if is_buy else 'sell', size=size, leverage=leverage, order_type='market' ) elif 'reduce' in trade.action: # Reduce existing position result = await self.client.place_order( symbol=trade.coin, side='sell' if 'long' in trade.action else 'buy', size=size, order_type='market' ) else: self.logger.error(f"Unknown trade action: {trade.action}") return False if result and result.get('success'): self.logger.info(f"Successfully executed copy trade: {trade.action} {size} {trade.coin}") return True else: self.logger.error(f"Failed to execute copy trade: {result}") return False except Exception as e: self.logger.error(f"Error executing Hyperliquid trade: {e}") return False async def sync_positions(self): """Sync our current positions with tracking""" try: # Get our current positions positions = self.client.get_positions() if positions: self.our_positions = {pos['symbol']: pos for pos in positions} else: self.our_positions = {} # Get target positions for initial sync self.target_positions = await self.get_target_positions() or {} self.logger.info(f"Synced positions - Target: {len(self.target_positions)}, Ours: {len(self.our_positions)}") except Exception as e: self.logger.error(f"Error syncing positions: {e}") async def stop_monitoring(self): """Stop copy trading monitoring""" self.enabled = False self.state_manager.stop_copy_trading() self.logger.info("Copy trading monitor stopped") if self.notifications_enabled: session_info = self.state_manager.get_session_info() duration_str = "" if session_info['session_duration_seconds']: duration_hours = session_info['session_duration_seconds'] / 3600 duration_str = f"\nSession duration: {duration_hours:.1f} hours" await self.notification_manager.send_message( f"šŸ›‘ Copy Trading Stopped\n" f"šŸ“Š Tracked positions: {session_info['tracked_positions_count']}\n" f"šŸ”„ Copied trades: {session_info['copied_trades_count']}" + duration_str + f"\n\nšŸ’¾ State saved - can resume later" ) def get_status(self) -> Dict[str, Any]: """Get current copy trading status""" session_info = self.state_manager.get_session_info() return { 'enabled': self.enabled and self.state_manager.is_enabled(), 'target_address': self.target_address, 'portfolio_percentage': self.portfolio_percentage, 'copy_mode': self.copy_mode, 'max_leverage': self.max_leverage, 'target_positions': len(self.target_positions), 'our_positions': len(self.our_positions), 'tracked_positions': session_info['tracked_positions_count'], 'copied_trades': session_info['copied_trades_count'], 'session_start_time': session_info['start_time'], 'session_duration_hours': session_info['session_duration_seconds'] / 3600 if session_info['session_duration_seconds'] else None, 'last_check': session_info['last_check_time'] }