# monitoring_coordinator.py
  1. import asyncio
  2. import logging
  3. from typing import Optional
  4. from ..clients.hyperliquid_client import HyperliquidClient
  5. from ..notifications.notification_manager import NotificationManager
  6. from ..config.config import Config
  7. from .position_tracker import PositionTracker
  8. from .pending_orders_manager import PendingOrdersManager
  9. from .risk_manager import RiskManager
  10. from .alarm_manager import AlarmManager
  11. from .exchange_order_sync import ExchangeOrderSync
# DrawdownMonitor and RsiMonitor are not imported here; they are imported
# lazily inside MonitoringCoordinator._init_optional_monitors to avoid
# circular imports.

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
  14. class MonitoringCoordinator:
  15. """
  16. Simplified monitoring coordinator that manages all monitoring components.
  17. Replaces the complex unified monitor with a clean, focused approach.
  18. """
  19. def __init__(self, hl_client: HyperliquidClient, notification_manager: NotificationManager, config: Config):
  20. self.hl_client = hl_client
  21. self.notification_manager = notification_manager
  22. self.config = config
  23. self.is_running = False
  24. # Initialize monitoring components
  25. self.position_tracker = PositionTracker(hl_client, notification_manager)
  26. self.pending_orders_manager = PendingOrdersManager(hl_client, notification_manager)
  27. self.risk_manager = RiskManager(hl_client, notification_manager, config)
  28. self.alarm_manager = AlarmManager() # AlarmManager only needs alarms_file (defaults to data/price_alarms.json)
  29. # Exchange order synchronization (will be initialized with trading stats)
  30. self.exchange_order_sync = None
  31. # DrawdownMonitor and RSIMonitor will be lazy-loaded to avoid circular imports
  32. self.drawdown_monitor = None
  33. self.rsi_monitor = None
  34. self._hl_client = hl_client
  35. self._notification_manager = notification_manager
  36. self._config = config
  37. async def start(self):
  38. """Start all monitoring components"""
  39. if self.is_running:
  40. return
  41. self.is_running = True
  42. logger.info("Starting simplified monitoring system")
  43. try:
  44. # Start all monitors
  45. await self.position_tracker.start()
  46. await self.pending_orders_manager.start()
  47. await self.risk_manager.start()
  48. # AlarmManager doesn't have start() method - it's always ready
  49. # Initialize exchange order sync with trading stats
  50. self._init_exchange_order_sync()
  51. # Lazy-load optional monitors to avoid circular imports
  52. self._init_optional_monitors()
  53. # Start optional monitors if they have async start methods
  54. if hasattr(self.drawdown_monitor, 'start'):
  55. await self.drawdown_monitor.start()
  56. if hasattr(self.rsi_monitor, 'start'):
  57. await self.rsi_monitor.start()
  58. # Start order synchronization loop
  59. asyncio.create_task(self._order_sync_loop())
  60. logger.info("All monitoring components started successfully")
  61. except Exception as e:
  62. logger.error(f"Error starting monitoring system: {e}")
  63. await self.stop()
  64. raise
  65. async def stop(self):
  66. """Stop all monitoring components"""
  67. if not self.is_running:
  68. return
  69. self.is_running = False
  70. logger.info("Stopping monitoring system")
  71. # Stop all monitors
  72. await self.position_tracker.stop()
  73. await self.pending_orders_manager.stop()
  74. await self.risk_manager.stop()
  75. # AlarmManager doesn't have stop() method - nothing to stop
  76. # Stop optional monitors if they exist and have stop methods
  77. if self.drawdown_monitor and hasattr(self.drawdown_monitor, 'stop'):
  78. await self.drawdown_monitor.stop()
  79. if self.rsi_monitor and hasattr(self.rsi_monitor, 'stop'):
  80. await self.rsi_monitor.stop()
  81. logger.info("Monitoring system stopped")
  82. def _init_exchange_order_sync(self):
  83. """Initialize exchange order synchronization"""
  84. try:
  85. # Try multiple sources for trading stats
  86. trading_stats = None
  87. # Method 1: Get from position tracker
  88. if hasattr(self.position_tracker, 'trading_stats') and self.position_tracker.trading_stats:
  89. trading_stats = self.position_tracker.trading_stats
  90. logger.debug("Found trading stats from position tracker")
  91. # Method 2: Get from trading engine (if we have access to it)
  92. elif hasattr(self, '_trading_engine') and self._trading_engine:
  93. trading_stats = self._trading_engine.get_stats()
  94. logger.debug("Found trading stats from trading engine")
  95. # Method 3: Try to initialize trading stats directly
  96. if not trading_stats:
  97. try:
  98. from ..stats.trading_stats import TradingStats
  99. trading_stats = TradingStats()
  100. logger.debug("Initialized trading stats directly")
  101. except Exception as e:
  102. logger.warning(f"Could not initialize trading stats directly: {e}")
  103. if trading_stats:
  104. self.exchange_order_sync = ExchangeOrderSync(
  105. self.hl_client,
  106. trading_stats
  107. )
  108. logger.info("✅ Exchange order sync initialized")
  109. else:
  110. logger.warning("⚠️ Trading stats not available from any source, exchange order sync disabled")
  111. except Exception as e:
  112. logger.error(f"Error initializing exchange order sync: {e}", exc_info=True)
  113. async def _order_sync_loop(self):
  114. """Periodic order synchronization loop"""
  115. logger.info("🔄 Starting exchange order synchronization loop")
  116. sync_interval = 30 # Sync every 30 seconds
  117. loop_count = 0
  118. while self.is_running:
  119. try:
  120. loop_count += 1
  121. # Run sync every 30 seconds (6 cycles with 5s heartbeat)
  122. if loop_count % 6 == 0 and self.exchange_order_sync:
  123. logger.debug(f"🔄 Running exchange order sync (loop #{loop_count})")
  124. sync_results = self.exchange_order_sync.sync_exchange_orders_to_database()
  125. # Log results if there were changes
  126. if sync_results.get('new_orders_added', 0) > 0 or sync_results.get('orders_updated', 0) > 0:
  127. logger.info(f"📊 Order sync: +{sync_results.get('new_orders_added', 0)} new, "
  128. f"~{sync_results.get('orders_updated', 0)} updated, "
  129. f"-{sync_results.get('orphaned_orders_cancelled', 0)} cancelled")
  130. # Wait for next cycle
  131. await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS)
  132. except Exception as e:
  133. logger.error(f"Error in order sync loop: {e}")
  134. await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS)
  135. logger.info("🛑 Exchange order sync loop stopped")
  136. def _init_optional_monitors(self):
  137. """Initialize optional monitors with lazy loading to avoid circular imports"""
  138. try:
  139. if self.drawdown_monitor is None:
  140. # DrawdownMonitor needs a TradingStats instance
  141. from ..stats.trading_stats import TradingStats
  142. from .drawdown_monitor import DrawdownMonitor
  143. stats = TradingStats()
  144. self.drawdown_monitor = DrawdownMonitor(stats)
  145. if self.rsi_monitor is None:
  146. # RsiMonitor class from rsi_monitor module
  147. from .rsi_monitor import RsiMonitor
  148. self.rsi_monitor = RsiMonitor(self._hl_client, self._notification_manager)
  149. except Exception as e:
  150. logger.warning(f"Could not initialize optional monitors: {e}")
  151. # Set to dummy objects that won't break the system
  152. if self.drawdown_monitor is None:
  153. self.drawdown_monitor = type('DummyMonitor', (), {'start': lambda: None, 'stop': lambda: None, 'is_running': False})()
  154. if self.rsi_monitor is None:
  155. self.rsi_monitor = type('DummyMonitor', (), {'start': lambda: None, 'stop': lambda: None, 'is_running': False})()
  156. async def add_pending_stop_loss(self, symbol: str, stop_price: float, size: float, side: str, expires_hours: int = 24):
  157. """Add a pending stop loss order"""
  158. await self.pending_orders_manager.add_pending_stop_loss(symbol, stop_price, size, side, expires_hours)
  159. async def cancel_pending_order(self, symbol: str) -> bool:
  160. """Cancel pending order for symbol"""
  161. return await self.pending_orders_manager.cancel_pending_order(symbol)
  162. async def get_pending_orders(self) -> list:
  163. """Get all pending orders"""
  164. return await self.pending_orders_manager.get_pending_orders()
  165. async def get_risk_status(self) -> dict:
  166. """Get current risk status"""
  167. return await self.risk_manager.get_risk_status()
  168. async def get_monitoring_status(self) -> dict:
  169. """Get overall monitoring status"""
  170. try:
  171. return {
  172. 'is_running': self.is_running,
  173. 'components': {
  174. 'position_tracker': self.position_tracker.is_running,
  175. 'pending_orders_manager': self.pending_orders_manager.is_running,
  176. 'risk_manager': self.risk_manager.is_running,
  177. 'alarm_manager': self.alarm_manager.is_running if hasattr(self.alarm_manager, 'is_running') else True,
  178. 'drawdown_monitor': self.drawdown_monitor.is_running if hasattr(self.drawdown_monitor, 'is_running') else True,
  179. 'rsi_monitor': self.rsi_monitor.is_running if hasattr(self.rsi_monitor, 'is_running') else True
  180. },
  181. 'pending_orders_count': len(await self.get_pending_orders()),
  182. 'risk_status': await self.get_risk_status()
  183. }
  184. except Exception as e:
  185. logger.error(f"Error getting monitoring status: {e}")
  186. return {'error': str(e)}