浏览代码

Refactor MarketMonitor to enhance external trade processing - Updated the method for checking external trades to utilize the TradingEngine's recent fills retrieval. Improved timestamp handling and state management by loading the last processed trade time from the database. Enhanced logging for better debugging and added a safety net for activating pending stop losses based on recent fills, ensuring robust trade management.

Carles Sentis 4 天之前
父节点
当前提交
2da0a71b51
共有 2 个文件被更改,包括 161 次插入183 次删除
  1. 155 181
      src/monitoring/market_monitor.py
  2. 6 2
      src/trading/trading_engine.py

+ 155 - 181
src/monitoring/market_monitor.py

@@ -426,202 +426,176 @@ class MarketMonitor:
     async def _check_external_trades(self):
         """Check for trades made outside the Telegram bot and update stats."""
         try:
-            # Get recent fills from Hyperliquid
-            recent_fills = self.trading_engine.client.get_recent_fills()
-            
+            # Get recent fills from exchange
+            recent_fills = self.trading_engine.get_recent_fills()
             if not recent_fills:
+                logger.debug("No recent fills data available")
                 return
-            
-            # Initialize last processed time if it's still None after loading state (first run or bad state file)
-            if self.last_processed_trade_time is None:
-                logger.info("No last_processed_trade_time found, setting to 1 hour ago (UTC).")
-                # Set to current time minus 1 hour to catch recent activity, ensure UTC
-                self.last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
-            
-            new_trades = []
-            # Ensure latest_trade_time starts with a valid datetime object for comparison
-            latest_trade_time = self.last_processed_trade_time 
-            
-            for fill in recent_fills:
-                fill_time_data = fill.get('timestamp') # Renamed for clarity
-                processed_fill_datetime_utc_iso: Optional[str] = None # For storing the processed ISO timestamp
 
-                if fill_time_data:
-                    fill_datetime_utc: Optional[datetime] = None
-                    try:
-                        if isinstance(fill_time_data, (int, float)):
-                            # Assume it's a unix timestamp (milliseconds for Hyperliquid)
-                            fill_datetime_utc = datetime.fromtimestamp(fill_time_data / 1000, timezone.utc)
-                        elif isinstance(fill_time_data, str):
-                            # Try to parse as ISO string, ensure UTC
-                            dt_obj = datetime.fromisoformat(fill_time_data.replace('Z', '+00:00'))
-                            if dt_obj.tzinfo is None: # If somehow still naive
-                                fill_datetime_utc = dt_obj.replace(tzinfo=timezone.utc)
-                            else:
-                                fill_datetime_utc = dt_obj.astimezone(timezone.utc) # Convert to UTC if different tz
-                        else:
-                            logger.warning(f"⚠️ Unknown timestamp format for {fill_time_data}")
-                            continue
-                        
-                        if fill_datetime_utc:
-                            processed_fill_datetime_utc_iso = fill_datetime_utc.isoformat()
+            # Get last processed timestamp from database
+            if not hasattr(self, '_last_processed_trade_time') or self._last_processed_trade_time is None:
+                try:
+                    last_time_str = self.trading_engine.stats._get_metadata('last_processed_trade_time')
+                    if last_time_str:
+                        self._last_processed_trade_time = datetime.fromisoformat(last_time_str)
+                        logger.debug(f"Loaded last_processed_trade_time from DB: {self._last_processed_trade_time}")
+                    else:
+                        # If no last processed time, start from 1 hour ago to avoid processing too much history
+                        self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
+                        logger.info("No last_processed_trade_time found, setting to 1 hour ago (UTC).")
+                except Exception as e:
+                    logger.warning(f"Could not load last_processed_trade_time from DB: {e}")
+                    self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
 
-                        if fill_datetime_utc and self.last_processed_trade_time and fill_datetime_utc > self.last_processed_trade_time:
-                            # Store the ISO formatted timestamp directly in the fill dict for later use
-                            fill['iso_timestamp'] = processed_fill_datetime_utc_iso 
-                            new_trades.append(fill)
-                            if latest_trade_time is None or fill_datetime_utc > latest_trade_time: # Ensure latest_trade_time is updated
-                                latest_trade_time = fill_datetime_utc
-                    except Exception as timestamp_error:
-                        logger.warning(f"⚠️ Error processing timestamp {fill_time_data}: {timestamp_error}")
-                        continue
-            
-            if not new_trades:
-                return
-            
-            # Process new trades
-            for trade in new_trades:
-                # Log trade processing for debugging
-                trade_id = trade.get('id', 'external') # Use a default if 'id' is missing
-                symbol = trade.get('symbol', 'Unknown')
-                side = trade.get('side', 'Unknown')
-                amount = trade.get('amount', 0)
-                price = trade.get('price', 0)
-                
-                logger.info(f"🔍 Processing external trade: {trade_id} - {side} {amount} {symbol} @ ${price}")
-                
-                await self._process_external_trade(trade)
-            
-            # Update last processed time only if new trades were found and processed
-            if new_trades and latest_trade_time and latest_trade_time != self.last_processed_trade_time:
-                self.last_processed_trade_time = latest_trade_time
-                self._save_state() # Save state after updating the time
+            # Process new fills
+            external_trades_processed = 0
+            symbols_with_fills = set()  # Track symbols that had fills for stop loss activation
             
-            if new_trades:
-                logger.info(f"📊 Processed {len(new_trades)} external trades")
-                
-        except Exception as e:
-            logger.error(f"❌ Error checking external trades: {e}")
-    
-    async def _process_external_trade(self, trade: Dict[str, Any]):
-        """Process an individual external trade and determine if it's opening or closing a position."""
-        try:
-            # Extract trade information
-            symbol = trade.get('symbol', '')
-            side = trade.get('side', '')
-            amount = float(trade.get('amount', 0))
-            price = float(trade.get('price', 0))
-            trade_id = trade.get('id', 'external')
-            timestamp = trade.get('timestamp', '')
-            
-            if not all([symbol, side, amount, price]):
-                return
-            
-            # Hyperliquid fill object typically has 'roid' (Referral Order ID) or 'oid' for the actual order ID.
-            # Assuming 'oid' is the exchange order ID we might have stored.
-            exchange_order_id_from_fill = trade.get('oid') 
-
-            # Skip bot-generated trades to prevent double processing IF an order system isn't fully in place yet for them.
-            # However, with the new orders table, we WANT to process bot-generated fills to link them.
-            # The check for `trade_id in self.trading_engine.bot_trade_ids` might need re-evaluation or removal
-            # if bot_trade_ids are exchange_fill_ids and we want to link them to an order.
-            # For now, let's assume trade_id from get_recent_fills is the fill_id (Hyperliquid calls it 'tid').
-            # And exchange_order_id_from_fill is the one to link to orders table.
-
-            # Record trade in stats and get action type using enhanced tracking
-            stats = self.trading_engine.get_stats()
-            if stats:
-                linked_order_db_id = None
-                if exchange_order_id_from_fill:
-                    order_in_db = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
-                    if order_in_db:
-                        linked_order_db_id = order_in_db['id']
-                        logger.info(f"🔗 Linking fill {trade_id} to order DB ID {linked_order_db_id} (Exchange Order ID: {exchange_order_id_from_fill})")
-                        # Update the order status and amount filled
-                        new_status_after_fill = order_in_db['status'] # Default to current
-                        current_filled = order_in_db.get('amount_filled', 0.0)
-                        requested_amount = order_in_db.get('amount_requested', 0.0)
-                        
-                        if abs((current_filled + amount) - requested_amount) < 1e-9: # Comparing floats
-                            new_status_after_fill = 'filled'
-                        elif (current_filled + amount) < requested_amount:
-                            new_status_after_fill = 'partially_filled'
-                        else: # Overfilled? Or issue with amounts. Log a warning.
-                            logger.warning(f"Order {linked_order_db_id} might be overfilled. Current: {current_filled}, Fill: {amount}, Requested: {requested_amount}")
-                            new_status_after_fill = 'filled' # Assume filled for now if it exceeds
-
-                        stats.update_order_status(order_db_id=linked_order_db_id, 
-                                                new_status=new_status_after_fill, 
-                                                amount_filled_increment=amount)
-                        
-                        # Check if this order is now fully filled and has pending stop losses to activate
-                        if new_status_after_fill == 'filled':
-                            await self._activate_pending_stop_losses(order_in_db, stats)
-                            
+            for fill in recent_fills:
+                try:
+                    # Parse fill data - CCXT format from fetch_my_trades
+                    trade_id = fill.get('id')  # CCXT uses 'id' for trade ID
+                    timestamp_ms = fill.get('timestamp')  # CCXT uses 'timestamp' (milliseconds)
+                    symbol = fill.get('symbol')  # CCXT uses 'symbol' in full format like 'LTC/USDC:USDC'
+                    side = fill.get('side')  # CCXT uses 'side' ('buy' or 'sell')
+                    amount = float(fill.get('amount', 0))  # CCXT uses 'amount'
+                    price = float(fill.get('price', 0))  # CCXT uses 'price'
+                    
+                    # Convert timestamp
+                    if timestamp_ms:
+                        timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
                     else:
-                        logger.info(f"ℹ️ Fill {trade_id} has Exchange Order ID {exchange_order_id_from_fill}, but no matching order found in DB. Processing as unlinked fill.")
-                
-                iso_timestamp_from_trade_dict = trade.get('iso_timestamp') # Use the correct variable name
-                if not iso_timestamp_from_trade_dict:
-                    logger.error(f"❌ Critical: iso_timestamp missing in trade data for fill {trade_id}. Aborting processing for this fill.")
-                    return
-
-                # Pass the ISO formatted timestamp and potentially linked order ID to stats
-                action_type = stats.record_trade_with_enhanced_tracking(
-                    symbol, side, amount, price, trade_id, # trade_id here is the fill ID (e.g. Hyperliquid 'tid')
-                    trade_type="external", # Or determine more specific type if possible
-                    timestamp=iso_timestamp_from_trade_dict, # Use the fetched iso_timestamp
-                    linked_order_table_id_to_link=linked_order_db_id
-                )
-                
-                # Handle position closures - cancel pending stop losses if position was closed
-                token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
-                if action_type in ['long_closed', 'short_closed', 'long_closed_and_short_opened', 'short_closed_and_long_opened']:
-                    logger.info(f"🔄 External trade resulted in position closure: {action_type} for {token}")
+                        timestamp_dt = datetime.now(timezone.utc)
                     
-                    # Cancel any pending stop losses for this symbol
-                    cancelled_sl_count = stats.cancel_pending_stop_losses_by_symbol(
-                        symbol=symbol,
-                        new_status='cancelled_external_position_close'
-                    )
+                    # Skip if already processed
+                    if timestamp_dt <= self._last_processed_trade_time:
+                        continue
                     
-                    if cancelled_sl_count > 0:
-                        logger.info(f"🛑 Cancelled {cancelled_sl_count} pending stop losses for {symbol} due to external position closure")
+                    # Process as external trade if we reach here
+                    if symbol and side and amount > 0 and price > 0:
+                        # Symbol is already in full format for CCXT
+                        full_symbol = symbol
+                        token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
                         
-                        if self.notification_manager:
-                            await self.notification_manager.send_generic_notification(
-                                f"🛑 <b>Stop Losses Cancelled</b>\n\n"
-                                f"Symbol: {token}\n"
-                                f"Cancelled: {cancelled_sl_count} stop loss(es)\n"
-                                f"Reason: Position closed externally\n"
-                                f"Action: {action_type.replace('_', ' ').title()}\n"
-                                f"Time: {datetime.now().strftime('%H:%M:%S')}"
+                        # Check if this might be a bot order fill by looking for exchange order ID
+                        # CCXT might have this in 'info' sub-object with the raw exchange data
+                        exchange_order_id_from_fill = None
+                        if 'info' in fill and isinstance(fill['info'], dict):
+                            # Look for Hyperliquid order ID in the raw response
+                            exchange_order_id_from_fill = fill['info'].get('oid')
+                        
+                        logger.info(f"🔍 Processing external trade: {trade_id} - {side} {amount} {full_symbol} @ ${price:.2f}")
+                        
+                        stats = self.trading_engine.stats
+                        if stats:
+                            linked_order_db_id = None
+                            if exchange_order_id_from_fill:
+                                order_in_db = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
+                                if order_in_db:
+                                    linked_order_db_id = order_in_db['id']
+                                    logger.info(f"🔗 Linking fill {trade_id} to order DB ID {linked_order_db_id} (Exchange Order ID: {exchange_order_id_from_fill})")
+                                    # Update the order status and amount filled
+                                    new_status_after_fill = order_in_db['status'] # Default to current
+                                    current_filled = order_in_db.get('amount_filled', 0.0)
+                                    requested_amount = order_in_db.get('amount_requested', 0.0)
+                                    
+                                    if abs((current_filled + amount) - requested_amount) < 1e-9: # Comparing floats
+                                        new_status_after_fill = 'filled'
+                                    elif (current_filled + amount) < requested_amount:
+                                        new_status_after_fill = 'partially_filled'
+                                    else: # Overfilled? Or issue with amounts. Log a warning.
+                                        logger.warning(f"Order {linked_order_db_id} might be overfilled. Current: {current_filled}, Fill: {amount}, Requested: {requested_amount}")
+                                        new_status_after_fill = 'filled' # Assume filled for now if it exceeds
+
+                                    stats.update_order_status(order_db_id=linked_order_db_id, 
+                                                            new_status=new_status_after_fill, 
+                                                            amount_filled_increment=amount)
+                                    
+                                    # Check if this order is now fully filled and has pending stop losses to activate
+                                    if new_status_after_fill == 'filled':
+                                        await self._activate_pending_stop_losses(order_in_db, stats)
+                            
+                            # Record the trade in stats with enhanced tracking
+                            action_type = stats.record_trade_with_enhanced_tracking(
+                                full_symbol, side, amount, price, 
+                                exchange_fill_id=trade_id, trade_type="external",
+                                timestamp=timestamp_dt.isoformat(),
+                                linked_order_table_id_to_link=linked_order_db_id
                             )
-                
-                # Send enhanced notification based on action type
-                # For notification, we can use the iso_timestamp_from_trade_dict or original_timestamp_for_notification based on desired format
-                await self._send_enhanced_trade_notification(symbol, side, amount, price, action_type, iso_timestamp_from_trade_dict)
-                
-                logger.info(f"📋 Processed external trade: {side} {amount} {symbol} @ ${price} ({action_type}) using timestamp {iso_timestamp_from_trade_dict}")
+                            
+                            # Track symbol for potential stop loss activation
+                            symbols_with_fills.add(token)
+                            
+                            # Send notification for external trade
+                            if self.notification_manager:
+                                await self.notification_manager.send_external_trade_notification(
+                                    full_symbol, side, amount, price, action_type, timestamp_dt.isoformat()
+                                )
+                            
+                            logger.info(f"📋 Processed external trade: {side} {amount} {full_symbol} @ ${price:.2f} ({action_type}) using timestamp {timestamp_dt.isoformat()}")
+                            external_trades_processed += 1
+                            
+                            # Update last processed time
+                            self._last_processed_trade_time = timestamp_dt
+                        
+                except Exception as e:
+                    logger.error(f"Error processing fill {fill}: {e}")
+                    continue
             
+            # Save the last processed timestamp to database
+            if external_trades_processed > 0:
+                self.trading_engine.stats._set_metadata('last_processed_trade_time', self._last_processed_trade_time.isoformat())
+                logger.info(f"💾 Saved MarketMonitor state (last_processed_trade_time) to DB: {self._last_processed_trade_time.isoformat()}")
+                logger.info(f"📊 Processed {external_trades_processed} external trades")
+                
+                # Additional check: Activate any pending stop losses for symbols that had fills
+                # This is a safety net for cases where the fill linking above didn't work
+                await self._check_pending_stop_losses_for_filled_symbols(symbols_with_fills)
+
         except Exception as e:
-            logger.error(f"❌ Error processing external trade: {e}")
-    
-    async def _send_enhanced_trade_notification(self, symbol: str, side: str, amount: float, 
-                                                price: float, action_type: str, timestamp: str): # timestamp here is expected to be ISO for consistency
-        """Send enhanced notification for external trades."""
+            logger.error(f"❌ Error checking external trades: {e}", exc_info=True)
+
+    async def _check_pending_stop_losses_for_filled_symbols(self, symbols_with_fills: set):
+        """
+        Safety net: Check if any symbols that just had fills have pending stop losses
+        that should be activated. This handles cases where fill linking failed.
+        """
         try:
-            # Send through notification manager if available
-            if self.notification_manager:
-                await self.notification_manager.send_external_trade_notification(
-                    symbol, side, amount, price, action_type, timestamp
-                )
-            else:
-                # Fallback to logging if notification manager not available
-                logger.info(f"📢 External trade notification: {action_type} for {symbol.split('/')[0]}")
+            if not symbols_with_fills:
+                return
+                
+            stats = self.trading_engine.stats
+            if not stats:
+                return
+            
+            # Get all pending stop losses
+            pending_stop_losses = stats.get_orders_by_status('pending_trigger', 'stop_limit_trigger')
+            
+            if not pending_stop_losses:
+                return
             
+            # Check each pending stop loss to see if its symbol had fills
+            activated_any = False
+            for sl_order in pending_stop_losses:
+                symbol = sl_order.get('symbol', '')
+                token = symbol.split('/')[0] if '/' in symbol else ''
+                
+                if token in symbols_with_fills:
+                    # This symbol had fills - check if we should activate the stop loss
+                    parent_bot_ref_id = sl_order.get('parent_bot_order_ref_id')
+                    if parent_bot_ref_id:
+                        # Get the parent order
+                        parent_order = stats.get_order_by_bot_ref_id(parent_bot_ref_id)
+                        if parent_order and parent_order.get('status') in ['filled', 'partially_filled']:
+                            logger.info(f"🛑 Safety net activation: Found pending SL for {token} with filled parent order {parent_bot_ref_id}")
+                            await self._activate_pending_stop_losses(parent_order, stats)
+                            activated_any = True
+            
+            if activated_any:
+                logger.info("✅ Safety net activated pending stop losses for symbols with recent fills")
+                
         except Exception as e:
-            logger.error(f"❌ Error sending external trade notification: {e}")
+            logger.error(f"Error in safety net stop loss activation: {e}", exc_info=True)
     
     async def _check_pending_triggers(self):
         """Check and process pending conditional triggers (e.g., SL/TP)."""

+ 6 - 2
src/trading/trading_engine.py

@@ -50,13 +50,17 @@ class TradingEngine:
         return self.client.get_balance()
     
     def get_positions(self) -> Optional[List[Dict[str, Any]]]:
-        """Get current positions."""
+        """Get all positions."""
         return self.client.get_positions()
     
     def get_orders(self) -> Optional[List[Dict[str, Any]]]:
-        """Get open orders."""
+        """Get all open orders."""
         return self.client.get_open_orders()
     
+    def get_recent_fills(self) -> Optional[List[Dict[str, Any]]]:
+        """Get recent fills/trades."""
+        return self.client.get_recent_fills()
+    
     def get_market_data(self, symbol: str) -> Optional[Dict[str, Any]]:
         """Get market data for a symbol."""
         return self.client.get_market_data(symbol)