浏览代码

Enhance MarketMonitor and PositionSynchronizer with improved balance handling and method refactoring.

- Updated MarketMonitor to extract USDC balance from a dictionary structure in fresh_balance before updating the drawdown monitor.
- Refactored PositionSynchronizer to remove leading underscores from method calls to stats, improving consistency and readability.
- Introduced new methods in DatabaseManager for setting and recording initial balances, enhancing balance management capabilities.
- Added order management methods in OrderManager for recording filled and cancelled orders, improving order tracking and management.
- Enhanced TradingStats with compatibility methods for better access to metadata and improved statistics retrieval.
Carles Sentis 23 小时之前
父节点
当前提交
8098300cc3

+ 4 - 1
src/monitoring/market_monitor.py

@@ -243,7 +243,10 @@ class MarketMonitor:
             
             # Update drawdown monitor with the latest balance
             if self.drawdown_monitor and fresh_balance and fresh_balance.get('total') is not None:
-                self.drawdown_monitor.update_balance(float(fresh_balance['total']))
+                # fresh_balance['total'] is a dict like {'USDC': 1234.56, 'BTC': 0.001}
+                usdc_balance = fresh_balance['total'].get('USDC', 0)
+                if usdc_balance:
+                    self.drawdown_monitor.update_balance(float(usdc_balance))
 
             logger.debug(f"🔄 Cache updated: {len(fresh_positions_list)} positions, {len(fresh_orders_list)} orders")
 

+ 2 - 2
src/monitoring/position_synchronizer.py

@@ -199,7 +199,7 @@ class PositionSynchronizer:
                     if success:
                         closed_due_to_discrepancy += 1
                         logger.info(f"✅ AUTO-SYNC (Discrepancy): Successfully closed bot lifecycle {lc_id} for {symbol}.")
-                        stats._migrate_trade_to_aggregated_stats(lc_id)
+                        stats.migrate_trade_to_aggregated_stats(lc_id)
                         if self.notification_manager:
                             pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
                             notification_text = (
@@ -425,7 +425,7 @@ class PositionSynchronizer:
                         if success_close:
                             closed_due_to_discrepancy_startup += 1
                             logger.info(f"✅ STARTUP (Discrepancy): Successfully closed bot lifecycle {lc_id} for {symbol}.")
-                            stats._migrate_trade_to_aggregated_stats(lc_id)
+                            stats.migrate_trade_to_aggregated_stats(lc_id)
                             if self.notification_manager:
                                 pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
                                 notification_text = (

+ 41 - 0
src/stats/database_manager.py

@@ -12,6 +12,7 @@ from datetime import datetime, timezone, timedelta
 from typing import Dict, List, Any, Optional
 from src.migrations.migrate_db import run_migrations as run_db_migrations
 from src.config.config import Config
+from src.utils.token_display_formatter import get_formatter
 
 logger = logging.getLogger(__name__)
 
@@ -251,6 +252,46 @@ class DatabaseManager:
         """Set a value in the metadata table."""
         self._execute_query("INSERT OR REPLACE INTO metadata (key, value) VALUES (?, ?)", (key, value))
 
+    def set_initial_balance(self, balance: float):
+        """Set the initial balance if not already set or zero."""
+        current_initial_balance_str = self._get_metadata('initial_balance')
+        current_initial_balance = float(current_initial_balance_str) if current_initial_balance_str else 0.0
+        
+        if current_initial_balance == 0.0:  # Only set if it's effectively unset
+            self._set_metadata('initial_balance', str(balance))
+            # Also set start_date if it's the first time setting balance
+            if self._get_metadata('start_date') is None or float(current_initial_balance_str if current_initial_balance_str else '0.0') == 0.0:
+                self._set_metadata('start_date', datetime.now(timezone.utc).isoformat())
+            formatter = get_formatter()
+            logger.info(f"Initial balance set to: {formatter.format_price_with_symbol(balance)}")
+        else:
+            formatter = get_formatter()
+            logger.info(f"Initial balance already set to {formatter.format_price_with_symbol(current_initial_balance)}. Not changing.")
+
+    def get_initial_balance(self) -> float:
+        """Get the initial balance."""
+        initial_balance_str = self._get_metadata('initial_balance')
+        return float(initial_balance_str) if initial_balance_str else 0.0
+
+    def record_balance_snapshot(self, balance: float, unrealized_pnl: float = 0.0, 
+                               timestamp: Optional[str] = None, notes: Optional[str] = None):
+        """Record a balance snapshot."""
+        if not timestamp:
+            timestamp = datetime.now(timezone.utc).isoformat()
+        
+        query = """
+            INSERT INTO balance_history (balance, unrealized_pnl, timestamp, notes)
+            VALUES (?, ?, ?, ?)
+        """
+        
+        try:
+            self._execute_query(query, (balance, unrealized_pnl, timestamp, notes))
+            from src.utils.token_display_formatter import get_formatter
+            formatter = get_formatter()
+            logger.info(f"Recorded balance snapshot: {formatter.format_price_with_symbol(balance)} (unrealized: {formatter.format_price_with_symbol(unrealized_pnl)})")
+        except Exception as e:
+            logger.error(f"Failed to record balance snapshot: {e}")
+
     def purge_old_daily_aggregated_stats(self, months_to_keep: int = 10):
         """Purge records from daily_aggregated_stats older than specified months."""
         try:

+ 61 - 1
src/stats/order_manager.py

@@ -262,4 +262,64 @@ class OrderManager:
             return self.db._fetch_query(query, (limit,))
         except Exception as e:
             logger.error(f"❌ Error getting recent orders: {e}")
-            return [] 
+            return []
+
+    def record_order_filled(self, exchange_order_id: str, actual_amount: float, 
+                           actual_price: float, fees: float = 0.0, 
+                           timestamp: Optional[str] = None, 
+                           exchange_fill_id: Optional[str] = None) -> bool:
+        """Record order fill."""
+        if not timestamp:
+            timestamp = datetime.now(timezone.utc).isoformat()
+        
+        success = self.update_order_status(
+            exchange_order_id=exchange_order_id, 
+            new_status='filled',
+            amount_filled_increment=actual_amount
+        )
+        
+        if success:
+            logger.info(f"Recorded order filled: {exchange_order_id}, Amount: {actual_amount}, Price: {actual_price}")
+        
+        return success
+
+    def update_order_exchange_id(self, bot_order_ref_id: str, exchange_order_id: str) -> bool:
+        """Update order with exchange ID."""
+        return self.update_order_status(
+            bot_order_ref_id=bot_order_ref_id,
+            set_exchange_order_id=exchange_order_id
+        )
+
+    def record_order_cancelled(self, exchange_order_id: str, reason: str = "user_cancelled", 
+                              timestamp: Optional[str] = None) -> bool:
+        """Record order cancellation."""
+        return self.update_order_status(
+            exchange_order_id=exchange_order_id,
+            new_status=f'cancelled_{reason}' if not reason.startswith('cancelled') else reason
+        )
+
+    def get_orders_by_symbol(self, symbol: str, limit: int = 50) -> List[Dict[str, Any]]:
+        """Get orders by symbol."""
+        return self.db._fetch_query(
+            "SELECT * FROM orders WHERE symbol = ? ORDER BY timestamp_created DESC LIMIT ?",
+            (symbol, limit)
+        )
+
+    def cleanup_old_cancelled_orders(self, days_old: int = 7) -> int:
+        """Clean up old cancelled orders."""
+        cutoff_date = (datetime.now(timezone.utc) - timedelta(days=days_old)).isoformat()
+        
+        result = self.db._fetchone_query(
+            "SELECT COUNT(*) as count FROM orders WHERE status LIKE 'cancelled%' AND timestamp_created < ?",
+            (cutoff_date,)
+        )
+        
+        count_before = result['count'] if result else 0
+        
+        self.db._execute_query(
+            "DELETE FROM orders WHERE status LIKE 'cancelled%' AND timestamp_created < ?",
+            (cutoff_date,)
+        )
+        
+        logger.info(f"Cleaned up {count_before} cancelled orders older than {days_old} days")
+        return count_before 

+ 258 - 5
src/stats/trading_stats.py

@@ -48,6 +48,18 @@ class TradingStats:
         """Close database connection."""
         self.db_manager.close()
 
+    # =============================================================================
+    # COMPATIBILITY METHODS - Direct exposure of internal methods
+    # =============================================================================
+    
+    def _get_metadata(self, key: str) -> Optional[str]:
+        """Get metadata from database."""
+        return self.db_manager._get_metadata(key)
+    
+    def _set_metadata(self, key: str, value: str):
+        """Set metadata in database."""
+        return self.db_manager._set_metadata(key, value)
+
     # =============================================================================
     # DATABASE MANAGEMENT DELEGATION
     # =============================================================================
@@ -87,10 +99,11 @@ class TradingStats:
                             exchange_order_id: Optional[str] = None, 
                             timestamp: Optional[str] = None) -> bool:
         """Record order placement."""
-        return self.order_manager.record_order_placed(
+        result = self.order_manager.record_order_placed(
             symbol, side, order_type, amount_requested, price, 
-            bot_order_ref_id, exchange_order_id, timestamp
+            bot_order_ref_id, exchange_order_id
         )
+        return result is not None
     
     def update_order_exchange_id(self, bot_order_ref_id: str, exchange_order_id: str) -> bool:
         """Update order with exchange ID."""
@@ -127,9 +140,12 @@ class TradingStats:
         """Get orders by symbol."""
         return self.order_manager.get_orders_by_symbol(symbol, limit)
     
-    def get_orders_by_status(self, status: str, limit: int = 50) -> List[Dict[str, Any]]:
-        """Get orders by status."""
-        return self.order_manager.get_orders_by_status(status, limit)
+    def get_orders_by_status(self, status: str, limit: Optional[int] = 50, 
+                            order_type_filter: Optional[str] = None,
+                            parent_bot_order_ref_id: Optional[str] = None) -> List[Dict[str, Any]]:
+        """Get orders by status with optional filters."""
+        # OrderManager expects (status, order_type_filter, parent_bot_order_ref_id) without limit
+        return self.order_manager.get_orders_by_status(status, order_type_filter, parent_bot_order_ref_id)
     
     def get_recent_orders(self, limit: int = 20) -> List[Dict[str, Any]]:
         """Get recent orders."""
@@ -319,6 +335,243 @@ class TradingStats:
         """Get recent performance trend."""
         return self.performance_calculator.get_recent_performance_trend(days)
 
+    # =============================================================================
+    # COMPATIBILITY METHODS - Legacy API Support
+    # =============================================================================
+    
+    def get_basic_stats(self, current_balance: Optional[float] = None) -> Dict[str, Any]:
+        """Get basic trading statistics from DB, primarily using aggregated tables."""
+        
+        # Get counts of open positions (trades that are not yet migrated)
+        open_positions_count = self._get_open_positions_count_from_db()
+
+        # Get overall aggregated stats from token_stats table
+        query_token_stats_summary = """
+            SELECT 
+                SUM(total_realized_pnl) as total_pnl_from_cycles,
+                SUM(total_completed_cycles) as total_completed_cycles_sum,
+                MIN(first_cycle_closed_at) as overall_first_cycle_closed,
+                MAX(last_cycle_closed_at) as overall_last_cycle_closed
+            FROM token_stats
+        """
+        token_stats_summary = self.db_manager._fetchone_query(query_token_stats_summary)
+
+        total_pnl_from_cycles = token_stats_summary['total_pnl_from_cycles'] if token_stats_summary and token_stats_summary['total_pnl_from_cycles'] is not None else 0.0
+        total_completed_cycles_sum = token_stats_summary['total_completed_cycles_sum'] if token_stats_summary and token_stats_summary['total_completed_cycles_sum'] is not None else 0
+
+        # Total trades considered as sum of completed cycles and currently open positions
+        total_trades_redefined = total_completed_cycles_sum + open_positions_count
+
+        initial_balance_str = self._get_metadata('initial_balance')
+        initial_balance = float(initial_balance_str) if initial_balance_str else 0.0
+        
+        start_date_iso = self._get_metadata('start_date')
+        start_date_obj = datetime.fromisoformat(start_date_iso) if start_date_iso else datetime.now(timezone.utc)
+        days_active = (datetime.now(timezone.utc) - start_date_obj).days + 1
+        
+        # 'last_trade' timestamp could be the last update to token_stats or an open trade
+        last_activity_ts = token_stats_summary['overall_last_cycle_closed'] if token_stats_summary else None
+        last_open_trade_ts_row = self.db_manager._fetchone_query("SELECT MAX(updated_at) as last_update FROM trades WHERE status = 'position_opened'")
+        if last_open_trade_ts_row and last_open_trade_ts_row['last_update']:
+            if not last_activity_ts or datetime.fromisoformat(last_open_trade_ts_row['last_update']) > datetime.fromisoformat(last_activity_ts):
+                last_activity_ts = last_open_trade_ts_row['last_update']
+
+        return {
+            'total_trades': total_trades_redefined,
+            'completed_trades': total_completed_cycles_sum,
+            'initial_balance': initial_balance,
+            'total_pnl': total_pnl_from_cycles,
+            'days_active': days_active,
+            'start_date': start_date_obj.strftime('%Y-%m-%d'),
+            'last_trade': last_activity_ts,
+            'open_positions_count': open_positions_count
+        }
+
+    def _get_open_positions_count_from_db(self) -> int:
+        """Get count of open positions from trades table."""
+        row = self.db_manager._fetchone_query("SELECT COUNT(DISTINCT symbol) as count FROM trades WHERE status = 'position_opened'")
+        return row['count'] if row else 0
+
+    def get_token_detailed_stats(self, token: str) -> Dict[str, Any]:
+        """Get detailed statistics for a specific token."""
+        try:
+            # Normalize token case
+            upper_token = _normalize_token_case(token)
+            
+            # Get aggregated stats from token_stats table
+            token_agg_stats = self.db_manager._fetchone_query(
+                "SELECT * FROM token_stats WHERE token = ?", (upper_token,)
+            )
+            
+            # Get open trades for this token
+            open_trades_for_token = self.db_manager._fetch_query(
+                "SELECT * FROM trades WHERE status = 'position_opened' AND symbol LIKE ? ORDER BY position_opened_at DESC",
+                (f"{upper_token}/%",)
+            )
+            
+            # Initialize performance stats
+            perf_stats = {
+                'completed_trades': 0,
+                'total_pnl': 0.0,
+                'pnl_percentage': 0.0,
+                'win_rate': 0.0,
+                'profit_factor': 0.0,
+                'avg_win': 0.0,
+                'avg_loss': 0.0,
+                'largest_win': 0.0,
+                'largest_loss': 0.0,
+                'expectancy': 0.0,
+                'total_wins': 0,
+                'total_losses': 0,
+                'completed_entry_volume': 0.0,
+                'completed_exit_volume': 0.0,
+                'total_cancelled': 0,
+                'total_duration_seconds': 0,
+                'avg_trade_duration': "N/A"
+            }
+            
+            if token_agg_stats:
+                total_cycles = token_agg_stats.get('total_completed_cycles', 0)
+                winning_cycles = token_agg_stats.get('winning_cycles', 0)
+                losing_cycles = token_agg_stats.get('losing_cycles', 0)
+                sum_winning_pnl = token_agg_stats.get('sum_of_winning_pnl', 0.0)
+                sum_losing_pnl = token_agg_stats.get('sum_of_losing_pnl', 0.0)
+                
+                perf_stats.update({
+                    'completed_trades': total_cycles,
+                    'total_pnl': token_agg_stats.get('total_realized_pnl', 0.0),
+                    'win_rate': (winning_cycles / total_cycles * 100) if total_cycles > 0 else 0.0,
+                    'profit_factor': (sum_winning_pnl / sum_losing_pnl) if sum_losing_pnl > 0 else float('inf') if sum_winning_pnl > 0 else 0.0,
+                    'avg_win': (sum_winning_pnl / winning_cycles) if winning_cycles > 0 else 0.0,
+                    'avg_loss': (sum_losing_pnl / losing_cycles) if losing_cycles > 0 else 0.0,
+                    'largest_win': token_agg_stats.get('largest_winning_cycle_pnl', 0.0),
+                    'largest_loss': token_agg_stats.get('largest_losing_cycle_pnl', 0.0),
+                    'total_wins': winning_cycles,
+                    'total_losses': losing_cycles,
+                    'completed_entry_volume': token_agg_stats.get('total_entry_volume', 0.0),
+                    'completed_exit_volume': token_agg_stats.get('total_exit_volume', 0.0),
+                    'total_cancelled': token_agg_stats.get('total_cancelled_cycles', 0),
+                    'total_duration_seconds': token_agg_stats.get('total_duration_seconds', 0)
+                })
+                
+                # Calculate expectancy
+                win_rate_decimal = perf_stats['win_rate'] / 100
+                perf_stats['expectancy'] = (perf_stats['avg_win'] * win_rate_decimal) - (perf_stats['avg_loss'] * (1 - win_rate_decimal))
+                
+                # Format average trade duration
+                if total_cycles > 0:
+                    avg_duration_seconds = token_agg_stats.get('total_duration_seconds', 0) / total_cycles
+                    perf_stats['avg_trade_duration'] = self._format_duration(avg_duration_seconds)
+            
+            # Calculate open positions summary
+            open_positions_summary = []
+            total_open_value = 0.0
+            total_open_unrealized_pnl = 0.0
+            
+            for op_trade in open_trades_for_token:
+                open_positions_summary.append({
+                    'lifecycle_id': op_trade.get('trade_lifecycle_id'),
+                    'side': op_trade.get('position_side'),
+                    'amount': op_trade.get('current_position_size'),
+                    'entry_price': op_trade.get('entry_price'),
+                    'mark_price': op_trade.get('mark_price'),
+                    'unrealized_pnl': op_trade.get('unrealized_pnl'),
+                    'opened_at': op_trade.get('position_opened_at')
+                })
+                total_open_value += op_trade.get('value', 0.0)
+                total_open_unrealized_pnl += op_trade.get('unrealized_pnl', 0.0)
+            
+            # Get open orders count for this token
+            open_orders_count_row = self.db_manager._fetchone_query(
+                "SELECT COUNT(*) as count FROM orders WHERE symbol LIKE ? AND status IN ('open', 'submitted', 'pending_trigger')",
+                 (f"{upper_token}/%",)
+            )
+            current_open_orders_for_token = open_orders_count_row['count'] if open_orders_count_row else 0
+            
+            effective_total_trades = perf_stats['completed_trades'] + len(open_trades_for_token)
+            
+            return {
+                'token': upper_token,
+                'performance': perf_stats,
+                'open_positions': {
+                    'count': len(open_trades_for_token),
+                    'total_value': total_open_value,
+                    'total_unrealized_pnl': total_open_unrealized_pnl,
+                    'positions': open_positions_summary
+                },
+                'summary': {
+                    'total_trades': effective_total_trades,
+                    'open_orders': current_open_orders_for_token,
+                }
+            }
+            
+        except Exception as e:
+            logger.error(f"❌ Error getting detailed stats for {token}: {e}")
+            return {}
+
+    def _format_duration(self, seconds: float) -> str:
+        """Format duration in seconds to a human-readable string."""
+        if seconds <= 0:
+            return "0s"
+        
+        days = int(seconds // 86400)
+        hours = int((seconds % 86400) // 3600)
+        minutes = int((seconds % 3600) // 60)
+        secs = int(seconds % 60)
+        
+        parts = []
+        if days > 0:
+            parts.append(f"{days}d")
+        if hours > 0:
+            parts.append(f"{hours}h")
+        if minutes > 0:
+            parts.append(f"{minutes}m")
+        if secs > 0 or not parts:
+            parts.append(f"{secs}s")
+        
+        return " ".join(parts)
+
+    def format_stats_message(self, current_balance: Optional[float] = None) -> str:
+        """Format stats for Telegram display using data from DB."""
+        try:
+            basic = self.get_basic_stats(current_balance)
+            perf = self.get_performance_stats()
+            risk = self.get_risk_metrics()
+            
+            formatter = get_formatter()
+            
+            effective_current_balance = current_balance if current_balance is not None else (basic['initial_balance'] + basic['total_pnl'])
+            initial_bal = basic['initial_balance']
+
+            total_pnl_val = effective_current_balance - initial_bal if initial_bal > 0 and current_balance is not None else basic['total_pnl']
+            total_return_pct = (total_pnl_val / initial_bal * 100) if initial_bal > 0 else 0.0
+            pnl_emoji = "🟢" if total_pnl_val >= 0 else "🔴"
+            open_positions_count = basic['open_positions_count']
+
+            stats_text_parts = []
+            stats_text_parts.append(f"📊 <b>Trading Statistics</b>\n")
+            
+            # Account Overview
+            stats_text_parts.append(f"\n💰 <b>Account Overview:</b>")
+            stats_text_parts.append(f"• Current Balance: {formatter.format_price_with_symbol(effective_current_balance)}")
+            stats_text_parts.append(f"• Initial Balance: {formatter.format_price_with_symbol(initial_bal)}")
+            stats_text_parts.append(f"• Open Positions: {open_positions_count}")
+            stats_text_parts.append(f"• {pnl_emoji} Total P&L: {formatter.format_price_with_symbol(total_pnl_val)} ({total_return_pct:+.2f}%)")
+            stats_text_parts.append(f"• Days Active: {basic['days_active']}\n")
+
+            # Trading Performance
+            stats_text_parts.append(f"📈 <b>Trading Performance:</b>")
+            stats_text_parts.append(f"• Total Cycles: {perf['total_completed_cycles']}")
+            stats_text_parts.append(f"• Win Rate: {perf['win_rate']:.1f}% ({perf['total_winning_cycles']}/{perf['total_completed_cycles']})")
+            stats_text_parts.append(f"• Profit Factor: {perf['profit_factor']:.2f}")
+            stats_text_parts.append(f"• Expectancy: {formatter.format_price_with_symbol(perf['expectancy'])}")
+            
+            return "\n".join(stats_text_parts)
+            
+        except Exception as e:
+            logger.error(f"❌ Error formatting stats message: {e}")
+            return f"❌ Error generating statistics: {str(e)}"
+
     # =============================================================================
     # CONVENIENCE METHODS & HIGH-LEVEL OPERATIONS
     # =============================================================================