|
@@ -0,0 +1,1236 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+"""
|
|
|
+Monitors positions, external events like trades made outside the bot, and price alarms.
|
|
|
+"""
|
|
|
+
|
|
|
+import logging
|
|
|
+import asyncio
|
|
|
+from datetime import datetime, timedelta, timezone
|
|
|
+from typing import Optional, Dict, Any, List
|
|
|
+
|
|
|
+# Assuming AlarmManager will be moved here or imported appropriately
|
|
|
+# from .alarm_manager import AlarmManager
|
|
|
+from src.monitoring.alarm_manager import AlarmManager # Keep if AlarmManager stays in its own file as per original structure
|
|
|
+from src.utils.token_display_formatter import get_formatter
|
|
|
+
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
+
|
|
|
+class PositionMonitor:
|
|
|
+ def __init__(self, trading_engine, notification_manager, alarm_manager, market_monitor_cache, shared_state):
|
|
|
+ self.trading_engine = trading_engine
|
|
|
+ self.notification_manager = notification_manager
|
|
|
+ self.alarm_manager = alarm_manager
|
|
|
+ self.market_monitor_cache = market_monitor_cache
|
|
|
+ self.shared_state = shared_state # Expected to contain {'external_stop_losses': ...}
|
|
|
+ self.last_processed_trade_time: Optional[datetime] = None
|
|
|
+ # Add necessary initializations, potentially loading last_processed_trade_time
|
|
|
+
|
|
|
+ async def run_monitor_cycle(self):
|
|
|
+ """Runs a full monitoring cycle."""
|
|
|
+ await self._check_external_trades()
|
|
|
+ await self._check_price_alarms()
|
|
|
+ await self._reconcile_positions()
|
|
|
+
|
|
|
+ def _safe_get_positions(self) -> Optional[List[Dict[str, Any]]]:
|
|
|
+ """Safely get positions from trading engine, returning None on API failures instead of empty list."""
|
|
|
+ try:
|
|
|
+ return self.trading_engine.get_positions()
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"⚠️ Failed to fetch positions in external event monitor: {e}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ async def _check_price_alarms(self):
|
|
|
+ """Check price alarms and trigger notifications."""
|
|
|
+ try:
|
|
|
+ active_alarms = self.alarm_manager.get_all_active_alarms()
|
|
|
+
|
|
|
+ if not active_alarms:
|
|
|
+ return
|
|
|
+
|
|
|
+ tokens_to_check = list(set(alarm['token'] for alarm in active_alarms))
|
|
|
+
|
|
|
+ for token in tokens_to_check:
|
|
|
+ try:
|
|
|
+ symbol = f"{token}/USDC:USDC"
|
|
|
+ market_data = self.trading_engine.get_market_data(symbol)
|
|
|
+
|
|
|
+ if not market_data or not market_data.get('ticker'):
|
|
|
+ continue
|
|
|
+
|
|
|
+ current_price = float(market_data['ticker'].get('last', 0))
|
|
|
+ if current_price <= 0:
|
|
|
+ continue
|
|
|
+
|
|
|
+ token_alarms = [alarm for alarm in active_alarms if alarm['token'] == token]
|
|
|
+
|
|
|
+ for alarm in token_alarms:
|
|
|
+ target_price = alarm['target_price']
|
|
|
+ direction = alarm['direction']
|
|
|
+
|
|
|
+ should_trigger = False
|
|
|
+ if direction == 'above' and current_price >= target_price:
|
|
|
+ should_trigger = True
|
|
|
+ elif direction == 'below' and current_price <= target_price:
|
|
|
+ should_trigger = True
|
|
|
+
|
|
|
+ if should_trigger:
|
|
|
+ triggered_alarm = self.alarm_manager.trigger_alarm(alarm['id'], current_price)
|
|
|
+ if triggered_alarm:
|
|
|
+ await self._send_alarm_notification(triggered_alarm)
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Error checking alarms for {token}: {e}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error("❌ Error checking price alarms: {e}")
|
|
|
+
|
|
|
+ async def _send_alarm_notification(self, alarm: Dict[str, Any]):
|
|
|
+ """Send notification for triggered alarm."""
|
|
|
+ try:
|
|
|
+ if self.notification_manager:
|
|
|
+ await self.notification_manager.send_alarm_triggered_notification(
|
|
|
+ alarm['token'],
|
|
|
+ alarm['target_price'],
|
|
|
+ alarm['triggered_price'],
|
|
|
+ alarm['direction']
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ logger.info(f"🔔 ALARM TRIGGERED: {alarm['token']} @ ${alarm['triggered_price']:,.2f}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error sending alarm notification: {e}")
|
|
|
+
|
|
|
+ async def _determine_position_action_type(self, full_symbol: str, side_from_fill: str,
|
|
|
+ amount_from_fill: float, existing_lc: Optional[Dict] = None) -> str:
|
|
|
+ """
|
|
|
+ Determine the type of position action based on current state and fill details.
|
|
|
+ Returns one of: 'position_opened', 'position_closed', 'position_increased', 'position_decreased'
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # Get current position from exchange
|
|
|
+ current_positions = self._safe_get_positions()
|
|
|
+ if current_positions is None:
|
|
|
+ logger.warning(f"⚠️ Failed to fetch positions for {full_symbol} analysis - returning external_unmatched")
|
|
|
+ return 'external_unmatched'
|
|
|
+
|
|
|
+ current_exchange_position = None
|
|
|
+ for pos in current_positions:
|
|
|
+ if pos.get('symbol') == full_symbol:
|
|
|
+ current_exchange_position = pos
|
|
|
+ break
|
|
|
+
|
|
|
+ current_size = 0.0
|
|
|
+ if current_exchange_position:
|
|
|
+ current_size = abs(float(current_exchange_position.get('contracts', 0)))
|
|
|
+
|
|
|
+ # If no existing lifecycle, this is a position opening
|
|
|
+ if not existing_lc:
|
|
|
+ logger.debug(f"🔍 Position analysis: {full_symbol} no existing lifecycle, current size: {current_size}")
|
|
|
+ if current_size > 1e-9: # Position exists on exchange
|
|
|
+ return 'position_opened'
|
|
|
+ else:
|
|
|
+ return 'external_unmatched'
|
|
|
+
|
|
|
+ # Get previous position size from lifecycle
|
|
|
+ previous_size = existing_lc.get('current_position_size', 0)
|
|
|
+ lc_position_side = existing_lc.get('position_side')
|
|
|
+
|
|
|
+ logger.debug(f"🔍 Position analysis: {full_symbol} {side_from_fill} {amount_from_fill}")
|
|
|
+ logger.debug(f" Lifecycle side: {lc_position_side}, previous size: {previous_size}, current size: {current_size}")
|
|
|
+
|
|
|
+ # Check if this is a closing trade (opposite side)
|
|
|
+ is_closing_trade = False
|
|
|
+ if lc_position_side == 'long' and side_from_fill.lower() == 'sell':
|
|
|
+ is_closing_trade = True
|
|
|
+ elif lc_position_side == 'short' and side_from_fill.lower() == 'buy':
|
|
|
+ is_closing_trade = True
|
|
|
+
|
|
|
+ logger.debug(f" Is closing trade: {is_closing_trade}")
|
|
|
+
|
|
|
+ if is_closing_trade:
|
|
|
+ if current_size < 1e-9: # Position is now closed
|
|
|
+ logger.debug(f" → Position closed (current_size < 1e-9)")
|
|
|
+ return 'position_closed'
|
|
|
+ elif current_size < previous_size - 1e-9: # Position reduced but not closed
|
|
|
+ logger.debug(f" → Position decreased (current_size {current_size} < previous_size - 1e-9 {previous_size - 1e-9})")
|
|
|
+ return 'position_decreased'
|
|
|
+ else:
|
|
|
+ # Same side trade - position increase
|
|
|
+ logger.debug(f" Same side trade check: current_size {current_size} > previous_size + 1e-9 {previous_size + 1e-9}?")
|
|
|
+ if current_size > previous_size + 1e-9:
|
|
|
+ logger.debug(f" → Position increased")
|
|
|
+ return 'position_increased'
|
|
|
+ else:
|
|
|
+ logger.debug(f" → Size check failed, not enough increase")
|
|
|
+
|
|
|
+ # Default fallback
|
|
|
+ logger.debug(f" → Fallback to external_unmatched")
|
|
|
+ return 'external_unmatched'
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Error determining position action type: {e}")
|
|
|
+ return 'external_unmatched'
|
|
|
+
|
|
|
+ async def _update_lifecycle_position_size(self, lifecycle_id: str, new_size: float) -> bool:
|
|
|
+ """Update the current position size in the lifecycle."""
|
|
|
+ try:
|
|
|
+ stats = self.trading_engine.get_stats()
|
|
|
+ if not stats:
|
|
|
+ return False
|
|
|
+
|
|
|
+ # Update the current position size
|
|
|
+ success = stats.trade_manager.update_trade_market_data(
|
|
|
+ lifecycle_id, current_position_size=new_size
|
|
|
+ )
|
|
|
+ return success
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Error updating lifecycle position size: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ async def _send_position_change_notification(self, full_symbol: str, side_from_fill: str,
|
|
|
+ amount_from_fill: float, price_from_fill: float,
|
|
|
+ action_type: str, timestamp_dt: datetime,
|
|
|
+ existing_lc: Optional[Dict] = None,
|
|
|
+ realized_pnl: Optional[float] = None):
|
|
|
+ """Send position change notification."""
|
|
|
+ try:
|
|
|
+ if not self.notification_manager:
|
|
|
+ return
|
|
|
+
|
|
|
+ token = full_symbol.split('/')[0] if '/' in full_symbol else full_symbol.split(':')[0]
|
|
|
+ time_str = timestamp_dt.strftime('%Y-%m-%d %H:%M:%S UTC')
|
|
|
+ formatter = get_formatter()
|
|
|
+
|
|
|
+ if action_type == 'position_closed' and existing_lc:
|
|
|
+ position_side = existing_lc.get('position_side', 'unknown').upper()
|
|
|
+ entry_price = existing_lc.get('entry_price', 0)
|
|
|
+ pnl_emoji = "🟢" if realized_pnl and realized_pnl >= 0 else "🔴"
|
|
|
+ pnl_text = f"{await formatter.format_price_with_symbol(realized_pnl)}" if realized_pnl is not None else "N/A"
|
|
|
+
|
|
|
+ # Get ROE directly from exchange data
|
|
|
+ info_data = existing_lc.get('info', {})
|
|
|
+ position_info = info_data.get('position', {})
|
|
|
+ roe_raw = position_info.get('returnOnEquity') # Changed from 'percentage' to 'returnOnEquity'
|
|
|
+
|
|
|
+ if roe_raw is not None:
|
|
|
+ try:
|
|
|
+ # The exchange provides ROE as a decimal (e.g., -0.326 for -32.6%)
|
|
|
+ # We need to multiply by 100 and keep the sign
|
|
|
+ roe = float(roe_raw) * 100
|
|
|
+ roe_text = f" ({roe:+.2f}%)"
|
|
|
+ except (ValueError, TypeError):
|
|
|
+ logger.warning(f"Could not parse ROE value: {roe_raw} for {full_symbol}")
|
|
|
+ roe_text = ""
|
|
|
+ else:
|
|
|
+ logger.warning(f"No ROE data available from exchange for {full_symbol}")
|
|
|
+ roe_text = ""
|
|
|
+
|
|
|
+ message = f"""
|
|
|
+🎯 <b>Position Closed (External)</b>
|
|
|
+
|
|
|
+📊 <b>Trade Details:</b>
|
|
|
+• Token: {token}
|
|
|
+• Direction: {position_side}
|
|
|
+• Size Closed: {await formatter.format_amount(amount_from_fill, token)}
|
|
|
+• Entry Price: {await formatter.format_price_with_symbol(entry_price, token)}
|
|
|
+• Exit Price: {await formatter.format_price_with_symbol(price_from_fill, token)}
|
|
|
+• Exit Value: {await formatter.format_price_with_symbol(amount_from_fill * price_from_fill)}
|
|
|
+
|
|
|
+{pnl_emoji} <b>P&L:</b> {pnl_text}{roe_text}
|
|
|
+✅ <b>Status:</b> {position_side} position closed externally
|
|
|
+⏰ <b>Time:</b> {time_str}
|
|
|
+
|
|
|
+📊 Use /stats to view updated performance
|
|
|
+ """
|
|
|
+
|
|
|
+ elif action_type == 'position_opened':
|
|
|
+ position_side = 'LONG' if side_from_fill.lower() == 'buy' else 'SHORT'
|
|
|
+ message = f"""
|
|
|
+🚀 <b>Position Opened (External)</b>
|
|
|
+
|
|
|
+📊 <b>Trade Details:</b>
|
|
|
+• Token: {token}
|
|
|
+• Direction: {position_side}
|
|
|
+• Size: {await formatter.format_amount(amount_from_fill, token)}
|
|
|
+• Entry Price: {await formatter.format_price_with_symbol(price_from_fill, token)}
|
|
|
+• Position Value: {await formatter.format_price_with_symbol(amount_from_fill * price_from_fill)}
|
|
|
+
|
|
|
+✅ <b>Status:</b> New {position_side} position opened externally
|
|
|
+⏰ <b>Time:</b> {time_str}
|
|
|
+
|
|
|
+📱 Use /positions to view all positions
|
|
|
+ """
|
|
|
+
|
|
|
+ elif action_type == 'position_increased' and existing_lc:
|
|
|
+ position_side = existing_lc.get('position_side', 'unknown').upper()
|
|
|
+ previous_size = existing_lc.get('current_position_size', 0)
|
|
|
+ # Get current size from exchange
|
|
|
+ current_positions = self._safe_get_positions()
|
|
|
+ if current_positions is None:
|
|
|
+ # Skip notification if we can't get position data
|
|
|
+ logger.warning(f"⚠️ Failed to fetch positions for notification - skipping {action_type} notification")
|
|
|
+ return
|
|
|
+ current_size = 0
|
|
|
+ for pos in current_positions:
|
|
|
+ if pos.get('symbol') == full_symbol:
|
|
|
+ current_size = abs(float(pos.get('contracts', 0)))
|
|
|
+ break
|
|
|
+
|
|
|
+ message = f"""
|
|
|
+📈 <b>Position Increased (External)</b>
|
|
|
+
|
|
|
+📊 <b>Trade Details:</b>
|
|
|
+• Token: {token}
|
|
|
+• Direction: {position_side}
|
|
|
+• Size Added: {await formatter.format_amount(amount_from_fill, token)}
|
|
|
+• Add Price: {await formatter.format_price_with_symbol(price_from_fill, token)}
|
|
|
+• Previous Size: {await formatter.format_amount(previous_size, token)}
|
|
|
+• New Size: {await formatter.format_amount(current_size, token)}
|
|
|
+• Add Value: {await formatter.format_price_with_symbol(amount_from_fill * price_from_fill)}
|
|
|
+
|
|
|
+📈 <b>Status:</b> {position_side} position size increased externally
|
|
|
+⏰ <b>Time:</b> {time_str}
|
|
|
+
|
|
|
+📈 Use /positions to view current position
|
|
|
+ """
|
|
|
+
|
|
|
+ elif action_type == 'position_decreased' and existing_lc:
|
|
|
+ position_side = existing_lc.get('position_side', 'unknown').upper()
|
|
|
+ previous_size = existing_lc.get('current_position_size', 0)
|
|
|
+ entry_price = existing_lc.get('entry_price', 0)
|
|
|
+
|
|
|
+ # Get current size from exchange
|
|
|
+ current_positions = self._safe_get_positions()
|
|
|
+ if current_positions is None:
|
|
|
+ # Skip notification if we can't get position data
|
|
|
+ logger.warning(f"⚠️ Failed to fetch positions for notification - skipping {action_type} notification")
|
|
|
+ return
|
|
|
+ current_size = 0
|
|
|
+ for pos in current_positions:
|
|
|
+ if pos.get('symbol') == full_symbol:
|
|
|
+ current_size = abs(float(pos.get('contracts', 0)))
|
|
|
+ break
|
|
|
+
|
|
|
+ # Calculate partial PnL for the reduced amount
|
|
|
+ partial_pnl = 0
|
|
|
+ if entry_price > 0:
|
|
|
+ if position_side == 'LONG':
|
|
|
+ partial_pnl = amount_from_fill * (price_from_fill - entry_price)
|
|
|
+ else: # SHORT
|
|
|
+ partial_pnl = amount_from_fill * (entry_price - price_from_fill)
|
|
|
+
|
|
|
+ pnl_emoji = "🟢" if partial_pnl >= 0 else "🔴"
|
|
|
+
|
|
|
+ # Calculate ROE for the partial close
|
|
|
+ roe_text = ""
|
|
|
+ if entry_price > 0 and amount_from_fill > 0:
|
|
|
+ cost_basis = amount_from_fill * entry_price
|
|
|
+ roe = (partial_pnl / cost_basis) * 100
|
|
|
+ roe_text = f" ({roe:+.2f}%)"
|
|
|
+
|
|
|
+ message = f"""
|
|
|
+📉 <b>Position Decreased (External)</b>
|
|
|
+
|
|
|
+📊 <b>Trade Details:</b>
|
|
|
+• Token: {token}
|
|
|
+• Direction: {position_side}
|
|
|
+• Size Reduced: {await formatter.format_amount(amount_from_fill, token)}
|
|
|
+• Exit Price: {await formatter.format_price_with_symbol(price_from_fill, token)}
|
|
|
+• Previous Size: {await formatter.format_amount(previous_size, token)}
|
|
|
+• Remaining Size: {await formatter.format_amount(current_size, token)}
|
|
|
+• Exit Value: {await formatter.format_price_with_symbol(amount_from_fill * price_from_fill)}
|
|
|
+
|
|
|
+{pnl_emoji} <b>Partial P&L:</b> {await formatter.format_price_with_symbol(partial_pnl)}{roe_text}
|
|
|
+📉 <b>Status:</b> {position_side} position size decreased externally
|
|
|
+⏰ <b>Time:</b> {time_str}
|
|
|
+
|
|
|
+📊 Position remains open. Use /positions to view details
|
|
|
+ """
|
|
|
+ else:
|
|
|
+ # No fallback notification sent - only position-based notifications per user preference
|
|
|
+ logger.debug(f"No notification sent for action_type: {action_type}")
|
|
|
+ return
|
|
|
+
|
|
|
+ await self.notification_manager.send_generic_notification(message.strip())
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Error sending position change notification: {e}")
|
|
|
+
|
|
|
+ async def _auto_sync_single_position(self, symbol: str, exchange_position: Dict[str, Any], stats) -> bool:
|
|
|
+ """Auto-sync a single orphaned position to create a lifecycle record."""
|
|
|
+ try:
|
|
|
+ import uuid
|
|
|
+ from src.utils.token_display_formatter import get_formatter
|
|
|
+
|
|
|
+ formatter = get_formatter()
|
|
|
+ contracts_abs = abs(float(exchange_position.get('contracts', 0)))
|
|
|
+
|
|
|
+ if contracts_abs <= 1e-9:
|
|
|
+ return False
|
|
|
+
|
|
|
+ entry_price_from_exchange = float(exchange_position.get('entryPrice', 0)) or float(exchange_position.get('entryPx', 0))
|
|
|
+
|
|
|
+ # Determine position side
|
|
|
+ position_side, order_side = '', ''
|
|
|
+ ccxt_side = exchange_position.get('side', '').lower()
|
|
|
+ if ccxt_side == 'long':
|
|
|
+ position_side, order_side = 'long', 'buy'
|
|
|
+ elif ccxt_side == 'short':
|
|
|
+ position_side, order_side = 'short', 'sell'
|
|
|
+
|
|
|
+ if not position_side:
|
|
|
+ contracts_val = float(exchange_position.get('contracts', 0))
|
|
|
+ if contracts_val > 1e-9:
|
|
|
+ position_side, order_side = 'long', 'buy'
|
|
|
+ elif contracts_val < -1e-9:
|
|
|
+ position_side, order_side = 'short', 'sell'
|
|
|
+ else:
|
|
|
+ return False
|
|
|
+
|
|
|
+ if not position_side:
|
|
|
+ logger.error(f"AUTO-SYNC: Could not determine position side for {symbol}.")
|
|
|
+ return False
|
|
|
+
|
|
|
+ final_entry_price = entry_price_from_exchange
|
|
|
+ if not final_entry_price or final_entry_price <= 0:
|
|
|
+ # Fallback to a reasonable estimate (current mark price)
|
|
|
+ mark_price = float(exchange_position.get('markPrice', 0)) or float(exchange_position.get('markPx', 0))
|
|
|
+ if mark_price > 0:
|
|
|
+ final_entry_price = mark_price
|
|
|
+ else:
|
|
|
+ logger.error(f"AUTO-SYNC: Could not determine entry price for {symbol}.")
|
|
|
+ return False
|
|
|
+
|
|
|
+ logger.info(f"🔄 AUTO-SYNC: Creating lifecycle for {symbol} {position_side.upper()} {contracts_abs} @ {await formatter.format_price_with_symbol(final_entry_price, symbol)}")
|
|
|
+
|
|
|
+ unique_sync_id = str(uuid.uuid4())[:8]
|
|
|
+ lifecycle_id = stats.create_trade_lifecycle(
|
|
|
+ symbol=symbol,
|
|
|
+ side=order_side,
|
|
|
+ entry_order_id=f"external_sync_{unique_sync_id}",
|
|
|
+ trade_type='external_sync'
|
|
|
+ )
|
|
|
+
|
|
|
+ if lifecycle_id:
|
|
|
+ success = await stats.update_trade_position_opened(
|
|
|
+ lifecycle_id,
|
|
|
+ final_entry_price,
|
|
|
+ contracts_abs,
|
|
|
+ f"external_fill_sync_{unique_sync_id}"
|
|
|
+ )
|
|
|
+
|
|
|
+ if success:
|
|
|
+ logger.info(f"✅ AUTO-SYNC: Successfully synced position for {symbol} (Lifecycle: {lifecycle_id[:8]})")
|
|
|
+
|
|
|
+ # Send position opened notification for auto-synced position
|
|
|
+ try:
|
|
|
+ await self._send_position_change_notification(
|
|
|
+ symbol, order_side, contracts_abs, final_entry_price,
|
|
|
+ 'position_opened', datetime.now(timezone.utc)
|
|
|
+ )
|
|
|
+ logger.info(f"📨 AUTO-SYNC: Sent position opened notification for {symbol}")
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ AUTO-SYNC: Failed to send notification for {symbol}: {e}")
|
|
|
+
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ logger.error(f"❌ AUTO-SYNC: Failed to update lifecycle to 'position_opened' for {symbol}")
|
|
|
+ else:
|
|
|
+ logger.error(f"❌ AUTO-SYNC: Failed to create lifecycle for {symbol}")
|
|
|
+
|
|
|
+ return False
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ AUTO-SYNC: Error syncing position for {symbol}: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ async def _check_external_trades(self):
|
|
|
+ """Check for trades made outside the Telegram bot and update stats."""
|
|
|
+ try:
|
|
|
+ stats = self.trading_engine.get_stats()
|
|
|
+ if not stats:
|
|
|
+ logger.warning("TradingStats not available in _check_external_trades. Skipping.")
|
|
|
+ return
|
|
|
+
|
|
|
+ external_trades_processed = 0
|
|
|
+ symbols_with_fills = set()
|
|
|
+
|
|
|
+ recent_fills = self.trading_engine.get_recent_fills()
|
|
|
+ if not recent_fills:
|
|
|
+ logger.debug("No recent fills data available")
|
|
|
+ return
|
|
|
+
|
|
|
+ if not hasattr(self, 'last_processed_trade_time') or self.last_processed_trade_time is None:
|
|
|
+ try:
|
|
|
+ # Ensure this metadata key is the one used by MarketMonitor for saving this state.
|
|
|
+ last_time_str = stats._get_metadata('market_monitor_last_processed_trade_time')
|
|
|
+ if last_time_str:
|
|
|
+ self.last_processed_trade_time = datetime.fromisoformat(last_time_str).replace(tzinfo=timezone.utc)
|
|
|
+ else:
|
|
|
+ self.last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
|
|
|
+ except Exception:
|
|
|
+ self.last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
|
|
|
+
|
|
|
+ for fill in recent_fills:
|
|
|
+ try:
|
|
|
+ trade_id = fill.get('id')
|
|
|
+ timestamp_ms = fill.get('timestamp')
|
|
|
+ symbol_from_fill = fill.get('symbol')
|
|
|
+ side_from_fill = fill.get('side')
|
|
|
+ amount_from_fill = float(fill.get('amount', 0))
|
|
|
+ price_from_fill = float(fill.get('price', 0))
|
|
|
+
|
|
|
+ timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc) if timestamp_ms else datetime.now(timezone.utc)
|
|
|
+
|
|
|
+ if self.last_processed_trade_time and timestamp_dt <= self.last_processed_trade_time:
|
|
|
+ continue
|
|
|
+
|
|
|
+ # Check if this fill has already been processed to prevent duplicates
|
|
|
+ if trade_id and stats.has_exchange_fill_been_processed(str(trade_id)):
|
|
|
+ logger.debug(f"Skipping already processed fill: {trade_id}")
|
|
|
+ continue
|
|
|
+
|
|
|
+ fill_processed_this_iteration = False
|
|
|
+
|
|
|
+ if not (symbol_from_fill and side_from_fill and amount_from_fill > 0 and price_from_fill > 0):
|
|
|
+ logger.warning(f"Skipping fill with incomplete data: {fill}")
|
|
|
+ continue
|
|
|
+
|
|
|
+ full_symbol = symbol_from_fill
|
|
|
+ token = symbol_from_fill.split('/')[0] if '/' in symbol_from_fill else symbol_from_fill.split(':')[0]
|
|
|
+
|
|
|
+ exchange_order_id_from_fill = fill.get('info', {}).get('oid')
|
|
|
+
|
|
|
+ # First check if this is a pending entry order fill
|
|
|
+ if exchange_order_id_from_fill:
|
|
|
+ pending_lc = stats.get_lifecycle_by_entry_order_id(exchange_order_id_from_fill, status='pending')
|
|
|
+ if pending_lc and pending_lc.get('symbol') == full_symbol:
|
|
|
+ success = await stats.update_trade_position_opened(
|
|
|
+ lifecycle_id=pending_lc['trade_lifecycle_id'],
|
|
|
+ entry_price=price_from_fill,
|
|
|
+ entry_amount=amount_from_fill,
|
|
|
+ exchange_fill_id=trade_id
|
|
|
+ )
|
|
|
+ if success:
|
|
|
+ logger.info(f"📈 Lifecycle ENTRY: {pending_lc['trade_lifecycle_id']} for {full_symbol} updated by fill {trade_id}.")
|
|
|
+ symbols_with_fills.add(token)
|
|
|
+ order_in_db_for_entry = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
|
|
|
+ if order_in_db_for_entry:
|
|
|
+ stats.update_order_status(order_db_id=order_in_db_for_entry['id'], new_status='filled', amount_filled_increment=amount_from_fill)
|
|
|
+
|
|
|
+ # Send position opened notification (this is a bot-initiated position)
|
|
|
+ await self._send_position_change_notification(
|
|
|
+ full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
+ 'position_opened', timestamp_dt
|
|
|
+ )
|
|
|
+ fill_processed_this_iteration = True
|
|
|
+
|
|
|
+ # Check if this is a known bot order (SL/TP/exit)
|
|
|
+ if not fill_processed_this_iteration and exchange_order_id_from_fill:
|
|
|
+ active_lc = None
|
|
|
+ closure_reason_action_type = None
|
|
|
+ bot_order_db_id_to_update = None
|
|
|
+
|
|
|
+ bot_order_for_fill = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
|
|
|
+ if bot_order_for_fill and bot_order_for_fill.get('symbol') == full_symbol:
|
|
|
+ order_type = bot_order_for_fill.get('type')
|
|
|
+ order_side = bot_order_for_fill.get('side')
|
|
|
+ if order_type == 'market':
|
|
|
+ potential_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
|
|
|
+ if potential_lc:
|
|
|
+ lc_pos_side = potential_lc.get('position_side')
|
|
|
+ if (lc_pos_side == 'long' and order_side == 'sell' and side_from_fill == 'sell') or \
|
|
|
+ (lc_pos_side == 'short' and order_side == 'buy' and side_from_fill == 'buy'):
|
|
|
+ active_lc = potential_lc
|
|
|
+ closure_reason_action_type = f"bot_exit_{lc_pos_side}_close"
|
|
|
+ bot_order_db_id_to_update = bot_order_for_fill.get('id')
|
|
|
+ logger.info(f"ℹ️ Lifecycle BOT EXIT: Fill {trade_id} (OID {exchange_order_id_from_fill}) for {full_symbol} matches bot exit for lifecycle {active_lc['trade_lifecycle_id']}.")
|
|
|
+
|
|
|
+ if not active_lc:
|
|
|
+ lc_by_sl = stats.get_lifecycle_by_sl_order_id(exchange_order_id_from_fill, status='position_opened')
|
|
|
+ if lc_by_sl and lc_by_sl.get('symbol') == full_symbol:
|
|
|
+ active_lc = lc_by_sl
|
|
|
+ closure_reason_action_type = f"sl_{active_lc.get('position_side')}_close"
|
|
|
+ bot_order_db_id_to_update = bot_order_for_fill.get('id')
|
|
|
+ logger.info(f"ℹ️ Lifecycle SL: Fill {trade_id} for OID {exchange_order_id_from_fill} matches SL for lifecycle {active_lc['trade_lifecycle_id']}.")
|
|
|
+
|
|
|
+ if not active_lc:
|
|
|
+ lc_by_tp = stats.get_lifecycle_by_tp_order_id(exchange_order_id_from_fill, status='position_opened')
|
|
|
+ if lc_by_tp and lc_by_tp.get('symbol') == full_symbol:
|
|
|
+ active_lc = lc_by_tp
|
|
|
+ closure_reason_action_type = f"tp_{active_lc.get('position_side')}_close"
|
|
|
+ bot_order_db_id_to_update = bot_order_for_fill.get('id')
|
|
|
+ logger.info(f"ℹ️ Lifecycle TP: Fill {trade_id} for OID {exchange_order_id_from_fill} matches TP for lifecycle {active_lc['trade_lifecycle_id']}.")
|
|
|
+
|
|
|
+ # Process known bot order fills
|
|
|
+ if active_lc and closure_reason_action_type:
|
|
|
+ lc_id = active_lc['trade_lifecycle_id']
|
|
|
+ lc_entry_price = active_lc.get('entry_price', 0)
|
|
|
+ lc_position_side = active_lc.get('position_side')
|
|
|
+
|
|
|
+ realized_pnl = 0
|
|
|
+ if lc_position_side == 'long':
|
|
|
+ realized_pnl = amount_from_fill * (price_from_fill - lc_entry_price)
|
|
|
+ elif lc_position_side == 'short':
|
|
|
+ realized_pnl = amount_from_fill * (lc_entry_price - price_from_fill)
|
|
|
+
|
|
|
+ success = stats.update_trade_position_closed(
|
|
|
+ lifecycle_id=lc_id, exit_price=price_from_fill,
|
|
|
+ realized_pnl=realized_pnl, exchange_fill_id=trade_id
|
|
|
+ )
|
|
|
+ if success:
|
|
|
+ pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
|
|
|
+ formatter = get_formatter()
|
|
|
+ logger.info(f"{pnl_emoji} Lifecycle CLOSED: {lc_id} ({closure_reason_action_type}). PNL for fill: {await formatter.format_price_with_symbol(realized_pnl)}")
|
|
|
+ symbols_with_fills.add(token)
|
|
|
+
|
|
|
+ # Send position closed notification
|
|
|
+ await self._send_position_change_notification(
|
|
|
+ full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
+ 'position_closed', timestamp_dt, active_lc, realized_pnl
|
|
|
+ )
|
|
|
+
|
|
|
+ stats.migrate_trade_to_aggregated_stats(lc_id)
|
|
|
+ if bot_order_db_id_to_update:
|
|
|
+ stats.update_order_status(order_db_id=bot_order_db_id_to_update, new_status='filled', amount_filled_increment=amount_from_fill)
|
|
|
+ fill_processed_this_iteration = True
|
|
|
+
|
|
|
+ # Check for external stop losses
|
|
|
+ if not fill_processed_this_iteration:
|
|
|
+ if (exchange_order_id_from_fill and
|
|
|
+ self.shared_state.get('external_stop_losses') and
|
|
|
+ exchange_order_id_from_fill in self.shared_state['external_stop_losses']):
|
|
|
+ stop_loss_info = self.shared_state['external_stop_losses'][exchange_order_id_from_fill]
|
|
|
+ formatter = get_formatter()
|
|
|
+ logger.info(f"🛑 External SL (MM Tracking): {token} Order {exchange_order_id_from_fill} filled @ {await formatter.format_price_with_symbol(price_from_fill, token)}")
|
|
|
+
|
|
|
+ sl_active_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
|
|
|
+ if sl_active_lc:
|
|
|
+ lc_id = sl_active_lc['trade_lifecycle_id']
|
|
|
+ lc_entry_price = sl_active_lc.get('entry_price', 0)
|
|
|
+ lc_pos_side = sl_active_lc.get('position_side')
|
|
|
+ realized_pnl = amount_from_fill * (price_from_fill - lc_entry_price) if lc_pos_side == 'long' else amount_from_fill * (lc_entry_price - price_from_fill)
|
|
|
+
|
|
|
+ success = stats.update_trade_position_closed(lc_id, price_from_fill, realized_pnl, trade_id)
|
|
|
+ if success:
|
|
|
+ pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
|
|
|
+ logger.info(f"{pnl_emoji} Lifecycle CLOSED by External SL (MM): {lc_id}. PNL: {await formatter.format_price_with_symbol(realized_pnl)}")
|
|
|
+ if self.notification_manager:
|
|
|
+ await self.notification_manager.send_stop_loss_execution_notification(
|
|
|
+ stop_loss_info, full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
+ f'{lc_pos_side}_closed_external_sl', timestamp_dt.isoformat(), realized_pnl
|
|
|
+ )
|
|
|
+ stats.migrate_trade_to_aggregated_stats(lc_id)
|
|
|
+ # Modify shared state carefully
|
|
|
+ if exchange_order_id_from_fill in self.shared_state['external_stop_losses']:
|
|
|
+ del self.shared_state['external_stop_losses'][exchange_order_id_from_fill]
|
|
|
+ fill_processed_this_iteration = True
|
|
|
+ else:
|
|
|
+ logger.warning(f"⚠️ External SL (MM) {exchange_order_id_from_fill} for {full_symbol}, but no active lifecycle found.")
|
|
|
+
|
|
|
+ # NEW: Enhanced external trade processing with position state detection
|
|
|
+ if not fill_processed_this_iteration:
|
|
|
+ existing_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
|
|
|
+
|
|
|
+ # If no lifecycle exists but we have a position on exchange, try to auto-sync first
|
|
|
+ if not existing_lc:
|
|
|
+ current_positions = self._safe_get_positions()
|
|
|
+ if current_positions is None:
|
|
|
+ logger.warning("⚠️ Failed to fetch positions for external trade detection - skipping this fill")
|
|
|
+ continue
|
|
|
+ exchange_position = None
|
|
|
+ for pos in current_positions:
|
|
|
+ if pos.get('symbol') == full_symbol:
|
|
|
+ exchange_position = pos
|
|
|
+ break
|
|
|
+
|
|
|
+ if exchange_position and abs(float(exchange_position.get('contracts', 0))) > 1e-9:
|
|
|
+ logger.info(f"🔄 AUTO-SYNC: Position exists on exchange for {full_symbol} but no lifecycle found. Auto-syncing before processing fill.")
|
|
|
+ success = await self._auto_sync_single_position(full_symbol, exchange_position, stats)
|
|
|
+ if success:
|
|
|
+ # Re-check for lifecycle after auto-sync
|
|
|
+ existing_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
|
|
|
+
|
|
|
+ action_type = await self._determine_position_action_type(
|
|
|
+ full_symbol, side_from_fill, amount_from_fill, existing_lc
|
|
|
+ )
|
|
|
+
|
|
|
+ logger.info(f"🔍 External fill analysis: {full_symbol} {side_from_fill} {amount_from_fill} -> {action_type}")
|
|
|
+
|
|
|
+ # Additional debug logging for position changes
|
|
|
+ if existing_lc:
|
|
|
+ previous_size = existing_lc.get('current_position_size', 0)
|
|
|
+ current_positions = self._safe_get_positions()
|
|
|
+ if current_positions is None:
|
|
|
+ logger.warning("⚠️ Failed to fetch positions for debug logging - skipping debug info")
|
|
|
+ # Set defaults to avoid reference errors
|
|
|
+ current_size = previous_size
|
|
|
+ else:
|
|
|
+ current_size = 0
|
|
|
+ for pos in current_positions:
|
|
|
+ if pos.get('symbol') == full_symbol:
|
|
|
+ current_size = abs(float(pos.get('contracts', 0)))
|
|
|
+ break
|
|
|
+ logger.info(f"📊 Position size change: {previous_size} -> {current_size} (diff: {current_size - previous_size})")
|
|
|
+ logger.info(f"🎯 Expected change based on fill: {'+' if side_from_fill.lower() == 'buy' else '-'}{amount_from_fill}")
|
|
|
+
|
|
|
+ # If lifecycle is already closed, we should not re-process as a new closure.
|
|
|
+ if existing_lc.get('status') == 'position_closed':
|
|
|
+ logger.info(f"ℹ️ Fill {trade_id} received for already closed lifecycle {existing_lc['trade_lifecycle_id']}. Recording fill and skipping further action.")
|
|
|
+ stats.record_trade(
|
|
|
+ full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
+ exchange_fill_id=trade_id, trade_type="external_fill_for_closed_pos",
|
|
|
+ pnl=None, timestamp=timestamp_dt.isoformat(),
|
|
|
+ linked_order_table_id_to_link=None
|
|
|
+ )
|
|
|
+ fill_processed_this_iteration = True
|
|
|
+ continue # Skip to next fill
|
|
|
+
|
|
|
+ # Check if this might be a position decrease that was misclassified
|
|
|
+ if (action_type == 'external_unmatched' and
|
|
|
+ existing_lc.get('position_side') == 'long' and
|
|
|
+ side_from_fill.lower() == 'sell' and
|
|
|
+ current_size < previous_size):
|
|
|
+ logger.warning(f"⚠️ Potential misclassification: {full_symbol} {side_from_fill} looks like position decrease but classified as external_unmatched")
|
|
|
+ # Force re-check with proper parameters
|
|
|
+ action_type = 'position_decreased'
|
|
|
+ logger.info(f"🔄 Corrected action_type to: {action_type}")
|
|
|
+ elif (action_type == 'external_unmatched' and
|
|
|
+ existing_lc.get('position_side') == 'long' and
|
|
|
+ side_from_fill.lower() == 'buy' and
|
|
|
+ current_size > previous_size):
|
|
|
+ logger.warning(f"⚠️ Potential misclassification: {full_symbol} {side_from_fill} looks like position increase but classified as external_unmatched")
|
|
|
+ action_type = 'position_increased'
|
|
|
+ logger.info(f"🔄 Corrected action_type to: {action_type}")
|
|
|
+
|
|
|
+ if action_type == 'position_opened':
|
|
|
+ # Create new lifecycle for external position
|
|
|
+ lifecycle_id = stats.create_trade_lifecycle(
|
|
|
+ symbol=full_symbol,
|
|
|
+ side=side_from_fill,
|
|
|
+ entry_order_id=exchange_order_id_from_fill or f"external_{trade_id}",
|
|
|
+ trade_type="external"
|
|
|
+ )
|
|
|
+
|
|
|
+ if lifecycle_id:
|
|
|
+ success = stats.update_trade_position_opened(
|
|
|
+ lifecycle_id=lifecycle_id,
|
|
|
+ entry_price=price_from_fill,
|
|
|
+ entry_amount=amount_from_fill,
|
|
|
+ exchange_fill_id=trade_id
|
|
|
+ )
|
|
|
+
|
|
|
+ if success:
|
|
|
+ logger.info(f"📈 Created and opened new external lifecycle: {lifecycle_id[:8]} for {full_symbol}")
|
|
|
+ symbols_with_fills.add(token)
|
|
|
+
|
|
|
+ # Send position opened notification
|
|
|
+ await self._send_position_change_notification(
|
|
|
+ full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
+ action_type, timestamp_dt
|
|
|
+ )
|
|
|
+ fill_processed_this_iteration = True
|
|
|
+
|
|
|
+ elif action_type == 'position_closed' and existing_lc:
|
|
|
+ # Close existing lifecycle
|
|
|
+ lc_id = existing_lc['trade_lifecycle_id']
|
|
|
+ lc_entry_price = existing_lc.get('entry_price', 0)
|
|
|
+ lc_position_side = existing_lc.get('position_side')
|
|
|
+
|
|
|
+ realized_pnl = 0
|
|
|
+ if lc_position_side == 'long':
|
|
|
+ realized_pnl = amount_from_fill * (price_from_fill - lc_entry_price)
|
|
|
+ elif lc_position_side == 'short':
|
|
|
+ realized_pnl = amount_from_fill * (lc_entry_price - price_from_fill)
|
|
|
+
|
|
|
+ success = stats.update_trade_position_closed(
|
|
|
+ lifecycle_id=lc_id,
|
|
|
+ exit_price=price_from_fill,
|
|
|
+ realized_pnl=realized_pnl,
|
|
|
+ exchange_fill_id=trade_id
|
|
|
+ )
|
|
|
+ if success:
|
|
|
+ pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
|
|
|
+ formatter = get_formatter()
|
|
|
+ logger.info(f"{pnl_emoji} Lifecycle CLOSED (External): {lc_id}. PNL: {await formatter.format_price_with_symbol(realized_pnl)}")
|
|
|
+ symbols_with_fills.add(token)
|
|
|
+
|
|
|
+ # Send position closed notification
|
|
|
+ await self._send_position_change_notification(
|
|
|
+ full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
+ action_type, timestamp_dt, existing_lc, realized_pnl
|
|
|
+ )
|
|
|
+
|
|
|
+ stats.migrate_trade_to_aggregated_stats(lc_id)
|
|
|
+ fill_processed_this_iteration = True
|
|
|
+
|
|
|
+ elif action_type in ['position_increased', 'position_decreased'] and existing_lc:
|
|
|
+ # Update lifecycle position size and send notification
|
|
|
+ current_positions = self._safe_get_positions()
|
|
|
+ if current_positions is None:
|
|
|
+ logger.warning("⚠️ Failed to fetch positions for size update - skipping position change processing")
|
|
|
+ continue
|
|
|
+ new_size = 0
|
|
|
+ for pos in current_positions:
|
|
|
+ if pos.get('symbol') == full_symbol:
|
|
|
+ new_size = abs(float(pos.get('contracts', 0)))
|
|
|
+ break
|
|
|
+
|
|
|
+ # Update lifecycle with new position size
|
|
|
+ await self._update_lifecycle_position_size(existing_lc['trade_lifecycle_id'], new_size)
|
|
|
+
|
|
|
+ # Send appropriate notification
|
|
|
+ await self._send_position_change_notification(
|
|
|
+ full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
+ action_type, timestamp_dt, existing_lc
|
|
|
+ )
|
|
|
+
|
|
|
+ symbols_with_fills.add(token)
|
|
|
+ fill_processed_this_iteration = True
|
|
|
+ logger.info(f"📊 Position {action_type}: {full_symbol} new size: {new_size}")
|
|
|
+
|
|
|
+ # Fallback for unmatched external trades
|
|
|
+ if not fill_processed_this_iteration:
|
|
|
+ all_open_positions_in_db = stats.get_open_positions()
|
|
|
+ db_open_symbols = {pos_db.get('symbol') for pos_db in all_open_positions_in_db}
|
|
|
+
|
|
|
+ if full_symbol in db_open_symbols:
|
|
|
+ logger.debug(f"Position {full_symbol} found in open positions but no active lifecycle - likely auto-sync failed or timing issue for fill {trade_id}")
|
|
|
+
|
|
|
+ # Record as unmatched external trade
|
|
|
+ linked_order_db_id = None
|
|
|
+ if exchange_order_id_from_fill:
|
|
|
+ order_in_db = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
|
|
|
+ if order_in_db:
|
|
|
+ linked_order_db_id = order_in_db.get('id')
|
|
|
+
|
|
|
+ stats.record_trade(
|
|
|
+ full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
+ exchange_fill_id=trade_id, trade_type="external_unmatched",
|
|
|
+ timestamp=timestamp_dt.isoformat(),
|
|
|
+ linked_order_table_id_to_link=linked_order_db_id
|
|
|
+ )
|
|
|
+ logger.info(f"📋 Recorded trade via FALLBACK: {trade_id} (Unmatched External Fill)")
|
|
|
+
|
|
|
+ # No notification sent for unmatched external trades per user preference
|
|
|
+ fill_processed_this_iteration = True
|
|
|
+
|
|
|
+ if fill_processed_this_iteration:
|
|
|
+ external_trades_processed += 1
|
|
|
+ if self.last_processed_trade_time is None or timestamp_dt > self.last_processed_trade_time:
|
|
|
+ self.last_processed_trade_time = timestamp_dt
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Error processing fill {fill.get('id', 'N/A')}: {e}", exc_info=True)
|
|
|
+ continue
|
|
|
+
|
|
|
+ if external_trades_processed > 0:
|
|
|
+ stats._set_metadata('market_monitor_last_processed_trade_time', self.last_processed_trade_time.isoformat())
|
|
|
+ logger.info(f"💾 Saved MarketMonitor state (last_processed_trade_time) to DB: {self.last_processed_trade_time.isoformat()}")
|
|
|
+ logger.info(f"📊 Processed {external_trades_processed} external trades")
|
|
|
+ if symbols_with_fills:
|
|
|
+ logger.info(f"ℹ️ Symbols with processed fills this cycle: {list(symbols_with_fills)}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error checking external trades: {e}", exc_info=True)
|
|
|
+
|
|
|
+ async def _reconcile_positions(self):
|
|
|
+ """Main method - check all positions for changes and send notifications."""
|
|
|
+ try:
|
|
|
+ stats = self.trading_engine.get_stats()
|
|
|
+ if not stats:
|
|
|
+ logger.warning("TradingStats not available")
|
|
|
+ return
|
|
|
+
|
|
|
+ # Get current exchange positions
|
|
|
+ exchange_positions = self.trading_engine.get_positions()
|
|
|
+
|
|
|
+ # Handle API failures gracefully - don't treat None as empty positions
|
|
|
+ if exchange_positions is None:
|
|
|
+ logger.warning("⚠️ Failed to fetch exchange positions - skipping position change detection to avoid false closures")
|
|
|
+ return
|
|
|
+
|
|
|
+ # Get current DB positions (trades with status='position_opened')
|
|
|
+ db_positions = stats.get_open_positions()
|
|
|
+
|
|
|
+ # Create lookup maps
|
|
|
+ exchange_map = {pos['symbol']: pos for pos in exchange_positions if abs(float(pos.get('contracts', 0))) > 1e-9}
|
|
|
+ db_map = {pos['symbol']: pos for pos in db_positions}
|
|
|
+
|
|
|
+ all_symbols = set(exchange_map.keys()) | set(db_map.keys())
|
|
|
+
|
|
|
+ for symbol in all_symbols:
|
|
|
+ await self._check_symbol_position_change(symbol, exchange_map.get(symbol), db_map.get(symbol), stats)
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error checking position changes: {e}")
|
|
|
+
|
|
|
+ async def _check_symbol_position_change(self, symbol: str, exchange_pos: Optional[Dict],
|
|
|
+ db_pos: Optional[Dict], stats) -> None:
|
|
|
+ """Check position changes for a single symbol."""
|
|
|
+ try:
|
|
|
+ current_time = datetime.now(timezone.utc)
|
|
|
+
|
|
|
+ # Case 1: New position (exchange has, DB doesn't)
|
|
|
+ if exchange_pos and not db_pos:
|
|
|
+ await self._handle_position_opened(symbol, exchange_pos, stats, current_time)
|
|
|
+
|
|
|
+ # Case 2: Position closed (DB has, exchange doesn't)
|
|
|
+ elif db_pos and not exchange_pos:
|
|
|
+ await self._handle_position_closed(symbol, db_pos, stats, current_time)
|
|
|
+
|
|
|
+ # Case 3: Position size changed (both exist, different sizes)
|
|
|
+ elif exchange_pos and db_pos:
|
|
|
+ await self._handle_position_size_change(symbol, exchange_pos, db_pos, stats, current_time)
|
|
|
+
|
|
|
+ # Case 4: Both None - no action needed
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error checking position change for {symbol}: {e}")
|
|
|
+
|
|
|
+ async def _handle_position_opened(self, symbol: str, exchange_pos: Dict, stats, timestamp: datetime):
|
|
|
+ """Handle new position detection."""
|
|
|
+ try:
|
|
|
+ contracts = float(exchange_pos.get('contracts', 0))
|
|
|
+ size = abs(contracts)
|
|
|
+
|
|
|
+ # Use CCXT's side field first (more reliable), fallback to contract sign
|
|
|
+ ccxt_side = exchange_pos.get('side', '').lower()
|
|
|
+ if ccxt_side == 'long':
|
|
|
+ side, order_side = 'long', 'buy'
|
|
|
+ elif ccxt_side == 'short':
|
|
|
+ side, order_side = 'short', 'sell'
|
|
|
+ else:
|
|
|
+ # Fallback to contract sign (less reliable but better than nothing)
|
|
|
+ side = 'long' if contracts > 0 else 'short'
|
|
|
+ order_side = 'buy' if side == 'long' else 'sell'
|
|
|
+ logger.warning(f"⚠️ Using contract sign fallback for {symbol}: side={side}, ccxt_side='{ccxt_side}'")
|
|
|
+
|
|
|
+ # Get entry price from exchange
|
|
|
+ entry_price = float(exchange_pos.get('entryPrice', 0)) or float(exchange_pos.get('entryPx', 0))
|
|
|
+ if not entry_price:
|
|
|
+ entry_price = float(exchange_pos.get('markPrice', 0)) or float(exchange_pos.get('markPx', 0))
|
|
|
+
|
|
|
+ if not entry_price:
|
|
|
+ logger.error(f"❌ Cannot determine entry price for {symbol}")
|
|
|
+ return
|
|
|
+
|
|
|
+ # Extract ROE from info.position.returnOnEquity
|
|
|
+ roe_raw = None
|
|
|
+ if 'info' in exchange_pos and 'position' in exchange_pos['info']:
|
|
|
+ roe_raw = exchange_pos['info']['position'].get('returnOnEquity')
|
|
|
+ roe_percentage = float(roe_raw) * 100 if roe_raw is not None else 0.0
|
|
|
+
|
|
|
+ # Create trade lifecycle using existing manager
|
|
|
+ lifecycle_id = stats.create_trade_lifecycle(
|
|
|
+ symbol=symbol,
|
|
|
+ side=order_side,
|
|
|
+ entry_order_id=f"external_position_{timestamp.strftime('%Y%m%d_%H%M%S')}",
|
|
|
+ trade_type='external_detected'
|
|
|
+ )
|
|
|
+
|
|
|
+ if lifecycle_id:
|
|
|
+ # Update to position_opened using existing manager
|
|
|
+ await stats.update_trade_position_opened(
|
|
|
+ lifecycle_id=lifecycle_id,
|
|
|
+ entry_price=entry_price,
|
|
|
+ entry_amount=size,
|
|
|
+ exchange_fill_id=f"position_detected_{timestamp.isoformat()}"
|
|
|
+ )
|
|
|
+
|
|
|
+ # Now, update the market data for the newly opened position
|
|
|
+ margin_used = None
|
|
|
+ if 'info' in exchange_pos and isinstance(exchange_pos['info'], dict):
|
|
|
+ position_info = exchange_pos['info'].get('position', {})
|
|
|
+ if position_info:
|
|
|
+ margin_used = position_info.get('marginUsed')
|
|
|
+
|
|
|
+ stats.trade_manager.update_trade_market_data(
|
|
|
+ lifecycle_id=lifecycle_id,
|
|
|
+ current_position_size=size,
|
|
|
+ unrealized_pnl=exchange_pos.get('unrealizedPnl'),
|
|
|
+ roe_percentage=roe_percentage,
|
|
|
+ mark_price=exchange_pos.get('markPrice'),
|
|
|
+ position_value=exchange_pos.get('positionValue'),
|
|
|
+ margin_used=margin_used,
|
|
|
+ leverage=exchange_pos.get('leverage'),
|
|
|
+ liquidation_price=exchange_pos.get('liquidationPrice')
|
|
|
+ )
|
|
|
+
|
|
|
+ logger.info(f"🚀 NEW POSITION: {symbol} {side.upper()} {size} @ {entry_price}")
|
|
|
+
|
|
|
+ # Send notification
|
|
|
+ await self._send_reconciliation_notification('opened', symbol, {
|
|
|
+ 'side': side,
|
|
|
+ 'size': size,
|
|
|
+ 'price': entry_price,
|
|
|
+ 'timestamp': timestamp
|
|
|
+ })
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error handling position opened for {symbol}: {e}")
|
|
|
+
|
|
|
+ async def _handle_position_closed(self, symbol: str, db_pos: Dict, stats, timestamp: datetime):
|
|
|
+ """Handle position closure detection."""
|
|
|
+ try:
|
|
|
+ lifecycle_id = db_pos['trade_lifecycle_id']
|
|
|
+ entry_price = db_pos.get('entry_price', 0)
|
|
|
+ position_side = db_pos.get('position_side')
|
|
|
+ size = db_pos.get('current_position_size', 0)
|
|
|
+
|
|
|
+ # Estimate exit price (could be improved with recent fills)
|
|
|
+ market_data = await self.trading_engine.get_market_data(symbol)
|
|
|
+ exit_price = entry_price # Fallback
|
|
|
+ if market_data and market_data.get('ticker'):
|
|
|
+ exit_price = float(market_data['ticker'].get('last', exit_price))
|
|
|
+
|
|
|
+ # Calculate realized PnL
|
|
|
+ realized_pnl = 0
|
|
|
+ if position_side == 'long':
|
|
|
+ realized_pnl = size * (exit_price - entry_price)
|
|
|
+ elif position_side == 'short':
|
|
|
+ realized_pnl = size * (entry_price - exit_price)
|
|
|
+
|
|
|
+ # Update to position_closed using existing manager
|
|
|
+ success = await stats.update_trade_position_closed(
|
|
|
+ lifecycle_id=lifecycle_id,
|
|
|
+ exit_price=exit_price,
|
|
|
+ realized_pnl=realized_pnl,
|
|
|
+ exchange_fill_id=f"position_closed_detected_{timestamp.isoformat()}"
|
|
|
+ )
|
|
|
+
|
|
|
+ if success:
|
|
|
+ logger.info(f"🎯 POSITION CLOSED: {symbol} {position_side.upper()} PnL: {realized_pnl:.2f}")
|
|
|
+
|
|
|
+ # Get exchange position info for ROE
|
|
|
+ exchange_positions = self.trading_engine.get_positions()
|
|
|
+ exchange_pos = None
|
|
|
+ if exchange_positions:
|
|
|
+ for pos in exchange_positions:
|
|
|
+ if pos.get('symbol') == symbol:
|
|
|
+ exchange_pos = pos
|
|
|
+ break
|
|
|
+
|
|
|
+ # Send notification
|
|
|
+ await self._send_reconciliation_notification('closed', symbol, {
|
|
|
+ 'side': position_side,
|
|
|
+ 'size': size,
|
|
|
+ 'entry_price': entry_price,
|
|
|
+ 'exit_price': exit_price,
|
|
|
+ 'realized_pnl': realized_pnl,
|
|
|
+ 'timestamp': timestamp,
|
|
|
+ 'info': exchange_pos.get('info', {}) if exchange_pos else {}
|
|
|
+ })
|
|
|
+
|
|
|
+ # Clear any pending stop losses for this symbol
|
|
|
+ stats.order_manager.cancel_pending_stop_losses_by_symbol(symbol, 'cancelled_position_closed')
|
|
|
+
|
|
|
+ # Migrate trade to aggregated stats and clean up
|
|
|
+ stats.migrate_trade_to_aggregated_stats(lifecycle_id)
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error handling position closed for {symbol}: {e}")
|
|
|
+
|
|
|
+ async def _handle_position_size_change(self, symbol: str, exchange_pos: Dict,
|
|
|
+ db_pos: Dict, stats, timestamp: datetime):
|
|
|
+ """Handle position size changes and position flips."""
|
|
|
+ try:
|
|
|
+ exchange_size = abs(float(exchange_pos.get('contracts', 0)))
|
|
|
+ db_size = db_pos.get('current_position_size', 0)
|
|
|
+ db_position_side = db_pos.get('position_side')
|
|
|
+
|
|
|
+ # Determine current exchange position side
|
|
|
+ ccxt_side = exchange_pos.get('side', '').lower()
|
|
|
+ if ccxt_side == 'long':
|
|
|
+ exchange_position_side = 'long'
|
|
|
+ elif ccxt_side == 'short':
|
|
|
+ exchange_position_side = 'short'
|
|
|
+ else:
|
|
|
+ # Fallback to contract sign
|
|
|
+ contracts = float(exchange_pos.get('contracts', 0))
|
|
|
+ exchange_position_side = 'long' if contracts > 1e-9 else 'short'
|
|
|
+ logger.warning(f"⚠️ Using contract sign fallback for side detection: {symbol}")
|
|
|
+
|
|
|
+ # Check for POSITION FLIP (LONG ↔ SHORT)
|
|
|
+ if db_position_side != exchange_position_side:
|
|
|
+ logger.info(f"🔄 POSITION FLIP DETECTED: {symbol} {db_position_side.upper()} → {exchange_position_side.upper()}")
|
|
|
+
|
|
|
+ # Handle as: close old position + open new position
|
|
|
+ await self._handle_position_closed(symbol, db_pos, stats, timestamp)
|
|
|
+ await self._handle_position_opened(symbol, exchange_pos, stats, timestamp)
|
|
|
+ return
|
|
|
+
|
|
|
+ # If we are here, the side is the same. Now we can update market data for the existing trade.
|
|
|
+ lifecycle_id = db_pos['trade_lifecycle_id']
|
|
|
+
|
|
|
+ # Extract all relevant market data from the exchange position
|
|
|
+ unrealized_pnl = exchange_pos.get('unrealizedPnl')
|
|
|
+ leverage = exchange_pos.get('leverage')
|
|
|
+ liquidation_price = exchange_pos.get('liquidationPrice')
|
|
|
+ mark_price = exchange_pos.get('markPrice')
|
|
|
+ position_value = exchange_pos.get('contracts', 0) * exchange_pos.get('markPrice', 0) if mark_price else None
|
|
|
+
|
|
|
+ # Safely extract ROE and Margin from the 'info' dictionary
|
|
|
+ roe_percentage = None
|
|
|
+ margin_used = None
|
|
|
+ if 'info' in exchange_pos and isinstance(exchange_pos['info'], dict):
|
|
|
+ logger.debug(f"Exchange position info for {symbol}: {exchange_pos['info']}") # Temporary logging
|
|
|
+ position_info = exchange_pos['info'].get('position', {})
|
|
|
+ if position_info:
|
|
|
+ roe_raw = position_info.get('returnOnEquity')
|
|
|
+ roe_percentage = float(roe_raw) * 100 if roe_raw is not None else None
|
|
|
+ margin_used = position_info.get('marginUsed')
|
|
|
+
|
|
|
+ # Call the trade manager to update the database
|
|
|
+ success = stats.trade_manager.update_trade_market_data(
|
|
|
+ lifecycle_id=lifecycle_id,
|
|
|
+ current_position_size=exchange_size,
|
|
|
+ unrealized_pnl=unrealized_pnl,
|
|
|
+ roe_percentage=roe_percentage,
|
|
|
+ mark_price=mark_price,
|
|
|
+ position_value=position_value,
|
|
|
+ margin_used=margin_used,
|
|
|
+ leverage=leverage,
|
|
|
+ liquidation_price=liquidation_price
|
|
|
+ )
|
|
|
+
|
|
|
+ if success:
|
|
|
+ logger.debug(f"🔄 Synced market data for {symbol} (Lifecycle: {lifecycle_id[:8]})")
|
|
|
+
|
|
|
+
|
|
|
+ # Check if size actually changed (with small tolerance)
|
|
|
+ if abs(exchange_size - db_size) < 1e-6:
|
|
|
+ return # No meaningful change
|
|
|
+
|
|
|
+ lifecycle_id = db_pos['trade_lifecycle_id']
|
|
|
+ entry_price = db_pos.get('entry_price', 0)
|
|
|
+
|
|
|
+ # Extract ROE from info.position.returnOnEquity
|
|
|
+ roe_raw = None
|
|
|
+ if 'info' in exchange_pos and 'position' in exchange_pos['info']:
|
|
|
+ roe_raw = exchange_pos['info']['position'].get('returnOnEquity')
|
|
|
+ roe_percentage = float(roe_raw) * 100 if roe_raw is not None else 0.0
|
|
|
+
|
|
|
+ # Update position size using existing manager
|
|
|
+ success = stats.trade_manager.update_trade_market_data(
|
|
|
+ lifecycle_id,
|
|
|
+ current_position_size=exchange_size,
|
|
|
+ unrealized_pnl=exchange_pos.get('unrealizedPnl'),
|
|
|
+ roe_percentage=roe_percentage,
|
|
|
+ mark_price=exchange_pos.get('markPrice'),
|
|
|
+ position_value=exchange_pos.get('positionValue'),
|
|
|
+ margin_used=exchange_pos.get('marginUsed'),
|
|
|
+ leverage=exchange_pos.get('leverage'),
|
|
|
+ liquidation_price=exchange_pos.get('liquidationPrice')
|
|
|
+ )
|
|
|
+
|
|
|
+ if success:
|
|
|
+ change_type = 'increased' if exchange_size > db_size else 'decreased'
|
|
|
+ size_diff = abs(exchange_size - db_size)
|
|
|
+ logger.info(f"📊 Position size {change_type}: {symbol} by {size_diff} (ROE: {roe_percentage:+.2f}%)")
|
|
|
+
|
|
|
+ # Send notification
|
|
|
+ await self._send_reconciliation_notification('size_changed', symbol, {
|
|
|
+ 'side': exchange_position_side,
|
|
|
+ 'old_size': db_size,
|
|
|
+ 'new_size': exchange_size,
|
|
|
+ 'change_type': change_type,
|
|
|
+ 'size_diff': size_diff,
|
|
|
+ 'roe': roe_percentage,
|
|
|
+ 'timestamp': timestamp
|
|
|
+ })
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error handling position size change for {symbol}: {e}")
|
|
|
+
|
|
|
+ async def _send_reconciliation_notification(self, change_type: str, symbol: str, details: Dict[str, Any]):
|
|
|
+ """Send position change notification."""
|
|
|
+ try:
|
|
|
+ if not self.notification_manager:
|
|
|
+ return
|
|
|
+
|
|
|
+ token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
|
|
|
+ time_str = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
|
|
|
+ formatter = get_formatter()
|
|
|
+
|
|
|
+ if change_type == 'opened':
|
|
|
+ side = details['side'].upper()
|
|
|
+ size = details['size']
|
|
|
+ entry_price = details['price']
|
|
|
+
|
|
|
+ message = f"""🎯 <b>Position Opened (Reconciled)</b>
|
|
|
+
|
|
|
+📊 <b>Details:</b>
|
|
|
+• Token: {token}
|
|
|
+• Direction: {side}
|
|
|
+• Size: {await formatter.format_amount(size, token)}
|
|
|
+• Entry: {await formatter.format_price_with_symbol(entry_price, token)}
|
|
|
+
|
|
|
+⏰ <b>Time:</b> {time_str}
|
|
|
+📈 Use /positions to view current status"""
|
|
|
+
|
|
|
+ elif change_type == 'closed':
|
|
|
+ side = details['side'].upper()
|
|
|
+ size = details['size']
|
|
|
+ entry_price = details['entry_price']
|
|
|
+ exit_price = details['exit_price']
|
|
|
+ pnl = details['realized_pnl']
|
|
|
+ pnl_emoji = "🟢" if pnl >= 0 else "🔴"
|
|
|
+
|
|
|
+ # Get ROE directly from exchange data
|
|
|
+ info_data = details.get('info', {})
|
|
|
+ position_info = info_data.get('position', {})
|
|
|
+ roe_raw = position_info.get('returnOnEquity') # Changed from 'percentage' to 'returnOnEquity'
|
|
|
+
|
|
|
+ if roe_raw is not None:
|
|
|
+ try:
|
|
|
+ # The exchange provides ROE as a decimal (e.g., -0.326 for -32.6%)
|
|
|
+ # We need to multiply by 100 and keep the sign
|
|
|
+ roe = float(roe_raw) * 100
|
|
|
+ roe_text = f"({roe:+.2f}%)"
|
|
|
+ except (ValueError, TypeError):
|
|
|
+ logger.warning(f"Could not parse ROE value: {roe_raw} for {symbol}")
|
|
|
+ roe_text = ""
|
|
|
+ else:
|
|
|
+ logger.warning(f"No ROE data available from exchange for {symbol}")
|
|
|
+ roe_text = ""
|
|
|
+
|
|
|
+ message = f"""🎯 <b>Position Closed (Reconciled)</b>
|
|
|
+
|
|
|
+📊 <b>Details:</b>
|
|
|
+• Token: {token}
|
|
|
+• Direction: {side}
|
|
|
+• Size: {await formatter.format_amount(size, token)}
|
|
|
+• Entry: {await formatter.format_price_with_symbol(entry_price, token)}
|
|
|
+• Exit: {await formatter.format_price_with_symbol(exit_price, token)}
|
|
|
+
|
|
|
+{pnl_emoji} <b>P&L:</b> {await formatter.format_price_with_symbol(pnl)} {roe_text}
|
|
|
+
|
|
|
+⏰ <b>Time:</b> {time_str}
|
|
|
+📊 Use /stats to view performance"""
|
|
|
+
|
|
|
+ elif change_type in ['increased', 'decreased']:
|
|
|
+ side = details['side'].upper()
|
|
|
+ old_size = details['old_size']
|
|
|
+ new_size = details['new_size']
|
|
|
+ size_diff = details['size_diff']
|
|
|
+ emoji = "📈" if change_type == 'increased' else "📉"
|
|
|
+
|
|
|
+ message = f"""{emoji} <b>Position {change_type.title()} (Reconciled)</b>
|
|
|
+
|
|
|
+📊 <b>Details:</b>
|
|
|
+• Token: {token}
|
|
|
+• Direction: {side}
|
|
|
+• Previous Size: {await formatter.format_amount(old_size, token)}
|
|
|
+• New Size: {await formatter.format_amount(new_size, token)}
|
|
|
+• Change: {await formatter.format_amount(size_diff, token)}
|
|
|
+
|
|
|
+⏰ <b>Time:</b> {time_str}
|
|
|
+📈 Use /positions to view current status"""
|
|
|
+ else:
|
|
|
+ return
|
|
|
+
|
|
|
+ await self.notification_manager.send_generic_notification(message.strip())
|
|
|
+ logger.debug(f"📨 Sent {change_type} notification for {symbol}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error sending notification for {symbol}: {e}")
|