market_monitor.py 89 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518
  1. #!/usr/bin/env python3
  2. """
  3. Market Monitor - Handles external trade monitoring and heartbeat functionality.
  4. """
  5. import logging
  6. import asyncio
  7. from datetime import datetime, timedelta, timezone
  8. from typing import Optional, Dict, Any, List
  9. import os
  10. import json
  11. from src.config.config import Config
  12. from src.monitoring.alarm_manager import AlarmManager
  13. logger = logging.getLogger(__name__)
  14. class MarketMonitor:
  15. """Handles external trade monitoring and market events."""
  16. def __init__(self, trading_engine, notification_manager=None):
  17. """Initialize the market monitor."""
  18. self.trading_engine = trading_engine
  19. self.notification_manager = notification_manager
  20. self.client = trading_engine.client
  21. self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(seconds=120)
  22. self._monitoring_active = False
  23. # 🆕 External stop loss tracking
  24. self._external_stop_loss_orders = {} # Format: {exchange_order_id: {'token': str, 'trigger_price': float, 'side': str, 'detected_at': datetime}}
  25. # External trade monitoring
  26. # self.state_file = "data/market_monitor_state.json" # Removed, state now in DB
  27. self.last_processed_trade_time: Optional[datetime] = None
  28. # Alarm management
  29. self.alarm_manager = AlarmManager()
  30. # Order monitoring
  31. self.last_known_orders = set()
  32. self.last_known_positions = {}
  33. self._load_state()
  34. async def start(self):
  35. """Start the market monitor."""
  36. if self._monitoring_active:
  37. return
  38. self._monitoring_active = True
  39. logger.info("🔄 Market monitor started")
  40. # Initialize tracking
  41. await self._initialize_tracking()
  42. # Start monitoring task
  43. self._monitor_task = asyncio.create_task(self._monitor_loop())
  44. async def stop(self):
  45. """Stop the market monitor."""
  46. if not self._monitoring_active:
  47. return
  48. self._monitoring_active = False
  49. if self._monitor_task:
  50. self._monitor_task.cancel()
  51. try:
  52. await self._monitor_task
  53. except asyncio.CancelledError:
  54. pass
  55. self._save_state()
  56. logger.info("🛑 Market monitor stopped")
  57. def _load_state(self):
  58. """Load market monitor state from SQLite DB via TradingStats."""
  59. stats = self.trading_engine.get_stats()
  60. if not stats:
  61. logger.warning("⚠️ TradingStats not available, cannot load MarketMonitor state.")
  62. self.last_processed_trade_time = None
  63. return
  64. try:
  65. last_time_str = stats._get_metadata('market_monitor_last_processed_trade_time')
  66. if last_time_str:
  67. self.last_processed_trade_time = datetime.fromisoformat(last_time_str)
  68. # Ensure it's timezone-aware (UTC)
  69. if self.last_processed_trade_time.tzinfo is None:
  70. self.last_processed_trade_time = self.last_processed_trade_time.replace(tzinfo=timezone.utc)
  71. else:
  72. self.last_processed_trade_time = self.last_processed_trade_time.astimezone(timezone.utc)
  73. logger.info(f"🔄 Loaded MarketMonitor state from DB: last_processed_trade_time = {self.last_processed_trade_time.isoformat()}")
  74. else:
  75. logger.info("💨 No MarketMonitor state (last_processed_trade_time) found in DB. Will start with fresh external trade tracking.")
  76. self.last_processed_trade_time = None
  77. except Exception as e:
  78. logger.error(f"Error loading MarketMonitor state from DB: {e}. Proceeding with default state.")
  79. self.last_processed_trade_time = None
  80. def _save_state(self):
  81. """Save market monitor state to SQLite DB via TradingStats."""
  82. stats = self.trading_engine.get_stats()
  83. if not stats:
  84. logger.warning("⚠️ TradingStats not available, cannot save MarketMonitor state.")
  85. return
  86. try:
  87. if self.last_processed_trade_time:
  88. # Ensure timestamp is UTC before saving
  89. lptt_utc = self.last_processed_trade_time
  90. if lptt_utc.tzinfo is None:
  91. lptt_utc = lptt_utc.replace(tzinfo=timezone.utc)
  92. else:
  93. lptt_utc = lptt_utc.astimezone(timezone.utc)
  94. stats._set_metadata('market_monitor_last_processed_trade_time', lptt_utc.isoformat())
  95. logger.info(f"💾 Saved MarketMonitor state (last_processed_trade_time) to DB: {lptt_utc.isoformat()}")
  96. else:
  97. # If it's None, we might want to remove the key or save it as an empty string
  98. # For now, let's assume we only save if there is a time. Or remove it.
  99. stats._set_metadata('market_monitor_last_processed_trade_time', '') # Or handle deletion
  100. logger.info("💾 MarketMonitor state (last_processed_trade_time) is None, saved as empty in DB.")
  101. except Exception as e:
  102. logger.error(f"Error saving MarketMonitor state to DB: {e}")
  103. async def _initialize_tracking(self):
  104. """Initialize order and position tracking."""
  105. try:
  106. # Get current open orders to initialize tracking
  107. orders = self.trading_engine.get_orders()
  108. if orders:
  109. self.last_known_orders = {order.get('id') for order in orders if order.get('id')}
  110. logger.info(f"📋 Initialized tracking with {len(self.last_known_orders)} open orders")
  111. # Get current positions for P&L tracking
  112. positions = self.trading_engine.get_positions()
  113. if positions:
  114. for position in positions:
  115. symbol = position.get('symbol')
  116. contracts = float(position.get('contracts', 0))
  117. entry_price = float(position.get('entryPx', 0))
  118. if symbol and contracts != 0:
  119. self.last_known_positions[symbol] = {
  120. 'contracts': contracts,
  121. 'entry_price': entry_price
  122. }
  123. logger.info(f"📊 Initialized tracking with {len(self.last_known_positions)} positions")
  124. except Exception as e:
  125. logger.error(f"❌ Error initializing tracking: {e}")
  126. async def _monitor_loop(self):
  127. """Main monitoring loop that runs every BOT_HEARTBEAT_SECONDS."""
  128. try:
  129. loop_count = 0
  130. while self._monitoring_active:
  131. # 🆕 PHASE 2: Check active trades for pending stop loss activation first (highest priority)
  132. await self._activate_pending_stop_losses_from_active_trades()
  133. await self._check_order_fills()
  134. await self._check_price_alarms()
  135. await self._check_external_trades()
  136. await self._check_pending_triggers()
  137. await self._check_automatic_risk_management()
  138. await self._check_external_stop_loss_orders()
  139. # Run orphaned stop loss cleanup every 10 heartbeats (less frequent but regular)
  140. loop_count += 1
  141. if loop_count % 10 == 0:
  142. await self._cleanup_orphaned_stop_losses()
  143. await self._cleanup_external_stop_loss_tracking()
  144. loop_count = 0 # Reset counter to prevent overflow
  145. await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS)
  146. except asyncio.CancelledError:
  147. logger.info("Market monitor loop cancelled")
  148. raise
  149. except Exception as e:
  150. logger.error(f"Error in market monitor loop: {e}")
  151. # Restart after error
  152. if self._monitoring_active:
  153. await asyncio.sleep(5)
  154. await self._monitor_loop()
  155. async def _check_order_fills(self):
  156. """Check for filled orders and send notifications."""
  157. try:
  158. # Get current orders and positions
  159. current_orders = self.trading_engine.get_orders() or []
  160. current_positions = self.trading_engine.get_positions() or []
  161. # Get current order IDs
  162. current_order_ids = {order.get('id') for order in current_orders if order.get('id')}
  163. # Find filled orders (orders that were in last_known_orders but not in current_orders)
  164. disappeared_order_ids = self.last_known_orders - current_order_ids
  165. if disappeared_order_ids:
  166. logger.info(f"🎯 Detected {len(disappeared_order_ids)} bot orders no longer open: {list(disappeared_order_ids)}. Corresponding fills (if any) are processed by external trade checker.")
  167. await self._process_disappeared_orders(disappeared_order_ids)
  168. # Update tracking data for open bot orders
  169. self.last_known_orders = current_order_ids
  170. # Position state is primarily managed by TradingStats based on all fills.
  171. # This local tracking can provide supplementary logging if needed.
  172. # await self._update_position_tracking(current_positions)
  173. except Exception as e:
  174. logger.error(f"❌ Error checking order fills: {e}")
  175. async def _process_filled_orders(self, filled_order_ids: set, current_positions: list):
  176. """Process filled orders using enhanced position tracking."""
  177. try:
  178. # For bot-initiated orders, we'll detect changes in position size
  179. # and send appropriate notifications using the enhanced system
  180. # This method will be triggered when orders placed through the bot are filled
  181. # The external trade monitoring will handle trades made outside the bot
  182. # Update position tracking based on current positions
  183. await self._update_position_tracking(current_positions)
  184. except Exception as e:
  185. logger.error(f"❌ Error processing filled orders: {e}")
  186. async def _update_position_tracking(self, current_positions: list):
  187. """Update position tracking and calculate P&L changes."""
  188. try:
  189. new_position_map = {}
  190. for position in current_positions:
  191. symbol = position.get('symbol')
  192. contracts = float(position.get('contracts', 0))
  193. entry_price = float(position.get('entryPx', 0))
  194. if symbol and contracts != 0:
  195. new_position_map[symbol] = {
  196. 'contracts': contracts,
  197. 'entry_price': entry_price
  198. }
  199. # Compare with previous positions to detect changes
  200. for symbol, new_data in new_position_map.items():
  201. old_data = self.last_known_positions.get(symbol)
  202. if not old_data:
  203. # New position opened
  204. logger.info(f"📈 New position detected (observed by MarketMonitor): {symbol} {new_data['contracts']} @ ${new_data['entry_price']:.4f}. TradingStats is the definitive source.")
  205. elif abs(new_data['contracts'] - old_data['contracts']) > 0.000001:
  206. # Position size changed
  207. change = new_data['contracts'] - old_data['contracts']
  208. logger.info(f"📊 Position change detected (observed by MarketMonitor): {symbol} {change:+.6f} contracts. TradingStats is the definitive source.")
  209. # Check for closed positions
  210. for symbol in self.last_known_positions:
  211. if symbol not in new_position_map:
  212. logger.info(f"📉 Position closed (observed by MarketMonitor): {symbol}. TradingStats is the definitive source.")
  213. # Update tracking
  214. self.last_known_positions = new_position_map
  215. except Exception as e:
  216. logger.error(f"❌ Error updating position tracking: {e}")
  217. async def _process_disappeared_orders(self, disappeared_order_ids: set):
  218. """Log and investigate bot orders that have disappeared from the exchange."""
  219. stats = self.trading_engine.get_stats()
  220. if not stats:
  221. logger.warning("⚠️ TradingStats not available in _process_disappeared_orders.")
  222. return
  223. try:
  224. total_linked_cancelled = 0
  225. external_cancellations = []
  226. for exchange_oid in disappeared_order_ids:
  227. order_in_db = stats.get_order_by_exchange_id(exchange_oid)
  228. if order_in_db:
  229. last_status = order_in_db.get('status', 'unknown')
  230. order_type = order_in_db.get('type', 'unknown')
  231. symbol = order_in_db.get('symbol', 'unknown')
  232. token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
  233. logger.info(f"Order {exchange_oid} was in our DB with status '{last_status}' but has now disappeared from exchange.")
  234. # Check if this was an unexpected disappearance (likely external cancellation)
  235. active_statuses = ['open', 'submitted', 'partially_filled', 'pending_submission']
  236. if last_status in active_statuses:
  237. logger.warning(f"⚠️ EXTERNAL CANCELLATION: Order {exchange_oid} with status '{last_status}' was likely cancelled externally on Hyperliquid")
  238. stats.update_order_status(exchange_order_id=exchange_oid, new_status='cancelled_externally')
  239. # Track external cancellations for notification
  240. external_cancellations.append({
  241. 'exchange_oid': exchange_oid,
  242. 'token': token,
  243. 'type': order_type,
  244. 'last_status': last_status
  245. })
  246. # Send notification about external cancellation
  247. if self.notification_manager:
  248. await self.notification_manager.send_generic_notification(
  249. f"⚠️ <b>External Order Cancellation Detected</b>\n\n"
  250. f"Token: {token}\n"
  251. f"Order Type: {order_type.replace('_', ' ').title()}\n"
  252. f"Exchange Order ID: <code>{exchange_oid[:8]}...</code>\n"
  253. f"Previous Status: {last_status.replace('_', ' ').title()}\n"
  254. f"Source: Cancelled directly on Hyperliquid\n"
  255. f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
  256. f"🤖 Bot status updated automatically"
  257. )
  258. # 🔧 EDGE CASE FIX: Wait briefly before cancelling stop losses
  259. # Sometimes orders are cancelled externally but fills come through simultaneously
  260. logger.info(f"⏳ Waiting 3 seconds to check for potential fills before cancelling stop losses for {exchange_oid}")
  261. await asyncio.sleep(3)
  262. # Re-check the order status after waiting - it might have been filled
  263. order_in_db_updated = stats.get_order_by_exchange_id(exchange_oid)
  264. if order_in_db_updated and order_in_db_updated.get('status') in ['filled', 'partially_filled']:
  265. logger.info(f"✅ Order {exchange_oid} was filled during the wait period - NOT cancelling stop losses")
  266. # Don't cancel stop losses - let them be activated normally
  267. continue
  268. # Additional check: Look for very recent fills that might match this order
  269. recent_fill_found = await self._check_for_recent_fills_for_order(exchange_oid, order_in_db)
  270. if recent_fill_found:
  271. logger.info(f"✅ Found recent fill for order {exchange_oid} - NOT cancelling stop losses")
  272. continue
  273. else:
  274. # Normal completion/cancellation - update status
  275. stats.update_order_status(exchange_order_id=exchange_oid, new_status='disappeared_from_exchange')
  276. # Cancel any pending stop losses linked to this order (only if not filled)
  277. if order_in_db.get('bot_order_ref_id'):
  278. # Double-check one more time that the order wasn't filled
  279. final_order_check = stats.get_order_by_exchange_id(exchange_oid)
  280. if final_order_check and final_order_check.get('status') in ['filled', 'partially_filled']:
  281. logger.info(f"🛑 Order {exchange_oid} was filled - preserving stop losses")
  282. continue
  283. cancelled_sl_count = stats.cancel_linked_orders(
  284. parent_bot_order_ref_id=order_in_db['bot_order_ref_id'],
  285. new_status='cancelled_parent_disappeared'
  286. )
  287. total_linked_cancelled += cancelled_sl_count
  288. if cancelled_sl_count > 0:
  289. logger.info(f"Cancelled {cancelled_sl_count} pending stop losses linked to disappeared order {exchange_oid}")
  290. if self.notification_manager:
  291. await self.notification_manager.send_generic_notification(
  292. f"🛑 <b>Linked Stop Losses Cancelled</b>\n\n"
  293. f"Token: {token}\n"
  294. f"Cancelled: {cancelled_sl_count} stop loss(es)\n"
  295. f"Reason: Parent order {exchange_oid[:8]}... disappeared\n"
  296. f"Likely Cause: External cancellation on Hyperliquid\n"
  297. f"Time: {datetime.now().strftime('%H:%M:%S')}"
  298. )
  299. else:
  300. logger.warning(f"Order {exchange_oid} disappeared from exchange but was not found in our DB. This might be an order placed externally.")
  301. # Send summary notification if multiple external cancellations occurred
  302. if len(external_cancellations) > 1:
  303. tokens_affected = list(set(item['token'] for item in external_cancellations))
  304. if self.notification_manager:
  305. await self.notification_manager.send_generic_notification(
  306. f"⚠️ <b>Multiple External Cancellations Detected</b>\n\n"
  307. f"Orders Cancelled: {len(external_cancellations)}\n"
  308. f"Tokens Affected: {', '.join(tokens_affected)}\n"
  309. f"Source: Direct cancellation on Hyperliquid\n"
  310. f"Linked Stop Losses Cancelled: {total_linked_cancelled}\n"
  311. f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
  312. f"💡 Check individual orders for details"
  313. )
  314. except Exception as e:
  315. logger.error(f"❌ Error processing disappeared orders: {e}", exc_info=True)
  316. async def _check_price_alarms(self):
  317. """Check price alarms and trigger notifications."""
  318. try:
  319. active_alarms = self.alarm_manager.get_all_active_alarms()
  320. if not active_alarms:
  321. return
  322. # Group alarms by token to minimize API calls
  323. tokens_to_check = list(set(alarm['token'] for alarm in active_alarms))
  324. for token in tokens_to_check:
  325. try:
  326. # Get current market price
  327. symbol = f"{token}/USDC:USDC"
  328. market_data = self.trading_engine.get_market_data(symbol)
  329. if not market_data or not market_data.get('ticker'):
  330. continue
  331. current_price = float(market_data['ticker'].get('last', 0))
  332. if current_price <= 0:
  333. continue
  334. # Check alarms for this token
  335. token_alarms = [alarm for alarm in active_alarms if alarm['token'] == token]
  336. for alarm in token_alarms:
  337. target_price = alarm['target_price']
  338. direction = alarm['direction']
  339. # Check if alarm should trigger
  340. should_trigger = False
  341. if direction == 'above' and current_price >= target_price:
  342. should_trigger = True
  343. elif direction == 'below' and current_price <= target_price:
  344. should_trigger = True
  345. if should_trigger:
  346. # Trigger the alarm
  347. triggered_alarm = self.alarm_manager.trigger_alarm(alarm['id'], current_price)
  348. if triggered_alarm:
  349. await self._send_alarm_notification(triggered_alarm)
  350. except Exception as e:
  351. logger.error(f"Error checking alarms for {token}: {e}")
  352. except Exception as e:
  353. logger.error(f"❌ Error checking price alarms: {e}")
  354. async def _send_alarm_notification(self, alarm: Dict[str, Any]):
  355. """Send notification for triggered alarm."""
  356. try:
  357. # Send through notification manager if available
  358. if self.notification_manager:
  359. await self.notification_manager.send_alarm_triggered_notification(
  360. alarm['token'],
  361. alarm['target_price'],
  362. alarm['triggered_price'],
  363. alarm['direction']
  364. )
  365. else:
  366. # Fallback to logging if notification manager not available
  367. logger.info(f"🔔 ALARM TRIGGERED: {alarm['token']} @ ${alarm['triggered_price']:,.2f}")
  368. except Exception as e:
  369. logger.error(f"❌ Error sending alarm notification: {e}")
  370. async def _check_external_trades(self):
  371. """Check for trades made outside the Telegram bot and update stats."""
  372. try:
  373. # Get recent fills from exchange
  374. recent_fills = self.trading_engine.get_recent_fills()
  375. if not recent_fills:
  376. logger.debug("No recent fills data available")
  377. return
  378. # Get last processed timestamp from database
  379. if not hasattr(self, '_last_processed_trade_time') or self._last_processed_trade_time is None:
  380. try:
  381. last_time_str = self.trading_engine.stats._get_metadata('last_processed_trade_time')
  382. if last_time_str:
  383. self._last_processed_trade_time = datetime.fromisoformat(last_time_str)
  384. logger.debug(f"Loaded last_processed_trade_time from DB: {self._last_processed_trade_time}")
  385. else:
  386. # If no last processed time, start from 1 hour ago to avoid processing too much history
  387. self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
  388. logger.info("No last_processed_trade_time found, setting to 1 hour ago (UTC).")
  389. except Exception as e:
  390. logger.warning(f"Could not load last_processed_trade_time from DB: {e}")
  391. self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
  392. # Process new fills
  393. external_trades_processed = 0
  394. symbols_with_fills = set() # Track symbols that had fills for stop loss activation
  395. for fill in recent_fills:
  396. try:
  397. # Parse fill data - CCXT format from fetch_my_trades
  398. trade_id = fill.get('id') # CCXT uses 'id' for trade ID
  399. timestamp_ms = fill.get('timestamp') # CCXT uses 'timestamp' (milliseconds)
  400. symbol = fill.get('symbol') # CCXT uses 'symbol' in full format like 'LTC/USDC:USDC'
  401. side = fill.get('side') # CCXT uses 'side' ('buy' or 'sell')
  402. amount = float(fill.get('amount', 0)) # CCXT uses 'amount'
  403. price = float(fill.get('price', 0)) # CCXT uses 'price'
  404. # Convert timestamp
  405. if timestamp_ms:
  406. timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
  407. else:
  408. timestamp_dt = datetime.now(timezone.utc)
  409. # Skip if already processed
  410. if timestamp_dt <= self._last_processed_trade_time:
  411. continue
  412. # Process as external trade if we reach here
  413. if symbol and side and amount > 0 and price > 0:
  414. # Symbol is already in full format for CCXT
  415. full_symbol = symbol
  416. token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
  417. # Check if this might be a bot order fill by looking for exchange order ID
  418. # CCXT might have this in 'info' sub-object with the raw exchange data
  419. exchange_order_id_from_fill = None
  420. if 'info' in fill and isinstance(fill['info'], dict):
  421. # Look for Hyperliquid order ID in the raw response
  422. exchange_order_id_from_fill = fill['info'].get('oid')
  423. # 🆕 Check if this fill corresponds to an external stop loss order
  424. is_external_stop_loss = False
  425. stop_loss_info = None
  426. if exchange_order_id_from_fill and exchange_order_id_from_fill in self._external_stop_loss_orders:
  427. is_external_stop_loss = True
  428. stop_loss_info = self._external_stop_loss_orders[exchange_order_id_from_fill]
  429. logger.info(f"🛑 EXTERNAL STOP LOSS EXECUTION: {token} - Order {exchange_order_id_from_fill} filled @ ${price:.2f}")
  430. logger.info(f"🔍 Processing {'external stop loss' if is_external_stop_loss else 'external trade'}: {trade_id} - {side} {amount} {full_symbol} @ ${price:.2f}")
  431. stats = self.trading_engine.stats
  432. if not stats:
  433. logger.warning("⚠️ TradingStats not available in _check_external_trades.")
  434. continue
  435. # If this is an external stop loss execution, handle it specially
  436. if is_external_stop_loss and stop_loss_info:
  437. # 🧹 PHASE 3: Close active trade for stop loss execution
  438. active_trade = stats.get_active_trade_by_symbol(full_symbol, status='active')
  439. if active_trade:
  440. entry_price = active_trade.get('entry_price', 0)
  441. active_trade_side = active_trade.get('side')
  442. # Calculate realized P&L
  443. if active_trade_side == 'buy': # Long position
  444. realized_pnl = amount * (price - entry_price)
  445. else: # Short position
  446. realized_pnl = amount * (entry_price - price)
  447. stats.update_active_trade_closed(
  448. active_trade['id'], realized_pnl, timestamp_dt.isoformat()
  449. )
  450. logger.info(f"🛑 Active trade {active_trade['id']} closed via external stop loss - P&L: ${realized_pnl:.2f}")
  451. # Send specialized stop loss execution notification
  452. if self.notification_manager:
  453. await self.notification_manager.send_stop_loss_execution_notification(
  454. stop_loss_info, full_symbol, side, amount, price, 'long_closed', timestamp_dt.isoformat()
  455. )
  456. # Remove from tracking since it's now executed
  457. del self._external_stop_loss_orders[exchange_order_id_from_fill]
  458. logger.info(f"🛑 Processed external stop loss execution: {side} {amount} {full_symbol} @ ${price:.2f} (long_closed)")
  459. else:
  460. # Handle as regular external trade
  461. # Check if this corresponds to a bot order by exchange_order_id
  462. linked_order_db_id = None
  463. if exchange_order_id_from_fill:
  464. order_in_db = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
  465. if order_in_db:
  466. linked_order_db_id = order_in_db.get('id')
  467. logger.info(f"🔗 Linked external fill {trade_id} to bot order DB ID {linked_order_db_id} (Exchange OID: {exchange_order_id_from_fill})")
  468. # Update order status to filled if it was open
  469. current_status = order_in_db.get('status', '')
  470. if current_status in ['open', 'partially_filled', 'pending_submission']:
  471. # Determine if this is a partial or full fill
  472. order_amount_requested = float(order_in_db.get('amount_requested', 0))
  473. if abs(amount - order_amount_requested) < 0.000001: # Allow small floating point differences
  474. new_status_after_fill = 'filled'
  475. else:
  476. new_status_after_fill = 'partially_filled'
  477. stats.update_order_status(
  478. order_db_id=linked_order_db_id,
  479. new_status=new_status_after_fill
  480. )
  481. logger.info(f"📊 Updated bot order {linked_order_db_id} status: {current_status} → {new_status_after_fill}")
  482. # Check if this order is now fully filled and has pending stop losses to activate
  483. if new_status_after_fill == 'filled':
  484. await self._activate_pending_stop_losses(order_in_db, stats)
  485. # 🧹 PHASE 3: Record trade simply, use active_trades for tracking
  486. stats.record_trade(
  487. full_symbol, side, amount, price,
  488. exchange_fill_id=trade_id, trade_type="external",
  489. timestamp=timestamp_dt.isoformat(),
  490. linked_order_table_id_to_link=linked_order_db_id
  491. )
  492. # Derive action type from trade context for notifications
  493. if linked_order_db_id:
  494. # Bot order - determine action from order context
  495. order_side = order_in_db.get('side', side).lower()
  496. if order_side == 'buy':
  497. action_type = 'long_opened'
  498. elif order_side == 'sell':
  499. action_type = 'short_opened'
  500. else:
  501. action_type = 'position_opened'
  502. else:
  503. # External trade - determine from current active trades
  504. existing_active_trade = stats.get_active_trade_by_symbol(full_symbol, status='active')
  505. if existing_active_trade:
  506. # Has active position - this is likely a closure
  507. existing_side = existing_active_trade.get('side')
  508. if existing_side == 'buy' and side.lower() == 'sell':
  509. action_type = 'long_closed'
  510. elif existing_side == 'sell' and side.lower() == 'buy':
  511. action_type = 'short_closed'
  512. else:
  513. action_type = 'position_modified'
  514. else:
  515. # No active position - this opens a new one
  516. if side.lower() == 'buy':
  517. action_type = 'long_opened'
  518. else:
  519. action_type = 'short_opened'
  520. # 🧹 PHASE 3: Update active trades based on action
  521. if linked_order_db_id:
  522. # Bot order - update linked active trade
  523. order_data = stats.get_order_by_db_id(linked_order_db_id)
  524. if order_data:
  525. exchange_order_id = order_data.get('exchange_order_id')
  526. # Find active trade by entry order ID
  527. all_active_trades = stats.get_all_active_trades()
  528. for at in all_active_trades:
  529. if at.get('entry_order_id') == exchange_order_id:
  530. active_trade_id = at['id']
  531. current_status = at['status']
  532. if current_status == 'pending' and action_type in ['long_opened', 'short_opened']:
  533. # Entry order filled - update active trade to active
  534. stats.update_active_trade_opened(
  535. active_trade_id, price, amount, timestamp_dt.isoformat()
  536. )
  537. logger.info(f"🆕 Active trade {active_trade_id} opened via fill {trade_id}")
  538. elif current_status == 'active' and action_type in ['long_closed', 'short_closed']:
  539. # Exit order filled - calculate P&L and close active trade
  540. entry_price = at.get('entry_price', 0)
  541. active_trade_side = at.get('side')
  542. # Calculate realized P&L
  543. if active_trade_side == 'buy': # Long position
  544. realized_pnl = amount * (price - entry_price)
  545. else: # Short position
  546. realized_pnl = amount * (entry_price - price)
  547. stats.update_active_trade_closed(
  548. active_trade_id, realized_pnl, timestamp_dt.isoformat()
  549. )
  550. logger.info(f"🆕 Active trade {active_trade_id} closed via fill {trade_id} - P&L: ${realized_pnl:.2f}")
  551. break
  552. elif action_type in ['long_opened', 'short_opened']:
  553. # External trade that opened a position - create external active trade
  554. active_trade_id = stats.create_active_trade(
  555. symbol=full_symbol,
  556. side=side.lower(),
  557. entry_order_id=None, # External order
  558. trade_type='external'
  559. )
  560. if active_trade_id:
  561. stats.update_active_trade_opened(
  562. active_trade_id, price, amount, timestamp_dt.isoformat()
  563. )
  564. logger.info(f"🆕 Created external active trade {active_trade_id} for {side.upper()} {full_symbol}")
  565. elif action_type in ['long_closed', 'short_closed']:
  566. # External closure - close any active trade for this symbol
  567. active_trade = stats.get_active_trade_by_symbol(full_symbol, status='active')
  568. if active_trade:
  569. entry_price = active_trade.get('entry_price', 0)
  570. active_trade_side = active_trade.get('side')
  571. # Calculate realized P&L
  572. if active_trade_side == 'buy': # Long position
  573. realized_pnl = amount * (price - entry_price)
  574. else: # Short position
  575. realized_pnl = amount * (entry_price - price)
  576. stats.update_active_trade_closed(
  577. active_trade['id'], realized_pnl, timestamp_dt.isoformat()
  578. )
  579. logger.info(f"🆕 External closure: Active trade {active_trade['id']} closed - P&L: ${realized_pnl:.2f}")
  580. # Track symbol for potential stop loss activation
  581. symbols_with_fills.add(token)
  582. # Send notification for external trade
  583. if self.notification_manager:
  584. await self.notification_manager.send_external_trade_notification(
  585. full_symbol, side, amount, price, action_type, timestamp_dt.isoformat()
  586. )
  587. logger.info(f"📋 Processed external trade: {side} {amount} {full_symbol} @ ${price:.2f} ({action_type}) using timestamp {timestamp_dt.isoformat()}")
  588. external_trades_processed += 1
  589. # Update last processed time
  590. self._last_processed_trade_time = timestamp_dt
  591. except Exception as e:
  592. logger.error(f"Error processing fill {fill}: {e}")
  593. continue
  594. # Save the last processed timestamp to database
  595. if external_trades_processed > 0:
  596. self.trading_engine.stats._set_metadata('last_processed_trade_time', self._last_processed_trade_time.isoformat())
  597. logger.info(f"💾 Saved MarketMonitor state (last_processed_trade_time) to DB: {self._last_processed_trade_time.isoformat()}")
  598. logger.info(f"📊 Processed {external_trades_processed} external trades")
  599. # Additional check: Activate any pending stop losses for symbols that had fills
  600. # This is a safety net for cases where the fill linking above didn't work
  601. await self._check_pending_stop_losses_for_filled_symbols(symbols_with_fills)
  602. except Exception as e:
  603. logger.error(f"❌ Error checking external trades: {e}", exc_info=True)
  604. async def _check_pending_stop_losses_for_filled_symbols(self, symbols_with_fills: set):
  605. """
  606. Safety net: Check if any symbols that just had fills have pending stop losses
  607. that should be activated. This handles cases where fill linking failed.
  608. """
  609. try:
  610. if not symbols_with_fills:
  611. return
  612. stats = self.trading_engine.stats
  613. if not stats:
  614. return
  615. # Get all pending stop losses
  616. pending_stop_losses = stats.get_orders_by_status('pending_trigger', 'stop_limit_trigger')
  617. if not pending_stop_losses:
  618. return
  619. # Check each pending stop loss to see if its symbol had fills
  620. activated_any = False
  621. for sl_order in pending_stop_losses:
  622. symbol = sl_order.get('symbol', '')
  623. token = symbol.split('/')[0] if '/' in symbol else ''
  624. if token in symbols_with_fills:
  625. # This symbol had fills - check if we should activate the stop loss
  626. parent_bot_ref_id = sl_order.get('parent_bot_order_ref_id')
  627. if parent_bot_ref_id:
  628. # Get the parent order
  629. parent_order = stats.get_order_by_bot_ref_id(parent_bot_ref_id)
  630. if parent_order and parent_order.get('status') in ['filled', 'partially_filled']:
  631. logger.info(f"🛑 Safety net activation: Found pending SL for {token} with filled parent order {parent_bot_ref_id}")
  632. await self._activate_pending_stop_losses(parent_order, stats)
  633. activated_any = True
  634. if activated_any:
  635. logger.info("✅ Safety net activated pending stop losses for symbols with recent fills")
  636. except Exception as e:
  637. logger.error(f"Error in safety net stop loss activation: {e}", exc_info=True)
  638. async def _check_pending_triggers(self):
  639. """Check and process pending conditional triggers (e.g., SL/TP)."""
  640. stats = self.trading_engine.get_stats()
  641. if not stats:
  642. logger.warning("⚠️ TradingStats not available in _check_pending_triggers.")
  643. return
  644. try:
  645. # Fetch pending SL triggers (adjust type if TP triggers are different)
  646. # For now, assuming 'STOP_LIMIT_TRIGGER' is the type used for SLs that become limit orders
  647. pending_sl_triggers = stats.get_orders_by_status(status='pending_trigger', order_type_filter='stop_limit_trigger')
  648. if not pending_sl_triggers:
  649. return
  650. logger.debug(f"Found {len(pending_sl_triggers)} pending SL triggers to check.")
  651. for trigger_order in pending_sl_triggers:
  652. symbol = trigger_order['symbol']
  653. trigger_price = trigger_order['price'] # This is the stop price
  654. trigger_side = trigger_order['side'] # This is the side of the SL order (e.g., sell for a long position's SL)
  655. order_db_id = trigger_order['id']
  656. parent_ref_id = trigger_order.get('parent_bot_order_ref_id')
  657. if not symbol or trigger_price is None:
  658. logger.warning(f"Invalid trigger order data for DB ID {order_db_id}, skipping: {trigger_order}")
  659. continue
  660. market_data = self.trading_engine.get_market_data(symbol)
  661. if not market_data or not market_data.get('ticker'):
  662. logger.warning(f"Could not fetch market data for {symbol} to check SL trigger {order_db_id}.")
  663. continue
  664. current_price = float(market_data['ticker'].get('last', 0))
  665. if current_price <= 0:
  666. logger.warning(f"Invalid current price ({current_price}) for {symbol} checking SL trigger {order_db_id}.")
  667. continue
  668. trigger_hit = False
  669. if trigger_side.lower() == 'sell' and current_price <= trigger_price:
  670. trigger_hit = True
  671. logger.info(f"🔴 SL TRIGGER HIT (Sell): Order DB ID {order_db_id}, Symbol {symbol}, Trigger@ ${trigger_price:.4f}, Market@ ${current_price:.4f}")
  672. elif trigger_side.lower() == 'buy' and current_price >= trigger_price:
  673. trigger_hit = True
  674. logger.info(f"🟢 SL TRIGGER HIT (Buy): Order DB ID {order_db_id}, Symbol {symbol}, Trigger@ ${trigger_price:.4f}, Market@ ${current_price:.4f}")
  675. if trigger_hit:
  676. logger.info(f"Attempting to execute actual stop order for triggered DB ID: {order_db_id} (Parent Bot Ref: {trigger_order.get('parent_bot_order_ref_id')})")
  677. execution_result = await self.trading_engine.execute_triggered_stop_order(original_trigger_order_db_id=order_db_id)
  678. notification_message_detail = ""
  679. if execution_result.get("success"):
  680. new_trigger_status = 'triggered_order_placed'
  681. placed_sl_details = execution_result.get("placed_sl_order_details", {})
  682. logger.info(f"Successfully placed actual SL order from trigger {order_db_id}. New SL Order DB ID: {placed_sl_details.get('order_db_id')}, Exchange ID: {placed_sl_details.get('exchange_order_id')}")
  683. notification_message_detail = f"Actual SL order placed (New DB ID: {placed_sl_details.get('order_db_id', 'N/A')})."
  684. else:
  685. new_trigger_status = 'trigger_execution_failed'
  686. error_msg = execution_result.get("error", "Unknown error during SL execution.")
  687. logger.error(f"Failed to execute actual SL order from trigger {order_db_id}: {error_msg}")
  688. notification_message_detail = f"Failed to place actual SL order: {error_msg}"
  689. stats.update_order_status(order_db_id=order_db_id, new_status=new_trigger_status)
  690. if self.notification_manager:
  691. await self.notification_manager.send_generic_notification(
  692. f"🔔 Stop-Loss Update!\nSymbol: {symbol}\nSide: {trigger_side.upper()}\nTrigger Price: ${trigger_price:.4f}\nMarket Price: ${current_price:.4f}\n(Original Trigger DB ID: {order_db_id}, Parent: {parent_ref_id or 'N/A'})\nStatus: {new_trigger_status.replace('_', ' ').title()}\nDetails: {notification_message_detail}"
  693. )
  694. except Exception as e:
  695. logger.error(f"❌ Error checking pending SL triggers: {e}", exc_info=True)
  696. async def _check_automatic_risk_management(self):
  697. """Check for automatic stop loss triggers based on Config.STOP_LOSS_PERCENTAGE as safety net."""
  698. try:
  699. # Skip if risk management is disabled or percentage is 0
  700. if not getattr(Config, 'RISK_MANAGEMENT_ENABLED', True) or Config.STOP_LOSS_PERCENTAGE <= 0:
  701. return
  702. # Get current positions
  703. positions = self.trading_engine.get_positions()
  704. if not positions:
  705. # If no positions exist, clean up any orphaned pending stop losses
  706. await self._cleanup_orphaned_stop_losses()
  707. return
  708. for position in positions:
  709. try:
  710. symbol = position.get('symbol', '')
  711. contracts = float(position.get('contracts', 0))
  712. entry_price = float(position.get('entryPx', 0))
  713. mark_price = float(position.get('markPx', 0))
  714. unrealized_pnl = float(position.get('unrealizedPnl', 0))
  715. # Skip if no position or missing data
  716. if contracts == 0 or entry_price <= 0 or mark_price <= 0:
  717. continue
  718. # Calculate PnL percentage based on entry value
  719. entry_value = abs(contracts) * entry_price
  720. if entry_value <= 0:
  721. continue
  722. pnl_percentage = (unrealized_pnl / entry_value) * 100
  723. # Check if loss exceeds the safety threshold
  724. if pnl_percentage <= -Config.STOP_LOSS_PERCENTAGE:
  725. token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
  726. position_side = "LONG" if contracts > 0 else "SHORT"
  727. logger.warning(f"🚨 AUTOMATIC STOP LOSS TRIGGERED: {token} {position_side} position has {pnl_percentage:.2f}% loss (threshold: -{Config.STOP_LOSS_PERCENTAGE}%)")
  728. # Send notification before attempting exit
  729. if self.notification_manager:
  730. await self.notification_manager.send_generic_notification(
  731. f"🚨 AUTOMATIC STOP LOSS TRIGGERED!\n"
  732. f"Token: {token}\n"
  733. f"Position: {position_side} {abs(contracts):.6f}\n"
  734. f"Entry Price: ${entry_price:.4f}\n"
  735. f"Current Price: ${mark_price:.4f}\n"
  736. f"Unrealized PnL: ${unrealized_pnl:.2f} ({pnl_percentage:.2f}%)\n"
  737. f"Safety Threshold: -{Config.STOP_LOSS_PERCENTAGE}%\n"
  738. f"Action: Executing emergency exit order..."
  739. )
  740. # Execute emergency exit order
  741. exit_result = await self.trading_engine.execute_exit_order(token)
  742. if exit_result.get('success'):
  743. logger.info(f"✅ Emergency exit order placed successfully for {token}. Order details: {exit_result.get('order_placed_details', {})}")
  744. # Cancel any pending stop losses for this symbol since position is now closed
  745. stats = self.trading_engine.get_stats()
  746. if stats:
  747. cancelled_sl_count = stats.cancel_pending_stop_losses_by_symbol(
  748. symbol=symbol,
  749. new_status='cancelled_auto_exit'
  750. )
  751. if cancelled_sl_count > 0:
  752. logger.info(f"🛑 Cancelled {cancelled_sl_count} pending stop losses for {symbol} after automatic exit")
  753. if self.notification_manager:
  754. await self.notification_manager.send_generic_notification(
  755. f"✅ <b>Emergency Exit Completed</b>\n\n"
  756. f"📊 <b>Position:</b> {token} {position_side}\n"
  757. f"📉 <b>Loss:</b> {pnl_percentage:.2f}% (${unrealized_pnl:.2f})\n"
  758. f"⚠️ <b>Threshold:</b> -{Config.STOP_LOSS_PERCENTAGE}%\n"
  759. f"✅ <b>Action:</b> Position automatically closed\n"
  760. f"💰 <b>Exit Price:</b> ~${mark_price:.2f}\n"
  761. f"🆔 <b>Order ID:</b> {exit_result.get('order_placed_details', {}).get('exchange_order_id', 'N/A')}\n"
  762. f"{f'🛑 <b>Cleanup:</b> Cancelled {cancelled_sl_count} pending stop losses' if cancelled_sl_count > 0 else ''}\n\n"
  763. f"🛡️ This was an automatic safety stop triggered by the risk management system."
  764. )
  765. else:
  766. error_msg = exit_result.get('error', 'Unknown error')
  767. logger.error(f"❌ Failed to execute emergency exit order for {token}: {error_msg}")
  768. if self.notification_manager:
  769. await self.notification_manager.send_generic_notification(
  770. f"❌ <b>CRITICAL: Emergency Exit Failed!</b>\n\n"
  771. f"📊 <b>Position:</b> {token} {position_side}\n"
  772. f"📉 <b>Loss:</b> {pnl_percentage:.2f}%\n"
  773. f"❌ <b>Error:</b> {error_msg}\n\n"
  774. f"⚠️ <b>MANUAL INTERVENTION REQUIRED</b>\n"
  775. f"Please close this position manually via /exit {token}"
  776. )
  777. except Exception as pos_error:
  778. logger.error(f"Error processing position for automatic stop loss: {pos_error}")
  779. continue
  780. except Exception as e:
  781. logger.error(f"❌ Error in automatic risk management check: {e}", exc_info=True)
  782. async def _cleanup_orphaned_stop_losses(self):
  783. """Clean up pending stop losses that no longer have corresponding positions OR whose parent orders have been cancelled/failed."""
  784. try:
  785. stats = self.trading_engine.get_stats()
  786. if not stats:
  787. return
  788. # Get all pending stop loss triggers
  789. pending_stop_losses = stats.get_orders_by_status('pending_trigger', 'stop_limit_trigger')
  790. if not pending_stop_losses:
  791. return
  792. logger.debug(f"Checking {len(pending_stop_losses)} pending stop losses for orphaned orders")
  793. # Get current positions to check against
  794. current_positions = self.trading_engine.get_positions()
  795. position_symbols = set()
  796. if current_positions:
  797. for pos in current_positions:
  798. symbol = pos.get('symbol')
  799. contracts = float(pos.get('contracts', 0))
  800. if symbol and contracts != 0:
  801. position_symbols.add(symbol)
  802. # Check each pending stop loss
  803. orphaned_count = 0
  804. for sl_order in pending_stop_losses:
  805. symbol = sl_order.get('symbol')
  806. order_db_id = sl_order.get('id')
  807. parent_bot_ref_id = sl_order.get('parent_bot_order_ref_id')
  808. should_cancel = False
  809. cancel_reason = ""
  810. # Check if parent order exists and its status
  811. if parent_bot_ref_id:
  812. parent_order = stats.get_order_by_bot_ref_id(parent_bot_ref_id)
  813. if parent_order:
  814. parent_status = parent_order.get('status', '').lower()
  815. # Cancel if parent order failed, was cancelled, or disappeared
  816. if parent_status in ['failed_submission', 'failed_submission_no_data', 'cancelled_manually',
  817. 'cancelled_externally', 'disappeared_from_exchange']:
  818. should_cancel = True
  819. cancel_reason = f"parent order {parent_status}"
  820. elif parent_status == 'filled':
  821. # Parent order filled but no position - position might have been closed externally
  822. if symbol not in position_symbols:
  823. should_cancel = True
  824. cancel_reason = "parent filled but position no longer exists"
  825. # If parent is still 'open', 'submitted', or 'partially_filled', keep the stop loss
  826. else:
  827. # Parent order not found in DB - this is truly orphaned
  828. should_cancel = True
  829. cancel_reason = "parent order not found in database"
  830. else:
  831. # No parent reference - fallback to old logic (position-based check)
  832. if symbol not in position_symbols:
  833. should_cancel = True
  834. cancel_reason = "no position exists and no parent reference"
  835. if should_cancel:
  836. # Cancel this orphaned stop loss
  837. success = stats.update_order_status(
  838. order_db_id=order_db_id,
  839. new_status='cancelled_orphaned_no_position'
  840. )
  841. if success:
  842. orphaned_count += 1
  843. token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
  844. logger.info(f"🧹 Cancelled orphaned stop loss for {token} (Order DB ID: {order_db_id}) - {cancel_reason}")
  845. if orphaned_count > 0:
  846. logger.info(f"🧹 Cleanup completed: Cancelled {orphaned_count} orphaned stop loss orders")
  847. if self.notification_manager:
  848. await self.notification_manager.send_generic_notification(
  849. f"🧹 <b>Cleanup Completed</b>\n\n"
  850. f"Cancelled {orphaned_count} orphaned stop loss order(s)\n"
  851. f"Reason: Parent orders cancelled/failed or positions closed externally\n"
  852. f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
  853. f"💡 This automatic cleanup ensures stop losses stay synchronized with actual orders and positions."
  854. )
  855. except Exception as e:
  856. logger.error(f"❌ Error cleaning up orphaned stop losses: {e}", exc_info=True)
  857. async def _activate_pending_stop_losses_from_trades(self):
  858. """🆕 PHASE 4: Check trades table for pending stop loss activation first (highest priority)"""
  859. try:
  860. stats = self.trading_engine.get_stats()
  861. if not stats:
  862. return
  863. # Get open positions that need stop loss activation
  864. trades_needing_sl = stats.get_pending_stop_loss_activations()
  865. if not trades_needing_sl:
  866. return
  867. logger.debug(f"🆕 Found {len(trades_needing_sl)} open positions needing stop loss activation")
  868. for position_trade in trades_needing_sl:
  869. try:
  870. symbol = position_trade['symbol']
  871. token = symbol.split('/')[0] if '/' in symbol else symbol
  872. stop_loss_price = position_trade['stop_loss_price']
  873. position_side = position_trade['position_side']
  874. current_amount = position_trade.get('current_position_size', 0)
  875. lifecycle_id = position_trade['trade_lifecycle_id']
  876. # Get current market price
  877. current_price = None
  878. try:
  879. market_data = self.trading_engine.get_market_data(symbol)
  880. if market_data and market_data.get('ticker'):
  881. current_price = float(market_data['ticker'].get('last', 0))
  882. except Exception as price_error:
  883. logger.warning(f"Could not fetch current price for {symbol}: {price_error}")
  884. # Determine stop loss side based on position side
  885. sl_side = 'sell' if position_side == 'long' else 'buy' # Long SL = sell, Short SL = buy
  886. # Check if trigger condition is already met
  887. trigger_already_hit = False
  888. trigger_reason = ""
  889. if current_price and current_price > 0:
  890. if sl_side == 'sell' and current_price <= stop_loss_price:
  891. # LONG position stop loss - price below trigger
  892. trigger_already_hit = True
  893. trigger_reason = f"LONG SL: Current ${current_price:.4f} ≤ Stop ${stop_loss_price:.4f}"
  894. elif sl_side == 'buy' and current_price >= stop_loss_price:
  895. # SHORT position stop loss - price above trigger
  896. trigger_already_hit = True
  897. trigger_reason = f"SHORT SL: Current ${current_price:.4f} ≥ Stop ${stop_loss_price:.4f}"
  898. if trigger_already_hit:
  899. # Execute immediate market close
  900. logger.warning(f"🚨 IMMEDIATE SL EXECUTION (Trades Table): {token} - {trigger_reason}")
  901. try:
  902. exit_result = await self.trading_engine.execute_exit_order(token)
  903. if exit_result.get('success'):
  904. logger.info(f"✅ Immediate {position_side.upper()} SL execution successful for {token}")
  905. if self.notification_manager:
  906. await self.notification_manager.send_generic_notification(
  907. f"🚨 <b>Immediate Stop Loss Execution</b>\n\n"
  908. f"🆕 <b>Source: Unified Trades Table (Phase 4)</b>\n"
  909. f"Token: {token}\n"
  910. f"Position Type: {position_side.upper()}\n"
  911. f"SL Trigger Price: ${stop_loss_price:.4f}\n"
  912. f"Current Market Price: ${current_price:.4f}\n"
  913. f"Trigger Logic: {trigger_reason}\n"
  914. f"Action: Market close order placed immediately\n"
  915. f"Order ID: {exit_result.get('order_placed_details', {}).get('exchange_order_id', 'N/A')}\n"
  916. f"Lifecycle ID: {lifecycle_id[:8]}\n"
  917. f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
  918. f"⚡ Single source of truth prevents missed stop losses"
  919. )
  920. else:
  921. logger.error(f"❌ Failed to execute immediate SL for {token}: {exit_result.get('error')}")
  922. except Exception as exec_error:
  923. logger.error(f"❌ Exception during immediate SL execution for {token}: {exec_error}")
  924. else:
  925. # Normal activation - place stop loss order
  926. try:
  927. sl_result = await self.trading_engine.execute_stop_loss_order(token, stop_loss_price)
  928. if sl_result.get('success'):
  929. sl_order_id = sl_result.get('order_placed_details', {}).get('exchange_order_id')
  930. # Link the stop loss order to the trade lifecycle
  931. stats.link_stop_loss_to_trade(lifecycle_id, sl_order_id, stop_loss_price)
  932. logger.info(f"✅ Activated {position_side.upper()} stop loss for {token}: ${stop_loss_price:.4f}")
  933. if self.notification_manager:
  934. await self.notification_manager.send_generic_notification(
  935. f"🛑 <b>Stop Loss Activated</b>\n\n"
  936. f"🆕 <b>Source: Unified Trades Table (Phase 4)</b>\n"
  937. f"Token: {token}\n"
  938. f"Position Type: {position_side.upper()}\n"
  939. f"Stop Loss Price: ${stop_loss_price:.4f}\n"
  940. f"Current Price: ${current_price:.4f if current_price else 'Unknown'}\n"
  941. f"Order ID: {sl_order_id or 'N/A'}\n"
  942. f"Lifecycle ID: {lifecycle_id[:8]}\n"
  943. f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
  944. f"🛡️ Your position is now protected"
  945. )
  946. else:
  947. logger.error(f"❌ Failed to activate SL for {token}: {sl_result.get('error')}")
  948. except Exception as activation_error:
  949. logger.error(f"❌ Exception during SL activation for {token}: {activation_error}")
  950. except Exception as trade_error:
  951. logger.error(f"❌ Error processing position trade for SL activation: {trade_error}")
  952. except Exception as e:
  953. logger.error(f"❌ Error activating pending stop losses from trades table: {e}", exc_info=True)
  954. async def _activate_pending_stop_losses(self, order_in_db, stats):
  955. """Activate pending stop losses for a filled order, checking current price for immediate execution."""
  956. try:
  957. # Fetch pending stop losses for this order
  958. pending_stop_losses = stats.get_orders_by_status('pending_trigger', 'stop_limit_trigger', order_in_db['bot_order_ref_id'])
  959. if not pending_stop_losses:
  960. return
  961. symbol = order_in_db.get('symbol')
  962. token = symbol.split('/')[0] if '/' in symbol and symbol else 'Unknown'
  963. logger.debug(f"Found {len(pending_stop_losses)} pending stop loss(es) for filled order {order_in_db.get('exchange_order_id', 'N/A')}")
  964. # Get current market price for the symbol
  965. current_price = None
  966. try:
  967. market_data = self.trading_engine.get_market_data(symbol)
  968. if market_data and market_data.get('ticker'):
  969. current_price = float(market_data['ticker'].get('last', 0))
  970. if current_price <= 0:
  971. current_price = None
  972. except Exception as price_error:
  973. logger.warning(f"Could not fetch current price for {symbol}: {price_error}")
  974. current_price = None
  975. # Check if we still have a position for this symbol after the fill
  976. if symbol:
  977. # Try to get current position
  978. try:
  979. position = self.trading_engine.find_position(token)
  980. if position and float(position.get('contracts', 0)) != 0:
  981. # Position exists - check each stop loss
  982. activated_count = 0
  983. immediately_executed_count = 0
  984. for sl_order in pending_stop_losses:
  985. sl_trigger_price = float(sl_order.get('price', 0))
  986. sl_side = sl_order.get('side', '').lower() # 'sell' for long SL, 'buy' for short SL
  987. sl_db_id = sl_order.get('id')
  988. sl_amount = sl_order.get('amount_requested', 0)
  989. if not sl_trigger_price or not sl_side or not sl_db_id:
  990. logger.warning(f"Invalid stop loss data for DB ID {sl_db_id}, skipping")
  991. continue
  992. # Check if trigger condition is already met
  993. trigger_already_hit = False
  994. trigger_reason = ""
  995. if current_price and current_price > 0:
  996. if sl_side == 'sell' and current_price <= sl_trigger_price:
  997. # LONG position stop loss - price has fallen below trigger
  998. # Long SL = SELL order that triggers when price drops below stop price
  999. trigger_already_hit = True
  1000. trigger_reason = f"LONG SL: Current price ${current_price:.4f} ≤ Stop price ${sl_trigger_price:.4f}"
  1001. elif sl_side == 'buy' and current_price >= sl_trigger_price:
  1002. # SHORT position stop loss - price has risen above trigger
  1003. # Short SL = BUY order that triggers when price rises above stop price
  1004. trigger_already_hit = True
  1005. trigger_reason = f"SHORT SL: Current price ${current_price:.4f} ≥ Stop price ${sl_trigger_price:.4f}"
  1006. if trigger_already_hit:
  1007. # Execute immediate market close instead of activating stop loss
  1008. logger.warning(f"🚨 IMMEDIATE SL EXECUTION: {token} - {trigger_reason}")
  1009. # Update the stop loss status to show it was immediately executed
  1010. stats.update_order_status(order_db_id=sl_db_id, new_status='immediately_executed_on_activation')
  1011. # Execute market order to close position
  1012. try:
  1013. exit_result = await self.trading_engine.execute_exit_order(token)
  1014. if exit_result.get('success'):
  1015. immediately_executed_count += 1
  1016. position_side = "LONG" if sl_side == 'sell' else "SHORT"
  1017. logger.info(f"✅ Immediate {position_side} SL execution successful for {token}. Market order placed: {exit_result.get('order_placed_details', {}).get('exchange_order_id', 'N/A')}")
  1018. if self.notification_manager:
  1019. await self.notification_manager.send_generic_notification(
  1020. f"🚨 <b>Immediate Stop Loss Execution</b>\n\n"
  1021. f"Token: {token}\n"
  1022. f"Position Type: {position_side}\n"
  1023. f"SL Trigger Price: ${sl_trigger_price:.4f}\n"
  1024. f"Current Market Price: ${current_price:.4f}\n"
  1025. f"Trigger Logic: {trigger_reason}\n"
  1026. f"Action: Market close order placed immediately\n"
  1027. f"Reason: Trigger condition already met when activating\n"
  1028. f"Order ID: {exit_result.get('order_placed_details', {}).get('exchange_order_id', 'N/A')}\n"
  1029. f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
  1030. f"⚡ This prevents waiting for a trigger that's already passed"
  1031. )
  1032. else:
  1033. logger.error(f"❌ Failed to execute immediate SL for {token}: {exit_result.get('error', 'Unknown error')}")
  1034. if self.notification_manager:
  1035. await self.notification_manager.send_generic_notification(
  1036. f"❌ <b>Immediate SL Execution Failed</b>\n\n"
  1037. f"Token: {token}\n"
  1038. f"SL Price: ${sl_trigger_price:.4f}\n"
  1039. f"Current Price: ${current_price:.4f}\n"
  1040. f"Trigger Logic: {trigger_reason}\n"
  1041. f"Error: {exit_result.get('error', 'Unknown error')}\n\n"
  1042. f"⚠️ Manual intervention may be required"
  1043. )
  1044. # Revert status since execution failed
  1045. stats.update_order_status(order_db_id=sl_db_id, new_status='activation_execution_failed')
  1046. except Exception as exec_error:
  1047. logger.error(f"❌ Exception during immediate SL execution for {token}: {exec_error}")
  1048. stats.update_order_status(order_db_id=sl_db_id, new_status='activation_execution_error')
  1049. else:
  1050. # Normal activation - trigger condition not yet met
  1051. activated_count += 1
  1052. position_side = "LONG" if sl_side == 'sell' else "SHORT"
  1053. logger.info(f"✅ Activating {position_side} stop loss for {token}: SL price ${sl_trigger_price:.4f} (Current: ${current_price:.4f if current_price else 'Unknown'})")
  1054. # Send summary notification for normal activations
  1055. if activated_count > 0 and self.notification_manager:
  1056. await self.notification_manager.send_generic_notification(
  1057. f"🛑 <b>Stop Losses Activated</b>\n\n"
  1058. f"Symbol: {token}\n"
  1059. f"Activated: {activated_count} stop loss(es)\n"
  1060. f"Current Price: ${current_price:.4f if current_price else 'Unknown'}\n"
  1061. f"Status: Monitoring for trigger conditions"
  1062. f"{f'\\n\\n⚡ Additionally executed {immediately_executed_count} stop loss(es) immediately due to current market conditions' if immediately_executed_count > 0 else ''}"
  1063. )
  1064. elif immediately_executed_count > 0 and activated_count == 0:
  1065. # All stop losses were immediately executed
  1066. if self.notification_manager:
  1067. await self.notification_manager.send_generic_notification(
  1068. f"⚡ <b>All Stop Losses Executed Immediately</b>\n\n"
  1069. f"Symbol: {token}\n"
  1070. f"Executed: {immediately_executed_count} stop loss(es)\n"
  1071. f"Reason: Market price already beyond trigger levels\n"
  1072. f"Current Price: ${current_price:.4f if current_price else 'Unknown'}\n\n"
  1073. f"🚀 Position(s) closed at market to prevent further losses"
  1074. )
  1075. else:
  1076. # No position exists (might have been closed immediately) - cancel the stop losses
  1077. cancelled_count = stats.cancel_linked_orders(
  1078. parent_bot_order_ref_id=order_in_db['bot_order_ref_id'],
  1079. new_status='cancelled_no_position'
  1080. )
  1081. if cancelled_count > 0:
  1082. logger.info(f"❌ Cancelled {cancelled_count} pending stop losses for {symbol} - no position found")
  1083. if self.notification_manager:
  1084. await self.notification_manager.send_generic_notification(
  1085. f"🛑 <b>Stop Losses Cancelled</b>\n\n"
  1086. f"Symbol: {token}\n"
  1087. f"Cancelled: {cancelled_count} stop loss(es)\n"
  1088. f"Reason: No open position found"
  1089. )
  1090. except Exception as pos_check_error:
  1091. logger.warning(f"Could not check position for {symbol} during SL activation: {pos_check_error}")
  1092. # In case of error, still try to activate (safer to have redundant SLs than none)
  1093. except Exception as e:
  1094. logger.error(f"Error in _activate_pending_stop_losses: {e}", exc_info=True)
  1095. async def _check_for_recent_fills_for_order(self, exchange_oid, order_in_db):
  1096. """Check for very recent fills that might match this order."""
  1097. try:
  1098. # Get recent fills from exchange
  1099. recent_fills = self.trading_engine.get_recent_fills()
  1100. if not recent_fills:
  1101. return False
  1102. # Get last processed timestamp from database
  1103. if not hasattr(self, '_last_processed_trade_time') or self._last_processed_trade_time is None:
  1104. try:
  1105. last_time_str = self.trading_engine.stats._get_metadata('last_processed_trade_time')
  1106. if last_time_str:
  1107. self._last_processed_trade_time = datetime.fromisoformat(last_time_str)
  1108. logger.debug(f"Loaded last_processed_trade_time from DB: {self._last_processed_trade_time}")
  1109. else:
  1110. # If no last processed time, start from 1 hour ago to avoid processing too much history
  1111. self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
  1112. logger.info("No last_processed_trade_time found, setting to 1 hour ago (UTC).")
  1113. except Exception as e:
  1114. logger.warning(f"Could not load last_processed_trade_time from DB: {e}")
  1115. self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
  1116. # Process new fills
  1117. for fill in recent_fills:
  1118. try:
  1119. # Parse fill data - CCXT format from fetch_my_trades
  1120. trade_id = fill.get('id') # CCXT uses 'id' for trade ID
  1121. timestamp_ms = fill.get('timestamp') # CCXT uses 'timestamp' (milliseconds)
  1122. symbol = fill.get('symbol') # CCXT uses 'symbol' in full format like 'LTC/USDC:USDC'
  1123. side = fill.get('side') # CCXT uses 'side' ('buy' or 'sell')
  1124. amount = float(fill.get('amount', 0)) # CCXT uses 'amount'
  1125. price = float(fill.get('price', 0)) # CCXT uses 'price'
  1126. # Convert timestamp
  1127. if timestamp_ms:
  1128. timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
  1129. else:
  1130. timestamp_dt = datetime.now(timezone.utc)
  1131. # Skip if already processed
  1132. if timestamp_dt <= self._last_processed_trade_time:
  1133. continue
  1134. # Process as external trade if we reach here
  1135. if symbol and side and amount > 0 and price > 0:
  1136. # Symbol is already in full format for CCXT
  1137. full_symbol = symbol
  1138. token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
  1139. # Check if this might be a bot order fill by looking for exchange order ID
  1140. # CCXT might have this in 'info' sub-object with the raw exchange data
  1141. exchange_order_id_from_fill = None
  1142. if 'info' in fill and isinstance(fill['info'], dict):
  1143. # Look for Hyperliquid order ID in the raw response
  1144. exchange_order_id_from_fill = fill['info'].get('oid')
  1145. if exchange_order_id_from_fill == exchange_oid:
  1146. logger.info(f"✅ Found recent fill for order {exchange_oid} - NOT cancelling stop losses")
  1147. return True
  1148. except Exception as e:
  1149. logger.error(f"Error processing fill {fill}: {e}")
  1150. continue
  1151. return False
  1152. except Exception as e:
  1153. logger.error(f"❌ Error checking for recent fills for order: {e}", exc_info=True)
  1154. return False
  1155. async def _check_external_stop_loss_orders(self):
  1156. """Check for externally placed stop loss orders and track them."""
  1157. try:
  1158. # Get current open orders
  1159. open_orders = self.trading_engine.get_orders()
  1160. if not open_orders:
  1161. return
  1162. # Get current positions to understand what could be stop losses
  1163. positions = self.trading_engine.get_positions()
  1164. if not positions:
  1165. return
  1166. # Create a map of current positions
  1167. position_map = {}
  1168. for position in positions:
  1169. symbol = position.get('symbol')
  1170. contracts = float(position.get('contracts', 0))
  1171. if symbol and contracts != 0:
  1172. token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
  1173. position_map[token] = {
  1174. 'symbol': symbol,
  1175. 'contracts': contracts,
  1176. 'side': 'long' if contracts > 0 else 'short',
  1177. 'entry_price': float(position.get('entryPx', 0))
  1178. }
  1179. # Check each order to see if it could be a stop loss
  1180. newly_detected = 0
  1181. for order in open_orders:
  1182. try:
  1183. exchange_order_id = order.get('id')
  1184. symbol = order.get('symbol')
  1185. side = order.get('side') # 'buy' or 'sell'
  1186. amount = float(order.get('amount', 0))
  1187. price = float(order.get('price', 0))
  1188. order_type = order.get('type', '').lower()
  1189. if not all([exchange_order_id, symbol, side, amount, price]):
  1190. continue
  1191. # Skip if we're already tracking this order
  1192. if exchange_order_id in self._external_stop_loss_orders:
  1193. continue
  1194. # Check if this order could be a stop loss
  1195. token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
  1196. # Must have a position in this token to have a stop loss
  1197. if token not in position_map:
  1198. continue
  1199. position = position_map[token]
  1200. # Check if this order matches stop loss pattern
  1201. is_stop_loss = False
  1202. if position['side'] == 'long' and side == 'sell':
  1203. # Long position with sell order - could be stop loss if price is below entry
  1204. if price < position['entry_price'] * 0.98: # Allow 2% buffer for approximation
  1205. is_stop_loss = True
  1206. elif position['side'] == 'short' and side == 'buy':
  1207. # Short position with buy order - could be stop loss if price is above entry
  1208. if price > position['entry_price'] * 1.02: # Allow 2% buffer for approximation
  1209. is_stop_loss = True
  1210. if is_stop_loss:
  1211. # Track this as an external stop loss order
  1212. self._external_stop_loss_orders[exchange_order_id] = {
  1213. 'token': token,
  1214. 'symbol': symbol,
  1215. 'trigger_price': price,
  1216. 'side': side,
  1217. 'amount': amount,
  1218. 'position_side': position['side'],
  1219. 'detected_at': datetime.now(timezone.utc),
  1220. 'entry_price': position['entry_price']
  1221. }
  1222. newly_detected += 1
  1223. logger.info(f"🛑 Detected external stop loss order: {token} {side.upper()} {amount} @ ${price:.2f} (protecting {position['side'].upper()} position)")
  1224. except Exception as e:
  1225. logger.error(f"Error analyzing order for stop loss detection: {e}")
  1226. continue
  1227. if newly_detected > 0:
  1228. logger.info(f"🔍 Detected {newly_detected} new external stop loss orders")
  1229. except Exception as e:
  1230. logger.error(f"❌ Error checking external stop loss orders: {e}")
  1231. async def _cleanup_external_stop_loss_tracking(self):
  1232. """Clean up external stop loss orders that are no longer active."""
  1233. try:
  1234. if not self._external_stop_loss_orders:
  1235. return
  1236. # Get current open orders
  1237. open_orders = self.trading_engine.get_orders()
  1238. if not open_orders:
  1239. # No open orders, clear all tracking
  1240. removed_count = len(self._external_stop_loss_orders)
  1241. self._external_stop_loss_orders.clear()
  1242. if removed_count > 0:
  1243. logger.info(f"🧹 Cleared {removed_count} external stop loss orders (no open orders)")
  1244. return
  1245. # Get set of current order IDs
  1246. current_order_ids = {order.get('id') for order in open_orders if order.get('id')}
  1247. # Remove any tracked stop loss orders that are no longer open
  1248. to_remove = []
  1249. for order_id, stop_loss_info in self._external_stop_loss_orders.items():
  1250. if order_id not in current_order_ids:
  1251. to_remove.append(order_id)
  1252. for order_id in to_remove:
  1253. stop_loss_info = self._external_stop_loss_orders[order_id]
  1254. del self._external_stop_loss_orders[order_id]
  1255. logger.info(f"🗑️ Removed external stop loss tracking for {stop_loss_info['token']} order {order_id} (no longer open)")
  1256. if to_remove:
  1257. logger.info(f"🧹 Cleaned up {len(to_remove)} external stop loss orders")
  1258. except Exception as e:
  1259. logger.error(f"❌ Error cleaning up external stop loss tracking: {e}")