123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- import asyncio
- import logging
- from typing import Optional
- from ..clients.hyperliquid_client import HyperliquidClient
- from ..notifications.notification_manager import NotificationManager
- from ..config.config import Config
- from .position_tracker import PositionTracker
- from .pending_orders_manager import PendingOrdersManager
- from .risk_manager import RiskManager
- from .alarm_manager import AlarmManager
- from .exchange_order_sync import ExchangeOrderSync
- # DrawdownMonitor and RsiMonitor will be lazy-loaded to avoid circular imports
- logger = logging.getLogger(__name__)
class _NullMonitor:
    """Inert stand-in used when an optional monitor cannot be constructed.

    It offers the lifecycle surface the coordinator relies on (awaitable
    ``start``/``stop`` plus an ``is_running`` flag) so the coordinator never
    has to special-case a monitor that failed to load.
    """

    is_running = False

    async def start(self):
        """Do nothing; exists so the coordinator can await it."""

    async def stop(self):
        """Do nothing; exists so the coordinator can await it."""


class MonitoringCoordinator:
    """
    Simplified monitoring coordinator that manages all monitoring components.
    Replaces the complex unified monitor with a clean, focused approach.

    The coordinator owns the position tracker, pending-orders manager, risk
    manager and alarm manager, lazily creates the optional drawdown and RSI
    monitors, and runs a background task that periodically syncs exchange
    orders into the database.
    """

    def __init__(self, hl_client: HyperliquidClient, notification_manager: NotificationManager, config: Config):
        """Build the always-on components; optional ones are created in ``start()``.

        Args:
            hl_client: Exchange client handed to every component that needs it.
            notification_manager: Notification sink handed to the components.
            config: Configuration object, forwarded to the risk manager.
        """
        self.hl_client = hl_client
        self.notification_manager = notification_manager
        self.config = config
        self.is_running = False

        # Initialize monitoring components
        self.position_tracker = PositionTracker(hl_client, notification_manager)
        self.pending_orders_manager = PendingOrdersManager(hl_client, notification_manager)
        self.risk_manager = RiskManager(hl_client, notification_manager, config)
        self.alarm_manager = AlarmManager()  # AlarmManager only needs alarms_file (defaults to data/price_alarms.json)

        # Exchange order synchronization (will be initialized with trading stats)
        self.exchange_order_sync = None

        # DrawdownMonitor and RSIMonitor will be lazy-loaded to avoid circular imports
        self.drawdown_monitor = None
        self.rsi_monitor = None
        # Private aliases kept alongside the public attributes above.
        self._hl_client = hl_client
        self._notification_manager = notification_manager
        self._config = config

        # Handle of the background order-sync task. Holding the reference
        # keeps the task from being garbage-collected while it runs and lets
        # stop() cancel it.
        self._order_sync_task: Optional[asyncio.Task] = None

    @staticmethod
    async def _call_lifecycle(monitor, method_name: str):
        """Invoke ``monitor.<method_name>()`` if present, awaiting it when needed.

        A missing monitor (``None``) or a monitor without the method is a
        no-op. A synchronous method is simply called; only an awaitable
        result is awaited, so sync and async monitors are both supported.
        """
        import inspect  # local import: leaves the module's import block untouched

        method = getattr(monitor, method_name, None)
        if not callable(method):
            return
        outcome = method()
        if inspect.isawaitable(outcome):
            await outcome

    @staticmethod
    def _optional_monitor_running(monitor):
        """Return the running flag of an optional monitor.

        ``None`` (monitor not created yet) reports ``False``. A monitor that
        exists but exposes no ``is_running`` attribute is reported as ``True``.
        """
        if monitor is None:
            return False
        return getattr(monitor, 'is_running', True)

    async def start(self):
        """Start all monitoring components.

        Does nothing when already running. If any step raises, the error is
        logged, ``stop()`` is awaited to unwind what was started, and the
        exception is re-raised.
        """
        if self.is_running:
            return

        self.is_running = True
        logger.info("Starting simplified monitoring system")

        try:
            # Start all monitors
            await self.position_tracker.start()
            await self.pending_orders_manager.start()
            await self.risk_manager.start()
            # AlarmManager doesn't have start() method - it's always ready

            # Initialize exchange order sync with trading stats
            self._init_exchange_order_sync()

            # Lazy-load optional monitors to avoid circular imports
            self._init_optional_monitors()

            # Start optional monitors; sync or async start() are both accepted
            await self._call_lifecycle(self.drawdown_monitor, 'start')
            await self._call_lifecycle(self.rsi_monitor, 'start')

            # Start order synchronization loop, keeping a reference to the task
            self._order_sync_task = asyncio.create_task(self._order_sync_loop())

            logger.info("All monitoring components started successfully")

        except Exception as e:
            logger.error(f"Error starting monitoring system: {e}")
            await self.stop()
            raise

    async def stop(self):
        """Stop all monitoring components.

        Does nothing when not running. Cancels the background order-sync
        task first, then stops the core components, then the optional ones.
        """
        if not self.is_running:
            return

        self.is_running = False
        logger.info("Stopping monitoring system")

        # Stop the background loop before tearing down what it depends on
        await self._cancel_order_sync_task()

        # Stop all monitors
        await self.position_tracker.stop()
        await self.pending_orders_manager.stop()
        await self.risk_manager.stop()
        # AlarmManager doesn't have stop() method - nothing to stop

        # Stop optional monitors if they exist and have stop methods
        await self._call_lifecycle(self.drawdown_monitor, 'stop')
        await self._call_lifecycle(self.rsi_monitor, 'stop')

        logger.info("Monitoring system stopped")

    async def _cancel_order_sync_task(self):
        """Cancel the background order-sync task, if one is active, and wait for it."""
        task = getattr(self, '_order_sync_task', None)
        self._order_sync_task = None
        if task is None or task.done():
            return

        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected: this is the cancellation requested just above.
            pass

    def _init_exchange_order_sync(self):
        """Initialize exchange order synchronization.

        Uses the position tracker's ``trading_stats`` when it is present and
        truthy; otherwise leaves order sync disabled and logs a warning.
        Construction errors are logged, not raised.
        """
        try:
            # Get trading stats from position tracker
            trading_stats = getattr(self.position_tracker, 'trading_stats', None)
            if trading_stats:
                self.exchange_order_sync = ExchangeOrderSync(self.hl_client, trading_stats)
                logger.info("✅ Exchange order sync initialized")
            else:
                logger.warning("⚠️ Trading stats not available, exchange order sync disabled")
        except Exception as e:
            logger.error(f"Error initializing exchange order sync: {e}")

    async def _order_sync_loop(self):
        """Periodic order synchronization loop.

        Wakes once per bot heartbeat while ``is_running`` is true and runs
        the exchange order sync roughly every 30 seconds. Errors inside a
        cycle are logged and the loop continues.
        """
        logger.info("🔄 Starting exchange order synchronization loop")
        sync_interval = 30  # Sync every 30 seconds
        heartbeat = Config.BOT_HEARTBEAT_SECONDS
        # Derive the cycle count from the configured heartbeat instead of
        # assuming 5 s (a 5 s heartbeat still yields 6 cycles).
        if heartbeat > 0:
            cycles_per_sync = max(1, round(sync_interval / heartbeat))
        else:
            cycles_per_sync = 1
        loop_count = 0

        try:
            while self.is_running:
                try:
                    loop_count += 1

                    if loop_count % cycles_per_sync == 0 and self.exchange_order_sync:
                        logger.debug(f"🔄 Running exchange order sync (loop #{loop_count})")
                        # NOTE(review): this call is synchronous and runs on the
                        # event loop; confirm it is quick enough not to stall it.
                        sync_results = self.exchange_order_sync.sync_exchange_orders_to_database() or {}

                        # Log results if there were changes
                        if sync_results.get('new_orders_added', 0) > 0 or sync_results.get('orders_updated', 0) > 0:
                            logger.info(f"📊 Order sync: +{sync_results.get('new_orders_added', 0)} new, "
                                        f"~{sync_results.get('orders_updated', 0)} updated, "
                                        f"-{sync_results.get('orphaned_orders_cancelled', 0)} cancelled")

                except Exception as e:
                    logger.error(f"Error in order sync loop: {e}")

                # Wait for next cycle (after success and after a logged error alike)
                await asyncio.sleep(heartbeat)
        finally:
            # Runs on normal exit and on cancellation from stop()
            logger.info("🛑 Exchange order sync loop stopped")

    def _init_optional_monitors(self):
        """Initialize optional monitors with lazy loading to avoid circular imports.

        Each monitor is set up independently, so a failure in one does not
        prevent the other from loading. A monitor that cannot be created is
        replaced by an inert ``_NullMonitor`` and a warning is logged.
        """
        if self.drawdown_monitor is None:
            try:
                # DrawdownMonitor needs a TradingStats instance
                from ..stats.trading_stats import TradingStats
                from .drawdown_monitor import DrawdownMonitor
                # NOTE(review): this builds a fresh TradingStats rather than
                # reusing the position tracker's; confirm that is intended.
                stats = TradingStats()
                self.drawdown_monitor = DrawdownMonitor(stats)
            except Exception as e:
                logger.warning(f"Could not initialize optional monitors: {e}")
                self.drawdown_monitor = _NullMonitor()

        if self.rsi_monitor is None:
            try:
                # RsiMonitor class from rsi_monitor module
                from .rsi_monitor import RsiMonitor
                self.rsi_monitor = RsiMonitor(self._hl_client, self._notification_manager)
            except Exception as e:
                logger.warning(f"Could not initialize optional monitors: {e}")
                self.rsi_monitor = _NullMonitor()

    async def add_pending_stop_loss(self, symbol: str, stop_price: float, size: float, side: str, expires_hours: int = 24):
        """Add a pending stop loss order (delegates to the pending-orders manager)."""
        await self.pending_orders_manager.add_pending_stop_loss(symbol, stop_price, size, side, expires_hours)

    async def cancel_pending_order(self, symbol: str) -> bool:
        """Cancel pending order for symbol; returns the manager's result."""
        return await self.pending_orders_manager.cancel_pending_order(symbol)

    async def get_pending_orders(self) -> list:
        """Get all pending orders from the pending-orders manager."""
        return await self.pending_orders_manager.get_pending_orders()

    async def get_risk_status(self) -> dict:
        """Get current risk status from the risk manager."""
        return await self.risk_manager.get_risk_status()

    async def get_monitoring_status(self) -> dict:
        """Get overall monitoring status.

        Returns:
            A dict with ``is_running``, a ``components`` mapping of per-component
            running flags, ``pending_orders_count`` and ``risk_status``. If
            gathering the status raises, the error is logged and
            ``{'error': <message>}`` is returned instead.
        """
        try:
            return {
                'is_running': self.is_running,
                'components': {
                    'position_tracker': self.position_tracker.is_running,
                    'pending_orders_manager': self.pending_orders_manager.is_running,
                    'risk_manager': self.risk_manager.is_running,
                    # AlarmManager has no lifecycle; treated as always ready
                    'alarm_manager': getattr(self.alarm_manager, 'is_running', True),
                    # Optional monitors that have not been created report False
                    'drawdown_monitor': self._optional_monitor_running(self.drawdown_monitor),
                    'rsi_monitor': self._optional_monitor_running(self.rsi_monitor)
                },
                'pending_orders_count': len(await self.get_pending_orders()),
                'risk_status': await self.get_risk_status()
            }
        except Exception as e:
            logger.error(f"Error getting monitoring status: {e}")
            return {'error': str(e)}
|