123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265 |
- #!/usr/bin/env python3
- """
- Order Manager for Trading Statistics
- Handles order tracking, status updates, and order cleanup operations.
- """
- import sqlite3
- import logging
- from datetime import datetime, timezone, timedelta
- from typing import Dict, List, Any, Optional
- import uuid
- logger = logging.getLogger(__name__)
- class OrderManager:
- """Manages order operations in the trading statistics database."""
- def __init__(self, db_manager):
- """Initialize with database manager."""
- self.db = db_manager
- def record_order_placed(self, symbol: str, side: str, order_type: str,
- amount_requested: float, price: Optional[float] = None,
- bot_order_ref_id: Optional[str] = None,
- exchange_order_id: Optional[str] = None,
- status: str = 'open',
- parent_bot_order_ref_id: Optional[str] = None) -> Optional[int]:
- """Record a newly placed order. Returns the order ID or None on failure."""
- now_iso = datetime.now(timezone.utc).isoformat()
- query = """
- INSERT INTO orders (bot_order_ref_id, exchange_order_id, symbol, side, type,
- amount_requested, price, status, timestamp_created, timestamp_updated, parent_bot_order_ref_id)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
- """
- params = (bot_order_ref_id, exchange_order_id, symbol, side.lower(), order_type.lower(),
- amount_requested, price, status.lower(), now_iso, now_iso, parent_bot_order_ref_id)
- try:
- cur = self.db.conn.cursor()
- cur.execute(query, params)
- self.db.conn.commit()
- order_db_id = cur.lastrowid
- logger.info(f"Recorded order placed: ID {order_db_id}, Symbol {symbol}, Side {side}, Type {order_type}, Amount {amount_requested}")
- return order_db_id
- except sqlite3.IntegrityError as e:
- logger.error(f"Failed to record order due to IntegrityError: {e}")
- return None
- except Exception as e:
- logger.error(f"Failed to record order: {e}")
- return None
- def update_order_status(self, order_db_id: Optional[int] = None, bot_order_ref_id: Optional[str] = None,
- exchange_order_id: Optional[str] = None, new_status: Optional[str] = None,
- amount_filled_increment: Optional[float] = None, set_exchange_order_id: Optional[str] = None) -> bool:
- """Update an existing order's status and/or amount_filled."""
- if not any([order_db_id, bot_order_ref_id, exchange_order_id]):
- logger.error("Must provide one of order_db_id, bot_order_ref_id, or exchange_order_id to update order.")
- return False
- now_iso = datetime.now(timezone.utc).isoformat()
- set_clauses = []
- params = []
- if new_status:
- set_clauses.append("status = ?")
- params.append(new_status.lower())
-
- if set_exchange_order_id is not None:
- set_clauses.append("exchange_order_id = ?")
- params.append(set_exchange_order_id)
-
- identifier_clause = ""
- identifier_param = None
- if order_db_id:
- identifier_clause = "id = ?"
- identifier_param = order_db_id
- elif bot_order_ref_id:
- identifier_clause = "bot_order_ref_id = ?"
- identifier_param = bot_order_ref_id
- elif exchange_order_id:
- identifier_clause = "exchange_order_id = ?"
- identifier_param = exchange_order_id
- if amount_filled_increment is not None and amount_filled_increment > 0:
- order_data = self.db._fetchone_query(f"SELECT amount_filled FROM orders WHERE {identifier_clause}", (identifier_param,))
- current_amount_filled = order_data.get('amount_filled', 0.0) if order_data else 0.0
- set_clauses.append("amount_filled = ?")
- params.append(current_amount_filled + amount_filled_increment)
- if not set_clauses:
- return True # No update needed
- set_clauses.append("timestamp_updated = ?")
- params.append(now_iso)
- params.append(identifier_param)
- query = f"UPDATE orders SET {', '.join(set_clauses)} WHERE {identifier_clause}"
-
- try:
- self.db._execute_query(query, tuple(params))
- logger.info(f"Updated order ({identifier_clause}={identifier_param}): Status to '{new_status or 'N/A'}'")
- return True
- except Exception as e:
- logger.error(f"Failed to update order: {e}")
- return False
- def get_order_by_db_id(self, order_db_id: int) -> Optional[Dict[str, Any]]:
- """Fetch an order by its database primary key ID."""
- return self.db._fetchone_query("SELECT * FROM orders WHERE id = ?", (order_db_id,))
- def get_order_by_bot_ref_id(self, bot_order_ref_id: str) -> Optional[Dict[str, Any]]:
- """Fetch an order by the bot's internal reference ID."""
- return self.db._fetchone_query("SELECT * FROM orders WHERE bot_order_ref_id = ?", (bot_order_ref_id,))
- def get_order_by_exchange_id(self, exchange_order_id: str) -> Optional[Dict[str, Any]]:
- """Fetch an order by the exchange's order ID."""
- return self.db._fetchone_query("SELECT * FROM orders WHERE exchange_order_id = ?", (exchange_order_id,))
- def get_orders_by_status(self, status: str, order_type_filter: Optional[str] = None,
- parent_bot_order_ref_id: Optional[str] = None) -> List[Dict[str, Any]]:
- """Fetch all orders with a specific status, with optional filters."""
- query = "SELECT * FROM orders WHERE status = ?"
- params = [status.lower()]
- if order_type_filter:
- query += " AND type = ?"
- params.append(order_type_filter.lower())
- if parent_bot_order_ref_id:
- query += " AND parent_bot_order_ref_id = ?"
- params.append(parent_bot_order_ref_id)
- query += " ORDER BY timestamp_created ASC"
- return self.db._fetch_query(query, tuple(params))
- def cancel_linked_orders(self, parent_bot_order_ref_id: str, new_status: str = 'cancelled_parent_filled') -> int:
- """Cancel all orders linked to a parent order. Returns count of cancelled orders."""
- linked_orders = self.get_orders_by_status('pending_trigger', parent_bot_order_ref_id=parent_bot_order_ref_id)
- cancelled_count = 0
-
- for order in linked_orders:
- order_db_id = order.get('id')
- if order_db_id:
- success = self.update_order_status(order_db_id=order_db_id, new_status=new_status)
- if success:
- cancelled_count += 1
- logger.info(f"Cancelled linked order ID {order_db_id} (parent: {parent_bot_order_ref_id})")
-
- return cancelled_count
- def cancel_pending_stop_losses_by_symbol(self, symbol: str, new_status: str = 'cancelled_position_closed') -> int:
- """Cancel all pending stop loss orders for a specific symbol. Returns count cancelled."""
- query = "SELECT * FROM orders WHERE symbol = ? AND status = 'pending_trigger' AND type = 'stop_limit_trigger'"
- pending_stop_losses = self.db._fetch_query(query, (symbol,))
- cancelled_count = 0
-
- for order in pending_stop_losses:
- order_db_id = order.get('id')
- if order_db_id:
- success = self.update_order_status(order_db_id=order_db_id, new_status=new_status)
- if success:
- cancelled_count += 1
- logger.info(f"Cancelled pending SL order ID {order_db_id} for {symbol}")
-
- return cancelled_count
- def get_order_cleanup_summary(self) -> Dict[str, Any]:
- """Get summary of order cleanup actions for monitoring."""
- try:
- cleanup_stats = {}
-
- cancellation_types = [
- 'cancelled_parent_cancelled',
- 'cancelled_parent_disappeared',
- 'cancelled_manual_exit',
- 'cancelled_auto_exit',
- 'cancelled_no_position',
- 'cancelled_external_position_close',
- 'cancelled_orphaned_no_position',
- 'cancelled_externally',
- 'immediately_executed_on_activation',
- 'activation_execution_failed',
- 'activation_execution_error'
- ]
-
- for cancel_type in cancellation_types:
- count_result = self.db._fetchone_query(
- "SELECT COUNT(*) as count FROM orders WHERE status = ?",
- (cancel_type,)
- )
- cleanup_stats[cancel_type] = count_result['count'] if count_result else 0
-
- # Get currently pending stop losses
- pending_sls = self.get_orders_by_status('pending_trigger', 'stop_limit_trigger')
- cleanup_stats['currently_pending_stop_losses'] = len(pending_sls)
-
- # Get total orders in various states
- active_orders = self.db._fetchone_query(
- "SELECT COUNT(*) as count FROM orders WHERE status IN ('open', 'submitted', 'partially_filled')",
- ()
- )
- cleanup_stats['currently_active_orders'] = active_orders['count'] if active_orders else 0
-
- return cleanup_stats
-
- except Exception as e:
- logger.error(f"Error getting order cleanup summary: {e}")
- return {}
- def get_external_activity_summary(self, days: int = 7) -> Dict[str, Any]:
- """Get summary of external activity over the last N days."""
- try:
- cutoff_date = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
-
- # External trades
- external_trades = self.db._fetch_query(
- "SELECT COUNT(*) as count, side FROM trades WHERE trade_type = 'external' AND timestamp >= ? GROUP BY side",
- (cutoff_date,)
- )
-
- external_trade_summary = {
- 'external_buy_trades': 0,
- 'external_sell_trades': 0,
- 'total_external_trades': 0
- }
-
- for trade_group in external_trades:
- side = trade_group['side']
- count = trade_group['count']
- external_trade_summary['total_external_trades'] += count
- if side == 'buy':
- external_trade_summary['external_buy_trades'] = count
- elif side == 'sell':
- external_trade_summary['external_sell_trades'] = count
-
- # External cancellations
- external_cancellations = self.db._fetchone_query(
- "SELECT COUNT(*) as count FROM orders WHERE status = 'cancelled_externally' AND timestamp_updated >= ?",
- (cutoff_date,)
- )
- external_trade_summary['external_cancellations'] = external_cancellations['count'] if external_cancellations else 0
-
- # Cleanup actions
- cleanup_cancellations = self.db._fetchone_query(
- """SELECT COUNT(*) as count FROM orders
- WHERE status LIKE 'cancelled_%'
- AND status != 'cancelled_externally'
- AND timestamp_updated >= ?""",
- (cutoff_date,)
- )
- external_trade_summary['cleanup_cancellations'] = cleanup_cancellations['count'] if cleanup_cancellations else 0
- external_trade_summary['period_days'] = days
-
- return external_trade_summary
-
- except Exception as e:
- logger.error(f"Error getting external activity summary: {e}")
- return {'period_days': days, 'total_external_trades': 0, 'external_cancellations': 0}
- def get_recent_orders(self, limit: int = 20) -> List[Dict[str, Any]]:
- """Get recent orders from the database."""
- try:
- query = "SELECT * FROM orders ORDER BY timestamp_created DESC LIMIT ?"
- return self.db._fetch_query(query, (limit,))
- except Exception as e:
- logger.error(f"❌ Error getting recent orders: {e}")
- return []
|