|
@@ -5,6 +5,8 @@ Copy Trading State Management - Handles persistence and tracking of copy trading
|
|
import json
|
|
import json
|
|
import logging
|
|
import logging
|
|
import time
|
|
import time
|
|
|
|
+import asyncio
|
|
|
|
+import aiofiles
|
|
from datetime import datetime
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from pathlib import Path
|
|
from typing import Dict, Set, Optional, Any
|
|
from typing import Dict, Set, Optional, Any
|
|
@@ -39,17 +41,32 @@ class CopyTradingStateManager:
|
|
self.state_file = Path(state_file)
|
|
self.state_file = Path(state_file)
|
|
self.state = CopyTradingState()
|
|
self.state = CopyTradingState()
|
|
self._lock = Lock()
|
|
self._lock = Lock()
|
|
|
|
+ self._initialized = False
|
|
|
|
|
|
- # Ensure data directory exists
|
|
|
|
- self.state_file.parent.mkdir(exist_ok=True)
|
|
|
|
-
|
|
|
|
- # Load existing state
|
|
|
|
- self.load_state()
|
|
|
|
-
|
|
|
|
- logger.info(f"Copy trading state manager initialized - State file: {self.state_file}")
|
|
|
|
|
|
+ # Don't do file I/O during __init__ to prevent blocking
|
|
|
|
+ # File operations will be deferred until first use
|
|
|
|
+ logger.info(f"Copy trading state manager created - State file: {self.state_file}")
|
|
|
|
|
|
- def load_state(self) -> None:
|
|
|
|
- """Load state from file"""
|
|
|
|
|
|
+ def _ensure_initialized(self) -> None:
|
|
|
|
+ """Ensure state is loaded (non-blocking, called when needed)"""
|
|
|
|
+ if self._initialized:
|
|
|
|
+ return
|
|
|
|
+
|
|
|
|
+ try:
|
|
|
|
+ # Ensure data directory exists
|
|
|
|
+ self.state_file.parent.mkdir(exist_ok=True)
|
|
|
|
+
|
|
|
|
+ # Load existing state
|
|
|
|
+ self._load_state_sync()
|
|
|
|
+ self._initialized = True
|
|
|
|
+
|
|
|
|
+ except Exception as e:
|
|
|
|
+ logger.error(f"Error during state initialization: {e}")
|
|
|
|
+ self.state = CopyTradingState()
|
|
|
|
+ self._initialized = True
|
|
|
|
+
|
|
|
|
+ def _load_state_sync(self) -> None:
|
|
|
|
+ """Load state from file (synchronous version for internal use)"""
|
|
try:
|
|
try:
|
|
if self.state_file.exists():
|
|
if self.state_file.exists():
|
|
with open(self.state_file, 'r') as f:
|
|
with open(self.state_file, 'r') as f:
|
|
@@ -78,8 +95,34 @@ class CopyTradingStateManager:
|
|
logger.info("Starting with fresh state")
|
|
logger.info("Starting with fresh state")
|
|
self.state = CopyTradingState()
|
|
self.state = CopyTradingState()
|
|
|
|
|
|
|
|
+ async def load_state(self) -> None:
|
|
|
|
+ """Load state from file (async version)"""
|
|
|
|
+ try:
|
|
|
|
+ if self.state_file.exists():
|
|
|
|
+ async with aiofiles.open(self.state_file, 'r') as f:
|
|
|
|
+ content = await f.read()
|
|
|
|
+ data = json.loads(content)
|
|
|
|
+
|
|
|
|
+ # Convert copied_trades back to set
|
|
|
|
+ if 'copied_trades' in data and isinstance(data['copied_trades'], list):
|
|
|
|
+ data['copied_trades'] = set(data['copied_trades'])
|
|
|
|
+
|
|
|
|
+ # Update state with loaded data
|
|
|
|
+ for key, value in data.items():
|
|
|
|
+ if hasattr(self.state, key):
|
|
|
|
+ setattr(self.state, key, value)
|
|
|
|
+
|
|
|
|
+ logger.info(f"✅ Loaded copy trading state from {self.state_file}")
|
|
|
|
+
|
|
|
|
+ else:
|
|
|
|
+ logger.info(f"No existing state file found, starting fresh")
|
|
|
|
+
|
|
|
|
+ except Exception as e:
|
|
|
|
+ logger.error(f"Error loading copy trading state: {e}")
|
|
|
|
+ self.state = CopyTradingState()
|
|
|
|
+
|
|
def save_state(self) -> None:
|
|
def save_state(self) -> None:
|
|
- """Save current state to file"""
|
|
|
|
|
|
+ """Save current state to file (synchronous version)"""
|
|
try:
|
|
try:
|
|
with self._lock:
|
|
with self._lock:
|
|
# Convert state to dict
|
|
# Convert state to dict
|
|
@@ -102,8 +145,34 @@ class CopyTradingStateManager:
|
|
except Exception as e:
|
|
except Exception as e:
|
|
logger.error(f"Error saving copy trading state: {e}")
|
|
logger.error(f"Error saving copy trading state: {e}")
|
|
|
|
|
|
|
|
+ async def save_state_async(self) -> None:
|
|
|
|
+ """Save current state to file (async version)"""
|
|
|
|
+ try:
|
|
|
|
+ # Convert state to dict
|
|
|
|
+ state_dict = asdict(self.state)
|
|
|
|
+
|
|
|
|
+ # Convert set to list for JSON serialization
|
|
|
|
+ if 'copied_trades' in state_dict:
|
|
|
|
+ state_dict['copied_trades'] = list(state_dict['copied_trades'])
|
|
|
|
+
|
|
|
|
+ # Write to temporary file first, then rename (atomic operation)
|
|
|
|
+ temp_file = self.state_file.with_suffix('.tmp')
|
|
|
|
+ async with aiofiles.open(temp_file, 'w') as f:
|
|
|
|
+ await f.write(json.dumps(state_dict, indent=2))
|
|
|
|
+
|
|
|
|
+ # Atomic rename (still sync, but very fast)
|
|
|
|
+ temp_file.rename(self.state_file)
|
|
|
|
+
|
|
|
|
+ logger.debug(f"💾 Saved copy trading state to {self.state_file}")
|
|
|
|
+
|
|
|
|
+ except Exception as e:
|
|
|
|
+ logger.error(f"Error saving copy trading state: {e}")
|
|
|
|
+
|
|
def start_copy_trading(self, target_address: str) -> None:
|
|
def start_copy_trading(self, target_address: str) -> None:
|
|
"""Start copy trading for a target address"""
|
|
"""Start copy trading for a target address"""
|
|
|
|
+ # Ensure state is loaded first
|
|
|
|
+ self._ensure_initialized()
|
|
|
|
+
|
|
with self._lock:
|
|
with self._lock:
|
|
current_time = int(time.time() * 1000)
|
|
current_time = int(time.time() * 1000)
|
|
|
|
|
|
@@ -140,6 +209,7 @@ class CopyTradingStateManager:
|
|
|
|
|
|
def should_copy_position(self, coin: str, position_data: Dict) -> bool:
|
|
def should_copy_position(self, coin: str, position_data: Dict) -> bool:
|
|
"""Determine if we should copy a position"""
|
|
"""Determine if we should copy a position"""
|
|
|
|
+ self._ensure_initialized()
|
|
with self._lock:
|
|
with self._lock:
|
|
# If we haven't seen this position before, it's new
|
|
# If we haven't seen this position before, it's new
|
|
if coin not in self.state.tracked_positions:
|
|
if coin not in self.state.tracked_positions:
|
|
@@ -200,6 +270,7 @@ class CopyTradingStateManager:
|
|
|
|
|
|
def has_copied_trade(self, trade_id: str) -> bool:
|
|
def has_copied_trade(self, trade_id: str) -> bool:
|
|
"""Check if we've already copied a trade"""
|
|
"""Check if we've already copied a trade"""
|
|
|
|
+ self._ensure_initialized()
|
|
with self._lock:
|
|
with self._lock:
|
|
return trade_id in self.state.copied_trades
|
|
return trade_id in self.state.copied_trades
|
|
|
|
|
|
@@ -256,14 +327,19 @@ class CopyTradingStateManager:
|
|
logger.info("🔄 Copy trading state reset")
|
|
logger.info("🔄 Copy trading state reset")
|
|
|
|
|
|
def get_tracked_positions(self) -> Dict[str, Dict]:
|
|
def get_tracked_positions(self) -> Dict[str, Dict]:
|
|
- """Get current tracked positions"""
|
|
|
|
|
|
+ """Get currently tracked positions"""
|
|
|
|
+ self._ensure_initialized()
|
|
with self._lock:
|
|
with self._lock:
|
|
return self.state.tracked_positions.copy()
|
|
return self.state.tracked_positions.copy()
|
|
|
|
|
|
def is_enabled(self) -> bool:
|
|
def is_enabled(self) -> bool:
|
|
"""Check if copy trading is enabled"""
|
|
"""Check if copy trading is enabled"""
|
|
- return self.state.enabled
|
|
|
|
|
|
+ self._ensure_initialized()
|
|
|
|
+ with self._lock:
|
|
|
|
+ return self.state.enabled
|
|
|
|
|
|
def get_target_address(self) -> Optional[str]:
|
|
def get_target_address(self) -> Optional[str]:
|
|
- """Get current target address"""
|
|
|
|
- return self.state.target_address
|
|
|
|
|
|
+ """Get the target address"""
|
|
|
|
+ self._ensure_initialized()
|
|
|
|
+ with self._lock:
|
|
|
|
+ return self.state.target_address
|