#!/usr/bin/env python3
"""
Trade Lifecycle Manager for Trading Statistics

Handles trade lifecycle management, position tracking, and market data updates.
"""

import logging
from datetime import datetime, timezone, timedelta
from typing import Dict, List, Any, Optional
import uuid
from src.utils.token_display_formatter import get_formatter

logger = logging.getLogger(__name__)


class TradeLifecycleManager:
    """Manages trade lifecycle operations in the trading statistics database.

    Every method reads or writes rows of the ``trades`` table through the
    injected database manager. The statuses written by this class are
    ``pending`` -> ``position_opened`` -> ``position_closed``, plus
    ``cancelled`` for entries that never filled.
    """

    def __init__(self, db_manager):
        """Initialize with database manager.

        Args:
            db_manager: Object exposing ``_execute_query``, ``_fetch_query``,
                ``_fetchone_query`` and a ``conn`` attribute; these are the
                only members this class uses.
        """
        self.db = db_manager

    def _symbol_for_lifecycle(self, lifecycle_id: str) -> str:
        """Return the symbol stored for a lifecycle, for log formatting only.

        Falls back to ``'UNKNOWN_SYMBOL'`` when no row is found or the row has
        no ``symbol`` key.
        """
        trade_info = self.get_trade_by_lifecycle_id(lifecycle_id)
        if not trade_info:
            return 'UNKNOWN_SYMBOL'
        return trade_info.get('symbol', 'UNKNOWN_SYMBOL')

    def create_trade_lifecycle(self, symbol: str, side: str, entry_order_id: Optional[str] = None,
                               entry_bot_order_ref_id: Optional[str] = None,
                               stop_loss_price: Optional[float] = None,
                               take_profit_price: Optional[float] = None,
                               trade_type: str = 'manual') -> Optional[str]:
        """Create a new trade lifecycle row in ``pending`` status.

        Args:
            symbol: Trading symbol stored on the row.
            side: Entry side; stored lower-cased.
            entry_order_id: Exchange order id of the entry order, if known.
            entry_bot_order_ref_id: Bot-side reference of the entry order. Only
                used to decide whether the stop-loss hint is logged.
            stop_loss_price: Optional stop-loss price stored on the row.
            take_profit_price: Optional take-profit price stored on the row.
            trade_type: Stored in the ``trade_type`` column.

        Returns:
            The generated lifecycle id (a UUID4 string), or None on failure.
        """
        try:
            lifecycle_id = str(uuid.uuid4())

            # Main lifecycle record in 'trades' table. Amount, price and value
            # start at 0 and are filled in when the entry order fills.
            query = """
                INSERT INTO trades (
                    symbol, side, amount, price, value, trade_type, timestamp,
                    status, trade_lifecycle_id, position_side, entry_order_id,
                    stop_loss_price, take_profit_price, updated_at
                ) VALUES (?, ?, 0, 0, 0, ?, ?, 'pending', ?, 'flat', ?, ?, ?, ?)
            """
            timestamp = datetime.now(timezone.utc).isoformat()
            params = (symbol, side.lower(), trade_type, timestamp, lifecycle_id,
                      entry_order_id, stop_loss_price, take_profit_price, timestamp)

            self.db._execute_query(query, params)

            logger.info(f"๐Ÿ“Š Created trade lifecycle {lifecycle_id}: {side.upper()} {symbol} (pending for exch_id: {entry_order_id or 'N/A'})")

            # Creating the conceptual pending SL order needs the order manager,
            # which this class does not have; the caller is expected to handle
            # it. Here the intent is only logged.
            if stop_loss_price is not None and entry_bot_order_ref_id is not None:
                logger.info(f"๐Ÿ’ก SL price {stop_loss_price} set for lifecycle {lifecycle_id} - will activate after entry fill")

            return lifecycle_id

        except Exception as e:
            logger.error(f"โŒ Error creating trade lifecycle: {e}")
            return None

    def update_trade_position_opened(self, lifecycle_id: str, entry_price: float,
                                     entry_amount: float, exchange_fill_id: str) -> bool:
        """Update trade when position is opened (entry order filled).

        Only rows currently in ``pending`` status are targeted by the UPDATE.

        Args:
            lifecycle_id: Lifecycle to update.
            entry_price: Fill price; stored in both ``price`` and ``entry_price``.
            entry_amount: Filled amount; stored in ``amount`` and
                ``current_position_size``.
            exchange_fill_id: Exchange fill identifier stored on the row.

        Returns:
            True if no exception was raised, False otherwise.
        """
        # NOTE(review): the result of _execute_query is not inspected, so True
        # is returned even if no row matched the 'pending' guard - confirm
        # whether callers rely on this.
        try:
            query = """
                UPDATE trades
                SET status = 'position_opened',
                    amount = ?,
                    price = ?,
                    value = ?,
                    entry_price = ?,
                    current_position_size = ?,
                    position_side = CASE
                        WHEN side = 'buy' THEN 'long'
                        WHEN side = 'sell' THEN 'short'
                        ELSE position_side
                    END,
                    exchange_fill_id = ?,
                    position_opened_at = ?,
                    updated_at = ?
                WHERE trade_lifecycle_id = ? AND status = 'pending'
            """
            timestamp = datetime.now(timezone.utc).isoformat()
            value = entry_amount * entry_price
            params = (entry_amount, entry_price, value, entry_price, entry_amount,
                      exchange_fill_id, timestamp, timestamp, lifecycle_id)

            self.db._execute_query(query, params)

            formatter = get_formatter()
            symbol_for_formatting = self._symbol_for_lifecycle(lifecycle_id)
            # "BASE/QUOTE" symbols are formatted by their base asset.
            base_asset_for_amount = symbol_for_formatting.split('/')[0] if '/' in symbol_for_formatting else symbol_for_formatting
            logger.info(f"๐Ÿ“ˆ Trade lifecycle {lifecycle_id} position opened: {formatter.format_amount(entry_amount, base_asset_for_amount)} {symbol_for_formatting} @ {formatter.format_price(entry_price, symbol_for_formatting)}")
            return True

        except Exception as e:
            logger.error(f"โŒ Error updating trade position opened: {e}")
            return False

    def update_trade_position_closed(self, lifecycle_id: str, exit_price: float,
                                     realized_pnl: float, exchange_fill_id: str) -> bool:
        """Update trade when position is fully closed.

        Only rows currently in ``position_opened`` status are targeted.

        Args:
            lifecycle_id: Lifecycle to close.
            exit_price: Accepted for interface compatibility; not persisted.
            realized_pnl: Realized P&L stored on the row.
            exchange_fill_id: Accepted for interface compatibility; not persisted.

        Returns:
            True if no exception was raised, False otherwise.
        """
        # NOTE(review): exit_price and exchange_fill_id are never written to
        # the database here - confirm whether that is intentional.
        try:
            query = """
                UPDATE trades
                SET status = 'position_closed',
                    current_position_size = 0,
                    position_side = 'flat',
                    realized_pnl = ?,
                    position_closed_at = ?,
                    updated_at = ?
                WHERE trade_lifecycle_id = ? AND status = 'position_opened'
            """
            timestamp = datetime.now(timezone.utc).isoformat()
            params = (realized_pnl, timestamp, timestamp, lifecycle_id)

            self.db._execute_query(query, params)

            formatter = get_formatter()
            pnl_emoji = "๐ŸŸข" if realized_pnl >= 0 else "๐Ÿ”ด"
            logger.info(f"{pnl_emoji} Trade lifecycle {lifecycle_id} position closed: P&L {formatter.format_price_with_symbol(realized_pnl)}")
            return True

        except Exception as e:
            logger.error(f"โŒ Error updating trade position closed: {e}")
            return False

    def update_trade_cancelled(self, lifecycle_id: str, reason: str = "order_cancelled") -> bool:
        """Update trade when entry order is cancelled (never opened).

        Only rows currently in ``pending`` status are targeted. The reason is
        stored in ``notes`` prefixed with ``"Cancelled: "``.

        Returns:
            True if no exception was raised, False otherwise.
        """
        try:
            query = """
                UPDATE trades
                SET status = 'cancelled',
                    notes = ?,
                    updated_at = ?
                WHERE trade_lifecycle_id = ? AND status = 'pending'
            """
            timestamp = datetime.now(timezone.utc).isoformat()
            params = (f"Cancelled: {reason}", timestamp, lifecycle_id)

            self.db._execute_query(query, params)

            logger.info(f"โŒ Trade lifecycle {lifecycle_id} cancelled: {reason}")
            return True

        except Exception as e:
            logger.error(f"โŒ Error updating trade cancelled: {e}")
            return False

    def link_stop_loss_to_trade(self, lifecycle_id: str, stop_loss_order_id: str,
                                stop_loss_price: float) -> bool:
        """Link a stop loss order to a trade lifecycle.

        Writes the order id and price onto the row, provided the row is in
        ``position_opened`` status.

        Returns:
            True if no exception was raised, False otherwise.
        """
        try:
            query = """
                UPDATE trades
                SET stop_loss_order_id = ?,
                    stop_loss_price = ?,
                    updated_at = ?
                WHERE trade_lifecycle_id = ? AND status = 'position_opened'
            """
            timestamp = datetime.now(timezone.utc).isoformat()
            params = (stop_loss_order_id, stop_loss_price, timestamp, lifecycle_id)

            self.db._execute_query(query, params)

            formatter = get_formatter()
            symbol_for_formatting = self._symbol_for_lifecycle(lifecycle_id)
            logger.info(f"๐Ÿ›‘ Linked stop loss order {stop_loss_order_id} ({formatter.format_price(stop_loss_price, symbol_for_formatting)}) to trade {lifecycle_id}")
            return True

        except Exception as e:
            logger.error(f"โŒ Error linking stop loss to trade: {e}")
            return False

    def link_take_profit_to_trade(self, lifecycle_id: str, take_profit_order_id: str,
                                  take_profit_price: float) -> bool:
        """Link a take profit order to a trade lifecycle.

        Writes the order id and price onto the row, provided the row is in
        ``position_opened`` status.

        Returns:
            True if no exception was raised, False otherwise.
        """
        try:
            query = """
                UPDATE trades
                SET take_profit_order_id = ?,
                    take_profit_price = ?,
                    updated_at = ?
                WHERE trade_lifecycle_id = ? AND status = 'position_opened'
            """
            timestamp = datetime.now(timezone.utc).isoformat()
            params = (take_profit_order_id, take_profit_price, timestamp, lifecycle_id)

            self.db._execute_query(query, params)

            formatter = get_formatter()
            symbol_for_formatting = self._symbol_for_lifecycle(lifecycle_id)
            logger.info(f"๐ŸŽฏ Linked take profit order {take_profit_order_id} ({formatter.format_price(take_profit_price, symbol_for_formatting)}) to trade {lifecycle_id}")
            return True

        except Exception as e:
            logger.error(f"โŒ Error linking take profit to trade: {e}")
            return False

    def get_trade_by_lifecycle_id(self, lifecycle_id: str) -> Optional[Dict[str, Any]]:
        """Get trade by lifecycle ID."""
        query = "SELECT * FROM trades WHERE trade_lifecycle_id = ?"
        return self.db._fetchone_query(query, (lifecycle_id,))

    def get_trade_by_symbol_and_status(self, symbol: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
        """Get the most recently updated trade for a symbol in the given status."""
        query = "SELECT * FROM trades WHERE symbol = ? AND status = ? ORDER BY updated_at DESC LIMIT 1"
        return self.db._fetchone_query(query, (symbol, status))

    def get_open_positions(self, symbol: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get all open positions, optionally filtered by symbol.

        Results are ordered by ``position_opened_at``, newest first.
        """
        if symbol:
            query = "SELECT * FROM trades WHERE status = 'position_opened' AND symbol = ? ORDER BY position_opened_at DESC"
            return self.db._fetch_query(query, (symbol,))
        query = "SELECT * FROM trades WHERE status = 'position_opened' ORDER BY position_opened_at DESC"
        return self.db._fetch_query(query)

    def get_trades_by_status(self, status: str, limit: int = 50) -> List[Dict[str, Any]]:
        """Get up to ``limit`` trades in the given status, most recently updated first."""
        query = "SELECT * FROM trades WHERE status = ? ORDER BY updated_at DESC LIMIT ?"
        return self.db._fetch_query(query, (status, limit))

    def get_lifecycle_by_entry_order_id(self, entry_exchange_order_id: str,
                                        status: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Get a trade lifecycle by its entry_order_id (exchange ID) and optionally by status.

        A falsy ``status`` (None or empty string) disables the status filter.
        """
        if status:
            query = "SELECT * FROM trades WHERE entry_order_id = ? AND status = ? LIMIT 1"
            params = (entry_exchange_order_id, status)
        else:
            query = "SELECT * FROM trades WHERE entry_order_id = ? LIMIT 1"
            params = (entry_exchange_order_id,)
        return self.db._fetchone_query(query, params)

    def get_lifecycle_by_sl_order_id(self, sl_exchange_order_id: str,
                                     status: str = 'position_opened') -> Optional[Dict[str, Any]]:
        """Get an active trade lifecycle by its stop_loss_order_id (exchange ID)."""
        query = "SELECT * FROM trades WHERE stop_loss_order_id = ? AND status = ? LIMIT 1"
        return self.db._fetchone_query(query, (sl_exchange_order_id, status))

    def get_lifecycle_by_tp_order_id(self, tp_exchange_order_id: str,
                                     status: str = 'position_opened') -> Optional[Dict[str, Any]]:
        """Get an active trade lifecycle by its take_profit_order_id (exchange ID)."""
        query = "SELECT * FROM trades WHERE take_profit_order_id = ? AND status = ? LIMIT 1"
        return self.db._fetchone_query(query, (tp_exchange_order_id, status))

    def get_pending_stop_loss_activations(self) -> List[Dict[str, Any]]:
        """Get open positions that need stop loss activation.

        These are ``position_opened`` rows with a stop-loss price but no
        stop-loss order id yet, least recently updated first.
        """
        query = """
            SELECT * FROM trades
            WHERE status = 'position_opened'
              AND stop_loss_price IS NOT NULL
              AND stop_loss_order_id IS NULL
            ORDER BY updated_at ASC
        """
        return self.db._fetch_query(query)

    def cleanup_old_cancelled_trades(self, days_old: int = 7) -> int:
        """Clean up old cancelled trades (optional - for housekeeping).

        Args:
            days_old: Cancelled rows whose ``updated_at`` is older than this
                many days (relative to the current UTC time) are deleted.

        Returns:
            The number of rows counted for deletion, or 0 on error.
        """
        try:
            cutoff_date = (datetime.now(timezone.utc) - timedelta(days=days_old)).isoformat()

            # Count before deletion so the number can be logged and returned.
            count_query = """
                SELECT COUNT(*) as count FROM trades
                WHERE status = 'cancelled' AND updated_at < ?
            """
            count_result = self.db._fetchone_query(count_query, (cutoff_date,))
            count_to_delete = count_result['count'] if count_result else 0

            if count_to_delete > 0:
                delete_query = """
                    DELETE FROM trades
                    WHERE status = 'cancelled' AND updated_at < ?
                """
                self.db._execute_query(delete_query, (cutoff_date,))
                logger.info(f"๐Ÿงน Cleaned up {count_to_delete} old cancelled trades (older than {days_old} days)")

            return count_to_delete

        except Exception as e:
            logger.error(f"โŒ Error cleaning up old cancelled trades: {e}")
            return 0

    def confirm_position_with_exchange(self, symbol: str, exchange_position_size: float,
                                       exchange_open_orders: List[Dict]) -> bool:
        """Confirm position status with exchange before updating status.

        The open trade for ``symbol`` is closed only when the exchange reports
        an effectively zero position size and no open orders for the symbol.

        Args:
            symbol: Symbol whose open trade should be checked.
            exchange_position_size: Position size reported by the exchange.
            exchange_open_orders: Open orders from the exchange; each is
                matched on its ``'symbol'`` key.

        Returns:
            True when there was nothing to do or the close succeeded; False if
            the close failed or an exception was raised.
        """
        try:
            # Get current trade status
            current_trade = self.get_trade_by_symbol_and_status(symbol, 'position_opened')

            if not current_trade:
                return True  # No open position to confirm

            lifecycle_id = current_trade['trade_lifecycle_id']
            has_open_orders = any(o.get('symbol') == symbol for o in exchange_open_orders)

            # Only close position if exchange confirms no position AND no pending orders
            if abs(exchange_position_size) < 1e-8 and not has_open_orders:
                entry_price_db = current_trade['entry_price']
                # The key can be present with a None value (NULL column), in
                # which case .get()'s default is not used; coerce to 0 so the
                # close does not fail after the row was already written.
                estimated_pnl = current_trade.get('realized_pnl') or 0

                success = self.update_trade_position_closed(
                    lifecycle_id,
                    entry_price_db,  # Using entry price as estimate since position is confirmed closed
                    estimated_pnl,
                    "exchange_confirmed_closed"
                )

                if success:
                    logger.info(f"โœ… Confirmed position closed for {symbol} with exchange")

                return success

            return True  # Position still exists on exchange, no update needed

        except Exception as e:
            logger.error(f"โŒ Error confirming position with exchange: {e}")
            return False

    def update_trade_market_data(self,
                                 trade_lifecycle_id: str,
                                 unrealized_pnl: Optional[float] = None,
                                 mark_price: Optional[float] = None,
                                 current_position_size: Optional[float] = None,
                                 entry_price: Optional[float] = None,
                                 liquidation_price: Optional[float] = None,
                                 margin_used: Optional[float] = None,
                                 leverage: Optional[float] = None,
                                 position_value: Optional[float] = None,
                                 unrealized_pnl_percentage: Optional[float] = None) -> bool:
        """Update market-related data for an open trade lifecycle.

        Only arguments that are not None are written; each maps to the column
        of the same name. ``updated_at`` is refreshed whenever at least one
        field is written. Only rows in ``position_opened`` status are updated.

        Returns:
            True if no fields were provided or at least one row was updated;
            False if no row matched or an exception was raised.
        """
        try:
            # (column, value) pairs in the order they appear in the SET clause.
            # Column names are fixed literals, never caller input, so building
            # the SET clause by string formatting is safe.
            candidate_fields = (
                ("unrealized_pnl", unrealized_pnl),
                ("mark_price", mark_price),
                ("current_position_size", current_position_size),
                ("entry_price", entry_price),
                ("liquidation_price", liquidation_price),
                ("margin_used", margin_used),
                ("leverage", leverage),
                ("position_value", position_value),
                ("unrealized_pnl_percentage", unrealized_pnl_percentage),
            )
            provided = [(column, value) for column, value in candidate_fields if value is not None]

            if not provided:
                logger.debug(f"No market data fields provided to update for lifecycle {trade_lifecycle_id}.")
                return True

            updates = [f"{column} = ?" for column, _ in provided]
            params = [value for _, value in provided]

            timestamp = datetime.now(timezone.utc).isoformat()
            updates.append("updated_at = ?")
            params.append(timestamp)

            set_clause = ", ".join(updates)
            query = f"""
                UPDATE trades
                SET {set_clause}
                WHERE trade_lifecycle_id = ? AND status = 'position_opened'
            """
            params.append(trade_lifecycle_id)

            # Uses the connection directly (rather than _execute_query) so the
            # affected row count can be read back.
            cursor = self.db.conn.cursor()
            cursor.execute(query, tuple(params))
            self.db.conn.commit()
            updated_rows = cursor.rowcount

            if updated_rows > 0:
                logger.debug(f"๐Ÿ’น Updated market data for lifecycle {trade_lifecycle_id}")
                return True
            return False

        except Exception as e:
            logger.error(f"โŒ Error updating market data for trade lifecycle {trade_lifecycle_id}: {e}")
            return False

    def get_recent_trades(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get recent trades (these are active/open trades, as completed ones are migrated)."""
        return self.db._fetch_query(
            "SELECT * FROM trades WHERE status = 'position_opened' ORDER BY updated_at DESC LIMIT ?",
            (limit,)
        )

    def get_all_trades(self) -> List[Dict[str, Any]]:
        """Fetch all trades from the database, ordered by timestamp."""
        return self.db._fetch_query("SELECT * FROM trades ORDER BY timestamp ASC")