""" Copy Trading API Router Provides endpoints for: - Copy trading status and control - Account analysis functionality - Target trader evaluation """ from fastapi import APIRouter, HTTPException, Query, BackgroundTasks from pydantic import BaseModel, Field from typing import List, Optional, Dict, Any from datetime import datetime import asyncio import logging # Import our existing classes from ...monitoring.copy_trading_monitor import CopyTradingMonitor from ...monitoring.copy_trading_state import CopyTradingStateManager from ...clients.hyperliquid_client import HyperliquidClient from ...notifications.notification_manager import NotificationManager from ...config.config import Config # Import the account analyzer utility import sys import os sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', '..', 'utils')) from hyperliquid_account_analyzer import HyperliquidAccountAnalyzer, AccountStats router = APIRouter(prefix="/copy-trading", tags=["copy-trading"]) logger = logging.getLogger(__name__) # Pydantic Models class CopyTradingStatus(BaseModel): enabled: bool target_address: Optional[str] portfolio_percentage: float copy_mode: str max_leverage: float target_positions: int our_positions: int tracked_positions: int copied_trades: int session_start_time: Optional[datetime] session_duration_hours: Optional[float] last_check: Optional[datetime] class CopyTradingConfig(BaseModel): target_address: str = Field(..., description="Target trader address to copy") portfolio_percentage: float = Field(default=0.1, ge=0.01, le=0.5, description="Portfolio percentage (1-50%)") copy_mode: str = Field(default="FIXED", description="Copy mode: FIXED or PROPORTIONAL") max_leverage: float = Field(default=10.0, ge=1.0, le=20.0, description="Maximum leverage (1-20x)") min_position_size: float = Field(default=25.0, ge=10.0, description="Minimum position size in USD") execution_delay: float = Field(default=2.0, ge=0.0, le=10.0, description="Execution delay in seconds") notifications_enabled: bool = Field(default=True, description="Enable notifications") class AccountAnalysisRequest(BaseModel): addresses: List[str] = Field(..., description="List of addresses to analyze") limit: Optional[int] = Field(default=10, ge=1, le=50, description="Limit results") class AccountStatsResponse(BaseModel): address: str total_pnl: float win_rate: float total_trades: int avg_trade_duration_hours: float max_drawdown: float avg_position_size: float max_leverage_used: float avg_leverage_used: float trading_frequency_per_day: float risk_reward_ratio: float profit_factor: float active_positions: int current_drawdown: float last_trade_timestamp: int analysis_period_days: int is_copyable: bool copyability_reason: str unique_tokens_traded: int trading_type: str top_tokens: List[str] short_percentage: float trading_style: str buy_sell_ratio: float relative_score: Optional[float] = None class LeaderboardRequest(BaseModel): window: str = Field(default="7d", description="Time window: 1d, 7d, 30d, allTime") limit: int = Field(default=10, ge=1, le=50, description="Number of top accounts") class BalanceTestResponse(BaseModel): our_balance: float target_balance: float portfolio_percentage: float test_price: float test_leverage: float margin_to_use: float position_value: float token_amount: float min_position_size: float would_execute: bool config_enabled: bool state_enabled: bool error: Optional[str] = None class SingleAccountAnalysisRequest(BaseModel): address: str = Field(..., description="Address to analyze in detail") # Global instances copy_trading_monitor: Optional[CopyTradingMonitor] = None state_manager = CopyTradingStateManager() def get_copy_trading_monitor(): """Get or create copy trading monitor instance""" global copy_trading_monitor if copy_trading_monitor is None: try: config = Config() client = HyperliquidClient() notification_manager = NotificationManager() copy_trading_monitor = CopyTradingMonitor(client, notification_manager) except Exception as e: logger.error(f"Failed to create copy trading monitor: {e}") raise HTTPException(status_code=500, detail="Failed to initialize copy trading system") return copy_trading_monitor @router.get("/status", response_model=CopyTradingStatus) async def get_copy_trading_status(): """Get current copy trading status""" try: monitor = get_copy_trading_monitor() status = monitor.get_status() return CopyTradingStatus( enabled=status['enabled'], target_address=status['target_address'], portfolio_percentage=status['portfolio_percentage'], copy_mode=status['copy_mode'], max_leverage=status['max_leverage'], target_positions=status['target_positions'], our_positions=status['our_positions'], tracked_positions=status['tracked_positions'], copied_trades=status['copied_trades'], session_start_time=status['session_start_time'], session_duration_hours=status['session_duration_hours'], last_check=status['last_check'] ) except Exception as e: logger.error(f"Error getting copy trading status: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/start") async def start_copy_trading(config: CopyTradingConfig, background_tasks: BackgroundTasks): """Start copy trading with the specified configuration""" try: monitor = get_copy_trading_monitor() # Update configuration monitor.target_address = config.target_address monitor.portfolio_percentage = config.portfolio_percentage monitor.copy_mode = config.copy_mode monitor.max_leverage = config.max_leverage monitor.min_position_size = config.min_position_size monitor.execution_delay = config.execution_delay monitor.notifications_enabled = config.notifications_enabled monitor.enabled = True # Start monitoring in background background_tasks.add_task(monitor.start_monitoring) return {"message": "Copy trading started successfully", "target_address": config.target_address} except Exception as e: logger.error(f"Error starting copy trading: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/stop") async def stop_copy_trading(): """Stop copy trading""" try: monitor = get_copy_trading_monitor() await monitor.stop_monitoring() return {"message": "Copy trading stopped successfully"} except Exception as e: logger.error(f"Error stopping copy trading: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/config") async def get_copy_trading_config(): """Get current copy trading configuration""" try: config = Config() return { "target_address": config.COPY_TRADING_TARGET_ADDRESS, "portfolio_percentage": config.COPY_TRADING_PORTFOLIO_PERCENTAGE, "copy_mode": config.COPY_TRADING_MODE, "max_leverage": config.COPY_TRADING_MAX_LEVERAGE, "min_position_size": config.COPY_TRADING_MIN_POSITION_SIZE, "execution_delay": config.COPY_TRADING_EXECUTION_DELAY, "notifications_enabled": config.COPY_TRADING_NOTIFICATIONS } except Exception as e: logger.error(f"Error getting copy trading config: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/test-balance", response_model=BalanceTestResponse) async def test_balance_fetching(): """Test balance fetching and position sizing for debugging""" try: monitor = get_copy_trading_monitor() result = await monitor.test_balance_fetching() if 'error' in result: return BalanceTestResponse( our_balance=0, target_balance=0, portfolio_percentage=0, test_price=0, test_leverage=0, margin_to_use=0, position_value=0, token_amount=0, min_position_size=0, would_execute=False, config_enabled=False, state_enabled=False, error=result['error'] ) return BalanceTestResponse(**result) except Exception as e: logger.error(f"Error testing balance: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/analyze-accounts", response_model=List[AccountStatsResponse]) async def analyze_accounts(request: AccountAnalysisRequest): """Analyze multiple Hyperliquid accounts for copy trading suitability""" try: async with HyperliquidAccountAnalyzer() as analyzer: results = await analyzer.analyze_multiple_accounts(request.addresses) if not results: return [] # Calculate relative scores def calculate_relative_score(stats: AccountStats, all_stats: List[AccountStats]) -> float: """Calculate relative score for ranking accounts""" score = 0.0 # Copyability (35% weight) if stats.trading_frequency_per_day > 50: score += 0 # HFT bots get 0 elif stats.trading_frequency_per_day < 1: score += 5 # Inactive accounts get 5 elif 1 <= stats.trading_frequency_per_day <= 20: ideal_freq = 15 freq_distance = abs(stats.trading_frequency_per_day - ideal_freq) score += max(0, 35 - (freq_distance * 1.5)) else: score += 15 # Questionable frequency # Profitability (30% weight) if stats.total_pnl < 0: pnl_range = max([s.total_pnl for s in all_stats]) - min([s.total_pnl for s in all_stats]) if pnl_range > 0: loss_severity = abs(stats.total_pnl) / pnl_range score += -15 * loss_severity else: score += -15 elif stats.total_pnl == 0: score += 0 else: max_pnl = max([s.total_pnl for s in all_stats if s.total_pnl > 0], default=1) score += (stats.total_pnl / max_pnl) * 30 # Risk management (20% weight) if stats.max_drawdown > 0.5: score += -10 elif stats.max_drawdown > 0.25: score += -5 elif stats.max_drawdown > 0.15: score += 5 elif stats.max_drawdown > 0.05: score += 15 else: score += 20 # Account maturity (10% weight) if stats.analysis_period_days < 7: score += 0 elif stats.analysis_period_days < 14: score += 2 elif stats.analysis_period_days < 30: score += 5 else: max_age = max([s.analysis_period_days for s in all_stats]) age_ratio = min(1.0, (stats.analysis_period_days - 30) / max(1, max_age - 30)) score += 7 + (age_ratio * 3) # Win rate (5% weight) win_rates = [s.win_rate for s in all_stats] if max(win_rates) > min(win_rates): winrate_normalized = (stats.win_rate - min(win_rates)) / (max(win_rates) - min(win_rates)) score += winrate_normalized * 5 else: score += 2.5 return score # Convert to response format with relative scores response_list = [] for stats in results: relative_score = calculate_relative_score(stats, results) response_list.append(AccountStatsResponse( address=stats.address, total_pnl=stats.total_pnl, win_rate=stats.win_rate, total_trades=stats.total_trades, avg_trade_duration_hours=stats.avg_trade_duration_hours, max_drawdown=stats.max_drawdown, avg_position_size=stats.avg_position_size, max_leverage_used=stats.max_leverage_used, avg_leverage_used=stats.avg_leverage_used, trading_frequency_per_day=stats.trading_frequency_per_day, risk_reward_ratio=stats.risk_reward_ratio, profit_factor=stats.profit_factor, active_positions=stats.active_positions, current_drawdown=stats.current_drawdown, last_trade_timestamp=stats.last_trade_timestamp, analysis_period_days=stats.analysis_period_days, is_copyable=stats.is_copyable, copyability_reason=stats.copyability_reason, unique_tokens_traded=stats.unique_tokens_traded, trading_type=stats.trading_type, top_tokens=stats.top_tokens, short_percentage=stats.short_percentage, trading_style=stats.trading_style, buy_sell_ratio=stats.buy_sell_ratio, relative_score=relative_score )) # Sort by relative score response_list.sort(key=lambda x: x.relative_score or 0, reverse=True) return response_list[:request.limit] except Exception as e: logger.error(f"Error analyzing accounts: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/get-leaderboard", response_model=List[str]) async def get_leaderboard_accounts(request: LeaderboardRequest): """Get top accounts from curated leaderboard""" try: async with HyperliquidAccountAnalyzer() as analyzer: addresses = await analyzer.get_top_accounts_from_leaderboard(request.window, request.limit) return addresses or [] except Exception as e: logger.error(f"Error getting leaderboard: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/session-info") async def get_session_info(): """Get detailed session information""" try: session_info = state_manager.get_session_info() return session_info except Exception as e: logger.error(f"Error getting session info: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/reset-state") async def reset_copy_trading_state(): """Reset copy trading state (use with caution)""" try: state_manager.reset_state() return {"message": "Copy trading state reset successfully"} except Exception as e: logger.error(f"Error resetting state: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/analyze-single-account", response_model=Dict[str, Any]) async def analyze_single_account(request: SingleAccountAnalysisRequest): """Get detailed analysis for a single account including current positions and recent trades""" try: async with HyperliquidAccountAnalyzer() as analyzer: # Get the account stats stats = await analyzer.analyze_account(request.address) if not stats: raise HTTPException(status_code=404, detail="Account not found or no trading data") # Get current positions and recent trades account_state = await analyzer.get_account_state(request.address) recent_fills = await analyzer.get_user_fills(request.address, limit=20) # Parse current positions current_positions = [] if account_state: positions = analyzer.parse_positions(account_state) for pos in positions: side_emoji = "📈" if pos.side == "long" else "📉" pnl_emoji = "✅" if pos.unrealized_pnl >= 0 else "❌" current_positions.append({ "coin": pos.coin, "side": pos.side, "side_emoji": side_emoji, "size": pos.size, "entry_price": pos.entry_price, "mark_price": pos.mark_price, "unrealized_pnl": pos.unrealized_pnl, "pnl_emoji": pnl_emoji, "leverage": pos.leverage, "margin_used": pos.margin_used, "position_value": abs(pos.size * pos.mark_price) }) # Parse recent trades recent_trades = [] if recent_fills: trades = analyzer.parse_trades(recent_fills) for trade in trades[-10:]: # Last 10 trades side_emoji = "🟢" if trade.side == "buy" else "🔴" trade_time = datetime.fromtimestamp(trade.timestamp / 1000) recent_trades.append({ "coin": trade.coin, "side": trade.side, "side_emoji": side_emoji, "size": trade.size, "price": trade.price, "value": trade.size * trade.price, "fee": trade.fee, "timestamp": trade_time.strftime('%Y-%m-%d %H:%M:%S'), "is_maker": trade.is_maker }) # Calculate total position value and unrealized PnL total_position_value = sum(pos["position_value"] for pos in current_positions) total_unrealized_pnl = sum(pos["unrealized_pnl"] for pos in current_positions) # Convert stats to dict and add extra details stats_dict = { "address": stats.address, "total_pnl": stats.total_pnl, "win_rate": stats.win_rate, "total_trades": stats.total_trades, "avg_trade_duration_hours": stats.avg_trade_duration_hours, "max_drawdown": stats.max_drawdown, "avg_position_size": stats.avg_position_size, "max_leverage_used": stats.max_leverage_used, "avg_leverage_used": stats.avg_leverage_used, "trading_frequency_per_day": stats.trading_frequency_per_day, "risk_reward_ratio": stats.risk_reward_ratio, "profit_factor": stats.profit_factor, "active_positions": stats.active_positions, "current_drawdown": stats.current_drawdown, "last_trade_timestamp": stats.last_trade_timestamp, "analysis_period_days": stats.analysis_period_days, "is_copyable": stats.is_copyable, "copyability_reason": stats.copyability_reason, "unique_tokens_traded": stats.unique_tokens_traded, "trading_type": stats.trading_type, "top_tokens": stats.top_tokens, "short_percentage": stats.short_percentage, "trading_style": stats.trading_style, "buy_sell_ratio": stats.buy_sell_ratio } # Calculate relative score def calculate_single_account_score(stats): score = 0.0 # Copyability (35% weight) if stats.trading_frequency_per_day > 50: score += 0 elif stats.trading_frequency_per_day < 1: score += 5 elif 1 <= stats.trading_frequency_per_day <= 20: ideal_freq = 15 freq_distance = abs(stats.trading_frequency_per_day - ideal_freq) score += max(0, 35 - (freq_distance * 1.5)) else: score += 15 # Profitability (30% weight) if stats.total_pnl < 0: score += -15 elif stats.total_pnl == 0: score += 0 else: # Score based on PnL magnitude (assuming $10k is excellent) score += min(30, (stats.total_pnl / 10000) * 30) # Risk management (20% weight) if stats.max_drawdown > 0.5: score += -10 elif stats.max_drawdown > 0.25: score += -5 elif stats.max_drawdown > 0.15: score += 5 elif stats.max_drawdown > 0.05: score += 15 else: score += 20 # Account maturity (10% weight) if stats.analysis_period_days < 7: score += 0 elif stats.analysis_period_days < 14: score += 2 elif stats.analysis_period_days < 30: score += 5 else: score += 7 + min(3, (stats.analysis_period_days - 30) / 30) # Win rate (5% weight) score += stats.win_rate * 5 return score relative_score = calculate_single_account_score(stats) stats_dict["relative_score"] = relative_score # Determine recommendation if relative_score >= 60: recommendation = "🟢 HIGHLY RECOMMENDED" portfolio_allocation = "10-25% (confident allocation)" max_leverage_limit = "5-10x" elif relative_score >= 40: recommendation = "🟡 MODERATELY RECOMMENDED" portfolio_allocation = "5-15% (moderate allocation)" max_leverage_limit = "3-5x" elif relative_score >= 20: recommendation = "🟠 PROCEED WITH CAUTION" portfolio_allocation = "2-5% (very small allocation)" max_leverage_limit = "2-3x" elif relative_score >= 0: recommendation = "🔴 NOT RECOMMENDED" portfolio_allocation = "DO NOT COPY (Risky)" max_leverage_limit = "N/A" else: recommendation = "⛔ DANGEROUS" portfolio_allocation = "DO NOT COPY (Negative Score)" max_leverage_limit = "N/A" # Evaluation points evaluation = [] is_hft_pattern = stats.trading_frequency_per_day > 50 is_copyable = 1 <= stats.trading_frequency_per_day <= 20 if is_hft_pattern: evaluation.append("❌ HFT/Bot pattern detected") elif stats.trading_frequency_per_day < 1: evaluation.append("❌ Too inactive for copy trading") elif is_copyable: evaluation.append("✅ Human-like trading pattern") if stats.total_pnl > 0: evaluation.append("✅ Profitable track record") else: evaluation.append("❌ Not profitable") if stats.max_drawdown < 0.15: evaluation.append("✅ Good risk management") elif stats.max_drawdown < 0.25: evaluation.append("⚠️ Moderate risk") else: evaluation.append("❌ High risk (excessive drawdown)") if 2 <= stats.avg_trade_duration_hours <= 48: evaluation.append("✅ Suitable trade duration") elif stats.avg_trade_duration_hours < 2: evaluation.append("⚠️ Very short trades (scalping)") else: evaluation.append("⚠️ Long hold times") return { "stats": stats_dict, "current_positions": current_positions, "recent_trades": recent_trades, "position_summary": { "total_position_value": total_position_value, "total_unrealized_pnl": total_unrealized_pnl, "position_count": len(current_positions) }, "recommendation": { "overall": recommendation, "portfolio_allocation": portfolio_allocation, "max_leverage_limit": max_leverage_limit, "evaluation_points": evaluation }, "trading_type_display": { "perps": "🔄 Perpetuals", "spot": "💱 Spot Trading", "mixed": "🔀 Mixed (Spot + Perps)", "unknown": "❓ Unknown" }.get(stats.trading_type, f"❓ {stats.trading_type}"), "buy_sell_ratio_display": "∞ (only buys)" if stats.buy_sell_ratio == float('inf') else "0 (only sells)" if stats.buy_sell_ratio == 0 else f"{stats.buy_sell_ratio:.2f}" } except Exception as e: logger.error(f"Error analyzing single account {request.address}: {e}") raise HTTPException(status_code=500, detail=str(e))