123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405 |
- #!/usr/bin/env python3
- """
- Trade Lifecycle Manager for Trading Statistics
- Handles trade lifecycle management, position tracking, and market data updates.
- """
- import logging
- from datetime import datetime, timezone, timedelta
- from typing import Dict, List, Any, Optional
- import uuid
- from src.utils.token_display_formatter import get_formatter
- logger = logging.getLogger(__name__)
- class TradeLifecycleManager:
- """Manages trade lifecycle operations in the trading statistics database."""
- def __init__(self, db_manager):
- """Initialize with database manager."""
- self.db = db_manager
- def create_trade_lifecycle(self, symbol: str, side: str, entry_order_id: Optional[str] = None,
- entry_bot_order_ref_id: Optional[str] = None,
- stop_loss_price: Optional[float] = None,
- take_profit_price: Optional[float] = None,
- trade_type: str = 'manual') -> Optional[str]:
- """Create a new trade lifecycle. Returns lifecycle_id or None on failure."""
- try:
- lifecycle_id = str(uuid.uuid4())
-
- # Main lifecycle record in 'trades' table
- query = """
- INSERT INTO trades (
- symbol, side, amount, price, value, trade_type, timestamp,
- status, trade_lifecycle_id, position_side, entry_order_id,
- stop_loss_price, take_profit_price, updated_at
- ) VALUES (?, ?, 0, 0, 0, ?, ?, 'pending', ?, 'flat', ?, ?, ?, ?)
- """
- timestamp = datetime.now(timezone.utc).isoformat()
- params = (symbol, side.lower(), trade_type, timestamp, lifecycle_id,
- entry_order_id, stop_loss_price, take_profit_price, timestamp)
-
- self.db._execute_query(query, params)
- logger.info(f"📊 Created trade lifecycle {lifecycle_id}: {side.upper()} {symbol} (pending for exch_id: {entry_order_id or 'N/A'})")
- # If SL price is provided, create a conceptual pending SL order
- if stop_loss_price is not None and entry_bot_order_ref_id is not None:
- sl_order_side = 'sell' if side.lower() == 'buy' else 'buy'
- conceptual_sl_bot_ref_id = f"pending_sl_activation_{entry_bot_order_ref_id}"
-
- # This would need access to order manager, so we'll delegate this
- # back to the main TradingStats class or pass order_manager as dependency
- logger.info(f"💡 SL price {stop_loss_price} set for lifecycle {lifecycle_id} - will activate after entry fill")
- return lifecycle_id
-
- except Exception as e:
- logger.error(f"❌ Error creating trade lifecycle: {e}")
- return None
-
- def update_trade_position_opened(self, lifecycle_id: str, entry_price: float,
- entry_amount: float, exchange_fill_id: str) -> bool:
- """Update trade when position is opened (entry order filled)."""
- try:
- query = """
- UPDATE trades
- SET status = 'position_opened',
- amount = ?,
- price = ?,
- value = ?,
- entry_price = ?,
- current_position_size = ?,
- position_side = CASE
- WHEN side = 'buy' THEN 'long'
- WHEN side = 'sell' THEN 'short'
- ELSE position_side
- END,
- exchange_fill_id = ?,
- position_opened_at = ?,
- updated_at = ?
- WHERE trade_lifecycle_id = ? AND status = 'pending'
- """
- timestamp = datetime.now(timezone.utc).isoformat()
- value = entry_amount * entry_price
- params = (entry_amount, entry_price, value, entry_price, entry_amount,
- exchange_fill_id, timestamp, timestamp, lifecycle_id)
-
- self.db._execute_query(query, params)
-
- formatter = get_formatter()
- trade_info = self.get_trade_by_lifecycle_id(lifecycle_id)
- symbol_for_formatting = trade_info.get('symbol', 'UNKNOWN_SYMBOL') if trade_info else 'UNKNOWN_SYMBOL'
- base_asset_for_amount = symbol_for_formatting.split('/')[0] if '/' in symbol_for_formatting else symbol_for_formatting
- logger.info(f"📈 Trade lifecycle {lifecycle_id} position opened: {formatter.format_amount(entry_amount, base_asset_for_amount)} {symbol_for_formatting} @ {formatter.format_price(entry_price, symbol_for_formatting)}")
- return True
-
- except Exception as e:
- logger.error(f"❌ Error updating trade position opened: {e}")
- return False
-
- def update_trade_position_closed(self, lifecycle_id: str, exit_price: float,
- realized_pnl: float, exchange_fill_id: str) -> bool:
- """Update trade when position is fully closed."""
- try:
- query = """
- UPDATE trades
- SET status = 'position_closed',
- current_position_size = 0,
- position_side = 'flat',
- realized_pnl = ?,
- position_closed_at = ?,
- updated_at = ?
- WHERE trade_lifecycle_id = ? AND status = 'position_opened'
- """
- timestamp = datetime.now(timezone.utc).isoformat()
- params = (realized_pnl, timestamp, timestamp, lifecycle_id)
-
- self.db._execute_query(query, params)
-
- formatter = get_formatter()
- pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
- logger.info(f"{pnl_emoji} Trade lifecycle {lifecycle_id} position closed: P&L {formatter.format_price_with_symbol(realized_pnl)}")
- return True
-
- except Exception as e:
- logger.error(f"❌ Error updating trade position closed: {e}")
- return False
-
- def update_trade_cancelled(self, lifecycle_id: str, reason: str = "order_cancelled") -> bool:
- """Update trade when entry order is cancelled (never opened)."""
- try:
- query = """
- UPDATE trades
- SET status = 'cancelled',
- notes = ?,
- updated_at = ?
- WHERE trade_lifecycle_id = ? AND status = 'pending'
- """
- timestamp = datetime.now(timezone.utc).isoformat()
- params = (f"Cancelled: {reason}", timestamp, lifecycle_id)
-
- self.db._execute_query(query, params)
-
- logger.info(f"❌ Trade lifecycle {lifecycle_id} cancelled: {reason}")
- return True
-
- except Exception as e:
- logger.error(f"❌ Error updating trade cancelled: {e}")
- return False
-
- def link_stop_loss_to_trade(self, lifecycle_id: str, stop_loss_order_id: str,
- stop_loss_price: float) -> bool:
- """Link a stop loss order to a trade lifecycle."""
- try:
- query = """
- UPDATE trades
- SET stop_loss_order_id = ?,
- stop_loss_price = ?,
- updated_at = ?
- WHERE trade_lifecycle_id = ? AND status = 'position_opened'
- """
- timestamp = datetime.now(timezone.utc).isoformat()
- params = (stop_loss_order_id, stop_loss_price, timestamp, lifecycle_id)
-
- self.db._execute_query(query, params)
-
- formatter = get_formatter()
- trade_info = self.get_trade_by_lifecycle_id(lifecycle_id)
- symbol_for_formatting = trade_info.get('symbol', 'UNKNOWN_SYMBOL') if trade_info else 'UNKNOWN_SYMBOL'
- logger.info(f"🛑 Linked stop loss order {stop_loss_order_id} ({formatter.format_price(stop_loss_price, symbol_for_formatting)}) to trade {lifecycle_id}")
- return True
-
- except Exception as e:
- logger.error(f"❌ Error linking stop loss to trade: {e}")
- return False
-
- def link_take_profit_to_trade(self, lifecycle_id: str, take_profit_order_id: str,
- take_profit_price: float) -> bool:
- """Link a take profit order to a trade lifecycle."""
- try:
- query = """
- UPDATE trades
- SET take_profit_order_id = ?,
- take_profit_price = ?,
- updated_at = ?
- WHERE trade_lifecycle_id = ? AND status = 'position_opened'
- """
- timestamp = datetime.now(timezone.utc).isoformat()
- params = (take_profit_order_id, take_profit_price, timestamp, lifecycle_id)
-
- self.db._execute_query(query, params)
-
- formatter = get_formatter()
- trade_info = self.get_trade_by_lifecycle_id(lifecycle_id)
- symbol_for_formatting = trade_info.get('symbol', 'UNKNOWN_SYMBOL') if trade_info else 'UNKNOWN_SYMBOL'
- logger.info(f"🎯 Linked take profit order {take_profit_order_id} ({formatter.format_price(take_profit_price, symbol_for_formatting)}) to trade {lifecycle_id}")
- return True
-
- except Exception as e:
- logger.error(f"❌ Error linking take profit to trade: {e}")
- return False
-
- def get_trade_by_lifecycle_id(self, lifecycle_id: str) -> Optional[Dict[str, Any]]:
- """Get trade by lifecycle ID."""
- query = "SELECT * FROM trades WHERE trade_lifecycle_id = ?"
- return self.db._fetchone_query(query, (lifecycle_id,))
-
- def get_trade_by_symbol_and_status(self, symbol: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
- """Get trade by symbol and status."""
- query = "SELECT * FROM trades WHERE symbol = ? AND status = ? ORDER BY updated_at DESC LIMIT 1"
- return self.db._fetchone_query(query, (symbol, status))
-
- def get_open_positions(self, symbol: Optional[str] = None) -> List[Dict[str, Any]]:
- """Get all open positions, optionally filtered by symbol."""
- if symbol:
- query = "SELECT * FROM trades WHERE status = 'position_opened' AND symbol = ? ORDER BY position_opened_at DESC"
- return self.db._fetch_query(query, (symbol,))
- else:
- query = "SELECT * FROM trades WHERE status = 'position_opened' ORDER BY position_opened_at DESC"
- return self.db._fetch_query(query)
-
- def get_trades_by_status(self, status: str, limit: int = 50) -> List[Dict[str, Any]]:
- """Get trades by status."""
- query = "SELECT * FROM trades WHERE status = ? ORDER BY updated_at DESC LIMIT ?"
- return self.db._fetch_query(query, (status, limit))
-
- def get_lifecycle_by_entry_order_id(self, entry_exchange_order_id: str, status: Optional[str] = None) -> Optional[Dict[str, Any]]:
- """Get a trade lifecycle by its entry_order_id (exchange ID) and optionally by status."""
- if status:
- query = "SELECT * FROM trades WHERE entry_order_id = ? AND status = ? LIMIT 1"
- params = (entry_exchange_order_id, status)
- else:
- query = "SELECT * FROM trades WHERE entry_order_id = ? LIMIT 1"
- params = (entry_exchange_order_id,)
- return self.db._fetchone_query(query, params)
- def get_lifecycle_by_sl_order_id(self, sl_exchange_order_id: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
- """Get an active trade lifecycle by its stop_loss_order_id (exchange ID)."""
- query = "SELECT * FROM trades WHERE stop_loss_order_id = ? AND status = ? LIMIT 1"
- return self.db._fetchone_query(query, (sl_exchange_order_id, status))
- def get_lifecycle_by_tp_order_id(self, tp_exchange_order_id: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
- """Get an active trade lifecycle by its take_profit_order_id (exchange ID)."""
- query = "SELECT * FROM trades WHERE take_profit_order_id = ? AND status = ? LIMIT 1"
- return self.db._fetchone_query(query, (tp_exchange_order_id, status))
-
- def get_pending_stop_loss_activations(self) -> List[Dict[str, Any]]:
- """Get open positions that need stop loss activation."""
- query = """
- SELECT * FROM trades
- WHERE status = 'position_opened'
- AND stop_loss_price IS NOT NULL
- AND stop_loss_order_id IS NULL
- ORDER BY updated_at ASC
- """
- return self.db._fetch_query(query)
-
- def cleanup_old_cancelled_trades(self, days_old: int = 7) -> int:
- """Clean up old cancelled trades (optional - for housekeeping)."""
- try:
- cutoff_date = (datetime.now(timezone.utc) - timedelta(days=days_old)).isoformat()
-
- # Count before deletion
- count_query = """
- SELECT COUNT(*) as count FROM trades
- WHERE status = 'cancelled' AND updated_at < ?
- """
- count_result = self.db._fetchone_query(count_query, (cutoff_date,))
- count_to_delete = count_result['count'] if count_result else 0
-
- if count_to_delete > 0:
- delete_query = """
- DELETE FROM trades
- WHERE status = 'cancelled' AND updated_at < ?
- """
- self.db._execute_query(delete_query, (cutoff_date,))
- logger.info(f"🧹 Cleaned up {count_to_delete} old cancelled trades (older than {days_old} days)")
-
- return count_to_delete
-
- except Exception as e:
- logger.error(f"❌ Error cleaning up old cancelled trades: {e}")
- return 0
-
- def confirm_position_with_exchange(self, symbol: str, exchange_position_size: float,
- exchange_open_orders: List[Dict]) -> bool:
- """Confirm position status with exchange before updating status."""
- try:
- # Get current trade status
- current_trade = self.get_trade_by_symbol_and_status(symbol, 'position_opened')
-
- if not current_trade:
- return True # No open position to confirm
-
- lifecycle_id = current_trade['trade_lifecycle_id']
- has_open_orders = len([o for o in exchange_open_orders if o.get('symbol') == symbol]) > 0
-
- # Only close position if exchange confirms no position AND no pending orders
- if abs(exchange_position_size) < 1e-8 and not has_open_orders:
- # Calculate realized P&L based on position side
- entry_price_db = current_trade['entry_price']
- estimated_pnl = current_trade.get('realized_pnl', 0)
-
- success = self.update_trade_position_closed(
- lifecycle_id,
- entry_price_db, # Using entry price as estimate since position is confirmed closed
- estimated_pnl,
- "exchange_confirmed_closed"
- )
-
- if success:
- logger.info(f"✅ Confirmed position closed for {symbol} with exchange")
-
- return success
-
- return True # Position still exists on exchange, no update needed
-
- except Exception as e:
- logger.error(f"❌ Error confirming position with exchange: {e}")
- return False
- def update_trade_market_data(self,
- trade_lifecycle_id: str,
- unrealized_pnl: Optional[float] = None,
- mark_price: Optional[float] = None,
- current_position_size: Optional[float] = None,
- entry_price: Optional[float] = None,
- liquidation_price: Optional[float] = None,
- margin_used: Optional[float] = None,
- leverage: Optional[float] = None,
- position_value: Optional[float] = None,
- unrealized_pnl_percentage: Optional[float] = None) -> bool:
- """Update market-related data for an open trade lifecycle."""
- try:
- updates = []
- params = []
-
- if unrealized_pnl is not None:
- updates.append("unrealized_pnl = ?")
- params.append(unrealized_pnl)
- if mark_price is not None:
- updates.append("mark_price = ?")
- params.append(mark_price)
- if current_position_size is not None:
- updates.append("current_position_size = ?")
- params.append(current_position_size)
- if entry_price is not None:
- updates.append("entry_price = ?")
- params.append(entry_price)
- if liquidation_price is not None:
- updates.append("liquidation_price = ?")
- params.append(liquidation_price)
- if margin_used is not None:
- updates.append("margin_used = ?")
- params.append(margin_used)
- if leverage is not None:
- updates.append("leverage = ?")
- params.append(leverage)
- if position_value is not None:
- updates.append("position_value = ?")
- params.append(position_value)
- if unrealized_pnl_percentage is not None:
- updates.append("unrealized_pnl_percentage = ?")
- params.append(unrealized_pnl_percentage)
- if not updates:
- logger.debug(f"No market data fields provided to update for lifecycle {trade_lifecycle_id}.")
- return True
- timestamp = datetime.now(timezone.utc).isoformat()
- updates.append("updated_at = ?")
- params.append(timestamp)
- set_clause = ", ".join(updates)
- query = f"""
- UPDATE trades
- SET {set_clause}
- WHERE trade_lifecycle_id = ? AND status = 'position_opened'
- """
- params.append(trade_lifecycle_id)
-
- cursor = self.db.conn.cursor()
- cursor.execute(query, tuple(params))
- self.db.conn.commit()
- updated_rows = cursor.rowcount
- if updated_rows > 0:
- logger.debug(f"💹 Updated market data for lifecycle {trade_lifecycle_id}")
- return True
- else:
- return False
- except Exception as e:
- logger.error(f"❌ Error updating market data for trade lifecycle {trade_lifecycle_id}: {e}")
- return False
- def get_recent_trades(self, limit: int = 10) -> List[Dict[str, Any]]:
- """Get recent trades (these are active/open trades, as completed ones are migrated)."""
- return self.db._fetch_query("SELECT * FROM trades WHERE status = 'position_opened' ORDER BY updated_at DESC LIMIT ?", (limit,))
- def get_all_trades(self) -> List[Dict[str, Any]]:
- """Fetch all trades from the database, ordered by timestamp."""
- return self.db._fetch_query("SELECT * FROM trades ORDER BY timestamp ASC")
|