123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815 |
- #!/usr/bin/env python3
- """
- Trading Statistics Tracker (Refactored Version)
- Main class that coordinates between specialized manager components.
- """
- import logging
- from datetime import datetime, timezone
- from typing import Dict, List, Any, Optional, Tuple
- import math
- import numpy as np
- import uuid
- from .database_manager import DatabaseManager
- from .order_manager import OrderManager
- from .trade_lifecycle_manager import TradeLifecycleManager
- from .aggregation_manager import AggregationManager
- from .performance_calculator import PerformanceCalculator
- from src.utils.token_display_formatter import get_formatter
- logger = logging.getLogger(__name__)
- def _normalize_token_case(token: str) -> str:
- """Normalize token case for consistency."""
- if any(c.isupper() for c in token):
- return token # Keep original case for mixed-case tokens
- else:
- return token.upper() # Convert to uppercase for all-lowercase
- class TradingStats:
- """Refactored trading statistics tracker using modular components."""
- def __init__(self, db_path: str = "data/trading_stats.sqlite"):
- """Initialize with all manager components."""
- # Initialize core database manager
- self.db_manager = DatabaseManager(db_path)
-
- # Initialize specialized managers
- self.order_manager = OrderManager(self.db_manager)
- self.trade_manager = TradeLifecycleManager(self.db_manager)
- self.aggregation_manager = AggregationManager(self.db_manager)
- self.performance_calculator = PerformanceCalculator(self.db_manager)
-
- logger.info("🚀 TradingStats initialized with modular components")
- def close(self):
- """Close database connection."""
- self.db_manager.close()
- # =============================================================================
- # COMPATIBILITY METHODS - Direct exposure of internal methods
- # =============================================================================
-
- def _get_metadata(self, key: str) -> Optional[str]:
- """Get metadata from database."""
- return self.db_manager._get_metadata(key)
-
- def _set_metadata(self, key: str, value: str):
- """Set metadata in database."""
- return self.db_manager._set_metadata(key, value)
- # =============================================================================
- # DATABASE MANAGEMENT DELEGATION
- # =============================================================================
-
- def set_initial_balance(self, balance: float):
- """Set initial balance."""
- return self.db_manager.set_initial_balance(balance)
-
- def get_initial_balance(self) -> float:
- """Get initial balance."""
- return self.db_manager.get_initial_balance()
-
- def record_balance_snapshot(self, balance: float, unrealized_pnl: float = 0.0,
- timestamp: Optional[str] = None, notes: Optional[str] = None):
- """Record balance snapshot."""
- return self.db_manager.record_balance_snapshot(balance, unrealized_pnl, timestamp, notes)
-
- def purge_old_balance_history(self, days_to_keep: int = 30) -> int:
- """Purge old balance history."""
- return self.db_manager.purge_old_balance_history(days_to_keep)
-
- def get_balance_history_record_count(self) -> int:
- """Get balance history record count."""
- return self.db_manager.get_balance_history_record_count()
-
- def purge_old_daily_aggregated_stats(self, days_to_keep: int = 365) -> int:
- """Purge old daily aggregated stats."""
- return self.db_manager.purge_old_daily_aggregated_stats(days_to_keep)
- # =============================================================================
- # ORDER MANAGEMENT DELEGATION
- # =============================================================================
-
- def record_order_placed(self, symbol: str, side: str, order_type: str,
- amount_requested: float, price: Optional[float] = None,
- bot_order_ref_id: Optional[str] = None,
- exchange_order_id: Optional[str] = None,
- timestamp: Optional[str] = None) -> bool:
- """Record order placement."""
- result = self.order_manager.record_order_placed(
- symbol, side, order_type, amount_requested, price,
- bot_order_ref_id, exchange_order_id
- )
- return result is not None
-
- def update_order_exchange_id(self, bot_order_ref_id: str, exchange_order_id: str) -> bool:
- """Update order with exchange ID."""
- return self.order_manager.update_order_exchange_id(bot_order_ref_id, exchange_order_id)
-
- def record_order_filled(self, exchange_order_id: str, actual_amount: float,
- actual_price: float, fees: float = 0.0,
- timestamp: Optional[str] = None,
- exchange_fill_id: Optional[str] = None) -> bool:
- """Record order fill."""
- return self.order_manager.record_order_filled(
- exchange_order_id, actual_amount, actual_price, fees, timestamp, exchange_fill_id
- )
-
- def record_order_cancelled(self, exchange_order_id: str, reason: str = "user_cancelled",
- timestamp: Optional[str] = None) -> bool:
- """Record order cancellation."""
- return self.order_manager.record_order_cancelled(exchange_order_id, reason, timestamp)
-
- def update_order_status(self, exchange_order_id: str, new_status: str,
- notes: Optional[str] = None, timestamp: Optional[str] = None) -> bool:
- """Update order status."""
- return self.order_manager.update_order_status(exchange_order_id, new_status, notes, timestamp)
-
- def get_order_by_exchange_id(self, exchange_order_id: str) -> Optional[Dict[str, Any]]:
- """Get order by exchange ID."""
- return self.order_manager.get_order_by_exchange_id(exchange_order_id)
-
- def get_order_by_bot_ref_id(self, bot_order_ref_id: str) -> Optional[Dict[str, Any]]:
- """Get order by bot reference ID."""
- return self.order_manager.get_order_by_bot_ref_id(bot_order_ref_id)
-
- def get_orders_by_symbol(self, symbol: str, limit: int = 50) -> List[Dict[str, Any]]:
- """Get orders by symbol."""
- return self.order_manager.get_orders_by_symbol(symbol, limit)
-
- def get_orders_by_status(self, status: str, limit: Optional[int] = 50,
- order_type_filter: Optional[str] = None,
- parent_bot_order_ref_id: Optional[str] = None) -> List[Dict[str, Any]]:
- """Get orders by status with optional filters."""
- # OrderManager expects (status, order_type_filter, parent_bot_order_ref_id) without limit
- return self.order_manager.get_orders_by_status(status, order_type_filter, parent_bot_order_ref_id)
-
- def get_recent_orders(self, limit: int = 20) -> List[Dict[str, Any]]:
- """Get recent orders."""
- return self.order_manager.get_recent_orders(limit)
-
- def cleanup_old_cancelled_orders(self, days_old: int = 7) -> int:
- """Clean up old cancelled orders."""
- return self.order_manager.cleanup_old_cancelled_orders(days_old)
- # =============================================================================
- # TRADE LIFECYCLE DELEGATION
- # =============================================================================
-
- def create_trade_lifecycle(self, symbol: str, side: str, entry_order_id: Optional[str] = None,
- entry_bot_order_ref_id: Optional[str] = None,
- stop_loss_price: Optional[float] = None,
- take_profit_price: Optional[float] = None,
- trade_type: str = 'manual') -> Optional[str]:
- """Create trade lifecycle."""
- return self.trade_manager.create_trade_lifecycle(
- symbol, side, entry_order_id, entry_bot_order_ref_id,
- stop_loss_price, take_profit_price, trade_type
- )
-
- def update_trade_position_opened(self, lifecycle_id: str, entry_price: float,
- entry_amount: float, exchange_fill_id: str) -> bool:
- """Update trade position opened."""
- return self.trade_manager.update_trade_position_opened(
- lifecycle_id, entry_price, entry_amount, exchange_fill_id
- )
-
- def update_trade_position_closed(self, lifecycle_id: str, exit_price: float,
- realized_pnl: float, exchange_fill_id: str) -> bool:
- """Update trade position closed."""
- return self.trade_manager.update_trade_position_closed(
- lifecycle_id, exit_price, realized_pnl, exchange_fill_id
- )
-
- def update_trade_cancelled(self, lifecycle_id: str, reason: str = "order_cancelled") -> bool:
- """Update trade cancelled."""
- return self.trade_manager.update_trade_cancelled(lifecycle_id, reason)
-
- def link_stop_loss_to_trade(self, lifecycle_id: str, stop_loss_order_id: str,
- stop_loss_price: float) -> bool:
- """Link stop loss to trade."""
- return self.trade_manager.link_stop_loss_to_trade(
- lifecycle_id, stop_loss_order_id, stop_loss_price
- )
-
- def link_take_profit_to_trade(self, lifecycle_id: str, take_profit_order_id: str,
- take_profit_price: float) -> bool:
- """Link take profit to trade."""
- return self.trade_manager.link_take_profit_to_trade(
- lifecycle_id, take_profit_order_id, take_profit_price
- )
-
- def get_trade_by_lifecycle_id(self, lifecycle_id: str) -> Optional[Dict[str, Any]]:
- """Get trade by lifecycle ID."""
- return self.trade_manager.get_trade_by_lifecycle_id(lifecycle_id)
-
- def get_trade_by_symbol_and_status(self, symbol: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
- """Get trade by symbol and status."""
- return self.trade_manager.get_trade_by_symbol_and_status(symbol, status)
-
- def get_open_positions(self, symbol: Optional[str] = None) -> List[Dict[str, Any]]:
- """Get open positions."""
- return self.trade_manager.get_open_positions(symbol)
-
- def get_trades_by_status(self, status: str, limit: int = 50) -> List[Dict[str, Any]]:
- """Get trades by status."""
- return self.trade_manager.get_trades_by_status(status, limit)
-
- def get_lifecycle_by_entry_order_id(self, entry_exchange_order_id: str, status: Optional[str] = None) -> Optional[Dict[str, Any]]:
- """Get lifecycle by entry order ID."""
- return self.trade_manager.get_lifecycle_by_entry_order_id(entry_exchange_order_id, status)
-
- def get_lifecycle_by_sl_order_id(self, sl_exchange_order_id: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
- """Get lifecycle by stop loss order ID."""
- return self.trade_manager.get_lifecycle_by_sl_order_id(sl_exchange_order_id, status)
-
- def get_lifecycle_by_tp_order_id(self, tp_exchange_order_id: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
- """Get lifecycle by take profit order ID."""
- return self.trade_manager.get_lifecycle_by_tp_order_id(tp_exchange_order_id, status)
-
- def get_pending_stop_loss_activations(self) -> List[Dict[str, Any]]:
- """Get pending stop loss activations."""
- return self.trade_manager.get_pending_stop_loss_activations()
-
- def cleanup_old_cancelled_trades(self, days_old: int = 7) -> int:
- """Clean up old cancelled trades."""
- return self.trade_manager.cleanup_old_cancelled_trades(days_old)
-
- def confirm_position_with_exchange(self, symbol: str, exchange_position_size: float,
- exchange_open_orders: List[Dict]) -> bool:
- """Confirm position with exchange."""
- return self.trade_manager.confirm_position_with_exchange(
- symbol, exchange_position_size, exchange_open_orders
- )
-
- def update_trade_market_data(self, trade_lifecycle_id: str, **kwargs) -> bool:
- """Update trade market data."""
- return self.trade_manager.update_trade_market_data(trade_lifecycle_id, **kwargs)
-
- def get_recent_trades(self, limit: int = 10) -> List[Dict[str, Any]]:
- """Get recent trades."""
- return self.trade_manager.get_recent_trades(limit)
-
- def get_all_trades(self) -> List[Dict[str, Any]]:
- """Get all trades."""
- return self.trade_manager.get_all_trades()
- # =============================================================================
- # AGGREGATION MANAGEMENT DELEGATION
- # =============================================================================
-
- def migrate_trade_to_aggregated_stats(self, trade_lifecycle_id: str):
- """Migrate trade to aggregated stats."""
- return self.aggregation_manager.migrate_trade_to_aggregated_stats(trade_lifecycle_id)
-
- def record_deposit(self, amount: float, timestamp: Optional[str] = None,
- deposit_id: Optional[str] = None, description: Optional[str] = None):
- """Record deposit."""
- return self.aggregation_manager.record_deposit(amount, timestamp, deposit_id, description)
-
- def record_withdrawal(self, amount: float, timestamp: Optional[str] = None,
- withdrawal_id: Optional[str] = None, description: Optional[str] = None):
- """Record withdrawal."""
- return self.aggregation_manager.record_withdrawal(amount, timestamp, withdrawal_id, description)
-
- def get_balance_adjustments_summary(self) -> Dict[str, Any]:
- """Get balance adjustments summary."""
- return self.aggregation_manager.get_balance_adjustments_summary()
-
- def get_daily_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
- """Get daily stats."""
- return self.aggregation_manager.get_daily_stats(limit)
-
- def get_weekly_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
- """Get weekly stats."""
- return self.aggregation_manager.get_weekly_stats(limit)
-
- def get_monthly_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
- """Get monthly stats."""
- return self.aggregation_manager.get_monthly_stats(limit)
- # =============================================================================
- # PERFORMANCE CALCULATION DELEGATION
- # =============================================================================
-
- def get_performance_stats(self) -> Dict[str, Any]:
- """Get performance stats."""
- return self.performance_calculator.get_performance_stats()
-
- def get_token_performance(self, limit: int = 20) -> List[Dict[str, Any]]:
- """Get token performance."""
- return self.performance_calculator.get_token_performance(limit)
-
- def get_balance_history(self, days: int = 30) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
- """Get balance history."""
- return self.performance_calculator.get_balance_history(days)
-
- def get_live_max_drawdown(self) -> Tuple[float, float]:
- """Get live max drawdown."""
- return self.performance_calculator.get_live_max_drawdown()
-
- def update_live_max_drawdown(self, current_balance: float) -> bool:
- """Update live max drawdown."""
- return self.performance_calculator.update_live_max_drawdown(current_balance)
-
- def get_drawdown_monitor_data(self) -> Dict[str, float]:
- """Get drawdown data from DrawdownMonitor for external monitoring systems."""
- try:
- peak_balance = float(self._get_metadata('drawdown_peak_balance') or '0.0')
- max_drawdown_pct = float(self._get_metadata('drawdown_max_drawdown_pct') or '0.0')
- return {
- 'peak_balance': peak_balance,
- 'max_drawdown_percentage': max_drawdown_pct
- }
- except (ValueError, TypeError):
- return {'peak_balance': 0.0, 'max_drawdown_percentage': 0.0}
-
- def calculate_sharpe_ratio(self, days: int = 30) -> Optional[float]:
- """Calculate Sharpe ratio."""
- return self.performance_calculator.calculate_sharpe_ratio(days)
-
- def calculate_max_consecutive_losses(self) -> int:
- """Calculate max consecutive losses."""
- return self.performance_calculator.calculate_max_consecutive_losses()
-
- def get_risk_metrics(self) -> Dict[str, Any]:
- """Get risk metrics."""
- return self.performance_calculator.get_risk_metrics()
-
- def get_period_performance(self, start_date: str, end_date: str) -> Dict[str, Any]:
- """Get period performance."""
- return self.performance_calculator.get_period_performance(start_date, end_date)
-
- def get_recent_performance_trend(self, days: int = 7) -> Dict[str, Any]:
- """Get recent performance trend."""
- return self.performance_calculator.get_recent_performance_trend(days)
- # =============================================================================
- # COMPATIBILITY METHODS - Legacy API Support
- # =============================================================================
-
- def get_basic_stats(self, current_balance: Optional[float] = None) -> Dict[str, Any]:
- """Get basic trading statistics from DB, primarily using aggregated tables."""
-
- # Get counts of open positions (trades that are not yet migrated)
- open_positions_count = self._get_open_positions_count_from_db()
- # Get overall aggregated stats from token_stats table
- query_token_stats_summary = """
- SELECT
- SUM(total_realized_pnl) as total_pnl_from_cycles,
- SUM(total_completed_cycles) as total_completed_cycles_sum,
- MIN(first_cycle_closed_at) as overall_first_cycle_closed,
- MAX(last_cycle_closed_at) as overall_last_cycle_closed
- FROM token_stats
- """
- token_stats_summary = self.db_manager._fetchone_query(query_token_stats_summary)
- total_pnl_from_cycles = token_stats_summary['total_pnl_from_cycles'] if token_stats_summary and token_stats_summary['total_pnl_from_cycles'] is not None else 0.0
- total_completed_cycles_sum = token_stats_summary['total_completed_cycles_sum'] if token_stats_summary and token_stats_summary['total_completed_cycles_sum'] is not None else 0
- # Total trades considered as sum of completed cycles and currently open positions
- total_trades_redefined = total_completed_cycles_sum + open_positions_count
- initial_balance_str = self._get_metadata('initial_balance')
- initial_balance = float(initial_balance_str) if initial_balance_str else 0.0
-
- start_date_iso = self._get_metadata('start_date')
- start_date_obj = datetime.fromisoformat(start_date_iso) if start_date_iso else datetime.now(timezone.utc)
- days_active = (datetime.now(timezone.utc) - start_date_obj).days + 1
-
- # 'last_trade' timestamp could be the last update to token_stats or an open trade
- last_activity_ts = token_stats_summary['overall_last_cycle_closed'] if token_stats_summary else None
- last_open_trade_ts_row = self.db_manager._fetchone_query("SELECT MAX(updated_at) as last_update FROM trades WHERE status = 'position_opened'")
- if last_open_trade_ts_row and last_open_trade_ts_row['last_update']:
- if not last_activity_ts or datetime.fromisoformat(last_open_trade_ts_row['last_update']) > datetime.fromisoformat(last_activity_ts):
- last_activity_ts = last_open_trade_ts_row['last_update']
- return {
- 'total_trades': total_trades_redefined,
- 'completed_trades': total_completed_cycles_sum,
- 'initial_balance': initial_balance,
- 'total_pnl': total_pnl_from_cycles,
- 'days_active': days_active,
- 'start_date': start_date_obj.strftime('%Y-%m-%d'),
- 'last_trade': last_activity_ts,
- 'open_positions_count': open_positions_count
- }
- def _get_open_positions_count_from_db(self) -> int:
- """Get count of open positions from trades table."""
- row = self.db_manager._fetchone_query("SELECT COUNT(DISTINCT symbol) as count FROM trades WHERE status = 'position_opened'")
- return row['count'] if row else 0
- def get_token_detailed_stats(self, token: str) -> Dict[str, Any]:
- """Get detailed statistics for a specific token."""
- try:
- # Normalize token case
- upper_token = _normalize_token_case(token)
-
- # Get aggregated stats from token_stats table
- token_agg_stats = self.db_manager._fetchone_query(
- "SELECT * FROM token_stats WHERE token = ?", (upper_token,)
- )
-
- # Get open trades for this token
- open_trades_for_token = self.db_manager._fetch_query(
- "SELECT * FROM trades WHERE status = 'position_opened' AND symbol LIKE ? ORDER BY position_opened_at DESC",
- (f"{upper_token}/%",)
- )
-
- # Initialize performance stats
- perf_stats = {
- 'completed_trades': 0,
- 'total_pnl': 0.0,
- 'pnl_percentage': 0.0,
- 'win_rate': 0.0,
- 'profit_factor': 0.0,
- 'avg_win': 0.0,
- 'avg_loss': 0.0,
- 'largest_win': 0.0,
- 'largest_loss': 0.0,
- 'expectancy': 0.0,
- 'total_wins': 0,
- 'total_losses': 0,
- 'completed_entry_volume': 0.0,
- 'completed_exit_volume': 0.0,
- 'total_cancelled': 0,
- 'total_duration_seconds': 0,
- 'avg_trade_duration': "N/A"
- }
-
- if token_agg_stats:
- total_cycles = token_agg_stats.get('total_completed_cycles', 0)
- winning_cycles = token_agg_stats.get('winning_cycles', 0)
- losing_cycles = token_agg_stats.get('losing_cycles', 0)
- sum_winning_pnl = token_agg_stats.get('sum_of_winning_pnl', 0.0)
- sum_losing_pnl = token_agg_stats.get('sum_of_losing_pnl', 0.0)
-
- # Calculate percentages for largest trades
- largest_win_pnl = token_agg_stats.get('largest_winning_cycle_pnl', 0.0)
- largest_loss_pnl = token_agg_stats.get('largest_losing_cycle_pnl', 0.0)
- largest_win_entry_volume = token_agg_stats.get('largest_winning_cycle_entry_volume', 0.0)
- largest_loss_entry_volume = token_agg_stats.get('largest_losing_cycle_entry_volume', 0.0)
-
- largest_win_percentage = (largest_win_pnl / largest_win_entry_volume * 100) if largest_win_entry_volume > 0 else 0.0
- largest_loss_percentage = (largest_loss_pnl / largest_loss_entry_volume * 100) if largest_loss_entry_volume > 0 else 0.0
-
- perf_stats.update({
- 'completed_trades': total_cycles,
- 'total_pnl': token_agg_stats.get('total_realized_pnl', 0.0),
- 'win_rate': (winning_cycles / total_cycles * 100) if total_cycles > 0 else 0.0,
- 'profit_factor': (sum_winning_pnl / sum_losing_pnl) if sum_losing_pnl > 0 else float('inf') if sum_winning_pnl > 0 else 0.0,
- 'avg_win': (sum_winning_pnl / winning_cycles) if winning_cycles > 0 else 0.0,
- 'avg_loss': (sum_losing_pnl / losing_cycles) if losing_cycles > 0 else 0.0,
- 'largest_win': largest_win_pnl,
- 'largest_loss': largest_loss_pnl,
- 'largest_win_percentage': largest_win_percentage,
- 'largest_loss_percentage': largest_loss_percentage,
- 'total_wins': winning_cycles,
- 'total_losses': losing_cycles,
- 'completed_entry_volume': token_agg_stats.get('total_entry_volume', 0.0),
- 'completed_exit_volume': token_agg_stats.get('total_exit_volume', 0.0),
- 'total_cancelled': token_agg_stats.get('total_cancelled_cycles', 0),
- 'total_duration_seconds': token_agg_stats.get('total_duration_seconds', 0)
- })
-
- # Calculate expectancy
- win_rate_decimal = perf_stats['win_rate'] / 100
- perf_stats['expectancy'] = (perf_stats['avg_win'] * win_rate_decimal) - (perf_stats['avg_loss'] * (1 - win_rate_decimal))
-
- # Format average trade duration
- if total_cycles > 0:
- avg_duration_seconds = token_agg_stats.get('total_duration_seconds', 0) / total_cycles
- perf_stats['avg_trade_duration'] = self._format_duration(avg_duration_seconds)
-
- # Calculate open positions summary
- open_positions_summary = []
- total_open_value = 0.0
- total_open_unrealized_pnl = 0.0
-
- for op_trade in open_trades_for_token:
- open_positions_summary.append({
- 'lifecycle_id': op_trade.get('trade_lifecycle_id'),
- 'side': op_trade.get('position_side'),
- 'amount': op_trade.get('current_position_size'),
- 'entry_price': op_trade.get('entry_price'),
- 'mark_price': op_trade.get('mark_price'),
- 'unrealized_pnl': op_trade.get('unrealized_pnl'),
- 'opened_at': op_trade.get('position_opened_at')
- })
- total_open_value += op_trade.get('value', 0.0)
- total_open_unrealized_pnl += op_trade.get('unrealized_pnl', 0.0)
-
- # Get open orders count for this token
- open_orders_count_row = self.db_manager._fetchone_query(
- "SELECT COUNT(*) as count FROM orders WHERE symbol LIKE ? AND status IN ('open', 'submitted', 'pending_trigger')",
- (f"{upper_token}/%",)
- )
- current_open_orders_for_token = open_orders_count_row['count'] if open_orders_count_row else 0
-
- effective_total_trades = perf_stats['completed_trades'] + len(open_trades_for_token)
-
- return {
- 'token': upper_token,
- 'message': f"Statistics for {upper_token}",
- 'performance_summary': perf_stats, # Expected key by formatting method
- 'performance': perf_stats, # Legacy compatibility
- 'open_positions': open_positions_summary, # Direct list as expected
- 'summary_total_trades': effective_total_trades, # Expected by formatting method
- 'summary_total_unrealized_pnl': total_open_unrealized_pnl, # Expected by formatting method
- 'current_open_orders_count': current_open_orders_for_token, # Expected by formatting method
- 'summary': {
- 'total_trades': effective_total_trades,
- 'open_orders': current_open_orders_for_token,
- }
- }
-
- except Exception as e:
- logger.error(f"❌ Error getting detailed stats for {token}: {e}")
- return {}
- def _format_duration(self, seconds: float) -> str:
- """Format duration in seconds to a human-readable string."""
- if seconds <= 0:
- return "0s"
-
- days = int(seconds // 86400)
- hours = int((seconds % 86400) // 3600)
- minutes = int((seconds % 3600) // 60)
- secs = int(seconds % 60)
-
- parts = []
- if days > 0:
- parts.append(f"{days}d")
- if hours > 0:
- parts.append(f"{hours}h")
- if minutes > 0:
- parts.append(f"{minutes}m")
- if secs > 0 or not parts:
- parts.append(f"{secs}s")
-
- return " ".join(parts)
- def format_stats_message(self, current_balance: Optional[float] = None) -> str:
- """Format stats for Telegram display using data from DB."""
- try:
- basic = self.get_basic_stats(current_balance)
- perf = self.get_performance_stats()
- risk = self.get_risk_metrics()
-
- formatter = get_formatter()
-
- effective_current_balance = current_balance if current_balance is not None else (basic['initial_balance'] + basic['total_pnl'])
- initial_bal = basic['initial_balance']
- total_pnl_val = effective_current_balance - initial_bal if initial_bal > 0 and current_balance is not None else basic['total_pnl']
- total_return_pct = (total_pnl_val / initial_bal * 100) if initial_bal > 0 else 0.0
- pnl_emoji = "🟢" if total_pnl_val >= 0 else "🔴"
- open_positions_count = basic['open_positions_count']
- stats_text_parts = []
- stats_text_parts.append(f"📊 <b>Trading Statistics</b>\n")
-
- # Account Overview
- stats_text_parts.append(f"\n💰 <b>Account Overview:</b>")
- stats_text_parts.append(f"• Current Balance: {formatter.format_price_with_symbol(effective_current_balance)}")
- stats_text_parts.append(f"• Initial Balance: {formatter.format_price_with_symbol(initial_bal)}")
- stats_text_parts.append(f"• Open Positions: {open_positions_count}")
- stats_text_parts.append(f"• {pnl_emoji} Total P&L: {formatter.format_price_with_symbol(total_pnl_val)} ({total_return_pct:+.2f}%)")
- stats_text_parts.append(f"• Days Active: {basic['days_active']}\n")
-
- # Performance Metrics
- stats_text_parts.append(f"\n🏆 <b>Performance Metrics:</b>")
- stats_text_parts.append(f"• Total Completed Trades: {basic['completed_trades']}")
- stats_text_parts.append(f"• Win Rate: {perf['win_rate']:.1f}% ({perf['total_wins']}/{basic['completed_trades']})")
- stats_text_parts.append(f"• Trading Volume (Entry Vol.): {formatter.format_price_with_symbol(perf.get('total_trading_volume', 0.0))}")
- stats_text_parts.append(f"• Profit Factor: {perf['profit_factor']:.2f}")
- stats_text_parts.append(f"• Expectancy: {formatter.format_price_with_symbol(perf['expectancy'])}")
-
- largest_win_pct_str = f" ({perf.get('largest_winning_percentage', 0):+.2f}%)" if perf.get('largest_winning_percentage', 0) != 0 else ""
- largest_loss_pct_str = f" ({perf.get('largest_losing_percentage', 0):+.2f}%)" if perf.get('largest_losing_percentage', 0) != 0 else ""
-
- stats_text_parts.append(f"• Largest Winning Trade: {formatter.format_price_with_symbol(perf['largest_win'])}{largest_win_pct_str}")
- stats_text_parts.append(f"• Largest Losing Trade: {formatter.format_price_with_symbol(-perf['largest_loss'])}{largest_loss_pct_str}")
- best_token_stats = perf.get('best_performing_token', {'name': 'N/A', 'pnl_percentage': 0.0, 'volume': 0.0, 'pnl_value': 0.0})
- worst_token_stats = perf.get('worst_performing_token', {'name': 'N/A', 'pnl_percentage': 0.0, 'volume': 0.0, 'pnl_value': 0.0})
-
- stats_text_parts.append(f"• Best Token: {best_token_stats['name']} {formatter.format_price_with_symbol(best_token_stats['pnl_value'])} ({best_token_stats['pnl_percentage']:+.2f}%)")
- stats_text_parts.append(f"• Worst Token: {worst_token_stats['name']} {formatter.format_price_with_symbol(worst_token_stats['pnl_value'])} ({worst_token_stats['pnl_percentage']:+.2f}%)")
-
- stats_text_parts.append(f"• Average Trade Duration: {perf.get('avg_trade_duration', 'N/A')}")
- stats_text_parts.append(f"• Portfolio Max Drawdown: {risk.get('max_drawdown_live_percentage', 0.0):.2f}% <i>(Live)</i>")
-
- # Session Info
- stats_text_parts.append(f"\n\n⏰ <b>Session Info:</b>")
- stats_text_parts.append(f"• Bot Started: {basic['start_date']}")
- stats_text_parts.append(f"• Stats Last Updated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}")
-
- return "\n".join(stats_text_parts).strip()
-
- except Exception as e:
- logger.error(f"❌ Error formatting stats message: {e}", exc_info=True)
- return f"""📊 <b>Trading Statistics</b>\n\n❌ <b>Error loading statistics</b>\n\n🔧 <b>Debug info:</b> {str(e)[:100]}"""
- def format_token_stats_message(self, token: str) -> str:
- """Format detailed statistics for a specific token."""
- try:
- from src.utils.token_display_formatter import get_formatter
- formatter = get_formatter()
-
- token_stats_data = self.get_token_detailed_stats(token)
- token_name = token_stats_data.get('token', token.upper())
-
- if not token_stats_data or token_stats_data.get('summary_total_trades', 0) == 0:
- return (
- f"📊 <b>{token_name} Statistics</b>\n\n"
- f"📭 No trading data found for {token_name}.\n\n"
- f"💡 To trade this token, try commands like:\n"
- f" <code>/long {token_name} 100</code>\n"
- f" <code>/short {token_name} 100</code>"
- )
- perf_summary = token_stats_data.get('performance_summary', {})
- open_positions = token_stats_data.get('open_positions', [])
-
- parts = [f"📊 <b>{token_name.upper()} Detailed Statistics</b>\n"]
- # Completed Trades Summary
- parts.append("📈 <b>Completed Trades Summary:</b>")
- if perf_summary.get('completed_trades', 0) > 0:
- pnl_emoji = "🟢" if perf_summary.get('total_pnl', 0) >= 0 else "🔴"
- entry_vol = perf_summary.get('completed_entry_volume', 0.0)
- pnl_pct = (perf_summary.get('total_pnl', 0.0) / entry_vol * 100) if entry_vol > 0 else 0.0
-
- parts.append(f"• Total Completed: {perf_summary.get('completed_trades', 0)}")
- parts.append(f"• {pnl_emoji} Realized P&L: {formatter.format_price_with_symbol(perf_summary.get('total_pnl', 0.0))} ({pnl_pct:+.2f}%)")
- parts.append(f"• Win Rate: {perf_summary.get('win_rate', 0.0):.1f}% ({perf_summary.get('total_wins', 0)}W / {perf_summary.get('total_losses', 0)}L)")
- parts.append(f"• Profit Factor: {perf_summary.get('profit_factor', 0.0):.2f}")
- parts.append(f"• Expectancy: {formatter.format_price_with_symbol(perf_summary.get('expectancy', 0.0))}")
- parts.append(f"• Avg Win: {formatter.format_price_with_symbol(perf_summary.get('avg_win', 0.0))} | Avg Loss: {formatter.format_price_with_symbol(perf_summary.get('avg_loss', 0.0))}")
-
- # Format largest trades with percentages
- largest_win_pct_str = f" ({perf_summary.get('largest_win_percentage', 0):+.2f}%)" if perf_summary.get('largest_win_percentage', 0) != 0 else ""
- largest_loss_pct_str = f" ({perf_summary.get('largest_loss_percentage', 0):+.2f}%)" if perf_summary.get('largest_loss_percentage', 0) != 0 else ""
-
- parts.append(f"• Largest Win: {formatter.format_price_with_symbol(perf_summary.get('largest_win', 0.0))}{largest_win_pct_str} | Largest Loss: {formatter.format_price_with_symbol(perf_summary.get('largest_loss', 0.0))}{largest_loss_pct_str}")
- parts.append(f"• Entry Volume: {formatter.format_price_with_symbol(perf_summary.get('completed_entry_volume', 0.0))}")
- parts.append(f"• Exit Volume: {formatter.format_price_with_symbol(perf_summary.get('completed_exit_volume', 0.0))}")
- parts.append(f"• Average Trade Duration: {perf_summary.get('avg_trade_duration', 'N/A')}")
- parts.append(f"• Cancelled Cycles: {perf_summary.get('total_cancelled', 0)}")
- else:
- parts.append("• No completed trades for this token yet.")
- parts.append("")
- # Open Positions
- parts.append("📉 <b>Current Open Positions:</b>")
- if open_positions:
- total_open_unrealized_pnl = token_stats_data.get('summary_total_unrealized_pnl', 0.0)
- open_pnl_emoji = "🟢" if total_open_unrealized_pnl >= 0 else "🔴"
-
- for pos in open_positions:
- pos_side_emoji = "🟢" if pos.get('side') == 'long' else "🔴"
- pos_pnl_emoji = "🟢" if pos.get('unrealized_pnl', 0) >= 0 else "🔴"
- opened_at_str = "N/A"
- if pos.get('opened_at'):
- try:
- from datetime import datetime
- opened_at_dt = datetime.fromisoformat(pos['opened_at'])
- opened_at_str = opened_at_dt.strftime('%Y-%m-%d %H:%M')
- except:
- pass
-
- parts.append(f"• {pos_side_emoji} {pos.get('side', '').upper()} {formatter.format_amount(abs(pos.get('amount',0)), token_name)} {token_name}")
- parts.append(f" Entry: {formatter.format_price_with_symbol(pos.get('entry_price',0), token_name)} | Mark: {formatter.format_price_with_symbol(pos.get('mark_price',0), token_name)}")
- parts.append(f" {pos_pnl_emoji} Unrealized P&L: {formatter.format_price_with_symbol(pos.get('unrealized_pnl',0))}")
- parts.append(f" Opened: {opened_at_str} | ID: ...{pos.get('lifecycle_id', '')[-6:]}")
- parts.append(f" {open_pnl_emoji} <b>Total Open P&L: {formatter.format_price_with_symbol(total_open_unrealized_pnl)}</b>")
- else:
- parts.append("• No open positions for this token.")
- parts.append("")
- parts.append(f"📋 Open Orders (Exchange): {token_stats_data.get('current_open_orders_count', 0)}")
- parts.append(f"💡 Use <code>/performance {token_name}</code> for another view including recent trades.")
-
- return "\n".join(parts)
-
- except Exception as e:
- logger.error(f"❌ Error formatting token stats message for {token}: {e}", exc_info=True)
- return f"❌ Error generating statistics for {token}: {str(e)[:100]}"
- # =============================================================================
- # CONVENIENCE METHODS & HIGH-LEVEL OPERATIONS
- # =============================================================================
-
- def process_trade_complete_cycle(self, symbol: str, side: str, entry_price: float,
- exit_price: float, amount: float,
- timestamp: Optional[str] = None) -> str:
- """Process a complete trade cycle in one operation."""
- # Create lifecycle
- lifecycle_id = self.create_trade_lifecycle(symbol, side, trade_type='complete_cycle')
- if not lifecycle_id:
- raise Exception("Failed to create trade lifecycle")
-
- # Update to position opened
- success = self.update_trade_position_opened(lifecycle_id, entry_price, amount, "manual_entry")
- if not success:
- raise Exception("Failed to update position opened")
-
- # Calculate PnL
- if side.lower() == 'buy':
- realized_pnl = (exit_price - entry_price) * amount
- else: # sell
- realized_pnl = (entry_price - exit_price) * amount
-
- # Update to position closed
- success = self.update_trade_position_closed(lifecycle_id, exit_price, realized_pnl, "manual_exit")
- if not success:
- raise Exception("Failed to update position closed")
-
- # Migrate to aggregated stats
- self.migrate_trade_to_aggregated_stats(lifecycle_id)
-
- logger.info(f"✅ Processed complete trade cycle: {symbol} {side.upper()} P&L: ${realized_pnl:.2f}")
- return lifecycle_id
- def get_summary_report(self) -> Dict[str, Any]:
- """Get comprehensive summary report."""
- try:
- perf_stats = self.get_performance_stats()
- token_performance = self.get_token_performance(limit=10)
- daily_stats = self.get_daily_stats(limit=7)
- risk_metrics = self.get_risk_metrics()
- balance_adjustments = self.get_balance_adjustments_summary()
-
- # Get current positions
- open_positions = self.get_open_positions()
-
- return {
- 'performance_stats': perf_stats,
- 'top_tokens': token_performance,
- 'recent_daily_stats': daily_stats,
- 'risk_metrics': risk_metrics,
- 'balance_adjustments': balance_adjustments,
- 'open_positions_count': len(open_positions),
- 'open_positions': open_positions,
- 'generated_at': datetime.now(timezone.utc).isoformat()
- }
-
- except Exception as e:
- logger.error(f"❌ Error generating summary report: {e}")
- return {'error': str(e)}
- def record_trade(self, symbol: str, side: str, amount: float, price: float,
- exchange_fill_id: Optional[str] = None, trade_type: str = "manual",
- pnl: Optional[float] = None, timestamp: Optional[str] = None,
- linked_order_table_id_to_link: Optional[int] = None):
- """Record a trade directly in the database (used for unmatched external fills)."""
- if timestamp is None:
- timestamp = datetime.now(timezone.utc).isoformat()
-
- value = amount * price
-
- try:
- self.db_manager._execute_query(
- "INSERT OR IGNORE INTO trades (symbol, side, amount, price, value, trade_type, timestamp, exchange_fill_id, pnl, linked_order_table_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
- (symbol, side, amount, price, value, trade_type, timestamp, exchange_fill_id, pnl or 0.0, linked_order_table_id_to_link)
- )
-
- formatter = get_formatter()
- base_asset_for_amount = symbol.split('/')[0] if '/' in symbol else symbol
- logger.info(f"📈 Trade recorded: {side.upper()} {formatter.format_amount(amount, base_asset_for_amount)} {symbol} @ {formatter.format_price(price, symbol)} ({formatter.format_price(value, symbol)}) [{trade_type}]")
-
- except Exception as e:
- logger.error(f"Failed to record trade: {e}")
- def health_check(self) -> Dict[str, Any]:
- """Perform health check on all components."""
- try:
- health = {
- 'database': 'ok',
- 'order_manager': 'ok',
- 'trade_manager': 'ok',
- 'aggregation_manager': 'ok',
- 'performance_calculator': 'ok',
- 'overall': 'ok'
- }
-
- # Test database connection
- self.db_manager._fetch_query("SELECT 1")
-
- # Test each component with basic operations
- self.get_recent_orders(limit=1)
- self.get_recent_trades(limit=1)
- self.get_daily_stats(limit=1)
- self.get_performance_stats()
-
- return health
-
- except Exception as e:
- logger.error(f"❌ Health check failed: {e}")
- return {'overall': 'error', 'error': str(e)}
|