#!/usr/bin/env python3
"""
Trading Statistics Tracker
Tracks and calculates comprehensive trading statistics including:
- Balance tracking
- Trade history
- P&L analysis
- Win/Loss ratios
- Risk metrics (Sharpe, Sortino)
- Performance metrics
"""
import json
import os
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
import numpy as np
from config import Config
# Module-level logger named after this module; TradingStats uses it for info and error messages.
logger = logging.getLogger(__name__)
class TradingStats:
    """Comprehensive trading statistics tracker.

    All state lives in ``self.data``, a JSON-serialisable dict with the keys
    ``start_time``, ``initial_balance``, ``trades``, ``daily_balances``,
    ``last_update`` and ``manual_trades_only``.  Methods that mutate the
    state write it back to ``self.stats_file``.
    """

    # Annual risk-free rate assumed by the Sharpe/Sortino calculations.
    RISK_FREE_RATE_ANNUAL = 0.02
    # Return periods per year used for annualisation (one snapshot per
    # calendar day, 365-day year).
    PERIODS_PER_YEAR = 365

    def __init__(self, stats_file: str = "trading_stats.json"):
        """Load stats from ``stats_file`` or create a fresh structure.

        Args:
            stats_file: Path of the JSON file used for persistence.

        Note:
            A missing, empty or unreadable file is treated as a first run:
            a fresh structure is created and written to ``stats_file``.
        """
        self.stats_file = stats_file
        self.data = self._load_stats()
        if not self.data:
            # First run (or nothing usable on disk).
            self._initialize_stats()
        else:
            # Backfill keys the other methods index directly, so a partial
            # file does not raise KeyError later on.
            self.data.setdefault('start_time', datetime.now().isoformat())
            self.data.setdefault('initial_balance', 0.0)
            self.data.setdefault('trades', [])
            self.data.setdefault('daily_balances', [])

    def _load_stats(self) -> Dict[str, Any]:
        """Load stats from file.

        Returns:
            The decoded JSON object, or ``{}`` when the file does not exist,
            cannot be read/parsed, or does not contain a JSON object.
        """
        try:
            if os.path.exists(self.stats_file):
                with open(self.stats_file, 'r') as f:
                    loaded = json.load(f)
                if isinstance(loaded, dict):
                    return loaded
                # Valid JSON but not an object: the rest of the class
                # cannot work with it.
                logger.error(
                    "Error loading stats: expected a JSON object, "
                    f"got {type(loaded).__name__}"
                )
        except Exception as e:
            # Best effort: a broken file must not prevent start-up.
            logger.error(f"Error loading stats: {e}")
        return {}

    def _save_stats(self):
        """Save stats to file.

        Failures are logged and swallowed so that a persistence problem
        never interrupts the caller.
        """
        try:
            with open(self.stats_file, 'w') as f:
                # default=str stringifies anything json cannot encode natively.
                json.dump(self.data, f, indent=2, default=str)
        except Exception as e:
            logger.error(f"Error saving stats: {e}")

    def _initialize_stats(self):
        """Create an empty stats structure and persist it."""
        now = datetime.now().isoformat()
        self.data = {
            'start_time': now,
            'initial_balance': 0.0,
            'trades': [],
            'daily_balances': [],
            'last_update': now,
            'manual_trades_only': True  # Flag to indicate manual trading
        }
        self._save_stats()

    def set_initial_balance(self, balance: float):
        """Set the initial balance when bot starts.

        Only takes effect while the stored initial balance is still 0; in
        that case ``start_time`` is reset to now and the stats are saved.

        Args:
            balance: Account balance to use as the baseline.
        """
        if self.data.get('initial_balance', 0) != 0:
            return
        self.data['initial_balance'] = balance
        self.data['start_time'] = datetime.now().isoformat()
        logger.info(f"Initial balance set to: ${balance:.2f}")
        self._save_stats()

    def record_balance(self, balance: float):
        """Record daily balance snapshot.

        At most one snapshot is kept per calendar day; a second call on the
        same day overwrites that day's balance and timestamp.

        Args:
            balance: Current account balance.
        """
        now = datetime.now()
        today = now.date().isoformat()
        for entry in self.data['daily_balances']:
            if entry['date'] == today:
                # Already have a snapshot for today: update it in place.
                entry['balance'] = balance
                entry['timestamp'] = now.isoformat()
                break
        else:
            self.data['daily_balances'].append({
                'date': today,
                'balance': balance,
                'timestamp': now.isoformat()
            })
        self._save_stats()

    def record_trade(self, symbol: str, side: str, amount: float, price: float,
                     order_id: Optional[str] = None, trade_type: str = "manual"):
        """Record a trade and persist it.

        Args:
            symbol: Instrument identifier.
            side: Trade side; stored lower-cased.  Only ``buy`` and ``sell``
                are understood by the P&L calculation.
            amount: Quantity traded.
            price: Execution price per unit.
            order_id: Optional order identifier.
            trade_type: Free-form label stored under ``type``.
        """
        now = datetime.now().isoformat()
        trade = {
            'timestamp': now,
            'symbol': symbol,
            'side': side.lower(),
            'amount': amount,
            'price': price,
            'value': amount * price,
            'order_id': order_id,
            'type': trade_type,
            'pnl': 0.0  # Placeholder; realised P&L is derived in calculate_trade_pnl
        }
        self.data['trades'].append(trade)
        self.data['last_update'] = now
        self._save_stats()
        logger.info(f"Recorded trade: {side} {amount} {symbol} @ ${price:.2f}")

    def calculate_trade_pnl(self) -> List[Dict[str, Any]]:
        """Calculate realised P&L per trade using a weighted-average cost basis.

        Trades are replayed in stored order.  Buys raise the position and
        its average price; sells realise ``sold * (price - avg_price)``
        against the open position.  Note this is average-cost accounting,
        not lot-by-lot FIFO.

        Returns:
            Copies of the stored buy/sell trades with ``pnl`` filled in.
            Sells that close (part of) a position also carry
            ``sold_amount``.  Trades whose side is neither ``buy`` nor
            ``sell`` are omitted.
        """
        results = []
        positions = {}  # Open position per symbol
        for trade in self.data['trades']:
            side = trade['side']
            amount = trade['amount']
            price = trade['price']
            pos = positions.setdefault(
                trade['symbol'], {'amount': 0, 'avg_price': 0, 'total_cost': 0}
            )

            if side == 'buy':
                pos['total_cost'] += amount * price
                pos['amount'] += amount
                pos['avg_price'] = (
                    pos['total_cost'] / pos['amount'] if pos['amount'] > 0 else 0
                )
                entry = trade.copy()
                entry['pnl'] = 0  # No PnL on opening
                results.append(entry)
            elif side == 'sell':
                entry = trade.copy()
                if pos['amount'] > 0:
                    # Only the part covered by the open position is realised.
                    sold_amount = min(amount, pos['amount'])
                    entry['pnl'] = sold_amount * (price - pos['avg_price'])
                    entry['sold_amount'] = sold_amount
                    pos['amount'] -= sold_amount
                    if pos['amount'] > 0:
                        pos['total_cost'] = pos['amount'] * pos['avg_price']
                    else:
                        pos['total_cost'] = 0
                        pos['avg_price'] = 0
                else:
                    # Sell without an open position (short): not modelled,
                    # so no P&L is attributed.
                    entry['pnl'] = 0
                results.append(entry)
        return results

    def get_basic_stats(self) -> Dict[str, Any]:
        """Get basic trading statistics.

        Returns:
            Dict with ``total_trades``, ``buy_trades``, ``sell_trades``,
            ``initial_balance``, ``total_pnl``, ``days_active``,
            ``start_date`` and ``last_trade``.  The same keys are present
            when no trades exist (plus a legacy ``current_balance`` of 0),
            so consumers can format the result unconditionally.
        """
        trades = self.data['trades']
        if not trades:
            raw_start = self.data.get('start_time')
            return {
                'total_trades': 0,
                'buy_trades': 0,
                'sell_trades': 0,
                'initial_balance': self.data.get('initial_balance', 0),
                'current_balance': 0,
                'total_pnl': 0,
                'days_active': 0,
                'start_date': (
                    datetime.fromisoformat(raw_start).strftime('%Y-%m-%d')
                    if raw_start else None
                ),
                'last_trade': None
            }

        total_pnl = sum(t.get('pnl', 0) for t in self.calculate_trade_pnl())
        start_date = datetime.fromisoformat(self.data['start_time'])
        # +1 so the start day itself counts as an active day.
        days_active = (datetime.now() - start_date).days + 1
        return {
            'total_trades': len(trades),
            'buy_trades': sum(1 for t in trades if t['side'] == 'buy'),
            'sell_trades': sum(1 for t in trades if t['side'] == 'sell'),
            'initial_balance': self.data.get('initial_balance', 0),
            'total_pnl': total_pnl,
            'days_active': days_active,
            'start_date': start_date.strftime('%Y-%m-%d'),
            'last_trade': trades[-1]['timestamp']
        }

    def get_performance_stats(self) -> Dict[str, Any]:
        """Calculate advanced performance statistics.

        Only trades with a non-zero realised P&L are considered.

        Returns:
            Dict with ``win_rate`` (percent), ``profit_factor`` (``inf``
            when there are profits but no losses), ``avg_win``,
            ``avg_loss``, ``largest_win``, ``largest_loss`` (losses as
            positive numbers), ``consecutive_wins``, ``consecutive_losses``,
            ``total_wins``, ``total_losses`` and ``expectancy``.  All values
            are 0 when there are no qualifying trades.
        """
        completed = [
            t for t in self.calculate_trade_pnl() if t.get('pnl', 0) != 0
        ]
        if not completed:
            return {
                'win_rate': 0,
                'profit_factor': 0,
                'avg_win': 0,
                'avg_loss': 0,
                'largest_win': 0,
                'largest_loss': 0,
                'consecutive_wins': 0,
                'consecutive_losses': 0,
                'total_wins': 0,
                'total_losses': 0,
                'expectancy': 0
            }

        wins = [t['pnl'] for t in completed if t['pnl'] > 0]
        losses = [abs(t['pnl']) for t in completed if t['pnl'] < 0]

        total_wins = len(wins)
        total_losses = len(losses)
        total_completed = total_wins + total_losses
        win_rate = (total_wins / total_completed * 100) if total_completed > 0 else 0

        total_profit = sum(wins)
        total_loss = sum(losses)
        if total_loss > 0:
            profit_factor = total_profit / total_loss
        elif total_profit > 0:
            profit_factor = float('inf')
        else:
            profit_factor = 0

        avg_win = np.mean(wins) if wins else 0
        avg_loss = np.mean(losses) if losses else 0

        # Longest winning / losing streaks, in trade order.
        consecutive_wins = consecutive_losses = 0
        current_wins = current_losses = 0
        for trade in completed:
            if trade['pnl'] > 0:
                current_wins += 1
                current_losses = 0
                consecutive_wins = max(consecutive_wins, current_wins)
            else:
                current_losses += 1
                current_wins = 0
                consecutive_losses = max(consecutive_losses, current_losses)

        return {
            'win_rate': win_rate,
            'profit_factor': profit_factor,
            'avg_win': avg_win,
            'avg_loss': avg_loss,
            'largest_win': max(wins) if wins else 0,
            'largest_loss': max(losses) if losses else 0,
            'consecutive_wins': consecutive_wins,
            'consecutive_losses': consecutive_losses,
            'total_wins': total_wins,
            'total_losses': total_losses,
            'expectancy': avg_win * (win_rate / 100) - avg_loss * ((100 - win_rate) / 100)
        }

    def get_risk_metrics(self) -> Dict[str, Any]:
        """Calculate risk-adjusted metrics from the daily balance snapshots.

        Returns:
            Dict with ``sharpe_ratio`` and ``sortino_ratio`` (annualised),
            ``max_drawdown``, ``volatility`` and ``var_95`` (percent,
            reported as positive numbers).  All values are 0 when fewer
            than two snapshots exist.
        """
        snapshots = self.data['daily_balances']
        if len(snapshots) < 2:
            return {
                'sharpe_ratio': 0,
                'sortino_ratio': 0,
                'max_drawdown': 0,
                'volatility': 0,
                'var_95': 0
            }

        balances = [entry['balance'] for entry in snapshots]
        # Snapshot-to-snapshot returns; a non-positive previous balance
        # yields a return of 0 instead of dividing by it.
        returns = np.array(
            [
                (curr - prev) / prev if prev > 0 else 0
                for prev, curr in zip(balances, balances[1:])
            ],
            dtype=float,
        )

        annualise = np.sqrt(self.PERIODS_PER_YEAR)
        risk_free_rate = self.RISK_FREE_RATE_ANNUAL / self.PERIODS_PER_YEAR
        mean_excess = np.mean(returns - risk_free_rate)
        std = np.std(returns)

        sharpe_ratio = mean_excess / std * annualise if std > 0 else 0

        # Sortino: spread of the negative returns only.
        downside = returns[returns < 0]
        downside_std = np.std(downside) if len(downside) > 0 else 0
        sortino_ratio = mean_excess / downside_std * annualise if downside_std > 0 else 0

        # Max drawdown on the compounded equity curve.  The starting equity
        # (1.0) counts as the first peak, so a loss on the very first
        # period is captured and the peak can never be zero.
        equity = np.cumprod(1 + returns)
        peak = np.maximum(np.maximum.accumulate(equity), 1.0)
        max_drawdown = np.min((equity - peak) / peak) * 100

        return {
            'sharpe_ratio': sharpe_ratio,
            'sortino_ratio': sortino_ratio,
            'max_drawdown': abs(max_drawdown),
            'volatility': std * annualise * 100,
            'var_95': abs(np.percentile(returns, 5) * 100)
        }

    def get_comprehensive_stats(self, current_balance: Optional[float] = None) -> Dict[str, Any]:
        """Get all statistics combined.

        Args:
            current_balance: Current account balance.  When given
                (including ``0``), it is recorded as today's snapshot
                before the metrics are computed.

        Returns:
            Dict with ``basic``, ``performance``, ``risk``,
            ``current_balance`` (0 when not supplied), ``total_return``
            (percent versus the initial balance) and ``last_updated``.
        """
        # Compare against None explicitly: a balance of 0 is a real value.
        has_balance = current_balance is not None
        if has_balance:
            self.record_balance(current_balance)

        basic = self.get_basic_stats()
        performance = self.get_performance_stats()
        risk = self.get_risk_metrics()

        initial_balance = basic['initial_balance']
        total_return = 0
        if has_balance and initial_balance > 0:
            total_return = ((current_balance - initial_balance) / initial_balance) * 100

        return {
            'basic': basic,
            'performance': performance,
            'risk': risk,
            'current_balance': current_balance if has_balance else 0,
            'total_return': total_return,
            'last_updated': datetime.now().isoformat()
        }

    def format_stats_message(self, current_balance: Optional[float] = None) -> str:
        """Format stats for Telegram display.

        Args:
            current_balance: Forwarded to ``get_comprehensive_stats``.

        Returns:
            A multi-line text summary of all statistics.
        """
        stats = self.get_comprehensive_stats(current_balance)
        basic = stats['basic']
        perf = stats['performance']
        risk = stats['risk']
        lines = [
            "📊 Trading Statistics",
            "💰 Balance Overview",
            f"• Initial: ${basic['initial_balance']:,.2f}",
            f"• Current: ${stats['current_balance']:,.2f}",
            f"• Total P&L: ${basic['total_pnl']:,.2f}",
            f"• Total Return: {stats['total_return']:.2f}%",
            "📈 Trading Activity",
            f"• Total Trades: {basic['total_trades']}",
            f"• Buy Orders: {basic['buy_trades']}",
            f"• Sell Orders: {basic['sell_trades']}",
            f"• Days Active: {basic['days_active']}",
            "🏆 Performance Metrics",
            f"• Win Rate: {perf['win_rate']:.1f}%",
            f"• Profit Factor: {perf['profit_factor']:.2f}",
            f"• Avg Win: ${perf['avg_win']:.2f}",
            f"• Avg Loss: ${perf['avg_loss']:.2f}",
            f"• Expectancy: ${perf['expectancy']:.2f}",
            "📊 Risk Metrics",
            f"• Sharpe Ratio: {risk['sharpe_ratio']:.2f}",
            f"• Sortino Ratio: {risk['sortino_ratio']:.2f}",
            f"• Max Drawdown: {risk['max_drawdown']:.2f}%",
            f"• Volatility: {risk['volatility']:.2f}%",
            f"• VaR (95%): {risk['var_95']:.2f}%",
            "🎯 Best/Worst",
            f"• Largest Win: ${perf['largest_win']:.2f}",
            f"• Largest Loss: ${perf['largest_loss']:.2f}",
            f"• Max Consecutive Wins: {perf['consecutive_wins']}",
            f"• Max Consecutive Losses: {perf['consecutive_losses']}",
            f"📅 Since: {basic['start_date']}",
        ]
        return "\n".join(lines).strip()

    def get_recent_trades(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get the most recent trades, oldest first.

        Args:
            limit: Maximum number of trades to return; values <= 0 yield
                an empty list.

        Returns:
            Up to ``limit`` trades from the end of the trade history.
        """
        if limit <= 0:
            # A slice of [-0:] would return the whole history.
            return []
        return self.data['trades'][-limit:]