浏览代码

Enhance database synchronization in PositionTracker to support bidirectional sync

- Updated the _sync_database_once method to close database positions that don't exist on the exchange and create records for exchange positions missing in the database.
- Introduced UUID generation for new trade records and improved logging for created database entries.
- Modified the PerformanceCalculator to use total_entry_volume instead of completed_entry_volume for more accurate ROE calculations.
Carles Sentis 10 小时之前
父节点
当前提交
bf6838a70a
共有 3 个文件被更改,包括 54 次插入5 次删除
  1. 52 3
      src/monitoring/position_tracker.py
  2. 1 1
      src/stats/performance_calculator.py
  3. 1 1
      trading_bot.py

+ 52 - 3
src/monitoring/position_tracker.py

@@ -1,5 +1,6 @@
 import asyncio
 import asyncio
 import logging
 import logging
+import uuid
 from typing import Dict, List, Optional, Any
 from typing import Dict, List, Optional, Any
 from datetime import datetime, timezone
 from datetime import datetime, timezone
 
 
@@ -94,7 +95,8 @@ class PositionTracker:
             logger.error(f"Error checking position changes: {e}")
             logger.error(f"Error checking position changes: {e}")
             
             
     async def _sync_database_once(self):
     async def _sync_database_once(self):
-        """Simple one-time check: close database positions that don't exist on exchange"""
+        """Simple bidirectional sync: close database positions that don't exist on exchange, 
+        and create database records for exchange positions that don't exist in database"""
         try:
         try:
             if self.trading_stats is None:
             if self.trading_stats is None:
                 from ..stats.trading_stats import TradingStats
                 from ..stats.trading_stats import TradingStats
@@ -102,6 +104,7 @@ class PositionTracker:
             
             
             open_trades = self.trading_stats.get_open_positions()
             open_trades = self.trading_stats.get_open_positions()
             
             
+            # PART 1: Close database positions that don't exist on exchange
             for trade in open_trades:
             for trade in open_trades:
                 symbol = trade.get('symbol', '')
                 symbol = trade.get('symbol', '')
                 if not symbol:
                 if not symbol:
@@ -128,9 +131,55 @@ class PositionTracker:
                     
                     
                     # Reuse existing position closed handler - consistent behavior!
                     # Reuse existing position closed handler - consistent behavior!
                     await self._handle_position_closed(token, simulated_position)
                     await self._handle_position_closed(token, simulated_position)
+            
+            # PART 2: Create database records for exchange positions that don't exist in database
+            # Get current open trades after potential closures above
+            current_open_trades = self.trading_stats.get_open_positions()
+            database_tokens = {trade.get('symbol', '').split('/')[0] for trade in current_open_trades if trade.get('symbol')}
+            
+            for token, position_data in self.current_positions.items():
+                if token not in database_tokens:
+                    logger.info(f"🔄 Found exchange position for {token} with no database record - creating trade record")
+                    
+                    # Create new trade record using existing process_trade_complete_cycle method
+                    # but we'll need to handle this differently since we don't have entry/exit
+                    # Instead, we'll create a manual position record
+                    
+                    full_symbol = f"{token}/USDC:USDC"
+                    size = abs(position_data['size'])
+                    side = 'sell' if position_data['size'] < 0 else 'buy'  # sell=short, buy=long
+                    entry_price = position_data['entry_px']
+                    
+                    # Create a trade lifecycle record for this existing position
+                    lifecycle_id = str(uuid.uuid4())
+                    timestamp = datetime.now(timezone.utc).isoformat()
+                    
+                    # Insert into trades table
+                    query = """
+                        INSERT INTO trades (
+                            trade_lifecycle_id, symbol, side, amount, price, value,
+                            entry_price, current_position_size, position_side, status,
+                            position_opened_at, timestamp, updated_at, trade_type
+                        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+                    """
+                    
+                    position_side = 'short' if side == 'sell' else 'long'
+                    value = size * entry_price
+                    
+                    params = (
+                        lifecycle_id, full_symbol, side, size, entry_price, value,
+                        entry_price, size, position_side, 'position_opened',
+                        timestamp, timestamp, timestamp, 'sync_detected'
+                    )
+                    
+                    self.trading_stats.db_manager._execute_query(query, params)
+                    
+                    logger.info(f"✅ Created database record for {token} position: {side} {size} @ ${entry_price}")
                         
                         
         except Exception as e:
         except Exception as e:
             logger.error(f"Error syncing database: {e}")
             logger.error(f"Error syncing database: {e}")
+            import traceback
+            traceback.print_exc()
             
             
     async def _update_database_market_data(self):
     async def _update_database_market_data(self):
         """Update database with current market data for open positions"""
         """Update database with current market data for open positions"""
@@ -199,8 +248,8 @@ class PositionTracker:
                 
                 
             logger.info(f"📊 Raw positions data from exchange: {len(positions)} positions")
             logger.info(f"📊 Raw positions data from exchange: {len(positions)} positions")
             # Log first position structure for debugging
             # Log first position structure for debugging
-            if positions:
-                logger.info(f"📊 Sample position structure: {positions[0]}")
+            #if positions:
+            #    logger.info(f"📊 Sample position structure: {positions[0]}")
                 
                 
             logger.debug(f"📊 Processing {len(positions)} positions from exchange...")
             logger.debug(f"📊 Processing {len(positions)} positions from exchange...")
             
             

+ 1 - 1
src/stats/performance_calculator.py

@@ -211,7 +211,7 @@ class PerformanceCalculator:
             
             
             # Calculate ROE from realized PnL and entry volume
             # Calculate ROE from realized PnL and entry volume
             total_pnl = token.get('total_realized_pnl', 0)
             total_pnl = token.get('total_realized_pnl', 0)
-            entry_volume = token.get('completed_entry_volume', 0)
+            entry_volume = token.get('total_entry_volume', 0)
             token['roe_percentage'] = (total_pnl / entry_volume * 100) if entry_volume > 0 else 0.0
             token['roe_percentage'] = (total_pnl / entry_volume * 100) if entry_volume > 0 else 0.0
             
             
             # Format durations
             # Format durations

+ 1 - 1
trading_bot.py

@@ -14,7 +14,7 @@ from datetime import datetime
 from pathlib import Path
 from pathlib import Path
 
 
 # Bot version
 # Bot version
-BOT_VERSION = "2.6.305"
+BOT_VERSION = "2.6.306"
 
 
 # Add src directory to Python path
 # Add src directory to Python path
 sys.path.insert(0, str(Path(__file__).parent / "src"))
 sys.path.insert(0, str(Path(__file__).parent / "src"))