#!/usr/bin/env python3
"""
Aggregation Manager for Trading Statistics
Handles data aggregation, migration from individual trades to aggregated statistics,
and balance adjustment tracking.
"""
  7. import sqlite3
  8. import logging
  9. from datetime import datetime, timezone, timedelta
  10. from typing import Dict, List, Any, Optional
  11. import uuid
  12. from src.utils.token_display_formatter import get_formatter
  13. logger = logging.getLogger(__name__)
  14. class AggregationManager:
  15. """Manages data aggregation and migration in the trading statistics database."""
  16. def __init__(self, db_manager):
  17. """Initialize with database manager."""
  18. self.db = db_manager
  19. def migrate_trade_to_aggregated_stats(self, trade_lifecycle_id: str):
  20. """Migrate a completed/cancelled trade's stats to aggregate tables and delete the original trade."""
  21. trade_data = self.db._fetchone_query("SELECT * FROM trades WHERE trade_lifecycle_id = ?", (trade_lifecycle_id,))
  22. if not trade_data:
  23. logger.error(f"Cannot migrate trade {trade_lifecycle_id}: Not found.")
  24. return
  25. status = trade_data.get('status')
  26. symbol = trade_data.get('symbol')
  27. token = symbol.split('/')[0] if symbol and '/' in symbol else symbol
  28. if not token:
  29. logger.error(f"Cannot migrate trade {trade_lifecycle_id}: Token could not be derived from symbol '{symbol}'.")
  30. return
  31. now_iso = datetime.now(timezone.utc).isoformat()
  32. try:
  33. with self.db.conn:
  34. if status == 'position_closed':
  35. self._migrate_closed_position(trade_data, token, now_iso)
  36. elif status == 'cancelled':
  37. self._migrate_cancelled_position(trade_data, token, now_iso)
  38. # Delete the original trade from the 'trades' table
  39. self.db._execute_query("DELETE FROM trades WHERE trade_lifecycle_id = ?", (trade_lifecycle_id,))
  40. logger.info(f"Deleted trade lifecycle {trade_lifecycle_id} from trades table after aggregation.")
  41. except sqlite3.Error as e:
  42. logger.error(f"Database error migrating trade {trade_lifecycle_id} to aggregate stats: {e}", exc_info=True)
  43. except Exception as e:
  44. logger.error(f"Unexpected error migrating trade {trade_lifecycle_id} to aggregate stats: {e}", exc_info=True)
  45. def _migrate_closed_position(self, trade_data: Dict[str, Any], token: str, now_iso: str):
  46. """Migrate a closed position to aggregated stats."""
  47. realized_pnl = trade_data.get('realized_pnl', 0.0)
  48. entry_value = trade_data.get('value', 0.0)
  49. exit_value = entry_value + realized_pnl
  50. closed_at_str = trade_data.get('position_closed_at', now_iso)
  51. closed_at_dt = datetime.fromisoformat(closed_at_str)
  52. date_str = closed_at_dt.strftime('%Y-%m-%d')
  53. # Calculate duration if timestamps are available
  54. opened_at_str = trade_data.get('position_opened_at')
  55. duration_seconds = 0
  56. if opened_at_str and closed_at_str:
  57. try:
  58. opened_at_dt = datetime.fromisoformat(opened_at_str)
  59. duration_seconds = (closed_at_dt - opened_at_dt).total_seconds()
  60. except Exception:
  61. duration_seconds = 0
  62. # Calculate ROE percentage
  63. roe_percentage = (realized_pnl / entry_value * 100) if entry_value > 0 else 0.0
  64. # Update token_stats
  65. token_upsert_query = """
  66. INSERT INTO token_stats (
  67. token, total_realized_pnl, total_completed_cycles, winning_cycles, losing_cycles,
  68. total_entry_volume, total_exit_volume, sum_of_winning_pnl, sum_of_losing_pnl,
  69. largest_winning_cycle_pnl, largest_losing_cycle_pnl, largest_winning_cycle_entry_volume, largest_losing_cycle_entry_volume,
  70. first_cycle_closed_at, last_cycle_closed_at, total_duration_seconds, roe_percentage, updated_at
  71. ) VALUES (?, ?, 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  72. ON CONFLICT(token) DO UPDATE SET
  73. total_realized_pnl = total_realized_pnl + excluded.total_realized_pnl,
  74. total_completed_cycles = total_completed_cycles + 1,
  75. winning_cycles = winning_cycles + excluded.winning_cycles,
  76. losing_cycles = losing_cycles + excluded.losing_cycles,
  77. total_entry_volume = total_entry_volume + excluded.total_entry_volume,
  78. total_exit_volume = total_exit_volume + excluded.total_exit_volume,
  79. sum_of_winning_pnl = sum_of_winning_pnl + excluded.sum_of_winning_pnl,
  80. sum_of_losing_pnl = sum_of_losing_pnl + excluded.sum_of_losing_pnl,
  81. largest_winning_cycle_pnl =
  82. CASE WHEN excluded.largest_winning_cycle_pnl > largest_winning_cycle_pnl
  83. THEN excluded.largest_winning_cycle_pnl
  84. ELSE largest_winning_cycle_pnl END,
  85. largest_losing_cycle_pnl =
  86. CASE WHEN excluded.largest_losing_cycle_pnl > largest_losing_cycle_pnl
  87. THEN excluded.largest_losing_cycle_pnl
  88. ELSE largest_losing_cycle_pnl END,
  89. largest_winning_cycle_entry_volume =
  90. CASE WHEN excluded.largest_winning_cycle_pnl > largest_winning_cycle_pnl
  91. THEN excluded.largest_winning_cycle_entry_volume
  92. ELSE largest_winning_cycle_entry_volume END,
  93. largest_losing_cycle_entry_volume =
  94. CASE WHEN excluded.largest_losing_cycle_pnl > largest_losing_cycle_pnl
  95. THEN excluded.largest_losing_cycle_entry_volume
  96. ELSE largest_losing_cycle_entry_volume END,
  97. first_cycle_closed_at = MIN(first_cycle_closed_at, excluded.first_cycle_closed_at),
  98. last_cycle_closed_at = MAX(last_cycle_closed_at, excluded.last_cycle_closed_at),
  99. total_duration_seconds = total_duration_seconds + excluded.total_duration_seconds,
  100. roe_percentage = excluded.roe_percentage,
  101. updated_at = excluded.updated_at
  102. """
  103. is_win = 1 if realized_pnl > 0 else 0
  104. is_loss = 1 if realized_pnl < 0 else 0
  105. win_pnl_contrib = realized_pnl if realized_pnl > 0 else 0.0
  106. loss_pnl_contrib = abs(realized_pnl) if realized_pnl < 0 else 0.0
  107. # For largest winning/losing, we only consider them if this is the new largest
  108. largest_win_entry_volume = entry_value if realized_pnl > 0 else 0.0
  109. largest_loss_entry_volume = entry_value if realized_pnl < 0 else 0.0
  110. self.db._execute_query(token_upsert_query, (
  111. token, realized_pnl, is_win, is_loss, entry_value, exit_value,
  112. win_pnl_contrib, loss_pnl_contrib, win_pnl_contrib, loss_pnl_contrib,
  113. largest_win_entry_volume, largest_loss_entry_volume,
  114. closed_at_str, closed_at_str, duration_seconds, roe_percentage, now_iso
  115. ))
  116. # Update daily_aggregated_stats
  117. daily_upsert_query = """
  118. INSERT INTO daily_aggregated_stats (
  119. date, token, realized_pnl, completed_cycles, entry_volume, exit_volume
  120. ) VALUES (?, ?, ?, 1, ?, ?)
  121. ON CONFLICT(date, token) DO UPDATE SET
  122. realized_pnl = realized_pnl + excluded.realized_pnl,
  123. completed_cycles = completed_cycles + 1,
  124. entry_volume = entry_volume + excluded.entry_volume,
  125. exit_volume = exit_volume + excluded.exit_volume
  126. """
  127. self.db._execute_query(daily_upsert_query, (
  128. date_str, token, realized_pnl, entry_value, exit_value
  129. ))
  130. logger.info(f"Aggregated stats for closed trade lifecycle ({token}). PNL: {realized_pnl:.2f}")
  131. def _migrate_cancelled_position(self, trade_data: Dict[str, Any], token: str, now_iso: str):
  132. """Migrate a cancelled position to aggregated stats."""
  133. # Update token_stats for cancelled count
  134. cancelled_upsert_query = """
  135. INSERT INTO token_stats (token, total_cancelled_cycles, updated_at)
  136. VALUES (?, 1, ?)
  137. ON CONFLICT(token) DO UPDATE SET
  138. total_cancelled_cycles = total_cancelled_cycles + 1,
  139. updated_at = excluded.updated_at
  140. """
  141. self.db._execute_query(cancelled_upsert_query, (token, now_iso))
  142. logger.info(f"Incremented cancelled_cycles for {token}.")
  143. def record_deposit(self, amount: float, timestamp: Optional[str] = None,
  144. deposit_id: Optional[str] = None, description: Optional[str] = None):
  145. """Record a deposit."""
  146. ts = timestamp if timestamp else datetime.now(timezone.utc).isoformat()
  147. formatter = get_formatter()
  148. formatted_amount_str = formatter.format_price_with_symbol(amount)
  149. desc = description if description else f'Deposit of {formatted_amount_str}'
  150. self.db._execute_query(
  151. "INSERT INTO balance_adjustments (adjustment_id, timestamp, type, amount, description) VALUES (?, ?, ?, ?, ?)",
  152. (deposit_id or str(uuid.uuid4()), ts, 'deposit', amount, desc)
  153. )
  154. # Adjust initial_balance in metadata to reflect capital changes
  155. current_initial = float(self.db._get_metadata('initial_balance') or '0.0')
  156. self.db._set_metadata('initial_balance', str(current_initial + amount))
  157. logger.info(f"💰 Recorded deposit: {formatted_amount_str}. New effective initial balance: {formatter.format_price_with_symbol(current_initial + amount)}")
  158. def record_withdrawal(self, amount: float, timestamp: Optional[str] = None,
  159. withdrawal_id: Optional[str] = None, description: Optional[str] = None):
  160. """Record a withdrawal."""
  161. ts = timestamp if timestamp else datetime.now(timezone.utc).isoformat()
  162. formatter = get_formatter()
  163. formatted_amount_str = formatter.format_price_with_symbol(amount)
  164. desc = description if description else f'Withdrawal of {formatted_amount_str}'
  165. self.db._execute_query(
  166. "INSERT INTO balance_adjustments (adjustment_id, timestamp, type, amount, description) VALUES (?, ?, ?, ?, ?)",
  167. (withdrawal_id or str(uuid.uuid4()), ts, 'withdrawal', amount, desc)
  168. )
  169. current_initial = float(self.db._get_metadata('initial_balance') or '0.0')
  170. self.db._set_metadata('initial_balance', str(current_initial - amount))
  171. logger.info(f"💸 Recorded withdrawal: {formatted_amount_str}. New effective initial balance: {formatter.format_price_with_symbol(current_initial - amount)}")
  172. def get_balance_adjustments_summary(self) -> Dict[str, Any]:
  173. """Get summary of all balance adjustments from DB."""
  174. adjustments = self.db._fetch_query("SELECT type, amount, timestamp FROM balance_adjustments ORDER BY timestamp ASC")
  175. if not adjustments:
  176. return {'total_deposits': 0.0, 'total_withdrawals': 0.0, 'net_adjustment': 0.0,
  177. 'adjustment_count': 0, 'last_adjustment': None}
  178. total_deposits = sum(adj['amount'] for adj in adjustments if adj['type'] == 'deposit')
  179. total_withdrawals = sum(adj['amount'] for adj in adjustments if adj['type'] == 'withdrawal')
  180. net_adjustment = total_deposits - total_withdrawals
  181. return {
  182. 'total_deposits': total_deposits, 'total_withdrawals': total_withdrawals,
  183. 'net_adjustment': net_adjustment, 'adjustment_count': len(adjustments),
  184. 'last_adjustment': adjustments[-1]['timestamp'] if adjustments else None
  185. }
  186. def get_daily_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
  187. """Get daily performance stats for the last N days from daily_aggregated_stats."""
  188. daily_stats_list = []
  189. today_utc = datetime.now(timezone.utc).date()
  190. for i in range(limit):
  191. target_date = today_utc - timedelta(days=i)
  192. date_str = target_date.strftime('%Y-%m-%d')
  193. date_formatted = target_date.strftime('%m/%d')
  194. day_aggregated_data = self.db._fetch_query(
  195. "SELECT SUM(realized_pnl) as pnl, SUM(completed_cycles) as trades, SUM(exit_volume) as volume FROM daily_aggregated_stats WHERE date = ?",
  196. (date_str,)
  197. )
  198. stats_for_day = None
  199. if day_aggregated_data and len(day_aggregated_data) > 0 and day_aggregated_data[0]['trades'] is not None:
  200. stats_for_day = day_aggregated_data[0]
  201. pnl = stats_for_day.get('pnl', 0.0) or 0.0
  202. volume = stats_for_day.get('volume', 0.0) or 0.0
  203. stats_for_day['pnl_pct'] = (pnl / volume * 100) if volume > 0 else 0.0
  204. stats_for_day['trades'] = int(stats_for_day.get('trades', 0) or 0)
  205. if stats_for_day and stats_for_day['trades'] > 0:
  206. daily_stats_list.append({
  207. 'date': date_str, 'date_formatted': date_formatted, 'has_trades': True,
  208. **stats_for_day
  209. })
  210. else:
  211. daily_stats_list.append({
  212. 'date': date_str, 'date_formatted': date_formatted, 'has_trades': False,
  213. 'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
  214. })
  215. return daily_stats_list
  216. def get_weekly_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
  217. """Get weekly performance stats for the last N weeks by aggregating daily_aggregated_stats."""
  218. weekly_stats_list = []
  219. today_utc = datetime.now(timezone.utc).date()
  220. for i in range(limit):
  221. target_monday = today_utc - timedelta(days=today_utc.weekday() + (i * 7))
  222. target_sunday = target_monday + timedelta(days=6)
  223. week_key_display = f"{target_monday.strftime('%Y-W%W')}"
  224. week_formatted_display = f"{target_monday.strftime('%m/%d')}-{target_sunday.strftime('%m/%d/%y')}"
  225. daily_records_for_week = self.db._fetch_query(
  226. "SELECT date, realized_pnl, completed_cycles, exit_volume FROM daily_aggregated_stats WHERE date BETWEEN ? AND ?",
  227. (target_monday.strftime('%Y-%m-%d'), target_sunday.strftime('%Y-%m-%d'))
  228. )
  229. if daily_records_for_week:
  230. total_pnl_week = sum(d.get('realized_pnl', 0.0) or 0.0 for d in daily_records_for_week)
  231. total_trades_week = sum(d.get('completed_cycles', 0) or 0 for d in daily_records_for_week)
  232. total_volume_week = sum(d.get('exit_volume', 0.0) or 0.0 for d in daily_records_for_week)
  233. pnl_pct_week = (total_pnl_week / total_volume_week * 100) if total_volume_week > 0 else 0.0
  234. if total_trades_week > 0:
  235. weekly_stats_list.append({
  236. 'week': week_key_display,
  237. 'week_formatted': week_formatted_display,
  238. 'has_trades': True,
  239. 'pnl': total_pnl_week,
  240. 'trades': total_trades_week,
  241. 'volume': total_volume_week,
  242. 'pnl_pct': pnl_pct_week
  243. })
  244. else:
  245. weekly_stats_list.append({
  246. 'week': week_key_display, 'week_formatted': week_formatted_display, 'has_trades': False,
  247. 'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
  248. })
  249. else:
  250. weekly_stats_list.append({
  251. 'week': week_key_display, 'week_formatted': week_formatted_display, 'has_trades': False,
  252. 'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
  253. })
  254. return weekly_stats_list
  255. def get_monthly_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
  256. """Get monthly performance stats for the last N months by aggregating daily_aggregated_stats."""
  257. monthly_stats_list = []
  258. current_month_start_utc = datetime.now(timezone.utc).date().replace(day=1)
  259. for i in range(limit):
  260. year = current_month_start_utc.year
  261. month = current_month_start_utc.month - i
  262. while month <= 0:
  263. month += 12
  264. year -= 1
  265. target_month_start_date = datetime(year, month, 1, tzinfo=timezone.utc).date()
  266. next_month_start_date = datetime(year + (month // 12), (month % 12) + 1, 1, tzinfo=timezone.utc).date() if month < 12 else datetime(year + 1, 1, 1, tzinfo=timezone.utc).date()
  267. target_month_end_date = next_month_start_date - timedelta(days=1)
  268. month_key_display = target_month_start_date.strftime('%Y-%m')
  269. month_formatted_display = target_month_start_date.strftime('%b %Y')
  270. daily_records_for_month = self.db._fetch_query(
  271. "SELECT date, realized_pnl, completed_cycles, exit_volume FROM daily_aggregated_stats WHERE date BETWEEN ? AND ?",
  272. (target_month_start_date.strftime('%Y-%m-%d'), target_month_end_date.strftime('%Y-%m-%d'))
  273. )
  274. if daily_records_for_month:
  275. total_pnl_month = sum(d.get('realized_pnl', 0.0) or 0.0 for d in daily_records_for_month)
  276. total_trades_month = sum(d.get('completed_cycles', 0) or 0 for d in daily_records_for_month)
  277. total_volume_month = sum(d.get('exit_volume', 0.0) or 0.0 for d in daily_records_for_month)
  278. pnl_pct_month = (total_pnl_month / total_volume_month * 100) if total_volume_month > 0 else 0.0
  279. if total_trades_month > 0:
  280. monthly_stats_list.append({
  281. 'month': month_key_display,
  282. 'month_formatted': month_formatted_display,
  283. 'has_trades': True,
  284. 'pnl': total_pnl_month,
  285. 'trades': total_trades_month,
  286. 'volume': total_volume_month,
  287. 'pnl_pct': pnl_pct_month
  288. })
  289. else:
  290. monthly_stats_list.append({
  291. 'month': month_key_display, 'month_formatted': month_formatted_display, 'has_trades': False,
  292. 'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
  293. })
  294. else:
  295. monthly_stats_list.append({
  296. 'month': month_key_display, 'month_formatted': month_formatted_display, 'has_trades': False,
  297. 'trades': 0, 'pnl': 0.0, 'volume': 0.0, 'pnl_pct': 0.0
  298. })
  299. return monthly_stats_list