#!/usr/bin/env python3 """ Simplified Position Tracker Focuses only on: 1. Detecting position changes (opened/closed/size changed) 2. Sending notifications 3. Managing pending stop losses Reuses existing trades table and managers. """ import logging import asyncio from datetime import datetime, timezone from typing import Optional, Dict, Any, List logger = logging.getLogger(__name__) class SimplePositionTracker: """Simplified position tracking focused on notifications and pending SLs.""" def __init__(self, trading_engine, notification_manager): self.trading_engine = trading_engine self.notification_manager = notification_manager async def check_all_position_changes(self): """Main method - check all positions for changes and send notifications.""" try: stats = self.trading_engine.get_stats() if not stats: logger.warning("TradingStats not available") return # Get current exchange positions exchange_positions = self.trading_engine.get_positions() or [] # Get current DB positions (trades with status='position_opened') db_positions = stats.get_open_positions() # Create lookup maps exchange_map = {pos['symbol']: pos for pos in exchange_positions if abs(float(pos.get('contracts', 0))) > 1e-9} db_map = {pos['symbol']: pos for pos in db_positions} all_symbols = set(exchange_map.keys()) | set(db_map.keys()) for symbol in all_symbols: await self._check_symbol_position_change(symbol, exchange_map.get(symbol), db_map.get(symbol), stats) # Handle pending stop losses await self._handle_pending_stop_losses(stats) # Handle orphaned pending trades (orders cancelled before filling) await self._handle_orphaned_pending_trades(stats) except Exception as e: logger.error(f"❌ Error checking position changes: {e}") async def _check_symbol_position_change(self, symbol: str, exchange_pos: Optional[Dict], db_pos: Optional[Dict], stats) -> None: """Check position changes for a single symbol.""" try: current_time = datetime.now(timezone.utc) # Case 1: New position (exchange has, DB doesn't) if exchange_pos and not db_pos: await self._handle_position_opened(symbol, exchange_pos, stats, current_time) # Case 2: Position closed (DB has, exchange doesn't) elif db_pos and not exchange_pos: await self._handle_position_closed(symbol, db_pos, stats, current_time) # Case 3: Position size changed (both exist, different sizes) elif exchange_pos and db_pos: await self._handle_position_size_change(symbol, exchange_pos, db_pos, stats, current_time) # Case 4: Both None - no action needed except Exception as e: logger.error(f"❌ Error checking position change for {symbol}: {e}") async def _handle_position_opened(self, symbol: str, exchange_pos: Dict, stats, timestamp: datetime): """Handle new position detection.""" try: contracts = float(exchange_pos.get('contracts', 0)) size = abs(contracts) # Use CCXT's side field first (more reliable), fallback to contract sign ccxt_side = exchange_pos.get('side', '').lower() if ccxt_side == 'long': side, order_side = 'long', 'buy' elif ccxt_side == 'short': side, order_side = 'short', 'sell' else: # Fallback to contract sign (less reliable but better than nothing) side = 'long' if contracts > 0 else 'short' order_side = 'buy' if side == 'long' else 'sell' logger.warning(f"⚠️ Using contract sign fallback for {symbol}: side={side}, ccxt_side='{ccxt_side}'") # Get entry price from exchange entry_price = float(exchange_pos.get('entryPrice', 0)) or float(exchange_pos.get('entryPx', 0)) if not entry_price: entry_price = float(exchange_pos.get('markPrice', 0)) or float(exchange_pos.get('markPx', 0)) if not entry_price: logger.error(f"❌ Cannot determine entry price for {symbol}") return # Create trade lifecycle using existing manager lifecycle_id = stats.create_trade_lifecycle( symbol=symbol, side=order_side, entry_order_id=f"external_position_{timestamp.strftime('%Y%m%d_%H%M%S')}", trade_type='external_detected' ) if lifecycle_id: # Update to position_opened using existing manager success = stats.update_trade_position_opened( lifecycle_id=lifecycle_id, entry_price=entry_price, entry_amount=size, exchange_fill_id=f"position_detected_{timestamp.isoformat()}" ) if success: logger.info(f"🚀 NEW POSITION: {symbol} {side.upper()} {size} @ {entry_price}") # Send notification await self._send_position_notification('opened', symbol, { 'side': side, 'size': size, 'price': entry_price, 'timestamp': timestamp }) except Exception as e: logger.error(f"❌ Error handling position opened for {symbol}: {e}") async def _handle_position_closed(self, symbol: str, db_pos: Dict, stats, timestamp: datetime): """Handle position closure detection.""" try: lifecycle_id = db_pos['trade_lifecycle_id'] entry_price = db_pos.get('entry_price', 0) position_side = db_pos.get('position_side') size = db_pos.get('current_position_size', 0) # Estimate exit price (could be improved with recent fills) market_data = self.trading_engine.get_market_data(symbol) exit_price = entry_price # Fallback if market_data and market_data.get('ticker'): exit_price = float(market_data['ticker'].get('last', exit_price)) # Calculate realized PnL realized_pnl = 0 if position_side == 'long': realized_pnl = size * (exit_price - entry_price) elif position_side == 'short': realized_pnl = size * (entry_price - exit_price) # Update to position_closed using existing manager success = stats.update_trade_position_closed( lifecycle_id=lifecycle_id, exit_price=exit_price, realized_pnl=realized_pnl, exchange_fill_id=f"position_closed_detected_{timestamp.isoformat()}" ) if success: logger.info(f"🎯 POSITION CLOSED: {symbol} {position_side.upper()} PnL: {realized_pnl:.2f}") # Send notification await self._send_position_notification('closed', symbol, { 'side': position_side, 'size': size, 'entry_price': entry_price, 'exit_price': exit_price, 'realized_pnl': realized_pnl, 'timestamp': timestamp }) # Clear any pending stop losses for this symbol stats.order_manager.cancel_pending_stop_losses_by_symbol(symbol, 'cancelled_position_closed') # Migrate trade to aggregated stats and clean up stats.migrate_trade_to_aggregated_stats(lifecycle_id) except Exception as e: logger.error(f"❌ Error handling position closed for {symbol}: {e}") async def _handle_position_size_change(self, symbol: str, exchange_pos: Dict, db_pos: Dict, stats, timestamp: datetime): """Handle position size changes and position flips.""" try: exchange_size = abs(float(exchange_pos.get('contracts', 0))) db_size = db_pos.get('current_position_size', 0) db_position_side = db_pos.get('position_side') # Determine current exchange position side ccxt_side = exchange_pos.get('side', '').lower() if ccxt_side == 'long': exchange_position_side = 'long' elif ccxt_side == 'short': exchange_position_side = 'short' else: # Fallback to contract sign contracts = float(exchange_pos.get('contracts', 0)) exchange_position_side = 'long' if contracts > 0 else 'short' logger.warning(f"⚠️ Using contract sign fallback for side detection: {symbol}") # Check for POSITION FLIP (LONG ↔ SHORT) if db_position_side != exchange_position_side: logger.info(f"🔄 POSITION FLIP DETECTED: {symbol} {db_position_side.upper()} → {exchange_position_side.upper()}") # Handle as: close old position + open new position await self._handle_position_closed(symbol, db_pos, stats, timestamp) await self._handle_position_opened(symbol, exchange_pos, stats, timestamp) return # Check if size actually changed (with small tolerance) if abs(exchange_size - db_size) < 1e-6: return # No meaningful change lifecycle_id = db_pos['trade_lifecycle_id'] entry_price = db_pos.get('entry_price', 0) # Update position size using existing manager success = stats.trade_manager.update_trade_market_data( lifecycle_id, current_position_size=exchange_size ) if success: change_type = 'increased' if exchange_size > db_size else 'decreased' size_diff = abs(exchange_size - db_size) logger.info(f"📊 POSITION {change_type.upper()}: {symbol} {db_size} → {exchange_size}") # Send notification await self._send_position_notification(change_type, symbol, { 'side': db_position_side, 'old_size': db_size, 'new_size': exchange_size, 'size_diff': size_diff, 'timestamp': timestamp }) except Exception as e: logger.error(f"❌ Error handling position size change for {symbol}: {e}") async def _handle_pending_stop_losses(self, stats): """Handle pending stop losses - place orders for positions that need them.""" try: # Get positions with pending SLs using existing manager pending_sl_trades = stats.get_pending_stop_loss_activations() for trade in pending_sl_trades: symbol = trade['symbol'] stop_price = trade['stop_loss_price'] position_side = trade['position_side'] lifecycle_id = trade['trade_lifecycle_id'] try: # Check if position still exists on exchange exchange_positions = self.trading_engine.get_positions() or [] position_exists = any( pos['symbol'] == symbol and abs(float(pos.get('contracts', 0))) > 1e-9 for pos in exchange_positions ) if position_exists: # Place stop loss order sl_side = 'sell' if position_side == 'long' else 'buy' token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0] result = await self.trading_engine.execute_stop_loss_order( token=token, stop_price=stop_price ) if result and result.get('success'): exchange_order_id = result.get('order_placed_details', {}).get('exchange_order_id') if exchange_order_id: # The execute_stop_loss_order already links the SL to the trade logger.info(f"✅ Placed pending SL: {symbol} @ {stop_price}") else: logger.warning(f"⚠️ SL placed for {symbol} but no exchange_order_id returned") else: # Position doesn't exist, clear pending SL logger.info(f"🗑️ Clearing pending SL for non-existent position: {symbol}") # This will be handled by position closed detection except Exception as e: logger.error(f"❌ Error handling pending SL for {symbol}: {e}") except Exception as e: logger.error(f"❌ Error handling pending stop losses: {e}") async def _handle_orphaned_pending_trades(self, stats): """Handle trades stuck in 'pending' status due to cancelled orders.""" try: # Get all pending trades pending_trades = stats.get_trades_by_status('pending') if not pending_trades: return logger.debug(f"🔍 Checking {len(pending_trades)} pending trades for orphaned status") # Get current exchange orders for comparison exchange_orders = self.trading_engine.get_orders() or [] exchange_order_ids = {order.get('id') for order in exchange_orders if order.get('id')} # Get current exchange positions exchange_positions = self.trading_engine.get_positions() or [] exchange_position_symbols = { pos.get('symbol') for pos in exchange_positions if pos.get('symbol') and abs(float(pos.get('contracts', 0))) > 1e-9 } for trade in pending_trades: try: lifecycle_id = trade['trade_lifecycle_id'] symbol = trade['symbol'] entry_order_id = trade.get('entry_order_id') # Check if this trade should be cancelled should_cancel = False cancel_reason = "" # Case 1: Entry order ID exists but order is no longer on exchange if entry_order_id and entry_order_id not in exchange_order_ids: # Check if a position was opened (even if order disappeared) if symbol not in exchange_position_symbols: should_cancel = True cancel_reason = "entry_order_cancelled_no_position" logger.debug(f"🗑️ Pending trade {lifecycle_id[:8]} for {symbol}: entry order {entry_order_id} no longer exists and no position opened") # Case 2: No entry order ID but no position exists (shouldn't happen but safety check) elif not entry_order_id and symbol not in exchange_position_symbols: should_cancel = True cancel_reason = "no_entry_order_no_position" logger.debug(f"🗑️ Pending trade {lifecycle_id[:8]} for {symbol}: no entry order ID and no position") # Case 3: Check if trade is very old (safety net for other edge cases) else: from datetime import datetime, timezone, timedelta created_at_str = trade.get('timestamp') if created_at_str: try: created_at = datetime.fromisoformat(created_at_str.replace('Z', '+00:00')) if datetime.now(timezone.utc) - created_at > timedelta(hours=1): # Very old pending trade, likely orphaned if symbol not in exchange_position_symbols: should_cancel = True cancel_reason = "old_pending_trade_no_position" logger.debug(f"🗑️ Pending trade {lifecycle_id[:8]} for {symbol}: very old ({created_at}) with no position") except (ValueError, TypeError) as e: logger.warning(f"Could not parse timestamp for pending trade {lifecycle_id}: {e}") # Cancel the orphaned trade if should_cancel: success = stats.update_trade_cancelled(lifecycle_id, reason=cancel_reason) if success: logger.info(f"🗑️ Cancelled orphaned pending trade: {symbol} (Lifecycle: {lifecycle_id[:8]}) - {cancel_reason}") # Send a notification about the cancelled trade await self._send_trade_cancelled_notification(symbol, cancel_reason, trade) # Migrate cancelled trade to aggregated stats stats.migrate_trade_to_aggregated_stats(lifecycle_id) else: logger.error(f"❌ Failed to cancel orphaned pending trade: {lifecycle_id}") except Exception as e: logger.error(f"❌ Error processing pending trade {trade.get('trade_lifecycle_id', 'unknown')}: {e}") except Exception as e: logger.error(f"❌ Error handling orphaned pending trades: {e}") async def _send_trade_cancelled_notification(self, symbol: str, cancel_reason: str, trade: Dict[str, Any]): """Send notification for cancelled trade.""" try: if not self.notification_manager: return token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0] lifecycle_id = trade['trade_lifecycle_id'] # Create user-friendly reason reason_map = { 'entry_order_cancelled_no_position': 'Entry order was cancelled before filling', 'no_entry_order_no_position': 'No entry order and no position opened', 'old_pending_trade_no_position': 'Trade was pending too long without execution' } user_reason = reason_map.get(cancel_reason, cancel_reason) message = f"""❌ Trade Cancelled 📊 Details: • Token: {token} • Trade ID: {lifecycle_id[:8]}... • Reason: {user_reason} ℹ️ Status: Trade was automatically cancelled due to order issues 📱 Use /positions to view current positions""" await self.notification_manager.send_generic_notification(message.strip()) logger.debug(f"📨 Sent cancellation notification for {symbol}") except Exception as e: logger.error(f"❌ Error sending cancellation notification for {symbol}: {e}") async def _send_position_notification(self, change_type: str, symbol: str, details: Dict[str, Any]): """Send position change notification.""" try: if not self.notification_manager: return token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0] timestamp = details.get('timestamp', datetime.now(timezone.utc)) time_str = timestamp.strftime('%H:%M:%S') from src.utils.token_display_formatter import get_formatter formatter = get_formatter() if change_type == 'opened': side = details['side'].upper() size = details['size'] price = details['price'] message = f"""🚀 Position Opened 📊 Details: • Token: {token} • Direction: {side} • Size: {formatter.format_amount(size, token)} • Entry Price: {formatter.format_price_with_symbol(price, token)} • Value: {formatter.format_price_with_symbol(size * price)} ⏰ Time: {time_str} 📱 Use /positions to view all positions""" elif change_type == 'closed': side = details['side'].upper() size = details['size'] entry_price = details['entry_price'] exit_price = details['exit_price'] pnl = details['realized_pnl'] pnl_emoji = "🟢" if pnl >= 0 else "🔴" message = f"""🎯 Position Closed 📊 Details: • Token: {token} • Direction: {side} • Size: {formatter.format_amount(size, token)} • Entry: {formatter.format_price_with_symbol(entry_price, token)} • Exit: {formatter.format_price_with_symbol(exit_price, token)} {pnl_emoji} P&L: {formatter.format_price_with_symbol(pnl)} ⏰ Time: {time_str} 📊 Use /stats to view performance""" elif change_type in ['increased', 'decreased']: side = details['side'].upper() old_size = details['old_size'] new_size = details['new_size'] size_diff = details['size_diff'] emoji = "📈" if change_type == 'increased' else "📉" message = f"""{emoji} Position {change_type.title()} 📊 Details: • Token: {token} • Direction: {side} • Previous Size: {formatter.format_amount(old_size, token)} • New Size: {formatter.format_amount(new_size, token)} • Change: {formatter.format_amount(size_diff, token)} ⏰ Time: {time_str} 📈 Use /positions to view current status""" else: return await self.notification_manager.send_generic_notification(message.strip()) logger.debug(f"📨 Sent {change_type} notification for {symbol}") except Exception as e: logger.error(f"❌ Error sending notification for {symbol}: {e}")