#!/usr/bin/env python3
"""
Performance Calculator for Trading Statistics

Handles performance metrics calculations including win rate, PnL, drawdown,
trade durations, and comprehensive statistical analysis.
"""

import logging
import math
from datetime import datetime, timezone, timedelta
from typing import Dict, List, Any, Optional, Tuple

import numpy as np

from src.utils.token_display_formatter import get_formatter

logger = logging.getLogger(__name__)


class PerformanceCalculator:
    """Calculates performance metrics and statistics from trading data.

    All data is read through the injected database manager, using its
    ``_fetch_query``, ``_fetchone_query``, ``_get_metadata`` and
    ``_set_metadata`` methods. Rows are accessed like dicts (``row.get(...)``).
    """

    def __init__(self, db_manager):
        """Initialize with database manager."""
        self.db = db_manager

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _number(row: Dict[str, Any], key: str, default: Any = 0) -> Any:
        """Return ``row[key]``, or ``default`` when the key is missing or None.

        A bare ``row.get(key, 0)`` still yields ``None`` when the key exists
        with a null value, which then breaks arithmetic and comparisons.
        """
        value = row.get(key, default)
        return default if value is None else value

    @staticmethod
    def _pct_of_volume(pnl: float, row: Dict[str, Any], volume_key: str) -> float:
        """Return ``pnl`` as a percentage of ``row[volume_key]``.

        A missing key keeps the historical divisor of 1. A stored value of
        0 or None yields 0.0 instead of raising.
        """
        volume = row.get(volume_key, 1)
        if not volume:
            return 0.0
        return (pnl / volume) * 100

    @staticmethod
    def _profit_factor(gross_win: float, gross_loss: float) -> float:
        """Return gross win / gross loss.

        ``gross_loss`` must be passed as a non-negative magnitude. With no
        losses the result is ``inf`` when there are winnings, otherwise 0.
        """
        if gross_loss > 0:
            return gross_win / gross_loss
        return float('inf') if gross_win > 0 else 0

    def _format_duration(self, total_seconds: int) -> str:
        """Format duration from seconds to human-readable format.

        Shows the two most significant units: ``"42s"``, ``"3m 5s"``,
        ``"2h 10m"`` or ``"4d 7h"``. Fractional seconds are truncated.
        """
        # Callers pass float averages; truncating once up front gives the
        # same text as truncating each component separately.
        whole_seconds = int(total_seconds)
        if whole_seconds < 60:
            return f"{whole_seconds}s"
        if whole_seconds < 3600:
            minutes, seconds = divmod(whole_seconds, 60)
            return f"{minutes}m {seconds}s"
        if whole_seconds < 86400:
            hours, remainder = divmod(whole_seconds, 3600)
            return f"{hours}h {remainder // 60}m"
        days, remainder = divmod(whole_seconds, 86400)
        return f"{days}d {remainder // 3600}h"

    # ------------------------------------------------------------------
    # Aggregate performance
    # ------------------------------------------------------------------

    def get_performance_stats(self) -> Dict[str, Any]:
        """Get performance stats.

        Aggregates every row of ``token_stats`` that has at least one
        completed cycle into account-wide totals, then adds the live drawdown
        figures, the open-position count and the best/worst ROE rows.

        Returns:
            A dict of metrics (see the ``return`` statement for the keys), or
            an empty dict if anything raised; the error is logged.
        """
        try:
            # Get initial balance from metadata
            initial_balance_str = self.db._get_metadata('initial_balance')
            initial_balance = float(initial_balance_str) if initial_balance_str else 0.0

            # Get all token stats
            token_stats = self.db._fetch_query("SELECT * FROM token_stats", ())

            # Get open positions
            open_positions = self.db._fetch_query(
                "SELECT * FROM trades WHERE status = 'position_opened'", ()
            )

            num = self._number

            # Account-wide accumulators
            total_trades = 0
            total_wins = 0
            total_losses = 0
            total_pnl = 0.0
            total_entry_volume = 0.0
            total_exit_volume = 0.0

            largest_win = 0.0
            largest_loss = 0.0
            largest_win_token = "N/A"
            largest_loss_token = "N/A"
            largest_win_pct = 0.0
            largest_loss_pct = 0.0

            best_token_name = "N/A"
            best_token_pnl_value = 0.0
            best_token_pnl_pct = 0.0
            best_token_volume = 0.0
            worst_token_name = "N/A"
            worst_token_pnl_value = 0.0
            worst_token_pnl_pct = 0.0
            worst_token_volume = 0.0

            for token in token_stats:
                cycles = num(token, 'total_completed_cycles')
                if cycles <= 0:
                    # Tokens without a completed cycle contribute nothing here.
                    continue

                token_pnl = num(token, 'total_realized_pnl')
                token_volume = num(token, 'total_entry_volume')

                total_trades += cycles
                total_wins += num(token, 'winning_cycles')
                total_losses += num(token, 'losing_cycles')
                total_pnl += token_pnl
                total_entry_volume += token_volume
                total_exit_volume += num(token, 'total_exit_volume')

                # Track largest single winning / losing cycles
                token_largest_win = num(token, 'largest_winning_cycle_pnl')
                token_largest_loss = num(token, 'largest_losing_cycle_pnl')

                if token_largest_win > largest_win:
                    largest_win = token_largest_win
                    largest_win_token = token['token']
                    largest_win_pct = self._pct_of_volume(
                        token_largest_win, token, 'largest_winning_cycle_entry_volume'
                    )

                # For losses, we want the most negative number
                if token_largest_loss < 0 and (largest_loss == 0 or token_largest_loss < largest_loss):
                    largest_loss = token_largest_loss
                    largest_loss_token = token['token']
                    largest_loss_pct = self._pct_of_volume(
                        token_largest_loss, token, 'largest_losing_cycle_entry_volume'
                    )

                # Track best/worst tokens by total realized PnL; only tokens
                # with positive entry volume are eligible.
                if token_volume > 0:
                    token_pnl_pct = (token_pnl / token_volume) * 100

                    if best_token_name == "N/A" or token_pnl > best_token_pnl_value:
                        best_token_name = token['token']
                        best_token_pnl_value = token_pnl
                        best_token_pnl_pct = token_pnl_pct
                        best_token_volume = token_volume

                    if worst_token_name == "N/A" or token_pnl < worst_token_pnl_value:
                        worst_token_name = token['token']
                        worst_token_pnl_value = token_pnl
                        worst_token_pnl_pct = token_pnl_pct
                        worst_token_volume = token_volume

            # Win rate (percent of completed cycles that were winners)
            win_rate = (total_wins / total_trades * 100) if total_trades > 0 else 0

            # Gross winnings / gross losses over ALL token rows; the loss
            # total is turned into a magnitude before use.
            sum_winning = sum(num(token, 'sum_of_winning_pnl') for token in token_stats)
            sum_losing = abs(sum(num(token, 'sum_of_losing_pnl') for token in token_stats))
            profit_factor = self._profit_factor(sum_winning, sum_losing)

            # Average P&L stats (avg_loss_pnl is a positive magnitude)
            avg_win_pnl = sum_winning / total_wins if total_wins > 0 else 0
            avg_loss_pnl = sum_losing / total_losses if total_losses > 0 else 0
            avg_trade_pnl = total_pnl / total_trades if total_trades > 0 else 0.0

            # Expectancy = P(win) * avg win - P(loss) * avg loss
            win_fraction = win_rate / 100
            expectancy = (avg_win_pnl * win_fraction) - (avg_loss_pnl * (1 - win_fraction))

            # Get max drawdown
            max_drawdown, max_drawdown_pct, drawdown_start_date = self.get_live_max_drawdown()

            # Best/Worst trades by ROE
            best_roe_trade = self.db._fetchone_query(
                "SELECT token, best_roe_percentage as percentage FROM token_stats "
                "WHERE best_roe_percentage IS NOT NULL "
                "ORDER BY best_roe_percentage DESC LIMIT 1"
            )
            worst_roe_trade = self.db._fetchone_query(
                "SELECT token, worst_roe_percentage as percentage FROM token_stats "
                "WHERE worst_roe_percentage IS NOT NULL "
                "ORDER BY worst_roe_percentage ASC LIMIT 1"
            )

            return {
                'initial_balance': initial_balance,
                'total_trades': total_trades,
                'total_wins': total_wins,
                'total_losses': total_losses,
                'win_rate': win_rate,
                'total_pnl': total_pnl,
                'total_entry_volume': total_entry_volume,
                'total_exit_volume': total_exit_volume,
                'profit_factor': profit_factor,
                'expectancy': expectancy,
                'avg_trade_pnl': avg_trade_pnl,
                'avg_win_pnl': avg_win_pnl,
                'avg_loss_pnl': avg_loss_pnl,
                'largest_win': largest_win,
                'largest_loss': largest_loss,
                'largest_win_token': largest_win_token,
                'largest_loss_token': largest_loss_token,
                'largest_win_pct': largest_win_pct,
                'largest_loss_pct': largest_loss_pct,
                'best_token': best_token_name,
                'best_token_pnl': best_token_pnl_value,
                'best_token_pct': best_token_pnl_pct,
                'best_token_volume': best_token_volume,
                'worst_token': worst_token_name,
                'worst_token_pnl': worst_token_pnl_value,
                'worst_token_pct': worst_token_pnl_pct,
                'worst_token_volume': worst_token_volume,
                'max_drawdown': max_drawdown,
                'max_drawdown_pct': max_drawdown_pct,
                'drawdown_start_date': drawdown_start_date,
                'open_positions': len(open_positions),
                'best_roe_trade': best_roe_trade,
                'worst_roe_trade': worst_roe_trade
            }

        except Exception as e:
            logger.error(f"Error getting performance stats: {e}")
            return {}

    def get_token_performance(self, limit: int = 20) -> List[Dict[str, Any]]:
        """Get performance stats by token, sorted by total P&L (dollar amount).

        Each ``token_stats`` row is enriched in place with ``win_rate``,
        ``profit_factor``, ``roe_percentage``,
        ``average_trade_duration_formatted`` and ``display_name``.

        Args:
            limit: Maximum number of rows to return.

        Returns:
            The enriched rows, highest total realized P&L first (ROE breaks
            ties), truncated to ``limit``.
        """
        num = self._number

        # Get all token stats first, then sort by total P&L in Python
        token_stats = self.db._fetch_query("SELECT * FROM token_stats", ())

        for token in token_stats:
            total_cycles = num(token, 'total_completed_cycles')
            winning_cycles = num(token, 'winning_cycles')

            # Calculate win rate
            token['win_rate'] = (winning_cycles / total_cycles * 100) if total_cycles > 0 else 0

            # Profit factor: use the magnitude of the losing total, exactly as
            # get_performance_stats does, so a negatively-stored loss sum does
            # not force the factor to inf/0.
            sum_winning = num(token, 'sum_of_winning_pnl')
            sum_losing = abs(num(token, 'sum_of_losing_pnl'))
            token['profit_factor'] = self._profit_factor(sum_winning, sum_losing)

            # Calculate ROE from realized PnL and entry volume
            total_pnl = num(token, 'total_realized_pnl')
            entry_volume = num(token, 'completed_entry_volume')
            token['roe_percentage'] = (total_pnl / entry_volume * 100) if entry_volume > 0 else 0.0

            # Format durations
            total_duration = num(token, 'total_duration_seconds')
            avg_duration = total_duration / total_cycles if total_cycles > 0 else 0
            token['average_trade_duration_formatted'] = self._format_duration(avg_duration)

            # Token display name (use token as-is)
            token['display_name'] = token['token'].upper()

        # Sort by total P&L (highest to lowest), then by ROE as tiebreaker
        sorted_tokens = sorted(
            token_stats,
            key=lambda row: (num(row, 'total_realized_pnl'), num(row, 'roe_percentage')),
            reverse=True
        )

        # Return top tokens (limit)
        return sorted_tokens[:limit]

    # ------------------------------------------------------------------
    # Balance history and drawdown
    # ------------------------------------------------------------------

    def get_balance_history(self, days: int = 30) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
        """Get balance history for the last N days with detailed statistics.

        Args:
            days: Size of the look-back window in days.

        Returns:
            ``(rows, stats)``. ``rows`` are the ``balance_history`` records in
            ascending timestamp order. ``stats`` holds peak/current balance,
            max drawdown (value and percentage) within the window, the period
            P&L and return, and the number of data points. Both are empty when
            there are no rows in the window.
        """
        # Bind the offset as a parameter instead of formatting it into the
        # SQL text; int() makes sure only a number reaches the modifier.
        window_modifier = f"-{int(days)} days"
        balance_history = self.db._fetch_query(
            "SELECT * FROM balance_history "
            "WHERE timestamp >= datetime('now', ?) ORDER BY timestamp ASC",
            (window_modifier,)
        )

        if not balance_history:
            return [], {}

        # Calculate statistics
        balances = [item['balance'] for item in balance_history]

        peak_balance = max(balances)
        current_balance = balances[-1]

        # Max drawdown: largest drop from the running peak seen so far
        running_max = 0
        max_drawdown = 0
        max_drawdown_percentage = 0

        for balance in balances:
            if balance > running_max:
                running_max = balance

            drawdown = running_max - balance
            drawdown_percentage = (drawdown / running_max * 100) if running_max > 0 else 0

            if drawdown > max_drawdown:
                max_drawdown = drawdown
                max_drawdown_percentage = drawdown_percentage

        # Period return, measured from the first balance in the window
        initial_balance_period = balances[0]
        period_pnl = current_balance - initial_balance_period
        period_return_percentage = (
            (period_pnl / initial_balance_period * 100) if initial_balance_period > 0 else 0
        )

        stats = {
            'peak_balance': peak_balance,
            'current_balance': current_balance,
            'max_drawdown': max_drawdown,
            'max_drawdown_percentage': max_drawdown_percentage,
            'period_pnl': period_pnl,
            'period_return_percentage': period_return_percentage,
            'data_points': len(balance_history)
        }

        return balance_history, stats

    def get_live_max_drawdown(self) -> Tuple[float, float, Optional[str]]:
        """
        Get live max drawdown value (in USD), percentage, and the date of the last peak.

        The value is derived from the stored peak balance and the stored
        drawdown percentage; missing metadata is treated as 0.
        """
        peak_balance = float(self.db._get_metadata('drawdown_peak_balance') or 0.0)
        max_drawdown_pct = float(self.db._get_metadata('drawdown_max_drawdown_pct') or 0.0)
        peak_date = self.db._get_metadata('drawdown_peak_date')

        # Calculate max drawdown value based on peak and percentage
        max_drawdown_value = peak_balance * (max_drawdown_pct / 100)

        return max_drawdown_value, max_drawdown_pct, peak_date

    def update_live_max_drawdown(self, current_balance: float) -> bool:
        """
        Update the live maximum drawdown based on the current balance.
        This should be called periodically (e.g., every minute) or after every trade.

        Note that reaching a new peak resets the stored drawdown percentage
        to 0, so the tracked figure is the deepest drawdown since the most
        recent peak.

        Args:
            current_balance: Latest account balance; non-positive values are
                ignored.

        Returns:
            True if any drawdown metadata was written, False otherwise.
        """
        if current_balance <= 0:
            return False

        peak_balance = float(self.db._get_metadata('drawdown_peak_balance') or '0.0')
        max_drawdown_percentage = float(self.db._get_metadata('drawdown_max_drawdown_pct') or '0.0')

        updated = False
        if current_balance > peak_balance:
            # New peak detected, reset drawdown tracking
            self.db._set_metadata('drawdown_peak_balance', str(current_balance))
            self.db._set_metadata('drawdown_peak_date', datetime.now(timezone.utc).isoformat())
            # Reset max drawdown percentage since we are at a new high
            if max_drawdown_percentage != 0:
                self.db._set_metadata('drawdown_max_drawdown_pct', '0.0')
            logger.info(f"New peak balance for drawdown tracking: ${current_balance:,.2f}")
            updated = True
        else:
            # Still in a drawdown, check if it's a new max
            drawdown = peak_balance - current_balance
            drawdown_percentage = (drawdown / peak_balance * 100) if peak_balance > 0 else 0

            if drawdown_percentage > max_drawdown_percentage:
                self.db._set_metadata('drawdown_max_drawdown_pct', str(drawdown_percentage))
                logger.info(f"New max drawdown detected: {drawdown_percentage:.2f}%")
                updated = True

        return updated

    # ------------------------------------------------------------------
    # Risk metrics
    # ------------------------------------------------------------------

    def calculate_sharpe_ratio(self, days: int = 30) -> Optional[float]:
        """
        Calculate Sharpe ratio from balance history.

        Returns are computed between consecutive balance-history records and
        annualized with sqrt(365).
        NOTE(review): this treats each record as one day; confirm that
        balance_history is sampled daily.

        Args:
            days: Look-back window passed to ``get_balance_history``.

        Returns:
            The annualized ratio; 0.0 when there are no usable returns or
            their standard deviation is zero; None with fewer than two
            records or on error.
        """
        try:
            risk_free_rate = 0.0  # Assuming 0 for simplicity

            # Get balance history
            balance_history, _ = self.get_balance_history(days)

            if not balance_history or len(balance_history) < 2:
                return None

            # Period-over-period returns; pairs with a non-positive starting
            # balance are skipped to avoid dividing by zero.
            returns = []
            for previous, current in zip(balance_history, balance_history[1:]):
                prev_balance = previous['balance']
                curr_balance = current['balance']
                if prev_balance > 0:
                    returns.append((curr_balance - prev_balance) / prev_balance)

            if not returns:
                return 0.0

            std_dev_daily_return = np.std(returns)
            if std_dev_daily_return == 0:
                return 0.0

            # Calculate annualized Sharpe Ratio
            avg_daily_return = np.mean(returns)
            sharpe_ratio = (avg_daily_return - (risk_free_rate / 365)) / std_dev_daily_return
            return float(sharpe_ratio * np.sqrt(365))

        except Exception as e:
            logger.error(f"❌ Error calculating Sharpe ratio: {e}")
            return None

    def calculate_max_consecutive_losses(self) -> int:
        """Calculate the maximum number of consecutive losing trades.

        Placeholder: always logs a warning and returns 0. A real
        implementation needs per-trade ordering, which the aggregated stats
        used by this class do not provide.
        """
        logger.warning("calculate_max_consecutive_losses is not fully implemented with the new schema.")
        return 0  # Placeholder

    def get_risk_metrics(self) -> Dict[str, Any]:
        """
        Get key risk metrics for the trading account.

        Returns:
            A dict with the live drawdown value, percentage and start date,
            plus the Sharpe ratio over a 90-day window.
        """
        # Get live drawdown stats
        max_drawdown_value, max_drawdown_percentage, drawdown_start_date = self.get_live_max_drawdown()

        # Get Sharpe ratio; 90 days is used for a more stable metric
        sharpe_ratio = self.calculate_sharpe_ratio(days=90)

        # Other metrics can be added here
        return {
            'max_drawdown_value': max_drawdown_value,
            'max_drawdown_percentage': max_drawdown_percentage,
            'drawdown_start_date': drawdown_start_date,
            'sharpe_ratio': sharpe_ratio,
        }

    # ------------------------------------------------------------------
    # Period analysis
    # ------------------------------------------------------------------

    def get_period_performance(self, start_date: str, end_date: str) -> Dict[str, Any]:
        """Get performance statistics for a specific date range.

        Args:
            start_date: Inclusive lower bound, compared against the ``date``
                column of ``daily_aggregated_stats``.
            end_date: Inclusive upper bound.

        Returns:
            Totals for the period plus the per-day rows under
            ``daily_stats``; zeroed totals with an empty ``daily_stats`` when
            no rows match; an empty dict on error.
        """
        try:
            # Get daily stats for the period
            daily_stats = self.db._fetch_query("""
                SELECT date, SUM(realized_pnl) as pnl, SUM(completed_cycles) as trades,
                       SUM(exit_volume) as volume
                FROM daily_aggregated_stats
                WHERE date BETWEEN ? AND ?
                GROUP BY date
                ORDER BY date ASC
            """, (start_date, end_date))

            if not daily_stats:
                return {
                    'period_start': start_date,
                    'period_end': end_date,
                    'total_pnl': 0,
                    'total_trades': 0,
                    'total_volume': 0,
                    'win_rate': 0,
                    'trading_days': 0,
                    'average_daily_pnl': 0,
                    # Same key as the populated result, so callers can index it.
                    'daily_stats': []
                }

            # SUM() yields NULL for days without values, hence the `or 0`.
            total_pnl = sum(day.get('pnl', 0) or 0 for day in daily_stats)
            total_trades = sum(day.get('trades', 0) or 0 for day in daily_stats)
            total_volume = sum(day.get('volume', 0) or 0 for day in daily_stats)
            trading_days = sum(1 for day in daily_stats if (day.get('trades', 0) or 0) > 0)

            average_daily_pnl = total_pnl / trading_days if trading_days > 0 else 0

            return {
                'period_start': start_date,
                'period_end': end_date,
                'total_pnl': total_pnl,
                'total_trades': total_trades,
                'total_volume': total_volume,
                'trading_days': trading_days,
                'average_daily_pnl': average_daily_pnl,
                'daily_stats': daily_stats
            }

        except Exception as e:
            logger.error(f"❌ Error calculating period performance: {e}")
            return {}

    def get_recent_performance_trend(self, days: int = 7) -> Dict[str, Any]:
        """Get recent performance trend analysis.

        Fits a straight line through the daily P&L values of the last
        ``days`` days (UTC dates) and reports the sign of its slope.

        Args:
            days: Number of days to look back from today.

        Returns:
            The period statistics from ``get_period_performance`` plus
            ``days``, ``trend_direction`` (``'up'``, ``'down'``, ``'flat'``
            or ``'insufficient_data'``) and ``slope``. On error,
            ``{'days': days, 'trend_direction': 'error'}``.
        """
        try:
            end_date = datetime.now(timezone.utc).date()
            start_date = end_date - timedelta(days=days)

            period_stats = self.get_period_performance(
                start_date.strftime('%Y-%m-%d'),
                end_date.strftime('%Y-%m-%d')
            )

            # Calculate trend direction
            daily_pnls = [day.get('pnl', 0) or 0 for day in period_stats.get('daily_stats', [])]

            if len(daily_pnls) >= 2:
                # Simple linear trend: slope of the least-squares line
                x = list(range(len(daily_pnls)))
                slope = float(np.polyfit(x, daily_pnls, 1)[0])
                if slope > 0:
                    trend_direction = 'up'
                elif slope < 0:
                    trend_direction = 'down'
                else:
                    trend_direction = 'flat'
            else:
                trend_direction = 'insufficient_data'
                slope = 0

            return {
                'days': days,
                'trend_direction': trend_direction,
                'slope': slope,
                **period_stats
            }

        except Exception as e:
            logger.error(f"❌ Error calculating recent performance trend: {e}")
            return {'days': days, 'trend_direction': 'error'}