123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564 |
- """
- Copy Trading Monitor - Tracks and copies trades from a target trader on Hyperliquid
- """
- import logging
- import time
- import asyncio
- from datetime import datetime, timedelta
- from typing import Dict, List, Optional, Any
- from dataclasses import dataclass
- import aiohttp
- import json
- from decimal import Decimal, ROUND_DOWN
- from ..config.config import Config
- from ..clients.hyperliquid_client import HyperliquidClient
- from ..notifications.notification_manager import NotificationManager
- from .copy_trading_state import CopyTradingStateManager
- @dataclass
- class TraderPosition:
- """Represents a position held by the target trader"""
- coin: str
- size: float
- side: str # 'long' or 'short'
- entry_price: float
- leverage: float
- position_value: float
- unrealized_pnl: float
- margin_used: float
- timestamp: int
- @dataclass
- class CopyTrade:
- """Represents a trade to be copied"""
- coin: str
- action: str # 'open_long', 'open_short', 'close_long', 'close_short'
- size: float
- leverage: float
- original_trade_hash: str
- target_trader_address: str
- timestamp: int
- class CopyTradingMonitor:
- """Monitor and copy trades from a target trader"""
-
- def __init__(self, client: HyperliquidClient, notification_manager: NotificationManager):
- self.client = client
- self.notification_manager = notification_manager
- self.config = Config()
- self.logger = logging.getLogger(__name__)
-
- # Configuration
- self.enabled = self.config.COPY_TRADING_ENABLED
- self.target_address = self.config.COPY_TRADING_TARGET_ADDRESS
- self.portfolio_percentage = self.config.COPY_TRADING_PORTFOLIO_PERCENTAGE
- self.copy_mode = self.config.COPY_TRADING_MODE
- self.max_leverage = self.config.COPY_TRADING_MAX_LEVERAGE
- self.min_position_size = self.config.COPY_TRADING_MIN_POSITION_SIZE
- self.execution_delay = self.config.COPY_TRADING_EXECUTION_DELAY
- self.notifications_enabled = self.config.COPY_TRADING_NOTIFICATIONS
-
- # State management for persistence and tracking
- self.state_manager = CopyTradingStateManager()
-
- # Override enabled status from state if different from config
- if self.state_manager.is_enabled() and self.target_address:
- self.enabled = True
-
- # State tracking (legacy, kept for compatibility)
- self.target_positions: Dict[str, TraderPosition] = {}
- self.our_positions: Dict[str, Any] = {}
- self.last_check_time = 0
- self.pending_trades: List[CopyTrade] = []
-
- # API endpoints
- self.info_url = "https://api.hyperliquid.xyz/info"
-
- self.logger.info(f"Copy Trading Monitor initialized - Target: {self.target_address}")
-
- # Load previous session info if available
- session_info = self.state_manager.get_session_info()
- if session_info['start_time']:
- self.logger.info(f"📅 Previous session started: {session_info['start_time']}")
- self.logger.info(f"📊 Tracked positions: {session_info['tracked_positions_count']}")
- self.logger.info(f"🔄 Copied trades: {session_info['copied_trades_count']}")
-
- async def start_monitoring(self):
- """Start the copy trading monitoring loop"""
- if not self.enabled:
- self.logger.info("Copy trading is disabled")
- return
-
- if not self.target_address:
- self.logger.error("No target trader address configured")
- return
-
- self.logger.info(f"Starting copy trading monitor for {self.target_address}")
-
- # Start state tracking
- self.state_manager.start_copy_trading(self.target_address)
-
- # Get current target positions for initialization
- current_positions = await self.get_target_positions()
- if current_positions:
- # Check if this is a fresh start or resuming
- if not self.state_manager.get_tracked_positions():
- # Fresh start - initialize tracking but don't copy existing positions
- self.logger.info("🆕 Fresh start - initializing with existing positions (won't copy)")
- self.state_manager.initialize_tracked_positions(current_positions)
-
- startup_message = (
- f"🔄 Copy Trading Started (Fresh)\n"
- f"Target: {self.target_address[:10]}...\n"
- f"Portfolio Allocation: {self.portfolio_percentage:.1%}\n"
- f"Mode: {self.copy_mode}\n"
- f"Max Leverage: {self.max_leverage}x\n\n"
- f"📊 Found {len(current_positions)} existing positions\n"
- f"⚠️ Will only copy NEW trades from now on"
- )
- else:
- # Resuming - continue from where we left off
- tracked_count = len(self.state_manager.get_tracked_positions())
- self.logger.info(f"▶️ Resuming session - {tracked_count} positions tracked")
-
- startup_message = (
- f"▶️ Copy Trading Resumed\n"
- f"Target: {self.target_address[:10]}...\n"
- f"Portfolio Allocation: {self.portfolio_percentage:.1%}\n"
- f"Mode: {self.copy_mode}\n"
- f"Max Leverage: {self.max_leverage}x\n\n"
- f"📊 Resuming with {tracked_count} tracked positions"
- )
- else:
- startup_message = (
- f"🔄 Copy Trading Started\n"
- f"Target: {self.target_address[:10]}...\n"
- f"Portfolio Allocation: {self.portfolio_percentage:.1%}\n"
- f"Mode: {self.copy_mode}\n"
- f"Max Leverage: {self.max_leverage}x\n\n"
- f"⚠️ Could not access target trader positions"
- )
-
- # Send startup notification
- if self.notifications_enabled:
- await self.notification_manager.send_message(startup_message)
-
- # Initial sync
- await self.sync_positions()
-
- # Start monitoring loop
- while self.enabled and self.state_manager.is_enabled():
- try:
- await self.monitor_cycle()
- await asyncio.sleep(30) # Check every 30 seconds
- except Exception as e:
- self.logger.error(f"Error in copy trading monitor cycle: {e}")
- await asyncio.sleep(60) # Wait longer on error
-
- async def monitor_cycle(self):
- """Single monitoring cycle"""
- try:
- # Get target trader's current positions
- new_positions = await self.get_target_positions()
-
- if new_positions is None:
- return
-
- # Compare with previous positions to detect changes
- position_changes = self.detect_position_changes(new_positions)
-
- # Execute any detected trades
- for trade in position_changes:
- await self.execute_copy_trade(trade)
-
- # Update our tracking
- self.target_positions = new_positions
-
- except Exception as e:
- self.logger.error(f"Error in monitor cycle: {e}")
-
- async def get_target_positions(self) -> Optional[Dict[str, TraderPosition]]:
- """Get current positions of target trader"""
- try:
- payload = {
- "type": "clearinghouseState",
- "user": self.target_address
- }
-
- async with aiohttp.ClientSession() as session:
- async with session.post(self.info_url, json=payload) as response:
- if response.status != 200:
- self.logger.error(f"Failed to get target positions: {response.status}")
- return None
-
- data = await response.json()
- positions = {}
-
- # Parse asset positions
- for asset_pos in data.get('assetPositions', []):
- if asset_pos.get('type') == 'oneWay':
- pos = asset_pos['position']
- coin = pos['coin']
- size = float(pos['szi'])
-
- if abs(size) < 0.001: # Skip dust positions
- continue
-
- side = 'long' if size > 0 else 'short'
-
- positions[coin] = TraderPosition(
- coin=coin,
- size=abs(size),
- side=side,
- entry_price=float(pos['entryPx']),
- leverage=float(pos['leverage']['value']),
- position_value=float(pos['positionValue']),
- unrealized_pnl=float(pos['unrealizedPnl']),
- margin_used=float(pos['marginUsed']),
- timestamp=int(time.time() * 1000)
- )
-
- return positions
-
- except Exception as e:
- self.logger.error(f"Error getting target positions: {e}")
- return None
-
- def detect_position_changes(self, new_positions: Dict[str, TraderPosition]) -> List[CopyTrade]:
- """Detect changes in target trader's positions using state manager"""
- trades = []
-
- # Check for new positions and position increases
- for coin, new_pos in new_positions.items():
- position_data = {
- 'size': new_pos.size,
- 'side': new_pos.side,
- 'entry_price': new_pos.entry_price,
- 'leverage': new_pos.leverage
- }
-
- # Check if this is a new position we should copy
- if self.state_manager.should_copy_position(coin, position_data):
- tracked_pos = self.state_manager.get_tracked_positions().get(coin)
-
- if tracked_pos is None:
- # Completely new position
- action = f"open_{new_pos.side}"
- copy_size = new_pos.size
- self.logger.info(f"🆕 Detected NEW position: {action} {copy_size} {coin} at {new_pos.leverage}x")
- else:
- # Position increase
- size_increase = new_pos.size - tracked_pos['size']
- action = f"add_{new_pos.side}"
- copy_size = size_increase
- self.logger.info(f"📈 Detected position increase: {action} {size_increase} {coin}")
-
- # Create trade to copy
- trade_id = f"{coin}_{action}_{new_pos.timestamp}"
- if not self.state_manager.has_copied_trade(trade_id):
- trades.append(CopyTrade(
- coin=coin,
- action=action,
- size=copy_size,
- leverage=new_pos.leverage,
- original_trade_hash=trade_id,
- target_trader_address=self.target_address,
- timestamp=new_pos.timestamp
- ))
-
- # Check for position reductions
- elif self.state_manager.is_position_reduction(coin, position_data):
- tracked_pos = self.state_manager.get_tracked_positions()[coin]
- size_decrease = tracked_pos['size'] - new_pos.size
- action = f"reduce_{new_pos.side}"
-
- trade_id = f"{coin}_{action}_{new_pos.timestamp}"
- if not self.state_manager.has_copied_trade(trade_id):
- trades.append(CopyTrade(
- coin=coin,
- action=action,
- size=size_decrease,
- leverage=new_pos.leverage,
- original_trade_hash=trade_id,
- target_trader_address=self.target_address,
- timestamp=new_pos.timestamp
- ))
- self.logger.info(f"📉 Detected position decrease: {action} {size_decrease} {coin}")
-
- # Update tracking regardless
- self.state_manager.update_tracked_position(coin, position_data)
-
- # Check for closed positions (exits)
- tracked_positions = self.state_manager.get_tracked_positions()
- for coin in list(tracked_positions.keys()):
- if coin not in new_positions:
- # Position fully closed
- tracked_pos = tracked_positions[coin]
- action = f"close_{tracked_pos['side']}"
-
- trade_id = f"{coin}_{action}_{int(time.time() * 1000)}"
- if not self.state_manager.has_copied_trade(trade_id):
- trades.append(CopyTrade(
- coin=coin,
- action=action,
- size=tracked_pos['size'],
- leverage=tracked_pos['leverage'],
- original_trade_hash=trade_id,
- target_trader_address=self.target_address,
- timestamp=int(time.time() * 1000)
- ))
- self.logger.info(f"❌ Detected position closure: {action} {tracked_pos['size']} {coin}")
-
- # Remove from tracking
- self.state_manager.remove_tracked_position(coin)
-
- # Update last check time
- self.state_manager.update_last_check()
-
- return trades
-
- async def execute_copy_trade(self, trade: CopyTrade):
- """Execute a copy trade"""
- try:
- # Check if we've already copied this trade
- if self.state_manager.has_copied_trade(trade.original_trade_hash):
- self.logger.debug(f"Skipping already copied trade: {trade.original_trade_hash}")
- return
-
- # Calculate our position size
- our_size = await self.calculate_position_size(trade)
-
- if our_size < self.min_position_size:
- self.logger.info(f"Skipping {trade.coin} trade - size too small: ${our_size:.2f}")
- # Still mark as copied to avoid retrying
- self.state_manager.add_copied_trade(trade.original_trade_hash)
- return
-
- # Apply leverage limit
- leverage = min(trade.leverage, self.max_leverage)
-
- # Add execution delay
- await asyncio.sleep(self.execution_delay)
-
- # Execute the trade
- success = await self._execute_hyperliquid_trade(trade, our_size, leverage)
-
- # Mark trade as copied (whether successful or not to avoid retrying)
- self.state_manager.add_copied_trade(trade.original_trade_hash)
-
- # Send notification
- if self.notifications_enabled:
- status = "✅ SUCCESS" if success else "❌ FAILED"
- await self.notification_manager.send_message(
- f"🔄 Copy Trade {status}\n"
- f"Action: {trade.action}\n"
- f"Asset: {trade.coin}\n"
- f"Size: ${our_size:.2f}\n"
- f"Leverage: {leverage}x\n"
- f"Target: {trade.target_trader_address[:10]}...\n"
- f"Trade ID: {trade.original_trade_hash[:16]}..."
- )
-
- except Exception as e:
- self.logger.error(f"Error executing copy trade for {trade.coin}: {e}")
-
- # Mark as copied even on error to avoid infinite retries
- self.state_manager.add_copied_trade(trade.original_trade_hash)
-
- if self.notifications_enabled:
- await self.notification_manager.send_message(
- f"❌ Copy Trade Error\n"
- f"Asset: {trade.coin}\n"
- f"Action: {trade.action}\n"
- f"Error: {str(e)[:100]}\n"
- f"Trade ID: {trade.original_trade_hash[:16]}..."
- )
-
- async def calculate_position_size(self, trade: CopyTrade) -> float:
- """Calculate our position size based on the copy trading mode"""
- try:
- # Get our current account balance
- our_balance = await self.get_our_account_balance()
-
- if self.copy_mode == 'FIXED':
- # Fixed percentage of our account
- return our_balance * self.portfolio_percentage
-
- elif self.copy_mode == 'PROPORTIONAL':
- # Get target trader's account balance
- target_balance = await self.get_target_account_balance()
-
- if target_balance <= 0:
- return our_balance * self.portfolio_percentage
-
- # Calculate target trader's position percentage
- target_pos = self.target_positions.get(trade.coin)
- if not target_pos:
- return our_balance * self.portfolio_percentage
-
- target_position_percentage = target_pos.margin_used / target_balance
-
- # Apply same percentage to our account
- our_position_size = our_balance * target_position_percentage
-
- # Cap at our portfolio percentage limit
- max_size = our_balance * self.portfolio_percentage
- return min(our_position_size, max_size)
-
- else:
- return our_balance * self.portfolio_percentage
-
- except Exception as e:
- self.logger.error(f"Error calculating position size: {e}")
- # Fallback to fixed percentage
- our_balance = await self.get_our_account_balance()
- return our_balance * self.portfolio_percentage
-
- async def get_our_account_balance(self) -> float:
- """Get our account balance"""
- try:
- balance_info = self.client.get_balance()
- if balance_info:
- return float(balance_info.get('accountValue', 0))
- else:
- return 0.0
- except Exception as e:
- self.logger.error(f"Error getting our account balance: {e}")
- return 0.0
-
- async def get_target_account_balance(self) -> float:
- """Get target trader's account balance"""
- try:
- payload = {
- "type": "clearinghouseState",
- "user": self.target_address
- }
-
- async with aiohttp.ClientSession() as session:
- async with session.post(self.info_url, json=payload) as response:
- if response.status == 200:
- data = await response.json()
- return float(data.get('marginSummary', {}).get('accountValue', 0))
- else:
- return 0.0
-
- except Exception as e:
- self.logger.error(f"Error getting target account balance: {e}")
- return 0.0
-
- async def _execute_hyperliquid_trade(self, trade: CopyTrade, size: float, leverage: float) -> bool:
- """Execute trade on Hyperliquid"""
- try:
- # Determine if this is a buy or sell order
- is_buy = 'long' in trade.action or ('close' in trade.action and 'short' in trade.action)
-
- # For position opening/closing
- if 'open' in trade.action:
- # Open new position
- result = await self.client.place_order(
- symbol=trade.coin,
- side='buy' if is_buy else 'sell',
- size=size,
- leverage=leverage,
- order_type='market'
- )
- elif 'close' in trade.action:
- # Close existing position
- result = await self.client.close_position(
- symbol=trade.coin,
- size=size
- )
- elif 'add' in trade.action:
- # Add to existing position
- result = await self.client.place_order(
- symbol=trade.coin,
- side='buy' if is_buy else 'sell',
- size=size,
- leverage=leverage,
- order_type='market'
- )
- elif 'reduce' in trade.action:
- # Reduce existing position
- result = await self.client.place_order(
- symbol=trade.coin,
- side='sell' if 'long' in trade.action else 'buy',
- size=size,
- order_type='market'
- )
- else:
- self.logger.error(f"Unknown trade action: {trade.action}")
- return False
-
- if result and result.get('success'):
- self.logger.info(f"Successfully executed copy trade: {trade.action} {size} {trade.coin}")
- return True
- else:
- self.logger.error(f"Failed to execute copy trade: {result}")
- return False
-
- except Exception as e:
- self.logger.error(f"Error executing Hyperliquid trade: {e}")
- return False
-
- async def sync_positions(self):
- """Sync our current positions with tracking"""
- try:
- # Get our current positions
- positions = self.client.get_positions()
- if positions:
- self.our_positions = {pos['symbol']: pos for pos in positions}
- else:
- self.our_positions = {}
-
- # Get target positions for initial sync
- self.target_positions = await self.get_target_positions() or {}
-
- self.logger.info(f"Synced positions - Target: {len(self.target_positions)}, Ours: {len(self.our_positions)}")
-
- except Exception as e:
- self.logger.error(f"Error syncing positions: {e}")
-
- async def stop_monitoring(self):
- """Stop copy trading monitoring"""
- self.enabled = False
- self.state_manager.stop_copy_trading()
- self.logger.info("Copy trading monitor stopped")
-
- if self.notifications_enabled:
- session_info = self.state_manager.get_session_info()
- duration_str = ""
- if session_info['session_duration_seconds']:
- duration_hours = session_info['session_duration_seconds'] / 3600
- duration_str = f"\nSession duration: {duration_hours:.1f} hours"
-
- await self.notification_manager.send_message(
- f"🛑 Copy Trading Stopped\n"
- f"📊 Tracked positions: {session_info['tracked_positions_count']}\n"
- f"🔄 Copied trades: {session_info['copied_trades_count']}"
- + duration_str +
- f"\n\n💾 State saved - can resume later"
- )
-
- def get_status(self) -> Dict[str, Any]:
- """Get current copy trading status"""
- session_info = self.state_manager.get_session_info()
-
- return {
- 'enabled': self.enabled and self.state_manager.is_enabled(),
- 'target_address': self.target_address,
- 'portfolio_percentage': self.portfolio_percentage,
- 'copy_mode': self.copy_mode,
- 'max_leverage': self.max_leverage,
- 'target_positions': len(self.target_positions),
- 'our_positions': len(self.our_positions),
- 'tracked_positions': session_info['tracked_positions_count'],
- 'copied_trades': session_info['copied_trades_count'],
- 'session_start_time': session_info['start_time'],
- 'session_duration_hours': session_info['session_duration_seconds'] / 3600 if session_info['session_duration_seconds'] else None,
- 'last_check': session_info['last_check_time']
- }
|