12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829 |
#!/usr/bin/env python3
"""
Monitors external events like trades made outside the bot and price alarms.
"""
import asyncio
import logging
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

# AlarmManager lives in its own module for now; if it is ever moved into this
# package, switch to the relative form (from .alarm_manager import AlarmManager).
from src.monitoring.alarm_manager import AlarmManager
from src.utils.token_display_formatter import get_formatter

logger = logging.getLogger(__name__)
class ExternalEventMonitor:
    """Track exchange activity that did not originate from the bot, plus price alarms.

    Two independent checks are exposed:

    * ``_check_price_alarms`` compares the latest price of every token that has
      an active alarm against the alarm target and notifies on a hit.
    * ``_check_external_trades`` walks the recent fills reported by the trading
      engine, matches each one to a trade lifecycle (bot entry, bot exit/SL/TP,
      externally tracked stop loss, or a manual trade) and updates the stats.

    All collaborators are injected through the constructor.
    """

    # Position sizes (and size differences) at or below this are treated as zero.
    _SIZE_EPSILON = 1e-9

    def __init__(self, trading_engine, notification_manager, alarm_manager, market_monitor_cache, shared_state):
        """Store the injected collaborators.

        Args:
            trading_engine: Provides ``get_stats``, ``get_positions``,
                ``get_recent_fills`` and ``get_market_data``.
            notification_manager: Sends user notifications; may be falsy, in
                which case notifications are skipped or logged instead.
            alarm_manager: Provides ``get_all_active_alarms`` / ``trigger_alarm``.
            market_monitor_cache: Stored for callers; not read in this class.
            shared_state: Mapping expected to contain ``'external_stop_losses'``.
        """
        self.trading_engine = trading_engine
        self.notification_manager = notification_manager
        self.alarm_manager = alarm_manager
        self.market_monitor_cache = market_monitor_cache
        self.shared_state = shared_state  # Expected to contain {'external_stop_losses': ...}
        # Watermark of the newest processed fill; loaded lazily from stats
        # metadata on the first _check_external_trades() call.
        self.last_processed_trade_time: Optional[datetime] = None

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _to_float(value: Any, default: float = 0.0) -> float:
        """Convert *value* to float, returning *default* for None or unparsable input."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _token_from_symbol(symbol: str) -> str:
        """Return the base token of a symbol such as ``BTC/USDC:USDC``."""
        return symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]

    @staticmethod
    def _find_position(positions: List[Dict[str, Any]], symbol: str) -> Optional[Dict[str, Any]]:
        """Return the first position whose ``'symbol'`` equals *symbol*, or None."""
        for position in positions:
            if position.get('symbol') == symbol:
                return position
        return None

    @classmethod
    def _position_size(cls, positions: List[Dict[str, Any]], symbol: str) -> float:
        """Return the absolute ``'contracts'`` size for *symbol*, or 0.0 if not present."""
        position = cls._find_position(positions, symbol)
        if not position:
            return 0.0
        return abs(float(position.get('contracts', 0)))

    @staticmethod
    def _realized_pnl(position_side: Optional[str], amount: float, entry_price: float, exit_price: float) -> float:
        """Return the PnL of closing *amount* at *exit_price*; 0.0 when the side is unknown."""
        if position_side == 'long':
            return amount * (exit_price - entry_price)
        if position_side == 'short':
            return amount * (entry_price - exit_price)
        return 0.0

    def _safe_get_positions(self) -> Optional[List[Dict[str, Any]]]:
        """Safely get positions from trading engine, returning None on API failures instead of empty list."""
        try:
            return self.trading_engine.get_positions()
        except Exception as e:
            logger.warning(f"⚠️ Failed to fetch positions in external event monitor: {e}")
            return None

    # ------------------------------------------------------------- price alarms

    async def _check_price_alarms(self):
        """Check price alarms and trigger notifications.

        For every token with at least one active alarm, the latest ticker price
        is fetched once and compared with each alarm for that token. A failure
        for one token is logged and does not stop the remaining tokens.
        """
        try:
            active_alarms = self.alarm_manager.get_all_active_alarms()
            if not active_alarms:
                return

            # Group once so each token costs a single market-data request.
            alarms_by_token: Dict[str, List[Dict[str, Any]]] = {}
            for alarm in active_alarms:
                alarms_by_token.setdefault(alarm['token'], []).append(alarm)

            for token, token_alarms in alarms_by_token.items():
                try:
                    market_data = self.trading_engine.get_market_data(f"{token}/USDC:USDC")
                    if not market_data or not market_data.get('ticker'):
                        continue

                    current_price = float(market_data['ticker'].get('last', 0))
                    if current_price <= 0:
                        continue

                    for alarm in token_alarms:
                        target_price = alarm['target_price']
                        direction = alarm['direction']
                        should_trigger = (
                            (direction == 'above' and current_price >= target_price)
                            or (direction == 'below' and current_price <= target_price)
                        )
                        if not should_trigger:
                            continue

                        triggered_alarm = self.alarm_manager.trigger_alarm(alarm['id'], current_price)
                        if triggered_alarm:
                            await self._send_alarm_notification(triggered_alarm)

                except Exception as e:
                    logger.error(f"Error checking alarms for {token}: {e}")

        except Exception as e:
            # The f-prefix matters: without it the literal text "{e}" was logged.
            logger.error(f"❌ Error checking price alarms: {e}")

    async def _send_alarm_notification(self, alarm: Dict[str, Any]):
        """Send notification for triggered alarm (or log it when no notifier is configured)."""
        try:
            if self.notification_manager:
                await self.notification_manager.send_alarm_triggered_notification(
                    alarm['token'],
                    alarm['target_price'],
                    alarm['triggered_price'],
                    alarm['direction']
                )
            else:
                logger.info(f"🔔 ALARM TRIGGERED: {alarm['token']} @ ${alarm['triggered_price']:,.2f}")

        except Exception as e:
            logger.error(f"❌ Error sending alarm notification: {e}")

    # ------------------------------------------------------- position analysis

    async def _determine_position_action_type(self, full_symbol: str, side_from_fill: str,
                                              amount_from_fill: float, existing_lc: Optional[Dict] = None) -> str:
        """
        Determine the type of position action based on current state and fill details.

        Compares the size currently reported by the exchange with the size
        stored on the lifecycle (``existing_lc['current_position_size']``).

        Returns one of: 'position_opened', 'position_closed', 'position_increased',
        'position_decreased', or 'external_unmatched' when positions cannot be
        fetched, the sizes do not confirm the expected change, or an error occurs.
        """
        eps = self._SIZE_EPSILON
        try:
            # Get current position from exchange
            current_positions = self._safe_get_positions()
            if current_positions is None:
                logger.warning(f"⚠️ Failed to fetch positions for {full_symbol} analysis - returning external_unmatched")
                return 'external_unmatched'

            current_size = self._position_size(current_positions, full_symbol)

            # If no existing lifecycle, this is a position opening
            if not existing_lc:
                logger.debug(f"🔍 Position analysis: {full_symbol} no existing lifecycle, current size: {current_size}")
                return 'position_opened' if current_size > eps else 'external_unmatched'

            # Get previous position size from lifecycle
            previous_size = existing_lc.get('current_position_size', 0)
            lc_position_side = existing_lc.get('position_side')

            logger.debug(f"🔍 Position analysis: {full_symbol} {side_from_fill} {amount_from_fill}")
            logger.debug(f" Lifecycle side: {lc_position_side}, previous size: {previous_size}, current size: {current_size}")

            # A fill on the opposite side of the lifecycle reduces or closes it.
            is_closing_trade = (lc_position_side, side_from_fill.lower()) in (('long', 'sell'), ('short', 'buy'))
            logger.debug(f" Is closing trade: {is_closing_trade}")

            if is_closing_trade:
                if current_size < eps:  # Position is now closed
                    logger.debug(" → Position closed (current_size < 1e-9)")
                    return 'position_closed'
                if current_size < previous_size - eps:  # Position reduced but not closed
                    logger.debug(f" → Position decreased (current_size {current_size} < previous_size - 1e-9 {previous_size - 1e-9})")
                    return 'position_decreased'
            else:
                # Same side trade - position increase
                logger.debug(f" Same side trade check: current_size {current_size} > previous_size + 1e-9 {previous_size + 1e-9}?")
                if current_size > previous_size + eps:
                    logger.debug(" → Position increased")
                    return 'position_increased'
                logger.debug(" → Size check failed, not enough increase")

            # Default fallback
            logger.debug(" → Fallback to external_unmatched")
            return 'external_unmatched'

        except Exception as e:
            logger.error(f"Error determining position action type: {e}")
            return 'external_unmatched'

    async def _update_lifecycle_position_size(self, lifecycle_id: str, new_size: float) -> bool:
        """Update the current position size in the lifecycle.

        Returns whatever ``trade_manager.update_trade_market_data`` reports, or
        False when stats are unavailable or the update raises.
        """
        try:
            stats = self.trading_engine.get_stats()
            if not stats:
                return False

            return stats.trade_manager.update_trade_market_data(
                lifecycle_id, current_position_size=new_size
            )
        except Exception as e:
            logger.error(f"Error updating lifecycle position size: {e}")
            return False

    # ------------------------------------------------------------ notifications

    async def _send_position_change_notification(self, full_symbol: str, side_from_fill: str,
                                                 amount_from_fill: float, price_from_fill: float,
                                                 action_type: str, timestamp_dt: datetime,
                                                 existing_lc: Optional[Dict] = None,
                                                 realized_pnl: Optional[float] = None):
        """Send position change notification.

        Builds an HTML message for 'position_opened', 'position_closed',
        'position_increased' or 'position_decreased' and sends it through
        ``notification_manager.send_generic_notification``. Any other
        ``action_type`` (or a closed/increased/decreased action without
        ``existing_lc``) sends nothing. Errors are logged, never raised.
        """
        try:
            if not self.notification_manager:
                return

            token = self._token_from_symbol(full_symbol)
            time_str = timestamp_dt.strftime('%Y-%m-%d %H:%M:%S UTC')
            formatter = get_formatter()
            fill_value = amount_from_fill * price_from_fill

            if action_type == 'position_closed' and existing_lc:
                position_side = (existing_lc.get('position_side') or 'unknown').upper()
                entry_price = existing_lc.get('entry_price', 0)
                # Break-even (0) counts as non-negative, matching every other PnL indicator here.
                pnl_emoji = "🟢" if realized_pnl is not None and realized_pnl >= 0 else "🔴"
                pnl_text = f"{await formatter.format_price_with_symbol(realized_pnl)}" if realized_pnl is not None else "N/A"

                # Get ROE directly from exchange data
                position_info = (existing_lc.get('info') or {}).get('position') or {}
                roe_raw = position_info.get('returnOnEquity')
                roe_text = ""
                if roe_raw is not None:
                    try:
                        # The exchange provides ROE as a decimal (e.g., -0.326 for -32.6%)
                        # We need to multiply by 100 and keep the sign
                        roe_text = f" ({float(roe_raw) * 100:+.2f}%)"
                    except (ValueError, TypeError):
                        logger.warning(f"Could not parse ROE value: {roe_raw} for {full_symbol}")
                else:
                    logger.warning(f"No ROE data available from exchange for {full_symbol}")

                lines = [
                    "🎯 <b>Position Closed (External)</b>",
                    "📊 <b>Trade Details:</b>",
                    f"• Token: {token}",
                    f"• Direction: {position_side}",
                    f"• Size Closed: {await formatter.format_amount(amount_from_fill, token)}",
                    f"• Entry Price: {await formatter.format_price_with_symbol(entry_price, token)}",
                    f"• Exit Price: {await formatter.format_price_with_symbol(price_from_fill, token)}",
                    f"• Exit Value: {await formatter.format_price_with_symbol(fill_value)}",
                    f"{pnl_emoji} <b>P&L:</b> {pnl_text}{roe_text}",
                    f"✅ <b>Status:</b> {position_side} position closed externally",
                    f"⏰ <b>Time:</b> {time_str}",
                    "📊 Use /stats to view updated performance",
                ]

            elif action_type == 'position_opened':
                position_side = 'LONG' if side_from_fill.lower() == 'buy' else 'SHORT'
                lines = [
                    "🚀 <b>Position Opened (External)</b>",
                    "📊 <b>Trade Details:</b>",
                    f"• Token: {token}",
                    f"• Direction: {position_side}",
                    f"• Size: {await formatter.format_amount(amount_from_fill, token)}",
                    f"• Entry Price: {await formatter.format_price_with_symbol(price_from_fill, token)}",
                    f"• Position Value: {await formatter.format_price_with_symbol(fill_value)}",
                    f"✅ <b>Status:</b> New {position_side} position opened externally",
                    f"⏰ <b>Time:</b> {time_str}",
                    "📱 Use /positions to view all positions",
                ]

            elif action_type in ('position_increased', 'position_decreased') and existing_lc:
                position_side = (existing_lc.get('position_side') or 'unknown').upper()
                previous_size = existing_lc.get('current_position_size', 0)

                # Get current size from exchange
                current_positions = self._safe_get_positions()
                if current_positions is None:
                    # Skip notification if we can't get position data
                    logger.warning(f"⚠️ Failed to fetch positions for notification - skipping {action_type} notification")
                    return
                current_size = self._position_size(current_positions, full_symbol)

                if action_type == 'position_increased':
                    lines = [
                        "📈 <b>Position Increased (External)</b>",
                        "📊 <b>Trade Details:</b>",
                        f"• Token: {token}",
                        f"• Direction: {position_side}",
                        f"• Size Added: {await formatter.format_amount(amount_from_fill, token)}",
                        f"• Add Price: {await formatter.format_price_with_symbol(price_from_fill, token)}",
                        f"• Previous Size: {await formatter.format_amount(previous_size, token)}",
                        f"• New Size: {await formatter.format_amount(current_size, token)}",
                        f"• Add Value: {await formatter.format_price_with_symbol(fill_value)}",
                        f"📈 <b>Status:</b> {position_side} position size increased externally",
                        f"⏰ <b>Time:</b> {time_str}",
                        "📈 Use /positions to view current position",
                    ]
                else:
                    entry_price = existing_lc.get('entry_price', 0) or 0

                    # Calculate partial PnL for the reduced amount
                    partial_pnl = 0
                    if entry_price > 0:
                        if position_side == 'LONG':
                            partial_pnl = amount_from_fill * (price_from_fill - entry_price)
                        else:  # SHORT
                            partial_pnl = amount_from_fill * (entry_price - price_from_fill)
                    pnl_emoji = "🟢" if partial_pnl >= 0 else "🔴"

                    # Calculate ROE for the partial close
                    roe_text = ""
                    if entry_price > 0 and amount_from_fill > 0:
                        cost_basis = amount_from_fill * entry_price
                        roe_text = f" ({(partial_pnl / cost_basis) * 100:+.2f}%)"

                    lines = [
                        "📉 <b>Position Decreased (External)</b>",
                        "📊 <b>Trade Details:</b>",
                        f"• Token: {token}",
                        f"• Direction: {position_side}",
                        f"• Size Reduced: {await formatter.format_amount(amount_from_fill, token)}",
                        f"• Exit Price: {await formatter.format_price_with_symbol(price_from_fill, token)}",
                        f"• Previous Size: {await formatter.format_amount(previous_size, token)}",
                        f"• Remaining Size: {await formatter.format_amount(current_size, token)}",
                        f"• Exit Value: {await formatter.format_price_with_symbol(fill_value)}",
                        f"{pnl_emoji} <b>Partial P&L:</b> {await formatter.format_price_with_symbol(partial_pnl)}{roe_text}",
                        f"📉 <b>Status:</b> {position_side} position size decreased externally",
                        f"⏰ <b>Time:</b> {time_str}",
                        "📊 Position remains open. Use /positions to view details",
                    ]
            else:
                # No fallback notification sent - only position-based notifications per user preference
                logger.debug(f"No notification sent for action_type: {action_type}")
                return

            message = "\n".join(lines)
            await self.notification_manager.send_generic_notification(message.strip())

        except Exception as e:
            logger.error(f"Error sending position change notification: {e}")

    # ---------------------------------------------------------------- auto-sync

    async def _auto_sync_single_position(self, symbol: str, exchange_position: Dict[str, Any], stats) -> bool:
        """Auto-sync a single orphaned position to create a lifecycle record.

        Creates an ``external_sync`` lifecycle for a position that exists on the
        exchange but has no lifecycle, marks it opened and sends a
        'position_opened' notification.

        Returns:
            True when the lifecycle was created and marked opened; False when the
            position is empty, no usable entry/mark price exists, a stats call
            fails, or any error is raised.
        """
        try:
            import uuid

            contracts_val = self._to_float(exchange_position.get('contracts'))
            contracts_abs = abs(contracts_val)
            if contracts_abs <= self._SIZE_EPSILON:
                return False

            # Prefer the explicit side; 'side' may be missing or None, in which
            # case the sign of 'contracts' decides (non-zero is guaranteed above).
            side_hint = (exchange_position.get('side') or '').lower()
            if side_hint in ('long', 'short'):
                position_side = side_hint
            else:
                position_side = 'long' if contracts_val > 0 else 'short'
            order_side = 'buy' if position_side == 'long' else 'sell'

            # Fields may be present but None, so parse each candidate defensively;
            # a plain float(None) would abort before the fallback is consulted.
            final_entry_price = (
                self._to_float(exchange_position.get('entryPrice'))
                or self._to_float(exchange_position.get('entryPx'))
            )
            if final_entry_price <= 0:
                # Fallback to a reasonable estimate (current mark price)
                final_entry_price = (
                    self._to_float(exchange_position.get('markPrice'))
                    or self._to_float(exchange_position.get('markPx'))
                )
                if final_entry_price <= 0:
                    logger.error(f"AUTO-SYNC: Could not determine entry price for {symbol}.")
                    return False

            formatter = get_formatter()
            logger.info(f"🔄 AUTO-SYNC: Creating lifecycle for {symbol} {position_side.upper()} {contracts_abs} @ {await formatter.format_price_with_symbol(final_entry_price, symbol)}")

            unique_sync_id = str(uuid.uuid4())[:8]
            lifecycle_id = stats.create_trade_lifecycle(
                symbol=symbol,
                side=order_side,
                entry_order_id=f"external_sync_{unique_sync_id}",
                trade_type='external_sync'
            )
            if not lifecycle_id:
                logger.error(f"❌ AUTO-SYNC: Failed to create lifecycle for {symbol}")
                return False

            success = await stats.update_trade_position_opened(
                lifecycle_id,
                final_entry_price,
                contracts_abs,
                f"external_fill_sync_{unique_sync_id}"
            )
            if not success:
                logger.error(f"❌ AUTO-SYNC: Failed to update lifecycle to 'position_opened' for {symbol}")
                return False

            logger.info(f"✅ AUTO-SYNC: Successfully synced position for {symbol} (Lifecycle: {lifecycle_id[:8]})")

            # Send position opened notification for auto-synced position
            try:
                await self._send_position_change_notification(
                    symbol, order_side, contracts_abs, final_entry_price,
                    'position_opened', datetime.now(timezone.utc)
                )
                logger.info(f"📨 AUTO-SYNC: Sent position opened notification for {symbol}")
            except Exception as e:
                logger.error(f"❌ AUTO-SYNC: Failed to send notification for {symbol}: {e}")

            return True

        except Exception as e:
            logger.error(f"❌ AUTO-SYNC: Error syncing position for {symbol}: {e}")
            return False

    # ---------------------------------------------------------- external trades

    async def _check_external_trades(self):
        """Check for trades made outside the Telegram bot and update stats.

        Each recent fill newer than ``last_processed_trade_time`` and not already
        recorded is matched, in order, against:

        1. a pending bot entry order,
        2. a known bot exit / stop-loss / take-profit order,
        3. an externally tracked stop loss in ``shared_state``,
        4. an external position change (open / close / increase / decrease),
        5. otherwise it is recorded as an unmatched external trade.

        After the loop the watermark is persisted to stats metadata if at least
        one fill was processed. Errors are logged, never raised.
        """
        try:
            stats = self.trading_engine.get_stats()
            if not stats:
                logger.warning("TradingStats not available in _check_external_trades. Skipping.")
                return

            external_trades_processed = 0
            symbols_with_fills = set()

            recent_fills = self.trading_engine.get_recent_fills()
            if not recent_fills:
                logger.debug("No recent fills data available")
                return

            if getattr(self, 'last_processed_trade_time', None) is None:
                one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1)
                try:
                    # Ensure this metadata key is the one used by MarketMonitor for saving this state.
                    last_time_str = stats._get_metadata('market_monitor_last_processed_trade_time')
                    if last_time_str:
                        self.last_processed_trade_time = datetime.fromisoformat(last_time_str).replace(tzinfo=timezone.utc)
                    else:
                        self.last_processed_trade_time = one_hour_ago
                except Exception as e:
                    logger.warning(f"Could not load last processed trade time, defaulting to one hour ago: {e}")
                    self.last_processed_trade_time = one_hour_ago

            for fill in recent_fills:
                try:
                    trade_id = fill.get('id')
                    timestamp_ms = fill.get('timestamp')
                    symbol_from_fill = fill.get('symbol')
                    side_from_fill = fill.get('side')
                    amount_from_fill = float(fill.get('amount', 0))
                    price_from_fill = float(fill.get('price', 0))

                    if timestamp_ms:
                        timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
                    else:
                        timestamp_dt = datetime.now(timezone.utc)

                    if self.last_processed_trade_time and timestamp_dt <= self.last_processed_trade_time:
                        continue

                    # Check if this fill has already been processed to prevent duplicates
                    if trade_id and stats.has_exchange_fill_been_processed(str(trade_id)):
                        logger.debug(f"Skipping already processed fill: {trade_id}")
                        continue

                    fill_processed_this_iteration = False

                    if not (symbol_from_fill and side_from_fill and amount_from_fill > 0 and price_from_fill > 0):
                        logger.warning(f"Skipping fill with incomplete data: {fill}")
                        continue

                    full_symbol = symbol_from_fill
                    token = self._token_from_symbol(symbol_from_fill)
                    exchange_order_id_from_fill = (fill.get('info') or {}).get('oid')

                    # --- 1. First check if this is a pending entry order fill
                    if exchange_order_id_from_fill:
                        pending_lc = stats.get_lifecycle_by_entry_order_id(exchange_order_id_from_fill, status='pending')
                        if pending_lc and pending_lc.get('symbol') == full_symbol:
                            success = await stats.update_trade_position_opened(
                                lifecycle_id=pending_lc['trade_lifecycle_id'],
                                entry_price=price_from_fill,
                                entry_amount=amount_from_fill,
                                exchange_fill_id=trade_id
                            )
                            if success:
                                logger.info(f"📈 Lifecycle ENTRY: {pending_lc['trade_lifecycle_id']} for {full_symbol} updated by fill {trade_id}.")
                                symbols_with_fills.add(token)
                                order_in_db_for_entry = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
                                if order_in_db_for_entry:
                                    stats.update_order_status(order_db_id=order_in_db_for_entry['id'], new_status='filled', amount_filled_increment=amount_from_fill)

                                # Send position opened notification (this is a bot-initiated position)
                                await self._send_position_change_notification(
                                    full_symbol, side_from_fill, amount_from_fill, price_from_fill,
                                    'position_opened', timestamp_dt
                                )
                            fill_processed_this_iteration = True

                    # --- 2. Check if this is a known bot order (SL/TP/exit)
                    if not fill_processed_this_iteration and exchange_order_id_from_fill:
                        active_lc = None
                        closure_reason_action_type = None
                        bot_order_db_id_to_update = None
                        bot_order_for_fill = stats.get_order_by_exchange_id(exchange_order_id_from_fill)

                        if bot_order_for_fill and bot_order_for_fill.get('symbol') == full_symbol:
                            order_type = bot_order_for_fill.get('type')
                            order_side = bot_order_for_fill.get('side')
                            if order_type == 'market':
                                potential_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
                                if potential_lc:
                                    lc_pos_side = potential_lc.get('position_side')
                                    if (lc_pos_side == 'long' and order_side == 'sell' and side_from_fill == 'sell') or \
                                       (lc_pos_side == 'short' and order_side == 'buy' and side_from_fill == 'buy'):
                                        active_lc = potential_lc
                                        closure_reason_action_type = f"bot_exit_{lc_pos_side}_close"
                                        bot_order_db_id_to_update = bot_order_for_fill.get('id')
                                        logger.info(f"ℹ️ Lifecycle BOT EXIT: Fill {trade_id} (OID {exchange_order_id_from_fill}) for {full_symbol} matches bot exit for lifecycle {active_lc['trade_lifecycle_id']}.")

                            if not active_lc:
                                lc_by_sl = stats.get_lifecycle_by_sl_order_id(exchange_order_id_from_fill, status='position_opened')
                                if lc_by_sl and lc_by_sl.get('symbol') == full_symbol:
                                    active_lc = lc_by_sl
                                    closure_reason_action_type = f"sl_{active_lc.get('position_side')}_close"
                                    bot_order_db_id_to_update = bot_order_for_fill.get('id')
                                    logger.info(f"ℹ️ Lifecycle SL: Fill {trade_id} for OID {exchange_order_id_from_fill} matches SL for lifecycle {active_lc['trade_lifecycle_id']}.")

                            if not active_lc:
                                lc_by_tp = stats.get_lifecycle_by_tp_order_id(exchange_order_id_from_fill, status='position_opened')
                                if lc_by_tp and lc_by_tp.get('symbol') == full_symbol:
                                    active_lc = lc_by_tp
                                    closure_reason_action_type = f"tp_{active_lc.get('position_side')}_close"
                                    bot_order_db_id_to_update = bot_order_for_fill.get('id')
                                    logger.info(f"ℹ️ Lifecycle TP: Fill {trade_id} for OID {exchange_order_id_from_fill} matches TP for lifecycle {active_lc['trade_lifecycle_id']}.")

                        # Process known bot order fills
                        if active_lc and closure_reason_action_type:
                            lc_id = active_lc['trade_lifecycle_id']
                            realized_pnl = self._realized_pnl(
                                active_lc.get('position_side'), amount_from_fill,
                                active_lc.get('entry_price', 0) or 0, price_from_fill
                            )

                            success = stats.update_trade_position_closed(
                                lifecycle_id=lc_id, exit_price=price_from_fill,
                                realized_pnl=realized_pnl, exchange_fill_id=trade_id
                            )
                            if success:
                                pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
                                formatter = get_formatter()
                                logger.info(f"{pnl_emoji} Lifecycle CLOSED: {lc_id} ({closure_reason_action_type}). PNL for fill: {await formatter.format_price_with_symbol(realized_pnl)}")
                                symbols_with_fills.add(token)

                                # Send position closed notification
                                await self._send_position_change_notification(
                                    full_symbol, side_from_fill, amount_from_fill, price_from_fill,
                                    'position_closed', timestamp_dt, active_lc, realized_pnl
                                )

                                stats.migrate_trade_to_aggregated_stats(lc_id)
                                if bot_order_db_id_to_update:
                                    stats.update_order_status(order_db_id=bot_order_db_id_to_update, new_status='filled', amount_filled_increment=amount_from_fill)
                                fill_processed_this_iteration = True

                    # --- 3. Check for external stop losses
                    if not fill_processed_this_iteration:
                        external_stop_losses = self.shared_state.get('external_stop_losses')
                        if (exchange_order_id_from_fill and external_stop_losses and
                                exchange_order_id_from_fill in external_stop_losses):
                            stop_loss_info = external_stop_losses[exchange_order_id_from_fill]
                            formatter = get_formatter()
                            logger.info(f"🛑 External SL (MM Tracking): {token} Order {exchange_order_id_from_fill} filled @ {await formatter.format_price_with_symbol(price_from_fill, token)}")

                            sl_active_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
                            if sl_active_lc:
                                lc_id = sl_active_lc['trade_lifecycle_id']
                                lc_pos_side = sl_active_lc.get('position_side')
                                realized_pnl = self._realized_pnl(
                                    lc_pos_side, amount_from_fill,
                                    sl_active_lc.get('entry_price', 0) or 0, price_from_fill
                                )

                                success = stats.update_trade_position_closed(lc_id, price_from_fill, realized_pnl, trade_id)
                                if success:
                                    pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
                                    logger.info(f"{pnl_emoji} Lifecycle CLOSED by External SL (MM): {lc_id}. PNL: {await formatter.format_price_with_symbol(realized_pnl)}")
                                    if self.notification_manager:
                                        await self.notification_manager.send_stop_loss_execution_notification(
                                            stop_loss_info, full_symbol, side_from_fill, amount_from_fill, price_from_fill,
                                            f'{lc_pos_side}_closed_external_sl', timestamp_dt.isoformat(), realized_pnl
                                        )
                                    stats.migrate_trade_to_aggregated_stats(lc_id)
                                    # Modify shared state carefully: pop() tolerates concurrent removal.
                                    external_stop_losses.pop(exchange_order_id_from_fill, None)
                                    fill_processed_this_iteration = True
                            else:
                                logger.warning(f"⚠️ External SL (MM) {exchange_order_id_from_fill} for {full_symbol}, but no active lifecycle found.")

                    # --- 4. Enhanced external trade processing with position state detection
                    if not fill_processed_this_iteration:
                        existing_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')

                        # If no lifecycle exists but we have a position on exchange, try to auto-sync first
                        if not existing_lc:
                            current_positions = self._safe_get_positions()
                            if current_positions is None:
                                # NOTE(review): a skipped fill can be lost if a later fill in this
                                # batch advances last_processed_trade_time past it - confirm intent.
                                logger.warning("⚠️ Failed to fetch positions for external trade detection - skipping this fill")
                                continue

                            exchange_position = self._find_position(current_positions, full_symbol)
                            if exchange_position and abs(float(exchange_position.get('contracts', 0))) > self._SIZE_EPSILON:
                                logger.info(f"🔄 AUTO-SYNC: Position exists on exchange for {full_symbol} but no lifecycle found. Auto-syncing before processing fill.")
                                success = await self._auto_sync_single_position(full_symbol, exchange_position, stats)
                                if success:
                                    # Re-check for lifecycle after auto-sync
                                    existing_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')

                        action_type = await self._determine_position_action_type(
                            full_symbol, side_from_fill, amount_from_fill, existing_lc
                        )

                        logger.info(f"🔍 External fill analysis: {full_symbol} {side_from_fill} {amount_from_fill} -> {action_type}")

                        # Additional debug logging for position changes
                        if existing_lc:
                            previous_size = existing_lc.get('current_position_size', 0) or 0
                            current_positions = self._safe_get_positions()
                            if current_positions is None:
                                logger.warning("⚠️ Failed to fetch positions for debug logging - skipping debug info")
                                # Assume "no change" so the comparisons below stay well-defined.
                                current_size = previous_size
                            else:
                                current_size = self._position_size(current_positions, full_symbol)
                                logger.info(f"📊 Position size change: {previous_size} -> {current_size} (diff: {current_size - previous_size})")
                                logger.info(f"🎯 Expected change based on fill: {'+' if side_from_fill.lower() == 'buy' else '-'}{amount_from_fill}")

                            # If lifecycle is already closed, we should not re-process as a new closure.
                            # NOTE(review): existing_lc was fetched with status 'position_opened', so this
                            # looks unreachable unless stats returns other statuses - confirm.
                            if existing_lc.get('status') == 'position_closed':
                                logger.info(f"ℹ️ Fill {trade_id} received for already closed lifecycle {existing_lc['trade_lifecycle_id']}. Recording fill and skipping further action.")
                                stats.record_trade(
                                    full_symbol, side_from_fill, amount_from_fill, price_from_fill,
                                    exchange_fill_id=trade_id, trade_type="external_fill_for_closed_pos",
                                    pnl=None, timestamp=timestamp_dt.isoformat(),
                                    linked_order_table_id_to_link=None
                                )
                                fill_processed_this_iteration = True
                                continue  # Skip to next fill

                            # Check if this might be a size change that was misclassified.
                            # Handles both long and short lifecycles symmetrically.
                            if action_type == 'external_unmatched':
                                side_pair = (existing_lc.get('position_side'), side_from_fill.lower())
                                reduces = side_pair in (('long', 'sell'), ('short', 'buy'))
                                adds = side_pair in (('long', 'buy'), ('short', 'sell'))
                                if reduces and current_size < previous_size:
                                    logger.warning(f"⚠️ Potential misclassification: {full_symbol} {side_from_fill} looks like position decrease but classified as external_unmatched")
                                    action_type = 'position_decreased'
                                    logger.info(f"🔄 Corrected action_type to: {action_type}")
                                elif adds and current_size > previous_size:
                                    logger.warning(f"⚠️ Potential misclassification: {full_symbol} {side_from_fill} looks like position increase but classified as external_unmatched")
                                    action_type = 'position_increased'
                                    logger.info(f"🔄 Corrected action_type to: {action_type}")

                        if action_type == 'position_opened':
                            # Create new lifecycle for external position
                            lifecycle_id = stats.create_trade_lifecycle(
                                symbol=full_symbol,
                                side=side_from_fill,
                                entry_order_id=exchange_order_id_from_fill or f"external_{trade_id}",
                                trade_type="external"
                            )

                            if lifecycle_id:
                                # update_trade_position_opened is a coroutine (awaited everywhere
                                # else in this class); without await it never runs.
                                success = await stats.update_trade_position_opened(
                                    lifecycle_id=lifecycle_id,
                                    entry_price=price_from_fill,
                                    entry_amount=amount_from_fill,
                                    exchange_fill_id=trade_id
                                )

                                if success:
                                    logger.info(f"📈 Created and opened new external lifecycle: {lifecycle_id[:8]} for {full_symbol}")
                                    symbols_with_fills.add(token)

                                    # Send position opened notification
                                    await self._send_position_change_notification(
                                        full_symbol, side_from_fill, amount_from_fill, price_from_fill,
                                        action_type, timestamp_dt
                                    )
                                    fill_processed_this_iteration = True

                        elif action_type == 'position_closed' and existing_lc:
                            # Close existing lifecycle
                            lc_id = existing_lc['trade_lifecycle_id']
                            realized_pnl = self._realized_pnl(
                                existing_lc.get('position_side'), amount_from_fill,
                                existing_lc.get('entry_price', 0) or 0, price_from_fill
                            )

                            success = stats.update_trade_position_closed(
                                lifecycle_id=lc_id,
                                exit_price=price_from_fill,
                                realized_pnl=realized_pnl,
                                exchange_fill_id=trade_id
                            )
                            if success:
                                pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
                                formatter = get_formatter()
                                logger.info(f"{pnl_emoji} Lifecycle CLOSED (External): {lc_id}. PNL: {await formatter.format_price_with_symbol(realized_pnl)}")
                                symbols_with_fills.add(token)

                                # Send position closed notification
                                await self._send_position_change_notification(
                                    full_symbol, side_from_fill, amount_from_fill, price_from_fill,
                                    action_type, timestamp_dt, existing_lc, realized_pnl
                                )

                                stats.migrate_trade_to_aggregated_stats(lc_id)
                                fill_processed_this_iteration = True

                        elif action_type in ['position_increased', 'position_decreased'] and existing_lc:
                            # Update lifecycle position size and send notification
                            current_positions = self._safe_get_positions()
                            if current_positions is None:
                                logger.warning("⚠️ Failed to fetch positions for size update - skipping position change processing")
                                continue
                            new_size = self._position_size(current_positions, full_symbol)

                            # Update lifecycle with new position size
                            await self._update_lifecycle_position_size(existing_lc['trade_lifecycle_id'], new_size)

                            # Send appropriate notification
                            await self._send_position_change_notification(
                                full_symbol, side_from_fill, amount_from_fill, price_from_fill,
                                action_type, timestamp_dt, existing_lc
                            )

                            symbols_with_fills.add(token)
                            fill_processed_this_iteration = True
                            logger.info(f"📊 Position {action_type}: {full_symbol} new size: {new_size}")

                    # --- 5. Fallback for unmatched external trades
                    if not fill_processed_this_iteration:
                        all_open_positions_in_db = stats.get_open_positions()
                        db_open_symbols = {pos_db.get('symbol') for pos_db in all_open_positions_in_db}
                        if full_symbol in db_open_symbols:
                            logger.debug(f"Position {full_symbol} found in open positions but no active lifecycle - likely auto-sync failed or timing issue for fill {trade_id}")

                        # Record as unmatched external trade
                        linked_order_db_id = None
                        if exchange_order_id_from_fill:
                            order_in_db = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
                            if order_in_db:
                                linked_order_db_id = order_in_db.get('id')

                        stats.record_trade(
                            full_symbol, side_from_fill, amount_from_fill, price_from_fill,
                            exchange_fill_id=trade_id, trade_type="external_unmatched",
                            timestamp=timestamp_dt.isoformat(),
                            linked_order_table_id_to_link=linked_order_db_id
                        )
                        logger.info(f"📋 Recorded trade via FALLBACK: {trade_id} (Unmatched External Fill)")

                        # No notification sent for unmatched external trades per user preference
                        fill_processed_this_iteration = True

                    if fill_processed_this_iteration:
                        external_trades_processed += 1
                        if self.last_processed_trade_time is None or timestamp_dt > self.last_processed_trade_time:
                            self.last_processed_trade_time = timestamp_dt

                except Exception as e:
                    logger.error(f"Error processing fill {fill.get('id', 'N/A')}: {e}", exc_info=True)
                    continue

            if external_trades_processed > 0:
                stats._set_metadata('market_monitor_last_processed_trade_time', self.last_processed_trade_time.isoformat())
                logger.info(f"💾 Saved MarketMonitor state (last_processed_trade_time) to DB: {self.last_processed_trade_time.isoformat()}")
                logger.info(f"📊 Processed {external_trades_processed} external trades")
                if symbols_with_fills:
                    logger.info(f"ℹ️ Symbols with processed fills this cycle: {list(symbols_with_fills)}")

        except Exception as e:
            logger.error(f"❌ Error checking external trades: {e}", exc_info=True)
|