#!/usr/bin/env python3
"""
Order Manager for Trading Statistics

Handles order tracking, status updates, and order cleanup operations.
"""

import logging
import sqlite3
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)


  12. class OrderManager:
  13. """Manages order operations in the trading statistics database."""
  14. def __init__(self, db_manager):
  15. """Initialize with database manager."""
  16. self.db = db_manager
  17. def record_order_placed(self, symbol: str, side: str, order_type: str,
  18. amount_requested: float, price: Optional[float] = None,
  19. bot_order_ref_id: Optional[str] = None,
  20. exchange_order_id: Optional[str] = None,
  21. status: str = 'open',
  22. parent_bot_order_ref_id: Optional[str] = None) -> Optional[int]:
  23. """Record a newly placed order. Returns the order ID or None on failure."""
  24. now_iso = datetime.now(timezone.utc).isoformat()
  25. query = """
  26. INSERT INTO orders (bot_order_ref_id, exchange_order_id, symbol, side, type,
  27. amount_requested, price, status, timestamp_created, timestamp_updated, parent_bot_order_ref_id)
  28. VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  29. """
  30. params = (bot_order_ref_id, exchange_order_id, symbol, side.lower(), order_type.lower(),
  31. amount_requested, price, status.lower(), now_iso, now_iso, parent_bot_order_ref_id)
  32. try:
  33. cur = self.db.conn.cursor()
  34. cur.execute(query, params)
  35. self.db.conn.commit()
  36. order_db_id = cur.lastrowid
  37. logger.info(f"Recorded order placed: ID {order_db_id}, Symbol {symbol}, Side {side}, Type {order_type}, Amount {amount_requested}")
  38. return order_db_id
  39. except sqlite3.IntegrityError as e:
  40. logger.error(f"Failed to record order due to IntegrityError: {e}")
  41. return None
  42. except Exception as e:
  43. logger.error(f"Failed to record order: {e}")
  44. return None
  45. def update_order_status(self, order_db_id: Optional[int] = None, bot_order_ref_id: Optional[str] = None,
  46. exchange_order_id: Optional[str] = None, new_status: Optional[str] = None,
  47. amount_filled_increment: Optional[float] = None, set_exchange_order_id: Optional[str] = None) -> bool:
  48. """Update an existing order's status and/or amount_filled."""
  49. if not any([order_db_id, bot_order_ref_id, exchange_order_id]):
  50. logger.error("Must provide one of order_db_id, bot_order_ref_id, or exchange_order_id to update order.")
  51. return False
  52. now_iso = datetime.now(timezone.utc).isoformat()
  53. set_clauses = []
  54. params = []
  55. if new_status:
  56. set_clauses.append("status = ?")
  57. params.append(new_status.lower())
  58. if set_exchange_order_id is not None:
  59. set_clauses.append("exchange_order_id = ?")
  60. params.append(set_exchange_order_id)
  61. identifier_clause = ""
  62. identifier_param = None
  63. if order_db_id:
  64. identifier_clause = "id = ?"
  65. identifier_param = order_db_id
  66. elif bot_order_ref_id:
  67. identifier_clause = "bot_order_ref_id = ?"
  68. identifier_param = bot_order_ref_id
  69. elif exchange_order_id:
  70. identifier_clause = "exchange_order_id = ?"
  71. identifier_param = exchange_order_id
  72. if amount_filled_increment is not None and amount_filled_increment > 0:
  73. order_data = self.db._fetchone_query(f"SELECT amount_filled FROM orders WHERE {identifier_clause}", (identifier_param,))
  74. current_amount_filled = order_data.get('amount_filled', 0.0) if order_data else 0.0
  75. set_clauses.append("amount_filled = ?")
  76. params.append(current_amount_filled + amount_filled_increment)
  77. if not set_clauses:
  78. return True # No update needed
  79. set_clauses.append("timestamp_updated = ?")
  80. params.append(now_iso)
  81. params.append(identifier_param)
  82. query = f"UPDATE orders SET {', '.join(set_clauses)} WHERE {identifier_clause}"
  83. try:
  84. self.db._execute_query(query, tuple(params))
  85. logger.info(f"Updated order ({identifier_clause}={identifier_param}): Status to '{new_status or 'N/A'}'")
  86. return True
  87. except Exception as e:
  88. logger.error(f"Failed to update order: {e}")
  89. return False
  90. def get_order_by_db_id(self, order_db_id: int) -> Optional[Dict[str, Any]]:
  91. """Fetch an order by its database primary key ID."""
  92. return self.db._fetchone_query("SELECT * FROM orders WHERE id = ?", (order_db_id,))
  93. def get_order_by_bot_ref_id(self, bot_order_ref_id: str) -> Optional[Dict[str, Any]]:
  94. """Fetch an order by the bot's internal reference ID."""
  95. return self.db._fetchone_query("SELECT * FROM orders WHERE bot_order_ref_id = ?", (bot_order_ref_id,))
  96. def get_order_by_exchange_id(self, exchange_order_id: str) -> Optional[Dict[str, Any]]:
  97. """Fetch an order by the exchange's order ID."""
  98. return self.db._fetchone_query("SELECT * FROM orders WHERE exchange_order_id = ?", (exchange_order_id,))
  99. def get_orders_by_status(self, status: str, order_type_filter: Optional[str] = None,
  100. parent_bot_order_ref_id: Optional[str] = None) -> List[Dict[str, Any]]:
  101. """Fetch all orders with a specific status, with optional filters."""
  102. query = "SELECT * FROM orders WHERE status = ?"
  103. params = [status.lower()]
  104. if order_type_filter:
  105. query += " AND type = ?"
  106. params.append(order_type_filter.lower())
  107. if parent_bot_order_ref_id:
  108. query += " AND parent_bot_order_ref_id = ?"
  109. params.append(parent_bot_order_ref_id)
  110. query += " ORDER BY timestamp_created ASC"
  111. return self.db._fetch_query(query, tuple(params))
  112. def cancel_linked_orders(self, parent_bot_order_ref_id: str, new_status: str = 'cancelled_parent_filled') -> int:
  113. """Cancel all orders linked to a parent order. Returns count of cancelled orders."""
  114. linked_orders = self.get_orders_by_status('pending_trigger', parent_bot_order_ref_id=parent_bot_order_ref_id)
  115. cancelled_count = 0
  116. for order in linked_orders:
  117. order_db_id = order.get('id')
  118. if order_db_id:
  119. success = self.update_order_status(order_db_id=order_db_id, new_status=new_status)
  120. if success:
  121. cancelled_count += 1
  122. logger.info(f"Cancelled linked order ID {order_db_id} (parent: {parent_bot_order_ref_id})")
  123. return cancelled_count
  124. def cancel_pending_stop_losses_by_symbol(self, symbol: str, new_status: str = 'cancelled_position_closed') -> int:
  125. """Cancel all pending stop loss orders for a specific symbol. Returns count cancelled."""
  126. query = "SELECT * FROM orders WHERE symbol = ? AND status = 'pending_trigger' AND type = 'stop_limit_trigger'"
  127. pending_stop_losses = self.db._fetch_query(query, (symbol,))
  128. cancelled_count = 0
  129. for order in pending_stop_losses:
  130. order_db_id = order.get('id')
  131. if order_db_id:
  132. success = self.update_order_status(order_db_id=order_db_id, new_status=new_status)
  133. if success:
  134. cancelled_count += 1
  135. logger.info(f"Cancelled pending SL order ID {order_db_id} for {symbol}")
  136. return cancelled_count
  137. def get_order_cleanup_summary(self) -> Dict[str, Any]:
  138. """Get summary of order cleanup actions for monitoring."""
  139. try:
  140. cleanup_stats = {}
  141. cancellation_types = [
  142. 'cancelled_parent_cancelled',
  143. 'cancelled_parent_disappeared',
  144. 'cancelled_manual_exit',
  145. 'cancelled_auto_exit',
  146. 'cancelled_no_position',
  147. 'cancelled_external_position_close',
  148. 'cancelled_orphaned_no_position',
  149. 'cancelled_externally',
  150. 'immediately_executed_on_activation',
  151. 'activation_execution_failed',
  152. 'activation_execution_error'
  153. ]
  154. for cancel_type in cancellation_types:
  155. count_result = self.db._fetchone_query(
  156. "SELECT COUNT(*) as count FROM orders WHERE status = ?",
  157. (cancel_type,)
  158. )
  159. cleanup_stats[cancel_type] = count_result['count'] if count_result else 0
  160. # Get currently pending stop losses
  161. pending_sls = self.get_orders_by_status('pending_trigger', 'stop_limit_trigger')
  162. cleanup_stats['currently_pending_stop_losses'] = len(pending_sls)
  163. # Get total orders in various states
  164. active_orders = self.db._fetchone_query(
  165. "SELECT COUNT(*) as count FROM orders WHERE status IN ('open', 'submitted', 'partially_filled')",
  166. ()
  167. )
  168. cleanup_stats['currently_active_orders'] = active_orders['count'] if active_orders else 0
  169. return cleanup_stats
  170. except Exception as e:
  171. logger.error(f"Error getting order cleanup summary: {e}")
  172. return {}
  173. def get_external_activity_summary(self, days: int = 7) -> Dict[str, Any]:
  174. """Get summary of external activity over the last N days."""
  175. try:
  176. cutoff_date = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
  177. # External trades
  178. external_trades = self.db._fetch_query(
  179. "SELECT COUNT(*) as count, side FROM trades WHERE trade_type = 'external' AND timestamp >= ? GROUP BY side",
  180. (cutoff_date,)
  181. )
  182. external_trade_summary = {
  183. 'external_buy_trades': 0,
  184. 'external_sell_trades': 0,
  185. 'total_external_trades': 0
  186. }
  187. for trade_group in external_trades:
  188. side = trade_group['side']
  189. count = trade_group['count']
  190. external_trade_summary['total_external_trades'] += count
  191. if side == 'buy':
  192. external_trade_summary['external_buy_trades'] = count
  193. elif side == 'sell':
  194. external_trade_summary['external_sell_trades'] = count
  195. # External cancellations
  196. external_cancellations = self.db._fetchone_query(
  197. "SELECT COUNT(*) as count FROM orders WHERE status = 'cancelled_externally' AND timestamp_updated >= ?",
  198. (cutoff_date,)
  199. )
  200. external_trade_summary['external_cancellations'] = external_cancellations['count'] if external_cancellations else 0
  201. # Cleanup actions
  202. cleanup_cancellations = self.db._fetchone_query(
  203. """SELECT COUNT(*) as count FROM orders
  204. WHERE status LIKE 'cancelled_%'
  205. AND status != 'cancelled_externally'
  206. AND timestamp_updated >= ?""",
  207. (cutoff_date,)
  208. )
  209. external_trade_summary['cleanup_cancellations'] = cleanup_cancellations['count'] if cleanup_cancellations else 0
  210. external_trade_summary['period_days'] = days
  211. return external_trade_summary
  212. except Exception as e:
  213. logger.error(f"Error getting external activity summary: {e}")
  214. return {'period_days': days, 'total_external_trades': 0, 'external_cancellations': 0}
  215. def get_recent_orders(self, limit: int = 20) -> List[Dict[str, Any]]:
  216. """Get recent orders from the database."""
  217. try:
  218. query = "SELECT * FROM orders ORDER BY timestamp_created DESC LIMIT ?"
  219. return self.db._fetch_query(query, (limit,))
  220. except Exception as e:
  221. logger.error(f"❌ Error getting recent orders: {e}")
  222. return []