#!/usr/bin/env python3 """ Market Monitor - Handles external trade monitoring and heartbeat functionality. """ import logging import asyncio from datetime import datetime, timedelta, timezone from typing import Optional, Dict, Any, List import os import json from telegram.ext import CallbackContext from src.config.config import Config from src.monitoring.alarm_manager import AlarmManager from src.utils.token_display_formatter import get_formatter logger = logging.getLogger(__name__) class MarketMonitor: """Handles external trade monitoring and market events.""" def __init__(self, trading_engine, notification_manager=None): """Initialize market monitor.""" self.trading_engine = trading_engine self.notification_manager = notification_manager self._monitoring_active = False self._monitor_task = None # Enhanced tracking for Phase 3+ self.last_known_orders = set() # Set of order IDs we know about self.last_known_positions = {} # Dict mapping symbol -> position data # Price alarms tracking self.price_alarms = {} # Dict mapping alarm_id -> alarm_data self.next_alarm_id = 1 # External stop loss tracking self.external_stop_losses = {} # symbol -> {order_id, stop_price, side, amount} # ๐Ÿ†• CONTINUOUS DATA CACHE: Keep bot state updated self.cached_positions = [] # Fresh exchange positions self.cached_orders = [] # Fresh exchange orders self.cached_balance = None # Fresh balance data self.last_cache_update = None # External trade monitoring self.last_processed_trade_time: Optional[datetime] = None # Alarm management self.alarm_manager = AlarmManager() # Load persistent state self._load_state() async def start(self): """Start the market monitor.""" if self._monitoring_active: logger.warning("Market monitor is already active") return self._monitoring_active = True logger.info("๐Ÿ”„ Market monitor started") # Initialize tracking await self._initialize_tracking() # Start the monitoring loop self._monitor_task = asyncio.create_task(self._monitor_loop()) async def stop(self): """Stop the market monitor.""" if not self._monitoring_active: return self._monitoring_active = False if self._monitor_task: self._monitor_task.cancel() try: await self._monitor_task except asyncio.CancelledError: pass self._save_state() logger.info("๐Ÿ›‘ Market monitor stopped") def _load_state(self): """Load market monitor state from SQLite DB via TradingStats.""" stats = self.trading_engine.get_stats() if not stats: logger.warning("โš ๏ธ TradingStats not available, cannot load MarketMonitor state.") self.last_processed_trade_time = None return try: last_time_str = stats._get_metadata('market_monitor_last_processed_trade_time') if last_time_str: self.last_processed_trade_time = datetime.fromisoformat(last_time_str) # Ensure it's timezone-aware (UTC) if self.last_processed_trade_time.tzinfo is None: self.last_processed_trade_time = self.last_processed_trade_time.replace(tzinfo=timezone.utc) else: self.last_processed_trade_time = self.last_processed_trade_time.astimezone(timezone.utc) logger.info(f"๐Ÿ”„ Loaded MarketMonitor state from DB: last_processed_trade_time = {self.last_processed_trade_time.isoformat()}") else: logger.info("๐Ÿ’จ No MarketMonitor state (last_processed_trade_time) found in DB. Will start with fresh external trade tracking.") self.last_processed_trade_time = None except Exception as e: logger.error(f"Error loading MarketMonitor state from DB: {e}. Proceeding with default state.") self.last_processed_trade_time = None def _save_state(self): """Save market monitor state to SQLite DB via TradingStats.""" stats = self.trading_engine.get_stats() if not stats: logger.warning("โš ๏ธ TradingStats not available, cannot save MarketMonitor state.") return try: if self.last_processed_trade_time: # Ensure timestamp is UTC before saving lptt_utc = self.last_processed_trade_time if lptt_utc.tzinfo is None: lptt_utc = lptt_utc.replace(tzinfo=timezone.utc) else: lptt_utc = lptt_utc.astimezone(timezone.utc) stats._set_metadata('market_monitor_last_processed_trade_time', lptt_utc.isoformat()) logger.info(f"๐Ÿ’พ Saved MarketMonitor state (last_processed_trade_time) to DB: {lptt_utc.isoformat()}") else: # If it's None, we might want to remove the key or save it as an empty string # For now, let's assume we only save if there is a time. Or remove it. stats._set_metadata('market_monitor_last_processed_trade_time', '') # Or handle deletion logger.info("๐Ÿ’พ MarketMonitor state (last_processed_trade_time) is None, saved as empty in DB.") except Exception as e: logger.error(f"Error saving MarketMonitor state to DB: {e}") async def _initialize_tracking(self): """Initialize order and position tracking.""" try: # Initialize order tracking try: orders = self.trading_engine.get_orders() or [] self.last_known_orders = {order.get('id') for order in orders if order.get('id')} logger.info(f"๐Ÿ“‹ Initialized tracking with {len(orders)} open orders") except Exception as e: logger.error(f"โŒ Failed to initialize order tracking: {e}") self.last_known_orders = set() # Initialize position tracking try: positions = self.trading_engine.get_positions() or [] self.last_known_positions = { pos.get('symbol'): pos for pos in positions if pos.get('symbol') and abs(float(pos.get('contracts', 0))) > 0 } logger.info(f"๐Ÿ“Š Initialized tracking with {len(positions)} positions") # ๐Ÿ†• IMMEDIATE AUTO-SYNC: Check for orphaned positions right after initialization if positions: await self._immediate_startup_auto_sync() except Exception as e: logger.error(f"โŒ Failed to initialize position tracking: {e}") self.last_known_positions = {} except Exception as e: logger.error(f"โŒ Failed to initialize tracking: {e}") self.last_known_orders = set() self.last_known_positions = {} async def _monitor_loop(self): """Main monitoring loop that runs every BOT_HEARTBEAT_SECONDS.""" try: loop_count = 0 while self._monitoring_active: # ๐Ÿ†• CONTINUOUS UPDATE: Cache fresh exchange data every heartbeat await self._update_cached_data() # ๐Ÿ†• PHASE 4: Check trades table for pending stop loss activation first (highest priority) await self._activate_pending_stop_losses_from_trades() await self._check_order_fills() await self._check_price_alarms() await self._check_external_trades() await self._check_pending_triggers() await self._check_automatic_risk_management() await self._check_external_stop_loss_orders() # Run orphaned stop loss cleanup every 10 heartbeats (less frequent but regular) loop_count += 1 if loop_count % 10 == 0: await self._cleanup_orphaned_stop_losses() await self._cleanup_external_stop_loss_tracking() # ๐Ÿ†• AUTO-SYNC: Check for orphaned positions every 10 heartbeats await self._auto_sync_orphaned_positions() loop_count = 0 # Reset counter to prevent overflow await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS) except asyncio.CancelledError: logger.info("Market monitor loop cancelled") raise except Exception as e: logger.error(f"Error in market monitor loop: {e}") # Restart after error if self._monitoring_active: await asyncio.sleep(5) await self._monitor_loop() async def _update_cached_data(self): """๐Ÿ†• Continuously update cached exchange data every heartbeat.""" try: # Fetch fresh data from exchange fresh_positions_list = self.trading_engine.get_positions() or [] fresh_orders_list = self.trading_engine.get_orders() or [] fresh_balance = self.trading_engine.get_balance() # Update primary cache immediately self.cached_positions = fresh_positions_list self.cached_orders = fresh_orders_list self.cached_balance = fresh_balance self.last_cache_update = datetime.now(timezone.utc) logger.debug(f"๐Ÿ”„ Fetched fresh cache: {len(fresh_positions_list)} positions, {len(fresh_orders_list)} orders") # Prepare current state maps for comparison and for updating last_known state current_exchange_position_map = { pos.get('symbol'): pos for pos in fresh_positions_list if pos.get('symbol') and abs(float(pos.get('contracts', 0))) > 1e-9 } current_exchange_order_ids = {order.get('id') for order in fresh_orders_list if order.get('id')} # Log changes by comparing with the state from the end of the previous cycle if len(current_exchange_position_map) != len(self.last_known_positions): logger.info(f"๐Ÿ“Š Position count changed: {len(self.last_known_positions)} โ†’ {len(current_exchange_position_map)}") if len(current_exchange_order_ids) != len(self.last_known_orders): logger.info(f"๐Ÿ“‹ Order count changed: {len(self.last_known_orders)} โ†’ {len(current_exchange_order_ids)}") # Update last_known_xxx to the current exchange state for the *next* cycle's comparison self.last_known_positions = current_exchange_position_map self.last_known_orders = current_exchange_order_ids # ๐Ÿ’น Update unrealized P&L and mark price in DB for open positions stats = self.trading_engine.get_stats() if stats and fresh_positions_list: for ex_pos in fresh_positions_list: symbol = ex_pos.get('symbol') if not symbol: continue db_trade = stats.get_trade_by_symbol_and_status(symbol, status='position_opened') if db_trade: lifecycle_id = db_trade.get('trade_lifecycle_id') if not lifecycle_id: continue # Extract all relevant data from exchange position (ex_pos) # Ensure to handle cases where keys might be missing or values are None/empty strings current_size_from_ex = ex_pos.get('contracts') # Usually 'contracts' in CCXT if current_size_from_ex is not None: try: current_position_size = float(current_size_from_ex) except (ValueError, TypeError): current_position_size = None else: current_position_size = None entry_price_from_ex = ex_pos.get('entryPrice') or ex_pos.get('entryPx') if entry_price_from_ex is not None: try: entry_price = float(entry_price_from_ex) except (ValueError, TypeError): entry_price = None else: entry_price = None mark_price_from_ex = ex_pos.get('markPrice') or ex_pos.get('markPx') if mark_price_from_ex is not None: try: mark_price = float(mark_price_from_ex) except (ValueError, TypeError): mark_price = None else: mark_price = None unrealized_pnl_from_ex = ex_pos.get('unrealizedPnl') if unrealized_pnl_from_ex is not None: try: unrealized_pnl = float(unrealized_pnl_from_ex) except (ValueError, TypeError): unrealized_pnl = None else: unrealized_pnl = None liquidation_price_from_ex = ex_pos.get('liquidationPrice') if liquidation_price_from_ex is not None: try: liquidation_price = float(liquidation_price_from_ex) except (ValueError, TypeError): liquidation_price = None else: liquidation_price = None margin_used_from_ex = ex_pos.get('marginUsed') # Or other keys like 'initialMargin', 'maintenanceMargin' depending on exchange if margin_used_from_ex is not None: try: margin_used = float(margin_used_from_ex) except (ValueError, TypeError): margin_used = None else: margin_used = None leverage_from_ex = ex_pos.get('leverage') if leverage_from_ex is not None: try: leverage = float(leverage_from_ex) except (ValueError, TypeError): leverage = None else: leverage = None position_value_from_ex = ex_pos.get('notional') # 'notional' is common for position value if position_value_from_ex is not None: try: position_value = float(position_value_from_ex) except (ValueError, TypeError): position_value = None else: position_value = None # Fallback for position_value if notional is not available but mark_price and size are if position_value is None and mark_price is not None and current_position_size is not None: position_value = abs(current_position_size) * mark_price # ๐Ÿ†• Get P&L percentage from exchange if available roe_from_ex = ex_pos.get('percentage') # CCXT often uses 'percentage' for ROE if roe_from_ex is not None: try: unrealized_pnl_percentage_val = float(roe_from_ex) except (ValueError, TypeError): unrealized_pnl_percentage_val = None else: unrealized_pnl_percentage_val = None stats.update_trade_market_data( trade_lifecycle_id=lifecycle_id, unrealized_pnl=unrealized_pnl, mark_price=mark_price, current_position_size=current_position_size, entry_price=entry_price, # This will update the entry price if the exchange provides a new average liquidation_price=liquidation_price, margin_used=margin_used, leverage=leverage, position_value=position_value, unrealized_pnl_percentage=unrealized_pnl_percentage_val # Pass the new field ) # ๐Ÿ†• Detect immediate changes for responsive notifications if len(fresh_positions_list) != len(self.last_known_positions): logger.info(f"๐Ÿ“Š Position count changed: {len(self.last_known_positions)} โ†’ {len(current_exchange_position_map)}") if len(fresh_orders_list) != len(self.last_known_orders): logger.info(f"๐Ÿ“‹ Order count changed: {len(self.last_known_orders)} โ†’ {len(current_exchange_order_ids)}") except Exception as e: logger.error(f"โŒ Error updating cached data: {e}") def get_cached_positions(self): """Get cached positions (updated every heartbeat).""" return self.cached_positions or [] def get_cached_orders(self): """Get cached orders (updated every heartbeat).""" return self.cached_orders or [] def get_cached_balance(self): """Get cached balance (updated every heartbeat).""" return self.cached_balance def get_cache_age_seconds(self): """Get age of cached data in seconds.""" if not self.last_cache_update: return float('inf') return (datetime.now(timezone.utc) - self.last_cache_update).total_seconds() async def _check_order_fills(self): """Check for filled orders and send notifications.""" try: # Get current orders and positions current_orders = self.cached_orders or [] # Use cache current_positions = self.cached_positions or [] # Use cache # Get current order IDs current_order_ids = {order.get('id') for order in current_orders if order.get('id')} # Find filled orders (orders that were in last_known_orders but not in current_orders) disappeared_order_ids = self.last_known_orders - current_order_ids if disappeared_order_ids: logger.info(f"๐ŸŽฏ Detected {len(disappeared_order_ids)} bot orders no longer open: {list(disappeared_order_ids)}. Corresponding fills (if any) are processed by external trade checker.") await self._process_disappeared_orders(disappeared_order_ids) # Update tracking data for open bot orders self.last_known_orders = current_order_ids # Position state is primarily managed by TradingStats based on all fills. # This local tracking can provide supplementary logging if needed. # await self._update_position_tracking(current_positions) except Exception as e: logger.error(f"โŒ Error checking order fills: {e}") async def _process_filled_orders(self, filled_order_ids: set, current_positions: list): """Process filled orders using enhanced position tracking.""" try: # For bot-initiated orders, we'll detect changes in position size # and send appropriate notifications using the enhanced system # This method will be triggered when orders placed through the bot are filled # The external trade monitoring will handle trades made outside the bot # Update position tracking based on current positions await self._update_position_tracking(current_positions) except Exception as e: logger.error(f"โŒ Error processing filled orders: {e}") async def _update_position_tracking(self, current_positions: list): """Update position tracking and calculate P&L changes.""" try: new_position_map = {} formatter = get_formatter() # Get formatter for position in current_positions: symbol = position.get('symbol') contracts = float(position.get('contracts', 0)) entry_price = float(position.get('entryPx', 0)) if symbol and contracts != 0: new_position_map[symbol] = { 'contracts': contracts, 'entry_price': entry_price } # Compare with previous positions to detect changes for symbol, new_data in new_position_map.items(): old_data = self.last_known_positions.get(symbol) token = symbol.split('/')[0] if '/' in symbol else symbol # Extract token if not old_data: # New position opened amount_str = formatter.format_amount(new_data['contracts'], token) price_str = formatter.format_price_with_symbol(new_data['entry_price'], token) logger.info(f"๐Ÿ“ˆ New position detected (observed by MarketMonitor): {symbol} {amount_str} @ {price_str}. TradingStats is the definitive source.") elif abs(new_data['contracts'] - old_data['contracts']) > 0.000001: # Position size changed change = new_data['contracts'] - old_data['contracts'] change_str = formatter.format_amount(change, token) logger.info(f"๐Ÿ“Š Position change detected (observed by MarketMonitor): {symbol} {change_str} contracts. TradingStats is the definitive source.") # Check for closed positions for symbol in self.last_known_positions: if symbol not in new_position_map: logger.info(f"๐Ÿ“‰ Position closed (observed by MarketMonitor): {symbol}. TradingStats is the definitive source.") # Update tracking self.last_known_positions = new_position_map except Exception as e: logger.error(f"โŒ Error updating position tracking: {e}") async def _process_disappeared_orders(self, disappeared_order_ids: set): """Log and investigate bot orders that have disappeared from the exchange.""" stats = self.trading_engine.get_stats() if not stats: logger.warning("โš ๏ธ TradingStats not available in _process_disappeared_orders.") return try: total_linked_cancelled = 0 external_cancellations = [] for exchange_oid in disappeared_order_ids: order_in_db = stats.get_order_by_exchange_id(exchange_oid) if order_in_db: last_status = order_in_db.get('status', 'unknown') order_type = order_in_db.get('type', 'unknown') symbol = order_in_db.get('symbol', 'unknown') token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0] logger.info(f"Order {exchange_oid} was in our DB with status '{last_status}' but has now disappeared from exchange.") # Check if this was an unexpected disappearance (likely external cancellation) active_statuses = ['open', 'submitted', 'partially_filled', 'pending_submission'] if last_status in active_statuses: logger.warning(f"โš ๏ธ EXTERNAL CANCELLATION: Order {exchange_oid} with status '{last_status}' was likely cancelled externally on Hyperliquid") stats.update_order_status(exchange_order_id=exchange_oid, new_status='cancelled_externally') # Track external cancellations for notification external_cancellations.append({ 'exchange_oid': exchange_oid, 'token': token, 'type': order_type, 'last_status': last_status }) # Send notification about external cancellation if self.notification_manager: await self.notification_manager.send_generic_notification( f"โš ๏ธ External Order Cancellation Detected\n\n" f"Token: {token}\n" f"Order Type: {order_type.replace('_', ' ').title()}\n" f"Exchange Order ID: {exchange_oid[:8]}...\n" f"Previous Status: {last_status.replace('_', ' ').title()}\n" f"Source: Cancelled directly on Hyperliquid\n" f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n" f"๐Ÿค– Bot status updated automatically" ) # If the order was externally cancelled, its status in DB is now 'cancelled_externally'. # We rely on TradingStats to correctly update order status if a fill came through. # If the status is still 'cancelled_externally' (or any other non-fill status), then proceed. order_after_external_check = stats.get_order_by_exchange_id(exchange_oid) if order_after_external_check and order_after_external_check.get('status') == 'cancelled_externally': # NEW: Check and cancel corresponding pending trade lifecycle pending_lc = stats.get_lifecycle_by_entry_order_id(exchange_oid, status='pending') if pending_lc: lc_id_to_cancel = pending_lc.get('trade_lifecycle_id') if lc_id_to_cancel: cancel_reason = f"entry_order_{exchange_oid[:8]}_disappeared_externally" cancelled_lc_success = stats.update_trade_cancelled(lc_id_to_cancel, reason=cancel_reason) if cancelled_lc_success: logger.info(f"๐Ÿ”— Trade lifecycle {lc_id_to_cancel} also cancelled for disappeared entry order {exchange_oid}.") if self.notification_manager: await self.notification_manager.send_generic_notification( f"๐Ÿ”— Trade Lifecycle Cancelled\n\n" f"Token: {token}\n" f"Lifecycle ID: {lc_id_to_cancel[:8]}...\\n" f"Reason: Entry order {exchange_oid[:8]}... cancelled externally.\\n" f"Time: {datetime.now().strftime('%H:%M:%S')}" ) else: logger.error(f"โŒ Failed to cancel trade lifecycle {lc_id_to_cancel} for entry order {exchange_oid}.") # Continue to check for linked SL orders regardless of lifecycle cancellation outcome for this parent order elif order_after_external_check and order_after_external_check.get('status') in ['filled', 'partially_filled']: logger.info(f"โ„น๏ธ Order {exchange_oid} was ultimately found to be '{order_after_external_check.get('status')}' despite initial disappearance. Stop losses will not be cancelled by this path. Lifecycle should be active.") continue # Skip SL cancellation if the order is now considered filled/partially_filled else: # Normal completion/cancellation - update status # This path is for orders that disappeared but their last known status wasn't active. # Example: bot cancelled it, then it disappeared. # If stats hasn't already marked it (e.g. 'cancelled_manually'), we mark it disappeared. if last_status not in ['filled', 'partially_filled', 'cancelled_manually', 'cancelled_by_bot', 'failed_submission', 'cancelled_externally']: stats.update_order_status(exchange_order_id=exchange_oid, new_status='disappeared_from_exchange') # Cancel any pending stop losses linked to this order (only if not filled) # The Trade object lifecycle (managed by TradingStats) is the source of truth. # If the parent order (order_in_db) is NOT considered filled, then its SLs can be cancelled. if order_in_db.get('bot_order_ref_id'): # Use the most recent state of the order from the database. parent_order_current_state = stats.get_order_by_exchange_id(exchange_oid) if parent_order_current_state and parent_order_current_state.get('status') not in ['filled', 'partially_filled']: logger.info(f"Cancelling stop losses for order {exchange_oid} (status: {parent_order_current_state.get('status')}) as it is not considered filled.") cancelled_sl_count = stats.cancel_linked_orders( parent_bot_order_ref_id=order_in_db['bot_order_ref_id'], new_status='cancelled_parent_disappeared_or_not_filled' # More descriptive status ) total_linked_cancelled += cancelled_sl_count if cancelled_sl_count > 0: logger.info(f"Cancelled {cancelled_sl_count} pending stop losses linked to disappeared/non-filled order {exchange_oid}") if self.notification_manager: await self.notification_manager.send_generic_notification( f"๐Ÿ›‘ Linked Stop Losses Cancelled\n\n" f"Token: {token}\n" f"Cancelled: {cancelled_sl_count} stop loss(es)\n" f"Reason: Parent order {exchange_oid[:8]}... disappeared or was not filled\n" f"Parent Status: {parent_order_current_state.get('status', 'N/A').replace('_', ' ').title()}\n" f"Time: {datetime.now().strftime('%H:%M:%S')}" ) else: logger.info(f"โ„น๏ธ Stop losses for order {exchange_oid} (status: {parent_order_current_state.get('status') if parent_order_current_state else 'N/A'}) preserved as parent order is considered filled or partially filled.") continue # Explicitly continue to the next disappeared_order_id else: logger.warning(f"Order {exchange_oid} disappeared from exchange but was not found in our DB. This might be an order placed externally.") # Send summary notification if multiple external cancellations occurred if len(external_cancellations) > 1: tokens_affected = list(set(item['token'] for item in external_cancellations)) if self.notification_manager: await self.notification_manager.send_generic_notification( f"โš ๏ธ Multiple External Cancellations Detected\n\n" f"Orders Cancelled: {len(external_cancellations)}\n" f"Tokens Affected: {', '.join(tokens_affected)}\n" f"Source: Direct cancellation on Hyperliquid\n" f"Linked Stop Losses Cancelled: {total_linked_cancelled}\n" f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n" f"๐Ÿ’ก Check individual orders for details" ) except Exception as e: logger.error(f"โŒ Error processing disappeared orders: {e}", exc_info=True) async def _check_price_alarms(self): """Check price alarms and trigger notifications.""" try: active_alarms = self.alarm_manager.get_all_active_alarms() if not active_alarms: return # Group alarms by token to minimize API calls tokens_to_check = list(set(alarm['token'] for alarm in active_alarms)) for token in tokens_to_check: try: # Get current market price symbol = f"{token}/USDC:USDC" market_data = self.trading_engine.get_market_data(symbol) if not market_data or not market_data.get('ticker'): continue current_price = float(market_data['ticker'].get('last', 0)) if current_price <= 0: continue # Check alarms for this token token_alarms = [alarm for alarm in active_alarms if alarm['token'] == token] for alarm in token_alarms: target_price = alarm['target_price'] direction = alarm['direction'] # Check if alarm should trigger should_trigger = False if direction == 'above' and current_price >= target_price: should_trigger = True elif direction == 'below' and current_price <= target_price: should_trigger = True if should_trigger: # Trigger the alarm triggered_alarm = self.alarm_manager.trigger_alarm(alarm['id'], current_price) if triggered_alarm: await self._send_alarm_notification(triggered_alarm) except Exception as e: logger.error(f"Error checking alarms for {token}: {e}") except Exception as e: logger.error(f"โŒ Error checking price alarms: {e}") async def _send_alarm_notification(self, alarm: Dict[str, Any]): """Send notification for triggered alarm.""" try: # Send through notification manager if available if self.notification_manager: await self.notification_manager.send_alarm_triggered_notification( alarm['token'], alarm['target_price'], alarm['triggered_price'], alarm['direction'] ) else: # Fallback to logging if notification manager not available logger.info(f"๐Ÿ”” ALARM TRIGGERED: {alarm['token']} @ ${alarm['triggered_price']:,.2f}") except Exception as e: logger.error(f"โŒ Error sending alarm notification: {e}") async def _check_external_trades(self): """Check for trades made outside the Telegram bot and update stats.""" try: stats = self.trading_engine.get_stats() if not stats: logger.warning("TradingStats not available in _check_external_trades. Skipping.") return external_trades_processed = 0 symbols_with_fills = set() # Keep track of symbols with new fills for potential actions # Get recent fills from exchange recent_fills = self.trading_engine.get_recent_fills() if not recent_fills: logger.debug("No recent fills data available") return # Corrected to use self.last_processed_trade_time (no leading underscore) if not hasattr(self, 'last_processed_trade_time') or self.last_processed_trade_time is None: try: last_time_str = self.trading_engine.stats._get_metadata('last_processed_trade_time') if last_time_str: self.last_processed_trade_time = datetime.fromisoformat(last_time_str) else: # If no last processed time, start from 1 hour ago to avoid processing too much history self.last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1) except Exception: # Fallback on error self.last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1) for fill in recent_fills: try: trade_id = fill.get('id') timestamp_ms = fill.get('timestamp') symbol_from_fill = fill.get('symbol') side_from_fill = fill.get('side') amount_from_fill = float(fill.get('amount', 0)) price_from_fill = float(fill.get('price', 0)) timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc) if timestamp_ms else datetime.now(timezone.utc) if timestamp_dt <= self.last_processed_trade_time: continue fill_processed_this_iteration = False # Tracks if this specific fill is handled if not (symbol_from_fill and side_from_fill and amount_from_fill > 0 and price_from_fill > 0): logger.warning(f"Skipping fill with incomplete data: {fill}") continue full_symbol = symbol_from_fill token = symbol_from_fill.split('/')[0] if '/' in symbol_from_fill else symbol_from_fill.split(':')[0] # symbols_with_fills.add(token) # This was here, let's see if it should be moved or is used later exchange_order_id_from_fill = fill.get('info', {}).get('oid') # --- Lifecycle Processing --- # 1. Check if fill matches a PENDING trade lifecycle (entry fill) if exchange_order_id_from_fill: pending_lc = stats.get_lifecycle_by_entry_order_id(exchange_order_id_from_fill, status='pending') if pending_lc and pending_lc.get('symbol') == full_symbol: success = stats.update_trade_position_opened( lifecycle_id=pending_lc['trade_lifecycle_id'], entry_price=price_from_fill, entry_amount=amount_from_fill, exchange_fill_id=trade_id ) if success: logger.info(f"๐Ÿ“ˆ Lifecycle ENTRY: {pending_lc['trade_lifecycle_id']} for {full_symbol} updated by fill {trade_id}.") symbols_with_fills.add(token) order_in_db_for_entry = stats.get_order_by_exchange_id(exchange_order_id_from_fill) if order_in_db_for_entry: stats.update_order_status(order_db_id=order_in_db_for_entry['id'], new_status='filled', amount_filled_increment=amount_from_fill) fill_processed_this_iteration = True # 2. Check if fill matches an OPENED trade lifecycle (Bot Exit, SL/TP by OID, or External Close) if not fill_processed_this_iteration: active_lc = None closure_reason_action_type = None # e.g., "bot_exit_long_close", "sl_long_close", "external_long_close" bot_order_db_id_to_update = None # DB ID of the bot's order (exit, SL, TP) that this fill corresponds to if exchange_order_id_from_fill: # Attempt to link fill to a specific bot order (exit, SL, TP) first bot_order_for_fill = stats.get_order_by_exchange_id(exchange_order_id_from_fill) if bot_order_for_fill and bot_order_for_fill.get('symbol') == full_symbol: order_type = bot_order_for_fill.get('type') order_side = bot_order_for_fill.get('side') # Side of the bot's order # A. Check if this fill corresponds to a bot-initiated EXIT order if order_type == 'market': # Common for /exit, add other explicit exit types if any potential_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened') if potential_lc: lc_pos_side = potential_lc.get('position_side') # Ensure bot's market order & this fill are closing the position if (lc_pos_side == 'long' and order_side == 'sell' and side_from_fill == 'sell') or \ (lc_pos_side == 'short' and order_side == 'buy' and side_from_fill == 'buy'): active_lc = potential_lc closure_reason_action_type = f"bot_exit_{lc_pos_side}_close" bot_order_db_id_to_update = bot_order_for_fill.get('id') logger.info(f"โ„น๏ธ Lifecycle BOT EXIT: Fill {trade_id} (OID {exchange_order_id_from_fill}) for {full_symbol} matches bot exit for lifecycle {active_lc['trade_lifecycle_id']}.") # B. If not a bot exit, check if it's an SL or TP order linked to a lifecycle if not active_lc: # Check only if not already matched as bot_exit # Check SL by OID (exchange_order_id_from_fill should be the SL's actual exchange OID) lc_by_sl = stats.get_lifecycle_by_sl_order_id(exchange_order_id_from_fill, status='position_opened') if lc_by_sl and lc_by_sl.get('symbol') == full_symbol: active_lc = lc_by_sl closure_reason_action_type = f"sl_{active_lc.get('position_side')}_close" # bot_order_for_fill is the SL order itself in this context bot_order_db_id_to_update = bot_order_for_fill.get('id') logger.info(f"โ„น๏ธ Lifecycle SL: Fill {trade_id} for OID {exchange_order_id_from_fill} matches SL for lifecycle {active_lc['trade_lifecycle_id']}.") if not active_lc: # Check TP only if not SL (and not bot exit) lc_by_tp = stats.get_lifecycle_by_tp_order_id(exchange_order_id_from_fill, status='position_opened') if lc_by_tp and lc_by_tp.get('symbol') == full_symbol: active_lc = lc_by_tp closure_reason_action_type = f"tp_{active_lc.get('position_side')}_close" # bot_order_for_fill is the TP order itself here bot_order_db_id_to_update = bot_order_for_fill.get('id') logger.info(f"โ„น๏ธ Lifecycle TP: Fill {trade_id} for OID {exchange_order_id_from_fill} matches TP for lifecycle {active_lc['trade_lifecycle_id']}.") # C. If fill was not matched to a specific bot order by OID, check for generic external closure if not active_lc: potential_lc_external = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened') if potential_lc_external: lc_pos_side = potential_lc_external.get('position_side') # Ensure fill side is opposite to position side if (lc_pos_side == 'long' and side_from_fill == 'sell') or \ (lc_pos_side == 'short' and side_from_fill == 'buy'): active_lc = potential_lc_external closure_reason_action_type = f"external_{lc_pos_side}_close" logger.info(f"โ„น๏ธ Lifecycle EXTERNAL CLOSE: Fill {trade_id} for {full_symbol} (no matching bot OID) for lifecycle {active_lc['trade_lifecycle_id']}.") # If a lifecycle was identified for closure by any of the above means if active_lc and closure_reason_action_type: lc_id = active_lc['trade_lifecycle_id'] lc_entry_price = active_lc.get('entry_price', 0) lc_position_side = active_lc.get('position_side') # From the identified active_lc realized_pnl = 0 if lc_position_side == 'long': realized_pnl = amount_from_fill * (price_from_fill - lc_entry_price) elif lc_position_side == 'short': realized_pnl = amount_from_fill * (lc_entry_price - price_from_fill) success = stats.update_trade_position_closed( lifecycle_id=lc_id, exit_price=price_from_fill, realized_pnl=realized_pnl, exchange_fill_id=trade_id ) if success: pnl_emoji = "๐ŸŸข" if realized_pnl >= 0 else "๐Ÿ”ด" formatter = get_formatter() logger.info(f"{pnl_emoji} Lifecycle CLOSED: {lc_id} ({closure_reason_action_type}). PNL for fill: {formatter.format_price_with_symbol(realized_pnl)}") symbols_with_fills.add(token) if self.notification_manager: await self.notification_manager.send_external_trade_notification( full_symbol, side_from_fill, amount_from_fill, price_from_fill, closure_reason_action_type, timestamp_dt.isoformat() ) stats._migrate_trade_to_aggregated_stats(lc_id) if bot_order_db_id_to_update: stats.update_order_status(order_db_id=bot_order_db_id_to_update, new_status='filled', amount_filled_increment=amount_from_fill) fill_processed_this_iteration = True # 3. Handle external stop loss executions (MarketMonitor's separate tracking) if not fill_processed_this_iteration: if exchange_order_id_from_fill and exchange_order_id_from_fill in self.external_stop_losses: stop_loss_info = self.external_stop_losses[exchange_order_id_from_fill] formatter = get_formatter() logger.info(f"๐Ÿ›‘ External SL (MM Tracking): {token} Order {exchange_order_id_from_fill} filled @ {formatter.format_price_with_symbol(price_from_fill, token)}") sl_active_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened') if sl_active_lc: lc_id = sl_active_lc['trade_lifecycle_id'] lc_entry_price = sl_active_lc.get('entry_price', 0) lc_pos_side = sl_active_lc.get('position_side') realized_pnl = amount_from_fill * (price_from_fill - lc_entry_price) if lc_pos_side == 'long' else amount_from_fill * (lc_entry_price - price_from_fill) success = stats.update_trade_position_closed(lc_id, price_from_fill, realized_pnl, trade_id) if success: pnl_emoji = "๐ŸŸข" if realized_pnl >= 0 else "๐Ÿ”ด" logger.info(f"{pnl_emoji} Lifecycle CLOSED by External SL (MM): {lc_id}. PNL: {formatter.format_price_with_symbol(realized_pnl)}") if self.notification_manager: await self.notification_manager.send_stop_loss_execution_notification( stop_loss_info, full_symbol, side_from_fill, amount_from_fill, price_from_fill, f'{lc_pos_side}_closed_external_sl', timestamp_dt.isoformat(), realized_pnl ) # MIGRATE STATS stats._migrate_trade_to_aggregated_stats(lc_id) del self.external_stop_losses[exchange_order_id_from_fill] fill_processed_this_iteration = True else: logger.warning(f"โš ๏ธ External SL (MM) {exchange_order_id_from_fill} for {full_symbol}, but no active lifecycle found.") # --- Fallback for Fills Not Handled by Lifecycle Logic Above --- if not fill_processed_this_iteration: # NEW: Attempt to match this fill to close an existing open position # This handles cases where an order disappeared from DB, then its fill is processed as external existing_open_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened') if existing_open_lc: lc_id = existing_open_lc['trade_lifecycle_id'] lc_entry_price = existing_open_lc.get('entry_price', 0) lc_position_side = existing_open_lc.get('position_side') lc_current_size_before_fill = existing_open_lc.get('current_position_size', 0) # Size before this fill is_potentially_closing_external_fill = False if lc_position_side == 'long' and side_from_fill.lower() == 'sell': is_potentially_closing_external_fill = True elif lc_position_side == 'short' and side_from_fill.lower() == 'buy': is_potentially_closing_external_fill = True if is_potentially_closing_external_fill: logger.info(f"โ„น๏ธ Detected potentially closing external fill {trade_id} for {full_symbol} (Lifecycle: {lc_id}). Verifying exchange position state...") # Fetch fresh position data from the exchange to confirm closure fresh_positions_after_fill = self.trading_engine.get_positions() or [] position_on_exchange_after_fill = None for pos in fresh_positions_after_fill: if pos.get('symbol') == full_symbol: position_on_exchange_after_fill = pos break position_is_closed_on_exchange = False if position_on_exchange_after_fill is None: position_is_closed_on_exchange = True logger.info(f"โœ… Exchange Verification: Position for {full_symbol} (Lifecycle: {lc_id}) not found after fill {trade_id}. Confirming closure.") elif abs(float(position_on_exchange_after_fill.get('contracts', 0))) < 1e-9: # Using a small tolerance for float comparison to zero position_is_closed_on_exchange = True logger.info(f"โœ… Exchange Verification: Position for {full_symbol} (Lifecycle: {lc_id}) has zero size on exchange after fill {trade_id}. Confirming closure.") if position_is_closed_on_exchange: # Position is confirmed closed on the exchange. # P&L should be calculated based on the size that was closed by this fill, # which we assume is lc_current_size_before_fill if the position is now entirely gone. # If the fill amount (amount_from_fill) is less than lc_current_size_before_fill # but the position is still gone, it implies other fills might have also occurred. # For simplicity here, we use amount_from_fill for P&L calculation relating to this specific fill, # assuming it's the one that effectively zeroed out the position or was the last part of it. # A more robust P&L would use lc_current_size_before_fill if that was the true amount closed. # Let's use lc_current_size_before_fill if the fill amount is very close to it, otherwise amount_from_fill. amount_for_pnl_calc = amount_from_fill # If the position is fully closed, and this fill's amount is very close to the total size, # assume this fill closed the entire remaining position. if abs(lc_current_size_before_fill - amount_from_fill) < 0.000001 * amount_from_fill: amount_for_pnl_calc = lc_current_size_before_fill logger.info(f"โ„น๏ธ Attempting to close lifecycle {lc_id} for {full_symbol} via confirmed external fill {trade_id}.") realized_pnl = 0 if lc_position_side == 'long': realized_pnl = amount_for_pnl_calc * (price_from_fill - lc_entry_price) elif lc_position_side == 'short': realized_pnl = amount_for_pnl_calc * (lc_entry_price - price_from_fill) success = stats.update_trade_position_closed( lifecycle_id=lc_id, exit_price=price_from_fill, # Price of this specific fill realized_pnl=realized_pnl, exchange_fill_id=trade_id ) if success: pnl_emoji = "๐ŸŸข" if realized_pnl >= 0 else "๐Ÿ”ด" formatter = get_formatter() logger.info(f"{pnl_emoji} Lifecycle CLOSED (Verified External): {lc_id}. PNL for fill: {formatter.format_price_with_symbol(realized_pnl)}") symbols_with_fills.add(token) if self.notification_manager: await self.notification_manager.send_external_trade_notification( full_symbol, side_from_fill, amount_from_fill, price_from_fill, f"verified_external_{lc_position_side}_close", timestamp_dt.isoformat() ) # MIGRATE STATS stats._migrate_trade_to_aggregated_stats(lc_id) fill_processed_this_iteration = True else: logger.error(f"โŒ Failed to close lifecycle {lc_id} via verified external fill {trade_id}.") else: # Position still exists on exchange, so this fill was not a full closure. # Do not mark fill_processed_this_iteration = True here. # Let the original fallback `stats.record_trade` handle this fill as "external_unmatched". # This is important so the fill is recorded, even if it does not close the lifecycle. current_size_on_exchange = float(position_on_exchange_after_fill.get('contracts', 0)) if position_on_exchange_after_fill else 'Unknown' logger.warning(f"โš ๏ธ External fill {trade_id} for {full_symbol} (Lifecycle: {lc_id}, Amount: {amount_from_fill}) did NOT fully close position. Exchange size now: {current_size_on_exchange}. Lifecycle remains open. Fill will be recorded as 'external_unmatched'.") # Future enhancement: Handle partial closure here by updating current_position_size and realizing partial P&L. # Original Fallback logic if still not processed if not fill_processed_this_iteration: # ---- START DIAGNOSTIC BLOCK ---- # Log details if this fill *should* have matched an open lifecycle but didn't. # This condition checks if a position count recently changed, suggesting a closure. # We access _update_cached_data's effect by checking self.last_known_positions vs current from cache. # Note: This is an approximation. A more robust check might involve tracking position count changes more directly. # Get current positions from cache (reflects state after _update_cached_data this cycle) current_positions_from_cache_map = { pos.get('symbol'): pos for pos in (self.cached_positions or []) if pos.get('symbol') and abs(float(pos.get('contracts', 0))) > 1e-9 } # Get DB's view of open positions all_open_positions_in_db = stats.get_open_positions() # Fetches all with status='position_opened' db_open_symbols = {pos_db.get('symbol') for pos_db in all_open_positions_in_db} # Check if the fill's symbol *should* have been in the DB as open, # but wasn't found by get_trade_by_symbol_and_status(full_symbol, 'position_opened') if full_symbol in db_open_symbols: # This is the critical contradiction: DB says it's open via get_open_positions, # but get_trade_by_symbol_and_status(full_symbol, ...) failed. # This should ideally not happen if queries are consistent. logger.error(f"๐Ÿšจ DIAGNOSTIC: Contradiction for {full_symbol}! get_open_positions() includes it, but get_trade_by_symbol_and_status('{full_symbol}', 'position_opened') failed to find it within _check_external_trades context for fill {trade_id}. This needs investigation into TradingStats symbol querying.") # More general diagnostic: if a position disappeared from exchange but we couldn't match this fill # This is a heuristic. A position count change was logged by _update_cached_data. # If len(current_positions_from_cache_map) < len(self.last_known_positions_before_update_this_cycle) # (hypothetical variable, self.last_known_positions is already updated) # For now, we just log if get_trade_by_symbol_and_status failed. potential_match_failure_logged = False if not stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened'): # Re-check to be sure logger.warning(f"โš ๏ธ DIAGNOSTIC for UNMATCHED FILL {trade_id} ({full_symbol}):") logger.warning(f" Fill details: Side={side_from_fill}, Amount={amount_from_fill}, Price={price_from_fill}") logger.warning(f" Attempted lookup with full_symbol='{full_symbol}' and status='position_opened' found NO active lifecycle.") if all_open_positions_in_db: logger.warning(f" However, DB currently has these 'position_opened' lifecycles (symbol - lifecycle_id):") for db_pos in all_open_positions_in_db: logger.warning(f" - '{db_pos.get('symbol')}' - ID: {db_pos.get('trade_lifecycle_id')}") # Check for near matches (e.g. base token match) base_token_fill = full_symbol.split('/')[0].split(':')[0] near_matches = [db_s for db_s in db_open_symbols if base_token_fill in db_s] if near_matches: logger.warning(f" Possible near matches in DB for base token '{base_token_fill}': {near_matches}") else: logger.warning(f" No near matches found in DB for base token '{base_token_fill}'.") else: logger.warning(" DB has NO 'position_opened' lifecycles at all right now.") potential_match_failure_logged = True # ---- END DIAGNOSTIC BLOCK ---- linked_order_db_id = None if exchange_order_id_from_fill: order_in_db = stats.get_order_by_exchange_id(exchange_order_id_from_fill) if order_in_db: linked_order_db_id = order_in_db.get('id') logger.info(f"๐Ÿ”— Fallback: Fill {trade_id} for OID {exchange_order_id_from_fill} (DB ID {linked_order_db_id}) not tied to active lifecycle step.") current_status = order_in_db.get('status', '') if current_status in ['open', 'partially_filled', 'pending_submission']: amt_req = float(order_in_db.get('amount_requested', 0)) amt_filled_so_far = float(order_in_db.get('amount_filled',0)) new_status = 'partially_filled' if (amt_filled_so_far + amount_from_fill) >= amt_req - 1e-9: new_status = 'filled' stats.update_order_status( order_db_id=linked_order_db_id, new_status=new_status, amount_filled_increment=amount_from_fill ) logger.info(f"๐Ÿ“Š Updated bot order {linked_order_db_id} (fallback): {current_status} โ†’ {new_status}") if not (hasattr(stats, 'get_trade_by_exchange_fill_id') and stats.get_trade_by_exchange_fill_id(trade_id)): stats.record_trade( # Old record_trade for truly unassociated fills full_symbol, side_from_fill, amount_from_fill, price_from_fill, exchange_fill_id=trade_id, trade_type="external_unmatched", timestamp=timestamp_dt.isoformat(), linked_order_table_id_to_link=linked_order_db_id ) logger.info(f"๐Ÿ“‹ Recorded trade via FALLBACK: {trade_id} (Unmatched External Fill)") fill_processed_this_iteration = True # Fallback is also a form of processing for this fill # Update timestamp if any processing occurred for this fill if fill_processed_this_iteration: external_trades_processed += 1 if self.last_processed_trade_time is None or timestamp_dt > self.last_processed_trade_time: self.last_processed_trade_time = timestamp_dt except Exception as e: logger.error(f"Error processing fill {fill.get('id', 'N/A')}: {e}", exc_info=True) continue # Important to continue to next fill # Save the last processed timestamp to database if external_trades_processed > 0: # self.trading_engine.stats._set_metadata('last_processed_trade_time', self.last_processed_trade_time.isoformat()) # Corrected variable stats._set_metadata('market_monitor_last_processed_trade_time', self.last_processed_trade_time.isoformat()) logger.info(f"๐Ÿ’พ Saved MarketMonitor state (last_processed_trade_time) to DB: {self.last_processed_trade_time.isoformat()}") logger.info(f"๐Ÿ“Š Processed {external_trades_processed} external trades") if symbols_with_fills: # Check if set is not empty logger.info(f"โ„น๏ธ Symbols with processed fills this cycle: {list(symbols_with_fills)}") except Exception as e: logger.error(f"โŒ Error checking external trades: {e}", exc_info=True) async def _check_pending_triggers(self): """Check and process pending conditional triggers (e.g., SL/TP).""" stats = self.trading_engine.get_stats() if not stats: logger.warning("โš ๏ธ TradingStats not available in _check_pending_triggers.") return try: # Fetch pending SL triggers (adjust type if TP triggers are different) # For now, assuming 'STOP_LIMIT_TRIGGER' is the type used for SLs that become limit orders pending_sl_triggers = stats.get_orders_by_status(status='pending_trigger', order_type_filter='stop_limit_trigger') if not pending_sl_triggers: return logger.debug(f"Found {len(pending_sl_triggers)} pending SL triggers to check.") for trigger_order in pending_sl_triggers: symbol = trigger_order['symbol'] trigger_price = trigger_order['price'] # This is the stop price trigger_side = trigger_order['side'] # This is the side of the SL order (e.g., sell for a long position's SL) order_db_id = trigger_order['id'] parent_ref_id = trigger_order.get('parent_bot_order_ref_id') if not symbol or trigger_price is None: logger.warning(f"Invalid trigger order data for DB ID {order_db_id}, skipping: {trigger_order}") continue market_data = self.trading_engine.get_market_data(symbol) if not market_data or not market_data.get('ticker'): logger.warning(f"Could not fetch market data for {symbol} to check SL trigger {order_db_id}.") continue current_price = float(market_data['ticker'].get('last', 0)) if current_price <= 0: logger.warning(f"Invalid current price ({current_price}) for {symbol} checking SL trigger {order_db_id}.") continue trigger_hit = False if trigger_side.lower() == 'sell' and current_price <= trigger_price: trigger_hit = True logger.info(f"๐Ÿ”ด SL TRIGGER HIT (Sell): Order DB ID {order_db_id}, Symbol {symbol}, Trigger@ ${trigger_price:.4f}, Market@ ${current_price:.4f}") elif trigger_side.lower() == 'buy' and current_price >= trigger_price: trigger_hit = True logger.info(f"๐ŸŸข SL TRIGGER HIT (Buy): Order DB ID {order_db_id}, Symbol {symbol}, Trigger@ ${trigger_price:.4f}, Market@ ${current_price:.4f}") if trigger_hit: logger.info(f"Attempting to execute actual stop order for triggered DB ID: {order_db_id} (Parent Bot Ref: {trigger_order.get('parent_bot_order_ref_id')})") execution_result = await self.trading_engine.execute_triggered_stop_order(original_trigger_order_db_id=order_db_id) notification_message_detail = "" if execution_result.get("success"): new_trigger_status = 'triggered_order_placed' placed_sl_details = execution_result.get("placed_sl_order_details", {}) logger.info(f"Successfully placed actual SL order from trigger {order_db_id}. New SL Order DB ID: {placed_sl_details.get('order_db_id')}, Exchange ID: {placed_sl_details.get('exchange_order_id')}") notification_message_detail = f"Actual SL order placed (New DB ID: {placed_sl_details.get('order_db_id', 'N/A')})." else: new_trigger_status = 'trigger_execution_failed' error_msg = execution_result.get("error", "Unknown error during SL execution.") logger.error(f"Failed to execute actual SL order from trigger {order_db_id}: {error_msg}") notification_message_detail = f"Failed to place actual SL order: {error_msg}" stats.update_order_status(order_db_id=order_db_id, new_status=new_trigger_status) if self.notification_manager: await self.notification_manager.send_generic_notification( f"๐Ÿ”” Stop-Loss Update!\nSymbol: {symbol}\nSide: {trigger_side.upper()}\nTrigger Price: ${trigger_price:.4f}\nMarket Price: ${current_price:.4f}\n(Original Trigger DB ID: {order_db_id}, Parent: {parent_ref_id or 'N/A'})\nStatus: {new_trigger_status.replace('_', ' ').title()}\nDetails: {notification_message_detail}" ) except Exception as e: logger.error(f"โŒ Error checking pending SL triggers: {e}", exc_info=True) async def _check_automatic_risk_management(self): """Check for automatic stop loss triggers based on Config.STOP_LOSS_PERCENTAGE as safety net.""" try: # Skip if risk management is disabled or percentage is 0 if not getattr(Config, 'RISK_MANAGEMENT_ENABLED', True) or Config.STOP_LOSS_PERCENTAGE <= 0: return # Get current positions positions = self.cached_positions or [] # Use cache if not positions: # If no positions exist, clean up any orphaned pending stop losses await self._cleanup_orphaned_stop_losses() return for position in positions: try: symbol = position.get('symbol', '') contracts = float(position.get('contracts', 0)) entry_price = float(position.get('entryPx', 0)) mark_price = float(position.get('markPx', 0)) unrealized_pnl = float(position.get('unrealizedPnl', 0)) # Skip if no position or missing data if contracts == 0 or entry_price <= 0 or mark_price <= 0: continue # Calculate PnL percentage based on entry value entry_value = abs(contracts) * entry_price if entry_value <= 0: continue pnl_percentage = (unrealized_pnl / entry_value) * 100 # Check if loss exceeds the safety threshold if pnl_percentage <= -Config.STOP_LOSS_PERCENTAGE: token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0] position_side = "LONG" if contracts > 0 else "SHORT" # Fetch the active trade lifecycle for logging context stats = self.trading_engine.get_stats() lifecycle_id_str = "N/A" if stats: active_trade_lc = stats.get_trade_by_symbol_and_status(symbol, 'position_opened') if active_trade_lc: lifecycle_id_str = active_trade_lc.get('trade_lifecycle_id', "N/A")[:8] + "..." logger.warning(f"๐Ÿšจ AUTOMATIC STOP LOSS TRIGGERED: {token} {position_side} position (Lifecycle: {lifecycle_id_str}) has {pnl_percentage:.2f}% loss (threshold: -{Config.STOP_LOSS_PERCENTAGE}%)") # Send notification before attempting exit if self.notification_manager: await self.notification_manager.send_generic_notification( f"๐Ÿšจ AUTOMATIC STOP LOSS TRIGGERED!\\n" f"Token: {token}\\n" f"Lifecycle ID: {lifecycle_id_str}\\n" f"Position: {position_side} {abs(contracts):.6f}\\n" f"Entry Price: ${entry_price:.4f}\\n" f"Current Price: ${mark_price:.4f}\\n" f"Unrealized P&L: ${unrealized_pnl:.2f} ({pnl_percentage:.2f}%)\\n" f"Safety Threshold: -{Config.STOP_LOSS_PERCENTAGE}%\\n" f"Action: Executing emergency exit order..." ) # Execute emergency exit order exit_result = await self.trading_engine.execute_exit_order(token) if exit_result.get('success'): placed_order_details = exit_result.get('order_placed_details', {}) logger.info(f"โœ… Emergency exit order placed for {token} (Lifecycle: {lifecycle_id_str}). Order details: {placed_order_details}") # Cancel any pending stop losses for this symbol since position is now closed # stats object is already fetched above if stats: cancelled_sl_count = stats.cancel_pending_stop_losses_by_symbol( symbol=symbol, new_status='cancelled_auto_exit' ) if cancelled_sl_count > 0: logger.info(f"๐Ÿ›‘ Cancelled {cancelled_sl_count} pending stop losses for {symbol} (Lifecycle: {lifecycle_id_str}) after automatic exit") if self.notification_manager: await self.notification_manager.send_generic_notification( f"โœ… Emergency Exit Initiated\\n\\n" f"๐Ÿ“Š Position: {token} {position_side}\\n" f"๐Ÿ†” Lifecycle ID: {lifecycle_id_str}\\n" f"๐Ÿ“‰ Loss at Trigger: {pnl_percentage:.2f}% (${unrealized_pnl:.2f})\\n" f"โš ๏ธ Threshold: -{Config.STOP_LOSS_PERCENTAGE}%\\n" f"โœ… Action: Market exit order placed successfully\\n" f"๐Ÿ†” Exit Order ID: {placed_order_details.get('exchange_order_id', 'N/A')}\\n" f"{f'๐Ÿ›‘ Cleanup: Cancelled {cancelled_sl_count} other pending stop losses' if cancelled_sl_count > 0 else ''}\\n\\n" f"๐Ÿ›ก๏ธ The system will confirm closure and P&L once the exit order fill is processed." ) else: error_msg = exit_result.get('error', 'Unknown error') logger.error(f"โŒ Failed to execute emergency exit order for {token} (Lifecycle: {lifecycle_id_str}): {error_msg}") if self.notification_manager: await self.notification_manager.send_generic_notification( f"โŒ CRITICAL: Emergency Exit Failed!\\n\\n" f"๐Ÿ“Š Position: {token} {position_side}\\n" f"๐Ÿ†” Lifecycle ID: {lifecycle_id_str}\\n" f"๐Ÿ“‰ Loss: {pnl_percentage:.2f}%\\n" f"โŒ Error Placing Order: {error_msg}\\n\\n" f"โš ๏ธ MANUAL INTERVENTION REQUIRED\\n" f"Please close this position manually via /exit {token}" ) except Exception as pos_error: logger.error(f"Error processing position for automatic stop loss: {pos_error}") continue except Exception as e: logger.error(f"โŒ Error in automatic risk management check: {e}", exc_info=True) async def _cleanup_orphaned_stop_losses(self): """Clean up pending stop losses that no longer have corresponding positions OR whose parent orders have been cancelled/failed.""" try: stats = self.trading_engine.get_stats() if not stats: return pending_stop_losses = stats.get_orders_by_status('pending_trigger', 'stop_limit_trigger') if not pending_stop_losses: return logger.debug(f"Checking {len(pending_stop_losses)} pending stop losses for orphaned orders") current_positions = self.cached_positions or [] position_symbols = set() if current_positions: for pos in current_positions: symbol = pos.get('symbol') contracts = float(pos.get('contracts', 0)) if symbol and contracts != 0: position_symbols.add(symbol) orphaned_count = 0 for sl_order in pending_stop_losses: symbol = sl_order.get('symbol') order_db_id = sl_order.get('id') parent_bot_ref_id = sl_order.get('parent_bot_order_ref_id') should_cancel = False cancel_reason = "" if parent_bot_ref_id: parent_order = stats.get_order_by_bot_ref_id(parent_bot_ref_id) if parent_order: parent_status = parent_order.get('status', '').lower() if parent_order.get('exchange_order_id'): entry_oid = parent_order['exchange_order_id'] lc_pending = stats.get_lifecycle_by_entry_order_id(entry_oid, status='pending') lc_cancelled = stats.get_lifecycle_by_entry_order_id(entry_oid, status='cancelled') lc_opened = stats.get_lifecycle_by_entry_order_id(entry_oid, status='position_opened') if parent_status == 'cancelled_externally': if lc_cancelled: should_cancel = True cancel_reason = f"parent order ({entry_oid[:6]}...) {parent_status} and lifecycle explicitly cancelled" elif not lc_pending and not lc_opened: should_cancel = True cancel_reason = f"parent order ({entry_oid[:6]}...) {parent_status} and no active/pending/cancelled lifecycle found" else: current_lc_status = "N/A" if lc_pending: current_lc_status = lc_pending.get('status') elif lc_opened: current_lc_status = lc_opened.get('status') logger.info(f"SL {order_db_id} for parent {parent_bot_ref_id} (status {parent_status}) - lifecycle is '{current_lc_status}'. SL not cancelled by this rule.") should_cancel = False elif parent_status in ['failed_submission', 'failed_submission_no_data', 'cancelled_manually', 'disappeared_from_exchange']: if not lc_opened: should_cancel = True cancel_reason = f"parent order ({entry_oid[:6]}...) {parent_status} and no 'position_opened' lifecycle" else: logger.info(f"SL {order_db_id} for parent {parent_bot_ref_id} (status {parent_status}) - lifecycle is 'position_opened'. SL not cancelled by this rule.") should_cancel = False elif parent_status == 'filled': if symbol not in position_symbols: should_cancel = True cancel_reason = "parent filled but actual position no longer exists" # If parent_status is 'open' and has exchange_order_id, it falls through, should_cancel remains False (correct). else: # Parent order does not have an exchange_order_id in the fetched DB record # Possible states for parent_status here: 'pending_submission', 'open' (anomalous), or other non-live states. if parent_status in ['open', 'pending_submission', 'submitted']: # 'submitted' can be another pre-fill active state logger.info(f"SL Cleanup: Parent order {parent_order.get('id')} (status '{parent_status}') is missing exchange_id in DB record. SL {sl_order.get('id')} will NOT be cancelled by this rule, assuming parent is still active or pending.") should_cancel = False # Preserve SL if parent is in an active or attempting-to-be-active state if parent_status == 'open': # This specific case is a data anomaly logger.warning(f"SL Cleanup: DATA ANOMALY - Parent order {parent_order.get('id')} status is 'open' but fetched DB record shows no exchange_id. Investigate DB state for order {parent_order.get('id')}.") else: # Parent is in a non-live/non-pending status (e.g., 'failed_submission' before getting an ID) # and never got an exchange_id. It's likely safe to cancel the SL. should_cancel = True cancel_reason = f"parent order {parent_status} (no exch_id) and status indicates it's not live/pending." else: # Parent order not found in DB by bot_order_ref_id should_cancel = True cancel_reason = "parent order not found in database" else: # No parent_bot_ref_id on the SL order itself. # This SL is not tied to a bot order, so cancel if no position for the symbol. if symbol not in position_symbols: should_cancel = True cancel_reason = "no position exists and no parent reference" if should_cancel: success = stats.update_order_status( order_db_id=order_db_id, new_status='cancelled_orphaned' ) if success: orphaned_count += 1 token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0] logger.info(f"๐Ÿงน Cancelled orphaned stop loss for {token} (Order DB ID: {order_db_id}) - Reason: {cancel_reason}") if orphaned_count > 0: logger.info(f"๐Ÿงน Cleanup completed: Cancelled {orphaned_count} orphaned stop loss order(s)") if self.notification_manager: await self.notification_manager.send_generic_notification( f"๐Ÿงน Cleanup Completed\n\n" f"Cancelled {orphaned_count} orphaned stop loss order(s)\n" f"Reason: Parent orders invalid or positions closed externally\n" f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n" f"๐Ÿ’ก This ensures stop losses sync with actual orders/positions." ) except Exception as e: logger.error(f"โŒ Error cleaning up orphaned stop losses: {e}", exc_info=True) async def _activate_pending_stop_losses_from_trades(self): """๐Ÿ†• PHASE 4: Check trades table for pending stop loss activation first (highest priority)""" try: stats = self.trading_engine.get_stats() if not stats: return formatter = get_formatter() # Get formatter trades_needing_sl = stats.get_pending_stop_loss_activations() if not trades_needing_sl: return logger.debug(f"๐Ÿ†• Found {len(trades_needing_sl)} open positions needing stop loss activation") for position_trade in trades_needing_sl: try: symbol = position_trade['symbol'] token = symbol.split('/')[0] if '/' in symbol else symbol stop_loss_price = position_trade['stop_loss_price'] position_side = position_trade['position_side'] # 'long' or 'short' # current_amount = position_trade.get('current_position_size', 0) # Amount not needed for SL placement logic here lifecycle_id = position_trade['trade_lifecycle_id'] # Get current market price current_price = None try: market_data = self.trading_engine.get_market_data(symbol) if market_data and market_data.get('ticker'): current_price = float(market_data['ticker'].get('last', 0)) except Exception as price_error: logger.warning(f"Could not fetch current price for {symbol} for SL activation of {lifecycle_id}: {price_error}") # Determine stop loss side based on position side sl_side = 'sell' if position_side == 'long' else 'buy' trigger_already_hit = False trigger_reason = "" if current_price and current_price > 0 and stop_loss_price and stop_loss_price > 0: current_price_str = formatter.format_price_with_symbol(current_price, token) stop_loss_price_str = formatter.format_price_with_symbol(stop_loss_price, token) if sl_side == 'sell' and current_price <= stop_loss_price: trigger_already_hit = True trigger_reason = f"LONG SL: Current {current_price_str} โ‰ค Stop {stop_loss_price_str}" elif sl_side == 'buy' and current_price >= stop_loss_price: trigger_already_hit = True trigger_reason = f"SHORT SL: Current {current_price_str} โ‰ฅ Stop {stop_loss_price_str}" if trigger_already_hit: logger.warning(f"๐Ÿšจ IMMEDIATE SL EXECUTION (Trades Table): {token} (Lifecycle: {lifecycle_id[:8]}) - {trigger_reason}. Executing market exit.") try: # Execute market order to close position exit_result = await self.trading_engine.execute_exit_order(token) # Assumes token is sufficient for exit if exit_result.get('success'): exit_order_id = exit_result.get('order_placed_details', {}).get('exchange_order_id', 'N/A') logger.info(f"โœ… Immediate {position_side.upper()} SL execution successful for {token} (Lifecycle: {lifecycle_id[:8]}). Market order {exit_order_id} placed.") # The actual lifecycle closure will be handled by _check_external_trades when this market order fill is processed. # We can mark the SL as "handled" on the lifecycle to prevent re-triggering here. stats.link_stop_loss_to_trade(lifecycle_id, f"immediate_market_exit_{exit_order_id}", stop_loss_price) if self.notification_manager: # Re-fetch formatted prices for notification if not already strings current_price_str_notify = formatter.format_price_with_symbol(current_price, token) if current_price else "N/A" stop_loss_price_str_notify = formatter.format_price_with_symbol(stop_loss_price, token) if stop_loss_price else "N/A" await self.notification_manager.send_generic_notification( f"๐Ÿšจ Immediate Stop Loss Execution\n\n" f"๐Ÿ†• Source: Unified Trades Table\n" f"Token: {token}\n" f"Lifecycle ID: {lifecycle_id[:8]}...\n" f"Position Type: {position_side.upper()}\n" f"SL Trigger Price: {stop_loss_price_str_notify}\n" f"Current Market Price: {current_price_str_notify}\n" f"Trigger Logic: {trigger_reason}\n" f"Action: Market close order placed immediately\n" f"Exit Order ID: {exit_order_id}\n" f"Time: {datetime.now().strftime('%H:%M:%S')}" ) else: logger.error(f"โŒ Failed to execute immediate market SL for {token} (Lifecycle: {lifecycle_id[:8]}): {exit_result.get('error')}") except Exception as exec_error: logger.error(f"โŒ Exception during immediate market SL execution for {token} (Lifecycle: {lifecycle_id[:8]}): {exec_error}") else: # Normal activation - place stop loss order (which creates a 'pending_trigger' in DB) try: # The execute_stop_loss_order should create the 'pending_trigger' order in the DB # and return details, including the DB ID of this 'pending_trigger' order. sl_result = await self.trading_engine.execute_stop_loss_order( token=token, # Assuming token is enough for execute_stop_loss_order stop_price=stop_loss_price, # Pass lifecycle_id or other refs if needed by trading_engine to create the DB order trade_lifecycle_id_for_sl=lifecycle_id ) if sl_result.get('success'): # The 'order_placed_details' should contain the exchange_order_id of the *actual* SL order # if it was placed directly, OR the db_id of the 'pending_trigger' order. # For this flow, execute_stop_loss_order is expected to manage the DB record for 'pending_trigger'. # We then link this 'pending_trigger' order's concept (e.g. its future exchange_order_id if known, or a reference) # to the trade lifecycle. # Assuming sl_result might give us the exchange_order_id if the SL is directly placed, # or a reference to the DB order that is now 'pending_trigger'. # TradingStats.link_stop_loss_to_trade expects the actual exchange order ID of the stop loss. # This implies execute_stop_loss_order directly places it or we have a two step. # Given _check_pending_triggers, execute_stop_loss_order likely sets up the DB record. # For now, let's assume sl_result gives an identifier for the placed SL concept. sl_exchange_order_id = sl_result.get('order_placed_details', {}).get('exchange_order_id') # This might be of the actual order or the trigger sl_db_order_id = sl_result.get('order_placed_details', {}).get('order_db_id') # ID of the order in 'orders' table # We need to link the trade lifecycle to the concept of the SL order. # If sl_exchange_order_id is directly available (e.g. for exchange-based SLs), use it. # If it's a DB-managed trigger, the sl_db_order_id (of 'pending_trigger' type) is key. # TradingStats.link_stop_loss_to_trade primarily expects the 'stop_loss_order_id' (exchange ID). # This part needs to be clear: # Option A: execute_stop_loss_order directly places a conditional order on exchange, returns its ID. # Option B: execute_stop_loss_order creates a 'pending_trigger' in DB. _check_pending_triggers later places it. # If B, then link_stop_loss_to_trade might need to store the DB ID of the trigger, # or wait until _check_pending_triggers successfully places it. # The current TradingStats.link_stop_loss_to_trade seems to expect an exchange_order_id. # For now, assume execute_stop_loss_order in TE handles creation of DB order, # and if it gets an exchange_order_id immediately (e.g. for a true stop-market order), it's returned. # If it only creates a pending_trigger, the exchange_order_id might be null initially in order_placed_details. if sl_exchange_order_id: # If an actual exchange order ID was returned for the SL stats.link_stop_loss_to_trade(lifecycle_id, sl_exchange_order_id, stop_loss_price) stop_loss_price_str_log = formatter.format_price_with_symbol(stop_loss_price, token) logger.info(f"โœ… Activated {position_side.upper()} stop loss for {token} (Lifecycle: {lifecycle_id[:8]}): SL Price {stop_loss_price_str_log}, Exchange SL Order ID: {sl_exchange_order_id}") if self.notification_manager: current_price_str_notify = formatter.format_price_with_symbol(current_price, token) if current_price else 'Unknown' stop_loss_price_str_notify = formatter.format_price_with_symbol(stop_loss_price, token) await self.notification_manager.send_generic_notification( f"๐Ÿ›‘ Stop Loss Activated\n\n" f"๐Ÿ†• Source: Unified Trades Table\n" f"Token: {token}\n" f"Lifecycle ID: {lifecycle_id[:8]}...\n" f"Position Type: {position_side.upper()}\n" f"Stop Loss Price: {stop_loss_price_str_notify}\n" f"Current Price: {current_price_str_notify}\n" f"Exchange SL Order ID: {sl_exchange_order_id}\n" # Actual exchange order f"Time: {datetime.now().strftime('%H:%M:%S')}" ) elif sl_db_order_id: # If a DB order (pending_trigger) was created # If execute_stop_loss_order just creates a pending_trigger in DB, # we should still mark the lifecycle's stop_loss_price. # The stop_loss_order_id in the lifecycle might remain NULL until _check_pending_triggers places it. # Or, link_stop_loss_to_trade could be adapted to also store the trigger_db_id. # For simplicity now, assume link_stop_loss_to_trade focuses on exchange_order_id. # We'll rely on _check_pending_triggers to eventually place it. # The key is that the lifecycle now has stop_loss_price set. # We need to make sure get_pending_stop_loss_activations() correctly excludes # lifecycles where SL processing has begun (even if only a DB trigger exists). # TradingStats.link_stop_loss_to_trade is what sets stop_loss_order_id. # If we don't have an exchange ID yet, we can't call it. # This means get_pending_stop_loss_activations will keep returning this trade # until an exchange SL ID is linked. # This suggests execute_stop_loss_order SHOULD try to place and get an ID, # or the flow is more complex. # Revised Assumption: execute_stop_loss_order either: # 1. Places SL on exchange, returns exchange_order_id -> link_stop_loss_to_trade. # 2. Creates a 'pending_trigger' DB order AND updates lifecycle's stop_loss_order_id with a temporary ref or the trigger_db_id. # Let's assume for now, if sl_db_order_id is returned, it means a DB trigger was set up. # The lifecycle's stop_loss_order_id will be set once the actual exchange order is placed by _check_pending_triggers. # This means get_pending_stop_loss_activations might need refinement. # For now, if we have a db_order_id, we assume the process has started. logger.info(f"โœ… Initiated {position_side.upper()} stop loss process for {token} (Lifecycle: {lifecycle_id[:8]}): SL Price ${stop_loss_price:.4f}. DB Trigger Order ID: {sl_db_order_id}. Waiting for market trigger.") if self.notification_manager: await self.notification_manager.send_generic_notification( f"๐Ÿ›ก๏ธ Stop Loss Armed (Pending Trigger)\n\n" f"Token: {token}\n" f"Lifecycle ID: {lifecycle_id[:8]}...\n" f"Position Type: {position_side.upper()}\n" f"Stop Loss Price: ${stop_loss_price:.4f}\n" f"Current Price: ${current_price:.4f if current_price else 'Unknown'}\n" f"Status: Monitoring market to place SL order at trigger.\n" f"DB Trigger ID: {sl_db_order_id}\n" f"Time: {datetime.now().strftime('%H:%M:%S')}" ) # To prevent re-processing by get_pending_stop_loss_activations, # we need to signify that SL setup for this price is in progress. # This might involve setting a temporary value in trade.stop_loss_order_id # or adding another field like 'stop_loss_status'. # For now, this log indicates it's being handled. # A simple way is to link it with a temporary ID. stats.link_stop_loss_to_trade(lifecycle_id, f"pending_db_trigger_{sl_db_order_id}", stop_loss_price) else: # No exchange_id and no db_id returned, but success true? Unlikely. logger.warning(f"โš ๏ธ Stop loss activation for {token} (Lifecycle: {lifecycle_id[:8]}) reported success but no order ID (Exchange or DB) provided.") else: logger.error(f"โŒ Failed to activate SL for {token} (Lifecycle: {lifecycle_id[:8]}): {sl_result.get('error')}") except Exception as activation_error: logger.error(f"โŒ Exception during SL activation for {token} (Lifecycle: {lifecycle_id[:8]}): {activation_error}") except Exception as trade_error: logger.error(f"โŒ Error processing position trade for SL activation (Lifecycle: {position_trade.get('trade_lifecycle_id','N/A')}): {trade_error}") except Exception as e: logger.error(f"โŒ Error activating pending stop losses from trades table: {e}", exc_info=True) async def _check_for_recent_fills_for_order(self, exchange_oid, order_in_db): """Check for very recent fills that might match this order.""" try: # Get recent fills from exchange recent_fills = self.trading_engine.get_recent_fills() if not recent_fills: return False # Corrected to use self.last_processed_trade_time if not hasattr(self, 'last_processed_trade_time') or self.last_processed_trade_time is None: # Attempt to load if not present (should generally be pre-loaded) try: last_time_str = self.trading_engine.stats._get_metadata('last_processed_trade_time') if last_time_str: self.last_processed_trade_time = datetime.fromisoformat(last_time_str) else: self.last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1) except Exception: # Fallback on error self.last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1) for fill in recent_fills: try: trade_id = fill.get('id') timestamp_ms = fill.get('timestamp') symbol_from_fill = fill.get('symbol') side_from_fill = fill.get('side') amount_from_fill = float(fill.get('amount', 0)) price_from_fill = float(fill.get('price', 0)) timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc) if timestamp_ms else datetime.now(timezone.utc) # Skip if already processed by the main external trade checker logic if timestamp_dt <= self.last_processed_trade_time: continue if symbol_from_fill and side_from_fill and amount_from_fill > 0 and price_from_fill > 0: exchange_order_id_from_fill = fill.get('info', {}).get('oid') if exchange_order_id_from_fill == exchange_oid: # Check if this fill matches the order details (symbol, side, approx amount) if order_in_db.get('symbol') == symbol_from_fill and \ order_in_db.get('side') == side_from_fill and \ abs(float(order_in_db.get('amount_requested', 0)) - amount_from_fill) < 0.01 * amount_from_fill : # Allow 1% tolerance logger.info(f"โœ… Found recent matching fill {trade_id} for order {exchange_oid}. Not cancelling stop losses.") return True except Exception as e: logger.error(f"Error processing fill {fill.get('id','N/A')} in _check_for_recent_fills_for_order: {e}") continue return False except Exception as e: logger.error(f"โŒ Error in _check_for_recent_fills_for_order for OID {exchange_oid}: {e}", exc_info=True) return False async def _check_external_stop_loss_orders(self): """Check for externally placed stop loss orders and track them.""" try: # Get current open orders open_orders = self.cached_orders or [] # Use cache if not open_orders: return # Get current positions to understand what could be stop losses positions = self.cached_positions or [] # Use cache if not positions: return # Create a map of current positions position_map = {} for position in positions: symbol = position.get('symbol') contracts = float(position.get('contracts', 0)) if symbol and contracts != 0: token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0] position_map[token] = { 'symbol': symbol, 'contracts': contracts, 'side': 'long' if contracts > 0 else 'short', 'entry_price': float(position.get('entryPx', 0)) } # Check each order to see if it could be a stop loss newly_detected = 0 for order in open_orders: try: exchange_order_id = order.get('id') symbol = order.get('symbol') side = order.get('side') # 'buy' or 'sell' amount = float(order.get('amount', 0)) price = float(order.get('price', 0)) # order_type = order.get('type', '').lower() # Not strictly needed for this detection logic if not all([exchange_order_id, symbol, side, amount, price]): continue # Skip if we're already tracking this order if exchange_order_id in self.external_stop_losses: continue token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0] if token not in position_map: continue position_data = position_map[token] is_potential_stop_loss = False # For a LONG position, a SELL order below current entry could be a stop. if position_data['side'] == 'long' and side == 'sell' and price < position_data['entry_price']: is_potential_stop_loss = True # For a SHORT position, a BUY order above current entry could be a stop. elif position_data['side'] == 'short' and side == 'buy' and price > position_data['entry_price']: is_potential_stop_loss = True if is_potential_stop_loss: self.external_stop_losses[exchange_order_id] = { 'token': token, 'symbol': symbol, 'trigger_price': price, # This is the order's price, acting as trigger 'side': side, # Side of the SL order itself 'amount': amount, 'position_side': position_data['side'], # Side of the position it's protecting 'detected_at': datetime.now(timezone.utc), 'entry_price': position_data['entry_price'] # Entry of the protected position } newly_detected += 1 logger.info(f"๐Ÿ›‘ Detected potential external stop loss: {token} {side.upper()} {amount} @ ${price:.2f} (protecting {position_data['side'].upper()} position)") except Exception as e: logger.error(f"Error analyzing order for stop loss detection: {e}") continue if newly_detected > 0: logger.info(f"๐Ÿ” Detected {newly_detected} new potential external stop loss orders") except Exception as e: logger.error(f"โŒ Error checking external stop loss orders: {e}") async def _cleanup_external_stop_loss_tracking(self): """Clean up external stop loss orders that are no longer active.""" try: if not self.external_stop_losses: return # Get current open orders open_orders = self.cached_orders or [] # Use cache if not open_orders: removed_count = len(self.external_stop_losses) if removed_count > 0: logger.info(f"๐Ÿงน Cleared {removed_count} external stop loss orders (no open orders on exchange)") self.external_stop_losses.clear() return current_order_ids = {order.get('id') for order in open_orders if order.get('id')} to_remove = [order_id for order_id in self.external_stop_losses if order_id not in current_order_ids] for order_id in to_remove: stop_loss_info = self.external_stop_losses.pop(order_id) # Use pop to remove and get value logger.info(f"๐Ÿ—‘๏ธ Removed external stop loss tracking for {stop_loss_info['token']} order {order_id} (no longer open)") if to_remove: logger.info(f"๐Ÿงน Cleaned up {len(to_remove)} disappeared external stop loss orders from tracking") except Exception as e: logger.error(f"โŒ Error cleaning up external stop loss tracking: {e}") async def _auto_sync_orphaned_positions(self): """Automatically detect and sync orphaned positions (positions on exchange without trade lifecycle records).""" try: stats = self.trading_engine.get_stats() if not stats: return formatter = get_formatter() exchange_positions = self.cached_positions or [] # Use fresh cache synced_count = 0 for exchange_pos in exchange_positions: symbol = exchange_pos.get('symbol') contracts_abs = abs(float(exchange_pos.get('contracts', 0))) if not (symbol and contracts_abs > 1e-9): # Ensure position is substantial continue # Check if we have an active trade lifecycle record for this position # A more robust check would be against a specific exchange identifier for the position if available existing_trade = stats.get_trade_by_symbol_and_status(symbol, 'position_opened') if not existing_trade: entry_price_from_exchange = float(exchange_pos.get('entryPrice', 0)) or float(exchange_pos.get('entryPx', 0)) position_side, order_side = '', '' ccxt_side = exchange_pos.get('side', '').lower() if ccxt_side == 'long': position_side, order_side = 'long', 'buy' elif ccxt_side == 'short': position_side, order_side = 'short', 'sell' if not position_side: # Fallback to raw info raw_info = exchange_pos.get('info', {}).get('position', {}) if isinstance(raw_info, dict): szi_str = raw_info.get('szi') if szi_str is not None: try: szi_val = float(szi_str) except ValueError: szi_val = 0 if szi_val > 1e-9: position_side, order_side = 'long', 'buy' elif szi_val < -1e-9: position_side, order_side = 'short', 'sell' if not position_side: # Final fallback contracts_val = float(exchange_pos.get('contracts',0)) if contracts_val > 1e-9: position_side, order_side = 'long', 'buy' elif contracts_val < -1e-9: position_side, order_side = 'short', 'sell' # Assumes negative for short else: logger.warning(f"AUTO-SYNC: Position size is effectively 0 for {symbol} after side checks, skipping sync. Data: {exchange_pos}") continue if not position_side: logger.error(f"AUTO-SYNC: CRITICAL - Could not determine position side for {symbol}. Data: {exchange_pos}. Skipping.") continue token = symbol.split('/')[0] if '/' in symbol else symbol actual_contracts_size = contracts_abs # Already absolute final_entry_price = entry_price_from_exchange price_source_log = "(exchange data)" if not final_entry_price or final_entry_price <= 0: estimated_entry_price = await self._estimate_entry_price_for_orphaned_position(symbol, actual_contracts_size, position_side) if estimated_entry_price > 0: final_entry_price = estimated_entry_price price_source_log = "(estimated)" else: logger.error(f"AUTO-SYNC: Could not determine/estimate entry price for {symbol}. Skipping sync.") continue logger.info(f"๐Ÿ”„ AUTO-SYNC: Orphaned position detected - {symbol} {position_side.upper()} {actual_contracts_size} @ ${final_entry_price:.4f} {price_source_log}") lifecycle_id = stats.create_trade_lifecycle( symbol=symbol, side=order_side, # side of the entry order entry_order_id=f"external_sync_{int(datetime.now().timestamp())}", trade_type='external_sync' # Specific type for auto-synced trades ) if lifecycle_id: success = stats.update_trade_position_opened( lifecycle_id, final_entry_price, actual_contracts_size, f"external_fill_sync_{int(datetime.now().timestamp())}" ) if success: synced_count += 1 logger.info(f"โœ… AUTO-SYNC: Successfully synced orphaned position for {symbol} (Lifecycle: {lifecycle_id[:8]}).") if self.notification_manager: unrealized_pnl = float(exchange_pos.get('unrealizedPnl', 0)) pnl_emoji = "๐ŸŸข" if unrealized_pnl >= 0 else "๐Ÿ”ด" notification_text = ( f"๐Ÿ”„ Position Auto-Synced\n\n" f"Token: {token}\n" f"Lifecycle ID: {lifecycle_id[:8]}...\n" f"Direction: {position_side.upper()}\n" f"Size: {actual_contracts_size:.6f} {token}\n" f"Entry Price: ${final_entry_price:,.4f} {price_source_log}\n" f"{pnl_emoji} P&L (Unrealized): ${unrealized_pnl:,.2f}\n" f"Reason: Position found on exchange without bot record.\n" f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n" f"โœ… Position now tracked. Use /sl or /tp if needed." ) await self.notification_manager.send_generic_notification(notification_text) else: logger.error(f"โŒ AUTO-SYNC: Failed to update lifecycle to 'position_opened' for {symbol} (Lifecycle: {lifecycle_id[:8]}).") else: logger.error(f"โŒ AUTO-SYNC: Failed to create lifecycle for orphaned position {symbol}.") if synced_count > 0: logger.info(f"๐Ÿ”„ AUTO-SYNC: Synced {synced_count} orphaned position(s) this cycle (Exchange had position, Bot did not).") # --- NEW LOGIC: Bot thinks position is open, but exchange does not --- # bot_open_lifecycles = stats.get_trades_by_status('position_opened') if not bot_open_lifecycles: return # No open lifecycles according to the bot, nothing to check here. # Create a map of current exchange positions for quick lookup: symbol -> position_data current_exchange_positions_map = {} for ex_pos in (self.cached_positions or []): # Use cached, recently updated positions if ex_pos.get('symbol') and abs(float(ex_pos.get('contracts', 0))) > 1e-9: current_exchange_positions_map[ex_pos.get('symbol')] = ex_pos closed_due_to_discrepancy = 0 for lc in bot_open_lifecycles: symbol = lc.get('symbol') lc_id = lc.get('trade_lifecycle_id') token = symbol.split('/')[0] if '/' in symbol else symbol if symbol not in current_exchange_positions_map: # Bot has an open lifecycle, but no corresponding position found on exchange. logger.warning(f"๐Ÿ”„ AUTO-SYNC (Discrepancy): Bot lifecycle {lc_id} for {symbol} is 'position_opened', but NO position found on exchange. Closing lifecycle.") entry_price = lc.get('entry_price', 0) position_side = lc.get('position_side') position_size_for_pnl = lc.get('current_position_size', 0) exit_price_for_calc = 0 price_source_info = "unknown" # Attempt to find a recent closing fill from the exchange try: # Fetch all recent fills for the account, then filter by symbol all_recent_fills = self.trading_engine.get_recent_fills() # Increased limit slightly if all_recent_fills: symbol_specific_fills = [f for f in all_recent_fills if f.get('symbol') == symbol] if symbol_specific_fills: closing_side = 'sell' if position_side == 'long' else 'buy' relevant_fills = sorted( [f for f in symbol_specific_fills if f.get('side') == closing_side], # Already filtered by symbol key=lambda f: f.get('timestamp'), reverse=True # Most recent first ) if relevant_fills: last_closing_fill = relevant_fills[0] exit_price_for_calc = float(last_closing_fill.get('price', 0)) fill_timestamp = datetime.fromtimestamp(last_closing_fill.get('timestamp')/1000, tz=timezone.utc).isoformat() if last_closing_fill.get('timestamp') else "N/A" price_source_info = f"last exchange fill ({formatter.format_price(exit_price_for_calc, symbol)} @ {fill_timestamp})" logger.info(f"AUTO-SYNC: Using exit price {price_source_info} for {symbol} lifecycle {lc_id}.") except Exception as e: logger.warning(f"AUTO-SYNC: Error fetching recent fills for {symbol} to determine exit price: {e}") if not exit_price_for_calc or exit_price_for_calc <= 0: # Fallback to mark_price from lifecycle if available mark_price_from_lc = lc.get('mark_price') if mark_price_from_lc and float(mark_price_from_lc) > 0: exit_price_for_calc = float(mark_price_from_lc) price_source_info = "lifecycle mark_price" logger.info(f"AUTO-SYNC: No recent fill found. Using exit price from lifecycle mark_price: {formatter.format_price(exit_price_for_calc, symbol)} for {symbol} lifecycle {lc_id}.") else: # Last resort: use entry_price (implies 0 PNL for this closure action) exit_price_for_calc = entry_price price_source_info = "lifecycle entry_price (0 PNL)" logger.info(f"AUTO-SYNC: No recent fill or mark_price. Using entry_price: {formatter.format_price(exit_price_for_calc, symbol)} for {symbol} lifecycle {lc_id}.") realized_pnl = 0 if position_side == 'long': realized_pnl = position_size_for_pnl * (exit_price_for_calc - entry_price) elif position_side == 'short': realized_pnl = position_size_for_pnl * (entry_price - exit_price_for_calc) success = stats.update_trade_position_closed( lifecycle_id=lc_id, exit_price=exit_price_for_calc, realized_pnl=realized_pnl, exchange_fill_id=f"auto_sync_flat_{int(datetime.now().timestamp())}" ) if success: closed_due_to_discrepancy += 1 logger.info(f"โœ… AUTO-SYNC (Discrepancy): Successfully closed bot lifecycle {lc_id} for {symbol}.") # MIGRATE STATS stats._migrate_trade_to_aggregated_stats(lc_id) if self.notification_manager: pnl_emoji = "๐ŸŸข" if realized_pnl >= 0 else "๐Ÿ”ด" # formatter is already defined in the outer scope of _auto_sync_orphaned_positions notification_text = ( f"๐Ÿ”„ Position Auto-Closed (Discrepancy)\n\n" f"Token: {token}\n" f"Lifecycle ID: {lc_id[:8]}...\n" f"Reason: Bot showed open position, but no corresponding position found on exchange.\n" f"Assumed Exit Price: {formatter.format_price(exit_price_for_calc, symbol)}\n" f"{pnl_emoji} Realized P&L for this closure: {formatter.format_price_with_symbol(realized_pnl)}\n" f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n" f"โ„น๏ธ Bot state synchronized with exchange." ) await self.notification_manager.send_generic_notification(notification_text) else: logger.error(f"โŒ AUTO-SYNC (Discrepancy): Failed to close bot lifecycle {lc_id} for {symbol}.") if closed_due_to_discrepancy > 0: logger.info(f"๐Ÿ”„ AUTO-SYNC: Closed {closed_due_to_discrepancy} lifecycle(s) due to discrepancy (Bot had position, Exchange did not).") except Exception as e: logger.error(f"โŒ Error in auto-sync orphaned positions: {e}", exc_info=True) async def _estimate_entry_price_for_orphaned_position(self, symbol: str, contracts: float, side: str) -> float: """Estimate entry price for an orphaned position by checking recent fills and market data.""" try: entry_fill_side = 'buy' if side == 'long' else 'sell' formatter = get_formatter() token = symbol.split('/')[0] if '/' in symbol else symbol all_recent_fills = self.trading_engine.get_recent_fills() # Removed symbol and limit arguments recent_fills = [f for f in all_recent_fills if f.get('symbol') == symbol] # Filter by symbol if recent_fills: symbol_side_fills = [ fill for fill in recent_fills if fill.get('symbol') == symbol and fill.get('side') == entry_fill_side and float(fill.get('amount',0)) > 0 ] if symbol_side_fills: symbol_side_fills.sort(key=lambda f: ( datetime.fromtimestamp(f.get('timestamp') / 1000, tz=timezone.utc) if f.get('timestamp') else datetime.min.replace(tzinfo=timezone.utc), abs(float(f.get('amount',0)) - contracts) ), reverse=True) best_fill = symbol_side_fills[0] fill_price = float(best_fill.get('price', 0)) fill_amount = float(best_fill.get('amount', 0)) if fill_price > 0: logger.info(f"๐Ÿ’ก AUTO-SYNC: Estimated entry for {side} {symbol} via recent {entry_fill_side} fill: {formatter.format_price_with_symbol(fill_price, token)} (Amount: {formatter.format_amount(fill_amount, token)})") return fill_price market_data = self.trading_engine.get_market_data(symbol) if market_data and market_data.get('ticker'): current_price = float(market_data['ticker'].get('last', 0)) if current_price > 0: logger.warning(f"โš ๏ธ AUTO-SYNC: Using current market price as entry estimate for {side} {symbol}: {formatter.format_price_with_symbol(current_price, token)}") return current_price if market_data and market_data.get('ticker'): bid = float(market_data['ticker'].get('bid', 0)) ask = float(market_data['ticker'].get('ask', 0)) if bid > 0 and ask > 0: return (bid + ask) / 2 logger.warning(f"AUTO-SYNC: Could not estimate entry price for {side} {symbol} through any method.") return 0.0 except Exception as e: logger.error(f"โŒ Error estimating entry price for orphaned position {symbol}: {e}", exc_info=True) return 0.0 async def _immediate_startup_auto_sync(self): """๐Ÿ†• Immediately check for and sync orphaned positions on startup.""" try: logger.info("๐Ÿ” STARTUP: Checking for orphaned positions...") stats = self.trading_engine.get_stats() if not stats: logger.warning("โš ๏ธ STARTUP: TradingStats not available for auto-sync.") return formatter = get_formatter() # Ensure formatter is available exchange_positions = self.trading_engine.get_positions() or [] if not exchange_positions: logger.info("โœ… STARTUP: No positions found on exchange.") return synced_count = 0 for exchange_pos in exchange_positions: symbol = exchange_pos.get('symbol') contracts_abs = abs(float(exchange_pos.get('contracts', 0))) token_for_log = symbol.split('/')[0] if symbol and '/' in symbol else symbol # Prepare token for logging if not (symbol and contracts_abs > 1e-9): continue existing_trade_lc = stats.get_trade_by_symbol_and_status(symbol, 'position_opened') if not existing_trade_lc: position_side, order_side = '', '' ccxt_side = exchange_pos.get('side', '').lower() if ccxt_side == 'long': position_side, order_side = 'long', 'buy' elif ccxt_side == 'short': position_side, order_side = 'short', 'sell' if not position_side: raw_info = exchange_pos.get('info', {}).get('position', {}) if isinstance(raw_info, dict): szi_str = raw_info.get('szi') if szi_str is not None: try: szi_val = float(szi_str) except ValueError: szi_val = 0 if szi_val > 1e-9: position_side, order_side = 'long', 'buy' elif szi_val < -1e-9: position_side, order_side = 'short', 'sell' if not position_side: contracts_val = float(exchange_pos.get('contracts',0)) if contracts_val > 1e-9: position_side, order_side = 'long', 'buy' elif contracts_val < -1e-9: position_side, order_side = 'short', 'sell' else: logger.warning(f"AUTO-SYNC: Position size is effectively 0 for {symbol} after side checks, skipping sync. Data: {exchange_pos}") continue if not position_side: logger.error(f"AUTO-SYNC: CRITICAL - Could not determine position side for {symbol}. Data: {exchange_pos}. Skipping.") continue entry_price = float(exchange_pos.get('entryPrice', 0)) or float(exchange_pos.get('entryPx', 0)) price_source_log = "(exchange data)" if not entry_price or entry_price <= 0: estimated_price = await self._estimate_entry_price_for_orphaned_position(symbol, contracts_abs, position_side) if estimated_price > 0: entry_price = estimated_price price_source_log = "(estimated)" else: logger.error(f"AUTO-SYNC: Could not determine/estimate entry price for {symbol}. Skipping sync.") continue logger.info(f"๐Ÿ”„ STARTUP: Auto-syncing orphaned position: {symbol} {position_side.upper()} {formatter.format_amount(contracts_abs, token_for_log)} @ {formatter.format_price_with_symbol(entry_price, token_for_log)} {price_source_log}") lifecycle_id = stats.create_trade_lifecycle( symbol=symbol, side=order_side, entry_order_id=f"startup_sync_{int(datetime.now().timestamp())}", trade_type='external_startup_sync' ) if lifecycle_id: success = stats.update_trade_position_opened( lifecycle_id, entry_price, contracts_abs, f"startup_fill_sync_{int(datetime.now().timestamp())}" ) if success: synced_count += 1 logger.info(f"โœ… STARTUP: Successfully synced orphaned position for {symbol} (Lifecycle: {lifecycle_id[:8]}).") await self._send_startup_auto_sync_notification(exchange_pos, symbol, position_side, contracts_abs, entry_price, lifecycle_id, price_source_log) else: logger.error(f"โŒ STARTUP: Failed to update lifecycle for {symbol} (Lifecycle: {lifecycle_id[:8]}).") else: logger.error(f"โŒ STARTUP: Failed to create lifecycle for {symbol}.") if synced_count == 0 and exchange_positions: logger.info("โœ… STARTUP: All existing exchange positions are already tracked.") elif synced_count > 0: logger.info(f"๐ŸŽ‰ STARTUP: Auto-synced {synced_count} orphaned position(s) (Exchange had pos, Bot did not).") # --- NEW LOGIC FOR STARTUP: Bot thinks position is open, but exchange does not --- # logger.info("๐Ÿ” STARTUP: Checking for discrepancies (Bot has pos, Exchange does not)...") bot_open_lifecycles = stats.get_trades_by_status('position_opened') # Create a map of current exchange positions for quick lookup: symbol -> position_data current_exchange_positions_map = {} for ex_pos in (exchange_positions or []): # Use the exchange_positions fetched at the start of this method if ex_pos.get('symbol') and abs(float(ex_pos.get('contracts', 0))) > 1e-9: current_exchange_positions_map[ex_pos.get('symbol')] = ex_pos closed_due_to_discrepancy_startup = 0 if bot_open_lifecycles: for lc in bot_open_lifecycles: symbol = lc.get('symbol') lc_id = lc.get('trade_lifecycle_id') token_for_log_discrepancy = symbol.split('/')[0] if symbol and '/' in symbol else symbol if symbol not in current_exchange_positions_map: logger.warning(f"๐Ÿ”„ STARTUP (Discrepancy): Bot lifecycle {lc_id} for {symbol} is 'position_opened', but NO position found on exchange. Closing lifecycle.") entry_price = lc.get('entry_price', 0) position_side = lc.get('position_side') position_size_for_pnl = lc.get('current_position_size', 0) exit_price_for_calc = 0 price_source_info = "unknown" try: # Fetch all recent fills, then filter by symbol all_recent_fills_for_startup_sync = self.trading_engine.get_recent_fills() # Fetch more to increase chance if all_recent_fills_for_startup_sync: symbol_specific_fills_startup = [f for f in all_recent_fills_for_startup_sync if f.get('symbol') == symbol] if symbol_specific_fills_startup: closing_side = 'sell' if position_side == 'long' else 'buy' relevant_fills = sorted( [f for f in symbol_specific_fills_startup if f.get('side') == closing_side], # Already filtered by symbol key=lambda f: f.get('timestamp'), reverse=True ) if relevant_fills: last_closing_fill = relevant_fills[0] exit_price_for_calc = float(last_closing_fill.get('price', 0)) fill_ts_val = last_closing_fill.get('timestamp') fill_timestamp_str = datetime.fromtimestamp(fill_ts_val/1000, tz=timezone.utc).isoformat() if fill_ts_val else "N/A" price_source_info = f"last exchange fill ({formatter.format_price(exit_price_for_calc, symbol)} @ {fill_timestamp_str})" logger.info(f"STARTUP SYNC: Using exit price {price_source_info} for {symbol} lifecycle {lc_id}.") except Exception as e: logger.warning(f"STARTUP SYNC: Error fetching recent fills for {symbol} to determine exit price: {e}") if not exit_price_for_calc or exit_price_for_calc <= 0: mark_price_from_lc = lc.get('mark_price') if mark_price_from_lc and float(mark_price_from_lc) > 0: exit_price_for_calc = float(mark_price_from_lc) price_source_info = "lifecycle mark_price" else: exit_price_for_calc = entry_price price_source_info = "lifecycle entry_price (0 PNL)" realized_pnl = 0 if position_side == 'long': realized_pnl = position_size_for_pnl * (exit_price_for_calc - entry_price) elif position_side == 'short': realized_pnl = position_size_for_pnl * (entry_price - exit_price_for_calc) success_close = stats.update_trade_position_closed( lifecycle_id=lc_id, exit_price=exit_price_for_calc, realized_pnl=realized_pnl, exchange_fill_id=f"startup_sync_flat_{int(datetime.now().timestamp())}" ) if success_close: closed_due_to_discrepancy_startup += 1 logger.info(f"โœ… STARTUP (Discrepancy): Successfully closed bot lifecycle {lc_id} for {symbol}.") # MIGRATE STATS stats._migrate_trade_to_aggregated_stats(lc_id) if self.notification_manager: pnl_emoji = "๐ŸŸข" if realized_pnl >= 0 else "๐Ÿ”ด" notification_text = ( f"๐Ÿ”„ Position Auto-Closed (Startup Sync)\n\n" f"Token: {token_for_log_discrepancy}\n" f"Lifecycle ID: {lc_id[:8]}...\n" f"Reason: Bot startup - found open lifecycle, but no corresponding position on exchange.\n" f"Assumed Exit Price: {formatter.format_price(exit_price_for_calc, symbol)} (Source: {price_source_info})\n" f"{pnl_emoji} Realized P&L: {formatter.format_price_with_symbol(realized_pnl)}\n" f"Time: {datetime.now().strftime('%H:%M:%S')}" ) await self.notification_manager.send_generic_notification(notification_text) else: logger.error(f"โŒ STARTUP (Discrepancy): Failed to close bot lifecycle {lc_id} for {symbol}.") if closed_due_to_discrepancy_startup > 0: logger.info(f"๐ŸŽ‰ STARTUP: Auto-closed {closed_due_to_discrepancy_startup} lifecycle(s) due to discrepancy (Bot had pos, Exchange did not).") else: logger.info("โœ… STARTUP: No discrepancies found where bot had position and exchange did not.") except Exception as e: logger.error(f"โŒ Error in startup auto-sync: {e}", exc_info=True) async def _send_startup_auto_sync_notification(self, exchange_pos, symbol, position_side, contracts, entry_price, lifecycle_id, price_source_log): """Send notification for positions auto-synced on startup.""" try: if not self.notification_manager: return formatter = get_formatter() token = symbol.split('/')[0] if '/' in symbol else symbol unrealized_pnl = float(exchange_pos.get('unrealizedPnl', 0)) pnl_emoji = "๐ŸŸข" if unrealized_pnl >= 0 else "๐Ÿ”ด" size_str = formatter.format_amount(contracts, token) entry_price_str = formatter.format_price_with_symbol(entry_price, token) pnl_str = formatter.format_price_with_symbol(unrealized_pnl) notification_text_parts = [ f"๐Ÿšจ Bot Startup: Position Auto-Synced\n", f"Token: {token}", f"Lifecycle ID: {lifecycle_id[:8]}...", f"Direction: {position_side.upper()}", f"Size: {size_str} {token}", f"Entry Price: {entry_price_str} {price_source_log}", f"{pnl_emoji} P&L (Unrealized): {pnl_str}", f"Reason: Position found on exchange without bot record.", # f"Time: {datetime.now().strftime('%H:%M:%S')}", # Time is in the main header of notification usually "\nโœ… Position now tracked. Use /sl or /tp if needed." ] liq_price = float(exchange_pos.get('liquidationPrice', 0)) if liq_price > 0: liq_price_str = formatter.format_price_with_symbol(liq_price, token) notification_text_parts.append(f"โš ๏ธ Liquidation: {liq_price_str}") # Combined details into the main block # notification_text_parts.append("\n๐Ÿ“ Discovered on bot startup") # notification_text_parts.append(f"โฐ Time: {datetime.now().strftime('%H:%M:%S')}") # notification_text_parts.append("\nโœ… Position now tracked. Use /sl or /tp if needed.") await self.notification_manager.send_generic_notification("\n".join(notification_text_parts)) logger.info(f"๐Ÿ“ค STARTUP: Sent auto-sync notification for {symbol} (Lifecycle: {lifecycle_id[:8]}).") except Exception as e: logger.error(f"โŒ STARTUP: Failed to send auto-sync notification for {symbol}: {e}") # Note: The _activate_pending_stop_losses method was intentionally removed # as its functionality is now covered by _activate_pending_stop_losses_from_trades # and _check_pending_triggers, driven by the Trade Lifecycle.