Forráskód Böngészése

Integrate market monitoring with trading engine for enhanced data caching - Connected MarketMonitor to TradingEngine for shared access to cached positions, orders, and balance data. Updated InfoCommands to utilize cached data for auto-syncing positions, improving efficiency and responsiveness. Enhanced logging for better traceability of data usage and cache updates.

Carles Sentis 4 napja
szülő
commit
e5c105fd5b

+ 3 - 0
src/bot/core.py

@@ -34,6 +34,9 @@ class TelegramTradingBot:
         self.notification_manager = NotificationManager()
         self.market_monitor = MarketMonitor(self.trading_engine, self.notification_manager)
         
+        # 🆕 WIRE UP: Connect market monitor to trading engine for cached data sharing
+        self.trading_engine.set_market_monitor(self.market_monitor)
+        
         # Initialize command handlers
         self.info_commands = InfoCommands(self.trading_engine)
         self.management_commands = ManagementCommands(self.trading_engine, self.market_monitor)

+ 16 - 1
src/commands/info_commands.py

@@ -113,7 +113,22 @@ class InfoCommands:
             return
         
         # 🆕 AUTO-SYNC: Check for positions on exchange that don't have trade lifecycle records
-        exchange_positions = self.trading_engine.get_positions() or []
+        # Use cached data from MarketMonitor if available (updated every heartbeat)
+        if (hasattr(self.trading_engine, 'market_monitor') and 
+            self.trading_engine.market_monitor and 
+            hasattr(self.trading_engine.market_monitor, 'get_cached_positions')):
+            
+            cache_age = self.trading_engine.market_monitor.get_cache_age_seconds()
+            if cache_age < 60:  # Use cached data if less than 1 minute old
+                exchange_positions = self.trading_engine.market_monitor.get_cached_positions() or []
+                logger.debug(f"Using cached positions for auto-sync (age: {cache_age:.1f}s)")
+            else:
+                exchange_positions = self.trading_engine.get_positions() or []
+                logger.debug("Using fresh API call for auto-sync (cache too old)")
+        else:
+            exchange_positions = self.trading_engine.get_positions() or []
+            logger.debug("Using fresh API call for auto-sync (no cache available)")
+        
         synced_positions = []
         
         for exchange_pos in exchange_positions:

+ 79 - 23
src/monitoring/market_monitor.py

@@ -19,27 +19,36 @@ class MarketMonitor:
     """Handles external trade monitoring and market events."""
     
     def __init__(self, trading_engine, notification_manager=None):
-        """Initialize the market monitor."""
+        """Initialize market monitor."""
         self.trading_engine = trading_engine
         self.notification_manager = notification_manager
-        self.client = trading_engine.client
-        self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(seconds=120)
         self._monitoring_active = False
+        self._monitor_task = None
         
-        # 🆕 External stop loss tracking
-        self._external_stop_loss_orders = {}  # Format: {exchange_order_id: {'token': str, 'trigger_price': float, 'side': str, 'detected_at': datetime}}
+        # Enhanced tracking for Phase 3+
+        self.last_known_orders = set()  # Set of order IDs we know about
+        self.last_known_positions = {}  # Dict mapping symbol -> position data
+        
+        # Price alarms tracking
+        self.price_alarms = {}  # Dict mapping alarm_id -> alarm_data
+        self.next_alarm_id = 1
+        
+        # External stop loss tracking
+        self.external_stop_losses = {}  # symbol -> {order_id, stop_price, side, amount}
+        
+        # 🆕 CONTINUOUS DATA CACHE: Keep bot state updated
+        self.cached_positions = []  # Fresh exchange positions
+        self.cached_orders = []     # Fresh exchange orders
+        self.cached_balance = None  # Fresh balance data
+        self.last_cache_update = None
         
         # External trade monitoring
-        # self.state_file = "data/market_monitor_state.json" # Removed, state now in DB
         self.last_processed_trade_time: Optional[datetime] = None
         
         # Alarm management
         self.alarm_manager = AlarmManager()
         
-        # Order monitoring
-        self.last_known_orders = set()
-        self.last_known_positions = {}
-        
+        # Load persistent state
         self._load_state()
         
     async def start(self):
@@ -156,6 +165,9 @@ class MarketMonitor:
         try:
             loop_count = 0
             while self._monitoring_active:
+                # 🆕 CONTINUOUS UPDATE: Cache fresh exchange data every heartbeat
+                await self._update_cached_data()
+                
                 # 🆕 PHASE 4: Check trades table for pending stop loss activation first (highest priority)
                 await self._activate_pending_stop_losses_from_trades()
                 
@@ -188,6 +200,50 @@ class MarketMonitor:
                 await asyncio.sleep(5)
                 await self._monitor_loop()
     
+    async def _update_cached_data(self):
+        """🆕 Continuously update cached exchange data every heartbeat."""
+        try:
+            # Fetch fresh data from exchange
+            fresh_positions = self.trading_engine.get_positions() or []
+            fresh_orders = self.trading_engine.get_orders() or []
+            fresh_balance = self.trading_engine.get_balance()
+            
+            # Update cache
+            self.cached_positions = fresh_positions
+            self.cached_orders = fresh_orders
+            self.cached_balance = fresh_balance
+            self.last_cache_update = datetime.now(timezone.utc)
+            
+            logger.debug(f"🔄 Updated cache: {len(fresh_positions)} positions, {len(fresh_orders)} orders")
+            
+            # 🆕 Detect immediate changes for responsive notifications
+            if len(fresh_positions) != len(self.last_known_positions):
+                logger.info(f"📊 Position count changed: {len(self.last_known_positions)} → {len(fresh_positions)}")
+            
+            if len(fresh_orders) != len(self.last_known_orders):
+                logger.info(f"📋 Order count changed: {len(self.last_known_orders)} → {len(fresh_orders)}")
+            
+        except Exception as e:
+            logger.error(f"❌ Error updating cached data: {e}")
+    
+    def get_cached_positions(self):
+        """Get cached positions (updated every heartbeat)."""
+        return self.cached_positions or []
+    
+    def get_cached_orders(self):
+        """Get cached orders (updated every heartbeat)."""
+        return self.cached_orders or []
+    
+    def get_cached_balance(self):
+        """Get cached balance (updated every heartbeat)."""
+        return self.cached_balance
+    
+    def get_cache_age_seconds(self):
+        """Get age of cached data in seconds."""
+        if not self.last_cache_update:
+            return float('inf')
+        return (datetime.now(timezone.utc) - self.last_cache_update).total_seconds()
+    
     async def _check_order_fills(self):
         """Check for filled orders and send notifications."""
         try:
@@ -517,9 +573,9 @@ class MarketMonitor:
                         # 🆕 Check if this fill corresponds to an external stop loss order
                         is_external_stop_loss = False
                         stop_loss_info = None
-                        if exchange_order_id_from_fill and exchange_order_id_from_fill in self._external_stop_loss_orders:
+                        if exchange_order_id_from_fill and exchange_order_id_from_fill in self.external_stop_losses:
                             is_external_stop_loss = True
-                            stop_loss_info = self._external_stop_loss_orders[exchange_order_id_from_fill]
+                            stop_loss_info = self.external_stop_losses[exchange_order_id_from_fill]
                             logger.info(f"🛑 EXTERNAL STOP LOSS EXECUTION: {token} - Order {exchange_order_id_from_fill} filled @ ${price:.2f}")
                         
                         logger.info(f"🔍 Processing {'external stop loss' if is_external_stop_loss else 'external trade'}: {trade_id} - {side} {amount} {full_symbol} @ ${price:.2f}")
@@ -555,7 +611,7 @@ class MarketMonitor:
                                 )
                             
                             # Remove from tracking since it's now executed
-                            del self._external_stop_loss_orders[exchange_order_id_from_fill]
+                            del self.external_stop_losses[exchange_order_id_from_fill]
                             
                             logger.info(f"🛑 Processed external stop loss execution: {side} {amount} {full_symbol} @ ${price:.2f} (long_closed)")
                         
@@ -1435,7 +1491,7 @@ class MarketMonitor:
                         continue
                         
                     # Skip if we're already tracking this order
-                    if exchange_order_id in self._external_stop_loss_orders:
+                    if exchange_order_id in self.external_stop_losses:
                         continue
                         
                     # Check if this order could be a stop loss
@@ -1462,7 +1518,7 @@ class MarketMonitor:
                     
                     if is_stop_loss:
                         # Track this as an external stop loss order
-                        self._external_stop_loss_orders[exchange_order_id] = {
+                        self.external_stop_losses[exchange_order_id] = {
                             'token': token,
                             'symbol': symbol,
                             'trigger_price': price,
@@ -1488,15 +1544,15 @@ class MarketMonitor:
     async def _cleanup_external_stop_loss_tracking(self):
         """Clean up external stop loss orders that are no longer active."""
         try:
-            if not self._external_stop_loss_orders:
+            if not self.external_stop_losses:
                 return
                 
             # Get current open orders
             open_orders = self.trading_engine.get_orders()
             if not open_orders:
                 # No open orders, clear all tracking
-                removed_count = len(self._external_stop_loss_orders)
-                self._external_stop_loss_orders.clear()
+                removed_count = len(self.external_stop_losses)
+                self.external_stop_losses.clear()
                 if removed_count > 0:
                     logger.info(f"🧹 Cleared {removed_count} external stop loss orders (no open orders)")
                 return
@@ -1506,13 +1562,13 @@ class MarketMonitor:
             
             # Remove any tracked stop loss orders that are no longer open
             to_remove = []
-            for order_id, stop_loss_info in self._external_stop_loss_orders.items():
+            for order_id, stop_loss_info in self.external_stop_losses.items():
                 if order_id not in current_order_ids:
                     to_remove.append(order_id)
                     
             for order_id in to_remove:
-                stop_loss_info = self._external_stop_loss_orders[order_id]
-                del self._external_stop_loss_orders[order_id]
+                stop_loss_info = self.external_stop_losses[order_id]
+                del self.external_stop_losses[order_id]
                 logger.info(f"🗑️ Removed external stop loss tracking for {stop_loss_info['token']} order {order_id} (no longer open)")
                 
             if to_remove:
@@ -1528,8 +1584,8 @@ class MarketMonitor:
             if not stats:
                 return
 
-            # Get current exchange positions
-            exchange_positions = self.trading_engine.get_positions() or []
+            # 🆕 Use cached exchange positions (already fresh from heartbeat update)
+            exchange_positions = self.cached_positions or []
             synced_count = 0
 
             for exchange_pos in exchange_positions:

+ 44 - 3
src/trading/trading_engine.py

@@ -24,6 +24,7 @@ class TradingEngine:
         """Initialize the trading engine."""
         self.client = HyperliquidClient()
         self.stats = None
+        self.market_monitor = None  # Will be set by the main bot
         
         # State persistence (Removed - state is now in DB)
         # self.state_file = "data/trading_engine_state.json"
@@ -49,16 +50,56 @@ class TradingEngine:
         except Exception as e:
             logger.error(f"Could not initialize trading stats: {e}")
     
+    def set_market_monitor(self, market_monitor):
+        """Set the market monitor reference for accessing cached data."""
+        self.market_monitor = market_monitor
+        
     def get_balance(self) -> Optional[Dict[str, Any]]:
-        """Get account balance."""
+        """Get account balance (uses cached data when available)."""
+        # Try cached data first (updated every heartbeat)
+        if self.market_monitor and hasattr(self.market_monitor, 'get_cached_balance'):
+            cached_balance = self.market_monitor.get_cached_balance()
+            cache_age = self.market_monitor.get_cache_age_seconds()
+            
+            # Use cached data if it's fresh (less than 30 seconds old)
+            if cached_balance and cache_age < 30:
+                logger.debug(f"Using cached balance (age: {cache_age:.1f}s)")
+                return cached_balance
+        
+        # Fallback to fresh API call
+        logger.debug("Using fresh balance API call")
         return self.client.get_balance()
     
     def get_positions(self) -> Optional[List[Dict[str, Any]]]:
-        """Get all positions."""
+        """Get all positions (uses cached data when available)."""
+        # Try cached data first (updated every heartbeat)
+        if self.market_monitor and hasattr(self.market_monitor, 'get_cached_positions'):
+            cached_positions = self.market_monitor.get_cached_positions()
+            cache_age = self.market_monitor.get_cache_age_seconds()
+            
+            # Use cached data if it's fresh (less than 30 seconds old)
+            if cached_positions is not None and cache_age < 30:
+                logger.debug(f"Using cached positions (age: {cache_age:.1f}s): {len(cached_positions)} positions")
+                return cached_positions
+        
+        # Fallback to fresh API call
+        logger.debug("Using fresh positions API call")
         return self.client.get_positions()
     
     def get_orders(self) -> Optional[List[Dict[str, Any]]]:
-        """Get all open orders."""
+        """Get all open orders (uses cached data when available)."""
+        # Try cached data first (updated every heartbeat)
+        if self.market_monitor and hasattr(self.market_monitor, 'get_cached_orders'):
+            cached_orders = self.market_monitor.get_cached_orders()
+            cache_age = self.market_monitor.get_cache_age_seconds()
+            
+            # Use cached data if it's fresh (less than 30 seconds old)
+            if cached_orders is not None and cache_age < 30:
+                logger.debug(f"Using cached orders (age: {cache_age:.1f}s): {len(cached_orders)} orders")
+                return cached_orders
+        
+        # Fallback to fresh API call
+        logger.debug("Using fresh orders API call")
         return self.client.get_open_orders()
     
     def get_recent_fills(self) -> Optional[List[Dict[str, Any]]]: