#!/usr/bin/env python3
"""
Aggregation Manager for Trading Statistics

Handles data aggregation, migration from individual trades to aggregated statistics,
and balance adjustment tracking.
"""

import sqlite3
import logging
from datetime import datetime, timezone, timedelta
from typing import Dict, List, Any, Optional
import uuid

from src.utils.token_display_formatter import get_formatter

logger = logging.getLogger(__name__)


class AggregationManager:
    """Manages data aggregation and migration in the trading statistics database."""

    def __init__(self, db_manager):
        """Initialize with database manager."""
        self.db = db_manager

    def migrate_trade_to_aggregated_stats(self, trade_lifecycle_id: str):
        """Migrate a completed/cancelled trade's stats to aggregate tables and delete the original trade."""
        trade_data = self.db._fetchone_query(
            "SELECT * FROM trades WHERE trade_lifecycle_id = ?", (trade_lifecycle_id,)
        )
        if not trade_data:
            logger.error(f"Cannot migrate trade {trade_lifecycle_id}: Not found.")
            return

        status = trade_data.get('status')
        symbol = trade_data.get('symbol')
        token = symbol.split('/')[0] if symbol and '/' in symbol else symbol
        if not token:
            logger.error(f"Cannot migrate trade {trade_lifecycle_id}: Token could not be derived from symbol '{symbol}'.")
            return

        now_iso = datetime.now(timezone.utc).isoformat()
        try:
            with self.db.conn:
                if status == 'position_closed':
                    self._migrate_closed_position(trade_data, token, now_iso)
                elif status == 'cancelled':
                    self._migrate_cancelled_position(trade_data, token, now_iso)

                # Delete the original trade from the 'trades' table
                self.db._execute_query("DELETE FROM trades WHERE trade_lifecycle_id = ?", (trade_lifecycle_id,))
                logger.info(f"Deleted trade lifecycle {trade_lifecycle_id} from trades table after aggregation.")
        except sqlite3.Error as e:
            logger.error(f"Database error migrating trade {trade_lifecycle_id} to aggregate stats: {e}", exc_info=True)
        except Exception as e:
            logger.error(f"Unexpected error migrating trade {trade_lifecycle_id} to aggregate stats: {e}", exc_info=True)

    def _migrate_closed_position(self, trade_data: Dict[str, Any], token: str, now_iso: str):
        """Migrate a closed position to aggregated stats."""
        realized_pnl = trade_data.get('realized_pnl', 0.0)
        entry_value = trade_data.get('value', 0.0)
        exit_value = entry_value + realized_pnl
        closed_at_str = trade_data.get('position_closed_at', now_iso)
        closed_at_dt = datetime.fromisoformat(closed_at_str)
        date_str = closed_at_dt.strftime('%Y-%m-%d')

        # Calculate duration if timestamps are available
        opened_at_str = trade_data.get('position_opened_at')
        duration_seconds = 0
        if opened_at_str and closed_at_str:
            try:
                opened_at_dt = datetime.fromisoformat(opened_at_str)
                duration_seconds = (closed_at_dt - opened_at_dt).total_seconds()
            except Exception:
                duration_seconds = 0

        # Calculate ROE percentage
        roe_percentage = (realized_pnl / entry_value * 100) if entry_value > 0 else 0.0

        # Update token_stats
        token_upsert_query = """
            INSERT INTO token_stats (
                token, total_realized_pnl, total_completed_cycles, winning_cycles, losing_cycles,
                total_entry_volume, total_exit_volume, sum_of_winning_pnl, sum_of_losing_pnl,
                largest_winning_cycle_pnl, largest_losing_cycle_pnl,
                largest_winning_cycle_entry_volume, largest_losing_cycle_entry_volume,
                first_cycle_closed_at, last_cycle_closed_at, total_duration_seconds, roe_percentage, updated_at
            ) VALUES (?, ?, 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(token) DO UPDATE SET
                total_realized_pnl = total_realized_pnl + excluded.total_realized_pnl,
                total_completed_cycles = total_completed_cycles + 1,
                winning_cycles = winning_cycles + excluded.winning_cycles,
                losing_cycles = losing_cycles + excluded.losing_cycles,
                total_entry_volume = total_entry_volume + excluded.total_entry_volume,
                total_exit_volume = total_exit_volume + excluded.total_exit_volume,
                sum_of_winning_pnl = sum_of_winning_pnl + excluded.sum_of_winning_pnl,
                sum_of_losing_pnl = sum_of_losing_pnl + excluded.sum_of_losing_pnl,
                largest_winning_cycle_pnl =
                    CASE WHEN excluded.largest_winning_cycle_pnl > largest_winning_cycle_pnl
                         THEN excluded.largest_winning_cycle_pnl
                         ELSE largest_winning_cycle_pnl END,
                largest_losing_cycle_pnl =
                    CASE WHEN excluded.largest_losing_cycle_pnl > largest_losing_cycle_pnl
                         THEN excluded.largest_losing_cycle_pnl
                         ELSE largest_losing_cycle_pnl END,
                largest_winning_cycle_entry_volume =
                    CASE WHEN excluded.largest_winning_cycle_pnl > largest_winning_cycle_pnl
                         THEN excluded.largest_winning_cycle_entry_volume
                         ELSE largest_winning_cycle_entry_volume END,
                largest_losing_cycle_entry_volume =
                    CASE WHEN excluded.largest_losing_cycle_pnl > largest_losing_cycle_pnl
                         THEN excluded.largest_losing_cycle_entry_volume
                         ELSE largest_losing_cycle_entry_volume END,
                first_cycle_closed_at = MIN(first_cycle_closed_at, excluded.first_cycle_closed_at),
                last_cycle_closed_at = MAX(last_cycle_closed_at, excluded.last_cycle_closed_at),
                total_duration_seconds = total_duration_seconds + excluded.total_duration_seconds,
                roe_percentage = excluded.roe_percentage,
                updated_at = excluded.updated_at
        """
        is_win = 1 if realized_pnl > 0 else 0
        is_loss = 1 if realized_pnl < 0 else 0
        win_pnl_contrib = realized_pnl if realized_pnl > 0 else 0.0
        loss_pnl_contrib = abs(realized_pnl) if realized_pnl < 0 else 0.0

        # Entry volume is only attributed to the largest-win/largest-loss columns
        # when this cycle is a win (respectively a loss); the CASE expressions in
        # the upsert decide whether it actually replaces the stored record.
        largest_win_entry_volume = entry_value if realized_pnl > 0 else 0.0
        largest_loss_entry_volume = entry_value if realized_pnl < 0 else 0.0

        self.db._execute_query(token_upsert_query, (
            token, realized_pnl, is_win, is_loss, entry_value, exit_value,
            win_pnl_contrib, loss_pnl_contrib, win_pnl_contrib, loss_pnl_contrib,
            largest_win_entry_volume, largest_loss_entry_volume,
            closed_at_str, closed_at_str, duration_seconds, roe_percentage, now_iso
        ))
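
        # Worked example (illustrative numbers): a cycle with entry value 100.0
        # and realized_pnl +5.0 contributes is_win=1, win_pnl_contrib=5.0,
        # exit_value=105.0 and roe_percentage=5.0; with realized_pnl -5.0 it
        # instead contributes is_loss=1 and loss_pnl_contrib stored as the
        # absolute value 5.0, with exit_value=95.0 and roe_percentage=-5.0.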

        # Update daily_aggregated_stats
        daily_upsert_query = """
            INSERT INTO daily_aggregated_stats (
                date, token, realized_pnl, completed_cycles, entry_volume, exit_volume
            ) VALUES (?, ?, ?, 1, ?, ?)
            ON CONFLICT(date, token) DO UPDATE SET
                realized_pnl = realized_pnl + excluded.realized_pnl,
                completed_cycles = completed_cycles + 1,
                entry_volume = entry_volume + excluded.entry_volume,
                exit_volume = exit_volume + excluded.exit_volume
        """
        self.db._execute_query(daily_upsert_query, (
            date_str, token, realized_pnl, entry_value, exit_value
        ))
        logger.info(f"Aggregated stats for closed trade lifecycle ({token}). PNL: {realized_pnl:.2f}")

    def _migrate_cancelled_position(self, trade_data: Dict[str, Any], token: str, now_iso: str):
        """Migrate a cancelled position to aggregated stats."""
        # Update token_stats for cancelled count
        cancelled_upsert_query = """
            INSERT INTO token_stats (token, total_cancelled_cycles, updated_at)
            VALUES (?, 1, ?)
            ON CONFLICT(token) DO UPDATE SET
                total_cancelled_cycles = total_cancelled_cycles + 1,
                updated_at = excluded.updated_at
        """
        self.db._execute_query(cancelled_upsert_query, (token, now_iso))
        logger.info(f"Incremented cancelled_cycles for {token}.")

    def record_deposit(self, amount: float, timestamp: Optional[str] = None,
                       deposit_id: Optional[str] = None, description: Optional[str] = None):
        """Record a deposit and increase the effective initial balance."""
        ts = timestamp if timestamp else datetime.now(timezone.utc).isoformat()
        formatter = get_formatter()
        formatted_amount_str = formatter.format_price_with_symbol(amount)
        desc = description if description else f'Deposit of {formatted_amount_str}'

        self.db._execute_query(
            "INSERT INTO balance_adjustments (adjustment_id, timestamp, type, amount, description) VALUES (?, ?, ?, ?, ?)",
            (deposit_id or str(uuid.uuid4()), ts, 'deposit', amount, desc)
        )
        # Adjust initial_balance in metadata to reflect capital changes
        current_initial = float(self.db._get_metadata('initial_balance') or '0.0')
        self.db._set_metadata('initial_balance', str(current_initial + amount))
        logger.info(f"💰 Recorded deposit: {formatted_amount_str}. New effective initial balance: {formatter.format_price_with_symbol(current_initial + amount)}")

    def record_withdrawal(self, amount: float, timestamp: Optional[str] = None,
                          withdrawal_id: Optional[str] = None, description: Optional[str] = None):
        """Record a withdrawal and reduce the effective initial balance."""
        ts = timestamp if timestamp else datetime.now(timezone.utc).isoformat()
        formatter = get_formatter()
        formatted_amount_str = formatter.format_price_with_symbol(amount)
        desc = description if description else f'Withdrawal of {formatted_amount_str}'

        self.db._execute_query(
            "INSERT INTO balance_adjustments (adjustment_id, timestamp, type, amount, description) VALUES (?, ?, ?, ?, ?)",
            (withdrawal_id or str(uuid.uuid4()), ts, 'withdrawal', amount, desc)
        )
        current_initial = float(self.db._get_metadata('initial_balance') or '0.0')
        self.db._set_metadata('initial_balance', str(current_initial - amount))
        logger.info(f"💸 Recorded withdrawal: {formatted_amount_str}. New effective initial balance: {formatter.format_price_with_symbol(current_initial - amount)}")

    def get_balance_adjustments_summary(self) -> Dict[str, Any]:
        """Get a summary of all balance adjustments from the DB."""
        adjustments = self.db._fetch_query(
            "SELECT type, amount, timestamp FROM balance_adjustments ORDER BY timestamp ASC"
        )

        if not adjustments:
            return {'total_deposits': 0.0, 'total_withdrawals': 0.0, 'net_adjustment': 0.0,
                    'adjustment_count': 0, 'last_adjustment': None}

        total_deposits = sum(adj['amount'] for adj in adjustments if adj['type'] == 'deposit')
        total_withdrawals = sum(adj['amount'] for adj in adjustments if adj['type'] == 'withdrawal')
        net_adjustment = total_deposits - total_withdrawals

        return {
            'total_deposits': total_deposits, 'total_withdrawals': total_withdrawals,
            'net_adjustment': net_adjustment, 'adjustment_count': len(adjustments),
            'last_adjustment': adjustments[-1]['timestamp'] if adjustments else None
        }
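
    # Example return value (illustrative numbers, after one 1000.0 deposit and
    # one 250.0 withdrawal):
    #
    #     {'total_deposits': 1000.0, 'total_withdrawals': 250.0,
    #      'net_adjustment': 750.0, 'adjustment_count': 2,
    #      'last_adjustment': '2024-06-03T12:00:00+00:00'}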

    def get_daily_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get daily performance stats for the last N days from daily_aggregated_stats."""
        daily_stats_list = []
        today_utc = datetime.now(timezone.utc).date()
        for i in range(limit):
            target_date = today_utc - timedelta(days=i)
            date_str = target_date.strftime('%Y-%m-%d')
            date_formatted = target_date.strftime('%m/%d')

            day_aggregated_data = self.db._fetch_query(
                "SELECT SUM(realized_pnl) as pnl, SUM(completed_cycles) as trades, SUM(exit_volume) as volume FROM daily_aggregated_stats WHERE date = ?",
                (date_str,)
            )

            stats_for_day = None
            if day_aggregated_data and len(day_aggregated_data) > 0 and day_aggregated_data[0]['trades'] is not None:
                stats_for_day = day_aggregated_data[0]
                pnl = stats_for_day.get('pnl', 0.0) or 0.0
                volume = stats_for_day.get('volume', 0.0) or 0.0
                stats_for_day['pnl_pct'] = (pnl / volume * 100) if volume > 0 else 0.0
                stats_for_day['trades'] = int(stats_for_day.get('trades', 0) or 0)

            if stats_for_day and stats_for_day['trades'] > 0:
                daily_stats_list.append({
                    'date': date_str, 'date_formatted': date_formatted, 'has_trades': True,
                    **stats_for_day
                })
            else:
                daily_stats_list.append({
                    'date': date_str, 'date_formatted': date_formatted, 'has_trades': False,
                    'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
                })
        return daily_stats_list
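
    # Each returned entry has the shape (illustrative values):
    #
    #     {'date': '2024-06-03', 'date_formatted': '06/03', 'has_trades': True,
    #      'pnl': 12.5, 'trades': 3, 'volume': 500.0, 'pnl_pct': 2.5}
    #
    # Days without completed cycles get zeroed fields and has_trades=False.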

    def get_weekly_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get weekly performance stats for the last N weeks by aggregating daily_aggregated_stats."""
        weekly_stats_list = []
        today_utc = datetime.now(timezone.utc).date()
        for i in range(limit):
            target_monday = today_utc - timedelta(days=today_utc.weekday() + (i * 7))
            target_sunday = target_monday + timedelta(days=6)

            week_key_display = target_monday.strftime('%Y-W%W')
            week_formatted_display = f"{target_monday.strftime('%m/%d')}-{target_sunday.strftime('%m/%d/%y')}"

            daily_records_for_week = self.db._fetch_query(
                "SELECT date, realized_pnl, completed_cycles, exit_volume FROM daily_aggregated_stats WHERE date BETWEEN ? AND ?",
                (target_monday.strftime('%Y-%m-%d'), target_sunday.strftime('%Y-%m-%d'))
            )

            if daily_records_for_week:
                total_pnl_week = sum(d.get('realized_pnl', 0.0) or 0.0 for d in daily_records_for_week)
                total_trades_week = sum(d.get('completed_cycles', 0) or 0 for d in daily_records_for_week)
                total_volume_week = sum(d.get('exit_volume', 0.0) or 0.0 for d in daily_records_for_week)
                pnl_pct_week = (total_pnl_week / total_volume_week * 100) if total_volume_week > 0 else 0.0

                if total_trades_week > 0:
                    weekly_stats_list.append({
                        'week': week_key_display,
                        'week_formatted': week_formatted_display,
                        'has_trades': True,
                        'pnl': total_pnl_week,
                        'trades': total_trades_week,
                        'volume': total_volume_week,
                        'pnl_pct': pnl_pct_week
                    })
                else:
                    weekly_stats_list.append({
                        'week': week_key_display, 'week_formatted': week_formatted_display, 'has_trades': False,
                        'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
                    })
            else:
                weekly_stats_list.append({
                    'week': week_key_display, 'week_formatted': week_formatted_display, 'has_trades': False,
                    'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
                })
        return weekly_stats_list
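
    # Week keys come from strftime('%Y-W%W'); the display range is Monday-Sunday,
    # e.g. '06/03-06/09/24' for the week starting Mon 2024-06-03.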

    def get_monthly_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get monthly performance stats for the last N months by aggregating daily_aggregated_stats."""
        monthly_stats_list = []
        current_month_start_utc = datetime.now(timezone.utc).date().replace(day=1)
        for i in range(limit):
            # Walk back i months from the current month, wrapping across year boundaries
            year = current_month_start_utc.year
            month = current_month_start_utc.month - i
            while month <= 0:
                month += 12
                year -= 1

            target_month_start_date = datetime(year, month, 1, tzinfo=timezone.utc).date()
            next_month_start_date = (
                datetime(year, month + 1, 1, tzinfo=timezone.utc).date()
                if month < 12 else datetime(year + 1, 1, 1, tzinfo=timezone.utc).date()
            )
            target_month_end_date = next_month_start_date - timedelta(days=1)

            month_key_display = target_month_start_date.strftime('%Y-%m')
            month_formatted_display = target_month_start_date.strftime('%b %Y')

            daily_records_for_month = self.db._fetch_query(
                "SELECT date, realized_pnl, completed_cycles, exit_volume FROM daily_aggregated_stats WHERE date BETWEEN ? AND ?",
                (target_month_start_date.strftime('%Y-%m-%d'), target_month_end_date.strftime('%Y-%m-%d'))
            )

            if daily_records_for_month:
                total_pnl_month = sum(d.get('realized_pnl', 0.0) or 0.0 for d in daily_records_for_month)
                total_trades_month = sum(d.get('completed_cycles', 0) or 0 for d in daily_records_for_month)
                total_volume_month = sum(d.get('exit_volume', 0.0) or 0.0 for d in daily_records_for_month)
                pnl_pct_month = (total_pnl_month / total_volume_month * 100) if total_volume_month > 0 else 0.0

                if total_trades_month > 0:
                    monthly_stats_list.append({
                        'month': month_key_display,
                        'month_formatted': month_formatted_display,
                        'has_trades': True,
                        'pnl': total_pnl_month,
                        'trades': total_trades_month,
                        'volume': total_volume_month,
                        'pnl_pct': pnl_pct_month
                    })
                else:
                    monthly_stats_list.append({
                        'month': month_key_display, 'month_formatted': month_formatted_display, 'has_trades': False,
                        'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
                    })
            else:
                monthly_stats_list.append({
                    'month': month_key_display, 'month_formatted': month_formatted_display, 'has_trades': False,
                    'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
                })
        return monthly_stats_list
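

# Usage sketch (assumes an initialised db manager that exposes the private
# helpers referenced above; names are illustrative, not part of this module):
#
#     stats = AggregationManager(db_manager)
#     stats.record_deposit(1000.0)
#     for day in stats.get_daily_stats(limit=7):
#         print(day['date_formatted'], day['pnl'], day['trades'])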