123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349 |
- #!/usr/bin/env python3
- """
- Database Manager for Trading Statistics
- Handles database connections, schema creation, and basic CRUD operations.
- """
- import sqlite3
- import os
- import logging
- from datetime import datetime, timezone, timedelta
- from typing import Dict, List, Any, Optional
- from src.migrations.migrate_db import run_migrations as run_db_migrations
- from src.config.config import Config
- from src.utils.token_display_formatter import get_formatter
logger = logging.getLogger(__name__)


class DatabaseManager:
    """Manages the SQLite connection, schema and basic operations.

    On construction the manager ensures the database directory exists, runs
    the external migrations, opens the connection, creates any missing tables
    and indexes, seeds the metadata table and purges old rows.

    Rows returned by the query helpers are plain dictionaries keyed by column
    name (see ``_dict_factory``).
    """

    # DDL executed by ``_create_tables``; every statement is idempotent.
    _TABLE_STATEMENTS = (
        """
        CREATE TABLE IF NOT EXISTS metadata (
            key TEXT PRIMARY KEY,
            value TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS trades (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            exchange_fill_id TEXT UNIQUE,
            timestamp TEXT NOT NULL,
            symbol TEXT NOT NULL,
            side TEXT NOT NULL,
            amount REAL NOT NULL,
            price REAL NOT NULL,
            value REAL NOT NULL,
            trade_type TEXT NOT NULL,
            pnl REAL DEFAULT 0.0,
            linked_order_table_id INTEGER,

            -- Trade lifecycle tracking fields
            status TEXT DEFAULT 'executed',
            trade_lifecycle_id TEXT,
            position_side TEXT,

            -- Position tracking
            entry_price REAL,
            current_position_size REAL DEFAULT 0,

            -- Order IDs (exchange IDs)
            entry_order_id TEXT,
            stop_loss_order_id TEXT,
            take_profit_order_id TEXT,

            -- Risk management
            stop_loss_price REAL,
            take_profit_price REAL,

            -- P&L tracking
            realized_pnl REAL DEFAULT 0,
            unrealized_pnl REAL DEFAULT 0,
            mark_price REAL DEFAULT 0,
            position_value REAL DEFAULT NULL,
            unrealized_pnl_percentage REAL DEFAULT NULL,

            -- Risk Info from Exchange
            liquidation_price REAL DEFAULT NULL,
            margin_used REAL DEFAULT NULL,
            leverage REAL DEFAULT NULL,

            -- Timestamps
            position_opened_at TEXT,
            position_closed_at TEXT,
            updated_at TEXT DEFAULT CURRENT_TIMESTAMP,

            -- Notes
            notes TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS balance_history (
            timestamp TEXT PRIMARY KEY,
            balance REAL NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS balance_adjustments (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            adjustment_id TEXT UNIQUE,
            timestamp TEXT NOT NULL,
            type TEXT NOT NULL,
            amount REAL NOT NULL,
            description TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS orders (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            bot_order_ref_id TEXT UNIQUE,
            exchange_order_id TEXT UNIQUE,
            symbol TEXT NOT NULL,
            side TEXT NOT NULL,
            type TEXT NOT NULL,
            amount_requested REAL NOT NULL,
            amount_filled REAL DEFAULT 0.0,
            price REAL,
            status TEXT NOT NULL,
            timestamp_created TEXT NOT NULL,
            timestamp_updated TEXT NOT NULL,
            parent_bot_order_ref_id TEXT NULLABLE
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS daily_balances (
            date TEXT PRIMARY KEY,
            balance REAL NOT NULL,
            timestamp TEXT NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS token_stats (
            token TEXT PRIMARY KEY,
            total_realized_pnl REAL DEFAULT 0.0,
            total_completed_cycles INTEGER DEFAULT 0,
            winning_cycles INTEGER DEFAULT 0,
            losing_cycles INTEGER DEFAULT 0,
            total_entry_volume REAL DEFAULT 0.0,
            total_exit_volume REAL DEFAULT 0.0,
            sum_of_winning_pnl REAL DEFAULT 0.0,
            sum_of_losing_pnl REAL DEFAULT 0.0,
            largest_winning_cycle_pnl REAL DEFAULT 0.0,
            largest_losing_cycle_pnl REAL DEFAULT 0.0,
            largest_winning_cycle_entry_volume REAL DEFAULT 0.0,
            largest_losing_cycle_entry_volume REAL DEFAULT 0.0,
            first_cycle_closed_at TEXT,
            last_cycle_closed_at TEXT,
            total_cancelled_cycles INTEGER DEFAULT 0,
            updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
            total_duration_seconds INTEGER DEFAULT 0,
            roe_percentage REAL DEFAULT 0.0
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS daily_aggregated_stats (
            date TEXT NOT NULL,
            token TEXT NOT NULL,
            realized_pnl REAL DEFAULT 0.0,
            completed_cycles INTEGER DEFAULT 0,
            entry_volume REAL DEFAULT 0.0,
            exit_volume REAL DEFAULT 0.0,
            PRIMARY KEY (date, token)
        )
        """,
    )

    _INDEX_STATEMENTS = (
        "CREATE INDEX IF NOT EXISTS idx_orders_bot_order_ref_id ON orders (bot_order_ref_id)",
        "CREATE INDEX IF NOT EXISTS idx_orders_exchange_order_id ON orders (exchange_order_id)",
        "CREATE INDEX IF NOT EXISTS idx_trades_exchange_fill_id ON trades (exchange_fill_id)",
        "CREATE INDEX IF NOT EXISTS idx_trades_linked_order_table_id ON trades (linked_order_table_id)",
        "CREATE INDEX IF NOT EXISTS idx_orders_parent_bot_order_ref_id ON orders (parent_bot_order_ref_id)",
        "CREATE INDEX IF NOT EXISTS idx_orders_status_type ON orders (status, type)",
        "CREATE INDEX IF NOT EXISTS idx_trades_status ON trades (status)",
        "CREATE INDEX IF NOT EXISTS idx_trades_lifecycle_id ON trades (trade_lifecycle_id)",
        "CREATE INDEX IF NOT EXISTS idx_trades_position_side ON trades (position_side)",
        "CREATE INDEX IF NOT EXISTS idx_trades_symbol_status ON trades (symbol, status)",
        "CREATE INDEX IF NOT EXISTS idx_daily_stats_date_token ON daily_aggregated_stats (date, token)",
    )

    def __init__(self, db_path: str = "data/trading_stats.sqlite"):
        """Open the database at ``db_path`` and bring it to a usable state.

        Args:
            db_path: Filesystem path of the SQLite file. Its parent directory
                is created if it does not exist.
        """
        self.db_path = db_path
        self._ensure_data_directory()

        # Migrations run against the path *before* this object connects.
        logger.info("Running database migrations if needed...")
        run_db_migrations(self.db_path)
        logger.info("Database migration check complete.")

        self.conn = sqlite3.connect(
            self.db_path,
            detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
        )
        self.conn.row_factory = self._dict_factory

        self._create_tables()
        self._initialize_metadata()

        # Purge old data on startup.
        self.purge_old_daily_aggregated_stats()
        self.purge_old_balance_history()

    def _dict_factory(self, cursor, row) -> Dict[str, Any]:
        """Row factory: map each column name in ``cursor.description`` to its value."""
        return {column[0]: row[index] for index, column in enumerate(cursor.description)}

    def _ensure_data_directory(self) -> None:
        """Create the parent directory of ``self.db_path`` if it is missing."""
        data_dir = os.path.dirname(self.db_path)
        if data_dir and not os.path.exists(data_dir):
            # exist_ok guards against another process creating the directory
            # between the existence check and this call.
            os.makedirs(data_dir, exist_ok=True)
            logger.info(f"Created data directory for TradingStats DB: {data_dir}")

    def _execute_query(self, query: str, params: tuple = ()) -> None:
        """Execute a write statement (INSERT, UPDATE, DELETE, DDL).

        The connection is used as a context manager, so the statement is
        committed on success and rolled back if it raises.
        """
        with self.conn:
            self.conn.execute(query, params)

    def _execute_and_count(self, query: str, params: tuple = ()) -> int:
        """Execute a write statement in a transaction and return its rowcount."""
        with self.conn:
            return self.conn.execute(query, params).rowcount

    def _fetch_query(self, query: str, params: tuple = ()) -> List[Dict[str, Any]]:
        """Execute a SELECT query and return every row as a dictionary."""
        cursor = self.conn.execute(query, params)
        try:
            return cursor.fetchall()
        finally:
            cursor.close()

    def _fetchone_query(self, query: str, params: tuple = ()) -> Optional[Dict[str, Any]]:
        """Execute a SELECT query and return the first row, or None if empty."""
        cursor = self.conn.execute(query, params)
        try:
            return cursor.fetchone()
        finally:
            # Release the statement even when further rows were left unread.
            cursor.close()

    def _create_tables(self) -> None:
        """Create the SQLite tables and indexes if they don't exist."""
        for statement in self._TABLE_STATEMENTS + self._INDEX_STATEMENTS:
            self._execute_query(statement)

        logger.info("SQLite tables ensured for TradingStats.")

    def _initialize_metadata(self) -> None:
        """Seed ``start_date`` and ``initial_balance`` in metadata when absent."""
        start_date = self._get_metadata('start_date')
        if start_date is None:
            start_date = datetime.now(timezone.utc).isoformat()
            self._set_metadata('start_date', start_date)
            logger.info("Initialized 'start_date' in metadata.")

        initial_balance = self._get_metadata('initial_balance')
        if initial_balance is None:
            initial_balance = '0.0'
            self._set_metadata('initial_balance', initial_balance)
            logger.info("Initialized 'initial_balance' in metadata.")

        logger.info(f"TradingStats initialized. Start Date: {start_date}, Initial Balance: {initial_balance}")

    def _get_metadata(self, key: str) -> Optional[str]:
        """Return the metadata value stored under ``key``, or None if missing."""
        row = self._fetchone_query("SELECT value FROM metadata WHERE key = ?", (key,))
        return row['value'] if row else None

    def _set_metadata(self, key: str, value: str) -> None:
        """Insert or overwrite the metadata value stored under ``key``."""
        self._execute_query("INSERT OR REPLACE INTO metadata (key, value) VALUES (?, ?)", (key, value))

    def set_initial_balance(self, balance: float) -> None:
        """Set the initial balance if it hasn't been set yet.

        The stored balance counts as "not set" while it is below 1e-9
        (effectively zero); otherwise the existing value is kept.
        """
        current_balance = self.get_initial_balance()
        # Use a small tolerance for floating point comparison.
        if current_balance < 1e-9:
            self._set_metadata('initial_balance', str(balance))
            logger.info(f"Initial balance set to: {balance}")
        else:
            logger.info(f"Initial balance is already set. Current value: {current_balance}")

    def get_initial_balance(self) -> float:
        """Return the initial balance from metadata, or 0.0 when it is absent."""
        balance_str = self._get_metadata('initial_balance')
        return float(balance_str) if balance_str is not None else 0.0

    def record_balance_snapshot(self, balance: float, unrealized_pnl: float = 0.0,
                                timestamp: Optional[str] = None, notes: Optional[str] = None) -> None:
        """Insert a balance snapshot row into ``balance_history``.

        Args:
            balance: Account balance to store (raw float).
            unrealized_pnl: Unrealized P&L to store alongside the balance.
            timestamp: ISO-8601 timestamp for the row. Defaults to the current
                UTC time when omitted.
            notes: Optional free-form note stored with the row.

        Raises:
            sqlite3.Error: If the INSERT fails, e.g. on a duplicate timestamp
                (``timestamp`` is the table's primary key).
        """
        recorded_at = timestamp if timestamp is not None else datetime.now(timezone.utc).isoformat()

        # NOTE(review): the CREATE TABLE in this class declares only
        # (timestamp, balance) for balance_history; the unrealized_pnl and
        # notes columns are presumably added by the migrations -- confirm.
        # NOTE(review): only balance_history is written here; daily_balances
        # is not touched by this method.
        self._execute_query(
            "INSERT INTO balance_history (balance, unrealized_pnl, timestamp, notes) VALUES (?, ?, ?, ?)",
            (balance, unrealized_pnl, recorded_at, notes)
        )

        # Log only after the row has actually been written.
        logger.info(f"Recorded balance snapshot: {balance} (unrealized: {unrealized_pnl})")

    def purge_old_daily_aggregated_stats(self, months_to_keep: int = 10) -> None:
        """Purge ``daily_aggregated_stats`` rows older than ``months_to_keep`` months.

        A month is approximated as 30 days. Errors are logged, not raised.
        """
        try:
            cutoff_date = datetime.now(timezone.utc).date() - timedelta(days=months_to_keep * 30)
            rows_deleted = self._execute_and_count(
                "DELETE FROM daily_aggregated_stats WHERE date < ?",
                (cutoff_date.isoformat(),),
            )
            if rows_deleted > 0:
                logger.info(f"Purged {rows_deleted} old records from daily_aggregated_stats (older than {months_to_keep} months).")
        except Exception as e:
            logger.error(f"Error purging old daily_aggregated_stats: {e}", exc_info=True)

    def purge_old_balance_history(self) -> None:
        """Purge ``balance_history`` rows older than the configured retention.

        The retention is ``Config.BALANCE_HISTORY_RETENTION_DAYS`` (30 when the
        attribute is missing); a value of zero or less disables purging.
        Errors are logged, not raised.
        """
        days_to_keep = getattr(Config, 'BALANCE_HISTORY_RETENTION_DAYS', 30)
        if days_to_keep <= 0:
            return
        try:
            cutoff_date = datetime.now(timezone.utc).date() - timedelta(days=days_to_keep)
            rows_deleted = self._execute_and_count(
                "DELETE FROM balance_history WHERE timestamp < ?",
                (cutoff_date.isoformat(),),
            )
            if rows_deleted > 0:
                logger.info(f"Purged {rows_deleted} old records from balance_history (older than {days_to_keep} days).")
        except Exception as e:
            logger.error(f"Error purging old balance_history: {e}", exc_info=True)

    def get_balance_history_record_count(self) -> int:
        """Get the total number of balance history records."""
        row = self._fetchone_query("SELECT COUNT(*) as count FROM balance_history")
        return row['count'] if row and 'count' in row else 0

    def get_daily_balance_record_count(self) -> int:
        """Get the total number of daily balance records."""
        row = self._fetchone_query("SELECT COUNT(*) as count FROM daily_balances")
        return row['count'] if row and 'count' in row else 0

    def close(self) -> None:
        """Close the SQLite database connection.

        Safe to call repeatedly, and on an instance whose ``__init__`` failed
        before the connection was opened; both cases are silent no-ops.
        """
        conn = getattr(self, 'conn', None)
        if conn is None or getattr(self, '_closed', False):
            return
        conn.close()
        # Keep self.conn in place so later use still raises sqlite3's own
        # "closed database" error; the flag just prevents a repeated close.
        self._closed = True
        logger.info("TradingStats SQLite connection closed.")

    def close_connection(self) -> None:
        """Close the SQLite database connection (alias for backward compatibility)."""
        self.close()

    def __del__(self):
        """Ensure connection is closed when object is deleted."""
        try:
            self.close_connection()
        except Exception:
            # A finalizer may run during interpreter shutdown, when module
            # globals such as the logger are already gone; nothing useful
            # can be done with an error at that point.
            pass
|