#!/usr/bin/env python3 """ Market Monitor - Handles external trade monitoring and heartbeat functionality. """ import logging import asyncio from datetime import datetime, timedelta, timezone from typing import Optional, Dict, Any, List import os import json from src.config.config import Config from src.monitoring.alarm_manager import AlarmManager logger = logging.getLogger(__name__) class MarketMonitor: """Handles external trade monitoring and market events.""" def __init__(self, trading_engine, notification_manager=None): """Initialize the market monitor.""" self.trading_engine = trading_engine self.notification_manager = notification_manager self.client = trading_engine.client self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(seconds=120) self._monitoring_active = False # ๐Ÿ†• External stop loss tracking self._external_stop_loss_orders = {} # Format: {exchange_order_id: {'token': str, 'trigger_price': float, 'side': str, 'detected_at': datetime}} # External trade monitoring # self.state_file = "data/market_monitor_state.json" # Removed, state now in DB self.last_processed_trade_time: Optional[datetime] = None # Alarm management self.alarm_manager = AlarmManager() # Order monitoring self.last_known_orders = set() self.last_known_positions = {} self._load_state() async def start(self): """Start the market monitor.""" if self._monitoring_active: return self._monitoring_active = True logger.info("๐Ÿ”„ Market monitor started") # Initialize tracking await self._initialize_tracking() # Start monitoring task self._monitor_task = asyncio.create_task(self._monitor_loop()) async def stop(self): """Stop the market monitor.""" if not self._monitoring_active: return self._monitoring_active = False if self._monitor_task: self._monitor_task.cancel() try: await self._monitor_task except asyncio.CancelledError: pass self._save_state() logger.info("๐Ÿ›‘ Market monitor stopped") def _load_state(self): """Load market monitor state from SQLite DB via TradingStats.""" stats = self.trading_engine.get_stats() if not stats: logger.warning("โš ๏ธ TradingStats not available, cannot load MarketMonitor state.") self.last_processed_trade_time = None return try: last_time_str = stats._get_metadata('market_monitor_last_processed_trade_time') if last_time_str: self.last_processed_trade_time = datetime.fromisoformat(last_time_str) # Ensure it's timezone-aware (UTC) if self.last_processed_trade_time.tzinfo is None: self.last_processed_trade_time = self.last_processed_trade_time.replace(tzinfo=timezone.utc) else: self.last_processed_trade_time = self.last_processed_trade_time.astimezone(timezone.utc) logger.info(f"๐Ÿ”„ Loaded MarketMonitor state from DB: last_processed_trade_time = {self.last_processed_trade_time.isoformat()}") else: logger.info("๐Ÿ’จ No MarketMonitor state (last_processed_trade_time) found in DB. Will start with fresh external trade tracking.") self.last_processed_trade_time = None except Exception as e: logger.error(f"Error loading MarketMonitor state from DB: {e}. Proceeding with default state.") self.last_processed_trade_time = None def _save_state(self): """Save market monitor state to SQLite DB via TradingStats.""" stats = self.trading_engine.get_stats() if not stats: logger.warning("โš ๏ธ TradingStats not available, cannot save MarketMonitor state.") return try: if self.last_processed_trade_time: # Ensure timestamp is UTC before saving lptt_utc = self.last_processed_trade_time if lptt_utc.tzinfo is None: lptt_utc = lptt_utc.replace(tzinfo=timezone.utc) else: lptt_utc = lptt_utc.astimezone(timezone.utc) stats._set_metadata('market_monitor_last_processed_trade_time', lptt_utc.isoformat()) logger.info(f"๐Ÿ’พ Saved MarketMonitor state (last_processed_trade_time) to DB: {lptt_utc.isoformat()}") else: # If it's None, we might want to remove the key or save it as an empty string # For now, let's assume we only save if there is a time. Or remove it. stats._set_metadata('market_monitor_last_processed_trade_time', '') # Or handle deletion logger.info("๐Ÿ’พ MarketMonitor state (last_processed_trade_time) is None, saved as empty in DB.") except Exception as e: logger.error(f"Error saving MarketMonitor state to DB: {e}") async def _initialize_tracking(self): """Initialize order and position tracking.""" try: # Get current open orders to initialize tracking orders = self.trading_engine.get_orders() if orders: self.last_known_orders = {order.get('id') for order in orders if order.get('id')} logger.info(f"๐Ÿ“‹ Initialized tracking with {len(self.last_known_orders)} open orders") # Get current positions for P&L tracking positions = self.trading_engine.get_positions() if positions: for position in positions: symbol = position.get('symbol') contracts = float(position.get('contracts', 0)) entry_price = float(position.get('entryPx', 0)) if symbol and contracts != 0: self.last_known_positions[symbol] = { 'contracts': contracts, 'entry_price': entry_price } logger.info(f"๐Ÿ“Š Initialized tracking with {len(self.last_known_positions)} positions") except Exception as e: logger.error(f"โŒ Error initializing tracking: {e}") async def _monitor_loop(self): """Main monitoring loop that runs every BOT_HEARTBEAT_SECONDS.""" try: loop_count = 0 while self._monitoring_active: # ๐Ÿ†• PHASE 2: Check active trades for pending stop loss activation first (highest priority) await self._activate_pending_stop_losses_from_active_trades() await self._check_order_fills() await self._check_price_alarms() await self._check_external_trades() await self._check_pending_triggers() await self._check_automatic_risk_management() await self._check_external_stop_loss_orders() # Run orphaned stop loss cleanup every 10 heartbeats (less frequent but regular) loop_count += 1 if loop_count % 10 == 0: await self._cleanup_orphaned_stop_losses() await self._cleanup_external_stop_loss_tracking() loop_count = 0 # Reset counter to prevent overflow await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS) except asyncio.CancelledError: logger.info("Market monitor loop cancelled") raise except Exception as e: logger.error(f"Error in market monitor loop: {e}") # Restart after error if self._monitoring_active: await asyncio.sleep(5) await self._monitor_loop() async def _check_order_fills(self): """Check for filled orders and send notifications.""" try: # Get current orders and positions current_orders = self.trading_engine.get_orders() or [] current_positions = self.trading_engine.get_positions() or [] # Get current order IDs current_order_ids = {order.get('id') for order in current_orders if order.get('id')} # Find filled orders (orders that were in last_known_orders but not in current_orders) disappeared_order_ids = self.last_known_orders - current_order_ids if disappeared_order_ids: logger.info(f"๐ŸŽฏ Detected {len(disappeared_order_ids)} bot orders no longer open: {list(disappeared_order_ids)}. Corresponding fills (if any) are processed by external trade checker.") await self._process_disappeared_orders(disappeared_order_ids) # Update tracking data for open bot orders self.last_known_orders = current_order_ids # Position state is primarily managed by TradingStats based on all fills. # This local tracking can provide supplementary logging if needed. # await self._update_position_tracking(current_positions) except Exception as e: logger.error(f"โŒ Error checking order fills: {e}") async def _process_filled_orders(self, filled_order_ids: set, current_positions: list): """Process filled orders using enhanced position tracking.""" try: # For bot-initiated orders, we'll detect changes in position size # and send appropriate notifications using the enhanced system # This method will be triggered when orders placed through the bot are filled # The external trade monitoring will handle trades made outside the bot # Update position tracking based on current positions await self._update_position_tracking(current_positions) except Exception as e: logger.error(f"โŒ Error processing filled orders: {e}") async def _update_position_tracking(self, current_positions: list): """Update position tracking and calculate P&L changes.""" try: new_position_map = {} for position in current_positions: symbol = position.get('symbol') contracts = float(position.get('contracts', 0)) entry_price = float(position.get('entryPx', 0)) if symbol and contracts != 0: new_position_map[symbol] = { 'contracts': contracts, 'entry_price': entry_price } # Compare with previous positions to detect changes for symbol, new_data in new_position_map.items(): old_data = self.last_known_positions.get(symbol) if not old_data: # New position opened logger.info(f"๐Ÿ“ˆ New position detected (observed by MarketMonitor): {symbol} {new_data['contracts']} @ ${new_data['entry_price']:.4f}. TradingStats is the definitive source.") elif abs(new_data['contracts'] - old_data['contracts']) > 0.000001: # Position size changed change = new_data['contracts'] - old_data['contracts'] logger.info(f"๐Ÿ“Š Position change detected (observed by MarketMonitor): {symbol} {change:+.6f} contracts. TradingStats is the definitive source.") # Check for closed positions for symbol in self.last_known_positions: if symbol not in new_position_map: logger.info(f"๐Ÿ“‰ Position closed (observed by MarketMonitor): {symbol}. TradingStats is the definitive source.") # Update tracking self.last_known_positions = new_position_map except Exception as e: logger.error(f"โŒ Error updating position tracking: {e}") async def _process_disappeared_orders(self, disappeared_order_ids: set): """Log and investigate bot orders that have disappeared from the exchange.""" stats = self.trading_engine.get_stats() if not stats: logger.warning("โš ๏ธ TradingStats not available in _process_disappeared_orders.") return try: total_linked_cancelled = 0 external_cancellations = [] for exchange_oid in disappeared_order_ids: order_in_db = stats.get_order_by_exchange_id(exchange_oid) if order_in_db: last_status = order_in_db.get('status', 'unknown') order_type = order_in_db.get('type', 'unknown') symbol = order_in_db.get('symbol', 'unknown') token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0] logger.info(f"Order {exchange_oid} was in our DB with status '{last_status}' but has now disappeared from exchange.") # Check if this was an unexpected disappearance (likely external cancellation) active_statuses = ['open', 'submitted', 'partially_filled', 'pending_submission'] if last_status in active_statuses: logger.warning(f"โš ๏ธ EXTERNAL CANCELLATION: Order {exchange_oid} with status '{last_status}' was likely cancelled externally on Hyperliquid") stats.update_order_status(exchange_order_id=exchange_oid, new_status='cancelled_externally') # Track external cancellations for notification external_cancellations.append({ 'exchange_oid': exchange_oid, 'token': token, 'type': order_type, 'last_status': last_status }) # Send notification about external cancellation if self.notification_manager: await self.notification_manager.send_generic_notification( f"โš ๏ธ External Order Cancellation Detected\n\n" f"Token: {token}\n" f"Order Type: {order_type.replace('_', ' ').title()}\n" f"Exchange Order ID: {exchange_oid[:8]}...\n" f"Previous Status: {last_status.replace('_', ' ').title()}\n" f"Source: Cancelled directly on Hyperliquid\n" f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n" f"๐Ÿค– Bot status updated automatically" ) # ๐Ÿ”ง EDGE CASE FIX: Wait briefly before cancelling stop losses # Sometimes orders are cancelled externally but fills come through simultaneously logger.info(f"โณ Waiting 3 seconds to check for potential fills before cancelling stop losses for {exchange_oid}") await asyncio.sleep(3) # Re-check the order status after waiting - it might have been filled order_in_db_updated = stats.get_order_by_exchange_id(exchange_oid) if order_in_db_updated and order_in_db_updated.get('status') in ['filled', 'partially_filled']: logger.info(f"โœ… Order {exchange_oid} was filled during the wait period - NOT cancelling stop losses") # Don't cancel stop losses - let them be activated normally continue # Additional check: Look for very recent fills that might match this order recent_fill_found = await self._check_for_recent_fills_for_order(exchange_oid, order_in_db) if recent_fill_found: logger.info(f"โœ… Found recent fill for order {exchange_oid} - NOT cancelling stop losses") continue else: # Normal completion/cancellation - update status stats.update_order_status(exchange_order_id=exchange_oid, new_status='disappeared_from_exchange') # Cancel any pending stop losses linked to this order (only if not filled) if order_in_db.get('bot_order_ref_id'): # Double-check one more time that the order wasn't filled final_order_check = stats.get_order_by_exchange_id(exchange_oid) if final_order_check and final_order_check.get('status') in ['filled', 'partially_filled']: logger.info(f"๐Ÿ›‘ Order {exchange_oid} was filled - preserving stop losses") continue cancelled_sl_count = stats.cancel_linked_orders( parent_bot_order_ref_id=order_in_db['bot_order_ref_id'], new_status='cancelled_parent_disappeared' ) total_linked_cancelled += cancelled_sl_count if cancelled_sl_count > 0: logger.info(f"Cancelled {cancelled_sl_count} pending stop losses linked to disappeared order {exchange_oid}") if self.notification_manager: await self.notification_manager.send_generic_notification( f"๐Ÿ›‘ Linked Stop Losses Cancelled\n\n" f"Token: {token}\n" f"Cancelled: {cancelled_sl_count} stop loss(es)\n" f"Reason: Parent order {exchange_oid[:8]}... disappeared\n" f"Likely Cause: External cancellation on Hyperliquid\n" f"Time: {datetime.now().strftime('%H:%M:%S')}" ) else: logger.warning(f"Order {exchange_oid} disappeared from exchange but was not found in our DB. This might be an order placed externally.") # Send summary notification if multiple external cancellations occurred if len(external_cancellations) > 1: tokens_affected = list(set(item['token'] for item in external_cancellations)) if self.notification_manager: await self.notification_manager.send_generic_notification( f"โš ๏ธ Multiple External Cancellations Detected\n\n" f"Orders Cancelled: {len(external_cancellations)}\n" f"Tokens Affected: {', '.join(tokens_affected)}\n" f"Source: Direct cancellation on Hyperliquid\n" f"Linked Stop Losses Cancelled: {total_linked_cancelled}\n" f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n" f"๐Ÿ’ก Check individual orders for details" ) except Exception as e: logger.error(f"โŒ Error processing disappeared orders: {e}", exc_info=True) async def _check_price_alarms(self): """Check price alarms and trigger notifications.""" try: active_alarms = self.alarm_manager.get_all_active_alarms() if not active_alarms: return # Group alarms by token to minimize API calls tokens_to_check = list(set(alarm['token'] for alarm in active_alarms)) for token in tokens_to_check: try: # Get current market price symbol = f"{token}/USDC:USDC" market_data = self.trading_engine.get_market_data(symbol) if not market_data or not market_data.get('ticker'): continue current_price = float(market_data['ticker'].get('last', 0)) if current_price <= 0: continue # Check alarms for this token token_alarms = [alarm for alarm in active_alarms if alarm['token'] == token] for alarm in token_alarms: target_price = alarm['target_price'] direction = alarm['direction'] # Check if alarm should trigger should_trigger = False if direction == 'above' and current_price >= target_price: should_trigger = True elif direction == 'below' and current_price <= target_price: should_trigger = True if should_trigger: # Trigger the alarm triggered_alarm = self.alarm_manager.trigger_alarm(alarm['id'], current_price) if triggered_alarm: await self._send_alarm_notification(triggered_alarm) except Exception as e: logger.error(f"Error checking alarms for {token}: {e}") except Exception as e: logger.error(f"โŒ Error checking price alarms: {e}") async def _send_alarm_notification(self, alarm: Dict[str, Any]): """Send notification for triggered alarm.""" try: # Send through notification manager if available if self.notification_manager: await self.notification_manager.send_alarm_triggered_notification( alarm['token'], alarm['target_price'], alarm['triggered_price'], alarm['direction'] ) else: # Fallback to logging if notification manager not available logger.info(f"๐Ÿ”” ALARM TRIGGERED: {alarm['token']} @ ${alarm['triggered_price']:,.2f}") except Exception as e: logger.error(f"โŒ Error sending alarm notification: {e}") async def _check_external_trades(self): """Check for trades made outside the Telegram bot and update stats.""" try: # Get recent fills from exchange recent_fills = self.trading_engine.get_recent_fills() if not recent_fills: logger.debug("No recent fills data available") return # Get last processed timestamp from database if not hasattr(self, '_last_processed_trade_time') or self._last_processed_trade_time is None: try: last_time_str = self.trading_engine.stats._get_metadata('last_processed_trade_time') if last_time_str: self._last_processed_trade_time = datetime.fromisoformat(last_time_str) logger.debug(f"Loaded last_processed_trade_time from DB: {self._last_processed_trade_time}") else: # If no last processed time, start from 1 hour ago to avoid processing too much history self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1) logger.info("No last_processed_trade_time found, setting to 1 hour ago (UTC).") except Exception as e: logger.warning(f"Could not load last_processed_trade_time from DB: {e}") self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1) # Process new fills external_trades_processed = 0 symbols_with_fills = set() # Track symbols that had fills for stop loss activation for fill in recent_fills: try: # Parse fill data - CCXT format from fetch_my_trades trade_id = fill.get('id') # CCXT uses 'id' for trade ID timestamp_ms = fill.get('timestamp') # CCXT uses 'timestamp' (milliseconds) symbol = fill.get('symbol') # CCXT uses 'symbol' in full format like 'LTC/USDC:USDC' side = fill.get('side') # CCXT uses 'side' ('buy' or 'sell') amount = float(fill.get('amount', 0)) # CCXT uses 'amount' price = float(fill.get('price', 0)) # CCXT uses 'price' # Convert timestamp if timestamp_ms: timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc) else: timestamp_dt = datetime.now(timezone.utc) # Skip if already processed if timestamp_dt <= self._last_processed_trade_time: continue # Process as external trade if we reach here if symbol and side and amount > 0 and price > 0: # Symbol is already in full format for CCXT full_symbol = symbol token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0] # Check if this might be a bot order fill by looking for exchange order ID # CCXT might have this in 'info' sub-object with the raw exchange data exchange_order_id_from_fill = None if 'info' in fill and isinstance(fill['info'], dict): # Look for Hyperliquid order ID in the raw response exchange_order_id_from_fill = fill['info'].get('oid') # ๐Ÿ†• Check if this fill corresponds to an external stop loss order is_external_stop_loss = False stop_loss_info = None if exchange_order_id_from_fill and exchange_order_id_from_fill in self._external_stop_loss_orders: is_external_stop_loss = True stop_loss_info = self._external_stop_loss_orders[exchange_order_id_from_fill] logger.info(f"๐Ÿ›‘ EXTERNAL STOP LOSS EXECUTION: {token} - Order {exchange_order_id_from_fill} filled @ ${price:.2f}") logger.info(f"๐Ÿ” Processing {'external stop loss' if is_external_stop_loss else 'external trade'}: {trade_id} - {side} {amount} {full_symbol} @ ${price:.2f}") stats = self.trading_engine.stats if not stats: logger.warning("โš ๏ธ TradingStats not available in _check_external_trades.") continue # If this is an external stop loss execution, handle it specially if is_external_stop_loss and stop_loss_info: # ๐Ÿงน PHASE 3: Close active trade for stop loss execution active_trade = stats.get_active_trade_by_symbol(full_symbol, status='active') if active_trade: entry_price = active_trade.get('entry_price', 0) active_trade_side = active_trade.get('side') # Calculate realized P&L if active_trade_side == 'buy': # Long position realized_pnl = amount * (price - entry_price) else: # Short position realized_pnl = amount * (entry_price - price) stats.update_active_trade_closed( active_trade['id'], realized_pnl, timestamp_dt.isoformat() ) logger.info(f"๐Ÿ›‘ Active trade {active_trade['id']} closed via external stop loss - P&L: ${realized_pnl:.2f}") # Send specialized stop loss execution notification if self.notification_manager: await self.notification_manager.send_stop_loss_execution_notification( stop_loss_info, full_symbol, side, amount, price, 'long_closed', timestamp_dt.isoformat() ) # Remove from tracking since it's now executed del self._external_stop_loss_orders[exchange_order_id_from_fill] logger.info(f"๐Ÿ›‘ Processed external stop loss execution: {side} {amount} {full_symbol} @ ${price:.2f} (long_closed)") else: # Handle as regular external trade # Check if this corresponds to a bot order by exchange_order_id linked_order_db_id = None if exchange_order_id_from_fill: order_in_db = stats.get_order_by_exchange_id(exchange_order_id_from_fill) if order_in_db: linked_order_db_id = order_in_db.get('id') logger.info(f"๐Ÿ”— Linked external fill {trade_id} to bot order DB ID {linked_order_db_id} (Exchange OID: {exchange_order_id_from_fill})") # Update order status to filled if it was open current_status = order_in_db.get('status', '') if current_status in ['open', 'partially_filled', 'pending_submission']: # Determine if this is a partial or full fill order_amount_requested = float(order_in_db.get('amount_requested', 0)) if abs(amount - order_amount_requested) < 0.000001: # Allow small floating point differences new_status_after_fill = 'filled' else: new_status_after_fill = 'partially_filled' stats.update_order_status( order_db_id=linked_order_db_id, new_status=new_status_after_fill ) logger.info(f"๐Ÿ“Š Updated bot order {linked_order_db_id} status: {current_status} โ†’ {new_status_after_fill}") # Check if this order is now fully filled and has pending stop losses to activate if new_status_after_fill == 'filled': await self._activate_pending_stop_losses(order_in_db, stats) # ๐Ÿงน PHASE 3: Record trade simply, use active_trades for tracking stats.record_trade( full_symbol, side, amount, price, exchange_fill_id=trade_id, trade_type="external", timestamp=timestamp_dt.isoformat(), linked_order_table_id_to_link=linked_order_db_id ) # Derive action type from trade context for notifications if linked_order_db_id: # Bot order - determine action from order context order_side = order_in_db.get('side', side).lower() if order_side == 'buy': action_type = 'long_opened' elif order_side == 'sell': action_type = 'short_opened' else: action_type = 'position_opened' else: # External trade - determine from current active trades existing_active_trade = stats.get_active_trade_by_symbol(full_symbol, status='active') if existing_active_trade: # Has active position - this is likely a closure existing_side = existing_active_trade.get('side') if existing_side == 'buy' and side.lower() == 'sell': action_type = 'long_closed' elif existing_side == 'sell' and side.lower() == 'buy': action_type = 'short_closed' else: action_type = 'position_modified' else: # No active position - this opens a new one if side.lower() == 'buy': action_type = 'long_opened' else: action_type = 'short_opened' # ๐Ÿงน PHASE 3: Update active trades based on action if linked_order_db_id: # Bot order - update linked active trade order_data = stats.get_order_by_db_id(linked_order_db_id) if order_data: exchange_order_id = order_data.get('exchange_order_id') # Find active trade by entry order ID all_active_trades = stats.get_all_active_trades() for at in all_active_trades: if at.get('entry_order_id') == exchange_order_id: active_trade_id = at['id'] current_status = at['status'] if current_status == 'pending' and action_type in ['long_opened', 'short_opened']: # Entry order filled - update active trade to active stats.update_active_trade_opened( active_trade_id, price, amount, timestamp_dt.isoformat() ) logger.info(f"๐Ÿ†• Active trade {active_trade_id} opened via fill {trade_id}") elif current_status == 'active' and action_type in ['long_closed', 'short_closed']: # Exit order filled - calculate P&L and close active trade entry_price = at.get('entry_price', 0) active_trade_side = at.get('side') # Calculate realized P&L if active_trade_side == 'buy': # Long position realized_pnl = amount * (price - entry_price) else: # Short position realized_pnl = amount * (entry_price - price) stats.update_active_trade_closed( active_trade_id, realized_pnl, timestamp_dt.isoformat() ) logger.info(f"๐Ÿ†• Active trade {active_trade_id} closed via fill {trade_id} - P&L: ${realized_pnl:.2f}") break elif action_type in ['long_opened', 'short_opened']: # External trade that opened a position - create external active trade active_trade_id = stats.create_active_trade( symbol=full_symbol, side=side.lower(), entry_order_id=None, # External order trade_type='external' ) if active_trade_id: stats.update_active_trade_opened( active_trade_id, price, amount, timestamp_dt.isoformat() ) logger.info(f"๐Ÿ†• Created external active trade {active_trade_id} for {side.upper()} {full_symbol}") elif action_type in ['long_closed', 'short_closed']: # External closure - close any active trade for this symbol active_trade = stats.get_active_trade_by_symbol(full_symbol, status='active') if active_trade: entry_price = active_trade.get('entry_price', 0) active_trade_side = active_trade.get('side') # Calculate realized P&L if active_trade_side == 'buy': # Long position realized_pnl = amount * (price - entry_price) else: # Short position realized_pnl = amount * (entry_price - price) stats.update_active_trade_closed( active_trade['id'], realized_pnl, timestamp_dt.isoformat() ) logger.info(f"๐Ÿ†• External closure: Active trade {active_trade['id']} closed - P&L: ${realized_pnl:.2f}") # Track symbol for potential stop loss activation symbols_with_fills.add(token) # Send notification for external trade if self.notification_manager: await self.notification_manager.send_external_trade_notification( full_symbol, side, amount, price, action_type, timestamp_dt.isoformat() ) logger.info(f"๐Ÿ“‹ Processed external trade: {side} {amount} {full_symbol} @ ${price:.2f} ({action_type}) using timestamp {timestamp_dt.isoformat()}") external_trades_processed += 1 # Update last processed time self._last_processed_trade_time = timestamp_dt except Exception as e: logger.error(f"Error processing fill {fill}: {e}") continue # Save the last processed timestamp to database if external_trades_processed > 0: self.trading_engine.stats._set_metadata('last_processed_trade_time', self._last_processed_trade_time.isoformat()) logger.info(f"๐Ÿ’พ Saved MarketMonitor state (last_processed_trade_time) to DB: {self._last_processed_trade_time.isoformat()}") logger.info(f"๐Ÿ“Š Processed {external_trades_processed} external trades") # Additional check: Activate any pending stop losses for symbols that had fills # This is a safety net for cases where the fill linking above didn't work await self._check_pending_stop_losses_for_filled_symbols(symbols_with_fills) except Exception as e: logger.error(f"โŒ Error checking external trades: {e}", exc_info=True) async def _check_pending_stop_losses_for_filled_symbols(self, symbols_with_fills: set): """ Safety net: Check if any symbols that just had fills have pending stop losses that should be activated. This handles cases where fill linking failed. """ try: if not symbols_with_fills: return stats = self.trading_engine.stats if not stats: return # Get all pending stop losses pending_stop_losses = stats.get_orders_by_status('pending_trigger', 'stop_limit_trigger') if not pending_stop_losses: return # Check each pending stop loss to see if its symbol had fills activated_any = False for sl_order in pending_stop_losses: symbol = sl_order.get('symbol', '') token = symbol.split('/')[0] if '/' in symbol else '' if token in symbols_with_fills: # This symbol had fills - check if we should activate the stop loss parent_bot_ref_id = sl_order.get('parent_bot_order_ref_id') if parent_bot_ref_id: # Get the parent order parent_order = stats.get_order_by_bot_ref_id(parent_bot_ref_id) if parent_order and parent_order.get('status') in ['filled', 'partially_filled']: logger.info(f"๐Ÿ›‘ Safety net activation: Found pending SL for {token} with filled parent order {parent_bot_ref_id}") await self._activate_pending_stop_losses(parent_order, stats) activated_any = True if activated_any: logger.info("โœ… Safety net activated pending stop losses for symbols with recent fills") except Exception as e: logger.error(f"Error in safety net stop loss activation: {e}", exc_info=True) async def _check_pending_triggers(self): """Check and process pending conditional triggers (e.g., SL/TP).""" stats = self.trading_engine.get_stats() if not stats: logger.warning("โš ๏ธ TradingStats not available in _check_pending_triggers.") return try: # Fetch pending SL triggers (adjust type if TP triggers are different) # For now, assuming 'STOP_LIMIT_TRIGGER' is the type used for SLs that become limit orders pending_sl_triggers = stats.get_orders_by_status(status='pending_trigger', order_type_filter='stop_limit_trigger') if not pending_sl_triggers: return logger.debug(f"Found {len(pending_sl_triggers)} pending SL triggers to check.") for trigger_order in pending_sl_triggers: symbol = trigger_order['symbol'] trigger_price = trigger_order['price'] # This is the stop price trigger_side = trigger_order['side'] # This is the side of the SL order (e.g., sell for a long position's SL) order_db_id = trigger_order['id'] parent_ref_id = trigger_order.get('parent_bot_order_ref_id') if not symbol or trigger_price is None: logger.warning(f"Invalid trigger order data for DB ID {order_db_id}, skipping: {trigger_order}") continue market_data = self.trading_engine.get_market_data(symbol) if not market_data or not market_data.get('ticker'): logger.warning(f"Could not fetch market data for {symbol} to check SL trigger {order_db_id}.") continue current_price = float(market_data['ticker'].get('last', 0)) if current_price <= 0: logger.warning(f"Invalid current price ({current_price}) for {symbol} checking SL trigger {order_db_id}.") continue trigger_hit = False if trigger_side.lower() == 'sell' and current_price <= trigger_price: trigger_hit = True logger.info(f"๐Ÿ”ด SL TRIGGER HIT (Sell): Order DB ID {order_db_id}, Symbol {symbol}, Trigger@ ${trigger_price:.4f}, Market@ ${current_price:.4f}") elif trigger_side.lower() == 'buy' and current_price >= trigger_price: trigger_hit = True logger.info(f"๐ŸŸข SL TRIGGER HIT (Buy): Order DB ID {order_db_id}, Symbol {symbol}, Trigger@ ${trigger_price:.4f}, Market@ ${current_price:.4f}") if trigger_hit: logger.info(f"Attempting to execute actual stop order for triggered DB ID: {order_db_id} (Parent Bot Ref: {trigger_order.get('parent_bot_order_ref_id')})") execution_result = await self.trading_engine.execute_triggered_stop_order(original_trigger_order_db_id=order_db_id) notification_message_detail = "" if execution_result.get("success"): new_trigger_status = 'triggered_order_placed' placed_sl_details = execution_result.get("placed_sl_order_details", {}) logger.info(f"Successfully placed actual SL order from trigger {order_db_id}. New SL Order DB ID: {placed_sl_details.get('order_db_id')}, Exchange ID: {placed_sl_details.get('exchange_order_id')}") notification_message_detail = f"Actual SL order placed (New DB ID: {placed_sl_details.get('order_db_id', 'N/A')})." else: new_trigger_status = 'trigger_execution_failed' error_msg = execution_result.get("error", "Unknown error during SL execution.") logger.error(f"Failed to execute actual SL order from trigger {order_db_id}: {error_msg}") notification_message_detail = f"Failed to place actual SL order: {error_msg}" stats.update_order_status(order_db_id=order_db_id, new_status=new_trigger_status) if self.notification_manager: await self.notification_manager.send_generic_notification( f"๐Ÿ”” Stop-Loss Update!\nSymbol: {symbol}\nSide: {trigger_side.upper()}\nTrigger Price: ${trigger_price:.4f}\nMarket Price: ${current_price:.4f}\n(Original Trigger DB ID: {order_db_id}, Parent: {parent_ref_id or 'N/A'})\nStatus: {new_trigger_status.replace('_', ' ').title()}\nDetails: {notification_message_detail}" ) except Exception as e: logger.error(f"โŒ Error checking pending SL triggers: {e}", exc_info=True) async def _check_automatic_risk_management(self): """Check for automatic stop loss triggers based on Config.STOP_LOSS_PERCENTAGE as safety net.""" try: # Skip if risk management is disabled or percentage is 0 if not getattr(Config, 'RISK_MANAGEMENT_ENABLED', True) or Config.STOP_LOSS_PERCENTAGE <= 0: return # Get current positions positions = self.trading_engine.get_positions() if not positions: # If no positions exist, clean up any orphaned pending stop losses await self._cleanup_orphaned_stop_losses() return for position in positions: try: symbol = position.get('symbol', '') contracts = float(position.get('contracts', 0)) entry_price = float(position.get('entryPx', 0)) mark_price = float(position.get('markPx', 0)) unrealized_pnl = float(position.get('unrealizedPnl', 0)) # Skip if no position or missing data if contracts == 0 or entry_price <= 0 or mark_price <= 0: continue # Calculate PnL percentage based on entry value entry_value = abs(contracts) * entry_price if entry_value <= 0: continue pnl_percentage = (unrealized_pnl / entry_value) * 100 # Check if loss exceeds the safety threshold if pnl_percentage <= -Config.STOP_LOSS_PERCENTAGE: token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0] position_side = "LONG" if contracts > 0 else "SHORT" logger.warning(f"๐Ÿšจ AUTOMATIC STOP LOSS TRIGGERED: {token} {position_side} position has {pnl_percentage:.2f}% loss (threshold: -{Config.STOP_LOSS_PERCENTAGE}%)") # Send notification before attempting exit if self.notification_manager: await self.notification_manager.send_generic_notification( f"๐Ÿšจ AUTOMATIC STOP LOSS TRIGGERED!\n" f"Token: {token}\n" f"Position: {position_side} {abs(contracts):.6f}\n" f"Entry Price: ${entry_price:.4f}\n" f"Current Price: ${mark_price:.4f}\n" f"Unrealized PnL: ${unrealized_pnl:.2f} ({pnl_percentage:.2f}%)\n" f"Safety Threshold: -{Config.STOP_LOSS_PERCENTAGE}%\n" f"Action: Executing emergency exit order..." ) # Execute emergency exit order exit_result = await self.trading_engine.execute_exit_order(token) if exit_result.get('success'): logger.info(f"โœ… Emergency exit order placed successfully for {token}. Order details: {exit_result.get('order_placed_details', {})}") # Cancel any pending stop losses for this symbol since position is now closed stats = self.trading_engine.get_stats() if stats: cancelled_sl_count = stats.cancel_pending_stop_losses_by_symbol( symbol=symbol, new_status='cancelled_auto_exit' ) if cancelled_sl_count > 0: logger.info(f"๐Ÿ›‘ Cancelled {cancelled_sl_count} pending stop losses for {symbol} after automatic exit") if self.notification_manager: await self.notification_manager.send_generic_notification( f"โœ… Emergency Exit Completed\n\n" f"๐Ÿ“Š Position: {token} {position_side}\n" f"๐Ÿ“‰ Loss: {pnl_percentage:.2f}% (${unrealized_pnl:.2f})\n" f"โš ๏ธ Threshold: -{Config.STOP_LOSS_PERCENTAGE}%\n" f"โœ… Action: Position automatically closed\n" f"๐Ÿ’ฐ Exit Price: ~${mark_price:.2f}\n" f"๐Ÿ†” Order ID: {exit_result.get('order_placed_details', {}).get('exchange_order_id', 'N/A')}\n" f"{f'๐Ÿ›‘ Cleanup: Cancelled {cancelled_sl_count} pending stop losses' if cancelled_sl_count > 0 else ''}\n\n" f"๐Ÿ›ก๏ธ This was an automatic safety stop triggered by the risk management system." ) else: error_msg = exit_result.get('error', 'Unknown error') logger.error(f"โŒ Failed to execute emergency exit order for {token}: {error_msg}") if self.notification_manager: await self.notification_manager.send_generic_notification( f"โŒ CRITICAL: Emergency Exit Failed!\n\n" f"๐Ÿ“Š Position: {token} {position_side}\n" f"๐Ÿ“‰ Loss: {pnl_percentage:.2f}%\n" f"โŒ Error: {error_msg}\n\n" f"โš ๏ธ MANUAL INTERVENTION REQUIRED\n" f"Please close this position manually via /exit {token}" ) except Exception as pos_error: logger.error(f"Error processing position for automatic stop loss: {pos_error}") continue except Exception as e: logger.error(f"โŒ Error in automatic risk management check: {e}", exc_info=True) async def _cleanup_orphaned_stop_losses(self): """Clean up pending stop losses that no longer have corresponding positions OR whose parent orders have been cancelled/failed.""" try: stats = self.trading_engine.get_stats() if not stats: return # Get all pending stop loss triggers pending_stop_losses = stats.get_orders_by_status('pending_trigger', 'stop_limit_trigger') if not pending_stop_losses: return logger.debug(f"Checking {len(pending_stop_losses)} pending stop losses for orphaned orders") # Get current positions to check against current_positions = self.trading_engine.get_positions() position_symbols = set() if current_positions: for pos in current_positions: symbol = pos.get('symbol') contracts = float(pos.get('contracts', 0)) if symbol and contracts != 0: position_symbols.add(symbol) # Check each pending stop loss orphaned_count = 0 for sl_order in pending_stop_losses: symbol = sl_order.get('symbol') order_db_id = sl_order.get('id') parent_bot_ref_id = sl_order.get('parent_bot_order_ref_id') should_cancel = False cancel_reason = "" # Check if parent order exists and its status if parent_bot_ref_id: parent_order = stats.get_order_by_bot_ref_id(parent_bot_ref_id) if parent_order: parent_status = parent_order.get('status', '').lower() # Cancel if parent order failed, was cancelled, or disappeared if parent_status in ['failed_submission', 'failed_submission_no_data', 'cancelled_manually', 'cancelled_externally', 'disappeared_from_exchange']: should_cancel = True cancel_reason = f"parent order {parent_status}" elif parent_status == 'filled': # Parent order filled but no position - position might have been closed externally if symbol not in position_symbols: should_cancel = True cancel_reason = "parent filled but position no longer exists" # If parent is still 'open', 'submitted', or 'partially_filled', keep the stop loss else: # Parent order not found in DB - this is truly orphaned should_cancel = True cancel_reason = "parent order not found in database" else: # No parent reference - fallback to old logic (position-based check) if symbol not in position_symbols: should_cancel = True cancel_reason = "no position exists and no parent reference" if should_cancel: # Cancel this orphaned stop loss success = stats.update_order_status( order_db_id=order_db_id, new_status='cancelled_orphaned_no_position' ) if success: orphaned_count += 1 token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0] logger.info(f"๐Ÿงน Cancelled orphaned stop loss for {token} (Order DB ID: {order_db_id}) - {cancel_reason}") if orphaned_count > 0: logger.info(f"๐Ÿงน Cleanup completed: Cancelled {orphaned_count} orphaned stop loss orders") if self.notification_manager: await self.notification_manager.send_generic_notification( f"๐Ÿงน Cleanup Completed\n\n" f"Cancelled {orphaned_count} orphaned stop loss order(s)\n" f"Reason: Parent orders cancelled/failed or positions closed externally\n" f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n" f"๐Ÿ’ก This automatic cleanup ensures stop losses stay synchronized with actual orders and positions." ) except Exception as e: logger.error(f"โŒ Error cleaning up orphaned stop losses: {e}", exc_info=True) async def _activate_pending_stop_losses_from_trades(self): """๐Ÿ†• PHASE 4: Check trades table for pending stop loss activation first (highest priority)""" try: stats = self.trading_engine.get_stats() if not stats: return # Get open positions that need stop loss activation trades_needing_sl = stats.get_pending_stop_loss_activations() if not trades_needing_sl: return logger.debug(f"๐Ÿ†• Found {len(trades_needing_sl)} open positions needing stop loss activation") for position_trade in trades_needing_sl: try: symbol = position_trade['symbol'] token = symbol.split('/')[0] if '/' in symbol else symbol stop_loss_price = position_trade['stop_loss_price'] position_side = position_trade['position_side'] current_amount = position_trade.get('current_position_size', 0) lifecycle_id = position_trade['trade_lifecycle_id'] # Get current market price current_price = None try: market_data = self.trading_engine.get_market_data(symbol) if market_data and market_data.get('ticker'): current_price = float(market_data['ticker'].get('last', 0)) except Exception as price_error: logger.warning(f"Could not fetch current price for {symbol}: {price_error}") # Determine stop loss side based on position side sl_side = 'sell' if position_side == 'long' else 'buy' # Long SL = sell, Short SL = buy # Check if trigger condition is already met trigger_already_hit = False trigger_reason = "" if current_price and current_price > 0: if sl_side == 'sell' and current_price <= stop_loss_price: # LONG position stop loss - price below trigger trigger_already_hit = True trigger_reason = f"LONG SL: Current ${current_price:.4f} โ‰ค Stop ${stop_loss_price:.4f}" elif sl_side == 'buy' and current_price >= stop_loss_price: # SHORT position stop loss - price above trigger trigger_already_hit = True trigger_reason = f"SHORT SL: Current ${current_price:.4f} โ‰ฅ Stop ${stop_loss_price:.4f}" if trigger_already_hit: # Execute immediate market close logger.warning(f"๐Ÿšจ IMMEDIATE SL EXECUTION (Trades Table): {token} - {trigger_reason}") try: exit_result = await self.trading_engine.execute_exit_order(token) if exit_result.get('success'): logger.info(f"โœ… Immediate {position_side.upper()} SL execution successful for {token}") if self.notification_manager: await self.notification_manager.send_generic_notification( f"๐Ÿšจ Immediate Stop Loss Execution\n\n" f"๐Ÿ†• Source: Unified Trades Table (Phase 4)\n" f"Token: {token}\n" f"Position Type: {position_side.upper()}\n" f"SL Trigger Price: ${stop_loss_price:.4f}\n" f"Current Market Price: ${current_price:.4f}\n" f"Trigger Logic: {trigger_reason}\n" f"Action: Market close order placed immediately\n" f"Order ID: {exit_result.get('order_placed_details', {}).get('exchange_order_id', 'N/A')}\n" f"Lifecycle ID: {lifecycle_id[:8]}\n" f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n" f"โšก Single source of truth prevents missed stop losses" ) else: logger.error(f"โŒ Failed to execute immediate SL for {token}: {exit_result.get('error')}") except Exception as exec_error: logger.error(f"โŒ Exception during immediate SL execution for {token}: {exec_error}") else: # Normal activation - place stop loss order try: sl_result = await self.trading_engine.execute_stop_loss_order(token, stop_loss_price) if sl_result.get('success'): sl_order_id = sl_result.get('order_placed_details', {}).get('exchange_order_id') # Link the stop loss order to the trade lifecycle stats.link_stop_loss_to_trade(lifecycle_id, sl_order_id, stop_loss_price) logger.info(f"โœ… Activated {position_side.upper()} stop loss for {token}: ${stop_loss_price:.4f}") if self.notification_manager: await self.notification_manager.send_generic_notification( f"๐Ÿ›‘ Stop Loss Activated\n\n" f"๐Ÿ†• Source: Unified Trades Table (Phase 4)\n" f"Token: {token}\n" f"Position Type: {position_side.upper()}\n" f"Stop Loss Price: ${stop_loss_price:.4f}\n" f"Current Price: ${current_price:.4f if current_price else 'Unknown'}\n" f"Order ID: {sl_order_id or 'N/A'}\n" f"Lifecycle ID: {lifecycle_id[:8]}\n" f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n" f"๐Ÿ›ก๏ธ Your position is now protected" ) else: logger.error(f"โŒ Failed to activate SL for {token}: {sl_result.get('error')}") except Exception as activation_error: logger.error(f"โŒ Exception during SL activation for {token}: {activation_error}") except Exception as trade_error: logger.error(f"โŒ Error processing position trade for SL activation: {trade_error}") except Exception as e: logger.error(f"โŒ Error activating pending stop losses from trades table: {e}", exc_info=True) async def _activate_pending_stop_losses(self, order_in_db, stats): """Activate pending stop losses for a filled order, checking current price for immediate execution.""" try: # Fetch pending stop losses for this order pending_stop_losses = stats.get_orders_by_status('pending_trigger', 'stop_limit_trigger', order_in_db['bot_order_ref_id']) if not pending_stop_losses: return symbol = order_in_db.get('symbol') token = symbol.split('/')[0] if '/' in symbol and symbol else 'Unknown' logger.debug(f"Found {len(pending_stop_losses)} pending stop loss(es) for filled order {order_in_db.get('exchange_order_id', 'N/A')}") # Get current market price for the symbol current_price = None try: market_data = self.trading_engine.get_market_data(symbol) if market_data and market_data.get('ticker'): current_price = float(market_data['ticker'].get('last', 0)) if current_price <= 0: current_price = None except Exception as price_error: logger.warning(f"Could not fetch current price for {symbol}: {price_error}") current_price = None # Check if we still have a position for this symbol after the fill if symbol: # Try to get current position try: position = self.trading_engine.find_position(token) if position and float(position.get('contracts', 0)) != 0: # Position exists - check each stop loss activated_count = 0 immediately_executed_count = 0 for sl_order in pending_stop_losses: sl_trigger_price = float(sl_order.get('price', 0)) sl_side = sl_order.get('side', '').lower() # 'sell' for long SL, 'buy' for short SL sl_db_id = sl_order.get('id') sl_amount = sl_order.get('amount_requested', 0) if not sl_trigger_price or not sl_side or not sl_db_id: logger.warning(f"Invalid stop loss data for DB ID {sl_db_id}, skipping") continue # Check if trigger condition is already met trigger_already_hit = False trigger_reason = "" if current_price and current_price > 0: if sl_side == 'sell' and current_price <= sl_trigger_price: # LONG position stop loss - price has fallen below trigger # Long SL = SELL order that triggers when price drops below stop price trigger_already_hit = True trigger_reason = f"LONG SL: Current price ${current_price:.4f} โ‰ค Stop price ${sl_trigger_price:.4f}" elif sl_side == 'buy' and current_price >= sl_trigger_price: # SHORT position stop loss - price has risen above trigger # Short SL = BUY order that triggers when price rises above stop price trigger_already_hit = True trigger_reason = f"SHORT SL: Current price ${current_price:.4f} โ‰ฅ Stop price ${sl_trigger_price:.4f}" if trigger_already_hit: # Execute immediate market close instead of activating stop loss logger.warning(f"๐Ÿšจ IMMEDIATE SL EXECUTION: {token} - {trigger_reason}") # Update the stop loss status to show it was immediately executed stats.update_order_status(order_db_id=sl_db_id, new_status='immediately_executed_on_activation') # Execute market order to close position try: exit_result = await self.trading_engine.execute_exit_order(token) if exit_result.get('success'): immediately_executed_count += 1 position_side = "LONG" if sl_side == 'sell' else "SHORT" logger.info(f"โœ… Immediate {position_side} SL execution successful for {token}. Market order placed: {exit_result.get('order_placed_details', {}).get('exchange_order_id', 'N/A')}") if self.notification_manager: await self.notification_manager.send_generic_notification( f"๐Ÿšจ Immediate Stop Loss Execution\n\n" f"Token: {token}\n" f"Position Type: {position_side}\n" f"SL Trigger Price: ${sl_trigger_price:.4f}\n" f"Current Market Price: ${current_price:.4f}\n" f"Trigger Logic: {trigger_reason}\n" f"Action: Market close order placed immediately\n" f"Reason: Trigger condition already met when activating\n" f"Order ID: {exit_result.get('order_placed_details', {}).get('exchange_order_id', 'N/A')}\n" f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n" f"โšก This prevents waiting for a trigger that's already passed" ) else: logger.error(f"โŒ Failed to execute immediate SL for {token}: {exit_result.get('error', 'Unknown error')}") if self.notification_manager: await self.notification_manager.send_generic_notification( f"โŒ Immediate SL Execution Failed\n\n" f"Token: {token}\n" f"SL Price: ${sl_trigger_price:.4f}\n" f"Current Price: ${current_price:.4f}\n" f"Trigger Logic: {trigger_reason}\n" f"Error: {exit_result.get('error', 'Unknown error')}\n\n" f"โš ๏ธ Manual intervention may be required" ) # Revert status since execution failed stats.update_order_status(order_db_id=sl_db_id, new_status='activation_execution_failed') except Exception as exec_error: logger.error(f"โŒ Exception during immediate SL execution for {token}: {exec_error}") stats.update_order_status(order_db_id=sl_db_id, new_status='activation_execution_error') else: # Normal activation - trigger condition not yet met activated_count += 1 position_side = "LONG" if sl_side == 'sell' else "SHORT" logger.info(f"โœ… Activating {position_side} stop loss for {token}: SL price ${sl_trigger_price:.4f} (Current: ${current_price:.4f if current_price else 'Unknown'})") # Send summary notification for normal activations if activated_count > 0 and self.notification_manager: await self.notification_manager.send_generic_notification( f"๐Ÿ›‘ Stop Losses Activated\n\n" f"Symbol: {token}\n" f"Activated: {activated_count} stop loss(es)\n" f"Current Price: ${current_price:.4f if current_price else 'Unknown'}\n" f"Status: Monitoring for trigger conditions" f"{f'\\n\\nโšก Additionally executed {immediately_executed_count} stop loss(es) immediately due to current market conditions' if immediately_executed_count > 0 else ''}" ) elif immediately_executed_count > 0 and activated_count == 0: # All stop losses were immediately executed if self.notification_manager: await self.notification_manager.send_generic_notification( f"โšก All Stop Losses Executed Immediately\n\n" f"Symbol: {token}\n" f"Executed: {immediately_executed_count} stop loss(es)\n" f"Reason: Market price already beyond trigger levels\n" f"Current Price: ${current_price:.4f if current_price else 'Unknown'}\n\n" f"๐Ÿš€ Position(s) closed at market to prevent further losses" ) else: # No position exists (might have been closed immediately) - cancel the stop losses cancelled_count = stats.cancel_linked_orders( parent_bot_order_ref_id=order_in_db['bot_order_ref_id'], new_status='cancelled_no_position' ) if cancelled_count > 0: logger.info(f"โŒ Cancelled {cancelled_count} pending stop losses for {symbol} - no position found") if self.notification_manager: await self.notification_manager.send_generic_notification( f"๐Ÿ›‘ Stop Losses Cancelled\n\n" f"Symbol: {token}\n" f"Cancelled: {cancelled_count} stop loss(es)\n" f"Reason: No open position found" ) except Exception as pos_check_error: logger.warning(f"Could not check position for {symbol} during SL activation: {pos_check_error}") # In case of error, still try to activate (safer to have redundant SLs than none) except Exception as e: logger.error(f"Error in _activate_pending_stop_losses: {e}", exc_info=True) async def _check_for_recent_fills_for_order(self, exchange_oid, order_in_db): """Check for very recent fills that might match this order.""" try: # Get recent fills from exchange recent_fills = self.trading_engine.get_recent_fills() if not recent_fills: return False # Get last processed timestamp from database if not hasattr(self, '_last_processed_trade_time') or self._last_processed_trade_time is None: try: last_time_str = self.trading_engine.stats._get_metadata('last_processed_trade_time') if last_time_str: self._last_processed_trade_time = datetime.fromisoformat(last_time_str) logger.debug(f"Loaded last_processed_trade_time from DB: {self._last_processed_trade_time}") else: # If no last processed time, start from 1 hour ago to avoid processing too much history self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1) logger.info("No last_processed_trade_time found, setting to 1 hour ago (UTC).") except Exception as e: logger.warning(f"Could not load last_processed_trade_time from DB: {e}") self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1) # Process new fills for fill in recent_fills: try: # Parse fill data - CCXT format from fetch_my_trades trade_id = fill.get('id') # CCXT uses 'id' for trade ID timestamp_ms = fill.get('timestamp') # CCXT uses 'timestamp' (milliseconds) symbol = fill.get('symbol') # CCXT uses 'symbol' in full format like 'LTC/USDC:USDC' side = fill.get('side') # CCXT uses 'side' ('buy' or 'sell') amount = float(fill.get('amount', 0)) # CCXT uses 'amount' price = float(fill.get('price', 0)) # CCXT uses 'price' # Convert timestamp if timestamp_ms: timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc) else: timestamp_dt = datetime.now(timezone.utc) # Skip if already processed if timestamp_dt <= self._last_processed_trade_time: continue # Process as external trade if we reach here if symbol and side and amount > 0 and price > 0: # Symbol is already in full format for CCXT full_symbol = symbol token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0] # Check if this might be a bot order fill by looking for exchange order ID # CCXT might have this in 'info' sub-object with the raw exchange data exchange_order_id_from_fill = None if 'info' in fill and isinstance(fill['info'], dict): # Look for Hyperliquid order ID in the raw response exchange_order_id_from_fill = fill['info'].get('oid') if exchange_order_id_from_fill == exchange_oid: logger.info(f"โœ… Found recent fill for order {exchange_oid} - NOT cancelling stop losses") return True except Exception as e: logger.error(f"Error processing fill {fill}: {e}") continue return False except Exception as e: logger.error(f"โŒ Error checking for recent fills for order: {e}", exc_info=True) return False async def _check_external_stop_loss_orders(self): """Check for externally placed stop loss orders and track them.""" try: # Get current open orders open_orders = self.trading_engine.get_orders() if not open_orders: return # Get current positions to understand what could be stop losses positions = self.trading_engine.get_positions() if not positions: return # Create a map of current positions position_map = {} for position in positions: symbol = position.get('symbol') contracts = float(position.get('contracts', 0)) if symbol and contracts != 0: token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0] position_map[token] = { 'symbol': symbol, 'contracts': contracts, 'side': 'long' if contracts > 0 else 'short', 'entry_price': float(position.get('entryPx', 0)) } # Check each order to see if it could be a stop loss newly_detected = 0 for order in open_orders: try: exchange_order_id = order.get('id') symbol = order.get('symbol') side = order.get('side') # 'buy' or 'sell' amount = float(order.get('amount', 0)) price = float(order.get('price', 0)) order_type = order.get('type', '').lower() if not all([exchange_order_id, symbol, side, amount, price]): continue # Skip if we're already tracking this order if exchange_order_id in self._external_stop_loss_orders: continue # Check if this order could be a stop loss token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0] # Must have a position in this token to have a stop loss if token not in position_map: continue position = position_map[token] # Check if this order matches stop loss pattern is_stop_loss = False if position['side'] == 'long' and side == 'sell': # Long position with sell order - could be stop loss if price is below entry if price < position['entry_price'] * 0.98: # Allow 2% buffer for approximation is_stop_loss = True elif position['side'] == 'short' and side == 'buy': # Short position with buy order - could be stop loss if price is above entry if price > position['entry_price'] * 1.02: # Allow 2% buffer for approximation is_stop_loss = True if is_stop_loss: # Track this as an external stop loss order self._external_stop_loss_orders[exchange_order_id] = { 'token': token, 'symbol': symbol, 'trigger_price': price, 'side': side, 'amount': amount, 'position_side': position['side'], 'detected_at': datetime.now(timezone.utc), 'entry_price': position['entry_price'] } newly_detected += 1 logger.info(f"๐Ÿ›‘ Detected external stop loss order: {token} {side.upper()} {amount} @ ${price:.2f} (protecting {position['side'].upper()} position)") except Exception as e: logger.error(f"Error analyzing order for stop loss detection: {e}") continue if newly_detected > 0: logger.info(f"๐Ÿ” Detected {newly_detected} new external stop loss orders") except Exception as e: logger.error(f"โŒ Error checking external stop loss orders: {e}") async def _cleanup_external_stop_loss_tracking(self): """Clean up external stop loss orders that are no longer active.""" try: if not self._external_stop_loss_orders: return # Get current open orders open_orders = self.trading_engine.get_orders() if not open_orders: # No open orders, clear all tracking removed_count = len(self._external_stop_loss_orders) self._external_stop_loss_orders.clear() if removed_count > 0: logger.info(f"๐Ÿงน Cleared {removed_count} external stop loss orders (no open orders)") return # Get set of current order IDs current_order_ids = {order.get('id') for order in open_orders if order.get('id')} # Remove any tracked stop loss orders that are no longer open to_remove = [] for order_id, stop_loss_info in self._external_stop_loss_orders.items(): if order_id not in current_order_ids: to_remove.append(order_id) for order_id in to_remove: stop_loss_info = self._external_stop_loss_orders[order_id] del self._external_stop_loss_orders[order_id] logger.info(f"๐Ÿ—‘๏ธ Removed external stop loss tracking for {stop_loss_info['token']} order {order_id} (no longer open)") if to_remove: logger.info(f"๐Ÿงน Cleaned up {len(to_remove)} external stop loss orders") except Exception as e: logger.error(f"โŒ Error cleaning up external stop loss tracking: {e}")