#!/usr/bin/env python3 """ Aggregation Manager for Trading Statistics Handles data aggregation, migration from individual trades to aggregated statistics, and balance adjustment tracking. """ import sqlite3 import logging from datetime import datetime, timezone, timedelta from typing import Dict, List, Any, Optional import uuid from src.utils.token_display_formatter import get_formatter logger = logging.getLogger(__name__) class AggregationManager: """Manages data aggregation and migration in the trading statistics database.""" def __init__(self, db_manager): """Initialize with database manager.""" self.db = db_manager def migrate_trade_to_aggregated_stats(self, trade_lifecycle_id: str): """Migrate a completed/cancelled trade's stats to aggregate tables and delete the original trade.""" trade_data = self.db._fetchone_query("SELECT * FROM trades WHERE trade_lifecycle_id = ?", (trade_lifecycle_id,)) if not trade_data: logger.error(f"Cannot migrate trade {trade_lifecycle_id}: Not found.") return status = trade_data.get('status') symbol = trade_data.get('symbol') token = symbol.split('/')[0] if symbol and '/' in symbol else symbol if not token: logger.error(f"Cannot migrate trade {trade_lifecycle_id}: Token could not be derived from symbol '{symbol}'.") return now_iso = datetime.now(timezone.utc).isoformat() try: with self.db.conn: if status == 'position_closed': self._migrate_closed_position(trade_data, token, now_iso) elif status == 'cancelled': self._migrate_cancelled_position(trade_data, token, now_iso) # Delete the original trade from the 'trades' table self.db._execute_query("DELETE FROM trades WHERE trade_lifecycle_id = ?", (trade_lifecycle_id,)) logger.info(f"Deleted trade lifecycle {trade_lifecycle_id} from trades table after aggregation.") except sqlite3.Error as e: logger.error(f"Database error migrating trade {trade_lifecycle_id} to aggregate stats: {e}", exc_info=True) except Exception as e: logger.error(f"Unexpected error migrating trade {trade_lifecycle_id} to aggregate stats: {e}", exc_info=True) def _migrate_closed_position(self, trade_data: Dict[str, Any], token: str, now_iso: str): """Migrate a closed position to aggregated stats.""" realized_pnl = trade_data.get('realized_pnl', 0.0) entry_value = trade_data.get('value', 0.0) exit_value = entry_value + realized_pnl closed_at_str = trade_data.get('position_closed_at', now_iso) closed_at_dt = datetime.fromisoformat(closed_at_str) date_str = closed_at_dt.strftime('%Y-%m-%d') # Calculate duration if timestamps are available opened_at_str = trade_data.get('position_opened_at') duration_seconds = 0 if opened_at_str and closed_at_str: try: opened_at_dt = datetime.fromisoformat(opened_at_str) duration_seconds = (closed_at_dt - opened_at_dt).total_seconds() except Exception: duration_seconds = 0 # Calculate ROE percentage roe_percentage = (realized_pnl / entry_value * 100) if entry_value > 0 else 0.0 # Update token_stats token_upsert_query = """ INSERT INTO token_stats ( token, total_realized_pnl, total_completed_cycles, winning_cycles, losing_cycles, total_entry_volume, total_exit_volume, sum_of_winning_pnl, sum_of_losing_pnl, largest_winning_cycle_pnl, largest_losing_cycle_pnl, largest_winning_cycle_entry_volume, largest_losing_cycle_entry_volume, first_cycle_closed_at, last_cycle_closed_at, total_duration_seconds, roe_percentage, updated_at ) VALUES (?, ?, 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
            ON CONFLICT(token) DO UPDATE SET
                total_realized_pnl = total_realized_pnl + excluded.total_realized_pnl,
                total_completed_cycles = total_completed_cycles + 1,
                winning_cycles = winning_cycles + excluded.winning_cycles,
                losing_cycles = losing_cycles + excluded.losing_cycles,
                total_entry_volume = total_entry_volume + excluded.total_entry_volume,
                total_exit_volume = total_exit_volume + excluded.total_exit_volume,
                sum_of_winning_pnl = sum_of_winning_pnl + excluded.sum_of_winning_pnl,
                sum_of_losing_pnl = sum_of_losing_pnl + excluded.sum_of_losing_pnl,
                largest_winning_cycle_pnl = CASE
                    WHEN excluded.largest_winning_cycle_pnl > largest_winning_cycle_pnl
                    THEN excluded.largest_winning_cycle_pnl
                    ELSE largest_winning_cycle_pnl
                END,
                largest_losing_cycle_pnl = CASE
                    WHEN excluded.largest_losing_cycle_pnl > largest_losing_cycle_pnl
                    THEN excluded.largest_losing_cycle_pnl
                    ELSE largest_losing_cycle_pnl
                END,
                largest_winning_cycle_entry_volume = CASE
                    WHEN excluded.largest_winning_cycle_pnl > largest_winning_cycle_pnl
                    THEN excluded.largest_winning_cycle_entry_volume
                    ELSE largest_winning_cycle_entry_volume
                END,
                largest_losing_cycle_entry_volume = CASE
                    WHEN excluded.largest_losing_cycle_pnl > largest_losing_cycle_pnl
                    THEN excluded.largest_losing_cycle_entry_volume
                    ELSE largest_losing_cycle_entry_volume
                END,
                first_cycle_closed_at = MIN(first_cycle_closed_at, excluded.first_cycle_closed_at),
                last_cycle_closed_at = MAX(last_cycle_closed_at, excluded.last_cycle_closed_at),
                total_duration_seconds = total_duration_seconds + excluded.total_duration_seconds,
                roe_percentage = excluded.roe_percentage,
                updated_at = excluded.updated_at
        """

        is_win = 1 if realized_pnl > 0 else 0
        is_loss = 1 if realized_pnl < 0 else 0
        win_pnl_contrib = realized_pnl if realized_pnl > 0 else 0.0
        loss_pnl_contrib = abs(realized_pnl) if realized_pnl < 0 else 0.0

        # For largest winning/losing, we only consider them if this is the new largest
        largest_win_entry_volume = entry_value if realized_pnl > 0 else 0.0
        largest_loss_entry_volume = entry_value if realized_pnl < 0 else 0.0

        self.db._execute_query(token_upsert_query, (
            token, realized_pnl, is_win, is_loss, entry_value, exit_value,
            win_pnl_contrib, loss_pnl_contrib, win_pnl_contrib, loss_pnl_contrib,
            largest_win_entry_volume, largest_loss_entry_volume,
            closed_at_str, closed_at_str, duration_seconds, roe_percentage, now_iso
        ))

        # Update daily_aggregated_stats
        daily_upsert_query = """
            INSERT INTO daily_aggregated_stats (
                date, token, realized_pnl, completed_cycles, entry_volume, exit_volume
            ) VALUES (?, ?, ?, 1, ?, ?)
            ON CONFLICT(date, token) DO UPDATE SET
                realized_pnl = realized_pnl + excluded.realized_pnl,
                completed_cycles = completed_cycles + 1,
                entry_volume = entry_volume + excluded.entry_volume,
                exit_volume = exit_volume + excluded.exit_volume
        """
        self.db._execute_query(daily_upsert_query, (
            date_str, token, realized_pnl, entry_value, exit_value
        ))

        logger.info(f"Aggregated stats for closed trade lifecycle ({token}). PNL: {realized_pnl:.2f}")
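
    # Illustrative walk-through of the upsert above (hypothetical numbers, not
    # taken from real data): a closed cycle with entry_value=1000.0 and
    # realized_pnl=+50.0 counts as a winning cycle with is_win=1,
    # exit_value=1050.0, roe_percentage=5.0, and win_pnl_contrib=50.0 feeding
    # sum_of_winning_pnl and the largest_winning_cycle_* comparisons. A cycle
    # with realized_pnl=-30.0 instead contributes loss_pnl_contrib=30.0, so
    # sum_of_losing_pnl and largest_losing_cycle_pnl are tracked as positive
    # magnitudes.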

    def _migrate_cancelled_position(self, trade_data: Dict[str, Any], token: str, now_iso: str):
        """Migrate a cancelled position to aggregated stats."""
        # Update token_stats for cancelled count
        cancelled_upsert_query = """
            INSERT INTO token_stats (token, total_cancelled_cycles, updated_at)
            VALUES (?, 1, ?)
            ON CONFLICT(token) DO UPDATE SET
                total_cancelled_cycles = total_cancelled_cycles + 1,
                updated_at = excluded.updated_at
        """
        self.db._execute_query(cancelled_upsert_query, (token, now_iso))
        logger.info(f"Incremented cancelled_cycles for {token}.")

    def record_deposit(self, amount: float, timestamp: Optional[str] = None,
                       deposit_id: Optional[str] = None, description: Optional[str] = None):
        """Record a deposit."""
        ts = timestamp if timestamp else datetime.now(timezone.utc).isoformat()
        formatter = get_formatter()
        formatted_amount_str = formatter.format_price_with_symbol(amount)
        desc = description if description else f'Deposit of {formatted_amount_str}'

        self.db._execute_query(
            "INSERT INTO balance_adjustments (adjustment_id, timestamp, type, amount, description) VALUES (?, ?, ?, ?, ?)",
            (deposit_id or str(uuid.uuid4()), ts, 'deposit', amount, desc)
        )
        # Adjust initial_balance in metadata to reflect capital changes
        current_initial = float(self.db._get_metadata('initial_balance') or '0.0')
        self.db._set_metadata('initial_balance', str(current_initial + amount))
        logger.info(f"💰 Recorded deposit: {formatted_amount_str}. New effective initial balance: {formatter.format_price_with_symbol(current_initial + amount)}")

    def record_withdrawal(self, amount: float, timestamp: Optional[str] = None,
                          withdrawal_id: Optional[str] = None, description: Optional[str] = None):
        """Record a withdrawal."""
        ts = timestamp if timestamp else datetime.now(timezone.utc).isoformat()
        formatter = get_formatter()
        formatted_amount_str = formatter.format_price_with_symbol(amount)
        desc = description if description else f'Withdrawal of {formatted_amount_str}'

        self.db._execute_query(
            "INSERT INTO balance_adjustments (adjustment_id, timestamp, type, amount, description) VALUES (?, ?, ?, ?, ?)",
            (withdrawal_id or str(uuid.uuid4()), ts, 'withdrawal', amount, desc)
        )
        current_initial = float(self.db._get_metadata('initial_balance') or '0.0')
        self.db._set_metadata('initial_balance', str(current_initial - amount))
        logger.info(f"💸 Recorded withdrawal: {formatted_amount_str}. New effective initial balance: {formatter.format_price_with_symbol(current_initial - amount)}")

    def get_balance_adjustments_summary(self) -> Dict[str, Any]:
        """Get summary of all balance adjustments from DB."""
        adjustments = self.db._fetch_query(
            "SELECT type, amount, timestamp FROM balance_adjustments ORDER BY timestamp ASC"
        )
        if not adjustments:
            return {
                'total_deposits': 0.0, 'total_withdrawals': 0.0, 'net_adjustment': 0.0,
                'adjustment_count': 0, 'last_adjustment': None
            }

        total_deposits = sum(adj['amount'] for adj in adjustments if adj['type'] == 'deposit')
        total_withdrawals = sum(adj['amount'] for adj in adjustments if adj['type'] == 'withdrawal')
        net_adjustment = total_deposits - total_withdrawals

        return {
            'total_deposits': total_deposits,
            'total_withdrawals': total_withdrawals,
            'net_adjustment': net_adjustment,
            'adjustment_count': len(adjustments),
            'last_adjustment': adjustments[-1]['timestamp'] if adjustments else None
        }

    def get_daily_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get daily performance stats for the last N days from daily_aggregated_stats."""
        daily_stats_list = []
        today_utc = datetime.now(timezone.utc).date()

        for i in range(limit):
            target_date = today_utc - timedelta(days=i)
            date_str = target_date.strftime('%Y-%m-%d')
            date_formatted = target_date.strftime('%m/%d')

            day_aggregated_data = self.db._fetch_query(
                "SELECT SUM(realized_pnl) as pnl, SUM(completed_cycles) as trades, SUM(exit_volume) as volume FROM daily_aggregated_stats WHERE date = ?",
                (date_str,)
            )

            stats_for_day = None
            if day_aggregated_data and len(day_aggregated_data) > 0 and day_aggregated_data[0]['trades'] is not None:
                stats_for_day = day_aggregated_data[0]
                pnl = stats_for_day.get('pnl', 0.0) or 0.0
                volume = stats_for_day.get('volume', 0.0) or 0.0
                stats_for_day['pnl_pct'] = (pnl / volume * 100) if volume > 0 else 0.0
                stats_for_day['trades'] = int(stats_for_day.get('trades', 0) or 0)

            if stats_for_day and stats_for_day['trades'] > 0:
                daily_stats_list.append({
                    'date': date_str,
                    'date_formatted': date_formatted,
                    'has_trades': True,
                    **stats_for_day
                })
            else:
                daily_stats_list.append({
                    'date': date_str,
                    'date_formatted': date_formatted,
                    'has_trades': False,
                    'trades': 0,
                    'pnl': 0.0,
                    'volume': 0.0,
                    'pnl_pct': 0.0
                })
        return daily_stats_list

    def get_weekly_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get weekly performance stats for the last N weeks by aggregating daily_aggregated_stats."""
        weekly_stats_list = []
        today_utc = datetime.now(timezone.utc).date()

        for i in range(limit):
            target_monday = today_utc - timedelta(days=today_utc.weekday() + (i * 7))
            target_sunday = target_monday + timedelta(days=6)
            week_key_display = target_monday.strftime('%Y-W%W')
            week_formatted_display = f"{target_monday.strftime('%m/%d')}-{target_sunday.strftime('%m/%d/%y')}"
AND ?", (target_monday.strftime('%Y-%m-%d'), target_sunday.strftime('%Y-%m-%d')) ) if daily_records_for_week: total_pnl_week = sum(d.get('realized_pnl', 0.0) or 0.0 for d in daily_records_for_week) total_trades_week = sum(d.get('completed_cycles', 0) or 0 for d in daily_records_for_week) total_volume_week = sum(d.get('exit_volume', 0.0) or 0.0 for d in daily_records_for_week) pnl_pct_week = (total_pnl_week / total_volume_week * 100) if total_volume_week > 0 else 0.0 if total_trades_week > 0: weekly_stats_list.append({ 'week': week_key_display, 'week_formatted': week_formatted_display, 'has_trades': True, 'pnl': total_pnl_week, 'trades': total_trades_week, 'volume': total_volume_week, 'pnl_pct': pnl_pct_week }) else: weekly_stats_list.append({ 'week': week_key_display, 'week_formatted': week_formatted_display, 'has_trades': False, 'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0 }) else: weekly_stats_list.append({ 'week': week_key_display, 'week_formatted': week_formatted_display, 'has_trades': False, 'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0 }) return weekly_stats_list def get_monthly_stats(self, limit: int = 10) -> List[Dict[str, Any]]: """Get monthly performance stats for the last N months by aggregating daily_aggregated_stats.""" monthly_stats_list = [] current_month_start_utc = datetime.now(timezone.utc).date().replace(day=1) for i in range(limit): year = current_month_start_utc.year month = current_month_start_utc.month - i while month <= 0: month += 12 year -= 1 target_month_start_date = datetime(year, month, 1, tzinfo=timezone.utc).date() next_month_start_date = datetime(year + (month // 12), (month % 12) + 1, 1, tzinfo=timezone.utc).date() if month < 12 else datetime(year + 1, 1, 1, tzinfo=timezone.utc).date() target_month_end_date = next_month_start_date - timedelta(days=1) month_key_display = target_month_start_date.strftime('%Y-%m') month_formatted_display = target_month_start_date.strftime('%b %Y') daily_records_for_month = self.db._fetch_query( "SELECT date, realized_pnl, completed_cycles, exit_volume FROM daily_aggregated_stats WHERE date BETWEEN ? AND ?", (target_month_start_date.strftime('%Y-%m-%d'), target_month_end_date.strftime('%Y-%m-%d')) ) if daily_records_for_month: total_pnl_month = sum(d.get('realized_pnl', 0.0) or 0.0 for d in daily_records_for_month) total_trades_month = sum(d.get('completed_cycles', 0) or 0 for d in daily_records_for_month) total_volume_month = sum(d.get('exit_volume', 0.0) or 0.0 for d in daily_records_for_month) pnl_pct_month = (total_pnl_month / total_volume_month * 100) if total_volume_month > 0 else 0.0 if total_trades_month > 0: monthly_stats_list.append({ 'month': month_key_display, 'month_formatted': month_formatted_display, 'has_trades': True, 'pnl': total_pnl_month, 'trades': total_trades_month, 'volume': total_volume_month, 'pnl_pct': pnl_pct_month }) else: monthly_stats_list.append({ 'month': month_key_display, 'month_formatted': month_formatted_display, 'has_trades': False, 'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0 }) else: monthly_stats_list.append({ 'month': month_key_display, 'month_formatted': month_formatted_display, 'has_trades': False, 'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0 }) return monthly_stats_list