external_event_monitor.py 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817
  1. #!/usr/bin/env python3
  2. """
  3. Monitors external events like trades made outside the bot and price alarms.
  4. """
  5. import logging
  6. import asyncio
  7. from datetime import datetime, timedelta, timezone
  8. from typing import Optional, Dict, Any, List
  9. # Assuming AlarmManager will be moved here or imported appropriately
  10. # from .alarm_manager import AlarmManager
  11. from src.monitoring.alarm_manager import AlarmManager # Keep if AlarmManager stays in its own file as per original structure
  12. from src.utils.token_display_formatter import get_formatter
  13. logger = logging.getLogger(__name__)
  14. class ExternalEventMonitor:
  15. def __init__(self, trading_engine, notification_manager, alarm_manager, market_monitor_cache, shared_state):
  16. self.trading_engine = trading_engine
  17. self.notification_manager = notification_manager
  18. self.alarm_manager = alarm_manager
  19. self.market_monitor_cache = market_monitor_cache
  20. self.shared_state = shared_state # Expected to contain {'external_stop_losses': ...}
  21. self.last_processed_trade_time: Optional[datetime] = None
  22. # Add necessary initializations, potentially loading last_processed_trade_time
  23. def _safe_get_positions(self) -> Optional[List[Dict[str, Any]]]:
  24. """Safely get positions from trading engine, returning None on API failures instead of empty list."""
  25. try:
  26. return self.trading_engine.get_positions()
  27. except Exception as e:
  28. logger.warning(f"⚠️ Failed to fetch positions in external event monitor: {e}")
  29. return None
  30. async def _check_price_alarms(self):
  31. """Check price alarms and trigger notifications."""
  32. try:
  33. active_alarms = self.alarm_manager.get_all_active_alarms()
  34. if not active_alarms:
  35. return
  36. tokens_to_check = list(set(alarm['token'] for alarm in active_alarms))
  37. for token in tokens_to_check:
  38. try:
  39. symbol = f"{token}/USDC:USDC"
  40. market_data = self.trading_engine.get_market_data(symbol)
  41. if not market_data or not market_data.get('ticker'):
  42. continue
  43. current_price = float(market_data['ticker'].get('last', 0))
  44. if current_price <= 0:
  45. continue
  46. token_alarms = [alarm for alarm in active_alarms if alarm['token'] == token]
  47. for alarm in token_alarms:
  48. target_price = alarm['target_price']
  49. direction = alarm['direction']
  50. should_trigger = False
  51. if direction == 'above' and current_price >= target_price:
  52. should_trigger = True
  53. elif direction == 'below' and current_price <= target_price:
  54. should_trigger = True
  55. if should_trigger:
  56. triggered_alarm = self.alarm_manager.trigger_alarm(alarm['id'], current_price)
  57. if triggered_alarm:
  58. await self._send_alarm_notification(triggered_alarm)
  59. except Exception as e:
  60. logger.error(f"Error checking alarms for {token}: {e}")
  61. except Exception as e:
  62. logger.error("❌ Error checking price alarms: {e}")
  63. async def _send_alarm_notification(self, alarm: Dict[str, Any]):
  64. """Send notification for triggered alarm."""
  65. try:
  66. if self.notification_manager:
  67. await self.notification_manager.send_alarm_triggered_notification(
  68. alarm['token'],
  69. alarm['target_price'],
  70. alarm['triggered_price'],
  71. alarm['direction']
  72. )
  73. else:
  74. logger.info(f"🔔 ALARM TRIGGERED: {alarm['token']} @ ${alarm['triggered_price']:,.2f}")
  75. except Exception as e:
  76. logger.error(f"❌ Error sending alarm notification: {e}")
  async def _determine_position_action_type(self, full_symbol: str, side_from_fill: str,
                                            amount_from_fill: float, existing_lc: Optional[Dict] = None) -> str:
      """
      Classify a fill by comparing the live exchange position with the tracked lifecycle.

      Returns one of: 'position_opened', 'position_closed', 'position_increased',
      'position_decreased', or 'external_unmatched' when positions cannot be
      fetched, no rule matches, or an exception is raised.
      """
      try:
          # Live positions as reported by the exchange
          positions = self._safe_get_positions()
          if positions is None:
              logger.warning(f"⚠️ Failed to fetch positions for {full_symbol} analysis - returning external_unmatched")
              return 'external_unmatched'

          exchange_pos = next((p for p in positions if p.get('symbol') == full_symbol), None)
          current_size = abs(float(exchange_pos.get('contracts', 0))) if exchange_pos else 0.0

          # Without a tracked lifecycle the fill can only be an opening
          if not existing_lc:
              logger.debug(f"🔍 Position analysis: {full_symbol} no existing lifecycle, current size: {current_size}")
              # A non-zero size means the position exists on the exchange
              return 'position_opened' if current_size > 1e-9 else 'external_unmatched'

          # Size and side last recorded on the lifecycle
          previous_size = existing_lc.get('current_position_size', 0)
          lc_position_side = existing_lc.get('position_side')
          logger.debug(f"🔍 Position analysis: {full_symbol} {side_from_fill} {amount_from_fill}")
          logger.debug(f" Lifecycle side: {lc_position_side}, previous size: {previous_size}, current size: {current_size}")

          # A fill on the opposite side of the tracked position is a closing trade
          is_closing_trade = (
              (lc_position_side == 'long' and side_from_fill.lower() == 'sell')
              or (lc_position_side == 'short' and side_from_fill.lower() == 'buy')
          )
          logger.debug(f" Is closing trade: {is_closing_trade}")

          if is_closing_trade:
              if current_size < 1e-9:
                  # Nothing left on the exchange
                  logger.debug(f" → Position closed (current_size < 1e-9)")
                  return 'position_closed'
              if current_size < previous_size - 1e-9:
                  # Smaller than before, but still open
                  logger.debug(f" → Position decreased (current_size {current_size} < previous_size - 1e-9 {previous_size - 1e-9})")
                  return 'position_decreased'
          else:
              # Same-side fill: only counts as an increase if the size actually grew
              logger.debug(f" Same side trade check: current_size {current_size} > previous_size + 1e-9 {previous_size + 1e-9}?")
              if current_size > previous_size + 1e-9:
                  logger.debug(f" → Position increased")
                  return 'position_increased'
              logger.debug(f" → Size check failed, not enough increase")

          # Nothing matched
          logger.debug(f" → Fallback to external_unmatched")
          return 'external_unmatched'
      except Exception as exc:
          logger.error(f"Error determining position action type: {exc}")
          return 'external_unmatched'
  137. async def _update_lifecycle_position_size(self, lifecycle_id: str, new_size: float) -> bool:
  138. """Update the current position size in the lifecycle."""
  139. try:
  140. stats = self.trading_engine.get_stats()
  141. if not stats:
  142. return False
  143. # Update the current position size
  144. success = stats.trade_manager.update_trade_market_data(
  145. lifecycle_id, current_position_size=new_size
  146. )
  147. return success
  148. except Exception as e:
  149. logger.error(f"Error updating lifecycle position size: {e}")
  150. return False
  async def _send_position_change_notification(self, full_symbol: str, side_from_fill: str,
                                               amount_from_fill: float, price_from_fill: float,
                                               action_type: str, timestamp_dt: datetime,
                                               existing_lc: Optional[Dict] = None,
                                               realized_pnl: Optional[float] = None):
      """Send position change notification.

      Builds an HTML-formatted message for one of the position actions and
      sends it through ``notification_manager.send_generic_notification``.

      Args:
          full_symbol: Symbol of the fill; the token is the part before '/'
              (or before ':' when there is no '/').
          side_from_fill: Fill side; 'buy' maps to LONG for opened positions.
          amount_from_fill: Filled amount.
          price_from_fill: Fill price.
          action_type: 'position_opened', 'position_closed',
              'position_increased' or 'position_decreased'. Any other value,
              or a closed/increased/decreased action without ``existing_lc``,
              sends nothing.
          timestamp_dt: Time of the fill, rendered with a 'UTC' suffix.
          existing_lc: Lifecycle record for the position, if any.
          realized_pnl: Realized P&L for a closed position, if known.

      Nothing is sent when no notification manager is configured or when the
      current exchange position cannot be fetched for an increase/decrease.
      Exceptions are logged and swallowed.
      """
      try:
          if not self.notification_manager:
              return

          token = full_symbol.split('/')[0] if '/' in full_symbol else full_symbol.split(':')[0]
          time_str = timestamp_dt.strftime('%Y-%m-%d %H:%M:%S UTC')
          formatter = get_formatter()

          def _exchange_size():
              """Absolute exchange position size for full_symbol; None if the fetch failed."""
              positions = self._safe_get_positions()
              if positions is None:
                  # Skip notification if we can't get position data
                  logger.warning(f"⚠️ Failed to fetch positions for notification - skipping {action_type} notification")
                  return None
              for pos in positions:
                  if pos.get('symbol') == full_symbol:
                      return abs(float(pos.get('contracts', 0)))
              return 0

          if action_type == 'position_closed' and existing_lc:
              # `or` fallbacks guard against keys that are present but None.
              position_side = (existing_lc.get('position_side') or 'unknown').upper()
              entry_price = existing_lc.get('entry_price') or 0
              # Compare against None explicitly: a break-even close (0.0) is
              # falsy and would otherwise be shown as a loss.
              pnl_emoji = "🟢" if realized_pnl is not None and realized_pnl >= 0 else "🔴"
              pnl_text = f"{await formatter.format_price_with_symbol(realized_pnl)}" if realized_pnl is not None else "N/A"

              # Get ROE directly from exchange data
              info_data = existing_lc.get('info') or {}
              position_info = info_data.get('position') or {}
              roe_raw = position_info.get('returnOnEquity')
              roe_text = ""
              if roe_raw is not None:
                  try:
                      # The exchange provides ROE as a decimal (e.g., -0.326 for -32.6%)
                      # We need to multiply by 100 and keep the sign
                      roe = float(roe_raw) * 100
                      roe_text = f" ({roe:+.2f}%)"
                  except (ValueError, TypeError):
                      logger.warning(f"Could not parse ROE value: {roe_raw} for {full_symbol}")
              else:
                  logger.warning(f"No ROE data available from exchange for {full_symbol}")

              size_text = await formatter.format_amount(amount_from_fill, token)
              entry_text = await formatter.format_price_with_symbol(entry_price, token)
              exit_text = await formatter.format_price_with_symbol(price_from_fill, token)
              value_text = await formatter.format_price_with_symbol(amount_from_fill * price_from_fill)
              message = "\n".join([
                  "🎯 <b>Position Closed (External)</b>",
                  "📊 <b>Trade Details:</b>",
                  f"• Token: {token}",
                  f"• Direction: {position_side}",
                  f"• Size Closed: {size_text}",
                  f"• Entry Price: {entry_text}",
                  f"• Exit Price: {exit_text}",
                  f"• Exit Value: {value_text}",
                  f"{pnl_emoji} <b>P&L:</b> {pnl_text}{roe_text}",
                  f"✅ <b>Status:</b> {position_side} position closed externally",
                  f"⏰ <b>Time:</b> {time_str}",
                  "📊 Use /stats to view updated performance",
              ])

          elif action_type == 'position_opened':
              position_side = 'LONG' if side_from_fill.lower() == 'buy' else 'SHORT'
              size_text = await formatter.format_amount(amount_from_fill, token)
              entry_text = await formatter.format_price_with_symbol(price_from_fill, token)
              value_text = await formatter.format_price_with_symbol(amount_from_fill * price_from_fill)
              message = "\n".join([
                  "🚀 <b>Position Opened (External)</b>",
                  "📊 <b>Trade Details:</b>",
                  f"• Token: {token}",
                  f"• Direction: {position_side}",
                  f"• Size: {size_text}",
                  f"• Entry Price: {entry_text}",
                  f"• Position Value: {value_text}",
                  f"✅ <b>Status:</b> New {position_side} position opened externally",
                  f"⏰ <b>Time:</b> {time_str}",
                  "📱 Use /positions to view all positions",
              ])

          elif action_type == 'position_increased' and existing_lc:
              position_side = (existing_lc.get('position_side') or 'unknown').upper()
              previous_size = existing_lc.get('current_position_size') or 0

              # Get current size from exchange
              current_size = _exchange_size()
              if current_size is None:
                  return

              added_text = await formatter.format_amount(amount_from_fill, token)
              price_text = await formatter.format_price_with_symbol(price_from_fill, token)
              previous_text = await formatter.format_amount(previous_size, token)
              new_text = await formatter.format_amount(current_size, token)
              value_text = await formatter.format_price_with_symbol(amount_from_fill * price_from_fill)
              message = "\n".join([
                  "📈 <b>Position Increased (External)</b>",
                  "📊 <b>Trade Details:</b>",
                  f"• Token: {token}",
                  f"• Direction: {position_side}",
                  f"• Size Added: {added_text}",
                  f"• Add Price: {price_text}",
                  f"• Previous Size: {previous_text}",
                  f"• New Size: {new_text}",
                  f"• Add Value: {value_text}",
                  f"📈 <b>Status:</b> {position_side} position size increased externally",
                  f"⏰ <b>Time:</b> {time_str}",
                  "📈 Use /positions to view current position",
              ])

          elif action_type == 'position_decreased' and existing_lc:
              position_side = (existing_lc.get('position_side') or 'unknown').upper()
              previous_size = existing_lc.get('current_position_size') or 0
              entry_price = existing_lc.get('entry_price') or 0

              # Get current size from exchange
              current_size = _exchange_size()
              if current_size is None:
                  return

              # Calculate partial PnL for the reduced amount
              partial_pnl = 0
              if entry_price > 0:
                  if position_side == 'LONG':
                      partial_pnl = amount_from_fill * (price_from_fill - entry_price)
                  else:  # SHORT
                      partial_pnl = amount_from_fill * (entry_price - price_from_fill)
              pnl_emoji = "🟢" if partial_pnl >= 0 else "🔴"

              # Percentage return on the cost basis of the closed portion
              roe_text = ""
              if entry_price > 0 and amount_from_fill > 0:
                  cost_basis = amount_from_fill * entry_price
                  roe = (partial_pnl / cost_basis) * 100
                  roe_text = f" ({roe:+.2f}%)"

              reduced_text = await formatter.format_amount(amount_from_fill, token)
              exit_text = await formatter.format_price_with_symbol(price_from_fill, token)
              previous_text = await formatter.format_amount(previous_size, token)
              remaining_text = await formatter.format_amount(current_size, token)
              value_text = await formatter.format_price_with_symbol(amount_from_fill * price_from_fill)
              partial_text = await formatter.format_price_with_symbol(partial_pnl)
              message = "\n".join([
                  "📉 <b>Position Decreased (External)</b>",
                  "📊 <b>Trade Details:</b>",
                  f"• Token: {token}",
                  f"• Direction: {position_side}",
                  f"• Size Reduced: {reduced_text}",
                  f"• Exit Price: {exit_text}",
                  f"• Previous Size: {previous_text}",
                  f"• Remaining Size: {remaining_text}",
                  f"• Exit Value: {value_text}",
                  f"{pnl_emoji} <b>Partial P&L:</b> {partial_text}{roe_text}",
                  f"📉 <b>Status:</b> {position_side} position size decreased externally",
                  f"⏰ <b>Time:</b> {time_str}",
                  "📊 Position remains open. Use /positions to view details",
              ])

          else:
              # No fallback notification sent - only position-based notifications per user preference
              logger.debug(f"No notification sent for action_type: {action_type}")
              return

          await self.notification_manager.send_generic_notification(message.strip())
      except Exception as e:
          logger.error(f"Error sending position change notification: {e}")
  291. async def _auto_sync_single_position(self, symbol: str, exchange_position: Dict[str, Any], stats) -> bool:
  292. """Auto-sync a single orphaned position to create a lifecycle record."""
  293. try:
  294. import uuid
  295. from src.utils.token_display_formatter import get_formatter
  296. formatter = get_formatter()
  297. contracts_abs = abs(float(exchange_position.get('contracts', 0)))
  298. if contracts_abs <= 1e-9:
  299. return False
  300. entry_price_from_exchange = float(exchange_position.get('entryPrice', 0)) or float(exchange_position.get('entryPx', 0))
  301. # Determine position side
  302. position_side, order_side = '', ''
  303. ccxt_side = exchange_position.get('side', '').lower()
  304. if ccxt_side == 'long':
  305. position_side, order_side = 'long', 'buy'
  306. elif ccxt_side == 'short':
  307. position_side, order_side = 'short', 'sell'
  308. if not position_side:
  309. contracts_val = float(exchange_position.get('contracts', 0))
  310. if contracts_val > 1e-9:
  311. position_side, order_side = 'long', 'buy'
  312. elif contracts_val < -1e-9:
  313. position_side, order_side = 'short', 'sell'
  314. else:
  315. return False
  316. if not position_side:
  317. logger.error(f"AUTO-SYNC: Could not determine position side for {symbol}.")
  318. return False
  319. final_entry_price = entry_price_from_exchange
  320. if not final_entry_price or final_entry_price <= 0:
  321. # Fallback to a reasonable estimate (current mark price)
  322. mark_price = float(exchange_position.get('markPrice', 0)) or float(exchange_position.get('markPx', 0))
  323. if mark_price > 0:
  324. final_entry_price = mark_price
  325. else:
  326. logger.error(f"AUTO-SYNC: Could not determine entry price for {symbol}.")
  327. return False
  328. logger.info(f"🔄 AUTO-SYNC: Creating lifecycle for {symbol} {position_side.upper()} {contracts_abs} @ {await formatter.format_price_with_symbol(final_entry_price, symbol)}")
  329. unique_sync_id = str(uuid.uuid4())[:8]
  330. lifecycle_id = stats.create_trade_lifecycle(
  331. symbol=symbol,
  332. side=order_side,
  333. entry_order_id=f"external_sync_{unique_sync_id}",
  334. trade_type='external_sync'
  335. )
  336. if lifecycle_id:
  337. success = await stats.update_trade_position_opened(
  338. lifecycle_id,
  339. final_entry_price,
  340. contracts_abs,
  341. f"external_fill_sync_{unique_sync_id}"
  342. )
  343. if success:
  344. logger.info(f"✅ AUTO-SYNC: Successfully synced position for {symbol} (Lifecycle: {lifecycle_id[:8]})")
  345. # Send position opened notification for auto-synced position
  346. try:
  347. await self._send_position_change_notification(
  348. symbol, order_side, contracts_abs, final_entry_price,
  349. 'position_opened', datetime.now(timezone.utc)
  350. )
  351. logger.info(f"📨 AUTO-SYNC: Sent position opened notification for {symbol}")
  352. except Exception as e:
  353. logger.error(f"❌ AUTO-SYNC: Failed to send notification for {symbol}: {e}")
  354. return True
  355. else:
  356. logger.error(f"❌ AUTO-SYNC: Failed to update lifecycle to 'position_opened' for {symbol}")
  357. else:
  358. logger.error(f"❌ AUTO-SYNC: Failed to create lifecycle for {symbol}")
  359. return False
  360. except Exception as e:
  361. logger.error(f"❌ AUTO-SYNC: Error syncing position for {symbol}: {e}")
  362. return False
  363. async def _check_external_trades(self):
  364. """Check for trades made outside the Telegram bot and update stats."""
  365. try:
  366. stats = self.trading_engine.get_stats()
  367. if not stats:
  368. logger.warning("TradingStats not available in _check_external_trades. Skipping.")
  369. return
  370. external_trades_processed = 0
  371. symbols_with_fills = set()
  372. recent_fills = self.trading_engine.get_recent_fills()
  373. if not recent_fills:
  374. logger.debug("No recent fills data available")
  375. return
  376. if not hasattr(self, 'last_processed_trade_time') or self.last_processed_trade_time is None:
  377. try:
  378. # Ensure this metadata key is the one used by MarketMonitor for saving this state.
  379. last_time_str = stats._get_metadata('market_monitor_last_processed_trade_time')
  380. if last_time_str:
  381. self.last_processed_trade_time = datetime.fromisoformat(last_time_str).replace(tzinfo=timezone.utc)
  382. else:
  383. self.last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
  384. except Exception:
  385. self.last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
  386. for fill in recent_fills:
  387. try:
  388. trade_id = fill.get('id')
  389. timestamp_ms = fill.get('timestamp')
  390. symbol_from_fill = fill.get('symbol')
  391. side_from_fill = fill.get('side')
  392. amount_from_fill = float(fill.get('amount', 0))
  393. price_from_fill = float(fill.get('price', 0))
  394. timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc) if timestamp_ms else datetime.now(timezone.utc)
  395. if self.last_processed_trade_time and timestamp_dt <= self.last_processed_trade_time:
  396. continue
  397. # Check if this fill has already been processed to prevent duplicates
  398. if trade_id and stats.has_exchange_fill_been_processed(str(trade_id)):
  399. logger.debug(f"Skipping already processed fill: {trade_id}")
  400. continue
  401. fill_processed_this_iteration = False
  402. if not (symbol_from_fill and side_from_fill and amount_from_fill > 0 and price_from_fill > 0):
  403. logger.warning(f"Skipping fill with incomplete data: {fill}")
  404. continue
  405. full_symbol = symbol_from_fill
  406. token = symbol_from_fill.split('/')[0] if '/' in symbol_from_fill else symbol_from_fill.split(':')[0]
  407. exchange_order_id_from_fill = fill.get('info', {}).get('oid')
  408. # First check if this is a pending entry order fill
  409. if exchange_order_id_from_fill:
  410. pending_lc = stats.get_lifecycle_by_entry_order_id(exchange_order_id_from_fill, status='pending')
  411. if pending_lc and pending_lc.get('symbol') == full_symbol:
  412. success = await stats.update_trade_position_opened(
  413. lifecycle_id=pending_lc['trade_lifecycle_id'],
  414. entry_price=price_from_fill,
  415. entry_amount=amount_from_fill,
  416. exchange_fill_id=trade_id
  417. )
  418. if success:
  419. logger.info(f"📈 Lifecycle ENTRY: {pending_lc['trade_lifecycle_id']} for {full_symbol} updated by fill {trade_id}.")
  420. symbols_with_fills.add(token)
  421. order_in_db_for_entry = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
  422. if order_in_db_for_entry:
  423. stats.update_order_status(order_db_id=order_in_db_for_entry['id'], new_status='filled', amount_filled_increment=amount_from_fill)
  424. # Send position opened notification (this is a bot-initiated position)
  425. await self._send_position_change_notification(
  426. full_symbol, side_from_fill, amount_from_fill, price_from_fill,
  427. 'position_opened', timestamp_dt
  428. )
  429. fill_processed_this_iteration = True
  430. # Check if this is a known bot order (SL/TP/exit)
  431. if not fill_processed_this_iteration and exchange_order_id_from_fill:
  432. active_lc = None
  433. closure_reason_action_type = None
  434. bot_order_db_id_to_update = None
  435. bot_order_for_fill = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
  436. if bot_order_for_fill and bot_order_for_fill.get('symbol') == full_symbol:
  437. order_type = bot_order_for_fill.get('type')
  438. order_side = bot_order_for_fill.get('side')
  439. if order_type == 'market':
  440. potential_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
  441. if potential_lc:
  442. lc_pos_side = potential_lc.get('position_side')
  443. if (lc_pos_side == 'long' and order_side == 'sell' and side_from_fill == 'sell') or \
  444. (lc_pos_side == 'short' and order_side == 'buy' and side_from_fill == 'buy'):
  445. active_lc = potential_lc
  446. closure_reason_action_type = f"bot_exit_{lc_pos_side}_close"
  447. bot_order_db_id_to_update = bot_order_for_fill.get('id')
  448. logger.info(f"ℹ️ Lifecycle BOT EXIT: Fill {trade_id} (OID {exchange_order_id_from_fill}) for {full_symbol} matches bot exit for lifecycle {active_lc['trade_lifecycle_id']}.")
  449. if not active_lc:
  450. lc_by_sl = stats.get_lifecycle_by_sl_order_id(exchange_order_id_from_fill, status='position_opened')
  451. if lc_by_sl and lc_by_sl.get('symbol') == full_symbol:
  452. active_lc = lc_by_sl
  453. closure_reason_action_type = f"sl_{active_lc.get('position_side')}_close"
  454. bot_order_db_id_to_update = bot_order_for_fill.get('id')
  455. logger.info(f"ℹ️ Lifecycle SL: Fill {trade_id} for OID {exchange_order_id_from_fill} matches SL for lifecycle {active_lc['trade_lifecycle_id']}.")
  456. if not active_lc:
  457. lc_by_tp = stats.get_lifecycle_by_tp_order_id(exchange_order_id_from_fill, status='position_opened')
  458. if lc_by_tp and lc_by_tp.get('symbol') == full_symbol:
  459. active_lc = lc_by_tp
  460. closure_reason_action_type = f"tp_{active_lc.get('position_side')}_close"
  461. bot_order_db_id_to_update = bot_order_for_fill.get('id')
  462. logger.info(f"ℹ️ Lifecycle TP: Fill {trade_id} for OID {exchange_order_id_from_fill} matches TP for lifecycle {active_lc['trade_lifecycle_id']}.")
  463. # Process known bot order fills
  464. if active_lc and closure_reason_action_type:
  465. lc_id = active_lc['trade_lifecycle_id']
  466. lc_entry_price = active_lc.get('entry_price', 0)
  467. lc_position_side = active_lc.get('position_side')
  468. realized_pnl = 0
  469. if lc_position_side == 'long':
  470. realized_pnl = amount_from_fill * (price_from_fill - lc_entry_price)
  471. elif lc_position_side == 'short':
  472. realized_pnl = amount_from_fill * (lc_entry_price - price_from_fill)
  473. success = stats.update_trade_position_closed(
  474. lifecycle_id=lc_id, exit_price=price_from_fill,
  475. realized_pnl=realized_pnl, exchange_fill_id=trade_id
  476. )
  477. if success:
  478. pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
  479. formatter = get_formatter()
  480. logger.info(f"{pnl_emoji} Lifecycle CLOSED: {lc_id} ({closure_reason_action_type}). PNL for fill: {formatter.format_price_with_symbol(realized_pnl)}")
  481. symbols_with_fills.add(token)
  482. # Send position closed notification
  483. await self._send_position_change_notification(
  484. full_symbol, side_from_fill, amount_from_fill, price_from_fill,
  485. 'position_closed', timestamp_dt, active_lc, realized_pnl
  486. )
  487. stats.migrate_trade_to_aggregated_stats(lc_id)
  488. if bot_order_db_id_to_update:
  489. stats.update_order_status(order_db_id=bot_order_db_id_to_update, new_status='filled', amount_filled_increment=amount_from_fill)
  490. fill_processed_this_iteration = True
  491. # Check for external stop losses
  492. if not fill_processed_this_iteration:
  493. if (exchange_order_id_from_fill and
  494. self.shared_state.get('external_stop_losses') and
  495. exchange_order_id_from_fill in self.shared_state['external_stop_losses']):
  496. stop_loss_info = self.shared_state['external_stop_losses'][exchange_order_id_from_fill]
  497. formatter = get_formatter()
  498. logger.info(f"🛑 External SL (MM Tracking): {token} Order {exchange_order_id_from_fill} filled @ {formatter.format_price_with_symbol(price_from_fill, token)}")
  499. sl_active_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
  500. if sl_active_lc:
  501. lc_id = sl_active_lc['trade_lifecycle_id']
  502. lc_entry_price = sl_active_lc.get('entry_price', 0)
  503. lc_pos_side = sl_active_lc.get('position_side')
  504. realized_pnl = amount_from_fill * (price_from_fill - lc_entry_price) if lc_pos_side == 'long' else amount_from_fill * (lc_entry_price - price_from_fill)
  505. success = stats.update_trade_position_closed(lc_id, price_from_fill, realized_pnl, trade_id)
  506. if success:
  507. pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
  508. logger.info(f"{pnl_emoji} Lifecycle CLOSED by External SL (MM): {lc_id}. PNL: {formatter.format_price_with_symbol(realized_pnl)}")
  509. if self.notification_manager:
  510. await self.notification_manager.send_stop_loss_execution_notification(
  511. stop_loss_info, full_symbol, side_from_fill, amount_from_fill, price_from_fill,
  512. f'{lc_pos_side}_closed_external_sl', timestamp_dt.isoformat(), realized_pnl
  513. )
  514. stats.migrate_trade_to_aggregated_stats(lc_id)
  515. # Modify shared state carefully
  516. if exchange_order_id_from_fill in self.shared_state['external_stop_losses']:
  517. del self.shared_state['external_stop_losses'][exchange_order_id_from_fill]
  518. fill_processed_this_iteration = True
  519. else:
  520. logger.warning(f"⚠️ External SL (MM) {exchange_order_id_from_fill} for {full_symbol}, but no active lifecycle found.")
  521. # NEW: Enhanced external trade processing with position state detection
  522. if not fill_processed_this_iteration:
  523. existing_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
  524. # If no lifecycle exists but we have a position on exchange, try to auto-sync first
  525. if not existing_lc:
  526. current_positions = self._safe_get_positions()
  527. if current_positions is None:
  528. logger.warning("⚠️ Failed to fetch positions for external trade detection - skipping this fill")
  529. continue
  530. exchange_position = None
  531. for pos in current_positions:
  532. if pos.get('symbol') == full_symbol:
  533. exchange_position = pos
  534. break
  535. if exchange_position and abs(float(exchange_position.get('contracts', 0))) > 1e-9:
  536. logger.info(f"🔄 AUTO-SYNC: Position exists on exchange for {full_symbol} but no lifecycle found. Auto-syncing before processing fill.")
  537. success = await self._auto_sync_single_position(full_symbol, exchange_position, stats)
  538. if success:
  539. # Re-check for lifecycle after auto-sync
  540. existing_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
  541. action_type = await self._determine_position_action_type(
  542. full_symbol, side_from_fill, amount_from_fill, existing_lc
  543. )
  544. logger.info(f"🔍 External fill analysis: {full_symbol} {side_from_fill} {amount_from_fill} -> {action_type}")
  545. # Additional debug logging for position changes
  546. if existing_lc:
  547. previous_size = existing_lc.get('current_position_size', 0)
  548. current_positions = self._safe_get_positions()
  549. if current_positions is None:
  550. logger.warning("⚠️ Failed to fetch positions for debug logging - skipping debug info")
  551. # Set defaults to avoid reference errors
  552. current_size = previous_size
  553. else:
  554. current_size = 0
  555. for pos in current_positions:
  556. if pos.get('symbol') == full_symbol:
  557. current_size = abs(float(pos.get('contracts', 0)))
  558. break
  559. logger.info(f"📊 Position size change: {previous_size} -> {current_size} (diff: {current_size - previous_size})")
  560. logger.info(f"🎯 Expected change based on fill: {'+' if side_from_fill.lower() == 'buy' else '-'}{amount_from_fill}")
  561. # Check if this might be a position decrease that was misclassified
  562. if (action_type == 'external_unmatched' and
  563. existing_lc.get('position_side') == 'long' and
  564. side_from_fill.lower() == 'sell' and
  565. current_size < previous_size):
  566. logger.warning(f"⚠️ Potential misclassification: {full_symbol} {side_from_fill} looks like position decrease but classified as external_unmatched")
  567. # Force re-check with proper parameters
  568. action_type = 'position_decreased'
  569. logger.info(f"🔄 Corrected action_type to: {action_type}")
  570. elif (action_type == 'external_unmatched' and
  571. existing_lc.get('position_side') == 'long' and
  572. side_from_fill.lower() == 'buy' and
  573. current_size > previous_size):
  574. logger.warning(f"⚠️ Potential misclassification: {full_symbol} {side_from_fill} looks like position increase but classified as external_unmatched")
  575. action_type = 'position_increased'
  576. logger.info(f"🔄 Corrected action_type to: {action_type}")
  577. if action_type == 'position_opened':
  578. # Create new lifecycle for external position
  579. lifecycle_id = stats.create_trade_lifecycle(
  580. symbol=full_symbol,
  581. side=side_from_fill,
  582. entry_order_id=exchange_order_id_from_fill or f"external_{trade_id}",
  583. trade_type="external"
  584. )
  585. if lifecycle_id:
  586. success = stats.update_trade_position_opened(
  587. lifecycle_id=lifecycle_id,
  588. entry_price=price_from_fill,
  589. entry_amount=amount_from_fill,
  590. exchange_fill_id=trade_id
  591. )
  592. if success:
  593. logger.info(f"📈 Created and opened new external lifecycle: {lifecycle_id[:8]} for {full_symbol}")
  594. symbols_with_fills.add(token)
  595. # Send position opened notification
  596. await self._send_position_change_notification(
  597. full_symbol, side_from_fill, amount_from_fill, price_from_fill,
  598. action_type, timestamp_dt
  599. )
  600. fill_processed_this_iteration = True
  601. elif action_type == 'position_closed' and existing_lc:
  602. # Close existing lifecycle
  603. lc_id = existing_lc['trade_lifecycle_id']
  604. lc_entry_price = existing_lc.get('entry_price', 0)
  605. lc_position_side = existing_lc.get('position_side')
  606. realized_pnl = 0
  607. if lc_position_side == 'long':
  608. realized_pnl = amount_from_fill * (price_from_fill - lc_entry_price)
  609. elif lc_position_side == 'short':
  610. realized_pnl = amount_from_fill * (lc_entry_price - price_from_fill)
  611. success = stats.update_trade_position_closed(
  612. lifecycle_id=lc_id,
  613. exit_price=price_from_fill,
  614. realized_pnl=realized_pnl,
  615. exchange_fill_id=trade_id
  616. )
  617. if success:
  618. pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
  619. formatter = get_formatter()
  620. logger.info(f"{pnl_emoji} Lifecycle CLOSED (External): {lc_id}. PNL: {formatter.format_price_with_symbol(realized_pnl)}")
  621. symbols_with_fills.add(token)
  622. # Send position closed notification
  623. await self._send_position_change_notification(
  624. full_symbol, side_from_fill, amount_from_fill, price_from_fill,
  625. action_type, timestamp_dt, existing_lc, realized_pnl
  626. )
  627. stats.migrate_trade_to_aggregated_stats(lc_id)
  628. fill_processed_this_iteration = True
  629. elif action_type in ['position_increased', 'position_decreased'] and existing_lc:
  630. # Update lifecycle position size and send notification
  631. current_positions = self._safe_get_positions()
  632. if current_positions is None:
  633. logger.warning("⚠️ Failed to fetch positions for size update - skipping position change processing")
  634. continue
  635. new_size = 0
  636. for pos in current_positions:
  637. if pos.get('symbol') == full_symbol:
  638. new_size = abs(float(pos.get('contracts', 0)))
  639. break
  640. # Update lifecycle with new position size
  641. await self._update_lifecycle_position_size(existing_lc['trade_lifecycle_id'], new_size)
  642. # Send appropriate notification
  643. await self._send_position_change_notification(
  644. full_symbol, side_from_fill, amount_from_fill, price_from_fill,
  645. action_type, timestamp_dt, existing_lc
  646. )
  647. symbols_with_fills.add(token)
  648. fill_processed_this_iteration = True
  649. logger.info(f"📊 Position {action_type}: {full_symbol} new size: {new_size}")
  650. # Fallback for unmatched external trades
  651. if not fill_processed_this_iteration:
  652. all_open_positions_in_db = stats.get_open_positions()
  653. db_open_symbols = {pos_db.get('symbol') for pos_db in all_open_positions_in_db}
  654. if full_symbol in db_open_symbols:
  655. logger.debug(f"Position {full_symbol} found in open positions but no active lifecycle - likely auto-sync failed or timing issue for fill {trade_id}")
  656. # Record as unmatched external trade
  657. linked_order_db_id = None
  658. if exchange_order_id_from_fill:
  659. order_in_db = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
  660. if order_in_db:
  661. linked_order_db_id = order_in_db.get('id')
  662. stats.record_trade(
  663. full_symbol, side_from_fill, amount_from_fill, price_from_fill,
  664. exchange_fill_id=trade_id, trade_type="external_unmatched",
  665. timestamp=timestamp_dt.isoformat(),
  666. linked_order_table_id_to_link=linked_order_db_id
  667. )
  668. logger.info(f"📋 Recorded trade via FALLBACK: {trade_id} (Unmatched External Fill)")
  669. # No notification sent for unmatched external trades per user preference
  670. fill_processed_this_iteration = True
  671. if fill_processed_this_iteration:
  672. external_trades_processed += 1
  673. if self.last_processed_trade_time is None or timestamp_dt > self.last_processed_trade_time:
  674. self.last_processed_trade_time = timestamp_dt
  675. except Exception as e:
  676. logger.error(f"Error processing fill {fill.get('id', 'N/A')}: {e}", exc_info=True)
  677. continue
  678. if external_trades_processed > 0:
  679. stats._set_metadata('market_monitor_last_processed_trade_time', self.last_processed_trade_time.isoformat())
  680. logger.info(f"💾 Saved MarketMonitor state (last_processed_trade_time) to DB: {self.last_processed_trade_time.isoformat()}")
  681. logger.info(f"📊 Processed {external_trades_processed} external trades")
  682. if symbols_with_fills:
  683. logger.info(f"ℹ️ Symbols with processed fills this cycle: {list(symbols_with_fills)}")
  684. except Exception as e:
  685. logger.error(f"❌ Error checking external trades: {e}", exc_info=True)