12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579 |
- #!/usr/bin/env python3
- # MOVED TO src/trading/stats/ - This file kept for reference
- # Use: from src.stats import TradingStats
- #!/usr/bin/env python3
- """
- Trading Statistics Tracker (SQLite Version)
- Tracks and calculates comprehensive trading statistics using an SQLite database.
- """
- import sqlite3
- import os
- import logging
- from datetime import datetime, timedelta, timezone
- from typing import Dict, List, Any, Optional, Tuple, Union
- import math
- from collections import defaultdict
- import uuid
- import numpy as np # Ensure numpy is imported as np
- # 🆕 Import the migration runner
- from src.migrations.migrate_db import run_migrations as run_db_migrations
- from src.utils.token_display_formatter import get_formatter # Added import
- from src.config.config import Config
- logger = logging.getLogger(__name__)
- def _normalize_token_case(token: str) -> str:
- """
- Normalize token case: if any characters are already uppercase, keep as-is.
- Otherwise, convert to uppercase. This handles mixed-case tokens like kPEPE, kBONK.
- """
- # Check if any character is already uppercase
- if any(c.isupper() for c in token):
- return token # Keep original case for mixed-case tokens
- else:
- return token.upper() # Convert to uppercase for all-lowercase input
- class TradingStats:
- """Comprehensive trading statistics tracker using SQLite."""
def __init__(self, db_path: str = "data/trading_stats.sqlite"):
    """Initialize the stats tracker and connect to SQLite DB.

    Args:
        db_path: Filesystem path of the SQLite database file; parent
            directory is created if missing.

    Side effects: runs schema migrations, opens the connection, ensures
    tables exist, seeds metadata, and purges old aggregated/balance rows.
    """
    self.db_path = db_path
    self._ensure_data_directory()

    # 🆕 Run database migrations before connecting and creating tables
    # This ensures the schema is up-to-date when the connection is made
    # and tables are potentially created for the first time.
    logger.info("Running database migrations if needed...")
    run_db_migrations(self.db_path)  # Pass the correct db_path
    logger.info("Database migration check complete.")

    # detect_types enables sqlite3's declared-type/column-name converters.
    self.conn = sqlite3.connect(self.db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
    self.conn.row_factory = self._dict_factory  # rows come back as dicts
    self._create_tables()  # CREATE IF NOT EXISTS will still be useful for first-time setup
    self._initialize_metadata()  # Also potentially sets schema_version if DB was just created
    # 🆕 Purge old daily aggregated stats on startup
    self.purge_old_daily_aggregated_stats()
    # 🆕 Purge old balance history on startup
    self.purge_old_balance_history()
- def _dict_factory(self, cursor, row):
- """Convert SQLite rows to dictionaries."""
- d = {}
- for idx, col in enumerate(cursor.description):
- d[col[0]] = row[idx]
- return d
- def _ensure_data_directory(self):
- """Ensure the data directory for the SQLite file exists."""
- data_dir = os.path.dirname(self.db_path)
- if data_dir and not os.path.exists(data_dir):
- os.makedirs(data_dir)
- logger.info(f"Created data directory for TradingStats DB: {data_dir}")
- def _execute_query(self, query: str, params: tuple = ()):
- """Execute a query (INSERT, UPDATE, DELETE)."""
- with self.conn:
- self.conn.execute(query, params)
- def _fetch_query(self, query: str, params: tuple = ()) -> List[Dict[str, Any]]:
- """Execute a SELECT query and fetch all results."""
- cur = self.conn.cursor()
- cur.execute(query, params)
- return cur.fetchall()
- def _fetchone_query(self, query: str, params: tuple = ()) -> Optional[Dict[str, Any]]:
- """Execute a SELECT query and fetch one result."""
- cur = self.conn.cursor()
- cur.execute(query, params)
- return cur.fetchone()
def _create_tables(self):
    """Create all SQLite tables and indexes used by the tracker (idempotent).

    Every statement uses IF NOT EXISTS, so this is safe to run on every
    startup; real schema evolution is handled by the migration runner
    invoked from __init__.
    """
    queries = [
        # Simple key/value store (start_date, initial_balance, drawdown state, ...).
        """
        CREATE TABLE IF NOT EXISTS metadata (
            key TEXT PRIMARY KEY,
            value TEXT
        )
        """,
        # One row per fill; lifecycle/position columns were merged in from
        # the former active_trades table (Phase 4 below).
        """
        CREATE TABLE IF NOT EXISTS trades (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            exchange_fill_id TEXT UNIQUE,
            timestamp TEXT NOT NULL,
            symbol TEXT NOT NULL,
            side TEXT NOT NULL,
            amount REAL NOT NULL,
            price REAL NOT NULL,
            value REAL NOT NULL,
            trade_type TEXT NOT NULL,
            pnl REAL DEFAULT 0.0,
            linked_order_table_id INTEGER,

            -- 🆕 PHASE 4: Lifecycle tracking fields (merged from active_trades)
            status TEXT DEFAULT 'executed', -- 'pending', 'executed', 'position_opened', 'position_closed', 'cancelled'
            trade_lifecycle_id TEXT, -- Groups related trades into one lifecycle
            position_side TEXT, -- 'long', 'short', 'flat' - the resulting position side

            -- Position tracking
            entry_price REAL,
            current_position_size REAL DEFAULT 0,

            -- Order IDs (exchange IDs)
            entry_order_id TEXT,
            stop_loss_order_id TEXT,
            take_profit_order_id TEXT,

            -- Risk management
            stop_loss_price REAL,
            take_profit_price REAL,

            -- P&L tracking
            realized_pnl REAL DEFAULT 0,
            unrealized_pnl REAL DEFAULT 0,
            mark_price REAL DEFAULT 0,
            position_value REAL DEFAULT NULL,
            unrealized_pnl_percentage REAL DEFAULT NULL,

            -- Risk Info from Exchange
            liquidation_price REAL DEFAULT NULL,
            margin_used REAL DEFAULT NULL,
            leverage REAL DEFAULT NULL,

            -- Timestamps
            position_opened_at TEXT,
            position_closed_at TEXT,
            updated_at TEXT DEFAULT CURRENT_TIMESTAMP,

            -- Notes
            notes TEXT
        )
        """,
        # One balance snapshot per exact timestamp (purged periodically).
        """
        CREATE TABLE IF NOT EXISTS balance_history (
            timestamp TEXT PRIMARY KEY,
            balance REAL NOT NULL
        )
        """,
        # Manual deposits/withdrawals that must not count as trading P&L.
        """
        CREATE TABLE IF NOT EXISTS balance_adjustments (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            adjustment_id TEXT UNIQUE,
            timestamp TEXT NOT NULL,
            type TEXT NOT NULL, -- 'deposit' or 'withdrawal'
            amount REAL NOT NULL, -- Always positive, type indicates direction
            description TEXT
        )
        """,
        # Exchange orders mirrored locally; bot_order_ref_id is our own id,
        # exchange_order_id is the venue's id.
        """
        CREATE TABLE IF NOT EXISTS orders (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            bot_order_ref_id TEXT UNIQUE,
            exchange_order_id TEXT UNIQUE,
            symbol TEXT NOT NULL,
            side TEXT NOT NULL,
            type TEXT NOT NULL,
            amount_requested REAL NOT NULL,
            amount_filled REAL DEFAULT 0.0,
            price REAL, -- For limit, stop, etc.
            status TEXT NOT NULL, -- e.g., 'open', 'partially_filled', 'filled', 'cancelled', 'rejected', 'expired', 'pending_trigger'
            timestamp_created TEXT NOT NULL,
            timestamp_updated TEXT NOT NULL,
            parent_bot_order_ref_id TEXT NULLABLE -- To link conditional orders (like SL triggers) to their parent order
        )
        """,
        -- := None  # (not SQL) — the following entries are secondary indexes
        """
        CREATE INDEX IF NOT EXISTS idx_orders_bot_order_ref_id ON orders (bot_order_ref_id);
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_orders_exchange_order_id ON orders (exchange_order_id);
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_trades_exchange_fill_id ON trades (exchange_fill_id);
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_trades_linked_order_table_id ON trades (linked_order_table_id);
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_orders_parent_bot_order_ref_id ON orders (parent_bot_order_ref_id);
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_orders_status_type ON orders (status, type);
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_trades_status ON trades (status);
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_trades_lifecycle_id ON trades (trade_lifecycle_id);
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_trades_position_side ON trades (position_side);
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_trades_symbol_status ON trades (symbol, status);
        """,
        # One end-of-day balance per calendar date (drives risk metrics).
        """
        CREATE TABLE IF NOT EXISTS daily_balances (
            date TEXT PRIMARY KEY,
            balance REAL NOT NULL,
            timestamp TEXT NOT NULL
        )
        """,
    ]
    # 🆕 Add new table creation queries
    queries.extend([
        # Per-token running aggregates over completed trade cycles.
        """
        CREATE TABLE IF NOT EXISTS token_stats (
            token TEXT PRIMARY KEY,
            total_realized_pnl REAL DEFAULT 0.0,
            total_completed_cycles INTEGER DEFAULT 0,
            winning_cycles INTEGER DEFAULT 0,
            losing_cycles INTEGER DEFAULT 0,
            total_entry_volume REAL DEFAULT 0.0, -- Sum of (amount * entry_price) for completed cycles
            total_exit_volume REAL DEFAULT 0.0, -- Sum of (amount * exit_price) for completed cycles
            sum_of_winning_pnl REAL DEFAULT 0.0,
            sum_of_losing_pnl REAL DEFAULT 0.0, -- Stored as a positive value
            largest_winning_cycle_pnl REAL DEFAULT 0.0,
            largest_losing_cycle_pnl REAL DEFAULT 0.0, -- Stored as a positive value
            first_cycle_closed_at TEXT,
            last_cycle_closed_at TEXT,
            total_cancelled_cycles INTEGER DEFAULT 0, -- Count of lifecycles that ended in 'cancelled'
            updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
            total_duration_seconds INTEGER DEFAULT 0
        )
        """,
        # Per-day, per-token aggregates (purged after a retention window).
        """
        CREATE TABLE IF NOT EXISTS daily_aggregated_stats (
            date TEXT NOT NULL, -- YYYY-MM-DD
            token TEXT NOT NULL, -- Specific token or a general identifier like '_OVERALL_'
            realized_pnl REAL DEFAULT 0.0,
            completed_cycles INTEGER DEFAULT 0,
            entry_volume REAL DEFAULT 0.0,
            exit_volume REAL DEFAULT 0.0,
            PRIMARY KEY (date, token)
        )
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_daily_stats_date_token ON daily_aggregated_stats (date, token);
        """
    ])
    for query in queries:
        self._execute_query(query)

    logger.info("SQLite tables ensured for TradingStats.")
def _initialize_metadata(self):
    """Seed 'start_date' and 'initial_balance' metadata keys on first run."""
    existing_start_date = self._get_metadata('start_date')
    existing_initial_balance = self._get_metadata('initial_balance')

    if existing_start_date is None:
        self._set_metadata('start_date', datetime.now(timezone.utc).isoformat())
        logger.info("Initialized 'start_date' in metadata.")

    if existing_initial_balance is None:
        self._set_metadata('initial_balance', '0.0')
        logger.info("Initialized 'initial_balance' in metadata.")

    logger.info(f"TradingStats initialized. Start Date: {self._get_metadata('start_date')}, Initial Balance: {self._get_metadata('initial_balance')}")
- def _get_metadata(self, key: str) -> Optional[str]:
- """Retrieve a value from the metadata table."""
- row = self._fetchone_query("SELECT value FROM metadata WHERE key = ?", (key,))
- return row['value'] if row else None
- def _set_metadata(self, key: str, value: str):
- """Set a value in the metadata table."""
- self._execute_query("INSERT OR REPLACE INTO metadata (key, value) VALUES (?, ?)", (key, value))
def set_initial_balance(self, balance: float):
    """Set the initial balance, but only if it is currently unset (zero).

    When the balance is first established, 'start_date' is re-anchored to
    now so day counts line up with when trading actually began.

    Note: the original inner check ("start_date is None or stored balance
    == 0.0") was tautological — inside this branch the stored balance is
    already 0.0 — so the start date is simply always reset here.
    """
    current_initial_balance_str = self._get_metadata('initial_balance')
    current_initial_balance = float(current_initial_balance_str) if current_initial_balance_str else 0.0
    formatter = get_formatter()

    if current_initial_balance == 0.0:  # Only set if it's effectively unset
        self._set_metadata('initial_balance', str(balance))
        self._set_metadata('start_date', datetime.now(timezone.utc).isoformat())
        logger.info(f"Initial balance set to: {formatter.format_price_with_symbol(balance)}")
    else:
        logger.info(f"Initial balance already set to {formatter.format_price_with_symbol(current_initial_balance)}. Not changing.")
- def record_balance(self, balance: float):
- """Record daily balance snapshot."""
- today_iso = datetime.now(timezone.utc).date().isoformat()
- now_iso = datetime.now(timezone.utc).isoformat()
-
- existing_entry = self._fetchone_query("SELECT date FROM daily_balances WHERE date = ?", (today_iso,))
- if existing_entry:
- self._execute_query("UPDATE daily_balances SET balance = ?, timestamp = ? WHERE date = ?",
- (balance, now_iso, today_iso))
- else:
- self._execute_query("INSERT INTO daily_balances (date, balance, timestamp) VALUES (?, ?, ?)",
- (today_iso, balance, now_iso))
- # logger.debug(f"Recorded balance for {today_iso}: ${balance:.2f}") # Potentially too verbose
def record_trade(self, symbol: str, side: str, amount: float, price: float,
                 exchange_fill_id: Optional[str] = None, trade_type: str = "manual",
                 pnl: Optional[float] = None, timestamp: Optional[str] = None,
                 linked_order_table_id_to_link: Optional[int] = None):
    """Persist one trade fill and log it.

    Re-delivered fills are deduplicated by INSERT OR IGNORE against the
    unique exchange_fill_id column.
    """
    if timestamp is None:
        timestamp = datetime.now(timezone.utc).isoformat()

    value = amount * price
    self._execute_query(
        "INSERT OR IGNORE INTO trades (symbol, side, amount, price, value, trade_type, timestamp, exchange_fill_id, pnl, linked_order_table_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        (symbol, side, amount, price, value, trade_type, timestamp, exchange_fill_id, pnl or 0.0, linked_order_table_id_to_link)
    )

    formatter = get_formatter()
    # Amount is formatted in the base asset: for "BTC/USDT" that is "BTC";
    # partition() also handles symbols without a '/' (returns the whole symbol).
    base_asset_for_amount = symbol.partition('/')[0]
    logger.info(f"📈 Trade recorded: {side.upper()} {formatter.format_amount(amount, base_asset_for_amount)} {symbol} @ {formatter.format_price(price, symbol)} ({formatter.format_price(value, symbol)}) [{trade_type}]")
- def get_all_trades(self) -> List[Dict[str, Any]]:
- """Fetch all trades from the database, ordered by timestamp."""
- return self._fetch_query("SELECT * FROM trades ORDER BY timestamp ASC")
- def get_trade_by_symbol_and_status(self, symbol: str, status: str) -> Optional[Dict[str, Any]]:
- """
- Fetches a single trade record for a given symbol and status.
- Typically used to find an open position master record.
- Assumes that for a given symbol, there's at most one trade record with a specific
- active status like 'position_opened'. If multiple could exist, this fetches the most recent.
- """
- query = "SELECT * FROM trades WHERE symbol = ? AND status = ? ORDER BY id DESC LIMIT 1"
- trade = self._fetchone_query(query, (symbol, status))
- if trade:
- logger.debug(f"Found trade for {symbol} with status {status}: ID {trade.get('id')}")
- # else: # Can be noisy if not finding a trade is a common occurrence
- # logger.debug(f"No trade found for {symbol} with status {status}")
- return trade
-
def get_basic_stats(self, current_balance: Optional[float] = None) -> Dict[str, Any]:
    """Get basic trading statistics from DB, primarily using aggregated tables.

    Args:
        current_balance: Optional live balance; accepted for interface
            parity with callers — not read in this method's visible body.

    Returns:
        Dict with cycle/trade counts, initial balance, realized P&L from
        completed cycles, activity dates, and the open-position count.
    """

    # Get counts of open positions (trades that are not yet migrated)
    open_positions_count = self._get_open_positions_count_from_db()

    # Get overall aggregated stats from token_stats table
    query_token_stats_summary = """
        SELECT
            SUM(total_realized_pnl) as total_pnl_from_cycles,
            SUM(total_completed_cycles) as total_completed_cycles_sum,
            MIN(first_cycle_closed_at) as overall_first_cycle_closed,
            MAX(last_cycle_closed_at) as overall_last_cycle_closed
        FROM token_stats
    """
    token_stats_summary = self._fetchone_query(query_token_stats_summary)

    # SUM() over an empty table yields NULL, hence the is-not-None guards.
    total_pnl_from_cycles = token_stats_summary['total_pnl_from_cycles'] if token_stats_summary and token_stats_summary['total_pnl_from_cycles'] is not None else 0.0
    total_completed_cycles_sum = token_stats_summary['total_completed_cycles_sum'] if token_stats_summary and token_stats_summary['total_completed_cycles_sum'] is not None else 0

    # Total trades considered as sum of completed cycles and currently open positions
    # This redefines 'total_trades' from its previous meaning of individual fills.
    total_trades_redefined = total_completed_cycles_sum + open_positions_count

    initial_balance_str = self._get_metadata('initial_balance')
    initial_balance = float(initial_balance_str) if initial_balance_str else 0.0

    start_date_iso = self._get_metadata('start_date')
    start_date_obj = datetime.fromisoformat(start_date_iso) if start_date_iso else datetime.now(timezone.utc)
    days_active = (datetime.now(timezone.utc) - start_date_obj).days + 1

    # 'last_trade' timestamp could be the last update to token_stats or an open trade
    last_activity_ts = token_stats_summary['overall_last_cycle_closed'] if token_stats_summary else None
    last_open_trade_ts_row = self._fetchone_query("SELECT MAX(updated_at) as last_update FROM trades WHERE status = 'position_opened'")
    if last_open_trade_ts_row and last_open_trade_ts_row['last_update']:
        # Keep whichever timestamp is more recent (ISO strings parsed for a real comparison).
        if not last_activity_ts or datetime.fromisoformat(last_open_trade_ts_row['last_update']) > datetime.fromisoformat(last_activity_ts):
            last_activity_ts = last_open_trade_ts_row['last_update']

    # Buy/Sell trades count from individual fills is no longer directly available for completed cycles.
    # If needed, this requires schema change in token_stats or a different approach.
    # For now, these are omitted from basic_stats.
    return {
        'total_trades': total_trades_redefined,  # This is now cycles + open positions
        'completed_trades': total_completed_cycles_sum,  # This is sum of total_completed_cycles from token_stats
        # 'buy_trades': buy_trades_count, # Omitted
        # 'sell_trades': sell_trades_count, # Omitted
        'initial_balance': initial_balance,
        'total_pnl': total_pnl_from_cycles,  # PNL from closed cycles via token_stats
        'days_active': days_active,
        'start_date': start_date_obj.strftime('%Y-%m-%d'),
        'last_trade': last_activity_ts,  # Reflects last known activity (cycle close or open trade update)
        'open_positions_count': open_positions_count
    }
def get_performance_stats(self) -> Dict[str, Any]:
    """Calculate advanced performance statistics using aggregated data from token_stats.

    Returns:
        Dict with win rate, profit factor, average/largest win and loss
        (losses reported as positive magnitudes), expectancy, total entry
        volume, best/worst performing tokens, and average trade duration.
    """
    query = """
        SELECT
            SUM(total_completed_cycles) as total_cycles,
            SUM(winning_cycles) as total_wins,
            SUM(losing_cycles) as total_losses,
            SUM(sum_of_winning_pnl) as total_winning_pnl,
            SUM(sum_of_losing_pnl) as total_losing_pnl, -- Stored positive
            MAX(largest_winning_cycle_pnl) as overall_largest_win,
            MAX(largest_losing_cycle_pnl) as overall_largest_loss -- Stored positive
        FROM token_stats
    """
    summary = self._fetchone_query(query)

    # Add total volume
    volume_summary = self._fetchone_query("SELECT SUM(total_entry_volume) as total_volume FROM token_stats")
    total_trading_volume = volume_summary['total_volume'] if volume_summary and volume_summary['total_volume'] is not None else 0.0

    # 🆕 Calculate Average Trade Duration
    duration_summary = self._fetchone_query("SELECT SUM(total_duration_seconds) as total_seconds, SUM(total_completed_cycles) as total_cycles FROM token_stats")
    avg_trade_duration_formatted = "N/A"
    if duration_summary and duration_summary['total_cycles'] and duration_summary['total_cycles'] > 0:
        avg_seconds = duration_summary['total_seconds'] / duration_summary['total_cycles']
        avg_trade_duration_formatted = self._format_duration(avg_seconds)

    # Get individual token performances for best/worst
    all_token_perf_stats = self.get_token_performance()
    best_token_pnl_pct = -float('inf')
    best_token_name = "N/A"
    worst_token_pnl_pct = float('inf')
    worst_token_name = "N/A"

    if all_token_perf_stats:
        for token_name_iter, stats_data in all_token_perf_stats.items():
            pnl_pct = stats_data.get('pnl_percentage', 0.0)
            # Ensure token has completed trades and pnl_pct is a valid number
            if stats_data.get('completed_trades', 0) > 0 and isinstance(pnl_pct, (int, float)) and not math.isinf(pnl_pct) and not math.isnan(pnl_pct):
                if pnl_pct > best_token_pnl_pct:
                    best_token_pnl_pct = pnl_pct
                    best_token_name = token_name_iter
                if pnl_pct < worst_token_pnl_pct:
                    worst_token_pnl_pct = pnl_pct
                    worst_token_name = token_name_iter

    # Handle cases where no valid tokens were found for best/worst
    # (leave the sentinels out of the result — report 0.0 instead of ±inf).
    if best_token_name == "N/A":
        best_token_pnl_pct = 0.0
    if worst_token_name == "N/A":
        worst_token_pnl_pct = 0.0

    # No completed cycles at all: return a zeroed-out result (volume and
    # token info are still meaningful and included).
    if not summary or summary['total_cycles'] is None or summary['total_cycles'] == 0:
        return {
            'win_rate': 0.0, 'profit_factor': 0.0, 'avg_win': 0.0, 'avg_loss': 0.0,
            'largest_win': 0.0, 'largest_loss': 0.0,
            'total_wins': 0, 'total_losses': 0, 'expectancy': 0.0,
            'total_trading_volume': total_trading_volume,
            'best_performing_token': {'name': best_token_name, 'pnl_percentage': best_token_pnl_pct},
            'worst_performing_token': {'name': worst_token_name, 'pnl_percentage': worst_token_pnl_pct},
            'avg_trade_duration': avg_trade_duration_formatted,
        }

    total_completed_count = summary['total_cycles']
    total_wins_count = summary['total_wins'] if summary['total_wins'] is not None else 0
    total_losses_count = summary['total_losses'] if summary['total_losses'] is not None else 0

    win_rate = (total_wins_count / total_completed_count * 100) if total_completed_count > 0 else 0.0

    sum_of_wins = summary['total_winning_pnl'] if summary['total_winning_pnl'] is not None else 0.0
    sum_of_losses = summary['total_losing_pnl'] if summary['total_losing_pnl'] is not None else 0.0  # This is sum of absolute losses
    # All wins and no losses yields an infinite profit factor by convention.
    profit_factor = (sum_of_wins / sum_of_losses) if sum_of_losses > 0 else float('inf') if sum_of_wins > 0 else 0.0

    avg_win = (sum_of_wins / total_wins_count) if total_wins_count > 0 else 0.0
    avg_loss = (sum_of_losses / total_losses_count) if total_losses_count > 0 else 0.0  # Avg of absolute losses

    largest_win = summary['overall_largest_win'] if summary['overall_largest_win'] is not None else 0.0
    largest_loss = summary['overall_largest_loss'] if summary['overall_largest_loss'] is not None else 0.0  # Largest absolute loss
    # Consecutive wins/losses removed as it's hard to track with this aggregation model.

    # Expectancy: expected P&L per trade = P(win)*avg_win - P(loss)*avg_loss.
    expectancy = (avg_win * (win_rate / 100)) - (avg_loss * (1 - (win_rate / 100)))

    return {
        'win_rate': win_rate, 'profit_factor': profit_factor, 'avg_win': avg_win, 'avg_loss': avg_loss,
        'largest_win': largest_win, 'largest_loss': largest_loss,
        'total_wins': total_wins_count, 'total_losses': total_losses_count, 'expectancy': expectancy,
        'total_trading_volume': total_trading_volume,
        'best_performing_token': {'name': best_token_name, 'pnl_percentage': best_token_pnl_pct},
        'worst_performing_token': {'name': worst_token_name, 'pnl_percentage': worst_token_pnl_pct},
        'avg_trade_duration': avg_trade_duration_formatted,
    }
def get_risk_metrics(self) -> Dict[str, Any]:
    """Calculate risk-adjusted metrics from daily balances.

    Returns:
        Dict with annualized Sharpe and Sortino ratios, max drawdown and
        volatility (both in percent, derived from daily_balances), the
        95% daily VaR in percent, and the live max drawdown tracked in
        metadata. All-zero values are returned when fewer than two daily
        balance snapshots exist.
    """
    # Get live max drawdown from metadata
    max_drawdown_live_str = self._get_metadata('drawdown_max_drawdown_pct')
    max_drawdown_live = float(max_drawdown_live_str) if max_drawdown_live_str else 0.0

    daily_balances_data = self._fetch_query("SELECT balance FROM daily_balances ORDER BY date ASC")

    # Need at least two balances to form one return.
    if not daily_balances_data or len(daily_balances_data) < 2:
        return {'sharpe_ratio': 0.0, 'sortino_ratio': 0.0, 'max_drawdown': 0.0, 'volatility': 0.0, 'var_95': 0.0, 'max_drawdown_live': max_drawdown_live}

    balances = [entry['balance'] for entry in daily_balances_data]
    returns = np.diff(balances) / balances[:-1]  # Calculate daily returns
    returns = returns[np.isfinite(returns)]  # Remove NaNs or Infs if any balance was 0

    if returns.size == 0:
        return {'sharpe_ratio': 0.0, 'sortino_ratio': 0.0, 'max_drawdown': 0.0, 'volatility': 0.0, 'var_95': 0.0, 'max_drawdown_live': max_drawdown_live}

    risk_free_rate_daily = (1 + 0.02)**(1/365) - 1  # Approx 2% annual risk-free rate, daily

    excess_returns = returns - risk_free_rate_daily
    # Annualized with sqrt(365); guard against zero std to avoid division by zero.
    sharpe_ratio = np.mean(excess_returns) / np.std(returns) * np.sqrt(365) if np.std(returns) > 0 else 0.0

    # Sortino penalizes downside deviation only.
    downside_returns = returns[returns < 0]
    downside_std = np.std(downside_returns) if len(downside_returns) > 0 else 0.0
    sortino_ratio = np.mean(excess_returns) / downside_std * np.sqrt(365) if downside_std > 0 else 0.0

    # Max drawdown from the running peak of cumulative returns.
    cumulative_returns = np.cumprod(1 + returns)
    peak = np.maximum.accumulate(cumulative_returns)
    drawdown = (cumulative_returns - peak) / peak
    max_drawdown_daily_pct = abs(np.min(drawdown) * 100) if drawdown.size > 0 else 0.0

    volatility_pct = np.std(returns) * np.sqrt(365) * 100
    # 95% one-day VaR: magnitude of the 5th percentile daily return.
    var_95_pct = abs(np.percentile(returns, 5) * 100) if returns.size > 0 else 0.0

    return {
        'sharpe_ratio': sharpe_ratio, 'sortino_ratio': sortino_ratio,
        'max_drawdown': max_drawdown_daily_pct, 'volatility': volatility_pct,
        'var_95': var_95_pct, 'max_drawdown_live': max_drawdown_live
    }
def get_comprehensive_stats(self, current_balance: Optional[float] = None) -> Dict[str, Any]:
    """Get all statistics combined.

    Args:
        current_balance: Live account balance. When provided it is also
            recorded as today's balance snapshot and used for the total
            return calculation (thereby including unrealized P&L).

    Returns:
        Dict with 'basic', 'performance' and 'risk' sub-dicts plus the
        effective current balance, total return percent, and a timestamp.
    """
    if current_balance is not None:  # Ensure it's not just None, but explicitly provided
        self.record_balance(current_balance)  # Record current balance for today

    basic = self.get_basic_stats(current_balance)  # Pass current_balance for P&L context if needed
    performance = self.get_performance_stats()
    risk = self.get_risk_metrics()

    initial_balance = basic['initial_balance']
    total_return_pct = 0.0
    # Use current_balance if available and valid for total return calculation
    # Otherwise, PNL from basic_stats (closed trades) is the primary PNL source
    # This needs careful thought: current_balance reflects unrealized PNL too.
    # The original code used current_balance - initial_balance for total_pnl if current_balance provided.

    effective_balance_for_return = current_balance if current_balance is not None else (initial_balance + basic['total_pnl'])
    if initial_balance > 0:
        total_return_pct = ((effective_balance_for_return - initial_balance) / initial_balance) * 100

    return {
        'basic': basic,
        'performance': performance,
        'risk': risk,
        'current_balance': current_balance if current_balance is not None else initial_balance + basic['total_pnl'],  # Best estimate
        'total_return': total_return_pct,  # Percentage
        'last_updated': datetime.now(timezone.utc).isoformat()
    }
- def _get_open_positions_count_from_db(self) -> int:
- """🧹 PHASE 4: Get count of open positions from enhanced trades table."""
- row = self._fetchone_query("SELECT COUNT(DISTINCT symbol) as count FROM trades WHERE status = 'position_opened'")
- return row['count'] if row else 0
def format_stats_message(self, current_balance: Optional[float] = None) -> str:
    """Format stats for Telegram display using data from DB.

    Builds an HTML multi-line message from get_comprehensive_stats().

    Args:
        current_balance: Live account balance, if the caller has one. When
            provided (and an initial balance exists), total P&L is derived
            from it; otherwise the realized P&L stored in the DB is used.

    Returns:
        HTML string for Telegram. On any error a short fallback message
        containing the first 100 characters of the exception is returned
        instead of raising.
    """
    try:
        stats = self.get_comprehensive_stats(current_balance)
        formatter = get_formatter()

        basic = stats['basic']
        perf = stats['performance']
        risk = stats['risk']  # For portfolio drawdown

        effective_current_balance = stats['current_balance']
        initial_bal = basic['initial_balance']
        # Prefer the live-balance-derived P&L when a live balance was supplied;
        # otherwise fall back to the realized P&L recorded in the DB.
        total_pnl_val = effective_current_balance - initial_bal if initial_bal > 0 and current_balance is not None else basic['total_pnl']
        total_return_pct = (total_pnl_val / initial_bal * 100) if initial_bal > 0 else 0.0
        pnl_emoji = "🟢" if total_pnl_val >= 0 else "🔴"
        open_positions_count = basic['open_positions_count']
        stats_text_parts = []
        stats_text_parts.append(f"📊 <b>Trading Statistics</b>\n")

        # Account Overview
        stats_text_parts.append(f"\n💰 <b>Account Overview:</b>")
        stats_text_parts.append(f"• Current Balance: {formatter.format_price_with_symbol(effective_current_balance)}")
        stats_text_parts.append(f"• Initial Balance: {formatter.format_price_with_symbol(initial_bal)}")
        stats_text_parts.append(f"• Open Positions: {open_positions_count}")
        stats_text_parts.append(f"• {pnl_emoji} Total P&L: {formatter.format_price_with_symbol(total_pnl_val)} ({total_return_pct:+.2f}%)")
        stats_text_parts.append(f"• Days Active: {basic['days_active']}\n")

        # Performance Metrics
        stats_text_parts.append(f"\n🏆 <b>Performance Metrics:</b>")
        stats_text_parts.append(f"• Total Completed Trades: {basic['completed_trades']}")
        stats_text_parts.append(f"• Trading Volume (Entry Vol.): {formatter.format_price_with_symbol(perf.get('total_trading_volume', 0.0))}")
        stats_text_parts.append(f"• Profit Factor: {perf['profit_factor']:.2f}")
        stats_text_parts.append(f"• Expectancy: {formatter.format_price_with_symbol(perf['expectancy'])} (Value per trade)")
        # NOTE: an expectancy-as-percentage line is intentionally omitted;
        # its definition is still open and would clutter the message.

        stats_text_parts.append(f"• Largest Winning Trade: {formatter.format_price_with_symbol(perf['largest_win'])} (Value)")
        # largest_loss is stored as a positive magnitude, hence the negation for display.
        stats_text_parts.append(f"• Largest Losing Trade: {formatter.format_price_with_symbol(-perf['largest_loss'])} (Value)")
        best_token_stats = perf.get('best_performing_token', {'name': 'N/A', 'pnl_percentage': 0.0})
        worst_token_stats = perf.get('worst_performing_token', {'name': 'N/A', 'pnl_percentage': 0.0})
        stats_text_parts.append(f"• Best Performing Token: {best_token_stats['name']} ({best_token_stats['pnl_percentage']:+.2f}%)")
        stats_text_parts.append(f"• Worst Performing Token: {worst_token_stats['name']} ({worst_token_stats['pnl_percentage']:+.2f}%)")

        stats_text_parts.append(f"• Average Trade Duration: {perf.get('avg_trade_duration', 'N/A')}")
        stats_text_parts.append(f"• Portfolio Max Drawdown: {risk.get('max_drawdown_live', 0.0):.2f}% <i>(Live)</i>")
        # TODO: trading-P&L-specific drawdown analysis is planned.

        # Session Info
        stats_text_parts.append(f"\n\n⏰ <b>Session Info:</b>")
        stats_text_parts.append(f"• Bot Started: {basic['start_date']}")
        stats_text_parts.append(f"• Stats Last Updated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}")

        return "\n".join(stats_text_parts).strip()

    except Exception as e:
        logger.error(f"Error formatting stats message: {e}", exc_info=True)
        return f"""📊 <b>Trading Statistics</b>\n\n❌ <b>Error loading statistics</b>\n\n🔧 <b>Debug info:</b> {str(e)[:100]}"""
def get_recent_trades(self, limit: int = 10) -> List[Dict[str, Any]]:
    """Return the most recently updated open trades (completed ones are migrated out of this table)."""
    query = "SELECT * FROM trades WHERE status = 'position_opened' ORDER BY updated_at DESC LIMIT ?"
    return self._fetch_query(query, (limit,))
def get_token_performance(self) -> Dict[str, Dict[str, Any]]:
    """Get performance statistics grouped by token using the token_stats table.

    Returns a mapping of token -> derived metrics (PnL %, win rate,
    profit factor, expectancy, averages, durations) computed from the
    aggregated columns stored per token.
    """
    performance_by_token: Dict[str, Dict[str, Any]] = {}
    for row in self._fetch_query("SELECT * FROM token_stats ORDER BY token ASC"):
        token_name = row['token']
        realized_pnl = row.get('total_realized_pnl', 0.0)
        entry_volume = row.get('total_entry_volume', 0.0)
        completed = row.get('total_completed_cycles', 0)
        wins = row.get('winning_cycles', 0)
        losses = row.get('losing_cycles', 0)
        winning_pnl_sum = row.get('sum_of_winning_pnl', 0.0)
        losing_pnl_sum = row.get('sum_of_losing_pnl', 0.0)  # stored as a positive magnitude
        duration_total = row.get('total_duration_seconds', 0)

        pnl_pct = (realized_pnl / entry_volume * 100) if entry_volume > 0 else 0.0
        win_rate = (wins / completed * 100) if completed > 0 else 0.0

        if losing_pnl_sum > 0:
            profit_factor = winning_pnl_sum / losing_pnl_sum
        elif winning_pnl_sum > 0:
            profit_factor = float('inf')
        else:
            profit_factor = 0.0

        avg_win = (winning_pnl_sum / wins) if wins > 0 else 0.0
        avg_loss = (losing_pnl_sum / losses) if losses > 0 else 0.0
        expectancy = (avg_win * (win_rate / 100)) - (avg_loss * (1 - (win_rate / 100)))
        avg_duration = self._format_duration(duration_total / completed) if completed > 0 else "N/A"

        performance_by_token[token_name] = {
            'token': token_name,  # duplicated here for convenience when iterating values
            'total_pnl': realized_pnl,
            'pnl_percentage': pnl_pct,
            'completed_trades': completed,
            'total_volume': entry_volume,  # total entry volume
            'win_rate': win_rate,
            'total_wins': wins,
            'total_losses': losses,
            'profit_factor': profit_factor,
            'expectancy': expectancy,
            'largest_win': row.get('largest_winning_cycle_pnl', 0.0),
            'largest_loss': row.get('largest_losing_cycle_pnl', 0.0),  # positive magnitude
            'avg_win': avg_win,
            'avg_loss': avg_loss,
            'first_cycle_closed_at': row.get('first_cycle_closed_at'),
            'last_cycle_closed_at': row.get('last_cycle_closed_at'),
            'total_cancelled': row.get('total_cancelled_cycles', 0),
            'total_duration_seconds': duration_total,
            'avg_trade_duration': avg_duration,
        }
    return performance_by_token
def get_token_detailed_stats(self, token: str) -> Dict[str, Any]:
    """Get detailed statistics for a specific token using token_stats and current open trades.

    Combines the aggregated, migrated history from token_stats with any
    still-open trades for the token from the 'trades' table.

    Args:
        token: Token name; normalized via _normalize_token_case before lookup.

    Returns:
        Dict with 'performance_summary' (aggregated completed-cycle metrics),
        'open_positions' (list of open-trade summaries), open position/order
        counts, and realized/unrealized PnL totals. If neither history nor
        open positions exist, a minimal dict with an explanatory 'message'.
    """
    upper_token = _normalize_token_case(token)

    # Get aggregated performance from token_stats
    token_agg_stats = self._fetchone_query("SELECT * FROM token_stats WHERE token = ?", (upper_token,))

    # Get currently open trades for this token from the 'trades' table (not yet migrated).
    # These are not completed cycles but represent current exposure.
    open_trades_for_token = self._fetch_query(
        "SELECT * FROM trades WHERE symbol LIKE ? AND status = 'position_opened' ORDER BY timestamp ASC",
        (f"{upper_token}/%",)
    )

    if not token_agg_stats and not open_trades_for_token:
        return {
            'token': upper_token, 'total_trades': 0, 'total_pnl': 0.0, 'win_rate': 0.0,
            'message': f"No trading history or open positions found for {upper_token}"
        }
    # Initialize with empty performance if no aggregated data
    perf_stats = {}
    if token_agg_stats:
        perf_stats = {
            'completed_trades': token_agg_stats.get('total_completed_cycles', 0),
            'total_pnl': token_agg_stats.get('total_realized_pnl', 0.0),
            'pnl_percentage': 0.0,  # recalculated below from entry volume
            'win_rate': 0.0,  # recalculated below
            'profit_factor': token_agg_stats.get('profit_factor'),  # placeholder; recomputed below from PnL sums
            'avg_win': 0.0,
            'avg_loss': 0.0,
            'largest_win': token_agg_stats.get('largest_winning_cycle_pnl', 0.0),
            'largest_loss': token_agg_stats.get('largest_losing_cycle_pnl', 0.0),  # stored positive
            'expectancy': 0.0,
            'total_wins': token_agg_stats.get('winning_cycles',0),
            'total_losses': token_agg_stats.get('losing_cycles',0),
            'completed_entry_volume': token_agg_stats.get('total_entry_volume', 0.0),
            'completed_exit_volume': token_agg_stats.get('total_exit_volume', 0.0),
            'total_cancelled': token_agg_stats.get('total_cancelled_cycles', 0),
            'total_duration_seconds': token_agg_stats.get('total_duration_seconds', 0),
            'avg_trade_duration': self._format_duration(token_agg_stats.get('total_duration_seconds', 0) / token_agg_stats.get('total_completed_cycles', 0)) if token_agg_stats.get('total_completed_cycles', 0) > 0 else "N/A"
        }
        if perf_stats['completed_trades'] > 0:
            perf_stats['win_rate'] = (perf_stats['total_wins'] / perf_stats['completed_trades'] * 100) if perf_stats['completed_trades'] > 0 else 0.0
            sum_wins = token_agg_stats.get('sum_of_winning_pnl', 0.0)
            sum_losses = token_agg_stats.get('sum_of_losing_pnl', 0.0)
            perf_stats['profit_factor'] = (sum_wins / sum_losses) if sum_losses > 0 else float('inf') if sum_wins > 0 else 0.0
            perf_stats['avg_win'] = (sum_wins / perf_stats['total_wins']) if perf_stats['total_wins'] > 0 else 0.0
            perf_stats['avg_loss'] = (sum_losses / perf_stats['total_losses']) if perf_stats['total_losses'] > 0 else 0.0
            perf_stats['expectancy'] = (perf_stats['avg_win'] * (perf_stats['win_rate'] / 100)) - (perf_stats['avg_loss'] * (1 - (perf_stats['win_rate'] / 100)))
            if perf_stats['completed_entry_volume'] > 0:
                perf_stats['pnl_percentage'] = (perf_stats['total_pnl'] / perf_stats['completed_entry_volume'] * 100)
    else:  # No completed cycles for this token yet
        perf_stats = {
            'completed_trades': 0, 'total_pnl': 0.0, 'pnl_percentage': 0.0, 'win_rate': 0.0,
            'profit_factor': 0.0, 'avg_win': 0.0, 'avg_loss': 0.0, 'largest_win': 0.0, 'largest_loss': 0.0,
            'expectancy': 0.0, 'total_wins':0, 'total_losses':0,
            'completed_entry_volume': 0.0, 'completed_exit_volume': 0.0, 'total_cancelled': 0,
            'total_duration_seconds': 0, 'avg_trade_duration': "N/A"
        }
    # Info about open positions for this token (raw trades, not cycles)
    open_positions_summary = []
    total_open_value = 0.0
    total_open_unrealized_pnl = 0.0
    for op_trade in open_trades_for_token:
        open_positions_summary.append({
            'lifecycle_id': op_trade.get('trade_lifecycle_id'),
            'side': op_trade.get('position_side'),
            'amount': op_trade.get('current_position_size'),
            'entry_price': op_trade.get('entry_price'),
            'mark_price': op_trade.get('mark_price'),
            'unrealized_pnl': op_trade.get('unrealized_pnl'),
            'opened_at': op_trade.get('position_opened_at')
        })
        total_open_value += op_trade.get('value', 0.0)  # initial value of open positions
        total_open_unrealized_pnl += op_trade.get('unrealized_pnl', 0.0)
    # Per-cycle order counts are no longer recoverable after completed cycles
    # are migrated, so only currently open orders for this token are counted.
    open_orders_count_row = self._fetchone_query(
        "SELECT COUNT(*) as count FROM orders WHERE symbol LIKE ? AND status IN ('open', 'submitted', 'pending_trigger')",
        (f"{upper_token}/%",)
    )
    current_open_orders_for_token = open_orders_count_row['count'] if open_orders_count_row else 0
    # 'total_trades' = completed cycles + currently open positions, for
    # consistency with get_basic_stats.
    effective_total_trades = perf_stats['completed_trades'] + len(open_trades_for_token)
    return {
        'token': upper_token,
        'message': f"Statistics for {upper_token}",
        'performance_summary': perf_stats,  # from token_stats table
        'open_positions': open_positions_summary,  # currently open positions
        'open_positions_count': len(open_trades_for_token),
        'current_open_orders_count': current_open_orders_for_token,
        'summary_total_trades': effective_total_trades,  # completed cycles + open positions
        'summary_total_realized_pnl': perf_stats['total_pnl'],
        'summary_total_unrealized_pnl': total_open_unrealized_pnl,
        # Raw cycle data for completed trades is no longer stored after migration.
    }
def get_daily_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
    """Get daily performance stats for the last N days from daily_aggregated_stats.

    Each entry carries 'date', 'date_formatted', 'has_trades' and, when trades
    exist for that day, summed 'pnl', 'trades', 'volume' and derived 'pnl_pct'.
    """
    results: List[Dict[str, Any]] = []
    utc_today = datetime.now(timezone.utc).date()
    for offset in range(limit):
        day = utc_today - timedelta(days=offset)
        day_iso = day.strftime('%Y-%m-%d')
        day_display = day.strftime('%m/%d')  # short form for display

        # Sum across every token's row for that date (no _OVERALL_ record is assumed).
        rows = self._fetch_query(
            "SELECT SUM(realized_pnl) as pnl, SUM(completed_cycles) as trades, SUM(exit_volume) as volume FROM daily_aggregated_stats WHERE date = ?",
            (day_iso,)
        )

        day_totals = None
        if rows and rows[0]['trades'] is not None:
            day_totals = rows[0]
            pnl_value = day_totals.get('pnl', 0.0) or 0.0
            volume_value = day_totals.get('volume', 0.0) or 0.0
            day_totals['pnl_pct'] = (pnl_value / volume_value * 100) if volume_value > 0 else 0.0
            # SUM() can come back as non-int / None; normalize.
            day_totals['trades'] = int(day_totals.get('trades', 0) or 0)

        if day_totals and day_totals['trades'] > 0:
            results.append({
                'date': day_iso, 'date_formatted': day_display, 'has_trades': True,
                **day_totals
            })
        else:
            results.append({
                'date': day_iso, 'date_formatted': day_display, 'has_trades': False,
                'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
            })
    return results
def get_weekly_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
    """Get weekly performance stats for the last N weeks by aggregating daily_aggregated_stats.

    Weeks run Monday..Sunday; the totals are summed from the per-day rows.
    """
    results: List[Dict[str, Any]] = []
    utc_today = datetime.now(timezone.utc).date()
    for offset in range(limit):
        monday = utc_today - timedelta(days=utc_today.weekday() + (offset * 7))
        sunday = monday + timedelta(days=6)

        week_key = monday.strftime('%Y-W%W')
        week_display = f"{monday.strftime('%m/%d')}-{sunday.strftime('%m/%d/%y')}"

        day_rows = self._fetch_query(
            "SELECT date, realized_pnl, completed_cycles, exit_volume FROM daily_aggregated_stats WHERE date BETWEEN ? AND ?",
            (monday.strftime('%Y-%m-%d'), sunday.strftime('%Y-%m-%d'))
        )

        # sum() over an empty row list yields the same zeros the no-data
        # branch used to build, so both "no rows" and "rows but no trades"
        # collapse into the single else below.
        pnl_sum = sum(r.get('realized_pnl', 0.0) or 0.0 for r in day_rows)
        trade_sum = sum(r.get('completed_cycles', 0) or 0 for r in day_rows)
        volume_sum = sum(r.get('exit_volume', 0.0) or 0.0 for r in day_rows)

        if trade_sum > 0:
            results.append({
                'week': week_key,
                'week_formatted': week_display,
                'has_trades': True,
                'pnl': pnl_sum,
                'trades': trade_sum,
                'volume': volume_sum,
                'pnl_pct': (pnl_sum / volume_sum * 100) if volume_sum > 0 else 0.0
            })
        else:
            results.append({
                'week': week_key, 'week_formatted': week_display, 'has_trades': False,
                'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
            })
    return results
def get_monthly_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
    """Get monthly performance stats for the last N months by aggregating daily_aggregated_stats."""
    results: List[Dict[str, Any]] = []
    first_of_current_month = datetime.now(timezone.utc).date().replace(day=1)
    for offset in range(limit):
        # Walk back `offset` months, normalizing month into 1..12.
        year = first_of_current_month.year
        month = first_of_current_month.month - offset
        while month <= 0:
            month += 12
            year -= 1

        month_start = datetime(year, month, 1, tzinfo=timezone.utc).date()
        # Last day of the month = day before the first of the next month.
        if month == 12:
            next_month_start = datetime(year + 1, 1, 1, tzinfo=timezone.utc).date()
        else:
            next_month_start = datetime(year, month + 1, 1, tzinfo=timezone.utc).date()
        month_end = next_month_start - timedelta(days=1)

        month_key = month_start.strftime('%Y-%m')
        month_display = month_start.strftime('%b %Y')

        day_rows = self._fetch_query(
            "SELECT date, realized_pnl, completed_cycles, exit_volume FROM daily_aggregated_stats WHERE date BETWEEN ? AND ?",
            (month_start.strftime('%Y-%m-%d'), month_end.strftime('%Y-%m-%d'))
        )

        # Summing over an empty list gives zeros, so the "no rows" and
        # "rows but no trades" cases share the else branch below.
        pnl_sum = sum(r.get('realized_pnl', 0.0) or 0.0 for r in day_rows)
        trade_sum = sum(r.get('completed_cycles', 0) or 0 for r in day_rows)
        volume_sum = sum(r.get('exit_volume', 0.0) or 0.0 for r in day_rows)

        if trade_sum > 0:
            results.append({
                'month': month_key,
                'month_formatted': month_display,
                'has_trades': True,
                'pnl': pnl_sum,
                'trades': trade_sum,
                'volume': volume_sum,
                'pnl_pct': (pnl_sum / volume_sum * 100) if volume_sum > 0 else 0.0
            })
        else:
            results.append({
                'month': month_key, 'month_formatted': month_display, 'has_trades': False,
                'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
            })
    return results
def record_deposit(self, amount: float, timestamp: Optional[str] = None,
                   deposit_id: Optional[str] = None, description: Optional[str] = None):
    """Record a deposit in balance_adjustments and raise the effective initial balance."""
    event_time = timestamp if timestamp else datetime.now(timezone.utc).isoformat()
    formatter = get_formatter()
    amount_display = formatter.format_price_with_symbol(amount)
    note = description if description else f'Deposit of {amount_display}'

    self._execute_query(
        "INSERT INTO balance_adjustments (adjustment_id, timestamp, type, amount, description) VALUES (?, ?, ?, ?, ?)",
        (deposit_id or str(uuid.uuid4()), event_time, 'deposit', amount, note)
    )
    # Deposits shift the capital baseline so return percentages stay comparable.
    previous_initial = float(self._get_metadata('initial_balance') or '0.0')
    self._set_metadata('initial_balance', str(previous_initial + amount))
    logger.info(f"💰 Recorded deposit: {amount_display}. New effective initial balance: {formatter.format_price_with_symbol(previous_initial + amount)}")
def record_withdrawal(self, amount: float, timestamp: Optional[str] = None,
                      withdrawal_id: Optional[str] = None, description: Optional[str] = None):
    """Record a withdrawal in balance_adjustments and lower the effective initial balance."""
    event_time = timestamp if timestamp else datetime.now(timezone.utc).isoformat()
    formatter = get_formatter()
    amount_display = formatter.format_price_with_symbol(amount)
    note = description if description else f'Withdrawal of {amount_display}'

    self._execute_query(
        "INSERT INTO balance_adjustments (adjustment_id, timestamp, type, amount, description) VALUES (?, ?, ?, ?, ?)",
        (withdrawal_id or str(uuid.uuid4()), event_time, 'withdrawal', amount, note)
    )
    # Withdrawals shift the capital baseline so return percentages stay comparable.
    previous_initial = float(self._get_metadata('initial_balance') or '0.0')
    self._set_metadata('initial_balance', str(previous_initial - amount))
    logger.info(f"💸 Recorded withdrawal: {amount_display}. New effective initial balance: {formatter.format_price_with_symbol(previous_initial - amount)}")
def get_balance_adjustments_summary(self) -> Dict[str, Any]:
    """Get summary of all balance adjustments from DB.

    Withdrawal amounts are stored as positive magnitudes, so the net
    adjustment is deposits minus withdrawals.
    """
    rows = self._fetch_query("SELECT type, amount, timestamp FROM balance_adjustments ORDER BY timestamp ASC")

    if not rows:
        return {'total_deposits': 0.0, 'total_withdrawals': 0.0, 'net_adjustment': 0.0,
                'adjustment_count': 0, 'last_adjustment': None}

    deposits = 0.0
    withdrawals = 0.0
    for row in rows:
        if row['type'] == 'deposit':
            deposits += row['amount']
        elif row['type'] == 'withdrawal':
            withdrawals += row['amount']

    return {
        'total_deposits': deposits, 'total_withdrawals': withdrawals,
        'net_adjustment': deposits - withdrawals, 'adjustment_count': len(rows),
        'last_adjustment': rows[-1]['timestamp']
    }
def close_connection(self):
    """Close the SQLite database connection (idempotent).

    Clears ``self.conn`` after closing so a repeated call — e.g. from
    ``__del__`` after an explicit close — is a no-op instead of touching
    a closed handle, and later accidental use fails fast on ``None``.
    """
    if self.conn:
        self.conn.close()
        self.conn = None  # prevent double-close / use-after-close
        logger.info("TradingStats SQLite connection closed.")
def __del__(self):
    """Ensure connection is closed when object is deleted.

    Guarded broadly: ``__del__`` may run during interpreter shutdown when
    attributes or module globals are already torn down, and any exception
    raised here would only surface as an unraisable warning anyway.
    """
    try:
        self.close_connection()
    except Exception:
        pass
- # --- Order Table Management ---
def record_order_placed(self, symbol: str, side: str, order_type: str,
                        amount_requested: float, price: Optional[float] = None,
                        bot_order_ref_id: Optional[str] = None,
                        exchange_order_id: Optional[str] = None,
                        status: str = 'open',
                        parent_bot_order_ref_id: Optional[str] = None) -> Optional[int]:
    """Record a newly placed order in the 'orders' table. Returns the ID of the inserted order or None on failure.

    Args:
        symbol: Trading pair symbol.
        side: Order side; stored lower-cased.
        order_type: Order type; stored lower-cased.
        amount_requested: Requested order size.
        price: Limit/trigger price, if any.
        bot_order_ref_id: Bot-internal reference for this order.
        exchange_order_id: Exchange-assigned ID, if already known.
        status: Initial status; stored lower-cased, defaults to 'open'.
        parent_bot_order_ref_id: Bot ref of a parent order this one is
            linked to (e.g. a stop-loss tied to its entry order).

    Returns:
        Database row ID of the inserted order, or None on failure
        (an IntegrityError presumably indicates a duplicate
        bot_order_ref_id / exchange_order_id — TODO confirm the unique
        constraints in the schema).
    """
    now_iso = datetime.now(timezone.utc).isoformat()
    query = """
        INSERT INTO orders (bot_order_ref_id, exchange_order_id, symbol, side, type,
                            amount_requested, price, status, timestamp_created, timestamp_updated, parent_bot_order_ref_id)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    params = (bot_order_ref_id, exchange_order_id, symbol, side.lower(), order_type.lower(),
              amount_requested, price, status.lower(), now_iso, now_iso, parent_bot_order_ref_id)
    try:
        # Use a cursor directly (rather than _execute_query) so lastrowid is available.
        cur = self.conn.cursor()
        cur.execute(query, params)
        self.conn.commit()
        order_db_id = cur.lastrowid
        logger.info(f"Recorded order placed: ID {order_db_id}, Symbol {symbol}, Side {side}, Type {order_type}, Amount {amount_requested}, BotRef {bot_order_ref_id}, ExchID {exchange_order_id}")
        return order_db_id
    except sqlite3.IntegrityError as e:
        logger.error(f"Failed to record order due to IntegrityError (likely duplicate bot_order_ref_id '{bot_order_ref_id}' or exchange_order_id '{exchange_order_id}'): {e}")
        return None
    except Exception as e:
        logger.error(f"Failed to record order: {e}")
        return None
def update_order_status(self, order_db_id: Optional[int] = None, bot_order_ref_id: Optional[str] = None, exchange_order_id: Optional[str] = None,
                        new_status: Optional[str] = None, amount_filled_increment: Optional[float] = None, set_exchange_order_id: Optional[str] = None) -> bool:
    """Update an existing order's status and/or amount_filled. Identify order by order_db_id, bot_order_ref_id, or exchange_order_id.

    Exactly one identifier is used, chosen in priority order:
    order_db_id, then bot_order_ref_id, then exchange_order_id.

    Args:
        order_db_id: Database ID to identify the order
        bot_order_ref_id: Bot's internal reference ID to identify the order
        exchange_order_id: Exchange's order ID to identify the order
        new_status: New status to set (stored lower-cased)
        amount_filled_increment: Amount to add to current filled amount
        set_exchange_order_id: If provided, sets/updates the exchange_order_id field in the database

    Returns:
        True on success (including the no-op case where nothing was asked
        to change), False when no identifier was supplied or the UPDATE
        raised.
    """
    if not any([order_db_id, bot_order_ref_id, exchange_order_id]):
        logger.error("Must provide one of order_db_id, bot_order_ref_id, or exchange_order_id to update order.")
        return False
    now_iso = datetime.now(timezone.utc).isoformat()
    set_clauses = []
    params = []
    if new_status:
        set_clauses.append("status = ?")
        params.append(new_status.lower())

    if set_exchange_order_id is not None:
        set_clauses.append("exchange_order_id = ?")
        params.append(set_exchange_order_id)

    current_amount_filled = 0.0
    identifier_clause = ""
    identifier_param = None
    # Identifier priority: db id > bot ref > exchange id.
    if order_db_id:
        identifier_clause = "id = ?"
        identifier_param = order_db_id
    elif bot_order_ref_id:
        identifier_clause = "bot_order_ref_id = ?"
        identifier_param = bot_order_ref_id
    elif exchange_order_id:
        identifier_clause = "exchange_order_id = ?"
        identifier_param = exchange_order_id
    if amount_filled_increment is not None and amount_filled_increment > 0:
        # Read-modify-write: fetch the current fill, then write fill + increment.
        # NOTE(review): this is not atomic — a concurrent update between the
        # SELECT and the UPDATE could be lost; acceptable only if callers are
        # effectively single-threaded per order. TODO confirm.
        order_data = self._fetchone_query(f"SELECT amount_filled FROM orders WHERE {identifier_clause}", (identifier_param,))
        if order_data:
            current_amount_filled = order_data.get('amount_filled', 0.0)
        else:
            # Order missing: the increment below is applied on top of 0.0,
            # so the written fill may be wrong; the status update (if any)
            # still proceeds.
            logger.warning(f"Order not found by {identifier_clause}={identifier_param} when trying to increment amount_filled.")
        set_clauses.append("amount_filled = ?")
        params.append(current_amount_filled + amount_filled_increment)
    if not set_clauses:
        logger.info("No fields to update for order.")
        return True  # No update needed, not an error
    set_clauses.append("timestamp_updated = ?")
    params.append(now_iso)

    params.append(identifier_param)  # identifier param last, bound to the WHERE clause
    # identifier_clause is one of three internal literals above, never user input.
    query = f"UPDATE orders SET { ', '.join(set_clauses) } WHERE {identifier_clause}"

    try:
        self._execute_query(query, tuple(params))
        log_msg = f"Updated order ({identifier_clause}={identifier_param}): Status to '{new_status or 'N/A'}', Filled increment {amount_filled_increment or 0.0}"
        if set_exchange_order_id is not None:
            log_msg += f", Exchange ID set to '{set_exchange_order_id}'"
        logger.info(log_msg)
        return True
    except Exception as e:
        logger.error(f"Failed to update order ({identifier_clause}={identifier_param}): {e}")
        return False
def get_order_by_db_id(self, order_db_id: int) -> Optional[Dict[str, Any]]:
    """Look up a single order row by its database primary key."""
    query = "SELECT * FROM orders WHERE id = ?"
    return self._fetchone_query(query, (order_db_id,))
def get_order_by_bot_ref_id(self, bot_order_ref_id: str) -> Optional[Dict[str, Any]]:
    """Look up a single order row by the bot's internal reference ID."""
    query = "SELECT * FROM orders WHERE bot_order_ref_id = ?"
    return self._fetchone_query(query, (bot_order_ref_id,))
def get_order_by_exchange_id(self, exchange_order_id: str) -> Optional[Dict[str, Any]]:
    """Look up a single order row by the exchange-assigned order ID."""
    query = "SELECT * FROM orders WHERE exchange_order_id = ?"
    return self._fetchone_query(query, (exchange_order_id,))
def get_orders_by_status(self, status: str, order_type_filter: Optional[str] = None, parent_bot_order_ref_id: Optional[str] = None) -> List[Dict[str, Any]]:
    """Fetch all orders with a given status, optionally narrowed by type and/or parent order ref.

    Results are ordered oldest-first by creation timestamp.
    """
    where_clauses = ["status = ?"]
    bind_values: List[Any] = [status.lower()]
    if order_type_filter:
        where_clauses.append("type = ?")
        bind_values.append(order_type_filter.lower())
    if parent_bot_order_ref_id:
        where_clauses.append("parent_bot_order_ref_id = ?")
        bind_values.append(parent_bot_order_ref_id)
    query = (
        "SELECT * FROM orders WHERE "
        + " AND ".join(where_clauses)
        + " ORDER BY timestamp_created ASC"
    )
    return self._fetch_query(query, tuple(bind_values))
def cancel_linked_orders(self, parent_bot_order_ref_id: str, new_status: str = 'cancelled_parent_filled') -> int:
    """Cancel all pending-trigger orders linked to a parent order.

    Used e.g. to drop pending stop losses when the parent order fills or
    is cancelled. Returns how many linked orders were cancelled.
    """
    cancelled_count = 0
    for linked in self.get_orders_by_status('pending_trigger', parent_bot_order_ref_id=parent_bot_order_ref_id):
        db_id = linked.get('id')
        if not db_id:
            continue
        if self.update_order_status(order_db_id=db_id, new_status=new_status):
            cancelled_count += 1
            logger.info(f"Cancelled linked order ID {db_id} (parent: {parent_bot_order_ref_id}) -> status: {new_status}")
    return cancelled_count
def cancel_pending_stop_losses_by_symbol(self, symbol: str, new_status: str = 'cancelled_position_closed') -> int:
    """Cancel every pending stop-loss trigger for a symbol (used when its position closes).

    Returns how many stop-loss orders were cancelled.
    """
    pending = self._fetch_query(
        "SELECT * FROM orders WHERE symbol = ? AND status = 'pending_trigger' AND type = 'stop_limit_trigger'",
        (symbol,)
    )
    cancelled_count = 0
    for order_row in pending:
        db_id = order_row.get('id')
        if not db_id:
            continue
        if self.update_order_status(order_db_id=db_id, new_status=new_status):
            cancelled_count += 1
            logger.info(f"Cancelled pending SL order ID {db_id} for {symbol} -> status: {new_status}")
    return cancelled_count
def get_order_cleanup_summary(self) -> Dict[str, Any]:
    """Get summary of order cleanup actions for monitoring and debugging.

    Returns a dict of per-terminal-status counts plus the number of
    currently pending stop losses and currently active orders; an empty
    dict on error.
    """
    try:
        summary: Dict[str, Any] = {}

        terminal_statuses = (
            'cancelled_parent_cancelled',
            'cancelled_parent_disappeared',
            'cancelled_manual_exit',
            'cancelled_auto_exit',
            'cancelled_no_position',
            'cancelled_external_position_close',
            'cancelled_orphaned_no_position',
            'cancelled_externally',
            'immediately_executed_on_activation',
            'activation_execution_failed',
            'activation_execution_error',
        )

        for status_name in terminal_statuses:
            row = self._fetchone_query(
                "SELECT COUNT(*) as count FROM orders WHERE status = ?",
                (status_name,)
            )
            summary[status_name] = row['count'] if row else 0

        # Stop losses waiting for their trigger price.
        summary['currently_pending_stop_losses'] = len(
            self.get_orders_by_status('pending_trigger', 'stop_limit_trigger')
        )

        active_row = self._fetchone_query(
            "SELECT COUNT(*) as count FROM orders WHERE status IN ('open', 'submitted', 'partially_filled')",
            ()
        )
        summary['currently_active_orders'] = active_row['count'] if active_row else 0

        return summary

    except Exception as e:
        logger.error(f"Error getting order cleanup summary: {e}")
        return {}
def get_external_activity_summary(self, days: int = 7) -> Dict[str, Any]:
    """Get summary of external activity (trades and cancellations) over the last N days.

    Counts externally-originated trades by side, externally cancelled
    orders, and bot-side cleanup cancellations since the cutoff.
    """
    try:
        from datetime import timedelta
        cutoff_date = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()

        summary: Dict[str, Any] = {
            'external_buy_trades': 0,
            'external_sell_trades': 0,
            'total_external_trades': 0,
        }

        # Per-side counts of external trades since the cutoff.
        side_counts = self._fetch_query(
            "SELECT COUNT(*) as count, side FROM trades WHERE trade_type = 'external' AND timestamp >= ? GROUP BY side",
            (cutoff_date,)
        )
        for group in side_counts:
            group_count = group['count']
            summary['total_external_trades'] += group_count
            if group['side'] == 'buy':
                summary['external_buy_trades'] = group_count
            elif group['side'] == 'sell':
                summary['external_sell_trades'] = group_count

        ext_cancel_row = self._fetchone_query(
            "SELECT COUNT(*) as count FROM orders WHERE status = 'cancelled_externally' AND timestamp_updated >= ?",
            (cutoff_date,)
        )
        summary['external_cancellations'] = ext_cancel_row['count'] if ext_cancel_row else 0

        # All other cancelled_* statuses are bot cleanup actions.
        cleanup_row = self._fetchone_query(
            """SELECT COUNT(*) as count FROM orders
               WHERE status LIKE 'cancelled_%'
               AND status != 'cancelled_externally'
               AND timestamp_updated >= ?""",
            (cutoff_date,)
        )
        summary['cleanup_cancellations'] = cleanup_row['count'] if cleanup_row else 0

        summary['period_days'] = days

        return summary

    except Exception as e:
        logger.error(f"Error getting external activity summary: {e}")
        return {'period_days': days, 'total_external_trades': 0, 'external_cancellations': 0}
- # --- End Order Table Management ---
- # =============================================================================
- # TRADE LIFECYCLE MANAGEMENT - PHASE 4: UNIFIED TRADES TABLE
- # =============================================================================
-
- def create_trade_lifecycle(self, symbol: str, side: str, entry_order_id: Optional[str] = None,
- entry_bot_order_ref_id: Optional[str] = None, # New parameter
- stop_loss_price: Optional[float] = None,
- take_profit_price: Optional[float] = None,
- trade_type: str = 'manual') -> Optional[str]:
- """Create a new trade lifecycle.
- If stop_loss_price is provided, also creates a conceptual 'pending_sl_activation' order.
- """
- try:
- lifecycle_id = str(uuid.uuid4())
-
- # Main lifecycle record in 'trades' table
- query = """
- INSERT INTO trades (
- symbol, side, amount, price, value, trade_type, timestamp,
- status, trade_lifecycle_id, position_side, entry_order_id,
- stop_loss_price, take_profit_price, updated_at
- ) VALUES (?, ?, 0, 0, 0, ?, ?, 'pending', ?, 'flat', ?, ?, ?, ?)
- """
- timestamp = datetime.now(timezone.utc).isoformat()
- params = (symbol, side.lower(), trade_type, timestamp, lifecycle_id,
- entry_order_id, stop_loss_price, take_profit_price, timestamp)
-
- self._execute_query(query, params)
- logger.info(f"📊 Created trade lifecycle {lifecycle_id}: {side.upper()} {symbol} (pending for exch_id: {entry_order_id or 'N/A'})")
- # If SL price is provided, create a conceptual pending SL order in 'orders' table
- if stop_loss_price is not None and entry_bot_order_ref_id is not None:
- sl_order_side = 'sell' if side.lower() == 'buy' else 'buy'
- # Using entry_bot_order_ref_id ensures this conceptual SL is linked to the specific entry attempt
- conceptual_sl_bot_ref_id = f"pending_sl_activation_{entry_bot_order_ref_id}"
-
- # Record this conceptual order. Amount is 0 for now.
- # The actual amount will be determined when the SL is placed after entry fill.
- sl_order_db_id = self.record_order_placed(
- symbol=symbol,
- side=sl_order_side,
- order_type='pending_sl_activation', # New conceptual type
- amount_requested=0, # Placeholder amount
- price=stop_loss_price,
- bot_order_ref_id=conceptual_sl_bot_ref_id,
- status='pending_activation', # New conceptual status
- parent_bot_order_ref_id=entry_bot_order_ref_id, # Link to the main entry order
- exchange_order_id=None # Not on exchange yet
- )
- if sl_order_db_id:
- logger.info(f"💡 Recorded conceptual 'pending_sl_activation' order (DB ID: {sl_order_db_id}, BotRef: {conceptual_sl_bot_ref_id}) for lifecycle {lifecycle_id} at SL price {stop_loss_price}.")
- else:
- logger.error(f"⚠️ Failed to record conceptual 'pending_sl_activation' order for lifecycle {lifecycle_id} (Entry BotRef: {entry_bot_order_ref_id}).")
- return lifecycle_id
-
- except Exception as e:
- logger.error(f"❌ Error creating trade lifecycle: {e}")
- return None
-
- def update_trade_position_opened(self, lifecycle_id: str, entry_price: float,
- entry_amount: float, exchange_fill_id: str) -> bool:
- """Update trade when position is opened (entry order filled)."""
- try:
- query = """
- UPDATE trades
- SET status = 'position_opened',
- amount = ?,
- price = ?,
- value = ?,
- entry_price = ?,
- current_position_size = ?,
- position_side = CASE
- WHEN side = 'buy' THEN 'long'
- WHEN side = 'sell' THEN 'short'
- ELSE position_side
- END,
- exchange_fill_id = ?,
- position_opened_at = ?,
- updated_at = ?
- WHERE trade_lifecycle_id = ? AND status = 'pending'
- """
- timestamp = datetime.now(timezone.utc).isoformat()
- value = entry_amount * entry_price
- params = (entry_amount, entry_price, value, entry_price, entry_amount,
- exchange_fill_id, timestamp, timestamp, lifecycle_id)
-
- self._execute_query(query, params)
-
- formatter = get_formatter()
- trade_info = self.get_trade_by_lifecycle_id(lifecycle_id) # Fetch to get symbol for formatting
- symbol_for_formatting = trade_info.get('symbol', 'UNKNOWN_SYMBOL') if trade_info else 'UNKNOWN_SYMBOL'
- base_asset_for_amount = symbol_for_formatting.split('/')[0] if '/' in symbol_for_formatting else symbol_for_formatting
- logger.info(f"📈 Trade lifecycle {lifecycle_id} position opened: {formatter.format_amount(entry_amount, base_asset_for_amount)} {symbol_for_formatting} @ {formatter.format_price(entry_price, symbol_for_formatting)}")
- return True
-
- except Exception as e:
- logger.error(f"❌ Error updating trade position opened: {e}")
- return False
-
- def update_trade_position_closed(self, lifecycle_id: str, exit_price: float,
- realized_pnl: float, exchange_fill_id: str) -> bool:
- """Update trade when position is fully closed."""
- try:
- query = """
- UPDATE trades
- SET status = 'position_closed',
- current_position_size = 0,
- position_side = 'flat',
- realized_pnl = ?,
- position_closed_at = ?,
- updated_at = ?
- WHERE trade_lifecycle_id = ? AND status = 'position_opened'
- """
- timestamp = datetime.now(timezone.utc).isoformat()
- params = (realized_pnl, timestamp, timestamp, lifecycle_id)
-
- self._execute_query(query, params)
-
- formatter = get_formatter()
- trade_info = self.get_trade_by_lifecycle_id(lifecycle_id) # Fetch to get symbol for P&L formatting context
- symbol_for_formatting = trade_info.get('symbol', 'USD') # Default to USD for PNL if symbol unknown
- pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
- logger.info(f"{pnl_emoji} Trade lifecycle {lifecycle_id} position closed: P&L {formatter.format_price_with_symbol(realized_pnl)}")
- return True
-
- except Exception as e:
- logger.error(f"❌ Error updating trade position closed: {e}")
- return False
-
- def update_trade_cancelled(self, lifecycle_id: str, reason: str = "order_cancelled") -> bool:
- """Update trade when entry order is cancelled (never opened)."""
- try:
- query = """
- UPDATE trades
- SET status = 'cancelled',
- notes = ?,
- updated_at = ?
- WHERE trade_lifecycle_id = ? AND status = 'pending'
- """
- timestamp = datetime.now(timezone.utc).isoformat()
- params = (f"Cancelled: {reason}", timestamp, lifecycle_id)
-
- self._execute_query(query, params)
-
- logger.info(f"❌ Trade lifecycle {lifecycle_id} cancelled: {reason}")
- return True
-
- except Exception as e:
- logger.error(f"❌ Error updating trade cancelled: {e}")
- return False
-
- def link_stop_loss_to_trade(self, lifecycle_id: str, stop_loss_order_id: str,
- stop_loss_price: float) -> bool:
- """Link a stop loss order to a trade lifecycle."""
- try:
- query = """
- UPDATE trades
- SET stop_loss_order_id = ?,
- stop_loss_price = ?,
- updated_at = ?
- WHERE trade_lifecycle_id = ? AND status = 'position_opened'
- """
- timestamp = datetime.now(timezone.utc).isoformat()
- params = (stop_loss_order_id, stop_loss_price, timestamp, lifecycle_id)
-
- self._execute_query(query, params)
-
- formatter = get_formatter()
- trade_info = self.get_trade_by_lifecycle_id(lifecycle_id) # Fetch to get symbol for formatting
- symbol_for_formatting = trade_info.get('symbol', 'UNKNOWN_SYMBOL') if trade_info else 'UNKNOWN_SYMBOL'
- logger.info(f"🛑 Linked stop loss order {stop_loss_order_id} ({formatter.format_price(stop_loss_price, symbol_for_formatting)}) to trade {lifecycle_id}")
- return True
-
- except Exception as e:
- logger.error(f"❌ Error linking stop loss to trade: {e}")
- return False
-
- def link_take_profit_to_trade(self, lifecycle_id: str, take_profit_order_id: str,
- take_profit_price: float) -> bool:
- """Link a take profit order to a trade lifecycle."""
- try:
- query = """
- UPDATE trades
- SET take_profit_order_id = ?,
- take_profit_price = ?,
- updated_at = ?
- WHERE trade_lifecycle_id = ? AND status = 'position_opened'
- """
- timestamp = datetime.now(timezone.utc).isoformat()
- params = (take_profit_order_id, take_profit_price, timestamp, lifecycle_id)
-
- self._execute_query(query, params)
-
- formatter = get_formatter()
- trade_info = self.get_trade_by_lifecycle_id(lifecycle_id) # Fetch to get symbol for formatting
- symbol_for_formatting = trade_info.get('symbol', 'UNKNOWN_SYMBOL') if trade_info else 'UNKNOWN_SYMBOL'
- logger.info(f"🎯 Linked take profit order {take_profit_order_id} ({formatter.format_price(take_profit_price, symbol_for_formatting)}) to trade {lifecycle_id}")
- return True
-
- except Exception as e:
- logger.error(f"❌ Error linking take profit to trade: {e}")
- return False
-
- def get_trade_by_lifecycle_id(self, lifecycle_id: str) -> Optional[Dict[str, Any]]:
- """Get trade by lifecycle ID."""
- query = "SELECT * FROM trades WHERE trade_lifecycle_id = ?"
- return self._fetchone_query(query, (lifecycle_id,))
-
- # Re-instating the correct get_trade_by_symbol_and_status from earlier version in case it was overwritten by file read
- def get_trade_by_symbol_and_status(self, symbol: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]: # Copied from earlier state
- """Get trade by symbol and status."""
- query = "SELECT * FROM trades WHERE symbol = ? AND status = ? ORDER BY updated_at DESC LIMIT 1"
- return self._fetchone_query(query, (symbol, status))
-
- def get_open_positions(self, symbol: Optional[str] = None) -> List[Dict[str, Any]]:
- """Get all open positions, optionally filtered by symbol."""
- if symbol:
- query = "SELECT * FROM trades WHERE status = 'position_opened' AND symbol = ? ORDER BY position_opened_at DESC"
- return self._fetch_query(query, (symbol,))
- else:
- query = "SELECT * FROM trades WHERE status = 'position_opened' ORDER BY position_opened_at DESC"
- return self._fetch_query(query)
-
- def get_trades_by_status(self, status: str, limit: int = 50) -> List[Dict[str, Any]]:
- """Get trades by status."""
- query = "SELECT * FROM trades WHERE status = ? ORDER BY updated_at DESC LIMIT ?"
- return self._fetch_query(query, (status, limit))
- def _format_duration(self, seconds: float) -> str:
- """Formats a duration in seconds into a human-readable string (e.g., 1h 25m 3s)."""
- hours = int(seconds // 3600)
- minutes = int((seconds % 3600) // 60)
- remaining_seconds = int(seconds % 60)
- return f"{hours}h {minutes}m {remaining_seconds}s"
- # --- End Trade Lifecycle Management ---
- def get_balance_history_record_count(self) -> int:
- """Get the total number of balance history records."""
- row = self._fetchone_query("SELECT COUNT(*) as count FROM balance_history")
- return row['count'] if row and 'count' in row else 0
- # 🆕 PHASE 5: AGGREGATION AND PURGING LOGIC
    def _migrate_trade_to_aggregated_stats(self, trade_lifecycle_id: str):
        """Migrate a completed/cancelled trade's stats to aggregate tables and delete the original trade."""
        # TODO: deliberately a no-op for now — the aggregation/purge logic for
        # Phase 5 has not been implemented yet. Callers get no error; nothing
        # is migrated or deleted.
        # Implement the logic to migrate trade stats to aggregate tables and delete the original trade
        pass
- def purge_old_daily_aggregated_stats(self, months_to_keep: int = 10):
- """Purge records from daily_aggregated_stats older than a specified number of months."""
- try:
- cutoff_date = datetime.now(timezone.utc).date() - timedelta(days=months_to_keep * 30)
- cutoff_datetime_str = cutoff_date.isoformat()
- query = "DELETE FROM daily_aggregated_stats WHERE date < ?"
-
- with self.conn:
- cursor = self.conn.cursor()
- cursor.execute(query, (cutoff_datetime_str,))
- rows_deleted = cursor.rowcount
-
- if rows_deleted > 0:
- logger.info(f"Purged {rows_deleted} old records from daily_aggregated_stats (older than {months_to_keep} months).")
- else:
- logger.debug(f"No old records found in daily_aggregated_stats to purge (older than {months_to_keep} months).")
- except sqlite3.Error as e:
- logger.error(f"Database error purging old daily_aggregated_stats: {e}", exc_info=True)
- except Exception as e:
- logger.error(f"Unexpected error purging old daily_aggregated_stats: {e}", exc_info=True)
- def purge_old_balance_history(self):
- """Purge records from balance_history older than the configured retention period."""
- days_to_keep = Config.BALANCE_HISTORY_RETENTION_DAYS
- if days_to_keep <= 0:
- logger.info("Not purging balance_history as retention days is not positive.")
- return
- try:
- cutoff_date = datetime.now(timezone.utc).date() - timedelta(days=days_to_keep)
- cutoff_datetime_str = cutoff_date.isoformat()
- query = "DELETE FROM balance_history WHERE timestamp < ?"
-
- with self.conn:
- cursor = self.conn.cursor()
- cursor.execute(query, (cutoff_datetime_str,))
- rows_deleted = cursor.rowcount
-
- if rows_deleted > 0:
- logger.info(f"Purged {rows_deleted} old records from balance_history (older than {days_to_keep} days).")
- else:
- logger.debug(f"No old records found in balance_history to purge (older than {days_to_keep} days).")
- except sqlite3.Error as e:
- logger.error(f"Database error purging old balance_history: {e}", exc_info=True)
- except Exception as e:
- logger.error(f"Unexpected error purging old balance_history: {e}", exc_info=True)
- def get_daily_balance_record_count(self) -> int:
- """Get the total number of daily balance records."""
- row = self._fetchone_query("SELECT COUNT(*) as count FROM daily_balances")
- return row['count'] if row and 'count' in row else 0
|