performance_calculator.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. #!/usr/bin/env python3
  2. """
  3. Performance Calculator for Trading Statistics
  4. Handles performance metrics calculations including win rate, PnL, drawdown,
  5. trade durations, and comprehensive statistical analysis.
  6. """
  7. import logging
  8. from datetime import datetime, timezone, timedelta
  9. from typing import Dict, List, Any, Optional, Tuple
  10. import math
  11. import numpy as np
  12. from src.utils.token_display_formatter import get_formatter
  13. logger = logging.getLogger(__name__)
  14. class PerformanceCalculator:
  15. """Calculates performance metrics and statistics from trading data."""
  16. def __init__(self, db_manager):
  17. """Initialize with database manager."""
  18. self.db = db_manager
  19. def _format_duration(self, total_seconds: int) -> str:
  20. """Format duration from seconds to human-readable format."""
  21. if total_seconds < 60:
  22. return f"{int(total_seconds)}s"
  23. elif total_seconds < 3600:
  24. minutes = total_seconds // 60
  25. seconds = total_seconds % 60
  26. return f"{int(minutes)}m {int(seconds)}s"
  27. elif total_seconds < 86400:
  28. hours = total_seconds // 3600
  29. minutes = (total_seconds % 3600) // 60
  30. return f"{int(hours)}h {int(minutes)}m"
  31. else:
  32. days = total_seconds // 86400
  33. hours = (total_seconds % 86400) // 3600
  34. return f"{int(days)}d {int(hours)}h"
  35. def get_performance_stats(self) -> Dict[str, Any]:
  36. """Get performance stats."""
  37. try:
  38. # Get initial balance from metadata
  39. initial_balance_str = self.db._get_metadata('initial_balance')
  40. initial_balance = float(initial_balance_str) if initial_balance_str else 0.0
  41. # Get all token stats
  42. token_stats = self.db._fetch_query(
  43. "SELECT * FROM token_stats",
  44. ()
  45. )
  46. # Get open positions
  47. open_positions = self.db._fetch_query(
  48. "SELECT * FROM trades WHERE status = 'position_opened'",
  49. ()
  50. )
  51. # Initialize performance metrics
  52. total_trades = 0
  53. total_wins = 0
  54. total_losses = 0
  55. total_pnl = 0.0
  56. total_entry_volume = 0.0
  57. total_exit_volume = 0.0
  58. largest_win = 0.0
  59. largest_loss = 0.0 # Initialize to 0
  60. largest_win_token = "N/A"
  61. largest_loss_token = "N/A"
  62. largest_win_pct = 0.0
  63. largest_loss_pct = 0.0
  64. best_token_name = "N/A"
  65. best_token_pnl_value = 0.0
  66. best_token_pnl_pct = 0.0
  67. best_token_volume = 0.0
  68. worst_token_name = "N/A"
  69. worst_token_pnl_value = 0.0
  70. worst_token_pnl_pct = 0.0
  71. worst_token_volume = 0.0
  72. # Process token stats
  73. for token in token_stats:
  74. if token.get('total_completed_cycles', 0) > 0:
  75. total_trades += token.get('total_completed_cycles', 0)
  76. total_wins += token.get('winning_cycles', 0)
  77. total_losses += token.get('losing_cycles', 0)
  78. total_pnl += token.get('total_realized_pnl', 0)
  79. total_entry_volume += token.get('total_entry_volume', 0)
  80. total_exit_volume += token.get('total_exit_volume', 0)
  81. # Track largest trades
  82. token_largest_win = token.get('largest_winning_cycle_pnl', 0)
  83. token_largest_loss = token.get('largest_losing_cycle_pnl', 0)
  84. if token_largest_win > largest_win:
  85. largest_win = token_largest_win
  86. largest_win_token = token['token']
  87. largest_win_pct = (token_largest_win / token.get('largest_winning_cycle_entry_volume', 1)) * 100
  88. # For losses, we want the most negative number
  89. if token_largest_loss < 0 and (largest_loss == 0 or token_largest_loss < largest_loss):
  90. largest_loss = token_largest_loss
  91. largest_loss_token = token['token']
  92. largest_loss_pct = (token_largest_loss / token.get('largest_losing_cycle_entry_volume', 1)) * 100
  93. # Track best/worst tokens
  94. token_pnl = token.get('total_realized_pnl', 0)
  95. token_volume = token.get('total_entry_volume', 0)
  96. if token_volume > 0:
  97. token_pnl_pct = (token_pnl / token_volume) * 100
  98. if best_token_name == "N/A" or token_pnl > best_token_pnl_value:
  99. best_token_name = token['token']
  100. best_token_pnl_value = token_pnl
  101. best_token_pnl_pct = token_pnl_pct
  102. best_token_volume = token_volume
  103. if worst_token_name == "N/A" or token_pnl < worst_token_pnl_value:
  104. worst_token_name = token['token']
  105. worst_token_pnl_value = token_pnl
  106. worst_token_pnl_pct = token_pnl_pct
  107. worst_token_volume = token_volume
  108. # Calculate win rate and profit factor
  109. win_rate = (total_wins / total_trades * 100) if total_trades > 0 else 0
  110. # Calculate sum of winning and losing trades
  111. sum_winning = sum(token.get('sum_of_winning_pnl', 0) for token in token_stats)
  112. sum_losing = abs(sum(token.get('sum_of_losing_pnl', 0) for token in token_stats))
  113. profit_factor = (sum_winning / sum_losing) if sum_losing > 0 else float('inf') if sum_winning > 0 else 0
  114. # Calculate average P&L stats
  115. avg_win_pnl = sum_winning / total_wins if total_wins > 0 else 0
  116. avg_loss_pnl = sum_losing / total_losses if total_losses > 0 else 0
  117. avg_trade_pnl = total_pnl / total_trades if total_trades > 0 else 0.0
  118. # Calculate expectancy
  119. expectancy = (avg_win_pnl * (win_rate/100)) - (avg_loss_pnl * (1 - win_rate/100))
  120. # Get max drawdown
  121. max_drawdown, max_drawdown_pct, drawdown_start_date = self.get_live_max_drawdown()
  122. # Best/Worst trades by ROE
  123. best_roe_trade = self.db._fetchone_query("SELECT token, best_roe_percentage as percentage FROM token_stats WHERE best_roe_percentage IS NOT NULL ORDER BY best_roe_percentage DESC LIMIT 1")
  124. worst_roe_trade = self.db._fetchone_query("SELECT token, worst_roe_percentage as percentage FROM token_stats WHERE worst_roe_percentage IS NOT NULL ORDER BY worst_roe_percentage ASC LIMIT 1")
  125. return {
  126. 'initial_balance': initial_balance,
  127. 'total_trades': total_trades,
  128. 'total_wins': total_wins,
  129. 'total_losses': total_losses,
  130. 'win_rate': win_rate,
  131. 'total_pnl': total_pnl,
  132. 'total_entry_volume': total_entry_volume,
  133. 'total_exit_volume': total_exit_volume,
  134. 'profit_factor': profit_factor,
  135. 'expectancy': expectancy,
  136. 'avg_trade_pnl': avg_trade_pnl,
  137. 'avg_win_pnl': avg_win_pnl,
  138. 'avg_loss_pnl': avg_loss_pnl,
  139. 'largest_win': largest_win,
  140. 'largest_loss': largest_loss,
  141. 'largest_win_token': largest_win_token,
  142. 'largest_loss_token': largest_loss_token,
  143. 'largest_win_pct': largest_win_pct,
  144. 'largest_loss_pct': largest_loss_pct,
  145. 'best_token': best_token_name,
  146. 'best_token_pnl': best_token_pnl_value,
  147. 'best_token_pct': best_token_pnl_pct,
  148. 'best_token_volume': best_token_volume,
  149. 'worst_token': worst_token_name,
  150. 'worst_token_pnl': worst_token_pnl_value,
  151. 'worst_token_pct': worst_token_pnl_pct,
  152. 'worst_token_volume': worst_token_volume,
  153. 'max_drawdown': max_drawdown,
  154. 'max_drawdown_pct': max_drawdown_pct,
  155. 'drawdown_start_date': drawdown_start_date,
  156. 'open_positions': len(open_positions),
  157. 'best_roe_trade': best_roe_trade,
  158. 'worst_roe_trade': worst_roe_trade
  159. }
  160. except Exception as e:
  161. logger.error(f"Error getting performance stats: {e}")
  162. return {}
  163. def get_token_performance(self, limit: int = 20) -> List[Dict[str, Any]]:
  164. """Get performance stats by token, sorted by total P&L (dollar amount)."""
  165. formatter = get_formatter()
  166. # Get all token stats first, then sort by total P&L in Python
  167. token_stats = self.db._fetch_query(
  168. "SELECT * FROM token_stats",
  169. ()
  170. )
  171. for token in token_stats:
  172. total_cycles = token.get('total_completed_cycles', 0)
  173. winning_cycles = token.get('winning_cycles', 0)
  174. # Calculate win rate
  175. token['win_rate'] = (winning_cycles / total_cycles * 100) if total_cycles > 0 else 0
  176. # Calculate profit factor
  177. sum_winning = token.get('sum_of_winning_pnl', 0)
  178. sum_losing = token.get('sum_of_losing_pnl', 0)
  179. token['profit_factor'] = sum_winning / sum_losing if sum_losing > 0 else float('inf') if sum_winning > 0 else 0
  180. # Calculate ROE from realized PnL and entry volume
  181. total_pnl = token.get('total_realized_pnl', 0)
  182. entry_volume = token.get('completed_entry_volume', 0)
  183. token['roe_percentage'] = (total_pnl / entry_volume * 100) if entry_volume > 0 else 0.0
  184. # Format durations
  185. total_duration = token.get('total_duration_seconds', 0)
  186. avg_duration = total_duration / total_cycles if total_cycles > 0 else 0
  187. token['average_trade_duration_formatted'] = self._format_duration(avg_duration)
  188. # Token display name (use token as-is)
  189. token['display_name'] = token['token'].upper()
  190. # Sort by total P&L (highest to lowest), then by ROE as tiebreaker
  191. sorted_tokens = sorted(
  192. token_stats,
  193. key=lambda x: (x.get('total_realized_pnl', 0), x.get('roe_percentage', 0)),
  194. reverse=True
  195. )
  196. # Return top tokens (limit)
  197. return sorted_tokens[:limit]
  198. def get_balance_history(self, days: int = 30) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
  199. """Get balance history for the last N days with detailed statistics."""
  200. balance_history = self.db._fetch_query(
  201. "SELECT * FROM balance_history WHERE timestamp >= datetime('now', '-{} days') ORDER BY timestamp ASC".format(days)
  202. )
  203. if not balance_history:
  204. return [], {}
  205. # Calculate statistics
  206. balances = [item['balance'] for item in balance_history]
  207. peak_balance = max(balances)
  208. current_balance = balances[-1] if balances else 0
  209. # Calculate max drawdown
  210. running_max = 0
  211. max_drawdown = 0
  212. max_drawdown_percentage = 0
  213. for balance in balances:
  214. if balance > running_max:
  215. running_max = balance
  216. drawdown = running_max - balance
  217. drawdown_percentage = (drawdown / running_max * 100) if running_max > 0 else 0
  218. if drawdown > max_drawdown:
  219. max_drawdown = drawdown
  220. max_drawdown_percentage = drawdown_percentage
  221. # Calculate period return
  222. initial_balance_period = balances[0] if balances else 0
  223. period_pnl = current_balance - initial_balance_period
  224. period_return_percentage = (period_pnl / initial_balance_period * 100) if initial_balance_period > 0 else 0
  225. stats = {
  226. 'peak_balance': peak_balance,
  227. 'current_balance': current_balance,
  228. 'max_drawdown': max_drawdown,
  229. 'max_drawdown_percentage': max_drawdown_percentage,
  230. 'period_pnl': period_pnl,
  231. 'period_return_percentage': period_return_percentage,
  232. 'data_points': len(balance_history)
  233. }
  234. return balance_history, stats
  235. def get_live_max_drawdown(self) -> Tuple[float, float, Optional[str]]:
  236. """
  237. Get live max drawdown value (in USD), percentage, and the date of the last peak.
  238. """
  239. peak_balance = float(self.db._get_metadata('drawdown_peak_balance') or 0.0)
  240. max_drawdown_pct = float(self.db._get_metadata('drawdown_max_drawdown_pct') or 0.0)
  241. peak_date = self.db._get_metadata('drawdown_peak_date')
  242. # Calculate max drawdown value based on peak and percentage
  243. max_drawdown_value = peak_balance * (max_drawdown_pct / 100)
  244. return max_drawdown_value, max_drawdown_pct, peak_date
  245. def update_live_max_drawdown(self, current_balance: float) -> bool:
  246. """
  247. Update the live maximum drawdown based on the current balance.
  248. This should be called periodically (e.g., every minute) or after every trade.
  249. """
  250. if current_balance <= 0:
  251. return False
  252. peak_balance = float(self.db._get_metadata('drawdown_peak_balance') or '0.0')
  253. max_drawdown_percentage = float(self.db._get_metadata('drawdown_max_drawdown_pct') or '0.0')
  254. updated = False
  255. if current_balance > peak_balance:
  256. # New peak detected, reset drawdown tracking
  257. self.db._set_metadata('drawdown_peak_balance', str(current_balance))
  258. self.db._set_metadata('drawdown_peak_date', datetime.now(timezone.utc).isoformat())
  259. # Reset max drawdown percentage since we are at a new high
  260. if max_drawdown_percentage != 0:
  261. self.db._set_metadata('drawdown_max_drawdown_pct', '0.0')
  262. logger.info(f"New peak balance for drawdown tracking: ${current_balance:,.2f}")
  263. updated = True
  264. else:
  265. # Still in a drawdown, check if it's a new max
  266. drawdown = peak_balance - current_balance
  267. drawdown_percentage = (drawdown / peak_balance * 100) if peak_balance > 0 else 0
  268. if drawdown_percentage > max_drawdown_percentage:
  269. self.db._set_metadata('drawdown_max_drawdown_pct', str(drawdown_percentage))
  270. logger.info(f"New max drawdown detected: {drawdown_percentage:.2f}%")
  271. updated = True
  272. return updated
  273. def calculate_sharpe_ratio(self, days: int = 30) -> Optional[float]:
  274. """
  275. Calculate Sharpe ratio from balance history.
  276. """
  277. try:
  278. risk_free_rate = 0.0 # Assuming 0 for simplicity
  279. # Get balance history
  280. balance_history, _ = self.get_balance_history(days)
  281. if not balance_history or len(balance_history) < 2:
  282. return None
  283. # Calculate daily returns
  284. returns = []
  285. for i in range(1, len(balance_history)):
  286. prev_balance = balance_history[i-1]['balance']
  287. curr_balance = balance_history[i]['balance']
  288. if prev_balance > 0:
  289. daily_return = (curr_balance - prev_balance) / prev_balance
  290. returns.append(daily_return)
  291. if not returns or np.std(returns) == 0:
  292. return 0.0 # Or None if not enough data
  293. # Calculate annualized Sharpe Ratio
  294. avg_daily_return = np.mean(returns)
  295. std_dev_daily_return = np.std(returns)
  296. sharpe_ratio = (avg_daily_return - (risk_free_rate / 365)) / std_dev_daily_return
  297. annualized_sharpe_ratio = sharpe_ratio * np.sqrt(365) # Annualize
  298. return annualized_sharpe_ratio
  299. except Exception as e:
  300. logger.error(f"❌ Error calculating Sharpe ratio: {e}")
  301. return None
  302. def calculate_max_consecutive_losses(self) -> int:
  303. """Calculate the maximum number of consecutive losing trades."""
  304. # This now requires fetching from the token_stats table and is more complex
  305. # For simplicity, we assume this needs a direct query on a more granular `trades` table if it existed
  306. # This is a placeholder for a more complex implementation if needed.
  307. # As of now, we will get this from an aggregated value if we decide to store it.
  308. logger.warning("calculate_max_consecutive_losses is not fully implemented with the new schema.")
  309. return 0 # Placeholder
  310. def get_risk_metrics(self) -> Dict[str, Any]:
  311. """
  312. Get key risk metrics for the trading account.
  313. """
  314. # Get live drawdown stats
  315. max_drawdown_value, max_drawdown_percentage, drawdown_start_date = self.get_live_max_drawdown()
  316. # Get Sharpe ratio
  317. sharpe_ratio = self.calculate_sharpe_ratio(days=90) # Use 90 days for a more stable metric
  318. # Other metrics can be added here
  319. return {
  320. 'max_drawdown_value': max_drawdown_value,
  321. 'max_drawdown_percentage': max_drawdown_percentage,
  322. 'drawdown_start_date': drawdown_start_date,
  323. 'sharpe_ratio': sharpe_ratio,
  324. }
  325. def get_period_performance(self, start_date: str, end_date: str) -> Dict[str, Any]:
  326. """Get performance statistics for a specific date range."""
  327. try:
  328. # Get daily stats for the period
  329. daily_stats = self.db._fetch_query("""
  330. SELECT date, SUM(realized_pnl) as pnl, SUM(completed_cycles) as trades,
  331. SUM(exit_volume) as volume
  332. FROM daily_aggregated_stats
  333. WHERE date BETWEEN ? AND ?
  334. GROUP BY date
  335. ORDER BY date ASC
  336. """, (start_date, end_date))
  337. if not daily_stats:
  338. return {
  339. 'period_start': start_date,
  340. 'period_end': end_date,
  341. 'total_pnl': 0,
  342. 'total_trades': 0,
  343. 'total_volume': 0,
  344. 'win_rate': 0,
  345. 'trading_days': 0,
  346. 'average_daily_pnl': 0
  347. }
  348. total_pnl = sum(day.get('pnl', 0) or 0 for day in daily_stats)
  349. total_trades = sum(day.get('trades', 0) or 0 for day in daily_stats)
  350. total_volume = sum(day.get('volume', 0) or 0 for day in daily_stats)
  351. trading_days = len([day for day in daily_stats if (day.get('trades', 0) or 0) > 0])
  352. average_daily_pnl = total_pnl / trading_days if trading_days > 0 else 0
  353. return {
  354. 'period_start': start_date,
  355. 'period_end': end_date,
  356. 'total_pnl': total_pnl,
  357. 'total_trades': total_trades,
  358. 'total_volume': total_volume,
  359. 'trading_days': trading_days,
  360. 'average_daily_pnl': average_daily_pnl,
  361. 'daily_stats': daily_stats
  362. }
  363. except Exception as e:
  364. logger.error(f"❌ Error calculating period performance: {e}")
  365. return {}
  366. def get_recent_performance_trend(self, days: int = 7) -> Dict[str, Any]:
  367. """Get recent performance trend analysis."""
  368. try:
  369. end_date = datetime.now(timezone.utc).date()
  370. start_date = end_date - timedelta(days=days)
  371. period_stats = self.get_period_performance(
  372. start_date.strftime('%Y-%m-%d'),
  373. end_date.strftime('%Y-%m-%d')
  374. )
  375. # Calculate trend direction
  376. daily_pnls = [day.get('pnl', 0) or 0 for day in period_stats.get('daily_stats', [])]
  377. if len(daily_pnls) >= 2:
  378. # Simple linear trend
  379. x = list(range(len(daily_pnls)))
  380. slope = np.polyfit(x, daily_pnls, 1)[0] if len(daily_pnls) > 1 else 0
  381. trend_direction = 'up' if slope > 0 else 'down' if slope < 0 else 'flat'
  382. else:
  383. trend_direction = 'insufficient_data'
  384. slope = 0
  385. return {
  386. 'days': days,
  387. 'trend_direction': trend_direction,
  388. 'slope': slope,
  389. **period_stats
  390. }
  391. except Exception as e:
  392. logger.error(f"❌ Error calculating recent performance trend: {e}")
  393. return {'days': days, 'trend_direction': 'error'}