|
@@ -59,16 +59,15 @@ class AggregationManager:
|
|
|
logger.error(f"Unexpected error migrating trade {trade_lifecycle_id} to aggregate stats: {e}", exc_info=True)
|
|
|
|
|
|
def _migrate_closed_position(self, trade_data: Dict[str, Any], token: str, now_iso: str) -> None:
|
|
|
- """Migrate a closed position to aggregated stats."""
|
|
|
+ """Migrate a closed position to aggregated stats using a safer read-then-write approach."""
|
|
|
try:
|
|
|
- # Extract trade data
|
|
|
+ # Extract new trade data
|
|
|
realized_pnl = trade_data.get('realized_pnl', 0.0)
|
|
|
entry_value = trade_data.get('value', 0.0)
|
|
|
- exit_value = entry_value + realized_pnl # Calculate exit value from entry + P&L
|
|
|
- is_win = realized_pnl > 0
|
|
|
- is_loss = realized_pnl < 0
|
|
|
+ exit_value = entry_value + realized_pnl
|
|
|
+ is_win = 1 if realized_pnl > 0 else 0
|
|
|
+ is_loss = 1 if realized_pnl < 0 else 0
|
|
|
|
|
|
- # Get timestamps
|
|
|
opened_at = trade_data.get('position_opened_at')
|
|
|
closed_at = trade_data.get('position_closed_at')
|
|
|
|
|
@@ -76,91 +75,74 @@ class AggregationManager:
|
|
|
logger.warning(f"Missing timestamps for trade {trade_data.get('trade_lifecycle_id')}")
|
|
|
return
|
|
|
|
|
|
- # Calculate duration
|
|
|
- opened_dt = datetime.fromisoformat(opened_at)
|
|
|
- closed_dt = datetime.fromisoformat(closed_at)
|
|
|
- duration_seconds = int((closed_dt - opened_dt).total_seconds())
|
|
|
-
|
|
|
- # Calculate ROE percentage
|
|
|
- roe_percentage = 0.0
|
|
|
- if entry_value > 0:
|
|
|
- roe_percentage = (realized_pnl / entry_value) * 100
|
|
|
-
|
|
|
- # Format timestamps for SQL
|
|
|
- opened_at_str = opened_at
|
|
|
- closed_at_str = closed_at
|
|
|
+ duration_seconds = int((datetime.fromisoformat(closed_at) - datetime.fromisoformat(opened_at)).total_seconds())
|
|
|
+
|
|
|
+ # 1. Fetch existing token stats
|
|
|
+ existing_stats = self.db._fetchone_query("SELECT * FROM token_stats WHERE token = ?", (token,))
|
|
|
+
|
|
|
+ if not existing_stats:
|
|
|
+ # Initialize stats for a new token
|
|
|
+ stats = {
|
|
|
+ 'token': token,
|
|
|
+ 'total_realized_pnl': 0.0, 'total_completed_cycles': 0, 'winning_cycles': 0,
|
|
|
+ 'losing_cycles': 0, 'total_entry_volume': 0.0, 'total_exit_volume': 0.0,
|
|
|
+ 'sum_of_winning_pnl': 0.0, 'sum_of_losing_pnl': 0.0,
|
|
|
+ 'largest_winning_cycle_pnl': 0.0, 'largest_losing_cycle_pnl': 0.0,
|
|
|
+ 'first_cycle_closed_at': closed_at, 'last_cycle_closed_at': closed_at,
|
|
|
+ 'total_duration_seconds': 0,
|
|
|
+ 'largest_winning_cycle_entry_volume': 0.0,
|
|
|
+ 'largest_losing_cycle_entry_volume': 0.0,
|
|
|
+ 'total_cancelled_cycles': 0 # Ensure this is initialized
|
|
|
+ }
|
|
|
+ else:
|
|
|
+ stats = dict(existing_stats)
|
|
|
+
|
|
|
+ # 2. Update stats in Python
|
|
|
+ stats['total_realized_pnl'] += realized_pnl
|
|
|
+ stats['total_completed_cycles'] += 1
|
|
|
+ stats['winning_cycles'] += is_win
|
|
|
+ stats['losing_cycles'] += is_loss
|
|
|
+ stats['total_entry_volume'] += entry_value
|
|
|
+ stats['total_exit_volume'] += exit_value
|
|
|
+ stats['sum_of_winning_pnl'] += realized_pnl if is_win else 0
|
|
|
+ stats['sum_of_losing_pnl'] += realized_pnl if is_loss else 0
|
|
|
+ stats['total_duration_seconds'] += duration_seconds
|
|
|
+ stats['last_cycle_closed_at'] = now_iso
|
|
|
+
|
|
|
+ if realized_pnl > stats['largest_winning_cycle_pnl']:
|
|
|
+ stats['largest_winning_cycle_pnl'] = realized_pnl
|
|
|
+ stats['largest_winning_cycle_entry_volume'] = entry_value
|
|
|
|
|
|
- # Upsert query for token_stats
|
|
|
- token_upsert_query = """
|
|
|
- INSERT INTO token_stats (
|
|
|
+ # For losses, a more negative number is "larger"
|
|
|
+ if realized_pnl < stats['largest_losing_cycle_pnl'] or stats['largest_losing_cycle_pnl'] == 0:
|
|
|
+ stats['largest_losing_cycle_pnl'] = realized_pnl
|
|
|
+ stats['largest_losing_cycle_entry_volume'] = entry_value
|
|
|
+
|
|
|
+ # 3. Write updated stats back to the database
|
|
|
+ upsert_query = """
|
|
|
+ INSERT OR REPLACE INTO token_stats (
|
|
|
token, total_realized_pnl, total_completed_cycles,
|
|
|
winning_cycles, losing_cycles, total_entry_volume,
|
|
|
total_exit_volume, sum_of_winning_pnl, sum_of_losing_pnl,
|
|
|
largest_winning_cycle_pnl, largest_losing_cycle_pnl,
|
|
|
first_cycle_closed_at, last_cycle_closed_at,
|
|
|
total_duration_seconds, largest_winning_cycle_entry_volume,
|
|
|
- largest_losing_cycle_entry_volume, roe_percentage, updated_at
|
|
|
- ) VALUES (?, ?, 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
|
- ON CONFLICT(token) DO UPDATE SET
|
|
|
- total_realized_pnl = total_realized_pnl + ?,
|
|
|
- total_completed_cycles = total_completed_cycles + 1,
|
|
|
- winning_cycles = winning_cycles + ?,
|
|
|
- losing_cycles = losing_cycles + ?,
|
|
|
- total_entry_volume = total_entry_volume + ?,
|
|
|
- total_exit_volume = total_exit_volume + ?,
|
|
|
- sum_of_winning_pnl = sum_of_winning_pnl + ?,
|
|
|
- sum_of_losing_pnl = sum_of_losing_pnl + ?,
|
|
|
- largest_winning_cycle_pnl = CASE
|
|
|
- WHEN ? > largest_winning_cycle_pnl THEN ?
|
|
|
- ELSE largest_winning_cycle_pnl
|
|
|
- END,
|
|
|
- largest_losing_cycle_pnl = CASE
|
|
|
- WHEN ? < largest_losing_cycle_pnl OR largest_losing_cycle_pnl = 0 THEN ?
|
|
|
- ELSE largest_losing_cycle_pnl
|
|
|
- END,
|
|
|
- first_cycle_closed_at = CASE
|
|
|
- WHEN first_cycle_closed_at IS NULL OR ? < first_cycle_closed_at
|
|
|
- THEN ? ELSE first_cycle_closed_at
|
|
|
- END,
|
|
|
- last_cycle_closed_at = CASE
|
|
|
- WHEN last_cycle_closed_at IS NULL OR ? > last_cycle_closed_at
|
|
|
- THEN ? ELSE last_cycle_closed_at
|
|
|
- END,
|
|
|
- total_duration_seconds = total_duration_seconds + ?,
|
|
|
- largest_winning_cycle_entry_volume = CASE
|
|
|
- WHEN ? > largest_winning_cycle_pnl THEN ?
|
|
|
- ELSE largest_winning_cycle_entry_volume
|
|
|
- END,
|
|
|
- largest_losing_cycle_entry_volume = CASE
|
|
|
- WHEN ? < largest_losing_cycle_pnl THEN ?
|
|
|
- ELSE largest_losing_cycle_entry_volume
|
|
|
- END,
|
|
|
- roe_percentage = CASE
|
|
|
- WHEN ? > roe_percentage THEN ?
|
|
|
- ELSE roe_percentage
|
|
|
- END,
|
|
|
- updated_at = ?"""
|
|
|
-
|
|
|
- # Execute the upsert
|
|
|
- self.db._execute_query(token_upsert_query, (
|
|
|
- token, realized_pnl, is_win, is_loss, entry_value, exit_value,
|
|
|
- realized_pnl if is_win else 0, realized_pnl if is_loss else 0,
|
|
|
- realized_pnl if is_win else 0, realized_pnl if is_loss else 0,
|
|
|
- closed_at_str, closed_at_str, duration_seconds, entry_value if is_win else 0,
|
|
|
- entry_value if is_loss else 0, roe_percentage, now_iso,
|
|
|
- # For the UPDATE part
|
|
|
- realized_pnl, is_win, is_loss, entry_value, exit_value,
|
|
|
- realized_pnl if is_win else 0, realized_pnl if is_loss else 0,
|
|
|
- realized_pnl if is_win else 0, realized_pnl if is_win else 0,
|
|
|
- realized_pnl if is_loss else 0, realized_pnl if is_loss else 0,
|
|
|
- closed_at_str, closed_at_str, closed_at_str, closed_at_str,
|
|
|
- duration_seconds, realized_pnl if is_win else 0, entry_value if is_win else 0,
|
|
|
- realized_pnl if is_loss else 0, entry_value if is_loss else 0,
|
|
|
- roe_percentage, roe_percentage, now_iso
|
|
|
+ largest_losing_cycle_entry_volume, total_cancelled_cycles, updated_at
|
|
|
+ ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
|
+ """
|
|
|
+ self.db._execute_query(upsert_query, (
|
|
|
+ stats['token'], stats['total_realized_pnl'], stats['total_completed_cycles'],
|
|
|
+ stats['winning_cycles'], stats['losing_cycles'], stats['total_entry_volume'],
|
|
|
+ stats['total_exit_volume'], stats['sum_of_winning_pnl'], stats['sum_of_losing_pnl'],
|
|
|
+ stats['largest_winning_cycle_pnl'], stats['largest_losing_cycle_pnl'],
|
|
|
+ stats['first_cycle_closed_at'], stats['last_cycle_closed_at'],
|
|
|
+ stats['total_duration_seconds'], stats['largest_winning_cycle_entry_volume'],
|
|
|
+ stats['largest_losing_cycle_entry_volume'], stats.get('total_cancelled_cycles', 0), now_iso
|
|
|
))
|
|
|
-
|
|
|
+ logger.info(f"Successfully aggregated closed trade for {token}. P&L: {realized_pnl}")
|
|
|
+
|
|
|
except Exception as e:
|
|
|
- logger.error(f"Error migrating closed position to aggregated stats: {e}")
|
|
|
+ logger.error(f"Error migrating closed position to aggregated stats for token {token}: {e}", exc_info=True)
|
|
|
raise
|
|
|
|
|
|
def _migrate_cancelled_position(self, trade_data: Dict[str, Any], token: str, now_iso: str):
|