12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831 |
- """
- Copy Trading Monitor - Tracks and copies trades from a target trader on Hyperliquid
- """
- import logging
- import time
- import asyncio
- from datetime import datetime, timedelta
- from typing import Dict, List, Optional, Any
- from dataclasses import dataclass
- import aiohttp
- import json
- from decimal import Decimal, ROUND_DOWN
- from ..config.config import Config
- from ..clients.hyperliquid_client import HyperliquidClient
- from ..notifications.notification_manager import NotificationManager
- from .copy_trading_state import CopyTradingStateManager
@dataclass
class TraderPosition:
    """Snapshot of a single open position held by the target trader."""

    # Asset identifier taken from the exchange's position record
    coin: str
    # Absolute position size; the direction lives in `side`
    size: float
    # 'long' or 'short'
    side: str
    # Entry price reported for the position
    entry_price: float
    # Leverage value reported for the position
    leverage: float
    # Position value as reported by the exchange
    position_value: float
    # Unrealized profit and loss as reported by the exchange
    unrealized_pnl: float
    # Margin currently used by the position
    margin_used: float
    # Time the snapshot was taken, in milliseconds since the epoch
    timestamp: int
@dataclass
class CopyTrade:
    """A change in the target trader's book that should be mirrored."""

    # Asset identifier of the position that changed
    coin: str
    # '<verb>_<side>' where verb is 'open', 'add', 'reduce' or 'close'
    # and side is 'long' or 'short' (e.g. 'open_long', 'reduce_short')
    action: str
    # Size of the change on the target's side, in the target's units
    size: float
    # Leverage of the target's position
    leverage: float
    # Identifier used to remember that this trade was already handled
    original_trade_hash: str
    # Address of the trader being copied
    target_trader_address: str
    # Detection time in milliseconds since the epoch
    timestamp: int
- class CopyTradingMonitor:
- """Monitor and copy trades from a target trader"""
-
def __init__(self, client: HyperliquidClient, notification_manager: NotificationManager):
    """Wire up collaborators, read the copy trading settings and log them.

    Args:
        client: Exchange client used for balances, positions and orders.
        notification_manager: Sink for user-facing notifications.
    """
    self.client = client
    self.notification_manager = notification_manager
    self.config = Config()
    self.logger = logging.getLogger(__name__)

    # Copy trading settings, read once from configuration
    cfg = self.config
    self.enabled = cfg.COPY_TRADING_ENABLED
    self.target_address = cfg.COPY_TRADING_TARGET_ADDRESS
    self.portfolio_percentage = cfg.COPY_TRADING_PORTFOLIO_PERCENTAGE
    self.copy_mode = cfg.COPY_TRADING_MODE
    self.max_leverage = cfg.COPY_TRADING_MAX_LEVERAGE
    self.min_position_size = cfg.COPY_TRADING_MIN_POSITION_SIZE
    self.execution_delay = cfg.COPY_TRADING_EXECUTION_DELAY
    self.notifications_enabled = cfg.COPY_TRADING_NOTIFICATIONS

    # Persistent tracking of positions, copied trades and session info
    self.state_manager = CopyTradingStateManager()

    # Saved state may switch the monitor on even if config has it off,
    # but only when there is a target address to follow.
    if self.state_manager.is_enabled() and self.target_address:
        self.enabled = True

    # In-memory bookkeeping (legacy, kept for compatibility)
    self.target_positions: Dict[str, TraderPosition] = {}
    self.our_positions: Dict[str, Any] = {}
    self.last_check_time = 0
    self.pending_trades: List[CopyTrade] = []

    # Endpoint queried for the target trader's account state
    self.info_url = "https://api.hyperliquid.xyz/info"

    log = self.logger.info
    log(f"Copy Trading Monitor initialized - Target: {self.target_address}")
    log(f"📊 Configuration:")
    log(f" - Enabled: {self.enabled}")
    log(f" - Target Address: {self.target_address}")
    log(f" - Portfolio %: {self.portfolio_percentage:.1%}")
    log(f" - Copy Mode: {self.copy_mode}")
    log(f" - Max Leverage: {self.max_leverage}x")
    log(f" - Min Position Size: ${self.min_position_size:.2f}")
    log(f" - Execution Delay: {self.execution_delay}s")
    log(f" - Notifications: {self.notifications_enabled}")

    # Report the previous session, if the state manager remembers one
    previous = self.state_manager.get_session_info()
    if previous['start_time']:
        log(f"📅 Previous session started: {previous['start_time']}")
        log(f"📊 Tracked positions: {previous['tracked_positions_count']}")
        log(f"🔄 Copied trades: {previous['copied_trades_count']}")
-
async def start_monitoring(self):
    """Run the copy trading monitor until it is disabled.

    Does nothing when copy trading is disabled or no target address is
    configured. Otherwise it starts state tracking, takes a first snapshot
    of the target's positions (never copying positions that already
    exist on a fresh start), sends a startup notification, performs an
    initial sync and then polls every 30 seconds, waiting 60 seconds after
    a failed cycle.

    A successful snapshot that contains no positions is reported as
    "found 0 existing positions"; only a failed snapshot (``None``) is
    reported as "could not access target trader positions".
    """
    if not self.enabled:
        self.logger.info("Copy trading is disabled")
        return

    if not self.target_address:
        self.logger.error("No target trader address configured")
        return

    self.logger.info(f"Starting copy trading monitor for {self.target_address}")

    try:
        # Start state tracking (async variant so the loop is not blocked)
        await self.state_manager.start_copy_trading_async(self.target_address)

        # First snapshot of the target; None means the fetch failed
        try:
            current_positions = await asyncio.wait_for(
                self.get_target_positions(),
                timeout=15.0
            )
        except asyncio.TimeoutError:
            self.logger.warning("Timeout during initialization - will retry in monitoring loop")
            current_positions = None
        except Exception as e:
            self.logger.error(f"Error during initialization: {e}")
            current_positions = None

        # Settings summary shared by every startup message variant
        settings = (
            f"Target: {self.target_address[:10]}...\n"
            f"Portfolio Allocation: {self.portfolio_percentage:.1%}\n"
            f"Mode: {self.copy_mode}\n"
            f"Max Leverage: {self.max_leverage}x\n\n"
        )

        if current_positions is None:
            startup_message = (
                "🔄 Copy Trading Started\n"
                + settings
                + "⚠️ Could not access target trader positions during startup"
            )
        elif not self.state_manager.get_tracked_positions():
            # Fresh start: remember what already exists, but do not copy it
            self.logger.info("🆕 Fresh start - initializing with existing positions (won't copy)")
            if current_positions:
                await self.state_manager.initialize_tracked_positions_async(current_positions)

            startup_message = (
                "🔄 Copy Trading Started (Fresh)\n"
                + settings
                + f"📊 Found {len(current_positions)} existing positions\n"
                + "⚠️ Will only copy NEW trades from now on"
            )
        else:
            # Resuming: continue from the previously tracked positions
            tracked_count = len(self.state_manager.get_tracked_positions())
            self.logger.info(f"▶️ Resuming session - {tracked_count} positions tracked")

            startup_message = (
                "▶️ Copy Trading Resumed\n"
                + settings
                + f"📊 Resuming with {tracked_count} tracked positions"
            )

        # A slow or failing notification must not prevent monitoring
        if self.notifications_enabled:
            try:
                await asyncio.wait_for(
                    self.notification_manager.send_generic_notification(startup_message),
                    timeout=5.0
                )
            except Exception as e:
                self.logger.error(f"Error sending startup notification: {e}")

        # Initial sync is best-effort as well
        try:
            await asyncio.wait_for(self.sync_positions(), timeout=10.0)
        except Exception as e:
            self.logger.error(f"Error during initial sync: {e}")

        # Poll until either the instance flag or the saved state disables us
        while self.enabled and self.state_manager.is_enabled():
            try:
                await self.monitor_cycle()
                await asyncio.sleep(30)  # Check every 30 seconds
            except Exception as e:
                self.logger.error(f"Error in copy trading monitor cycle: {e}")
                await asyncio.sleep(60)  # Wait longer on error

    except Exception as e:
        self.logger.error(f"Fatal error in copy trading monitor: {e}")
        self.enabled = False
-
async def monitor_cycle(self):
    """Run one poll: fetch the target's positions, diff them, copy changes.

    The cycle is abandoned (without touching any state) when the positions
    cannot be fetched or the diff fails. Each detected trade is executed
    with its own 30 second timeout, and a failure of one trade does not
    stop the others.

    ``self.target_positions`` is refreshed *before* trades are executed
    because position sizing looks the traded coin up in that mapping; with
    the stale snapshot a newly opened coin would never be found.
    """
    try:
        # Fetch the target trader's current positions (bounded wait)
        try:
            new_positions = await asyncio.wait_for(
                self.get_target_positions(),
                timeout=15.0
            )
        except asyncio.TimeoutError:
            self.logger.warning("Timeout getting target positions - skipping this cycle")
            return
        except Exception as e:
            self.logger.error(f"Error getting target positions: {e}")
            return
        if new_positions is None:
            return

        # Diff against what was tracked previously
        try:
            position_changes = await self.detect_position_changes(new_positions)
        except Exception as e:
            self.logger.error(f"Error detecting position changes: {e}")
            return

        # Publish the fresh snapshot so sizing sees the positions being copied
        self.target_positions = new_positions

        # Mirror every detected change, isolating failures per trade
        for trade in position_changes:
            try:
                await asyncio.wait_for(
                    self.execute_copy_trade(trade),
                    timeout=30.0
                )
            except asyncio.TimeoutError:
                self.logger.error(f"Timeout executing copy trade for {trade.coin}")
            except Exception as e:
                self.logger.error(f"Error executing copy trade for {trade.coin}: {e}")

        # Record when the last successful check happened
        await self.state_manager.update_last_check_async()

    except Exception as e:
        self.logger.error(f"Error in monitor cycle: {e}")
-
async def get_target_positions(self) -> Optional[Dict[str, TraderPosition]]:
    """Fetch the target trader's open positions from the info endpoint.

    Returns:
        A mapping of coin to ``TraderPosition`` for every 'oneWay' position
        whose absolute size is at least 0.001, or ``None`` when the request
        fails, times out or the response cannot be parsed.
    """
    try:
        request_body = {
            "type": "clearinghouseState",
            "user": self.target_address
        }

        # Bound the whole request so a stalled connection cannot hang us
        request_timeout = aiohttp.ClientTimeout(total=10.0)
        async with aiohttp.ClientSession(timeout=request_timeout) as http:
            async with http.post(self.info_url, json=request_body) as reply:
                if reply.status != 200:
                    self.logger.error(f"Failed to get target positions: {reply.status}")
                    return None

                account_state = await reply.json()
                snapshot = {}

                for entry in account_state.get('assetPositions', []):
                    if entry.get('type') != 'oneWay':
                        continue

                    raw = entry['position']
                    coin = raw['coin']
                    signed_size = float(raw['szi'])

                    # Ignore dust positions
                    if abs(signed_size) < 0.001:
                        continue

                    # The sign of the size encodes the direction
                    direction = 'long' if signed_size > 0 else 'short'

                    snapshot[coin] = TraderPosition(
                        coin=coin,
                        size=abs(signed_size),
                        side=direction,
                        entry_price=float(raw['entryPx']),
                        leverage=float(raw['leverage']['value']),
                        position_value=float(raw['positionValue']),
                        unrealized_pnl=float(raw['unrealizedPnl']),
                        margin_used=float(raw['marginUsed']),
                        timestamp=int(time.time() * 1000)
                    )

                return snapshot

    except asyncio.TimeoutError:
        self.logger.warning("Timeout getting target positions - will retry next cycle")
        return None
    except Exception as e:
        self.logger.error(f"Error getting target positions: {e}")
        return None
-
async def detect_position_changes(self, new_positions: Dict[str, TraderPosition]) -> List[CopyTrade]:
    """Diff a fresh snapshot against tracked state and list trades to copy.

    For every coin in ``new_positions`` the state manager decides whether
    it is a position to copy (new or increased) or a reduction; tracking
    is then updated for that coin regardless of the outcome. Tracked coins
    missing from the snapshot are treated as fully closed and removed
    from tracking.

    Args:
        new_positions: Latest positions of the target trader, by coin.

    Returns:
        The trades to mirror, in detection order.
    """
    # NOTE(review): trade ids embed a fresh millisecond timestamp, so the
    # has_copied_trade() checks below look unlikely to ever match - confirm.
    state = self.state_manager
    detected = []

    # Pass 1: new positions, increases and reductions
    for coin, current in new_positions.items():
        position_data = {
            'size': current.size,
            'side': current.side,
            'entry_price': current.entry_price,
            'leverage': current.leverage
        }

        if state.should_copy_position(coin, position_data):
            previous = state.get_tracked_positions().get(coin)

            if previous is None:
                # Not tracked before: copy the whole position
                action = f"open_{current.side}"
                copy_size = current.size
                self.logger.info(f"🆕 Detected NEW position: {action} {copy_size} {coin} at {current.leverage}x")
            else:
                # Already tracked: copy only the increase
                size_increase = current.size - previous['size']
                action = f"add_{current.side}"
                copy_size = size_increase
                self.logger.info(f"📈 Detected position increase: {action} {size_increase} {coin}")

            trade_id = f"{coin}_{action}_{current.timestamp}"
            if not state.has_copied_trade(trade_id):
                detected.append(CopyTrade(
                    coin=coin,
                    action=action,
                    size=copy_size,
                    leverage=current.leverage,
                    original_trade_hash=trade_id,
                    target_trader_address=self.target_address,
                    timestamp=current.timestamp
                ))

        elif state.is_position_reduction(coin, position_data):
            previous = state.get_tracked_positions()[coin]
            size_decrease = previous['size'] - current.size
            action = f"reduce_{current.side}"

            trade_id = f"{coin}_{action}_{current.timestamp}"
            if not state.has_copied_trade(trade_id):
                detected.append(CopyTrade(
                    coin=coin,
                    action=action,
                    size=size_decrease,
                    leverage=current.leverage,
                    original_trade_hash=trade_id,
                    target_trader_address=self.target_address,
                    timestamp=current.timestamp
                ))
                self.logger.info(f"📉 Detected position decrease: {action} {size_decrease} {coin}")

        # Tracking always follows the latest snapshot
        await state.update_tracked_position_async(coin, position_data)

    # Pass 2: tracked coins that vanished from the snapshot were closed
    tracked = state.get_tracked_positions()
    for coin in list(tracked):
        if coin in new_positions:
            continue

        gone = tracked[coin]
        action = f"close_{gone['side']}"

        trade_id = f"{coin}_{action}_{int(time.time() * 1000)}"
        if not state.has_copied_trade(trade_id):
            detected.append(CopyTrade(
                coin=coin,
                action=action,
                size=gone['size'],
                leverage=gone['leverage'],
                original_trade_hash=trade_id,
                target_trader_address=self.target_address,
                timestamp=int(time.time() * 1000)
            ))
            self.logger.info(f"❌ Detected position closure: {action} {gone['size']} {coin}")

        # Stop tracking the closed position
        await state.remove_tracked_position_async(coin)

    # The last-check time is recorded by monitor_cycle, not here
    return detected
-
async def execute_copy_trade(self, trade: CopyTrade):
    """Mirror one detected trade on our own account.

    Steps: skip trades already recorded as copied, fetch the current
    price, clamp leverage to ``max_leverage``, size the position, wait
    ``execution_delay`` seconds, place the order and notify. The trade is
    recorded as copied on every path (including failures) so it is never
    retried.

    The minimum-margin check is applied only to trades that add exposure
    ('open_*' / 'add_*'). Closing or reducing must not be skipped because
    the computed margin is small (for example when the balance lookup
    returned 0), otherwise our position would stay open after the target
    has exited.

    Args:
        trade: The trade to mirror.
    """
    trade_id = trade.original_trade_hash
    try:
        if self.state_manager.has_copied_trade(trade_id):
            self.logger.debug(f"Skipping already copied trade: {trade_id}")
            return

        # Current price is needed to convert margin into a token amount
        symbol = f"{trade.coin}/USDC:USDC"
        try:
            market_data = await asyncio.to_thread(self.client.get_market_data, symbol)
            if not market_data or not market_data.get('ticker'):
                self.logger.error(f"❌ Could not get market data for {trade.coin}")
                await self.state_manager.add_copied_trade_async(trade_id)
                return

            current_price = float(market_data['ticker'].get('last', 0))
            if current_price <= 0:
                self.logger.error(f"❌ Invalid price for {trade.coin}: {current_price}")
                await self.state_manager.add_copied_trade_async(trade_id)
                return
        except Exception as e:
            self.logger.error(f"❌ Error getting price for {trade.coin}: {e}")
            await self.state_manager.add_copied_trade_async(trade_id)
            return

        # Never use more leverage than we are configured to allow
        leverage = min(trade.leverage, self.max_leverage)

        position_calc = await self.calculate_position_size(trade, current_price, leverage)

        # Only exposure-increasing trades are subject to the minimum size
        increases_exposure = trade.action.startswith(('open_', 'add_'))
        if increases_exposure and position_calc['margin_to_use'] < self.min_position_size:
            self.logger.info(f"Skipping {trade.coin} trade - margin too small: ${position_calc['margin_to_use']:.2f} (min: ${self.min_position_size:.2f})")
            # Still mark as copied to avoid retrying
            await self.state_manager.add_copied_trade_async(trade_id)
            return

        # Configured delay before acting
        await asyncio.sleep(self.execution_delay)

        success = await self._execute_hyperliquid_trade(trade, position_calc, leverage)

        # Mark trade as copied whether it succeeded or not, to avoid retrying
        await self.state_manager.add_copied_trade_async(trade_id)

        if self.notifications_enabled:
            status = "✅ SUCCESS" if success else "❌ FAILED"
            await self.notification_manager.send_generic_notification(
                f"🔄 Copy Trade {status}\n"
                f"Action: {trade.action}\n"
                f"Asset: {trade.coin}\n"
                f"💳 Margin Used: ${position_calc['margin_to_use']:.2f}\n"
                f"🏦 Position Value: ${position_calc['position_value']:.2f}\n"
                f"🪙 Token Amount: {position_calc['token_amount']:.6f}\n"
                f"⚖️ Leverage: {leverage}x\n"
                f"Target: {trade.target_trader_address[:10]}...\n"
                f"Trade ID: {trade_id[:16]}..."
            )

    except Exception as e:
        self.logger.error(f"Error executing copy trade for {trade.coin}: {e}")

        # Mark as copied even on error to avoid infinite retries
        await self.state_manager.add_copied_trade_async(trade_id)

        if self.notifications_enabled:
            await self.notification_manager.send_generic_notification(
                f"❌ Copy Trade Error\n"
                f"Asset: {trade.coin}\n"
                f"Action: {trade.action}\n"
                f"Error: {str(e)[:100]}\n"
                f"Trade ID: {trade_id[:16]}..."
            )
-
async def calculate_position_size(self, trade: CopyTrade, current_price: float, leverage: float) -> Dict[str, float]:
    """Work out how much margin, notional value and tokens to trade.

    Margin is chosen by ``copy_mode``:

    * ``'FIXED'``: ``portfolio_percentage`` of our balance.
    * ``'PROPORTIONAL'``: the same share of our balance that the target
      uses as margin for this coin, capped at ``portfolio_percentage``;
      falls back to the fixed share when the target's balance or position
      is unavailable.
    * anything else: the fixed share.

    Args:
        trade: The copy trade to execute.
        current_price: Current price of the asset.
        leverage: Leverage to use for the trade.

    Returns:
        Dict with 'margin_to_use', 'position_value' and 'token_amount'.
        All three are 0.0 when ``current_price`` is not positive, since no
        token amount can be derived from it.
    """
    if current_price <= 0:
        # Dividing by the price below would fail, in the fallback path too
        self.logger.error(f"Invalid price for {trade.coin}: {current_price} - cannot size position")
        return {'margin_to_use': 0.0, 'position_value': 0.0, 'token_amount': 0.0}

    try:
        our_balance = await self.get_our_account_balance()
        self.logger.info(f"📊 Our account balance: ${our_balance:.2f}")
        self.logger.info(f"🎯 Portfolio percentage: {self.portfolio_percentage:.1%}")
        self.logger.info(f"📈 Copy mode: {self.copy_mode}")
        self.logger.info(f"💰 Current {trade.coin} price: ${current_price:.4f}")
        self.logger.info(f"⚖️ Leverage to use: {leverage:.1f}x")

        # Fixed share of our balance: the default and the upper bound
        max_margin = our_balance * self.portfolio_percentage
        margin_to_use = max_margin

        if self.copy_mode == 'FIXED':
            self.logger.info(f"🔢 FIXED mode - margin to allocate: ${margin_to_use:.2f}")

        elif self.copy_mode == 'PROPORTIONAL':
            target_balance = await self.get_target_account_balance()
            self.logger.info(f"🎯 Target balance: ${target_balance:.2f}")

            target_pos = self.target_positions.get(trade.coin)
            if target_balance <= 0:
                self.logger.info(f"🔢 PROPORTIONAL mode (fallback) - margin: ${margin_to_use:.2f}")
            elif not target_pos:
                self.logger.info(f"🔢 PROPORTIONAL mode (no target pos) - margin: ${margin_to_use:.2f}")
            else:
                # Share of the target's account committed to this coin
                target_margin_percentage = target_pos.margin_used / target_balance
                self.logger.info(f"📊 Target margin percentage: {target_margin_percentage:.1%}")

                # Same share of our account, capped at our own limit
                our_proportional_margin = our_balance * target_margin_percentage
                margin_to_use = min(our_proportional_margin, max_margin)

                self.logger.info(f"🔢 PROPORTIONAL mode - uncapped: ${our_proportional_margin:.2f}, max: ${max_margin:.2f}, final: ${margin_to_use:.2f}")

        else:
            self.logger.info(f"🔢 Unknown mode (fallback) - margin: ${margin_to_use:.2f}")

        # Leverage turns margin into notional; price turns notional into tokens
        position_value = margin_to_use * leverage
        token_amount = position_value / current_price

        self.logger.info(f"📊 Position calculation:")
        self.logger.info(f" 💳 Margin to use: ${margin_to_use:.2f}")
        self.logger.info(f" 🏦 Position value (with {leverage:.1f}x): ${position_value:.2f}")
        self.logger.info(f" 🪙 Token amount: {token_amount:.6f} {trade.coin}")

        return {
            'margin_to_use': margin_to_use,
            'position_value': position_value,
            'token_amount': token_amount
        }

    except Exception as e:
        self.logger.error(f"Error calculating position size: {e}")
        # Fallback to fixed percentage
        our_balance = await self.get_our_account_balance()
        fallback_margin = our_balance * self.portfolio_percentage
        fallback_value = fallback_margin * leverage
        fallback_tokens = fallback_value / current_price

        self.logger.info(f"🔢 Error fallback - margin: ${fallback_margin:.2f}, tokens: {fallback_tokens:.6f}")
        return {
            'margin_to_use': fallback_margin,
            'position_value': fallback_value,
            'token_amount': fallback_tokens
        }
-
async def get_our_account_balance(self) -> float:
    """Return our total USDC balance, or 0.0 when it cannot be determined.

    The client call runs in a worker thread. Any exception is logged and
    reported as a balance of 0.0.
    """
    try:
        raw = await asyncio.to_thread(self.client.get_balance)
        self.logger.info(f"🔍 Raw balance info: {raw}")

        if not raw:
            self.logger.warning("⚠️ No balance info returned")
            return 0.0

        # Same approach as the /balance command: read the USDC buckets
        total = 0.0
        free = 0.0
        used = 0.0
        if 'USDC' in raw.get('total', {}):
            total = float(raw['total']['USDC'])
            free = float(raw.get('free', {}).get('USDC', 0))
            used = float(raw.get('used', {}).get('USDC', 0))

        self.logger.info(f"💰 USDC Balance - Total: ${total:.2f}, Free: ${free:.2f}, Used: ${used:.2f}")

        if total > 0:
            self.logger.info(f"📊 Using total USDC balance: ${total:.2f}")
            return total

        self.logger.warning(f"⚠️ No USDC balance found - raw response: {raw}")
        return 0.0
    except Exception as e:
        self.logger.error(f"Error getting our account balance: {e}")
        return 0.0
-
async def get_target_account_balance(self) -> float:
    """Return the target trader's account value, or 0.0 on any failure.

    The value is read from ``marginSummary.accountValue`` of the info
    endpoint's response; a non-200 status, a timeout or any other error
    yields 0.0.
    """
    try:
        request_body = {
            "type": "clearinghouseState",
            "user": self.target_address
        }

        # Bound the whole request so a stalled connection cannot hang us
        request_timeout = aiohttp.ClientTimeout(total=10.0)
        async with aiohttp.ClientSession(timeout=request_timeout) as http:
            async with http.post(self.info_url, json=request_body) as reply:
                if reply.status != 200:
                    return 0.0

                account_state = await reply.json()
                summary = account_state.get('marginSummary', {})
                return float(summary.get('accountValue', 0))

    except asyncio.TimeoutError:
        self.logger.warning("Timeout getting target account balance")
        return 0.0
    except Exception as e:
        self.logger.error(f"Error getting target account balance: {e}")
        return 0.0
-
async def _execute_hyperliquid_trade(self, trade: CopyTrade, position_calc: Dict[str, float], leverage: float) -> bool:
    """Place the market order that mirrors ``trade``.

    * 'open_*' / 'add_*': buy (long) or sell (short) the computed token
      amount.
    * 'close_*': look up our own open position and flatten it entirely.
    * 'reduce_*': trade against our own open position, never more than
      its size, so a reduction cannot flip or open a position.

    The direction of our open position is taken from the position's
    'side' field when it holds 'long' or 'short'; otherwise it falls back
    to the sign of 'contracts'. This keeps shorts recognisable whether the
    client reports signed or unsigned contract counts.

    Args:
        trade: The trade being mirrored; only ``coin`` and ``action`` are read.
        position_calc: Result of ``calculate_position_size``.
        leverage: Leverage in effect (used for logging only).

    Returns:
        True when the client returned a result without an error, else False.
    """

    def _held_position(positions, symbol):
        """Return (side, size) of our first non-zero position on symbol, or None."""
        for pos in positions or []:
            if pos.get('symbol') != symbol:
                continue
            contracts = float(pos.get('contracts', 0) or 0)
            if contracts == 0:
                continue
            reported = str(pos.get('side') or '').lower()
            if reported in ('long', 'short'):
                return reported, abs(contracts)
            return ('long' if contracts > 0 else 'short'), abs(contracts)
        return None

    try:
        action = trade.action
        increases_exposure = 'open' in action or 'add' in action
        # Growing a long or shrinking a short buys; the other two cases sell
        side = 'buy' if increases_exposure == ('long' in action) else 'sell'

        token_amount = position_calc['token_amount']
        margin_used = position_calc['margin_to_use']
        position_value = position_calc['position_value']

        self.logger.info(f"🔄 Executing {trade.action} for {trade.coin}:")
        self.logger.info(f" 📊 Side: {side}")
        self.logger.info(f" 🪙 Token Amount: {token_amount:.6f} {trade.coin}")
        self.logger.info(f" 💳 Margin: ${margin_used:.2f}")
        self.logger.info(f" 🏦 Position Value: ${position_value:.2f}")
        self.logger.info(f" ⚖️ Leverage: {leverage}x")

        symbol = f"{trade.coin}/USDC:USDC"

        if increases_exposure:
            order_side, order_amount, failure_label = side, token_amount, "Market order"

        elif 'close' in action or 'reduce' in action:
            # Both need our real position: a close uses its full size, a
            # reduce is capped by it.
            positions = await asyncio.to_thread(self.client.get_positions, symbol=symbol)
            if not positions:
                self.logger.warning(f"⚠️ No position found for {trade.coin} to close")
                return False

            held = _held_position(positions, symbol)
            if held is None:
                self.logger.warning(f"⚠️ No open position found for {trade.coin}")
                return False

            current_side, open_size = held
            exit_side = 'sell' if current_side == 'long' else 'buy'

            if 'close' in action:
                order_side, order_amount, failure_label = exit_side, open_size, "Close order"
                self.logger.info(f"📉 Closing {current_side} position: {order_side} {order_amount} {trade.coin}")
            else:
                if exit_side != side:
                    # We hold the opposite direction: the order would grow it
                    self.logger.warning(f"⚠️ Our {current_side} position does not match {action} for {trade.coin} - skipping")
                    return False
                order_side, order_amount, failure_label = side, min(token_amount, open_size), "Reduce order"

        else:
            self.logger.error(f"❌ Unknown trade action: {trade.action}")
            return False

        result, error = await asyncio.to_thread(
            self.client.place_market_order,
            symbol=symbol,
            side=order_side,
            amount=order_amount
        )
        if error:
            self.logger.error(f"❌ {failure_label} failed: {error}")
            return False

        if result:
            self.logger.info(f"✅ Successfully executed copy trade: {trade.action}")
            self.logger.info(f" 🪙 {order_amount:.6f} {trade.coin} (${position_value:.2f} value, ${margin_used:.2f} margin)")
            return True

        self.logger.error(f"❌ Failed to execute copy trade - no result returned")
        return False

    except Exception as e:
        self.logger.error(f"❌ Error executing Hyperliquid trade: {e}")
        return False
-
async def sync_positions(self):
    """Refresh the in-memory view of our positions and the target's.

    Our positions are stored by their 'symbol'; the target's positions
    default to an empty mapping when they cannot be fetched. The client
    call runs in a worker thread, like the other client calls in this
    class, so it does not block the event loop. Errors are logged and
    swallowed.
    """
    try:
        positions = await asyncio.to_thread(self.client.get_positions)
        self.our_positions = {pos['symbol']: pos for pos in positions} if positions else {}

        # Target snapshot for the initial sync
        self.target_positions = await self.get_target_positions() or {}

        self.logger.info(f"Synced positions - Target: {len(self.target_positions)}, Ours: {len(self.our_positions)}")

    except Exception as e:
        self.logger.error(f"Error syncing positions: {e}")
-
async def stop_monitoring(self):
    """Disable the monitor, stop state tracking and announce the shutdown.

    The notification (sent only when notifications are enabled) summarises
    the session; the duration line is included only when the state manager
    reports a non-zero session duration.
    """
    self.enabled = False
    await self.state_manager.stop_copy_trading_async()
    self.logger.info("Copy trading monitor stopped")

    if not self.notifications_enabled:
        return

    session_info = self.state_manager.get_session_info()

    duration_str = ""
    if session_info['session_duration_seconds']:
        duration_hours = session_info['session_duration_seconds'] / 3600
        duration_str = f"\nSession duration: {duration_hours:.1f} hours"

    summary = "".join([
        f"🛑 Copy Trading Stopped\n",
        f"📊 Tracked positions: {session_info['tracked_positions_count']}\n",
        f"🔄 Copied trades: {session_info['copied_trades_count']}",
        duration_str,
        f"\n\n💾 State saved - can resume later",
    ])
    await self.notification_manager.send_generic_notification(summary)
-
async def test_balance_fetching(self) -> Dict[str, Any]:
    """Debug helper: fetch both balances and size a mock SOL trade.

    No order is placed. A mock 'open_long' trade on SOL at a price of
    150.0 and a leverage of at most 10x is run through the position
    sizing, and the outcome is logged.

    Returns:
        A dict with the balances, the sizing inputs and results, and
        whether such a trade would pass the minimum size check; or
        ``{'error': <message>}`` when anything raises.
    """
    try:
        our_balance = await self.get_our_account_balance()
        target_balance = await self.get_target_account_balance()

        # Mock inputs: SOL at a made-up price, leverage clamped to our limit
        test_price = 150.0
        test_leverage = min(10.0, self.max_leverage)

        mock_trade = CopyTrade(
            coin="SOL",
            action="open_long",
            size=100,
            leverage=test_leverage,
            original_trade_hash="test_hash",
            target_trader_address=self.target_address or "test_address",
            timestamp=int(time.time() * 1000)
        )

        sizing = await self.calculate_position_size(mock_trade, test_price, test_leverage)
        margin = sizing['margin_to_use']
        would_execute = margin >= self.min_position_size

        report = {
            'our_balance': our_balance,
            'target_balance': target_balance,
            'portfolio_percentage': self.portfolio_percentage,
            'test_price': test_price,
            'test_leverage': test_leverage,
            'margin_to_use': margin,
            'position_value': sizing['position_value'],
            'token_amount': sizing['token_amount'],
            'min_position_size': self.min_position_size,
            'would_execute': would_execute,
            'config_enabled': self.enabled,
            'state_enabled': self.state_manager.is_enabled()
        }

        log = self.logger.info
        log(f"🧪 Balance & Leverage test results:")
        log(f" 💰 Our balance: ${our_balance:.2f}")
        log(f" 🎯 Target balance: ${target_balance:.2f}")
        log(f" 📊 Portfolio allocation: {self.portfolio_percentage:.1%}")
        log(f" ⚖️ Test leverage: {test_leverage:.1f}x")
        log(f" 💳 Margin to use: ${margin:.2f}")
        log(f" 🏦 Position value: ${sizing['position_value']:.2f}")
        log(f" 🪙 Token amount: {sizing['token_amount']:.6f} SOL")
        log(f" ✅ Would execute: {would_execute}")

        return report

    except Exception as e:
        self.logger.error(f"❌ Error in balance test: {e}")
        return {'error': str(e)}
-
def get_status(self) -> Dict[str, Any]:
    """Summarise the monitor's configuration and session counters.

    'enabled' is true only when both the instance flag and the state
    manager agree; 'session_duration_hours' is ``None`` when no duration
    is reported.
    """
    info = self.state_manager.get_session_info()

    status = {}
    status['enabled'] = self.enabled and self.state_manager.is_enabled()
    status['target_address'] = self.target_address
    status['portfolio_percentage'] = self.portfolio_percentage
    status['copy_mode'] = self.copy_mode
    status['max_leverage'] = self.max_leverage
    status['target_positions'] = len(self.target_positions)
    status['our_positions'] = len(self.our_positions)
    status['tracked_positions'] = info['tracked_positions_count']
    status['copied_trades'] = info['copied_trades_count']
    status['session_start_time'] = info['start_time']

    # Seconds are converted to hours only when a duration exists
    if info['session_duration_seconds']:
        status['session_duration_hours'] = info['session_duration_seconds'] / 3600
    else:
        status['session_duration_hours'] = None

    status['last_check'] = info['last_check_time']
    return status
|