|
@@ -1,1341 +0,0 @@
|
|
|
-#!/usr/bin/env python3
|
|
|
-"""
|
|
|
-Monitors positions, external events like trades made outside the bot, and price alarms.
|
|
|
-"""
|
|
|
-
|
|
|
-import logging
|
|
|
-import asyncio
|
|
|
-from datetime import datetime, timedelta, timezone
|
|
|
-from typing import Optional, Dict, Any, List
|
|
|
-
|
|
|
-# Assuming AlarmManager will be moved here or imported appropriately
|
|
|
-# from .alarm_manager import AlarmManager
|
|
|
-from src.monitoring.alarm_manager import AlarmManager # Keep if AlarmManager stays in its own file as per original structure
|
|
|
-from src.utils.token_display_formatter import get_formatter
|
|
|
-
|
|
|
-logger = logging.getLogger(__name__)
|
|
|
-
|
|
|
-class PositionMonitor:
|
|
|
- def __init__(self, trading_engine, notification_manager, alarm_manager, market_monitor_cache, shared_state):
|
|
|
- self.trading_engine = trading_engine
|
|
|
- self.notification_manager = notification_manager
|
|
|
- self.alarm_manager = alarm_manager
|
|
|
- self.market_monitor_cache = market_monitor_cache
|
|
|
- self.shared_state = shared_state # Expected to contain {'external_stop_losses': ...}
|
|
|
- self.last_processed_trade_time: Optional[datetime] = None
|
|
|
- # Add necessary initializations, potentially loading last_processed_trade_time
|
|
|
-
|
|
|
- async def run_monitor_cycle(self):
|
|
|
- """Runs a full monitoring cycle."""
|
|
|
- await self._check_external_trades()
|
|
|
- await self._check_price_alarms()
|
|
|
- await self._reconcile_positions()
|
|
|
-
|
|
|
- def _safe_get_positions(self) -> Optional[List[Dict[str, Any]]]:
|
|
|
- """Safely get positions from trading engine, returning None on API failures instead of empty list."""
|
|
|
- try:
|
|
|
- return self.trading_engine.get_positions()
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"⚠️ Failed to fetch positions in external event monitor: {e}")
|
|
|
- return None
|
|
|
-
|
|
|
- async def _check_price_alarms(self):
|
|
|
- """Check price alarms and trigger notifications."""
|
|
|
- try:
|
|
|
- active_alarms = self.alarm_manager.get_all_active_alarms()
|
|
|
-
|
|
|
- if not active_alarms:
|
|
|
- return
|
|
|
-
|
|
|
- tokens_to_check = list(set(alarm['token'] for alarm in active_alarms))
|
|
|
-
|
|
|
- for token in tokens_to_check:
|
|
|
- try:
|
|
|
- symbol = f"{token}/USDC:USDC"
|
|
|
- market_data = await self.trading_engine.get_market_data(symbol)
|
|
|
-
|
|
|
- if not market_data or not market_data.get('ticker'):
|
|
|
- continue
|
|
|
-
|
|
|
- current_price = float(market_data['ticker'].get('last', 0))
|
|
|
- if current_price <= 0:
|
|
|
- continue
|
|
|
-
|
|
|
- token_alarms = [alarm for alarm in active_alarms if alarm['token'] == token]
|
|
|
-
|
|
|
- for alarm in token_alarms:
|
|
|
- target_price = alarm['target_price']
|
|
|
- direction = alarm['direction']
|
|
|
-
|
|
|
- should_trigger = False
|
|
|
- if direction == 'above' and current_price >= target_price:
|
|
|
- should_trigger = True
|
|
|
- elif direction == 'below' and current_price <= target_price:
|
|
|
- should_trigger = True
|
|
|
-
|
|
|
- if should_trigger:
|
|
|
- triggered_alarm = self.alarm_manager.trigger_alarm(alarm['id'], current_price)
|
|
|
- if triggered_alarm:
|
|
|
- await self._send_alarm_notification(triggered_alarm)
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Error checking alarms for {token}: {e}")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error("❌ Error checking price alarms: {e}")
|
|
|
-
|
|
|
- async def _send_alarm_notification(self, alarm: Dict[str, Any]):
|
|
|
- """Send notification for triggered alarm."""
|
|
|
- try:
|
|
|
- if self.notification_manager:
|
|
|
- await self.notification_manager.send_alarm_triggered_notification(
|
|
|
- alarm['token'],
|
|
|
- alarm['target_price'],
|
|
|
- alarm['triggered_price'],
|
|
|
- alarm['direction']
|
|
|
- )
|
|
|
- else:
|
|
|
- logger.info(f"🔔 ALARM TRIGGERED: {alarm['token']} @ ${alarm['triggered_price']:,.2f}")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error sending alarm notification: {e}")
|
|
|
-
|
|
|
- async def _determine_position_action_type(self, full_symbol: str, side_from_fill: str,
|
|
|
- amount_from_fill: float, existing_lc: Optional[Dict] = None) -> str:
|
|
|
- """
|
|
|
- Determine the type of position action based on current state and fill details.
|
|
|
- Returns one of: 'position_opened', 'position_closed', 'position_increased', 'position_decreased'
|
|
|
- """
|
|
|
- try:
|
|
|
- # Get current position from exchange
|
|
|
- current_positions = self._safe_get_positions()
|
|
|
- if current_positions is None:
|
|
|
- logger.warning(f"⚠️ Failed to fetch positions for {full_symbol} analysis - returning external_unmatched")
|
|
|
- return 'external_unmatched'
|
|
|
-
|
|
|
- current_exchange_position = None
|
|
|
- for pos in current_positions:
|
|
|
- if pos.get('symbol') == full_symbol:
|
|
|
- current_exchange_position = pos
|
|
|
- break
|
|
|
-
|
|
|
- current_size = 0.0
|
|
|
- if current_exchange_position:
|
|
|
- current_size = abs(float(current_exchange_position.get('contracts', 0)))
|
|
|
-
|
|
|
- # If no existing lifecycle, this is a position opening
|
|
|
- if not existing_lc:
|
|
|
- logger.debug(f"🔍 Position analysis: {full_symbol} no existing lifecycle, current size: {current_size}")
|
|
|
- if current_size > 1e-9: # Position exists on exchange
|
|
|
- return 'position_opened'
|
|
|
- else:
|
|
|
- return 'external_unmatched'
|
|
|
-
|
|
|
- # Get previous position size from lifecycle
|
|
|
- previous_size = existing_lc.get('current_position_size', 0)
|
|
|
- lc_position_side = existing_lc.get('position_side')
|
|
|
-
|
|
|
- logger.debug(f"🔍 Position analysis: {full_symbol} {side_from_fill} {amount_from_fill}")
|
|
|
- logger.debug(f" Lifecycle side: {lc_position_side}, previous size: {previous_size}, current size: {current_size}")
|
|
|
-
|
|
|
- # Check if this is a closing trade (opposite side)
|
|
|
- is_closing_trade = False
|
|
|
- if lc_position_side == 'long' and side_from_fill.lower() == 'sell':
|
|
|
- is_closing_trade = True
|
|
|
- elif lc_position_side == 'short' and side_from_fill.lower() == 'buy':
|
|
|
- is_closing_trade = True
|
|
|
-
|
|
|
- logger.debug(f" Is closing trade: {is_closing_trade}")
|
|
|
-
|
|
|
- if is_closing_trade:
|
|
|
- if current_size < 1e-9: # Position is now closed
|
|
|
- logger.debug(f" → Position closed (current_size < 1e-9)")
|
|
|
- return 'position_closed'
|
|
|
- elif current_size < previous_size - 1e-9: # Position reduced but not closed
|
|
|
- logger.debug(f" → Position decreased (current_size {current_size} < previous_size - 1e-9 {previous_size - 1e-9})")
|
|
|
- return 'position_decreased'
|
|
|
- else:
|
|
|
- # Same side trade - position increase
|
|
|
- logger.debug(f" Same side trade check: current_size {current_size} > previous_size + 1e-9 {previous_size + 1e-9}?")
|
|
|
- if current_size > previous_size + 1e-9:
|
|
|
- logger.debug(f" → Position increased")
|
|
|
- return 'position_increased'
|
|
|
- else:
|
|
|
- logger.debug(f" → Size check failed, not enough increase")
|
|
|
-
|
|
|
- # Default fallback
|
|
|
- logger.debug(f" → Fallback to external_unmatched")
|
|
|
- return 'external_unmatched'
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Error determining position action type: {e}")
|
|
|
- return 'external_unmatched'
|
|
|
-
|
|
|
- async def _update_lifecycle_position_size(self, lifecycle_id: str, new_size: float) -> bool:
|
|
|
- """Update the current position size in the lifecycle."""
|
|
|
- try:
|
|
|
- stats = self.trading_engine.get_stats()
|
|
|
- if not stats:
|
|
|
- return False
|
|
|
-
|
|
|
- # Update the current position size
|
|
|
- success = stats.trade_manager.update_trade_market_data(
|
|
|
- lifecycle_id, current_position_size=new_size
|
|
|
- )
|
|
|
- return success
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Error updating lifecycle position size: {e}")
|
|
|
- return False
|
|
|
-
|
|
|
- async def _send_position_change_notification(self, full_symbol: str, side_from_fill: str,
|
|
|
- amount_from_fill: float, price_from_fill: float,
|
|
|
- action_type: str, timestamp_dt: datetime,
|
|
|
- existing_lc: Optional[Dict] = None,
|
|
|
- realized_pnl: Optional[float] = None):
|
|
|
- """Send position change notification."""
|
|
|
- try:
|
|
|
- if not self.notification_manager:
|
|
|
- return
|
|
|
-
|
|
|
- token = full_symbol.split('/')[0] if '/' in full_symbol else full_symbol.split(':')[0]
|
|
|
- time_str = timestamp_dt.strftime('%Y-%m-%d %H:%M:%S UTC')
|
|
|
- formatter = get_formatter()
|
|
|
-
|
|
|
- if action_type == 'position_closed' and existing_lc:
|
|
|
- position_side = existing_lc.get('position_side', 'unknown').upper()
|
|
|
- entry_price = existing_lc.get('entry_price', 0)
|
|
|
- pnl_emoji = "🟢" if realized_pnl and realized_pnl >= 0 else "🔴"
|
|
|
- pnl_text = f"{await formatter.format_price_with_symbol(realized_pnl)}" if realized_pnl is not None else "N/A"
|
|
|
-
|
|
|
- # Use the last known ROE from heartbeat data (stored in lifecycle)
|
|
|
- stored_roe = existing_lc.get('roe_percentage')
|
|
|
- if stored_roe is not None:
|
|
|
- try:
|
|
|
- roe = float(stored_roe)
|
|
|
- roe_text = f" ({roe:+.2f}%)"
|
|
|
- logger.debug(f"Using stored ROE from heartbeat for {full_symbol}: {roe:+.2f}%")
|
|
|
- except (ValueError, TypeError):
|
|
|
- logger.warning(f"Could not parse stored ROE value: {stored_roe} for {full_symbol}")
|
|
|
- roe_text = ""
|
|
|
- else:
|
|
|
- logger.debug(f"No stored ROE available for {full_symbol}")
|
|
|
- roe_text = ""
|
|
|
-
|
|
|
- message = f"""
|
|
|
-🎯 <b>Position Closed</b>
|
|
|
-
|
|
|
-📊 <b>Trade Details:</b>
|
|
|
-• Token: {token}
|
|
|
-• Direction: {position_side}
|
|
|
-• Size Closed: {await formatter.format_amount(amount_from_fill, token)}
|
|
|
-• Entry Price: {await formatter.format_price_with_symbol(entry_price, token)}
|
|
|
-• Exit Price: {await formatter.format_price_with_symbol(price_from_fill, token)}
|
|
|
-• Exit Value: {await formatter.format_price_with_symbol(amount_from_fill * price_from_fill)}
|
|
|
-
|
|
|
-{pnl_emoji} <b>P&L:</b> {pnl_text}{roe_text}
|
|
|
-✅ <b>Status:</b> {position_side} position closed externally
|
|
|
-⏰ <b>Time:</b> {time_str}
|
|
|
-
|
|
|
-📊 Use /stats to view updated performance
|
|
|
- """
|
|
|
-
|
|
|
- elif action_type == 'position_opened':
|
|
|
- position_side = 'LONG' if side_from_fill.lower() == 'buy' else 'SHORT'
|
|
|
- message = f"""
|
|
|
-🚀 <b>Position Opened (External)</b>
|
|
|
-
|
|
|
-📊 <b>Trade Details:</b>
|
|
|
-• Token: {token}
|
|
|
-• Direction: {position_side}
|
|
|
-• Size: {await formatter.format_amount(amount_from_fill, token)}
|
|
|
-• Entry Price: {await formatter.format_price_with_symbol(price_from_fill, token)}
|
|
|
-• Position Value: {await formatter.format_price_with_symbol(amount_from_fill * price_from_fill)}
|
|
|
-
|
|
|
-✅ <b>Status:</b> New {position_side} position opened externally
|
|
|
-⏰ <b>Time:</b> {time_str}
|
|
|
-
|
|
|
-📱 Use /positions to view all positions
|
|
|
- """
|
|
|
-
|
|
|
- elif action_type == 'position_increased' and existing_lc:
|
|
|
- position_side = existing_lc.get('position_side', 'unknown').upper()
|
|
|
- previous_size = existing_lc.get('current_position_size', 0)
|
|
|
- # Get current size from exchange
|
|
|
- current_positions = self._safe_get_positions()
|
|
|
- if current_positions is None:
|
|
|
- # Skip notification if we can't get position data
|
|
|
- logger.warning(f"⚠️ Failed to fetch positions for notification - skipping {action_type} notification")
|
|
|
- return
|
|
|
- current_size = 0
|
|
|
- for pos in current_positions:
|
|
|
- if pos.get('symbol') == full_symbol:
|
|
|
- current_size = abs(float(pos.get('contracts', 0)))
|
|
|
- break
|
|
|
-
|
|
|
- message = f"""
|
|
|
-📈 <b>Position Increased (External)</b>
|
|
|
-
|
|
|
-📊 <b>Trade Details:</b>
|
|
|
-• Token: {token}
|
|
|
-• Direction: {position_side}
|
|
|
-• Size Added: {await formatter.format_amount(amount_from_fill, token)}
|
|
|
-• Add Price: {await formatter.format_price_with_symbol(price_from_fill, token)}
|
|
|
-• Previous Size: {await formatter.format_amount(previous_size, token)}
|
|
|
-• New Size: {await formatter.format_amount(current_size, token)}
|
|
|
-• Add Value: {await formatter.format_price_with_symbol(amount_from_fill * price_from_fill)}
|
|
|
-
|
|
|
-📈 <b>Status:</b> {position_side} position size increased externally
|
|
|
-⏰ <b>Time:</b> {time_str}
|
|
|
-
|
|
|
-📈 Use /positions to view current position
|
|
|
- """
|
|
|
-
|
|
|
- elif action_type == 'position_decreased' and existing_lc:
|
|
|
- position_side = existing_lc.get('position_side', 'unknown').upper()
|
|
|
- previous_size = existing_lc.get('current_position_size', 0)
|
|
|
- entry_price = existing_lc.get('entry_price', 0)
|
|
|
-
|
|
|
- # Get current size from exchange
|
|
|
- current_positions = self._safe_get_positions()
|
|
|
- if current_positions is None:
|
|
|
- # Skip notification if we can't get position data
|
|
|
- logger.warning(f"⚠️ Failed to fetch positions for notification - skipping {action_type} notification")
|
|
|
- return
|
|
|
- current_size = 0
|
|
|
- for pos in current_positions:
|
|
|
- if pos.get('symbol') == full_symbol:
|
|
|
- current_size = abs(float(pos.get('contracts', 0)))
|
|
|
- break
|
|
|
-
|
|
|
- # Calculate partial PnL for the reduced amount
|
|
|
- partial_pnl = 0
|
|
|
- if entry_price > 0:
|
|
|
- if position_side == 'LONG':
|
|
|
- partial_pnl = amount_from_fill * (price_from_fill - entry_price)
|
|
|
- else: # SHORT
|
|
|
- partial_pnl = amount_from_fill * (entry_price - price_from_fill)
|
|
|
-
|
|
|
- pnl_emoji = "🟢" if partial_pnl >= 0 else "🔴"
|
|
|
-
|
|
|
- # Calculate ROE for the partial close
|
|
|
- roe_text = ""
|
|
|
- if entry_price > 0 and amount_from_fill > 0:
|
|
|
- cost_basis = amount_from_fill * entry_price
|
|
|
- roe = (partial_pnl / cost_basis) * 100
|
|
|
- roe_text = f" ({roe:+.2f}%)"
|
|
|
-
|
|
|
- message = f"""
|
|
|
-📉 <b>Position Decreased (External)</b>
|
|
|
-
|
|
|
-📊 <b>Trade Details:</b>
|
|
|
-• Token: {token}
|
|
|
-• Direction: {position_side}
|
|
|
-• Size Reduced: {await formatter.format_amount(amount_from_fill, token)}
|
|
|
-• Exit Price: {await formatter.format_price_with_symbol(price_from_fill, token)}
|
|
|
-• Previous Size: {await formatter.format_amount(previous_size, token)}
|
|
|
-• Remaining Size: {await formatter.format_amount(current_size, token)}
|
|
|
-• Exit Value: {await formatter.format_price_with_symbol(amount_from_fill * price_from_fill)}
|
|
|
-
|
|
|
-{pnl_emoji} <b>Partial P&L:</b> {await formatter.format_price_with_symbol(partial_pnl)}{roe_text}
|
|
|
-📉 <b>Status:</b> {position_side} position size decreased externally
|
|
|
-⏰ <b>Time:</b> {time_str}
|
|
|
-
|
|
|
-📊 Position remains open. Use /positions to view details
|
|
|
- """
|
|
|
- else:
|
|
|
- # No fallback notification sent - only position-based notifications per user preference
|
|
|
- logger.debug(f"No notification sent for action_type: {action_type}")
|
|
|
- return
|
|
|
-
|
|
|
- await self.notification_manager.send_generic_notification(message.strip())
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Error sending position change notification: {e}")
|
|
|
-
|
|
|
- async def _auto_sync_single_position(self, symbol: str, exchange_position: Dict[str, Any], stats) -> bool:
|
|
|
- """Auto-sync a single orphaned position to create a lifecycle record."""
|
|
|
- try:
|
|
|
- import uuid
|
|
|
- from src.utils.token_display_formatter import get_formatter
|
|
|
-
|
|
|
- formatter = get_formatter()
|
|
|
- contracts_abs = abs(float(exchange_position.get('contracts', 0)))
|
|
|
-
|
|
|
- if contracts_abs <= 1e-9:
|
|
|
- return False
|
|
|
-
|
|
|
- entry_price_from_exchange = float(exchange_position.get('entryPrice', 0)) or float(exchange_position.get('entryPx', 0))
|
|
|
-
|
|
|
- # Determine position side
|
|
|
- position_side, order_side = '', ''
|
|
|
- ccxt_side = exchange_position.get('side', '').lower()
|
|
|
- if ccxt_side == 'long':
|
|
|
- position_side, order_side = 'long', 'buy'
|
|
|
- elif ccxt_side == 'short':
|
|
|
- position_side, order_side = 'short', 'sell'
|
|
|
-
|
|
|
- if not position_side:
|
|
|
- contracts_val = float(exchange_position.get('contracts', 0))
|
|
|
- if contracts_val > 1e-9:
|
|
|
- position_side, order_side = 'long', 'buy'
|
|
|
- elif contracts_val < -1e-9:
|
|
|
- position_side, order_side = 'short', 'sell'
|
|
|
- else:
|
|
|
- return False
|
|
|
-
|
|
|
- if not position_side:
|
|
|
- logger.error(f"AUTO-SYNC: Could not determine position side for {symbol}.")
|
|
|
- return False
|
|
|
-
|
|
|
- final_entry_price = entry_price_from_exchange
|
|
|
- if not final_entry_price or final_entry_price <= 0:
|
|
|
- # Fallback to a reasonable estimate (current mark price)
|
|
|
- mark_price = float(exchange_position.get('markPrice', 0)) or float(exchange_position.get('markPx', 0))
|
|
|
- if mark_price > 0:
|
|
|
- final_entry_price = mark_price
|
|
|
- else:
|
|
|
- logger.error(f"AUTO-SYNC: Could not determine entry price for {symbol}.")
|
|
|
- return False
|
|
|
-
|
|
|
- logger.info(f"🔄 AUTO-SYNC: Creating lifecycle for {symbol} {position_side.upper()} {contracts_abs} @ {await formatter.format_price_with_symbol(final_entry_price, symbol)}")
|
|
|
-
|
|
|
- unique_sync_id = str(uuid.uuid4())[:8]
|
|
|
- lifecycle_id = stats.create_trade_lifecycle(
|
|
|
- symbol=symbol,
|
|
|
- side=order_side,
|
|
|
- entry_order_id=f"external_sync_{unique_sync_id}",
|
|
|
- trade_type='external_sync'
|
|
|
- )
|
|
|
-
|
|
|
- if lifecycle_id:
|
|
|
- success = await stats.update_trade_position_opened(
|
|
|
- lifecycle_id,
|
|
|
- final_entry_price,
|
|
|
- contracts_abs,
|
|
|
- f"external_fill_sync_{unique_sync_id}"
|
|
|
- )
|
|
|
-
|
|
|
- if success:
|
|
|
- logger.info(f"✅ AUTO-SYNC: Successfully synced position for {symbol} (Lifecycle: {lifecycle_id[:8]})")
|
|
|
-
|
|
|
- # Send position opened notification for auto-synced position
|
|
|
- try:
|
|
|
- await self._send_position_change_notification(
|
|
|
- symbol, order_side, contracts_abs, final_entry_price,
|
|
|
- 'position_opened', datetime.now(timezone.utc)
|
|
|
- )
|
|
|
- logger.info(f"📨 AUTO-SYNC: Sent position opened notification for {symbol}")
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ AUTO-SYNC: Failed to send notification for {symbol}: {e}")
|
|
|
-
|
|
|
- return True
|
|
|
- else:
|
|
|
- logger.error(f"❌ AUTO-SYNC: Failed to update lifecycle to 'position_opened' for {symbol}")
|
|
|
- else:
|
|
|
- logger.error(f"❌ AUTO-SYNC: Failed to create lifecycle for {symbol}")
|
|
|
-
|
|
|
- return False
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ AUTO-SYNC: Error syncing position for {symbol}: {e}")
|
|
|
- return False
|
|
|
-
|
|
|
- async def _check_external_trades(self):
|
|
|
- """Check for external trades and update internal tracking."""
|
|
|
- stats = self.trading_engine.stats
|
|
|
- if not stats:
|
|
|
- logger.warning("No stats manager available for external trade checking")
|
|
|
- return
|
|
|
-
|
|
|
- try:
|
|
|
- external_trades_processed = 0
|
|
|
- symbols_with_fills = set()
|
|
|
- processed_fills_this_cycle = set() # Track fills processed in this cycle
|
|
|
-
|
|
|
- recent_fills = self.trading_engine.get_recent_fills()
|
|
|
- if not recent_fills:
|
|
|
- logger.debug("No recent fills data available")
|
|
|
- return
|
|
|
-
|
|
|
- if not hasattr(self, 'last_processed_trade_time') or self.last_processed_trade_time is None:
|
|
|
- try:
|
|
|
- # Ensure this metadata key is the one used by MarketMonitor for saving this state.
|
|
|
- last_time_str = stats._get_metadata('market_monitor_last_processed_trade_time')
|
|
|
- if last_time_str:
|
|
|
- self.last_processed_trade_time = datetime.fromisoformat(last_time_str).replace(tzinfo=timezone.utc)
|
|
|
- else:
|
|
|
- self.last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
|
|
|
- except Exception:
|
|
|
- self.last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
|
|
|
-
|
|
|
- for fill in recent_fills:
|
|
|
- try:
|
|
|
- trade_id = fill.get('id')
|
|
|
- timestamp_ms = fill.get('timestamp')
|
|
|
- symbol_from_fill = fill.get('symbol')
|
|
|
- side_from_fill = fill.get('side')
|
|
|
- amount_from_fill = float(fill.get('amount', 0))
|
|
|
- price_from_fill = float(fill.get('price', 0))
|
|
|
-
|
|
|
- timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc) if timestamp_ms else datetime.now(timezone.utc)
|
|
|
-
|
|
|
- if self.last_processed_trade_time and timestamp_dt <= self.last_processed_trade_time:
|
|
|
- continue
|
|
|
-
|
|
|
- # Check if this fill has already been processed to prevent duplicates
|
|
|
- if trade_id and stats.has_exchange_fill_been_processed(str(trade_id)):
|
|
|
- logger.debug(f"Skipping already processed fill: {trade_id}")
|
|
|
- continue
|
|
|
-
|
|
|
- # Check if this fill was already processed in this cycle
|
|
|
- if trade_id and trade_id in processed_fills_this_cycle:
|
|
|
- logger.debug(f"Skipping fill already processed in this cycle: {trade_id}")
|
|
|
- continue
|
|
|
-
|
|
|
- fill_processed_this_iteration = False
|
|
|
-
|
|
|
- if not (symbol_from_fill and side_from_fill and amount_from_fill > 0 and price_from_fill > 0):
|
|
|
- logger.warning(f"Skipping fill with incomplete data: {fill}")
|
|
|
- continue
|
|
|
-
|
|
|
- full_symbol = symbol_from_fill
|
|
|
- token = symbol_from_fill.split('/')[0] if '/' in symbol_from_fill else symbol_from_fill.split(':')[0]
|
|
|
-
|
|
|
- exchange_order_id_from_fill = fill.get('info', {}).get('oid')
|
|
|
-
|
|
|
- # First check if this is a pending entry order fill
|
|
|
- if exchange_order_id_from_fill:
|
|
|
- pending_lc = stats.get_lifecycle_by_entry_order_id(exchange_order_id_from_fill, status='pending')
|
|
|
- if pending_lc and pending_lc.get('symbol') == full_symbol:
|
|
|
- success = await stats.update_trade_position_opened(
|
|
|
- lifecycle_id=pending_lc['trade_lifecycle_id'],
|
|
|
- entry_price=price_from_fill,
|
|
|
- entry_amount=amount_from_fill,
|
|
|
- exchange_fill_id=trade_id
|
|
|
- )
|
|
|
- if success:
|
|
|
- logger.info(f"📈 Lifecycle ENTRY: {pending_lc['trade_lifecycle_id']} for {full_symbol} updated by fill {trade_id}.")
|
|
|
- symbols_with_fills.add(token)
|
|
|
- order_in_db_for_entry = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
|
|
|
- if order_in_db_for_entry:
|
|
|
- stats.update_order_status(order_db_id=order_in_db_for_entry['id'], new_status='filled', amount_filled_increment=amount_from_fill)
|
|
|
-
|
|
|
- # Send position opened notification (this is a bot-initiated position)
|
|
|
- await self._send_position_change_notification(
|
|
|
- full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
- 'position_opened', timestamp_dt
|
|
|
- )
|
|
|
-
|
|
|
- # Mark fill as processed in this cycle
|
|
|
- if trade_id:
|
|
|
- processed_fills_this_cycle.add(trade_id)
|
|
|
- fill_processed_this_iteration = True
|
|
|
-
|
|
|
- # Check if this is a bot order to increase an existing position
|
|
|
- if not fill_processed_this_iteration and exchange_order_id_from_fill:
|
|
|
- bot_order_for_fill = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
|
|
|
- if bot_order_for_fill and bot_order_for_fill.get('symbol') == full_symbol:
|
|
|
- order_type = bot_order_for_fill.get('type')
|
|
|
- order_side = bot_order_for_fill.get('side')
|
|
|
-
|
|
|
- # Check if this is a limit order that should increase an existing position
|
|
|
- if order_type == 'limit':
|
|
|
- existing_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
|
|
|
- if existing_lc:
|
|
|
- lc_pos_side = existing_lc.get('position_side')
|
|
|
- # Check if this is a same-side order (position increase)
|
|
|
- if ((lc_pos_side == 'long' and order_side == 'buy' and side_from_fill == 'buy') or
|
|
|
- (lc_pos_side == 'short' and order_side == 'sell' and side_from_fill == 'sell')):
|
|
|
-
|
|
|
- # Update existing position size
|
|
|
- current_positions = self._safe_get_positions()
|
|
|
- if current_positions is not None:
|
|
|
- new_size = 0
|
|
|
- for pos in current_positions:
|
|
|
- if pos.get('symbol') == full_symbol:
|
|
|
- new_size = abs(float(pos.get('contracts', 0)))
|
|
|
- break
|
|
|
-
|
|
|
- # Update lifecycle with new position size
|
|
|
- await self._update_lifecycle_position_size(existing_lc['trade_lifecycle_id'], new_size)
|
|
|
-
|
|
|
- # Mark order as filled
|
|
|
- stats.update_order_status(order_db_id=bot_order_for_fill['id'], new_status='filled', amount_filled_increment=amount_from_fill)
|
|
|
-
|
|
|
- # Send position increased notification
|
|
|
- await self._send_position_change_notification(
|
|
|
- full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
- 'position_increased', timestamp_dt, existing_lc
|
|
|
- )
|
|
|
-
|
|
|
- logger.info(f"📈 Position INCREASED: {full_symbol} by {amount_from_fill} for lifecycle {existing_lc['trade_lifecycle_id'][:8]}...")
|
|
|
- symbols_with_fills.add(token)
|
|
|
-
|
|
|
- # Mark fill as processed in this cycle
|
|
|
- if trade_id:
|
|
|
- processed_fills_this_cycle.add(trade_id)
|
|
|
- fill_processed_this_iteration = True
|
|
|
-
|
|
|
- # Check if this is a known bot order (SL/TP/exit)
|
|
|
- if not fill_processed_this_iteration and exchange_order_id_from_fill:
|
|
|
- active_lc = None
|
|
|
- closure_reason_action_type = None
|
|
|
- bot_order_db_id_to_update = None
|
|
|
-
|
|
|
- bot_order_for_fill = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
|
|
|
- if bot_order_for_fill and bot_order_for_fill.get('symbol') == full_symbol:
|
|
|
- order_type = bot_order_for_fill.get('type')
|
|
|
- order_side = bot_order_for_fill.get('side')
|
|
|
- if order_type == 'market':
|
|
|
- potential_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
|
|
|
- if potential_lc:
|
|
|
- lc_pos_side = potential_lc.get('position_side')
|
|
|
- if (lc_pos_side == 'long' and order_side == 'sell' and side_from_fill == 'sell') or \
|
|
|
- (lc_pos_side == 'short' and order_side == 'buy' and side_from_fill == 'buy'):
|
|
|
- active_lc = potential_lc
|
|
|
- closure_reason_action_type = f"bot_exit_{lc_pos_side}_close"
|
|
|
- bot_order_db_id_to_update = bot_order_for_fill.get('id')
|
|
|
- logger.info(f"ℹ️ Lifecycle BOT EXIT: Fill {trade_id} (OID {exchange_order_id_from_fill}) for {full_symbol} matches bot exit for lifecycle {active_lc['trade_lifecycle_id']}.")
|
|
|
-
|
|
|
- if not active_lc:
|
|
|
- lc_by_sl = stats.get_lifecycle_by_sl_order_id(exchange_order_id_from_fill, status='position_opened')
|
|
|
- if lc_by_sl and lc_by_sl.get('symbol') == full_symbol:
|
|
|
- active_lc = lc_by_sl
|
|
|
- closure_reason_action_type = f"sl_{active_lc.get('position_side')}_close"
|
|
|
- bot_order_db_id_to_update = bot_order_for_fill.get('id')
|
|
|
- logger.info(f"ℹ️ Lifecycle SL: Fill {trade_id} for OID {exchange_order_id_from_fill} matches SL for lifecycle {active_lc['trade_lifecycle_id']}.")
|
|
|
-
|
|
|
- if not active_lc:
|
|
|
- lc_by_tp = stats.get_lifecycle_by_tp_order_id(exchange_order_id_from_fill, status='position_opened')
|
|
|
- if lc_by_tp and lc_by_tp.get('symbol') == full_symbol:
|
|
|
- active_lc = lc_by_tp
|
|
|
- closure_reason_action_type = f"tp_{active_lc.get('position_side')}_close"
|
|
|
- bot_order_db_id_to_update = bot_order_for_fill.get('id')
|
|
|
- logger.info(f"ℹ️ Lifecycle TP: Fill {trade_id} for OID {exchange_order_id_from_fill} matches TP for lifecycle {active_lc['trade_lifecycle_id']}.")
|
|
|
-
|
|
|
- # Process known bot order fills
|
|
|
- if active_lc and closure_reason_action_type:
|
|
|
- lc_id = active_lc['trade_lifecycle_id']
|
|
|
- lc_entry_price = active_lc.get('entry_price', 0)
|
|
|
- lc_position_side = active_lc.get('position_side')
|
|
|
-
|
|
|
- realized_pnl = 0
|
|
|
- if lc_position_side == 'long':
|
|
|
- realized_pnl = amount_from_fill * (price_from_fill - lc_entry_price)
|
|
|
- elif lc_position_side == 'short':
|
|
|
- realized_pnl = amount_from_fill * (lc_entry_price - price_from_fill)
|
|
|
-
|
|
|
- success = stats.update_trade_position_closed(
|
|
|
- lifecycle_id=lc_id, exit_price=price_from_fill,
|
|
|
- realized_pnl=realized_pnl, exchange_fill_id=trade_id
|
|
|
- )
|
|
|
- if success:
|
|
|
- pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
|
|
|
- formatter = get_formatter()
|
|
|
- logger.info(f"{pnl_emoji} Lifecycle CLOSED: {lc_id} ({closure_reason_action_type}). PNL for fill: {await formatter.format_price_with_symbol(realized_pnl)}")
|
|
|
- symbols_with_fills.add(token)
|
|
|
-
|
|
|
- # Send notification immediately with correct PnL from actual fill
|
|
|
- await self._send_position_change_notification(
|
|
|
- full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
- 'position_closed', timestamp_dt, active_lc, realized_pnl
|
|
|
- )
|
|
|
-
|
|
|
- stats.migrate_trade_to_aggregated_stats(lc_id)
|
|
|
- if bot_order_db_id_to_update:
|
|
|
- stats.update_order_status(order_db_id=bot_order_db_id_to_update, new_status='filled', amount_filled_increment=amount_from_fill)
|
|
|
-
|
|
|
- # Mark fill as processed in this cycle
|
|
|
- if trade_id:
|
|
|
- processed_fills_this_cycle.add(trade_id)
|
|
|
- fill_processed_this_iteration = True
|
|
|
-
|
|
|
- # Check for external stop losses
|
|
|
- if not fill_processed_this_iteration:
|
|
|
- if (exchange_order_id_from_fill and
|
|
|
- self.shared_state.get('external_stop_losses') and
|
|
|
- exchange_order_id_from_fill in self.shared_state['external_stop_losses']):
|
|
|
- stop_loss_info = self.shared_state['external_stop_losses'][exchange_order_id_from_fill]
|
|
|
- formatter = get_formatter()
|
|
|
- logger.info(f"🛑 External SL (MM Tracking): {token} Order {exchange_order_id_from_fill} filled @ {await formatter.format_price_with_symbol(price_from_fill, token)}")
|
|
|
-
|
|
|
- sl_active_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
|
|
|
- if sl_active_lc:
|
|
|
- lc_id = sl_active_lc['trade_lifecycle_id']
|
|
|
- lc_entry_price = sl_active_lc.get('entry_price', 0)
|
|
|
- lc_pos_side = sl_active_lc.get('position_side')
|
|
|
- realized_pnl = amount_from_fill * (price_from_fill - lc_entry_price) if lc_pos_side == 'long' else amount_from_fill * (lc_entry_price - price_from_fill)
|
|
|
-
|
|
|
- success = stats.update_trade_position_closed(lc_id, price_from_fill, realized_pnl, trade_id)
|
|
|
- if success:
|
|
|
- pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
|
|
|
- logger.info(f"{pnl_emoji} Lifecycle CLOSED by External SL (MM): {lc_id}. PNL: {await formatter.format_price_with_symbol(realized_pnl)}")
|
|
|
- if self.notification_manager:
|
|
|
- await self.notification_manager.send_stop_loss_execution_notification(
|
|
|
- stop_loss_info, full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
- f'{lc_pos_side}_closed_external_sl', timestamp_dt.isoformat(), realized_pnl
|
|
|
- )
|
|
|
- stats.migrate_trade_to_aggregated_stats(lc_id)
|
|
|
- # Modify shared state carefully
|
|
|
- if exchange_order_id_from_fill in self.shared_state['external_stop_losses']:
|
|
|
- del self.shared_state['external_stop_losses'][exchange_order_id_from_fill]
|
|
|
-
|
|
|
- # Mark fill as processed in this cycle
|
|
|
- if trade_id:
|
|
|
- processed_fills_this_cycle.add(trade_id)
|
|
|
- fill_processed_this_iteration = True
|
|
|
- else:
|
|
|
- logger.warning(f"⚠️ External SL (MM) {exchange_order_id_from_fill} for {full_symbol}, but no active lifecycle found.")
|
|
|
-
|
|
|
- # NEW: Enhanced external trade processing with position state detection
|
|
|
- if not fill_processed_this_iteration:
|
|
|
- existing_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
|
|
|
-
|
|
|
- # If no lifecycle exists but we have a position on exchange, try to auto-sync first
|
|
|
- if not existing_lc:
|
|
|
- current_positions = self._safe_get_positions()
|
|
|
- if current_positions is None:
|
|
|
- logger.warning("⚠️ Failed to fetch positions for external trade detection - skipping this fill")
|
|
|
- continue
|
|
|
- exchange_position = None
|
|
|
- for pos in current_positions:
|
|
|
- if pos.get('symbol') == full_symbol:
|
|
|
- exchange_position = pos
|
|
|
- break
|
|
|
-
|
|
|
- if exchange_position and abs(float(exchange_position.get('contracts', 0))) > 1e-9:
|
|
|
- logger.info(f"🔄 AUTO-SYNC: Position exists on exchange for {full_symbol} but no lifecycle found. Auto-syncing before processing fill.")
|
|
|
- success = await self._auto_sync_single_position(full_symbol, exchange_position, stats)
|
|
|
- if success:
|
|
|
- # Re-check for lifecycle after auto-sync
|
|
|
- existing_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
|
|
|
-
|
|
|
- action_type = await self._determine_position_action_type(
|
|
|
- full_symbol, side_from_fill, amount_from_fill, existing_lc
|
|
|
- )
|
|
|
-
|
|
|
- logger.info(f"🔍 External fill analysis: {full_symbol} {side_from_fill} {amount_from_fill} -> {action_type}")
|
|
|
-
|
|
|
- # Additional debug logging for position changes
|
|
|
- if existing_lc:
|
|
|
- previous_size = existing_lc.get('current_position_size', 0)
|
|
|
- current_positions = self._safe_get_positions()
|
|
|
- if current_positions is None:
|
|
|
- logger.warning("⚠️ Failed to fetch positions for debug logging - skipping debug info")
|
|
|
- # Set defaults to avoid reference errors
|
|
|
- current_size = previous_size
|
|
|
- else:
|
|
|
- current_size = 0
|
|
|
- for pos in current_positions:
|
|
|
- if pos.get('symbol') == full_symbol:
|
|
|
- current_size = abs(float(pos.get('contracts', 0)))
|
|
|
- break
|
|
|
- logger.info(f"📊 Position size change: {previous_size} -> {current_size} (diff: {current_size - previous_size})")
|
|
|
- logger.info(f"🎯 Expected change based on fill: {'+' if side_from_fill.lower() == 'buy' else '-'}{amount_from_fill}")
|
|
|
-
|
|
|
- # If lifecycle is already closed, we should not re-process as a new closure.
|
|
|
- if existing_lc.get('status') == 'position_closed':
|
|
|
- logger.info(f"ℹ️ Fill {trade_id} received for already closed lifecycle {existing_lc['trade_lifecycle_id']}. Recording fill and skipping further action.")
|
|
|
- stats.record_trade(
|
|
|
- full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
- exchange_fill_id=trade_id, trade_type="external_fill_for_closed_pos",
|
|
|
- pnl=None, timestamp=timestamp_dt.isoformat(),
|
|
|
- linked_order_table_id_to_link=None
|
|
|
- )
|
|
|
- fill_processed_this_iteration = True
|
|
|
- continue # Skip to next fill
|
|
|
-
|
|
|
- # Check if this might be a position decrease that was misclassified
|
|
|
- if (action_type == 'external_unmatched' and
|
|
|
- existing_lc.get('position_side') == 'long' and
|
|
|
- side_from_fill.lower() == 'sell' and
|
|
|
- current_size < previous_size):
|
|
|
- logger.warning(f"⚠️ Potential misclassification: {full_symbol} {side_from_fill} looks like position decrease but classified as external_unmatched")
|
|
|
- # Force re-check with proper parameters
|
|
|
- action_type = 'position_decreased'
|
|
|
- logger.info(f"🔄 Corrected action_type to: {action_type}")
|
|
|
- elif (action_type == 'external_unmatched' and
|
|
|
- existing_lc.get('position_side') == 'long' and
|
|
|
- side_from_fill.lower() == 'buy' and
|
|
|
- current_size > previous_size):
|
|
|
- logger.warning(f"⚠️ Potential misclassification: {full_symbol} {side_from_fill} looks like position increase but classified as external_unmatched")
|
|
|
- action_type = 'position_increased'
|
|
|
- logger.info(f"🔄 Corrected action_type to: {action_type}")
|
|
|
-
|
|
|
- if action_type == 'position_opened':
|
|
|
- # Create new lifecycle for external position
|
|
|
- lifecycle_id = stats.create_trade_lifecycle(
|
|
|
- symbol=full_symbol,
|
|
|
- side=side_from_fill,
|
|
|
- entry_order_id=exchange_order_id_from_fill or f"external_{trade_id}",
|
|
|
- trade_type="external"
|
|
|
- )
|
|
|
-
|
|
|
- if lifecycle_id:
|
|
|
- success = stats.update_trade_position_opened(
|
|
|
- lifecycle_id=lifecycle_id,
|
|
|
- entry_price=price_from_fill,
|
|
|
- entry_amount=amount_from_fill,
|
|
|
- exchange_fill_id=trade_id
|
|
|
- )
|
|
|
-
|
|
|
- if success:
|
|
|
- logger.info(f"📈 Created and opened new external lifecycle: {lifecycle_id[:8]} for {full_symbol}")
|
|
|
- symbols_with_fills.add(token)
|
|
|
-
|
|
|
- # Send position opened notification
|
|
|
- await self._send_position_change_notification(
|
|
|
- full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
- action_type, timestamp_dt
|
|
|
- )
|
|
|
-
|
|
|
- # Mark fill as processed in this cycle
|
|
|
- if trade_id:
|
|
|
- processed_fills_this_cycle.add(trade_id)
|
|
|
- fill_processed_this_iteration = True
|
|
|
-
|
|
|
- elif action_type == 'position_closed' and existing_lc:
|
|
|
- # Close existing lifecycle
|
|
|
- lc_id = existing_lc['trade_lifecycle_id']
|
|
|
- lc_entry_price = existing_lc.get('entry_price', 0)
|
|
|
- lc_position_side = existing_lc.get('position_side')
|
|
|
-
|
|
|
- realized_pnl = 0
|
|
|
- if lc_position_side == 'long':
|
|
|
- realized_pnl = amount_from_fill * (price_from_fill - lc_entry_price)
|
|
|
- elif lc_position_side == 'short':
|
|
|
- realized_pnl = amount_from_fill * (lc_entry_price - price_from_fill)
|
|
|
-
|
|
|
- success = stats.update_trade_position_closed(
|
|
|
- lifecycle_id=lc_id,
|
|
|
- exit_price=price_from_fill,
|
|
|
- realized_pnl=realized_pnl,
|
|
|
- exchange_fill_id=trade_id
|
|
|
- )
|
|
|
- if success:
|
|
|
- pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
|
|
|
- formatter = get_formatter()
|
|
|
- logger.info(f"{pnl_emoji} Lifecycle CLOSED (External): {lc_id}. PNL: {await formatter.format_price_with_symbol(realized_pnl)}")
|
|
|
- symbols_with_fills.add(token)
|
|
|
-
|
|
|
- # Send position closed notification
|
|
|
- await self._send_position_change_notification(
|
|
|
- full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
- action_type, timestamp_dt, existing_lc, realized_pnl
|
|
|
- )
|
|
|
-
|
|
|
- stats.migrate_trade_to_aggregated_stats(lc_id)
|
|
|
-
|
|
|
- # Mark fill as processed in this cycle
|
|
|
- if trade_id:
|
|
|
- processed_fills_this_cycle.add(trade_id)
|
|
|
- fill_processed_this_iteration = True
|
|
|
-
|
|
|
- elif action_type in ['position_increased', 'position_decreased'] and existing_lc:
|
|
|
- # Update lifecycle position size and send notification
|
|
|
- current_positions = self._safe_get_positions()
|
|
|
- if current_positions is None:
|
|
|
- logger.warning("⚠️ Failed to fetch positions for size update - skipping position change processing")
|
|
|
- continue
|
|
|
- new_size = 0
|
|
|
- for pos in current_positions:
|
|
|
- if pos.get('symbol') == full_symbol:
|
|
|
- new_size = abs(float(pos.get('contracts', 0)))
|
|
|
- break
|
|
|
-
|
|
|
- # Update lifecycle with new position size
|
|
|
- await self._update_lifecycle_position_size(existing_lc['trade_lifecycle_id'], new_size)
|
|
|
-
|
|
|
- # Send appropriate notification
|
|
|
- await self._send_position_change_notification(
|
|
|
- full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
- action_type, timestamp_dt, existing_lc
|
|
|
- )
|
|
|
-
|
|
|
- symbols_with_fills.add(token)
|
|
|
-
|
|
|
- # Mark fill as processed in this cycle
|
|
|
- if trade_id:
|
|
|
- processed_fills_this_cycle.add(trade_id)
|
|
|
- fill_processed_this_iteration = True
|
|
|
- logger.info(f"📊 Position {action_type}: {full_symbol} new size: {new_size}")
|
|
|
-
|
|
|
- # Fallback for unmatched external trades
|
|
|
- if not fill_processed_this_iteration:
|
|
|
- all_open_positions_in_db = stats.get_open_positions()
|
|
|
- db_open_symbols = {pos_db.get('symbol') for pos_db in all_open_positions_in_db}
|
|
|
-
|
|
|
- if full_symbol in db_open_symbols:
|
|
|
- logger.debug(f"Position {full_symbol} found in open positions but no active lifecycle - likely auto-sync failed or timing issue for fill {trade_id}")
|
|
|
-
|
|
|
- # Record as unmatched external trade
|
|
|
- linked_order_db_id = None
|
|
|
- if exchange_order_id_from_fill:
|
|
|
- order_in_db = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
|
|
|
- if order_in_db:
|
|
|
- linked_order_db_id = order_in_db.get('id')
|
|
|
-
|
|
|
- stats.record_trade(
|
|
|
- full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
- exchange_fill_id=trade_id, trade_type="external_unmatched",
|
|
|
- timestamp=timestamp_dt.isoformat(),
|
|
|
- linked_order_table_id_to_link=linked_order_db_id
|
|
|
- )
|
|
|
- logger.info(f"📋 Recorded trade via FALLBACK: {trade_id} (Unmatched External Fill)")
|
|
|
-
|
|
|
- # Mark fill as processed in this cycle
|
|
|
- if trade_id:
|
|
|
- processed_fills_this_cycle.add(trade_id)
|
|
|
-
|
|
|
- # No notification sent for unmatched external trades per user preference
|
|
|
- fill_processed_this_iteration = True
|
|
|
-
|
|
|
- if fill_processed_this_iteration:
|
|
|
- external_trades_processed += 1
|
|
|
- if self.last_processed_trade_time is None or timestamp_dt > self.last_processed_trade_time:
|
|
|
- self.last_processed_trade_time = timestamp_dt
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Error processing fill {fill.get('id', 'N/A')}: {e}", exc_info=True)
|
|
|
- continue
|
|
|
-
|
|
|
- if external_trades_processed > 0:
|
|
|
- stats._set_metadata('market_monitor_last_processed_trade_time', self.last_processed_trade_time.isoformat())
|
|
|
- logger.info(f"💾 Saved MarketMonitor state (last_processed_trade_time) to DB: {self.last_processed_trade_time.isoformat()}")
|
|
|
- logger.info(f"📊 Processed {external_trades_processed} external trades")
|
|
|
- if symbols_with_fills:
|
|
|
- logger.info(f"ℹ️ Symbols with processed fills this cycle: {list(symbols_with_fills)}")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error checking external trades: {e}", exc_info=True)
|
|
|
-
|
|
|
- async def _reconcile_positions(self):
|
|
|
- """Main method - check all positions for changes and send notifications."""
|
|
|
- try:
|
|
|
- stats = self.trading_engine.get_stats()
|
|
|
- if not stats:
|
|
|
- logger.warning("TradingStats not available")
|
|
|
- return
|
|
|
-
|
|
|
- # Get current exchange positions
|
|
|
- exchange_positions = self.trading_engine.get_positions()
|
|
|
-
|
|
|
- # Handle API failures gracefully - don't treat None as empty positions
|
|
|
- if exchange_positions is None:
|
|
|
- logger.warning("⚠️ Failed to fetch exchange positions - skipping position change detection to avoid false closures")
|
|
|
- return
|
|
|
-
|
|
|
- # Get current DB positions (trades with status='position_opened')
|
|
|
- db_positions = stats.get_open_positions()
|
|
|
-
|
|
|
- # Create lookup maps
|
|
|
- exchange_map = {pos['symbol']: pos for pos in exchange_positions if abs(float(pos.get('contracts', 0))) > 1e-9}
|
|
|
- db_map = {pos['symbol']: pos for pos in db_positions}
|
|
|
-
|
|
|
- all_symbols = set(exchange_map.keys()) | set(db_map.keys())
|
|
|
-
|
|
|
- for symbol in all_symbols:
|
|
|
- await self._check_symbol_position_change(symbol, exchange_map.get(symbol), db_map.get(symbol), stats)
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error checking position changes: {e}")
|
|
|
-
|
|
|
- async def _check_symbol_position_change(self, symbol: str, exchange_pos: Optional[Dict],
|
|
|
- db_pos: Optional[Dict], stats) -> None:
|
|
|
- """Check position changes for a single symbol."""
|
|
|
- try:
|
|
|
- current_time = datetime.now(timezone.utc)
|
|
|
-
|
|
|
- # Case 1: New position (exchange has, DB doesn't)
|
|
|
- if exchange_pos and not db_pos:
|
|
|
- await self._handle_position_opened(symbol, exchange_pos, stats, current_time)
|
|
|
-
|
|
|
- # Case 2: Position closed (DB has, exchange doesn't)
|
|
|
- elif db_pos and not exchange_pos:
|
|
|
- await self._handle_position_closed(symbol, db_pos, stats, current_time)
|
|
|
-
|
|
|
- # Case 3: Position size changed (both exist, different sizes)
|
|
|
- elif exchange_pos and db_pos:
|
|
|
- await self._handle_position_size_change(symbol, exchange_pos, db_pos, stats, current_time)
|
|
|
-
|
|
|
- # Case 4: Both None - no action needed
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error checking position change for {symbol}: {e}")
|
|
|
-
|
|
|
- async def _handle_position_opened(self, symbol: str, exchange_pos: Dict, stats, timestamp: datetime):
|
|
|
- """Handle new position detection."""
|
|
|
- try:
|
|
|
- contracts = float(exchange_pos.get('contracts', 0))
|
|
|
- size = abs(contracts)
|
|
|
-
|
|
|
- # Use CCXT's side field first (more reliable), fallback to contract sign
|
|
|
- ccxt_side = exchange_pos.get('side', '').lower()
|
|
|
- if ccxt_side == 'long':
|
|
|
- side, order_side = 'long', 'buy'
|
|
|
- elif ccxt_side == 'short':
|
|
|
- side, order_side = 'short', 'sell'
|
|
|
- else:
|
|
|
- # Fallback to contract sign (less reliable but better than nothing)
|
|
|
- side = 'long' if contracts > 0 else 'short'
|
|
|
- order_side = 'buy' if side == 'long' else 'sell'
|
|
|
- logger.warning(f"⚠️ Using contract sign fallback for {symbol}: side={side}, ccxt_side='{ccxt_side}'")
|
|
|
-
|
|
|
- # Get entry price from exchange
|
|
|
- entry_price = float(exchange_pos.get('entryPrice', 0)) or float(exchange_pos.get('entryPx', 0))
|
|
|
- if not entry_price:
|
|
|
- entry_price = float(exchange_pos.get('markPrice', 0)) or float(exchange_pos.get('markPx', 0))
|
|
|
-
|
|
|
- if not entry_price:
|
|
|
- logger.error(f"❌ Cannot determine entry price for {symbol}")
|
|
|
- return
|
|
|
-
|
|
|
- # Extract ROE from info.position.returnOnEquity
|
|
|
- roe_raw = None
|
|
|
- if 'info' in exchange_pos and 'position' in exchange_pos['info']:
|
|
|
- roe_raw = exchange_pos['info']['position'].get('returnOnEquity')
|
|
|
- roe_percentage = float(roe_raw) * 100 if roe_raw is not None else 0.0
|
|
|
-
|
|
|
- # Create trade lifecycle using existing manager
|
|
|
- lifecycle_id = stats.create_trade_lifecycle(
|
|
|
- symbol=symbol,
|
|
|
- side=order_side,
|
|
|
- entry_order_id=f"external_position_{timestamp.strftime('%Y%m%d_%H%M%S')}",
|
|
|
- trade_type='external_detected'
|
|
|
- )
|
|
|
-
|
|
|
- if lifecycle_id:
|
|
|
- # Update to position_opened using existing manager
|
|
|
- await stats.update_trade_position_opened(
|
|
|
- lifecycle_id=lifecycle_id,
|
|
|
- entry_price=entry_price,
|
|
|
- entry_amount=size,
|
|
|
- exchange_fill_id=f"position_detected_{timestamp.isoformat()}"
|
|
|
- )
|
|
|
-
|
|
|
- # Now, update the market data for the newly opened position
|
|
|
- margin_used = None
|
|
|
- if 'info' in exchange_pos and isinstance(exchange_pos['info'], dict):
|
|
|
- position_info = exchange_pos['info'].get('position', {})
|
|
|
- if position_info:
|
|
|
- margin_used = position_info.get('marginUsed')
|
|
|
-
|
|
|
- stats.trade_manager.update_trade_market_data(
|
|
|
- lifecycle_id=lifecycle_id,
|
|
|
- current_position_size=size,
|
|
|
- unrealized_pnl=exchange_pos.get('unrealizedPnl'),
|
|
|
- roe_percentage=roe_percentage,
|
|
|
- mark_price=exchange_pos.get('markPrice'),
|
|
|
- position_value=exchange_pos.get('positionValue'),
|
|
|
- margin_used=margin_used,
|
|
|
- leverage=exchange_pos.get('leverage'),
|
|
|
- liquidation_price=exchange_pos.get('liquidationPrice')
|
|
|
- )
|
|
|
-
|
|
|
- logger.info(f"🚀 NEW POSITION: {symbol} {side.upper()} {size} @ {entry_price}")
|
|
|
-
|
|
|
- # Send notification
|
|
|
- await self._send_reconciliation_notification('opened', symbol, {
|
|
|
- 'side': side,
|
|
|
- 'size': size,
|
|
|
- 'price': entry_price,
|
|
|
- 'timestamp': timestamp
|
|
|
- })
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error handling position opened for {symbol}: {e}")
|
|
|
-
|
|
|
- async def _handle_position_closed(self, symbol: str, db_pos: Dict, stats, timestamp: datetime):
|
|
|
- """Handle position closed during reconciliation."""
|
|
|
- try:
|
|
|
- lifecycle_id = db_pos['trade_lifecycle_id']
|
|
|
- entry_price = db_pos.get('entry_price', 0)
|
|
|
- position_side = db_pos.get('position_side')
|
|
|
- size = db_pos.get('current_position_size', 0)
|
|
|
-
|
|
|
- # Get the latest lifecycle status to check if it was already closed
|
|
|
- latest_lifecycle = stats.get_trade_by_lifecycle_id(lifecycle_id)
|
|
|
- if latest_lifecycle and latest_lifecycle.get('status') == 'position_closed':
|
|
|
- logger.info(f"ℹ️ Position for {symbol} already marked as closed in lifecycle {lifecycle_id[:8]}. Skipping duplicate close processing.")
|
|
|
- return
|
|
|
-
|
|
|
- # Check if this was already migrated to aggregated stats (bot exit already processed it)
|
|
|
- try:
|
|
|
- if not stats.get_trade_by_lifecycle_id(lifecycle_id):
|
|
|
- logger.info(f"ℹ️ Lifecycle {lifecycle_id[:8]} for {symbol} already migrated to aggregated stats. Bot exit already handled this closure.")
|
|
|
- return
|
|
|
- except Exception:
|
|
|
- # If we can't check, proceed with caution
|
|
|
- pass
|
|
|
-
|
|
|
- # Estimate exit price from market data
|
|
|
- exit_price = 0
|
|
|
- try:
|
|
|
- market_data = await self.trading_engine.get_market_data(symbol)
|
|
|
- if market_data and market_data.get('ticker'):
|
|
|
- exit_price = float(market_data['ticker'].get('last', 0))
|
|
|
- if exit_price <= 0:
|
|
|
- logger.warning(f"⚠️ Invalid market price for {symbol} - using entry price")
|
|
|
- exit_price = entry_price
|
|
|
- else:
|
|
|
- logger.warning(f"⚠️ Could not get market data for {symbol} - using entry price")
|
|
|
- exit_price = entry_price
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"⚠️ Error fetching market data for {symbol}: {e} - using entry price")
|
|
|
- exit_price = entry_price
|
|
|
-
|
|
|
- # Calculate realized PnL
|
|
|
- realized_pnl = 0
|
|
|
- if position_side == 'long':
|
|
|
- realized_pnl = size * (exit_price - entry_price)
|
|
|
- elif position_side == 'short':
|
|
|
- realized_pnl = size * (entry_price - exit_price)
|
|
|
-
|
|
|
- # Update to position_closed using existing manager
|
|
|
- success = await stats.update_trade_position_closed(
|
|
|
- lifecycle_id=lifecycle_id,
|
|
|
- exit_price=exit_price,
|
|
|
- realized_pnl=realized_pnl,
|
|
|
- exchange_fill_id=f"position_closed_detected_{timestamp.isoformat()}"
|
|
|
- )
|
|
|
-
|
|
|
- if success:
|
|
|
- logger.info(f"🎯 POSITION CLOSED: {symbol} {position_side.upper()} PnL: {realized_pnl:.2f}")
|
|
|
-
|
|
|
- # Get exchange position info for ROE
|
|
|
- exchange_positions = self.trading_engine.get_positions()
|
|
|
- exchange_pos = None
|
|
|
- if exchange_positions:
|
|
|
- for pos in exchange_positions:
|
|
|
- if pos.get('symbol') == symbol:
|
|
|
- exchange_pos = pos
|
|
|
- break
|
|
|
-
|
|
|
- # Send notification
|
|
|
- await self._send_reconciliation_notification('closed', symbol, {
|
|
|
- 'side': position_side,
|
|
|
- 'size': size,
|
|
|
- 'entry_price': entry_price,
|
|
|
- 'exit_price': exit_price,
|
|
|
- 'realized_pnl': realized_pnl,
|
|
|
- 'timestamp': timestamp,
|
|
|
- 'lifecycle_id': lifecycle_id, # Pass lifecycle_id to get stored ROE
|
|
|
- 'info': exchange_pos.get('info', {}) if exchange_pos else {}
|
|
|
- })
|
|
|
-
|
|
|
- # Clear any pending stop losses for this symbol
|
|
|
- stats.order_manager.cancel_pending_stop_losses_by_symbol(symbol, 'cancelled_position_closed')
|
|
|
-
|
|
|
- # Migrate trade to aggregated stats and clean up
|
|
|
- stats.migrate_trade_to_aggregated_stats(lifecycle_id)
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error handling position closed for {symbol}: {e}")
|
|
|
-
|
|
|
- async def _handle_position_size_change(self, symbol: str, exchange_pos: Dict,
|
|
|
- db_pos: Dict, stats, timestamp: datetime):
|
|
|
- """Handle position size changes and position flips."""
|
|
|
- try:
|
|
|
- exchange_size = abs(float(exchange_pos.get('contracts', 0)))
|
|
|
- db_size = db_pos.get('current_position_size', 0)
|
|
|
- db_position_side = db_pos.get('position_side')
|
|
|
-
|
|
|
- # Determine current exchange position side
|
|
|
- ccxt_side = exchange_pos.get('side', '').lower()
|
|
|
- if ccxt_side == 'long':
|
|
|
- exchange_position_side = 'long'
|
|
|
- elif ccxt_side == 'short':
|
|
|
- exchange_position_side = 'short'
|
|
|
- else:
|
|
|
- # Fallback to contract sign
|
|
|
- contracts = float(exchange_pos.get('contracts', 0))
|
|
|
- exchange_position_side = 'long' if contracts > 1e-9 else 'short'
|
|
|
- logger.warning(f"⚠️ Using contract sign fallback for side detection: {symbol}")
|
|
|
-
|
|
|
- # Check for POSITION FLIP (LONG ↔ SHORT)
|
|
|
- if db_position_side != exchange_position_side:
|
|
|
- logger.info(f"🔄 POSITION FLIP DETECTED: {symbol} {db_position_side.upper()} → {exchange_position_side.upper()}")
|
|
|
-
|
|
|
- # Handle as: close old position + open new position
|
|
|
- await self._handle_position_closed(symbol, db_pos, stats, timestamp)
|
|
|
- await self._handle_position_opened(symbol, exchange_pos, stats, timestamp)
|
|
|
- return
|
|
|
-
|
|
|
- # If we are here, the side is the same. Now we can update market data for the existing trade.
|
|
|
- lifecycle_id = db_pos['trade_lifecycle_id']
|
|
|
-
|
|
|
- # Extract all relevant market data from the exchange position
|
|
|
- unrealized_pnl = exchange_pos.get('unrealizedPnl')
|
|
|
- leverage = exchange_pos.get('leverage')
|
|
|
- liquidation_price = exchange_pos.get('liquidationPrice')
|
|
|
- mark_price = exchange_pos.get('markPrice')
|
|
|
- position_value = exchange_pos.get('contracts', 0) * exchange_pos.get('markPrice', 0) if mark_price else None
|
|
|
-
|
|
|
- # Safely extract ROE and Margin from the 'info' dictionary
|
|
|
- roe_percentage = None
|
|
|
- margin_used = None
|
|
|
- if 'info' in exchange_pos and isinstance(exchange_pos['info'], dict):
|
|
|
- logger.debug(f"Exchange position info for {symbol}: {exchange_pos['info']}") # Temporary logging
|
|
|
- position_info = exchange_pos['info'].get('position', {})
|
|
|
- if position_info:
|
|
|
- roe_raw = position_info.get('returnOnEquity')
|
|
|
- roe_percentage = float(roe_raw) * 100 if roe_raw is not None else None
|
|
|
- margin_used = position_info.get('marginUsed')
|
|
|
-
|
|
|
- # Call the trade manager to update the database
|
|
|
- success = stats.trade_manager.update_trade_market_data(
|
|
|
- lifecycle_id=lifecycle_id,
|
|
|
- current_position_size=exchange_size,
|
|
|
- unrealized_pnl=unrealized_pnl,
|
|
|
- roe_percentage=roe_percentage,
|
|
|
- mark_price=mark_price,
|
|
|
- position_value=position_value,
|
|
|
- margin_used=margin_used,
|
|
|
- leverage=leverage,
|
|
|
- liquidation_price=liquidation_price
|
|
|
- )
|
|
|
-
|
|
|
- if success:
|
|
|
- logger.debug(f"🔄 Synced market data for {symbol} (Lifecycle: {lifecycle_id[:8]})")
|
|
|
-
|
|
|
-
|
|
|
- # Check if size actually changed (with small tolerance)
|
|
|
- if abs(exchange_size - db_size) < 1e-6:
|
|
|
- return # No meaningful change
|
|
|
-
|
|
|
- lifecycle_id = db_pos['trade_lifecycle_id']
|
|
|
- entry_price = db_pos.get('entry_price', 0)
|
|
|
-
|
|
|
- # Extract ROE from info.position.returnOnEquity
|
|
|
- roe_raw = None
|
|
|
- if 'info' in exchange_pos and 'position' in exchange_pos['info']:
|
|
|
- roe_raw = exchange_pos['info']['position'].get('returnOnEquity')
|
|
|
- roe_percentage = float(roe_raw) * 100 if roe_raw is not None else 0.0
|
|
|
-
|
|
|
- # Update position size using existing manager
|
|
|
- success = stats.trade_manager.update_trade_market_data(
|
|
|
- lifecycle_id,
|
|
|
- current_position_size=exchange_size,
|
|
|
- unrealized_pnl=exchange_pos.get('unrealizedPnl'),
|
|
|
- roe_percentage=roe_percentage,
|
|
|
- mark_price=exchange_pos.get('markPrice'),
|
|
|
- position_value=exchange_pos.get('positionValue'),
|
|
|
- margin_used=exchange_pos.get('marginUsed'),
|
|
|
- leverage=exchange_pos.get('leverage'),
|
|
|
- liquidation_price=exchange_pos.get('liquidationPrice')
|
|
|
- )
|
|
|
-
|
|
|
- if success:
|
|
|
- change_type = 'increased' if exchange_size > db_size else 'decreased'
|
|
|
- size_diff = abs(exchange_size - db_size)
|
|
|
- logger.info(f"📊 Position size {change_type}: {symbol} by {size_diff} (ROE: {roe_percentage:+.2f}%)")
|
|
|
-
|
|
|
- # Send notification
|
|
|
- await self._send_reconciliation_notification('size_changed', symbol, {
|
|
|
- 'side': exchange_position_side,
|
|
|
- 'old_size': db_size,
|
|
|
- 'new_size': exchange_size,
|
|
|
- 'change_type': change_type,
|
|
|
- 'size_diff': size_diff,
|
|
|
- 'roe': roe_percentage,
|
|
|
- 'timestamp': timestamp
|
|
|
- })
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error handling position size change for {symbol}: {e}")
|
|
|
-
|
|
|
- async def _send_reconciliation_notification(self, change_type: str, symbol: str, details: Dict[str, Any]):
|
|
|
- """Send position change notification."""
|
|
|
- try:
|
|
|
- if not self.notification_manager:
|
|
|
- return
|
|
|
-
|
|
|
- token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
|
|
|
- time_str = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
|
|
|
- formatter = get_formatter()
|
|
|
-
|
|
|
- if change_type == 'opened':
|
|
|
- side = details['side'].upper()
|
|
|
- size = details['size']
|
|
|
- entry_price = details['price']
|
|
|
-
|
|
|
- message = f"""🎯 <b>Position Opened (Reconciled)</b>
|
|
|
-
|
|
|
-📊 <b>Details:</b>
|
|
|
-• Token: {token}
|
|
|
-• Direction: {side}
|
|
|
-• Size: {await formatter.format_amount(size, token)}
|
|
|
-• Entry: {await formatter.format_price_with_symbol(entry_price, token)}
|
|
|
-
|
|
|
-⏰ <b>Time:</b> {time_str}
|
|
|
-📈 Use /positions to view current status"""
|
|
|
-
|
|
|
- elif change_type == 'closed':
|
|
|
- side = details['side'].upper()
|
|
|
- size = details['size']
|
|
|
- entry_price = details['entry_price']
|
|
|
- exit_price = details['exit_price']
|
|
|
- pnl = details['realized_pnl']
|
|
|
- pnl_emoji = "🟢" if pnl >= 0 else "🔴"
|
|
|
-
|
|
|
- # Use the last known ROE from heartbeat data (stored in lifecycle)
|
|
|
- stored_roe = None
|
|
|
- lifecycle_id = details.get('lifecycle_id')
|
|
|
- if lifecycle_id:
|
|
|
- stats = self.trading_engine.get_stats()
|
|
|
- if stats:
|
|
|
- lifecycle = stats.get_trade_by_lifecycle_id(lifecycle_id)
|
|
|
- if lifecycle:
|
|
|
- stored_roe = lifecycle.get('roe_percentage')
|
|
|
-
|
|
|
- if stored_roe is not None:
|
|
|
- try:
|
|
|
- roe = float(stored_roe)
|
|
|
- roe_text = f"({roe:+.2f}%)"
|
|
|
- logger.debug(f"Using stored ROE from heartbeat for reconciled {symbol}: {roe:+.2f}%")
|
|
|
- except (ValueError, TypeError):
|
|
|
- logger.warning(f"Could not parse stored ROE value: {stored_roe} for {symbol}")
|
|
|
- roe_text = ""
|
|
|
- else:
|
|
|
- logger.debug(f"No stored ROE available for reconciled {symbol}")
|
|
|
- roe_text = ""
|
|
|
-
|
|
|
- message = f"""🎯 <b>Position Closed (Reconciled)</b>
|
|
|
-
|
|
|
-📊 <b>Details:</b>
|
|
|
-• Token: {token}
|
|
|
-• Direction: {side}
|
|
|
-• Size: {await formatter.format_amount(size, token)}
|
|
|
-• Entry: {await formatter.format_price_with_symbol(entry_price, token)}
|
|
|
-• Exit: {await formatter.format_price_with_symbol(exit_price, token)}
|
|
|
-
|
|
|
-{pnl_emoji} <b>P&L:</b> {await formatter.format_price_with_symbol(pnl)} {roe_text}
|
|
|
-
|
|
|
-⏰ <b>Time:</b> {time_str}
|
|
|
-📊 Use /stats to view performance"""
|
|
|
-
|
|
|
- elif change_type in ['increased', 'decreased']:
|
|
|
- side = details['side'].upper()
|
|
|
- old_size = details['old_size']
|
|
|
- new_size = details['new_size']
|
|
|
- size_diff = details['size_diff']
|
|
|
- emoji = "📈" if change_type == 'increased' else "📉"
|
|
|
-
|
|
|
- message = f"""{emoji} <b>Position {change_type.title()} (Reconciled)</b>
|
|
|
-
|
|
|
-📊 <b>Details:</b>
|
|
|
-• Token: {token}
|
|
|
-• Direction: {side}
|
|
|
-• Previous Size: {await formatter.format_amount(old_size, token)}
|
|
|
-• New Size: {await formatter.format_amount(new_size, token)}
|
|
|
-• Change: {await formatter.format_amount(size_diff, token)}
|
|
|
-
|
|
|
-⏰ <b>Time:</b> {time_str}
|
|
|
-📈 Use /positions to view current status"""
|
|
|
- else:
|
|
|
- return
|
|
|
-
|
|
|
- await self.notification_manager.send_generic_notification(message.strip())
|
|
|
- logger.debug(f"📨 Sent {change_type} notification for {symbol}")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error sending notification for {symbol}: {e}")
|