#!/usr/bin/env python3 """ Monitors external events like trades made outside the bot and price alarms. """ import logging import asyncio from datetime import datetime, timedelta, timezone from typing import Optional, Dict, Any, List # Assuming AlarmManager will be moved here or imported appropriately # from .alarm_manager import AlarmManager from src.monitoring.alarm_manager import AlarmManager # Keep if AlarmManager stays in its own file as per original structure from src.utils.token_display_formatter import get_formatter logger = logging.getLogger(__name__) class ExternalEventMonitor: def __init__(self, trading_engine, notification_manager, alarm_manager, market_monitor_cache, shared_state): self.trading_engine = trading_engine self.notification_manager = notification_manager self.alarm_manager = alarm_manager self.market_monitor_cache = market_monitor_cache self.shared_state = shared_state # Expected to contain {'external_stop_losses': ...} self.last_processed_trade_time: Optional[datetime] = None # Add necessary initializations, potentially loading last_processed_trade_time def _safe_get_positions(self) -> Optional[List[Dict[str, Any]]]: """Safely get positions from trading engine, returning None on API failures instead of empty list.""" try: return self.trading_engine.get_positions() except Exception as e: logger.warning(f"âš ī¸ Failed to fetch positions in external event monitor: {e}") return None async def _check_price_alarms(self): """Check price alarms and trigger notifications.""" try: active_alarms = self.alarm_manager.get_all_active_alarms() if not active_alarms: return tokens_to_check = list(set(alarm['token'] for alarm in active_alarms)) for token in tokens_to_check: try: symbol = f"{token}/USDC:USDC" market_data = self.trading_engine.get_market_data(symbol) if not market_data or not market_data.get('ticker'): continue current_price = float(market_data['ticker'].get('last', 0)) if current_price <= 0: continue token_alarms = [alarm for alarm in active_alarms if alarm['token'] == token] for alarm in token_alarms: target_price = alarm['target_price'] direction = alarm['direction'] should_trigger = False if direction == 'above' and current_price >= target_price: should_trigger = True elif direction == 'below' and current_price <= target_price: should_trigger = True if should_trigger: triggered_alarm = self.alarm_manager.trigger_alarm(alarm['id'], current_price) if triggered_alarm: await self._send_alarm_notification(triggered_alarm) except Exception as e: logger.error(f"Error checking alarms for {token}: {e}") except Exception as e: logger.error("❌ Error checking price alarms: {e}") async def _send_alarm_notification(self, alarm: Dict[str, Any]): """Send notification for triggered alarm.""" try: if self.notification_manager: await self.notification_manager.send_alarm_triggered_notification( alarm['token'], alarm['target_price'], alarm['triggered_price'], alarm['direction'] ) else: logger.info(f"🔔 ALARM TRIGGERED: {alarm['token']} @ ${alarm['triggered_price']:,.2f}") except Exception as e: logger.error(f"❌ Error sending alarm notification: {e}") async def _determine_position_action_type(self, full_symbol: str, side_from_fill: str, amount_from_fill: float, existing_lc: Optional[Dict] = None) -> str: """ Determine the type of position action based on current state and fill details. Returns one of: 'position_opened', 'position_closed', 'position_increased', 'position_decreased' """ try: # Get current position from exchange current_positions = self._safe_get_positions() if current_positions is None: logger.warning(f"âš ī¸ Failed to fetch positions for {full_symbol} analysis - returning external_unmatched") return 'external_unmatched' current_exchange_position = None for pos in current_positions: if pos.get('symbol') == full_symbol: current_exchange_position = pos break current_size = 0.0 if current_exchange_position: current_size = abs(float(current_exchange_position.get('contracts', 0))) # If no existing lifecycle, this is a position opening if not existing_lc: logger.debug(f"🔍 Position analysis: {full_symbol} no existing lifecycle, current size: {current_size}") if current_size > 1e-9: # Position exists on exchange return 'position_opened' else: return 'external_unmatched' # Get previous position size from lifecycle previous_size = existing_lc.get('current_position_size', 0) lc_position_side = existing_lc.get('position_side') logger.debug(f"🔍 Position analysis: {full_symbol} {side_from_fill} {amount_from_fill}") logger.debug(f" Lifecycle side: {lc_position_side}, previous size: {previous_size}, current size: {current_size}") # Check if this is a closing trade (opposite side) is_closing_trade = False if lc_position_side == 'long' and side_from_fill.lower() == 'sell': is_closing_trade = True elif lc_position_side == 'short' and side_from_fill.lower() == 'buy': is_closing_trade = True logger.debug(f" Is closing trade: {is_closing_trade}") if is_closing_trade: if current_size < 1e-9: # Position is now closed logger.debug(f" → Position closed (current_size < 1e-9)") return 'position_closed' elif current_size < previous_size - 1e-9: # Position reduced but not closed logger.debug(f" → Position decreased (current_size {current_size} < previous_size - 1e-9 {previous_size - 1e-9})") return 'position_decreased' else: # Same side trade - position increase logger.debug(f" Same side trade check: current_size {current_size} > previous_size + 1e-9 {previous_size + 1e-9}?") if current_size > previous_size + 1e-9: logger.debug(f" → Position increased") return 'position_increased' else: logger.debug(f" → Size check failed, not enough increase") # Default fallback logger.debug(f" → Fallback to external_unmatched") return 'external_unmatched' except Exception as e: logger.error(f"Error determining position action type: {e}") return 'external_unmatched' async def _update_lifecycle_position_size(self, lifecycle_id: str, new_size: float) -> bool: """Update the current position size in the lifecycle.""" try: stats = self.trading_engine.get_stats() if not stats: return False # Update the current position size success = stats.trade_manager.update_trade_market_data( lifecycle_id, current_position_size=new_size ) return success except Exception as e: logger.error(f"Error updating lifecycle position size: {e}") return False async def _send_position_change_notification(self, full_symbol: str, side_from_fill: str, amount_from_fill: float, price_from_fill: float, action_type: str, timestamp_dt: datetime, existing_lc: Optional[Dict] = None, realized_pnl: Optional[float] = None): """Send position change notification.""" try: if not self.notification_manager: return token = full_symbol.split('/')[0] if '/' in full_symbol else full_symbol.split(':')[0] time_str = timestamp_dt.strftime('%Y-%m-%d %H:%M:%S UTC') formatter = get_formatter() if action_type == 'position_closed' and existing_lc: position_side = existing_lc.get('position_side', 'unknown').upper() entry_price = existing_lc.get('entry_price', 0) pnl_emoji = "đŸŸĸ" if realized_pnl and realized_pnl >= 0 else "🔴" pnl_text = f"{await formatter.format_price_with_symbol(realized_pnl)}" if realized_pnl is not None else "N/A" # Get ROE directly from exchange data info_data = existing_lc.get('info', {}) position_info = info_data.get('position', {}) roe_raw = position_info.get('returnOnEquity') # Changed from 'percentage' to 'returnOnEquity' if roe_raw is not None: try: # The exchange provides ROE as a decimal (e.g., -0.326 for -32.6%) # We need to multiply by 100 and keep the sign roe = float(roe_raw) * 100 roe_text = f" ({roe:+.2f}%)" except (ValueError, TypeError): logger.warning(f"Could not parse ROE value: {roe_raw} for {full_symbol}") roe_text = "" else: logger.warning(f"No ROE data available from exchange for {full_symbol}") roe_text = "" message = f""" đŸŽ¯ Position Closed (External) 📊 Trade Details: â€ĸ Token: {token} â€ĸ Direction: {position_side} â€ĸ Size Closed: {await formatter.format_amount(amount_from_fill, token)} â€ĸ Entry Price: {await formatter.format_price_with_symbol(entry_price, token)} â€ĸ Exit Price: {await formatter.format_price_with_symbol(price_from_fill, token)} â€ĸ Exit Value: {await formatter.format_price_with_symbol(amount_from_fill * price_from_fill)} {pnl_emoji} P&L: {pnl_text}{roe_text} ✅ Status: {position_side} position closed externally ⏰ Time: {time_str} 📊 Use /stats to view updated performance """ elif action_type == 'position_opened': position_side = 'LONG' if side_from_fill.lower() == 'buy' else 'SHORT' message = f""" 🚀 Position Opened (External) 📊 Trade Details: â€ĸ Token: {token} â€ĸ Direction: {position_side} â€ĸ Size: {await formatter.format_amount(amount_from_fill, token)} â€ĸ Entry Price: {await formatter.format_price_with_symbol(price_from_fill, token)} â€ĸ Position Value: {await formatter.format_price_with_symbol(amount_from_fill * price_from_fill)} ✅ Status: New {position_side} position opened externally ⏰ Time: {time_str} 📱 Use /positions to view all positions """ elif action_type == 'position_increased' and existing_lc: position_side = existing_lc.get('position_side', 'unknown').upper() previous_size = existing_lc.get('current_position_size', 0) # Get current size from exchange current_positions = self._safe_get_positions() if current_positions is None: # Skip notification if we can't get position data logger.warning(f"âš ī¸ Failed to fetch positions for notification - skipping {action_type} notification") return current_size = 0 for pos in current_positions: if pos.get('symbol') == full_symbol: current_size = abs(float(pos.get('contracts', 0))) break message = f""" 📈 Position Increased (External) 📊 Trade Details: â€ĸ Token: {token} â€ĸ Direction: {position_side} â€ĸ Size Added: {await formatter.format_amount(amount_from_fill, token)} â€ĸ Add Price: {await formatter.format_price_with_symbol(price_from_fill, token)} â€ĸ Previous Size: {await formatter.format_amount(previous_size, token)} â€ĸ New Size: {await formatter.format_amount(current_size, token)} â€ĸ Add Value: {await formatter.format_price_with_symbol(amount_from_fill * price_from_fill)} 📈 Status: {position_side} position size increased externally ⏰ Time: {time_str} 📈 Use /positions to view current position """ elif action_type == 'position_decreased' and existing_lc: position_side = existing_lc.get('position_side', 'unknown').upper() previous_size = existing_lc.get('current_position_size', 0) entry_price = existing_lc.get('entry_price', 0) # Get current size from exchange current_positions = self._safe_get_positions() if current_positions is None: # Skip notification if we can't get position data logger.warning(f"âš ī¸ Failed to fetch positions for notification - skipping {action_type} notification") return current_size = 0 for pos in current_positions: if pos.get('symbol') == full_symbol: current_size = abs(float(pos.get('contracts', 0))) break # Calculate partial PnL for the reduced amount partial_pnl = 0 if entry_price > 0: if position_side == 'LONG': partial_pnl = amount_from_fill * (price_from_fill - entry_price) else: # SHORT partial_pnl = amount_from_fill * (entry_price - price_from_fill) pnl_emoji = "đŸŸĸ" if partial_pnl >= 0 else "🔴" # Calculate ROE for the partial close roe_text = "" if entry_price > 0 and amount_from_fill > 0: cost_basis = amount_from_fill * entry_price roe = (partial_pnl / cost_basis) * 100 roe_text = f" ({roe:+.2f}%)" message = f""" 📉 Position Decreased (External) 📊 Trade Details: â€ĸ Token: {token} â€ĸ Direction: {position_side} â€ĸ Size Reduced: {await formatter.format_amount(amount_from_fill, token)} â€ĸ Exit Price: {await formatter.format_price_with_symbol(price_from_fill, token)} â€ĸ Previous Size: {await formatter.format_amount(previous_size, token)} â€ĸ Remaining Size: {await formatter.format_amount(current_size, token)} â€ĸ Exit Value: {await formatter.format_price_with_symbol(amount_from_fill * price_from_fill)} {pnl_emoji} Partial P&L: {await formatter.format_price_with_symbol(partial_pnl)}{roe_text} 📉 Status: {position_side} position size decreased externally ⏰ Time: {time_str} 📊 Position remains open. Use /positions to view details """ else: # No fallback notification sent - only position-based notifications per user preference logger.debug(f"No notification sent for action_type: {action_type}") return await self.notification_manager.send_generic_notification(message.strip()) except Exception as e: logger.error(f"Error sending position change notification: {e}") async def _auto_sync_single_position(self, symbol: str, exchange_position: Dict[str, Any], stats) -> bool: """Auto-sync a single orphaned position to create a lifecycle record.""" try: import uuid from src.utils.token_display_formatter import get_formatter formatter = get_formatter() contracts_abs = abs(float(exchange_position.get('contracts', 0))) if contracts_abs <= 1e-9: return False entry_price_from_exchange = float(exchange_position.get('entryPrice', 0)) or float(exchange_position.get('entryPx', 0)) # Determine position side position_side, order_side = '', '' ccxt_side = exchange_position.get('side', '').lower() if ccxt_side == 'long': position_side, order_side = 'long', 'buy' elif ccxt_side == 'short': position_side, order_side = 'short', 'sell' if not position_side: contracts_val = float(exchange_position.get('contracts', 0)) if contracts_val > 1e-9: position_side, order_side = 'long', 'buy' elif contracts_val < -1e-9: position_side, order_side = 'short', 'sell' else: return False if not position_side: logger.error(f"AUTO-SYNC: Could not determine position side for {symbol}.") return False final_entry_price = entry_price_from_exchange if not final_entry_price or final_entry_price <= 0: # Fallback to a reasonable estimate (current mark price) mark_price = float(exchange_position.get('markPrice', 0)) or float(exchange_position.get('markPx', 0)) if mark_price > 0: final_entry_price = mark_price else: logger.error(f"AUTO-SYNC: Could not determine entry price for {symbol}.") return False logger.info(f"🔄 AUTO-SYNC: Creating lifecycle for {symbol} {position_side.upper()} {contracts_abs} @ {await formatter.format_price_with_symbol(final_entry_price, symbol)}") unique_sync_id = str(uuid.uuid4())[:8] lifecycle_id = stats.create_trade_lifecycle( symbol=symbol, side=order_side, entry_order_id=f"external_sync_{unique_sync_id}", trade_type='external_sync' ) if lifecycle_id: success = await stats.update_trade_position_opened( lifecycle_id, final_entry_price, contracts_abs, f"external_fill_sync_{unique_sync_id}" ) if success: logger.info(f"✅ AUTO-SYNC: Successfully synced position for {symbol} (Lifecycle: {lifecycle_id[:8]})") # Send position opened notification for auto-synced position try: await self._send_position_change_notification( symbol, order_side, contracts_abs, final_entry_price, 'position_opened', datetime.now(timezone.utc) ) logger.info(f"📨 AUTO-SYNC: Sent position opened notification for {symbol}") except Exception as e: logger.error(f"❌ AUTO-SYNC: Failed to send notification for {symbol}: {e}") return True else: logger.error(f"❌ AUTO-SYNC: Failed to update lifecycle to 'position_opened' for {symbol}") else: logger.error(f"❌ AUTO-SYNC: Failed to create lifecycle for {symbol}") return False except Exception as e: logger.error(f"❌ AUTO-SYNC: Error syncing position for {symbol}: {e}") return False async def _check_external_trades(self): """Check for trades made outside the Telegram bot and update stats.""" try: stats = self.trading_engine.get_stats() if not stats: logger.warning("TradingStats not available in _check_external_trades. Skipping.") return external_trades_processed = 0 symbols_with_fills = set() recent_fills = self.trading_engine.get_recent_fills() if not recent_fills: logger.debug("No recent fills data available") return if not hasattr(self, 'last_processed_trade_time') or self.last_processed_trade_time is None: try: # Ensure this metadata key is the one used by MarketMonitor for saving this state. last_time_str = stats._get_metadata('market_monitor_last_processed_trade_time') if last_time_str: self.last_processed_trade_time = datetime.fromisoformat(last_time_str).replace(tzinfo=timezone.utc) else: self.last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1) except Exception: self.last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1) for fill in recent_fills: try: trade_id = fill.get('id') timestamp_ms = fill.get('timestamp') symbol_from_fill = fill.get('symbol') side_from_fill = fill.get('side') amount_from_fill = float(fill.get('amount', 0)) price_from_fill = float(fill.get('price', 0)) timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc) if timestamp_ms else datetime.now(timezone.utc) if self.last_processed_trade_time and timestamp_dt <= self.last_processed_trade_time: continue # Check if this fill has already been processed to prevent duplicates if trade_id and stats.has_exchange_fill_been_processed(str(trade_id)): logger.debug(f"Skipping already processed fill: {trade_id}") continue fill_processed_this_iteration = False if not (symbol_from_fill and side_from_fill and amount_from_fill > 0 and price_from_fill > 0): logger.warning(f"Skipping fill with incomplete data: {fill}") continue full_symbol = symbol_from_fill token = symbol_from_fill.split('/')[0] if '/' in symbol_from_fill else symbol_from_fill.split(':')[0] exchange_order_id_from_fill = fill.get('info', {}).get('oid') # First check if this is a pending entry order fill if exchange_order_id_from_fill: pending_lc = stats.get_lifecycle_by_entry_order_id(exchange_order_id_from_fill, status='pending') if pending_lc and pending_lc.get('symbol') == full_symbol: success = await stats.update_trade_position_opened( lifecycle_id=pending_lc['trade_lifecycle_id'], entry_price=price_from_fill, entry_amount=amount_from_fill, exchange_fill_id=trade_id ) if success: logger.info(f"📈 Lifecycle ENTRY: {pending_lc['trade_lifecycle_id']} for {full_symbol} updated by fill {trade_id}.") symbols_with_fills.add(token) order_in_db_for_entry = stats.get_order_by_exchange_id(exchange_order_id_from_fill) if order_in_db_for_entry: stats.update_order_status(order_db_id=order_in_db_for_entry['id'], new_status='filled', amount_filled_increment=amount_from_fill) # Send position opened notification (this is a bot-initiated position) await self._send_position_change_notification( full_symbol, side_from_fill, amount_from_fill, price_from_fill, 'position_opened', timestamp_dt ) fill_processed_this_iteration = True # Check if this is a known bot order (SL/TP/exit) if not fill_processed_this_iteration and exchange_order_id_from_fill: active_lc = None closure_reason_action_type = None bot_order_db_id_to_update = None bot_order_for_fill = stats.get_order_by_exchange_id(exchange_order_id_from_fill) if bot_order_for_fill and bot_order_for_fill.get('symbol') == full_symbol: order_type = bot_order_for_fill.get('type') order_side = bot_order_for_fill.get('side') if order_type == 'market': potential_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened') if potential_lc: lc_pos_side = potential_lc.get('position_side') if (lc_pos_side == 'long' and order_side == 'sell' and side_from_fill == 'sell') or \ (lc_pos_side == 'short' and order_side == 'buy' and side_from_fill == 'buy'): active_lc = potential_lc closure_reason_action_type = f"bot_exit_{lc_pos_side}_close" bot_order_db_id_to_update = bot_order_for_fill.get('id') logger.info(f"â„šī¸ Lifecycle BOT EXIT: Fill {trade_id} (OID {exchange_order_id_from_fill}) for {full_symbol} matches bot exit for lifecycle {active_lc['trade_lifecycle_id']}.") if not active_lc: lc_by_sl = stats.get_lifecycle_by_sl_order_id(exchange_order_id_from_fill, status='position_opened') if lc_by_sl and lc_by_sl.get('symbol') == full_symbol: active_lc = lc_by_sl closure_reason_action_type = f"sl_{active_lc.get('position_side')}_close" bot_order_db_id_to_update = bot_order_for_fill.get('id') logger.info(f"â„šī¸ Lifecycle SL: Fill {trade_id} for OID {exchange_order_id_from_fill} matches SL for lifecycle {active_lc['trade_lifecycle_id']}.") if not active_lc: lc_by_tp = stats.get_lifecycle_by_tp_order_id(exchange_order_id_from_fill, status='position_opened') if lc_by_tp and lc_by_tp.get('symbol') == full_symbol: active_lc = lc_by_tp closure_reason_action_type = f"tp_{active_lc.get('position_side')}_close" bot_order_db_id_to_update = bot_order_for_fill.get('id') logger.info(f"â„šī¸ Lifecycle TP: Fill {trade_id} for OID {exchange_order_id_from_fill} matches TP for lifecycle {active_lc['trade_lifecycle_id']}.") # Process known bot order fills if active_lc and closure_reason_action_type: lc_id = active_lc['trade_lifecycle_id'] lc_entry_price = active_lc.get('entry_price', 0) lc_position_side = active_lc.get('position_side') realized_pnl = 0 if lc_position_side == 'long': realized_pnl = amount_from_fill * (price_from_fill - lc_entry_price) elif lc_position_side == 'short': realized_pnl = amount_from_fill * (lc_entry_price - price_from_fill) success = stats.update_trade_position_closed( lifecycle_id=lc_id, exit_price=price_from_fill, realized_pnl=realized_pnl, exchange_fill_id=trade_id ) if success: pnl_emoji = "đŸŸĸ" if realized_pnl >= 0 else "🔴" formatter = get_formatter() logger.info(f"{pnl_emoji} Lifecycle CLOSED: {lc_id} ({closure_reason_action_type}). PNL for fill: {formatter.format_price_with_symbol(realized_pnl)}") symbols_with_fills.add(token) # Send position closed notification await self._send_position_change_notification( full_symbol, side_from_fill, amount_from_fill, price_from_fill, 'position_closed', timestamp_dt, active_lc, realized_pnl ) stats.migrate_trade_to_aggregated_stats(lc_id) if bot_order_db_id_to_update: stats.update_order_status(order_db_id=bot_order_db_id_to_update, new_status='filled', amount_filled_increment=amount_from_fill) fill_processed_this_iteration = True # Check for external stop losses if not fill_processed_this_iteration: if (exchange_order_id_from_fill and self.shared_state.get('external_stop_losses') and exchange_order_id_from_fill in self.shared_state['external_stop_losses']): stop_loss_info = self.shared_state['external_stop_losses'][exchange_order_id_from_fill] formatter = get_formatter() logger.info(f"🛑 External SL (MM Tracking): {token} Order {exchange_order_id_from_fill} filled @ {formatter.format_price_with_symbol(price_from_fill, token)}") sl_active_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened') if sl_active_lc: lc_id = sl_active_lc['trade_lifecycle_id'] lc_entry_price = sl_active_lc.get('entry_price', 0) lc_pos_side = sl_active_lc.get('position_side') realized_pnl = amount_from_fill * (price_from_fill - lc_entry_price) if lc_pos_side == 'long' else amount_from_fill * (lc_entry_price - price_from_fill) success = stats.update_trade_position_closed(lc_id, price_from_fill, realized_pnl, trade_id) if success: pnl_emoji = "đŸŸĸ" if realized_pnl >= 0 else "🔴" logger.info(f"{pnl_emoji} Lifecycle CLOSED by External SL (MM): {lc_id}. PNL: {formatter.format_price_with_symbol(realized_pnl)}") if self.notification_manager: await self.notification_manager.send_stop_loss_execution_notification( stop_loss_info, full_symbol, side_from_fill, amount_from_fill, price_from_fill, f'{lc_pos_side}_closed_external_sl', timestamp_dt.isoformat(), realized_pnl ) stats.migrate_trade_to_aggregated_stats(lc_id) # Modify shared state carefully if exchange_order_id_from_fill in self.shared_state['external_stop_losses']: del self.shared_state['external_stop_losses'][exchange_order_id_from_fill] fill_processed_this_iteration = True else: logger.warning(f"âš ī¸ External SL (MM) {exchange_order_id_from_fill} for {full_symbol}, but no active lifecycle found.") # NEW: Enhanced external trade processing with position state detection if not fill_processed_this_iteration: existing_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened') # If no lifecycle exists but we have a position on exchange, try to auto-sync first if not existing_lc: current_positions = self._safe_get_positions() if current_positions is None: logger.warning("âš ī¸ Failed to fetch positions for external trade detection - skipping this fill") continue exchange_position = None for pos in current_positions: if pos.get('symbol') == full_symbol: exchange_position = pos break if exchange_position and abs(float(exchange_position.get('contracts', 0))) > 1e-9: logger.info(f"🔄 AUTO-SYNC: Position exists on exchange for {full_symbol} but no lifecycle found. Auto-syncing before processing fill.") success = await self._auto_sync_single_position(full_symbol, exchange_position, stats) if success: # Re-check for lifecycle after auto-sync existing_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened') action_type = await self._determine_position_action_type( full_symbol, side_from_fill, amount_from_fill, existing_lc ) logger.info(f"🔍 External fill analysis: {full_symbol} {side_from_fill} {amount_from_fill} -> {action_type}") # Additional debug logging for position changes if existing_lc: previous_size = existing_lc.get('current_position_size', 0) current_positions = self._safe_get_positions() if current_positions is None: logger.warning("âš ī¸ Failed to fetch positions for debug logging - skipping debug info") # Set defaults to avoid reference errors current_size = previous_size else: current_size = 0 for pos in current_positions: if pos.get('symbol') == full_symbol: current_size = abs(float(pos.get('contracts', 0))) break logger.info(f"📊 Position size change: {previous_size} -> {current_size} (diff: {current_size - previous_size})") logger.info(f"đŸŽ¯ Expected change based on fill: {'+' if side_from_fill.lower() == 'buy' else '-'}{amount_from_fill}") # Check if this might be a position decrease that was misclassified if (action_type == 'external_unmatched' and existing_lc.get('position_side') == 'long' and side_from_fill.lower() == 'sell' and current_size < previous_size): logger.warning(f"âš ī¸ Potential misclassification: {full_symbol} {side_from_fill} looks like position decrease but classified as external_unmatched") # Force re-check with proper parameters action_type = 'position_decreased' logger.info(f"🔄 Corrected action_type to: {action_type}") elif (action_type == 'external_unmatched' and existing_lc.get('position_side') == 'long' and side_from_fill.lower() == 'buy' and current_size > previous_size): logger.warning(f"âš ī¸ Potential misclassification: {full_symbol} {side_from_fill} looks like position increase but classified as external_unmatched") action_type = 'position_increased' logger.info(f"🔄 Corrected action_type to: {action_type}") if action_type == 'position_opened': # Create new lifecycle for external position lifecycle_id = stats.create_trade_lifecycle( symbol=full_symbol, side=side_from_fill, entry_order_id=exchange_order_id_from_fill or f"external_{trade_id}", trade_type="external" ) if lifecycle_id: success = stats.update_trade_position_opened( lifecycle_id=lifecycle_id, entry_price=price_from_fill, entry_amount=amount_from_fill, exchange_fill_id=trade_id ) if success: logger.info(f"📈 Created and opened new external lifecycle: {lifecycle_id[:8]} for {full_symbol}") symbols_with_fills.add(token) # Send position opened notification await self._send_position_change_notification( full_symbol, side_from_fill, amount_from_fill, price_from_fill, action_type, timestamp_dt ) fill_processed_this_iteration = True elif action_type == 'position_closed' and existing_lc: # Close existing lifecycle lc_id = existing_lc['trade_lifecycle_id'] lc_entry_price = existing_lc.get('entry_price', 0) lc_position_side = existing_lc.get('position_side') realized_pnl = 0 if lc_position_side == 'long': realized_pnl = amount_from_fill * (price_from_fill - lc_entry_price) elif lc_position_side == 'short': realized_pnl = amount_from_fill * (lc_entry_price - price_from_fill) success = stats.update_trade_position_closed( lifecycle_id=lc_id, exit_price=price_from_fill, realized_pnl=realized_pnl, exchange_fill_id=trade_id ) if success: pnl_emoji = "đŸŸĸ" if realized_pnl >= 0 else "🔴" formatter = get_formatter() logger.info(f"{pnl_emoji} Lifecycle CLOSED (External): {lc_id}. PNL: {formatter.format_price_with_symbol(realized_pnl)}") symbols_with_fills.add(token) # Send position closed notification await self._send_position_change_notification( full_symbol, side_from_fill, amount_from_fill, price_from_fill, action_type, timestamp_dt, existing_lc, realized_pnl ) stats.migrate_trade_to_aggregated_stats(lc_id) fill_processed_this_iteration = True elif action_type in ['position_increased', 'position_decreased'] and existing_lc: # Update lifecycle position size and send notification current_positions = self._safe_get_positions() if current_positions is None: logger.warning("âš ī¸ Failed to fetch positions for size update - skipping position change processing") continue new_size = 0 for pos in current_positions: if pos.get('symbol') == full_symbol: new_size = abs(float(pos.get('contracts', 0))) break # Update lifecycle with new position size await self._update_lifecycle_position_size(existing_lc['trade_lifecycle_id'], new_size) # Send appropriate notification await self._send_position_change_notification( full_symbol, side_from_fill, amount_from_fill, price_from_fill, action_type, timestamp_dt, existing_lc ) symbols_with_fills.add(token) fill_processed_this_iteration = True logger.info(f"📊 Position {action_type}: {full_symbol} new size: {new_size}") # Fallback for unmatched external trades if not fill_processed_this_iteration: all_open_positions_in_db = stats.get_open_positions() db_open_symbols = {pos_db.get('symbol') for pos_db in all_open_positions_in_db} if full_symbol in db_open_symbols: logger.debug(f"Position {full_symbol} found in open positions but no active lifecycle - likely auto-sync failed or timing issue for fill {trade_id}") # Record as unmatched external trade linked_order_db_id = None if exchange_order_id_from_fill: order_in_db = stats.get_order_by_exchange_id(exchange_order_id_from_fill) if order_in_db: linked_order_db_id = order_in_db.get('id') stats.record_trade( full_symbol, side_from_fill, amount_from_fill, price_from_fill, exchange_fill_id=trade_id, trade_type="external_unmatched", timestamp=timestamp_dt.isoformat(), linked_order_table_id_to_link=linked_order_db_id ) logger.info(f"📋 Recorded trade via FALLBACK: {trade_id} (Unmatched External Fill)") # No notification sent for unmatched external trades per user preference fill_processed_this_iteration = True if fill_processed_this_iteration: external_trades_processed += 1 if self.last_processed_trade_time is None or timestamp_dt > self.last_processed_trade_time: self.last_processed_trade_time = timestamp_dt except Exception as e: logger.error(f"Error processing fill {fill.get('id', 'N/A')}: {e}", exc_info=True) continue if external_trades_processed > 0: stats._set_metadata('market_monitor_last_processed_trade_time', self.last_processed_trade_time.isoformat()) logger.info(f"💾 Saved MarketMonitor state (last_processed_trade_time) to DB: {self.last_processed_trade_time.isoformat()}") logger.info(f"📊 Processed {external_trades_processed} external trades") if symbols_with_fills: logger.info(f"â„šī¸ Symbols with processed fills this cycle: {list(symbols_with_fills)}") except Exception as e: logger.error(f"❌ Error checking external trades: {e}", exc_info=True)