Przeglądaj źródła

Enhance MarketMonitor with improved caching and state comparison logic.

- Updated caching mechanism to use more descriptive variable names for fresh positions and orders.
- Added logic to compare current exchange state with last known positions and orders, logging changes for better visibility.
- Implemented migration of trade statistics to aggregated stats during various lifecycle events, improving data management.
- Enhanced error handling and logging for better debugging and operational insights.
Carles Sentis 2 dni temu
rodzic
commit
319ea7eec7
2 zmienionych plików z 41 dodań i 13 usunięć
  1. 40 12
      src/monitoring/market_monitor.py
  2. 1 1
      trading_bot.py

+ 40 - 12
src/monitoring/market_monitor.py

@@ -215,22 +215,40 @@ class MarketMonitor:
         """🆕 Continuously update cached exchange data every heartbeat."""
         try:
             # Fetch fresh data from exchange
-            fresh_positions = self.trading_engine.get_positions() or []
-            fresh_orders = self.trading_engine.get_orders() or []
+            fresh_positions_list = self.trading_engine.get_positions() or []
+            fresh_orders_list = self.trading_engine.get_orders() or []
             fresh_balance = self.trading_engine.get_balance()
             
-            # Update cache
-            self.cached_positions = fresh_positions
-            self.cached_orders = fresh_orders
+            # Update primary cache immediately
+            self.cached_positions = fresh_positions_list
+            self.cached_orders = fresh_orders_list
             self.cached_balance = fresh_balance
             self.last_cache_update = datetime.now(timezone.utc)
             
-            logger.debug(f"🔄 Updated cache: {len(fresh_positions)} positions, {len(fresh_orders)} orders")
+            logger.debug(f"🔄 Fetched fresh cache: {len(fresh_positions_list)} positions, {len(fresh_orders_list)} orders")
+
+            # Prepare current state maps for comparison and for updating last_known state
+            current_exchange_position_map = {
+                pos.get('symbol'): pos for pos in fresh_positions_list
+                if pos.get('symbol') and abs(float(pos.get('contracts', 0))) > 1e-9
+            }
+            current_exchange_order_ids = {order.get('id') for order in fresh_orders_list if order.get('id')}
+
+            # Log changes by comparing with the state from the end of the previous cycle
+            if len(current_exchange_position_map) != len(self.last_known_positions):
+                logger.info(f"📊 Position count changed: {len(self.last_known_positions)} → {len(current_exchange_position_map)}")
+            
+            if len(current_exchange_order_ids) != len(self.last_known_orders):
+                logger.info(f"📋 Order count changed: {len(self.last_known_orders)} → {len(current_exchange_order_ids)}")
 
+            # Update last_known_xxx to the current exchange state for the *next* cycle's comparison
+            self.last_known_positions = current_exchange_position_map
+            self.last_known_orders = current_exchange_order_ids
+            
             # 💹 Update unrealized P&L and mark price in DB for open positions
             stats = self.trading_engine.get_stats()
-            if stats and fresh_positions:
-                for ex_pos in fresh_positions:
+            if stats and fresh_positions_list:
+                for ex_pos in fresh_positions_list:
                     symbol = ex_pos.get('symbol')
                     if not symbol:
                         continue
@@ -318,11 +336,11 @@ class MarketMonitor:
                         )
             
             # 🆕 Detect immediate changes for responsive notifications
-            if len(fresh_positions) != len(self.last_known_positions):
-                logger.info(f"📊 Position count changed: {len(self.last_known_positions)} → {len(fresh_positions)}")
+            if len(fresh_positions_list) != len(self.last_known_positions):
+                logger.info(f"📊 Position count changed: {len(self.last_known_positions)} → {len(current_exchange_position_map)}")
             
-            if len(fresh_orders) != len(self.last_known_orders):
-                logger.info(f"📋 Order count changed: {len(self.last_known_orders)} → {len(fresh_orders)}")
+            if len(fresh_orders_list) != len(self.last_known_orders):
+                logger.info(f"📋 Order count changed: {len(self.last_known_orders)} → {len(current_exchange_order_ids)}")
             
         except Exception as e:
             logger.error(f"❌ Error updating cached data: {e}")
@@ -777,6 +795,8 @@ class MarketMonitor:
                                             full_symbol, side_from_fill, amount_from_fill, price_from_fill,
                                             action_type, timestamp_dt.isoformat()
                                         )
+                                    # MIGRATE STATS
+                                    stats._migrate_trade_to_aggregated_stats(lc_id)
                                     if is_direct_sl_tp_fill and exchange_order_id_from_fill:
                                         order_db = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
                                         if order_db:
@@ -806,6 +826,8 @@ class MarketMonitor:
                                             stop_loss_info, full_symbol, side_from_fill, amount_from_fill, price_from_fill, 
                                             f'{lc_pos_side}_closed_external_sl', timestamp_dt.isoformat(), realized_pnl
                                         )
+                                    # MIGRATE STATS
+                                    stats._migrate_trade_to_aggregated_stats(lc_id)
                                     del self.external_stop_losses[exchange_order_id_from_fill]
                                     fill_processed_this_iteration = True
                             else:
@@ -889,6 +911,8 @@ class MarketMonitor:
                                                 f"verified_external_{lc_position_side}_close",
                                                 timestamp_dt.isoformat()
                                             )
+                                        # MIGRATE STATS
+                                        stats._migrate_trade_to_aggregated_stats(lc_id)
                                         fill_processed_this_iteration = True
                                     else:
                                         logger.error(f"❌ Failed to close lifecycle {lc_id} via verified external fill {trade_id}.")
@@ -1821,6 +1845,8 @@ class MarketMonitor:
                     if success:
                         closed_due_to_discrepancy += 1
                         logger.info(f"✅ AUTO-SYNC (Discrepancy): Successfully closed bot lifecycle {lc_id} for {symbol}.")
+                        # MIGRATE STATS
+                        stats._migrate_trade_to_aggregated_stats(lc_id)
                         if self.notification_manager:
                             pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
                             # formatter is already defined in the outer scope of _auto_sync_orphaned_positions
@@ -2047,6 +2073,8 @@ class MarketMonitor:
                         if success_close:
                             closed_due_to_discrepancy_startup += 1
                             logger.info(f"✅ STARTUP (Discrepancy): Successfully closed bot lifecycle {lc_id} for {symbol}.")
+                            # MIGRATE STATS
+                            stats._migrate_trade_to_aggregated_stats(lc_id)
                             if self.notification_manager:
                                 pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
                                 notification_text = (

+ 1 - 1
trading_bot.py

@@ -14,7 +14,7 @@ from datetime import datetime
 from pathlib import Path
 
 # Bot version
-BOT_VERSION = "2.2.130"
+BOT_VERSION = "2.2.131"
 
 # Add src directory to Python path
 sys.path.insert(0, str(Path(__file__).parent / "src"))