Просмотр исходного кода

Implement state management for MarketMonitor - Added functionality to load and save the market monitor's state to a JSON file, ensuring persistence of the last processed trade time. Enhanced timestamp handling for incoming trades to support various formats and ensure UTC consistency. Improved logging for state loading and saving processes to aid in debugging and monitoring.

Carles Sentis 4 дней назад
Родитель
Сommit
a10e7c34aa
1 измененных файлов с 77 добавлено и 25 удалено
  1. 77 25
      src/monitoring/market_monitor.py

+ 77 - 25
src/monitoring/market_monitor.py

@@ -5,8 +5,10 @@ Market Monitor - Handles external trade monitoring and heartbeat functionality.
 
 import logging
 import asyncio
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from typing import Optional, Dict, Any, List
+import os
+import json
 
 from src.config.config import Config
 from src.monitoring.alarm_manager import AlarmManager
@@ -23,7 +25,8 @@ class MarketMonitor:
         self._monitor_task = None
         
         # External trade monitoring
-        self.last_processed_trade_time = None
+        self.state_file = "data/market_monitor_state.json"
+        self.last_processed_trade_time: Optional[datetime] = None
         
         # Alarm management
         self.alarm_manager = AlarmManager()
@@ -35,6 +38,8 @@ class MarketMonitor:
         # Notification manager (will be set by the core bot)
         self.notification_manager = None
         
+        self._load_state()
+        
     def set_notification_manager(self, notification_manager):
         """Set the notification manager for sending alerts."""
         self.notification_manager = notification_manager
@@ -67,8 +72,47 @@ class MarketMonitor:
             except asyncio.CancelledError:
                 pass
         
+        self._save_state()
         logger.info("🛑 Market monitor stopped")
     
+    def _load_state(self):
+        """Load market monitor state from disk."""
+        try:
+            if os.path.exists(self.state_file):
+                with open(self.state_file, 'r') as f:
+                    state_data = json.load(f)
+                
+                last_time_str = state_data.get('last_processed_trade_time')
+                if last_time_str:
+                    self.last_processed_trade_time = datetime.fromisoformat(last_time_str)
+                    logger.info(f"🔄 Loaded MarketMonitor state: last_processed_trade_time = {self.last_processed_trade_time.isoformat()}")
+                else:
+                    logger.info("🔄 MarketMonitor state file found, but no last_processed_trade_time.")
+            else:
+                logger.info("💨 No MarketMonitor state file found. Will start with fresh external trade tracking.")
+        except Exception as e:
+            logger.error(f"Error loading MarketMonitor state from {self.state_file}: {e}. Proceeding with default state.")
+            self.last_processed_trade_time = None
+
+    def _save_state(self):
+        """Save market monitor state to disk."""
+        try:
+            # Ensure the data directory exists
+            data_dir = os.path.dirname(self.state_file)
+            if data_dir and not os.path.exists(data_dir):
+                os.makedirs(data_dir)
+                logger.info(f"Created data directory for MarketMonitor state: {data_dir}")
+
+            state_data = {}
+            if self.last_processed_trade_time:
+                state_data['last_processed_trade_time'] = self.last_processed_trade_time.isoformat()
+            
+            with open(self.state_file, 'w') as f:
+                json.dump(state_data, f, indent=2)
+            logger.info(f"💾 Saved MarketMonitor state to {self.state_file}")
+        except Exception as e:
+            logger.error(f"Error saving MarketMonitor state to {self.state_file}: {e}")
+    
     async def _initialize_tracking(self):
         """Initialize order and position tracking."""
         try:
@@ -269,35 +313,41 @@ class MarketMonitor:
             if not recent_fills:
                 return
             
-            # Initialize last processed time if first run
+            # Initialize last processed time if it's still None after loading state (first run or bad state file)
             if self.last_processed_trade_time is None:
-                # Set to current time minus 1 hour to catch recent activity
-                self.last_processed_trade_time = datetime.now() - timedelta(hours=1)
+                logger.info("No last_processed_trade_time found, setting to 1 hour ago (UTC).")
+                # Set to current time minus 1 hour to catch recent activity, ensure UTC
+                self.last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
             
-            # Filter for new trades since last check
             new_trades = []
-            latest_trade_time = self.last_processed_trade_time
+            # Ensure latest_trade_time starts with a valid datetime object for comparison
+            latest_trade_time = self.last_processed_trade_time 
             
             for fill in recent_fills:
-                fill_time = fill.get('timestamp')
-                if fill_time:
-                    # Convert timestamps to comparable format
+                fill_time_data = fill.get('timestamp') # Renamed for clarity
+                if fill_time_data:
+                    fill_datetime_utc: Optional[datetime] = None
                     try:
-                        # Convert fill_time to datetime object for comparison
-                        if isinstance(fill_time, (int, float)):
-                            # Assume it's a unix timestamp
-                            fill_datetime = datetime.fromtimestamp(fill_time / 1000 if fill_time > 1e10 else fill_time)
+                        if isinstance(fill_time_data, (int, float)):
+                            # Assume it's a unix timestamp (milliseconds for Hyperliquid)
+                            fill_datetime_utc = datetime.fromtimestamp(fill_time_data / 1000, timezone.utc)
+                        elif isinstance(fill_time_data, str):
+                            # Try to parse as ISO string, ensure UTC
+                            dt_obj = datetime.fromisoformat(fill_time_data.replace('Z', '+00:00'))
+                            if dt_obj.tzinfo is None: # If somehow still naive
+                                fill_datetime_utc = dt_obj.replace(tzinfo=timezone.utc)
+                            else:
+                                fill_datetime_utc = dt_obj.astimezone(timezone.utc) # Convert to UTC if different tz
                         else:
-                            # Try to parse as ISO string
-                            fill_datetime = datetime.fromisoformat(str(fill_time).replace('Z', '+00:00'))
-                        
-                        # Compare datetime objects
-                        if fill_datetime > self.last_processed_trade_time:
+                            logger.warning(f"⚠️ Unknown timestamp format for {fill_time_data}")
+                            continue
+
+                        if fill_datetime_utc and self.last_processed_trade_time and fill_datetime_utc > self.last_processed_trade_time:
                             new_trades.append(fill)
-                            if fill_datetime > latest_trade_time:
-                                latest_trade_time = fill_datetime
+                            if latest_trade_time is None or fill_datetime_utc > latest_trade_time: # Ensure latest_trade_time is updated
+                                latest_trade_time = fill_datetime_utc
                     except Exception as timestamp_error:
-                        logger.warning(f"⚠️ Error processing timestamp {fill_time}: {timestamp_error}")
+                        logger.warning(f"⚠️ Error processing timestamp {fill_time_data}: {timestamp_error}")
                         continue
             
             if not new_trades:
@@ -306,7 +356,7 @@ class MarketMonitor:
             # Process new trades
             for trade in new_trades:
                 # Log trade processing for debugging
-                trade_id = trade.get('id', 'external')
+                trade_id = trade.get('id', 'external') # Use a default if 'id' is missing
                 symbol = trade.get('symbol', 'Unknown')
                 side = trade.get('side', 'Unknown')
                 amount = trade.get('amount', 0)
@@ -316,8 +366,10 @@ class MarketMonitor:
                 
                 await self._process_external_trade(trade)
             
-            # Update last processed time (keep as datetime object)
-            self.last_processed_trade_time = latest_trade_time
+            # Update last processed time only if new trades were found and processed
+            if new_trades and latest_trade_time and latest_trade_time != self.last_processed_trade_time:
+                self.last_processed_trade_time = latest_trade_time
+                self._save_state() # Save state after updating the time
             
             if new_trades:
                 logger.info(f"📊 Processed {len(new_trades)} external trades")