Przeglądaj źródła

Implement full async support for copy trading functionality

- Updated copy trading commands to enable monitoring with async fixes applied, improving responsiveness.
- Refactored the CopyTradingMonitor to utilize async methods for state management and position tracking, preventing blocking issues.
- Enhanced CopyTradingStateManager with async versions of key methods for better performance and state handling.
- Improved logging messages to reflect the active state of copy trading and monitoring, providing clearer feedback to users.
Carles Sentis 5 dni temu
rodzic
commit
3d6b898801

+ 7 - 8
src/commands/copy_trading_commands.py

@@ -418,20 +418,19 @@ This will:
                 await query.edit_message_text("🚀 Copy trading configuration updated...")
                 
                 if copy_monitor:
-                    # Enable copy trading config but DON'T start monitoring yet (Step 3 debugging)
+                    # Enable copy trading and start monitoring (with async fixes applied)
                     copy_monitor.enabled = True
                     copy_monitor.target_address = target_address
                     
-                    # TEMPORARILY DISABLED: Start monitoring in background (causes blocking)
-                    # import asyncio
-                    # asyncio.create_task(copy_monitor.start_monitoring())
+                    # Start monitoring in background (now with full async implementation)
+                    import asyncio
+                    asyncio.create_task(copy_monitor.start_monitoring())
                     
                     await query.edit_message_text(
-                        f"✅ Copy trading configured but not started!\n\n"
+                        f"✅ Copy trading started!\n\n"
                         f"🎯 Target: {target_address[:10]}...\n"
-                        f"⚠️ Monitoring temporarily disabled for debugging\n\n"
-                        f"The start_monitoring() method is causing blocking issues.\n"
-                        f"We need to debug this specific method.",
+                        f"📊 Monitoring active positions and new trades\n\n"
+                        f"Use /copy status to check progress.",
                         parse_mode='HTML'
                     )
                 else:

+ 15 - 13
src/monitoring/copy_trading_monitor.py

@@ -101,8 +101,8 @@ class CopyTradingMonitor:
         self.logger.info(f"Starting copy trading monitor for {self.target_address}")
         
         try:
-            # Start state tracking
-            self.state_manager.start_copy_trading(self.target_address)
+            # Start state tracking (using async version to prevent blocking)
+            await self.state_manager.start_copy_trading_async(self.target_address)
             
             # Get current target positions for initialization (with timeout)
             try:
@@ -122,7 +122,7 @@ class CopyTradingMonitor:
                 if not self.state_manager.get_tracked_positions():
                     # Fresh start - initialize tracking but don't copy existing positions
                     self.logger.info("🆕 Fresh start - initializing with existing positions (won't copy)")
-                    self.state_manager.initialize_tracked_positions(current_positions)
+                    await self.state_manager.initialize_tracked_positions_async(current_positions)
                     
                     startup_message = (
                         f"🔄 Copy Trading Started (Fresh)\n"
@@ -206,7 +206,7 @@ class CopyTradingMonitor:
                 
             # Compare with previous positions to detect changes
             try:
-                position_changes = self.detect_position_changes(new_positions)
+                position_changes = await self.detect_position_changes(new_positions)
             except Exception as e:
                 self.logger.error(f"Error detecting position changes: {e}")
                 return
@@ -226,6 +226,9 @@ class CopyTradingMonitor:
             # Update our tracking
             self.target_positions = new_positions
             
+            # Update last check timestamp (async version)
+            await self.state_manager.update_last_check_async()
+            
         except Exception as e:
             self.logger.error(f"Error in monitor cycle: {e}")
     
@@ -281,7 +284,7 @@ class CopyTradingMonitor:
             self.logger.error(f"Error getting target positions: {e}")
             return None
     
-    def detect_position_changes(self, new_positions: Dict[str, TraderPosition]) -> List[CopyTrade]:
+    async def detect_position_changes(self, new_positions: Dict[str, TraderPosition]) -> List[CopyTrade]:
         """Detect changes in target trader's positions using state manager"""
         trades = []
         
@@ -343,7 +346,7 @@ class CopyTradingMonitor:
                     self.logger.info(f"📉 Detected position decrease: {action} {size_decrease} {coin}")
             
             # Update tracking regardless
-            self.state_manager.update_tracked_position(coin, position_data)
+            await self.state_manager.update_tracked_position_async(coin, position_data)
         
         # Check for closed positions (exits)
         tracked_positions = self.state_manager.get_tracked_positions()
@@ -367,10 +370,9 @@ class CopyTradingMonitor:
                     self.logger.info(f"❌ Detected position closure: {action} {tracked_pos['size']} {coin}")
                 
                 # Remove from tracking
-                self.state_manager.remove_tracked_position(coin)
+                await self.state_manager.remove_tracked_position_async(coin)
         
-        # Update last check time
-        self.state_manager.update_last_check()
+        # Update last check time (already updated in monitor_cycle, so skip here)
         
         return trades
     
@@ -388,7 +390,7 @@ class CopyTradingMonitor:
             if our_size < self.min_position_size:
                 self.logger.info(f"Skipping {trade.coin} trade - size too small: ${our_size:.2f}")
                 # Still mark as copied to avoid retrying
-                self.state_manager.add_copied_trade(trade.original_trade_hash)
+                await self.state_manager.add_copied_trade_async(trade.original_trade_hash)
                 return
             
             # Apply leverage limit
@@ -401,7 +403,7 @@ class CopyTradingMonitor:
             success = await self._execute_hyperliquid_trade(trade, our_size, leverage)
             
             # Mark trade as copied (whether successful or not to avoid retrying)
-            self.state_manager.add_copied_trade(trade.original_trade_hash)
+            await self.state_manager.add_copied_trade_async(trade.original_trade_hash)
             
             # Send notification
             if self.notifications_enabled:
@@ -420,7 +422,7 @@ class CopyTradingMonitor:
             self.logger.error(f"Error executing copy trade for {trade.coin}: {e}")
             
             # Mark as copied even on error to avoid infinite retries
-            self.state_manager.add_copied_trade(trade.original_trade_hash)
+            await self.state_manager.add_copied_trade_async(trade.original_trade_hash)
             
             if self.notifications_enabled:
                 await self.notification_manager.send_message(
@@ -583,7 +585,7 @@ class CopyTradingMonitor:
     async def stop_monitoring(self):
         """Stop copy trading monitoring"""
         self.enabled = False
-        self.state_manager.stop_copy_trading()
+        await self.state_manager.stop_copy_trading_async()
         self.logger.info("Copy trading monitor stopped")
         
         if self.notifications_enabled:

+ 104 - 0
src/monitoring/copy_trading_state.py

@@ -168,6 +168,54 @@ class CopyTradingStateManager:
         except Exception as e:
             logger.error(f"Error saving copy trading state: {e}")
     
+    async def start_copy_trading_async(self, target_address: str) -> None:
+        """Start copy trading for a target address (async version)"""
+        # Ensure state is loaded first (using async version)
+        await self._ensure_initialized_async()
+        
+        current_time = int(time.time() * 1000)
+        
+        # If starting fresh or changing target, reset state
+        if (not self.state.enabled or 
+            self.state.target_address != target_address or
+            self.state.start_timestamp is None):
+            
+            logger.info(f"🚀 Starting fresh copy trading session for {target_address}")
+            self.state = CopyTradingState(
+                enabled=True,
+                target_address=target_address,
+                start_timestamp=current_time,
+                session_start_time=current_time,
+                tracked_positions={},
+                copied_trades=set()
+            )
+        else:
+            # Resuming existing session
+            logger.info(f"▶️ Resuming copy trading session for {target_address}")
+            self.state.enabled = True
+            self.state.session_start_time = current_time
+        
+        self.state.last_check_timestamp = current_time
+        await self.save_state_async()
+    
+    async def _ensure_initialized_async(self) -> None:
+        """Ensure state is loaded (async version)"""
+        if self._initialized:
+            return
+            
+        try:
+            # Ensure data directory exists
+            self.state_file.parent.mkdir(exist_ok=True)
+            
+            # Load existing state using async method
+            await self.load_state()
+            self._initialized = True
+            
+        except Exception as e:
+            logger.error(f"Error during async state initialization: {e}")
+            self.state = CopyTradingState()
+            self._initialized = True
+    
     def start_copy_trading(self, target_address: str) -> None:
         """Start copy trading for a target address"""
         # Ensure state is loaded first
@@ -207,6 +255,13 @@ class CopyTradingStateManager:
             self.save_state()
             logger.info("⏹️ Copy trading stopped (state preserved)")
     
+    async def stop_copy_trading_async(self) -> None:
+        """Stop copy trading but preserve state (async version)"""
+        self.state.enabled = False
+        self.state.last_check_timestamp = int(time.time() * 1000)
+        await self.save_state_async()
+        logger.info("⏹️ Copy trading stopped (state preserved)")
+    
     def should_copy_position(self, coin: str, position_data: Dict) -> bool:
         """Determine if we should copy a position"""
         self._ensure_initialized()
@@ -249,6 +304,17 @@ class CopyTradingStateManager:
             }
             self.save_state()
     
+    async def update_tracked_position_async(self, coin: str, position_data: Dict) -> None:
+        """Update our tracking of a position (async version)"""
+        self.state.tracked_positions[coin] = {
+            'size': position_data.get('size', 0),
+            'side': position_data.get('side'),
+            'entry_price': position_data.get('entry_price', 0),
+            'leverage': position_data.get('leverage', 1),
+            'last_updated': int(time.time() * 1000)
+        }
+        await self.save_state_async()
+    
     def remove_tracked_position(self, coin: str) -> None:
         """Remove a position from tracking (when closed)"""
         with self._lock:
@@ -257,6 +323,13 @@ class CopyTradingStateManager:
                 self.save_state()
                 logger.info(f"🗑️ Removed {coin} from tracked positions")
     
+    async def remove_tracked_position_async(self, coin: str) -> None:
+        """Remove a position from tracking (when closed) (async version)"""
+        if coin in self.state.tracked_positions:
+            del self.state.tracked_positions[coin]
+            await self.save_state_async()
+            logger.info(f"🗑️ Removed {coin} from tracked positions")
+    
     def add_copied_trade(self, trade_id: str) -> None:
         """Mark a trade as copied"""
         with self._lock:
@@ -268,6 +341,16 @@ class CopyTradingStateManager:
                     self.state.copied_trades.discard(trade_id)
             self.save_state()
     
+    async def add_copied_trade_async(self, trade_id: str) -> None:
+        """Mark a trade as copied (async version)"""
+        self.state.copied_trades.add(trade_id)
+        # Keep only last 1000 trade IDs to prevent unbounded growth
+        if len(self.state.copied_trades) > 1000:
+            oldest_trades = sorted(self.state.copied_trades)[:500]
+            for trade_id in oldest_trades:
+                self.state.copied_trades.discard(trade_id)
+        await self.save_state_async()
+    
     def has_copied_trade(self, trade_id: str) -> bool:
         """Check if we've already copied a trade"""
         self._ensure_initialized()
@@ -280,6 +363,11 @@ class CopyTradingStateManager:
             self.state.last_check_timestamp = int(time.time() * 1000)
             self.save_state()
     
+    async def update_last_check_async(self) -> None:
+        """Update the last check timestamp (async version)"""
+        self.state.last_check_timestamp = int(time.time() * 1000)
+        await self.save_state_async()
+    
     def initialize_tracked_positions(self, current_positions: Dict) -> None:
         """Initialize tracking with current target positions (on first start)"""
         with self._lock:
@@ -297,6 +385,22 @@ class CopyTradingStateManager:
             
             self.save_state()
     
+    async def initialize_tracked_positions_async(self, current_positions: Dict) -> None:
+        """Initialize tracking with current target positions (on first start) (async version)"""
+        logger.info(f"🔄 Initializing position tracking with {len(current_positions)} existing positions")
+        
+        for coin, position in current_positions.items():
+            self.state.tracked_positions[coin] = {
+                'size': position.size,
+                'side': position.side,
+                'entry_price': position.entry_price,
+                'leverage': position.leverage,
+                'last_updated': int(time.time() * 1000)
+            }
+            logger.info(f"   📊 Tracking existing {position.side.upper()} {coin}: {position.size}")
+        
+        await self.save_state_async()
+    
     def get_session_info(self) -> Dict[str, Any]:
         """Get information about current session"""
         with self._lock:

+ 9 - 9
src/monitoring/monitoring_coordinator.py

@@ -83,15 +83,15 @@ class MonitoringCoordinator:
             await self.risk_manager.start()
             # AlarmManager doesn't have start() method - it's always ready
             
-            # Start copy trading monitor if enabled (Step 2: Don't auto-start yet)
-            # if self.copy_trading_monitor and hasattr(self.copy_trading_monitor, 'enabled') and self.copy_trading_monitor.enabled:
-            #     try:
-            #         asyncio.create_task(self.copy_trading_monitor.start_monitoring())
-            #         logger.info("🔄 Copy trading monitor started (with non-blocking state manager)")
-            #     except Exception as e:
-            #         logger.error(f"❌ Failed to start copy trading monitor: {e}")
-            if self.copy_trading_monitor:
-                logger.info("🔄 Copy trading monitor initialized but not auto-started (Step 2 testing)")
+            # Start copy trading monitor if enabled (with async fixes applied)
+            if self.copy_trading_monitor and hasattr(self.copy_trading_monitor, 'enabled') and self.copy_trading_monitor.enabled:
+                try:
+                    asyncio.create_task(self.copy_trading_monitor.start_monitoring())
+                    logger.info("🔄 Copy trading monitor started (with full async implementation)")
+                except Exception as e:
+                    logger.error(f"❌ Failed to start copy trading monitor: {e}")
+            elif self.copy_trading_monitor:
+                logger.info("✅ Copy trading monitor initialized (ready for manual start)")
             
             # Initialize exchange order sync with trading stats
             self._init_exchange_order_sync()

+ 1 - 1
trading_bot.py

@@ -14,7 +14,7 @@ from datetime import datetime
 from pathlib import Path
 
 # Bot version
-BOT_VERSION = "3.0.328"
+BOT_VERSION = "3.0.329"
 
 # Add src directory to Python path
 sys.path.insert(0, str(Path(__file__).parent / "src"))