import asyncio
import logging
import uuid
from typing import Dict, List, Optional, Any
from datetime import datetime, timezone

from ..clients.hyperliquid_client import HyperliquidClient
from ..notifications.notification_manager import NotificationManager
from ..config.config import Config

logger = logging.getLogger(__name__)


class PositionTracker:
    """
    Simplified position tracker that mirrors exchange state.

    Polls the exchange client every ``Config.BOT_HEARTBEAT_SECONDS`` seconds,
    diffs the result against the previously seen positions, sends
    notifications for opened / closed / changed positions and keeps the
    trades database in sync with what the exchange reports.
    """

    def __init__(self, hl_client: HyperliquidClient, notification_manager: NotificationManager):
        """Store collaborators and initialise empty tracking state.

        Args:
            hl_client: Exchange client used for ``get_positions`` and
                ``get_market_data``.
            notification_manager: Used for ``send_generic_notification``.
        """
        self.hl_client = hl_client
        self.notification_manager = notification_manager
        self.trading_stats = None  # Will be lazy loaded

        # Track current positions, keyed by token name (e.g. "BTC")
        self.current_positions: Dict[str, Dict] = {}
        self.is_running = False

        # Strong reference to the monitoring task. The event loop only keeps
        # weak references to tasks, so an unreferenced task may be garbage
        # collected before it finishes.
        self._monitor_task: Optional[asyncio.Task] = None

    def _ensure_trading_stats(self):
        """Return the TradingStats instance, creating it on first use.

        The import is done lazily (the original code notes this avoids
        circular imports).
        """
        if self.trading_stats is None:
            from ..stats.trading_stats import TradingStats
            self.trading_stats = TradingStats()
        return self.trading_stats

    async def start(self):
        """Start position tracking.

        Loads the initial positions and schedules the monitoring loop.
        Does nothing if the tracker is already running.

        Raises:
            Exception: Any error raised during start-up is logged,
                ``is_running`` is reset, and the error is re-raised.
        """
        if self.is_running:
            return

        self.is_running = True
        logger.info("🔄 Starting position tracker")

        try:
            # Initialize current positions
            logger.info("📊 Initializing current positions...")
            await self._update_current_positions()
            logger.info(f"✅ Position tracker initialized with {len(self.current_positions)} open positions")

            # Start monitoring loop
            logger.info("🔄 Starting position monitoring loop...")
            previous_task = getattr(self, "_monitor_task", None)
            if previous_task is None or previous_task.done():
                # Keep a reference so the task cannot be garbage collected.
                self._monitor_task = asyncio.create_task(self._monitoring_loop())
            # else: a loop from before a stop()/start() cycle is still alive
            # (it was sleeping when stop() was called). Since is_running is
            # True again it simply keeps going; starting a second loop would
            # process every change twice.
            logger.info("✅ Position tracker started successfully")
        except Exception as e:
            logger.error(f"❌ Error starting position tracker: {e}", exc_info=True)
            self.is_running = False
            raise

    async def stop(self):
        """Stop position tracking.

        Only clears the ``is_running`` flag; the monitoring loop checks the
        flag at the top of each iteration and exits on its own.
        """
        self.is_running = False
        logger.info("Stopping position tracker")

    async def _monitoring_loop(self):
        """Main monitoring loop; runs until ``is_running`` becomes False."""
        logger.info(f"🔄 Position tracker monitoring loop started (heartbeat: {Config.BOT_HEARTBEAT_SECONDS}s)")
        loop_count = 0

        while self.is_running:
            try:
                loop_count += 1
                logger.debug(f"📊 Position tracker loop #{loop_count} - checking for position changes...")
                await self._check_position_changes()

                # Log periodically to show it's alive
                if loop_count % 12 == 0:  # Every 12 loops (60 seconds with 5s heartbeat)
                    logger.info(f"📊 Position tracker alive - loop #{loop_count}, {len(self.current_positions)} positions tracked")

                await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS)  # Use config heartbeat
            except Exception as e:
                # Never let one bad cycle kill the loop; wait and try again.
                logger.error(f"❌ Error in position tracking loop #{loop_count}: {e}", exc_info=True)
                await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS)

        logger.info("🛑 Position tracker monitoring loop stopped")

    async def _check_position_changes(self):
        """Run one tracking cycle: refresh, diff, sync DB, update market data."""
        try:
            # _update_current_positions rebinds self.current_positions to a
            # new dict, so a shallow copy is enough to preserve the old view.
            previous_positions = self.current_positions.copy()
            await self._update_current_positions()

            # Compare with previous positions (simple exchange state tracking)
            await self._process_position_changes(previous_positions, self.current_positions)

            # Simple database sync check (once per cycle)
            await self._sync_database_once()

            # Update database with current market data for open positions
            await self._update_database_market_data()
        except Exception as e:
            logger.error(f"Error checking position changes: {e}")

    async def _sync_database_once(self):
        """Simple bidirectional sync between database and exchange.

        Part 1 closes database positions that don't exist on the exchange.
        Part 2 creates database records for exchange positions that have no
        open database record. Errors are logged and swallowed.
        """
        try:
            trading_stats = self._ensure_trading_stats()
            open_trades = trading_stats.get_open_positions()

            # PART 1: Close database positions that don't exist on exchange
            for trade in open_trades:
                symbol = trade.get('symbol', '')
                if not symbol:
                    continue

                token = symbol.split('/')[0] if '/' in symbol else symbol

                # If database position doesn't exist on exchange, close it
                if token not in self.current_positions:
                    # Create simulated position object from database data.
                    # "or" guards against NULL columns coming back as None,
                    # which would otherwise abort the whole sync pass.
                    entry_price = float(trade.get('entry_price') or 0)
                    amount = float(trade.get('amount') or 0)
                    side = (trade.get('side') or '').lower()

                    simulated_position = {
                        'size': -amount if side == 'sell' else amount,  # sell=short(negative), buy=long(positive)
                        'entry_px': entry_price,
                        'unrealized_pnl': 0,  # Will be calculated
                        'margin_used': 0,
                        'max_leverage': 1,
                        'current_leverage': 1,
                        'return_on_equity': 0
                    }

                    # Reuse existing position closed handler - consistent behavior!
                    await self._handle_position_closed(token, simulated_position)

            # PART 2: Create database records for exchange positions that don't exist in database
            # Get current open trades after potential closures above
            current_open_trades = trading_stats.get_open_positions()
            database_tokens = {
                trade.get('symbol', '').split('/')[0]
                for trade in current_open_trades
                if trade.get('symbol')
            }

            for token, position_data in self.current_positions.items():
                if token in database_tokens:
                    continue

                logger.info(f"🔄 Found exchange position for {token} with no database record - creating trade record")

                # There is no entry/exit pair for a position discovered this
                # way, so a manual position record is inserted instead of
                # going through process_trade_complete_cycle.
                full_symbol = f"{token}/USDC:USDC"
                size = abs(position_data['size'])
                side = 'sell' if position_data['size'] < 0 else 'buy'  # sell=short, buy=long
                entry_price = position_data['entry_px']

                # Create a trade lifecycle record for this existing position
                lifecycle_id = str(uuid.uuid4())
                timestamp = datetime.now(timezone.utc).isoformat()

                # Insert into trades table (parameterized query)
                query = """
                    INSERT INTO trades (
                        trade_lifecycle_id, symbol, side, amount, price, value,
                        entry_price, current_position_size, position_side, status,
                        position_opened_at, timestamp, updated_at, trade_type
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """

                position_side = 'short' if side == 'sell' else 'long'
                value = size * entry_price

                params = (
                    lifecycle_id, full_symbol, side, size, entry_price, value,
                    entry_price, size, position_side, 'position_opened',
                    timestamp, timestamp, timestamp, 'sync_detected'
                )

                trading_stats.db_manager._execute_query(query, params)
                logger.info(f"✅ Created database record for {token} position: {side} {size} @ ${entry_price}")

        except Exception as e:
            # Route the traceback through the logger like the rest of the file.
            logger.error(f"Error syncing database: {e}", exc_info=True)

    async def _update_database_market_data(self):
        """Update database with current market data for open positions.

        For every open database trade that also exists in
        ``current_positions``, writes size, unrealized PnL, ROE, mark price,
        position value, margin and leverage via ``update_trade_market_data``.
        A failure on one trade is logged and does not stop the others.
        """
        try:
            trading_stats = self._ensure_trading_stats()

            # Get open trades from database
            open_trades = trading_stats.get_open_positions()

            for trade in open_trades:
                try:
                    symbol = trade.get('symbol', '')
                    if not symbol:
                        continue

                    # Extract token from symbol (e.g., "BTC/USDC:USDC" -> "BTC")
                    token = symbol.split('/')[0] if '/' in symbol else symbol

                    # Find corresponding exchange position
                    if token not in self.current_positions:
                        continue
                    pos_data = self.current_positions[token]

                    # Convert exchange ROE from decimal to percentage
                    roe_percentage = pos_data['return_on_equity'] * 100

                    # Get current leverage from database to compare
                    old_leverage = trade.get('leverage', 0)
                    new_leverage = pos_data['current_leverage']

                    # Get current market price for mark price and position value calculation
                    current_mark_price = 0.0
                    try:
                        market_data = self.hl_client.get_market_data(symbol)
                        if market_data and market_data.get('ticker'):
                            current_mark_price = float(market_data['ticker'].get('last', 0))
                    except Exception as e:
                        logger.debug(f"Could not fetch current market price for {symbol}: {e}")

                    # Fallback to entry price if we can't get current market price
                    if current_mark_price <= 0:
                        current_mark_price = pos_data['entry_px']

                    # Calculate position value (size * current price)
                    position_size = abs(pos_data['size'])
                    position_value = position_size * current_mark_price

                    # Update database with live market data including position value
                    trading_stats.update_trade_market_data(
                        trade_lifecycle_id=trade['trade_lifecycle_id'],
                        current_position_size=position_size,
                        unrealized_pnl=pos_data['unrealized_pnl'],
                        roe_percentage=roe_percentage,
                        mark_price=current_mark_price,
                        position_value=position_value,
                        margin_used=pos_data['margin_used'],
                        leverage=new_leverage  # Use current leverage, not max leverage
                    )

                    # Log leverage changes
                    if old_leverage and abs(old_leverage - new_leverage) > 0.1:
                        logger.info(f"📊 Database updated - Leverage changed for {symbol}: {old_leverage:.1f}x → {new_leverage:.1f}x, "
                                    f"Position Value: ${position_value:,.2f}")
                    else:
                        logger.debug(f"Updated market data for {symbol}: leverage={new_leverage:.1f}x, ROE={roe_percentage:.2f}%, "
                                     f"mark_price=${current_mark_price:.4f}, value=${position_value:,.2f}")

                except Exception as e:
                    logger.warning(f"Error updating market data for trade {trade.get('trade_lifecycle_id', 'unknown')}: {e}")
                    continue

        except Exception as e:
            logger.error(f"Error updating database market data: {e}")

    async def _update_current_positions(self):
        """Update ``current_positions`` from the exchange.

        ``None`` from the client is treated as an API failure and the last
        known state is kept; an empty result clears the state. On any
        exception the last known state is kept as well, so a transient error
        does not produce false open/close notifications.
        """
        try:
            logger.debug("🔍 Fetching positions from Hyperliquid client...")
            positions = self.hl_client.get_positions()

            # Distinguish between API failure (None) and legitimate empty positions ([])
            if positions is None:
                logger.warning("📊 API failure - could not fetch positions from exchange!")
                # Don't clear positions during API failures - keep last known
                # state to avoid false "position opened" notifications.
                if self.current_positions:
                    logger.info(f"📊 Keeping last known positions during API failure: {list(self.current_positions.keys())}")
                return
            elif not positions:
                # Empty list [] - legitimately no positions
                logger.info("📊 No open positions on exchange - clearing position tracker state")
                self.current_positions = {}
                return

            logger.info(f"📊 Raw positions data from exchange: {len(positions)} positions")
            logger.debug(f"📊 Processing {len(positions)} positions from exchange...")

            new_positions = {}
            for i, position in enumerate(positions):
                logger.debug(f"📊 Processing position {i+1}: {position}")

                # Access nested position data from info.position
                position_data = position.get('info', {}).get('position', {})
                if not position_data:
                    logger.warning(f"📊 Position {i+1} has no info.position data: {position}")
                    continue

                size = float(position_data.get('szi', '0'))
                logger.debug(f"📊 Position {i+1} size: {size}")

                if size == 0:  # Only include open positions
                    continue
                symbol = position_data.get('coin', '')
                if not symbol:
                    continue

                # Get actual current leverage from leverage object
                leverage_info = position_data.get('leverage', {})
                if isinstance(leverage_info, dict) and 'value' in leverage_info:
                    current_leverage = float(leverage_info['value'])
                    logger.debug(f"Using current leverage {current_leverage}x for {symbol} (max: {position_data.get('maxLeverage', 'N/A')}x)")
                else:
                    current_leverage = float(position_data.get('maxLeverage', '1'))
                    logger.debug(f"Fallback to max leverage {current_leverage}x for {symbol} (no current leverage data)")

                new_positions[symbol] = {
                    'size': size,
                    'entry_px': float(position_data.get('entryPx', '0')),
                    'unrealized_pnl': float(position_data.get('unrealizedPnl', '0')),
                    'margin_used': float(position_data.get('marginUsed', '0')),
                    'max_leverage': float(position_data.get('maxLeverage', '1')),
                    'current_leverage': current_leverage,  # Add current leverage
                    'return_on_equity': float(position_data.get('returnOnEquity', '0'))
                }

            # Log position state changes
            had_positions_before = len(self.current_positions) > 0
            getting_positions_now = len(new_positions) > 0

            if had_positions_before and not getting_positions_now:
                logger.info("📊 All positions have been closed on exchange")
            elif not had_positions_before and getting_positions_now:
                logger.info(f"📊 New positions detected: {list(new_positions.keys())}")
            elif had_positions_before and getting_positions_now:
                logger.debug(f"✅ Updated current positions: {len(new_positions)} open positions ({list(new_positions.keys())})")
            else:
                logger.debug(f"✅ Confirmed no open positions on exchange")

            self.current_positions = new_positions

        except Exception as e:
            logger.error(f"❌ Error updating current positions: {e}", exc_info=True)
            # Don't clear positions on exception - keep last known state to avoid false notifications
            logger.info(f"📊 Keeping last known positions during exception: {list(self.current_positions.keys()) if self.current_positions else 'none'}")

    async def _process_position_changes(self, previous: Dict, current: Dict):
        """Process changes between previous and current positions.

        Handlers are invoked in a fixed order: all newly opened positions,
        then all closed positions, then all positions present in both.
        """
        # Find new positions
        for symbol in current:
            if symbol not in previous:
                await self._handle_position_opened(symbol, current[symbol])

        # Find closed positions
        for symbol in previous:
            if symbol not in current:
                await self._handle_position_closed(symbol, previous[symbol])

        # Find changed positions
        for symbol in current:
            if symbol in previous:
                await self._handle_position_changed(symbol, previous[symbol], current[symbol])

    async def _handle_position_opened(self, symbol: str, position: Dict):
        """Handle new position opened by sending a notification.

        Args:
            symbol: Token name (e.g. "BTC").
            position: Position dict as built by ``_update_current_positions``.
        """
        try:
            size = position['size']
            side = "Long" if size > 0 else "Short"

            message = (
                f"🟢 Position Opened\n"
                f"Token: {symbol}\n"
                f"Side: {side}\n"
                f"Size: {abs(size):.4f}\n"
                f"Entry: ${position['entry_px']:.4f}\n"
                f"Leverage: {position.get('current_leverage', position['max_leverage']):.1f}x\n\n"
                f"💡 Use /positions to see current positions"
            )

            await self.notification_manager.send_generic_notification(message)
            logger.info(f"Position opened: {symbol} {side} {abs(size)}")

        except Exception as e:
            logger.error(f"Error handling position opened for {symbol}: {e}")

    async def _handle_position_closed(self, symbol: str, position: Dict):
        """Handle position closed - find and close the corresponding database trade.

        Uses the latest market price as the exit price. If no usable price
        is available nothing is written: recording an exit price of 0 would
        persist a bogus realized PnL. The database trade then stays open and
        ``_sync_database_once`` calls this handler again on a later cycle.

        Args:
            symbol: Token name (e.g. "BTC"), not the full market symbol.
            position: Last known position dict (needs ``size``, ``entry_px``).
        """
        try:
            trading_stats = self._ensure_trading_stats()

            # Construct full symbol format (symbol here is just token name like "BTC")
            full_symbol = f"{symbol}/USDC:USDC"

            # Find the open trade in database for this symbol
            open_trade = trading_stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
            if not open_trade:
                logger.warning(f"No open trade found in database for {full_symbol} - position was closed on exchange but no database record")
                return

            lifecycle_id = open_trade['trade_lifecycle_id']
            entry_price = position['entry_px']
            size = abs(position['size'])
            side = "Long" if position['size'] > 0 else "Short"

            # Get current market price for exit calculation
            market_data = self.hl_client.get_market_data(full_symbol)
            if not market_data:
                logger.error(f"Could not get market data for {full_symbol}")
                return

            ticker = market_data.get('ticker') or {}
            current_price = float(ticker.get('last') or 0)
            if current_price <= 0:
                logger.error(f"No valid market price for {full_symbol} - not closing trade {lifecycle_id} with an unknown exit price")
                return

            # Calculate realized PnL
            if side == "Long":
                realized_pnl = (current_price - entry_price) * size
            else:
                realized_pnl = (entry_price - current_price) * size

            # Close the trade in database
            success = await trading_stats.update_trade_position_closed(
                lifecycle_id=lifecycle_id,
                exit_price=current_price,
                realized_pnl=realized_pnl,
                exchange_fill_id="position_tracker_detected_closure"
            )

            if not success:
                logger.error(f"Failed to close trade {lifecycle_id} for {symbol}")
                return

            # Migrate to aggregated stats (token_stats table, etc.)
            trading_stats.migrate_trade_to_aggregated_stats(lifecycle_id)

            # Send clean notification
            pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
            message = (
                f"{pnl_emoji} Position Closed\n"
                f"Token: {symbol}\n"
                f"Side: {side}\n"
                f"Size: {size:.4f}\n"
                f"Entry: ${entry_price:.4f}\n"
                f"Exit: ${current_price:.4f}\n"
                f"PnL: ${realized_pnl:.3f}\n\n"
                f"💡 Use /positions to see current positions"
            )

            await self.notification_manager.send_generic_notification(message)
            logger.info(f"Position closed: {symbol} {side} PnL: ${realized_pnl:.3f}")

        except Exception as e:
            logger.error(f"Error handling position closed for {symbol}: {e}")

    async def _handle_position_changed(self, symbol: str, previous: Dict, current: Dict):
        """Handle position size, direction, or leverage changes.

        A sign flip is treated as close + open. Leverage changes above 0.1x
        are logged and those of at least 1x are notified. Size changes are
        notified when they exceed a quantity threshold or a $1 value
        threshold.

        Args:
            symbol: Token name (e.g. "BTC").
            previous: Position dict from the previous cycle.
            current: Position dict from this cycle.
        """
        try:
            prev_size = previous['size']
            curr_size = current['size']
            prev_leverage = previous.get('current_leverage', 0)
            curr_leverage = current.get('current_leverage', 0)

            # Check if position reversed (long to short or vice versa)
            if (prev_size > 0 and curr_size < 0) or (prev_size < 0 and curr_size > 0):
                # Position reversed - close old, open new
                await self._handle_position_closed(symbol, previous)
                await self._handle_position_opened(symbol, current)
                return

            # Check if leverage changed
            leverage_delta = abs(prev_leverage - curr_leverage)
            if leverage_delta > 0.1:  # Threshold to avoid noise
                logger.info(f"📊 Leverage changed for {symbol}: {prev_leverage:.1f}x → {curr_leverage:.1f}x")

                # Optional: Send notification for significant leverage changes
                if leverage_delta >= 1.0:  # Only notify for changes >= 1x
                    side = "Long" if curr_size > 0 else "Short"
                    change_direction = "Increased" if curr_leverage > prev_leverage else "Decreased"

                    message = (
                        f"⚖️ Leverage {change_direction}\n"
                        f"Token: {symbol}\n"
                        f"Side: {side}\n"
                        f"Leverage: {prev_leverage:.1f}x → {curr_leverage:.1f}x\n\n"
                        f"💡 Use /positions to see current positions"
                    )
                    await self.notification_manager.send_generic_notification(message)

            # Check if position size changed significantly
            size_change = abs(curr_size) - abs(prev_size)
            if size_change == 0:
                # Unchanged size can never meet a notification threshold, so
                # skip the market-data and formatter lookups entirely. This
                # is the common case on every heartbeat.
                return

            # Get current market price for more accurate value calculation
            try:
                full_symbol = f"{symbol}/USDC:USDC"
                market_data = self.hl_client.get_market_data(full_symbol)
                current_market_price = float(market_data.get('ticker', {}).get('last', current['entry_px'])) if market_data else current['entry_px']
            except Exception:
                current_market_price = current['entry_px']  # Fallback to entry price

            # Calculate change value using current market price
            change_value = abs(size_change) * current_market_price

            # Get formatter to determine token category and appropriate thresholds
            formatter = None
            min_value_threshold = 1.0  # Minimum $1 change for any token
            try:
                from src.utils.token_display_formatter import get_formatter
                formatter = get_formatter()

                # Use the existing token classification system to determine threshold
                price_decimals = await formatter.get_token_price_decimal_places(symbol)

                # Determine quantity threshold based on token characteristics
                # Higher precision tokens (like BTC, ETH) need smaller quantity thresholds
                if price_decimals <= 2:  # Major tokens like BTC, ETH (high value)
                    quantity_threshold = 0.0001
                elif price_decimals <= 4:  # Mid-tier tokens
                    quantity_threshold = 0.001
                else:  # Lower-value tokens (meme coins, etc.)
                    quantity_threshold = 0.01
            except Exception as e:
                logger.debug(f"Could not get token formatting info for {symbol}, using defaults: {e}")
                quantity_threshold = 0.001
                price_decimals = 4  # Default for fallback logging

            # Trigger notification if either:
            # 1. Quantity change exceeds token-specific threshold, OR
            # 2. Value change exceeds minimum value threshold
            should_notify = (abs(size_change) > quantity_threshold or
                             change_value > min_value_threshold)

            if not should_notify:
                # Log when changes don't meet threshold (debug level to avoid spam)
                logger.debug(f"Position size changed for {symbol} but below notification threshold: "
                             f"{size_change:.6f} quantity (${change_value:.2f} value), "
                             f"thresholds: {quantity_threshold} qty or ${min_value_threshold} value "
                             f"(price_decimals: {price_decimals})")
                return

            change_type = "Increased" if size_change > 0 else "Decreased"
            side = "Long" if curr_size > 0 else "Short"

            # Use formatter for consistent display; fall back to plain
            # formatting when it is unavailable or fails.
            formatted = None
            if formatter is not None:
                try:
                    formatted = (
                        await formatter.format_amount(abs(curr_size), symbol),
                        await formatter.format_amount(abs(size_change), symbol),
                        await formatter.format_price_with_symbol(change_value),
                        await formatter.format_price_with_symbol(current_market_price, symbol),
                    )
                except Exception as e:
                    logger.debug(f"Formatter failed for {symbol}, using fallback formatting: {e}")
            if formatted is None:
                formatted = (
                    f"{abs(curr_size):.4f}",
                    f"{abs(size_change):.4f}",
                    f"${change_value:.2f}",
                    f"${current_market_price:.4f}",
                )
            formatted_new_size, formatted_change, formatted_value_change, formatted_current_price = formatted

            # formatted_change is an absolute amount, so the sign has to be
            # added explicitly for decreases as well as increases.
            change_sign = '+' if size_change > 0 else '-'

            message = (
                f"🔄 Position {change_type}\n"
                f"Token: {symbol}\n"
                f"Side: {side}\n"
                f"New Size: {formatted_new_size}\n"
                f"Change: {change_sign}{formatted_change}\n"
                f"Value Change: {formatted_value_change}\n"
                f"Current Price: {formatted_current_price}\n\n"
                f"💡 Use /positions to see current positions"
            )

            await self.notification_manager.send_generic_notification(message)
            logger.info(f"Position changed: {symbol} {change_type} by {size_change:.6f} (${change_value:.2f}) "
                        f"threshold: {quantity_threshold} qty or ${min_value_threshold} value")

        except Exception as e:
            logger.error(f"Error handling position change for {symbol}: {e}")

    async def _save_position_stats(self, symbol: str, side: str, size: float,
                                   entry_price: float, exit_price: float, pnl: float):
        """Save position statistics to database using existing TradingStats interface.

        Args:
            symbol: Symbol passed through to ``process_trade_complete_cycle``.
            side: Position side; lower-cased before being stored.
            size: Position amount.
            entry_price: Entry price of the position.
            exit_price: Exit price of the position.
            pnl: Realized PnL; only used in the log message here.
        """
        try:
            trading_stats = self._ensure_trading_stats()

            # Use the existing process_trade_complete_cycle method
            lifecycle_id = trading_stats.process_trade_complete_cycle(
                symbol=symbol,
                side=side.lower(),
                entry_price=entry_price,
                exit_price=exit_price,
                amount=size,
                timestamp=datetime.now(timezone.utc).isoformat()
            )

            logger.info(f"Saved stats for {symbol}: PnL ${pnl:.3f}, lifecycle_id: {lifecycle_id}")

        except Exception as e:
            logger.error(f"Error saving position stats for {symbol}: {e}")