#!/usr/bin/env python3
"""
Monitors external events like trades made outside the bot and price alarms.
"""
import logging
import asyncio
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any, List
# Assuming AlarmManager will be moved here or imported appropriately
# from .alarm_manager import AlarmManager
from src.monitoring.alarm_manager import AlarmManager # Keep if AlarmManager stays in its own file as per original structure
from src.utils.token_display_formatter import get_formatter
logger = logging.getLogger(__name__)
class ExternalEventMonitor:
def __init__(self, trading_engine, notification_manager, alarm_manager, market_monitor_cache, shared_state):
self.trading_engine = trading_engine
self.notification_manager = notification_manager
self.alarm_manager = alarm_manager
self.market_monitor_cache = market_monitor_cache
self.shared_state = shared_state # Expected to contain {'external_stop_losses': ...}
self.last_processed_trade_time: Optional[datetime] = None
# Add necessary initializations, potentially loading last_processed_trade_time
def _safe_get_positions(self) -> Optional[List[Dict[str, Any]]]:
"""Safely get positions from trading engine, returning None on API failures instead of empty list."""
try:
return self.trading_engine.get_positions()
except Exception as e:
logger.warning(f"â ī¸ Failed to fetch positions in external event monitor: {e}")
return None
async def _check_price_alarms(self):
"""Check price alarms and trigger notifications."""
try:
active_alarms = self.alarm_manager.get_all_active_alarms()
if not active_alarms:
return
tokens_to_check = list(set(alarm['token'] for alarm in active_alarms))
for token in tokens_to_check:
try:
symbol = f"{token}/USDC:USDC"
market_data = self.trading_engine.get_market_data(symbol)
if not market_data or not market_data.get('ticker'):
continue
current_price = float(market_data['ticker'].get('last', 0))
if current_price <= 0:
continue
token_alarms = [alarm for alarm in active_alarms if alarm['token'] == token]
for alarm in token_alarms:
target_price = alarm['target_price']
direction = alarm['direction']
should_trigger = False
if direction == 'above' and current_price >= target_price:
should_trigger = True
elif direction == 'below' and current_price <= target_price:
should_trigger = True
if should_trigger:
triggered_alarm = self.alarm_manager.trigger_alarm(alarm['id'], current_price)
if triggered_alarm:
await self._send_alarm_notification(triggered_alarm)
except Exception as e:
logger.error(f"Error checking alarms for {token}: {e}")
except Exception as e:
logger.error("â Error checking price alarms: {e}")
async def _send_alarm_notification(self, alarm: Dict[str, Any]):
"""Send notification for triggered alarm."""
try:
if self.notification_manager:
await self.notification_manager.send_alarm_triggered_notification(
alarm['token'],
alarm['target_price'],
alarm['triggered_price'],
alarm['direction']
)
else:
logger.info(f"đ ALARM TRIGGERED: {alarm['token']} @ ${alarm['triggered_price']:,.2f}")
except Exception as e:
logger.error(f"â Error sending alarm notification: {e}")
async def _determine_position_action_type(self, full_symbol: str, side_from_fill: str,
amount_from_fill: float, existing_lc: Optional[Dict] = None) -> str:
"""
Determine the type of position action based on current state and fill details.
Returns one of: 'position_opened', 'position_closed', 'position_increased', 'position_decreased'
"""
try:
# Get current position from exchange
current_positions = self._safe_get_positions()
if current_positions is None:
logger.warning(f"â ī¸ Failed to fetch positions for {full_symbol} analysis - returning external_unmatched")
return 'external_unmatched'
current_exchange_position = None
for pos in current_positions:
if pos.get('symbol') == full_symbol:
current_exchange_position = pos
break
current_size = 0.0
if current_exchange_position:
current_size = abs(float(current_exchange_position.get('contracts', 0)))
# If no existing lifecycle, this is a position opening
if not existing_lc:
logger.debug(f"đ Position analysis: {full_symbol} no existing lifecycle, current size: {current_size}")
if current_size > 1e-9: # Position exists on exchange
return 'position_opened'
else:
return 'external_unmatched'
# Get previous position size from lifecycle
previous_size = existing_lc.get('current_position_size', 0)
lc_position_side = existing_lc.get('position_side')
logger.debug(f"đ Position analysis: {full_symbol} {side_from_fill} {amount_from_fill}")
logger.debug(f" Lifecycle side: {lc_position_side}, previous size: {previous_size}, current size: {current_size}")
# Check if this is a closing trade (opposite side)
is_closing_trade = False
if lc_position_side == 'long' and side_from_fill.lower() == 'sell':
is_closing_trade = True
elif lc_position_side == 'short' and side_from_fill.lower() == 'buy':
is_closing_trade = True
logger.debug(f" Is closing trade: {is_closing_trade}")
if is_closing_trade:
if current_size < 1e-9: # Position is now closed
logger.debug(f" â Position closed (current_size < 1e-9)")
return 'position_closed'
elif current_size < previous_size - 1e-9: # Position reduced but not closed
logger.debug(f" â Position decreased (current_size {current_size} < previous_size - 1e-9 {previous_size - 1e-9})")
return 'position_decreased'
else:
# Same side trade - position increase
logger.debug(f" Same side trade check: current_size {current_size} > previous_size + 1e-9 {previous_size + 1e-9}?")
if current_size > previous_size + 1e-9:
logger.debug(f" â Position increased")
return 'position_increased'
else:
logger.debug(f" â Size check failed, not enough increase")
# Default fallback
logger.debug(f" â Fallback to external_unmatched")
return 'external_unmatched'
except Exception as e:
logger.error(f"Error determining position action type: {e}")
return 'external_unmatched'
async def _update_lifecycle_position_size(self, lifecycle_id: str, new_size: float) -> bool:
"""Update the current position size in the lifecycle."""
try:
stats = self.trading_engine.get_stats()
if not stats:
return False
# Update the current position size
success = stats.trade_manager.update_trade_market_data(
lifecycle_id, current_position_size=new_size
)
return success
except Exception as e:
logger.error(f"Error updating lifecycle position size: {e}")
return False
async def _send_position_change_notification(self, full_symbol: str, side_from_fill: str,
amount_from_fill: float, price_from_fill: float,
action_type: str, timestamp_dt: datetime,
existing_lc: Optional[Dict] = None,
realized_pnl: Optional[float] = None):
"""Send position change notification."""
try:
if not self.notification_manager:
return
token = full_symbol.split('/')[0] if '/' in full_symbol else full_symbol.split(':')[0]
time_str = timestamp_dt.strftime('%Y-%m-%d %H:%M:%S UTC')
formatter = get_formatter()
if action_type == 'position_closed' and existing_lc:
position_side = existing_lc.get('position_side', 'unknown').upper()
entry_price = existing_lc.get('entry_price', 0)
pnl_emoji = "đĸ" if realized_pnl and realized_pnl >= 0 else "đ´"
pnl_text = f"{await formatter.format_price_with_symbol(realized_pnl)}" if realized_pnl is not None else "N/A"
# Get ROE directly from exchange data
info_data = existing_lc.get('info', {})
position_info = info_data.get('position', {})
roe_raw = position_info.get('returnOnEquity') # Changed from 'percentage' to 'returnOnEquity'
if roe_raw is not None:
try:
# The exchange provides ROE as a decimal (e.g., -0.326 for -32.6%)
# We need to multiply by 100 and keep the sign
roe = float(roe_raw) * 100
roe_text = f" ({roe:+.2f}%)"
except (ValueError, TypeError):
logger.warning(f"Could not parse ROE value: {roe_raw} for {full_symbol}")
roe_text = ""
else:
logger.warning(f"No ROE data available from exchange for {full_symbol}")
roe_text = ""
message = f"""
đ¯ Position Closed (External)
đ Trade Details:
âĸ Token: {token}
âĸ Direction: {position_side}
âĸ Size Closed: {await formatter.format_amount(amount_from_fill, token)}
âĸ Entry Price: {await formatter.format_price_with_symbol(entry_price, token)}
âĸ Exit Price: {await formatter.format_price_with_symbol(price_from_fill, token)}
âĸ Exit Value: {await formatter.format_price_with_symbol(amount_from_fill * price_from_fill)}
{pnl_emoji} P&L: {pnl_text}{roe_text}
â
Status: {position_side} position closed externally
â° Time: {time_str}
đ Use /stats to view updated performance
"""
elif action_type == 'position_opened':
position_side = 'LONG' if side_from_fill.lower() == 'buy' else 'SHORT'
message = f"""
đ Position Opened (External)
đ Trade Details:
âĸ Token: {token}
âĸ Direction: {position_side}
âĸ Size: {await formatter.format_amount(amount_from_fill, token)}
âĸ Entry Price: {await formatter.format_price_with_symbol(price_from_fill, token)}
âĸ Position Value: {await formatter.format_price_with_symbol(amount_from_fill * price_from_fill)}
â
Status: New {position_side} position opened externally
â° Time: {time_str}
đą Use /positions to view all positions
"""
elif action_type == 'position_increased' and existing_lc:
position_side = existing_lc.get('position_side', 'unknown').upper()
previous_size = existing_lc.get('current_position_size', 0)
# Get current size from exchange
current_positions = self._safe_get_positions()
if current_positions is None:
# Skip notification if we can't get position data
logger.warning(f"â ī¸ Failed to fetch positions for notification - skipping {action_type} notification")
return
current_size = 0
for pos in current_positions:
if pos.get('symbol') == full_symbol:
current_size = abs(float(pos.get('contracts', 0)))
break
message = f"""
đ Position Increased (External)
đ Trade Details:
âĸ Token: {token}
âĸ Direction: {position_side}
âĸ Size Added: {await formatter.format_amount(amount_from_fill, token)}
âĸ Add Price: {await formatter.format_price_with_symbol(price_from_fill, token)}
âĸ Previous Size: {await formatter.format_amount(previous_size, token)}
âĸ New Size: {await formatter.format_amount(current_size, token)}
âĸ Add Value: {await formatter.format_price_with_symbol(amount_from_fill * price_from_fill)}
đ Status: {position_side} position size increased externally
â° Time: {time_str}
đ Use /positions to view current position
"""
elif action_type == 'position_decreased' and existing_lc:
position_side = existing_lc.get('position_side', 'unknown').upper()
previous_size = existing_lc.get('current_position_size', 0)
entry_price = existing_lc.get('entry_price', 0)
# Get current size from exchange
current_positions = self._safe_get_positions()
if current_positions is None:
# Skip notification if we can't get position data
logger.warning(f"â ī¸ Failed to fetch positions for notification - skipping {action_type} notification")
return
current_size = 0
for pos in current_positions:
if pos.get('symbol') == full_symbol:
current_size = abs(float(pos.get('contracts', 0)))
break
# Calculate partial PnL for the reduced amount
partial_pnl = 0
if entry_price > 0:
if position_side == 'LONG':
partial_pnl = amount_from_fill * (price_from_fill - entry_price)
else: # SHORT
partial_pnl = amount_from_fill * (entry_price - price_from_fill)
pnl_emoji = "đĸ" if partial_pnl >= 0 else "đ´"
# Calculate ROE for the partial close
roe_text = ""
if entry_price > 0 and amount_from_fill > 0:
cost_basis = amount_from_fill * entry_price
roe = (partial_pnl / cost_basis) * 100
roe_text = f" ({roe:+.2f}%)"
message = f"""
đ Position Decreased (External)
đ Trade Details:
âĸ Token: {token}
âĸ Direction: {position_side}
âĸ Size Reduced: {await formatter.format_amount(amount_from_fill, token)}
âĸ Exit Price: {await formatter.format_price_with_symbol(price_from_fill, token)}
âĸ Previous Size: {await formatter.format_amount(previous_size, token)}
âĸ Remaining Size: {await formatter.format_amount(current_size, token)}
âĸ Exit Value: {await formatter.format_price_with_symbol(amount_from_fill * price_from_fill)}
{pnl_emoji} Partial P&L: {await formatter.format_price_with_symbol(partial_pnl)}{roe_text}
đ Status: {position_side} position size decreased externally
â° Time: {time_str}
đ Position remains open. Use /positions to view details
"""
else:
# No fallback notification sent - only position-based notifications per user preference
logger.debug(f"No notification sent for action_type: {action_type}")
return
await self.notification_manager.send_generic_notification(message.strip())
except Exception as e:
logger.error(f"Error sending position change notification: {e}")
async def _auto_sync_single_position(self, symbol: str, exchange_position: Dict[str, Any], stats) -> bool:
"""Auto-sync a single orphaned position to create a lifecycle record."""
try:
import uuid
from src.utils.token_display_formatter import get_formatter
formatter = get_formatter()
contracts_abs = abs(float(exchange_position.get('contracts', 0)))
if contracts_abs <= 1e-9:
return False
entry_price_from_exchange = float(exchange_position.get('entryPrice', 0)) or float(exchange_position.get('entryPx', 0))
# Determine position side
position_side, order_side = '', ''
ccxt_side = exchange_position.get('side', '').lower()
if ccxt_side == 'long':
position_side, order_side = 'long', 'buy'
elif ccxt_side == 'short':
position_side, order_side = 'short', 'sell'
if not position_side:
contracts_val = float(exchange_position.get('contracts', 0))
if contracts_val > 1e-9:
position_side, order_side = 'long', 'buy'
elif contracts_val < -1e-9:
position_side, order_side = 'short', 'sell'
else:
return False
if not position_side:
logger.error(f"AUTO-SYNC: Could not determine position side for {symbol}.")
return False
final_entry_price = entry_price_from_exchange
if not final_entry_price or final_entry_price <= 0:
# Fallback to a reasonable estimate (current mark price)
mark_price = float(exchange_position.get('markPrice', 0)) or float(exchange_position.get('markPx', 0))
if mark_price > 0:
final_entry_price = mark_price
else:
logger.error(f"AUTO-SYNC: Could not determine entry price for {symbol}.")
return False
logger.info(f"đ AUTO-SYNC: Creating lifecycle for {symbol} {position_side.upper()} {contracts_abs} @ {await formatter.format_price_with_symbol(final_entry_price, symbol)}")
unique_sync_id = str(uuid.uuid4())[:8]
lifecycle_id = stats.create_trade_lifecycle(
symbol=symbol,
side=order_side,
entry_order_id=f"external_sync_{unique_sync_id}",
trade_type='external_sync'
)
if lifecycle_id:
success = await stats.update_trade_position_opened(
lifecycle_id,
final_entry_price,
contracts_abs,
f"external_fill_sync_{unique_sync_id}"
)
if success:
logger.info(f"â
AUTO-SYNC: Successfully synced position for {symbol} (Lifecycle: {lifecycle_id[:8]})")
# Send position opened notification for auto-synced position
try:
await self._send_position_change_notification(
symbol, order_side, contracts_abs, final_entry_price,
'position_opened', datetime.now(timezone.utc)
)
logger.info(f"đ¨ AUTO-SYNC: Sent position opened notification for {symbol}")
except Exception as e:
logger.error(f"â AUTO-SYNC: Failed to send notification for {symbol}: {e}")
return True
else:
logger.error(f"â AUTO-SYNC: Failed to update lifecycle to 'position_opened' for {symbol}")
else:
logger.error(f"â AUTO-SYNC: Failed to create lifecycle for {symbol}")
return False
except Exception as e:
logger.error(f"â AUTO-SYNC: Error syncing position for {symbol}: {e}")
return False
async def _check_external_trades(self):
"""Check for trades made outside the Telegram bot and update stats."""
try:
stats = self.trading_engine.get_stats()
if not stats:
logger.warning("TradingStats not available in _check_external_trades. Skipping.")
return
external_trades_processed = 0
symbols_with_fills = set()
recent_fills = self.trading_engine.get_recent_fills()
if not recent_fills:
logger.debug("No recent fills data available")
return
if not hasattr(self, 'last_processed_trade_time') or self.last_processed_trade_time is None:
try:
# Ensure this metadata key is the one used by MarketMonitor for saving this state.
last_time_str = stats._get_metadata('market_monitor_last_processed_trade_time')
if last_time_str:
self.last_processed_trade_time = datetime.fromisoformat(last_time_str).replace(tzinfo=timezone.utc)
else:
self.last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
except Exception:
self.last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
for fill in recent_fills:
try:
trade_id = fill.get('id')
timestamp_ms = fill.get('timestamp')
symbol_from_fill = fill.get('symbol')
side_from_fill = fill.get('side')
amount_from_fill = float(fill.get('amount', 0))
price_from_fill = float(fill.get('price', 0))
timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc) if timestamp_ms else datetime.now(timezone.utc)
if self.last_processed_trade_time and timestamp_dt <= self.last_processed_trade_time:
continue
# Check if this fill has already been processed to prevent duplicates
if trade_id and stats.has_exchange_fill_been_processed(str(trade_id)):
logger.debug(f"Skipping already processed fill: {trade_id}")
continue
fill_processed_this_iteration = False
if not (symbol_from_fill and side_from_fill and amount_from_fill > 0 and price_from_fill > 0):
logger.warning(f"Skipping fill with incomplete data: {fill}")
continue
full_symbol = symbol_from_fill
token = symbol_from_fill.split('/')[0] if '/' in symbol_from_fill else symbol_from_fill.split(':')[0]
exchange_order_id_from_fill = fill.get('info', {}).get('oid')
# First check if this is a pending entry order fill
if exchange_order_id_from_fill:
pending_lc = stats.get_lifecycle_by_entry_order_id(exchange_order_id_from_fill, status='pending')
if pending_lc and pending_lc.get('symbol') == full_symbol:
success = await stats.update_trade_position_opened(
lifecycle_id=pending_lc['trade_lifecycle_id'],
entry_price=price_from_fill,
entry_amount=amount_from_fill,
exchange_fill_id=trade_id
)
if success:
logger.info(f"đ Lifecycle ENTRY: {pending_lc['trade_lifecycle_id']} for {full_symbol} updated by fill {trade_id}.")
symbols_with_fills.add(token)
order_in_db_for_entry = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
if order_in_db_for_entry:
stats.update_order_status(order_db_id=order_in_db_for_entry['id'], new_status='filled', amount_filled_increment=amount_from_fill)
# Send position opened notification (this is a bot-initiated position)
await self._send_position_change_notification(
full_symbol, side_from_fill, amount_from_fill, price_from_fill,
'position_opened', timestamp_dt
)
fill_processed_this_iteration = True
# Check if this is a known bot order (SL/TP/exit)
if not fill_processed_this_iteration and exchange_order_id_from_fill:
active_lc = None
closure_reason_action_type = None
bot_order_db_id_to_update = None
bot_order_for_fill = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
if bot_order_for_fill and bot_order_for_fill.get('symbol') == full_symbol:
order_type = bot_order_for_fill.get('type')
order_side = bot_order_for_fill.get('side')
if order_type == 'market':
potential_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
if potential_lc:
lc_pos_side = potential_lc.get('position_side')
if (lc_pos_side == 'long' and order_side == 'sell' and side_from_fill == 'sell') or \
(lc_pos_side == 'short' and order_side == 'buy' and side_from_fill == 'buy'):
active_lc = potential_lc
closure_reason_action_type = f"bot_exit_{lc_pos_side}_close"
bot_order_db_id_to_update = bot_order_for_fill.get('id')
logger.info(f"âšī¸ Lifecycle BOT EXIT: Fill {trade_id} (OID {exchange_order_id_from_fill}) for {full_symbol} matches bot exit for lifecycle {active_lc['trade_lifecycle_id']}.")
if not active_lc:
lc_by_sl = stats.get_lifecycle_by_sl_order_id(exchange_order_id_from_fill, status='position_opened')
if lc_by_sl and lc_by_sl.get('symbol') == full_symbol:
active_lc = lc_by_sl
closure_reason_action_type = f"sl_{active_lc.get('position_side')}_close"
bot_order_db_id_to_update = bot_order_for_fill.get('id')
logger.info(f"âšī¸ Lifecycle SL: Fill {trade_id} for OID {exchange_order_id_from_fill} matches SL for lifecycle {active_lc['trade_lifecycle_id']}.")
if not active_lc:
lc_by_tp = stats.get_lifecycle_by_tp_order_id(exchange_order_id_from_fill, status='position_opened')
if lc_by_tp and lc_by_tp.get('symbol') == full_symbol:
active_lc = lc_by_tp
closure_reason_action_type = f"tp_{active_lc.get('position_side')}_close"
bot_order_db_id_to_update = bot_order_for_fill.get('id')
logger.info(f"âšī¸ Lifecycle TP: Fill {trade_id} for OID {exchange_order_id_from_fill} matches TP for lifecycle {active_lc['trade_lifecycle_id']}.")
# Process known bot order fills
if active_lc and closure_reason_action_type:
lc_id = active_lc['trade_lifecycle_id']
lc_entry_price = active_lc.get('entry_price', 0)
lc_position_side = active_lc.get('position_side')
realized_pnl = 0
if lc_position_side == 'long':
realized_pnl = amount_from_fill * (price_from_fill - lc_entry_price)
elif lc_position_side == 'short':
realized_pnl = amount_from_fill * (lc_entry_price - price_from_fill)
success = stats.update_trade_position_closed(
lifecycle_id=lc_id, exit_price=price_from_fill,
realized_pnl=realized_pnl, exchange_fill_id=trade_id
)
if success:
pnl_emoji = "đĸ" if realized_pnl >= 0 else "đ´"
formatter = get_formatter()
logger.info(f"{pnl_emoji} Lifecycle CLOSED: {lc_id} ({closure_reason_action_type}). PNL for fill: {formatter.format_price_with_symbol(realized_pnl)}")
symbols_with_fills.add(token)
# Send position closed notification
await self._send_position_change_notification(
full_symbol, side_from_fill, amount_from_fill, price_from_fill,
'position_closed', timestamp_dt, active_lc, realized_pnl
)
stats.migrate_trade_to_aggregated_stats(lc_id)
if bot_order_db_id_to_update:
stats.update_order_status(order_db_id=bot_order_db_id_to_update, new_status='filled', amount_filled_increment=amount_from_fill)
fill_processed_this_iteration = True
# Check for external stop losses
if not fill_processed_this_iteration:
if (exchange_order_id_from_fill and
self.shared_state.get('external_stop_losses') and
exchange_order_id_from_fill in self.shared_state['external_stop_losses']):
stop_loss_info = self.shared_state['external_stop_losses'][exchange_order_id_from_fill]
formatter = get_formatter()
logger.info(f"đ External SL (MM Tracking): {token} Order {exchange_order_id_from_fill} filled @ {formatter.format_price_with_symbol(price_from_fill, token)}")
sl_active_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
if sl_active_lc:
lc_id = sl_active_lc['trade_lifecycle_id']
lc_entry_price = sl_active_lc.get('entry_price', 0)
lc_pos_side = sl_active_lc.get('position_side')
realized_pnl = amount_from_fill * (price_from_fill - lc_entry_price) if lc_pos_side == 'long' else amount_from_fill * (lc_entry_price - price_from_fill)
success = stats.update_trade_position_closed(lc_id, price_from_fill, realized_pnl, trade_id)
if success:
pnl_emoji = "đĸ" if realized_pnl >= 0 else "đ´"
logger.info(f"{pnl_emoji} Lifecycle CLOSED by External SL (MM): {lc_id}. PNL: {formatter.format_price_with_symbol(realized_pnl)}")
if self.notification_manager:
await self.notification_manager.send_stop_loss_execution_notification(
stop_loss_info, full_symbol, side_from_fill, amount_from_fill, price_from_fill,
f'{lc_pos_side}_closed_external_sl', timestamp_dt.isoformat(), realized_pnl
)
stats.migrate_trade_to_aggregated_stats(lc_id)
# Modify shared state carefully
if exchange_order_id_from_fill in self.shared_state['external_stop_losses']:
del self.shared_state['external_stop_losses'][exchange_order_id_from_fill]
fill_processed_this_iteration = True
else:
logger.warning(f"â ī¸ External SL (MM) {exchange_order_id_from_fill} for {full_symbol}, but no active lifecycle found.")
# NEW: Enhanced external trade processing with position state detection
if not fill_processed_this_iteration:
existing_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
# If no lifecycle exists but we have a position on exchange, try to auto-sync first
if not existing_lc:
current_positions = self._safe_get_positions()
if current_positions is None:
logger.warning("â ī¸ Failed to fetch positions for external trade detection - skipping this fill")
continue
exchange_position = None
for pos in current_positions:
if pos.get('symbol') == full_symbol:
exchange_position = pos
break
if exchange_position and abs(float(exchange_position.get('contracts', 0))) > 1e-9:
logger.info(f"đ AUTO-SYNC: Position exists on exchange for {full_symbol} but no lifecycle found. Auto-syncing before processing fill.")
success = await self._auto_sync_single_position(full_symbol, exchange_position, stats)
if success:
# Re-check for lifecycle after auto-sync
existing_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
action_type = await self._determine_position_action_type(
full_symbol, side_from_fill, amount_from_fill, existing_lc
)
logger.info(f"đ External fill analysis: {full_symbol} {side_from_fill} {amount_from_fill} -> {action_type}")
# Additional debug logging for position changes
if existing_lc:
previous_size = existing_lc.get('current_position_size', 0)
current_positions = self._safe_get_positions()
if current_positions is None:
logger.warning("â ī¸ Failed to fetch positions for debug logging - skipping debug info")
# Set defaults to avoid reference errors
current_size = previous_size
else:
current_size = 0
for pos in current_positions:
if pos.get('symbol') == full_symbol:
current_size = abs(float(pos.get('contracts', 0)))
break
logger.info(f"đ Position size change: {previous_size} -> {current_size} (diff: {current_size - previous_size})")
logger.info(f"đ¯ Expected change based on fill: {'+' if side_from_fill.lower() == 'buy' else '-'}{amount_from_fill}")
# Check if this might be a position decrease that was misclassified
if (action_type == 'external_unmatched' and
existing_lc.get('position_side') == 'long' and
side_from_fill.lower() == 'sell' and
current_size < previous_size):
logger.warning(f"â ī¸ Potential misclassification: {full_symbol} {side_from_fill} looks like position decrease but classified as external_unmatched")
# Force re-check with proper parameters
action_type = 'position_decreased'
logger.info(f"đ Corrected action_type to: {action_type}")
elif (action_type == 'external_unmatched' and
existing_lc.get('position_side') == 'long' and
side_from_fill.lower() == 'buy' and
current_size > previous_size):
logger.warning(f"â ī¸ Potential misclassification: {full_symbol} {side_from_fill} looks like position increase but classified as external_unmatched")
action_type = 'position_increased'
logger.info(f"đ Corrected action_type to: {action_type}")
if action_type == 'position_opened':
# Create new lifecycle for external position
lifecycle_id = stats.create_trade_lifecycle(
symbol=full_symbol,
side=side_from_fill,
entry_order_id=exchange_order_id_from_fill or f"external_{trade_id}",
trade_type="external"
)
if lifecycle_id:
success = stats.update_trade_position_opened(
lifecycle_id=lifecycle_id,
entry_price=price_from_fill,
entry_amount=amount_from_fill,
exchange_fill_id=trade_id
)
if success:
logger.info(f"đ Created and opened new external lifecycle: {lifecycle_id[:8]} for {full_symbol}")
symbols_with_fills.add(token)
# Send position opened notification
await self._send_position_change_notification(
full_symbol, side_from_fill, amount_from_fill, price_from_fill,
action_type, timestamp_dt
)
fill_processed_this_iteration = True
elif action_type == 'position_closed' and existing_lc:
# Close existing lifecycle
lc_id = existing_lc['trade_lifecycle_id']
lc_entry_price = existing_lc.get('entry_price', 0)
lc_position_side = existing_lc.get('position_side')
realized_pnl = 0
if lc_position_side == 'long':
realized_pnl = amount_from_fill * (price_from_fill - lc_entry_price)
elif lc_position_side == 'short':
realized_pnl = amount_from_fill * (lc_entry_price - price_from_fill)
success = stats.update_trade_position_closed(
lifecycle_id=lc_id,
exit_price=price_from_fill,
realized_pnl=realized_pnl,
exchange_fill_id=trade_id
)
if success:
pnl_emoji = "đĸ" if realized_pnl >= 0 else "đ´"
formatter = get_formatter()
logger.info(f"{pnl_emoji} Lifecycle CLOSED (External): {lc_id}. PNL: {formatter.format_price_with_symbol(realized_pnl)}")
symbols_with_fills.add(token)
# Send position closed notification
await self._send_position_change_notification(
full_symbol, side_from_fill, amount_from_fill, price_from_fill,
action_type, timestamp_dt, existing_lc, realized_pnl
)
stats.migrate_trade_to_aggregated_stats(lc_id)
fill_processed_this_iteration = True
elif action_type in ['position_increased', 'position_decreased'] and existing_lc:
# Update lifecycle position size and send notification
current_positions = self._safe_get_positions()
if current_positions is None:
logger.warning("â ī¸ Failed to fetch positions for size update - skipping position change processing")
continue
new_size = 0
for pos in current_positions:
if pos.get('symbol') == full_symbol:
new_size = abs(float(pos.get('contracts', 0)))
break
# Update lifecycle with new position size
await self._update_lifecycle_position_size(existing_lc['trade_lifecycle_id'], new_size)
# Send appropriate notification
await self._send_position_change_notification(
full_symbol, side_from_fill, amount_from_fill, price_from_fill,
action_type, timestamp_dt, existing_lc
)
symbols_with_fills.add(token)
fill_processed_this_iteration = True
logger.info(f"đ Position {action_type}: {full_symbol} new size: {new_size}")
# Fallback for unmatched external trades
if not fill_processed_this_iteration:
all_open_positions_in_db = stats.get_open_positions()
db_open_symbols = {pos_db.get('symbol') for pos_db in all_open_positions_in_db}
if full_symbol in db_open_symbols:
logger.debug(f"Position {full_symbol} found in open positions but no active lifecycle - likely auto-sync failed or timing issue for fill {trade_id}")
# Record as unmatched external trade
linked_order_db_id = None
if exchange_order_id_from_fill:
order_in_db = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
if order_in_db:
linked_order_db_id = order_in_db.get('id')
stats.record_trade(
full_symbol, side_from_fill, amount_from_fill, price_from_fill,
exchange_fill_id=trade_id, trade_type="external_unmatched",
timestamp=timestamp_dt.isoformat(),
linked_order_table_id_to_link=linked_order_db_id
)
logger.info(f"đ Recorded trade via FALLBACK: {trade_id} (Unmatched External Fill)")
# No notification sent for unmatched external trades per user preference
fill_processed_this_iteration = True
if fill_processed_this_iteration:
external_trades_processed += 1
if self.last_processed_trade_time is None or timestamp_dt > self.last_processed_trade_time:
self.last_processed_trade_time = timestamp_dt
except Exception as e:
logger.error(f"Error processing fill {fill.get('id', 'N/A')}: {e}", exc_info=True)
continue
if external_trades_processed > 0:
stats._set_metadata('market_monitor_last_processed_trade_time', self.last_processed_trade_time.isoformat())
logger.info(f"đž Saved MarketMonitor state (last_processed_trade_time) to DB: {self.last_processed_trade_time.isoformat()}")
logger.info(f"đ Processed {external_trades_processed} external trades")
if symbols_with_fills:
logger.info(f"âšī¸ Symbols with processed fills this cycle: {list(symbols_with_fills)}")
except Exception as e:
logger.error(f"â Error checking external trades: {e}", exc_info=True)