#!/usr/bin/env python3
"""
Market Monitor - Handles external trade monitoring and heartbeat functionality.
"""
import logging
import asyncio
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any, List
import os
import json
from src.config.config import Config
from src.monitoring.alarm_manager import AlarmManager
logger = logging.getLogger(__name__)
class MarketMonitor:
"""Handles external trade monitoring and market events."""
def __init__(self, trading_engine, notification_manager=None):
"""Initialize the market monitor."""
self.trading_engine = trading_engine
self.notification_manager = notification_manager
self.client = trading_engine.client
self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(seconds=120)
self._monitoring_active = False
# ๐ External stop loss tracking
self._external_stop_loss_orders = {} # Format: {exchange_order_id: {'token': str, 'trigger_price': float, 'side': str, 'detected_at': datetime}}
# External trade monitoring
# self.state_file = "data/market_monitor_state.json" # Removed, state now in DB
self.last_processed_trade_time: Optional[datetime] = None
# Alarm management
self.alarm_manager = AlarmManager()
# Order monitoring
self.last_known_orders = set()
self.last_known_positions = {}
self._load_state()
async def start(self):
"""Start the market monitor."""
if self._monitoring_active:
return
self._monitoring_active = True
logger.info("๐ Market monitor started")
# Initialize tracking
await self._initialize_tracking()
# Start monitoring task
self._monitor_task = asyncio.create_task(self._monitor_loop())
async def stop(self):
"""Stop the market monitor."""
if not self._monitoring_active:
return
self._monitoring_active = False
if self._monitor_task:
self._monitor_task.cancel()
try:
await self._monitor_task
except asyncio.CancelledError:
pass
self._save_state()
logger.info("๐ Market monitor stopped")
def _load_state(self):
"""Load market monitor state from SQLite DB via TradingStats."""
stats = self.trading_engine.get_stats()
if not stats:
logger.warning("โ ๏ธ TradingStats not available, cannot load MarketMonitor state.")
self.last_processed_trade_time = None
return
try:
last_time_str = stats._get_metadata('market_monitor_last_processed_trade_time')
if last_time_str:
self.last_processed_trade_time = datetime.fromisoformat(last_time_str)
# Ensure it's timezone-aware (UTC)
if self.last_processed_trade_time.tzinfo is None:
self.last_processed_trade_time = self.last_processed_trade_time.replace(tzinfo=timezone.utc)
else:
self.last_processed_trade_time = self.last_processed_trade_time.astimezone(timezone.utc)
logger.info(f"๐ Loaded MarketMonitor state from DB: last_processed_trade_time = {self.last_processed_trade_time.isoformat()}")
else:
logger.info("๐จ No MarketMonitor state (last_processed_trade_time) found in DB. Will start with fresh external trade tracking.")
self.last_processed_trade_time = None
except Exception as e:
logger.error(f"Error loading MarketMonitor state from DB: {e}. Proceeding with default state.")
self.last_processed_trade_time = None
def _save_state(self):
"""Save market monitor state to SQLite DB via TradingStats."""
stats = self.trading_engine.get_stats()
if not stats:
logger.warning("โ ๏ธ TradingStats not available, cannot save MarketMonitor state.")
return
try:
if self.last_processed_trade_time:
# Ensure timestamp is UTC before saving
lptt_utc = self.last_processed_trade_time
if lptt_utc.tzinfo is None:
lptt_utc = lptt_utc.replace(tzinfo=timezone.utc)
else:
lptt_utc = lptt_utc.astimezone(timezone.utc)
stats._set_metadata('market_monitor_last_processed_trade_time', lptt_utc.isoformat())
logger.info(f"๐พ Saved MarketMonitor state (last_processed_trade_time) to DB: {lptt_utc.isoformat()}")
else:
# If it's None, we might want to remove the key or save it as an empty string
# For now, let's assume we only save if there is a time. Or remove it.
stats._set_metadata('market_monitor_last_processed_trade_time', '') # Or handle deletion
logger.info("๐พ MarketMonitor state (last_processed_trade_time) is None, saved as empty in DB.")
except Exception as e:
logger.error(f"Error saving MarketMonitor state to DB: {e}")
async def _initialize_tracking(self):
"""Initialize order and position tracking."""
try:
# Get current open orders to initialize tracking
orders = self.trading_engine.get_orders()
if orders:
self.last_known_orders = {order.get('id') for order in orders if order.get('id')}
logger.info(f"๐ Initialized tracking with {len(self.last_known_orders)} open orders")
# Get current positions for P&L tracking
positions = self.trading_engine.get_positions()
if positions:
for position in positions:
symbol = position.get('symbol')
contracts = float(position.get('contracts', 0))
entry_price = float(position.get('entryPx', 0))
if symbol and contracts != 0:
self.last_known_positions[symbol] = {
'contracts': contracts,
'entry_price': entry_price
}
logger.info(f"๐ Initialized tracking with {len(self.last_known_positions)} positions")
except Exception as e:
logger.error(f"โ Error initializing tracking: {e}")
async def _monitor_loop(self):
"""Main monitoring loop that runs every BOT_HEARTBEAT_SECONDS."""
try:
loop_count = 0
while self._monitoring_active:
# ๐ PHASE 2: Check active trades for pending stop loss activation first (highest priority)
await self._activate_pending_stop_losses_from_active_trades()
await self._check_order_fills()
await self._check_price_alarms()
await self._check_external_trades()
await self._check_pending_triggers()
await self._check_automatic_risk_management()
await self._check_external_stop_loss_orders()
# Run orphaned stop loss cleanup every 10 heartbeats (less frequent but regular)
loop_count += 1
if loop_count % 10 == 0:
await self._cleanup_orphaned_stop_losses()
await self._cleanup_external_stop_loss_tracking()
loop_count = 0 # Reset counter to prevent overflow
await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS)
except asyncio.CancelledError:
logger.info("Market monitor loop cancelled")
raise
except Exception as e:
logger.error(f"Error in market monitor loop: {e}")
# Restart after error
if self._monitoring_active:
await asyncio.sleep(5)
await self._monitor_loop()
async def _check_order_fills(self):
"""Check for filled orders and send notifications."""
try:
# Get current orders and positions
current_orders = self.trading_engine.get_orders() or []
current_positions = self.trading_engine.get_positions() or []
# Get current order IDs
current_order_ids = {order.get('id') for order in current_orders if order.get('id')}
# Find filled orders (orders that were in last_known_orders but not in current_orders)
disappeared_order_ids = self.last_known_orders - current_order_ids
if disappeared_order_ids:
logger.info(f"๐ฏ Detected {len(disappeared_order_ids)} bot orders no longer open: {list(disappeared_order_ids)}. Corresponding fills (if any) are processed by external trade checker.")
await self._process_disappeared_orders(disappeared_order_ids)
# Update tracking data for open bot orders
self.last_known_orders = current_order_ids
# Position state is primarily managed by TradingStats based on all fills.
# This local tracking can provide supplementary logging if needed.
# await self._update_position_tracking(current_positions)
except Exception as e:
logger.error(f"โ Error checking order fills: {e}")
async def _process_filled_orders(self, filled_order_ids: set, current_positions: list):
"""Process filled orders using enhanced position tracking."""
try:
# For bot-initiated orders, we'll detect changes in position size
# and send appropriate notifications using the enhanced system
# This method will be triggered when orders placed through the bot are filled
# The external trade monitoring will handle trades made outside the bot
# Update position tracking based on current positions
await self._update_position_tracking(current_positions)
except Exception as e:
logger.error(f"โ Error processing filled orders: {e}")
async def _update_position_tracking(self, current_positions: list):
"""Update position tracking and calculate P&L changes."""
try:
new_position_map = {}
for position in current_positions:
symbol = position.get('symbol')
contracts = float(position.get('contracts', 0))
entry_price = float(position.get('entryPx', 0))
if symbol and contracts != 0:
new_position_map[symbol] = {
'contracts': contracts,
'entry_price': entry_price
}
# Compare with previous positions to detect changes
for symbol, new_data in new_position_map.items():
old_data = self.last_known_positions.get(symbol)
if not old_data:
# New position opened
logger.info(f"๐ New position detected (observed by MarketMonitor): {symbol} {new_data['contracts']} @ ${new_data['entry_price']:.4f}. TradingStats is the definitive source.")
elif abs(new_data['contracts'] - old_data['contracts']) > 0.000001:
# Position size changed
change = new_data['contracts'] - old_data['contracts']
logger.info(f"๐ Position change detected (observed by MarketMonitor): {symbol} {change:+.6f} contracts. TradingStats is the definitive source.")
# Check for closed positions
for symbol in self.last_known_positions:
if symbol not in new_position_map:
logger.info(f"๐ Position closed (observed by MarketMonitor): {symbol}. TradingStats is the definitive source.")
# Update tracking
self.last_known_positions = new_position_map
except Exception as e:
logger.error(f"โ Error updating position tracking: {e}")
async def _process_disappeared_orders(self, disappeared_order_ids: set):
"""Log and investigate bot orders that have disappeared from the exchange."""
stats = self.trading_engine.get_stats()
if not stats:
logger.warning("โ ๏ธ TradingStats not available in _process_disappeared_orders.")
return
try:
total_linked_cancelled = 0
external_cancellations = []
for exchange_oid in disappeared_order_ids:
order_in_db = stats.get_order_by_exchange_id(exchange_oid)
if order_in_db:
last_status = order_in_db.get('status', 'unknown')
order_type = order_in_db.get('type', 'unknown')
symbol = order_in_db.get('symbol', 'unknown')
token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
logger.info(f"Order {exchange_oid} was in our DB with status '{last_status}' but has now disappeared from exchange.")
# Check if this was an unexpected disappearance (likely external cancellation)
active_statuses = ['open', 'submitted', 'partially_filled', 'pending_submission']
if last_status in active_statuses:
logger.warning(f"โ ๏ธ EXTERNAL CANCELLATION: Order {exchange_oid} with status '{last_status}' was likely cancelled externally on Hyperliquid")
stats.update_order_status(exchange_order_id=exchange_oid, new_status='cancelled_externally')
# Track external cancellations for notification
external_cancellations.append({
'exchange_oid': exchange_oid,
'token': token,
'type': order_type,
'last_status': last_status
})
# Send notification about external cancellation
if self.notification_manager:
await self.notification_manager.send_generic_notification(
f"โ ๏ธ External Order Cancellation Detected\n\n"
f"Token: {token}\n"
f"Order Type: {order_type.replace('_', ' ').title()}\n"
f"Exchange Order ID: {exchange_oid[:8]}...
\n"
f"Previous Status: {last_status.replace('_', ' ').title()}\n"
f"Source: Cancelled directly on Hyperliquid\n"
f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
f"๐ค Bot status updated automatically"
)
# ๐ง EDGE CASE FIX: Wait briefly before cancelling stop losses
# Sometimes orders are cancelled externally but fills come through simultaneously
logger.info(f"โณ Waiting 3 seconds to check for potential fills before cancelling stop losses for {exchange_oid}")
await asyncio.sleep(3)
# Re-check the order status after waiting - it might have been filled
order_in_db_updated = stats.get_order_by_exchange_id(exchange_oid)
if order_in_db_updated and order_in_db_updated.get('status') in ['filled', 'partially_filled']:
logger.info(f"โ
Order {exchange_oid} was filled during the wait period - NOT cancelling stop losses")
# Don't cancel stop losses - let them be activated normally
continue
# Additional check: Look for very recent fills that might match this order
recent_fill_found = await self._check_for_recent_fills_for_order(exchange_oid, order_in_db)
if recent_fill_found:
logger.info(f"โ
Found recent fill for order {exchange_oid} - NOT cancelling stop losses")
continue
else:
# Normal completion/cancellation - update status
stats.update_order_status(exchange_order_id=exchange_oid, new_status='disappeared_from_exchange')
# Cancel any pending stop losses linked to this order (only if not filled)
if order_in_db.get('bot_order_ref_id'):
# Double-check one more time that the order wasn't filled
final_order_check = stats.get_order_by_exchange_id(exchange_oid)
if final_order_check and final_order_check.get('status') in ['filled', 'partially_filled']:
logger.info(f"๐ Order {exchange_oid} was filled - preserving stop losses")
continue
cancelled_sl_count = stats.cancel_linked_orders(
parent_bot_order_ref_id=order_in_db['bot_order_ref_id'],
new_status='cancelled_parent_disappeared'
)
total_linked_cancelled += cancelled_sl_count
if cancelled_sl_count > 0:
logger.info(f"Cancelled {cancelled_sl_count} pending stop losses linked to disappeared order {exchange_oid}")
if self.notification_manager:
await self.notification_manager.send_generic_notification(
f"๐ Linked Stop Losses Cancelled\n\n"
f"Token: {token}\n"
f"Cancelled: {cancelled_sl_count} stop loss(es)\n"
f"Reason: Parent order {exchange_oid[:8]}... disappeared\n"
f"Likely Cause: External cancellation on Hyperliquid\n"
f"Time: {datetime.now().strftime('%H:%M:%S')}"
)
else:
logger.warning(f"Order {exchange_oid} disappeared from exchange but was not found in our DB. This might be an order placed externally.")
# Send summary notification if multiple external cancellations occurred
if len(external_cancellations) > 1:
tokens_affected = list(set(item['token'] for item in external_cancellations))
if self.notification_manager:
await self.notification_manager.send_generic_notification(
f"โ ๏ธ Multiple External Cancellations Detected\n\n"
f"Orders Cancelled: {len(external_cancellations)}\n"
f"Tokens Affected: {', '.join(tokens_affected)}\n"
f"Source: Direct cancellation on Hyperliquid\n"
f"Linked Stop Losses Cancelled: {total_linked_cancelled}\n"
f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
f"๐ก Check individual orders for details"
)
except Exception as e:
logger.error(f"โ Error processing disappeared orders: {e}", exc_info=True)
async def _check_price_alarms(self):
"""Check price alarms and trigger notifications."""
try:
active_alarms = self.alarm_manager.get_all_active_alarms()
if not active_alarms:
return
# Group alarms by token to minimize API calls
tokens_to_check = list(set(alarm['token'] for alarm in active_alarms))
for token in tokens_to_check:
try:
# Get current market price
symbol = f"{token}/USDC:USDC"
market_data = self.trading_engine.get_market_data(symbol)
if not market_data or not market_data.get('ticker'):
continue
current_price = float(market_data['ticker'].get('last', 0))
if current_price <= 0:
continue
# Check alarms for this token
token_alarms = [alarm for alarm in active_alarms if alarm['token'] == token]
for alarm in token_alarms:
target_price = alarm['target_price']
direction = alarm['direction']
# Check if alarm should trigger
should_trigger = False
if direction == 'above' and current_price >= target_price:
should_trigger = True
elif direction == 'below' and current_price <= target_price:
should_trigger = True
if should_trigger:
# Trigger the alarm
triggered_alarm = self.alarm_manager.trigger_alarm(alarm['id'], current_price)
if triggered_alarm:
await self._send_alarm_notification(triggered_alarm)
except Exception as e:
logger.error(f"Error checking alarms for {token}: {e}")
except Exception as e:
logger.error(f"โ Error checking price alarms: {e}")
async def _send_alarm_notification(self, alarm: Dict[str, Any]):
"""Send notification for triggered alarm."""
try:
# Send through notification manager if available
if self.notification_manager:
await self.notification_manager.send_alarm_triggered_notification(
alarm['token'],
alarm['target_price'],
alarm['triggered_price'],
alarm['direction']
)
else:
# Fallback to logging if notification manager not available
logger.info(f"๐ ALARM TRIGGERED: {alarm['token']} @ ${alarm['triggered_price']:,.2f}")
except Exception as e:
logger.error(f"โ Error sending alarm notification: {e}")
async def _check_external_trades(self):
"""Check for trades made outside the Telegram bot and update stats."""
try:
# Get recent fills from exchange
recent_fills = self.trading_engine.get_recent_fills()
if not recent_fills:
logger.debug("No recent fills data available")
return
# Get last processed timestamp from database
if not hasattr(self, '_last_processed_trade_time') or self._last_processed_trade_time is None:
try:
last_time_str = self.trading_engine.stats._get_metadata('last_processed_trade_time')
if last_time_str:
self._last_processed_trade_time = datetime.fromisoformat(last_time_str)
logger.debug(f"Loaded last_processed_trade_time from DB: {self._last_processed_trade_time}")
else:
# If no last processed time, start from 1 hour ago to avoid processing too much history
self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
logger.info("No last_processed_trade_time found, setting to 1 hour ago (UTC).")
except Exception as e:
logger.warning(f"Could not load last_processed_trade_time from DB: {e}")
self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
# Process new fills
external_trades_processed = 0
symbols_with_fills = set() # Track symbols that had fills for stop loss activation
for fill in recent_fills:
try:
# Parse fill data - CCXT format from fetch_my_trades
trade_id = fill.get('id') # CCXT uses 'id' for trade ID
timestamp_ms = fill.get('timestamp') # CCXT uses 'timestamp' (milliseconds)
symbol = fill.get('symbol') # CCXT uses 'symbol' in full format like 'LTC/USDC:USDC'
side = fill.get('side') # CCXT uses 'side' ('buy' or 'sell')
amount = float(fill.get('amount', 0)) # CCXT uses 'amount'
price = float(fill.get('price', 0)) # CCXT uses 'price'
# Convert timestamp
if timestamp_ms:
timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
else:
timestamp_dt = datetime.now(timezone.utc)
# Skip if already processed
if timestamp_dt <= self._last_processed_trade_time:
continue
# Process as external trade if we reach here
if symbol and side and amount > 0 and price > 0:
# Symbol is already in full format for CCXT
full_symbol = symbol
token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
# Check if this might be a bot order fill by looking for exchange order ID
# CCXT might have this in 'info' sub-object with the raw exchange data
exchange_order_id_from_fill = None
if 'info' in fill and isinstance(fill['info'], dict):
# Look for Hyperliquid order ID in the raw response
exchange_order_id_from_fill = fill['info'].get('oid')
# ๐ Check if this fill corresponds to an external stop loss order
is_external_stop_loss = False
stop_loss_info = None
if exchange_order_id_from_fill and exchange_order_id_from_fill in self._external_stop_loss_orders:
is_external_stop_loss = True
stop_loss_info = self._external_stop_loss_orders[exchange_order_id_from_fill]
logger.info(f"๐ EXTERNAL STOP LOSS EXECUTION: {token} - Order {exchange_order_id_from_fill} filled @ ${price:.2f}")
logger.info(f"๐ Processing {'external stop loss' if is_external_stop_loss else 'external trade'}: {trade_id} - {side} {amount} {full_symbol} @ ${price:.2f}")
stats = self.trading_engine.stats
if not stats:
logger.warning("โ ๏ธ TradingStats not available in _check_external_trades.")
continue
# If this is an external stop loss execution, handle it specially
if is_external_stop_loss and stop_loss_info:
# ๐งน PHASE 3: Close active trade for stop loss execution
active_trade = stats.get_active_trade_by_symbol(full_symbol, status='active')
if active_trade:
entry_price = active_trade.get('entry_price', 0)
active_trade_side = active_trade.get('side')
# Calculate realized P&L
if active_trade_side == 'buy': # Long position
realized_pnl = amount * (price - entry_price)
else: # Short position
realized_pnl = amount * (entry_price - price)
stats.update_active_trade_closed(
active_trade['id'], realized_pnl, timestamp_dt.isoformat()
)
logger.info(f"๐ Active trade {active_trade['id']} closed via external stop loss - P&L: ${realized_pnl:.2f}")
# Send specialized stop loss execution notification
if self.notification_manager:
await self.notification_manager.send_stop_loss_execution_notification(
stop_loss_info, full_symbol, side, amount, price, 'long_closed', timestamp_dt.isoformat()
)
# Remove from tracking since it's now executed
del self._external_stop_loss_orders[exchange_order_id_from_fill]
logger.info(f"๐ Processed external stop loss execution: {side} {amount} {full_symbol} @ ${price:.2f} (long_closed)")
else:
# Handle as regular external trade
# Check if this corresponds to a bot order by exchange_order_id
linked_order_db_id = None
if exchange_order_id_from_fill:
order_in_db = stats.get_order_by_exchange_id(exchange_order_id_from_fill)
if order_in_db:
linked_order_db_id = order_in_db.get('id')
logger.info(f"๐ Linked external fill {trade_id} to bot order DB ID {linked_order_db_id} (Exchange OID: {exchange_order_id_from_fill})")
# Update order status to filled if it was open
current_status = order_in_db.get('status', '')
if current_status in ['open', 'partially_filled', 'pending_submission']:
# Determine if this is a partial or full fill
order_amount_requested = float(order_in_db.get('amount_requested', 0))
if abs(amount - order_amount_requested) < 0.000001: # Allow small floating point differences
new_status_after_fill = 'filled'
else:
new_status_after_fill = 'partially_filled'
stats.update_order_status(
order_db_id=linked_order_db_id,
new_status=new_status_after_fill
)
logger.info(f"๐ Updated bot order {linked_order_db_id} status: {current_status} โ {new_status_after_fill}")
# Check if this order is now fully filled and has pending stop losses to activate
if new_status_after_fill == 'filled':
await self._activate_pending_stop_losses(order_in_db, stats)
# ๐งน PHASE 3: Record trade simply, use active_trades for tracking
stats.record_trade(
full_symbol, side, amount, price,
exchange_fill_id=trade_id, trade_type="external",
timestamp=timestamp_dt.isoformat(),
linked_order_table_id_to_link=linked_order_db_id
)
# Derive action type from trade context for notifications
if linked_order_db_id:
# Bot order - determine action from order context
order_side = order_in_db.get('side', side).lower()
if order_side == 'buy':
action_type = 'long_opened'
elif order_side == 'sell':
action_type = 'short_opened'
else:
action_type = 'position_opened'
else:
# External trade - determine from current active trades
existing_active_trade = stats.get_active_trade_by_symbol(full_symbol, status='active')
if existing_active_trade:
# Has active position - this is likely a closure
existing_side = existing_active_trade.get('side')
if existing_side == 'buy' and side.lower() == 'sell':
action_type = 'long_closed'
elif existing_side == 'sell' and side.lower() == 'buy':
action_type = 'short_closed'
else:
action_type = 'position_modified'
else:
# No active position - this opens a new one
if side.lower() == 'buy':
action_type = 'long_opened'
else:
action_type = 'short_opened'
# ๐งน PHASE 3: Update active trades based on action
if linked_order_db_id:
# Bot order - update linked active trade
order_data = stats.get_order_by_db_id(linked_order_db_id)
if order_data:
exchange_order_id = order_data.get('exchange_order_id')
# Find active trade by entry order ID
all_active_trades = stats.get_all_active_trades()
for at in all_active_trades:
if at.get('entry_order_id') == exchange_order_id:
active_trade_id = at['id']
current_status = at['status']
if current_status == 'pending' and action_type in ['long_opened', 'short_opened']:
# Entry order filled - update active trade to active
stats.update_active_trade_opened(
active_trade_id, price, amount, timestamp_dt.isoformat()
)
logger.info(f"๐ Active trade {active_trade_id} opened via fill {trade_id}")
elif current_status == 'active' and action_type in ['long_closed', 'short_closed']:
# Exit order filled - calculate P&L and close active trade
entry_price = at.get('entry_price', 0)
active_trade_side = at.get('side')
# Calculate realized P&L
if active_trade_side == 'buy': # Long position
realized_pnl = amount * (price - entry_price)
else: # Short position
realized_pnl = amount * (entry_price - price)
stats.update_active_trade_closed(
active_trade_id, realized_pnl, timestamp_dt.isoformat()
)
logger.info(f"๐ Active trade {active_trade_id} closed via fill {trade_id} - P&L: ${realized_pnl:.2f}")
break
elif action_type in ['long_opened', 'short_opened']:
# External trade that opened a position - create external active trade
active_trade_id = stats.create_active_trade(
symbol=full_symbol,
side=side.lower(),
entry_order_id=None, # External order
trade_type='external'
)
if active_trade_id:
stats.update_active_trade_opened(
active_trade_id, price, amount, timestamp_dt.isoformat()
)
logger.info(f"๐ Created external active trade {active_trade_id} for {side.upper()} {full_symbol}")
elif action_type in ['long_closed', 'short_closed']:
# External closure - close any active trade for this symbol
active_trade = stats.get_active_trade_by_symbol(full_symbol, status='active')
if active_trade:
entry_price = active_trade.get('entry_price', 0)
active_trade_side = active_trade.get('side')
# Calculate realized P&L
if active_trade_side == 'buy': # Long position
realized_pnl = amount * (price - entry_price)
else: # Short position
realized_pnl = amount * (entry_price - price)
stats.update_active_trade_closed(
active_trade['id'], realized_pnl, timestamp_dt.isoformat()
)
logger.info(f"๐ External closure: Active trade {active_trade['id']} closed - P&L: ${realized_pnl:.2f}")
# Track symbol for potential stop loss activation
symbols_with_fills.add(token)
# Send notification for external trade
if self.notification_manager:
await self.notification_manager.send_external_trade_notification(
full_symbol, side, amount, price, action_type, timestamp_dt.isoformat()
)
logger.info(f"๐ Processed external trade: {side} {amount} {full_symbol} @ ${price:.2f} ({action_type}) using timestamp {timestamp_dt.isoformat()}")
external_trades_processed += 1
# Update last processed time
self._last_processed_trade_time = timestamp_dt
except Exception as e:
logger.error(f"Error processing fill {fill}: {e}")
continue
# Save the last processed timestamp to database
if external_trades_processed > 0:
self.trading_engine.stats._set_metadata('last_processed_trade_time', self._last_processed_trade_time.isoformat())
logger.info(f"๐พ Saved MarketMonitor state (last_processed_trade_time) to DB: {self._last_processed_trade_time.isoformat()}")
logger.info(f"๐ Processed {external_trades_processed} external trades")
# Additional check: Activate any pending stop losses for symbols that had fills
# This is a safety net for cases where the fill linking above didn't work
await self._check_pending_stop_losses_for_filled_symbols(symbols_with_fills)
except Exception as e:
logger.error(f"โ Error checking external trades: {e}", exc_info=True)
async def _check_pending_stop_losses_for_filled_symbols(self, symbols_with_fills: set):
"""
Safety net: Check if any symbols that just had fills have pending stop losses
that should be activated. This handles cases where fill linking failed.
"""
try:
if not symbols_with_fills:
return
stats = self.trading_engine.stats
if not stats:
return
# Get all pending stop losses
pending_stop_losses = stats.get_orders_by_status('pending_trigger', 'stop_limit_trigger')
if not pending_stop_losses:
return
# Check each pending stop loss to see if its symbol had fills
activated_any = False
for sl_order in pending_stop_losses:
symbol = sl_order.get('symbol', '')
token = symbol.split('/')[0] if '/' in symbol else ''
if token in symbols_with_fills:
# This symbol had fills - check if we should activate the stop loss
parent_bot_ref_id = sl_order.get('parent_bot_order_ref_id')
if parent_bot_ref_id:
# Get the parent order
parent_order = stats.get_order_by_bot_ref_id(parent_bot_ref_id)
if parent_order and parent_order.get('status') in ['filled', 'partially_filled']:
logger.info(f"๐ Safety net activation: Found pending SL for {token} with filled parent order {parent_bot_ref_id}")
await self._activate_pending_stop_losses(parent_order, stats)
activated_any = True
if activated_any:
logger.info("โ
Safety net activated pending stop losses for symbols with recent fills")
except Exception as e:
logger.error(f"Error in safety net stop loss activation: {e}", exc_info=True)
async def _check_pending_triggers(self):
"""Check and process pending conditional triggers (e.g., SL/TP)."""
stats = self.trading_engine.get_stats()
if not stats:
logger.warning("โ ๏ธ TradingStats not available in _check_pending_triggers.")
return
try:
# Fetch pending SL triggers (adjust type if TP triggers are different)
# For now, assuming 'STOP_LIMIT_TRIGGER' is the type used for SLs that become limit orders
pending_sl_triggers = stats.get_orders_by_status(status='pending_trigger', order_type_filter='stop_limit_trigger')
if not pending_sl_triggers:
return
logger.debug(f"Found {len(pending_sl_triggers)} pending SL triggers to check.")
for trigger_order in pending_sl_triggers:
symbol = trigger_order['symbol']
trigger_price = trigger_order['price'] # This is the stop price
trigger_side = trigger_order['side'] # This is the side of the SL order (e.g., sell for a long position's SL)
order_db_id = trigger_order['id']
parent_ref_id = trigger_order.get('parent_bot_order_ref_id')
if not symbol or trigger_price is None:
logger.warning(f"Invalid trigger order data for DB ID {order_db_id}, skipping: {trigger_order}")
continue
market_data = self.trading_engine.get_market_data(symbol)
if not market_data or not market_data.get('ticker'):
logger.warning(f"Could not fetch market data for {symbol} to check SL trigger {order_db_id}.")
continue
current_price = float(market_data['ticker'].get('last', 0))
if current_price <= 0:
logger.warning(f"Invalid current price ({current_price}) for {symbol} checking SL trigger {order_db_id}.")
continue
trigger_hit = False
if trigger_side.lower() == 'sell' and current_price <= trigger_price:
trigger_hit = True
logger.info(f"๐ด SL TRIGGER HIT (Sell): Order DB ID {order_db_id}, Symbol {symbol}, Trigger@ ${trigger_price:.4f}, Market@ ${current_price:.4f}")
elif trigger_side.lower() == 'buy' and current_price >= trigger_price:
trigger_hit = True
logger.info(f"๐ข SL TRIGGER HIT (Buy): Order DB ID {order_db_id}, Symbol {symbol}, Trigger@ ${trigger_price:.4f}, Market@ ${current_price:.4f}")
if trigger_hit:
logger.info(f"Attempting to execute actual stop order for triggered DB ID: {order_db_id} (Parent Bot Ref: {trigger_order.get('parent_bot_order_ref_id')})")
execution_result = await self.trading_engine.execute_triggered_stop_order(original_trigger_order_db_id=order_db_id)
notification_message_detail = ""
if execution_result.get("success"):
new_trigger_status = 'triggered_order_placed'
placed_sl_details = execution_result.get("placed_sl_order_details", {})
logger.info(f"Successfully placed actual SL order from trigger {order_db_id}. New SL Order DB ID: {placed_sl_details.get('order_db_id')}, Exchange ID: {placed_sl_details.get('exchange_order_id')}")
notification_message_detail = f"Actual SL order placed (New DB ID: {placed_sl_details.get('order_db_id', 'N/A')})."
else:
new_trigger_status = 'trigger_execution_failed'
error_msg = execution_result.get("error", "Unknown error during SL execution.")
logger.error(f"Failed to execute actual SL order from trigger {order_db_id}: {error_msg}")
notification_message_detail = f"Failed to place actual SL order: {error_msg}"
stats.update_order_status(order_db_id=order_db_id, new_status=new_trigger_status)
if self.notification_manager:
await self.notification_manager.send_generic_notification(
f"๐ Stop-Loss Update!\nSymbol: {symbol}\nSide: {trigger_side.upper()}\nTrigger Price: ${trigger_price:.4f}\nMarket Price: ${current_price:.4f}\n(Original Trigger DB ID: {order_db_id}, Parent: {parent_ref_id or 'N/A'})\nStatus: {new_trigger_status.replace('_', ' ').title()}\nDetails: {notification_message_detail}"
)
except Exception as e:
logger.error(f"โ Error checking pending SL triggers: {e}", exc_info=True)
async def _check_automatic_risk_management(self):
"""Check for automatic stop loss triggers based on Config.STOP_LOSS_PERCENTAGE as safety net."""
try:
# Skip if risk management is disabled or percentage is 0
if not getattr(Config, 'RISK_MANAGEMENT_ENABLED', True) or Config.STOP_LOSS_PERCENTAGE <= 0:
return
# Get current positions
positions = self.trading_engine.get_positions()
if not positions:
# If no positions exist, clean up any orphaned pending stop losses
await self._cleanup_orphaned_stop_losses()
return
for position in positions:
try:
symbol = position.get('symbol', '')
contracts = float(position.get('contracts', 0))
entry_price = float(position.get('entryPx', 0))
mark_price = float(position.get('markPx', 0))
unrealized_pnl = float(position.get('unrealizedPnl', 0))
# Skip if no position or missing data
if contracts == 0 or entry_price <= 0 or mark_price <= 0:
continue
# Calculate PnL percentage based on entry value
entry_value = abs(contracts) * entry_price
if entry_value <= 0:
continue
pnl_percentage = (unrealized_pnl / entry_value) * 100
# Check if loss exceeds the safety threshold
if pnl_percentage <= -Config.STOP_LOSS_PERCENTAGE:
token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
position_side = "LONG" if contracts > 0 else "SHORT"
logger.warning(f"๐จ AUTOMATIC STOP LOSS TRIGGERED: {token} {position_side} position has {pnl_percentage:.2f}% loss (threshold: -{Config.STOP_LOSS_PERCENTAGE}%)")
# Send notification before attempting exit
if self.notification_manager:
await self.notification_manager.send_generic_notification(
f"๐จ AUTOMATIC STOP LOSS TRIGGERED!\n"
f"Token: {token}\n"
f"Position: {position_side} {abs(contracts):.6f}\n"
f"Entry Price: ${entry_price:.4f}\n"
f"Current Price: ${mark_price:.4f}\n"
f"Unrealized PnL: ${unrealized_pnl:.2f} ({pnl_percentage:.2f}%)\n"
f"Safety Threshold: -{Config.STOP_LOSS_PERCENTAGE}%\n"
f"Action: Executing emergency exit order..."
)
# Execute emergency exit order
exit_result = await self.trading_engine.execute_exit_order(token)
if exit_result.get('success'):
logger.info(f"โ
Emergency exit order placed successfully for {token}. Order details: {exit_result.get('order_placed_details', {})}")
# Cancel any pending stop losses for this symbol since position is now closed
stats = self.trading_engine.get_stats()
if stats:
cancelled_sl_count = stats.cancel_pending_stop_losses_by_symbol(
symbol=symbol,
new_status='cancelled_auto_exit'
)
if cancelled_sl_count > 0:
logger.info(f"๐ Cancelled {cancelled_sl_count} pending stop losses for {symbol} after automatic exit")
if self.notification_manager:
await self.notification_manager.send_generic_notification(
f"โ
Emergency Exit Completed\n\n"
f"๐ Position: {token} {position_side}\n"
f"๐ Loss: {pnl_percentage:.2f}% (${unrealized_pnl:.2f})\n"
f"โ ๏ธ Threshold: -{Config.STOP_LOSS_PERCENTAGE}%\n"
f"โ
Action: Position automatically closed\n"
f"๐ฐ Exit Price: ~${mark_price:.2f}\n"
f"๐ Order ID: {exit_result.get('order_placed_details', {}).get('exchange_order_id', 'N/A')}\n"
f"{f'๐ Cleanup: Cancelled {cancelled_sl_count} pending stop losses' if cancelled_sl_count > 0 else ''}\n\n"
f"๐ก๏ธ This was an automatic safety stop triggered by the risk management system."
)
else:
error_msg = exit_result.get('error', 'Unknown error')
logger.error(f"โ Failed to execute emergency exit order for {token}: {error_msg}")
if self.notification_manager:
await self.notification_manager.send_generic_notification(
f"โ CRITICAL: Emergency Exit Failed!\n\n"
f"๐ Position: {token} {position_side}\n"
f"๐ Loss: {pnl_percentage:.2f}%\n"
f"โ Error: {error_msg}\n\n"
f"โ ๏ธ MANUAL INTERVENTION REQUIRED\n"
f"Please close this position manually via /exit {token}"
)
except Exception as pos_error:
logger.error(f"Error processing position for automatic stop loss: {pos_error}")
continue
except Exception as e:
logger.error(f"โ Error in automatic risk management check: {e}", exc_info=True)
async def _cleanup_orphaned_stop_losses(self):
"""Clean up pending stop losses that no longer have corresponding positions OR whose parent orders have been cancelled/failed."""
try:
stats = self.trading_engine.get_stats()
if not stats:
return
# Get all pending stop loss triggers
pending_stop_losses = stats.get_orders_by_status('pending_trigger', 'stop_limit_trigger')
if not pending_stop_losses:
return
logger.debug(f"Checking {len(pending_stop_losses)} pending stop losses for orphaned orders")
# Get current positions to check against
current_positions = self.trading_engine.get_positions()
position_symbols = set()
if current_positions:
for pos in current_positions:
symbol = pos.get('symbol')
contracts = float(pos.get('contracts', 0))
if symbol and contracts != 0:
position_symbols.add(symbol)
# Check each pending stop loss
orphaned_count = 0
for sl_order in pending_stop_losses:
symbol = sl_order.get('symbol')
order_db_id = sl_order.get('id')
parent_bot_ref_id = sl_order.get('parent_bot_order_ref_id')
should_cancel = False
cancel_reason = ""
# Check if parent order exists and its status
if parent_bot_ref_id:
parent_order = stats.get_order_by_bot_ref_id(parent_bot_ref_id)
if parent_order:
parent_status = parent_order.get('status', '').lower()
# Cancel if parent order failed, was cancelled, or disappeared
if parent_status in ['failed_submission', 'failed_submission_no_data', 'cancelled_manually',
'cancelled_externally', 'disappeared_from_exchange']:
should_cancel = True
cancel_reason = f"parent order {parent_status}"
elif parent_status == 'filled':
# Parent order filled but no position - position might have been closed externally
if symbol not in position_symbols:
should_cancel = True
cancel_reason = "parent filled but position no longer exists"
# If parent is still 'open', 'submitted', or 'partially_filled', keep the stop loss
else:
# Parent order not found in DB - this is truly orphaned
should_cancel = True
cancel_reason = "parent order not found in database"
else:
# No parent reference - fallback to old logic (position-based check)
if symbol not in position_symbols:
should_cancel = True
cancel_reason = "no position exists and no parent reference"
if should_cancel:
# Cancel this orphaned stop loss
success = stats.update_order_status(
order_db_id=order_db_id,
new_status='cancelled_orphaned_no_position'
)
if success:
orphaned_count += 1
token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
logger.info(f"๐งน Cancelled orphaned stop loss for {token} (Order DB ID: {order_db_id}) - {cancel_reason}")
if orphaned_count > 0:
logger.info(f"๐งน Cleanup completed: Cancelled {orphaned_count} orphaned stop loss orders")
if self.notification_manager:
await self.notification_manager.send_generic_notification(
f"๐งน Cleanup Completed\n\n"
f"Cancelled {orphaned_count} orphaned stop loss order(s)\n"
f"Reason: Parent orders cancelled/failed or positions closed externally\n"
f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
f"๐ก This automatic cleanup ensures stop losses stay synchronized with actual orders and positions."
)
except Exception as e:
logger.error(f"โ Error cleaning up orphaned stop losses: {e}", exc_info=True)
async def _activate_pending_stop_losses_from_trades(self):
"""๐ PHASE 4: Check trades table for pending stop loss activation first (highest priority)"""
try:
stats = self.trading_engine.get_stats()
if not stats:
return
# Get open positions that need stop loss activation
trades_needing_sl = stats.get_pending_stop_loss_activations()
if not trades_needing_sl:
return
logger.debug(f"๐ Found {len(trades_needing_sl)} open positions needing stop loss activation")
for position_trade in trades_needing_sl:
try:
symbol = position_trade['symbol']
token = symbol.split('/')[0] if '/' in symbol else symbol
stop_loss_price = position_trade['stop_loss_price']
position_side = position_trade['position_side']
current_amount = position_trade.get('current_position_size', 0)
lifecycle_id = position_trade['trade_lifecycle_id']
# Get current market price
current_price = None
try:
market_data = self.trading_engine.get_market_data(symbol)
if market_data and market_data.get('ticker'):
current_price = float(market_data['ticker'].get('last', 0))
except Exception as price_error:
logger.warning(f"Could not fetch current price for {symbol}: {price_error}")
# Determine stop loss side based on position side
sl_side = 'sell' if position_side == 'long' else 'buy' # Long SL = sell, Short SL = buy
# Check if trigger condition is already met
trigger_already_hit = False
trigger_reason = ""
if current_price and current_price > 0:
if sl_side == 'sell' and current_price <= stop_loss_price:
# LONG position stop loss - price below trigger
trigger_already_hit = True
trigger_reason = f"LONG SL: Current ${current_price:.4f} โค Stop ${stop_loss_price:.4f}"
elif sl_side == 'buy' and current_price >= stop_loss_price:
# SHORT position stop loss - price above trigger
trigger_already_hit = True
trigger_reason = f"SHORT SL: Current ${current_price:.4f} โฅ Stop ${stop_loss_price:.4f}"
if trigger_already_hit:
# Execute immediate market close
logger.warning(f"๐จ IMMEDIATE SL EXECUTION (Trades Table): {token} - {trigger_reason}")
try:
exit_result = await self.trading_engine.execute_exit_order(token)
if exit_result.get('success'):
logger.info(f"โ
Immediate {position_side.upper()} SL execution successful for {token}")
if self.notification_manager:
await self.notification_manager.send_generic_notification(
f"๐จ Immediate Stop Loss Execution\n\n"
f"๐ Source: Unified Trades Table (Phase 4)\n"
f"Token: {token}\n"
f"Position Type: {position_side.upper()}\n"
f"SL Trigger Price: ${stop_loss_price:.4f}\n"
f"Current Market Price: ${current_price:.4f}\n"
f"Trigger Logic: {trigger_reason}\n"
f"Action: Market close order placed immediately\n"
f"Order ID: {exit_result.get('order_placed_details', {}).get('exchange_order_id', 'N/A')}\n"
f"Lifecycle ID: {lifecycle_id[:8]}\n"
f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
f"โก Single source of truth prevents missed stop losses"
)
else:
logger.error(f"โ Failed to execute immediate SL for {token}: {exit_result.get('error')}")
except Exception as exec_error:
logger.error(f"โ Exception during immediate SL execution for {token}: {exec_error}")
else:
# Normal activation - place stop loss order
try:
sl_result = await self.trading_engine.execute_stop_loss_order(token, stop_loss_price)
if sl_result.get('success'):
sl_order_id = sl_result.get('order_placed_details', {}).get('exchange_order_id')
# Link the stop loss order to the trade lifecycle
stats.link_stop_loss_to_trade(lifecycle_id, sl_order_id, stop_loss_price)
logger.info(f"โ
Activated {position_side.upper()} stop loss for {token}: ${stop_loss_price:.4f}")
if self.notification_manager:
await self.notification_manager.send_generic_notification(
f"๐ Stop Loss Activated\n\n"
f"๐ Source: Unified Trades Table (Phase 4)\n"
f"Token: {token}\n"
f"Position Type: {position_side.upper()}\n"
f"Stop Loss Price: ${stop_loss_price:.4f}\n"
f"Current Price: ${current_price:.4f if current_price else 'Unknown'}\n"
f"Order ID: {sl_order_id or 'N/A'}\n"
f"Lifecycle ID: {lifecycle_id[:8]}\n"
f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
f"๐ก๏ธ Your position is now protected"
)
else:
logger.error(f"โ Failed to activate SL for {token}: {sl_result.get('error')}")
except Exception as activation_error:
logger.error(f"โ Exception during SL activation for {token}: {activation_error}")
except Exception as trade_error:
logger.error(f"โ Error processing position trade for SL activation: {trade_error}")
except Exception as e:
logger.error(f"โ Error activating pending stop losses from trades table: {e}", exc_info=True)
async def _activate_pending_stop_losses(self, order_in_db, stats):
"""Activate pending stop losses for a filled order, checking current price for immediate execution."""
try:
# Fetch pending stop losses for this order
pending_stop_losses = stats.get_orders_by_status('pending_trigger', 'stop_limit_trigger', order_in_db['bot_order_ref_id'])
if not pending_stop_losses:
return
symbol = order_in_db.get('symbol')
token = symbol.split('/')[0] if '/' in symbol and symbol else 'Unknown'
logger.debug(f"Found {len(pending_stop_losses)} pending stop loss(es) for filled order {order_in_db.get('exchange_order_id', 'N/A')}")
# Get current market price for the symbol
current_price = None
try:
market_data = self.trading_engine.get_market_data(symbol)
if market_data and market_data.get('ticker'):
current_price = float(market_data['ticker'].get('last', 0))
if current_price <= 0:
current_price = None
except Exception as price_error:
logger.warning(f"Could not fetch current price for {symbol}: {price_error}")
current_price = None
# Check if we still have a position for this symbol after the fill
if symbol:
# Try to get current position
try:
position = self.trading_engine.find_position(token)
if position and float(position.get('contracts', 0)) != 0:
# Position exists - check each stop loss
activated_count = 0
immediately_executed_count = 0
for sl_order in pending_stop_losses:
sl_trigger_price = float(sl_order.get('price', 0))
sl_side = sl_order.get('side', '').lower() # 'sell' for long SL, 'buy' for short SL
sl_db_id = sl_order.get('id')
sl_amount = sl_order.get('amount_requested', 0)
if not sl_trigger_price or not sl_side or not sl_db_id:
logger.warning(f"Invalid stop loss data for DB ID {sl_db_id}, skipping")
continue
# Check if trigger condition is already met
trigger_already_hit = False
trigger_reason = ""
if current_price and current_price > 0:
if sl_side == 'sell' and current_price <= sl_trigger_price:
# LONG position stop loss - price has fallen below trigger
# Long SL = SELL order that triggers when price drops below stop price
trigger_already_hit = True
trigger_reason = f"LONG SL: Current price ${current_price:.4f} โค Stop price ${sl_trigger_price:.4f}"
elif sl_side == 'buy' and current_price >= sl_trigger_price:
# SHORT position stop loss - price has risen above trigger
# Short SL = BUY order that triggers when price rises above stop price
trigger_already_hit = True
trigger_reason = f"SHORT SL: Current price ${current_price:.4f} โฅ Stop price ${sl_trigger_price:.4f}"
if trigger_already_hit:
# Execute immediate market close instead of activating stop loss
logger.warning(f"๐จ IMMEDIATE SL EXECUTION: {token} - {trigger_reason}")
# Update the stop loss status to show it was immediately executed
stats.update_order_status(order_db_id=sl_db_id, new_status='immediately_executed_on_activation')
# Execute market order to close position
try:
exit_result = await self.trading_engine.execute_exit_order(token)
if exit_result.get('success'):
immediately_executed_count += 1
position_side = "LONG" if sl_side == 'sell' else "SHORT"
logger.info(f"โ
Immediate {position_side} SL execution successful for {token}. Market order placed: {exit_result.get('order_placed_details', {}).get('exchange_order_id', 'N/A')}")
if self.notification_manager:
await self.notification_manager.send_generic_notification(
f"๐จ Immediate Stop Loss Execution\n\n"
f"Token: {token}\n"
f"Position Type: {position_side}\n"
f"SL Trigger Price: ${sl_trigger_price:.4f}\n"
f"Current Market Price: ${current_price:.4f}\n"
f"Trigger Logic: {trigger_reason}\n"
f"Action: Market close order placed immediately\n"
f"Reason: Trigger condition already met when activating\n"
f"Order ID: {exit_result.get('order_placed_details', {}).get('exchange_order_id', 'N/A')}\n"
f"Time: {datetime.now().strftime('%H:%M:%S')}\n\n"
f"โก This prevents waiting for a trigger that's already passed"
)
else:
logger.error(f"โ Failed to execute immediate SL for {token}: {exit_result.get('error', 'Unknown error')}")
if self.notification_manager:
await self.notification_manager.send_generic_notification(
f"โ Immediate SL Execution Failed\n\n"
f"Token: {token}\n"
f"SL Price: ${sl_trigger_price:.4f}\n"
f"Current Price: ${current_price:.4f}\n"
f"Trigger Logic: {trigger_reason}\n"
f"Error: {exit_result.get('error', 'Unknown error')}\n\n"
f"โ ๏ธ Manual intervention may be required"
)
# Revert status since execution failed
stats.update_order_status(order_db_id=sl_db_id, new_status='activation_execution_failed')
except Exception as exec_error:
logger.error(f"โ Exception during immediate SL execution for {token}: {exec_error}")
stats.update_order_status(order_db_id=sl_db_id, new_status='activation_execution_error')
else:
# Normal activation - trigger condition not yet met
activated_count += 1
position_side = "LONG" if sl_side == 'sell' else "SHORT"
logger.info(f"โ
Activating {position_side} stop loss for {token}: SL price ${sl_trigger_price:.4f} (Current: ${current_price:.4f if current_price else 'Unknown'})")
# Send summary notification for normal activations
if activated_count > 0 and self.notification_manager:
await self.notification_manager.send_generic_notification(
f"๐ Stop Losses Activated\n\n"
f"Symbol: {token}\n"
f"Activated: {activated_count} stop loss(es)\n"
f"Current Price: ${current_price:.4f if current_price else 'Unknown'}\n"
f"Status: Monitoring for trigger conditions"
f"{f'\\n\\nโก Additionally executed {immediately_executed_count} stop loss(es) immediately due to current market conditions' if immediately_executed_count > 0 else ''}"
)
elif immediately_executed_count > 0 and activated_count == 0:
# All stop losses were immediately executed
if self.notification_manager:
await self.notification_manager.send_generic_notification(
f"โก All Stop Losses Executed Immediately\n\n"
f"Symbol: {token}\n"
f"Executed: {immediately_executed_count} stop loss(es)\n"
f"Reason: Market price already beyond trigger levels\n"
f"Current Price: ${current_price:.4f if current_price else 'Unknown'}\n\n"
f"๐ Position(s) closed at market to prevent further losses"
)
else:
# No position exists (might have been closed immediately) - cancel the stop losses
cancelled_count = stats.cancel_linked_orders(
parent_bot_order_ref_id=order_in_db['bot_order_ref_id'],
new_status='cancelled_no_position'
)
if cancelled_count > 0:
logger.info(f"โ Cancelled {cancelled_count} pending stop losses for {symbol} - no position found")
if self.notification_manager:
await self.notification_manager.send_generic_notification(
f"๐ Stop Losses Cancelled\n\n"
f"Symbol: {token}\n"
f"Cancelled: {cancelled_count} stop loss(es)\n"
f"Reason: No open position found"
)
except Exception as pos_check_error:
logger.warning(f"Could not check position for {symbol} during SL activation: {pos_check_error}")
# In case of error, still try to activate (safer to have redundant SLs than none)
except Exception as e:
logger.error(f"Error in _activate_pending_stop_losses: {e}", exc_info=True)
async def _check_for_recent_fills_for_order(self, exchange_oid, order_in_db):
"""Check for very recent fills that might match this order."""
try:
# Get recent fills from exchange
recent_fills = self.trading_engine.get_recent_fills()
if not recent_fills:
return False
# Get last processed timestamp from database
if not hasattr(self, '_last_processed_trade_time') or self._last_processed_trade_time is None:
try:
last_time_str = self.trading_engine.stats._get_metadata('last_processed_trade_time')
if last_time_str:
self._last_processed_trade_time = datetime.fromisoformat(last_time_str)
logger.debug(f"Loaded last_processed_trade_time from DB: {self._last_processed_trade_time}")
else:
# If no last processed time, start from 1 hour ago to avoid processing too much history
self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
logger.info("No last_processed_trade_time found, setting to 1 hour ago (UTC).")
except Exception as e:
logger.warning(f"Could not load last_processed_trade_time from DB: {e}")
self._last_processed_trade_time = datetime.now(timezone.utc) - timedelta(hours=1)
# Process new fills
for fill in recent_fills:
try:
# Parse fill data - CCXT format from fetch_my_trades
trade_id = fill.get('id') # CCXT uses 'id' for trade ID
timestamp_ms = fill.get('timestamp') # CCXT uses 'timestamp' (milliseconds)
symbol = fill.get('symbol') # CCXT uses 'symbol' in full format like 'LTC/USDC:USDC'
side = fill.get('side') # CCXT uses 'side' ('buy' or 'sell')
amount = float(fill.get('amount', 0)) # CCXT uses 'amount'
price = float(fill.get('price', 0)) # CCXT uses 'price'
# Convert timestamp
if timestamp_ms:
timestamp_dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
else:
timestamp_dt = datetime.now(timezone.utc)
# Skip if already processed
if timestamp_dt <= self._last_processed_trade_time:
continue
# Process as external trade if we reach here
if symbol and side and amount > 0 and price > 0:
# Symbol is already in full format for CCXT
full_symbol = symbol
token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
# Check if this might be a bot order fill by looking for exchange order ID
# CCXT might have this in 'info' sub-object with the raw exchange data
exchange_order_id_from_fill = None
if 'info' in fill and isinstance(fill['info'], dict):
# Look for Hyperliquid order ID in the raw response
exchange_order_id_from_fill = fill['info'].get('oid')
if exchange_order_id_from_fill == exchange_oid:
logger.info(f"โ
Found recent fill for order {exchange_oid} - NOT cancelling stop losses")
return True
except Exception as e:
logger.error(f"Error processing fill {fill}: {e}")
continue
return False
except Exception as e:
logger.error(f"โ Error checking for recent fills for order: {e}", exc_info=True)
return False
async def _check_external_stop_loss_orders(self):
"""Check for externally placed stop loss orders and track them."""
try:
# Get current open orders
open_orders = self.trading_engine.get_orders()
if not open_orders:
return
# Get current positions to understand what could be stop losses
positions = self.trading_engine.get_positions()
if not positions:
return
# Create a map of current positions
position_map = {}
for position in positions:
symbol = position.get('symbol')
contracts = float(position.get('contracts', 0))
if symbol and contracts != 0:
token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
position_map[token] = {
'symbol': symbol,
'contracts': contracts,
'side': 'long' if contracts > 0 else 'short',
'entry_price': float(position.get('entryPx', 0))
}
# Check each order to see if it could be a stop loss
newly_detected = 0
for order in open_orders:
try:
exchange_order_id = order.get('id')
symbol = order.get('symbol')
side = order.get('side') # 'buy' or 'sell'
amount = float(order.get('amount', 0))
price = float(order.get('price', 0))
order_type = order.get('type', '').lower()
if not all([exchange_order_id, symbol, side, amount, price]):
continue
# Skip if we're already tracking this order
if exchange_order_id in self._external_stop_loss_orders:
continue
# Check if this order could be a stop loss
token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
# Must have a position in this token to have a stop loss
if token not in position_map:
continue
position = position_map[token]
# Check if this order matches stop loss pattern
is_stop_loss = False
if position['side'] == 'long' and side == 'sell':
# Long position with sell order - could be stop loss if price is below entry
if price < position['entry_price'] * 0.98: # Allow 2% buffer for approximation
is_stop_loss = True
elif position['side'] == 'short' and side == 'buy':
# Short position with buy order - could be stop loss if price is above entry
if price > position['entry_price'] * 1.02: # Allow 2% buffer for approximation
is_stop_loss = True
if is_stop_loss:
# Track this as an external stop loss order
self._external_stop_loss_orders[exchange_order_id] = {
'token': token,
'symbol': symbol,
'trigger_price': price,
'side': side,
'amount': amount,
'position_side': position['side'],
'detected_at': datetime.now(timezone.utc),
'entry_price': position['entry_price']
}
newly_detected += 1
logger.info(f"๐ Detected external stop loss order: {token} {side.upper()} {amount} @ ${price:.2f} (protecting {position['side'].upper()} position)")
except Exception as e:
logger.error(f"Error analyzing order for stop loss detection: {e}")
continue
if newly_detected > 0:
logger.info(f"๐ Detected {newly_detected} new external stop loss orders")
except Exception as e:
logger.error(f"โ Error checking external stop loss orders: {e}")
async def _cleanup_external_stop_loss_tracking(self):
"""Clean up external stop loss orders that are no longer active."""
try:
if not self._external_stop_loss_orders:
return
# Get current open orders
open_orders = self.trading_engine.get_orders()
if not open_orders:
# No open orders, clear all tracking
removed_count = len(self._external_stop_loss_orders)
self._external_stop_loss_orders.clear()
if removed_count > 0:
logger.info(f"๐งน Cleared {removed_count} external stop loss orders (no open orders)")
return
# Get set of current order IDs
current_order_ids = {order.get('id') for order in open_orders if order.get('id')}
# Remove any tracked stop loss orders that are no longer open
to_remove = []
for order_id, stop_loss_info in self._external_stop_loss_orders.items():
if order_id not in current_order_ids:
to_remove.append(order_id)
for order_id in to_remove:
stop_loss_info = self._external_stop_loss_orders[order_id]
del self._external_stop_loss_orders[order_id]
logger.info(f"๐๏ธ Removed external stop loss tracking for {stop_loss_info['token']} order {order_id} (no longer open)")
if to_remove:
logger.info(f"๐งน Cleaned up {len(to_remove)} external stop loss orders")
except Exception as e:
logger.error(f"โ Error cleaning up external stop loss tracking: {e}")