12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675 |
- #!/usr/bin/env python3
- """
- Market Monitor - Handles external trade monitoring and heartbeat functionality.
- """
- import logging
- import asyncio
- from datetime import datetime, timedelta, timezone
- from typing import Optional, Dict, Any, List
- import os
- import json
- from src.config.config import Config
- from src.monitoring.alarm_manager import AlarmManager
- logger = logging.getLogger(__name__)
- class MarketMonitor:
- """Handles external trade monitoring and market events."""
-
def __init__(self, trading_engine, notification_manager=None):
    """Initialize the market monitor.

    Args:
        trading_engine: Engine object. Its ``client`` attribute is cached
            here and its ``get_stats()`` is used by ``_load_state()``.
        notification_manager: Optional notifier. Methods of this class
            check it for truthiness before sending anything through it.
    """
    self.trading_engine = trading_engine
    self.notification_manager = notification_manager
    self.client = trading_engine.client

    # Cursor consulted by the external-trade check; starts two minutes in
    # the past (UTC).
    self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(seconds=120)
    self._monitoring_active = False

    # Handle of the background loop task. Defined up front so that stop()
    # can always read it, even if start() never got as far as creating it.
    self._monitor_task: Optional[asyncio.Task] = None

    # 🆕 External stop loss tracking
    # Format: {exchange_order_id: {'token': str, 'trigger_price': float,
    #                              'side': str, 'detected_at': datetime}}
    self._external_stop_loss_orders = {}

    # External trade monitoring. State is persisted in the DB (see
    # _load_state / _save_state), no longer in a JSON state file.
    self.last_processed_trade_time: Optional[datetime] = None

    # Alarm management
    self.alarm_manager = AlarmManager()

    # Order monitoring snapshots
    self.last_known_orders = set()
    self.last_known_positions = {}

    self._load_state()
-
async def start(self):
    """Begin monitoring: seed tracking state and launch the background loop.

    Calling this while the monitor is already active does nothing.
    """
    if not self._monitoring_active:
        self._monitoring_active = True
        logger.info("🔄 Market monitor started")

        # Seed the order/position snapshots before the first loop iteration.
        await self._initialize_tracking()

        # The heartbeat loop runs as its own asyncio task.
        self._monitor_task = asyncio.create_task(self._monitor_loop())
-
async def stop(self):
    """Halt monitoring, cancel the loop task and persist state.

    Calling this while the monitor is not active does nothing.
    """
    if self._monitoring_active:
        self._monitoring_active = False

        loop_task = self._monitor_task
        if loop_task:
            loop_task.cancel()
            try:
                await loop_task
            except asyncio.CancelledError:
                # Expected outcome of the cancel() issued just above.
                pass

        self._save_state()
        logger.info("🛑 Market monitor stopped")
-
def _load_state(self):
    """Restore ``last_processed_trade_time`` from the DB via TradingStats.

    The stored ISO string is parsed and normalised to an aware UTC
    datetime. If stats are unavailable, nothing is stored, or anything
    fails, the attribute is set to ``None``.
    """
    stats = self.trading_engine.get_stats()
    if not stats:
        logger.warning("⚠️ TradingStats not available, cannot load MarketMonitor state.")
        self.last_processed_trade_time = None
        return

    try:
        stored_value = stats._get_metadata('market_monitor_last_processed_trade_time')
        if not stored_value:
            logger.info("💨 No MarketMonitor state (last_processed_trade_time) found in DB. Will start with fresh external trade tracking.")
            self.last_processed_trade_time = None
            return

        restored = datetime.fromisoformat(stored_value)
        # Naive values are taken to be UTC; aware values are converted.
        if restored.tzinfo is None:
            restored = restored.replace(tzinfo=timezone.utc)
        else:
            restored = restored.astimezone(timezone.utc)

        self.last_processed_trade_time = restored
        logger.info(f"🔄 Loaded MarketMonitor state from DB: last_processed_trade_time = {restored.isoformat()}")
    except Exception as e:
        logger.error(f"Error loading MarketMonitor state from DB: {e}. Proceeding with default state.")
        self.last_processed_trade_time = None
def _save_state(self):
    """Persist ``last_processed_trade_time`` to the DB via TradingStats.

    A set timestamp is written as a UTC ISO string; an unset one is
    written as an empty string. Errors are logged, not raised.
    """
    stats = self.trading_engine.get_stats()
    if not stats:
        logger.warning("⚠️ TradingStats not available, cannot save MarketMonitor state.")
        return

    try:
        cursor = self.last_processed_trade_time
        if not cursor:
            # Nothing to record: store an empty marker rather than delete the key.
            stats._set_metadata('market_monitor_last_processed_trade_time', '')
            logger.info("💾 MarketMonitor state (last_processed_trade_time) is None, saved as empty in DB.")
            return

        # Normalise to UTC before serialising.
        if cursor.tzinfo is None:
            cursor_utc = cursor.replace(tzinfo=timezone.utc)
        else:
            cursor_utc = cursor.astimezone(timezone.utc)

        iso_value = cursor_utc.isoformat()
        stats._set_metadata('market_monitor_last_processed_trade_time', iso_value)
        logger.info(f"💾 Saved MarketMonitor state (last_processed_trade_time) to DB: {iso_value}")
    except Exception as e:
        logger.error(f"Error saving MarketMonitor state to DB: {e}")
-
async def _initialize_tracking(self):
    """Seed the open-order and position snapshots from the trading engine.

    ``last_known_orders`` becomes the set of truthy ``id`` values of the
    currently open orders. ``last_known_positions`` gains one entry per
    position with a symbol and a non-zero contract count. Errors are
    logged, not raised.
    """
    try:
        # Get current open orders to initialize tracking
        orders = self.trading_engine.get_orders()
        if orders:
            self.last_known_orders = {order.get('id') for order in orders if order.get('id')}
            logger.info(f"📋 Initialized tracking with {len(self.last_known_orders)} open orders")

        # Get current positions for P&L tracking
        positions = self.trading_engine.get_positions()
        if positions:
            for position in positions:
                symbol = position.get('symbol')
                # `or 0` also covers keys that are present with a None value;
                # dict.get's default only applies when the key is missing, and
                # float(None) would abort seeding of every remaining position.
                contracts = float(position.get('contracts') or 0)
                entry_price = float(position.get('entryPx') or 0)

                if symbol and contracts != 0:
                    self.last_known_positions[symbol] = {
                        'contracts': contracts,
                        'entry_price': entry_price
                    }
            logger.info(f"📊 Initialized tracking with {len(self.last_known_positions)} positions")

    except Exception as e:
        logger.error(f"❌ Error initializing tracking: {e}")
-
async def _monitor_loop(self):
    """Main monitoring loop that runs every BOT_HEARTBEAT_SECONDS.

    Each heartbeat runs the per-iteration checks in a fixed order; every
    tenth heartbeat additionally runs the cleanup and auto-sync steps.
    The loop ends when ``_monitoring_active`` becomes false or the task
    is cancelled.

    Raises:
        asyncio.CancelledError: re-raised after logging so the awaiting
            caller observes the cancellation.
    """
    loop_count = 0
    try:
        while self._monitoring_active:
            try:
                # 🆕 PHASE 4: Check trades table for pending stop loss
                # activation first (highest priority)
                await self._activate_pending_stop_losses_from_trades()

                await self._check_order_fills()
                await self._check_price_alarms()
                await self._check_external_trades()
                await self._check_pending_triggers()
                await self._check_automatic_risk_management()
                await self._check_external_stop_loss_orders()

                # Run the less frequent maintenance steps every 10 heartbeats.
                loop_count += 1
                if loop_count % 10 == 0:
                    await self._cleanup_orphaned_stop_losses()
                    await self._cleanup_external_stop_loss_tracking()

                    # 🆕 AUTO-SYNC: Check for orphaned positions
                    await self._auto_sync_orphaned_positions()

                    loop_count = 0  # Keep the counter bounded

                await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS)
            except asyncio.CancelledError:
                # Never treat cancellation as a recoverable error.
                raise
            except Exception as e:
                logger.error(f"Error in market monitor loop: {e}", exc_info=True)
                # Recover inside the loop rather than by calling this
                # coroutine recursively, which would add a stack frame for
                # every failure. The counter restarts, as a fresh loop would.
                loop_count = 0
                if self._monitoring_active:
                    await asyncio.sleep(5)
    except asyncio.CancelledError:
        logger.info("Market monitor loop cancelled")
        raise
-
async def _check_order_fills(self):
    """Detect tracked orders that are no longer open on the exchange.

    Ids present in ``last_known_orders`` but absent from the current open
    orders are passed to ``_process_disappeared_orders``. The snapshot is
    then replaced with the current ids. Errors are logged, not raised.
    """
    try:
        open_orders = self.trading_engine.get_orders() or []
        # Positions are still fetched here, but the result is not consumed:
        # position state is primarily managed by TradingStats based on fills.
        self.trading_engine.get_positions()

        # Collect the ids of everything that is still open.
        open_ids = set()
        for order in open_orders:
            order_id = order.get('id')
            if order_id:
                open_ids.add(order_id)

        # Anything we knew about that is no longer open has "disappeared".
        vanished_ids = self.last_known_orders - open_ids
        if vanished_ids:
            logger.info(f"🎯 Detected {len(vanished_ids)} bot orders no longer open: {list(vanished_ids)}. Corresponding fills (if any) are processed by external trade checker.")
            await self._process_disappeared_orders(vanished_ids)

        # Refresh the snapshot of open bot orders.
        self.last_known_orders = open_ids

    except Exception as e:
        logger.error(f"❌ Error checking order fills: {e}")
-
async def _process_filled_orders(self, filled_order_ids: set, current_positions: list):
    """Refresh position tracking after orders placed through the bot fill.

    Args:
        filled_order_ids: Ids of the filled orders. Not read by this
            method.
        current_positions: Current position records, forwarded to
            ``_update_position_tracking``.
    """
    try:
        # Fills are reflected as position-size changes, so the only work
        # needed here is to re-derive the position snapshot.
        await self._update_position_tracking(current_positions)
    except Exception as e:
        logger.error(f"❌ Error processing filled orders: {e}")
-
async def _update_position_tracking(self, current_positions: list):
    """Diff current positions against the last snapshot and log changes.

    Logs newly seen symbols, size changes larger than 1e-6 contracts and
    symbols that disappeared, then replaces ``last_known_positions`` with
    the new snapshot. Errors are logged, not raised.

    Args:
        current_positions: Iterable of position dicts; ``symbol``,
            ``contracts`` and ``entryPx`` are read from each.
    """
    try:
        new_position_map = {}

        for position in current_positions:
            symbol = position.get('symbol')
            # `or 0` also covers keys that are present with a None value;
            # float(None) would raise and skip the whole snapshot update.
            contracts = float(position.get('contracts') or 0)
            entry_price = float(position.get('entryPx') or 0)

            if symbol and contracts != 0:
                new_position_map[symbol] = {
                    'contracts': contracts,
                    'entry_price': entry_price
                }

        # Compare with previous positions to detect changes
        for symbol, new_data in new_position_map.items():
            old_data = self.last_known_positions.get(symbol)

            if not old_data:
                # New position opened
                logger.info(f"📈 New position detected (observed by MarketMonitor): {symbol} {new_data['contracts']} @ ${new_data['entry_price']:.4f}. TradingStats is the definitive source.")
            elif abs(new_data['contracts'] - old_data['contracts']) > 0.000001:
                # Position size changed
                change = new_data['contracts'] - old_data['contracts']
                logger.info(f"📊 Position change detected (observed by MarketMonitor): {symbol} {change:+.6f} contracts. TradingStats is the definitive source.")

        # Check for closed positions
        for symbol in self.last_known_positions:
            if symbol not in new_position_map:
                logger.info(f"📉 Position closed (observed by MarketMonitor): {symbol}. TradingStats is the definitive source.")

        # Update tracking
        self.last_known_positions = new_position_map

    except Exception as e:
        logger.error(f"❌ Error updating position tracking: {e}")
-
async def _process_disappeared_orders(self, disappeared_order_ids: set):
    """Log and investigate bot orders that have disappeared from the exchange.

    Each id is handled on its own by ``_handle_one_disappeared_order``, so
    a failure on one order (for example a notification error) no longer
    stops the remaining orders from being processed. When more than one
    external cancellation was seen, a summary notification is sent.

    Args:
        disappeared_order_ids: Exchange order ids that were tracked as
            open and are no longer reported as open.
    """
    stats = self.trading_engine.get_stats()
    if not stats:
        logger.warning("⚠️ TradingStats not available in _process_disappeared_orders.")
        return

    try:
        total_linked_cancelled = 0
        external_cancellations = []

        for exchange_oid in disappeared_order_ids:
            try:
                total_linked_cancelled += await self._handle_one_disappeared_order(
                    stats, exchange_oid, external_cancellations
                )
            except Exception as e:
                # Isolate failures per order: the caller drops these ids from
                # its snapshot afterwards, so skipped ones are never retried.
                logger.error(f"❌ Error processing disappeared order {exchange_oid}: {e}", exc_info=True)

        # Send summary notification if multiple external cancellations occurred
        if len(external_cancellations) > 1:
            tokens_affected = list({item['token'] for item in external_cancellations})

            if self.notification_manager:
                await self.notification_manager.send_generic_notification(
                    f"⚠️ <b>Multiple External Cancellations Detected</b>\n\n"
                    f"Orders Cancelled: {len(external_cancellations)}\n"
                    f"Tokens Affected: {', '.join(tokens_affected)}\n"
                    f"Source: Direct cancellation on Hyperliquid\n"
                    f"Linked Stop Losses Cancelled: {total_linked_cancelled}\n"
                    f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
                    f"💡 Check individual orders for details"
                )
    except Exception as e:
        logger.error(f"❌ Error processing disappeared orders: {e}", exc_info=True)


async def _handle_one_disappeared_order(self, stats, exchange_oid, external_cancellations: list) -> int:
    """Reconcile one disappeared order; return how many linked stop losses were cancelled.

    Args:
        stats: TradingStats-like object used for order lookups and updates.
        exchange_oid: Exchange id of the order that is no longer open.
        external_cancellations: Accumulator list; a record is appended when
            the order looks externally cancelled.

    Returns:
        The value returned by ``stats.cancel_linked_orders`` when linked
        stop losses were cancelled, otherwise 0.
    """
    order_in_db = stats.get_order_by_exchange_id(exchange_oid)
    if not order_in_db:
        logger.warning(f"Order {exchange_oid} disappeared from exchange but was not found in our DB. This might be an order placed externally.")
        return 0

    last_status = order_in_db.get('status', 'unknown')
    order_type = order_in_db.get('type', 'unknown')
    symbol = order_in_db.get('symbol', 'unknown')
    token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
    # str() keeps the abbreviated id usable even if the id is not a string.
    short_oid = str(exchange_oid)[:8]

    logger.info(f"Order {exchange_oid} was in our DB with status '{last_status}' but has now disappeared from exchange.")

    # Check if this was an unexpected disappearance (likely external cancellation)
    active_statuses = ['open', 'submitted', 'partially_filled', 'pending_submission']
    if last_status in active_statuses:
        logger.warning(f"⚠️ EXTERNAL CANCELLATION: Order {exchange_oid} with status '{last_status}' was likely cancelled externally on Hyperliquid")
        stats.update_order_status(exchange_order_id=exchange_oid, new_status='cancelled_externally')

        # Track external cancellations for the summary notification
        external_cancellations.append({
            'exchange_oid': exchange_oid,
            'token': token,
            'type': order_type,
            'last_status': last_status
        })

        # Send notification about external cancellation
        if self.notification_manager:
            await self.notification_manager.send_generic_notification(
                f"⚠️ <b>External Order Cancellation Detected</b>\n\n"
                f"Token: {token}\n"
                f"Order Type: {order_type.replace('_', ' ').title()}\n"
                f"Exchange Order ID: <code>{short_oid}...</code>\n"
                f"Previous Status: {last_status.replace('_', ' ').title()}\n"
                f"Source: Cancelled directly on Hyperliquid\n"
                f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
                f"🤖 Bot status updated automatically"
            )

        # 🔧 EDGE CASE FIX: Wait briefly before cancelling stop losses.
        # Sometimes orders are cancelled externally but fills come through
        # simultaneously.
        logger.info(f"⏳ Waiting 3 seconds to check for potential fills before cancelling stop losses for {exchange_oid}")
        await asyncio.sleep(3)

        # Re-check the order status after waiting - it might have been filled
        order_in_db_updated = stats.get_order_by_exchange_id(exchange_oid)
        if order_in_db_updated and order_in_db_updated.get('status') in ['filled', 'partially_filled']:
            logger.info(f"✅ Order {exchange_oid} was filled during the wait period - NOT cancelling stop losses")
            # Leave the stop losses alone so they can be activated normally.
            return 0

        # Additional check: look for very recent fills that might match this order
        recent_fill_found = await self._check_for_recent_fills_for_order(exchange_oid, order_in_db)
        if recent_fill_found:
            logger.info(f"✅ Found recent fill for order {exchange_oid} - NOT cancelling stop losses")
            return 0
    else:
        # Normal completion/cancellation - update status
        stats.update_order_status(exchange_order_id=exchange_oid, new_status='disappeared_from_exchange')

    # Cancel any pending stop losses linked to this order (only if not filled)
    if not order_in_db.get('bot_order_ref_id'):
        return 0

    # Double-check one more time that the order wasn't filled
    final_order_check = stats.get_order_by_exchange_id(exchange_oid)
    if final_order_check and final_order_check.get('status') in ['filled', 'partially_filled']:
        logger.info(f"🛑 Order {exchange_oid} was filled - preserving stop losses")
        return 0

    cancelled_sl_count = stats.cancel_linked_orders(
        parent_bot_order_ref_id=order_in_db['bot_order_ref_id'],
        new_status='cancelled_parent_disappeared'
    )

    if cancelled_sl_count > 0:
        logger.info(f"Cancelled {cancelled_sl_count} pending stop losses linked to disappeared order {exchange_oid}")

        if self.notification_manager:
            await self.notification_manager.send_generic_notification(
                f"🛑 <b>Linked Stop Losses Cancelled</b>\n\n"
                f"Token: {token}\n"
                f"Cancelled: {cancelled_sl_count} stop loss(es)\n"
                f"Reason: Parent order {short_oid}... disappeared\n"
                f"Likely Cause: External cancellation on Hyperliquid\n"
                f"Time: {datetime.now().strftime('%H:%M:%S')}"
            )

    return cancelled_sl_count
-
async def _check_price_alarms(self):
    """Evaluate active price alarms against the latest ticker price.

    One market-data request is made per distinct token. An alarm fires
    when the price is at or beyond its target in the alarm's direction.
    A failure on one token is logged and does not stop the others.
    """
    try:
        active_alarms = self.alarm_manager.get_all_active_alarms()
        if not active_alarms:
            return

        # One lookup per distinct token keeps API calls to a minimum.
        distinct_tokens = list({alarm['token'] for alarm in active_alarms})

        for token in distinct_tokens:
            try:
                market_data = self.trading_engine.get_market_data(f"{token}/USDC:USDC")
                if not market_data or not market_data.get('ticker'):
                    continue

                current_price = float(market_data['ticker'].get('last', 0))
                if current_price <= 0:
                    continue

                # Work on a filtered copy of the alarms for this token.
                token_alarms = [alarm for alarm in active_alarms if alarm['token'] == token]

                for alarm in token_alarms:
                    target_price = alarm['target_price']
                    direction = alarm['direction']

                    reached = (
                        (direction == 'above' and current_price >= target_price)
                        or (direction == 'below' and current_price <= target_price)
                    )
                    if not reached:
                        continue

                    triggered_alarm = self.alarm_manager.trigger_alarm(alarm['id'], current_price)
                    if triggered_alarm:
                        await self._send_alarm_notification(triggered_alarm)

            except Exception as e:
                logger.error(f"Error checking alarms for {token}: {e}")

    except Exception as e:
        logger.error(f"❌ Error checking price alarms: {e}")
-
async def _send_alarm_notification(self, alarm: Dict[str, Any]):
    """Announce a triggered alarm via the notifier, or log it if there is none.

    Args:
        alarm: Triggered alarm record; ``token``, ``target_price``,
            ``triggered_price`` and ``direction`` are read from it.
    """
    try:
        notifier = self.notification_manager
        if not notifier:
            # No notifier configured: fall back to the log.
            logger.info(f"🔔 ALARM TRIGGERED: {alarm['token']} @ ${alarm['triggered_price']:,.2f}")
            return

        await notifier.send_alarm_triggered_notification(
            alarm['token'],
            alarm['target_price'],
            alarm['triggered_price'],
            alarm['direction']
        )
    except Exception as e:
        logger.error(f"❌ Error sending alarm notification: {e}")
-
- async def _check_external_trades(self):
- """Check for trades made outside the Telegram bot and update stats."""
- try:
- # Get recent fills from exchange
- recent_fills = self.trading_engine.get_recent_fills()
- if not recent_fills:
- logger.debug("No recent fills data available")
- return
- # Get last processed timestamp from database
- if not hasattr(self, '_last_processed_trade_time') or self._last_processed_trade_time is None:
- try:
- last_time_str = self.trading_engine.stats._get_metadata('last_processed_trade_time')
- if last_time_str:
- self._last_processed_trade_time = datetime.fromisoformat(last_time_str)
- logger.debug(f"Loaded last_processed_trade_time from DB: {self._last_processed_trade_time}")
- else:
- # If no last processed time, start from 1 hour ago to avoid processing too much history
- self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
- logger.info("No last_processed_trade_time found, setting to 1 hour ago (UTC).")
- except Exception as e:
- logger.warning(f"Could not load last_processed_trade_time from DB: {e}")
- self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
- # Process new fills
- external_trades_processed = 0
- symbols_with_fills = set() # Track symbols that had fills for stop loss activation
-
- for fill in recent_fills:
- try:
- # Parse fill data - CCXT format from fetch_my_trades
- trade_id = fill.get('id') # CCXT uses 'id' for trade ID
- timestamp_ms = fill.get('timestamp') # CCXT uses 'timestamp' (milliseconds)
- symbol = fill.get('symbol') # CCXT uses 'symbol' in full format like 'LTC/USDC:USDC'
- side = fill.get('side') # CCXT uses 'side' ('buy' or 'sell')
- amount = float(fill.get('amount', 0)) # CCXT uses 'amount'
- price = float(fill.get('price', 0)) # CCXT uses 'price'
-
- # Convert timestamp
- if timestamp_ms:
- timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
- else:
- timestamp_dt = datetime.now(timezone.utc)
-
- # Skip if already processed
- if timestamp_dt <= self._last_processed_trade_time:
- continue
-
- # Process as external trade if we reach here
- if symbol and side and amount > 0 and price > 0:
- # Symbol is already in full format for CCXT
- full_symbol = symbol
- token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
-
- # Check if this might be a bot order fill by looking for exchange order ID
- # CCXT might have this in 'info' sub-object with the raw exchange data
- exchange_order_id_from_fill = None
- if 'info' in fill and isinstance(fill['info'], dict):
- # Look for Hyperliquid order ID in the raw response
- exchange_order_id_from_fill = fill['info'].get('oid')
-
- # 🆕 Check if this fill corresponds to an external stop loss order
- is_external_stop_loss = False
- stop_loss_info = None
- if exchange_order_id_from_fill and exchange_order_id_from_fill in self._external_stop_loss_orders:
- is_external_stop_loss = True
- stop_loss_info = self._external_stop_loss_orders[exchange_order_id_from_fill]
- logger.info(f"🛑 EXTERNAL STOP LOSS EXECUTION: {token} - Order {exchange_order_id_from_fill} filled @ ${price:.2f}")
-
- logger.info(f"🔍 Processing {'external stop loss' if is_external_stop_loss else 'external trade'}: {trade_id} - {side} {amount} {full_symbol} @ ${price:.2f}")
-
- stats = self.trading_engine.stats
- if not stats:
- logger.warning("⚠️ TradingStats not available in _check_external_trades.")
- continue
-
- # If this is an external stop loss execution, handle it specially
- if is_external_stop_loss and stop_loss_info:
- # 🧹 PHASE 3: Close active trade for stop loss execution
- active_trade = stats.get_active_trade_by_symbol(full_symbol, status='active')
- if active_trade:
- entry_price = active_trade.get('entry_price', 0)
- active_trade_side = active_trade.get('side')
-
- # Calculate realized P&L
- if active_trade_side == 'buy': # Long position
- realized_pnl = amount * (price - entry_price)
- else: # Short position
- realized_pnl = amount * (entry_price - price)
-
- stats.update_active_trade_closed(
- active_trade['id'], realized_pnl, timestamp_dt.isoformat()
- )
- logger.info(f"🛑 Active trade {active_trade['id']} closed via external stop loss - P&L: ${realized_pnl:.2f}")
-
- # Send specialized stop loss execution notification
- if self.notification_manager:
- await self.notification_manager.send_stop_loss_execution_notification(
- stop_loss_info, full_symbol, side, amount, price, 'long_closed', timestamp_dt.isoformat()
- )
-
- # Remove from tracking since it's now executed
- del self._external_stop_loss_orders[exchange_order_id_from_fill]
-
- logger.info(f"🛑 Processed external stop loss execution: {side} {amount} {full_symbol} @ ${price:.2f} (long_closed)")
-
- else:
- # Handle as regular external trade
- # Check if this corresponds to a bot order by exchange_order_id
- linked_order_db_id = None
- if exchange_order_id_from_fill:
- order_in_db = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
- if order_in_db:
- linked_order_db_id = order_in_db.get('id')
- logger.info(f"🔗 Linked external fill {trade_id} to bot order DB ID {linked_order_db_id} (Exchange OID: {exchange_order_id_from_fill})")
-
- # Update order status to filled if it was open
- current_status = order_in_db.get('status', '')
- if current_status in ['open', 'partially_filled', 'pending_submission']:
- # Determine if this is a partial or full fill
- order_amount_requested = float(order_in_db.get('amount_requested', 0))
- if abs(amount - order_amount_requested) < 0.000001: # Allow small floating point differences
- new_status_after_fill = 'filled'
- else:
- new_status_after_fill = 'partially_filled'
-
- stats.update_order_status(
- order_db_id=linked_order_db_id,
- new_status=new_status_after_fill
- )
- logger.info(f"📊 Updated bot order {linked_order_db_id} status: {current_status} → {new_status_after_fill}")
-
- # Check if this order is now fully filled and has pending stop losses to activate
- if new_status_after_fill == 'filled':
- await self._activate_pending_stop_losses(order_in_db, stats)
-
- # 🧹 PHASE 3: Record trade simply, use active_trades for tracking
- stats.record_trade(
- full_symbol, side, amount, price,
- exchange_fill_id=trade_id, trade_type="external",
- timestamp=timestamp_dt.isoformat(),
- linked_order_table_id_to_link=linked_order_db_id
- )
-
- # Derive action type from trade context for notifications
- if linked_order_db_id:
- # Bot order - determine action from order context
- order_side = order_in_db.get('side', side).lower()
- if order_side == 'buy':
- action_type = 'long_opened'
- elif order_side == 'sell':
- action_type = 'short_opened'
- else:
- action_type = 'position_opened'
- else:
- # External trade - determine from current active trades
- existing_active_trade = stats.get_active_trade_by_symbol(full_symbol, status='active')
- if existing_active_trade:
- # Has active position - this is likely a closure
- existing_side = existing_active_trade.get('side')
- if existing_side == 'buy' and side.lower() == 'sell':
- action_type = 'long_closed'
- elif existing_side == 'sell' and side.lower() == 'buy':
- action_type = 'short_closed'
- else:
- action_type = 'position_modified'
- else:
- # No active position - this opens a new one
- if side.lower() == 'buy':
- action_type = 'long_opened'
- else:
- action_type = 'short_opened'
-
- # 🧹 PHASE 3: Update active trades based on action
- if linked_order_db_id:
- # Bot order - update linked active trade
- order_data = stats.get_order_by_db_id(linked_order_db_id)
- if order_data:
- exchange_order_id = order_data.get('exchange_order_id')
-
- # Find active trade by entry order ID
- all_active_trades = stats.get_all_active_trades()
- for at in all_active_trades:
- if at.get('entry_order_id') == exchange_order_id:
- active_trade_id = at['id']
- current_status = at['status']
-
- if current_status == 'pending' and action_type in ['long_opened', 'short_opened']:
- # Entry order filled - update active trade to active
- stats.update_active_trade_opened(
- active_trade_id, price, amount, timestamp_dt.isoformat()
- )
- logger.info(f"🆕 Active trade {active_trade_id} opened via fill {trade_id}")
-
- elif current_status == 'active' and action_type in ['long_closed', 'short_closed']:
- # Exit order filled - calculate P&L and close active trade
- entry_price = at.get('entry_price', 0)
- active_trade_side = at.get('side')
-
- # Calculate realized P&L
- if active_trade_side == 'buy': # Long position
- realized_pnl = amount * (price - entry_price)
- else: # Short position
- realized_pnl = amount * (entry_price - price)
-
- stats.update_active_trade_closed(
- active_trade_id, realized_pnl, timestamp_dt.isoformat()
- )
- logger.info(f"🆕 Active trade {active_trade_id} closed via fill {trade_id} - P&L: ${realized_pnl:.2f}")
- break
-
- elif action_type in ['long_opened', 'short_opened']:
- # External trade that opened a position - create external active trade
- active_trade_id = stats.create_active_trade(
- symbol=full_symbol,
- side=side.lower(),
- entry_order_id=None, # External order
- trade_type='external'
- )
- if active_trade_id:
- stats.update_active_trade_opened(
- active_trade_id, price, amount, timestamp_dt.isoformat()
- )
- logger.info(f"🆕 Created external active trade {active_trade_id} for {side.upper()} {full_symbol}")
-
- elif action_type in ['long_closed', 'short_closed']:
- # External closure - close any active trade for this symbol
- active_trade = stats.get_active_trade_by_symbol(full_symbol, status='active')
- if active_trade:
- entry_price = active_trade.get('entry_price', 0)
- active_trade_side = active_trade.get('side')
-
- # Calculate realized P&L
- if active_trade_side == 'buy': # Long position
- realized_pnl = amount * (price - entry_price)
- else: # Short position
- realized_pnl = amount * (entry_price - price)
-
- stats.update_active_trade_closed(
- active_trade['id'], realized_pnl, timestamp_dt.isoformat()
- )
- logger.info(f"🆕 External closure: Active trade {active_trade['id']} closed - P&L: ${realized_pnl:.2f}")
-
- # Track symbol for potential stop loss activation
- symbols_with_fills.add(token)
-
- # Send notification for external trade
- if self.notification_manager:
- await self.notification_manager.send_external_trade_notification(
- full_symbol, side, amount, price, action_type, timestamp_dt.isoformat()
- )
-
- logger.info(f"📋 Processed external trade: {side} {amount} {full_symbol} @ ${price:.2f} ({action_type}) using timestamp {timestamp_dt.isoformat()}")
-
- external_trades_processed += 1
-
- # Update last processed time
- self._last_processed_trade_time = timestamp_dt
-
- except Exception as e:
- logger.error(f"Error processing fill {fill}: {e}")
- continue
-
- # Save the last processed timestamp to database
- if external_trades_processed > 0:
- self.trading_engine.stats._set_metadata('last_processed_trade_time', self._last_processed_trade_time.isoformat())
- logger.info(f"💾 Saved MarketMonitor state (last_processed_trade_time) to DB: {self._last_processed_trade_time.isoformat()}")
- logger.info(f"📊 Processed {external_trades_processed} external trades")
-
- # Additional check: Activate any pending stop losses for symbols that had fills
- # This is a safety net for cases where the fill linking above didn't work
- await self._check_pending_stop_losses_for_filled_symbols(symbols_with_fills)
- except Exception as e:
- logger.error(f"❌ Error checking external trades: {e}", exc_info=True)
async def _check_pending_stop_losses_for_filled_symbols(self, symbols_with_fills: set):
    """
    Safety net: activate pending stop losses for symbols that just had fills.

    Covers the case where linking a fill to its bot order failed earlier, so a
    stop loss attached to an already filled parent order was never activated.

    Args:
        symbols_with_fills: Base tokens (e.g. ``'BTC'``) that had fills during
            the current processing pass.

    Returns:
        None. Errors are logged and swallowed.
    """
    try:
        if not symbols_with_fills:
            return

        stats = self.trading_engine.stats
        if not stats:
            return

        # Get all pending stop losses
        pending_stop_losses = stats.get_orders_by_status('pending_trigger', 'stop_limit_trigger')
        if not pending_stop_losses:
            return

        activated_any = False
        # _activate_pending_stop_losses() looks up the pending stop losses by the
        # parent's bot_order_ref_id, so each parent only needs to be handled once
        # even when several pending stop losses reference it.
        checked_parent_refs = set()

        for sl_order in pending_stop_losses:
            symbol = sl_order.get('symbol') or ''
            # Same token derivation as the rest of the file: 'BTC/USDC:USDC' -> 'BTC'
            token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
            if not token or token not in symbols_with_fills:
                continue

            parent_bot_ref_id = sl_order.get('parent_bot_order_ref_id')
            if not parent_bot_ref_id or parent_bot_ref_id in checked_parent_refs:
                continue
            checked_parent_refs.add(parent_bot_ref_id)

            # Only a (partially) filled parent justifies activating its stop loss
            parent_order = stats.get_order_by_bot_ref_id(parent_bot_ref_id)
            if parent_order and parent_order.get('status') in ['filled', 'partially_filled']:
                logger.info(f"🛑 Safety net activation: Found pending SL for {token} with filled parent order {parent_bot_ref_id}")
                await self._activate_pending_stop_losses(parent_order, stats)
                activated_any = True

        if activated_any:
            logger.info("✅ Safety net activated pending stop losses for symbols with recent fills")

    except Exception as e:
        logger.error(f"Error in safety net stop loss activation: {e}", exc_info=True)
-
async def _check_pending_triggers(self):
    """
    Check pending stop-loss triggers and execute those whose price was reached.

    For every order with status ``pending_trigger`` and type
    ``stop_limit_trigger`` the last ticker price is compared with the stored
    trigger price: a sell trigger fires when the market is at or below it, a
    buy trigger when the market is at or above it. A fired trigger is handed to
    ``trading_engine.execute_triggered_stop_order``; the trigger row is then
    marked ``triggered_order_placed`` or ``trigger_execution_failed`` and a
    notification is sent when a notification manager is configured.

    Each trigger is processed in isolation so that one malformed row or one
    failing exchange call cannot prevent the remaining triggers from being
    checked.
    """
    stats = self.trading_engine.get_stats()
    if not stats:
        logger.warning("⚠️ TradingStats not available in _check_pending_triggers.")
        return

    try:
        # 'stop_limit_trigger' is the order type used for SLs that become limit orders
        pending_sl_triggers = stats.get_orders_by_status(status='pending_trigger', order_type_filter='stop_limit_trigger')
        if not pending_sl_triggers:
            return

        logger.debug(f"Found {len(pending_sl_triggers)} pending SL triggers to check.")

        for trigger_order in pending_sl_triggers:
            order_db_id = trigger_order.get('id')
            try:
                symbol = trigger_order.get('symbol')
                trigger_price = trigger_order.get('price')  # This is the stop price
                # Side of the SL order itself (e.g. sell for a long position's SL)
                trigger_side = (trigger_order.get('side') or '').lower()
                parent_ref_id = trigger_order.get('parent_bot_order_ref_id')

                if not symbol or trigger_price is None:
                    logger.warning(f"Invalid trigger order data for DB ID {order_db_id}, skipping: {trigger_order}")
                    continue

                market_data = self.trading_engine.get_market_data(symbol)
                if not market_data or not market_data.get('ticker'):
                    logger.warning(f"Could not fetch market data for {symbol} to check SL trigger {order_db_id}.")
                    continue

                # A ticker may carry 'last': None; treat it like a missing price
                current_price = float(market_data['ticker'].get('last') or 0)
                if current_price <= 0:
                    logger.warning(f"Invalid current price ({current_price}) for {symbol} checking SL trigger {order_db_id}.")
                    continue

                trigger_hit = False
                if trigger_side == 'sell' and current_price <= trigger_price:
                    trigger_hit = True
                    logger.info(f"🔴 SL TRIGGER HIT (Sell): Order DB ID {order_db_id}, Symbol {symbol}, Trigger@ ${trigger_price:.4f}, Market@ ${current_price:.4f}")
                elif trigger_side == 'buy' and current_price >= trigger_price:
                    trigger_hit = True
                    logger.info(f"🟢 SL TRIGGER HIT (Buy): Order DB ID {order_db_id}, Symbol {symbol}, Trigger@ ${trigger_price:.4f}, Market@ ${current_price:.4f}")

                if not trigger_hit:
                    continue

                logger.info(f"Attempting to execute actual stop order for triggered DB ID: {order_db_id} (Parent Bot Ref: {parent_ref_id})")

                execution_result = await self.trading_engine.execute_triggered_stop_order(original_trigger_order_db_id=order_db_id)

                if execution_result.get("success"):
                    new_trigger_status = 'triggered_order_placed'
                    placed_sl_details = execution_result.get("placed_sl_order_details", {})
                    logger.info(f"Successfully placed actual SL order from trigger {order_db_id}. New SL Order DB ID: {placed_sl_details.get('order_db_id')}, Exchange ID: {placed_sl_details.get('exchange_order_id')}")
                    notification_message_detail = f"Actual SL order placed (New DB ID: {placed_sl_details.get('order_db_id', 'N/A')})."
                else:
                    new_trigger_status = 'trigger_execution_failed'
                    error_msg = execution_result.get("error", "Unknown error during SL execution.")
                    logger.error(f"Failed to execute actual SL order from trigger {order_db_id}: {error_msg}")
                    notification_message_detail = f"Failed to place actual SL order: {error_msg}"

                # Persist the outcome so the trigger is not evaluated again
                stats.update_order_status(order_db_id=order_db_id, new_status=new_trigger_status)

                if self.notification_manager:
                    await self.notification_manager.send_generic_notification(
                        f"🔔 Stop-Loss Update!\nSymbol: {symbol}\nSide: {trigger_side.upper()}\nTrigger Price: ${trigger_price:.4f}\nMarket Price: ${current_price:.4f}\n(Original Trigger DB ID: {order_db_id}, Parent: {parent_ref_id or 'N/A'})\nStatus: {new_trigger_status.replace('_', ' ').title()}\nDetails: {notification_message_detail}"
                    )
            except Exception as trigger_error:
                # Keep going: the other triggers still protect open positions
                logger.error(f"❌ Error processing SL trigger {order_db_id}: {trigger_error}", exc_info=True)
                continue
    except Exception as e:
        logger.error(f"❌ Error checking pending SL triggers: {e}", exc_info=True)
async def _check_automatic_risk_management(self):
    """
    Emergency-exit positions whose loss exceeds ``Config.STOP_LOSS_PERCENTAGE``.

    Does nothing when ``Config.RISK_MANAGEMENT_ENABLED`` is false or the
    configured percentage is not positive. When there are no positions at all,
    orphaned pending stop losses are cleaned up instead.

    For every position the unrealized PnL is expressed as a percentage of the
    entry value (``abs(contracts) * entryPx``). If the loss reaches the
    threshold, a notification is sent, ``trading_engine.execute_exit_order`` is
    called for the token and, on success, pending stop losses for the symbol
    are cancelled with status ``cancelled_auto_exit``.

    Errors for a single position are logged and do not stop the other
    positions from being checked.
    """
    try:
        # Skip if risk management is disabled or percentage is 0
        if not getattr(Config, 'RISK_MANAGEMENT_ENABLED', True) or Config.STOP_LOSS_PERCENTAGE <= 0:
            return

        positions = self.trading_engine.get_positions()
        if not positions:
            # If no positions exist, clean up any orphaned pending stop losses
            await self._cleanup_orphaned_stop_losses()
            return

        for position in positions:
            try:
                symbol = position.get('symbol') or ''
                # `or 0` turns explicit None values into "missing data" instead of a crash
                contracts = float(position.get('contracts') or 0)
                entry_price = float(position.get('entryPx') or 0)
                mark_price = float(position.get('markPx') or 0)
                unrealized_pnl = float(position.get('unrealizedPnl') or 0)

                # Skip if no position or missing data
                if contracts == 0 or entry_price <= 0 or mark_price <= 0:
                    continue

                # PnL percentage relative to the entry value (non-zero after the guard above)
                entry_value = abs(contracts) * entry_price
                pnl_percentage = (unrealized_pnl / entry_value) * 100

                # Only act when the loss reaches the safety threshold
                if pnl_percentage > -Config.STOP_LOSS_PERCENTAGE:
                    continue

                token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
                position_side = "LONG" if contracts > 0 else "SHORT"

                logger.warning(f"🚨 AUTOMATIC STOP LOSS TRIGGERED: {token} {position_side} position has {pnl_percentage:.2f}% loss (threshold: -{Config.STOP_LOSS_PERCENTAGE}%)")

                # Send notification before attempting exit
                if self.notification_manager:
                    await self.notification_manager.send_generic_notification(
                        f"🚨 AUTOMATIC STOP LOSS TRIGGERED!\n"
                        f"Token: {token}\n"
                        f"Position: {position_side} {abs(contracts):.6f}\n"
                        f"Entry Price: ${entry_price:.4f}\n"
                        f"Current Price: ${mark_price:.4f}\n"
                        f"Unrealized PnL: ${unrealized_pnl:.2f} ({pnl_percentage:.2f}%)\n"
                        f"Safety Threshold: -{Config.STOP_LOSS_PERCENTAGE}%\n"
                        f"Action: Executing emergency exit order..."
                    )

                # Execute emergency exit order
                exit_result = await self.trading_engine.execute_exit_order(token)

                if exit_result.get('success'):
                    logger.info(f"✅ Emergency exit order placed successfully for {token}. Order details: {exit_result.get('order_placed_details', {})}")

                    # Must be defined even when stats is unavailable: the
                    # notification below reads it unconditionally.
                    cancelled_sl_count = 0

                    # Cancel any pending stop losses for this symbol since position is now closed
                    stats = self.trading_engine.get_stats()
                    if stats:
                        cancelled_sl_count = stats.cancel_pending_stop_losses_by_symbol(
                            symbol=symbol,
                            new_status='cancelled_auto_exit'
                        ) or 0
                        if cancelled_sl_count > 0:
                            logger.info(f"🛑 Cancelled {cancelled_sl_count} pending stop losses for {symbol} after automatic exit")

                    if self.notification_manager:
                        cleanup_line = (
                            f"🛑 <b>Cleanup:</b> Cancelled {cancelled_sl_count} pending stop losses"
                            if cancelled_sl_count > 0 else ''
                        )
                        await self.notification_manager.send_generic_notification(
                            f"✅ <b>Emergency Exit Completed</b>\n\n"
                            f"📊 <b>Position:</b> {token} {position_side}\n"
                            f"📉 <b>Loss:</b> {pnl_percentage:.2f}% (${unrealized_pnl:.2f})\n"
                            f"⚠️ <b>Threshold:</b> -{Config.STOP_LOSS_PERCENTAGE}%\n"
                            f"✅ <b>Action:</b> Position automatically closed\n"
                            f"💰 <b>Exit Price:</b> ~${mark_price:.2f}\n"
                            f"🆔 <b>Order ID:</b> {exit_result.get('order_placed_details', {}).get('exchange_order_id', 'N/A')}\n"
                            f"{cleanup_line}\n\n"
                            f"🛡️ This was an automatic safety stop triggered by the risk management system."
                        )
                else:
                    error_msg = exit_result.get('error', 'Unknown error')
                    logger.error(f"❌ Failed to execute emergency exit order for {token}: {error_msg}")

                    if self.notification_manager:
                        await self.notification_manager.send_generic_notification(
                            f"❌ <b>CRITICAL: Emergency Exit Failed!</b>\n\n"
                            f"📊 <b>Position:</b> {token} {position_side}\n"
                            f"📉 <b>Loss:</b> {pnl_percentage:.2f}%\n"
                            f"❌ <b>Error:</b> {error_msg}\n\n"
                            f"⚠️ <b>MANUAL INTERVENTION REQUIRED</b>\n"
                            f"Please close this position manually via /exit {token}"
                        )
            except Exception as pos_error:
                logger.error(f"Error processing position for automatic stop loss: {pos_error}")
                continue
    except Exception as e:
        logger.error(f"❌ Error in automatic risk management check: {e}", exc_info=True)
async def _cleanup_orphaned_stop_losses(self):
    """
    Cancel pending stop losses that no longer protect anything.

    A pending stop loss (status ``pending_trigger``, type
    ``stop_limit_trigger``) is cancelled when:

    * its parent order failed, was cancelled or disappeared from the exchange;
    * its parent order is filled but no open position exists for the symbol;
    * its parent reference points to an order missing from the database;
    * it has no parent reference and no open position exists for the symbol.

    Stop losses whose parent is still open, submitted or partially filled are
    kept. Cancelled rows get status ``cancelled_orphaned_no_position`` and a
    summary notification is sent when at least one row was cancelled.
    """
    try:
        stats = self.trading_engine.get_stats()
        if not stats:
            return

        # Get all pending stop loss triggers
        pending_stop_losses = stats.get_orders_by_status('pending_trigger', 'stop_limit_trigger')
        if not pending_stop_losses:
            return

        logger.debug(f"Checking {len(pending_stop_losses)} pending stop losses for orphaned orders")

        # Symbols that currently have a non-zero position
        current_positions = self.trading_engine.get_positions() or []
        position_symbols = {
            pos.get('symbol')
            for pos in current_positions
            if pos.get('symbol') and float(pos.get('contracts') or 0) != 0
        }

        dead_parent_statuses = (
            'failed_submission', 'failed_submission_no_data', 'cancelled_manually',
            'cancelled_externally', 'disappeared_from_exchange',
        )

        orphaned_count = 0
        for sl_order in pending_stop_losses:
            symbol = sl_order.get('symbol')
            order_db_id = sl_order.get('id')
            parent_bot_ref_id = sl_order.get('parent_bot_order_ref_id')

            should_cancel = False
            cancel_reason = ""

            if parent_bot_ref_id:
                parent_order = stats.get_order_by_bot_ref_id(parent_bot_ref_id)
                if parent_order:
                    # Status may be stored as None; normalise before comparing
                    parent_status = (parent_order.get('status') or '').lower()

                    if parent_status in dead_parent_statuses:
                        should_cancel = True
                        cancel_reason = f"parent order {parent_status}"
                    elif parent_status == 'filled' and symbol not in position_symbols:
                        # Parent filled but no position - it might have been closed externally
                        should_cancel = True
                        cancel_reason = "parent filled but position no longer exists"
                    # If parent is still 'open', 'submitted', or 'partially_filled', keep the stop loss
                else:
                    # Parent order not found in DB - this is truly orphaned
                    should_cancel = True
                    cancel_reason = "parent order not found in database"
            elif symbol not in position_symbols:
                # No parent reference - fall back to the position-based check
                should_cancel = True
                cancel_reason = "no position exists and no parent reference"

            if not should_cancel:
                continue

            success = stats.update_order_status(
                order_db_id=order_db_id,
                new_status='cancelled_orphaned_no_position'
            )
            if success:
                orphaned_count += 1
                # A row without a symbol must not abort the cleanup after it
                # has already been cancelled in the database.
                symbol_text = symbol or ''
                token = (symbol_text.split('/')[0] if '/' in symbol_text else symbol_text.split(':')[0]) or 'Unknown'
                logger.info(f"🧹 Cancelled orphaned stop loss for {token} (Order DB ID: {order_db_id}) - {cancel_reason}")

        if orphaned_count > 0:
            logger.info(f"🧹 Cleanup completed: Cancelled {orphaned_count} orphaned stop loss orders")

            if self.notification_manager:
                await self.notification_manager.send_generic_notification(
                    f"🧹 <b>Cleanup Completed</b>\n\n"
                    f"Cancelled {orphaned_count} orphaned stop loss order(s)\n"
                    f"Reason: Parent orders cancelled/failed or positions closed externally\n"
                    f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
                    f"💡 This automatic cleanup ensures stop losses stay synchronized with actual orders and positions."
                )
    except Exception as e:
        logger.error(f"❌ Error cleaning up orphaned stop losses: {e}", exc_info=True)
async def _activate_pending_stop_losses_from_trades(self):
    """
    🆕 PHASE 4: Activate stop losses recorded on open positions in the trades table.

    For every entry returned by ``stats.get_pending_stop_loss_activations()``:

    * if the last market price is already beyond the stop price (at or below
      it for a long, at or above it for a short) the position is closed
      immediately via ``trading_engine.execute_exit_order``;
    * otherwise a stop loss order is placed via
      ``trading_engine.execute_stop_loss_order`` and linked to the trade
      lifecycle with ``stats.link_stop_loss_to_trade``.

    When the market price cannot be fetched the stop loss is still placed and
    the notification shows the price as ``Unknown``. Failures for one position
    are logged and do not affect the others.
    """
    try:
        stats = self.trading_engine.get_stats()
        if not stats:
            return

        # Get open positions that need stop loss activation
        trades_needing_sl = stats.get_pending_stop_loss_activations()
        if not trades_needing_sl:
            return

        logger.debug(f"🆕 Found {len(trades_needing_sl)} open positions needing stop loss activation")

        for position_trade in trades_needing_sl:
            try:
                symbol = position_trade['symbol']
                token = symbol.split('/')[0] if '/' in symbol else symbol
                stop_loss_price = position_trade['stop_loss_price']
                position_side = position_trade['position_side']
                lifecycle_id = position_trade['trade_lifecycle_id']

                # Get current market price (best effort)
                current_price = None
                try:
                    market_data = self.trading_engine.get_market_data(symbol)
                    if market_data and market_data.get('ticker'):
                        current_price = float(market_data['ticker'].get('last') or 0)
                except Exception as price_error:
                    logger.warning(f"Could not fetch current price for {symbol}: {price_error}")

                # Long SL = sell, Short SL = buy
                sl_side = 'sell' if position_side == 'long' else 'buy'

                # Check if trigger condition is already met
                trigger_already_hit = False
                trigger_reason = ""
                if current_price and current_price > 0:
                    if sl_side == 'sell' and current_price <= stop_loss_price:
                        # LONG position stop loss - price below trigger
                        trigger_already_hit = True
                        trigger_reason = f"LONG SL: Current ${current_price:.4f} ≤ Stop ${stop_loss_price:.4f}"
                    elif sl_side == 'buy' and current_price >= stop_loss_price:
                        # SHORT position stop loss - price above trigger
                        trigger_already_hit = True
                        trigger_reason = f"SHORT SL: Current ${current_price:.4f} ≥ Stop ${stop_loss_price:.4f}"

                if trigger_already_hit:
                    # Execute immediate market close
                    logger.warning(f"🚨 IMMEDIATE SL EXECUTION (Trades Table): {token} - {trigger_reason}")

                    try:
                        exit_result = await self.trading_engine.execute_exit_order(token)

                        if exit_result.get('success'):
                            logger.info(f"✅ Immediate {position_side.upper()} SL execution successful for {token}")

                            if self.notification_manager:
                                await self.notification_manager.send_generic_notification(
                                    f"🚨 <b>Immediate Stop Loss Execution</b>\n\n"
                                    f"🆕 <b>Source: Unified Trades Table (Phase 4)</b>\n"
                                    f"Token: {token}\n"
                                    f"Position Type: {position_side.upper()}\n"
                                    f"SL Trigger Price: ${stop_loss_price:.4f}\n"
                                    f"Current Market Price: ${current_price:.4f}\n"
                                    f"Trigger Logic: {trigger_reason}\n"
                                    f"Action: Market close order placed immediately\n"
                                    f"Order ID: {exit_result.get('order_placed_details', {}).get('exchange_order_id', 'N/A')}\n"
                                    f"Lifecycle ID: {lifecycle_id[:8]}\n"
                                    f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
                                    f"⚡ Single source of truth prevents missed stop losses"
                                )
                        else:
                            logger.error(f"❌ Failed to execute immediate SL for {token}: {exit_result.get('error')}")

                    except Exception as exec_error:
                        logger.error(f"❌ Exception during immediate SL execution for {token}: {exec_error}")

                else:
                    # Normal activation - place stop loss order
                    try:
                        sl_result = await self.trading_engine.execute_stop_loss_order(token, stop_loss_price)

                        if sl_result.get('success'):
                            sl_order_id = sl_result.get('order_placed_details', {}).get('exchange_order_id')

                            # Link the stop loss order to the trade lifecycle
                            stats.link_stop_loss_to_trade(lifecycle_id, sl_order_id, stop_loss_price)

                            logger.info(f"✅ Activated {position_side.upper()} stop loss for {token}: ${stop_loss_price:.4f}")

                            if self.notification_manager:
                                # Format the price outside the f-string: a conditional
                                # cannot be placed inside a format specification.
                                price_text = f"{current_price:.4f}" if current_price else 'Unknown'
                                await self.notification_manager.send_generic_notification(
                                    f"🛑 <b>Stop Loss Activated</b>\n\n"
                                    f"🆕 <b>Source: Unified Trades Table (Phase 4)</b>\n"
                                    f"Token: {token}\n"
                                    f"Position Type: {position_side.upper()}\n"
                                    f"Stop Loss Price: ${stop_loss_price:.4f}\n"
                                    f"Current Price: ${price_text}\n"
                                    f"Order ID: {sl_order_id or 'N/A'}\n"
                                    f"Lifecycle ID: {lifecycle_id[:8]}\n"
                                    f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
                                    f"🛡️ Your position is now protected"
                                )
                        else:
                            logger.error(f"❌ Failed to activate SL for {token}: {sl_result.get('error')}")

                    except Exception as activation_error:
                        logger.error(f"❌ Exception during SL activation for {token}: {activation_error}")

            except Exception as trade_error:
                logger.error(f"❌ Error processing position trade for SL activation: {trade_error}")

    except Exception as e:
        logger.error(f"❌ Error activating pending stop losses from trades table: {e}", exc_info=True)
async def _activate_pending_stop_losses(self, order_in_db, stats):
    """
    Activate the pending stop losses linked to a filled order.

    Args:
        order_in_db: Order row of the filled parent order. ``bot_order_ref_id``
            is required; ``symbol`` and ``exchange_order_id`` are read when
            present.
        stats: Stats/database object used to look up and update orders.

    Behaviour:
        * No open position for the token: the linked stop losses are cancelled
          with status ``cancelled_no_position``.
        * Position exists and the market is already beyond a stop price: the
          position is closed at market immediately and the stop loss row is
          marked ``immediately_executed_on_activation`` (or
          ``activation_execution_failed`` / ``activation_execution_error``).
        * Otherwise the stop loss is counted as activated and keeps being
          monitored for its trigger condition.

    A summary notification is sent when a notification manager is configured.
    All errors are logged and swallowed.
    """
    try:
        # Fetch pending stop losses for this order
        pending_stop_losses = stats.get_orders_by_status(
            'pending_trigger', 'stop_limit_trigger', order_in_db['bot_order_ref_id']
        )
        if not pending_stop_losses:
            return

        symbol = order_in_db.get('symbol')
        # Check for a missing symbol first; then derive the token the same way
        # as the rest of the file ('BTC/USDC:USDC' -> 'BTC').
        if not symbol:
            token = 'Unknown'
        elif '/' in symbol:
            token = symbol.split('/')[0]
        else:
            token = symbol.split(':')[0]

        logger.debug(f"Found {len(pending_stop_losses)} pending stop loss(es) for filled order {order_in_db.get('exchange_order_id', 'N/A')}")

        # Get current market price for the symbol (best effort)
        current_price = None
        try:
            market_data = self.trading_engine.get_market_data(symbol)
            if market_data and market_data.get('ticker'):
                current_price = float(market_data['ticker'].get('last') or 0)
                if current_price <= 0:
                    current_price = None
        except Exception as price_error:
            logger.warning(f"Could not fetch current price for {symbol}: {price_error}")
            current_price = None

        # Pre-formatted price for messages; a conditional cannot live inside a
        # format specification such as ``{x:.4f if x else ...}``.
        price_text = f"{current_price:.4f}" if current_price else 'Unknown'

        if not symbol:
            return

        # Check if we still have a position for this symbol after the fill
        try:
            position = self.trading_engine.find_position(token)

            if not position or float(position.get('contracts') or 0) == 0:
                # No position exists (might have been closed immediately) - cancel the stop losses
                cancelled_count = stats.cancel_linked_orders(
                    parent_bot_order_ref_id=order_in_db['bot_order_ref_id'],
                    new_status='cancelled_no_position'
                )

                if cancelled_count > 0:
                    logger.info(f"❌ Cancelled {cancelled_count} pending stop losses for {symbol} - no position found")

                    if self.notification_manager:
                        await self.notification_manager.send_generic_notification(
                            f"🛑 <b>Stop Losses Cancelled</b>\n\n"
                            f"Symbol: {token}\n"
                            f"Cancelled: {cancelled_count} stop loss(es)\n"
                            f"Reason: No open position found"
                        )
                return

            # Position exists - check each stop loss
            activated_count = 0
            immediately_executed_count = 0

            for sl_order in pending_stop_losses:
                sl_trigger_price = float(sl_order.get('price') or 0)
                sl_side = (sl_order.get('side') or '').lower()  # 'sell' for long SL, 'buy' for short SL
                sl_db_id = sl_order.get('id')

                if not sl_trigger_price or not sl_side or not sl_db_id:
                    logger.warning(f"Invalid stop loss data for DB ID {sl_db_id}, skipping")
                    continue

                position_side = "LONG" if sl_side == 'sell' else "SHORT"

                # Check if trigger condition is already met
                trigger_already_hit = False
                trigger_reason = ""
                if current_price and current_price > 0:
                    if sl_side == 'sell' and current_price <= sl_trigger_price:
                        # Long SL = SELL order that triggers when price drops below stop price
                        trigger_already_hit = True
                        trigger_reason = f"LONG SL: Current price ${current_price:.4f} ≤ Stop price ${sl_trigger_price:.4f}"
                    elif sl_side == 'buy' and current_price >= sl_trigger_price:
                        # Short SL = BUY order that triggers when price rises above stop price
                        trigger_already_hit = True
                        trigger_reason = f"SHORT SL: Current price ${current_price:.4f} ≥ Stop price ${sl_trigger_price:.4f}"

                if not trigger_already_hit:
                    # Normal activation - trigger condition not yet met
                    activated_count += 1
                    logger.info(f"✅ Activating {position_side} stop loss for {token}: SL price ${sl_trigger_price:.4f} (Current: ${price_text})")
                    continue

                # Execute immediate market close instead of activating stop loss
                logger.warning(f"🚨 IMMEDIATE SL EXECUTION: {token} - {trigger_reason}")

                # Update the stop loss status to show it was immediately executed
                stats.update_order_status(order_db_id=sl_db_id, new_status='immediately_executed_on_activation')

                # Execute market order to close position
                try:
                    exit_result = await self.trading_engine.execute_exit_order(token)

                    if exit_result.get('success'):
                        immediately_executed_count += 1
                        logger.info(f"✅ Immediate {position_side} SL execution successful for {token}. Market order placed: {exit_result.get('order_placed_details', {}).get('exchange_order_id', 'N/A')}")

                        if self.notification_manager:
                            await self.notification_manager.send_generic_notification(
                                f"🚨 <b>Immediate Stop Loss Execution</b>\n\n"
                                f"Token: {token}\n"
                                f"Position Type: {position_side}\n"
                                f"SL Trigger Price: ${sl_trigger_price:.4f}\n"
                                f"Current Market Price: ${current_price:.4f}\n"
                                f"Trigger Logic: {trigger_reason}\n"
                                f"Action: Market close order placed immediately\n"
                                f"Reason: Trigger condition already met when activating\n"
                                f"Order ID: {exit_result.get('order_placed_details', {}).get('exchange_order_id', 'N/A')}\n"
                                f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
                                f"⚡ This prevents waiting for a trigger that's already passed"
                            )
                    else:
                        logger.error(f"❌ Failed to execute immediate SL for {token}: {exit_result.get('error', 'Unknown error')}")

                        if self.notification_manager:
                            await self.notification_manager.send_generic_notification(
                                f"❌ <b>Immediate SL Execution Failed</b>\n\n"
                                f"Token: {token}\n"
                                f"SL Price: ${sl_trigger_price:.4f}\n"
                                f"Current Price: ${current_price:.4f}\n"
                                f"Trigger Logic: {trigger_reason}\n"
                                f"Error: {exit_result.get('error', 'Unknown error')}\n\n"
                                f"⚠️ Manual intervention may be required"
                            )

                        # Revert status since execution failed (independent of
                        # whether a notification could be sent)
                        stats.update_order_status(order_db_id=sl_db_id, new_status='activation_execution_failed')

                except Exception as exec_error:
                    logger.error(f"❌ Exception during immediate SL execution for {token}: {exec_error}")
                    stats.update_order_status(order_db_id=sl_db_id, new_status='activation_execution_error')

            # Send summary notification for normal activations
            if activated_count > 0 and self.notification_manager:
                immediate_note = (
                    f"\n\n⚡ Additionally executed {immediately_executed_count} stop loss(es) immediately due to current market conditions"
                    if immediately_executed_count > 0 else ''
                )
                await self.notification_manager.send_generic_notification(
                    f"🛑 <b>Stop Losses Activated</b>\n\n"
                    f"Symbol: {token}\n"
                    f"Activated: {activated_count} stop loss(es)\n"
                    f"Current Price: ${price_text}\n"
                    f"Status: Monitoring for trigger conditions"
                    f"{immediate_note}"
                )
            elif immediately_executed_count > 0 and activated_count == 0:
                # All stop losses were immediately executed
                if self.notification_manager:
                    await self.notification_manager.send_generic_notification(
                        f"⚡ <b>All Stop Losses Executed Immediately</b>\n\n"
                        f"Symbol: {token}\n"
                        f"Executed: {immediately_executed_count} stop loss(es)\n"
                        f"Reason: Market price already beyond trigger levels\n"
                        f"Current Price: ${price_text}\n\n"
                        f"🚀 Position(s) closed at market to prevent further losses"
                    )

        except Exception as pos_check_error:
            # Pending stop losses are left untouched on error: safer to keep
            # redundant SLs than to end up with none.
            logger.warning(f"Could not check position for {symbol} during SL activation: {pos_check_error}")

    except Exception as e:
        logger.error(f"Error in _activate_pending_stop_losses: {e}", exc_info=True)
async def _check_for_recent_fills_for_order(self, exchange_oid, order_in_db):
    """
    Report whether a not-yet-processed recent fill belongs to the given order.

    Args:
        exchange_oid: Exchange order ID to look for.
        order_in_db: Order row of that order. Not used by the lookup itself;
            kept so the call signature stays unchanged.

    Returns:
        True when a fill newer than the last processed trade time carries a
        raw ``info['oid']`` equal to ``exchange_oid``; False otherwise,
        including when fills cannot be fetched or an error occurs.

    Order IDs are compared as strings because the raw exchange payload may
    carry the ID as a number while the stored order ID is text.
    """
    try:
        # Get recent fills from exchange
        recent_fills = self.trading_engine.get_recent_fills()
        if not recent_fills:
            return False

        # Lazily load the last processed timestamp from the database
        if getattr(self, '_last_processed_trade_time', None) is None:
            try:
                last_time_str = self.trading_engine.stats._get_metadata('last_processed_trade_time')
                if last_time_str:
                    self._last_processed_trade_time = datetime.fromisoformat(last_time_str)
                    logger.debug(f"Loaded last_processed_trade_time from DB: {self._last_processed_trade_time}")
                else:
                    # If no last processed time, start from 1 hour ago to avoid processing too much history
                    self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
                    logger.info("No last_processed_trade_time found, setting to 1 hour ago (UTC).")
            except Exception as e:
                logger.warning(f"Could not load last_processed_trade_time from DB: {e}")
                self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)

        # Fill timestamps below are timezone-aware (UTC). A naive stored value
        # would make every comparison raise, so interpret it as UTC.
        last_processed = self._last_processed_trade_time
        if last_processed.tzinfo is None:
            last_processed = last_processed.replace(tzinfo=timezone.utc)

        # Without an order ID there is nothing to match against
        if exchange_oid is None:
            return False
        wanted_oid = str(exchange_oid)

        for fill in recent_fills:
            try:
                # Parse fill data - CCXT format from fetch_my_trades
                timestamp_ms = fill.get('timestamp')  # milliseconds
                symbol = fill.get('symbol')  # full format like 'LTC/USDC:USDC'
                side = fill.get('side')  # 'buy' or 'sell'
                amount = float(fill.get('amount') or 0)
                price = float(fill.get('price') or 0)

                if timestamp_ms:
                    timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
                else:
                    timestamp_dt = datetime.now(timezone.utc)

                # Skip if already processed
                if timestamp_dt <= last_processed:
                    continue

                # Ignore incomplete fills
                if not (symbol and side and amount > 0 and price > 0):
                    continue

                # The raw exchange payload lives in the 'info' sub-object;
                # look for the Hyperliquid order ID there.
                raw_info = fill.get('info')
                fill_oid = raw_info.get('oid') if isinstance(raw_info, dict) else None

                if fill_oid is not None and str(fill_oid) == wanted_oid:
                    logger.info(f"✅ Found recent fill for order {exchange_oid} - NOT cancelling stop losses")
                    return True

            except Exception as e:
                logger.error(f"Error processing fill {fill}: {e}")
                continue

        return False
    except Exception as e:
        logger.error(f"❌ Error checking for recent fills for order: {e}", exc_info=True)
        return False
async def _check_external_stop_loss_orders(self):
    """
    Detect open orders that look like externally placed stop losses and track them.

    An open order is treated as a stop loss when a position exists for its
    token and either

    * the position is long, the order is a sell and its price is more than 2%
      below the position's entry price, or
    * the position is short, the order is a buy and its price is more than 2%
      above the position's entry price.

    Detected orders are stored in ``self._external_stop_loss_orders`` keyed by
    exchange order ID; orders already tracked are skipped. Errors are logged
    and swallowed.
    """
    try:
        # Get current open orders
        open_orders = self.trading_engine.get_orders()
        if not open_orders:
            return

        # Get current positions to understand what could be stop losses
        positions = self.trading_engine.get_positions()
        if not positions:
            return

        # Map token -> summary of the open position
        position_map = {}
        for position in positions:
            symbol = position.get('symbol')
            # `or 0` keeps one position with None fields from aborting the whole check
            contracts = float(position.get('contracts') or 0)
            if symbol and contracts != 0:
                token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
                position_map[token] = {
                    'symbol': symbol,
                    'contracts': contracts,
                    'side': 'long' if contracts > 0 else 'short',
                    'entry_price': float(position.get('entryPx') or 0)
                }

        # Check each order to see if it could be a stop loss
        newly_detected = 0
        for order in open_orders:
            try:
                exchange_order_id = order.get('id')
                symbol = order.get('symbol')
                side = order.get('side')  # 'buy' or 'sell'
                amount = float(order.get('amount') or 0)
                price = float(order.get('price') or 0)

                # Orders without ID, symbol, side, amount or price cannot be classified
                if not all([exchange_order_id, symbol, side, amount, price]):
                    continue

                # Skip if we're already tracking this order
                if exchange_order_id in self._external_stop_loss_orders:
                    continue

                token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]

                # Must have a position in this token to have a stop loss
                position = position_map.get(token)
                if position is None:
                    continue

                # 2% buffer: only prices clearly on the losing side of the entry count
                if position['side'] == 'long' and side == 'sell':
                    is_stop_loss = price < position['entry_price'] * 0.98
                elif position['side'] == 'short' and side == 'buy':
                    is_stop_loss = price > position['entry_price'] * 1.02
                else:
                    is_stop_loss = False

                if not is_stop_loss:
                    continue

                # Track this as an external stop loss order
                self._external_stop_loss_orders[exchange_order_id] = {
                    'token': token,
                    'symbol': symbol,
                    'trigger_price': price,
                    'side': side,
                    'amount': amount,
                    'position_side': position['side'],
                    'detected_at': datetime.now(timezone.utc),
                    'entry_price': position['entry_price']
                }
                newly_detected += 1
                logger.info(f"🛑 Detected external stop loss order: {token} {side.upper()} {amount} @ ${price:.2f} (protecting {position['side'].upper()} position)")

            except Exception as e:
                logger.error(f"Error analyzing order for stop loss detection: {e}")
                continue

        if newly_detected > 0:
            logger.info(f"🔍 Detected {newly_detected} new external stop loss orders")

    except Exception as e:
        logger.error(f"❌ Error checking external stop loss orders: {e}")
-
async def _cleanup_external_stop_loss_tracking(self):
    """Forget tracked external stop losses whose orders are no longer open.

    Compares the ids in ``self._external_stop_loss_orders`` with the ids
    returned by ``self.trading_engine.get_orders()`` and removes every tracked
    entry that is missing from the open set. When no open orders are returned
    at all, the whole tracking dict is cleared. Exceptions are logged and
    swallowed.
    """
    try:
        tracked = self._external_stop_loss_orders
        if not tracked:
            return

        open_orders = self.trading_engine.get_orders()
        if not open_orders:
            # Nothing is open, so nothing we track can still be live.
            # NOTE(review): a falsy result is treated as "no orders"; confirm
            # get_orders() does not also return None on an API failure.
            removed_count = len(tracked)
            tracked.clear()
            if removed_count > 0:
                logger.info(f"🧹 Cleared {removed_count} external stop loss orders (no open orders)")
            return

        # Ids of everything currently open (falsy ids are ignored).
        live_ids = set()
        for open_order in open_orders:
            open_id = open_order.get('id')
            if open_id:
                live_ids.add(open_id)

        # Collect first, delete afterwards: the dict must not change size
        # while it is being iterated.
        to_remove = [order_id for order_id in tracked if order_id not in live_ids]

        for order_id in to_remove:
            stop_loss_info = tracked.pop(order_id)
            logger.info(f"🗑️ Removed external stop loss tracking for {stop_loss_info['token']} order {order_id} (no longer open)")

        if to_remove:
            logger.info(f"🧹 Cleaned up {len(to_remove)} external stop loss orders")

    except Exception as e:
        logger.error(f"❌ Error cleaning up external stop loss tracking: {e}")
async def _auto_sync_orphaned_positions(self):
    """Create trade lifecycle records for exchange positions the bot is not tracking.

    For every non-empty position returned by
    ``self.trading_engine.get_positions()`` that has no trade in
    ``'position_opened'`` status, a lifecycle of type ``'external'`` is
    created and moved to position-opened, and a notification is sent when a
    notification manager is configured. The entry price comes from the
    position's ``'entryPrice'`` field, falling back to
    ``_estimate_entry_price_for_orphaned_position`` when that is missing or
    not positive.

    Errors are logged and swallowed; a failure on one position does not
    prevent the remaining positions from being processed.
    """

    def _as_float(value, default=0.0):
        """Convert an exchange field to float; None or unparseable values give *default*."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    try:
        stats = self.trading_engine.get_stats()
        if not stats:
            return

        exchange_positions = self.trading_engine.get_positions() or []
        synced_count = 0

        for exchange_pos in exchange_positions:
            symbol = None
            try:
                symbol = exchange_pos.get('symbol')
                contracts = _as_float(exchange_pos.get('contracts'))
                if not symbol or contracts == 0:
                    continue

                # Already tracked: nothing to do.
                if stats.get_trade_by_symbol_and_status(symbol, 'position_opened'):
                    continue

                # Orphaned position: build a lifecycle record from exchange data.
                position_side = 'long' if contracts > 0 else 'short'
                order_side = 'buy' if contracts > 0 else 'sell'
                # Same base-token rule as _check_external_stop_loss_orders.
                token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]

                entry_price = _as_float(exchange_pos.get('entryPrice'))
                if entry_price > 0:
                    logger.info(f"🔄 AUTO-SYNC: Orphaned position detected - {symbol} {position_side} {abs(contracts)} @ ${entry_price} (exchange data)")
                else:
                    # Fallback only if the exchange does not provide an entry price.
                    entry_price = await self._estimate_entry_price_for_orphaned_position(symbol, contracts)
                    logger.warning(f"🔄 AUTO-SYNC: Orphaned position detected - {symbol} {position_side} {abs(contracts)} @ ${entry_price} (estimated)")

                # Optional fields used only for the notification. They are
                # parsed defensively so a missing/None value (for example no
                # liquidation price) cannot block the sync itself.
                unrealized_pnl = _as_float(exchange_pos.get('unrealizedPnl'))
                position_value = _as_float(exchange_pos.get('notional'))
                liquidation_price = _as_float(exchange_pos.get('liquidationPrice'))
                leverage = _as_float(exchange_pos.get('leverage'), 1.0)

                # NOTE(review): ids have one-second resolution, so two
                # positions synced within the same second share an id —
                # confirm the stats store does not require uniqueness.
                sync_ts = int(datetime.now().timestamp())

                lifecycle_id = stats.create_trade_lifecycle(
                    symbol=symbol,
                    side=order_side,
                    entry_order_id=f"external_sync_{sync_ts}",
                    trade_type='external'
                )
                if not lifecycle_id:
                    logger.error(f"❌ AUTO-SYNC: Failed to create lifecycle for orphaned position {symbol}")
                    continue

                success = stats.update_trade_position_opened(
                    lifecycle_id=lifecycle_id,
                    entry_price=entry_price,
                    entry_amount=abs(contracts),
                    exchange_fill_id=f"external_fill_{sync_ts}"
                )
                if not success:
                    logger.error(f"❌ AUTO-SYNC: Failed to sync orphaned position for {symbol}")
                    continue

                synced_count += 1
                logger.info(f"✅ AUTO-SYNC: Successfully synced orphaned position for {symbol}")

                pnl_emoji = "🟢" if unrealized_pnl >= 0 else "🔴"
                notification_text = (
                    f"🔄 <b>Position Auto-Synced</b>\n\n"
                    f"Token: {token}\n"
                    f"Direction: {position_side.upper()}\n"
                    f"Size: {abs(contracts):.6f} {token}\n"
                    f"Entry Price: ${entry_price:,.4f}\n"
                    f"Position Value: ${position_value:,.2f}\n"
                    f"{pnl_emoji} P&L: ${unrealized_pnl:,.2f}\n"
                )

                if leverage > 1:
                    notification_text += f"⚡ Leverage: {leverage:.1f}x\n"
                if liquidation_price > 0:
                    notification_text += f"⚠️ Liquidation: ${liquidation_price:,.2f}\n"

                notification_text += (
                    f"Reason: Position opened outside bot\n"
                    f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
                    f"✅ Position now tracked in bot\n"
                    f"💡 Use /sl {token} [price] to set stop loss"
                )

                if self.notification_manager:
                    await self.notification_manager.send_generic_notification(notification_text)

            except Exception as e:
                # Keep going: one bad position must not block the others.
                logger.error(f"❌ AUTO-SYNC: Error syncing orphaned position {symbol}: {e}", exc_info=True)
                continue

        if synced_count > 0:
            logger.info(f"🔄 AUTO-SYNC: Synced {synced_count} orphaned position(s) this cycle")

    except Exception as e:
        logger.error(f"❌ Error in auto-sync orphaned positions: {e}", exc_info=True)
async def _estimate_entry_price_for_orphaned_position(self, symbol: str, contracts: float) -> float:
    """Estimate an entry price for a position that has no recorded entry.

    Sources are tried in order, and the first positive value wins:

    1. price of the first recent fill for ``symbol``;
    2. the ticker's ``last`` price;
    3. the average of the ticker's ``bid`` and ``ask``;
    4. the constant ``1.0`` as a last resort.

    Args:
        symbol: Symbol of the orphaned position.
        contracts: Position size; accepted for interface compatibility and
            not used in the estimate.

    Returns:
        The estimated price, always greater than zero. ``1.0`` is also
        returned when any lookup raises.
    """

    def _as_float(value, default=0.0):
        """Convert an exchange field to float; None or unparseable values give *default*."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    try:
        # Method 1: most recent fill for this symbol.
        recent_fills = self.trading_engine.get_recent_fills()
        if recent_fills:
            symbol_fills = [fill for fill in recent_fills if fill.get('symbol') == symbol]
            if symbol_fills:
                # Assumes fills are sorted newest first — TODO confirm.
                fill_price = _as_float(symbol_fills[0].get('price'))
                if fill_price > 0:
                    logger.info(f"💡 AUTO-SYNC: Found recent fill price for {symbol}: ${fill_price:.4f}")
                    return fill_price

        market_data = self.trading_engine.get_market_data(symbol)
        ticker = market_data.get('ticker') if market_data else None
        if ticker:
            # Method 2: last traded price. A missing/None value becomes 0 so
            # that the bid/ask fallback below is still reached.
            current_price = _as_float(ticker.get('last'))
            if current_price > 0:
                logger.warning(f"⚠️ AUTO-SYNC: Using current market price as entry estimate for {symbol}: ${current_price:.4f}")
                return current_price

            # Method 3: bid/ask midpoint.
            bid = _as_float(ticker.get('bid'))
            ask = _as_float(ticker.get('ask'))
            if bid > 0 and ask > 0:
                avg_price = (bid + ask) / 2
                logger.warning(f"⚠️ AUTO-SYNC: Using bid/ask average as entry estimate for {symbol}: ${avg_price:.4f}")
                return avg_price

        # Method 4: absolute fallback — a small positive value to avoid 0.
        logger.error(f"❌ AUTO-SYNC: Could not estimate entry price for {symbol}, using fallback value of $1.00")
        return 1.0

    except Exception as e:
        logger.error(f"❌ AUTO-SYNC: Error estimating entry price for {symbol}: {e}")
        return 1.0  # Same fallback as Method 4.
async def _handle_orphaned_position(self, symbol, contracts):
    """Deprecated no-op retained for backwards compatibility.

    Only logs a notice pointing to ``_auto_sync_orphaned_positions``;
    ``symbol`` and ``contracts`` are accepted but ignored.
    """
    try:
        # Intentionally not implemented: superseded by _auto_sync_orphaned_positions.
        logger.info("🧹 _handle_orphaned_position deprecated: use _auto_sync_orphaned_positions instead")
    except Exception as e:
        # logger.exception logs at ERROR level with the traceback attached.
        logger.exception(f"❌ Error handling orphaned position: {e}")
|