copy_trading_monitor.py 24 KB


  1. """
  2. Copy Trading Monitor - Tracks and copies trades from a target trader on Hyperliquid
  3. """
  4. import logging
  5. import time
  6. import asyncio
  7. from datetime import datetime, timedelta
  8. from typing import Dict, List, Optional, Any
  9. from dataclasses import dataclass
  10. import aiohttp
  11. import json
  12. from decimal import Decimal, ROUND_DOWN
  13. from ..config.config import Config
  14. from ..clients.hyperliquid_client import HyperliquidClient
  15. from ..notifications.notification_manager import NotificationManager
  16. from .copy_trading_state import CopyTradingStateManager
  17. @dataclass
  18. class TraderPosition:
  19. """Represents a position held by the target trader"""
  20. coin: str
  21. size: float
  22. side: str # 'long' or 'short'
  23. entry_price: float
  24. leverage: float
  25. position_value: float
  26. unrealized_pnl: float
  27. margin_used: float
  28. timestamp: int
  29. @dataclass
  30. class CopyTrade:
  31. """Represents a trade to be copied"""
  32. coin: str
  33. action: str # 'open_long', 'open_short', 'close_long', 'close_short'
  34. size: float
  35. leverage: float
  36. original_trade_hash: str
  37. target_trader_address: str
  38. timestamp: int
  39. class CopyTradingMonitor:
  40. """Monitor and copy trades from a target trader"""
  41. def __init__(self, client: HyperliquidClient, notification_manager: NotificationManager):
  42. self.client = client
  43. self.notification_manager = notification_manager
  44. self.config = Config()
  45. self.logger = logging.getLogger(__name__)
  46. # Configuration
  47. self.enabled = self.config.COPY_TRADING_ENABLED
  48. self.target_address = self.config.COPY_TRADING_TARGET_ADDRESS
  49. self.portfolio_percentage = self.config.COPY_TRADING_PORTFOLIO_PERCENTAGE
  50. self.copy_mode = self.config.COPY_TRADING_MODE
  51. self.max_leverage = self.config.COPY_TRADING_MAX_LEVERAGE
  52. self.min_position_size = self.config.COPY_TRADING_MIN_POSITION_SIZE
  53. self.execution_delay = self.config.COPY_TRADING_EXECUTION_DELAY
  54. self.notifications_enabled = self.config.COPY_TRADING_NOTIFICATIONS
  55. # State management for persistence and tracking
  56. self.state_manager = CopyTradingStateManager()
  57. # Override enabled status from state if different from config
  58. if self.state_manager.is_enabled() and self.target_address:
  59. self.enabled = True
  60. # State tracking (legacy, kept for compatibility)
  61. self.target_positions: Dict[str, TraderPosition] = {}
  62. self.our_positions: Dict[str, Any] = {}
  63. self.last_check_time = 0
  64. self.pending_trades: List[CopyTrade] = []
  65. # API endpoints
  66. self.info_url = "https://api.hyperliquid.xyz/info"
  67. self.logger.info(f"Copy Trading Monitor initialized - Target: {self.target_address}")
  68. # Load previous session info if available
  69. session_info = self.state_manager.get_session_info()
  70. if session_info['start_time']:
  71. self.logger.info(f"📅 Previous session started: {session_info['start_time']}")
  72. self.logger.info(f"📊 Tracked positions: {session_info['tracked_positions_count']}")
  73. self.logger.info(f"🔄 Copied trades: {session_info['copied_trades_count']}")
  74. async def start_monitoring(self):
  75. """Start the copy trading monitoring loop"""
  76. if not self.enabled:
  77. self.logger.info("Copy trading is disabled")
  78. return
  79. if not self.target_address:
  80. self.logger.error("No target trader address configured")
  81. return
  82. self.logger.info(f"Starting copy trading monitor for {self.target_address}")
  83. # Start state tracking
  84. self.state_manager.start_copy_trading(self.target_address)
  85. # Get current target positions for initialization
  86. current_positions = await self.get_target_positions()
  87. if current_positions:
  88. # Check if this is a fresh start or resuming
  89. if not self.state_manager.get_tracked_positions():
  90. # Fresh start - initialize tracking but don't copy existing positions
  91. self.logger.info("🆕 Fresh start - initializing with existing positions (won't copy)")
  92. self.state_manager.initialize_tracked_positions(current_positions)
  93. startup_message = (
  94. f"🔄 Copy Trading Started (Fresh)\n"
  95. f"Target: {self.target_address[:10]}...\n"
  96. f"Portfolio Allocation: {self.portfolio_percentage:.1%}\n"
  97. f"Mode: {self.copy_mode}\n"
  98. f"Max Leverage: {self.max_leverage}x\n\n"
  99. f"📊 Found {len(current_positions)} existing positions\n"
  100. f"⚠️ Will only copy NEW trades from now on"
  101. )
  102. else:
  103. # Resuming - continue from where we left off
  104. tracked_count = len(self.state_manager.get_tracked_positions())
  105. self.logger.info(f"▶️ Resuming session - {tracked_count} positions tracked")
  106. startup_message = (
  107. f"▶️ Copy Trading Resumed\n"
  108. f"Target: {self.target_address[:10]}...\n"
  109. f"Portfolio Allocation: {self.portfolio_percentage:.1%}\n"
  110. f"Mode: {self.copy_mode}\n"
  111. f"Max Leverage: {self.max_leverage}x\n\n"
  112. f"📊 Resuming with {tracked_count} tracked positions"
  113. )
  114. else:
  115. startup_message = (
  116. f"🔄 Copy Trading Started\n"
  117. f"Target: {self.target_address[:10]}...\n"
  118. f"Portfolio Allocation: {self.portfolio_percentage:.1%}\n"
  119. f"Mode: {self.copy_mode}\n"
  120. f"Max Leverage: {self.max_leverage}x\n\n"
  121. f"⚠️ Could not access target trader positions"
  122. )
  123. # Send startup notification
  124. if self.notifications_enabled:
  125. await self.notification_manager.send_message(startup_message)
  126. # Initial sync
  127. await self.sync_positions()
  128. # Start monitoring loop
  129. while self.enabled and self.state_manager.is_enabled():
  130. try:
  131. await self.monitor_cycle()
  132. await asyncio.sleep(30) # Check every 30 seconds
  133. except Exception as e:
  134. self.logger.error(f"Error in copy trading monitor cycle: {e}")
  135. await asyncio.sleep(60) # Wait longer on error
  136. async def monitor_cycle(self):
  137. """Single monitoring cycle"""
  138. try:
  139. # Get target trader's current positions
  140. new_positions = await self.get_target_positions()
  141. if new_positions is None:
  142. return
  143. # Compare with previous positions to detect changes
  144. position_changes = self.detect_position_changes(new_positions)
  145. # Execute any detected trades
  146. for trade in position_changes:
  147. await self.execute_copy_trade(trade)
  148. # Update our tracking
  149. self.target_positions = new_positions
  150. except Exception as e:
  151. self.logger.error(f"Error in monitor cycle: {e}")
  152. async def get_target_positions(self) -> Optional[Dict[str, TraderPosition]]:
  153. """Get current positions of target trader"""
  154. try:
  155. payload = {
  156. "type": "clearinghouseState",
  157. "user": self.target_address
  158. }
  159. async with aiohttp.ClientSession() as session:
  160. async with session.post(self.info_url, json=payload) as response:
  161. if response.status != 200:
  162. self.logger.error(f"Failed to get target positions: {response.status}")
  163. return None
  164. data = await response.json()
  165. positions = {}
  166. # Parse asset positions
  167. for asset_pos in data.get('assetPositions', []):
  168. if asset_pos.get('type') == 'oneWay':
  169. pos = asset_pos['position']
  170. coin = pos['coin']
  171. size = float(pos['szi'])
  172. if abs(size) < 0.001: # Skip dust positions
  173. continue
  174. side = 'long' if size > 0 else 'short'
  175. positions[coin] = TraderPosition(
  176. coin=coin,
  177. size=abs(size),
  178. side=side,
  179. entry_price=float(pos['entryPx']),
  180. leverage=float(pos['leverage']['value']),
  181. position_value=float(pos['positionValue']),
  182. unrealized_pnl=float(pos['unrealizedPnl']),
  183. margin_used=float(pos['marginUsed']),
  184. timestamp=int(time.time() * 1000)
  185. )
  186. return positions
  187. except Exception as e:
  188. self.logger.error(f"Error getting target positions: {e}")
  189. return None
  190. def detect_position_changes(self, new_positions: Dict[str, TraderPosition]) -> List[CopyTrade]:
  191. """Detect changes in target trader's positions using state manager"""
  192. trades = []
  193. # Check for new positions and position increases
  194. for coin, new_pos in new_positions.items():
  195. position_data = {
  196. 'size': new_pos.size,
  197. 'side': new_pos.side,
  198. 'entry_price': new_pos.entry_price,
  199. 'leverage': new_pos.leverage
  200. }
  201. # Check if this is a new position we should copy
  202. if self.state_manager.should_copy_position(coin, position_data):
  203. tracked_pos = self.state_manager.get_tracked_positions().get(coin)
  204. if tracked_pos is None:
  205. # Completely new position
  206. action = f"open_{new_pos.side}"
  207. copy_size = new_pos.size
  208. self.logger.info(f"🆕 Detected NEW position: {action} {copy_size} {coin} at {new_pos.leverage}x")
  209. else:
  210. # Position increase
  211. size_increase = new_pos.size - tracked_pos['size']
  212. action = f"add_{new_pos.side}"
  213. copy_size = size_increase
  214. self.logger.info(f"📈 Detected position increase: {action} {size_increase} {coin}")
  215. # Create trade to copy
  216. trade_id = f"{coin}_{action}_{new_pos.timestamp}"
  217. if not self.state_manager.has_copied_trade(trade_id):
  218. trades.append(CopyTrade(
  219. coin=coin,
  220. action=action,
  221. size=copy_size,
  222. leverage=new_pos.leverage,
  223. original_trade_hash=trade_id,
  224. target_trader_address=self.target_address,
  225. timestamp=new_pos.timestamp
  226. ))
  227. # Check for position reductions
  228. elif self.state_manager.is_position_reduction(coin, position_data):
  229. tracked_pos = self.state_manager.get_tracked_positions()[coin]
  230. size_decrease = tracked_pos['size'] - new_pos.size
  231. action = f"reduce_{new_pos.side}"
  232. trade_id = f"{coin}_{action}_{new_pos.timestamp}"
  233. if not self.state_manager.has_copied_trade(trade_id):
  234. trades.append(CopyTrade(
  235. coin=coin,
  236. action=action,
  237. size=size_decrease,
  238. leverage=new_pos.leverage,
  239. original_trade_hash=trade_id,
  240. target_trader_address=self.target_address,
  241. timestamp=new_pos.timestamp
  242. ))
  243. self.logger.info(f"📉 Detected position decrease: {action} {size_decrease} {coin}")
  244. # Update tracking regardless
  245. self.state_manager.update_tracked_position(coin, position_data)
  246. # Check for closed positions (exits)
  247. tracked_positions = self.state_manager.get_tracked_positions()
  248. for coin in list(tracked_positions.keys()):
  249. if coin not in new_positions:
  250. # Position fully closed
  251. tracked_pos = tracked_positions[coin]
  252. action = f"close_{tracked_pos['side']}"
  253. trade_id = f"{coin}_{action}_{int(time.time() * 1000)}"
  254. if not self.state_manager.has_copied_trade(trade_id):
  255. trades.append(CopyTrade(
  256. coin=coin,
  257. action=action,
  258. size=tracked_pos['size'],
  259. leverage=tracked_pos['leverage'],
  260. original_trade_hash=trade_id,
  261. target_trader_address=self.target_address,
  262. timestamp=int(time.time() * 1000)
  263. ))
  264. self.logger.info(f"❌ Detected position closure: {action} {tracked_pos['size']} {coin}")
  265. # Remove from tracking
  266. self.state_manager.remove_tracked_position(coin)
  267. # Update last check time
  268. self.state_manager.update_last_check()
  269. return trades
  270. async def execute_copy_trade(self, trade: CopyTrade):
  271. """Execute a copy trade"""
  272. try:
  273. # Check if we've already copied this trade
  274. if self.state_manager.has_copied_trade(trade.original_trade_hash):
  275. self.logger.debug(f"Skipping already copied trade: {trade.original_trade_hash}")
  276. return
  277. # Calculate our position size
  278. our_size = await self.calculate_position_size(trade)
  279. if our_size < self.min_position_size:
  280. self.logger.info(f"Skipping {trade.coin} trade - size too small: ${our_size:.2f}")
  281. # Still mark as copied to avoid retrying
  282. self.state_manager.add_copied_trade(trade.original_trade_hash)
  283. return
  284. # Apply leverage limit
  285. leverage = min(trade.leverage, self.max_leverage)
  286. # Add execution delay
  287. await asyncio.sleep(self.execution_delay)
  288. # Execute the trade
  289. success = await self._execute_hyperliquid_trade(trade, our_size, leverage)
  290. # Mark trade as copied (whether successful or not to avoid retrying)
  291. self.state_manager.add_copied_trade(trade.original_trade_hash)
  292. # Send notification
  293. if self.notifications_enabled:
  294. status = "✅ SUCCESS" if success else "❌ FAILED"
  295. await self.notification_manager.send_message(
  296. f"🔄 Copy Trade {status}\n"
  297. f"Action: {trade.action}\n"
  298. f"Asset: {trade.coin}\n"
  299. f"Size: ${our_size:.2f}\n"
  300. f"Leverage: {leverage}x\n"
  301. f"Target: {trade.target_trader_address[:10]}...\n"
  302. f"Trade ID: {trade.original_trade_hash[:16]}..."
  303. )
  304. except Exception as e:
  305. self.logger.error(f"Error executing copy trade for {trade.coin}: {e}")
  306. # Mark as copied even on error to avoid infinite retries
  307. self.state_manager.add_copied_trade(trade.original_trade_hash)
  308. if self.notifications_enabled:
  309. await self.notification_manager.send_message(
  310. f"❌ Copy Trade Error\n"
  311. f"Asset: {trade.coin}\n"
  312. f"Action: {trade.action}\n"
  313. f"Error: {str(e)[:100]}\n"
  314. f"Trade ID: {trade.original_trade_hash[:16]}..."
  315. )
  316. async def calculate_position_size(self, trade: CopyTrade) -> float:
  317. """Calculate our position size based on the copy trading mode"""
  318. try:
  319. # Get our current account balance
  320. our_balance = await self.get_our_account_balance()
  321. if self.copy_mode == 'FIXED':
  322. # Fixed percentage of our account
  323. return our_balance * self.portfolio_percentage
  324. elif self.copy_mode == 'PROPORTIONAL':
  325. # Get target trader's account balance
  326. target_balance = await self.get_target_account_balance()
  327. if target_balance <= 0:
  328. return our_balance * self.portfolio_percentage
  329. # Calculate target trader's position percentage
  330. target_pos = self.target_positions.get(trade.coin)
  331. if not target_pos:
  332. return our_balance * self.portfolio_percentage
  333. target_position_percentage = target_pos.margin_used / target_balance
  334. # Apply same percentage to our account
  335. our_position_size = our_balance * target_position_percentage
  336. # Cap at our portfolio percentage limit
  337. max_size = our_balance * self.portfolio_percentage
  338. return min(our_position_size, max_size)
  339. else:
  340. return our_balance * self.portfolio_percentage
  341. except Exception as e:
  342. self.logger.error(f"Error calculating position size: {e}")
  343. # Fallback to fixed percentage
  344. our_balance = await self.get_our_account_balance()
  345. return our_balance * self.portfolio_percentage
  346. async def get_our_account_balance(self) -> float:
  347. """Get our account balance"""
  348. try:
  349. balance_info = self.client.get_balance()
  350. if balance_info:
  351. return float(balance_info.get('accountValue', 0))
  352. else:
  353. return 0.0
  354. except Exception as e:
  355. self.logger.error(f"Error getting our account balance: {e}")
  356. return 0.0
  357. async def get_target_account_balance(self) -> float:
  358. """Get target trader's account balance"""
  359. try:
  360. payload = {
  361. "type": "clearinghouseState",
  362. "user": self.target_address
  363. }
  364. async with aiohttp.ClientSession() as session:
  365. async with session.post(self.info_url, json=payload) as response:
  366. if response.status == 200:
  367. data = await response.json()
  368. return float(data.get('marginSummary', {}).get('accountValue', 0))
  369. else:
  370. return 0.0
  371. except Exception as e:
  372. self.logger.error(f"Error getting target account balance: {e}")
  373. return 0.0
  374. async def _execute_hyperliquid_trade(self, trade: CopyTrade, size: float, leverage: float) -> bool:
  375. """Execute trade on Hyperliquid"""
  376. try:
  377. # Determine if this is a buy or sell order
  378. is_buy = 'long' in trade.action or ('close' in trade.action and 'short' in trade.action)
  379. # For position opening/closing
  380. if 'open' in trade.action:
  381. # Open new position
  382. result = await self.client.place_order(
  383. symbol=trade.coin,
  384. side='buy' if is_buy else 'sell',
  385. size=size,
  386. leverage=leverage,
  387. order_type='market'
  388. )
  389. elif 'close' in trade.action:
  390. # Close existing position
  391. result = await self.client.close_position(
  392. symbol=trade.coin,
  393. size=size
  394. )
  395. elif 'add' in trade.action:
  396. # Add to existing position
  397. result = await self.client.place_order(
  398. symbol=trade.coin,
  399. side='buy' if is_buy else 'sell',
  400. size=size,
  401. leverage=leverage,
  402. order_type='market'
  403. )
  404. elif 'reduce' in trade.action:
  405. # Reduce existing position
  406. result = await self.client.place_order(
  407. symbol=trade.coin,
  408. side='sell' if 'long' in trade.action else 'buy',
  409. size=size,
  410. order_type='market'
  411. )
  412. else:
  413. self.logger.error(f"Unknown trade action: {trade.action}")
  414. return False
  415. if result and result.get('success'):
  416. self.logger.info(f"Successfully executed copy trade: {trade.action} {size} {trade.coin}")
  417. return True
  418. else:
  419. self.logger.error(f"Failed to execute copy trade: {result}")
  420. return False
  421. except Exception as e:
  422. self.logger.error(f"Error executing Hyperliquid trade: {e}")
  423. return False
  424. async def sync_positions(self):
  425. """Sync our current positions with tracking"""
  426. try:
  427. # Get our current positions
  428. positions = self.client.get_positions()
  429. if positions:
  430. self.our_positions = {pos['symbol']: pos for pos in positions}
  431. else:
  432. self.our_positions = {}
  433. # Get target positions for initial sync
  434. self.target_positions = await self.get_target_positions() or {}
  435. self.logger.info(f"Synced positions - Target: {len(self.target_positions)}, Ours: {len(self.our_positions)}")
  436. except Exception as e:
  437. self.logger.error(f"Error syncing positions: {e}")
  438. async def stop_monitoring(self):
  439. """Stop copy trading monitoring"""
  440. self.enabled = False
  441. self.state_manager.stop_copy_trading()
  442. self.logger.info("Copy trading monitor stopped")
  443. if self.notifications_enabled:
  444. session_info = self.state_manager.get_session_info()
  445. duration_str = ""
  446. if session_info['session_duration_seconds']:
  447. duration_hours = session_info['session_duration_seconds'] / 3600
  448. duration_str = f"\nSession duration: {duration_hours:.1f} hours"
  449. await self.notification_manager.send_message(
  450. f"🛑 Copy Trading Stopped\n"
  451. f"📊 Tracked positions: {session_info['tracked_positions_count']}\n"
  452. f"🔄 Copied trades: {session_info['copied_trades_count']}"
  453. + duration_str +
  454. f"\n\n💾 State saved - can resume later"
  455. )
  456. def get_status(self) -> Dict[str, Any]:
  457. """Get current copy trading status"""
  458. session_info = self.state_manager.get_session_info()
  459. return {
  460. 'enabled': self.enabled and self.state_manager.is_enabled(),
  461. 'target_address': self.target_address,
  462. 'portfolio_percentage': self.portfolio_percentage,
  463. 'copy_mode': self.copy_mode,
  464. 'max_leverage': self.max_leverage,
  465. 'target_positions': len(self.target_positions),
  466. 'our_positions': len(self.our_positions),
  467. 'tracked_positions': session_info['tracked_positions_count'],
  468. 'copied_trades': session_info['copied_trades_count'],
  469. 'session_start_time': session_info['start_time'],
  470. 'session_duration_hours': session_info['session_duration_seconds'] / 3600 if session_info['session_duration_seconds'] else None,
  471. 'last_check': session_info['last_check_time']
  472. }