@@ -1,61 +1,91 @@
 """
-Market Monitor - Handles external trade monitoring and heartbeat functionality.
+Market Monitor - Main coordinator for monitoring market events, orders, and positions.
 """
 
 import logging
 import asyncio
 from datetime import datetime, timedelta, timezone
 from typing import Optional, Dict, Any, List
-import os
-import json
-
-from telegram.ext import CallbackContext
 
 from src.config.config import Config
 from src.monitoring.alarm_manager import AlarmManager
 from src.utils.token_display_formatter import get_formatter
 
+
+from src.monitoring.order_fill_processor import OrderFillProcessor
+from src.monitoring.position_synchronizer import PositionSynchronizer
+from src.monitoring.external_event_monitor import ExternalEventMonitor
+from src.monitoring.risk_cleanup_manager import RiskCleanupManager
+
 logger = logging.getLogger(__name__)
|
|
|
|
|
|
+class MarketMonitorCache:
+    """Simple data class to hold cached data for MarketMonitor and its delegates."""
+    def __init__(self):
+        self.cached_positions: List[Dict[str, Any]] = []
+        self.cached_orders: List[Dict[str, Any]] = []
+        self.cached_balance: Optional[Dict[str, Any]] = None
+        self.last_cache_update: Optional[datetime] = None
+
+
+        self.last_known_orders: set = set()
+
+        self.last_known_positions: Dict[str, Any] = {}
+
+
+
+        self.last_processed_trade_time_helper: Optional[datetime] = None
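# A minimal sketch (not part of the diff) of how a consumer can judge the freshness
# of this shared cache; it mirrors the get_cache_age_seconds() helper that the old
# MarketMonitor further down is dropping, and the helper names here are illustrative.
from datetime import datetime, timezone

def cache_age_seconds(cache: "MarketMonitorCache") -> float:
    """Age of the cached snapshot in seconds; infinite if it was never populated."""
    if not cache.last_cache_update:
        return float("inf")
    return (datetime.now(timezone.utc) - cache.last_cache_update).total_seconds()

def positions_if_fresh(cache: "MarketMonitorCache", max_age_seconds: float = 30.0):
    """Return cached positions only when the snapshot is recent enough to trust."""
    if cache_age_seconds(cache) <= max_age_seconds:
        return cache.cached_positions
    return None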
|
|
|
+
|
|
|
+
|
|
|
class MarketMonitor:
|
|
|
- """Handles external trade monitoring and market events."""
|
|
|
+ """Coordinates monitoring activities by delegating to specialized processors and managers."""
|
|
|
|
|
|
def __init__(self, trading_engine, notification_manager=None):
|
|
|
- """Initialize market monitor."""
|
|
|
self.trading_engine = trading_engine
|
|
|
self.notification_manager = notification_manager
|
|
|
self._monitoring_active = False
|
|
|
self._monitor_task = None
|
|
|
|
|
|
-
|
|
|
- self.last_known_orders = set()
|
|
|
- self.last_known_positions = {}
|
|
|
+ self.alarm_manager = AlarmManager()
|
|
|
|
|
|
-
|
|
|
- self.price_alarms = {}
|
|
|
- self.next_alarm_id = 1
|
|
|
+
|
|
|
+ self.cache = MarketMonitorCache()
|
|
|
+
|
|
|
+
|
|
|
+ self.shared_state = {
|
|
|
+ 'external_stop_losses': {}
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ self.order_fill_processor = OrderFillProcessor(
|
|
|
+ trading_engine=self.trading_engine,
|
|
|
+ notification_manager=self.notification_manager,
|
|
|
+ market_monitor_cache=self.cache
|
|
|
+ )
|
|
|
+ self.position_synchronizer = PositionSynchronizer(
|
|
|
+ trading_engine=self.trading_engine,
|
|
|
+ notification_manager=self.notification_manager,
|
|
|
+ market_monitor_cache=self.cache
|
|
|
+ )
|
|
|
+ self.external_event_monitor = ExternalEventMonitor(
|
|
|
+ trading_engine=self.trading_engine,
|
|
|
+ notification_manager=self.notification_manager,
|
|
|
+ alarm_manager=self.alarm_manager,
|
|
|
+ market_monitor_cache=self.cache,
|
|
|
+ shared_state=self.shared_state
|
|
|
+ )
|
|
|
+ self.risk_cleanup_manager = RiskCleanupManager(
|
|
|
+ trading_engine=self.trading_engine,
|
|
|
+ notification_manager=self.notification_manager,
|
|
|
+ market_monitor_cache=self.cache,
|
|
|
+ shared_state=self.shared_state
|
|
|
+ )
|
|
|
|
|
|
-
|
|
|
- self.external_stop_losses = {}
|
|
|
-
|
|
|
-
|
|
|
- self.cached_positions = []
|
|
|
- self.cached_orders = []
|
|
|
- self.cached_balance = None
|
|
|
- self.last_cache_update = None
|
|
|
-
|
|
|
-
|
|
|
- self.last_processed_trade_time: Optional[datetime] = None
|
|
|
-
|
|
|
-
|
|
|
- self.alarm_manager = AlarmManager()
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
self._load_state()
|
|
|
|
|
|
async def start(self):
|
|
|
- """Start the market monitor."""
|
|
|
if self._monitoring_active:
|
|
|
logger.warning("Market monitor is already active")
|
|
|
return
|
|
@@ -63,19 +93,14 @@ class MarketMonitor:
|
|
|
self._monitoring_active = True
|
|
|
logger.info("🔄 Market monitor started")
|
|
|
|
|
|
-
|
|
|
await self._initialize_tracking()
|
|
|
-
|
|
|
-
|
|
|
self._monitor_task = asyncio.create_task(self._monitor_loop())
|
|
|
|
|
|
async def stop(self):
|
|
|
- """Stop the market monitor."""
|
|
|
if not self._monitoring_active:
|
|
|
return
|
|
|
|
|
|
self._monitoring_active = False
|
|
|
-
|
|
|
if self._monitor_task:
|
|
|
self._monitor_task.cancel()
|
|
|
try:
|
|
@@ -83,2032 +108,285 @@ class MarketMonitor:
|
|
|
except asyncio.CancelledError:
|
|
|
pass
|
|
|
|
|
|
- self._save_state()
|
|
|
+ self._save_state()
|
|
|
logger.info("🛑 Market monitor stopped")
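# A minimal usage sketch (illustrative only) of the coordinator lifecycle defined by
# start()/stop() above; trading_engine and notification_manager stand in for whatever
# objects the application already constructs elsewhere.
import asyncio

async def run_monitor(trading_engine, notification_manager) -> None:
    monitor = MarketMonitor(trading_engine, notification_manager)
    await monitor.start()              # spawns the heartbeat loop as a background task
    try:
        await asyncio.sleep(3600)      # the application does its own work here
    finally:
        await monitor.stop()           # cancels the loop and persists helper state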
|
|
|
|
|
|
def _load_state(self):
|
|
|
- """Load market monitor state from SQLite DB via TradingStats."""
|
|
|
+        """Load the minimal MarketMonitor-specific state; most state is now managed by the delegate components."""
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
stats = self.trading_engine.get_stats()
|
|
|
if not stats:
|
|
|
- logger.warning("⚠️ TradingStats not available, cannot load MarketMonitor state.")
|
|
|
- self.last_processed_trade_time = None
|
|
|
+ logger.warning("⚠️ TradingStats not available, cannot load MarketMonitor helper states.")
|
|
|
return
|
|
|
-
|
|
|
try:
|
|
|
- last_time_str = stats._get_metadata('market_monitor_last_processed_trade_time')
|
|
|
- if last_time_str:
|
|
|
- self.last_processed_trade_time = datetime.fromisoformat(last_time_str)
|
|
|
-
|
|
|
- if self.last_processed_trade_time.tzinfo is None:
|
|
|
- self.last_processed_trade_time = self.last_processed_trade_time.replace(tzinfo=timezone.utc)
|
|
|
- else:
|
|
|
- self.last_processed_trade_time = self.last_processed_trade_time.astimezone(timezone.utc)
|
|
|
- logger.info(f"🔄 Loaded MarketMonitor state from DB: last_processed_trade_time = {self.last_processed_trade_time.isoformat()}")
|
|
|
- else:
|
|
|
- logger.info("💨 No MarketMonitor state (last_processed_trade_time) found in DB. Will start with fresh external trade tracking.")
|
|
|
- self.last_processed_trade_time = None
|
|
|
+ helper_time_str = stats._get_metadata('order_fill_processor_last_processed_trade_time_helper')
|
|
|
+ if helper_time_str:
|
|
|
+ dt_obj = datetime.fromisoformat(helper_time_str)
|
|
|
+ self.cache.last_processed_trade_time_helper = dt_obj.replace(tzinfo=timezone.utc) if dt_obj.tzinfo is None else dt_obj.astimezone(timezone.utc)
|
|
|
+ logger.info(f"🔄 Loaded OrderFillProcessor helper state: last_processed_trade_time_helper = {self.cache.last_processed_trade_time_helper.isoformat()}")
|
|
|
except Exception as e:
|
|
|
- logger.error(f"Error loading MarketMonitor state from DB: {e}. Proceeding with default state.")
|
|
|
- self.last_processed_trade_time = None
|
|
|
+ logger.error(f"Error loading OrderFillProcessor helper state: {e}")
|
|
|
+
|
|
|
+ logger.info("MarketMonitor _load_state: Minimal state loaded (most state handled by delegates).")
|
|
|
|
|
|
def _save_state(self):
|
|
|
- """Save market monitor state to SQLite DB via TradingStats."""
|
|
|
+        """Save the minimal MarketMonitor-specific state (currently the OrderFillProcessor helper timestamp)."""
|
|
|
+
|
|
|
stats = self.trading_engine.get_stats()
|
|
|
if not stats:
|
|
|
- logger.warning("⚠️ TradingStats not available, cannot save MarketMonitor state.")
|
|
|
+ logger.warning("⚠️ TradingStats not available, cannot save MarketMonitor helper states.")
|
|
|
return
|
|
|
-
|
|
|
try:
|
|
|
- if self.last_processed_trade_time:
|
|
|
-
|
|
|
- lptt_utc = self.last_processed_trade_time
|
|
|
- if lptt_utc.tzinfo is None:
|
|
|
- lptt_utc = lptt_utc.replace(tzinfo=timezone.utc)
|
|
|
- else:
|
|
|
- lptt_utc = lptt_utc.astimezone(timezone.utc)
|
|
|
-
|
|
|
- stats._set_metadata('market_monitor_last_processed_trade_time', lptt_utc.isoformat())
|
|
|
- logger.info(f"💾 Saved MarketMonitor state (last_processed_trade_time) to DB: {lptt_utc.isoformat()}")
|
|
|
- else:
|
|
|
-
|
|
|
-
|
|
|
- stats._set_metadata('market_monitor_last_processed_trade_time', '')
|
|
|
- logger.info("💾 MarketMonitor state (last_processed_trade_time) is None, saved as empty in DB.")
|
|
|
+ if self.cache.last_processed_trade_time_helper:
|
|
|
+ lptt_helper_utc = self.cache.last_processed_trade_time_helper.astimezone(timezone.utc)
|
|
|
+ stats._set_metadata('order_fill_processor_last_processed_trade_time_helper', lptt_helper_utc.isoformat())
|
|
|
+ logger.info(f"💾 Saved OrderFillProcessor helper state (last_processed_trade_time_helper) to DB: {lptt_helper_utc.isoformat()}")
|
|
|
except Exception as e:
|
|
|
- logger.error(f"Error saving MarketMonitor state to DB: {e}")
|
|
|
+ logger.error(f"Error saving OrderFillProcessor helper state: {e}")
|
|
|
+
|
|
|
+ logger.info("MarketMonitor _save_state: Minimal state saved.")
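# A minimal sketch (illustrative only) of the save/load round trip used by the two
# methods above. It assumes only the _get_metadata/_set_metadata accessors on the
# stats object that this diff already relies on; the function names are hypothetical.
from datetime import datetime, timezone
from typing import Optional

HELPER_KEY = 'order_fill_processor_last_processed_trade_time_helper'

def save_helper_time(stats, value: datetime) -> None:
    # Normalize to UTC before persisting so the ISO string is unambiguous.
    utc_value = value.replace(tzinfo=timezone.utc) if value.tzinfo is None else value.astimezone(timezone.utc)
    stats._set_metadata(HELPER_KEY, utc_value.isoformat())

def load_helper_time(stats) -> Optional[datetime]:
    raw = stats._get_metadata(HELPER_KEY)
    if not raw:
        return None
    parsed = datetime.fromisoformat(raw)
    return parsed.replace(tzinfo=timezone.utc) if parsed.tzinfo is None else parsed.astimezone(timezone.utc)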
|
|
|
|
|
|
async def _initialize_tracking(self):
|
|
|
- """Initialize order and position tracking."""
|
|
|
+        """Initialize basic order and position tracking in the shared cache."""
|
|
|
try:
|
|
|
-
|
|
|
- try:
|
|
|
- orders = self.trading_engine.get_orders() or []
|
|
|
- self.last_known_orders = {order.get('id') for order in orders if order.get('id')}
|
|
|
- logger.info(f"📋 Initialized tracking with {len(orders)} open orders")
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Failed to initialize order tracking: {e}")
|
|
|
- self.last_known_orders = set()
|
|
|
-
|
|
|
-
|
|
|
- try:
|
|
|
- positions = self.trading_engine.get_positions() or []
|
|
|
- self.last_known_positions = {
|
|
|
- pos.get('symbol'): pos for pos in positions
|
|
|
- if pos.get('symbol') and abs(float(pos.get('contracts', 0))) > 0
|
|
|
- }
|
|
|
- logger.info(f"📊 Initialized tracking with {len(positions)} positions")
|
|
|
-
|
|
|
-
|
|
|
- if positions:
|
|
|
- await self._immediate_startup_auto_sync()
|
|
|
+ orders = self.trading_engine.get_orders() or []
|
|
|
+
|
|
|
+ self.cache.last_known_orders = {order.get('id') for order in orders if order.get('id')}
|
|
|
+ logger.info(f"📋 Initialized cache with {len(orders)} open orders for first cycle comparison")
|
|
|
+
|
|
|
+ positions = self.trading_engine.get_positions() or []
|
|
|
+
|
|
|
+ self.cache.last_known_positions = {
|
|
|
+ pos.get('symbol'): pos for pos in positions
|
|
|
+ if pos.get('symbol') and abs(float(pos.get('contracts', 0))) > 1e-9
|
|
|
+ }
|
|
|
+ logger.info(f"📊 Initialized cache with {len(positions)} positions for first cycle comparison")
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ await self.position_synchronizer._immediate_startup_auto_sync()
|
|
|
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Failed to initialize position tracking: {e}")
|
|
|
- self.last_known_positions = {}
|
|
|
-
|
|
|
except Exception as e:
|
|
|
- logger.error(f"❌ Failed to initialize tracking: {e}")
|
|
|
- self.last_known_orders = set()
|
|
|
- self.last_known_positions = {}
|
|
|
+ logger.error(f"❌ Failed to initialize tracking: {e}", exc_info=True)
|
|
|
+ self.cache.last_known_orders = set()
|
|
|
+ self.cache.last_known_positions = {}
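# A short worked example (not from the diff) of why the position filter above uses a
# 1e-9 tolerance instead of a bare "> 0" check: float arithmetic can leave dust that
# should not count as an open position.
residual = 0.1 + 0.2 - 0.3        # 5.551115123125783e-17, not exactly 0.0
assert residual != 0              # a naive "> 0" check would treat this as an open position
assert abs(residual) <= 1e-9      # the tolerance used above correctly treats it as flat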
|
|
|
|
|
|
async def _monitor_loop(self):
|
|
|
- """Main monitoring loop that runs every BOT_HEARTBEAT_SECONDS."""
|
|
|
try:
|
|
|
loop_count = 0
|
|
|
while self._monitoring_active:
|
|
|
-
|
|
|
- await self._update_cached_data()
|
|
|
+ await self._update_cached_data()
|
|
|
|
|
|
-
|
|
|
- await self._activate_pending_stop_losses_from_trades()
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ await self.order_fill_processor._activate_pending_stop_losses_from_trades()
|
|
|
+ await self.order_fill_processor._check_order_fills()
|
|
|
+
|
|
|
+ await self.external_event_monitor._check_price_alarms()
|
|
|
+ await self.external_event_monitor._check_external_trades()
|
|
|
|
|
|
- await self._check_order_fills()
|
|
|
- await self._check_price_alarms()
|
|
|
- await self._check_external_trades()
|
|
|
- await self._check_pending_triggers()
|
|
|
- await self._check_automatic_risk_management()
|
|
|
- await self._check_external_stop_loss_orders()
|
|
|
+ await self.risk_cleanup_manager._check_pending_triggers()
|
|
|
+ await self.risk_cleanup_manager._check_automatic_risk_management()
|
|
|
+ await self.risk_cleanup_manager._check_external_stop_loss_orders()
|
|
|
|
|
|
-
|
|
|
loop_count += 1
|
|
|
- if loop_count % 10 == 0:
|
|
|
- await self._cleanup_orphaned_stop_losses()
|
|
|
- await self._cleanup_external_stop_loss_tracking()
|
|
|
+ if loop_count >= Config.MARKET_MONITOR_CLEANUP_INTERVAL_HEARTBEATS:
|
|
|
+ logger.info(f"Running periodic cleanup and sync tasks (Loop count: {loop_count})")
|
|
|
+ await self.risk_cleanup_manager._cleanup_orphaned_stop_losses()
|
|
|
+ await self.risk_cleanup_manager._cleanup_external_stop_loss_tracking()
|
|
|
+ await self.risk_cleanup_manager._cleanup_orphaned_pending_sl_activations()
|
|
|
|
|
|
-
|
|
|
- await self._auto_sync_orphaned_positions()
|
|
|
+ await self.position_synchronizer._auto_sync_orphaned_positions()
|
|
|
|
|
|
- loop_count = 0
|
|
|
+ loop_count = 0
|
|
|
|
|
|
await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS)
|
|
|
except asyncio.CancelledError:
|
|
|
logger.info("Market monitor loop cancelled")
|
|
|
raise
|
|
|
except Exception as e:
|
|
|
- logger.error(f"Error in market monitor loop: {e}")
|
|
|
-
|
|
|
- if self._monitoring_active:
|
|
|
- await asyncio.sleep(5)
|
|
|
- await self._monitor_loop()
|
|
|
-
|
|
|
+ logger.error(f"Error in market monitor loop: {e}", exc_info=True)
|
|
|
+ if self._monitoring_active:
|
|
|
+ logger.info("Attempting to restart market monitor loop after error...")
|
|
|
+ await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS * 2)
|
|
|
+
|
|
|
+
|
|
|
+ if self._monitoring_active:
|
|
|
+ self._monitor_task = asyncio.create_task(self._monitor_loop())
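# A minimal sketch of the cadence implied by the loop above. The numeric values are
# illustrative stand-ins; the real ones come from src.config.config.Config.
BOT_HEARTBEAT_SECONDS = 10                        # seconds between heartbeat cycles (assumed value)
MARKET_MONITOR_CLEANUP_INTERVAL_HEARTBEATS = 30   # heartbeats between cleanup/sync passes (assumed value)

cleanup_period_seconds = BOT_HEARTBEAT_SECONDS * MARKET_MONITOR_CLEANUP_INTERVAL_HEARTBEATS
print(f"delegate checks run every {BOT_HEARTBEAT_SECONDS}s, "
      f"cleanup and orphan sync every {cleanup_period_seconds}s")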
|
|
|
+
|
|
|
async def _update_cached_data(self):
|
|
|
- """🆕 Continuously update cached exchange data every heartbeat."""
|
|
|
+ """Continuously update cached exchange data for all components to use."""
|
|
|
try:
|
|
|
-
|
|
|
fresh_positions_list = self.trading_engine.get_positions() or []
|
|
|
fresh_orders_list = self.trading_engine.get_orders() or []
|
|
|
fresh_balance = self.trading_engine.get_balance()
|
|
|
|
|
|
-
|
|
|
- self.cached_positions = fresh_positions_list
|
|
|
- self.cached_orders = fresh_orders_list
|
|
|
- self.cached_balance = fresh_balance
|
|
|
- self.last_cache_update = datetime.now(timezone.utc)
|
|
|
+
|
|
|
+ self.cache.cached_positions = fresh_positions_list
|
|
|
+ self.cache.cached_orders = fresh_orders_list
|
|
|
+ self.cache.cached_balance = fresh_balance
|
|
|
+ self.cache.last_cache_update = datetime.now(timezone.utc)
|
|
|
|
|
|
- logger.debug(f"🔄 Fetched fresh cache: {len(fresh_positions_list)} positions, {len(fresh_orders_list)} orders")
|
|
|
+ logger.debug(f"🔄 Cache updated: {len(fresh_positions_list)} positions, {len(fresh_orders_list)} orders")
|
|
|
|
|
|
-
|
|
|
current_exchange_position_map = {
|
|
|
pos.get('symbol'): pos for pos in fresh_positions_list
|
|
|
if pos.get('symbol') and abs(float(pos.get('contracts', 0))) > 1e-9
|
|
|
}
|
|
|
current_exchange_order_ids = {order.get('id') for order in fresh_orders_list if order.get('id')}
|
|
|
|
|
|
-
|
|
|
- if len(current_exchange_position_map) != len(self.last_known_positions):
|
|
|
- logger.info(f"📊 Position count changed: {len(self.last_known_positions)} → {len(current_exchange_position_map)}")
|
|
|
+ if len(current_exchange_position_map) != len(self.cache.last_known_positions):
|
|
|
+ logger.info(f"📊 Position count changed: {len(self.cache.last_known_positions)} → {len(current_exchange_position_map)}")
|
|
|
|
|
|
- if len(current_exchange_order_ids) != len(self.last_known_orders):
|
|
|
- logger.info(f"📋 Order count changed: {len(self.last_known_orders)} → {len(current_exchange_order_ids)}")
|
|
|
+ if len(current_exchange_order_ids) != len(self.cache.last_known_orders):
|
|
|
+ logger.info(f"📋 Order count changed: {len(self.cache.last_known_orders)} → {len(current_exchange_order_ids)}")
|
|
|
|
|
|
-
|
|
|
- self.last_known_positions = current_exchange_position_map
|
|
|
- self.last_known_orders = current_exchange_order_ids
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ self.cache.last_known_positions = current_exchange_position_map
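# A minimal sketch (illustrative data) of the set-difference idea this cache snapshot
# enables; it mirrors the disappeared-order detection that is moving from the old
# _check_order_fills() below into OrderFillProcessor.
previous_ids = {"oid-1", "oid-2", "oid-3"}   # cache.last_known_orders from the prior heartbeat
current_ids = {"oid-2"}                      # ids present on the exchange right now
disappeared = previous_ids - current_ids     # {"oid-1", "oid-3"} -> investigate fills/cancellations
appeared = current_ids - previous_ids        # set() -> nothing new placed outside the bot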
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
|
|
|
-
|
|
|
stats = self.trading_engine.get_stats()
|
|
|
if stats and fresh_positions_list:
|
|
|
- for ex_pos in fresh_positions_list:
|
|
|
+ for ex_pos in fresh_positions_list:
|
|
|
symbol = ex_pos.get('symbol')
|
|
|
- if not symbol:
|
|
|
+ if not symbol:
|
|
|
continue
|
|
|
|
|
|
+
|
|
|
db_trade = stats.get_trade_by_symbol_and_status(symbol, status='position_opened')
|
|
|
-
|
|
|
if db_trade:
|
|
|
lifecycle_id = db_trade.get('trade_lifecycle_id')
|
|
|
if not lifecycle_id:
|
|
|
- continue
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- current_size_from_ex = ex_pos.get('contracts')
|
|
|
- if current_size_from_ex is not None:
|
|
|
- try: current_position_size = float(current_size_from_ex)
|
|
|
- except (ValueError, TypeError): current_position_size = None
|
|
|
- else: current_position_size = None
|
|
|
-
|
|
|
- entry_price_from_ex = ex_pos.get('entryPrice') or ex_pos.get('entryPx')
|
|
|
- if entry_price_from_ex is not None:
|
|
|
- try: entry_price = float(entry_price_from_ex)
|
|
|
- except (ValueError, TypeError): entry_price = None
|
|
|
- else: entry_price = None
|
|
|
-
|
|
|
- mark_price_from_ex = ex_pos.get('markPrice') or ex_pos.get('markPx')
|
|
|
- if mark_price_from_ex is not None:
|
|
|
- try: mark_price = float(mark_price_from_ex)
|
|
|
- except (ValueError, TypeError): mark_price = None
|
|
|
- else: mark_price = None
|
|
|
-
|
|
|
- unrealized_pnl_from_ex = ex_pos.get('unrealizedPnl')
|
|
|
- if unrealized_pnl_from_ex is not None:
|
|
|
- try: unrealized_pnl = float(unrealized_pnl_from_ex)
|
|
|
- except (ValueError, TypeError): unrealized_pnl = None
|
|
|
- else: unrealized_pnl = None
|
|
|
-
|
|
|
- liquidation_price_from_ex = ex_pos.get('liquidationPrice')
|
|
|
- if liquidation_price_from_ex is not None:
|
|
|
- try: liquidation_price = float(liquidation_price_from_ex)
|
|
|
- except (ValueError, TypeError): liquidation_price = None
|
|
|
- else: liquidation_price = None
|
|
|
-
|
|
|
- margin_used_from_ex = ex_pos.get('marginUsed')
|
|
|
- if margin_used_from_ex is not None:
|
|
|
- try: margin_used = float(margin_used_from_ex)
|
|
|
- except (ValueError, TypeError): margin_used = None
|
|
|
- else: margin_used = None
|
|
|
-
|
|
|
- leverage_from_ex = ex_pos.get('leverage')
|
|
|
- if leverage_from_ex is not None:
|
|
|
- try: leverage = float(leverage_from_ex)
|
|
|
- except (ValueError, TypeError): leverage = None
|
|
|
- else: leverage = None
|
|
|
-
|
|
|
- position_value_from_ex = ex_pos.get('notional')
|
|
|
- if position_value_from_ex is not None:
|
|
|
- try: position_value = float(position_value_from_ex)
|
|
|
- except (ValueError, TypeError): position_value = None
|
|
|
- else: position_value = None
|
|
|
-
|
|
|
-
|
|
|
- if position_value is None and mark_price is not None and current_position_size is not None:
|
|
|
- position_value = abs(current_position_size) * mark_price
|
|
|
-
|
|
|
-
|
|
|
- roe_from_ex = ex_pos.get('percentage')
|
|
|
- if roe_from_ex is not None:
|
|
|
- try: unrealized_pnl_percentage_val = float(roe_from_ex)
|
|
|
- except (ValueError, TypeError): unrealized_pnl_percentage_val = None
|
|
|
- else: unrealized_pnl_percentage_val = None
|
|
|
-
|
|
|
- stats.update_trade_market_data(
|
|
|
- trade_lifecycle_id=lifecycle_id,
|
|
|
- unrealized_pnl=unrealized_pnl,
|
|
|
- mark_price=mark_price,
|
|
|
- current_position_size=current_position_size,
|
|
|
- entry_price=entry_price,
|
|
|
- liquidation_price=liquidation_price,
|
|
|
- margin_used=margin_used,
|
|
|
- leverage=leverage,
|
|
|
- position_value=position_value,
|
|
|
- unrealized_pnl_percentage=unrealized_pnl_percentage_val
|
|
|
- )
|
|
|
-
|
|
|
-
|
|
|
- if len(fresh_positions_list) != len(self.last_known_positions):
|
|
|
- logger.info(f"📊 Position count changed: {len(self.last_known_positions)} → {len(current_exchange_position_map)}")
|
|
|
-
|
|
|
- if len(fresh_orders_list) != len(self.last_known_orders):
|
|
|
- logger.info(f"📋 Order count changed: {len(self.last_known_orders)} → {len(current_exchange_order_ids)}")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error updating cached data: {e}")
|
|
|
-
|
|
|
- def get_cached_positions(self):
|
|
|
- """Get cached positions (updated every heartbeat)."""
|
|
|
- return self.cached_positions or []
|
|
|
-
|
|
|
- def get_cached_orders(self):
|
|
|
- """Get cached orders (updated every heartbeat)."""
|
|
|
- return self.cached_orders or []
|
|
|
-
|
|
|
- def get_cached_balance(self):
|
|
|
- """Get cached balance (updated every heartbeat)."""
|
|
|
- return self.cached_balance
|
|
|
-
|
|
|
- def get_cache_age_seconds(self):
|
|
|
- """Get age of cached data in seconds."""
|
|
|
- if not self.last_cache_update:
|
|
|
- return float('inf')
|
|
|
- return (datetime.now(timezone.utc) - self.last_cache_update).total_seconds()
|
|
|
-
|
|
|
- async def _check_order_fills(self):
|
|
|
- """Check for filled orders and send notifications."""
|
|
|
- try:
|
|
|
-
|
|
|
- current_orders = self.cached_orders or []
|
|
|
- current_positions = self.cached_positions or []
|
|
|
-
|
|
|
-
|
|
|
- current_order_ids = {order.get('id') for order in current_orders if order.get('id')}
|
|
|
-
|
|
|
-
|
|
|
- disappeared_order_ids = self.last_known_orders - current_order_ids
|
|
|
-
|
|
|
- if disappeared_order_ids:
|
|
|
- logger.info(f"🎯 Detected {len(disappeared_order_ids)} bot orders no longer open: {list(disappeared_order_ids)}. Corresponding fills (if any) are processed by external trade checker.")
|
|
|
- await self._process_disappeared_orders(disappeared_order_ids)
|
|
|
-
|
|
|
-
|
|
|
- self.last_known_orders = current_order_ids
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error checking order fills: {e}")
|
|
|
-
|
|
|
- async def _process_filled_orders(self, filled_order_ids: set, current_positions: list):
|
|
|
- """Process filled orders using enhanced position tracking."""
|
|
|
- try:
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- await self._update_position_tracking(current_positions)
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error processing filled orders: {e}")
|
|
|
-
|
|
|
- async def _update_position_tracking(self, current_positions: list):
|
|
|
- """Update position tracking and calculate P&L changes."""
|
|
|
- try:
|
|
|
- new_position_map = {}
|
|
|
- formatter = get_formatter()
|
|
|
-
|
|
|
- for position in current_positions:
|
|
|
- symbol = position.get('symbol')
|
|
|
- contracts = float(position.get('contracts', 0))
|
|
|
- entry_price = float(position.get('entryPx', 0))
|
|
|
-
|
|
|
- if symbol and contracts != 0:
|
|
|
- new_position_map[symbol] = {
|
|
|
- 'contracts': contracts,
|
|
|
- 'entry_price': entry_price
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- for symbol, new_data in new_position_map.items():
|
|
|
- old_data = self.last_known_positions.get(symbol)
|
|
|
- token = symbol.split('/')[0] if '/' in symbol else symbol
|
|
|
-
|
|
|
- if not old_data:
|
|
|
-
|
|
|
- amount_str = formatter.format_amount(new_data['contracts'], token)
|
|
|
- price_str = formatter.format_price_with_symbol(new_data['entry_price'], token)
|
|
|
- logger.info(f"📈 New position detected (observed by MarketMonitor): {symbol} {amount_str} @ {price_str}. TradingStats is the definitive source.")
|
|
|
- elif abs(new_data['contracts'] - old_data['contracts']) > 0.000001:
|
|
|
-
|
|
|
- change = new_data['contracts'] - old_data['contracts']
|
|
|
- change_str = formatter.format_amount(change, token)
|
|
|
- logger.info(f"📊 Position change detected (observed by MarketMonitor): {symbol} {change_str} contracts. TradingStats is the definitive source.")
|
|
|
-
|
|
|
-
|
|
|
- for symbol in self.last_known_positions:
|
|
|
- if symbol not in new_position_map:
|
|
|
- logger.info(f"📉 Position closed (observed by MarketMonitor): {symbol}. TradingStats is the definitive source.")
|
|
|
-
|
|
|
-
|
|
|
- self.last_known_positions = new_position_map
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error updating position tracking: {e}")
|
|
|
-
|
|
|
- async def _process_disappeared_orders(self, disappeared_order_ids: set):
|
|
|
- """Log and investigate bot orders that have disappeared from the exchange."""
|
|
|
- stats = self.trading_engine.get_stats()
|
|
|
- if not stats:
|
|
|
- logger.warning("⚠️ TradingStats not available in _process_disappeared_orders.")
|
|
|
- return
|
|
|
-
|
|
|
- try:
|
|
|
- total_linked_cancelled = 0
|
|
|
- external_cancellations = []
|
|
|
-
|
|
|
- for exchange_oid in disappeared_order_ids:
|
|
|
- order_in_db = stats.get_order_by_exchange_id(exchange_oid)
|
|
|
-
|
|
|
- if order_in_db:
|
|
|
- last_status = order_in_db.get('status', 'unknown')
|
|
|
- order_type = order_in_db.get('type', 'unknown')
|
|
|
- symbol = order_in_db.get('symbol', 'unknown')
|
|
|
- token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
|
|
|
-
|
|
|
- logger.info(f"Order {exchange_oid} was in our DB with status '{last_status}' but has now disappeared from exchange.")
|
|
|
-
|
|
|
-
|
|
|
- active_statuses = ['open', 'submitted', 'partially_filled', 'pending_submission']
|
|
|
- if last_status in active_statuses:
|
|
|
- logger.warning(f"⚠️ EXTERNAL CANCELLATION: Order {exchange_oid} with status '{last_status}' was likely cancelled externally on Hyperliquid")
|
|
|
- stats.update_order_status(exchange_order_id=exchange_oid, new_status='cancelled_externally')
|
|
|
-
|
|
|
-
|
|
|
- external_cancellations.append({
|
|
|
- 'exchange_oid': exchange_oid,
|
|
|
- 'token': token,
|
|
|
- 'type': order_type,
|
|
|
- 'last_status': last_status
|
|
|
- })
|
|
|
-
|
|
|
-
|
|
|
- if self.notification_manager:
|
|
|
- await self.notification_manager.send_generic_notification(
|
|
|
- f"⚠️ <b>External Order Cancellation Detected</b>\n\n"
|
|
|
- f"Token: {token}\n"
|
|
|
- f"Order Type: {order_type.replace('_', ' ').title()}\n"
|
|
|
- f"Exchange Order ID: <code>{exchange_oid[:8]}...</code>\n"
|
|
|
- f"Previous Status: {last_status.replace('_', ' ').title()}\n"
|
|
|
- f"Source: Cancelled directly on Hyperliquid\n"
|
|
|
- f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
|
|
|
- f"🤖 Bot status updated automatically"
|
|
|
- )
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- order_after_external_check = stats.get_order_by_exchange_id(exchange_oid)
|
|
|
- if order_after_external_check and order_after_external_check.get('status') == 'cancelled_externally':
|
|
|
-
|
|
|
- pending_lc = stats.get_lifecycle_by_entry_order_id(exchange_oid, status='pending')
|
|
|
- if pending_lc:
|
|
|
- lc_id_to_cancel = pending_lc.get('trade_lifecycle_id')
|
|
|
- if lc_id_to_cancel:
|
|
|
- cancel_reason = f"entry_order_{exchange_oid[:8]}_disappeared_externally"
|
|
|
- cancelled_lc_success = stats.update_trade_cancelled(lc_id_to_cancel, reason=cancel_reason)
|
|
|
- if cancelled_lc_success:
|
|
|
- logger.info(f"🔗 Trade lifecycle {lc_id_to_cancel} also cancelled for disappeared entry order {exchange_oid}.")
|
|
|
- if self.notification_manager:
|
|
|
- await self.notification_manager.send_generic_notification(
|
|
|
- f"🔗 <b>Trade Lifecycle Cancelled</b>\n\n"
|
|
|
- f"Token: {token}\n"
|
|
|
- f"Lifecycle ID: {lc_id_to_cancel[:8]}...\\n"
|
|
|
- f"Reason: Entry order {exchange_oid[:8]}... cancelled externally.\\n"
|
|
|
- f"Time: {datetime.now().strftime('%H:%M:%S')}"
|
|
|
- )
|
|
|
- else:
|
|
|
- logger.error(f"❌ Failed to cancel trade lifecycle {lc_id_to_cancel} for entry order {exchange_oid}.")
|
|
|
-
|
|
|
-
|
|
|
- elif order_after_external_check and order_after_external_check.get('status') in ['filled', 'partially_filled']:
|
|
|
- logger.info(f"ℹ️ Order {exchange_oid} was ultimately found to be '{order_after_external_check.get('status')}' despite initial disappearance. Stop losses will not be cancelled by this path. Lifecycle should be active.")
|
|
|
- continue
|
|
|
-
|
|
|
- else:
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- if last_status not in ['filled', 'partially_filled', 'cancelled_manually', 'cancelled_by_bot', 'failed_submission', 'cancelled_externally']:
|
|
|
- stats.update_order_status(exchange_order_id=exchange_oid, new_status='disappeared_from_exchange')
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- if order_in_db.get('bot_order_ref_id'):
|
|
|
-
|
|
|
- parent_order_current_state = stats.get_order_by_exchange_id(exchange_oid)
|
|
|
-
|
|
|
- if parent_order_current_state and parent_order_current_state.get('status') not in ['filled', 'partially_filled']:
|
|
|
- logger.info(f"Cancelling stop losses for order {exchange_oid} (status: {parent_order_current_state.get('status')}) as it is not considered filled.")
|
|
|
- cancelled_sl_count = stats.cancel_linked_orders(
|
|
|
- parent_bot_order_ref_id=order_in_db['bot_order_ref_id'],
|
|
|
- new_status='cancelled_parent_disappeared_or_not_filled'
|
|
|
- )
|
|
|
- total_linked_cancelled += cancelled_sl_count
|
|
|
-
|
|
|
- if cancelled_sl_count > 0:
|
|
|
- logger.info(f"Cancelled {cancelled_sl_count} pending stop losses linked to disappeared/non-filled order {exchange_oid}")
|
|
|
-
|
|
|
- if self.notification_manager:
|
|
|
- await self.notification_manager.send_generic_notification(
|
|
|
- f"🛑 <b>Linked Stop Losses Cancelled</b>\n\n"
|
|
|
- f"Token: {token}\n"
|
|
|
- f"Cancelled: {cancelled_sl_count} stop loss(es)\n"
|
|
|
- f"Reason: Parent order {exchange_oid[:8]}... disappeared or was not filled\n"
|
|
|
- f"Parent Status: {parent_order_current_state.get('status', 'N/A').replace('_', ' ').title()}\n"
|
|
|
- f"Time: {datetime.now().strftime('%H:%M:%S')}"
|
|
|
- )
|
|
|
- else:
|
|
|
- logger.info(f"ℹ️ Stop losses for order {exchange_oid} (status: {parent_order_current_state.get('status') if parent_order_current_state else 'N/A'}) preserved as parent order is considered filled or partially filled.")
|
|
|
- continue
|
|
|
- else:
|
|
|
- logger.warning(f"Order {exchange_oid} disappeared from exchange but was not found in our DB. This might be an order placed externally.")
|
|
|
-
|
|
|
-
|
|
|
- if len(external_cancellations) > 1:
|
|
|
- tokens_affected = list(set(item['token'] for item in external_cancellations))
|
|
|
-
|
|
|
- if self.notification_manager:
|
|
|
- await self.notification_manager.send_generic_notification(
|
|
|
- f"⚠️ <b>Multiple External Cancellations Detected</b>\n\n"
|
|
|
- f"Orders Cancelled: {len(external_cancellations)}\n"
|
|
|
- f"Tokens Affected: {', '.join(tokens_affected)}\n"
|
|
|
- f"Source: Direct cancellation on Hyperliquid\n"
|
|
|
- f"Linked Stop Losses Cancelled: {total_linked_cancelled}\n"
|
|
|
- f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
|
|
|
- f"💡 Check individual orders for details"
|
|
|
- )
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error processing disappeared orders: {e}", exc_info=True)
|
|
|
-
|
|
|
- async def _check_price_alarms(self):
|
|
|
- """Check price alarms and trigger notifications."""
|
|
|
- try:
|
|
|
- active_alarms = self.alarm_manager.get_all_active_alarms()
|
|
|
-
|
|
|
- if not active_alarms:
|
|
|
- return
|
|
|
-
|
|
|
-
|
|
|
- tokens_to_check = list(set(alarm['token'] for alarm in active_alarms))
|
|
|
-
|
|
|
- for token in tokens_to_check:
|
|
|
- try:
|
|
|
-
|
|
|
- symbol = f"{token}/USDC:USDC"
|
|
|
- market_data = self.trading_engine.get_market_data(symbol)
|
|
|
-
|
|
|
- if not market_data or not market_data.get('ticker'):
|
|
|
- continue
|
|
|
-
|
|
|
- current_price = float(market_data['ticker'].get('last', 0))
|
|
|
- if current_price <= 0:
|
|
|
- continue
|
|
|
-
|
|
|
-
|
|
|
- token_alarms = [alarm for alarm in active_alarms if alarm['token'] == token]
|
|
|
-
|
|
|
- for alarm in token_alarms:
|
|
|
- target_price = alarm['target_price']
|
|
|
- direction = alarm['direction']
|
|
|
-
|
|
|
-
|
|
|
- should_trigger = False
|
|
|
- if direction == 'above' and current_price >= target_price:
|
|
|
- should_trigger = True
|
|
|
- elif direction == 'below' and current_price <= target_price:
|
|
|
- should_trigger = True
|
|
|
-
|
|
|
- if should_trigger:
|
|
|
-
|
|
|
- triggered_alarm = self.alarm_manager.trigger_alarm(alarm['id'], current_price)
|
|
|
- if triggered_alarm:
|
|
|
- await self._send_alarm_notification(triggered_alarm)
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Error checking alarms for {token}: {e}")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error checking price alarms: {e}")
|
|
|
-
|
|
|
- async def _send_alarm_notification(self, alarm: Dict[str, Any]):
|
|
|
- """Send notification for triggered alarm."""
|
|
|
- try:
|
|
|
-
|
|
|
- if self.notification_manager:
|
|
|
- await self.notification_manager.send_alarm_triggered_notification(
|
|
|
- alarm['token'],
|
|
|
- alarm['target_price'],
|
|
|
- alarm['triggered_price'],
|
|
|
- alarm['direction']
|
|
|
- )
|
|
|
- else:
|
|
|
-
|
|
|
- logger.info(f"🔔 ALARM TRIGGERED: {alarm['token']} @ ${alarm['triggered_price']:,.2f}")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error sending alarm notification: {e}")
|
|
|
-
|
|
|
- async def _check_external_trades(self):
|
|
|
- """Check for trades made outside the Telegram bot and update stats."""
|
|
|
- try:
|
|
|
- stats = self.trading_engine.get_stats()
|
|
|
- if not stats:
|
|
|
- logger.warning("TradingStats not available in _check_external_trades. Skipping.")
|
|
|
- return
|
|
|
-
|
|
|
- external_trades_processed = 0
|
|
|
- symbols_with_fills = set()
|
|
|
-
|
|
|
-
|
|
|
- recent_fills = self.trading_engine.get_recent_fills()
|
|
|
- if not recent_fills:
|
|
|
- logger.debug("No recent fills data available")
|
|
|
- return
|
|
|
-
|
|
|
-
|
|
|
- if not hasattr(self, 'last_processed_trade_time') or self.last_processed_trade_time is None:
|
|
|
- try:
|
|
|
- last_time_str = self.trading_engine.stats._get_metadata('last_processed_trade_time')
|
|
|
- if last_time_str:
|
|
|
- self.last_processed_trade_time = datetime.fromisoformat(last_time_str)
|
|
|
- else:
|
|
|
-
|
|
|
- self.last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
|
|
|
- except Exception:
|
|
|
- self.last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
|
|
|
-
|
|
|
-
|
|
|
- for fill in recent_fills:
|
|
|
- try:
|
|
|
- trade_id = fill.get('id')
|
|
|
- timestamp_ms = fill.get('timestamp')
|
|
|
- symbol_from_fill = fill.get('symbol')
|
|
|
- side_from_fill = fill.get('side')
|
|
|
- amount_from_fill = float(fill.get('amount', 0))
|
|
|
- price_from_fill = float(fill.get('price', 0))
|
|
|
-
|
|
|
- timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc) if timestamp_ms else datetime.now(timezone.utc)
|
|
|
-
|
|
|
- if timestamp_dt <= self.last_processed_trade_time:
|
|
|
- continue
|
|
|
-
|
|
|
- fill_processed_this_iteration = False
|
|
|
-
|
|
|
- if not (symbol_from_fill and side_from_fill and amount_from_fill > 0 and price_from_fill > 0):
|
|
|
- logger.warning(f"Skipping fill with incomplete data: {fill}")
|
|
|
- continue
|
|
|
-
|
|
|
- full_symbol = symbol_from_fill
|
|
|
- token = symbol_from_fill.split('/')[0] if '/' in symbol_from_fill else symbol_from_fill.split(':')[0]
|
|
|
-
|
|
|
-
|
|
|
- exchange_order_id_from_fill = fill.get('info', {}).get('oid')
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- if exchange_order_id_from_fill:
|
|
|
- pending_lc = stats.get_lifecycle_by_entry_order_id(exchange_order_id_from_fill, status='pending')
|
|
|
- if pending_lc and pending_lc.get('symbol') == full_symbol:
|
|
|
- success = stats.update_trade_position_opened(
|
|
|
- lifecycle_id=pending_lc['trade_lifecycle_id'],
|
|
|
- entry_price=price_from_fill,
|
|
|
- entry_amount=amount_from_fill,
|
|
|
- exchange_fill_id=trade_id
|
|
|
- )
|
|
|
- if success:
|
|
|
- logger.info(f"📈 Lifecycle ENTRY: {pending_lc['trade_lifecycle_id']} for {full_symbol} updated by fill {trade_id}.")
|
|
|
- symbols_with_fills.add(token)
|
|
|
- order_in_db_for_entry = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
|
|
|
- if order_in_db_for_entry:
|
|
|
- stats.update_order_status(order_db_id=order_in_db_for_entry['id'], new_status='filled', amount_filled_increment=amount_from_fill)
|
|
|
- fill_processed_this_iteration = True
|
|
|
-
|
|
|
-
|
|
|
- if not fill_processed_this_iteration:
|
|
|
- active_lc = None
|
|
|
- closure_reason_action_type = None
|
|
|
- bot_order_db_id_to_update = None
|
|
|
-
|
|
|
- if exchange_order_id_from_fill:
|
|
|
-
|
|
|
- bot_order_for_fill = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
|
|
|
-
|
|
|
- if bot_order_for_fill and bot_order_for_fill.get('symbol') == full_symbol:
|
|
|
- order_type = bot_order_for_fill.get('type')
|
|
|
- order_side = bot_order_for_fill.get('side')
|
|
|
-
|
|
|
-
|
|
|
- if order_type == 'market':
|
|
|
- potential_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
|
|
|
- if potential_lc:
|
|
|
- lc_pos_side = potential_lc.get('position_side')
|
|
|
-
|
|
|
- if (lc_pos_side == 'long' and order_side == 'sell' and side_from_fill == 'sell') or \
|
|
|
- (lc_pos_side == 'short' and order_side == 'buy' and side_from_fill == 'buy'):
|
|
|
- active_lc = potential_lc
|
|
|
- closure_reason_action_type = f"bot_exit_{lc_pos_side}_close"
|
|
|
- bot_order_db_id_to_update = bot_order_for_fill.get('id')
|
|
|
- logger.info(f"ℹ️ Lifecycle BOT EXIT: Fill {trade_id} (OID {exchange_order_id_from_fill}) for {full_symbol} matches bot exit for lifecycle {active_lc['trade_lifecycle_id']}.")
|
|
|
-
|
|
|
-
|
|
|
- if not active_lc:
|
|
|
-
|
|
|
- lc_by_sl = stats.get_lifecycle_by_sl_order_id(exchange_order_id_from_fill, status='position_opened')
|
|
|
- if lc_by_sl and lc_by_sl.get('symbol') == full_symbol:
|
|
|
- active_lc = lc_by_sl
|
|
|
- closure_reason_action_type = f"sl_{active_lc.get('position_side')}_close"
|
|
|
-
|
|
|
- bot_order_db_id_to_update = bot_order_for_fill.get('id')
|
|
|
- logger.info(f"ℹ️ Lifecycle SL: Fill {trade_id} for OID {exchange_order_id_from_fill} matches SL for lifecycle {active_lc['trade_lifecycle_id']}.")
|
|
|
-
|
|
|
- if not active_lc:
|
|
|
- lc_by_tp = stats.get_lifecycle_by_tp_order_id(exchange_order_id_from_fill, status='position_opened')
|
|
|
- if lc_by_tp and lc_by_tp.get('symbol') == full_symbol:
|
|
|
- active_lc = lc_by_tp
|
|
|
- closure_reason_action_type = f"tp_{active_lc.get('position_side')}_close"
|
|
|
-
|
|
|
- bot_order_db_id_to_update = bot_order_for_fill.get('id')
|
|
|
- logger.info(f"ℹ️ Lifecycle TP: Fill {trade_id} for OID {exchange_order_id_from_fill} matches TP for lifecycle {active_lc['trade_lifecycle_id']}.")
|
|
|
-
|
|
|
-
|
|
|
- if not active_lc:
|
|
|
- potential_lc_external = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
|
|
|
- if potential_lc_external:
|
|
|
- lc_pos_side = potential_lc_external.get('position_side')
|
|
|
-
|
|
|
- if (lc_pos_side == 'long' and side_from_fill == 'sell') or \
|
|
|
- (lc_pos_side == 'short' and side_from_fill == 'buy'):
|
|
|
- active_lc = potential_lc_external
|
|
|
- closure_reason_action_type = f"external_{lc_pos_side}_close"
|
|
|
- logger.info(f"ℹ️ Lifecycle EXTERNAL CLOSE: Fill {trade_id} for {full_symbol} (no matching bot OID) for lifecycle {active_lc['trade_lifecycle_id']}.")
|
|
|
-
|
|
|
-
|
|
|
- if active_lc and closure_reason_action_type:
|
|
|
- lc_id = active_lc['trade_lifecycle_id']
|
|
|
- lc_entry_price = active_lc.get('entry_price', 0)
|
|
|
- lc_position_side = active_lc.get('position_side')
|
|
|
-
|
|
|
- realized_pnl = 0
|
|
|
- if lc_position_side == 'long':
|
|
|
- realized_pnl = amount_from_fill * (price_from_fill - lc_entry_price)
|
|
|
- elif lc_position_side == 'short':
|
|
|
- realized_pnl = amount_from_fill * (lc_entry_price - price_from_fill)
|
|
|
-
|
|
|
- success = stats.update_trade_position_closed(
|
|
|
- lifecycle_id=lc_id, exit_price=price_from_fill,
|
|
|
- realized_pnl=realized_pnl, exchange_fill_id=trade_id
|
|
|
- )
|
|
|
- if success:
|
|
|
- pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
|
|
|
- formatter = get_formatter()
|
|
|
- logger.info(f"{pnl_emoji} Lifecycle CLOSED: {lc_id} ({closure_reason_action_type}). PNL for fill: {formatter.format_price_with_symbol(realized_pnl)}")
|
|
|
- symbols_with_fills.add(token)
|
|
|
- if self.notification_manager:
|
|
|
- await self.notification_manager.send_external_trade_notification(
|
|
|
- full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
- closure_reason_action_type, timestamp_dt.isoformat()
|
|
|
- )
|
|
|
- stats._migrate_trade_to_aggregated_stats(lc_id)
|
|
|
- if bot_order_db_id_to_update:
|
|
|
- stats.update_order_status(order_db_id=bot_order_db_id_to_update, new_status='filled', amount_filled_increment=amount_from_fill)
|
|
|
- fill_processed_this_iteration = True
|
|
|
-
|
|
|
-
|
|
|
- if not fill_processed_this_iteration:
|
|
|
- if exchange_order_id_from_fill and exchange_order_id_from_fill in self.external_stop_losses:
|
|
|
- stop_loss_info = self.external_stop_losses[exchange_order_id_from_fill]
|
|
|
- formatter = get_formatter()
|
|
|
- logger.info(f"🛑 External SL (MM Tracking): {token} Order {exchange_order_id_from_fill} filled @ {formatter.format_price_with_symbol(price_from_fill, token)}")
|
|
|
-
|
|
|
- sl_active_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
|
|
|
- if sl_active_lc:
|
|
|
- lc_id = sl_active_lc['trade_lifecycle_id']
|
|
|
- lc_entry_price = sl_active_lc.get('entry_price', 0)
|
|
|
- lc_pos_side = sl_active_lc.get('position_side')
|
|
|
- realized_pnl = amount_from_fill * (price_from_fill - lc_entry_price) if lc_pos_side == 'long' else amount_from_fill * (lc_entry_price - price_from_fill)
|
|
|
-
|
|
|
- success = stats.update_trade_position_closed(lc_id, price_from_fill, realized_pnl, trade_id)
|
|
|
- if success:
|
|
|
- pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
|
|
|
- logger.info(f"{pnl_emoji} Lifecycle CLOSED by External SL (MM): {lc_id}. PNL: {formatter.format_price_with_symbol(realized_pnl)}")
|
|
|
- if self.notification_manager:
|
|
|
- await self.notification_manager.send_stop_loss_execution_notification(
|
|
|
- stop_loss_info, full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
- f'{lc_pos_side}_closed_external_sl', timestamp_dt.isoformat(), realized_pnl
|
|
|
- )
|
|
|
-
|
|
|
- stats._migrate_trade_to_aggregated_stats(lc_id)
|
|
|
- del self.external_stop_losses[exchange_order_id_from_fill]
|
|
|
- fill_processed_this_iteration = True
|
|
|
- else:
|
|
|
- logger.warning(f"⚠️ External SL (MM) {exchange_order_id_from_fill} for {full_symbol}, but no active lifecycle found.")
|
|
|
-
|
|
|
-
|
|
|
- if not fill_processed_this_iteration:
|
|
|
-
|
|
|
-
|
|
|
- existing_open_lc = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
|
|
|
- if existing_open_lc:
|
|
|
- lc_id = existing_open_lc['trade_lifecycle_id']
|
|
|
- lc_entry_price = existing_open_lc.get('entry_price', 0)
|
|
|
- lc_position_side = existing_open_lc.get('position_side')
|
|
|
- lc_current_size_before_fill = existing_open_lc.get('current_position_size', 0)
|
|
|
-
|
|
|
- is_potentially_closing_external_fill = False
|
|
|
- if lc_position_side == 'long' and side_from_fill.lower() == 'sell':
|
|
|
- is_potentially_closing_external_fill = True
|
|
|
- elif lc_position_side == 'short' and side_from_fill.lower() == 'buy':
|
|
|
- is_potentially_closing_external_fill = True
|
|
|
-
|
|
|
- if is_potentially_closing_external_fill:
|
|
|
- logger.info(f"ℹ️ Detected potentially closing external fill {trade_id} for {full_symbol} (Lifecycle: {lc_id}). Verifying exchange position state...")
|
|
|
-
|
|
|
-
|
|
|
- fresh_positions_after_fill = self.trading_engine.get_positions() or []
|
|
|
- position_on_exchange_after_fill = None
|
|
|
- for pos in fresh_positions_after_fill:
|
|
|
- if pos.get('symbol') == full_symbol:
|
|
|
- position_on_exchange_after_fill = pos
|
|
|
- break
|
|
|
-
|
|
|
- position_is_closed_on_exchange = False
|
|
|
- if position_on_exchange_after_fill is None:
|
|
|
- position_is_closed_on_exchange = True
|
|
|
- logger.info(f"✅ Exchange Verification: Position for {full_symbol} (Lifecycle: {lc_id}) not found after fill {trade_id}. Confirming closure.")
|
|
|
- elif abs(float(position_on_exchange_after_fill.get('contracts', 0))) < 1e-9:
|
|
|
- position_is_closed_on_exchange = True
|
|
|
- logger.info(f"✅ Exchange Verification: Position for {full_symbol} (Lifecycle: {lc_id}) has zero size on exchange after fill {trade_id}. Confirming closure.")
|
|
|
-
|
|
|
- if position_is_closed_on_exchange:
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- amount_for_pnl_calc = amount_from_fill
|
|
|
-
|
|
|
-
|
|
|
- if abs(lc_current_size_before_fill - amount_from_fill) < 0.000001 * amount_from_fill:
|
|
|
- amount_for_pnl_calc = lc_current_size_before_fill
|
|
|
-
|
|
|
-
|
|
|
- logger.info(f"ℹ️ Attempting to close lifecycle {lc_id} for {full_symbol} via confirmed external fill {trade_id}.")
|
|
|
- realized_pnl = 0
|
|
|
- if lc_position_side == 'long':
|
|
|
- realized_pnl = amount_for_pnl_calc * (price_from_fill - lc_entry_price)
|
|
|
- elif lc_position_side == 'short':
|
|
|
- realized_pnl = amount_for_pnl_calc * (lc_entry_price - price_from_fill)
|
|
|
-
|
|
|
- success = stats.update_trade_position_closed(
|
|
|
- lifecycle_id=lc_id,
|
|
|
- exit_price=price_from_fill,
|
|
|
- realized_pnl=realized_pnl,
|
|
|
- exchange_fill_id=trade_id
|
|
|
- )
|
|
|
- if success:
|
|
|
- pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
|
|
|
- formatter = get_formatter()
|
|
|
- logger.info(f"{pnl_emoji} Lifecycle CLOSED (Verified External): {lc_id}. PNL for fill: {formatter.format_price_with_symbol(realized_pnl)}")
|
|
|
- symbols_with_fills.add(token)
|
|
|
- if self.notification_manager:
|
|
|
- await self.notification_manager.send_external_trade_notification(
|
|
|
- full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
- f"verified_external_{lc_position_side}_close",
|
|
|
- timestamp_dt.isoformat()
|
|
|
- )
|
|
|
-
|
|
|
- stats._migrate_trade_to_aggregated_stats(lc_id)
|
|
|
- fill_processed_this_iteration = True
|
|
|
- else:
|
|
|
- logger.error(f"❌ Failed to close lifecycle {lc_id} via verified external fill {trade_id}.")
|
|
|
- else:
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- current_size_on_exchange = float(position_on_exchange_after_fill.get('contracts', 0)) if position_on_exchange_after_fill else 'Unknown'
|
|
|
- logger.warning(f"⚠️ External fill {trade_id} for {full_symbol} (Lifecycle: {lc_id}, Amount: {amount_from_fill}) did NOT fully close position. Exchange size now: {current_size_on_exchange}. Lifecycle remains open. Fill will be recorded as 'external_unmatched'.")
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- if not fill_processed_this_iteration:
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- current_positions_from_cache_map = {
|
|
|
- pos.get('symbol'): pos for pos in (self.cached_positions or [])
|
|
|
- if pos.get('symbol') and abs(float(pos.get('contracts', 0))) > 1e-9
|
|
|
- }
|
|
|
-
|
|
|
- all_open_positions_in_db = stats.get_open_positions()
|
|
|
- db_open_symbols = {pos_db.get('symbol') for pos_db in all_open_positions_in_db}
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- if full_symbol in db_open_symbols:
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- logger.error(f"🚨 DIAGNOSTIC: Contradiction for {full_symbol}! get_open_positions() includes it, but get_trade_by_symbol_and_status('{full_symbol}', 'position_opened') failed to find it within _check_external_trades context for fill {trade_id}. This needs investigation into TradingStats symbol querying.")
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- potential_match_failure_logged = False
|
|
|
- if not stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened'):
|
|
|
- logger.warning(f"⚠️ DIAGNOSTIC for UNMATCHED FILL {trade_id} ({full_symbol}):")
|
|
|
- logger.warning(f" Fill details: Side={side_from_fill}, Amount={amount_from_fill}, Price={price_from_fill}")
|
|
|
- logger.warning(f" Attempted lookup with full_symbol='{full_symbol}' and status='position_opened' found NO active lifecycle.")
|
|
|
- if all_open_positions_in_db:
|
|
|
- logger.warning(f" However, DB currently has these 'position_opened' lifecycles (symbol - lifecycle_id):")
|
|
|
- for db_pos in all_open_positions_in_db:
|
|
|
- logger.warning(f" - '{db_pos.get('symbol')}' - ID: {db_pos.get('trade_lifecycle_id')}")
|
|
|
-
|
|
|
- base_token_fill = full_symbol.split('/')[0].split(':')[0]
|
|
|
- near_matches = [db_s for db_s in db_open_symbols if base_token_fill in db_s]
|
|
|
- if near_matches:
|
|
|
- logger.warning(f" Possible near matches in DB for base token '{base_token_fill}': {near_matches}")
|
|
|
- else:
|
|
|
- logger.warning(f" No near matches found in DB for base token '{base_token_fill}'.")
|
|
|
- else:
|
|
|
- logger.warning(" DB has NO 'position_opened' lifecycles at all right now.")
|
|
|
- potential_match_failure_logged = True
|
|
|
-
|
|
|
-
|
|
|
- linked_order_db_id = None
|
|
|
- if exchange_order_id_from_fill:
|
|
|
- order_in_db = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
|
|
|
- if order_in_db:
|
|
|
- linked_order_db_id = order_in_db.get('id')
|
|
|
- logger.info(f"🔗 Fallback: Fill {trade_id} for OID {exchange_order_id_from_fill} (DB ID {linked_order_db_id}) not tied to active lifecycle step.")
|
|
|
- current_status = order_in_db.get('status', '')
|
|
|
- if current_status in ['open', 'partially_filled', 'pending_submission']:
|
|
|
- amt_req = float(order_in_db.get('amount_requested', 0))
|
|
|
- amt_filled_so_far = float(order_in_db.get('amount_filled',0))
|
|
|
- new_status = 'partially_filled'
|
|
|
- if (amt_filled_so_far + amount_from_fill) >= amt_req - 1e-9:
|
|
|
- new_status = 'filled'
|
|
|
- stats.update_order_status(
|
|
|
- order_db_id=linked_order_db_id, new_status=new_status,
|
|
|
- amount_filled_increment=amount_from_fill
|
|
|
- )
|
|
|
- logger.info(f"📊 Updated bot order {linked_order_db_id} (fallback): {current_status} → {new_status}")
|
|
|
-
|
|
|
- if not (hasattr(stats, 'get_trade_by_exchange_fill_id') and stats.get_trade_by_exchange_fill_id(trade_id)):
|
|
|
- stats.record_trade(
|
|
|
- full_symbol, side_from_fill, amount_from_fill, price_from_fill,
|
|
|
- exchange_fill_id=trade_id, trade_type="external_unmatched",
|
|
|
- timestamp=timestamp_dt.isoformat(),
|
|
|
- linked_order_table_id_to_link=linked_order_db_id
|
|
|
- )
|
|
|
- logger.info(f"📋 Recorded trade via FALLBACK: {trade_id} (Unmatched External Fill)")
|
|
|
- fill_processed_this_iteration = True
|
|
|
-
|
|
|
-
|
|
|
- if fill_processed_this_iteration:
|
|
|
- external_trades_processed += 1
|
|
|
- if self.last_processed_trade_time is None or timestamp_dt > self.last_processed_trade_time:
|
|
|
- self.last_processed_trade_time = timestamp_dt
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Error processing fill {fill.get('id', 'N/A')}: {e}", exc_info=True)
|
|
|
- continue
|
|
|
-
|
|
|
-
|
|
|
- if external_trades_processed > 0:
|
|
|
-
|
|
|
- stats._set_metadata('market_monitor_last_processed_trade_time', self.last_processed_trade_time.isoformat())
|
|
|
- logger.info(f"💾 Saved MarketMonitor state (last_processed_trade_time) to DB: {self.last_processed_trade_time.isoformat()}")
|
|
|
- logger.info(f"📊 Processed {external_trades_processed} external trades")
|
|
|
- if symbols_with_fills:
|
|
|
- logger.info(f"ℹ️ Symbols with processed fills this cycle: {list(symbols_with_fills)}")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error checking external trades: {e}", exc_info=True)
|
|
|
-
|
|
|
- async def _check_pending_triggers(self):
|
|
|
- """Check and process pending conditional triggers (e.g., SL/TP)."""
|
|
|
- stats = self.trading_engine.get_stats()
|
|
|
- if not stats:
|
|
|
- logger.warning("⚠️ TradingStats not available in _check_pending_triggers.")
|
|
|
- return
|
|
|
-
|
|
|
- try:
|
|
|
-
|
|
|
-
|
|
|
- pending_sl_triggers = stats.get_orders_by_status(status='pending_trigger', order_type_filter='stop_limit_trigger')
|
|
|
-
|
|
|
- if not pending_sl_triggers:
|
|
|
- return
|
|
|
-
|
|
|
- logger.debug(f"Found {len(pending_sl_triggers)} pending SL triggers to check.")
|
|
|
-
|
|
|
- for trigger_order in pending_sl_triggers:
|
|
|
- symbol = trigger_order['symbol']
|
|
|
- trigger_price = trigger_order['price']
|
|
|
- trigger_side = trigger_order['side']
|
|
|
- order_db_id = trigger_order['id']
|
|
|
- parent_ref_id = trigger_order.get('parent_bot_order_ref_id')
|
|
|
-
|
|
|
- if not symbol or trigger_price is None:
|
|
|
- logger.warning(f"Invalid trigger order data for DB ID {order_db_id}, skipping: {trigger_order}")
|
|
|
- continue
|
|
|
-
|
|
|
- market_data = self.trading_engine.get_market_data(symbol)
|
|
|
- if not market_data or not market_data.get('ticker'):
|
|
|
- logger.warning(f"Could not fetch market data for {symbol} to check SL trigger {order_db_id}.")
|
|
|
- continue
|
|
|
-
|
|
|
- current_price = float(market_data['ticker'].get('last', 0))
|
|
|
- if current_price <= 0:
|
|
|
- logger.warning(f"Invalid current price ({current_price}) for {symbol} checking SL trigger {order_db_id}.")
|
|
|
- continue
|
|
|
-
|
|
|
- trigger_hit = False
|
|
|
- if trigger_side.lower() == 'sell' and current_price <= trigger_price:
|
|
|
- trigger_hit = True
|
|
|
- logger.info(f"🔴 SL TRIGGER HIT (Sell): Order DB ID {order_db_id}, Symbol {symbol}, Trigger@ ${trigger_price:.4f}, Market@ ${current_price:.4f}")
|
|
|
- elif trigger_side.lower() == 'buy' and current_price >= trigger_price:
|
|
|
- trigger_hit = True
|
|
|
- logger.info(f"🟢 SL TRIGGER HIT (Buy): Order DB ID {order_db_id}, Symbol {symbol}, Trigger@ ${trigger_price:.4f}, Market@ ${current_price:.4f}")
|
|
|
-
|
|
|
- if trigger_hit:
|
|
|
- logger.info(f"Attempting to execute actual stop order for triggered DB ID: {order_db_id} (Parent Bot Ref: {trigger_order.get('parent_bot_order_ref_id')})")
|
|
|
-
|
|
|
- execution_result = await self.trading_engine.execute_triggered_stop_order(original_trigger_order_db_id=order_db_id)
|
|
|
-
|
|
|
- notification_message_detail = ""
|
|
|
-
|
|
|
- if execution_result.get("success"):
|
|
|
- new_trigger_status = 'triggered_order_placed'
|
|
|
- placed_sl_details = execution_result.get("placed_sl_order_details", {})
|
|
|
- logger.info(f"Successfully placed actual SL order from trigger {order_db_id}. New SL Order DB ID: {placed_sl_details.get('order_db_id')}, Exchange ID: {placed_sl_details.get('exchange_order_id')}")
|
|
|
- notification_message_detail = f"Actual SL order placed (New DB ID: {placed_sl_details.get('order_db_id', 'N/A')})."
|
|
|
- else:
|
|
|
- new_trigger_status = 'trigger_execution_failed'
|
|
|
- error_msg = execution_result.get("error", "Unknown error during SL execution.")
|
|
|
- logger.error(f"Failed to execute actual SL order from trigger {order_db_id}: {error_msg}")
|
|
|
- notification_message_detail = f"Failed to place actual SL order: {error_msg}"
|
|
|
+ logger.debug(f"No lifecycle_id for open position {symbol} in _update_cached_data, skipping detailed stats update.")
|
|
|
+ continue
|
|
|
|
|
|
- stats.update_order_status(order_db_id=order_db_id, new_status=new_trigger_status)
|
|
|
-
|
|
|
- if self.notification_manager:
|
|
|
- await self.notification_manager.send_generic_notification(
|
|
|
- f"🔔 Stop-Loss Update!\nSymbol: {symbol}\nSide: {trigger_side.upper()}\nTrigger Price: ${trigger_price:.4f}\nMarket Price: ${current_price:.4f}\n(Original Trigger DB ID: {order_db_id}, Parent: {parent_ref_id or 'N/A'})\nStatus: {new_trigger_status.replace('_', ' ').title()}\nDetails: {notification_message_detail}"
|
|
|
- )
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error checking pending SL triggers: {e}", exc_info=True)
|
|
|
-
|
|
|
- async def _check_automatic_risk_management(self):
|
|
|
- """Check for automatic stop loss triggers based on Config.STOP_LOSS_PERCENTAGE as safety net."""
|
|
|
- try:
|
|
|
-
|
|
|
- if not getattr(Config, 'RISK_MANAGEMENT_ENABLED', True) or Config.STOP_LOSS_PERCENTAGE <= 0:
|
|
|
- return
|
|
|
-
|
|
|
-
|
|
|
- positions = self.cached_positions or []
|
|
|
- if not positions:
|
|
|
-
|
|
|
- await self._cleanup_orphaned_stop_losses()
|
|
|
- return
|
|
|
-
|
|
|
- for position in positions:
|
|
|
- try:
|
|
|
- symbol = position.get('symbol', '')
|
|
|
- contracts = float(position.get('contracts', 0))
|
|
|
- entry_price = float(position.get('entryPx', 0))
|
|
|
- mark_price = float(position.get('markPx', 0))
|
|
|
- unrealized_pnl = float(position.get('unrealizedPnl', 0))
|
|
|
-
|
|
|
-
|
|
|
- if contracts == 0 or entry_price <= 0 or mark_price <= 0:
|
|
|
- continue
|
|
|
-
|
|
|
-
|
|
|
- entry_value = abs(contracts) * entry_price
|
|
|
- if entry_value <= 0:
|
|
|
- continue
|
|
|
-
|
|
|
- pnl_percentage = (unrealized_pnl / entry_value) * 100
|
|
|
-
|
|
|
-
|
|
|
- if pnl_percentage <= -Config.STOP_LOSS_PERCENTAGE:
|
|
|
- token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
|
|
|
- position_side = "LONG" if contracts > 0 else "SHORT"
|
|
|
-
|
|
|
-
|
|
|
- stats = self.trading_engine.get_stats()
|
|
|
- lifecycle_id_str = "N/A"
|
|
|
- if stats:
|
|
|
- active_trade_lc = stats.get_trade_by_symbol_and_status(symbol, 'position_opened')
|
|
|
- if active_trade_lc:
|
|
|
- lifecycle_id_str = active_trade_lc.get('trade_lifecycle_id', "N/A")[:8] + "..."
|
|
|
-
|
|
|
- logger.warning(f"🚨 AUTOMATIC STOP LOSS TRIGGERED: {token} {position_side} position (Lifecycle: {lifecycle_id_str}) has {pnl_percentage:.2f}% loss (threshold: -{Config.STOP_LOSS_PERCENTAGE}%)")
|
|
|
-
|
|
|
-
|
|
|
- if self.notification_manager:
|
|
|
- await self.notification_manager.send_generic_notification(
|
|
|
- f"🚨 AUTOMATIC STOP LOSS TRIGGERED!\\n"
|
|
|
- f"Token: {token}\\n"
|
|
|
- f"Lifecycle ID: {lifecycle_id_str}\\n"
|
|
|
- f"Position: {position_side} {abs(contracts):.6f}\\n"
|
|
|
- f"Entry Price: ${entry_price:.4f}\\n"
|
|
|
- f"Current Price: ${mark_price:.4f}\\n"
|
|
|
- f"Unrealized P&L: ${unrealized_pnl:.2f} ({pnl_percentage:.2f}%)\\n"
|
|
|
- f"Safety Threshold: -{Config.STOP_LOSS_PERCENTAGE}%\\n"
|
|
|
- f"Action: Executing emergency exit order..."
|
|
|
- )
|
|
|
-
|
|
|
-
|
|
|
- exit_result = await self.trading_engine.execute_exit_order(token)
|
|
|
-
|
|
|
- if exit_result.get('success'):
|
|
|
- placed_order_details = exit_result.get('order_placed_details', {})
|
|
|
- logger.info(f"✅ Emergency exit order placed for {token} (Lifecycle: {lifecycle_id_str}). Order details: {placed_order_details}")
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- if stats:
|
|
|
- cancelled_sl_count = stats.cancel_pending_stop_losses_by_symbol(
|
|
|
- symbol=symbol,
|
|
|
- new_status='cancelled_auto_exit'
|
|
|
- )
|
|
|
- if cancelled_sl_count > 0:
|
|
|
- logger.info(f"🛑 Cancelled {cancelled_sl_count} pending stop losses for {symbol} (Lifecycle: {lifecycle_id_str}) after automatic exit")
|
|
|
-
|
|
|
- if self.notification_manager:
|
|
|
- await self.notification_manager.send_generic_notification(
|
|
|
- f"✅ <b>Emergency Exit Initiated</b>\\n\\n"
|
|
|
- f"📊 <b>Position:</b> {token} {position_side}\\n"
|
|
|
- f"🆔 <b>Lifecycle ID:</b> {lifecycle_id_str}\\n"
|
|
|
- f"📉 <b>Loss at Trigger:</b> {pnl_percentage:.2f}% (${unrealized_pnl:.2f})\\n"
|
|
|
- f"⚠️ <b>Threshold:</b> -{Config.STOP_LOSS_PERCENTAGE}%\\n"
|
|
|
- f"✅ <b>Action:</b> Market exit order placed successfully\\n"
|
|
|
- f"🆔 <b>Exit Order ID:</b> {placed_order_details.get('exchange_order_id', 'N/A')}\\n"
|
|
|
- f"{f'🛑 <b>Cleanup:</b> Cancelled {cancelled_sl_count} other pending stop losses' if cancelled_sl_count > 0 else ''}\\n\\n"
|
|
|
- f"🛡️ The system will confirm closure and P&L once the exit order fill is processed."
|
|
|
- )
|
|
|
- else:
|
|
|
- error_msg = exit_result.get('error', 'Unknown error')
|
|
|
- logger.error(f"❌ Failed to execute emergency exit order for {token} (Lifecycle: {lifecycle_id_str}): {error_msg}")
|
|
|
-
|
|
|
- if self.notification_manager:
|
|
|
- await self.notification_manager.send_generic_notification(
|
|
|
- f"❌ <b>CRITICAL: Emergency Exit Failed!</b>\\n\\n"
|
|
|
- f"📊 <b>Position:</b> {token} {position_side}\\n"
|
|
|
- f"🆔 <b>Lifecycle ID:</b> {lifecycle_id_str}\\n"
|
|
|
- f"📉 <b>Loss:</b> {pnl_percentage:.2f}%\\n"
|
|
|
- f"❌ <b>Error Placing Order:</b> {error_msg}\\n\\n"
|
|
|
- f"⚠️ <b>MANUAL INTERVENTION REQUIRED</b>\\n"
|
|
|
- f"Please close this position manually via /exit {token}"
|
|
|
- )
|
|
|
-
|
|
|
- except Exception as pos_error:
|
|
|
- logger.error(f"Error processing position for automatic stop loss: {pos_error}")
|
|
|
- continue
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error in automatic risk management check: {e}", exc_info=True)
|
|
|
-
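# The Config.STOP_LOSS_PERCENTAGE safety net removed above presumably now lives in
# RiskCleanupManager. A sketch of the threshold test it applied, assuming the same
# definition of loss as unrealized PnL relative to entry value:
def breaches_safety_net(contracts: float, entry_price: float, unrealized_pnl: float,
                        stop_loss_percentage: float) -> bool:
    entry_value = abs(contracts) * entry_price
    if entry_value <= 0:
        return False
    pnl_percentage = (unrealized_pnl / entry_value) * 100
    return pnl_percentage <= -stop_loss_percentage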
|
|
|
- async def _cleanup_orphaned_stop_losses(self):
|
|
|
- """Clean up pending stop losses that no longer have corresponding positions OR whose parent orders have been cancelled/failed."""
|
|
|
- try:
|
|
|
- stats = self.trading_engine.get_stats()
|
|
|
- if not stats:
|
|
|
- return
|
|
|
-
|
|
|
- pending_stop_losses = stats.get_orders_by_status('pending_trigger', 'stop_limit_trigger')
|
|
|
-
|
|
|
- if not pending_stop_losses:
|
|
|
- return
|
|
|
-
|
|
|
- logger.debug(f"Checking {len(pending_stop_losses)} pending stop losses for orphaned orders")
|
|
|
-
|
|
|
- current_positions = self.cached_positions or []
|
|
|
- position_symbols = set()
|
|
|
-
|
|
|
- if current_positions:
|
|
|
- for pos in current_positions:
|
|
|
- symbol = pos.get('symbol')
|
|
|
- contracts = float(pos.get('contracts', 0))
|
|
|
- if symbol and contracts != 0:
|
|
|
- position_symbols.add(symbol)
|
|
|
-
|
|
|
- orphaned_count = 0
|
|
|
- for sl_order in pending_stop_losses:
|
|
|
- symbol = sl_order.get('symbol')
|
|
|
- order_db_id = sl_order.get('id')
|
|
|
- parent_bot_ref_id = sl_order.get('parent_bot_order_ref_id')
|
|
|
-
|
|
|
- should_cancel = False
|
|
|
- cancel_reason = ""
|
|
|
-
|
|
|
- if parent_bot_ref_id:
|
|
|
- parent_order = stats.get_order_by_bot_ref_id(parent_bot_ref_id)
|
|
|
- if parent_order:
|
|
|
- parent_status = parent_order.get('status', '').lower()
|
|
|
-
|
|
|
- if parent_order.get('exchange_order_id'):
|
|
|
- entry_oid = parent_order['exchange_order_id']
|
|
|
- lc_pending = stats.get_lifecycle_by_entry_order_id(entry_oid, status='pending')
|
|
|
- lc_cancelled = stats.get_lifecycle_by_entry_order_id(entry_oid, status='cancelled')
|
|
|
- lc_opened = stats.get_lifecycle_by_entry_order_id(entry_oid, status='position_opened')
|
|
|
-
|
|
|
- if parent_status == 'cancelled_externally':
|
|
|
- if lc_cancelled:
|
|
|
- should_cancel = True
|
|
|
- cancel_reason = f"parent order ({entry_oid[:6]}...) {parent_status} and lifecycle explicitly cancelled"
|
|
|
- elif not lc_pending and not lc_opened:
|
|
|
- should_cancel = True
|
|
|
- cancel_reason = f"parent order ({entry_oid[:6]}...) {parent_status} and no active/pending/cancelled lifecycle found"
|
|
|
- else:
|
|
|
- current_lc_status = "N/A"
|
|
|
- if lc_pending: current_lc_status = lc_pending.get('status')
|
|
|
- elif lc_opened: current_lc_status = lc_opened.get('status')
|
|
|
- logger.info(f"SL {order_db_id} for parent {parent_bot_ref_id} (status {parent_status}) - lifecycle is '{current_lc_status}'. SL not cancelled by this rule.")
|
|
|
- should_cancel = False
|
|
|
-
|
|
|
- elif parent_status in ['failed_submission', 'failed_submission_no_data', 'cancelled_manually', 'disappeared_from_exchange']:
|
|
|
- if not lc_opened:
|
|
|
- should_cancel = True
|
|
|
- cancel_reason = f"parent order ({entry_oid[:6]}...) {parent_status} and no 'position_opened' lifecycle"
|
|
|
- else:
|
|
|
- logger.info(f"SL {order_db_id} for parent {parent_bot_ref_id} (status {parent_status}) - lifecycle is 'position_opened'. SL not cancelled by this rule.")
|
|
|
- should_cancel = False
|
|
|
- elif parent_status == 'filled':
|
|
|
- if symbol not in position_symbols:
|
|
|
- should_cancel = True
|
|
|
- cancel_reason = "parent filled but actual position no longer exists"
|
|
|
-
|
|
|
-
|
|
|
- else:
|
|
|
-
|
|
|
- if parent_status in ['open', 'pending_submission', 'submitted']:
|
|
|
- logger.info(f"SL Cleanup: Parent order {parent_order.get('id')} (status '{parent_status}') is missing exchange_id in DB record. SL {sl_order.get('id')} will NOT be cancelled by this rule, assuming parent is still active or pending.")
|
|
|
- should_cancel = False
|
|
|
- if parent_status == 'open':
|
|
|
- logger.warning(f"SL Cleanup: DATA ANOMALY - Parent order {parent_order.get('id')} status is 'open' but fetched DB record shows no exchange_id. Investigate DB state for order {parent_order.get('id')}.")
|
|
|
- else:
|
|
|
-
|
|
|
-
|
|
|
- should_cancel = True
|
|
|
- cancel_reason = f"parent order {parent_status} (no exch_id) and status indicates it's not live/pending."
|
|
|
- else:
|
|
|
- should_cancel = True
|
|
|
- cancel_reason = "parent order not found in database"
|
|
|
- else:
|
|
|
-
|
|
|
-
|
|
|
- if symbol not in position_symbols:
|
|
|
- should_cancel = True
|
|
|
- cancel_reason = "no position exists and no parent reference"
|
|
|
-
|
|
|
- if should_cancel:
|
|
|
- success = stats.update_order_status(
|
|
|
- order_db_id=order_db_id,
|
|
|
- new_status='cancelled_orphaned'
|
|
|
- )
|
|
|
-
|
|
|
- if success:
|
|
|
- orphaned_count += 1
|
|
|
- token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
|
|
|
- logger.info(f"🧹 Cancelled orphaned stop loss for {token} (Order DB ID: {order_db_id}) - Reason: {cancel_reason}")
|
|
|
-
|
|
|
- if orphaned_count > 0:
|
|
|
- logger.info(f"🧹 Cleanup completed: Cancelled {orphaned_count} orphaned stop loss order(s)")
|
|
|
-
|
|
|
- if self.notification_manager:
|
|
|
- await self.notification_manager.send_generic_notification(
|
|
|
- f"🧹 <b>Cleanup Completed</b>\n\n"
|
|
|
- f"Cancelled {orphaned_count} orphaned stop loss order(s)\n"
|
|
|
- f"Reason: Parent orders invalid or positions closed externally\n"
|
|
|
- f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
|
|
|
- f"💡 This ensures stop losses sync with actual orders/positions."
|
|
|
- )
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error cleaning up orphaned stop losses: {e}", exc_info=True)
|
|
|
-
|
|
|
-
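# A condensed sketch of the orphan test removed above (presumably now in
# RiskCleanupManager). It collapses the per-status branches into one rule: a pending
# stop loss is treated as orphaned when its parent entry order is missing, dead, or
# filled while no live position remains for the symbol; the full lifecycle checks of
# the original are omitted here.
from typing import Optional, Set

DEAD_PARENT_STATUSES = {'failed_submission', 'failed_submission_no_data',
                        'cancelled_manually', 'cancelled_externally',
                        'disappeared_from_exchange'}

def is_orphaned_stop_loss(parent_status: Optional[str], symbol: str,
                          position_symbols: Set[str]) -> bool:
    if parent_status is None:
        return symbol not in position_symbols
    if parent_status == 'filled' or parent_status in DEAD_PARENT_STATUSES:
        return symbol not in position_symbols
    return False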
|
|
|
- async def _activate_pending_stop_losses_from_trades(self):
|
|
|
- """🆕 PHASE 4: Check trades table for pending stop loss activation first (highest priority)"""
|
|
|
- try:
|
|
|
- stats = self.trading_engine.get_stats()
|
|
|
- if not stats:
|
|
|
- return
|
|
|
-
|
|
|
- formatter = get_formatter()
|
|
|
- trades_needing_sl = stats.get_pending_stop_loss_activations()
|
|
|
-
|
|
|
- if not trades_needing_sl:
|
|
|
- return
|
|
|
-
|
|
|
- logger.debug(f"🆕 Found {len(trades_needing_sl)} open positions needing stop loss activation")
|
|
|
-
|
|
|
- for position_trade in trades_needing_sl:
|
|
|
- try:
|
|
|
- symbol = position_trade['symbol']
|
|
|
-
|
|
|
- token = symbol.split('/')[0] if symbol and '/' in symbol else (symbol if symbol else "TOKEN")
|
|
|
- stop_loss_price = position_trade['stop_loss_price']
|
|
|
- position_side = position_trade['position_side']
|
|
|
- current_amount = position_trade.get('current_position_size', 0)
|
|
|
- lifecycle_id = position_trade['trade_lifecycle_id']
|
|
|
-
|
|
|
- if not all([symbol, stop_loss_price, position_side, abs(current_amount) > 1e-9, lifecycle_id]):
|
|
|
- logger.warning(f"Skipping SL activation for lifecycle {lifecycle_id} due to incomplete data: sym={symbol}, sl_price={stop_loss_price}, side={position_side}, amt={current_amount}")
|
|
|
- continue
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- logger.info(f"Attempting to place LIMIT stop loss for lifecycle {lifecycle_id} ({position_side} {token} @ SL {formatter.format_price(stop_loss_price, symbol)})")
|
|
|
- sl_result = await self.trading_engine.place_limit_stop_for_lifecycle(
|
|
|
- lifecycle_id=lifecycle_id,
|
|
|
- symbol=symbol,
|
|
|
- sl_price=stop_loss_price,
|
|
|
- position_side=position_side,
|
|
|
- amount_to_cover=abs(current_amount)
|
|
|
- )
|
|
|
-
|
|
|
- if sl_result.get('success'):
|
|
|
- placed_sl_details = sl_result.get('order_placed_details', {})
|
|
|
- sl_exchange_order_id = placed_sl_details.get('exchange_order_id')
|
|
|
- sl_db_order_id = placed_sl_details.get('order_db_id')
|
|
|
- stop_loss_price_str_log = formatter.format_price_with_symbol(stop_loss_price, token)
|
|
|
-
|
|
|
- logger.info(f"✅ Successfully processed SL request for {token} (Lifecycle: {lifecycle_id[:8]}): SL Price {stop_loss_price_str_log}, Exchange SL Order ID: {sl_exchange_order_id or 'N/A'}, DB ID: {sl_db_order_id or 'N/A'}")
|
|
|
-
|
|
|
- if self.notification_manager and sl_exchange_order_id:
|
|
|
-
|
|
|
- current_price_for_notification = None
|
|
|
- try:
|
|
|
- market_data_notify = self.trading_engine.get_market_data(symbol)
|
|
|
- if market_data_notify and market_data_notify.get('ticker'):
|
|
|
- current_price_for_notification = float(market_data_notify['ticker'].get('last', 0))
|
|
|
- except:
|
|
|
- pass
|
|
|
-
|
|
|
- current_price_str_notify = formatter.format_price_with_symbol(current_price_for_notification, token) if current_price_for_notification else 'Unknown'
|
|
|
- stop_loss_price_str_notify = formatter.format_price_with_symbol(stop_loss_price, token)
|
|
|
-
|
|
|
- await self.notification_manager.send_generic_notification(
|
|
|
- f"🛡️ <b>Stop Loss LIMIT Order Placed</b>\n\n"
|
|
|
- f"Token: {token}\n"
|
|
|
- f"Lifecycle ID: {lifecycle_id[:8]}...\n"
|
|
|
- f"Position Type: {position_side.upper()}\n"
|
|
|
- f"Stop Loss Price: {stop_loss_price_str_notify}\n"
|
|
|
- f"Amount: {formatter.format_amount(abs(current_amount), token)}\n"
|
|
|
- f"Current Price: {current_price_str_notify}\n"
|
|
|
- f"Exchange SL Order ID: {sl_exchange_order_id}\n"
|
|
|
- f"Time: {datetime.now().strftime('%H:%M:%S')}"
|
|
|
- )
|
|
|
- elif not sl_exchange_order_id:
|
|
|
- logger.warning(f"SL Limit order for {token} (Lifecycle: {lifecycle_id[:8]}) placed in DB (ID: {sl_db_order_id}) but no exchange ID returned immediately.")
|
|
|
-
|
|
|
- else:
|
|
|
- logger.error(f"❌ Failed to place SL limit order for {token} (Lifecycle: {lifecycle_id[:8]}): {sl_result.get('error')}")
|
|
|
-
|
|
|
- except Exception as trade_error:
|
|
|
- logger.error(f"❌ Error processing position trade for SL activation (Lifecycle: {position_trade.get('trade_lifecycle_id','N/A')}): {trade_error}")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error activating pending stop losses from trades table: {e}", exc_info=True)
|
|
|
-
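# The SL-activation loop removed above presumably moves into RiskCleanupManager. A
# sketch of the core call it made; the keyword arguments mirror the removed code,
# while the wrapper function itself is only illustrative.
async def activate_stop_loss(trading_engine, trade: dict) -> dict:
    return await trading_engine.place_limit_stop_for_lifecycle(
        lifecycle_id=trade['trade_lifecycle_id'],
        symbol=trade['symbol'],
        sl_price=trade['stop_loss_price'],
        position_side=trade['position_side'],
        amount_to_cover=abs(trade.get('current_position_size', 0)),
    )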
|
|
|
- async def _check_for_recent_fills_for_order(self, exchange_oid, order_in_db):
|
|
|
- """Check for very recent fills that might match this order."""
|
|
|
- try:
|
|
|
-
|
|
|
- recent_fills = self.trading_engine.get_recent_fills()
|
|
|
- if not recent_fills:
|
|
|
- return False
|
|
|
-
|
|
|
-
|
|
|
- if not hasattr(self, 'last_processed_trade_time') or self.last_processed_trade_time is None:
|
|
|
-
|
|
|
- try:
|
|
|
- last_time_str = self.trading_engine.stats._get_metadata('last_processed_trade_time')
|
|
|
- if last_time_str:
|
|
|
- self.last_processed_trade_time = datetime.fromisoformat(last_time_str)
|
|
|
- else:
|
|
|
- self.last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
|
|
|
- except Exception:
|
|
|
- self.last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
|
|
|
-
|
|
|
-
|
|
|
- for fill in recent_fills:
|
|
|
- try:
|
|
|
- trade_id = fill.get('id')
|
|
|
- timestamp_ms = fill.get('timestamp')
|
|
|
- symbol_from_fill = fill.get('symbol')
|
|
|
- side_from_fill = fill.get('side')
|
|
|
- amount_from_fill = float(fill.get('amount', 0))
|
|
|
- price_from_fill = float(fill.get('price', 0))
|
|
|
-
|
|
|
- timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc) if timestamp_ms else datetime.now(timezone.utc)
|
|
|
-
|
|
|
-
|
|
|
- if timestamp_dt <= self.last_processed_trade_time:
|
|
|
- continue
|
|
|
-
|
|
|
- if symbol_from_fill and side_from_fill and amount_from_fill > 0 and price_from_fill > 0:
|
|
|
- exchange_order_id_from_fill = fill.get('info', {}).get('oid')
|
|
|
-
|
|
|
- if exchange_order_id_from_fill == exchange_oid:
|
|
|
-
|
|
|
- if order_in_db.get('symbol') == symbol_from_fill and \
|
|
|
- order_in_db.get('side') == side_from_fill and \
|
|
|
- abs(float(order_in_db.get('amount_requested', 0)) - amount_from_fill) < 0.01 * amount_from_fill :
|
|
|
- logger.info(f"✅ Found recent matching fill {trade_id} for order {exchange_oid}. Not cancelling stop losses.")
|
|
|
- return True
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Error processing fill {fill.get('id','N/A')} in _check_for_recent_fills_for_order: {e}")
|
|
|
- continue
|
|
|
-
|
|
|
- return False
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error in _check_for_recent_fills_for_order for OID {exchange_oid}: {e}", exc_info=True)
|
|
|
- return False
|
|
|
-
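# A sketch of the fill/order match removed above, assuming the same criteria: the
# fill's exchange order id, symbol and side must match the stored order, and the
# filled amount must be within 1% of the requested amount.
def fill_matches_order(fill: dict, order_in_db: dict, exchange_oid: str) -> bool:
    if fill.get('info', {}).get('oid') != exchange_oid:
        return False
    amount = float(fill.get('amount', 0))
    requested = float(order_in_db.get('amount_requested', 0))
    return (order_in_db.get('symbol') == fill.get('symbol')
            and order_in_db.get('side') == fill.get('side')
            and amount > 0
            and abs(requested - amount) < 0.01 * amount)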
|
|
|
- async def _check_external_stop_loss_orders(self):
|
|
|
- """Check for externally placed stop loss orders and track them."""
|
|
|
- try:
|
|
|
-
|
|
|
- open_orders = self.cached_orders or []
|
|
|
- if not open_orders:
|
|
|
- return
|
|
|
-
|
|
|
-
|
|
|
- positions = self.cached_positions or []
|
|
|
- if not positions:
|
|
|
- return
|
|
|
-
|
|
|
-
|
|
|
- position_map = {}
|
|
|
- for position in positions:
|
|
|
- symbol = position.get('symbol')
|
|
|
- contracts = float(position.get('contracts', 0))
|
|
|
- if symbol and contracts != 0:
|
|
|
- token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
|
|
|
- position_map[token] = {
|
|
|
- 'symbol': symbol,
|
|
|
- 'contracts': contracts,
|
|
|
- 'side': 'long' if contracts > 0 else 'short',
|
|
|
- 'entry_price': float(position.get('entryPx', 0))
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- newly_detected = 0
|
|
|
- for order in open_orders:
|
|
|
- try:
|
|
|
- exchange_order_id = order.get('id')
|
|
|
- symbol = order.get('symbol')
|
|
|
- side = order.get('side')
|
|
|
- amount = float(order.get('amount', 0))
|
|
|
- price = float(order.get('price', 0))
|
|
|
-
|
|
|
-
|
|
|
- if not all([exchange_order_id, symbol, side, amount, price]):
|
|
|
- continue
|
|
|
-
|
|
|
-
|
|
|
- if exchange_order_id in self.external_stop_losses:
|
|
|
- continue
|
|
|
-
|
|
|
- token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
|
|
|
-
|
|
|
- if token not in position_map:
|
|
|
- continue
|
|
|
-
|
|
|
- position_data = position_map[token]
|
|
|
-
|
|
|
- is_potential_stop_loss = False
|
|
|
-
|
|
|
- if position_data['side'] == 'long' and side == 'sell' and price < position_data['entry_price']:
|
|
|
- is_potential_stop_loss = True
|
|
|
-
|
|
|
- elif position_data['side'] == 'short' and side == 'buy' and price > position_data['entry_price']:
|
|
|
- is_potential_stop_loss = True
|
|
|
-
|
|
|
- if is_potential_stop_loss:
|
|
|
- self.external_stop_losses[exchange_order_id] = {
|
|
|
- 'token': token,
|
|
|
- 'symbol': symbol,
|
|
|
- 'trigger_price': price,
|
|
|
- 'side': side,
|
|
|
- 'amount': amount,
|
|
|
- 'position_side': position_data['side'],
|
|
|
- 'detected_at': datetime.now(timezone.utc),
|
|
|
- 'entry_price': position_data['entry_price']
|
|
|
- }
|
|
|
- newly_detected += 1
|
|
|
- logger.info(f"🛑 Detected potential external stop loss: {token} {side.upper()} {amount} @ ${price:.2f} (protecting {position_data['side'].upper()} position)")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Error analyzing order for stop loss detection: {e}")
|
|
|
- continue
|
|
|
-
|
|
|
- if newly_detected > 0:
|
|
|
- logger.info(f"🔍 Detected {newly_detected} new potential external stop loss orders")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error checking external stop loss orders: {e}")
|
|
|
-
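# The detection pass removed above presumably lives in ExternalEventMonitor, writing
# into shared_state['external_stop_losses']. A sketch of the heuristic it used: a
# resting order on the closing side, priced beyond entry in the losing direction, is
# flagged as a likely externally placed stop loss.
def looks_like_external_stop_loss(position_side: str, entry_price: float,
                                  order_side: str, order_price: float) -> bool:
    if position_side == 'long':
        return order_side == 'sell' and order_price < entry_price
    if position_side == 'short':
        return order_side == 'buy' and order_price > entry_price
    return False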
|
|
|
- async def _cleanup_external_stop_loss_tracking(self):
- """Clean up external stop loss orders that are no longer active."""
- try:
- if not self.external_stop_losses:
- return
-
-
- open_orders = self.cached_orders or []
- if not open_orders:
- removed_count = len(self.external_stop_losses)
- if removed_count > 0:
- logger.info(f"🧹 Cleared {removed_count} external stop loss orders (no open orders on exchange)")
- self.external_stop_losses.clear()
- return
-
- current_order_ids = {order.get('id') for order in open_orders if order.get('id')}
-
- to_remove = [order_id for order_id in self.external_stop_losses if order_id not in current_order_ids]
-
- for order_id in to_remove:
- stop_loss_info = self.external_stop_losses.pop(order_id)
- logger.info(f"🗑️ Removed external stop loss tracking for {stop_loss_info['token']} order {order_id} (no longer open)")
-
- if to_remove:
- logger.info(f"🧹 Cleaned up {len(to_remove)} disappeared external stop loss orders from tracking")
-
- except Exception as e:
- logger.error(f"❌ Error cleaning up external stop loss tracking: {e}")
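# A sketch of the pruning step removed above, assuming tracking keyed by exchange
# order id: any tracked stop loss whose order id no longer appears among the open
# orders is dropped, and the stale ids are returned for logging.
def prune_external_stop_losses(tracked: dict, open_orders: list) -> list:
    current_order_ids = {o.get('id') for o in open_orders if o.get('id')}
    stale = [oid for oid in tracked if oid not in current_order_ids]
    for oid in stale:
        tracked.pop(oid)
    return stale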
+ try:
+
+ current_size_from_ex = ex_pos.get('contracts')
+ current_position_size = float(current_size_from_ex) if current_size_from_ex is not None else None

- async def _auto_sync_orphaned_positions(self):
- """Automatically detect and sync orphaned positions (positions on exchange without trade lifecycle records)."""
- try:
- stats = self.trading_engine.get_stats()
- if not stats:
- return
+ entry_price_from_ex = ex_pos.get('entryPrice') or ex_pos.get('entryPx')
+ entry_price = float(entry_price_from_ex) if entry_price_from_ex is not None else None

- formatter = get_formatter()
+ mark_price_from_ex = ex_pos.get('markPrice') or ex_pos.get('markPx')
+ mark_price = float(mark_price_from_ex) if mark_price_from_ex is not None else None

- exchange_positions = self.cached_positions or []
- synced_count = 0
+ unrealized_pnl_from_ex = ex_pos.get('unrealizedPnl')
+ unrealized_pnl = float(unrealized_pnl_from_ex) if unrealized_pnl_from_ex is not None else None

- for exchange_pos in exchange_positions:
- symbol = exchange_pos.get('symbol')
- contracts_abs = abs(float(exchange_pos.get('contracts', 0)))
-
- if not (symbol and contracts_abs > 1e-9):
- continue
+ liquidation_price_from_ex = ex_pos.get('liquidationPrice')
+ liquidation_price = float(liquidation_price_from_ex) if liquidation_price_from_ex is not None else None

-
|
|
|
-
|
|
|
- existing_trade = stats.get_trade_by_symbol_and_status(symbol, 'position_opened')
|
|
|
-
|
|
|
- if not existing_trade:
|
|
|
- entry_price_from_exchange = float(exchange_pos.get('entryPrice', 0)) or float(exchange_pos.get('entryPx', 0))
|
|
|
-
|
|
|
- position_side, order_side = '', ''
|
|
|
- ccxt_side = exchange_pos.get('side', '').lower()
|
|
|
- if ccxt_side == 'long': position_side, order_side = 'long', 'buy'
|
|
|
- elif ccxt_side == 'short': position_side, order_side = 'short', 'sell'
|
|
|
-
|
|
|
- if not position_side:
|
|
|
- raw_info = exchange_pos.get('info', {}).get('position', {})
|
|
|
- if isinstance(raw_info, dict):
|
|
|
- szi_str = raw_info.get('szi')
|
|
|
- if szi_str is not None:
|
|
|
- try: szi_val = float(szi_str)
|
|
|
- except ValueError: szi_val = 0
|
|
|
- if szi_val > 1e-9: position_side, order_side = 'long', 'buy'
|
|
|
- elif szi_val < -1e-9: position_side, order_side = 'short', 'sell'
|
|
|
-
|
|
|
- if not position_side:
|
|
|
- contracts_val = float(exchange_pos.get('contracts',0))
|
|
|
- if contracts_val > 1e-9: position_side, order_side = 'long', 'buy'
|
|
|
- elif contracts_val < -1e-9: position_side, order_side = 'short', 'sell'
|
|
|
- else:
|
|
|
- logger.warning(f"AUTO-SYNC: Position size is effectively 0 for {symbol} after side checks, skipping sync. Data: {exchange_pos}")
|
|
|
- continue
|
|
|
-
|
|
|
- if not position_side:
|
|
|
- logger.error(f"AUTO-SYNC: CRITICAL - Could not determine position side for {symbol}. Data: {exchange_pos}. Skipping.")
|
|
|
- continue
|
|
|
+ margin_used_from_ex = ex_pos.get('marginUsed')
+ margin_used = float(margin_used_from_ex) if margin_used_from_ex is not None else None

- token = symbol.split('/')[0] if '/' in symbol else symbol
- actual_contracts_size = contracts_abs
+ leverage_from_ex = ex_pos.get('leverage')
+ leverage = float(leverage_from_ex) if leverage_from_ex is not None else None

- final_entry_price = entry_price_from_exchange
|
|
|
- price_source_log = "(exchange data)"
|
|
|
- if not final_entry_price or final_entry_price <= 0:
|
|
|
- estimated_entry_price = await self._estimate_entry_price_for_orphaned_position(symbol, actual_contracts_size, position_side)
|
|
|
- if estimated_entry_price > 0:
|
|
|
- final_entry_price = estimated_entry_price
|
|
|
- price_source_log = "(estimated)"
|
|
|
- else:
|
|
|
- logger.error(f"AUTO-SYNC: Could not determine/estimate entry price for {symbol}. Skipping sync.")
|
|
|
- continue
|
|
|
-
|
|
|
- logger.info(f"🔄 AUTO-SYNC: Orphaned position detected - {symbol} {position_side.upper()} {actual_contracts_size} @ ${final_entry_price:.4f} {price_source_log}")
|
|
|
-
|
|
|
- lifecycle_id = stats.create_trade_lifecycle(
|
|
|
- symbol=symbol, side=order_side,
|
|
|
- entry_order_id=f"external_sync_{int(datetime.now().timestamp())}",
|
|
|
- trade_type='external_sync'
|
|
|
- )
|
|
|
-
|
|
|
- if lifecycle_id:
|
|
|
- success = stats.update_trade_position_opened(
|
|
|
- lifecycle_id, final_entry_price, actual_contracts_size,
|
|
|
- f"external_fill_sync_{int(datetime.now().timestamp())}"
|
|
|
- )
|
|
|
-
|
|
|
- if success:
|
|
|
- synced_count += 1
|
|
|
- logger.info(f"✅ AUTO-SYNC: Successfully synced orphaned position for {symbol} (Lifecycle: {lifecycle_id[:8]}).")
|
|
|
+ position_value_from_ex = ex_pos.get('notional')
+ position_value = float(position_value_from_ex) if position_value_from_ex is not None else None

- if self.notification_manager:
|
|
|
- unrealized_pnl = float(exchange_pos.get('unrealizedPnl', 0))
|
|
|
- pnl_emoji = "🟢" if unrealized_pnl >= 0 else "🔴"
|
|
|
- notification_text = (
|
|
|
- f"🔄 <b>Position Auto-Synced</b>\n\n"
|
|
|
- f"Token: {token}\n"
|
|
|
- f"Lifecycle ID: {lifecycle_id[:8]}...\n"
|
|
|
- f"Direction: {position_side.upper()}\n"
|
|
|
- f"Size: {actual_contracts_size:.6f} {token}\n"
|
|
|
- f"Entry Price: ${final_entry_price:,.4f} {price_source_log}\n"
|
|
|
- f"{pnl_emoji} P&L (Unrealized): ${unrealized_pnl:,.2f}\n"
|
|
|
- f"Reason: Position found on exchange without bot record.\n"
|
|
|
- f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
|
|
|
- f"✅ Position now tracked. Use /sl or /tp if needed."
|
|
|
- )
|
|
|
- await self.notification_manager.send_generic_notification(notification_text)
|
|
|
- else:
|
|
|
- logger.error(f"❌ AUTO-SYNC: Failed to update lifecycle to 'position_opened' for {symbol} (Lifecycle: {lifecycle_id[:8]}).")
|
|
|
- else:
|
|
|
- logger.error(f"❌ AUTO-SYNC: Failed to create lifecycle for orphaned position {symbol}.")
|
|
|
-
|
|
|
- if synced_count > 0:
|
|
|
- logger.info(f"🔄 AUTO-SYNC: Synced {synced_count} orphaned position(s) this cycle (Exchange had position, Bot did not).")
|
|
|
-
|
|
|
-
|
|
|
- bot_open_lifecycles = stats.get_trades_by_status('position_opened')
|
|
|
- if not bot_open_lifecycles:
|
|
|
- return
|
|
|
-
|
|
|
-
|
|
|
- current_exchange_positions_map = {}
|
|
|
- for ex_pos in (self.cached_positions or []):
|
|
|
- if ex_pos.get('symbol') and abs(float(ex_pos.get('contracts', 0))) > 1e-9:
|
|
|
- current_exchange_positions_map[ex_pos.get('symbol')] = ex_pos
|
|
|
-
|
|
|
- closed_due_to_discrepancy = 0
|
|
|
- for lc in bot_open_lifecycles:
|
|
|
- symbol = lc.get('symbol')
|
|
|
- lc_id = lc.get('trade_lifecycle_id')
|
|
|
- token = symbol.split('/')[0] if '/' in symbol else symbol
|
|
|
-
|
|
|
- if symbol not in current_exchange_positions_map:
|
|
|
-
|
|
|
- logger.warning(f"🔄 AUTO-SYNC (Discrepancy): Bot lifecycle {lc_id} for {symbol} is 'position_opened', but NO position found on exchange. Closing lifecycle.")
|
|
|
-
|
|
|
- entry_price = lc.get('entry_price', 0)
|
|
|
- position_side = lc.get('position_side')
|
|
|
- position_size_for_pnl = lc.get('current_position_size', 0)
|
|
|
- exit_price_for_calc = 0
|
|
|
- price_source_info = "unknown"
|
|
|
-
|
|
|
-
|
|
|
- try:
|
|
|
-
|
|
|
- all_recent_fills = self.trading_engine.get_recent_fills()
|
|
|
- if all_recent_fills:
|
|
|
- symbol_specific_fills = [f for f in all_recent_fills if f.get('symbol') == symbol]
|
|
|
- if symbol_specific_fills:
|
|
|
- closing_side = 'sell' if position_side == 'long' else 'buy'
|
|
|
- relevant_fills = sorted(
|
|
|
- [f for f in symbol_specific_fills if f.get('side') == closing_side],
|
|
|
- key=lambda f: f.get('timestamp'), reverse=True
|
|
|
- )
|
|
|
- if relevant_fills:
|
|
|
- last_closing_fill = relevant_fills[0]
|
|
|
- exit_price_for_calc = float(last_closing_fill.get('price', 0))
|
|
|
- fill_timestamp = datetime.fromtimestamp(last_closing_fill.get('timestamp')/1000, tz=timezone.utc).isoformat() if last_closing_fill.get('timestamp') else "N/A"
|
|
|
- price_source_info = f"last exchange fill ({formatter.format_price(exit_price_for_calc, symbol)} @ {fill_timestamp})"
|
|
|
- logger.info(f"AUTO-SYNC: Using exit price {price_source_info} for {symbol} lifecycle {lc_id}.")
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"AUTO-SYNC: Error fetching recent fills for {symbol} to determine exit price: {e}")
|
|
|
-
|
|
|
- if not exit_price_for_calc or exit_price_for_calc <= 0:
|
|
|
-
|
|
|
- mark_price_from_lc = lc.get('mark_price')
|
|
|
- if mark_price_from_lc and float(mark_price_from_lc) > 0:
|
|
|
- exit_price_for_calc = float(mark_price_from_lc)
|
|
|
- price_source_info = "lifecycle mark_price"
|
|
|
- logger.info(f"AUTO-SYNC: No recent fill found. Using exit price from lifecycle mark_price: {formatter.format_price(exit_price_for_calc, symbol)} for {symbol} lifecycle {lc_id}.")
|
|
|
- else:
|
|
|
-
|
|
|
- exit_price_for_calc = entry_price
|
|
|
- price_source_info = "lifecycle entry_price (0 PNL)"
|
|
|
- logger.info(f"AUTO-SYNC: No recent fill or mark_price. Using entry_price: {formatter.format_price(exit_price_for_calc, symbol)} for {symbol} lifecycle {lc_id}.")
|
|
|
-
|
|
|
- realized_pnl = 0
|
|
|
- if position_side == 'long':
|
|
|
- realized_pnl = position_size_for_pnl * (exit_price_for_calc - entry_price)
|
|
|
- elif position_side == 'short':
|
|
|
- realized_pnl = position_size_for_pnl * (entry_price - exit_price_for_calc)
|
|
|
-
|
|
|
- success = stats.update_trade_position_closed(
|
|
|
- lifecycle_id=lc_id,
|
|
|
- exit_price=exit_price_for_calc,
|
|
|
- realized_pnl=realized_pnl,
|
|
|
- exchange_fill_id=f"auto_sync_flat_{int(datetime.now().timestamp())}"
|
|
|
- )
|
|
|
-
|
|
|
- if success:
|
|
|
- closed_due_to_discrepancy += 1
|
|
|
- logger.info(f"✅ AUTO-SYNC (Discrepancy): Successfully closed bot lifecycle {lc_id} for {symbol}.")
|
|
|
-
|
|
|
- stats._migrate_trade_to_aggregated_stats(lc_id)
|
|
|
- if self.notification_manager:
|
|
|
- pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
|
|
|
-
|
|
|
- notification_text = (
|
|
|
- f"🔄 <b>Position Auto-Closed (Discrepancy)</b>\n\n"
|
|
|
- f"Token: {token}\n"
|
|
|
- f"Lifecycle ID: {lc_id[:8]}...\n"
|
|
|
- f"Reason: Bot showed open position, but no corresponding position found on exchange.\n"
|
|
|
- f"Assumed Exit Price: {formatter.format_price(exit_price_for_calc, symbol)}\n"
|
|
|
- f"{pnl_emoji} Realized P&L for this closure: {formatter.format_price_with_symbol(realized_pnl)}\n"
|
|
|
- f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
|
|
|
- f"ℹ️ Bot state synchronized with exchange."
|
|
|
+ if position_value is None and mark_price is not None and current_position_size is not None:
+ position_value = abs(current_position_size) * mark_price
+
+ roe_from_ex = ex_pos.get('percentage')
+ unrealized_pnl_percentage_val = float(roe_from_ex) if roe_from_ex is not None else None
+
+ stats.update_trade_market_data(
+ trade_lifecycle_id=lifecycle_id,
+ unrealized_pnl=unrealized_pnl,
+ mark_price=mark_price,
+ current_position_size=current_position_size,
+ entry_price=entry_price,
+ liquidation_price=liquidation_price,
+ margin_used=margin_used,
+ leverage=leverage,
+ position_value=position_value,
+ unrealized_pnl_percentage=unrealized_pnl_percentage_val
)
- await self.notification_manager.send_generic_notification(notification_text)
+ except (ValueError, TypeError) as e:
+ logger.warning(f"Could not parse full market data for {symbol} (Lifecycle: {lifecycle_id}) from {ex_pos}: {e}")
else:
- logger.error(f"❌ AUTO-SYNC (Discrepancy): Failed to close bot lifecycle {lc_id} for {symbol}.")
|
|
|
-
|
|
|
- if closed_due_to_discrepancy > 0:
|
|
|
- logger.info(f"🔄 AUTO-SYNC: Closed {closed_due_to_discrepancy} lifecycle(s) due to discrepancy (Bot had position, Exchange did not).")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error in auto-sync orphaned positions: {e}", exc_info=True)
|
|
|
-
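# The side-resolution chain removed above (and repeated in the startup sync further
# below) presumably moves into PositionSynchronizer. A sketch of its fallbacks:
# ccxt 'side', then the raw 'szi' field from the exchange payload, then the sign of
# 'contracts'; it returns (position_side, order_side), or ('', '') when undecidable.
def resolve_position_side(exchange_pos: dict) -> tuple:
    ccxt_side = (exchange_pos.get('side') or '').lower()
    if ccxt_side == 'long':
        return 'long', 'buy'
    if ccxt_side == 'short':
        return 'short', 'sell'
    raw = exchange_pos.get('info', {}).get('position', {})
    szi = 0.0
    if isinstance(raw, dict) and raw.get('szi') is not None:
        try:
            szi = float(raw['szi'])
        except ValueError:
            szi = 0.0
    signed = szi if abs(szi) > 1e-9 else float(exchange_pos.get('contracts', 0))
    if signed > 1e-9:
        return 'long', 'buy'
    if signed < -1e-9:
        return 'short', 'sell'
    return '', ''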
|
|
|
- async def _estimate_entry_price_for_orphaned_position(self, symbol: str, contracts: float, side: str) -> float:
|
|
|
- """Estimate entry price for an orphaned position by checking recent fills and market data."""
|
|
|
- try:
|
|
|
- entry_fill_side = 'buy' if side == 'long' else 'sell'
|
|
|
- formatter = get_formatter()
|
|
|
- token = symbol.split('/')[0] if '/' in symbol else symbol
|
|
|
- all_recent_fills = self.trading_engine.get_recent_fills()
|
|
|
- recent_fills = [f for f in all_recent_fills if f.get('symbol') == symbol]
|
|
|
-
|
|
|
- if recent_fills:
|
|
|
- symbol_side_fills = [
|
|
|
- fill for fill in recent_fills
|
|
|
- if fill.get('symbol') == symbol and fill.get('side') == entry_fill_side and float(fill.get('amount',0)) > 0
|
|
|
- ]
|
|
|
- if symbol_side_fills:
|
|
|
- symbol_side_fills.sort(key=lambda f: (
|
|
|
- datetime.fromtimestamp(f.get('timestamp') / 1000, tz=timezone.utc) if f.get('timestamp') else datetime.min.replace(tzinfo=timezone.utc),
|
|
|
- abs(float(f.get('amount',0)) - contracts)
|
|
|
- ), reverse=True)
|
|
|
-
|
|
|
- best_fill = symbol_side_fills[0]
|
|
|
- fill_price = float(best_fill.get('price', 0))
|
|
|
- fill_amount = float(best_fill.get('amount', 0))
|
|
|
- if fill_price > 0:
|
|
|
- logger.info(f"💡 AUTO-SYNC: Estimated entry for {side} {symbol} via recent {entry_fill_side} fill: {formatter.format_price_with_symbol(fill_price, token)} (Amount: {formatter.format_amount(fill_amount, token)})")
|
|
|
- return fill_price
|
|
|
-
|
|
|
- market_data = self.trading_engine.get_market_data(symbol)
|
|
|
- if market_data and market_data.get('ticker'):
|
|
|
- current_price = float(market_data['ticker'].get('last', 0))
|
|
|
- if current_price > 0:
|
|
|
- logger.warning(f"⚠️ AUTO-SYNC: Using current market price as entry estimate for {side} {symbol}: {formatter.format_price_with_symbol(current_price, token)}")
|
|
|
- return current_price
|
|
|
+
+
+
+ logger.debug(f"No 'position_opened' lifecycle found for symbol {symbol} during _update_cached_data. Orphan sync should handle it.")

- if market_data and market_data.get('ticker'):
|
|
|
- bid = float(market_data['ticker'].get('bid', 0))
|
|
|
- ask = float(market_data['ticker'].get('ask', 0))
|
|
|
- if bid > 0 and ask > 0: return (bid + ask) / 2
|
|
|
-
|
|
|
- logger.warning(f"AUTO-SYNC: Could not estimate entry price for {side} {symbol} through any method.")
|
|
|
- return 0.0
|
|
|
except Exception as e:
|
|
|
- logger.error(f"❌ Error estimating entry price for orphaned position {symbol}: {e}", exc_info=True)
|
|
|
- return 0.0
|
|
|
-
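# A sketch of the estimation order used by the method removed above, presumably kept
# by PositionSynchronizer: a matching entry-side fill first, then the last ticker
# price, then the bid/ask mid, and 0.0 to signal that no estimate was possible.
def estimate_entry_price(fill_price: float, last_price: float, bid: float, ask: float) -> float:
    if fill_price > 0:
        return fill_price
    if last_price > 0:
        return last_price
    if bid > 0 and ask > 0:
        return (bid + ask) / 2
    return 0.0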
|
|
|
- async def _immediate_startup_auto_sync(self):
|
|
|
- """🆕 Immediately check for and sync orphaned positions on startup."""
|
|
|
- try:
|
|
|
- logger.info("🔍 STARTUP: Checking for orphaned positions...")
|
|
|
- stats = self.trading_engine.get_stats()
|
|
|
- if not stats:
|
|
|
- logger.warning("⚠️ STARTUP: TradingStats not available for auto-sync.")
|
|
|
- return
|
|
|
-
|
|
|
- formatter = get_formatter()
|
|
|
- exchange_positions = self.trading_engine.get_positions() or []
|
|
|
- if not exchange_positions:
|
|
|
- logger.info("✅ STARTUP: No positions found on exchange.")
|
|
|
- return
|
|
|
-
|
|
|
- synced_count = 0
|
|
|
- for exchange_pos in exchange_positions:
|
|
|
- symbol = exchange_pos.get('symbol')
|
|
|
- contracts_abs = abs(float(exchange_pos.get('contracts', 0)))
|
|
|
- token_for_log = symbol.split('/')[0] if symbol and '/' in symbol else symbol
|
|
|
-
|
|
|
- if not (symbol and contracts_abs > 1e-9): continue
|
|
|
-
|
|
|
- existing_trade_lc = stats.get_trade_by_symbol_and_status(symbol, 'position_opened')
|
|
|
- if not existing_trade_lc:
|
|
|
- position_side, order_side = '', ''
|
|
|
- ccxt_side = exchange_pos.get('side', '').lower()
|
|
|
- if ccxt_side == 'long': position_side, order_side = 'long', 'buy'
|
|
|
- elif ccxt_side == 'short': position_side, order_side = 'short', 'sell'
|
|
|
-
|
|
|
- if not position_side:
|
|
|
- raw_info = exchange_pos.get('info', {}).get('position', {})
|
|
|
- if isinstance(raw_info, dict):
|
|
|
- szi_str = raw_info.get('szi')
|
|
|
- if szi_str is not None:
|
|
|
- try: szi_val = float(szi_str)
|
|
|
- except ValueError: szi_val = 0
|
|
|
- if szi_val > 1e-9: position_side, order_side = 'long', 'buy'
|
|
|
- elif szi_val < -1e-9: position_side, order_side = 'short', 'sell'
|
|
|
-
|
|
|
- if not position_side:
|
|
|
- contracts_val = float(exchange_pos.get('contracts',0))
|
|
|
- if contracts_val > 1e-9: position_side, order_side = 'long', 'buy'
|
|
|
- elif contracts_val < -1e-9: position_side, order_side = 'short', 'sell'
|
|
|
- else:
|
|
|
- logger.warning(f"AUTO-SYNC: Position size is effectively 0 for {symbol} after side checks, skipping sync. Data: {exchange_pos}")
|
|
|
- continue
|
|
|
-
|
|
|
- if not position_side:
|
|
|
- logger.error(f"AUTO-SYNC: CRITICAL - Could not determine position side for {symbol}. Data: {exchange_pos}. Skipping.")
|
|
|
- continue
|
|
|
-
|
|
|
- entry_price = float(exchange_pos.get('entryPrice', 0)) or float(exchange_pos.get('entryPx', 0))
|
|
|
- price_source_log = "(exchange data)"
|
|
|
- if not entry_price or entry_price <= 0:
|
|
|
- estimated_price = await self._estimate_entry_price_for_orphaned_position(symbol, contracts_abs, position_side)
|
|
|
- if estimated_price > 0:
|
|
|
- entry_price = estimated_price
|
|
|
- price_source_log = "(estimated)"
|
|
|
- else:
|
|
|
- logger.error(f"AUTO-SYNC: Could not determine/estimate entry price for {symbol}. Skipping sync.")
|
|
|
- continue
|
|
|
-
|
|
|
- logger.info(f"🔄 STARTUP: Auto-syncing orphaned position: {symbol} {position_side.upper()} {formatter.format_amount(contracts_abs, token_for_log)} @ {formatter.format_price_with_symbol(entry_price, token_for_log)} {price_source_log}")
|
|
|
-
|
|
|
- lifecycle_id = stats.create_trade_lifecycle(
|
|
|
- symbol=symbol, side=order_side,
|
|
|
- entry_order_id=f"startup_sync_{int(datetime.now().timestamp())}",
|
|
|
- trade_type='external_startup_sync'
|
|
|
- )
|
|
|
-
|
|
|
- if lifecycle_id:
|
|
|
- success = stats.update_trade_position_opened(
|
|
|
- lifecycle_id, entry_price, contracts_abs,
|
|
|
- f"startup_fill_sync_{int(datetime.now().timestamp())}"
|
|
|
- )
|
|
|
- if success:
|
|
|
- synced_count += 1
|
|
|
- logger.info(f"✅ STARTUP: Successfully synced orphaned position for {symbol} (Lifecycle: {lifecycle_id[:8]}).")
|
|
|
- await self._send_startup_auto_sync_notification(exchange_pos, symbol, position_side, contracts_abs, entry_price, lifecycle_id, price_source_log)
|
|
|
- else: logger.error(f"❌ STARTUP: Failed to update lifecycle for {symbol} (Lifecycle: {lifecycle_id[:8]}).")
|
|
|
- else: logger.error(f"❌ STARTUP: Failed to create lifecycle for {symbol}.")
|
|
|
-
|
|
|
- if synced_count == 0 and exchange_positions:
|
|
|
- logger.info("✅ STARTUP: All existing exchange positions are already tracked.")
|
|
|
- elif synced_count > 0:
|
|
|
- logger.info(f"🎉 STARTUP: Auto-synced {synced_count} orphaned position(s) (Exchange had pos, Bot did not).")
|
|
|
-
|
|
|
-
|
|
|
- logger.info("🔍 STARTUP: Checking for discrepancies (Bot has pos, Exchange does not)...")
|
|
|
- bot_open_lifecycles = stats.get_trades_by_status('position_opened')
|
|
|
-
|
|
|
-
|
|
|
- current_exchange_positions_map = {}
|
|
|
- for ex_pos in (exchange_positions or []):
|
|
|
- if ex_pos.get('symbol') and abs(float(ex_pos.get('contracts', 0))) > 1e-9:
|
|
|
- current_exchange_positions_map[ex_pos.get('symbol')] = ex_pos
|
|
|
-
|
|
|
- closed_due_to_discrepancy_startup = 0
|
|
|
- if bot_open_lifecycles:
|
|
|
- for lc in bot_open_lifecycles:
|
|
|
- symbol = lc.get('symbol')
|
|
|
- lc_id = lc.get('trade_lifecycle_id')
|
|
|
- token_for_log_discrepancy = symbol.split('/')[0] if symbol and '/' in symbol else symbol
|
|
|
-
|
|
|
- if symbol not in current_exchange_positions_map:
|
|
|
- logger.warning(f"🔄 STARTUP (Discrepancy): Bot lifecycle {lc_id} for {symbol} is 'position_opened', but NO position found on exchange. Closing lifecycle.")
|
|
|
-
|
|
|
- entry_price = lc.get('entry_price', 0)
|
|
|
- position_side = lc.get('position_side')
|
|
|
- position_size_for_pnl = lc.get('current_position_size', 0)
|
|
|
- exit_price_for_calc = 0
|
|
|
- price_source_info = "unknown"
|
|
|
-
|
|
|
- try:
|
|
|
-
|
|
|
- all_recent_fills_for_startup_sync = self.trading_engine.get_recent_fills()
|
|
|
- if all_recent_fills_for_startup_sync:
|
|
|
- symbol_specific_fills_startup = [f for f in all_recent_fills_for_startup_sync if f.get('symbol') == symbol]
|
|
|
- if symbol_specific_fills_startup:
|
|
|
- closing_side = 'sell' if position_side == 'long' else 'buy'
|
|
|
- relevant_fills = sorted(
|
|
|
- [f for f in symbol_specific_fills_startup if f.get('side') == closing_side],
|
|
|
- key=lambda f: f.get('timestamp'), reverse=True
|
|
|
- )
|
|
|
- if relevant_fills:
|
|
|
- last_closing_fill = relevant_fills[0]
|
|
|
- exit_price_for_calc = float(last_closing_fill.get('price', 0))
|
|
|
- fill_ts_val = last_closing_fill.get('timestamp')
|
|
|
- fill_timestamp_str = datetime.fromtimestamp(fill_ts_val/1000, tz=timezone.utc).isoformat() if fill_ts_val else "N/A"
|
|
|
- price_source_info = f"last exchange fill ({formatter.format_price(exit_price_for_calc, symbol)} @ {fill_timestamp_str})"
|
|
|
- logger.info(f"STARTUP SYNC: Using exit price {price_source_info} for {symbol} lifecycle {lc_id}.")
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"STARTUP SYNC: Error fetching recent fills for {symbol} to determine exit price: {e}")
|
|
|
-
|
|
|
- if not exit_price_for_calc or exit_price_for_calc <= 0:
|
|
|
- mark_price_from_lc = lc.get('mark_price')
|
|
|
- if mark_price_from_lc and float(mark_price_from_lc) > 0:
|
|
|
- exit_price_for_calc = float(mark_price_from_lc)
|
|
|
- price_source_info = "lifecycle mark_price"
|
|
|
- else:
|
|
|
- exit_price_for_calc = entry_price
|
|
|
- price_source_info = "lifecycle entry_price (0 PNL)"
|
|
|
-
|
|
|
- realized_pnl = 0
|
|
|
- if position_side == 'long':
|
|
|
- realized_pnl = position_size_for_pnl * (exit_price_for_calc - entry_price)
|
|
|
- elif position_side == 'short':
|
|
|
- realized_pnl = position_size_for_pnl * (entry_price - exit_price_for_calc)
|
|
|
-
|
|
|
- success_close = stats.update_trade_position_closed(
|
|
|
- lifecycle_id=lc_id,
|
|
|
- exit_price=exit_price_for_calc,
|
|
|
- realized_pnl=realized_pnl,
|
|
|
- exchange_fill_id=f"startup_sync_flat_{int(datetime.now().timestamp())}"
|
|
|
- )
|
|
|
-
|
|
|
- if success_close:
|
|
|
- closed_due_to_discrepancy_startup += 1
|
|
|
- logger.info(f"✅ STARTUP (Discrepancy): Successfully closed bot lifecycle {lc_id} for {symbol}.")
|
|
|
-
|
|
|
- stats._migrate_trade_to_aggregated_stats(lc_id)
|
|
|
- if self.notification_manager:
|
|
|
- pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
|
|
|
- notification_text = (
|
|
|
- f"🔄 <b>Position Auto-Closed (Startup Sync)</b>\n\n"
|
|
|
- f"Token: {token_for_log_discrepancy}\n"
|
|
|
- f"Lifecycle ID: {lc_id[:8]}...\n"
|
|
|
- f"Reason: Bot startup - found open lifecycle, but no corresponding position on exchange.\n"
|
|
|
- f"Assumed Exit Price: {formatter.format_price(exit_price_for_calc, symbol)} (Source: {price_source_info})\n"
|
|
|
- f"{pnl_emoji} Realized P&L: {formatter.format_price_with_symbol(realized_pnl)}\n"
|
|
|
- f"Time: {datetime.now().strftime('%H:%M:%S')}"
|
|
|
- )
|
|
|
- await self.notification_manager.send_generic_notification(notification_text)
|
|
|
- else:
|
|
|
- logger.error(f"❌ STARTUP (Discrepancy): Failed to close bot lifecycle {lc_id} for {symbol}.")
|
|
|
-
|
|
|
- if closed_due_to_discrepancy_startup > 0:
|
|
|
- logger.info(f"🎉 STARTUP: Auto-closed {closed_due_to_discrepancy_startup} lifecycle(s) due to discrepancy (Bot had pos, Exchange did not).")
|
|
|
- else:
|
|
|
- logger.info("✅ STARTUP: No discrepancies found where bot had position and exchange did not.")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ Error in startup auto-sync: {e}", exc_info=True)
|
|
|
-
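# A sketch of the realized-PnL formula used when a bot-side lifecycle is force-closed
# because the exchange no longer shows the position (the same arithmetic appears in
# both the periodic and the startup discrepancy paths removed above):
def discrepancy_close_pnl(position_side: str, size: float, entry_price: float,
                          exit_price: float) -> float:
    if position_side == 'long':
        return size * (exit_price - entry_price)
    if position_side == 'short':
        return size * (entry_price - exit_price)
    return 0.0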
|
|
|
- async def _send_startup_auto_sync_notification(self, exchange_pos, symbol, position_side, contracts, entry_price, lifecycle_id, price_source_log):
|
|
|
- """Send notification for positions auto-synced on startup."""
|
|
|
- try:
|
|
|
- if not self.notification_manager: return
|
|
|
-
|
|
|
- formatter = get_formatter()
|
|
|
- token = symbol.split('/')[0] if '/' in symbol else symbol
|
|
|
- unrealized_pnl = float(exchange_pos.get('unrealizedPnl', 0))
|
|
|
- pnl_emoji = "🟢" if unrealized_pnl >= 0 else "🔴"
|
|
|
-
|
|
|
- size_str = formatter.format_amount(contracts, token)
|
|
|
- entry_price_str = formatter.format_price_with_symbol(entry_price, token)
|
|
|
- pnl_str = formatter.format_price_with_symbol(unrealized_pnl)
|
|
|
-
|
|
|
- notification_text_parts = [
|
|
|
- f"🚨 <b>Bot Startup: Position Auto-Synced</b>\n",
|
|
|
- f"Token: {token}",
|
|
|
- f"Lifecycle ID: {lifecycle_id[:8]}...",
|
|
|
- f"Direction: {position_side.upper()}",
|
|
|
- f"Size: {size_str} {token}",
|
|
|
- f"Entry Price: {entry_price_str} {price_source_log}",
|
|
|
- f"{pnl_emoji} P&L (Unrealized): {pnl_str}",
|
|
|
- f"Reason: Position found on exchange without bot record.",
|
|
|
-
|
|
|
- "\n✅ Position now tracked. Use /sl or /tp if needed."
|
|
|
- ]
|
|
|
-
|
|
|
- liq_price = float(exchange_pos.get('liquidationPrice', 0))
|
|
|
- if liq_price > 0:
|
|
|
- liq_price_str = formatter.format_price_with_symbol(liq_price, token)
|
|
|
- notification_text_parts.append(f"⚠️ Liquidation: {liq_price_str}")
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- await self.notification_manager.send_generic_notification("\n".join(notification_text_parts))
|
|
|
- logger.info(f"📤 STARTUP: Sent auto-sync notification for {symbol} (Lifecycle: {lifecycle_id[:8]}).")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ STARTUP: Failed to send auto-sync notification for {symbol}: {e}")
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
+ logger.error(f"❌ Error updating cached data: {e}", exc_info=True)
|
|
|
+
|
|
|
+
|
|
|
+ def get_cached_positions(self) -> List[Dict[str, Any]]:
|
|
|
+ return self.cache.cached_positions
|
|
|
+
|
|
|
+ def get_cached_orders(self) -> List[Dict[str, Any]]:
|
|
|
+ return self.cache.cached_orders
|
|
|
+
|
|
|
+ def get_cached_balance(self) -> Optional[Dict[str, Any]]:
|
|
|
+ return self.cache.cached_balance
|
|
|
+
|
|
|
+ def get_cache_age_seconds(self) -> Optional[float]:
|
|
|
+ if self.cache.last_cache_update:
|
|
|
+ return (datetime.now(timezone.utc) - self.cache.last_cache_update).total_seconds()
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+ def add_price_alarm(self, token: str, target_price: float, direction: str, user_id: int) -> Optional[int]:
|
|
|
+ alarm_id = self.alarm_manager.add_alarm(token, target_price, direction, user_id)
|
|
|
+ if alarm_id:
|
|
|
+ logger.info(f"Price alarm added: ID {alarm_id} for {token} {direction} ${target_price}")
|
|
|
+
|
|
|
+ return alarm_id
|
|
|
+
|
|
|
+ def remove_price_alarm(self, alarm_id: int, user_id: int) -> bool:
|
|
|
+ removed = self.alarm_manager.remove_alarm(alarm_id, user_id)
|
|
|
+ if removed:
|
|
|
+ logger.info(f"Price alarm {alarm_id} removed by user {user_id}")
|
|
|
+
|
|
|
+ return removed
|
|
|
+
|
|
|
+ def get_user_alarms(self, user_id: int) -> List[Dict[str, Any]]:
|
|
|
+ return self.alarm_manager.get_alarms_by_user(user_id)
|
|
|
+
|
|
|
+ def get_all_active_alarms(self) -> List[Dict[str, Any]]:
|
|
|
+ return self.alarm_manager.get_all_active_alarms()
|
|
|
+
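# A usage sketch of the slimmed-down MarketMonitor facade added above; the monitor
# instance is assumed to be constructed elsewhere, and the token, target price and
# direction string are illustrative values only.
def report_cache_and_alarms(monitor: "MarketMonitor", user_id: int) -> None:
    age = monitor.get_cache_age_seconds()
    positions = monitor.get_cached_positions()
    logger.info(f"Cache age: {age}s, cached positions: {len(positions)}")
    alarm_id = monitor.add_price_alarm('BTC', 100000.0, 'above', user_id)
    if alarm_id:
        logger.info(f"User {user_id} now has {len(monitor.get_user_alarms(user_id))} alarm(s)")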