#!/usr/bin/env python3 """ Order Manager for Trading Statistics Handles order tracking, status updates, and order cleanup operations. """ import sqlite3 import logging from datetime import datetime, timezone, timedelta from typing import Dict, List, Any, Optional import uuid logger = logging.getLogger(__name__) class OrderManager: """Manages order operations in the trading statistics database.""" def __init__(self, db_manager): """Initialize with database manager.""" self.db = db_manager def record_order_placed(self, symbol: str, side: str, order_type: str, amount_requested: float, price: Optional[float] = None, bot_order_ref_id: Optional[str] = None, exchange_order_id: Optional[str] = None, status: str = 'open', parent_bot_order_ref_id: Optional[str] = None) -> Optional[int]: """Record a newly placed order. Returns the order ID or None on failure.""" now_iso = datetime.now(timezone.utc).isoformat() query = """ INSERT INTO orders (bot_order_ref_id, exchange_order_id, symbol, side, type, amount_requested, price, status, timestamp_created, timestamp_updated, parent_bot_order_ref_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """ params = (bot_order_ref_id, exchange_order_id, symbol, side.lower(), order_type.lower(), amount_requested, price, status.lower(), now_iso, now_iso, parent_bot_order_ref_id) try: cur = self.db.conn.cursor() cur.execute(query, params) self.db.conn.commit() order_db_id = cur.lastrowid logger.info(f"Recorded order placed: ID {order_db_id}, Symbol {symbol}, Side {side}, Type {order_type}, Amount {amount_requested}") return order_db_id except sqlite3.IntegrityError as e: logger.error(f"Failed to record order due to IntegrityError: {e}") return None except Exception as e: logger.error(f"Failed to record order: {e}") return None def update_order_status(self, order_db_id: Optional[int] = None, bot_order_ref_id: Optional[str] = None, exchange_order_id: Optional[str] = None, new_status: Optional[str] = None, amount_filled_increment: Optional[float] = None, set_exchange_order_id: Optional[str] = None) -> bool: """Update an existing order's status and/or amount_filled.""" if not any([order_db_id, bot_order_ref_id, exchange_order_id]): logger.error("Must provide one of order_db_id, bot_order_ref_id, or exchange_order_id to update order.") return False now_iso = datetime.now(timezone.utc).isoformat() set_clauses = [] params = [] if new_status: set_clauses.append("status = ?") params.append(new_status.lower()) if set_exchange_order_id is not None: set_clauses.append("exchange_order_id = ?") params.append(set_exchange_order_id) identifier_clause = "" identifier_param = None if order_db_id: identifier_clause = "id = ?" identifier_param = order_db_id elif bot_order_ref_id: identifier_clause = "bot_order_ref_id = ?" identifier_param = bot_order_ref_id elif exchange_order_id: identifier_clause = "exchange_order_id = ?" identifier_param = exchange_order_id if amount_filled_increment is not None and amount_filled_increment > 0: order_data = self.db._fetchone_query(f"SELECT amount_filled FROM orders WHERE {identifier_clause}", (identifier_param,)) current_amount_filled = order_data.get('amount_filled', 0.0) if order_data else 0.0 set_clauses.append("amount_filled = ?") params.append(current_amount_filled + amount_filled_increment) if not set_clauses: return True # No update needed set_clauses.append("timestamp_updated = ?") params.append(now_iso) params.append(identifier_param) query = f"UPDATE orders SET {', '.join(set_clauses)} WHERE {identifier_clause}" try: self.db._execute_query(query, tuple(params)) logger.info(f"Updated order ({identifier_clause}={identifier_param}): Status to '{new_status or 'N/A'}'") return True except Exception as e: logger.error(f"Failed to update order: {e}") return False def get_order_by_db_id(self, order_db_id: int) -> Optional[Dict[str, Any]]: """Fetch an order by its database primary key ID.""" return self.db._fetchone_query("SELECT * FROM orders WHERE id = ?", (order_db_id,)) def get_order_by_bot_ref_id(self, bot_order_ref_id: str) -> Optional[Dict[str, Any]]: """Fetch an order by the bot's internal reference ID.""" return self.db._fetchone_query("SELECT * FROM orders WHERE bot_order_ref_id = ?", (bot_order_ref_id,)) def get_order_by_exchange_id(self, exchange_order_id: str) -> Optional[Dict[str, Any]]: """Fetch an order by the exchange's order ID.""" return self.db._fetchone_query("SELECT * FROM orders WHERE exchange_order_id = ?", (exchange_order_id,)) def get_orders_by_status(self, status: str, order_type_filter: Optional[str] = None, parent_bot_order_ref_id: Optional[str] = None) -> List[Dict[str, Any]]: """Fetch all orders with a specific status, with optional filters.""" query = "SELECT * FROM orders WHERE status = ?" params = [status.lower()] if order_type_filter: query += " AND type = ?" params.append(order_type_filter.lower()) if parent_bot_order_ref_id: query += " AND parent_bot_order_ref_id = ?" params.append(parent_bot_order_ref_id) query += " ORDER BY timestamp_created ASC" return self.db._fetch_query(query, tuple(params)) def cancel_linked_orders(self, parent_bot_order_ref_id: str, new_status: str = 'cancelled_parent_filled') -> int: """Cancel all orders linked to a parent order. Returns count of cancelled orders.""" linked_orders = self.get_orders_by_status('pending_trigger', parent_bot_order_ref_id=parent_bot_order_ref_id) cancelled_count = 0 for order in linked_orders: order_db_id = order.get('id') if order_db_id: success = self.update_order_status(order_db_id=order_db_id, new_status=new_status) if success: cancelled_count += 1 logger.info(f"Cancelled linked order ID {order_db_id} (parent: {parent_bot_order_ref_id})") return cancelled_count def cancel_pending_stop_losses_by_symbol(self, symbol: str, new_status: str = 'cancelled_position_closed') -> int: """Cancel all pending stop loss orders for a specific symbol. Returns count cancelled.""" query = "SELECT * FROM orders WHERE symbol = ? AND status = 'pending_trigger' AND type = 'stop_limit_trigger'" pending_stop_losses = self.db._fetch_query(query, (symbol,)) cancelled_count = 0 for order in pending_stop_losses: order_db_id = order.get('id') if order_db_id: success = self.update_order_status(order_db_id=order_db_id, new_status=new_status) if success: cancelled_count += 1 logger.info(f"Cancelled pending SL order ID {order_db_id} for {symbol}") return cancelled_count def get_order_cleanup_summary(self) -> Dict[str, Any]: """Get summary of order cleanup actions for monitoring.""" try: cleanup_stats = {} cancellation_types = [ 'cancelled_parent_cancelled', 'cancelled_parent_disappeared', 'cancelled_manual_exit', 'cancelled_auto_exit', 'cancelled_no_position', 'cancelled_external_position_close', 'cancelled_orphaned_no_position', 'cancelled_externally', 'immediately_executed_on_activation', 'activation_execution_failed', 'activation_execution_error' ] for cancel_type in cancellation_types: count_result = self.db._fetchone_query( "SELECT COUNT(*) as count FROM orders WHERE status = ?", (cancel_type,) ) cleanup_stats[cancel_type] = count_result['count'] if count_result else 0 # Get currently pending stop losses pending_sls = self.get_orders_by_status('pending_trigger', 'stop_limit_trigger') cleanup_stats['currently_pending_stop_losses'] = len(pending_sls) # Get total orders in various states active_orders = self.db._fetchone_query( "SELECT COUNT(*) as count FROM orders WHERE status IN ('open', 'submitted', 'partially_filled')", () ) cleanup_stats['currently_active_orders'] = active_orders['count'] if active_orders else 0 return cleanup_stats except Exception as e: logger.error(f"Error getting order cleanup summary: {e}") return {} def get_external_activity_summary(self, days: int = 7) -> Dict[str, Any]: """Get summary of external activity over the last N days.""" try: cutoff_date = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat() # External trades external_trades = self.db._fetch_query( "SELECT COUNT(*) as count, side FROM trades WHERE trade_type = 'external' AND timestamp >= ? GROUP BY side", (cutoff_date,) ) external_trade_summary = { 'external_buy_trades': 0, 'external_sell_trades': 0, 'total_external_trades': 0 } for trade_group in external_trades: side = trade_group['side'] count = trade_group['count'] external_trade_summary['total_external_trades'] += count if side == 'buy': external_trade_summary['external_buy_trades'] = count elif side == 'sell': external_trade_summary['external_sell_trades'] = count # External cancellations external_cancellations = self.db._fetchone_query( "SELECT COUNT(*) as count FROM orders WHERE status = 'cancelled_externally' AND timestamp_updated >= ?", (cutoff_date,) ) external_trade_summary['external_cancellations'] = external_cancellations['count'] if external_cancellations else 0 # Cleanup actions cleanup_cancellations = self.db._fetchone_query( """SELECT COUNT(*) as count FROM orders WHERE status LIKE 'cancelled_%' AND status != 'cancelled_externally' AND timestamp_updated >= ?""", (cutoff_date,) ) external_trade_summary['cleanup_cancellations'] = cleanup_cancellations['count'] if cleanup_cancellations else 0 external_trade_summary['period_days'] = days return external_trade_summary except Exception as e: logger.error(f"Error getting external activity summary: {e}") return {'period_days': days, 'total_external_trades': 0, 'external_cancellations': 0} def get_recent_orders(self, limit: int = 20) -> List[Dict[str, Any]]: """Get recent orders from the database.""" try: query = "SELECT * FROM orders ORDER BY timestamp_created DESC LIMIT ?" return self.db._fetch_query(query, (limit,)) except Exception as e: logger.error(f"❌ Error getting recent orders: {e}") return []