123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951 |
- #!/usr/bin/env python3
- """
- Trading Statistics Tracker (Refactored Version)
- Main class that coordinates between specialized manager components.
- """
- import logging
- from datetime import datetime, timezone
- from typing import Dict, List, Any, Optional, Tuple, Union
- import math
- import numpy as np
- import uuid
- from .database_manager import DatabaseManager
- from .order_manager import OrderManager
- from .trade_lifecycle_manager import TradeLifecycleManager
- from .aggregation_manager import AggregationManager
- from .performance_calculator import PerformanceCalculator
- from src.utils.token_display_formatter import get_formatter
- logger = logging.getLogger(__name__)
- def _normalize_token_case(token: str) -> str:
- """Normalize token case for consistency."""
- if any(c.isupper() for c in token):
- return token # Keep original case for mixed-case tokens
- else:
- return token.upper() # Convert to uppercase for all-lowercase
- class TradingStats:
- """Refactored trading statistics tracker using modular components."""
- def __init__(self, db_path: str = "data/trading_stats.sqlite"):
- """Initialize with all manager components."""
- # Initialize core database manager
- self.db_manager = DatabaseManager(db_path)
-
- # Initialize specialized managers
- self.order_manager = OrderManager(self.db_manager)
- self.trade_manager = TradeLifecycleManager(self.db_manager)
- self.aggregation_manager = AggregationManager(self.db_manager)
- self.performance_calculator = PerformanceCalculator(self.db_manager)
-
- logger.info("🚀 TradingStats initialized with modular components")
- def close(self):
- """Close database connection."""
- self.db_manager.close()
- # =============================================================================
- # COMPATIBILITY METHODS - Direct exposure of internal methods
- # =============================================================================
-
- def _get_metadata(self, key: str) -> Optional[str]:
- """Get metadata from database."""
- return self.db_manager._get_metadata(key)
-
- def _set_metadata(self, key: str, value: str):
- """Set metadata in database."""
- return self.db_manager._set_metadata(key, value)
- # =============================================================================
- # DATABASE MANAGEMENT DELEGATION
- # =============================================================================
-
- async def set_initial_balance(self, balance: float):
- """Set initial balance."""
- return await self.db_manager.set_initial_balance(balance)
-
- def get_initial_balance(self) -> float:
- """Get initial balance."""
- return self.db_manager.get_initial_balance()
-
- async def record_balance_snapshot(self, balance: float, unrealized_pnl: float = 0.0,
- timestamp: Optional[str] = None, notes: Optional[str] = None):
- """Record balance snapshot."""
- return await self.db_manager.record_balance_snapshot(balance, unrealized_pnl, timestamp, notes)
-
- def purge_old_balance_history(self, days_to_keep: int = 30) -> int:
- """Purge old balance history."""
- return self.db_manager.purge_old_balance_history(days_to_keep)
-
- def get_balance_history_record_count(self) -> int:
- """Get balance history record count."""
- return self.db_manager.get_balance_history_record_count()
-
- def purge_old_daily_aggregated_stats(self, days_to_keep: int = 365) -> int:
- """Purge old daily aggregated stats."""
- return self.db_manager.purge_old_daily_aggregated_stats(days_to_keep)
- # =============================================================================
- # ORDER MANAGEMENT DELEGATION
- # =============================================================================
-
- def record_order_placed(self, symbol: str, side: str, order_type: str,
- amount_requested: float, price: Optional[float] = None,
- bot_order_ref_id: Optional[str] = None,
- exchange_order_id: Optional[str] = None,
- timestamp: Optional[str] = None,
- status: str = 'open') -> bool:
- """Record order placement."""
- result = self.order_manager.record_order_placed(
- symbol, side, order_type, amount_requested, price,
- bot_order_ref_id, exchange_order_id, status
- )
- return result is not None
-
- def update_order_exchange_id(self, bot_order_ref_id: str, exchange_order_id: str) -> bool:
- """Update order with exchange ID."""
- return self.order_manager.update_order_exchange_id(bot_order_ref_id, exchange_order_id)
-
- def record_order_filled(self, exchange_order_id: str, actual_amount: float,
- actual_price: float, fees: float = 0.0,
- timestamp: Optional[str] = None,
- exchange_fill_id: Optional[str] = None) -> bool:
- """Record order fill."""
- return self.order_manager.record_order_filled(
- exchange_order_id, actual_amount, actual_price, fees, timestamp, exchange_fill_id
- )
-
- def record_order_cancelled(self, exchange_order_id: str, reason: str = "user_cancelled",
- timestamp: Optional[str] = None) -> bool:
- """Record order cancellation."""
- return self.order_manager.record_order_cancelled(exchange_order_id, reason, timestamp)
-
- def update_order_status(self, order_db_id: Optional[int] = None, bot_order_ref_id: Optional[str] = None,
- exchange_order_id: Optional[str] = None, new_status: Optional[str] = None,
- amount_filled_increment: Optional[float] = None, set_exchange_order_id: Optional[str] = None,
- notes: Optional[str] = None, timestamp: Optional[str] = None) -> bool:
- """Update order status - delegates to OrderManager with full parameter support."""
- return self.order_manager.update_order_status(
- order_db_id=order_db_id,
- bot_order_ref_id=bot_order_ref_id,
- exchange_order_id=exchange_order_id,
- new_status=new_status,
- amount_filled_increment=amount_filled_increment,
- set_exchange_order_id=set_exchange_order_id
- )
-
- def get_order_by_exchange_id(self, exchange_order_id: str) -> Optional[Dict[str, Any]]:
- """Get order by exchange ID."""
- return self.order_manager.get_order_by_exchange_id(exchange_order_id)
-
- def get_order_by_bot_ref_id(self, bot_order_ref_id: str) -> Optional[Dict[str, Any]]:
- """Get order by bot reference ID."""
- return self.order_manager.get_order_by_bot_ref_id(bot_order_ref_id)
- def has_exchange_fill_been_processed(self, exchange_fill_id: str) -> bool:
- """Check if an exchange fill ID has already been processed."""
- try:
- # Check trades table which is the primary place where all processed fills are recorded
- trade_exists = self.db_manager._fetch_query(
- "SELECT 1 FROM trades WHERE exchange_fill_id = ? LIMIT 1",
- (exchange_fill_id,)
- )
-
- return bool(trade_exists)
- except Exception as e:
- logger.error(f"Error checking if fill {exchange_fill_id} was processed: {e}")
- return False
-
- def get_orders_by_symbol(self, symbol: str, limit: int = 50) -> List[Dict[str, Any]]:
- """Get orders by symbol."""
- return self.order_manager.get_orders_by_symbol(symbol, limit)
-
- def get_orders_by_status(self, status: str, limit: Optional[int] = 50,
- order_type_filter: Optional[str] = None,
- parent_bot_order_ref_id: Optional[str] = None) -> List[Dict[str, Any]]:
- """Get orders by status with optional filters."""
- # OrderManager expects (status, order_type_filter, parent_bot_order_ref_id) without limit
- return self.order_manager.get_orders_by_status(status, order_type_filter, parent_bot_order_ref_id)
-
- def get_recent_orders(self, limit: int = 20) -> List[Dict[str, Any]]:
- """Get recent orders."""
- return self.order_manager.get_recent_orders(limit)
-
- def cleanup_old_cancelled_orders(self, days_old: int = 7) -> int:
- """Clean up old cancelled orders."""
- return self.order_manager.cleanup_old_cancelled_orders(days_old)
- # =============================================================================
- # TRADE LIFECYCLE DELEGATION
- # =============================================================================
-
- def create_trade_lifecycle(self, symbol: str, side: str, entry_order_id: Optional[str] = None,
- entry_bot_order_ref_id: Optional[str] = None,
- stop_loss_price: Optional[float] = None,
- take_profit_price: Optional[float] = None,
- trade_type: str = 'manual') -> Optional[str]:
- """Create trade lifecycle."""
- return self.trade_manager.create_trade_lifecycle(
- symbol, side, entry_order_id, entry_bot_order_ref_id,
- stop_loss_price, take_profit_price, trade_type
- )
-
- async def update_trade_position_opened(self, lifecycle_id: str, entry_price: float,
- entry_amount: float, exchange_fill_id: str) -> bool:
- """Update trade position opened."""
- return await self.trade_manager.update_trade_position_opened(
- lifecycle_id, entry_price, entry_amount, exchange_fill_id
- )
-
- async def update_trade_position_closed(self, lifecycle_id: str, exit_price: float,
- realized_pnl: float, exchange_fill_id: str) -> bool:
- """Update trade position closed."""
- return await self.trade_manager.update_trade_position_closed(
- lifecycle_id, exit_price, realized_pnl, exchange_fill_id
- )
-
- def update_trade_cancelled(self, lifecycle_id: str, reason: str = "order_cancelled") -> bool:
- """Update trade cancelled."""
- return self.trade_manager.update_trade_cancelled(lifecycle_id, reason)
-
- async def link_stop_loss_to_trade(self, lifecycle_id: str, stop_loss_order_id: str,
- stop_loss_price: float) -> bool:
- """Link stop loss to trade."""
- return await self.trade_manager.link_stop_loss_to_trade(
- lifecycle_id, stop_loss_order_id, stop_loss_price
- )
-
- async def link_take_profit_to_trade(self, lifecycle_id: str, take_profit_order_id: str,
- take_profit_price: float) -> bool:
- """Link take profit to trade."""
- return await self.trade_manager.link_take_profit_to_trade(
- lifecycle_id, take_profit_order_id, take_profit_price
- )
-
- def get_trade_by_lifecycle_id(self, lifecycle_id: str) -> Optional[Dict[str, Any]]:
- """Get trade by lifecycle ID."""
- return self.trade_manager.get_trade_by_lifecycle_id(lifecycle_id)
-
- def get_trade_by_symbol_and_status(self, symbol: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
- """Get trade by symbol and status."""
- return self.trade_manager.get_trade_by_symbol_and_status(symbol, status)
-
- def get_open_positions(self, symbol: Optional[str] = None) -> List[Dict[str, Any]]:
- """Get open positions."""
- return self.trade_manager.get_open_positions(symbol)
-
- def get_trades_by_status(self, status: str, limit: int = 50) -> List[Dict[str, Any]]:
- """Get trades by status."""
- return self.trade_manager.get_trades_by_status(status, limit)
-
- def get_lifecycle_by_entry_order_id(self, entry_exchange_order_id: str, status: Optional[str] = None) -> Optional[Dict[str, Any]]:
- """Get lifecycle by entry order ID."""
- return self.trade_manager.get_lifecycle_by_entry_order_id(entry_exchange_order_id, status)
-
- def get_lifecycle_by_sl_order_id(self, sl_exchange_order_id: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
- """Get lifecycle by stop loss order ID."""
- return self.trade_manager.get_lifecycle_by_sl_order_id(sl_exchange_order_id, status)
-
- def get_lifecycle_by_tp_order_id(self, tp_exchange_order_id: str, status: str = 'position_opened') -> Optional[Dict[str, Any]]:
- """Get lifecycle by take profit order ID."""
- return self.trade_manager.get_lifecycle_by_tp_order_id(tp_exchange_order_id, status)
-
- def get_pending_stop_loss_activations(self) -> List[Dict[str, Any]]:
- """Get pending stop loss activations."""
- return self.trade_manager.get_pending_stop_loss_activations()
-
- def cleanup_old_cancelled_trades(self, days_old: int = 7) -> int:
- """Clean up old cancelled trades."""
- return self.trade_manager.cleanup_old_cancelled_trades(days_old)
-
- def confirm_position_with_exchange(self, symbol: str, exchange_position_size: float,
- exchange_open_orders: List[Dict]) -> bool:
- """Confirm position with exchange."""
- return self.trade_manager.confirm_position_with_exchange(
- symbol, exchange_position_size, exchange_open_orders
- )
-
- def update_trade_market_data(self, trade_lifecycle_id: str, **kwargs) -> bool:
- """Update trade market data."""
- return self.trade_manager.update_trade_market_data(trade_lifecycle_id, **kwargs)
-
- def get_recent_trades(self, limit: int = 10) -> List[Dict[str, Any]]:
- """Get recent trades."""
- return self.trade_manager.get_recent_trades(limit)
-
- def get_all_trades(self) -> List[Dict[str, Any]]:
- """Get all trades."""
- return self.trade_manager.get_all_trades()
- def cancel_linked_orders(self, parent_bot_order_ref_id: str, new_status: str = 'cancelled_parent_filled') -> int:
- """Cancel linked SL/TP orders when a parent order is filled or cancelled."""
- return self.trade_manager.cancel_linked_orders(parent_bot_order_ref_id, new_status)
- # =============================================================================
- # AGGREGATION MANAGEMENT DELEGATION
- # =============================================================================
-
- def migrate_trade_to_aggregated_stats(self, trade_lifecycle_id: str):
- """Migrate completed trade to aggregated stats."""
- return self.aggregation_manager.migrate_trade_to_aggregated_stats(trade_lifecycle_id)
-
- async def record_deposit(self, amount: float, timestamp: Optional[str] = None,
- deposit_id: Optional[str] = None, description: Optional[str] = None):
- """Record a deposit."""
- return await self.aggregation_manager.record_deposit(amount, timestamp, deposit_id, description)
-
- async def record_withdrawal(self, amount: float, timestamp: Optional[str] = None,
- withdrawal_id: Optional[str] = None, description: Optional[str] = None):
- """Record a withdrawal."""
- return await self.aggregation_manager.record_withdrawal(amount, timestamp, withdrawal_id, description)
-
- def get_balance_adjustments_summary(self) -> Dict[str, Any]:
- """Get summary of balance adjustments."""
- return self.aggregation_manager.get_balance_adjustments_summary()
-
- def get_daily_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
- """Get daily stats."""
- return self.aggregation_manager.get_daily_stats(limit)
-
- def get_weekly_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
- """Get weekly stats."""
- return self.aggregation_manager.get_weekly_stats(limit)
-
- def get_monthly_stats(self, limit: int = 10) -> List[Dict[str, Any]]:
- """Get monthly stats."""
- return self.aggregation_manager.get_monthly_stats(limit)
- # =============================================================================
- # PERFORMANCE CALCULATION DELEGATION
- # =============================================================================
-
- def get_performance_stats(self) -> Dict[str, Any]:
- """Get performance stats."""
- return self.performance_calculator.get_performance_stats()
-
- def get_token_performance(self, token: Optional[str] = None) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
- """Get performance data for a specific token or all tokens."""
- try:
- if token:
- # Get performance for specific token
- query = """
- SELECT
- symbol,
- COUNT(*) as total_trades,
- SUM(CASE WHEN realized_pnl > 0 THEN 1 ELSE 0 END) as winning_trades,
- SUM(realized_pnl) as total_pnl,
- AVG(realized_pnl) as avg_trade,
- MAX(realized_pnl) as largest_win,
- MIN(realized_pnl) as largest_loss,
- AVG(CASE WHEN realized_pnl > 0 THEN realized_pnl ELSE NULL END) as avg_win,
- AVG(CASE WHEN realized_pnl < 0 THEN realized_pnl ELSE NULL END) as avg_loss
- FROM trades
- WHERE symbol = ? AND status = 'position_closed'
- GROUP BY symbol
- """
- result = self.db_manager._fetchone_query(query, (token,))
-
- if not result:
- return {}
-
- # Calculate win rate
- total_trades = result['total_trades']
- winning_trades = result['winning_trades']
- win_rate = (winning_trades / total_trades * 100) if total_trades > 0 else 0
-
- # Get recent trades
- recent_trades_query = """
- SELECT
- side,
- entry_price,
- exit_price,
- realized_pnl as pnl,
- position_opened_at,
- position_closed_at
- FROM trades
- WHERE symbol = ? AND status = 'position_closed'
- ORDER BY position_closed_at DESC
- LIMIT 5
- """
- recent_trades = self.db_manager._fetch_query(recent_trades_query, (token,))
-
- return {
- 'token': token,
- 'total_trades': total_trades,
- 'winning_trades': winning_trades,
- 'win_rate': win_rate,
- 'total_pnl': result['total_pnl'],
- 'avg_trade': result['avg_trade'],
- 'largest_win': result['largest_win'],
- 'largest_loss': result['largest_loss'],
- 'avg_win': result['avg_win'],
- 'avg_loss': result['avg_loss'],
- 'recent_trades': recent_trades
- }
- else:
- # Get performance for all tokens
- query = """
- SELECT
- symbol,
- COUNT(*) as total_trades,
- SUM(CASE WHEN realized_pnl > 0 THEN 1 ELSE 0 END) as winning_trades,
- SUM(realized_pnl) as total_pnl,
- AVG(realized_pnl) as avg_trade
- FROM trades
- WHERE status = 'position_closed'
- GROUP BY symbol
- ORDER BY total_pnl DESC
- """
- results = self.db_manager._fetch_query(query)
-
- performance_data = []
- for result in results:
- total_trades = result['total_trades']
- winning_trades = result['winning_trades']
- win_rate = (winning_trades / total_trades * 100) if total_trades > 0 else 0
-
- performance_data.append({
- 'token': result['symbol'],
- 'total_trades': total_trades,
- 'winning_trades': winning_trades,
- 'win_rate': win_rate,
- 'total_pnl': result['total_pnl'],
- 'avg_trade': result['avg_trade']
- })
-
- return performance_data
-
- except Exception as e:
- logger.error(f"Error getting token performance: {e}")
- return [] if token is None else {}
-
- def get_balance_history(self, days: int = 30) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
- """Get balance history."""
- return self.performance_calculator.get_balance_history(days)
-
- def get_live_max_drawdown(self) -> Tuple[float, float]:
- """Get live max drawdown."""
- return self.performance_calculator.get_live_max_drawdown()
-
- def update_live_max_drawdown(self, current_balance: float) -> bool:
- """Update live max drawdown."""
- return self.performance_calculator.update_live_max_drawdown(current_balance)
-
- def get_drawdown_monitor_data(self) -> Dict[str, float]:
- """Get drawdown data from DrawdownMonitor for external monitoring systems."""
- try:
- peak_balance = float(self._get_metadata('drawdown_peak_balance') or '0.0')
- max_drawdown_pct = float(self._get_metadata('drawdown_max_drawdown_pct') or '0.0')
- return {
- 'peak_balance': peak_balance,
- 'max_drawdown_percentage': max_drawdown_pct
- }
- except (ValueError, TypeError):
- return {'peak_balance': 0.0, 'max_drawdown_percentage': 0.0}
-
- def calculate_sharpe_ratio(self, days: int = 30) -> Optional[float]:
- """Calculate Sharpe ratio."""
- return self.performance_calculator.calculate_sharpe_ratio(days)
-
- def calculate_max_consecutive_losses(self) -> int:
- """Calculate max consecutive losses."""
- return self.performance_calculator.calculate_max_consecutive_losses()
-
- def get_risk_metrics(self) -> Dict[str, Any]:
- """Get risk metrics."""
- return self.performance_calculator.get_risk_metrics()
-
- def get_period_performance(self, start_date: str, end_date: str) -> Dict[str, Any]:
- """Get period performance."""
- return self.performance_calculator.get_period_performance(start_date, end_date)
-
- def get_recent_performance_trend(self, days: int = 7) -> Dict[str, Any]:
- """Get recent performance trend."""
- return self.performance_calculator.get_recent_performance_trend(days)
- # =============================================================================
- # COMPATIBILITY METHODS - Legacy API Support
- # =============================================================================
-
- def get_basic_stats(self, current_balance: Optional[float] = None) -> Dict[str, Any]:
- """Get basic trading statistics from DB, primarily using aggregated tables."""
-
- # Get counts of open positions (trades that are not yet migrated)
- open_positions_count = self._get_open_positions_count_from_db()
- # Get overall aggregated stats from token_stats table
- query_token_stats_summary = """
- SELECT
- SUM(total_realized_pnl) as total_pnl_from_cycles,
- SUM(total_completed_cycles) as total_completed_cycles_sum,
- MIN(first_cycle_closed_at) as overall_first_cycle_closed,
- MAX(last_cycle_closed_at) as overall_last_cycle_closed
- FROM token_stats
- """
- token_stats_summary = self.db_manager._fetchone_query(query_token_stats_summary)
- total_pnl_from_cycles = token_stats_summary['total_pnl_from_cycles'] if token_stats_summary and token_stats_summary['total_pnl_from_cycles'] is not None else 0.0
- total_completed_cycles_sum = token_stats_summary['total_completed_cycles_sum'] if token_stats_summary and token_stats_summary['total_completed_cycles_sum'] is not None else 0
- # Total trades considered as sum of completed cycles and currently open positions
- total_trades_redefined = total_completed_cycles_sum + open_positions_count
- initial_balance_str = self._get_metadata('initial_balance')
- initial_balance = float(initial_balance_str) if initial_balance_str else 0.0
-
- start_date_iso = self._get_metadata('start_date')
- start_date_obj = datetime.fromisoformat(start_date_iso) if start_date_iso else datetime.now(timezone.utc)
- days_active = (datetime.now(timezone.utc) - start_date_obj).days + 1
-
- # Get last activity timestamp
- last_activity_ts = None
- last_activity_query = """
- SELECT MAX(updated_at) as last_update
- FROM trades
- WHERE status IN ('position_opened', 'position_closed')
- """
- last_activity_row = self.db_manager._fetchone_query(last_activity_query)
- if last_activity_row and last_activity_row['last_update']:
- last_activity_ts = last_activity_row['last_update']
- # Ensure timezone-aware
- if isinstance(last_activity_ts, str):
- last_activity_ts = datetime.fromisoformat(last_activity_ts)
- if last_activity_ts.tzinfo is None:
- last_activity_ts = last_activity_ts.replace(tzinfo=timezone.utc)
- # Get last open trade timestamp
- last_open_trade_query = """
- SELECT MAX(updated_at) as last_update
- FROM trades
- WHERE status = 'position_opened'
- """
- last_open_trade_ts_row = self.db_manager._fetchone_query(last_open_trade_query)
- if last_open_trade_ts_row and last_open_trade_ts_row['last_update']:
- last_open_trade_ts = last_open_trade_ts_row['last_update']
- # Ensure timezone-aware
- if isinstance(last_open_trade_ts, str):
- last_open_trade_ts = datetime.fromisoformat(last_open_trade_ts)
- if last_open_trade_ts.tzinfo is None:
- last_open_trade_ts = last_open_trade_ts.replace(tzinfo=timezone.utc)
- # Now both datetimes are timezone-aware, we can compare them
- if not last_activity_ts or last_open_trade_ts > last_activity_ts:
- last_activity_ts = last_open_trade_ts
- return {
- 'total_trades': total_trades_redefined,
- 'completed_trades': total_completed_cycles_sum,
- 'initial_balance': initial_balance,
- 'total_pnl': total_pnl_from_cycles,
- 'days_active': days_active,
- 'start_date': start_date_obj.strftime('%Y-%m-%d'),
- 'last_trade': last_activity_ts,
- 'open_positions_count': open_positions_count
- }
- def _get_open_positions_count_from_db(self) -> int:
- """Get count of open positions from trades table."""
- row = self.db_manager._fetchone_query("SELECT COUNT(DISTINCT symbol) as count FROM trades WHERE status = 'position_opened'")
- return row['count'] if row else 0
- def get_token_detailed_stats(self, token: str) -> Dict[str, Any]:
- """Get detailed statistics for a specific token."""
- try:
- # Normalize token case
- upper_token = _normalize_token_case(token)
-
- # Get aggregated stats from token_stats table
- token_agg_stats = self.db_manager._fetchone_query(
- "SELECT * FROM token_stats WHERE token = ?", (upper_token,)
- )
-
- # Get open trades for this token
- open_trades_for_token = self.db_manager._fetch_query(
- "SELECT * FROM trades WHERE status = 'position_opened' AND symbol LIKE ? ORDER BY position_opened_at DESC",
- (f"{upper_token}/%",)
- )
-
- # Initialize performance stats
- perf_stats = {
- 'completed_trades': 0,
- 'total_pnl': 0.0,
- 'pnl_percentage': 0.0,
- 'win_rate': 0.0,
- 'profit_factor': 0.0,
- 'avg_win': 0.0,
- 'avg_loss': 0.0,
- 'largest_win': 0.0,
- 'largest_loss': 0.0,
- 'expectancy': 0.0,
- 'total_wins': 0,
- 'total_losses': 0,
- 'completed_entry_volume': 0.0,
- 'completed_exit_volume': 0.0,
- 'total_cancelled': 0,
- 'total_duration_seconds': 0,
- 'avg_trade_duration': "N/A"
- }
-
- if token_agg_stats:
- total_cycles = token_agg_stats.get('total_completed_cycles', 0)
- winning_cycles = token_agg_stats.get('winning_cycles', 0)
- losing_cycles = token_agg_stats.get('losing_cycles', 0)
- sum_winning_pnl = token_agg_stats.get('sum_of_winning_pnl', 0.0)
- sum_losing_pnl = token_agg_stats.get('sum_of_losing_pnl', 0.0)
-
- # Calculate percentages for largest trades
- largest_win_pnl = token_agg_stats.get('largest_winning_cycle_pnl', 0.0)
- largest_loss_pnl = token_agg_stats.get('largest_losing_cycle_pnl', 0.0)
- largest_win_entry_volume = token_agg_stats.get('largest_winning_cycle_entry_volume', 0.0)
- largest_loss_entry_volume = token_agg_stats.get('largest_losing_cycle_entry_volume', 0.0)
-
- largest_win_percentage = (largest_win_pnl / largest_win_entry_volume * 100) if largest_win_entry_volume > 0 else 0.0
- largest_loss_percentage = (largest_loss_pnl / largest_loss_entry_volume * 100) if largest_loss_entry_volume > 0 else 0.0
-
- perf_stats.update({
- 'completed_trades': total_cycles,
- 'total_pnl': token_agg_stats.get('total_realized_pnl', 0.0),
- 'win_rate': (winning_cycles / total_cycles * 100) if total_cycles > 0 else 0.0,
- 'profit_factor': (sum_winning_pnl / sum_losing_pnl) if sum_losing_pnl > 0 else float('inf') if sum_winning_pnl > 0 else 0.0,
- 'avg_win': (sum_winning_pnl / winning_cycles) if winning_cycles > 0 else 0.0,
- 'avg_loss': (sum_losing_pnl / losing_cycles) if losing_cycles > 0 else 0.0,
- 'largest_win': largest_win_pnl,
- 'largest_loss': largest_loss_pnl,
- 'largest_win_percentage': largest_win_percentage,
- 'largest_loss_percentage': largest_loss_percentage,
- 'total_wins': winning_cycles,
- 'total_losses': losing_cycles,
- 'completed_entry_volume': token_agg_stats.get('total_entry_volume', 0.0),
- 'completed_exit_volume': token_agg_stats.get('total_exit_volume', 0.0),
- 'total_cancelled': token_agg_stats.get('total_cancelled_cycles', 0),
- 'total_duration_seconds': token_agg_stats.get('total_duration_seconds', 0)
- })
-
- # Calculate expectancy
- win_rate_decimal = perf_stats['win_rate'] / 100
- perf_stats['expectancy'] = (perf_stats['avg_win'] * win_rate_decimal) - (perf_stats['avg_loss'] * (1 - win_rate_decimal))
-
- # Format average trade duration
- if total_cycles > 0:
- avg_duration_seconds = token_agg_stats.get('total_duration_seconds', 0) / total_cycles
- perf_stats['avg_trade_duration'] = self._format_duration(avg_duration_seconds)
-
- # Calculate open positions summary
- open_positions_summary = []
- total_open_value = 0.0
- total_open_unrealized_pnl = 0.0
-
- for op_trade in open_trades_for_token:
- open_positions_summary.append({
- 'lifecycle_id': op_trade.get('trade_lifecycle_id'),
- 'side': op_trade.get('position_side'),
- 'amount': op_trade.get('current_position_size'),
- 'entry_price': op_trade.get('entry_price'),
- 'mark_price': op_trade.get('mark_price'),
- 'unrealized_pnl': op_trade.get('unrealized_pnl'),
- 'opened_at': op_trade.get('position_opened_at')
- })
- total_open_value += op_trade.get('value', 0.0)
- total_open_unrealized_pnl += op_trade.get('unrealized_pnl', 0.0)
-
- # Get open orders count for this token
- open_orders_count_row = self.db_manager._fetchone_query(
- "SELECT COUNT(*) as count FROM orders WHERE symbol LIKE ? AND status IN ('open', 'submitted', 'pending_trigger')",
- (f"{upper_token}/%",)
- )
- current_open_orders_for_token = open_orders_count_row['count'] if open_orders_count_row else 0
-
- effective_total_trades = perf_stats['completed_trades'] + len(open_trades_for_token)
-
- return {
- 'token': upper_token,
- 'message': f"Statistics for {upper_token}",
- 'performance_summary': perf_stats, # Expected key by formatting method
- 'performance': perf_stats, # Legacy compatibility
- 'open_positions': open_positions_summary, # Direct list as expected
- 'summary_total_trades': effective_total_trades, # Expected by formatting method
- 'summary_total_unrealized_pnl': total_open_unrealized_pnl, # Expected by formatting method
- 'current_open_orders_count': current_open_orders_for_token, # Expected by formatting method
- 'summary': {
- 'total_trades': effective_total_trades,
- 'open_orders': current_open_orders_for_token,
- }
- }
-
- except Exception as e:
- logger.error(f"❌ Error getting detailed stats for {token}: {e}")
- return {}
- def _format_duration(self, seconds: float) -> str:
- """Format duration in seconds to a human-readable string."""
- if seconds <= 0:
- return "0s"
-
- days = int(seconds // 86400)
- hours = int((seconds % 86400) // 3600)
- minutes = int((seconds % 3600) // 60)
- secs = int(seconds % 60)
-
- parts = []
- if days > 0:
- parts.append(f"{days}d")
- if hours > 0:
- parts.append(f"{hours}h")
- if minutes > 0:
- parts.append(f"{minutes}m")
- if secs > 0 or not parts:
- parts.append(f"{secs}s")
-
- return " ".join(parts)
- async def format_stats_message(self, current_balance: Optional[float] = None) -> str:
- """Formats a comprehensive statistics message."""
- formatter = get_formatter()
- basic_stats = self.get_basic_stats(current_balance)
- initial_bal = basic_stats.get('initial_balance', 0.0)
- total_pnl_val = basic_stats.get('total_pnl', 0.0)
- total_return_pct = basic_stats.get('total_return_pct', 0.0)
- pnl_emoji = "✅" if total_pnl_val >= 0 else "🔻"
- stats_text_parts = [
- f"📊 <b>Trading Performance Summary</b>",
- f"• Current Balance: {await formatter.format_price_with_symbol(current_balance if current_balance is not None else (initial_bal + total_pnl_val))} ({await formatter.format_price_with_symbol(current_balance if current_balance is not None else (initial_bal + total_pnl_val) - initial_bal) if initial_bal > 0 else 'N/A'})",
- f"• Initial Balance: {await formatter.format_price_with_symbol(initial_bal)}",
- f"• Balance Change: {await formatter.format_price_with_symbol(total_pnl_val)} ({total_return_pct:+.2f}%)",
- f"• {pnl_emoji} Total P&L: {await formatter.format_price_with_symbol(total_pnl_val)} ({total_return_pct:+.2f}%)"
- ]
-
- # Performance Metrics
- perf = basic_stats.get('performance_metrics', {})
- if perf:
- stats_text_parts.append("\n<b>Key Metrics:</b>")
- stats_text_parts.append(f"• Trading Volume (Entry Vol.): {await formatter.format_price_with_symbol(perf.get('total_trading_volume', 0.0))}")
- if perf.get('expectancy') is not None:
- stats_text_parts.append(f"• Expectancy: {await formatter.format_price_with_symbol(perf['expectancy'])}")
- stats_text_parts.append(f"• Win Rate: {perf.get('win_rate', 0.0):.2f}% ({perf.get('num_wins', 0)} wins)")
- stats_text_parts.append(f"• Profit Factor: {perf.get('profit_factor', 0.0):.2f}")
-
- # Largest Trades
- if perf.get('largest_win') is not None:
- largest_win_pct_str = f" ({perf.get('largest_win_entry_pct', 0):.2f}%)" if perf.get('largest_win_entry_pct') is not None else ""
- largest_win_token = perf.get('largest_win_token', 'N/A')
- stats_text_parts.append(f"• Largest Winning Trade: {await formatter.format_price_with_symbol(perf['largest_win'])}{largest_win_pct_str} ({largest_win_token})")
- if perf.get('largest_loss') is not None:
- largest_loss_pct_str = f" ({perf.get('largest_loss_entry_pct', 0):.2f}%)" if perf.get('largest_loss_entry_pct') is not None else ""
- largest_loss_token = perf.get('largest_loss_token', 'N/A')
- stats_text_parts.append(f"• Largest Losing Trade: {await formatter.format_price_with_symbol(-perf['largest_loss'])}{largest_loss_pct_str} ({largest_loss_token})")
- # ROE-based metrics if available
- largest_win_roe = perf.get('largest_win_roe')
- largest_loss_roe = perf.get('largest_loss_roe')
- if largest_win_roe is not None:
- largest_win_roe_pnl = perf.get('largest_win_roe_pnl', 0.0)
- largest_win_roe_token = perf.get('largest_win_roe_token', 'N/A')
- stats_text_parts.append(f"• Best ROE Trade: {await formatter.format_price_with_symbol(largest_win_roe_pnl)} (+{largest_win_roe:.2f}%) ({largest_win_roe_token})")
- if largest_loss_roe is not None:
- largest_loss_roe_pnl = perf.get('largest_loss_roe_pnl', 0.0)
- largest_loss_roe_token = perf.get('largest_loss_roe_token', 'N/A')
- stats_text_parts.append(f"• Worst ROE Trade: {await formatter.format_price_with_symbol(-largest_loss_roe_pnl)} (-{largest_loss_roe:.2f}%) ({largest_loss_roe_token})")
- # Best/Worst Tokens
- best_token_stats = basic_stats.get('best_token')
- worst_token_stats = basic_stats.get('worst_token')
- if best_token_stats:
- stats_text_parts.append(f"• Best Token: {best_token_stats['name']} {await formatter.format_price_with_symbol(best_token_stats['pnl_value'])} ({best_token_stats['pnl_percentage']:+.2f}%)")
- if worst_token_stats:
- stats_text_parts.append(f"• Worst Token: {worst_token_stats['name']} {await formatter.format_price_with_symbol(worst_token_stats['pnl_value'])} ({worst_token_stats['pnl_percentage']:+.2f}%)")
- return "\n".join(stats_text_parts)
- async def format_token_stats_message(self, token: str) -> str:
- """Formats a statistics message for a specific token."""
- formatter = get_formatter()
- token_stats = self.get_token_detailed_stats(token)
- normalized_token = _normalize_token_case(token)
- token_name = token_stats.get('token', normalized_token.upper())
-
- if not token_stats or token_stats.get('summary_total_trades', 0) == 0:
- return (
- f"📊 <b>{token_name} Statistics</b>\n\n"
- f"📭 No trading data found for {token_name}.\n\n"
- f"💡 To trade this token, try commands like:\n"
- f" <code>/long {token_name} 100</code>\n"
- f" <code>/short {token_name} 100</code>"
- )
- perf_summary = token_stats.get('performance_summary', {})
- open_positions = token_stats.get('open_positions', [])
-
- parts = [f"📊 <b>{token_name.upper()} Detailed Statistics</b>\n"]
- # Completed Trades Summary
- parts.append("📈 <b>Completed Trades Summary:</b>")
- if perf_summary.get('completed_trades', 0) > 0:
- pnl_emoji = "✅" if perf_summary.get('total_pnl', 0) >= 0 else "🔻"
- entry_vol = perf_summary.get('completed_entry_volume', 0.0)
- pnl_pct = (perf_summary.get('total_pnl', 0.0) / entry_vol * 100) if entry_vol > 0 else 0.0
-
- parts.append(f"• Total Completed: {perf_summary.get('completed_trades', 0)}")
- parts.append(f"• {pnl_emoji} Realized P&L: {await formatter.format_price_with_symbol(perf_summary.get('total_pnl', 0.0))} ({pnl_pct:+.2f}%)")
- parts.append(f"• Win Rate: {perf_summary.get('win_rate', 0.0):.1f}% ({perf_summary.get('total_wins', 0)}W / {perf_summary.get('total_losses', 0)}L)")
- parts.append(f"• Profit Factor: {perf_summary.get('profit_factor', 0.0):.2f}")
- parts.append(f"• Expectancy: {await formatter.format_price_with_symbol(perf_summary.get('expectancy', 0.0))}")
- parts.append(f"• Avg Win: {await formatter.format_price_with_symbol(perf_summary.get('avg_win', 0.0))} | Avg Loss: {await formatter.format_price_with_symbol(perf_summary.get('avg_loss', 0.0))}")
-
- # Format largest trades with percentages
- largest_win_pct_str = f" ({perf_summary.get('largest_win_entry_pct', 0):.2f}%)" if perf_summary.get('largest_win_entry_pct') is not None else ""
- largest_loss_pct_str = f" ({perf_summary.get('largest_loss_entry_pct', 0):.2f}%)" if perf_summary.get('largest_loss_entry_pct') is not None else ""
-
- parts.append(f"• Largest Win: {await formatter.format_price_with_symbol(perf_summary.get('largest_win', 0.0))}{largest_win_pct_str} | Largest Loss: {await formatter.format_price_with_symbol(perf_summary.get('largest_loss', 0.0))}{largest_loss_pct_str}")
- parts.append(f"• Entry Volume: {await formatter.format_price_with_symbol(perf_summary.get('completed_entry_volume', 0.0))}")
- parts.append(f"• Exit Volume: {await formatter.format_price_with_symbol(perf_summary.get('completed_exit_volume', 0.0))}")
- parts.append(f"• Average Trade Duration: {perf_summary.get('avg_trade_duration', 'N/A')}")
- parts.append(f"• Cancelled Cycles: {perf_summary.get('total_cancelled', 0)}")
- else:
- parts.append("• No completed trades for this token yet.")
- parts.append("")
- # Open Positions
- parts.append("📉 <b>Current Open Positions:</b>")
- if open_positions:
- total_open_unrealized_pnl = token_stats.get('summary_total_unrealized_pnl', 0.0)
- open_pnl_emoji = "✅" if total_open_unrealized_pnl >= 0 else "🔻"
-
- for pos in open_positions:
- pos_side_emoji = "🔼" if pos.get('side', 'buy').lower() == 'buy' else "🔽"
- pos_pnl_emoji = "✅" if pos.get('unrealized_pnl', 0) >= 0 else "🔻"
- opened_at_str = "N/A"
- if pos.get('opened_at'):
- try:
- from datetime import datetime
- opened_at_dt = datetime.fromisoformat(pos['opened_at'])
- opened_at_str = opened_at_dt.strftime('%Y-%m-%d %H:%M')
- except:
- pass
-
- parts.append(f"• {pos_side_emoji} {pos.get('side', '').upper()} {await formatter.format_amount(abs(pos.get('amount',0)), token_name)} {token_name}")
- parts.append(f" Entry: {await formatter.format_price_with_symbol(pos.get('entry_price',0), token_name)} | Mark: {await formatter.format_price_with_symbol(pos.get('mark_price',0), token_name)}")
- parts.append(f" {pos_pnl_emoji} Unrealized P&L: {await formatter.format_price_with_symbol(pos.get('unrealized_pnl',0))}")
- parts.append(f" Opened: {opened_at_str} | ID: ...{pos.get('lifecycle_id', '')[-6:]}")
- parts.append(f" {open_pnl_emoji} <b>Total Open P&L: {await formatter.format_price_with_symbol(total_open_unrealized_pnl)}</b>")
- else:
- parts.append("• No open positions for this token.")
- parts.append("")
- parts.append(f"📋 Open Orders (Exchange): {token_stats.get('current_open_orders_count', 0)}")
- parts.append(f"💡 Use <code>/performance {token_name}</code> for another view including recent trades.")
-
- return "\n".join(parts)
- # =============================================================================
- # CONVENIENCE METHODS & HIGH-LEVEL OPERATIONS
- # =============================================================================
-
- def process_trade_complete_cycle(self, symbol: str, side: str, entry_price: float,
- exit_price: float, amount: float,
- timestamp: Optional[str] = None) -> str:
- """Process a complete trade cycle in one operation."""
- # Create lifecycle
- lifecycle_id = self.create_trade_lifecycle(symbol, side, trade_type='complete_cycle')
- if not lifecycle_id:
- raise Exception("Failed to create trade lifecycle")
-
- # Update to position opened
- success = self.update_trade_position_opened(lifecycle_id, entry_price, amount, "manual_entry")
- if not success:
- raise Exception("Failed to update position opened")
-
- # Calculate PnL
- if side.lower() == 'buy':
- realized_pnl = (exit_price - entry_price) * amount
- else: # sell
- realized_pnl = (entry_price - exit_price) * amount
-
- # Update to position closed
- success = self.update_trade_position_closed(lifecycle_id, exit_price, realized_pnl, "manual_exit")
- if not success:
- raise Exception("Failed to update position closed")
-
- # Migrate to aggregated stats
- self.migrate_trade_to_aggregated_stats(lifecycle_id)
-
- logger.info(f"✅ Processed complete trade cycle: {symbol} {side.upper()} P&L: ${realized_pnl:.2f}")
- return lifecycle_id
- def get_summary_report(self) -> Dict[str, Any]:
- """Get comprehensive summary report."""
- try:
- perf_stats = self.get_performance_stats()
- token_performance = self.get_token_performance(limit=10)
- daily_stats = self.get_daily_stats(limit=7)
- risk_metrics = self.get_risk_metrics()
- balance_adjustments = self.get_balance_adjustments_summary()
-
- # Get current positions
- open_positions = self.get_open_positions()
-
- return {
- 'performance_stats': perf_stats,
- 'top_tokens': token_performance,
- 'recent_daily_stats': daily_stats,
- 'risk_metrics': risk_metrics,
- 'balance_adjustments': balance_adjustments,
- 'open_positions_count': len(open_positions),
- 'open_positions': open_positions,
- 'generated_at': datetime.now(timezone.utc).isoformat()
- }
-
- except Exception as e:
- logger.error(f"❌ Error generating summary report: {e}")
- return {'error': str(e)}
- async def record_trade(self, symbol: str, side: str, amount: float, price: float,
- exchange_fill_id: Optional[str] = None, trade_type: str = "manual",
- pnl: Optional[float] = None, timestamp: Optional[str] = None,
- linked_order_table_id_to_link: Optional[int] = None):
- """DEPRECATED - use trade lifecycle methods instead."""
- if timestamp is None:
- timestamp = datetime.now(timezone.utc).isoformat()
-
- value = amount * price
- formatter = get_formatter()
- ts = timestamp or datetime.now(timezone.utc).isoformat()
-
- base_asset_for_amount = symbol.split('/')[0]
- logger.info(f"📈 Trade recorded: {side.upper()} {await formatter.format_amount(amount, base_asset_for_amount)} {symbol} @ {await formatter.format_price(price, symbol)} ({await formatter.format_price(value, symbol)}) [{trade_type}]")
- self.db_manager._execute_query(
- "INSERT OR IGNORE INTO trades (symbol, side, amount, price, value, trade_type, timestamp, exchange_fill_id, pnl, linked_order_table_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
- (symbol, side, amount, price, value, trade_type, ts, exchange_fill_id, pnl or 0.0, linked_order_table_id_to_link)
- )
- def health_check(self) -> Dict[str, Any]:
- """Perform health check on all components."""
- try:
- health = {
- 'database': 'ok',
- 'order_manager': 'ok',
- 'trade_manager': 'ok',
- 'aggregation_manager': 'ok',
- 'performance_calculator': 'ok',
- 'overall': 'ok'
- }
-
- # Test database connection
- self.db_manager._fetch_query("SELECT 1")
-
- # Test each component with basic operations
- self.get_recent_orders(limit=1)
- self.get_recent_trades(limit=1)
- self.get_daily_stats(limit=1)
- self.get_performance_stats()
-
- return health
-
- except Exception as e:
- logger.error(f"❌ Health check failed: {e}")
- return {'overall': 'error', 'error': str(e)}
|