#!/usr/bin/env python3
"""
Trading Statistics Tracker (Refactored Version)
Main class that coordinates between specialized manager components.
"""
import logging
from datetime import datetime, timezone
from typing import Dict, List, Any, Optional, Tuple, Union
import math
import numpy as np
import uuid
from .database_manager import DatabaseManager
from .order_manager import OrderManager
from .trade_lifecycle_manager import TradeLifecycleManager
from .aggregation_manager import AggregationManager
from .performance_calculator import PerformanceCalculator
from src.utils.token_display_formatter import get_formatter
logger = logging.getLogger(__name__)
def _normalize_token_case(token: str) -> str:
"""Normalize token case for consistency."""
if any(c.isupper() for c in token):
return token # Keep original case for mixed-case tokens
else:
return token.upper() # Convert to uppercase for all-lowercase
class TradingStats:
"""Refactored trading statistics tracker using modular components."""
def __init__(self, db_path: str = "data/trading_stats.sqlite"):
    """Build the database manager and the components constructed around it.

    Args:
        db_path: Path handed to ``DatabaseManager``.
    """
    # The database manager comes first: every other component receives it.
    database = DatabaseManager(db_path)
    self.db_manager = database

    self.order_manager = OrderManager(database)
    self.trade_manager = TradeLifecycleManager(database)
    self.aggregation_manager = AggregationManager(database)
    self.performance_calculator = PerformanceCalculator(database)

    logger.info("🚀 TradingStats initialized with modular components")
def close(self):
    """Ask the database manager to close; nothing is returned."""
    database = self.db_manager
    database.close()
# =============================================================================
# COMPATIBILITY METHODS - Direct exposure of internal methods
# =============================================================================
def _get_metadata(self, key: str) -> Optional[str]:
"""Get metadata from database."""
return self.db_manager._get_metadata(key)
def _set_metadata(self, key: str, value: str):
"""Set metadata in database."""
return self.db_manager._set_metadata(key, value)
# =============================================================================
# DATABASE MANAGEMENT DELEGATION
# =============================================================================
async def set_initial_balance(self, balance: float):
    """Await the database manager's ``set_initial_balance`` and return its result."""
    outcome = await self.db_manager.set_initial_balance(balance)
    return outcome
def get_initial_balance(self) -> float:
    """Return the initial balance reported by the database manager."""
    database = self.db_manager
    return database.get_initial_balance()
async def record_balance_snapshot(self, balance: float, unrealized_pnl: float = 0.0,
                                  timestamp: Optional[str] = None, notes: Optional[str] = None):
    """Await the database manager's ``record_balance_snapshot``.

    All four arguments are forwarded positionally, in signature order, and
    the awaited result is returned.
    """
    snapshot = (balance, unrealized_pnl, timestamp, notes)
    return await self.db_manager.record_balance_snapshot(*snapshot)
def purge_old_balance_history(self, days_to_keep: int = 30) -> int:
    """Forward *days_to_keep* to the database manager's balance-history purge."""
    purge = self.db_manager.purge_old_balance_history
    return purge(days_to_keep)
def get_balance_history_record_count(self) -> int:
    """Return the balance-history record count from the database manager."""
    database = self.db_manager
    return database.get_balance_history_record_count()
def purge_old_daily_aggregated_stats(self, days_to_keep: int = 365) -> int:
    """Forward *days_to_keep* to the database manager's daily-stats purge."""
    purge = self.db_manager.purge_old_daily_aggregated_stats
    return purge(days_to_keep)
# =============================================================================
# ORDER MANAGEMENT DELEGATION
# =============================================================================
def record_order_placed(self, symbol: str, side: str, order_type: str,
                        amount_requested: float, price: Optional[float] = None,
                        bot_order_ref_id: Optional[str] = None,
                        exchange_order_id: Optional[str] = None,
                        timestamp: Optional[str] = None,
                        status: str = 'open') -> bool:
    """Register a newly placed order with the order manager.

    Returns:
        True when the order manager returns anything other than ``None``.
    """
    # NOTE(review): ``timestamp`` is accepted but is not forwarded to the
    # order manager — confirm whether that is intentional.
    forwarded = (
        symbol, side, order_type, amount_requested, price,
        bot_order_ref_id, exchange_order_id, status,
    )
    outcome = self.order_manager.record_order_placed(*forwarded)
    return outcome is not None
def update_order_exchange_id(self, bot_order_ref_id: str, exchange_order_id: str) -> bool:
    """Pass the bot reference and exchange ID to the order manager."""
    orders = self.order_manager
    return orders.update_order_exchange_id(bot_order_ref_id, exchange_order_id)
def record_order_filled(self, exchange_order_id: str, actual_amount: float,
                        actual_price: float, fees: float = 0.0,
                        timestamp: Optional[str] = None,
                        exchange_fill_id: Optional[str] = None) -> bool:
    """Report a fill to the order manager.

    Every argument is forwarded positionally in signature order; the order
    manager's return value is passed back unchanged.
    """
    fill_details = (
        exchange_order_id, actual_amount, actual_price,
        fees, timestamp, exchange_fill_id,
    )
    return self.order_manager.record_order_filled(*fill_details)
def record_order_cancelled(self, exchange_order_id: str, reason: str = "user_cancelled",
                           timestamp: Optional[str] = None) -> bool:
    """Report a cancellation (ID, reason, timestamp) to the order manager."""
    cancel = self.order_manager.record_order_cancelled
    return cancel(exchange_order_id, reason, timestamp)
def update_order_status(self, order_db_id: Optional[int] = None, bot_order_ref_id: Optional[str] = None,
                        exchange_order_id: Optional[str] = None, new_status: Optional[str] = None,
                        amount_filled_increment: Optional[float] = None, set_exchange_order_id: Optional[str] = None,
                        notes: Optional[str] = None, timestamp: Optional[str] = None) -> bool:
    """Forward an order update to the order manager using keyword arguments.

    The three identifier arguments, ``new_status``,
    ``amount_filled_increment`` and ``set_exchange_order_id`` are passed on
    by name; the order manager's return value is returned unchanged.
    """
    # NOTE(review): ``notes`` and ``timestamp`` are accepted but are not
    # passed on — confirm whether the order manager should receive them.
    update = dict(
        order_db_id=order_db_id,
        bot_order_ref_id=bot_order_ref_id,
        exchange_order_id=exchange_order_id,
        new_status=new_status,
        amount_filled_increment=amount_filled_increment,
        set_exchange_order_id=set_exchange_order_id,
    )
    return self.order_manager.update_order_status(**update)
def get_order_by_exchange_id(self, exchange_order_id: str) -> Optional[Dict[str, Any]]:
    """Return the order manager's lookup result for *exchange_order_id*."""
    find = self.order_manager.get_order_by_exchange_id
    return find(exchange_order_id)
def get_order_by_bot_ref_id(self, bot_order_ref_id: str) -> Optional[Dict[str, Any]]:
    """Return the order manager's lookup result for *bot_order_ref_id*."""
    find = self.order_manager.get_order_by_bot_ref_id
    return find(bot_order_ref_id)
def has_exchange_fill_been_processed(self, exchange_fill_id: str) -> bool:
    """Report whether a row in ``trades`` carries this exchange fill ID.

    Returns:
        True when the lookup yields at least one row. False when it yields
        nothing, or when the query raises (the error is logged).
    """
    # The trades table is queried because that is where processed fills
    # are recorded.
    lookup_sql = "SELECT 1 FROM trades WHERE exchange_fill_id = ? LIMIT 1"
    try:
        matches = self.db_manager._fetch_query(lookup_sql, (exchange_fill_id,))
        already_seen = bool(matches)
    except Exception as e:
        logger.error(f"Error checking if fill {exchange_fill_id} was processed: {e}")
        return False
    return already_seen
def get_orders_by_symbol(self, symbol: str, limit: int = 50) -> List[Dict[str, Any]]:
    """Return the order manager's orders for *symbol*, passing *limit* along."""
    orders = self.order_manager
    return orders.get_orders_by_symbol(symbol, limit)
def get_orders_by_status(self, status: str, limit: Optional[int] = 50,
                         order_type_filter: Optional[str] = None,
                         parent_bot_order_ref_id: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return the order manager's orders for *status*, with optional filters.

    Only ``status``, ``order_type_filter`` and ``parent_bot_order_ref_id``
    are forwarded (positionally, in that order).
    """
    # NOTE(review): ``limit`` is not forwarded, so it has no effect on the
    # number of rows returned here — confirm callers do not rely on it.
    filters = (status, order_type_filter, parent_bot_order_ref_id)
    return self.order_manager.get_orders_by_status(*filters)
def get_recent_orders(self, limit: int = 20) -> List[Dict[str, Any]]:
    """Return the order manager's recent orders, passing *limit* along."""
    fetch_recent = self.order_manager.get_recent_orders
    return fetch_recent(limit)
def cleanup_old_cancelled_orders(self, days_old: int = 7) -> int:
    """Forward *days_old* to the order manager's cancelled-order cleanup."""
    cleanup = self.order_manager.cleanup_old_cancelled_orders
    return cleanup(days_old)
# =============================================================================
# TRADE LIFECYCLE DELEGATION
# =============================================================================
def create_trade_lifecycle(self, symbol: str, side: str, entry_order_id: Optional[str] = None,
                           entry_bot_order_ref_id: Optional[str] = None,
                           stop_loss_price: Optional[float] = None,
                           take_profit_price: Optional[float] = None,
                           trade_type: str = 'manual') -> Optional[str]:
    """Ask the trade manager to create a lifecycle record.

    All arguments are forwarded positionally in signature order; the trade
    manager's return value is passed back unchanged.
    """
    lifecycle_args = (
        symbol, side, entry_order_id, entry_bot_order_ref_id,
        stop_loss_price, take_profit_price, trade_type,
    )
    return self.trade_manager.create_trade_lifecycle(*lifecycle_args)
async def update_trade_position_opened(self, lifecycle_id: str, entry_price: float,
                                       entry_amount: float, exchange_fill_id: str) -> bool:
    """Await the trade manager's ``update_trade_position_opened`` and return its result."""
    opened = (lifecycle_id, entry_price, entry_amount, exchange_fill_id)
    return await self.trade_manager.update_trade_position_opened(*opened)
async def update_trade_position_closed(self, lifecycle_id: str, exit_price: float,
                                       realized_pnl: float, exchange_fill_id: str) -> bool:
    """Await the trade manager's ``update_trade_position_closed`` and return its result."""
    closed = (lifecycle_id, exit_price, realized_pnl, exchange_fill_id)
    return await self.trade_manager.update_trade_position_closed(*closed)
def update_trade_cancelled(self, lifecycle_id: str, reason: str = "order_cancelled") -> bool:
    """Forward the lifecycle ID and cancellation reason to the trade manager."""
    trades = self.trade_manager
    return trades.update_trade_cancelled(lifecycle_id, reason)
async def link_stop_loss_to_trade(self, lifecycle_id: str, stop_loss_order_id: str,
                                  stop_loss_price: float) -> bool:
    """Await the trade manager's ``link_stop_loss_to_trade`` and return its result."""
    link = self.trade_manager.link_stop_loss_to_trade
    return await link(lifecycle_id, stop_loss_order_id, stop_loss_price)
async def link_take_profit_to_trade(self, lifecycle_id: str, take_profit_order_id: str,
                                    take_profit_price: float) -> bool:
    """Await the trade manager's ``link_take_profit_to_trade`` and return its result."""
    link = self.trade_manager.link_take_profit_to_trade
    return await link(lifecycle_id, take_profit_order_id, take_profit_price)
def get_trade_by_lifecycle_id(self, lifecycle_id: str) -> Optional[Dict[str, Any]]:
    """Return the trade manager's lookup result for *lifecycle_id*."""
    find = self.trade_manager.get_trade_by_lifecycle_id
    return find(lifecycle_id)
def get_trade_by_symbol_and_status(self, symbol: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
    """Return the trade manager's lookup result for *symbol* and *status*."""
    trades = self.trade_manager
    return trades.get_trade_by_symbol_and_status(symbol, status)
def get_open_positions(self, symbol: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return the trade manager's open positions, passing *symbol* along."""
    fetch_open = self.trade_manager.get_open_positions
    return fetch_open(symbol)
def get_trades_by_status(self, status: str, limit: int = 50) -> List[Dict[str, Any]]:
    """Return the trade manager's trades for *status*, passing *limit* along."""
    trades = self.trade_manager
    return trades.get_trades_by_status(status, limit)
def get_lifecycle_by_entry_order_id(self, entry_exchange_order_id: str, status: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Return the trade manager's lifecycle lookup by entry order ID and status."""
    trades = self.trade_manager
    return trades.get_lifecycle_by_entry_order_id(entry_exchange_order_id, status)
def get_lifecycle_by_sl_order_id(self, sl_exchange_order_id: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
    """Return the trade manager's lifecycle lookup by stop-loss order ID and status."""
    trades = self.trade_manager
    return trades.get_lifecycle_by_sl_order_id(sl_exchange_order_id, status)
def get_lifecycle_by_tp_order_id(self, tp_exchange_order_id: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
    """Return the trade manager's lifecycle lookup by take-profit order ID and status."""
    trades = self.trade_manager
    return trades.get_lifecycle_by_tp_order_id(tp_exchange_order_id, status)
def get_pending_stop_loss_activations(self) -> List[Dict[str, Any]]:
    """Return the trade manager's list of pending stop-loss activations."""
    trades = self.trade_manager
    return trades.get_pending_stop_loss_activations()
def cleanup_old_cancelled_trades(self, days_old: int = 7) -> int:
    """Forward *days_old* to the trade manager's cancelled-trade cleanup."""
    cleanup = self.trade_manager.cleanup_old_cancelled_trades
    return cleanup(days_old)
def confirm_position_with_exchange(self, symbol: str, exchange_position_size: float,
                                   exchange_open_orders: List[Dict]) -> bool:
    """Forward the exchange's position size and open orders to the trade manager."""
    confirm = self.trade_manager.confirm_position_with_exchange
    return confirm(symbol, exchange_position_size, exchange_open_orders)
def update_trade_market_data(self, trade_lifecycle_id: str, **kwargs) -> bool:
    """Forward the lifecycle ID and all keyword arguments to the trade manager."""
    update = self.trade_manager.update_trade_market_data
    return update(trade_lifecycle_id, **kwargs)
def get_recent_trades(self, limit: int = 10) -> List[Dict[str, Any]]:
    """Return the trade manager's recent trades, passing *limit* along."""
    fetch_recent = self.trade_manager.get_recent_trades
    return fetch_recent(limit)
def get_all_trades(self) -> List[Dict[str, Any]]:
    """Return whatever the trade manager's ``get_all_trades`` yields."""
    trades = self.trade_manager
    return trades.get_all_trades()
def cancel_linked_orders(self, parent_bot_order_ref_id: str, new_status: str = 'cancelled_parent_filled') -> int:
    """Forward the parent order reference and new status to the trade manager."""
    trades = self.trade_manager
    return trades.cancel_linked_orders(parent_bot_order_ref_id, new_status)
# =============================================================================
# AGGREGATION MANAGEMENT DELEGATION
# =============================================================================
def migrate_trade_to_aggregated_stats(self, trade_lifecycle_id: str):
    """Forward *trade_lifecycle_id* to the aggregation manager's migration."""
    migrate = self.aggregation_manager.migrate_trade_to_aggregated_stats
    return migrate(trade_lifecycle_id)
async def record_deposit(self, amount: float, timestamp: Optional[str] = None,
                         deposit_id: Optional[str] = None, description: Optional[str] = None):
    """Await the aggregation manager's ``record_deposit`` and return its result."""
    deposit = (amount, timestamp, deposit_id, description)
    return await self.aggregation_manager.record_deposit(*deposit)
async def record_withdrawal(self, amount: float, timestamp: Optional[str] = None,
                            withdrawal_id: Optional[str] = None, description: Optional[str] = None):
    """Await the aggregation manager's ``record_withdrawal`` and return its result."""
    withdrawal = (amount, timestamp, withdrawal_id, description)
    return await self.aggregation_manager.record_withdrawal(*withdrawal)
def get_balance_adjustments_summary(self) -> Dict[str, Any]:
    """Return the aggregation manager's balance-adjustments summary."""
    aggregates = self.aggregation_manager
    return aggregates.get_balance_adjustments_summary()
def get_daily_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
    """Return the aggregation manager's daily stats, passing *limit* along."""
    fetch = self.aggregation_manager.get_daily_stats
    return fetch(limit)
def get_weekly_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
    """Return the aggregation manager's weekly stats, passing *limit* along."""
    fetch = self.aggregation_manager.get_weekly_stats
    return fetch(limit)
def get_monthly_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
    """Return the aggregation manager's monthly stats, passing *limit* along."""
    fetch = self.aggregation_manager.get_monthly_stats
    return fetch(limit)
# =============================================================================
# PERFORMANCE CALCULATION DELEGATION
# =============================================================================
def get_performance_stats(self) -> Dict[str, Any]:
    """Return the performance calculator's ``get_performance_stats`` result."""
    calculator = self.performance_calculator
    return calculator.get_performance_stats()
def get_token_performance(self, token: Optional[str] = None) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
    """Summarise closed-trade performance for one symbol or for every symbol.

    Only rows of ``trades`` with ``status = 'position_closed'`` are counted.

    Args:
        token: Value matched exactly against ``trades.symbol``. When it is
            ``None`` or empty, one summary per symbol is returned instead.

    Returns:
        With a token: a dict of aggregate figures plus the five most
        recently closed trades, or ``{}`` when no closed trade matches.
        Without one: a list of per-symbol dicts ordered by total P&L,
        highest first. If anything raises, the error is logged and the
        empty value of the matching shape (``{}`` or ``[]``) is returned.
    """
    def _win_rate(wins, total):
        # Share of winning trades as a percentage; 0 when there are none.
        return (wins / total * 100) if total > 0 else 0

    try:
        if token:
            summary_query = """
                SELECT
                    symbol,
                    COUNT(*) as total_trades,
                    SUM(CASE WHEN realized_pnl > 0 THEN 1 ELSE 0 END) as winning_trades,
                    SUM(realized_pnl) as total_pnl,
                    AVG(realized_pnl) as avg_trade,
                    MAX(realized_pnl) as largest_win,
                    MIN(realized_pnl) as largest_loss,
                    AVG(CASE WHEN realized_pnl > 0 THEN realized_pnl ELSE NULL END) as avg_win,
                    AVG(CASE WHEN realized_pnl < 0 THEN realized_pnl ELSE NULL END) as avg_loss
                FROM trades
                WHERE symbol = ? AND status = 'position_closed'
                GROUP BY symbol
            """
            result = self.db_manager._fetchone_query(summary_query, (token,))
            if not result:
                return {}

            total_trades = result['total_trades']
            winning_trades = result['winning_trades']

            recent_trades_query = """
                SELECT
                    side,
                    entry_price,
                    exit_price,
                    realized_pnl as pnl,
                    position_opened_at,
                    position_closed_at
                FROM trades
                WHERE symbol = ? AND status = 'position_closed'
                ORDER BY position_closed_at DESC
                LIMIT 5
            """
            recent_trades = self.db_manager._fetch_query(recent_trades_query, (token,))

            return {
                'token': token,
                'total_trades': total_trades,
                'winning_trades': winning_trades,
                'win_rate': _win_rate(winning_trades, total_trades),
                'total_pnl': result['total_pnl'],
                'avg_trade': result['avg_trade'],
                'largest_win': result['largest_win'],
                'largest_loss': result['largest_loss'],
                'avg_win': result['avg_win'],
                'avg_loss': result['avg_loss'],
                'recent_trades': recent_trades
            }

        all_tokens_query = """
            SELECT
                symbol,
                COUNT(*) as total_trades,
                SUM(CASE WHEN realized_pnl > 0 THEN 1 ELSE 0 END) as winning_trades,
                SUM(realized_pnl) as total_pnl,
                AVG(realized_pnl) as avg_trade
            FROM trades
            WHERE status = 'position_closed'
            GROUP BY symbol
            ORDER BY total_pnl DESC
        """
        rows = self.db_manager._fetch_query(all_tokens_query)
        return [
            {
                'token': row['symbol'],
                'total_trades': row['total_trades'],
                'winning_trades': row['winning_trades'],
                'win_rate': _win_rate(row['winning_trades'], row['total_trades']),
                'total_pnl': row['total_pnl'],
                'avg_trade': row['avg_trade']
            }
            for row in rows
        ]
    except Exception as e:
        logger.error(f"Error getting token performance: {e}")
        # Choose the fallback with the same truthiness test as the branch
        # above, so an empty-string token still yields a list (not a dict).
        return {} if token else []
def get_balance_history(self, days: int = 30) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """Return the performance calculator's balance history for *days*."""
    fetch = self.performance_calculator.get_balance_history
    return fetch(days)
def get_live_max_drawdown(self) -> Tuple[float, float]:
    """Return the performance calculator's ``get_live_max_drawdown`` result."""
    calculator = self.performance_calculator
    return calculator.get_live_max_drawdown()
def update_live_max_drawdown(self, current_balance: float) -> bool:
    """Forward *current_balance* to the performance calculator's drawdown update."""
    update = self.performance_calculator.update_live_max_drawdown
    return update(current_balance)
def get_drawdown_monitor_data(self) -> Dict[str, float]:
    """Read the stored drawdown figures from metadata.

    Returns:
        A dict with ``peak_balance`` and ``max_drawdown_percentage``, taken
        from the ``drawdown_peak_balance`` and ``drawdown_max_drawdown_pct``
        metadata entries. A missing or empty entry counts as 0.0; if either
        value cannot be converted to a float, both are reported as 0.0.
    """
    try:
        peak = float(self._get_metadata('drawdown_peak_balance') or '0.0')
        worst_pct = float(self._get_metadata('drawdown_max_drawdown_pct') or '0.0')
    except (ValueError, TypeError):
        return {'peak_balance': 0.0, 'max_drawdown_percentage': 0.0}
    return {
        'peak_balance': peak,
        'max_drawdown_percentage': worst_pct
    }
def calculate_sharpe_ratio(self, days: int = 30) -> Optional[float]:
    """Return the performance calculator's Sharpe ratio for *days*."""
    compute = self.performance_calculator.calculate_sharpe_ratio
    return compute(days)
def calculate_max_consecutive_losses(self) -> int:
    """Return the performance calculator's max-consecutive-losses figure."""
    calculator = self.performance_calculator
    return calculator.calculate_max_consecutive_losses()
def get_risk_metrics(self) -> Dict[str, Any]:
    """Return the performance calculator's ``get_risk_metrics`` result."""
    calculator = self.performance_calculator
    return calculator.get_risk_metrics()
def get_period_performance(self, start_date: str, end_date: str) -> Dict[str, Any]:
    """Return the performance calculator's result for the given date range."""
    calculator = self.performance_calculator
    return calculator.get_period_performance(start_date, end_date)
def get_recent_performance_trend(self, days: int = 7) -> Dict[str, Any]:
    """Return the performance calculator's recent trend for *days*."""
    fetch = self.performance_calculator.get_recent_performance_trend
    return fetch(days)
# =============================================================================
# COMPATIBILITY METHODS - Legacy API Support
# =============================================================================
def get_basic_stats(self, current_balance: Optional[float] = None) -> Dict[str, Any]:
    """Assemble headline trading statistics from the database.

    Completed-cycle totals come from the ``token_stats`` table, the open
    position count from ``_get_open_positions_count_from_db``, and the
    initial balance and start date from metadata.

    Args:
        current_balance: Accepted for API compatibility; not used here.

    Returns:
        A dict with the keys ``total_trades`` (completed cycles plus open
        positions), ``completed_trades``, ``initial_balance``,
        ``total_pnl``, ``days_active``, ``start_date`` (``YYYY-MM-DD``),
        ``last_trade`` (timezone-aware datetime or ``None``) and
        ``open_positions_count``.
    """
    def _as_aware(value):
        # Parse ISO strings and give naive datetimes a UTC tzinfo so every
        # timestamp can be compared with, or subtracted from, an aware
        # "now". NOTE(review): assumes naive stored values are UTC.
        if isinstance(value, str):
            value = datetime.fromisoformat(value)
        if value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)
        return value

    open_positions_count = self._get_open_positions_count_from_db()

    query_token_stats_summary = """
        SELECT
            SUM(total_realized_pnl) as total_pnl_from_cycles,
            SUM(total_completed_cycles) as total_completed_cycles_sum,
            MIN(first_cycle_closed_at) as overall_first_cycle_closed,
            MAX(last_cycle_closed_at) as overall_last_cycle_closed
        FROM token_stats
    """
    token_stats_summary = self.db_manager._fetchone_query(query_token_stats_summary)

    # SUM() over an empty table yields NULL, so fall back to zero.
    total_pnl_from_cycles = 0.0
    total_completed_cycles_sum = 0
    if token_stats_summary:
        if token_stats_summary['total_pnl_from_cycles'] is not None:
            total_pnl_from_cycles = token_stats_summary['total_pnl_from_cycles']
        if token_stats_summary['total_completed_cycles_sum'] is not None:
            total_completed_cycles_sum = token_stats_summary['total_completed_cycles_sum']

    total_trades_redefined = total_completed_cycles_sum + open_positions_count

    initial_balance_str = self._get_metadata('initial_balance')
    initial_balance = float(initial_balance_str) if initial_balance_str else 0.0

    now = datetime.now(timezone.utc)
    start_date_iso = self._get_metadata('start_date')
    # A start date stored without a UTC offset would otherwise be naive and
    # make the subtraction below raise TypeError.
    start_date_obj = _as_aware(start_date_iso) if start_date_iso else now
    days_active = (now - start_date_obj).days + 1

    last_activity_query = """
        SELECT MAX(updated_at) as last_update
        FROM trades
        WHERE status IN ('position_opened', 'position_closed')
    """
    last_open_trade_query = """
        SELECT MAX(updated_at) as last_update
        FROM trades
        WHERE status = 'position_opened'
    """
    # Keep the most recent timestamp reported by either query.
    last_activity_ts = None
    for activity_query in (last_activity_query, last_open_trade_query):
        row = self.db_manager._fetchone_query(activity_query)
        if not (row and row['last_update']):
            continue
        candidate = _as_aware(row['last_update'])
        if last_activity_ts is None or candidate > last_activity_ts:
            last_activity_ts = candidate

    return {
        'total_trades': total_trades_redefined,
        'completed_trades': total_completed_cycles_sum,
        'initial_balance': initial_balance,
        'total_pnl': total_pnl_from_cycles,
        'days_active': days_active,
        'start_date': start_date_obj.strftime('%Y-%m-%d'),
        'last_trade': last_activity_ts,
        'open_positions_count': open_positions_count
    }
def _get_open_positions_count_from_db(self) -> int:
"""Get count of open positions from trades table."""
row = self.db_manager._fetchone_query("SELECT COUNT(DISTINCT symbol) as count FROM trades WHERE status = 'position_opened'")
return row['count'] if row else 0
def get_token_detailed_stats(self, token: str) -> Dict[str, Any]:
    """Collect completed-cycle and open-position statistics for one token.

    Completed-cycle figures are read from the ``token_stats`` row for the
    normalised token; open positions and open orders are matched with
    ``symbol LIKE '<TOKEN>/%'``.

    Args:
        token: Token name; its case is normalised with
            ``_normalize_token_case`` before any lookup.

    Returns:
        A dict with the performance summary (under both
        ``performance_summary`` and the legacy ``performance`` key), the
        list of open positions, and summary counters. ``{}`` is returned
        if anything raises (the error is logged).
    """
    def _value(row, key, default):
        # A NULL column arrives as None; treat it like a missing key so the
        # arithmetic below never sees None.
        found = row.get(key)
        return default if found is None else found

    try:
        upper_token = _normalize_token_case(token)
        symbol_pattern = f"{upper_token}/%"

        token_agg_stats = self.db_manager._fetchone_query(
            "SELECT * FROM token_stats WHERE token = ?", (upper_token,)
        )
        open_trades_for_token = self.db_manager._fetch_query(
            "SELECT * FROM trades WHERE status = 'position_opened' AND symbol LIKE ? ORDER BY position_opened_at DESC",
            (symbol_pattern,)
        ) or []

        perf_stats = {
            'completed_trades': 0,
            'total_pnl': 0.0,
            'pnl_percentage': 0.0,
            'win_rate': 0.0,
            'profit_factor': 0.0,
            'avg_win': 0.0,
            'avg_loss': 0.0,
            'largest_win': 0.0,
            'largest_loss': 0.0,
            'expectancy': 0.0,
            'total_wins': 0,
            'total_losses': 0,
            'completed_entry_volume': 0.0,
            'completed_exit_volume': 0.0,
            'total_cancelled': 0,
            'total_duration_seconds': 0,
            'avg_trade_duration': "N/A"
        }

        if token_agg_stats:
            total_cycles = _value(token_agg_stats, 'total_completed_cycles', 0)
            winning_cycles = _value(token_agg_stats, 'winning_cycles', 0)
            losing_cycles = _value(token_agg_stats, 'losing_cycles', 0)
            sum_winning_pnl = _value(token_agg_stats, 'sum_of_winning_pnl', 0.0)
            sum_losing_pnl = _value(token_agg_stats, 'sum_of_losing_pnl', 0.0)
            total_duration = _value(token_agg_stats, 'total_duration_seconds', 0)

            # Largest cycles, also expressed relative to their entry volume.
            largest_win_pnl = _value(token_agg_stats, 'largest_winning_cycle_pnl', 0.0)
            largest_loss_pnl = _value(token_agg_stats, 'largest_losing_cycle_pnl', 0.0)
            largest_win_entry_volume = _value(token_agg_stats, 'largest_winning_cycle_entry_volume', 0.0)
            largest_loss_entry_volume = _value(token_agg_stats, 'largest_losing_cycle_entry_volume', 0.0)
            largest_win_percentage = (largest_win_pnl / largest_win_entry_volume * 100) if largest_win_entry_volume > 0 else 0.0
            largest_loss_percentage = (largest_loss_pnl / largest_loss_entry_volume * 100) if largest_loss_entry_volume > 0 else 0.0

            if sum_losing_pnl > 0:
                profit_factor = sum_winning_pnl / sum_losing_pnl
            elif sum_winning_pnl > 0:
                profit_factor = float('inf')
            else:
                profit_factor = 0.0

            perf_stats.update({
                'completed_trades': total_cycles,
                'total_pnl': _value(token_agg_stats, 'total_realized_pnl', 0.0),
                'win_rate': (winning_cycles / total_cycles * 100) if total_cycles > 0 else 0.0,
                'profit_factor': profit_factor,
                'avg_win': (sum_winning_pnl / winning_cycles) if winning_cycles > 0 else 0.0,
                'avg_loss': (sum_losing_pnl / losing_cycles) if losing_cycles > 0 else 0.0,
                'largest_win': largest_win_pnl,
                'largest_loss': largest_loss_pnl,
                'largest_win_percentage': largest_win_percentage,
                'largest_loss_percentage': largest_loss_percentage,
                'total_wins': winning_cycles,
                'total_losses': losing_cycles,
                'completed_entry_volume': _value(token_agg_stats, 'total_entry_volume', 0.0),
                'completed_exit_volume': _value(token_agg_stats, 'total_exit_volume', 0.0),
                'total_cancelled': _value(token_agg_stats, 'total_cancelled_cycles', 0),
                'total_duration_seconds': total_duration
            })

            # Expectancy = avg win * P(win) - avg loss * P(loss).
            win_rate_decimal = perf_stats['win_rate'] / 100
            perf_stats['expectancy'] = (perf_stats['avg_win'] * win_rate_decimal) - (perf_stats['avg_loss'] * (1 - win_rate_decimal))

            if total_cycles > 0:
                perf_stats['avg_trade_duration'] = self._format_duration(total_duration / total_cycles)

        open_positions_summary = [
            {
                'lifecycle_id': op_trade.get('trade_lifecycle_id'),
                'side': op_trade.get('position_side'),
                'amount': op_trade.get('current_position_size'),
                'entry_price': op_trade.get('entry_price'),
                'mark_price': op_trade.get('mark_price'),
                'unrealized_pnl': op_trade.get('unrealized_pnl'),
                'opened_at': op_trade.get('position_opened_at')
            }
            for op_trade in open_trades_for_token
        ]
        # A position with no recorded unrealized P&L contributes nothing.
        total_open_unrealized_pnl = 0.0
        for op_trade in open_trades_for_token:
            total_open_unrealized_pnl += _value(op_trade, 'unrealized_pnl', 0.0)

        open_orders_count_row = self.db_manager._fetchone_query(
            "SELECT COUNT(*) as count FROM orders WHERE symbol LIKE ? AND status IN ('open', 'submitted', 'pending_trigger')",
            (symbol_pattern,)
        )
        current_open_orders_for_token = open_orders_count_row['count'] if open_orders_count_row else 0

        effective_total_trades = perf_stats['completed_trades'] + len(open_trades_for_token)

        return {
            'token': upper_token,
            'message': f"Statistics for {upper_token}",
            'performance_summary': perf_stats,  # Key read by the formatting method
            'performance': perf_stats,  # Legacy compatibility
            'open_positions': open_positions_summary,
            'summary_total_trades': effective_total_trades,
            'summary_total_unrealized_pnl': total_open_unrealized_pnl,
            'current_open_orders_count': current_open_orders_for_token,
            'summary': {
                'total_trades': effective_total_trades,
                'open_orders': current_open_orders_for_token,
            }
        }
    except Exception as e:
        logger.error(f"❌ Error getting detailed stats for {token}: {e}")
        return {}
def _format_duration(self, seconds: float) -> str:
"""Format duration in seconds to a human-readable string."""
if seconds <= 0:
return "0s"
days = int(seconds // 86400)
hours = int((seconds % 86400) // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
parts = []
if days > 0:
parts.append(f"{days}d")
if hours > 0:
parts.append(f"{hours}h")
if minutes > 0:
parts.append(f"{minutes}m")
if secs > 0 or not parts:
parts.append(f"{secs}s")
return " ".join(parts)
async def format_stats_message(self, current_balance: Optional[float] = None) -> str:
    """Build the multi-line trading performance summary message.

    Args:
        current_balance: Balance to display. When ``None``, the initial
            balance plus total realized P&L is shown instead.

    Returns:
        The summary as a newline-joined string. The "Key Metrics" section
        and the best/worst token lines appear only when ``get_basic_stats``
        supplies ``performance_metrics`` / ``best_token`` / ``worst_token``.
    """
    formatter = get_formatter()
    money = formatter.format_price_with_symbol
    basic_stats = self.get_basic_stats(current_balance)

    initial_bal = basic_stats.get('initial_balance', 0.0)
    total_pnl_val = basic_stats.get('total_pnl', 0.0)

    # Use the supplied return percentage when present; otherwise derive it
    # from realized P&L relative to the initial balance.
    total_return_pct = basic_stats.get('total_return_pct')
    if total_return_pct is None:
        total_return_pct = (total_pnl_val / initial_bal * 100) if initial_bal > 0 else 0.0

    pnl_emoji = "✅" if total_pnl_val >= 0 else "🔻"

    # Resolve the displayed balance first so the change is always
    # "displayed balance minus initial balance", whichever source it has.
    displayed_balance = current_balance if current_balance is not None else (initial_bal + total_pnl_val)
    balance_str = await money(displayed_balance)
    change_str = await money(displayed_balance - initial_bal) if initial_bal > 0 else 'N/A'

    stats_text_parts = [
        "📊 Trading Performance Summary",
        f"• Current Balance: {balance_str} ({change_str})",
        f"• Initial Balance: {await money(initial_bal)}",
        f"• Balance Change: {await money(total_pnl_val)} ({total_return_pct:+.2f}%)",
        f"• {pnl_emoji} Total P&L: {await money(total_pnl_val)} ({total_return_pct:+.2f}%)"
    ]

    # Performance Metrics
    perf = basic_stats.get('performance_metrics', {})
    if perf:
        stats_text_parts.append("\nKey Metrics:")
        stats_text_parts.append(f"• Trading Volume (Entry Vol.): {await money(perf.get('total_trading_volume', 0.0))}")
        if perf.get('expectancy') is not None:
            stats_text_parts.append(f"• Expectancy: {await money(perf['expectancy'])}")
        stats_text_parts.append(f"• Win Rate: {perf.get('win_rate', 0.0):.2f}% ({perf.get('num_wins', 0)} wins)")
        stats_text_parts.append(f"• Profit Factor: {perf.get('profit_factor', 0.0):.2f}")

        # Largest Trades
        if perf.get('largest_win') is not None:
            largest_win_pct_str = f" ({perf.get('largest_win_entry_pct', 0):.2f}%)" if perf.get('largest_win_entry_pct') is not None else ""
            largest_win_token = perf.get('largest_win_token', 'N/A')
            stats_text_parts.append(f"• Largest Winning Trade: {await money(perf['largest_win'])}{largest_win_pct_str} ({largest_win_token})")
        if perf.get('largest_loss') is not None:
            largest_loss_pct_str = f" ({perf.get('largest_loss_entry_pct', 0):.2f}%)" if perf.get('largest_loss_entry_pct') is not None else ""
            largest_loss_token = perf.get('largest_loss_token', 'N/A')
            # The stored loss is negated for display.
            stats_text_parts.append(f"• Largest Losing Trade: {await money(-perf['largest_loss'])}{largest_loss_pct_str} ({largest_loss_token})")

        # ROE-based metrics if available
        largest_win_roe = perf.get('largest_win_roe')
        largest_loss_roe = perf.get('largest_loss_roe')
        if largest_win_roe is not None:
            largest_win_roe_pnl = perf.get('largest_win_roe_pnl', 0.0)
            largest_win_roe_token = perf.get('largest_win_roe_token', 'N/A')
            stats_text_parts.append(f"• Best ROE Trade: {await money(largest_win_roe_pnl)} (+{largest_win_roe:.2f}%) ({largest_win_roe_token})")
        if largest_loss_roe is not None:
            largest_loss_roe_pnl = perf.get('largest_loss_roe_pnl', 0.0)
            largest_loss_roe_token = perf.get('largest_loss_roe_token', 'N/A')
            stats_text_parts.append(f"• Worst ROE Trade: {await money(-largest_loss_roe_pnl)} (-{largest_loss_roe:.2f}%) ({largest_loss_roe_token})")

    # Best/Worst Tokens
    best_token_stats = basic_stats.get('best_token')
    worst_token_stats = basic_stats.get('worst_token')
    if best_token_stats:
        stats_text_parts.append(f"• Best Token: {best_token_stats['name']} {await money(best_token_stats['pnl_value'])} ({best_token_stats['pnl_percentage']:+.2f}%)")
    if worst_token_stats:
        stats_text_parts.append(f"• Worst Token: {worst_token_stats['name']} {await money(worst_token_stats['pnl_value'])} ({worst_token_stats['pnl_percentage']:+.2f}%)")

    return "\n".join(stats_text_parts)
async def format_token_stats_message(self, token: str) -> str:
    """Format a detailed statistics message for a specific token.

    Fetches the token's stats through ``get_token_detailed_stats`` and renders
    a completed-trades summary, the currently open positions and the open-order
    count. When no stats payload is available, or ``summary_total_trades`` is
    0, a short "no data" message with example commands is returned instead.

    Args:
        token: Token symbol as supplied by the caller.

    Returns:
        The message text, one entry per line.
    """
    formatter = get_formatter()
    token_stats = self.get_token_detailed_stats(token)
    normalized_token = _normalize_token_case(token)
    # Tolerate a missing payload here: the emptiness check below is only
    # reachable if this lookup does not blow up on None first.
    token_name = (token_stats or {}).get('token', normalized_token.upper())

    if not token_stats or token_stats.get('summary_total_trades', 0) == 0:
        return (
            f"📊 {token_name} Statistics\n\n"
            f"📭 No trading data found for {token_name}.\n\n"
            "💡 To trade this token, try commands like:\n"
            f" /long {token_name} 100\n"
            f" /short {token_name} 100"
        )

    perf_summary = token_stats.get('performance_summary', {})
    open_positions = token_stats.get('open_positions', [])
    fmt_price = formatter.format_price_with_symbol
    parts = [f"📊 {token_name.upper()} Detailed Statistics\n"]

    # Completed Trades Summary
    parts.append("📈 Completed Trades Summary:")
    if perf_summary.get('completed_trades', 0) > 0:
        total_pnl = perf_summary.get('total_pnl', 0.0)
        pnl_emoji = "✅" if total_pnl >= 0 else "🔻"
        entry_vol = perf_summary.get('completed_entry_volume', 0.0)
        # Realized P&L as a percentage of completed entry volume (0 when no volume).
        pnl_pct = (total_pnl / entry_vol * 100) if entry_vol > 0 else 0.0

        parts.append(f"• Total Completed: {perf_summary.get('completed_trades', 0)}")
        parts.append(f"• {pnl_emoji} Realized P&L: {await fmt_price(total_pnl)} ({pnl_pct:+.2f}%)")
        parts.append(
            f"• Win Rate: {perf_summary.get('win_rate', 0.0):.1f}% "
            f"({perf_summary.get('total_wins', 0)}W / {perf_summary.get('total_losses', 0)}L)"
        )
        parts.append(f"• Profit Factor: {perf_summary.get('profit_factor', 0.0):.2f}")
        parts.append(f"• Expectancy: {await fmt_price(perf_summary.get('expectancy', 0.0))}")

        avg_win_str = await fmt_price(perf_summary.get('avg_win', 0.0))
        avg_loss_str = await fmt_price(perf_summary.get('avg_loss', 0.0))
        parts.append(f"• Avg Win: {avg_win_str} | Avg Loss: {avg_loss_str}")

        # Largest trades; the percentage suffix is shown only when the value is present.
        win_pct = perf_summary.get('largest_win_entry_pct')
        loss_pct = perf_summary.get('largest_loss_entry_pct')
        largest_win_pct_str = f" ({win_pct:.2f}%)" if win_pct is not None else ""
        largest_loss_pct_str = f" ({loss_pct:.2f}%)" if loss_pct is not None else ""
        largest_win_str = await fmt_price(perf_summary.get('largest_win', 0.0))
        largest_loss_str = await fmt_price(perf_summary.get('largest_loss', 0.0))
        parts.append(
            f"• Largest Win: {largest_win_str}{largest_win_pct_str} | "
            f"Largest Loss: {largest_loss_str}{largest_loss_pct_str}"
        )

        parts.append(f"• Entry Volume: {await fmt_price(entry_vol)}")
        parts.append(f"• Exit Volume: {await fmt_price(perf_summary.get('completed_exit_volume', 0.0))}")
        parts.append(f"• Average Trade Duration: {perf_summary.get('avg_trade_duration', 'N/A')}")
        parts.append(f"• Cancelled Cycles: {perf_summary.get('total_cancelled', 0)}")
    else:
        parts.append("• No completed trades for this token yet.")
    parts.append("")

    # Open Positions
    parts.append("📉 Current Open Positions:")
    if open_positions:
        total_open_unrealized_pnl = token_stats.get('summary_total_unrealized_pnl', 0.0)
        open_pnl_emoji = "✅" if total_open_unrealized_pnl >= 0 else "🔻"
        for pos in open_positions:
            pos_side_emoji = "🔼" if pos.get('side', 'buy').lower() == 'buy' else "🔽"
            pos_pnl_emoji = "✅" if pos.get('unrealized_pnl', 0) >= 0 else "🔻"

            opened_at_str = "N/A"
            opened_at_raw = pos.get('opened_at')
            if opened_at_raw:
                try:
                    opened_at_str = datetime.fromisoformat(opened_at_raw).strftime('%Y-%m-%d %H:%M')
                except (TypeError, ValueError):
                    # Unparseable or non-string timestamp: keep the "N/A" placeholder.
                    pass

            amount_str = await formatter.format_amount(abs(pos.get('amount', 0)), token_name)
            parts.append(f"• {pos_side_emoji} {pos.get('side', '').upper()} {amount_str} {token_name}")

            entry_str = await fmt_price(pos.get('entry_price', 0), token_name)
            mark_str = await fmt_price(pos.get('mark_price', 0), token_name)
            parts.append(f" Entry: {entry_str} | Mark: {mark_str}")
            parts.append(f" {pos_pnl_emoji} Unrealized P&L: {await fmt_price(pos.get('unrealized_pnl', 0))}")

            # Only the last six characters of the id are shown; a stored None
            # must not break the slice.
            lifecycle_tail = str(pos.get('lifecycle_id') or '')[-6:]
            parts.append(f" Opened: {opened_at_str} | ID: ...{lifecycle_tail}")
        parts.append(f" {open_pnl_emoji} Total Open P&L: {await fmt_price(total_open_unrealized_pnl)}")
    else:
        parts.append("• No open positions for this token.")
    parts.append("")

    parts.append(f"📋 Open Orders (Exchange): {token_stats.get('current_open_orders_count', 0)}")
    parts.append(f"💡 Use /performance {token_name} for another view including recent trades.")
    return "\n".join(parts)
# =============================================================================
# CONVENIENCE METHODS & HIGH-LEVEL OPERATIONS
# =============================================================================
def process_trade_complete_cycle(self, symbol: str, side: str, entry_price: float,
                                 exit_price: float, amount: float,
                                 timestamp: Optional[str] = None) -> str:
    """Open, close and aggregate a trade in a single call.

    Steps, in order: create a lifecycle of type ``complete_cycle``, mark the
    position opened (``"manual_entry"``), compute realized P&L, mark the
    position closed (``"manual_exit"``), then migrate the lifecycle to the
    aggregated stats.

    Args:
        symbol: Trading symbol passed to ``create_trade_lifecycle``.
        side: ``'buy'`` (case-insensitive) is priced as exit minus entry;
            any other value is priced as entry minus exit.
        entry_price: Price at which the position was opened.
        exit_price: Price at which the position was closed.
        amount: Position size used for the P&L calculation.
        timestamp: Accepted for interface compatibility; not used here.

    Returns:
        The id of the lifecycle that was created.

    Raises:
        Exception: If creating, opening or closing the lifecycle reports failure.
    """
    new_lifecycle_id = self.create_trade_lifecycle(symbol, side, trade_type='complete_cycle')
    if not new_lifecycle_id:
        raise Exception("Failed to create trade lifecycle")

    opened_ok = self.update_trade_position_opened(new_lifecycle_id, entry_price, amount, "manual_entry")
    if not opened_ok:
        raise Exception("Failed to update position opened")

    # Buys profit when price rises; every other side is treated as a sell.
    is_buy = side.lower() == 'buy'
    price_delta = (exit_price - entry_price) if is_buy else (entry_price - exit_price)
    realized_pnl = price_delta * amount

    closed_ok = self.update_trade_position_closed(new_lifecycle_id, exit_price, realized_pnl, "manual_exit")
    if not closed_ok:
        raise Exception("Failed to update position closed")

    # Fold the finished trade into the aggregated statistics.
    self.migrate_trade_to_aggregated_stats(new_lifecycle_id)

    logger.info(f"✅ Processed complete trade cycle: {symbol} {side.upper()} P&L: ${realized_pnl:.2f}")
    return new_lifecycle_id
def get_summary_report(self) -> Dict[str, Any]:
    """Assemble a summary report from the individual statistics getters.

    Returns:
        A dict with performance stats, the top 10 tokens, the last 7 daily
        stats entries, risk metrics, the balance-adjustment summary, the open
        positions (plus their count) and a UTC ISO-8601 ``generated_at``
        timestamp. If any step raises, the error is logged and
        ``{'error': <message>}`` is returned instead.
    """
    try:
        report: Dict[str, Any] = {
            'performance_stats': self.get_performance_stats(),
            'top_tokens': self.get_token_performance(limit=10),
            'recent_daily_stats': self.get_daily_stats(limit=7),
            'risk_metrics': self.get_risk_metrics(),
            'balance_adjustments': self.get_balance_adjustments_summary(),
        }

        # Current positions are reported both as a count and in full.
        current_positions = self.get_open_positions()
        report['open_positions_count'] = len(current_positions)
        report['open_positions'] = current_positions

        report['generated_at'] = datetime.now(timezone.utc).isoformat()
        return report
    except Exception as e:
        logger.error(f"❌ Error generating summary report: {e}")
        return {'error': str(e)}
async def record_trade(self, symbol: str, side: str, amount: float, price: float,
                       exchange_fill_id: Optional[str] = None, trade_type: str = "manual",
                       pnl: Optional[float] = None, timestamp: Optional[str] = None,
                       linked_order_table_id_to_link: Optional[int] = None):
    """DEPRECATED - use trade lifecycle methods instead.

    Inserts one row into the ``trades`` table (``INSERT OR IGNORE``) and then
    logs a formatted summary of the trade.

    Args:
        symbol: Trading symbol; the part before ``/`` is used as the base
            asset when formatting the amount for the log line.
        side: Trade side, stored as given and upper-cased in the log line.
        amount: Traded amount.
        price: Execution price; ``amount * price`` is stored as ``value``.
        exchange_fill_id: Optional exchange fill identifier.
        trade_type: Free-form trade type label.
        pnl: Realized P&L; a missing value is stored as ``0.0``.
        timestamp: ISO timestamp; the current UTC time is used when it is
            ``None`` or empty.
        linked_order_table_id_to_link: Optional id stored in the
            ``linked_order_table_id`` column.
    """
    # One fallback covers both None and empty-string timestamps.
    ts = timestamp or datetime.now(timezone.utc).isoformat()
    value = amount * price
    base_asset_for_amount = symbol.split('/')[0]

    # Persist before logging so a failure while formatting the log line
    # cannot prevent the row from being written.
    self.db_manager._execute_query(
        "INSERT OR IGNORE INTO trades (symbol, side, amount, price, value, trade_type, timestamp, exchange_fill_id, pnl, linked_order_table_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        (symbol, side, amount, price, value, trade_type, ts, exchange_fill_id, pnl or 0.0, linked_order_table_id_to_link)
    )

    formatter = get_formatter()
    logger.info(f"📈 Trade recorded: {side.upper()} {await formatter.format_amount(amount, base_asset_for_amount)} {symbol} @ {await formatter.format_price(price, symbol)} ({await formatter.format_price(value, symbol)}) [{trade_type}]")
def health_check(self) -> Dict[str, Any]:
    """Probe every component and report each one's status.

    Each probe runs independently, so one failing component does not hide the
    state of the others.

    Returns:
        A dict with one ``'ok'``/``'error'`` entry per component plus an
        ``'overall'`` entry. When at least one probe fails, ``'overall'`` is
        ``'error'`` and ``'error'`` holds the failure messages, each prefixed
        with its component name and joined with ``'; '``.
    """
    # NOTE(review): the probe-to-component pairing follows the order of the
    # original checks; confirm each call really exercises the named manager.
    probes = (
        ('database', lambda: self.db_manager._fetch_query("SELECT 1")),
        ('order_manager', lambda: self.get_recent_orders(limit=1)),
        ('trade_manager', lambda: self.get_recent_trades(limit=1)),
        ('aggregation_manager', lambda: self.get_daily_stats(limit=1)),
        ('performance_calculator', lambda: self.get_performance_stats()),
    )

    health: Dict[str, Any] = {}
    failures = []
    for component, probe in probes:
        try:
            probe()
        except Exception as e:
            logger.error(f"❌ Health check failed for {component}: {e}")
            health[component] = 'error'
            failures.append(f"{component}: {e}")
        else:
            health[component] = 'ok'

    if failures:
        health['overall'] = 'error'
        health['error'] = "; ".join(failures)
    else:
        health['overall'] = 'ok'
    return health