- #!/usr/bin/env python3
- """
- Trading Statistics Tracker (SQLite Version)
- Tracks and calculates comprehensive trading statistics using an SQLite database.
- """
- import sqlite3
- import os
- import logging
- from datetime import datetime, timedelta, timezone
- from typing import Dict, List, Any, Optional, Tuple, Union
- import math
- from collections import defaultdict
- import uuid
- import numpy as np
- # 🆕 Import the migration runner
- from src.migrations.migrate_db import run_migrations as run_db_migrations
- from src.utils.token_display_formatter import get_formatter # Added import
- logger = logging.getLogger(__name__)
- def _normalize_token_case(token: str) -> str:
- """
- Normalize token case: if any characters are already uppercase, keep as-is.
- Otherwise, convert to uppercase. This handles mixed-case tokens like kPEPE, kBONK.
- """
- # Check if any character is already uppercase
- if any(c.isupper() for c in token):
- return token # Keep original case for mixed-case tokens
- else:
- return token.upper() # Convert to uppercase for all-lowercase input
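- # Illustration (hypothetical tokens): "kPEPE" is returned unchanged because it
- # already contains uppercase characters, while "btc" becomes "BTC".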
- class TradingStats:
- """Comprehensive trading statistics tracker using SQLite."""
- def __init__(self, db_path: str = "data/trading_stats.sqlite"):
- """Initialize the stats tracker and connect to SQLite DB."""
- self.db_path = db_path
- self._ensure_data_directory()
-
- # 🆕 Run database migrations before connecting and creating tables
- # This ensures the schema is up-to-date when the connection is made
- # and tables are potentially created for the first time.
- logger.info("Running database migrations if needed...")
- run_db_migrations(self.db_path) # Pass the correct db_path
- logger.info("Database migration check complete.")
-
- self.conn = sqlite3.connect(self.db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
- self.conn.row_factory = self._dict_factory
- self._create_tables() # CREATE IF NOT EXISTS will still be useful for first-time setup
- self._initialize_metadata() # Also potentially sets schema_version if DB was just created
- # 🆕 Purge old daily aggregated stats on startup
- self.purge_old_daily_aggregated_stats()
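- # Usage sketch (illustrative values, assumed startup flow):
- # stats = TradingStats("data/trading_stats.sqlite")
- # stats.set_initial_balance(1000.0) # no-op if a non-zero balance is already stored
- # stats.record_balance(1023.50) # daily snapshot consumed by get_risk_metrics()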
- def _dict_factory(self, cursor, row):
- """Convert SQLite rows to dictionaries."""
- d = {}
- for idx, col in enumerate(cursor.description):
- d[col[0]] = row[idx]
- return d
- def _ensure_data_directory(self):
- """Ensure the data directory for the SQLite file exists."""
- data_dir = os.path.dirname(self.db_path)
- if data_dir and not os.path.exists(data_dir):
- os.makedirs(data_dir)
- logger.info(f"Created data directory for TradingStats DB: {data_dir}")
- def _execute_query(self, query: str, params: tuple = ()):
- """Execute a query (INSERT, UPDATE, DELETE)."""
- with self.conn:
- self.conn.execute(query, params)
- def _fetch_query(self, query: str, params: tuple = ()) -> List[Dict[str, Any]]:
- """Execute a SELECT query and fetch all results."""
- cur = self.conn.cursor()
- cur.execute(query, params)
- return cur.fetchall()
- def _fetchone_query(self, query: str, params: tuple = ()) -> Optional[Dict[str, Any]]:
- """Execute a SELECT query and fetch one result."""
- cur = self.conn.cursor()
- cur.execute(query, params)
- return cur.fetchone()
- def _create_tables(self):
- """Create SQLite tables if they don't exist."""
- queries = [
- """
- CREATE TABLE IF NOT EXISTS metadata (
- key TEXT PRIMARY KEY,
- value TEXT
- )
- """,
- """
- CREATE TABLE IF NOT EXISTS trades (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- exchange_fill_id TEXT UNIQUE,
- timestamp TEXT NOT NULL,
- symbol TEXT NOT NULL,
- side TEXT NOT NULL,
- amount REAL NOT NULL,
- price REAL NOT NULL,
- value REAL NOT NULL,
- trade_type TEXT NOT NULL,
- pnl REAL DEFAULT 0.0,
- linked_order_table_id INTEGER,
-
- -- 🆕 PHASE 4: Lifecycle tracking fields (merged from active_trades)
- status TEXT DEFAULT 'executed', -- 'pending', 'executed', 'position_opened', 'position_closed', 'cancelled'
- trade_lifecycle_id TEXT, -- Groups related trades into one lifecycle
- position_side TEXT, -- 'long', 'short', 'flat' - the resulting position side
-
- -- Position tracking
- entry_price REAL,
- current_position_size REAL DEFAULT 0,
-
- -- Order IDs (exchange IDs)
- entry_order_id TEXT,
- stop_loss_order_id TEXT,
- take_profit_order_id TEXT,
-
- -- Risk management
- stop_loss_price REAL,
- take_profit_price REAL,
-
- -- P&L tracking
- realized_pnl REAL DEFAULT 0,
- unrealized_pnl REAL DEFAULT 0,
- mark_price REAL DEFAULT 0,
- position_value REAL DEFAULT NULL,
- unrealized_pnl_percentage REAL DEFAULT NULL,
-
- -- Risk Info from Exchange
- liquidation_price REAL DEFAULT NULL,
- margin_used REAL DEFAULT NULL,
- leverage REAL DEFAULT NULL,
-
- -- Timestamps
- position_opened_at TEXT,
- position_closed_at TEXT,
- updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
-
- -- Notes
- notes TEXT
- )
- """,
- """
- CREATE TABLE IF NOT EXISTS daily_balances (
- date TEXT PRIMARY KEY,
- balance REAL NOT NULL,
- timestamp TEXT NOT NULL
- )
- """,
- """
- CREATE TABLE IF NOT EXISTS balance_adjustments (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- adjustment_id TEXT UNIQUE,
- timestamp TEXT NOT NULL,
- type TEXT NOT NULL, -- 'deposit' or 'withdrawal'
- amount REAL NOT NULL, -- Always positive, type indicates direction
- description TEXT
- )
- """,
- """
- CREATE TABLE IF NOT EXISTS orders (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- bot_order_ref_id TEXT UNIQUE,
- exchange_order_id TEXT UNIQUE,
- symbol TEXT NOT NULL,
- side TEXT NOT NULL,
- type TEXT NOT NULL,
- amount_requested REAL NOT NULL,
- amount_filled REAL DEFAULT 0.0,
- price REAL, -- For limit, stop, etc.
- status TEXT NOT NULL, -- e.g., 'open', 'partially_filled', 'filled', 'cancelled', 'rejected', 'expired', 'pending_trigger'
- timestamp_created TEXT NOT NULL,
- timestamp_updated TEXT NOT NULL,
- parent_bot_order_ref_id TEXT -- nullable by default; links conditional orders (like SL triggers) to their parent order
- )
- """,
- """
- CREATE INDEX IF NOT EXISTS idx_orders_bot_order_ref_id ON orders (bot_order_ref_id);
- """,
- """
- CREATE INDEX IF NOT EXISTS idx_orders_exchange_order_id ON orders (exchange_order_id);
- """,
- """
- CREATE INDEX IF NOT EXISTS idx_trades_exchange_fill_id ON trades (exchange_fill_id);
- """,
- """
- CREATE INDEX IF NOT EXISTS idx_trades_linked_order_table_id ON trades (linked_order_table_id);
- """,
- """
- CREATE INDEX IF NOT EXISTS idx_orders_parent_bot_order_ref_id ON orders (parent_bot_order_ref_id);
- """,
- """
- CREATE INDEX IF NOT EXISTS idx_orders_status_type ON orders (status, type);
- """,
- """
- CREATE INDEX IF NOT EXISTS idx_trades_status ON trades (status);
- """,
- """
- CREATE INDEX IF NOT EXISTS idx_trades_lifecycle_id ON trades (trade_lifecycle_id);
- """,
- """
- CREATE INDEX IF NOT EXISTS idx_trades_position_side ON trades (position_side);
- """,
- """
- CREATE INDEX IF NOT EXISTS idx_trades_symbol_status ON trades (symbol, status);
- """
- ]
- # 🆕 Add new table creation queries
- queries.extend([
- """
- CREATE TABLE IF NOT EXISTS token_stats (
- token TEXT PRIMARY KEY,
- total_realized_pnl REAL DEFAULT 0.0,
- total_completed_cycles INTEGER DEFAULT 0,
- winning_cycles INTEGER DEFAULT 0,
- losing_cycles INTEGER DEFAULT 0,
- total_entry_volume REAL DEFAULT 0.0, -- Sum of (amount * entry_price) for completed cycles
- total_exit_volume REAL DEFAULT 0.0, -- Sum of (amount * exit_price) for completed cycles
- sum_of_winning_pnl REAL DEFAULT 0.0,
- sum_of_losing_pnl REAL DEFAULT 0.0, -- Stored as a positive value
- largest_winning_cycle_pnl REAL DEFAULT 0.0,
- largest_losing_cycle_pnl REAL DEFAULT 0.0, -- Stored as a positive value
- first_cycle_closed_at TEXT,
- last_cycle_closed_at TEXT,
- total_cancelled_cycles INTEGER DEFAULT 0, -- Count of lifecycles that ended in 'cancelled'
- updated_at TEXT DEFAULT CURRENT_TIMESTAMP
- )
- """,
- """
- CREATE TABLE IF NOT EXISTS daily_aggregated_stats (
- date TEXT NOT NULL, -- YYYY-MM-DD
- token TEXT NOT NULL, -- Specific token or a general identifier like '_OVERALL_'
- realized_pnl REAL DEFAULT 0.0,
- completed_cycles INTEGER DEFAULT 0,
- entry_volume REAL DEFAULT 0.0,
- exit_volume REAL DEFAULT 0.0,
- PRIMARY KEY (date, token)
- )
- """,
- """
- CREATE INDEX IF NOT EXISTS idx_daily_stats_date_token ON daily_aggregated_stats (date, token);
- """
- ])
- for query in queries:
- self._execute_query(query)
- logger.info("SQLite tables ensured for TradingStats.")
- def _initialize_metadata(self):
- """Initialize metadata if not already present."""
- start_date = self._get_metadata('start_date')
- initial_balance = self._get_metadata('initial_balance')
- if start_date is None:
- self._set_metadata('start_date', datetime.now(timezone.utc).isoformat())
- logger.info("Initialized 'start_date' in metadata.")
-
- if initial_balance is None:
- self._set_metadata('initial_balance', '0.0')
- logger.info("Initialized 'initial_balance' in metadata.")
-
- logger.info(f"TradingStats initialized. Start Date: {self._get_metadata('start_date')}, Initial Balance: {self._get_metadata('initial_balance')}")
- def _get_metadata(self, key: str) -> Optional[str]:
- """Retrieve a value from the metadata table."""
- row = self._fetchone_query("SELECT value FROM metadata WHERE key = ?", (key,))
- return row['value'] if row else None
- def _set_metadata(self, key: str, value: str):
- """Set a value in the metadata table."""
- self._execute_query("INSERT OR REPLACE INTO metadata (key, value) VALUES (?, ?)", (key, value))
- def set_initial_balance(self, balance: float):
- """Set the initial balance if not already set or zero."""
- current_initial_balance_str = self._get_metadata('initial_balance')
- current_initial_balance = float(current_initial_balance_str) if current_initial_balance_str else 0.0
-
- if current_initial_balance == 0.0: # Only set if it's effectively unset
- self._set_metadata('initial_balance', str(balance))
- # Re-anchor start_date: reaching this branch means the balance was previously
- # unset, so this is the first real initialization.
- self._set_metadata('start_date', datetime.now(timezone.utc).isoformat())
- formatter = get_formatter()
- logger.info(f"Initial balance set to: {formatter.format_price_with_symbol(balance)}")
- else:
- formatter = get_formatter()
- logger.info(f"Initial balance already set to {formatter.format_price_with_symbol(current_initial_balance)}. Not changing.")
- def record_balance(self, balance: float):
- """Record daily balance snapshot."""
- today_iso = datetime.now(timezone.utc).date().isoformat()
- now_iso = datetime.now(timezone.utc).isoformat()
-
- existing_entry = self._fetchone_query("SELECT date FROM daily_balances WHERE date = ?", (today_iso,))
- if existing_entry:
- self._execute_query("UPDATE daily_balances SET balance = ?, timestamp = ? WHERE date = ?",
- (balance, now_iso, today_iso))
- else:
- self._execute_query("INSERT INTO daily_balances (date, balance, timestamp) VALUES (?, ?, ?)",
- (today_iso, balance, now_iso))
- # logger.debug(f"Recorded balance for {today_iso}: ${balance:.2f}") # Potentially too verbose
- def record_trade(self, symbol: str, side: str, amount: float, price: float,
- exchange_fill_id: Optional[str] = None, trade_type: str = "manual",
- pnl: Optional[float] = None, timestamp: Optional[str] = None,
- linked_order_table_id_to_link: Optional[int] = None):
- """Record a trade in the database."""
- if timestamp is None:
- timestamp = datetime.now(timezone.utc).isoformat()
-
- value = amount * price
- self._execute_query(
- "INSERT OR IGNORE INTO trades (symbol, side, amount, price, value, trade_type, timestamp, exchange_fill_id, pnl, linked_order_table_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
- (symbol, side, amount, price, value, trade_type, timestamp, exchange_fill_id, pnl or 0.0, linked_order_table_id_to_link)
- )
- formatter = get_formatter()
- # Assuming symbol's base asset for amount formatting. If symbol is like BTC/USDT, base is BTC.
- base_asset_for_amount = symbol.split('/')[0] if '/' in symbol else symbol
- logger.info(f"📈 Trade recorded: {side.upper()} {formatter.format_amount(amount, base_asset_for_amount)} {symbol} @ {formatter.format_price(price, symbol)} ({formatter.format_price(value, symbol)}) [{trade_type}]")
- def get_all_trades(self) -> List[Dict[str, Any]]:
- """Fetch all trades from the database, ordered by timestamp."""
- return self._fetch_query("SELECT * FROM trades ORDER BY timestamp ASC")
- def get_trade_by_symbol_and_status(self, symbol: str, status: str) -> Optional[Dict[str, Any]]:
- """
- Fetches a single trade record for a given symbol and status.
- Typically used to find an open position master record.
- Assumes that for a given symbol, there's at most one trade record with a specific
- active status like 'position_opened'. If multiple could exist, this fetches the most recent.
- """
- query = "SELECT * FROM trades WHERE symbol = ? AND status = ? ORDER BY id DESC LIMIT 1"
- trade = self._fetchone_query(query, (symbol, status))
- if trade:
- logger.debug(f"Found trade for {symbol} with status {status}: ID {trade.get('id')}")
- # else: # Can be noisy if not finding a trade is a common occurrence
- # logger.debug(f"No trade found for {symbol} with status {status}")
- return trade
-
- def get_basic_stats(self, current_balance: Optional[float] = None) -> Dict[str, Any]:
- """Get basic trading statistics from DB, primarily using aggregated tables."""
-
- # Get counts of open positions (trades that are not yet migrated)
- open_positions_count = self._get_open_positions_count_from_db()
- # Get overall aggregated stats from token_stats table
- query_token_stats_summary = """
- SELECT
- SUM(total_realized_pnl) as total_pnl_from_cycles,
- SUM(total_completed_cycles) as total_completed_cycles_sum,
- MIN(first_cycle_closed_at) as overall_first_cycle_closed,
- MAX(last_cycle_closed_at) as overall_last_cycle_closed
- FROM token_stats
- """
- token_stats_summary = self._fetchone_query(query_token_stats_summary)
- total_pnl_from_cycles = token_stats_summary['total_pnl_from_cycles'] if token_stats_summary and token_stats_summary['total_pnl_from_cycles'] is not None else 0.0
- total_completed_cycles_sum = token_stats_summary['total_completed_cycles_sum'] if token_stats_summary and token_stats_summary['total_completed_cycles_sum'] is not None else 0
- # Total trades considered as sum of completed cycles and currently open positions
- # This redefines 'total_trades' from its previous meaning of individual fills.
- total_trades_redefined = total_completed_cycles_sum + open_positions_count
- initial_balance_str = self._get_metadata('initial_balance')
- initial_balance = float(initial_balance_str) if initial_balance_str else 0.0
-
- start_date_iso = self._get_metadata('start_date')
- start_date_obj = datetime.fromisoformat(start_date_iso) if start_date_iso else datetime.now(timezone.utc)
- days_active = (datetime.now(timezone.utc) - start_date_obj).days + 1
-
- # 'last_trade' timestamp could be the last update to token_stats or an open trade
- last_activity_ts = token_stats_summary['overall_last_cycle_closed'] if token_stats_summary else None
- last_open_trade_ts_row = self._fetchone_query("SELECT MAX(updated_at) as last_update FROM trades WHERE status = 'position_opened'")
- if last_open_trade_ts_row and last_open_trade_ts_row['last_update']:
- if not last_activity_ts or datetime.fromisoformat(last_open_trade_ts_row['last_update']) > datetime.fromisoformat(last_activity_ts):
- last_activity_ts = last_open_trade_ts_row['last_update']
- # Buy/Sell trades count from individual fills is no longer directly available for completed cycles.
- # If needed, this requires schema change in token_stats or a different approach.
- # For now, these are omitted from basic_stats.
- return {
- 'total_trades': total_trades_redefined, # This is now cycles + open positions
- 'completed_trades': total_completed_cycles_sum, # This is sum of total_completed_cycles from token_stats
- # 'buy_trades': buy_trades_count, # Omitted
- # 'sell_trades': sell_trades_count, # Omitted
- 'initial_balance': initial_balance,
- 'total_pnl': total_pnl_from_cycles, # PNL from closed cycles via token_stats
- 'days_active': days_active,
- 'start_date': start_date_obj.strftime('%Y-%m-%d'),
- 'last_trade': last_activity_ts, # Reflects last known activity (cycle close or open trade update)
- 'open_positions_count': open_positions_count
- }
- def get_performance_stats(self) -> Dict[str, Any]:
- """Calculate advanced performance statistics using aggregated data from token_stats."""
- query = """
- SELECT
- SUM(total_completed_cycles) as total_cycles,
- SUM(winning_cycles) as total_wins,
- SUM(losing_cycles) as total_losses,
- SUM(sum_of_winning_pnl) as total_winning_pnl,
- SUM(sum_of_losing_pnl) as total_losing_pnl, -- Stored positive
- MAX(largest_winning_cycle_pnl) as overall_largest_win,
- MAX(largest_losing_cycle_pnl) as overall_largest_loss -- Stored positive
- FROM token_stats
- """
- summary = self._fetchone_query(query)
- # Add total volume
- volume_summary = self._fetchone_query("SELECT SUM(total_exit_volume) as total_volume FROM token_stats")
- total_trading_volume = volume_summary['total_volume'] if volume_summary and volume_summary['total_volume'] is not None else 0.0
- # Get individual token performances for best/worst
- all_token_perf_stats = self.get_token_performance()
- best_token_pnl_pct = -float('inf')
- best_token_name = "N/A"
- worst_token_pnl_pct = float('inf')
- worst_token_name = "N/A"
- if all_token_perf_stats:
- for token_name_iter, stats_data in all_token_perf_stats.items():
- pnl_pct = stats_data.get('pnl_percentage', 0.0)
- # Ensure token has completed trades and pnl_pct is a valid number
- if stats_data.get('completed_trades', 0) > 0 and isinstance(pnl_pct, (int, float)) and not math.isinf(pnl_pct) and not math.isnan(pnl_pct):
- if pnl_pct > best_token_pnl_pct:
- best_token_pnl_pct = pnl_pct
- best_token_name = token_name_iter
- if pnl_pct < worst_token_pnl_pct:
- worst_token_pnl_pct = pnl_pct
- worst_token_name = token_name_iter
-
- # Handle cases where no valid tokens were found for best/worst
- if best_token_name == "N/A":
- best_token_pnl_pct = 0.0
- if worst_token_name == "N/A":
- worst_token_pnl_pct = 0.0
- if not summary or summary['total_cycles'] is None or summary['total_cycles'] == 0:
- return {
- 'win_rate': 0.0, 'profit_factor': 0.0, 'avg_win': 0.0, 'avg_loss': 0.0,
- 'largest_win': 0.0, 'largest_loss': 0.0,
- 'total_wins': 0, 'total_losses': 0, 'expectancy': 0.0,
- 'total_trading_volume': total_trading_volume,
- 'best_performing_token': {'name': best_token_name, 'pnl_percentage': best_token_pnl_pct},
- 'worst_performing_token': {'name': worst_token_name, 'pnl_percentage': worst_token_pnl_pct},
- }
- total_completed_count = summary['total_cycles']
- total_wins_count = summary['total_wins'] if summary['total_wins'] is not None else 0
- total_losses_count = summary['total_losses'] if summary['total_losses'] is not None else 0
-
- win_rate = (total_wins_count / total_completed_count * 100) if total_completed_count > 0 else 0.0
-
- sum_of_wins = summary['total_winning_pnl'] if summary['total_winning_pnl'] is not None else 0.0
- sum_of_losses = summary['total_losing_pnl'] if summary['total_losing_pnl'] is not None else 0.0 # This is sum of absolute losses
- profit_factor = (sum_of_wins / sum_of_losses) if sum_of_losses > 0 else float('inf') if sum_of_wins > 0 else 0.0
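- # e.g. sum_of_wins=300.0 and sum_of_losses=150.0 give profit_factor=2.0;
- # wins with zero losses are reported as infinity, matching the expression above.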
-
- avg_win = (sum_of_wins / total_wins_count) if total_wins_count > 0 else 0.0
- avg_loss = (sum_of_losses / total_losses_count) if total_losses_count > 0 else 0.0 # Avg of absolute losses
-
- largest_win = summary['overall_largest_win'] if summary['overall_largest_win'] is not None else 0.0
- largest_loss = summary['overall_largest_loss'] if summary['overall_largest_loss'] is not None else 0.0 # Largest absolute loss
- # Consecutive wins/losses removed as it's hard to track with this aggregation model.
-
- expectancy = (avg_win * (win_rate / 100)) - (avg_loss * (1 - (win_rate / 100)))
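- # Worked example (illustrative): win_rate=60%, avg_win=30.0, avg_loss=20.0
- # -> expectancy = 30.0*0.6 - 20.0*0.4 = 10.0 (expected P&L per completed cycle).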
- return {
- 'win_rate': win_rate, 'profit_factor': profit_factor, 'avg_win': avg_win, 'avg_loss': avg_loss,
- 'largest_win': largest_win, 'largest_loss': largest_loss,
- 'total_wins': total_wins_count, 'total_losses': total_losses_count, 'expectancy': expectancy,
- 'total_trading_volume': total_trading_volume,
- 'best_performing_token': {'name': best_token_name, 'pnl_percentage': best_token_pnl_pct},
- 'worst_performing_token': {'name': worst_token_name, 'pnl_percentage': worst_token_pnl_pct},
- }
- def get_risk_metrics(self) -> Dict[str, Any]:
- """Calculate risk-adjusted metrics from daily balances."""
- daily_balances_data = self._fetch_query("SELECT balance FROM daily_balances ORDER BY date ASC")
-
- if not daily_balances_data or len(daily_balances_data) < 2:
- return {'sharpe_ratio': 0.0, 'sortino_ratio': 0.0, 'max_drawdown': 0.0, 'volatility': 0.0, 'var_95': 0.0}
- balances = [entry['balance'] for entry in daily_balances_data]
- returns = np.diff(balances) / balances[:-1] # Calculate daily returns
- returns = returns[np.isfinite(returns)] # Remove NaNs or Infs if any balance was 0
- if returns.size == 0:
- return {'sharpe_ratio': 0.0, 'sortino_ratio': 0.0, 'max_drawdown': 0.0, 'volatility': 0.0, 'var_95': 0.0}
- risk_free_rate_daily = (1 + 0.02)**(1/365) - 1 # Approx 2% annual risk-free rate, daily
-
- excess_returns = returns - risk_free_rate_daily
- sharpe_ratio = np.mean(excess_returns) / np.std(returns) * np.sqrt(365) if np.std(returns) > 0 else 0.0
-
- downside_returns = returns[returns < 0]
- downside_std = np.std(downside_returns) if len(downside_returns) > 0 else 0.0
- sortino_ratio = np.mean(excess_returns) / downside_std * np.sqrt(365) if downside_std > 0 else 0.0
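- # Annualization assumes 365 daily observations (crypto markets trade daily).
- # e.g. a mean daily excess return of 0.001 with daily std 0.01 yields a Sharpe
- # of 0.001 / 0.01 * sqrt(365) ≈ 1.91.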
-
- cumulative_returns = np.cumprod(1 + returns)
- peak = np.maximum.accumulate(cumulative_returns)
- drawdown = (cumulative_returns - peak) / peak
- max_drawdown_pct = abs(np.min(drawdown) * 100) if drawdown.size > 0 else 0.0
-
- volatility_pct = np.std(returns) * np.sqrt(365) * 100
- var_95_pct = abs(np.percentile(returns, 5) * 100) if returns.size > 0 else 0.0
-
- return {
- 'sharpe_ratio': sharpe_ratio, 'sortino_ratio': sortino_ratio,
- 'max_drawdown': max_drawdown_pct, 'volatility': volatility_pct, 'var_95': var_95_pct
- }
- def get_comprehensive_stats(self, current_balance: Optional[float] = None) -> Dict[str, Any]:
- """Get all statistics combined."""
- if current_balance is not None: # Ensure it's not just None, but explicitly provided
- self.record_balance(current_balance) # Record current balance for today
-
- basic = self.get_basic_stats(current_balance) # Pass current_balance for P&L context if needed
- performance = self.get_performance_stats()
- risk = self.get_risk_metrics()
-
- initial_balance = basic['initial_balance']
- total_return_pct = 0.0
- # Use current_balance if available and valid for total return calculation
- # Otherwise, PNL from basic_stats (closed trades) is the primary PNL source
- # Caveat: current_balance also reflects unrealized PnL, whereas basic['total_pnl']
- # only covers closed cycles, so the two return bases can legitimately differ.
-
- effective_balance_for_return = current_balance if current_balance is not None else (initial_balance + basic['total_pnl'])
- if initial_balance > 0:
- total_return_pct = ((effective_balance_for_return - initial_balance) / initial_balance) * 100
-
- return {
- 'basic': basic,
- 'performance': performance,
- 'risk': risk,
- 'current_balance': current_balance if current_balance is not None else initial_balance + basic['total_pnl'], # Best estimate
- 'total_return': total_return_pct, # Percentage
- 'last_updated': datetime.now(timezone.utc).isoformat()
- }
- def _get_open_positions_count_from_db(self) -> int:
- """🧹 PHASE 4: Get count of open positions from enhanced trades table."""
- row = self._fetchone_query("SELECT COUNT(DISTINCT symbol) as count FROM trades WHERE status = 'position_opened'")
- return row['count'] if row else 0
- def format_stats_message(self, current_balance: Optional[float] = None) -> str:
- """Format stats for Telegram display using data from DB."""
- try:
- stats = self.get_comprehensive_stats(current_balance)
- formatter = get_formatter()
-
- basic = stats['basic']
- perf = stats['performance']
- risk = stats['risk'] # For portfolio drawdown
-
- effective_current_balance = stats['current_balance']
- initial_bal = basic['initial_balance']
- total_pnl_val = effective_current_balance - initial_bal if initial_bal > 0 and current_balance is not None else basic['total_pnl']
- total_return_pct = (total_pnl_val / initial_bal * 100) if initial_bal > 0 else 0.0
- pnl_emoji = "🟢" if total_pnl_val >= 0 else "🔴"
- open_positions_count = basic['open_positions_count']
- stats_text_parts = []
- stats_text_parts.append(f"📊 <b>Trading Statistics</b>\n")
-
- # Account Overview
- stats_text_parts.append(f"\n💰 <b>Account Overview:</b>")
- stats_text_parts.append(f"• Current Balance: {formatter.format_price_with_symbol(effective_current_balance)}")
- stats_text_parts.append(f"• Initial Balance: {formatter.format_price_with_symbol(initial_bal)}")
- stats_text_parts.append(f"• Open Positions: {open_positions_count}")
- stats_text_parts.append(f"• {pnl_emoji} Total P&L: {formatter.format_price_with_symbol(total_pnl_val)} ({total_return_pct:+.2f}%)")
- stats_text_parts.append(f"• Days Active: {basic['days_active']}\n")
-
- # Performance Metrics
- stats_text_parts.append(f"\n🏆 <b>Performance Metrics:</b>")
- stats_text_parts.append(f"• Total Completed Trades: {basic['completed_trades']}")
- stats_text_parts.append(f"• Trading Volume (Exit Vol.): {formatter.format_price_with_symbol(perf.get('total_trading_volume', 0.0))}")
- stats_text_parts.append(f"• Profit Factor: {perf['profit_factor']:.2f}")
- stats_text_parts.append(f"• Expectancy: {formatter.format_price_with_symbol(perf['expectancy'])} (Value per trade)")
- # Note: an expectancy percentage would need a defined denominator, so it is omitted from the display.
-
- stats_text_parts.append(f"• Largest Winning Trade: {formatter.format_price_with_symbol(perf['largest_win'])} (Value)")
- stats_text_parts.append(f"• Largest Losing Trade: {formatter.format_price_with_symbol(perf['largest_loss'])} (Value)")
- # Note: per-trade P&L percentages would require additional data capture, so they are omitted here.
- best_token_stats = perf.get('best_performing_token', {'name': 'N/A', 'pnl_percentage': 0.0})
- worst_token_stats = perf.get('worst_performing_token', {'name': 'N/A', 'pnl_percentage': 0.0})
- stats_text_parts.append(f"• Best Performing Token: {best_token_stats['name']} ({best_token_stats['pnl_percentage']:+.2f}%)")
- stats_text_parts.append(f"• Worst Performing Token: {worst_token_stats['name']} ({worst_token_stats['pnl_percentage']:+.2f}%)")
-
- stats_text_parts.append(f"• Average Trade Duration: N/A <i>(Data collection required)</i>")
- stats_text_parts.append(f"• Portfolio Max Drawdown: {risk['max_drawdown']:.2f}% <i>(Daily Balance based)</i>")
- # Future: drawdown computed from trading P&L alone (excluding deposits/withdrawals) is planned.
-
- # Session Info
- stats_text_parts.append(f"\n\n⏰ <b>Session Info:</b>")
- stats_text_parts.append(f"• Bot Started: {basic['start_date']}")
- stats_text_parts.append(f"• Stats Last Updated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}")
-
- return "\n".join(stats_text_parts).strip()
-
- except Exception as e:
- logger.error(f"Error formatting stats message: {e}", exc_info=True)
- return f"""📊 <b>Trading Statistics</b>\n\n❌ <b>Error loading statistics</b>\n\n🔧 <b>Debug info:</b> {str(e)[:100]}"""
- def get_recent_trades(self, limit: int = 10) -> List[Dict[str, Any]]:
- """Get recent trades from DB (these are active/open trades, as completed ones are migrated)."""
- return self._fetch_query("SELECT * FROM trades WHERE status = 'position_opened' ORDER BY updated_at DESC LIMIT ?", (limit,))
- def get_token_performance(self) -> Dict[str, Dict[str, Any]]:
- """Get performance statistics grouped by token using the token_stats table."""
- all_token_stats = self._fetch_query("SELECT * FROM token_stats ORDER BY token ASC")
-
- token_performance_map = {}
- for record in all_token_stats:
- token = record['token']
- total_pnl = record.get('total_realized_pnl', 0.0)
- # total_volume_sold now refers to total_exit_volume from token_stats
- total_volume = record.get('total_exit_volume', 0.0)
-
- pnl_percentage = (total_pnl / total_volume * 100) if total_volume > 0 else 0.0
-
- total_completed_count = record.get('total_completed_cycles', 0)
- total_wins_count = record.get('winning_cycles', 0)
- total_losses_count = record.get('losing_cycles', 0)
-
- win_rate = (total_wins_count / total_completed_count * 100) if total_completed_count > 0 else 0.0
-
- sum_of_wins = record.get('sum_of_winning_pnl', 0.0)
- sum_of_losses = record.get('sum_of_losing_pnl', 0.0) # Stored positive
- profit_factor = (sum_of_wins / sum_of_losses) if sum_of_losses > 0 else float('inf') if sum_of_wins > 0 else 0.0
-
- avg_win = (sum_of_wins / total_wins_count) if total_wins_count > 0 else 0.0
- avg_loss = (sum_of_losses / total_losses_count) if total_losses_count > 0 else 0.0
- expectancy = (avg_win * (win_rate / 100)) - (avg_loss * (1 - (win_rate / 100)))
-
- largest_win = record.get('largest_winning_cycle_pnl', 0.0)
- largest_loss = record.get('largest_losing_cycle_pnl', 0.0) # Stored positive
- token_performance_map[token] = {
- 'token': token, # Added for easier access if iterating over values
- 'total_pnl': total_pnl,
- 'pnl_percentage': pnl_percentage,
- 'completed_trades': total_completed_count,
- 'total_volume': total_volume, # This is total_exit_volume
- 'win_rate': win_rate,
- 'total_wins': total_wins_count,
- 'total_losses': total_losses_count,
- 'profit_factor': profit_factor,
- 'expectancy': expectancy,
- 'largest_win': largest_win,
- 'largest_loss': largest_loss,
- 'avg_win': avg_win,
- 'avg_loss': avg_loss,
- 'first_cycle_closed_at': record.get('first_cycle_closed_at'),
- 'last_cycle_closed_at': record.get('last_cycle_closed_at'),
- 'total_cancelled_cycles': record.get('total_cancelled_cycles', 0)
- }
- return token_performance_map
- def get_token_detailed_stats(self, token: str) -> Dict[str, Any]:
- """Get detailed statistics for a specific token using token_stats and current open trades."""
- upper_token = _normalize_token_case(token)
-
- # Get aggregated performance from token_stats
- token_agg_stats = self._fetchone_query("SELECT * FROM token_stats WHERE token = ?", (upper_token,))
-
- # Get currently open trades for this token from the 'trades' table (not yet migrated)
- # These are not completed cycles but represent current exposure.
- open_trades_for_token = self._fetch_query(
- "SELECT * FROM trades WHERE symbol LIKE ? AND status = 'position_opened' ORDER BY timestamp ASC",
- (f"{upper_token}/%",)
- )
-
- if not token_agg_stats and not open_trades_for_token:
- return {
- 'token': upper_token, 'total_trades': 0, 'total_pnl': 0.0, 'win_rate': 0.0,
- 'message': f"No trading history or open positions found for {upper_token}"
- }
- # Initialize with empty performance if no aggregated data
- perf_stats = {}
- if token_agg_stats:
- perf_stats = {
- 'completed_trades': token_agg_stats.get('total_completed_cycles', 0),
- 'total_pnl': token_agg_stats.get('total_realized_pnl', 0.0),
- 'pnl_percentage': 0.0, # Recalculated below from completed exit volume
- 'win_rate': 0.0,
- 'profit_factor': 0.0, # Recalculated below from win/loss sums (not stored in token_stats)
- 'avg_win': 0.0,
- 'avg_loss': 0.0,
- 'largest_win': token_agg_stats.get('largest_winning_cycle_pnl', 0.0),
- 'largest_loss': token_agg_stats.get('largest_losing_cycle_pnl', 0.0),
- 'expectancy': 0.0,
- 'total_wins': token_agg_stats.get('winning_cycles',0),
- 'total_losses': token_agg_stats.get('losing_cycles',0),
- 'completed_entry_volume': token_agg_stats.get('total_entry_volume', 0.0),
- 'completed_exit_volume': token_agg_stats.get('total_exit_volume', 0.0),
- 'total_cancelled': token_agg_stats.get('total_cancelled_cycles', 0)
- }
- if perf_stats['completed_trades'] > 0:
- perf_stats['win_rate'] = (perf_stats['total_wins'] / perf_stats['completed_trades'] * 100) if perf_stats['completed_trades'] > 0 else 0.0
- sum_wins = token_agg_stats.get('sum_of_winning_pnl', 0.0)
- sum_losses = token_agg_stats.get('sum_of_losing_pnl', 0.0)
- perf_stats['profit_factor'] = (sum_wins / sum_losses) if sum_losses > 0 else float('inf') if sum_wins > 0 else 0.0
- perf_stats['avg_win'] = (sum_wins / perf_stats['total_wins']) if perf_stats['total_wins'] > 0 else 0.0
- perf_stats['avg_loss'] = (sum_losses / perf_stats['total_losses']) if perf_stats['total_losses'] > 0 else 0.0
- perf_stats['expectancy'] = (perf_stats['avg_win'] * (perf_stats['win_rate'] / 100)) - (perf_stats['avg_loss'] * (1 - (perf_stats['win_rate'] / 100)))
- if perf_stats['completed_exit_volume'] > 0:
- perf_stats['pnl_percentage'] = (perf_stats['total_pnl'] / perf_stats['completed_exit_volume'] * 100)
- else: # No completed cycles for this token yet
- perf_stats = {
- 'completed_trades': 0, 'total_pnl': 0.0, 'pnl_percentage': 0.0, 'win_rate': 0.0,
- 'profit_factor': 0.0, 'avg_win': 0.0, 'avg_loss': 0.0, 'largest_win': 0.0, 'largest_loss': 0.0,
- 'expectancy': 0.0, 'total_wins':0, 'total_losses':0,
- 'completed_entry_volume': 0.0, 'completed_exit_volume': 0.0, 'total_cancelled': 0
- }
- # Info about open positions for this token (raw trades, not cycles)
- open_positions_summary = []
- total_open_value = 0.0
- total_open_unrealized_pnl = 0.0
- for op_trade in open_trades_for_token:
- open_positions_summary.append({
- 'lifecycle_id': op_trade.get('trade_lifecycle_id'),
- 'side': op_trade.get('position_side'),
- 'amount': op_trade.get('current_position_size'),
- 'entry_price': op_trade.get('entry_price'),
- 'mark_price': op_trade.get('mark_price'),
- 'unrealized_pnl': op_trade.get('unrealized_pnl'),
- 'opened_at': op_trade.get('position_opened_at')
- })
- total_open_value += op_trade.get('value', 0.0) # Initial value of open positions
- total_open_unrealized_pnl += op_trade.get('unrealized_pnl', 0.0)
- # Individual fills for completed cycles are migrated out of 'trades', so historical
- # buy/sell order counts per token are no longer available; count open orders instead.
- open_orders_count_row = self._fetchone_query(
- "SELECT COUNT(*) as count FROM orders WHERE symbol LIKE ? AND status IN ('open', 'submitted', 'pending_trigger')",
- (f"{upper_token}/%",)
- )
- current_open_orders_for_token = open_orders_count_row['count'] if open_orders_count_row else 0
- # 'total_trades' here could mean total orders ever placed for this token, or completed cycles + open positions
- # Let's define it as completed cycles + number of currently open positions for consistency with get_basic_stats
- effective_total_trades = perf_stats['completed_trades'] + len(open_trades_for_token)
- return {
- 'token': upper_token,
- 'message': f"Statistics for {upper_token}",
- 'performance_summary': perf_stats, # From token_stats table
- 'open_positions': open_positions_summary, # List of currently open positions
- 'open_positions_count': len(open_trades_for_token),
- 'current_open_orders_count': current_open_orders_for_token,
- 'summary_total_trades': effective_total_trades, # Completed cycles + open positions
- 'summary_total_realized_pnl': perf_stats['total_pnl'],
- 'summary_total_unrealized_pnl': total_open_unrealized_pnl,
- # 'cycles': token_cycles # Raw cycle data for completed trades is no longer stored directly after migration
- }
- def get_daily_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
- """Get daily performance stats for the last N days from daily_aggregated_stats."""
- daily_stats_list = []
- today_utc = datetime.now(timezone.utc).date()
- for i in range(limit):
- target_date = today_utc - timedelta(days=i)
- date_str = target_date.strftime('%Y-%m-%d')
- date_formatted = target_date.strftime('%m/%d') # For display
- # Sum all per-token rows for the day. Note: if an '_OVERALL_' row were also
- # written, this SUM would double-count; it assumes only per-token rows exist.
- day_aggregated_data = self._fetch_query(
- "SELECT SUM(realized_pnl) as pnl, SUM(completed_cycles) as trades, SUM(exit_volume) as volume FROM daily_aggregated_stats WHERE date = ?",
- (date_str,)
- )
-
- stats_for_day = None
- if day_aggregated_data and len(day_aggregated_data) > 0 and day_aggregated_data[0]['trades'] is not None:
- stats_for_day = day_aggregated_data[0]
- # Calculate pnl_pct if volume is present and positive
- pnl = stats_for_day.get('pnl', 0.0) or 0.0
- volume = stats_for_day.get('volume', 0.0) or 0.0
- stats_for_day['pnl_pct'] = (pnl / volume * 100) if volume > 0 else 0.0
- # Ensure trades is an int
- stats_for_day['trades'] = int(stats_for_day.get('trades', 0) or 0)
- if stats_for_day and stats_for_day['trades'] > 0:
- daily_stats_list.append({
- 'date': date_str, 'date_formatted': date_formatted, 'has_trades': True,
- **stats_for_day
- })
- else:
- daily_stats_list.append({
- 'date': date_str, 'date_formatted': date_formatted, 'has_trades': False,
- 'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
- })
- return daily_stats_list
- def get_weekly_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
- """Get weekly performance stats for the last N weeks by aggregating daily_aggregated_stats."""
- weekly_stats_list = []
- today_utc = datetime.now(timezone.utc).date()
- for i in range(limit):
- target_monday = today_utc - timedelta(days=today_utc.weekday() + (i * 7))
- target_sunday = target_monday + timedelta(days=6)
-
- week_key_display = target_monday.strftime('%Y-W%W') # Internal key for the week
- week_formatted_display = f"{target_monday.strftime('%m/%d')}-{target_sunday.strftime('%m/%d/%y')}"
- # Fetch daily records for this week range
- daily_records_for_week = self._fetch_query(
- "SELECT date, realized_pnl, completed_cycles, exit_volume FROM daily_aggregated_stats WHERE date BETWEEN ? AND ?",
- (target_monday.strftime('%Y-%m-%d'), target_sunday.strftime('%Y-%m-%d'))
- )
-
- if daily_records_for_week:
- total_pnl_week = sum(d.get('realized_pnl', 0.0) or 0.0 for d in daily_records_for_week)
- total_trades_week = sum(d.get('completed_cycles', 0) or 0 for d in daily_records_for_week)
- total_volume_week = sum(d.get('exit_volume', 0.0) or 0.0 for d in daily_records_for_week)
- pnl_pct_week = (total_pnl_week / total_volume_week * 100) if total_volume_week > 0 else 0.0
-
- if total_trades_week > 0:
- weekly_stats_list.append({
- 'week': week_key_display,
- 'week_formatted': week_formatted_display,
- 'has_trades': True,
- 'pnl': total_pnl_week,
- 'trades': total_trades_week,
- 'volume': total_volume_week,
- 'pnl_pct': pnl_pct_week
- })
- else:
- weekly_stats_list.append({
- 'week': week_key_display, 'week_formatted': week_formatted_display, 'has_trades': False,
- 'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
- })
- else:
- weekly_stats_list.append({
- 'week': week_key_display, 'week_formatted': week_formatted_display, 'has_trades': False,
- 'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
- })
- return weekly_stats_list
- def get_monthly_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
- """Get monthly performance stats for the last N months by aggregating daily_aggregated_stats."""
- monthly_stats_list = []
- current_month_start_utc = datetime.now(timezone.utc).date().replace(day=1)
- for i in range(limit):
- year = current_month_start_utc.year
- month = current_month_start_utc.month - i
- while month <= 0:
- month += 12
- year -= 1
-
- target_month_start_date = datetime(year, month, 1, tzinfo=timezone.utc).date()
- # Find end of target month
- next_month_start_date = datetime(year, month + 1, 1, tzinfo=timezone.utc).date() if month < 12 else datetime(year + 1, 1, 1, tzinfo=timezone.utc).date()
- target_month_end_date = next_month_start_date - timedelta(days=1)
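- # e.g. from May 2024: i=3 -> month=2 (Feb 2024); i=6 -> month=-1, which the
- # loop above normalizes to month=11, year=2023 (Nov 2023).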
-
- month_key_display = target_month_start_date.strftime('%Y-%m')
- month_formatted_display = target_month_start_date.strftime('%b %Y')
- daily_records_for_month = self._fetch_query(
- "SELECT date, realized_pnl, completed_cycles, exit_volume FROM daily_aggregated_stats WHERE date BETWEEN ? AND ?",
- (target_month_start_date.strftime('%Y-%m-%d'), target_month_end_date.strftime('%Y-%m-%d'))
- )
- if daily_records_for_month:
- total_pnl_month = sum(d.get('realized_pnl', 0.0) or 0.0 for d in daily_records_for_month)
- total_trades_month = sum(d.get('completed_cycles', 0) or 0 for d in daily_records_for_month)
- total_volume_month = sum(d.get('exit_volume', 0.0) or 0.0 for d in daily_records_for_month)
- pnl_pct_month = (total_pnl_month / total_volume_month * 100) if total_volume_month > 0 else 0.0
- if total_trades_month > 0:
- monthly_stats_list.append({
- 'month': month_key_display,
- 'month_formatted': month_formatted_display,
- 'has_trades': True,
- 'pnl': total_pnl_month,
- 'trades': total_trades_month,
- 'volume': total_volume_month,
- 'pnl_pct': pnl_pct_month
- })
- else:
- monthly_stats_list.append({
- 'month': month_key_display, 'month_formatted': month_formatted_display, 'has_trades': False,
- 'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
- })
- else:
- monthly_stats_list.append({
- 'month': month_key_display, 'month_formatted': month_formatted_display, 'has_trades': False,
- 'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
- })
- return monthly_stats_list
- def record_deposit(self, amount: float, timestamp: Optional[str] = None,
- deposit_id: Optional[str] = None, description: Optional[str] = None):
- """Record a deposit."""
- ts = timestamp if timestamp else datetime.now(timezone.utc).isoformat()
- formatter = get_formatter()
- formatted_amount_str = formatter.format_price_with_symbol(amount)
- desc = description if description else f'Deposit of {formatted_amount_str}'
-
- self._execute_query(
- "INSERT INTO balance_adjustments (adjustment_id, timestamp, type, amount, description) VALUES (?, ?, ?, ?, ?)",
- (deposit_id or str(uuid.uuid4()), ts, 'deposit', amount, desc) # Ensured uuid is string
- )
- # Adjust initial_balance in metadata to reflect capital changes
- current_initial = float(self._get_metadata('initial_balance') or '0.0')
- self._set_metadata('initial_balance', str(current_initial + amount))
- logger.info(f"💰 Recorded deposit: {formatted_amount_str}. New effective initial balance: {formatter.format_price_with_symbol(current_initial + amount)}")
- def record_withdrawal(self, amount: float, timestamp: Optional[str] = None,
- withdrawal_id: Optional[str] = None, description: Optional[str] = None):
- """Record a withdrawal."""
- ts = timestamp if timestamp else datetime.now(timezone.utc).isoformat()
- formatter = get_formatter()
- formatted_amount_str = formatter.format_price_with_symbol(amount)
- desc = description if description else f'Withdrawal of {formatted_amount_str}'
-
- self._execute_query(
- "INSERT INTO balance_adjustments (adjustment_id, timestamp, type, amount, description) VALUES (?, ?, ?, ?, ?)",
- (withdrawal_id or str(uuid.uuid4()), ts, 'withdrawal', amount, desc) # Ensured uuid is string
- )
- current_initial = float(self._get_metadata('initial_balance') or '0.0')
- self._set_metadata('initial_balance', str(current_initial - amount))
- logger.info(f"💸 Recorded withdrawal: {formatted_amount_str}. New effective initial balance: {formatter.format_price_with_symbol(current_initial - amount)}")
- def get_balance_adjustments_summary(self) -> Dict[str, Any]:
- """Get summary of all balance adjustments from DB."""
- adjustments = self._fetch_query("SELECT type, amount, timestamp FROM balance_adjustments ORDER BY timestamp ASC")
-
- if not adjustments:
- return {'total_deposits': 0.0, 'total_withdrawals': 0.0, 'net_adjustment': 0.0,
- 'adjustment_count': 0, 'last_adjustment': None}
-
- total_deposits = sum(adj['amount'] for adj in adjustments if adj['type'] == 'deposit')
- total_withdrawals = sum(adj['amount'] for adj in adjustments if adj['type'] == 'withdrawal') # Amounts stored positive
- net_adjustment = total_deposits - total_withdrawals
-
- return {
- 'total_deposits': total_deposits, 'total_withdrawals': total_withdrawals,
- 'net_adjustment': net_adjustment, 'adjustment_count': len(adjustments),
- 'last_adjustment': adjustments[-1]['timestamp'] if adjustments else None
- }
- def close_connection(self):
- """Close the SQLite database connection."""
- if getattr(self, 'conn', None): # 'conn' may be missing if __init__ failed early
- self.conn.close()
- logger.info("TradingStats SQLite connection closed.")
- def __del__(self):
- """Ensure connection is closed when object is deleted."""
- self.close_connection()
- # --- Order Table Management ---
- def record_order_placed(self, symbol: str, side: str, order_type: str,
- amount_requested: float, price: Optional[float] = None,
- bot_order_ref_id: Optional[str] = None,
- exchange_order_id: Optional[str] = None,
- status: str = 'open',
- parent_bot_order_ref_id: Optional[str] = None) -> Optional[int]:
- """Record a newly placed order in the 'orders' table. Returns the ID of the inserted order or None on failure."""
- now_iso = datetime.now(timezone.utc).isoformat()
- query = """
- INSERT INTO orders (bot_order_ref_id, exchange_order_id, symbol, side, type,
- amount_requested, price, status, timestamp_created, timestamp_updated, parent_bot_order_ref_id)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
- """
- params = (bot_order_ref_id, exchange_order_id, symbol, side.lower(), order_type.lower(),
- amount_requested, price, status.lower(), now_iso, now_iso, parent_bot_order_ref_id)
- try:
- cur = self.conn.cursor()
- cur.execute(query, params)
- self.conn.commit()
- order_db_id = cur.lastrowid
- logger.info(f"Recorded order placed: ID {order_db_id}, Symbol {symbol}, Side {side}, Type {order_type}, Amount {amount_requested}, BotRef {bot_order_ref_id}, ExchID {exchange_order_id}")
- return order_db_id
- except sqlite3.IntegrityError as e:
- logger.error(f"Failed to record order due to IntegrityError (likely duplicate bot_order_ref_id '{bot_order_ref_id}' or exchange_order_id '{exchange_order_id}'): {e}")
- return None
- except Exception as e:
- logger.error(f"Failed to record order: {e}")
- return None
- def update_order_status(self, order_db_id: Optional[int] = None, bot_order_ref_id: Optional[str] = None, exchange_order_id: Optional[str] = None,
- new_status: Optional[str] = None, amount_filled_increment: Optional[float] = None, set_exchange_order_id: Optional[str] = None) -> bool:
- """Update an existing order's status and/or amount_filled. Identify order by order_db_id, bot_order_ref_id, or exchange_order_id.
-
- Args:
- order_db_id: Database ID to identify the order
- bot_order_ref_id: Bot's internal reference ID to identify the order
- exchange_order_id: Exchange's order ID to identify the order
- new_status: New status to set
- amount_filled_increment: Amount to add to current filled amount
- set_exchange_order_id: If provided, sets/updates the exchange_order_id field in the database
- """
- if not any([order_db_id, bot_order_ref_id, exchange_order_id]):
- logger.error("Must provide one of order_db_id, bot_order_ref_id, or exchange_order_id to update order.")
- return False
- now_iso = datetime.now(timezone.utc).isoformat()
- set_clauses = []
- params = []
- if new_status:
- set_clauses.append("status = ?")
- params.append(new_status.lower())
-
- if set_exchange_order_id is not None:
- set_clauses.append("exchange_order_id = ?")
- params.append(set_exchange_order_id)
-
- current_amount_filled = 0.0
- identifier_clause = ""
- identifier_param = None
- if order_db_id:
- identifier_clause = "id = ?"
- identifier_param = order_db_id
- elif bot_order_ref_id:
- identifier_clause = "bot_order_ref_id = ?"
- identifier_param = bot_order_ref_id
- elif exchange_order_id:
- identifier_clause = "exchange_order_id = ?"
- identifier_param = exchange_order_id
- if amount_filled_increment is not None and amount_filled_increment > 0:
- # SQLite could increment in place ("amount_filled = amount_filled + ?"), but
- # fetching first keeps the update explicit and lets us warn if the order is missing.
- order_data = self._fetchone_query(f"SELECT amount_filled FROM orders WHERE {identifier_clause}", (identifier_param,))
- if order_data:
- current_amount_filled = order_data.get('amount_filled', 0.0)
- else:
- logger.warning(f"Order not found by {identifier_clause}={identifier_param} when trying to increment amount_filled.")
- # Proceed with the status update anyway; amount_filled will be treated as 0.0.
- set_clauses.append("amount_filled = ?")
- params.append(current_amount_filled + amount_filled_increment)
- if not set_clauses:
- logger.info("No fields to update for order.")
- return True # No update needed, not an error
- set_clauses.append("timestamp_updated = ?")
- params.append(now_iso)
-
- params.append(identifier_param) # Add identifier param at the end for WHERE clause
- query = f"UPDATE orders SET { ', '.join(set_clauses) } WHERE {identifier_clause}"
-
- try:
- self._execute_query(query, tuple(params))
- log_msg = f"Updated order ({identifier_clause}={identifier_param}): Status to '{new_status or 'N/A'}', Filled increment {amount_filled_increment or 0.0}"
- if set_exchange_order_id is not None:
- log_msg += f", Exchange ID set to '{set_exchange_order_id}'"
- logger.info(log_msg)
- return True
- except Exception as e:
- logger.error(f"Failed to update order ({identifier_clause}={identifier_param}): {e}")
- return False
- def get_order_by_db_id(self, order_db_id: int) -> Optional[Dict[str, Any]]:
- """Fetch an order by its database primary key ID."""
- return self._fetchone_query("SELECT * FROM orders WHERE id = ?", (order_db_id,))
- def get_order_by_bot_ref_id(self, bot_order_ref_id: str) -> Optional[Dict[str, Any]]:
- """Fetch an order by the bot's internal reference ID."""
- return self._fetchone_query("SELECT * FROM orders WHERE bot_order_ref_id = ?", (bot_order_ref_id,))
- def get_order_by_exchange_id(self, exchange_order_id: str) -> Optional[Dict[str, Any]]:
- """Fetch an order by the exchange's order ID."""
- return self._fetchone_query("SELECT * FROM orders WHERE exchange_order_id = ?", (exchange_order_id,))
    def get_orders_by_status(self, status: str, order_type_filter: Optional[str] = None, parent_bot_order_ref_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """Fetch all orders with a specific status, optionally filtered by order type and parent bot order reference ID."""
        query = "SELECT * FROM orders WHERE status = ?"
        params = [status.lower()]
        if order_type_filter:
            query += " AND type = ?"
            params.append(order_type_filter.lower())
        if parent_bot_order_ref_id:
            query += " AND parent_bot_order_ref_id = ?"
            params.append(parent_bot_order_ref_id)
        query += " ORDER BY timestamp_created ASC"
        return self._fetch_query(query, tuple(params))

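    # Usage sketch (parameter values illustrative): fetch pending stop-loss triggers
    # for one parent order, oldest first.
    #   pending = stats.get_orders_by_status('pending_trigger',
    #                                        order_type_filter='stop_limit_trigger',
    #                                        parent_bot_order_ref_id='bot_ref_123')
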
    def cancel_linked_orders(self, parent_bot_order_ref_id: str, new_status: str = 'cancelled_parent_filled') -> int:
        """Cancel all orders linked to a parent order (e.g., pending stop losses when the parent order fills or is cancelled).
        Returns the number of orders cancelled."""
        linked_orders = self.get_orders_by_status('pending_trigger', parent_bot_order_ref_id=parent_bot_order_ref_id)
        cancelled_count = 0

        for order in linked_orders:
            order_db_id = order.get('id')
            if order_db_id:
                success = self.update_order_status(order_db_id=order_db_id, new_status=new_status)
                if success:
                    cancelled_count += 1
                    logger.info(f"Cancelled linked order ID {order_db_id} (parent: {parent_bot_order_ref_id}) -> status: {new_status}")

        return cancelled_count

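    # Typical call site (hypothetical ref ID): after a parent entry order is cancelled,
    # sweep its pending children with a status that records why.
    #   n = stats.cancel_linked_orders('bot_ref_123', new_status='cancelled_parent_cancelled')
    #   logger.info(f"Swept {n} child orders")
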
    def cancel_pending_stop_losses_by_symbol(self, symbol: str, new_status: str = 'cancelled_position_closed') -> int:
        """Cancel all pending stop loss orders for a specific symbol (when the position is closed).
        Returns the number of stop loss orders cancelled."""
        query = "SELECT * FROM orders WHERE symbol = ? AND status = 'pending_trigger' AND type = 'stop_limit_trigger'"
        pending_stop_losses = self._fetch_query(query, (symbol,))
        cancelled_count = 0

        for order in pending_stop_losses:
            order_db_id = order.get('id')
            if order_db_id:
                success = self.update_order_status(order_db_id=order_db_id, new_status=new_status)
                if success:
                    cancelled_count += 1
                    logger.info(f"Cancelled pending SL order ID {order_db_id} for {symbol} -> status: {new_status}")

        return cancelled_count

    def get_order_cleanup_summary(self) -> Dict[str, Any]:
        """Get a summary of order cleanup actions for monitoring and debugging."""
        try:
            # Counts of each cancellation/terminal status
            cleanup_stats = {}

            cancellation_types = [
                'cancelled_parent_cancelled',
                'cancelled_parent_disappeared',
                'cancelled_manual_exit',
                'cancelled_auto_exit',
                'cancelled_no_position',
                'cancelled_external_position_close',
                'cancelled_orphaned_no_position',
                'cancelled_externally',
                'immediately_executed_on_activation',
                'activation_execution_failed',
                'activation_execution_error'
            ]

            for cancel_type in cancellation_types:
                count_result = self._fetchone_query(
                    "SELECT COUNT(*) as count FROM orders WHERE status = ?",
                    (cancel_type,)
                )
                cleanup_stats[cancel_type] = count_result['count'] if count_result else 0

            # Currently pending stop losses
            pending_sls = self.get_orders_by_status('pending_trigger', 'stop_limit_trigger')
            cleanup_stats['currently_pending_stop_losses'] = len(pending_sls)

            # Orders still in active states
            active_orders = self._fetchone_query(
                "SELECT COUNT(*) as count FROM orders WHERE status IN ('open', 'submitted', 'partially_filled')",
                ()
            )
            cleanup_stats['currently_active_orders'] = active_orders['count'] if active_orders else 0

            return cleanup_stats

        except Exception as e:
            logger.error(f"Error getting order cleanup summary: {e}")
            return {}

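    # The returned dict maps each terminal status to a count. Reading it might look
    # like this (keys come from the method above):
    #   summary = stats.get_order_cleanup_summary()
    #   print(summary.get('cancelled_externally', 0),
    #         summary.get('currently_pending_stop_losses', 0),
    #         summary.get('currently_active_orders', 0))
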
    def get_external_activity_summary(self, days: int = 7) -> Dict[str, Any]:
        """Get a summary of external activity (trades and cancellations) over the last N days."""
        try:
            from datetime import timedelta
            cutoff_date = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()

            # External trades, grouped by side
            external_trades = self._fetch_query(
                "SELECT COUNT(*) as count, side FROM trades WHERE trade_type = 'external' AND timestamp >= ? GROUP BY side",
                (cutoff_date,)
            )

            external_trade_summary = {
                'external_buy_trades': 0,
                'external_sell_trades': 0,
                'total_external_trades': 0
            }

            for trade_group in external_trades:
                side = trade_group['side']
                count = trade_group['count']
                external_trade_summary['total_external_trades'] += count
                if side == 'buy':
                    external_trade_summary['external_buy_trades'] = count
                elif side == 'sell':
                    external_trade_summary['external_sell_trades'] = count

            # External cancellations
            external_cancellations = self._fetchone_query(
                "SELECT COUNT(*) as count FROM orders WHERE status = 'cancelled_externally' AND timestamp_updated >= ?",
                (cutoff_date,)
            )
            external_trade_summary['external_cancellations'] = external_cancellations['count'] if external_cancellations else 0

            # Cleanup actions (every other cancelled_* status)
            cleanup_cancellations = self._fetchone_query(
                """SELECT COUNT(*) as count FROM orders
                   WHERE status LIKE 'cancelled_%'
                   AND status != 'cancelled_externally'
                   AND timestamp_updated >= ?""",
                (cutoff_date,)
            )
            external_trade_summary['cleanup_cancellations'] = cleanup_cancellations['count'] if cleanup_cancellations else 0

            external_trade_summary['period_days'] = days

            return external_trade_summary

        except Exception as e:
            logger.error(f"Error getting external activity summary: {e}")
            return {'period_days': days, 'total_external_trades': 0, 'external_cancellations': 0}

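    # Usage sketch: a periodic report could combine both summaries (logging is illustrative).
    #   activity = stats.get_external_activity_summary(days=7)
    #   logger.info(f"External trades this week: {activity['total_external_trades']}")
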
    # --- End Order Table Management ---

    # =============================================================================
    # TRADE LIFECYCLE MANAGEMENT - PHASE 4: UNIFIED TRADES TABLE
    # =============================================================================

    def create_trade_lifecycle(self, symbol: str, side: str, entry_order_id: Optional[str] = None,
                               stop_loss_price: Optional[float] = None, take_profit_price: Optional[float] = None,
                               trade_type: str = 'manual') -> Optional[str]:
        """Create a new trade lifecycle when an entry order is placed."""
        try:
            lifecycle_id = str(uuid.uuid4())

            query = """
                INSERT INTO trades (
                    symbol, side, amount, price, value, trade_type, timestamp,
                    status, trade_lifecycle_id, position_side, entry_order_id,
                    stop_loss_price, take_profit_price, updated_at
                ) VALUES (?, ?, 0, 0, 0, ?, ?, 'pending', ?, 'flat', ?, ?, ?, ?)
            """
            timestamp = datetime.now(timezone.utc).isoformat()
            params = (symbol, side.lower(), trade_type, timestamp, lifecycle_id,
                      entry_order_id, stop_loss_price, take_profit_price, timestamp)

            self._execute_query(query, params)

            logger.info(f"📊 Created trade lifecycle {lifecycle_id}: {side.upper()} {symbol} (pending)")
            return lifecycle_id

        except Exception as e:
            logger.error(f"❌ Error creating trade lifecycle: {e}")
            return None

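    # End-to-end sketch of the lifecycle (prices, amounts, and IDs are hypothetical):
    #   lid = stats.create_trade_lifecycle('BTC/USDT', 'buy',
    #                                      entry_order_id='ex_111',
    #                                      stop_loss_price=58000.0,
    #                                      take_profit_price=65000.0)
    #   stats.update_trade_position_opened(lid, entry_price=60000.0,
    #                                      entry_amount=0.1, exchange_fill_id='fill_1')
    #   stats.update_trade_position_closed(lid, exit_price=64000.0,
    #                                      realized_pnl=400.0, exchange_fill_id='fill_2')
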
    def update_trade_position_opened(self, lifecycle_id: str, entry_price: float,
                                     entry_amount: float, exchange_fill_id: str) -> bool:
        """Update trade when position is opened (entry order filled)."""
        try:
            query = """
                UPDATE trades
                SET status = 'position_opened',
                    amount = ?,
                    price = ?,
                    value = ?,
                    entry_price = ?,
                    current_position_size = ?,
                    position_side = CASE
                        WHEN side = 'buy' THEN 'long'
                        WHEN side = 'sell' THEN 'short'
                        ELSE position_side
                    END,
                    exchange_fill_id = ?,
                    position_opened_at = ?,
                    updated_at = ?
                WHERE trade_lifecycle_id = ? AND status = 'pending'
            """
            timestamp = datetime.now(timezone.utc).isoformat()
            value = entry_amount * entry_price
            params = (entry_amount, entry_price, value, entry_price, entry_amount,
                      exchange_fill_id, timestamp, timestamp, lifecycle_id)

            self._execute_query(query, params)

            formatter = get_formatter()
            trade_info = self.get_trade_by_lifecycle_id(lifecycle_id)  # Fetch to get the symbol for formatting
            symbol_for_formatting = trade_info.get('symbol', 'UNKNOWN_SYMBOL') if trade_info else 'UNKNOWN_SYMBOL'
            base_asset_for_amount = symbol_for_formatting.split('/')[0] if '/' in symbol_for_formatting else symbol_for_formatting
            logger.info(f"📈 Trade lifecycle {lifecycle_id} position opened: {formatter.format_amount(entry_amount, base_asset_for_amount)} {symbol_for_formatting} @ {formatter.format_price(entry_price, symbol_for_formatting)}")
            return True

        except Exception as e:
            logger.error(f"❌ Error updating trade position opened: {e}")
            return False

    def update_trade_position_closed(self, lifecycle_id: str, exit_price: float,
                                     realized_pnl: float, exchange_fill_id: str) -> bool:
        """Update trade when position is fully closed."""
        try:
            # Note: exit_price and exchange_fill_id are accepted for symmetry with
            # update_trade_position_opened, but this UPDATE persists only the
            # realized P&L and the close timestamps.
            query = """
                UPDATE trades
                SET status = 'position_closed',
                    current_position_size = 0,
                    position_side = 'flat',
                    realized_pnl = ?,
                    position_closed_at = ?,
                    updated_at = ?
                WHERE trade_lifecycle_id = ? AND status = 'position_opened'
            """
            timestamp = datetime.now(timezone.utc).isoformat()
            params = (realized_pnl, timestamp, timestamp, lifecycle_id)

            self._execute_query(query, params)

            formatter = get_formatter()
            pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
            logger.info(f"{pnl_emoji} Trade lifecycle {lifecycle_id} position closed: P&L {formatter.format_price_with_symbol(realized_pnl)}")
            return True

        except Exception as e:
            logger.error(f"❌ Error updating trade position closed: {e}")
            return False

    def update_trade_cancelled(self, lifecycle_id: str, reason: str = "order_cancelled") -> bool:
        """Update trade when the entry order is cancelled (position never opened)."""
        try:
            query = """
                UPDATE trades
                SET status = 'cancelled',
                    notes = ?,
                    updated_at = ?
                WHERE trade_lifecycle_id = ? AND status = 'pending'
            """
            timestamp = datetime.now(timezone.utc).isoformat()
            params = (f"Cancelled: {reason}", timestamp, lifecycle_id)

            self._execute_query(query, params)

            logger.info(f"❌ Trade lifecycle {lifecycle_id} cancelled: {reason}")
            return True

        except Exception as e:
            logger.error(f"❌ Error updating trade cancelled: {e}")
            return False

    def link_stop_loss_to_trade(self, lifecycle_id: str, stop_loss_order_id: str,
                                stop_loss_price: float) -> bool:
        """Link a stop loss order to a trade lifecycle."""
        try:
            query = """
                UPDATE trades
                SET stop_loss_order_id = ?,
                    stop_loss_price = ?,
                    updated_at = ?
                WHERE trade_lifecycle_id = ? AND status = 'position_opened'
            """
            timestamp = datetime.now(timezone.utc).isoformat()
            params = (stop_loss_order_id, stop_loss_price, timestamp, lifecycle_id)

            self._execute_query(query, params)

            formatter = get_formatter()
            trade_info = self.get_trade_by_lifecycle_id(lifecycle_id)  # Fetch to get the symbol for formatting
            symbol_for_formatting = trade_info.get('symbol', 'UNKNOWN_SYMBOL') if trade_info else 'UNKNOWN_SYMBOL'
            logger.info(f"🛑 Linked stop loss order {stop_loss_order_id} ({formatter.format_price(stop_loss_price, symbol_for_formatting)}) to trade {lifecycle_id}")
            return True

        except Exception as e:
            logger.error(f"❌ Error linking stop loss to trade: {e}")
            return False

    def link_take_profit_to_trade(self, lifecycle_id: str, take_profit_order_id: str,
                                  take_profit_price: float) -> bool:
        """Link a take profit order to a trade lifecycle."""
        try:
            query = """
                UPDATE trades
                SET take_profit_order_id = ?,
                    take_profit_price = ?,
                    updated_at = ?
                WHERE trade_lifecycle_id = ? AND status = 'position_opened'
            """
            timestamp = datetime.now(timezone.utc).isoformat()
            params = (take_profit_order_id, take_profit_price, timestamp, lifecycle_id)

            self._execute_query(query, params)

            formatter = get_formatter()
            trade_info = self.get_trade_by_lifecycle_id(lifecycle_id)  # Fetch to get the symbol for formatting
            symbol_for_formatting = trade_info.get('symbol', 'UNKNOWN_SYMBOL') if trade_info else 'UNKNOWN_SYMBOL'
            logger.info(f"🎯 Linked take profit order {take_profit_order_id} ({formatter.format_price(take_profit_price, symbol_for_formatting)}) to trade {lifecycle_id}")
            return True

        except Exception as e:
            logger.error(f"❌ Error linking take profit to trade: {e}")
            return False

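    # Sketch: once protective orders are placed on the exchange (IDs hypothetical),
    # link them so later fills can be matched back to this lifecycle.
    #   stats.link_stop_loss_to_trade(lid, 'ex_sl_222', 58000.0)
    #   stats.link_take_profit_to_trade(lid, 'ex_tp_333', 65000.0)
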
    def get_trade_by_lifecycle_id(self, lifecycle_id: str) -> Optional[Dict[str, Any]]:
        """Get a trade by lifecycle ID."""
        query = "SELECT * FROM trades WHERE trade_lifecycle_id = ?"
        return self._fetchone_query(query, (lifecycle_id,))

    def get_trade_by_symbol_and_status(self, symbol: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
        """Get the most recently updated trade for a symbol in the given status."""
        query = "SELECT * FROM trades WHERE symbol = ? AND status = ? ORDER BY updated_at DESC LIMIT 1"
        return self._fetchone_query(query, (symbol, status))

    def get_open_positions(self, symbol: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get all open positions, optionally filtered by symbol."""
        if symbol:
            query = "SELECT * FROM trades WHERE status = 'position_opened' AND symbol = ? ORDER BY position_opened_at DESC"
            return self._fetch_query(query, (symbol,))
        else:
            query = "SELECT * FROM trades WHERE status = 'position_opened' ORDER BY position_opened_at DESC"
            return self._fetch_query(query)

    def get_trades_by_status(self, status: str, limit: int = 50) -> List[Dict[str, Any]]:
        """Get trades by status."""
        query = "SELECT * FROM trades WHERE status = ? ORDER BY updated_at DESC LIMIT ?"
        return self._fetch_query(query, (status, limit))

    def get_lifecycle_by_entry_order_id(self, entry_exchange_order_id: str, status: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Get a trade lifecycle by its entry_order_id (exchange ID), optionally filtered by status."""
        if status:
            query = "SELECT * FROM trades WHERE entry_order_id = ? AND status = ? LIMIT 1"
            params = (entry_exchange_order_id, status)
        else:
            query = "SELECT * FROM trades WHERE entry_order_id = ? LIMIT 1"
            params = (entry_exchange_order_id,)
        return self._fetchone_query(query, params)

    def get_lifecycle_by_sl_order_id(self, sl_exchange_order_id: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
        """Get an active trade lifecycle by its stop_loss_order_id (exchange ID)."""
        query = "SELECT * FROM trades WHERE stop_loss_order_id = ? AND status = ? LIMIT 1"
        return self._fetchone_query(query, (sl_exchange_order_id, status))

    def get_lifecycle_by_tp_order_id(self, tp_exchange_order_id: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
        """Get an active trade lifecycle by its take_profit_order_id (exchange ID)."""
        query = "SELECT * FROM trades WHERE take_profit_order_id = ? AND status = ? LIMIT 1"
        return self._fetchone_query(query, (tp_exchange_order_id, status))

    def get_pending_stop_loss_activations(self) -> List[Dict[str, Any]]:
        """Get open positions that still need their stop loss placed on the exchange."""
        query = """
            SELECT * FROM trades
            WHERE status = 'position_opened'
            AND stop_loss_price IS NOT NULL
            AND stop_loss_order_id IS NULL
            ORDER BY updated_at ASC
        """
        return self._fetch_query(query)

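    # Sketch of an activation pass (the placement function is hypothetical;
    # the dict keys come from the trades table used above):
    #   for trade in stats.get_pending_stop_loss_activations():
    #       sl_order_id = place_stop_loss_on_exchange(trade['symbol'],
    #                                                 trade['stop_loss_price'])
    #       if sl_order_id:
    #           stats.link_stop_loss_to_trade(trade['trade_lifecycle_id'],
    #                                         sl_order_id, trade['stop_loss_price'])
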
    def cleanup_old_cancelled_trades(self, days_old: int = 7) -> int:
        """Clean up old cancelled trades (optional housekeeping).
        Returns the number of records deleted."""
        try:
            cutoff_date = (datetime.now(timezone.utc) - timedelta(days=days_old)).isoformat()

            # Count before deletion so the log can report what was removed
            count_query = """
                SELECT COUNT(*) as count FROM trades
                WHERE status = 'cancelled' AND updated_at < ?
            """
            count_result = self._fetchone_query(count_query, (cutoff_date,))
            count_to_delete = count_result['count'] if count_result else 0

            if count_to_delete > 0:
                delete_query = """
                    DELETE FROM trades
                    WHERE status = 'cancelled' AND updated_at < ?
                """
                self._execute_query(delete_query, (cutoff_date,))
                logger.info(f"🧹 Cleaned up {count_to_delete} old cancelled trades (older than {days_old} days)")

            return count_to_delete

        except Exception as e:
            logger.error(f"❌ Error cleaning up old cancelled trades: {e}")
            return 0

    def confirm_position_with_exchange(self, symbol: str, exchange_position_size: float,
                                       exchange_open_orders: List[Dict]) -> bool:
        """🆕 PHASE 4: Confirm position status with the exchange before updating local status."""
        try:
            # Get current trade status
            current_trade = self.get_trade_by_symbol_and_status(symbol, 'position_opened')

            if not current_trade:
                return True  # No open position to confirm

            lifecycle_id = current_trade['trade_lifecycle_id']
            has_open_orders = len([o for o in exchange_open_orders if o.get('symbol') == symbol]) > 0

            # Only close the position if the exchange confirms no position AND no pending orders
            if abs(exchange_position_size) < 1e-8 and not has_open_orders:
                entry_price_db = current_trade['entry_price']

                # The final P&L would normally come from the closing trade; for this
                # confirmation path we fall back to any realized P&L already recorded.
                estimated_pnl = current_trade.get('realized_pnl', 0)

                success = self.update_trade_position_closed(
                    lifecycle_id,
                    entry_price_db,  # Entry price from DB as the exit-price estimate
                    estimated_pnl,
                    "exchange_confirmed_closed"
                )

                if success:
                    logger.info(f"✅ Confirmed position closed for {symbol} with exchange")

                return success

            return True  # Position still exists on exchange, no update needed

        except Exception as e:
            logger.error(f"❌ Error confirming position with exchange: {e}")
            return False

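    # Reconciliation sketch (the exchange client and its methods are hypothetical):
    # compare local state against the exchange before trusting a "closed" verdict.
    #   pos_size = exchange.get_position_size('BTC/USDT')
    #   open_orders = exchange.get_open_orders()
    #   stats.confirm_position_with_exchange('BTC/USDT', pos_size, open_orders)
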
    def update_trade_market_data(self,
                                 trade_lifecycle_id: str,
                                 unrealized_pnl: Optional[float] = None,
                                 mark_price: Optional[float] = None,
                                 current_position_size: Optional[float] = None,
                                 entry_price: Optional[float] = None,
                                 liquidation_price: Optional[float] = None,
                                 margin_used: Optional[float] = None,
                                 leverage: Optional[float] = None,
                                 position_value: Optional[float] = None,
                                 unrealized_pnl_percentage: Optional[float] = None) -> bool:
        """Update market-related data for an open trade lifecycle.
        Only updates fields for which a non-None value is provided.
        """
        try:
            updates = []
            params = []

            if unrealized_pnl is not None:
                updates.append("unrealized_pnl = ?")
                params.append(unrealized_pnl)
            if mark_price is not None:
                updates.append("mark_price = ?")
                params.append(mark_price)
            if current_position_size is not None:
                updates.append("current_position_size = ?")
                params.append(current_position_size)
            if entry_price is not None:  # If the exchange provides an updated average entry
                updates.append("entry_price = ?")
                params.append(entry_price)
            if liquidation_price is not None:
                updates.append("liquidation_price = ?")
                params.append(liquidation_price)
            if margin_used is not None:
                updates.append("margin_used = ?")
                params.append(margin_used)
            if leverage is not None:
                updates.append("leverage = ?")
                params.append(leverage)
            if position_value is not None:
                updates.append("position_value = ?")
                params.append(position_value)
            if unrealized_pnl_percentage is not None:
                updates.append("unrealized_pnl_percentage = ?")
                params.append(unrealized_pnl_percentage)

            if not updates:
                logger.debug(f"No market data fields provided to update for lifecycle {trade_lifecycle_id}.")
                return True  # No update needed, not an error

            timestamp = datetime.now(timezone.utc).isoformat()
            updates.append("updated_at = ?")
            params.append(timestamp)

            set_clause = ", ".join(updates)
            query = f"""
                UPDATE trades
                SET {set_clause}
                WHERE trade_lifecycle_id = ? AND status = 'position_opened'
            """
            params.append(trade_lifecycle_id)

            # Use the class's own connection so rowcount can be inspected after the update
            cursor = self.conn.cursor()
            cursor.execute(query, tuple(params))
            self.conn.commit()

            if cursor.rowcount > 0:
                logger.debug(f"💹 Updated market data for lifecycle {trade_lifecycle_id}. Fields: {updates}")
                return True
            else:
                # The lifecycle ID may not exist, or the trade is no longer in
                # 'position_opened'; the caller decides whether that matters.
                return False

        except Exception as e:
            logger.error(f"❌ Error updating market data for trade lifecycle {trade_lifecycle_id}: {e}")
            return False

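    # Heartbeat sketch (field values hypothetical): refresh mark-to-market data
    # for every open lifecycle on each polling tick.
    #   for pos in stats.get_open_positions():
    #       stats.update_trade_market_data(pos['trade_lifecycle_id'],
    #                                      unrealized_pnl=12.5,
    #                                      mark_price=60125.0,
    #                                      position_value=6012.5)
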
    # --- End Trade Lifecycle Management ---

    def get_daily_balance_record_count(self) -> int:
        """Get the total number of daily balance records."""
        row = self._fetchone_query("SELECT COUNT(*) as count FROM daily_balances")
        return row['count'] if row and 'count' in row else 0

    # 🆕 PHASE 5: AGGREGATION AND PURGING LOGIC

    def _migrate_trade_to_aggregated_stats(self, trade_lifecycle_id: str):
        """Migrate a completed/cancelled trade's stats to the aggregate tables and delete the original trade."""
        trade_data = self.get_trade_by_lifecycle_id(trade_lifecycle_id)
        if not trade_data:
            logger.error(f"Cannot migrate trade {trade_lifecycle_id}: Not found.")
            return

        status = trade_data.get('status')
        symbol = trade_data.get('symbol')
        token = symbol.split('/')[0] if symbol and '/' in symbol else symbol  # Assumes symbols like BTC/USDT
        if not token:
            logger.error(f"Cannot migrate trade {trade_lifecycle_id}: Token could not be derived from symbol '{symbol}'.")
            return

        now_iso = datetime.now(timezone.utc).isoformat()

        try:
            with self.conn:  # Ensures atomicity for the operations below
                if status == 'position_closed':
                    realized_pnl = trade_data.get('realized_pnl', 0.0)
                    # 'value' is amount * price recorded at entry. The closing trade's
                    # value is not stored separately, so reconstruct it as
                    # exit_value = entry_value + realized_pnl.
                    entry_value = trade_data.get('value', 0.0)
                    exit_value = entry_value + realized_pnl
                    closed_at_str = trade_data.get('position_closed_at', now_iso)
                    closed_at_dt = datetime.fromisoformat(closed_at_str)
                    date_str = closed_at_dt.strftime('%Y-%m-%d')

                    # Update token_stats
                    token_upsert_query = """
                        INSERT INTO token_stats (
                            token, total_realized_pnl, total_completed_cycles, winning_cycles, losing_cycles,
                            total_entry_volume, total_exit_volume, sum_of_winning_pnl, sum_of_losing_pnl,
                            largest_winning_cycle_pnl, largest_losing_cycle_pnl,
                            first_cycle_closed_at, last_cycle_closed_at, updated_at
                        ) VALUES (?, ?, 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                        ON CONFLICT(token) DO UPDATE SET
                            total_realized_pnl = total_realized_pnl + excluded.total_realized_pnl,
                            total_completed_cycles = total_completed_cycles + 1,
                            winning_cycles = winning_cycles + excluded.winning_cycles,
                            losing_cycles = losing_cycles + excluded.losing_cycles,
                            total_entry_volume = total_entry_volume + excluded.total_entry_volume,
                            total_exit_volume = total_exit_volume + excluded.total_exit_volume,
                            sum_of_winning_pnl = sum_of_winning_pnl + excluded.sum_of_winning_pnl,
                            sum_of_losing_pnl = sum_of_losing_pnl + excluded.sum_of_losing_pnl,
                            largest_winning_cycle_pnl = MAX(largest_winning_cycle_pnl, excluded.largest_winning_cycle_pnl),
                            largest_losing_cycle_pnl = MAX(largest_losing_cycle_pnl, excluded.largest_losing_cycle_pnl),
                            first_cycle_closed_at = MIN(first_cycle_closed_at, excluded.first_cycle_closed_at),
                            last_cycle_closed_at = MAX(last_cycle_closed_at, excluded.last_cycle_closed_at),
                            updated_at = excluded.updated_at
                    """
                    is_win = 1 if realized_pnl > 0 else 0
                    is_loss = 1 if realized_pnl < 0 else 0
                    win_pnl_contrib = realized_pnl if realized_pnl > 0 else 0.0
                    loss_pnl_contrib = abs(realized_pnl) if realized_pnl < 0 else 0.0

                    self._execute_query(token_upsert_query, (
                        token, realized_pnl, is_win, is_loss, entry_value, exit_value,
                        win_pnl_contrib, loss_pnl_contrib, win_pnl_contrib, loss_pnl_contrib,
                        closed_at_str, closed_at_str, now_iso
                    ))

                    # Update daily_aggregated_stats
                    daily_upsert_query = """
                        INSERT INTO daily_aggregated_stats (
                            date, token, realized_pnl, completed_cycles, entry_volume, exit_volume
                        ) VALUES (?, ?, ?, 1, ?, ?)
                        ON CONFLICT(date, token) DO UPDATE SET
                            realized_pnl = realized_pnl + excluded.realized_pnl,
                            completed_cycles = completed_cycles + 1,
                            entry_volume = entry_volume + excluded.entry_volume,
                            exit_volume = exit_volume + excluded.exit_volume
                    """
                    self._execute_query(daily_upsert_query, (
                        date_str, token, realized_pnl, entry_value, exit_value
                    ))
                    logger.info(f"Aggregated stats for closed trade lifecycle {trade_lifecycle_id} ({token}). PNL: {realized_pnl:.2f}")

                elif status == 'cancelled':
                    # Update token_stats for the cancelled count
                    cancelled_upsert_query = """
                        INSERT INTO token_stats (token, total_cancelled_cycles, updated_at)
                        VALUES (?, 1, ?)
                        ON CONFLICT(token) DO UPDATE SET
                            total_cancelled_cycles = total_cancelled_cycles + 1,
                            updated_at = excluded.updated_at
                    """
                    self._execute_query(cancelled_upsert_query, (token, now_iso))
                    logger.info(f"Incremented cancelled_cycles for {token} due to lifecycle {trade_lifecycle_id}.")

                # Delete the original trade from the 'trades' table
                self._execute_query("DELETE FROM trades WHERE trade_lifecycle_id = ?", (trade_lifecycle_id,))
                logger.info(f"Deleted trade lifecycle {trade_lifecycle_id} from trades table after aggregation.")

        except sqlite3.Error as e:
            logger.error(f"Database error migrating trade {trade_lifecycle_id} to aggregate stats: {e}", exc_info=True)
        except Exception as e:
            logger.error(f"Unexpected error migrating trade {trade_lifecycle_id} to aggregate stats: {e}", exc_info=True)

    def purge_old_daily_aggregated_stats(self, months_to_keep: int = 10):
        """Purge records from daily_aggregated_stats older than the specified number of months."""
        if months_to_keep <= 0:
            logger.info("Not purging daily_aggregated_stats as months_to_keep is not positive.")
            return

        try:
            # Approximate the cutoff as months_to_keep * 30 days; comparing
            # YYYY-MM-DD strings works well in SQLite. For calendar-accurate
            # month arithmetic, dateutil.relativedelta could be used instead.
            cutoff_date = datetime.now(timezone.utc).date() - timedelta(days=months_to_keep * 30)
            cutoff_date_str = cutoff_date.strftime('%Y-%m-%d')
            query = "DELETE FROM daily_aggregated_stats WHERE date < ?"

            with self.conn:
                cursor = self.conn.cursor()
                cursor.execute(query, (cutoff_date_str,))
                rows_deleted = cursor.rowcount

            if rows_deleted > 0:
                logger.info(f"Purged {rows_deleted} old records from daily_aggregated_stats (older than approx. {months_to_keep} months, before {cutoff_date_str}).")
            else:
                logger.info(f"No old records found in daily_aggregated_stats to purge (older than approx. {months_to_keep} months, before {cutoff_date_str}).")

        except sqlite3.Error as e:
            logger.error(f"Database error purging old daily_aggregated_stats: {e}", exc_info=True)
        except Exception as e:
            logger.error(f"Unexpected error purging old daily_aggregated_stats: {e}", exc_info=True)
