12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622 |
- #!/usr/bin/env python3
- """
- Trading Statistics Tracker (SQLite Version)
- Tracks and calculates comprehensive trading statistics using an SQLite database.
- """
import logging
import math
import os
import sqlite3
import uuid
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Any, Optional, Tuple, Union

import numpy as np

# 🆕 Import the migration runner
from src.migrations.migrate_db import run_migrations as run_db_migrations
- logger = logging.getLogger(__name__)
- def _normalize_token_case(token: str) -> str:
- """
- Normalize token case: if any characters are already uppercase, keep as-is.
- Otherwise, convert to uppercase. This handles mixed-case tokens like kPEPE, kBONK.
- """
- # Check if any character is already uppercase
- if any(c.isupper() for c in token):
- return token # Keep original case for mixed-case tokens
- else:
- return token.upper() # Convert to uppercase for all-lowercase input
- class TradingStats:
- """Comprehensive trading statistics tracker using SQLite."""
- def __init__(self, db_path: str = "data/trading_stats.sqlite"):
- """Initialize the stats tracker and connect to SQLite DB."""
- self.db_path = db_path
- self._ensure_data_directory()
-
- # 🆕 Run database migrations before connecting and creating tables
- # This ensures the schema is up-to-date when the connection is made
- # and tables are potentially created for the first time.
- logger.info("Running database migrations if needed...")
- run_db_migrations() # Uses DB_PATH defined in migrate_db.py, which should be the same
- logger.info("Database migration check complete.")
-
- self.conn = sqlite3.connect(self.db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
- self.conn.row_factory = self._dict_factory
- self._create_tables() # CREATE IF NOT EXISTS will still be useful for first-time setup
- self._initialize_metadata() # Also potentially sets schema_version if DB was just created
- def _dict_factory(self, cursor, row):
- """Convert SQLite rows to dictionaries."""
- d = {}
- for idx, col in enumerate(cursor.description):
- d[col[0]] = row[idx]
- return d
- def _ensure_data_directory(self):
- """Ensure the data directory for the SQLite file exists."""
- data_dir = os.path.dirname(self.db_path)
- if data_dir and not os.path.exists(data_dir):
- os.makedirs(data_dir)
- logger.info(f"Created data directory for TradingStats DB: {data_dir}")
- def _execute_query(self, query: str, params: tuple = ()):
- """Execute a query (INSERT, UPDATE, DELETE)."""
- with self.conn:
- self.conn.execute(query, params)
- def _fetch_query(self, query: str, params: tuple = ()) -> List[Dict[str, Any]]:
- """Execute a SELECT query and fetch all results."""
- cur = self.conn.cursor()
- cur.execute(query, params)
- return cur.fetchall()
- def _fetchone_query(self, query: str, params: tuple = ()) -> Optional[Dict[str, Any]]:
- """Execute a SELECT query and fetch one result."""
- cur = self.conn.cursor()
- cur.execute(query, params)
- return cur.fetchone()
    def _create_tables(self):
        """Create all SQLite tables and indexes if they don't exist.

        Idempotent: every statement uses IF NOT EXISTS, so it is safe to call
        on every startup (after migrations have updated an existing DB).
        """
        queries = [
            # Key/value store for bot-level settings (start_date, initial_balance, ...).
            """
            CREATE TABLE IF NOT EXISTS metadata (
                key TEXT PRIMARY KEY,
                value TEXT
            )
            """,
            # One row per fill, enriched with lifecycle/position-tracking columns.
            """
            CREATE TABLE IF NOT EXISTS trades (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                exchange_fill_id TEXT UNIQUE,
                timestamp TEXT NOT NULL,
                symbol TEXT NOT NULL,
                side TEXT NOT NULL,
                amount REAL NOT NULL,
                price REAL NOT NULL,
                value REAL NOT NULL,
                trade_type TEXT NOT NULL,
                pnl REAL DEFAULT 0.0,
                linked_order_table_id INTEGER,

                -- 🆕 PHASE 4: Lifecycle tracking fields (merged from active_trades)
                status TEXT DEFAULT 'executed', -- 'pending', 'executed', 'position_opened', 'position_closed', 'cancelled'
                trade_lifecycle_id TEXT, -- Groups related trades into one lifecycle
                position_side TEXT, -- 'long', 'short', 'flat' - the resulting position side

                -- Position tracking
                entry_price REAL,
                current_position_size REAL DEFAULT 0,

                -- Order IDs (exchange IDs)
                entry_order_id TEXT,
                stop_loss_order_id TEXT,
                take_profit_order_id TEXT,

                -- Risk management
                stop_loss_price REAL,
                take_profit_price REAL,

                -- P&L tracking
                realized_pnl REAL DEFAULT 0,
                unrealized_pnl REAL DEFAULT 0,
                mark_price REAL DEFAULT 0,
                position_value REAL DEFAULT NULL,

                -- Risk Info from Exchange
                liquidation_price REAL DEFAULT NULL,
                margin_used REAL DEFAULT NULL,
                leverage REAL DEFAULT NULL,

                -- Timestamps
                position_opened_at TEXT,
                position_closed_at TEXT,
                updated_at TEXT DEFAULT CURRENT_TIMESTAMP,

                -- Notes
                notes TEXT
            )
            """,
            # One balance snapshot per calendar day (UTC date as primary key).
            """
            CREATE TABLE IF NOT EXISTS daily_balances (
                date TEXT PRIMARY KEY,
                balance REAL NOT NULL,
                timestamp TEXT NOT NULL
            )
            """,
            # Manual deposits/withdrawals, kept separate from trading P&L.
            """
            CREATE TABLE IF NOT EXISTS balance_adjustments (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                adjustment_id TEXT UNIQUE,
                timestamp TEXT NOT NULL,
                type TEXT NOT NULL, -- 'deposit' or 'withdrawal'
                amount REAL NOT NULL, -- Always positive, type indicates direction
                description TEXT
            )
            """,
            # Bot-placed orders; bot_order_ref_id is our internal handle,
            # exchange_order_id is assigned by the venue once accepted.
            """
            CREATE TABLE IF NOT EXISTS orders (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                bot_order_ref_id TEXT UNIQUE,
                exchange_order_id TEXT UNIQUE,
                symbol TEXT NOT NULL,
                side TEXT NOT NULL,
                type TEXT NOT NULL,
                amount_requested REAL NOT NULL,
                amount_filled REAL DEFAULT 0.0,
                price REAL, -- For limit, stop, etc.
                status TEXT NOT NULL, -- e.g., 'open', 'partially_filled', 'filled', 'cancelled', 'rejected', 'expired', 'pending_trigger'
                timestamp_created TEXT NOT NULL,
                timestamp_updated TEXT NOT NULL,
                parent_bot_order_ref_id TEXT NULLABLE -- To link conditional orders (like SL triggers) to their parent order
            )
            """,
            # Lookup indexes matching the most common query patterns.
            """
            CREATE INDEX IF NOT EXISTS idx_orders_bot_order_ref_id ON orders (bot_order_ref_id);
            """,
            """
            CREATE INDEX IF NOT EXISTS idx_orders_exchange_order_id ON orders (exchange_order_id);
            """,
            """
            CREATE INDEX IF NOT EXISTS idx_trades_exchange_fill_id ON trades (exchange_fill_id);
            """,
            """
            CREATE INDEX IF NOT EXISTS idx_trades_linked_order_table_id ON trades (linked_order_table_id);
            """,
            """
            CREATE INDEX IF NOT EXISTS idx_orders_parent_bot_order_ref_id ON orders (parent_bot_order_ref_id);
            """,
            """
            CREATE INDEX IF NOT EXISTS idx_orders_status_type ON orders (status, type);
            """,
            """
            CREATE INDEX IF NOT EXISTS idx_trades_status ON trades (status);
            """,
            """
            CREATE INDEX IF NOT EXISTS idx_trades_lifecycle_id ON trades (trade_lifecycle_id);
            """,
            """
            CREATE INDEX IF NOT EXISTS idx_trades_position_side ON trades (position_side);
            """,
            """
            CREATE INDEX IF NOT EXISTS idx_trades_symbol_status ON trades (symbol, status);
            """
        ]
        for query in queries:
            self._execute_query(query)
        logger.info("SQLite tables ensured for TradingStats.")
- def _initialize_metadata(self):
- """Initialize metadata if not already present."""
- start_date = self._get_metadata('start_date')
- initial_balance = self._get_metadata('initial_balance')
- if start_date is None:
- self._set_metadata('start_date', datetime.now(timezone.utc).isoformat())
- logger.info("Initialized 'start_date' in metadata.")
-
- if initial_balance is None:
- self._set_metadata('initial_balance', '0.0')
- logger.info("Initialized 'initial_balance' in metadata.")
-
- logger.info(f"TradingStats initialized. Start Date: {self._get_metadata('start_date')}, Initial Balance: {self._get_metadata('initial_balance')}")
- def _get_metadata(self, key: str) -> Optional[str]:
- """Retrieve a value from the metadata table."""
- row = self._fetchone_query("SELECT value FROM metadata WHERE key = ?", (key,))
- return row['value'] if row else None
- def _set_metadata(self, key: str, value: str):
- """Set a value in the metadata table."""
- self._execute_query("INSERT OR REPLACE INTO metadata (key, value) VALUES (?, ?)", (key, value))
- def set_initial_balance(self, balance: float):
- """Set the initial balance if not already set or zero."""
- current_initial_balance_str = self._get_metadata('initial_balance')
- current_initial_balance = float(current_initial_balance_str) if current_initial_balance_str else 0.0
-
- if current_initial_balance == 0.0: # Only set if it's effectively unset
- self._set_metadata('initial_balance', str(balance))
- # Also set start_date if it's the first time setting balance
- if self._get_metadata('start_date') is None or float(current_initial_balance_str if current_initial_balance_str else '0.0') == 0.0:
- self._set_metadata('start_date', datetime.now(timezone.utc).isoformat())
- logger.info(f"Initial balance set to: ${balance:.2f}")
- else:
- logger.info(f"Initial balance already set to ${current_initial_balance:.2f}. Not changing.")
- def record_balance(self, balance: float):
- """Record daily balance snapshot."""
- today_iso = datetime.now(timezone.utc).date().isoformat()
- now_iso = datetime.now(timezone.utc).isoformat()
-
- existing_entry = self._fetchone_query("SELECT date FROM daily_balances WHERE date = ?", (today_iso,))
- if existing_entry:
- self._execute_query("UPDATE daily_balances SET balance = ?, timestamp = ? WHERE date = ?",
- (balance, now_iso, today_iso))
- else:
- self._execute_query("INSERT INTO daily_balances (date, balance, timestamp) VALUES (?, ?, ?)",
- (today_iso, balance, now_iso))
- # logger.debug(f"Recorded balance for {today_iso}: ${balance:.2f}") # Potentially too verbose
- def record_trade(self, symbol: str, side: str, amount: float, price: float,
- exchange_fill_id: Optional[str] = None, trade_type: str = "manual",
- pnl: Optional[float] = None, timestamp: Optional[str] = None,
- linked_order_table_id_to_link: Optional[int] = None):
- """Record a trade in the database."""
- if timestamp is None:
- timestamp = datetime.now(timezone.utc).isoformat()
-
- value = amount * price
- self._execute_query(
- "INSERT OR IGNORE INTO trades (symbol, side, amount, price, value, trade_type, timestamp, exchange_fill_id, pnl, linked_order_table_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
- (symbol, side, amount, price, value, trade_type, timestamp, exchange_fill_id, pnl or 0.0, linked_order_table_id_to_link)
- )
- logger.info(f"📈 Trade recorded: {side.upper()} {amount:.6f} {symbol} @ ${price:.2f} (${value:.2f}) [{trade_type}]")
- def get_all_trades(self) -> List[Dict[str, Any]]:
- """Fetch all trades from the database, ordered by timestamp."""
- return self._fetch_query("SELECT * FROM trades ORDER BY timestamp ASC")
-
    def calculate_completed_trade_cycles(self) -> List[Dict[str, Any]]:
        """
        Calculate completed trade cycles (full position open to close) using FIFO method from DB trades.
        Handles both long and short cycles. PNL is summed from individual trade records.

        A "cycle" is the span from the first fill that opens a position until
        the fill that brings the net position back to flat. A closing fill
        larger than the open leg flips the position and immediately starts the
        next cycle in the opposite direction.
        """
        completed_cycles = []
        # Per-symbol working state:
        # symbol -> {
        #   'open_legs': [{'side', 'amount_remaining', 'price', 'timestamp', 'value', 'pnl_contribution'}], # Holds fills of the current open leg (FIFO queue)
        #   'cycle_trades_details': [trade_dict_from_db], # All trades part of the current forming cycle
        #   'cycle_start_ts': timestamp_str,
        #   'current_leg_type': 'long' | 'short' | None
        # }
        open_positions_data = defaultdict(lambda: {
            'open_legs': [],
            'cycle_trades_details': [],
            'cycle_start_ts': None,
            'current_leg_type': None
        })

        all_trades = self.get_all_trades() # Trades now include their 'pnl' contribution

        for trade in all_trades:
            symbol = trade['symbol']
            side = trade['side'].lower() # Ensure lowercase
            amount = trade['amount']
            price = trade['price']
            timestamp = trade['timestamp']
            trade_pnl = trade.get('pnl', 0.0) # PNL from this specific fill

            pos_data = open_positions_data[symbol]
            current_trade_detail = {**trade} # Copy trade details

            if pos_data['current_leg_type'] is None: # Starting a new potential cycle
                pos_data['current_leg_type'] = 'long' if side == 'buy' else 'short'
                pos_data['cycle_start_ts'] = timestamp
                pos_data['open_legs'].append({
                    'side': side, 'amount_remaining': amount, 'price': price,
                    'timestamp': timestamp, 'value': amount * price,
                    'pnl_contribution': trade_pnl # PNL of opening trade usually 0
                })
                pos_data['cycle_trades_details'] = [current_trade_detail]

            elif (side == 'buy' and pos_data['current_leg_type'] == 'long') or \
                 (side == 'sell' and pos_data['current_leg_type'] == 'short'):
                # Increasing an existing long or short position
                pos_data['open_legs'].append({
                    'side': side, 'amount_remaining': amount, 'price': price,
                    'timestamp': timestamp, 'value': amount * price,
                    'pnl_contribution': trade_pnl
                })
                pos_data['cycle_trades_details'].append(current_trade_detail)

            elif (side == 'sell' and pos_data['current_leg_type'] == 'long'): # Selling to reduce/close long
                pos_data['cycle_trades_details'].append(current_trade_detail)
                sell_amount_remaining = amount

                # Consume open buy fills oldest-first until this sell is absorbed.
                while sell_amount_remaining > 0 and pos_data['open_legs']:
                    oldest_leg_fill = pos_data['open_legs'][0] # FIFO

                    match_amount = min(sell_amount_remaining, oldest_leg_fill['amount_remaining'])

                    oldest_leg_fill['amount_remaining'] -= match_amount
                    sell_amount_remaining -= match_amount

                    # Float-tolerance: treat anything below 1e-9 as fully consumed.
                    if oldest_leg_fill['amount_remaining'] <= 1e-9:
                        pos_data['open_legs'].pop(0)

                if not pos_data['open_legs']: # Long cycle closed
                    # Compile cycle
                    cycle_pnl = sum(t.get('pnl', 0.0) for t in pos_data['cycle_trades_details'])

                    cycle_buys = [t for t in pos_data['cycle_trades_details'] if t['side'] == 'buy']
                    cycle_sells = [t for t in pos_data['cycle_trades_details'] if t['side'] == 'sell']
                    total_amount_bought = sum(t['amount'] for t in cycle_buys)
                    total_buy_value = sum(t['value'] for t in cycle_buys)
                    total_amount_sold = sum(t['amount'] for t in cycle_sells) # Should match total_amount_bought
                    total_sell_value = sum(t['value'] for t in cycle_sells)

                    completed_cycle = {
                        'symbol': symbol,
                        'token': symbol.split('/')[0] if '/' in symbol else symbol,
                        'cycle_start': pos_data['cycle_start_ts'],
                        'cycle_end': timestamp, # End time is the timestamp of the closing trade
                        'cycle_type': 'long',
                        'buy_orders': len(cycle_buys),
                        'sell_orders': len(cycle_sells),
                        'total_orders': len(pos_data['cycle_trades_details']),
                        'total_amount': total_amount_bought,
                        'avg_entry_price': total_buy_value / total_amount_bought if total_amount_bought > 0 else 0,
                        'avg_exit_price': total_sell_value / total_amount_sold if total_amount_sold > 0 else 0,
                        'total_pnl': cycle_pnl,
                        'buy_value': total_buy_value,
                        'sell_value': total_sell_value,
                        'cycle_trades': pos_data['cycle_trades_details'].copy()
                    }
                    completed_cycles.append(completed_cycle)

                    # Reset for next cycle, potentially flip if sell_amount_remaining > 0
                    pos_data['cycle_trades_details'] = []
                    pos_data['cycle_start_ts'] = None
                    pos_data['current_leg_type'] = None

                    if sell_amount_remaining > 1e-9: # Flipped to short
                        pos_data['current_leg_type'] = 'short'
                        pos_data['cycle_start_ts'] = timestamp
                        pos_data['open_legs'].append({
                            'side': 'sell', 'amount_remaining': sell_amount_remaining, 'price': price,
                            'timestamp': timestamp, 'value': sell_amount_remaining * price,
                            'pnl_contribution': trade_pnl # PNL of this fill if it was part of closing previous and opening this
                        })
                        pos_data['cycle_trades_details'] = [current_trade_detail] # Start new details list with current trade

            elif (side == 'buy' and pos_data['current_leg_type'] == 'short'): # Buying to reduce/close short
                pos_data['cycle_trades_details'].append(current_trade_detail)
                buy_amount_remaining = amount

                # Mirror of the long-close path: consume open sell fills FIFO.
                while buy_amount_remaining > 0 and pos_data['open_legs']:
                    oldest_leg_fill = pos_data['open_legs'][0] # FIFO

                    match_amount = min(buy_amount_remaining, oldest_leg_fill['amount_remaining'])

                    oldest_leg_fill['amount_remaining'] -= match_amount
                    buy_amount_remaining -= match_amount

                    if oldest_leg_fill['amount_remaining'] <= 1e-9:
                        pos_data['open_legs'].pop(0)

                if not pos_data['open_legs']: # Short cycle closed
                    # Compile cycle
                    cycle_pnl = sum(t.get('pnl', 0.0) for t in pos_data['cycle_trades_details'])

                    cycle_sells = [t for t in pos_data['cycle_trades_details'] if t['side'] == 'sell'] # Entry for short
                    cycle_buys = [t for t in pos_data['cycle_trades_details'] if t['side'] == 'buy'] # Exit for short
                    total_amount_sold = sum(t['amount'] for t in cycle_sells)
                    total_sell_value = sum(t['value'] for t in cycle_sells)
                    total_amount_bought = sum(t['amount'] for t in cycle_buys) # Should match total_amount_sold
                    total_buy_value = sum(t['value'] for t in cycle_buys)

                    completed_cycle = {
                        'symbol': symbol,
                        'token': symbol.split('/')[0] if '/' in symbol else symbol,
                        'cycle_start': pos_data['cycle_start_ts'],
                        'cycle_end': timestamp,
                        'cycle_type': 'short',
                        'sell_orders': len(cycle_sells), # Entry orders for short
                        'buy_orders': len(cycle_buys), # Exit orders for short
                        'total_orders': len(pos_data['cycle_trades_details']),
                        'total_amount': total_amount_sold, # Amount that formed the basis of the short
                        'avg_entry_price': total_sell_value / total_amount_sold if total_amount_sold > 0 else 0, # Avg sell price
                        'avg_exit_price': total_buy_value / total_amount_bought if total_amount_bought > 0 else 0, # Avg buy price
                        'total_pnl': cycle_pnl,
                        'sell_value': total_sell_value, # Entry value for short
                        'buy_value': total_buy_value, # Exit value for short
                        'cycle_trades': pos_data['cycle_trades_details'].copy()
                    }
                    completed_cycles.append(completed_cycle)

                    # Reset for next cycle, potentially flip if buy_amount_remaining > 0
                    pos_data['cycle_trades_details'] = []
                    pos_data['cycle_start_ts'] = None
                    pos_data['current_leg_type'] = None

                    if buy_amount_remaining > 1e-9: # Flipped to long
                        pos_data['current_leg_type'] = 'long'
                        pos_data['cycle_start_ts'] = timestamp
                        pos_data['open_legs'].append({
                            'side': 'buy', 'amount_remaining': buy_amount_remaining, 'price': price,
                            'timestamp': timestamp, 'value': buy_amount_remaining * price,
                            'pnl_contribution': trade_pnl
                        })
                        pos_data['cycle_trades_details'] = [current_trade_detail]

        return completed_cycles
- def get_basic_stats(self, current_balance: Optional[float] = None) -> Dict[str, Any]:
- """Get basic trading statistics from DB."""
- trades = self._fetch_query("SELECT COUNT(*) as count, side FROM trades GROUP BY side")
- total_trades_count = sum(t['count'] for t in trades)
- buy_trades_count = next((t['count'] for t in trades if t['side'] == 'buy'), 0)
- sell_trades_count = next((t['count'] for t in trades if t['side'] == 'sell'), 0)
- initial_balance_str = self._get_metadata('initial_balance')
- initial_balance = float(initial_balance_str) if initial_balance_str else 0.0
-
- start_date_iso = self._get_metadata('start_date')
- start_date_obj = datetime.fromisoformat(start_date_iso) if start_date_iso else datetime.now(timezone.utc)
- days_active = (datetime.now(timezone.utc) - start_date_obj).days + 1
- completed_cycles = self.calculate_completed_trade_cycles() # This can be expensive
- total_pnl_from_cycles = sum(cycle['total_pnl'] for cycle in completed_cycles)
-
- last_trade_row = self._fetchone_query("SELECT timestamp FROM trades ORDER BY timestamp DESC LIMIT 1")
- last_trade_ts = last_trade_row['timestamp'] if last_trade_row else None
- return {
- 'total_trades': total_trades_count,
- 'completed_trades': len(completed_cycles),
- 'buy_trades': buy_trades_count,
- 'sell_trades': sell_trades_count,
- 'initial_balance': initial_balance,
- 'total_pnl': total_pnl_from_cycles, # PNL from closed cycles
- 'days_active': days_active,
- 'start_date': start_date_obj.strftime('%Y-%m-%d'),
- 'last_trade': last_trade_ts
- }
- def get_performance_stats(self) -> Dict[str, Any]:
- """Calculate advanced performance statistics using completed cycles."""
- completed_cycles = self.calculate_completed_trade_cycles()
-
- if not completed_cycles:
- return {
- 'win_rate': 0.0, 'profit_factor': 0.0, 'avg_win': 0.0, 'avg_loss': 0.0,
- 'largest_win': 0.0, 'largest_loss': 0.0, 'consecutive_wins': 0,
- 'consecutive_losses': 0, 'total_wins': 0, 'total_losses': 0, 'expectancy': 0.0
- }
- wins_pnl = [c['total_pnl'] for c in completed_cycles if c['total_pnl'] > 0]
- losses_pnl = [abs(c['total_pnl']) for c in completed_cycles if c['total_pnl'] < 0] # Absolute values for losses
- total_wins_count = len(wins_pnl)
- total_losses_count = len(losses_pnl)
- total_completed_count = total_wins_count + total_losses_count
- win_rate = (total_wins_count / total_completed_count * 100) if total_completed_count > 0 else 0.0
-
- sum_of_wins = sum(wins_pnl)
- sum_of_losses = sum(losses_pnl) # Sum of absolute losses
- profit_factor = (sum_of_wins / sum_of_losses) if sum_of_losses > 0 else float('inf') if sum_of_wins > 0 else 0.0
-
- avg_win = np.mean(wins_pnl) if wins_pnl else 0.0
- avg_loss = np.mean(losses_pnl) if losses_pnl else 0.0 # Avg of absolute losses
-
- largest_win = max(wins_pnl) if wins_pnl else 0.0
- largest_loss = max(losses_pnl) if losses_pnl else 0.0 # Largest absolute loss
- # Consecutive wins/losses
- consecutive_wins = 0
- consecutive_losses = 0
- current_wins = 0
- current_losses = 0
- for cycle in completed_cycles:
- if cycle['total_pnl'] > 0:
- current_wins += 1
- current_losses = 0
- else: # Assumes PNL is non-zero for a loss, or it's a scratch trade
- current_losses += 1
- current_wins = 0
- consecutive_wins = max(consecutive_wins, current_wins)
- consecutive_losses = max(consecutive_losses, current_losses)
-
- expectancy = (avg_win * (win_rate / 100)) - (avg_loss * (1 - (win_rate / 100)))
- return {
- 'win_rate': win_rate, 'profit_factor': profit_factor, 'avg_win': avg_win, 'avg_loss': avg_loss,
- 'largest_win': largest_win, 'largest_loss': largest_loss,
- 'consecutive_wins': consecutive_wins, 'consecutive_losses': consecutive_losses,
- 'total_wins': total_wins_count, 'total_losses': total_losses_count, 'expectancy': expectancy
- }
    def get_risk_metrics(self) -> Dict[str, Any]:
        """Calculate risk-adjusted metrics from daily balances.

        Derives day-over-day returns from the daily_balances table and
        reports: annualized Sharpe and Sortino ratios, max drawdown (%),
        annualized volatility (%) and the 1-day 95% VaR (%).

        NOTE(review): depends on numpy being importable as ``np`` at module
        level — confirm the import block provides it.
        """
        daily_balances_data = self._fetch_query("SELECT balance FROM daily_balances ORDER BY date ASC")

        # At least two snapshots are needed to form one return observation.
        if not daily_balances_data or len(daily_balances_data) < 2:
            return {'sharpe_ratio': 0.0, 'sortino_ratio': 0.0, 'max_drawdown': 0.0, 'volatility': 0.0, 'var_95': 0.0}

        balances = [entry['balance'] for entry in daily_balances_data]
        returns = np.diff(balances) / balances[:-1] # Calculate daily returns
        returns = returns[np.isfinite(returns)] # Remove NaNs or Infs if any balance was 0

        if returns.size == 0:
            return {'sharpe_ratio': 0.0, 'sortino_ratio': 0.0, 'max_drawdown': 0.0, 'volatility': 0.0, 'var_95': 0.0}

        risk_free_rate_daily = (1 + 0.02)**(1/365) - 1 # Approx 2% annual risk-free rate, daily

        excess_returns = returns - risk_free_rate_daily
        # Annualize with sqrt(365) since balances are daily snapshots.
        sharpe_ratio = np.mean(excess_returns) / np.std(returns) * np.sqrt(365) if np.std(returns) > 0 else 0.0

        # Sortino penalizes only downside deviation.
        downside_returns = returns[returns < 0]
        downside_std = np.std(downside_returns) if len(downside_returns) > 0 else 0.0
        sortino_ratio = np.mean(excess_returns) / downside_std * np.sqrt(365) if downside_std > 0 else 0.0

        # Max drawdown: worst peak-to-trough decline of the cumulative curve.
        cumulative_returns = np.cumprod(1 + returns)
        peak = np.maximum.accumulate(cumulative_returns)
        drawdown = (cumulative_returns - peak) / peak
        max_drawdown_pct = abs(np.min(drawdown) * 100) if drawdown.size > 0 else 0.0

        volatility_pct = np.std(returns) * np.sqrt(365) * 100
        # 95% VaR: magnitude of the 5th-percentile daily return.
        var_95_pct = abs(np.percentile(returns, 5) * 100) if returns.size > 0 else 0.0

        return {
            'sharpe_ratio': sharpe_ratio, 'sortino_ratio': sortino_ratio,
            'max_drawdown': max_drawdown_pct, 'volatility': volatility_pct, 'var_95': var_95_pct
        }
    def get_comprehensive_stats(self, current_balance: Optional[float] = None) -> Dict[str, Any]:
        """Get all statistics combined.

        When *current_balance* is supplied it is snapshotted into
        daily_balances and used for the total-return calculation (so the
        figure includes unrealized P&L); otherwise the return is estimated
        from initial balance plus closed-cycle P&L only.
        """
        if current_balance is not None: # Ensure it's not just None, but explicitly provided
            self.record_balance(current_balance) # Record current balance for today

        basic = self.get_basic_stats(current_balance) # Pass current_balance for P&L context if needed
        performance = self.get_performance_stats()
        risk = self.get_risk_metrics()

        initial_balance = basic['initial_balance']
        total_return_pct = 0.0
        # Use current_balance if available and valid for total return calculation
        # Otherwise, PNL from basic_stats (closed trades) is the primary PNL source
        # This needs careful thought: current_balance reflects unrealized PNL too.
        # The original code used current_balance - initial_balance for total_pnl if current_balance provided.

        effective_balance_for_return = current_balance if current_balance is not None else (initial_balance + basic['total_pnl'])

        if initial_balance > 0:
            total_return_pct = ((effective_balance_for_return - initial_balance) / initial_balance) * 100

        return {
            'basic': basic,
            'performance': performance,
            'risk': risk,
            'current_balance': current_balance if current_balance is not None else initial_balance + basic['total_pnl'], # Best estimate
            'total_return': total_return_pct, # Percentage
            'last_updated': datetime.now(timezone.utc).isoformat()
        }
- def _get_open_positions_count_from_db(self) -> int:
- """🧹 PHASE 4: Get count of open positions from enhanced trades table."""
- row = self._fetchone_query("SELECT COUNT(DISTINCT symbol) as count FROM trades WHERE status = 'position_opened'")
- return row['count'] if row else 0
    def format_stats_message(self, current_balance: Optional[float] = None) -> str:
        """Format stats for Telegram display using data from DB.

        Builds an HTML-formatted summary (account overview, activity,
        performance, optional balance adjustments, distribution, session
        info). Any exception is caught and rendered as an error message
        rather than propagated.
        """
        try:
            stats = self.get_comprehensive_stats(current_balance)

            basic = stats['basic']
            perf = stats['performance']
            # risk = stats['risk'] # Risk metrics not directly used in this message format previously

            # Use current_balance passed or derived in get_comprehensive_stats
            effective_current_balance = stats['current_balance']
            initial_bal = basic['initial_balance']

            # Total P&L should reflect current worth vs initial, including open positions if current_balance is live
            total_pnl_val = effective_current_balance - initial_bal if initial_bal > 0 else basic['total_pnl'] # Fallback to closed PNL
            total_return_pct = (total_pnl_val / initial_bal * 100) if initial_bal > 0 else 0.0

            pnl_emoji = "🟢" if total_pnl_val >= 0 else "🔴"

            open_positions_count = self._get_open_positions_count_from_db()

            # Calculate trade volume and average trade size from 'trades' table for sell orders
            sell_trades_data = self._fetch_query("SELECT value FROM trades WHERE side = 'sell'")
            total_sell_volume = sum(t['value'] for t in sell_trades_data)
            avg_trade_size_sell = (total_sell_volume / len(sell_trades_data)) if sell_trades_data else 0.0

            # NOTE(review): get_balance_adjustments_summary is defined elsewhere
            # in this class; expected keys are used below.
            adjustments_summary = self.get_balance_adjustments_summary()

            stats_text = f"""📊 <b>Trading Statistics</b>
💰 <b>Account Overview:</b>
• Current Balance: ${effective_current_balance:,.2f}
• Initial Balance: ${initial_bal:,.2f}
• {pnl_emoji} Total P&L: ${total_pnl_val:,.2f} ({total_return_pct:+.2f}%)
📈 <b>Trading Activity:</b>
• Total Orders: {basic['total_trades']}
• Completed Trades (Cycles): {basic['completed_trades']}
• Open Positions: {open_positions_count}
• Days Active: {basic['days_active']}
🏆 <b>Performance Metrics:</b>
• Win Rate: {perf['win_rate']:.1f}% ({perf['total_wins']}W/{perf['total_losses']}L)
• Profit Factor: {perf['profit_factor']:.2f}
• Avg Win: ${perf['avg_win']:.2f} | Avg Loss: ${perf['avg_loss']:.2f}
• Largest Win: ${perf['largest_win']:.2f} | Largest Loss: ${perf['largest_loss']:.2f}
"""
            # The adjustments section is only shown when any deposit/withdrawal exists.
            if adjustments_summary['adjustment_count'] > 0:
                adj_emoji = "💰" if adjustments_summary['net_adjustment'] >= 0 else "💸"
                stats_text += f"""
💰 <b>Balance Adjustments:</b>
• Deposits: ${adjustments_summary['total_deposits']:,.2f}
• Withdrawals: ${adjustments_summary['total_withdrawals']:,.2f}
• {adj_emoji} Net: ${adjustments_summary['net_adjustment']:,.2f} ({adjustments_summary['adjustment_count']} transactions)
"""

            stats_text += f"""
🎯 <b>Trade Distribution:</b>
• Buy Orders: {basic['buy_trades']} | Sell Orders: {basic['sell_trades']}
• Volume Traded (Sells): ${total_sell_volume:,.2f}
• Avg Sell Trade Size: ${avg_trade_size_sell:.2f}
⏰ <b>Session Info:</b>
• Started: {basic['start_date']}
• Last Update: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}
""" # Changed Last Update format

            return stats_text.strip()

        except Exception as e:
            logger.error(f"Error formatting stats message: {e}", exc_info=True)
            return f"📊 <b>Trading Statistics</b>\n\n❌ <b>Error loading statistics</b>\n\n🔧 <b>Debug info:</b> {str(e)[:100]}"
- def get_recent_trades(self, limit: int = 10) -> List[Dict[str, Any]]:
- """Get recent trades from DB."""
- return self._fetch_query("SELECT * FROM trades ORDER BY timestamp DESC LIMIT ?", (limit,))
- def get_token_performance(self) -> Dict[str, Dict[str, Any]]:
- """Get performance statistics grouped by token using completed cycles."""
- completed_cycles = self.calculate_completed_trade_cycles()
- token_performance = {}
-
- # Group cycles by token (symbol's base part)
- token_cycles_map = defaultdict(list)
- for cycle in completed_cycles:
- token_cycles_map[cycle['token']].append(cycle)
-
- for token, cycles_for_token in token_cycles_map.items():
- if not cycles_for_token: continue
- wins_pnl = [c['total_pnl'] for c in cycles_for_token if c['total_pnl'] > 0]
- losses_pnl = [abs(c['total_pnl']) for c in cycles_for_token if c['total_pnl'] < 0]
- total_pnl = sum(c['total_pnl'] for c in cycles_for_token)
- total_volume_sold = sum(c['sell_value'] for c in cycles_for_token) # Based on sell value in cycle
-
- pnl_percentage = (total_pnl / total_volume_sold * 100) if total_volume_sold > 0 else 0.0
-
- total_wins_count = len(wins_pnl)
- total_losses_count = len(losses_pnl)
- total_completed_count = total_wins_count + total_losses_count
-
- win_rate = (total_wins_count / total_completed_count * 100) if total_completed_count > 0 else 0.0
-
- sum_of_wins = sum(wins_pnl)
- sum_of_losses = sum(losses_pnl)
- profit_factor = (sum_of_wins / sum_of_losses) if sum_of_losses > 0 else float('inf') if sum_of_wins > 0 else 0.0
-
- avg_win = np.mean(wins_pnl) if wins_pnl else 0.0
- avg_loss = np.mean(losses_pnl) if losses_pnl else 0.0
- expectancy = (avg_win * (win_rate / 100)) - (avg_loss * (1 - (win_rate / 100)))
-
- largest_win = max(wins_pnl) if wins_pnl else 0.0
- largest_loss = max(losses_pnl) if losses_pnl else 0.0
- token_performance[token] = {
- 'total_pnl': total_pnl, 'pnl_percentage': pnl_percentage,
- 'completed_trades': total_completed_count, 'total_volume': total_volume_sold,
- 'win_rate': win_rate, 'total_wins': total_wins_count, 'total_losses': total_losses_count,
- 'profit_factor': profit_factor, 'expectancy': expectancy,
- 'largest_win': largest_win, 'largest_loss': largest_loss,
- 'avg_win': avg_win, 'avg_loss': avg_loss
- # 'cycles': cycles_for_token # Optionally include raw cycle data
- }
- return token_performance
    def get_token_detailed_stats(self, token: str) -> Dict[str, Any]:
        """Get detailed statistics for a specific token using DB queries and cycle calculation.

        Combines raw order counts/volume for the token (all rows whose symbol
        starts with '<TOKEN>/') with performance metrics from its completed
        FIFO trade cycles.

        Args:
            token: Token name (e.g. 'BTC'); case-normalized before matching.

        Returns:
            Dict of detailed stats. When the token has no history, or has
            trades but no completed cycles yet, a reduced dict containing a
            human-readable 'message' key is returned instead.
        """
        upper_token = _normalize_token_case(token)

        # Get all trades for this specific token (symbol starts with token + '/')
        # This is simpler than trying to filter cycles by token string directly in SQL for complex symbols
        all_trades_for_token_symbol_prefix = self._fetch_query(
            "SELECT * FROM trades WHERE symbol LIKE ? ORDER BY timestamp ASC", (f"{upper_token}/%",)
        )

        if not all_trades_for_token_symbol_prefix:
            return {
                'token': upper_token, 'total_trades': 0, 'total_pnl': 0.0, 'win_rate': 0.0,
                'message': f"No trading history found for {upper_token}"
            }
        # Calculate completed cycles specifically for these trades
        # To correctly calculate cycles for *only* this token, we need to run the FIFO logic
        # on trades filtered for this token.
        # The global `calculate_completed_trade_cycles` uses *all* trades.
        all_completed_cycles = self.calculate_completed_trade_cycles()
        token_cycles = [c for c in all_completed_cycles if _normalize_token_case(c['token']) == upper_token]
        total_individual_orders = len(all_trades_for_token_symbol_prefix)
        buy_orders = len([t for t in all_trades_for_token_symbol_prefix if t['side'] == 'buy'])
        sell_orders = len([t for t in all_trades_for_token_symbol_prefix if t['side'] == 'sell'])
        total_volume_all_orders = sum(t['value'] for t in all_trades_for_token_symbol_prefix)
        if not token_cycles:
            # Trades exist but no buy->sell round trip has completed yet.
            return {
                'token': upper_token, 'total_trades': total_individual_orders, 'buy_trades': buy_orders,
                'sell_trades': sell_orders, 'total_volume': total_volume_all_orders,
                'completed_trades': 0, 'total_pnl': 0.0, 'pnl_percentage': 0.0, 'win_rate': 0.0,
                'message': f"{upper_token} has open positions or trades but no completed trade cycles yet."
            }
        # Performance based on this token's completed cycles
        perf_stats = self.get_token_performance().get(upper_token, {}) # Re-use general calculation logic
        # Filter for recent closed trades
        recent_closed_trades = [t for t in all_trades_for_token_symbol_prefix if t.get('status') == 'position_closed']
        return {
            'token': upper_token,
            'total_trades': total_individual_orders,
            'completed_trades': perf_stats.get('completed_trades', 0),
            'buy_trades': buy_orders,
            'sell_trades': sell_orders,
            'total_volume': total_volume_all_orders, # Volume of all orders for this token
            'completed_volume': perf_stats.get('total_volume', 0.0), # Volume from completed cycles
            'total_pnl': perf_stats.get('total_pnl', 0.0),
            'pnl_percentage': perf_stats.get('pnl_percentage', 0.0),
            'win_rate': perf_stats.get('win_rate', 0.0),
            'profit_factor': perf_stats.get('profit_factor', 0.0),
            'avg_win': perf_stats.get('avg_win', 0.0),
            'avg_loss': perf_stats.get('avg_loss', 0.0),
            'largest_win': perf_stats.get('largest_win', 0.0),
            'largest_loss': perf_stats.get('largest_loss', 0.0),
            'expectancy': perf_stats.get('expectancy', 0.0),
            'total_wins': perf_stats.get('total_wins',0),
            'total_losses': perf_stats.get('total_losses',0),
            'recent_trades': recent_closed_trades[-5:], # Last 5 CLOSED trades for this token
            'cycles': token_cycles # Optionally include raw cycle data
        }
- def _get_aggregated_period_stats_from_cycles(self) -> Dict[str, Dict[str, Dict[str, Any]]]:
- """Helper to aggregate completed cycles by day, week, month for P&L and volume."""
- completed_cycles = self.calculate_completed_trade_cycles()
-
- daily_aggr = defaultdict(lambda: {'trades': 0, 'pnl': 0.0, 'volume': 0.0})
- weekly_aggr = defaultdict(lambda: {'trades': 0, 'pnl': 0.0, 'volume': 0.0})
- monthly_aggr = defaultdict(lambda: {'trades': 0, 'pnl': 0.0, 'volume': 0.0})
- for cycle in completed_cycles:
- try:
- # Use cycle_end timestamp (string) and parse it
- end_dt = datetime.fromisoformat(cycle['cycle_end'])
- if end_dt.tzinfo is None: # Ensure timezone aware for proper calculations
- end_dt = end_dt.replace(tzinfo=timezone.utc)
- else:
- end_dt = end_dt.astimezone(timezone.utc)
- pnl = cycle['total_pnl']
- volume = cycle['sell_value'] # Volume based on sell value of the cycle
- # Daily
- day_key = end_dt.strftime('%Y-%m-%d')
- daily_aggr[day_key]['trades'] += 1
- daily_aggr[day_key]['pnl'] += pnl
- daily_aggr[day_key]['volume'] += volume
- # Weekly (YYYY-Www, where ww is week number 00-53, Monday as first day)
- week_key = end_dt.strftime('%Y-W%W') # %W for Monday as first day
- weekly_aggr[week_key]['trades'] += 1
- weekly_aggr[week_key]['pnl'] += pnl
- weekly_aggr[week_key]['volume'] += volume
-
- # Monthly
- month_key = end_dt.strftime('%Y-%m')
- monthly_aggr[month_key]['trades'] += 1
- monthly_aggr[month_key]['pnl'] += pnl
- monthly_aggr[month_key]['volume'] += volume
- except Exception as e:
- logger.warning(f"Could not parse cycle_end '{cycle.get('cycle_end')}' for periodic stats: {e}")
- continue
-
- for aggr_dict in [daily_aggr, weekly_aggr, monthly_aggr]:
- for stats in aggr_dict.values():
- stats['pnl_pct'] = (stats['pnl'] / stats['volume'] * 100) if stats['volume'] > 0 else 0.0
-
- return {'daily': dict(daily_aggr), 'weekly': dict(weekly_aggr), 'monthly': dict(monthly_aggr)}
- def get_daily_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
- """Get daily performance stats for the last N days."""
- period_aggregates = self._get_aggregated_period_stats_from_cycles()['daily']
- daily_stats_list = []
- today_utc = datetime.now(timezone.utc).date()
- for i in range(limit):
- target_date = today_utc - timedelta(days=i)
- date_str = target_date.strftime('%Y-%m-%d')
- date_formatted = target_date.strftime('%m/%d') # For display
- stats_for_day = period_aggregates.get(date_str)
- if stats_for_day:
- daily_stats_list.append({
- 'date': date_str, 'date_formatted': date_formatted, 'has_trades': True,
- **stats_for_day
- })
- else:
- daily_stats_list.append({
- 'date': date_str, 'date_formatted': date_formatted, 'has_trades': False,
- 'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
- })
- return daily_stats_list
- def get_weekly_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
- """Get weekly performance stats for the last N weeks."""
- period_aggregates = self._get_aggregated_period_stats_from_cycles()['weekly']
- weekly_stats_list = []
- today_utc = datetime.now(timezone.utc).date()
- for i in range(limit):
- # Target week starts on Monday 'i' weeks ago
- target_monday = today_utc - timedelta(days=today_utc.weekday() + (i * 7))
- target_sunday = target_monday + timedelta(days=6)
-
- week_key = target_monday.strftime('%Y-W%W') # %W for Monday as first day
- week_formatted = f"{target_monday.strftime('%m/%d')}-{target_sunday.strftime('%m/%d')}"
- stats_for_week = period_aggregates.get(week_key)
- if stats_for_week:
- weekly_stats_list.append({
- 'week': week_key, 'week_formatted': week_formatted, 'has_trades': True,
- **stats_for_week
- })
- else:
- weekly_stats_list.append({
- 'week': week_key, 'week_formatted': week_formatted, 'has_trades': False,
- 'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
- })
- return weekly_stats_list
- def get_monthly_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
- """Get monthly performance stats for the last N months."""
- period_aggregates = self._get_aggregated_period_stats_from_cycles()['monthly']
- monthly_stats_list = []
- current_month_start_utc = datetime.now(timezone.utc).date().replace(day=1)
- for i in range(limit):
- # Calculate target month by subtracting months
- year = current_month_start_utc.year
- month = current_month_start_utc.month - i
- while month <= 0: # Adjust year if month goes to 0 or negative
- month += 12
- year -= 1
-
- target_month_date = datetime(year, month, 1, tzinfo=timezone.utc).date()
- month_key = target_month_date.strftime('%Y-%m')
- month_formatted = target_month_date.strftime('%b %Y')
- stats_for_month = period_aggregates.get(month_key)
- if stats_for_month:
- monthly_stats_list.append({
- 'month': month_key, 'month_formatted': month_formatted, 'has_trades': True,
- **stats_for_month
- })
- else:
- monthly_stats_list.append({
- 'month': month_key, 'month_formatted': month_formatted, 'has_trades': False,
- 'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
- })
- return monthly_stats_list
- def record_deposit(self, amount: float, timestamp: Optional[str] = None,
- deposit_id: Optional[str] = None, description: Optional[str] = None):
- """Record a deposit."""
- ts = timestamp if timestamp else datetime.now(timezone.utc).isoformat()
- desc = description if description else f'Deposit of ${amount:.2f}'
-
- self._execute_query(
- "INSERT INTO balance_adjustments (adjustment_id, timestamp, type, amount, description) VALUES (?, ?, ?, ?, ?)",
- (deposit_id, ts, 'deposit', amount, desc)
- )
- # Adjust initial_balance in metadata to reflect capital changes
- current_initial = float(self._get_metadata('initial_balance') or '0.0')
- self._set_metadata('initial_balance', str(current_initial + amount))
- logger.info(f"💰 Recorded deposit: ${amount:.2f}. New effective initial balance: ${current_initial + amount:.2f}")
- def record_withdrawal(self, amount: float, timestamp: Optional[str] = None,
- withdrawal_id: Optional[str] = None, description: Optional[str] = None):
- """Record a withdrawal."""
- ts = timestamp if timestamp else datetime.now(timezone.utc).isoformat()
- desc = description if description else f'Withdrawal of ${amount:.2f}'
-
- self._execute_query(
- "INSERT INTO balance_adjustments (adjustment_id, timestamp, type, amount, description) VALUES (?, ?, ?, ?, ?)",
- (withdrawal_id, ts, 'withdrawal', amount, desc) # Store positive amount, type indicates withdrawal
- )
- current_initial = float(self._get_metadata('initial_balance') or '0.0')
- self._set_metadata('initial_balance', str(current_initial - amount))
- logger.info(f"💸 Recorded withdrawal: ${amount:.2f}. New effective initial balance: ${current_initial - amount:.2f}")
- def get_balance_adjustments_summary(self) -> Dict[str, Any]:
- """Get summary of all balance adjustments from DB."""
- adjustments = self._fetch_query("SELECT type, amount, timestamp FROM balance_adjustments ORDER BY timestamp ASC")
-
- if not adjustments:
- return {'total_deposits': 0.0, 'total_withdrawals': 0.0, 'net_adjustment': 0.0,
- 'adjustment_count': 0, 'last_adjustment': None}
-
- total_deposits = sum(adj['amount'] for adj in adjustments if adj['type'] == 'deposit')
- total_withdrawals = sum(adj['amount'] for adj in adjustments if adj['type'] == 'withdrawal') # Amounts stored positive
- net_adjustment = total_deposits - total_withdrawals
-
- return {
- 'total_deposits': total_deposits, 'total_withdrawals': total_withdrawals,
- 'net_adjustment': net_adjustment, 'adjustment_count': len(adjustments),
- 'last_adjustment': adjustments[-1]['timestamp'] if adjustments else None
- }
- def close_connection(self):
- """Close the SQLite database connection."""
- if self.conn:
- self.conn.close()
- logger.info("TradingStats SQLite connection closed.")
- def __del__(self):
- """Ensure connection is closed when object is deleted."""
- self.close_connection()
- # --- Order Table Management ---
    def record_order_placed(self, symbol: str, side: str, order_type: str,
                            amount_requested: float, price: Optional[float] = None,
                            bot_order_ref_id: Optional[str] = None,
                            exchange_order_id: Optional[str] = None,
                            status: str = 'open',
                            parent_bot_order_ref_id: Optional[str] = None) -> Optional[int]:
        """Record a newly placed order in the 'orders' table. Returns the ID of the inserted order or None on failure.

        Args:
            symbol: Trading pair the order was placed for.
            side: 'buy' or 'sell' (stored lowercased).
            order_type: Order type, e.g. 'limit'/'market' (stored lowercased).
            amount_requested: Quantity requested on the order.
            price: Limit price, if applicable.
            bot_order_ref_id: Bot-internal reference id; presumably UNIQUE in
                the schema given the IntegrityError handling below — confirm.
            exchange_order_id: Exchange-assigned id, if already known.
            status: Initial order status (stored lowercased).
            parent_bot_order_ref_id: Links child orders (e.g. stop losses) to a parent order.
        """
        now_iso = datetime.now(timezone.utc).isoformat()
        query = """
            INSERT INTO orders (bot_order_ref_id, exchange_order_id, symbol, side, type,
                                amount_requested, price, status, timestamp_created, timestamp_updated, parent_bot_order_ref_id)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        params = (bot_order_ref_id, exchange_order_id, symbol, side.lower(), order_type.lower(),
                  amount_requested, price, status.lower(), now_iso, now_iso, parent_bot_order_ref_id)
        try:
            # Uses a raw cursor (rather than _execute_query) so the new row's
            # lastrowid can be read back after the commit.
            cur = self.conn.cursor()
            cur.execute(query, params)
            self.conn.commit()
            order_db_id = cur.lastrowid
            logger.info(f"Recorded order placed: ID {order_db_id}, Symbol {symbol}, Side {side}, Type {order_type}, Amount {amount_requested}, BotRef {bot_order_ref_id}, ExchID {exchange_order_id}")
            return order_db_id
        except sqlite3.IntegrityError as e:
            # Most likely a uniqueness violation on one of the reference ids.
            logger.error(f"Failed to record order due to IntegrityError (likely duplicate bot_order_ref_id '{bot_order_ref_id}' or exchange_order_id '{exchange_order_id}'): {e}")
            return None
        except Exception as e:
            logger.error(f"Failed to record order: {e}")
            return None
    def update_order_status(self, order_db_id: Optional[int] = None, bot_order_ref_id: Optional[str] = None, exchange_order_id: Optional[str] = None,
                            new_status: Optional[str] = None, amount_filled_increment: Optional[float] = None, set_exchange_order_id: Optional[str] = None) -> bool:
        """Update an existing order's status and/or amount_filled. Identify order by order_db_id, bot_order_ref_id, or exchange_order_id.

        Args:
            order_db_id: Database ID to identify the order
            bot_order_ref_id: Bot's internal reference ID to identify the order
            exchange_order_id: Exchange's order ID to identify the order
            new_status: New status to set
            amount_filled_increment: Amount to add to current filled amount
            set_exchange_order_id: If provided, sets/updates the exchange_order_id field in the database

        Returns:
            True if the UPDATE executed (or there was nothing to change),
            False if no identifier was supplied or the query failed.
        """
        if not any([order_db_id, bot_order_ref_id, exchange_order_id]):
            logger.error("Must provide one of order_db_id, bot_order_ref_id, or exchange_order_id to update order.")
            return False
        now_iso = datetime.now(timezone.utc).isoformat()
        # SET clauses and their parameters are accumulated in lockstep.
        set_clauses = []
        params = []
        if new_status:
            set_clauses.append("status = ?")
            params.append(new_status.lower())

        if set_exchange_order_id is not None:
            set_clauses.append("exchange_order_id = ?")
            params.append(set_exchange_order_id)

        current_amount_filled = 0.0
        identifier_clause = ""
        identifier_param = None
        # Identifier precedence: DB id, then bot ref id, then exchange id.
        if order_db_id:
            identifier_clause = "id = ?"
            identifier_param = order_db_id
        elif bot_order_ref_id:
            identifier_clause = "bot_order_ref_id = ?"
            identifier_param = bot_order_ref_id
        elif exchange_order_id:
            identifier_clause = "exchange_order_id = ?"
            identifier_param = exchange_order_id
        if amount_filled_increment is not None and amount_filled_increment > 0:
            # To correctly increment, we might need to fetch current filled amount first if DB doesn't support direct increment easily or atomically with other updates.
            # For simplicity here, assuming we can use SQL's increment if other fields are not changing, or we do it in two steps.
            # Let's assume we fetch first then update to be safe and clear.
            # NOTE: identifier_clause only ever holds one of the three fixed
            # column names above, so the f-string is not an injection vector.
            order_data = self._fetchone_query(f"SELECT amount_filled FROM orders WHERE {identifier_clause}", (identifier_param,))
            if order_data:
                current_amount_filled = order_data.get('amount_filled', 0.0)
            else:
                logger.warning(f"Order not found by {identifier_clause}={identifier_param} when trying to increment amount_filled.")
                # Potentially still update status if new_status is provided, but amount_filled won't be right.
                # For now, let's proceed with update if status is there.
            set_clauses.append("amount_filled = ?")
            params.append(current_amount_filled + amount_filled_increment)
        if not set_clauses:
            logger.info("No fields to update for order.")
            return True # Or False if an update was expected
        # Always bump the updated timestamp when anything else changed.
        set_clauses.append("timestamp_updated = ?")
        params.append(now_iso)

        params.append(identifier_param) # Add identifier param at the end for WHERE clause
        query = f"UPDATE orders SET { ', '.join(set_clauses) } WHERE {identifier_clause}"

        try:
            self._execute_query(query, params)
            log_msg = f"Updated order ({identifier_clause}={identifier_param}): Status to '{new_status or 'N/A'}', Filled increment {amount_filled_increment or 0.0}"
            if set_exchange_order_id is not None:
                log_msg += f", Exchange ID set to '{set_exchange_order_id}'"
            logger.info(log_msg)
            return True
        except Exception as e:
            logger.error(f"Failed to update order ({identifier_clause}={identifier_param}): {e}")
            return False
- def get_order_by_db_id(self, order_db_id: int) -> Optional[Dict[str, Any]]:
- """Fetch an order by its database primary key ID."""
- return self._fetchone_query("SELECT * FROM orders WHERE id = ?", (order_db_id,))
- def get_order_by_bot_ref_id(self, bot_order_ref_id: str) -> Optional[Dict[str, Any]]:
- """Fetch an order by the bot's internal reference ID."""
- return self._fetchone_query("SELECT * FROM orders WHERE bot_order_ref_id = ?", (bot_order_ref_id,))
- def get_order_by_exchange_id(self, exchange_order_id: str) -> Optional[Dict[str, Any]]:
- """Fetch an order by the exchange's order ID."""
- return self._fetchone_query("SELECT * FROM orders WHERE exchange_order_id = ?", (exchange_order_id,))
- def get_orders_by_status(self, status: str, order_type_filter: Optional[str] = None, parent_bot_order_ref_id: Optional[str] = None) -> List[Dict[str, Any]]:
- """Fetch all orders with a specific status, optionally filtering by order_type and parent_bot_order_ref_id."""
- query = "SELECT * FROM orders WHERE status = ?"
- params = [status.lower()]
- if order_type_filter:
- query += " AND type = ?"
- params.append(order_type_filter.lower())
- if parent_bot_order_ref_id:
- query += " AND parent_bot_order_ref_id = ?"
- params.append(parent_bot_order_ref_id)
- query += " ORDER BY timestamp_created ASC"
- return self._fetch_query(query, tuple(params))
- def cancel_linked_orders(self, parent_bot_order_ref_id: str, new_status: str = 'cancelled_parent_filled') -> int:
- """Cancel all orders linked to a parent order (e.g., pending stop losses when parent order fills or gets cancelled).
- Returns the number of orders that were cancelled."""
- linked_orders = self.get_orders_by_status('pending_trigger', parent_bot_order_ref_id=parent_bot_order_ref_id)
- cancelled_count = 0
-
- for order in linked_orders:
- order_db_id = order.get('id')
- if order_db_id:
- success = self.update_order_status(order_db_id=order_db_id, new_status=new_status)
- if success:
- cancelled_count += 1
- logger.info(f"Cancelled linked order ID {order_db_id} (parent: {parent_bot_order_ref_id}) -> status: {new_status}")
-
- return cancelled_count
- def cancel_pending_stop_losses_by_symbol(self, symbol: str, new_status: str = 'cancelled_position_closed') -> int:
- """Cancel all pending stop loss orders for a specific symbol (when position is closed).
- Returns the number of stop loss orders that were cancelled."""
- query = "SELECT * FROM orders WHERE symbol = ? AND status = 'pending_trigger' AND type = 'stop_limit_trigger'"
- pending_stop_losses = self._fetch_query(query, (symbol,))
- cancelled_count = 0
-
- for order in pending_stop_losses:
- order_db_id = order.get('id')
- if order_db_id:
- success = self.update_order_status(order_db_id=order_db_id, new_status=new_status)
- if success:
- cancelled_count += 1
- logger.info(f"Cancelled pending SL order ID {order_db_id} for {symbol} -> status: {new_status}")
-
- return cancelled_count
- def get_order_cleanup_summary(self) -> Dict[str, Any]:
- """Get summary of order cleanup actions for monitoring and debugging."""
- try:
- # Get counts of different cancellation types
- cleanup_stats = {}
-
- cancellation_types = [
- 'cancelled_parent_cancelled',
- 'cancelled_parent_disappeared',
- 'cancelled_manual_exit',
- 'cancelled_auto_exit',
- 'cancelled_no_position',
- 'cancelled_external_position_close',
- 'cancelled_orphaned_no_position',
- 'cancelled_externally',
- 'immediately_executed_on_activation',
- 'activation_execution_failed',
- 'activation_execution_error'
- ]
-
- for cancel_type in cancellation_types:
- count_result = self._fetchone_query(
- "SELECT COUNT(*) as count FROM orders WHERE status = ?",
- (cancel_type,)
- )
- cleanup_stats[cancel_type] = count_result['count'] if count_result else 0
-
- # Get currently pending stop losses
- pending_sls = self.get_orders_by_status('pending_trigger', 'stop_limit_trigger')
- cleanup_stats['currently_pending_stop_losses'] = len(pending_sls)
-
- # Get total orders in various states
- active_orders = self._fetchone_query(
- "SELECT COUNT(*) as count FROM orders WHERE status IN ('open', 'submitted', 'partially_filled')",
- ()
- )
- cleanup_stats['currently_active_orders'] = active_orders['count'] if active_orders else 0
-
- return cleanup_stats
-
- except Exception as e:
- logger.error(f"Error getting order cleanup summary: {e}")
- return {}
- def get_external_activity_summary(self, days: int = 7) -> Dict[str, Any]:
- """Get summary of external activity (trades and cancellations) over the last N days."""
- try:
- from datetime import timedelta
- cutoff_date = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
-
- # External trades
- external_trades = self._fetch_query(
- "SELECT COUNT(*) as count, side FROM trades WHERE trade_type = 'external' AND timestamp >= ? GROUP BY side",
- (cutoff_date,)
- )
-
- external_trade_summary = {
- 'external_buy_trades': 0,
- 'external_sell_trades': 0,
- 'total_external_trades': 0
- }
-
- for trade_group in external_trades:
- side = trade_group['side']
- count = trade_group['count']
- external_trade_summary['total_external_trades'] += count
- if side == 'buy':
- external_trade_summary['external_buy_trades'] = count
- elif side == 'sell':
- external_trade_summary['external_sell_trades'] = count
-
- # External cancellations
- external_cancellations = self._fetchone_query(
- "SELECT COUNT(*) as count FROM orders WHERE status = 'cancelled_externally' AND timestamp_updated >= ?",
- (cutoff_date,)
- )
- external_trade_summary['external_cancellations'] = external_cancellations['count'] if external_cancellations else 0
-
- # Cleanup actions
- cleanup_cancellations = self._fetchone_query(
- """SELECT COUNT(*) as count FROM orders
- WHERE status LIKE 'cancelled_%'
- AND status != 'cancelled_externally'
- AND timestamp_updated >= ?""",
- (cutoff_date,)
- )
- external_trade_summary['cleanup_cancellations'] = cleanup_cancellations['count'] if cleanup_cancellations else 0
-
- external_trade_summary['period_days'] = days
-
- return external_trade_summary
-
- except Exception as e:
- logger.error(f"Error getting external activity summary: {e}")
- return {'period_days': days, 'total_external_trades': 0, 'external_cancellations': 0}
- # --- End Order Table Management ---
- # =============================================================================
- # TRADE LIFECYCLE MANAGEMENT - PHASE 4: UNIFIED TRADES TABLE
- # =============================================================================
-
- def create_trade_lifecycle(self, symbol: str, side: str, entry_order_id: Optional[str] = None,
- stop_loss_price: Optional[float] = None, take_profit_price: Optional[float] = None,
- trade_type: str = 'manual') -> Optional[str]:
- """Create a new trade lifecycle when an entry order is placed."""
- try:
- lifecycle_id = str(uuid.uuid4())
-
- query = """
- INSERT INTO trades (
- symbol, side, amount, price, value, trade_type, timestamp,
- status, trade_lifecycle_id, position_side, entry_order_id,
- stop_loss_price, take_profit_price, updated_at
- ) VALUES (?, ?, 0, 0, 0, ?, ?, 'pending', ?, 'flat', ?, ?, ?, ?)
- """
- timestamp = datetime.now(timezone.utc).isoformat()
- params = (symbol, side.lower(), trade_type, timestamp, lifecycle_id,
- entry_order_id, stop_loss_price, take_profit_price, timestamp)
-
- self._execute_query(query, params)
-
- logger.info(f"📊 Created trade lifecycle {lifecycle_id}: {side.upper()} {symbol} (pending)")
- return lifecycle_id
-
- except Exception as e:
- logger.error(f"❌ Error creating trade lifecycle: {e}")
- return None
-
- def update_trade_position_opened(self, lifecycle_id: str, entry_price: float,
- entry_amount: float, exchange_fill_id: str) -> bool:
- """Update trade when position is opened (entry order filled)."""
- try:
- query = """
- UPDATE trades
- SET status = 'position_opened',
- amount = ?,
- price = ?,
- value = ?,
- entry_price = ?,
- current_position_size = ?,
- position_side = CASE
- WHEN side = 'buy' THEN 'long'
- WHEN side = 'sell' THEN 'short'
- ELSE position_side
- END,
- exchange_fill_id = ?,
- position_opened_at = ?,
- updated_at = ?
- WHERE trade_lifecycle_id = ? AND status = 'pending'
- """
- timestamp = datetime.now(timezone.utc).isoformat()
- value = entry_amount * entry_price
- params = (entry_amount, entry_price, value, entry_price, entry_amount,
- exchange_fill_id, timestamp, timestamp, lifecycle_id)
-
- self._execute_query(query, params)
-
- logger.info(f"📈 Trade lifecycle {lifecycle_id} position opened: {entry_amount} @ ${entry_price:.2f}")
- return True
-
- except Exception as e:
- logger.error(f"❌ Error updating trade position opened: {e}")
- return False
-
- def update_trade_position_closed(self, lifecycle_id: str, exit_price: float,
- realized_pnl: float, exchange_fill_id: str) -> bool:
- """Update trade when position is fully closed."""
- try:
- query = """
- UPDATE trades
- SET status = 'position_closed',
- current_position_size = 0,
- position_side = 'flat',
- realized_pnl = ?,
- position_closed_at = ?,
- updated_at = ?
- WHERE trade_lifecycle_id = ? AND status = 'position_opened'
- """
- timestamp = datetime.now(timezone.utc).isoformat()
- params = (realized_pnl, timestamp, timestamp, lifecycle_id)
-
- self._execute_query(query, params)
-
- pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
- logger.info(f"{pnl_emoji} Trade lifecycle {lifecycle_id} position closed: P&L ${realized_pnl:.2f}")
- return True
-
- except Exception as e:
- logger.error(f"❌ Error updating trade position closed: {e}")
- return False
-
- def update_trade_cancelled(self, lifecycle_id: str, reason: str = "order_cancelled") -> bool:
- """Update trade when entry order is cancelled (never opened)."""
- try:
- query = """
- UPDATE trades
- SET status = 'cancelled',
- notes = ?,
- updated_at = ?
- WHERE trade_lifecycle_id = ? AND status = 'pending'
- """
- timestamp = datetime.now(timezone.utc).isoformat()
- params = (f"Cancelled: {reason}", timestamp, lifecycle_id)
-
- self._execute_query(query, params)
-
- logger.info(f"❌ Trade lifecycle {lifecycle_id} cancelled: {reason}")
- return True
-
- except Exception as e:
- logger.error(f"❌ Error updating trade cancelled: {e}")
- return False
-
- def link_stop_loss_to_trade(self, lifecycle_id: str, stop_loss_order_id: str,
- stop_loss_price: float) -> bool:
- """Link a stop loss order to a trade lifecycle."""
- try:
- query = """
- UPDATE trades
- SET stop_loss_order_id = ?,
- stop_loss_price = ?,
- updated_at = ?
- WHERE trade_lifecycle_id = ? AND status = 'position_opened'
- """
- timestamp = datetime.now(timezone.utc).isoformat()
- params = (stop_loss_order_id, stop_loss_price, timestamp, lifecycle_id)
-
- self._execute_query(query, params)
-
- logger.info(f"🛑 Linked stop loss order {stop_loss_order_id} (${stop_loss_price:.2f}) to trade {lifecycle_id}")
- return True
-
- except Exception as e:
- logger.error(f"❌ Error linking stop loss to trade: {e}")
- return False
-
- def link_take_profit_to_trade(self, lifecycle_id: str, take_profit_order_id: str,
- take_profit_price: float) -> bool:
- """Link a take profit order to a trade lifecycle."""
- try:
- query = """
- UPDATE trades
- SET take_profit_order_id = ?,
- take_profit_price = ?,
- updated_at = ?
- WHERE trade_lifecycle_id = ? AND status = 'position_opened'
- """
- timestamp = datetime.now(timezone.utc).isoformat()
- params = (take_profit_order_id, take_profit_price, timestamp, lifecycle_id)
-
- self._execute_query(query, params)
-
- logger.info(f"🎯 Linked take profit order {take_profit_order_id} (${take_profit_price:.2f}) to trade {lifecycle_id}")
- return True
-
- except Exception as e:
- logger.error(f"❌ Error linking take profit to trade: {e}")
- return False
-
- def get_trade_by_lifecycle_id(self, lifecycle_id: str) -> Optional[Dict[str, Any]]:
- """Get trade by lifecycle ID."""
- query = "SELECT * FROM trades WHERE trade_lifecycle_id = ?"
- return self._fetchone_query(query, (lifecycle_id,))
-
- def get_trade_by_symbol_and_status(self, symbol: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
- """Get trade by symbol and status."""
- query = "SELECT * FROM trades WHERE symbol = ? AND status = ? ORDER BY updated_at DESC LIMIT 1"
- return self._fetchone_query(query, (symbol, status))
-
- def get_open_positions(self, symbol: Optional[str] = None) -> List[Dict[str, Any]]:
- """Get all open positions, optionally filtered by symbol."""
- if symbol:
- query = "SELECT * FROM trades WHERE status = 'position_opened' AND symbol = ? ORDER BY position_opened_at DESC"
- return self._fetch_query(query, (symbol,))
- else:
- query = "SELECT * FROM trades WHERE status = 'position_opened' ORDER BY position_opened_at DESC"
- return self._fetch_query(query)
-
- def get_trades_by_status(self, status: str, limit: int = 50) -> List[Dict[str, Any]]:
- """Get trades by status."""
- query = "SELECT * FROM trades WHERE status = ? ORDER BY updated_at DESC LIMIT ?"
- return self._fetch_query(query, (status, limit))
-
- def get_lifecycle_by_entry_order_id(self, entry_exchange_order_id: str, status: Optional[str] = None) -> Optional[Dict[str, Any]]:
- """Get a trade lifecycle by its entry_order_id (exchange ID) and optionally by status."""
- if status:
- query = "SELECT * FROM trades WHERE entry_order_id = ? AND status = ? LIMIT 1"
- params = (entry_exchange_order_id, status)
- else:
- query = "SELECT * FROM trades WHERE entry_order_id = ? LIMIT 1"
- params = (entry_exchange_order_id,)
- return self._fetchone_query(query, params)
- def get_lifecycle_by_sl_order_id(self, sl_exchange_order_id: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
- """Get an active trade lifecycle by its stop_loss_order_id (exchange ID)."""
- query = "SELECT * FROM trades WHERE stop_loss_order_id = ? AND status = ? LIMIT 1"
- return self._fetchone_query(query, (sl_exchange_order_id, status))
- def get_lifecycle_by_tp_order_id(self, tp_exchange_order_id: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
- """Get an active trade lifecycle by its take_profit_order_id (exchange ID)."""
- query = "SELECT * FROM trades WHERE take_profit_order_id = ? AND status = ? LIMIT 1"
- return self._fetchone_query(query, (tp_exchange_order_id, status))
-
- def get_pending_stop_loss_activations(self) -> List[Dict[str, Any]]:
- """Get open positions that need stop loss activation."""
- query = """
- SELECT * FROM trades
- WHERE status = 'position_opened'
- AND stop_loss_price IS NOT NULL
- AND stop_loss_order_id IS NULL
- ORDER BY updated_at ASC
- """
- return self._fetch_query(query)
-
- def cleanup_old_cancelled_trades(self, days_old: int = 7) -> int:
- """Clean up old cancelled trades (optional - for housekeeping)."""
- try:
- cutoff_date = (datetime.now(timezone.utc) - timedelta(days=days_old)).isoformat()
-
- # Count before deletion
- count_query = """
- SELECT COUNT(*) as count FROM trades
- WHERE status = 'cancelled' AND updated_at < ?
- """
- count_result = self._fetchone_query(count_query, (cutoff_date,))
- count_to_delete = count_result['count'] if count_result else 0
-
- if count_to_delete > 0:
- delete_query = """
- DELETE FROM trades
- WHERE status = 'cancelled' AND updated_at < ?
- """
- self._execute_query(delete_query, (cutoff_date,))
- logger.info(f"🧹 Cleaned up {count_to_delete} old cancelled trades (older than {days_old} days)")
-
- return count_to_delete
-
- except Exception as e:
- logger.error(f"❌ Error cleaning up old cancelled trades: {e}")
- return 0
-
- def confirm_position_with_exchange(self, symbol: str, exchange_position_size: float,
- exchange_open_orders: List[Dict]) -> bool:
- """🆕 PHASE 4: Confirm position status with exchange before updating status."""
- try:
- # Get current trade status
- current_trade = self.get_trade_by_symbol_and_status(symbol, 'position_opened')
-
- if not current_trade:
- return True # No open position to confirm
-
- lifecycle_id = current_trade['trade_lifecycle_id']
- has_open_orders = len([o for o in exchange_open_orders if o.get('symbol') == symbol]) > 0
-
- # Only close position if exchange confirms no position AND no pending orders
- if abs(exchange_position_size) < 1e-8 and not has_open_orders:
- # Calculate realized P&L based on position side
- position_side = current_trade['position_side']
- entry_price_db = current_trade['entry_price'] # entry_price from db
- # current_amount = current_trade['current_position_size'] # Not directly used for PNL calc here
-
- # For a closed position, we need to calculate final P&L
- # This would typically come from the closing trade, but for confirmation we estimate
- estimated_pnl = current_trade.get('realized_pnl', 0) # Use existing realized_pnl if any
-
- success = self.update_trade_position_closed(
- lifecycle_id,
- entry_price_db, # Using entry price from DB as estimate since position is confirmed closed
- estimated_pnl,
- "exchange_confirmed_closed"
- )
-
- if success:
- logger.info(f"✅ Confirmed position closed for {symbol} with exchange")
-
- return success
-
- return True # Position still exists on exchange, no update needed
-
- except Exception as e:
- logger.error(f"❌ Error confirming position with exchange: {e}")
- return False
- def update_trade_market_data(self,
- trade_lifecycle_id: str,
- unrealized_pnl: Optional[float] = None,
- mark_price: Optional[float] = None,
- current_position_size: Optional[float] = None,
- entry_price: Optional[float] = None,
- liquidation_price: Optional[float] = None,
- margin_used: Optional[float] = None,
- leverage: Optional[float] = None,
- position_value: Optional[float] = None) -> bool:
- """Update market-related data for an open trade lifecycle.
- Only updates fields for which a non-None value is provided.
- """
- try:
- updates = []
- params = []
-
- if unrealized_pnl is not None:
- updates.append("unrealized_pnl = ?")
- params.append(unrealized_pnl)
- if mark_price is not None:
- updates.append("mark_price = ?")
- params.append(mark_price)
- if current_position_size is not None:
- updates.append("current_position_size = ?")
- params.append(current_position_size)
- if entry_price is not None: # If exchange provides updated avg entry
- updates.append("entry_price = ?")
- params.append(entry_price)
- if liquidation_price is not None:
- updates.append("liquidation_price = ?")
- params.append(liquidation_price)
- if margin_used is not None:
- updates.append("margin_used = ?")
- params.append(margin_used)
- if leverage is not None:
- updates.append("leverage = ?")
- params.append(leverage)
- if position_value is not None:
- updates.append("position_value = ?")
- params.append(position_value)
- if not updates:
- logger.debug(f"No market data fields provided to update for lifecycle {trade_lifecycle_id}.")
- return True # No update needed, not an error
- timestamp = datetime.now(timezone.utc).isoformat()
- updates.append("updated_at = ?")
- params.append(timestamp)
- set_clause = ", ".join(updates)
- query = f"""
- UPDATE trades
- SET {set_clause}
- WHERE trade_lifecycle_id = ? AND status = 'position_opened'
- """
- params.append(trade_lifecycle_id)
-
- # Use the class's own connection self.conn
- cursor = self.conn.cursor()
- cursor.execute(query, tuple(params))
- self.conn.commit()
- updated_rows = cursor.rowcount
- if updated_rows > 0:
- logger.debug(f"💹 Updated market data for lifecycle {trade_lifecycle_id}. Fields: {updates}")
- return True
- else:
- # This might happen if the lifecycle ID doesn't exist or status is not 'position_opened'
- # logger.warning(f"⚠️ No trade found or not in 'position_opened' state for lifecycle {trade_lifecycle_id} to update market data.")
- return False # Not necessarily an error
- except Exception as e:
- logger.error(f"❌ Error updating market data for trade lifecycle {trade_lifecycle_id}: {e}")
- return False
- # --- End Trade Lifecycle Management ---
|