monitoring_coordinator.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. import asyncio
  2. import logging
  3. from typing import Optional
  4. from ..clients.hyperliquid_client import HyperliquidClient
  5. from ..notifications.notification_manager import NotificationManager
  6. from ..config.config import Config
  7. from .position_tracker import PositionTracker
  8. from .pending_orders_manager import PendingOrdersManager
  9. from .risk_manager import RiskManager
  10. from .alarm_manager import AlarmManager
  11. from .exchange_order_sync import ExchangeOrderSync
  12. # DrawdownMonitor and RsiMonitor will be lazy-loaded to avoid circular imports
  13. logger = logging.getLogger(__name__)
  14. class MonitoringCoordinator:
  15. """
  16. Simplified monitoring coordinator that manages all monitoring components.
  17. Replaces the complex unified monitor with a clean, focused approach.
  18. """
  19. def __init__(self, hl_client: HyperliquidClient, notification_manager: NotificationManager, config: Config):
  20. self.hl_client = hl_client
  21. self.notification_manager = notification_manager
  22. self.config = config
  23. self.is_running = False
  24. # Initialize monitoring components
  25. self.position_tracker = PositionTracker(hl_client, notification_manager)
  26. self.pending_orders_manager = PendingOrdersManager(hl_client, notification_manager)
  27. self.risk_manager = RiskManager(hl_client, notification_manager, config)
  28. self.alarm_manager = AlarmManager() # AlarmManager only needs alarms_file (defaults to data/price_alarms.json)
  29. # Exchange order synchronization (will be initialized with trading stats)
  30. self.exchange_order_sync = None
  31. # DrawdownMonitor and RSIMonitor will be lazy-loaded to avoid circular imports
  32. self.drawdown_monitor = None
  33. self.rsi_monitor = None
  34. self._hl_client = hl_client
  35. self._notification_manager = notification_manager
  36. self._config = config
  37. async def start(self):
  38. """Start all monitoring components"""
  39. if self.is_running:
  40. return
  41. self.is_running = True
  42. logger.info("Starting simplified monitoring system")
  43. try:
  44. # Start all monitors
  45. await self.position_tracker.start()
  46. await self.pending_orders_manager.start()
  47. await self.risk_manager.start()
  48. # AlarmManager doesn't have start() method - it's always ready
  49. # Initialize exchange order sync with trading stats
  50. self._init_exchange_order_sync()
  51. # Lazy-load optional monitors to avoid circular imports
  52. self._init_optional_monitors()
  53. # Start optional monitors if they have async start methods
  54. if hasattr(self.drawdown_monitor, 'start'):
  55. await self.drawdown_monitor.start()
  56. if hasattr(self.rsi_monitor, 'start'):
  57. await self.rsi_monitor.start()
  58. # Start order synchronization loop
  59. asyncio.create_task(self._order_sync_loop())
  60. logger.info("All monitoring components started successfully")
  61. except Exception as e:
  62. logger.error(f"Error starting monitoring system: {e}")
  63. await self.stop()
  64. raise
  65. async def stop(self):
  66. """Stop all monitoring components"""
  67. if not self.is_running:
  68. return
  69. self.is_running = False
  70. logger.info("Stopping monitoring system")
  71. # Stop all monitors
  72. await self.position_tracker.stop()
  73. await self.pending_orders_manager.stop()
  74. await self.risk_manager.stop()
  75. # AlarmManager doesn't have stop() method - nothing to stop
  76. # Stop optional monitors if they exist and have stop methods
  77. if self.drawdown_monitor and hasattr(self.drawdown_monitor, 'stop'):
  78. await self.drawdown_monitor.stop()
  79. if self.rsi_monitor and hasattr(self.rsi_monitor, 'stop'):
  80. await self.rsi_monitor.stop()
  81. logger.info("Monitoring system stopped")
  82. def _init_exchange_order_sync(self):
  83. """Initialize exchange order synchronization"""
  84. try:
  85. # Get trading stats from position tracker
  86. if hasattr(self.position_tracker, 'trading_stats') and self.position_tracker.trading_stats:
  87. self.exchange_order_sync = ExchangeOrderSync(
  88. self.hl_client,
  89. self.position_tracker.trading_stats
  90. )
  91. logger.info("✅ Exchange order sync initialized")
  92. else:
  93. logger.warning("⚠️ Trading stats not available, exchange order sync disabled")
  94. except Exception as e:
  95. logger.error(f"Error initializing exchange order sync: {e}")
  96. async def _order_sync_loop(self):
  97. """Periodic order synchronization loop"""
  98. logger.info("🔄 Starting exchange order synchronization loop")
  99. sync_interval = 30 # Sync every 30 seconds
  100. loop_count = 0
  101. while self.is_running:
  102. try:
  103. loop_count += 1
  104. # Run sync every 30 seconds (6 cycles with 5s heartbeat)
  105. if loop_count % 6 == 0 and self.exchange_order_sync:
  106. logger.debug(f"🔄 Running exchange order sync (loop #{loop_count})")
  107. sync_results = self.exchange_order_sync.sync_exchange_orders_to_database()
  108. # Log results if there were changes
  109. if sync_results.get('new_orders_added', 0) > 0 or sync_results.get('orders_updated', 0) > 0:
  110. logger.info(f"📊 Order sync: +{sync_results.get('new_orders_added', 0)} new, "
  111. f"~{sync_results.get('orders_updated', 0)} updated, "
  112. f"-{sync_results.get('orphaned_orders_cancelled', 0)} cancelled")
  113. # Wait for next cycle
  114. await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS)
  115. except Exception as e:
  116. logger.error(f"Error in order sync loop: {e}")
  117. await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS)
  118. logger.info("🛑 Exchange order sync loop stopped")
  119. def _init_optional_monitors(self):
  120. """Initialize optional monitors with lazy loading to avoid circular imports"""
  121. try:
  122. if self.drawdown_monitor is None:
  123. # DrawdownMonitor needs a TradingStats instance
  124. from ..stats.trading_stats import TradingStats
  125. from .drawdown_monitor import DrawdownMonitor
  126. stats = TradingStats()
  127. self.drawdown_monitor = DrawdownMonitor(stats)
  128. if self.rsi_monitor is None:
  129. # RsiMonitor class from rsi_monitor module
  130. from .rsi_monitor import RsiMonitor
  131. self.rsi_monitor = RsiMonitor(self._hl_client, self._notification_manager)
  132. except Exception as e:
  133. logger.warning(f"Could not initialize optional monitors: {e}")
  134. # Set to dummy objects that won't break the system
  135. if self.drawdown_monitor is None:
  136. self.drawdown_monitor = type('DummyMonitor', (), {'start': lambda: None, 'stop': lambda: None, 'is_running': False})()
  137. if self.rsi_monitor is None:
  138. self.rsi_monitor = type('DummyMonitor', (), {'start': lambda: None, 'stop': lambda: None, 'is_running': False})()
  139. async def add_pending_stop_loss(self, symbol: str, stop_price: float, size: float, side: str, expires_hours: int = 24):
  140. """Add a pending stop loss order"""
  141. await self.pending_orders_manager.add_pending_stop_loss(symbol, stop_price, size, side, expires_hours)
  142. async def cancel_pending_order(self, symbol: str) -> bool:
  143. """Cancel pending order for symbol"""
  144. return await self.pending_orders_manager.cancel_pending_order(symbol)
  145. async def get_pending_orders(self) -> list:
  146. """Get all pending orders"""
  147. return await self.pending_orders_manager.get_pending_orders()
  148. async def get_risk_status(self) -> dict:
  149. """Get current risk status"""
  150. return await self.risk_manager.get_risk_status()
  151. async def get_monitoring_status(self) -> dict:
  152. """Get overall monitoring status"""
  153. try:
  154. return {
  155. 'is_running': self.is_running,
  156. 'components': {
  157. 'position_tracker': self.position_tracker.is_running,
  158. 'pending_orders_manager': self.pending_orders_manager.is_running,
  159. 'risk_manager': self.risk_manager.is_running,
  160. 'alarm_manager': self.alarm_manager.is_running if hasattr(self.alarm_manager, 'is_running') else True,
  161. 'drawdown_monitor': self.drawdown_monitor.is_running if hasattr(self.drawdown_monitor, 'is_running') else True,
  162. 'rsi_monitor': self.rsi_monitor.is_running if hasattr(self.rsi_monitor, 'is_running') else True
  163. },
  164. 'pending_orders_count': len(await self.get_pending_orders()),
  165. 'risk_status': await self.get_risk_status()
  166. }
  167. except Exception as e:
  168. logger.error(f"Error getting monitoring status: {e}")
  169. return {'error': str(e)}