trade_lifecycle_manager.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. #!/usr/bin/env python3
  2. """
  3. Trade Lifecycle Manager for Trading Statistics
  4. Handles trade lifecycle management, position tracking, and market data updates.
  5. """
  6. import logging
  7. from datetime import datetime, timezone, timedelta
  8. from typing import Dict, List, Any, Optional
  9. import uuid
  10. from src.utils.token_display_formatter import get_formatter
  11. logger = logging.getLogger(__name__)
  12. class TradeLifecycleManager:
  13. """Manages trade lifecycle operations in the trading statistics database."""
  14. def __init__(self, db_manager):
  15. """Initialize with database manager."""
  16. self.db = db_manager
  17. def create_trade_lifecycle(self, symbol: str, side: str, entry_order_id: Optional[str] = None,
  18. entry_bot_order_ref_id: Optional[str] = None,
  19. stop_loss_price: Optional[float] = None,
  20. take_profit_price: Optional[float] = None,
  21. trade_type: str = 'manual') -> Optional[str]:
  22. """Create a new trade lifecycle. Returns lifecycle_id or None on failure."""
  23. try:
  24. lifecycle_id = str(uuid.uuid4())
  25. # Main lifecycle record in 'trades' table
  26. query = """
  27. INSERT INTO trades (
  28. symbol, side, amount, price, value, trade_type, timestamp,
  29. status, trade_lifecycle_id, position_side, entry_order_id,
  30. stop_loss_price, take_profit_price, updated_at
  31. ) VALUES (?, ?, 0, 0, 0, ?, ?, 'pending', ?, 'flat', ?, ?, ?, ?)
  32. """
  33. timestamp = datetime.now(timezone.utc).isoformat()
  34. params = (symbol, side.lower(), trade_type, timestamp, lifecycle_id,
  35. entry_order_id, stop_loss_price, take_profit_price, timestamp)
  36. self.db._execute_query(query, params)
  37. logger.info(f"📊 Created trade lifecycle {lifecycle_id}: {side.upper()} {symbol} (pending for exch_id: {entry_order_id or 'N/A'})")
  38. # If SL price is provided, create a conceptual pending SL order
  39. if stop_loss_price is not None and entry_bot_order_ref_id is not None:
  40. sl_order_side = 'sell' if side.lower() == 'buy' else 'buy'
  41. conceptual_sl_bot_ref_id = f"pending_sl_activation_{entry_bot_order_ref_id}"
  42. # This would need access to order manager, so we'll delegate this
  43. # back to the main TradingStats class or pass order_manager as dependency
  44. logger.info(f"💡 SL price {stop_loss_price} set for lifecycle {lifecycle_id} - will activate after entry fill")
  45. return lifecycle_id
  46. except Exception as e:
  47. logger.error(f"❌ Error creating trade lifecycle: {e}")
  48. return None
  49. def update_trade_position_opened(self, lifecycle_id: str, entry_price: float,
  50. entry_amount: float, exchange_fill_id: str) -> bool:
  51. """Update trade when position is opened (entry order filled)."""
  52. try:
  53. query = """
  54. UPDATE trades
  55. SET status = 'position_opened',
  56. amount = ?,
  57. price = ?,
  58. value = ?,
  59. entry_price = ?,
  60. current_position_size = ?,
  61. position_side = CASE
  62. WHEN side = 'buy' THEN 'long'
  63. WHEN side = 'sell' THEN 'short'
  64. ELSE position_side
  65. END,
  66. exchange_fill_id = ?,
  67. position_opened_at = ?,
  68. updated_at = ?
  69. WHERE trade_lifecycle_id = ? AND status = 'pending'
  70. """
  71. timestamp = datetime.now(timezone.utc).isoformat()
  72. value = entry_amount * entry_price
  73. params = (entry_amount, entry_price, value, entry_price, entry_amount,
  74. exchange_fill_id, timestamp, timestamp, lifecycle_id)
  75. self.db._execute_query(query, params)
  76. formatter = get_formatter()
  77. trade_info = self.get_trade_by_lifecycle_id(lifecycle_id)
  78. symbol_for_formatting = trade_info.get('symbol', 'UNKNOWN_SYMBOL') if trade_info else 'UNKNOWN_SYMBOL'
  79. base_asset_for_amount = symbol_for_formatting.split('/')[0] if '/' in symbol_for_formatting else symbol_for_formatting
  80. logger.info(f"📈 Trade lifecycle {lifecycle_id} position opened: {formatter.format_amount(entry_amount, base_asset_for_amount)} {symbol_for_formatting} @ {formatter.format_price(entry_price, symbol_for_formatting)}")
  81. return True
  82. except Exception as e:
  83. logger.error(f"❌ Error updating trade position opened: {e}")
  84. return False
  85. def update_trade_position_closed(self, lifecycle_id: str, exit_price: float,
  86. realized_pnl: float, exchange_fill_id: str) -> bool:
  87. """Update trade when position is fully closed."""
  88. try:
  89. query = """
  90. UPDATE trades
  91. SET status = 'position_closed',
  92. current_position_size = 0,
  93. position_side = 'flat',
  94. realized_pnl = ?,
  95. position_closed_at = ?,
  96. updated_at = ?
  97. WHERE trade_lifecycle_id = ? AND status = 'position_opened'
  98. """
  99. timestamp = datetime.now(timezone.utc).isoformat()
  100. params = (realized_pnl, timestamp, timestamp, lifecycle_id)
  101. self.db._execute_query(query, params)
  102. formatter = get_formatter()
  103. pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
  104. logger.info(f"{pnl_emoji} Trade lifecycle {lifecycle_id} position closed: P&L {formatter.format_price_with_symbol(realized_pnl)}")
  105. return True
  106. except Exception as e:
  107. logger.error(f"❌ Error updating trade position closed: {e}")
  108. return False
  109. def update_trade_cancelled(self, lifecycle_id: str, reason: str = "order_cancelled") -> bool:
  110. """Update trade when entry order is cancelled (never opened)."""
  111. try:
  112. query = """
  113. UPDATE trades
  114. SET status = 'cancelled',
  115. notes = ?,
  116. updated_at = ?
  117. WHERE trade_lifecycle_id = ? AND status = 'pending'
  118. """
  119. timestamp = datetime.now(timezone.utc).isoformat()
  120. params = (f"Cancelled: {reason}", timestamp, lifecycle_id)
  121. self.db._execute_query(query, params)
  122. logger.info(f"❌ Trade lifecycle {lifecycle_id} cancelled: {reason}")
  123. return True
  124. except Exception as e:
  125. logger.error(f"❌ Error updating trade cancelled: {e}")
  126. return False
  127. def link_stop_loss_to_trade(self, lifecycle_id: str, stop_loss_order_id: str,
  128. stop_loss_price: float) -> bool:
  129. """Link a stop loss order to a trade lifecycle."""
  130. try:
  131. query = """
  132. UPDATE trades
  133. SET stop_loss_order_id = ?,
  134. stop_loss_price = ?,
  135. updated_at = ?
  136. WHERE trade_lifecycle_id = ? AND status = 'position_opened'
  137. """
  138. timestamp = datetime.now(timezone.utc).isoformat()
  139. params = (stop_loss_order_id, stop_loss_price, timestamp, lifecycle_id)
  140. self.db._execute_query(query, params)
  141. formatter = get_formatter()
  142. trade_info = self.get_trade_by_lifecycle_id(lifecycle_id)
  143. symbol_for_formatting = trade_info.get('symbol', 'UNKNOWN_SYMBOL') if trade_info else 'UNKNOWN_SYMBOL'
  144. logger.info(f"🛑 Linked stop loss order {stop_loss_order_id} ({formatter.format_price(stop_loss_price, symbol_for_formatting)}) to trade {lifecycle_id}")
  145. return True
  146. except Exception as e:
  147. logger.error(f"❌ Error linking stop loss to trade: {e}")
  148. return False
  149. def link_take_profit_to_trade(self, lifecycle_id: str, take_profit_order_id: str,
  150. take_profit_price: float) -> bool:
  151. """Link a take profit order to a trade lifecycle."""
  152. try:
  153. query = """
  154. UPDATE trades
  155. SET take_profit_order_id = ?,
  156. take_profit_price = ?,
  157. updated_at = ?
  158. WHERE trade_lifecycle_id = ? AND status = 'position_opened'
  159. """
  160. timestamp = datetime.now(timezone.utc).isoformat()
  161. params = (take_profit_order_id, take_profit_price, timestamp, lifecycle_id)
  162. self.db._execute_query(query, params)
  163. formatter = get_formatter()
  164. trade_info = self.get_trade_by_lifecycle_id(lifecycle_id)
  165. symbol_for_formatting = trade_info.get('symbol', 'UNKNOWN_SYMBOL') if trade_info else 'UNKNOWN_SYMBOL'
  166. logger.info(f"🎯 Linked take profit order {take_profit_order_id} ({formatter.format_price(take_profit_price, symbol_for_formatting)}) to trade {lifecycle_id}")
  167. return True
  168. except Exception as e:
  169. logger.error(f"❌ Error linking take profit to trade: {e}")
  170. return False
  171. def get_trade_by_lifecycle_id(self, lifecycle_id: str) -> Optional[Dict[str, Any]]:
  172. """Get trade by lifecycle ID."""
  173. query = "SELECT * FROM trades WHERE trade_lifecycle_id = ?"
  174. return self.db._fetchone_query(query, (lifecycle_id,))
  175. def get_trade_by_symbol_and_status(self, symbol: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
  176. """Get trade by symbol and status."""
  177. query = "SELECT * FROM trades WHERE symbol = ? AND status = ? ORDER BY updated_at DESC LIMIT 1"
  178. return self.db._fetchone_query(query, (symbol, status))
  179. def get_open_positions(self, symbol: Optional[str] = None) -> List[Dict[str, Any]]:
  180. """Get all open positions, optionally filtered by symbol."""
  181. if symbol:
  182. query = "SELECT * FROM trades WHERE status = 'position_opened' AND symbol = ? ORDER BY position_opened_at DESC"
  183. return self.db._fetch_query(query, (symbol,))
  184. else:
  185. query = "SELECT * FROM trades WHERE status = 'position_opened' ORDER BY position_opened_at DESC"
  186. return self.db._fetch_query(query)
  187. def get_trades_by_status(self, status: str, limit: int = 50) -> List[Dict[str, Any]]:
  188. """Get trades by status."""
  189. query = "SELECT * FROM trades WHERE status = ? ORDER BY updated_at DESC LIMIT ?"
  190. return self.db._fetch_query(query, (status, limit))
  191. def get_lifecycle_by_entry_order_id(self, entry_exchange_order_id: str, status: Optional[str] = None) -> Optional[Dict[str, Any]]:
  192. """Get a trade lifecycle by its entry_order_id (exchange ID) and optionally by status."""
  193. if status:
  194. query = "SELECT * FROM trades WHERE entry_order_id = ? AND status = ? LIMIT 1"
  195. params = (entry_exchange_order_id, status)
  196. else:
  197. query = "SELECT * FROM trades WHERE entry_order_id = ? LIMIT 1"
  198. params = (entry_exchange_order_id,)
  199. return self.db._fetchone_query(query, params)
  200. def get_lifecycle_by_sl_order_id(self, sl_exchange_order_id: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
  201. """Get an active trade lifecycle by its stop_loss_order_id (exchange ID)."""
  202. query = "SELECT * FROM trades WHERE stop_loss_order_id = ? AND status = ? LIMIT 1"
  203. return self.db._fetchone_query(query, (sl_exchange_order_id, status))
  204. def get_lifecycle_by_tp_order_id(self, tp_exchange_order_id: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
  205. """Get an active trade lifecycle by its take_profit_order_id (exchange ID)."""
  206. query = "SELECT * FROM trades WHERE take_profit_order_id = ? AND status = ? LIMIT 1"
  207. return self.db._fetchone_query(query, (tp_exchange_order_id, status))
  208. def get_pending_stop_loss_activations(self) -> List[Dict[str, Any]]:
  209. """Get open positions that need stop loss activation."""
  210. query = """
  211. SELECT * FROM trades
  212. WHERE status = 'position_opened'
  213. AND stop_loss_price IS NOT NULL
  214. AND stop_loss_order_id IS NULL
  215. ORDER BY updated_at ASC
  216. """
  217. return self.db._fetch_query(query)
  218. def cleanup_old_cancelled_trades(self, days_old: int = 7) -> int:
  219. """Clean up old cancelled trades (optional - for housekeeping)."""
  220. try:
  221. cutoff_date = (datetime.now(timezone.utc) - timedelta(days=days_old)).isoformat()
  222. # Count before deletion
  223. count_query = """
  224. SELECT COUNT(*) as count FROM trades
  225. WHERE status = 'cancelled' AND updated_at < ?
  226. """
  227. count_result = self.db._fetchone_query(count_query, (cutoff_date,))
  228. count_to_delete = count_result['count'] if count_result else 0
  229. if count_to_delete > 0:
  230. delete_query = """
  231. DELETE FROM trades
  232. WHERE status = 'cancelled' AND updated_at < ?
  233. """
  234. self.db._execute_query(delete_query, (cutoff_date,))
  235. logger.info(f"🧹 Cleaned up {count_to_delete} old cancelled trades (older than {days_old} days)")
  236. return count_to_delete
  237. except Exception as e:
  238. logger.error(f"❌ Error cleaning up old cancelled trades: {e}")
  239. return 0
  240. def confirm_position_with_exchange(self, symbol: str, exchange_position_size: float,
  241. exchange_open_orders: List[Dict]) -> bool:
  242. """Confirm position status with exchange before updating status."""
  243. try:
  244. # Get current trade status
  245. current_trade = self.get_trade_by_symbol_and_status(symbol, 'position_opened')
  246. if not current_trade:
  247. return True # No open position to confirm
  248. lifecycle_id = current_trade['trade_lifecycle_id']
  249. has_open_orders = len([o for o in exchange_open_orders if o.get('symbol') == symbol]) > 0
  250. # Only close position if exchange confirms no position AND no pending orders
  251. if abs(exchange_position_size) < 1e-8 and not has_open_orders:
  252. # Calculate realized P&L based on position side
  253. entry_price_db = current_trade['entry_price']
  254. estimated_pnl = current_trade.get('realized_pnl', 0)
  255. success = self.update_trade_position_closed(
  256. lifecycle_id,
  257. entry_price_db, # Using entry price as estimate since position is confirmed closed
  258. estimated_pnl,
  259. "exchange_confirmed_closed"
  260. )
  261. if success:
  262. logger.info(f"✅ Confirmed position closed for {symbol} with exchange")
  263. return success
  264. return True # Position still exists on exchange, no update needed
  265. except Exception as e:
  266. logger.error(f"❌ Error confirming position with exchange: {e}")
  267. return False
  268. def update_trade_market_data(self,
  269. trade_lifecycle_id: str,
  270. unrealized_pnl: Optional[float] = None,
  271. mark_price: Optional[float] = None,
  272. current_position_size: Optional[float] = None,
  273. entry_price: Optional[float] = None,
  274. liquidation_price: Optional[float] = None,
  275. margin_used: Optional[float] = None,
  276. leverage: Optional[float] = None,
  277. position_value: Optional[float] = None,
  278. unrealized_pnl_percentage: Optional[float] = None) -> bool:
  279. """Update market-related data for an open trade lifecycle."""
  280. try:
  281. updates = []
  282. params = []
  283. if unrealized_pnl is not None:
  284. updates.append("unrealized_pnl = ?")
  285. params.append(unrealized_pnl)
  286. if mark_price is not None:
  287. updates.append("mark_price = ?")
  288. params.append(mark_price)
  289. if current_position_size is not None:
  290. updates.append("current_position_size = ?")
  291. params.append(current_position_size)
  292. if entry_price is not None:
  293. updates.append("entry_price = ?")
  294. params.append(entry_price)
  295. if liquidation_price is not None:
  296. updates.append("liquidation_price = ?")
  297. params.append(liquidation_price)
  298. if margin_used is not None:
  299. updates.append("margin_used = ?")
  300. params.append(margin_used)
  301. if leverage is not None:
  302. updates.append("leverage = ?")
  303. params.append(leverage)
  304. if position_value is not None:
  305. updates.append("position_value = ?")
  306. params.append(position_value)
  307. if unrealized_pnl_percentage is not None:
  308. updates.append("unrealized_pnl_percentage = ?")
  309. params.append(unrealized_pnl_percentage)
  310. if not updates:
  311. logger.debug(f"No market data fields provided to update for lifecycle {trade_lifecycle_id}.")
  312. return True
  313. timestamp = datetime.now(timezone.utc).isoformat()
  314. updates.append("updated_at = ?")
  315. params.append(timestamp)
  316. set_clause = ", ".join(updates)
  317. query = f"""
  318. UPDATE trades
  319. SET {set_clause}
  320. WHERE trade_lifecycle_id = ? AND status = 'position_opened'
  321. """
  322. params.append(trade_lifecycle_id)
  323. cursor = self.db.conn.cursor()
  324. cursor.execute(query, tuple(params))
  325. self.db.conn.commit()
  326. updated_rows = cursor.rowcount
  327. if updated_rows > 0:
  328. logger.debug(f"💹 Updated market data for lifecycle {trade_lifecycle_id}")
  329. return True
  330. else:
  331. return False
  332. except Exception as e:
  333. logger.error(f"❌ Error updating market data for trade lifecycle {trade_lifecycle_id}: {e}")
  334. return False
  335. def get_recent_trades(self, limit: int = 10) -> List[Dict[str, Any]]:
  336. """Get recent trades (these are active/open trades, as completed ones are migrated)."""
  337. return self.db._fetch_query("SELECT * FROM trades WHERE status = 'position_opened' ORDER BY updated_at DESC LIMIT ?", (limit,))
  338. def get_all_trades(self) -> List[Dict[str, Any]]:
  339. """Fetch all trades from the database, ordered by timestamp."""
  340. return self.db._fetch_query("SELECT * FROM trades ORDER BY timestamp ASC")