123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594 |
- import asyncio
- import logging
- import uuid
- from typing import Dict, List, Optional, Any
- from datetime import datetime, timezone
- from ..clients.hyperliquid_client import HyperliquidClient
- from ..notifications.notification_manager import NotificationManager
- from ..config.config import Config
logger = logging.getLogger(__name__)


class PositionTracker:
    """
    Simplified position tracker that mirrors exchange state.

    Polls the exchange client for open positions, diffs every snapshot against
    the previous one, sends a notification for each opened / closed / changed
    position, and keeps the trades database in step with what the exchange
    reports (closing stale records, creating missing ones, refreshing market
    data for open ones).
    """

    def __init__(self, hl_client: "HyperliquidClient",
                 notification_manager: "NotificationManager") -> None:
        """
        Args:
            hl_client: Exchange client; this class calls ``get_positions()``
                and ``get_market_data(symbol)`` on it.
            notification_manager: Object whose awaitable
                ``send_generic_notification(message)`` delivers messages.
        """
        self.hl_client = hl_client
        self.notification_manager = notification_manager
        self.trading_stats = None  # Will be lazy loaded

        # Track current positions, keyed by token name (e.g. "BTC")
        self.current_positions: Dict[str, Dict] = {}
        self.is_running = False

        # Strong reference to the background monitoring task. The event loop
        # only keeps weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._monitor_task: Optional[asyncio.Task] = None

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _full_symbol(token: str) -> str:
        """Build the full market symbol for a token ("BTC" -> "BTC/USDC:USDC")."""
        return f"{token}/USDC:USDC"

    @staticmethod
    def _token_from_symbol(symbol: str) -> str:
        """Extract the token from a market symbol ("BTC/USDC:USDC" -> "BTC")."""
        return symbol.split('/')[0] if '/' in symbol else symbol

    def _get_trading_stats(self):
        """Return the shared TradingStats instance, creating it on first use.

        The import is deferred to call time to avoid circular imports.
        """
        if self.trading_stats is None:
            from ..stats.trading_stats import TradingStats
            self.trading_stats = TradingStats()
        return self.trading_stats

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def start(self) -> None:
        """Start position tracking.

        Takes an initial snapshot of exchange positions and then launches the
        background monitoring loop. Calling it while already running is a
        no-op.

        Raises:
            Exception: re-raises anything that escapes start-up after
                resetting ``is_running``.
        """
        if self.is_running:
            return

        self.is_running = True
        logger.info("🔄 Starting position tracker")

        try:
            # Initialize current positions
            logger.info("📊 Initializing current positions...")
            await self._update_current_positions()
            logger.info(f"✅ Position tracker initialized with {len(self.current_positions)} open positions")

            # Start monitoring loop
            logger.info("🔄 Starting position monitoring loop...")
            # A loop from a previous start() may still be sleeping after a
            # stop(); it resumes by itself once is_running is True again, so
            # only spawn a new task when no live one exists.
            if self._monitor_task is None or self._monitor_task.done():
                self._monitor_task = asyncio.create_task(self._monitoring_loop())
            logger.info("✅ Position tracker started successfully")

        except Exception as e:
            logger.error(f"❌ Error starting position tracker: {e}", exc_info=True)
            self.is_running = False
            raise

    async def stop(self) -> None:
        """Stop position tracking.

        Only clears the running flag; the monitoring loop notices it the next
        time it evaluates its ``while`` condition (after the current cycle
        and sleep).
        """
        self.is_running = False
        logger.info("Stopping position tracker")

    async def _monitoring_loop(self) -> None:
        """Main monitoring loop: check positions, sleep one heartbeat, repeat.

        Errors inside a cycle are logged and the loop keeps going.
        """
        logger.info(f"🔄 Position tracker monitoring loop started (heartbeat: {Config.BOT_HEARTBEAT_SECONDS}s)")
        loop_count = 0

        while self.is_running:
            try:
                loop_count += 1
                logger.debug(f"📊 Position tracker loop #{loop_count} - checking for position changes...")
                await self._check_position_changes()

                # Log periodically to show it's alive
                if loop_count % 12 == 0:  # Every 12 loops (60 seconds with 5s heartbeat)
                    logger.info(f"📊 Position tracker alive - loop #{loop_count}, {len(self.current_positions)} positions tracked")

                await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS)  # Use config heartbeat
            except Exception as e:
                logger.error(f"❌ Error in position tracking loop #{loop_count}: {e}", exc_info=True)
                await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS)

        logger.info("🛑 Position tracker monitoring loop stopped")

    # ------------------------------------------------------------------
    # One monitoring cycle
    # ------------------------------------------------------------------

    async def _check_position_changes(self) -> None:
        """Run one cycle: refresh positions, diff, sync database, update market data."""
        try:
            previous_positions = self.current_positions.copy()
            refreshed = await self._update_current_positions()

            if not refreshed:
                # Without a fresh snapshot the tracked state is stale (or
                # empty right after start-up). Reconciling the database
                # against it could close trades that are still open on the
                # exchange, so wait for the next successful fetch.
                logger.debug("Skipping position reconciliation - exchange positions could not be refreshed")
                return

            # Compare with previous positions (simple exchange state tracking)
            await self._process_position_changes(previous_positions, self.current_positions)

            # Simple database sync check (once per cycle)
            await self._sync_database_once()

            # Update database with current market data for open positions
            await self._update_database_market_data()

        except Exception as e:
            logger.error(f"Error checking position changes: {e}")

    async def _sync_database_once(self) -> None:
        """Simple bidirectional sync between database and tracked exchange state.

        Part 1 closes database positions that don't exist on the exchange.
        Part 2 creates database records for exchange positions that don't
        exist in the database.
        """
        try:
            stats = self._get_trading_stats()

            # PART 1: Close database positions that don't exist on exchange
            for trade in stats.get_open_positions():
                symbol = trade.get('symbol', '')
                if not symbol:
                    continue

                token = self._token_from_symbol(symbol)
                if token in self.current_positions:
                    continue

                # Create simulated position object from database data.
                # `or` guards against None values stored in the row.
                entry_price = float(trade.get('entry_price') or 0)
                amount = float(trade.get('amount') or 0)
                side = (trade.get('side') or '').lower()

                simulated_position = {
                    'size': -amount if side == 'sell' else amount,  # sell=short(negative), buy=long(positive)
                    'entry_px': entry_price,
                    'unrealized_pnl': 0,
                    'margin_used': 0,
                    'max_leverage': 1,
                    'current_leverage': 1,
                    'return_on_equity': 0
                }

                # Reuse existing position closed handler - consistent behavior!
                await self._handle_position_closed(token, simulated_position)

            # PART 2: Create database records for exchange positions that don't exist in database
            # Re-read open trades after the potential closures above
            database_tokens = {
                self._token_from_symbol(trade['symbol'])
                for trade in stats.get_open_positions()
                if trade.get('symbol')
            }

            for token, position_data in self.current_positions.items():
                if token in database_tokens:
                    continue

                logger.info(f"🔄 Found exchange position for {token} with no database record - creating trade record")

                # There is no entry/exit pair to replay here, so a position
                # record is inserted directly instead of going through
                # process_trade_complete_cycle.
                full_symbol = self._full_symbol(token)
                size = abs(position_data['size'])
                side = 'sell' if position_data['size'] < 0 else 'buy'  # sell=short, buy=long
                entry_price = position_data['entry_px']

                # Create a trade lifecycle record for this existing position
                lifecycle_id = str(uuid.uuid4())
                timestamp = datetime.now(timezone.utc).isoformat()

                # Insert into trades table
                query = """
                    INSERT INTO trades (
                        trade_lifecycle_id, symbol, side, amount, price, value,
                        entry_price, current_position_size, position_side, status,
                        position_opened_at, timestamp, updated_at, trade_type
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """

                position_side = 'short' if side == 'sell' else 'long'
                value = size * entry_price

                params = (
                    lifecycle_id, full_symbol, side, size, entry_price, value,
                    entry_price, size, position_side, 'position_opened',
                    timestamp, timestamp, timestamp, 'sync_detected'
                )

                stats.db_manager._execute_query(query, params)

                logger.info(f"✅ Created database record for {token} position: {side} {size} @ ${entry_price}")

        except Exception as e:
            # Route the traceback through the logger rather than stderr
            logger.error(f"Error syncing database: {e}", exc_info=True)

    async def _update_database_market_data(self) -> None:
        """Update database with current market data for open positions.

        For every open database trade that has a tracked exchange position,
        writes size, unrealized PnL, ROE (as a percentage), mark price,
        position value, margin used and current leverage. A failure on one
        trade is logged and does not stop the others.
        """
        try:
            stats = self._get_trading_stats()

            for trade in stats.get_open_positions():
                try:
                    symbol = trade.get('symbol', '')
                    if not symbol:
                        continue

                    # Extract token from symbol (e.g., "BTC/USDC:USDC" -> "BTC")
                    token = self._token_from_symbol(symbol)

                    # Only trades with a corresponding exchange position are updated
                    if token not in self.current_positions:
                        continue
                    pos_data = self.current_positions[token]

                    # Convert exchange ROE from decimal to percentage
                    roe_percentage = pos_data['return_on_equity'] * 100

                    # Leverage stored in the database vs. leverage reported now
                    old_leverage = trade.get('leverage', 0)
                    new_leverage = pos_data['current_leverage']

                    # Get current market price for mark price and position value calculation
                    current_mark_price = 0.0
                    try:
                        market_data = self.hl_client.get_market_data(symbol)
                        if market_data and market_data.get('ticker'):
                            current_mark_price = float(market_data['ticker'].get('last', 0))
                    except Exception as e:
                        logger.debug(f"Could not fetch current market price for {symbol}: {e}")

                    # Fallback to entry price if we can't get current market price
                    if current_mark_price <= 0:
                        current_mark_price = pos_data['entry_px']

                    # Calculate position value (size * current price)
                    position_size = abs(pos_data['size'])
                    position_value = position_size * current_mark_price

                    # Update database with live market data including position value
                    stats.update_trade_market_data(
                        trade_lifecycle_id=trade['trade_lifecycle_id'],
                        current_position_size=position_size,
                        unrealized_pnl=pos_data['unrealized_pnl'],
                        roe_percentage=roe_percentage,
                        mark_price=current_mark_price,
                        position_value=position_value,
                        margin_used=pos_data['margin_used'],
                        leverage=new_leverage  # Use current leverage, not max leverage
                    )

                    # Log leverage changes
                    if old_leverage and abs(old_leverage - new_leverage) > 0.1:
                        logger.info(f"📊 Database updated - Leverage changed for {symbol}: {old_leverage:.1f}x → {new_leverage:.1f}x, "
                                    f"Position Value: ${position_value:,.2f}")
                    else:
                        logger.debug(f"Updated market data for {symbol}: leverage={new_leverage:.1f}x, ROE={roe_percentage:.2f}%, "
                                     f"mark_price=${current_mark_price:.4f}, value=${position_value:,.2f}")

                except Exception as e:
                    logger.warning(f"Error updating market data for trade {trade.get('trade_lifecycle_id', 'unknown')}: {e}")
                    continue

        except Exception as e:
            logger.error(f"Error updating database market data: {e}")

    async def _update_current_positions(self) -> bool:
        """Update current positions from exchange.

        Returns:
            True when a fresh snapshot was obtained (including a legitimately
            empty one); False when the fetch failed and the last known state
            was kept.
        """
        try:
            logger.debug("🔍 Fetching positions from Hyperliquid client...")
            positions = self.hl_client.get_positions()

            # Distinguish between API failure (None) and legitimate empty positions ([])
            if positions is None:
                logger.warning("📊 API failure - could not fetch positions from exchange!")
                # Don't clear positions during API failures - keep last known
                # state to avoid false "position opened" notifications
                if self.current_positions:
                    logger.info(f"📊 Keeping last known positions during API failure: {list(self.current_positions.keys())}")
                return False

            if not positions:  # Empty list [] - legitimately no positions
                logger.info("📊 No open positions on exchange - clearing position tracker state")
                self.current_positions = {}
                return True

            logger.info(f"📊 Raw positions data from exchange: {len(positions)} positions")
            logger.debug(f"📊 Processing {len(positions)} positions from exchange...")

            new_positions = {}
            for number, position in enumerate(positions, start=1):
                logger.debug(f"📊 Processing position {number}: {position}")

                # Access nested position data from info.position
                position_data = position.get('info', {}).get('position', {})
                if not position_data:
                    logger.warning(f"📊 Position {number} has no info.position data: {position}")
                    continue

                size = float(position_data.get('szi', '0'))
                logger.debug(f"📊 Position {number} size: {size}")
                if size == 0:  # Only include open positions
                    continue

                symbol = position_data.get('coin', '')
                if not symbol:
                    continue

                # Get actual current leverage from leverage object
                leverage_info = position_data.get('leverage', {})
                if isinstance(leverage_info, dict) and 'value' in leverage_info:
                    current_leverage = float(leverage_info['value'])
                    logger.debug(f"Using current leverage {current_leverage}x for {symbol} (max: {position_data.get('maxLeverage', 'N/A')}x)")
                else:
                    current_leverage = float(position_data.get('maxLeverage', '1'))
                    logger.debug(f"Fallback to max leverage {current_leverage}x for {symbol} (no current leverage data)")

                new_positions[symbol] = {
                    'size': size,
                    'entry_px': float(position_data.get('entryPx', '0')),
                    'unrealized_pnl': float(position_data.get('unrealizedPnl', '0')),
                    'margin_used': float(position_data.get('marginUsed', '0')),
                    'max_leverage': float(position_data.get('maxLeverage', '1')),
                    'current_leverage': current_leverage,
                    'return_on_equity': float(position_data.get('returnOnEquity', '0'))
                }

            # Log position state changes
            had_positions_before = bool(self.current_positions)
            getting_positions_now = bool(new_positions)

            if had_positions_before and not getting_positions_now:
                logger.info("📊 All positions have been closed on exchange")
            elif not had_positions_before and getting_positions_now:
                logger.info(f"📊 New positions detected: {list(new_positions.keys())}")
            elif had_positions_before and getting_positions_now:
                logger.debug(f"✅ Updated current positions: {len(new_positions)} open positions ({list(new_positions.keys())})")
            else:
                logger.debug(f"✅ Confirmed no open positions on exchange")

            self.current_positions = new_positions
            return True

        except Exception as e:
            logger.error(f"❌ Error updating current positions: {e}", exc_info=True)
            # Don't clear positions on exception - keep last known state to avoid false notifications
            logger.info(f"📊 Keeping last known positions during exception: {list(self.current_positions.keys()) if self.current_positions else 'none'}")
            return False

    async def _process_position_changes(self, previous: Dict, current: Dict) -> None:
        """Process changes between previous and current positions.

        Handlers run in a fixed order: all newly opened positions first, then
        closed ones, then positions present in both snapshots.
        """
        # Find new positions
        for symbol in current:
            if symbol not in previous:
                await self._handle_position_opened(symbol, current[symbol])

        # Find closed positions
        for symbol in previous:
            if symbol not in current:
                await self._handle_position_closed(symbol, previous[symbol])

        # Find changed positions
        for symbol in current:
            if symbol in previous:
                await self._handle_position_changed(symbol, previous[symbol], current[symbol])

    # ------------------------------------------------------------------
    # Event handlers
    # ------------------------------------------------------------------

    async def _handle_position_opened(self, symbol: str, position: Dict) -> None:
        """Handle new position opened: send a notification and log it."""
        try:
            size = position['size']
            side = "Long" if size > 0 else "Short"

            message = (
                f"🟢 Position Opened\n"
                f"Token: {symbol}\n"
                f"Side: {side}\n"
                f"Size: {abs(size):.4f}\n"
                f"Entry: ${position['entry_px']:.4f}\n"
                f"Leverage: {position.get('current_leverage', position['max_leverage']):.1f}x\n\n"
                f"💡 Use /positions to see current positions"
            )

            await self.notification_manager.send_generic_notification(message)
            logger.info(f"Position opened: {symbol} {side} {abs(size)}")

        except Exception as e:
            logger.error(f"Error handling position opened for {symbol}: {e}")

    async def _handle_position_closed(self, symbol: str, position: Dict) -> None:
        """Handle position closed - find and close the corresponding database trade.

        Args:
            symbol: Token name such as "BTC" (not the full market symbol).
            position: Last known position dict; ``size`` and ``entry_px`` are read.

        The exit price is the current ticker ``last`` price. When no usable
        price is available nothing is written, so the trade stays open in the
        database and can be picked up again by a later sync.
        """
        try:
            stats = self._get_trading_stats()

            # Construct full symbol format (symbol here is just token name like "BTC")
            full_symbol = self._full_symbol(symbol)

            # Find the open trade in database for this symbol
            open_trade = stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')

            if not open_trade:
                logger.warning(f"No open trade found in database for {full_symbol} - position was closed on exchange but no database record")
                return

            lifecycle_id = open_trade['trade_lifecycle_id']
            entry_price = position['entry_px']
            size = abs(position['size'])
            side = "Long" if position['size'] > 0 else "Short"

            # Get current market price for exit calculation
            market_data = self.hl_client.get_market_data(full_symbol)
            if not market_data:
                logger.error(f"Could not get market data for {full_symbol}")
                return

            ticker = market_data.get('ticker') or {}
            try:
                current_price = float(ticker.get('last') or 0)
            except (TypeError, ValueError):
                current_price = 0.0

            # A missing/zero price would be booked as the exit price and
            # produce a fictitious realized PnL, so refuse to close on it.
            if current_price <= 0:
                logger.error(f"Could not get a valid exit price for {full_symbol} - leaving trade {lifecycle_id} open")
                return

            # Calculate realized PnL
            if side == "Long":
                realized_pnl = (current_price - entry_price) * size
            else:
                realized_pnl = (entry_price - current_price) * size

            # Close the trade in database
            success = await stats.update_trade_position_closed(
                lifecycle_id=lifecycle_id,
                exit_price=current_price,
                realized_pnl=realized_pnl,
                exchange_fill_id="position_tracker_detected_closure"
            )

            if not success:
                logger.error(f"Failed to close trade {lifecycle_id} for {symbol}")
                return

            # Migrate to aggregated stats (token_stats table, etc.)
            stats.migrate_trade_to_aggregated_stats(lifecycle_id)

            # Send clean notification
            pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
            message = (
                f"{pnl_emoji} Position Closed\n"
                f"Token: {symbol}\n"
                f"Side: {side}\n"
                f"Size: {size:.4f}\n"
                f"Entry: ${entry_price:.4f}\n"
                f"Exit: ${current_price:.4f}\n"
                f"PnL: ${realized_pnl:.3f}\n\n"
                f"💡 Use /positions to see current positions"
            )

            await self.notification_manager.send_generic_notification(message)
            logger.info(f"Position closed: {symbol} {side} PnL: ${realized_pnl:.3f}")

        except Exception as e:
            logger.error(f"Error handling position closed for {symbol}: {e}")

    async def _handle_position_changed(self, symbol: str, previous: Dict, current: Dict) -> None:
        """Handle position size, direction, or leverage changes.

        * A sign flip of the size is treated as close-then-open.
        * A leverage move above 0.1x is logged; 1.0x or more is also notified.
        * A size change is notified when it exceeds a per-token quantity
          threshold or is worth more than $1 at the current price.
        """
        try:
            prev_size = previous['size']
            curr_size = current['size']
            prev_leverage = previous.get('current_leverage', 0)
            curr_leverage = current.get('current_leverage', 0)

            # Check if position reversed (long to short or vice versa)
            if (prev_size > 0 and curr_size < 0) or (prev_size < 0 and curr_size > 0):
                # Position reversed - close old, open new
                await self._handle_position_closed(symbol, previous)
                await self._handle_position_opened(symbol, current)
                return

            # Check if leverage changed
            leverage_delta = abs(prev_leverage - curr_leverage)
            if leverage_delta > 0.1:  # Threshold to avoid noise
                logger.info(f"📊 Leverage changed for {symbol}: {prev_leverage:.1f}x → {curr_leverage:.1f}x")

                # Only notify for changes >= 1x
                if leverage_delta >= 1.0:
                    side = "Long" if curr_size > 0 else "Short"
                    change_direction = "Increased" if curr_leverage > prev_leverage else "Decreased"
                    message = (
                        f"⚖️ Leverage {change_direction}\n"
                        f"Token: {symbol}\n"
                        f"Side: {side}\n"
                        f"Leverage: {prev_leverage:.1f}x → {curr_leverage:.1f}x\n\n"
                        f"💡 Use /positions to see current positions"
                    )
                    await self.notification_manager.send_generic_notification(message)

            size_change = abs(curr_size) - abs(prev_size)

            # This handler runs for every unchanged position on every cycle;
            # when the size is identical no notification can result, so skip
            # the market-data request and formatter lookups entirely.
            if size_change == 0:
                return

            # Get current market price for more accurate value calculation
            try:
                market_data = self.hl_client.get_market_data(self._full_symbol(symbol))
                current_market_price = float(market_data.get('ticker', {}).get('last', current['entry_px'])) if market_data else current['entry_px']
            except Exception:
                current_market_price = current['entry_px']  # Fallback to entry price

            # Calculate change value using current market price
            change_value = abs(size_change) * current_market_price

            # Get formatter to determine token category and appropriate thresholds.
            # `formatter` stays None when it cannot be loaded so the display
            # code below can test for it explicitly.
            formatter = None
            try:
                from src.utils.token_display_formatter import get_formatter
                formatter = get_formatter()

                price_decimals = await formatter.get_token_price_decimal_places(symbol)
                # Looked up alongside the price precision; the value is not used below.
                await formatter.get_token_amount_decimal_places(symbol)

                # Determine quantity threshold based on token characteristics
                if price_decimals <= 2:  # Major tokens like BTC, ETH (high value)
                    quantity_threshold = 0.0001
                elif price_decimals <= 4:  # Mid-tier tokens
                    quantity_threshold = 0.001
                else:  # Lower-value tokens (meme coins, etc.)
                    quantity_threshold = 0.01

                min_value_threshold = 1.0  # Minimum $1 change for any token

            except Exception as e:
                logger.debug(f"Could not get token formatting info for {symbol}, using defaults: {e}")
                quantity_threshold = 0.001
                min_value_threshold = 1.0
                price_decimals = 4  # Default for fallback logging

            # Trigger notification if either:
            # 1. Quantity change exceeds token-specific threshold, OR
            # 2. Value change exceeds minimum value threshold
            should_notify = (abs(size_change) > quantity_threshold or
                             change_value > min_value_threshold)

            if not should_notify:
                # Log when changes don't meet threshold (debug level to avoid spam)
                logger.debug(f"Position size changed for {symbol} but below notification threshold: "
                             f"{size_change:.6f} quantity (${change_value:.2f} value), "
                             f"thresholds: {quantity_threshold} qty or ${min_value_threshold} value "
                             f"(price_decimals: {price_decimals})")
                return

            change_type = "Increased" if size_change > 0 else "Decreased"
            side = "Long" if curr_size > 0 else "Short"

            # Use formatter for consistent display; all four values fall back
            # together if the formatter is missing or any call fails.
            display = None
            if formatter is not None:
                try:
                    display = (
                        await formatter.format_amount(abs(curr_size), symbol),
                        await formatter.format_amount(abs(size_change), symbol),
                        await formatter.format_price_with_symbol(change_value),
                        await formatter.format_price_with_symbol(current_market_price, symbol),
                    )
                except Exception:
                    display = None
            if display is None:
                display = (
                    f"{abs(curr_size):.4f}",
                    f"{abs(size_change):.4f}",
                    f"${change_value:.2f}",
                    f"${current_market_price:.4f}",
                )
            formatted_new_size, formatted_change, formatted_value_change, formatted_current_price = display

            message = (
                f"🔄 Position {change_type}\n"
                f"Token: {symbol}\n"
                f"Side: {side}\n"
                f"New Size: {formatted_new_size}\n"
                f"Change: {'+' if size_change > 0 else ''}{formatted_change}\n"
                f"Value Change: {formatted_value_change}\n"
                f"Current Price: {formatted_current_price}\n\n"
                f"💡 Use /positions to see current positions"
            )

            await self.notification_manager.send_generic_notification(message)
            logger.info(f"Position changed: {symbol} {change_type} by {size_change:.6f} (${change_value:.2f}) "
                        f"threshold: {quantity_threshold} qty or ${min_value_threshold} value")

        except Exception as e:
            logger.error(f"Error handling position change for {symbol}: {e}")

    async def _save_position_stats(self, symbol: str, side: str, size: float,
                                   entry_price: float, exit_price: float, pnl: float) -> None:
        """Save position statistics to database using existing TradingStats interface.

        ``pnl`` is only used in the log line; the record itself is produced by
        ``process_trade_complete_cycle`` from the entry/exit prices and amount.
        """
        try:
            stats = self._get_trading_stats()

            # Use the existing process_trade_complete_cycle method
            lifecycle_id = stats.process_trade_complete_cycle(
                symbol=symbol,
                side=side.lower(),
                entry_price=entry_price,
                exit_price=exit_price,
                amount=size,
                timestamp=datetime.now(timezone.utc).isoformat()
            )

            logger.info(f"Saved stats for {symbol}: PnL ${pnl:.3f}, lifecycle_id: {lifecycle_id}")

        except Exception as e:
            logger.error(f"Error saving position stats for {symbol}: {e}")
|