# monitoring_coordinator.py (13 KB)


import asyncio
import inspect
import logging
from typing import Optional

from ..clients.hyperliquid_client import HyperliquidClient
from ..notifications.notification_manager import NotificationManager
from ..config.config import Config
from .position_tracker import PositionTracker
from .pending_orders_manager import PendingOrdersManager
from .risk_manager import RiskManager
from .alarm_manager import AlarmManager
from .exchange_order_sync import ExchangeOrderSync
# DrawdownMonitor and RsiMonitor are lazy-loaded inside MonitoringCoordinator to avoid circular imports.
  13. logger = logging.getLogger(__name__)
  14. try:
  15. from .copy_trading_monitor import CopyTradingMonitor
  16. COPY_TRADING_AVAILABLE = True
  17. except ImportError as e:
  18. logger.warning(f"Copy trading monitor not available: {e}")
  19. COPY_TRADING_AVAILABLE = False
  20. class MonitoringCoordinator:
  21. """
  22. Simplified monitoring coordinator that manages all monitoring components.
  23. Replaces the complex unified monitor with a clean, focused approach.
  24. """
  25. def __init__(self, hl_client: HyperliquidClient, notification_manager: NotificationManager, config: Config):
  26. self.hl_client = hl_client
  27. self.notification_manager = notification_manager
  28. self.config = config
  29. self.is_running = False
  30. # Initialize monitoring components
  31. self.position_tracker = PositionTracker(hl_client, notification_manager)
  32. self.pending_orders_manager = PendingOrdersManager(hl_client, notification_manager)
  33. self.risk_manager = RiskManager(hl_client, notification_manager, config)
  34. self.alarm_manager = AlarmManager() # AlarmManager only needs alarms_file (defaults to data/price_alarms.json)
  35. # Initialize copy trading monitor (Step 2: Initialize but don't start)
  36. try:
  37. from src.monitoring.copy_trading_monitor import CopyTradingMonitor
  38. self.copy_trading_monitor = CopyTradingMonitor(
  39. self.hl_client,
  40. self.notification_manager
  41. )
  42. logger.info("✅ Copy trading monitor initialized (Step 2: not auto-starting)")
  43. self.copy_trading_available = True
  44. except ImportError as e:
  45. logger.warning(f"Copy trading monitor not available: {e}")
  46. self.copy_trading_monitor = None
  47. self.copy_trading_available = False
  48. except Exception as e:
  49. logger.error(f"Error initializing copy trading monitor: {e}")
  50. self.copy_trading_monitor = None
  51. self.copy_trading_available = False
  52. # Exchange order synchronization (will be initialized with trading stats)
  53. self.exchange_order_sync = None
  54. # DrawdownMonitor and RSIMonitor will be lazy-loaded to avoid circular imports
  55. self.drawdown_monitor = None
  56. self.rsi_monitor = None
  57. self._hl_client = hl_client
  58. self._notification_manager = notification_manager
  59. self._config = config
  60. async def start(self):
  61. """Start all monitoring components"""
  62. if self.is_running:
  63. return
  64. self.is_running = True
  65. logger.info("Starting simplified monitoring system")
  66. try:
  67. # Start all monitors
  68. await self.position_tracker.start()
  69. await self.pending_orders_manager.start()
  70. await self.risk_manager.start()
  71. # AlarmManager doesn't have start() method - it's always ready
  72. # Start copy trading monitor if enabled (with async fixes applied)
  73. if self.copy_trading_monitor and hasattr(self.copy_trading_monitor, 'enabled') and self.copy_trading_monitor.enabled:
  74. try:
  75. asyncio.create_task(self.copy_trading_monitor.start_monitoring())
  76. logger.info("🔄 Copy trading monitor started (with full async implementation)")
  77. except Exception as e:
  78. logger.error(f"❌ Failed to start copy trading monitor: {e}")
  79. elif self.copy_trading_monitor:
  80. logger.info("✅ Copy trading monitor initialized (ready for manual start)")
  81. # Initialize exchange order sync with trading stats
  82. self._init_exchange_order_sync()
  83. # Lazy-load optional monitors to avoid circular imports
  84. self._init_optional_monitors()
  85. # Start optional monitors if they have async start methods
  86. if hasattr(self.drawdown_monitor, 'start'):
  87. await self.drawdown_monitor.start()
  88. if hasattr(self.rsi_monitor, 'start'):
  89. await self.rsi_monitor.start()
  90. # Start order synchronization loop
  91. asyncio.create_task(self._order_sync_loop())
  92. logger.info("All monitoring components started successfully")
  93. except Exception as e:
  94. logger.error(f"Error starting monitoring system: {e}")
  95. await self.stop()
  96. raise
  97. async def stop(self):
  98. """Stop all monitoring components"""
  99. if not self.is_running:
  100. return
  101. self.is_running = False
  102. logger.info("Stopping monitoring system")
  103. # Stop all monitors
  104. await self.position_tracker.stop()
  105. await self.pending_orders_manager.stop()
  106. await self.risk_manager.stop()
  107. # AlarmManager doesn't have stop() method - nothing to stop
  108. # Stop copy trading monitor
  109. if self.copy_trading_monitor and hasattr(self.copy_trading_monitor, 'stop_monitoring'):
  110. try:
  111. await self.copy_trading_monitor.stop_monitoring()
  112. logger.info("🛑 Copy trading monitor stopped")
  113. except Exception as e:
  114. logger.error(f"❌ Error stopping copy trading monitor: {e}")
  115. # Stop optional monitors if they exist and have stop methods
  116. if self.drawdown_monitor and hasattr(self.drawdown_monitor, 'stop'):
  117. await self.drawdown_monitor.stop()
  118. if self.rsi_monitor and hasattr(self.rsi_monitor, 'stop'):
  119. await self.rsi_monitor.stop()
  120. logger.info("Monitoring system stopped")
  121. def _init_exchange_order_sync(self):
  122. """Initialize exchange order synchronization"""
  123. try:
  124. # Try multiple sources for trading stats
  125. trading_stats = None
  126. # Method 1: Get from position tracker
  127. if hasattr(self.position_tracker, 'trading_stats') and self.position_tracker.trading_stats:
  128. trading_stats = self.position_tracker.trading_stats
  129. logger.debug("Found trading stats from position tracker")
  130. # Method 2: Get from trading engine (if we have access to it)
  131. elif hasattr(self, '_trading_engine') and self._trading_engine:
  132. trading_stats = self._trading_engine.get_stats()
  133. logger.debug("Found trading stats from trading engine")
  134. # Method 3: Try to initialize trading stats directly
  135. if not trading_stats:
  136. try:
  137. from ..stats.trading_stats import TradingStats
  138. trading_stats = TradingStats()
  139. logger.debug("Initialized trading stats directly")
  140. except Exception as e:
  141. logger.warning(f"Could not initialize trading stats directly: {e}")
  142. if trading_stats:
  143. self.exchange_order_sync = ExchangeOrderSync(
  144. self.hl_client,
  145. trading_stats
  146. )
  147. logger.info("✅ Exchange order sync initialized")
  148. else:
  149. logger.warning("⚠️ Trading stats not available from any source, exchange order sync disabled")
  150. except Exception as e:
  151. logger.error(f"Error initializing exchange order sync: {e}", exc_info=True)
  152. async def _order_sync_loop(self):
  153. """Periodic order synchronization loop"""
  154. logger.info("🔄 Starting exchange order synchronization loop")
  155. sync_interval = 30 # Sync every 30 seconds
  156. loop_count = 0
  157. while self.is_running:
  158. try:
  159. loop_count += 1
  160. # Run sync every 30 seconds (6 cycles with 5s heartbeat)
  161. if loop_count % 6 == 0 and self.exchange_order_sync:
  162. logger.debug(f"🔄 Running exchange order sync (loop #{loop_count})")
  163. sync_results = self.exchange_order_sync.sync_exchange_orders_to_database()
  164. # Log results if there were changes
  165. if sync_results.get('new_orders_added', 0) > 0 or sync_results.get('orders_updated', 0) > 0:
  166. logger.info(f"📊 Order sync: +{sync_results.get('new_orders_added', 0)} new, "
  167. f"~{sync_results.get('orders_updated', 0)} updated, "
  168. f"-{sync_results.get('orphaned_orders_cancelled', 0)} cancelled")
  169. # Wait for next cycle
  170. await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS)
  171. except Exception as e:
  172. logger.error(f"Error in order sync loop: {e}")
  173. await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS)
  174. logger.info("🛑 Exchange order sync loop stopped")
  175. def _init_optional_monitors(self):
  176. """Initialize optional monitors with lazy loading to avoid circular imports"""
  177. try:
  178. if self.drawdown_monitor is None:
  179. # DrawdownMonitor needs a TradingStats instance
  180. from ..stats.trading_stats import TradingStats
  181. from .drawdown_monitor import DrawdownMonitor
  182. stats = TradingStats()
  183. self.drawdown_monitor = DrawdownMonitor(stats)
  184. if self.rsi_monitor is None:
  185. # RsiMonitor class from rsi_monitor module
  186. from .rsi_monitor import RsiMonitor
  187. self.rsi_monitor = RsiMonitor(self._hl_client, self._notification_manager)
  188. except Exception as e:
  189. logger.warning(f"Could not initialize optional monitors: {e}")
  190. # Set to dummy objects that won't break the system
  191. if self.drawdown_monitor is None:
  192. self.drawdown_monitor = type('DummyMonitor', (), {'start': lambda: None, 'stop': lambda: None, 'is_running': False})()
  193. if self.rsi_monitor is None:
  194. self.rsi_monitor = type('DummyMonitor', (), {'start': lambda: None, 'stop': lambda: None, 'is_running': False})()
  195. async def add_pending_stop_loss(self, symbol: str, stop_price: float, size: float, side: str, expires_hours: int = 24):
  196. """Add a pending stop loss order"""
  197. await self.pending_orders_manager.add_pending_stop_loss(symbol, stop_price, size, side, expires_hours)
  198. async def cancel_pending_order(self, symbol: str) -> bool:
  199. """Cancel pending order for symbol"""
  200. return await self.pending_orders_manager.cancel_pending_order(symbol)
  201. async def get_pending_orders(self) -> list:
  202. """Get all pending orders"""
  203. return await self.pending_orders_manager.get_pending_orders()
  204. async def get_risk_status(self) -> dict:
  205. """Get current risk status"""
  206. return await self.risk_manager.get_risk_status()
  207. async def get_monitoring_status(self) -> dict:
  208. """Get overall monitoring status"""
  209. try:
  210. return {
  211. 'is_running': self.is_running,
  212. 'components': {
  213. 'position_tracker': self.position_tracker.is_running,
  214. 'pending_orders_manager': self.pending_orders_manager.is_running,
  215. 'risk_manager': self.risk_manager.is_running,
  216. 'alarm_manager': self.alarm_manager.is_running if hasattr(self.alarm_manager, 'is_running') else True,
  217. 'copy_trading_monitor': self.copy_trading_monitor.enabled if self.copy_trading_monitor else False,
  218. 'drawdown_monitor': self.drawdown_monitor.is_running if hasattr(self.drawdown_monitor, 'is_running') else True,
  219. 'rsi_monitor': self.rsi_monitor.is_running if hasattr(self.rsi_monitor, 'is_running') else True
  220. },
  221. 'pending_orders_count': len(await self.get_pending_orders()),
  222. 'risk_status': await self.get_risk_status()
  223. }
  224. except Exception as e:
  225. logger.error(f"Error getting monitoring status: {e}")
  226. return {'error': str(e)}