|
@@ -0,0 +1,453 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+"""
|
|
|
+Simplified Position Tracker
|
|
|
+
|
|
|
+Focuses only on:
|
|
|
+1. Detecting position changes (opened/closed/size changed)
|
|
|
+2. Sending notifications
|
|
|
+3. Managing pending stop losses
|
|
|
+
|
|
|
+Reuses existing trades table and managers.
|
|
|
+"""
|
|
|
+
|
|
|
+import logging
|
|
|
+import asyncio
|
|
|
+from datetime import datetime, timezone
|
|
|
+from typing import Optional, Dict, Any, List
|
|
|
+
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
+
|
|
|
+class SimplePositionTracker:
|
|
|
+ """Simplified position tracking focused on notifications and pending SLs."""
|
|
|
+
|
|
|
+ def __init__(self, trading_engine, notification_manager):
|
|
|
+ self.trading_engine = trading_engine
|
|
|
+ self.notification_manager = notification_manager
|
|
|
+
|
|
|
+ async def check_all_position_changes(self):
|
|
|
+ """Main method - check all positions for changes and send notifications."""
|
|
|
+ try:
|
|
|
+ stats = self.trading_engine.get_stats()
|
|
|
+ if not stats:
|
|
|
+ logger.warning("TradingStats not available")
|
|
|
+ return
|
|
|
+
|
|
|
+ # Get current exchange positions
|
|
|
+ exchange_positions = self.trading_engine.get_positions() or []
|
|
|
+
|
|
|
+ # Get current DB positions (trades with status='position_opened')
|
|
|
+ db_positions = stats.get_open_positions()
|
|
|
+
|
|
|
+ # Create lookup maps
|
|
|
+ exchange_map = {pos['symbol']: pos for pos in exchange_positions if abs(float(pos.get('contracts', 0))) > 1e-9}
|
|
|
+ db_map = {pos['symbol']: pos for pos in db_positions}
|
|
|
+
|
|
|
+ all_symbols = set(exchange_map.keys()) | set(db_map.keys())
|
|
|
+
|
|
|
+ for symbol in all_symbols:
|
|
|
+ await self._check_symbol_position_change(symbol, exchange_map.get(symbol), db_map.get(symbol), stats)
|
|
|
+
|
|
|
+ # Handle pending stop losses
|
|
|
+ await self._handle_pending_stop_losses(stats)
|
|
|
+
|
|
|
+ # Handle orphaned pending trades (orders cancelled before filling)
|
|
|
+ await self._handle_orphaned_pending_trades(stats)
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error checking position changes: {e}")
|
|
|
+
|
|
|
+ async def _check_symbol_position_change(self, symbol: str, exchange_pos: Optional[Dict],
|
|
|
+ db_pos: Optional[Dict], stats) -> None:
|
|
|
+ """Check position changes for a single symbol."""
|
|
|
+ try:
|
|
|
+ current_time = datetime.now(timezone.utc)
|
|
|
+
|
|
|
+ # Case 1: New position (exchange has, DB doesn't)
|
|
|
+ if exchange_pos and not db_pos:
|
|
|
+ await self._handle_position_opened(symbol, exchange_pos, stats, current_time)
|
|
|
+
|
|
|
+ # Case 2: Position closed (DB has, exchange doesn't)
|
|
|
+ elif db_pos and not exchange_pos:
|
|
|
+ await self._handle_position_closed(symbol, db_pos, stats, current_time)
|
|
|
+
|
|
|
+ # Case 3: Position size changed (both exist, different sizes)
|
|
|
+ elif exchange_pos and db_pos:
|
|
|
+ await self._handle_position_size_change(symbol, exchange_pos, db_pos, stats, current_time)
|
|
|
+
|
|
|
+ # Case 4: Both None - no action needed
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error checking position change for {symbol}: {e}")
|
|
|
+
|
|
|
+ async def _handle_position_opened(self, symbol: str, exchange_pos: Dict, stats, timestamp: datetime):
|
|
|
+ """Handle new position detection."""
|
|
|
+ try:
|
|
|
+ contracts = float(exchange_pos.get('contracts', 0))
|
|
|
+ size = abs(contracts)
|
|
|
+ side = 'long' if contracts > 0 else 'short'
|
|
|
+ order_side = 'buy' if side == 'long' else 'sell'
|
|
|
+
|
|
|
+ # Get entry price from exchange
|
|
|
+ entry_price = float(exchange_pos.get('entryPrice', 0)) or float(exchange_pos.get('entryPx', 0))
|
|
|
+ if not entry_price:
|
|
|
+ entry_price = float(exchange_pos.get('markPrice', 0)) or float(exchange_pos.get('markPx', 0))
|
|
|
+
|
|
|
+ if not entry_price:
|
|
|
+ logger.error(f"❌ Cannot determine entry price for {symbol}")
|
|
|
+ return
|
|
|
+
|
|
|
+ # Create trade lifecycle using existing manager
|
|
|
+ lifecycle_id = stats.create_trade_lifecycle(
|
|
|
+ symbol=symbol,
|
|
|
+ side=order_side,
|
|
|
+ entry_order_id=f"external_position_{timestamp.strftime('%Y%m%d_%H%M%S')}",
|
|
|
+ trade_type='external_detected'
|
|
|
+ )
|
|
|
+
|
|
|
+ if lifecycle_id:
|
|
|
+ # Update to position_opened using existing manager
|
|
|
+ success = stats.update_trade_position_opened(
|
|
|
+ lifecycle_id=lifecycle_id,
|
|
|
+ entry_price=entry_price,
|
|
|
+ entry_amount=size,
|
|
|
+ exchange_fill_id=f"position_detected_{timestamp.isoformat()}"
|
|
|
+ )
|
|
|
+
|
|
|
+ if success:
|
|
|
+ logger.info(f"🚀 NEW POSITION: {symbol} {side.upper()} {size} @ {entry_price}")
|
|
|
+
|
|
|
+ # Send notification
|
|
|
+ await self._send_position_notification('opened', symbol, {
|
|
|
+ 'side': side,
|
|
|
+ 'size': size,
|
|
|
+ 'price': entry_price,
|
|
|
+ 'timestamp': timestamp
|
|
|
+ })
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error handling position opened for {symbol}: {e}")
|
|
|
+
|
|
|
+ async def _handle_position_closed(self, symbol: str, db_pos: Dict, stats, timestamp: datetime):
|
|
|
+ """Handle position closure detection."""
|
|
|
+ try:
|
|
|
+ lifecycle_id = db_pos['trade_lifecycle_id']
|
|
|
+ entry_price = db_pos.get('entry_price', 0)
|
|
|
+ position_side = db_pos.get('position_side')
|
|
|
+ size = db_pos.get('current_position_size', 0)
|
|
|
+
|
|
|
+ # Estimate exit price (could be improved with recent fills)
|
|
|
+ market_data = self.trading_engine.get_market_data(symbol)
|
|
|
+ exit_price = entry_price # Fallback
|
|
|
+ if market_data and market_data.get('ticker'):
|
|
|
+ exit_price = float(market_data['ticker'].get('last', exit_price))
|
|
|
+
|
|
|
+ # Calculate realized PnL
|
|
|
+ realized_pnl = 0
|
|
|
+ if position_side == 'long':
|
|
|
+ realized_pnl = size * (exit_price - entry_price)
|
|
|
+ elif position_side == 'short':
|
|
|
+ realized_pnl = size * (entry_price - exit_price)
|
|
|
+
|
|
|
+ # Update to position_closed using existing manager
|
|
|
+ success = stats.update_trade_position_closed(
|
|
|
+ lifecycle_id=lifecycle_id,
|
|
|
+ exit_price=exit_price,
|
|
|
+ realized_pnl=realized_pnl,
|
|
|
+ exchange_fill_id=f"position_closed_detected_{timestamp.isoformat()}"
|
|
|
+ )
|
|
|
+
|
|
|
+ if success:
|
|
|
+ logger.info(f"🎯 POSITION CLOSED: {symbol} {position_side.upper()} PnL: {realized_pnl:.2f}")
|
|
|
+
|
|
|
+ # Send notification
|
|
|
+ await self._send_position_notification('closed', symbol, {
|
|
|
+ 'side': position_side,
|
|
|
+ 'size': size,
|
|
|
+ 'entry_price': entry_price,
|
|
|
+ 'exit_price': exit_price,
|
|
|
+ 'realized_pnl': realized_pnl,
|
|
|
+ 'timestamp': timestamp
|
|
|
+ })
|
|
|
+
|
|
|
+ # Clear any pending stop losses for this symbol
|
|
|
+ stats.order_manager.cancel_pending_stop_losses_by_symbol(symbol, 'cancelled_position_closed')
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error handling position closed for {symbol}: {e}")
|
|
|
+
|
|
|
+ async def _handle_position_size_change(self, symbol: str, exchange_pos: Dict,
|
|
|
+ db_pos: Dict, stats, timestamp: datetime):
|
|
|
+ """Handle position size changes."""
|
|
|
+ try:
|
|
|
+ exchange_size = abs(float(exchange_pos.get('contracts', 0)))
|
|
|
+ db_size = db_pos.get('current_position_size', 0)
|
|
|
+
|
|
|
+ # Check if size actually changed (with small tolerance)
|
|
|
+ if abs(exchange_size - db_size) < 1e-6:
|
|
|
+ return # No meaningful change
|
|
|
+
|
|
|
+ lifecycle_id = db_pos['trade_lifecycle_id']
|
|
|
+ position_side = db_pos.get('position_side')
|
|
|
+ entry_price = db_pos.get('entry_price', 0)
|
|
|
+
|
|
|
+ # Update position size using existing manager
|
|
|
+ success = stats.trade_manager.update_trade_market_data(
|
|
|
+ lifecycle_id, current_position_size=exchange_size
|
|
|
+ )
|
|
|
+
|
|
|
+ if success:
|
|
|
+ change_type = 'increased' if exchange_size > db_size else 'decreased'
|
|
|
+ size_diff = abs(exchange_size - db_size)
|
|
|
+
|
|
|
+ logger.info(f"📊 POSITION {change_type.upper()}: {symbol} {db_size} → {exchange_size}")
|
|
|
+
|
|
|
+ # Send notification
|
|
|
+ await self._send_position_notification(change_type, symbol, {
|
|
|
+ 'side': position_side,
|
|
|
+ 'old_size': db_size,
|
|
|
+ 'new_size': exchange_size,
|
|
|
+ 'size_diff': size_diff,
|
|
|
+ 'timestamp': timestamp
|
|
|
+ })
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error handling position size change for {symbol}: {e}")
|
|
|
+
|
|
|
+ async def _handle_pending_stop_losses(self, stats):
|
|
|
+ """Handle pending stop losses - place orders for positions that need them."""
|
|
|
+ try:
|
|
|
+ # Get positions with pending SLs using existing manager
|
|
|
+ pending_sl_trades = stats.get_pending_stop_loss_activations()
|
|
|
+
|
|
|
+ for trade in pending_sl_trades:
|
|
|
+ symbol = trade['symbol']
|
|
|
+ stop_price = trade['stop_loss_price']
|
|
|
+ position_side = trade['position_side']
|
|
|
+ lifecycle_id = trade['trade_lifecycle_id']
|
|
|
+
|
|
|
+ try:
|
|
|
+ # Check if position still exists on exchange
|
|
|
+ exchange_positions = self.trading_engine.get_positions() or []
|
|
|
+ position_exists = any(
|
|
|
+ pos['symbol'] == symbol and abs(float(pos.get('contracts', 0))) > 1e-9
|
|
|
+ for pos in exchange_positions
|
|
|
+ )
|
|
|
+
|
|
|
+ if position_exists:
|
|
|
+ # Place stop loss order
|
|
|
+ sl_side = 'sell' if position_side == 'long' else 'buy'
|
|
|
+
|
|
|
+ token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
|
|
|
+ result = await self.trading_engine.execute_stop_loss_order(
|
|
|
+ token=token,
|
|
|
+ stop_price=stop_price
|
|
|
+ )
|
|
|
+
|
|
|
+ if result and result.get('success'):
|
|
|
+ exchange_order_id = result.get('order_placed_details', {}).get('exchange_order_id')
|
|
|
+ if exchange_order_id:
|
|
|
+ # The execute_stop_loss_order already links the SL to the trade
|
|
|
+ logger.info(f"✅ Placed pending SL: {symbol} @ {stop_price}")
|
|
|
+ else:
|
|
|
+ logger.warning(f"⚠️ SL placed for {symbol} but no exchange_order_id returned")
|
|
|
+ else:
|
|
|
+ # Position doesn't exist, clear pending SL
|
|
|
+ logger.info(f"🗑️ Clearing pending SL for non-existent position: {symbol}")
|
|
|
+ # This will be handled by position closed detection
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error handling pending SL for {symbol}: {e}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error handling pending stop losses: {e}")
|
|
|
+
|
|
|
+ async def _handle_orphaned_pending_trades(self, stats):
|
|
|
+ """Handle trades stuck in 'pending' status due to cancelled orders."""
|
|
|
+ try:
|
|
|
+ # Get all pending trades
|
|
|
+ pending_trades = stats.get_trades_by_status('pending')
|
|
|
+
|
|
|
+ if not pending_trades:
|
|
|
+ return
|
|
|
+
|
|
|
+ logger.debug(f"🔍 Checking {len(pending_trades)} pending trades for orphaned status")
|
|
|
+
|
|
|
+ # Get current exchange orders for comparison
|
|
|
+ exchange_orders = self.trading_engine.get_orders() or []
|
|
|
+ exchange_order_ids = {order.get('id') for order in exchange_orders if order.get('id')}
|
|
|
+
|
|
|
+ # Get current exchange positions
|
|
|
+ exchange_positions = self.trading_engine.get_positions() or []
|
|
|
+ exchange_position_symbols = {
|
|
|
+ pos.get('symbol') for pos in exchange_positions
|
|
|
+ if pos.get('symbol') and abs(float(pos.get('contracts', 0))) > 1e-9
|
|
|
+ }
|
|
|
+
|
|
|
+ for trade in pending_trades:
|
|
|
+ try:
|
|
|
+ lifecycle_id = trade['trade_lifecycle_id']
|
|
|
+ symbol = trade['symbol']
|
|
|
+ entry_order_id = trade.get('entry_order_id')
|
|
|
+
|
|
|
+ # Check if this trade should be cancelled
|
|
|
+ should_cancel = False
|
|
|
+ cancel_reason = ""
|
|
|
+
|
|
|
+ # Case 1: Entry order ID exists but order is no longer on exchange
|
|
|
+ if entry_order_id and entry_order_id not in exchange_order_ids:
|
|
|
+ # Check if a position was opened (even if order disappeared)
|
|
|
+ if symbol not in exchange_position_symbols:
|
|
|
+ should_cancel = True
|
|
|
+ cancel_reason = "entry_order_cancelled_no_position"
|
|
|
+ logger.debug(f"🗑️ Pending trade {lifecycle_id[:8]} for {symbol}: entry order {entry_order_id} no longer exists and no position opened")
|
|
|
+
|
|
|
+ # Case 2: No entry order ID but no position exists (shouldn't happen but safety check)
|
|
|
+ elif not entry_order_id and symbol not in exchange_position_symbols:
|
|
|
+ should_cancel = True
|
|
|
+ cancel_reason = "no_entry_order_no_position"
|
|
|
+ logger.debug(f"🗑️ Pending trade {lifecycle_id[:8]} for {symbol}: no entry order ID and no position")
|
|
|
+
|
|
|
+ # Case 3: Check if trade is very old (safety net for other edge cases)
|
|
|
+ else:
|
|
|
+ from datetime import datetime, timezone, timedelta
|
|
|
+ created_at_str = trade.get('timestamp')
|
|
|
+ if created_at_str:
|
|
|
+ try:
|
|
|
+ created_at = datetime.fromisoformat(created_at_str.replace('Z', '+00:00'))
|
|
|
+ if datetime.now(timezone.utc) - created_at > timedelta(hours=1):
|
|
|
+ # Very old pending trade, likely orphaned
|
|
|
+ if symbol not in exchange_position_symbols:
|
|
|
+ should_cancel = True
|
|
|
+ cancel_reason = "old_pending_trade_no_position"
|
|
|
+ logger.debug(f"🗑️ Pending trade {lifecycle_id[:8]} for {symbol}: very old ({created_at}) with no position")
|
|
|
+ except (ValueError, TypeError) as e:
|
|
|
+ logger.warning(f"Could not parse timestamp for pending trade {lifecycle_id}: {e}")
|
|
|
+
|
|
|
+ # Cancel the orphaned trade
|
|
|
+ if should_cancel:
|
|
|
+ success = stats.update_trade_cancelled(lifecycle_id, reason=cancel_reason)
|
|
|
+ if success:
|
|
|
+ logger.info(f"🗑️ Cancelled orphaned pending trade: {symbol} (Lifecycle: {lifecycle_id[:8]}) - {cancel_reason}")
|
|
|
+
|
|
|
+ # Send a notification about the cancelled trade
|
|
|
+ await self._send_trade_cancelled_notification(symbol, cancel_reason, trade)
|
|
|
+ else:
|
|
|
+ logger.error(f"❌ Failed to cancel orphaned pending trade: {lifecycle_id}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error processing pending trade {trade.get('trade_lifecycle_id', 'unknown')}: {e}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error handling orphaned pending trades: {e}")
|
|
|
+
|
|
|
+ async def _send_trade_cancelled_notification(self, symbol: str, cancel_reason: str, trade: Dict[str, Any]):
|
|
|
+ """Send notification for cancelled trade."""
|
|
|
+ try:
|
|
|
+ if not self.notification_manager:
|
|
|
+ return
|
|
|
+
|
|
|
+ token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
|
|
|
+ lifecycle_id = trade['trade_lifecycle_id']
|
|
|
+
|
|
|
+ # Create user-friendly reason
|
|
|
+ reason_map = {
|
|
|
+ 'entry_order_cancelled_no_position': 'Entry order was cancelled before filling',
|
|
|
+ 'no_entry_order_no_position': 'No entry order and no position opened',
|
|
|
+ 'old_pending_trade_no_position': 'Trade was pending too long without execution'
|
|
|
+ }
|
|
|
+ user_reason = reason_map.get(cancel_reason, cancel_reason)
|
|
|
+
|
|
|
+ message = f"""❌ <b>Trade Cancelled</b>
|
|
|
+
|
|
|
+📊 <b>Details:</b>
|
|
|
+• Token: {token}
|
|
|
+• Trade ID: {lifecycle_id[:8]}...
|
|
|
+• Reason: {user_reason}
|
|
|
+
|
|
|
+ℹ️ <b>Status:</b> Trade was automatically cancelled due to order issues
|
|
|
+📱 Use /positions to view current positions"""
|
|
|
+
|
|
|
+ await self.notification_manager.send_generic_notification(message.strip())
|
|
|
+ logger.debug(f"📨 Sent cancellation notification for {symbol}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error sending cancellation notification for {symbol}: {e}")
|
|
|
+
|
|
|
+ async def _send_position_notification(self, change_type: str, symbol: str, details: Dict[str, Any]):
|
|
|
+ """Send position change notification."""
|
|
|
+ try:
|
|
|
+ if not self.notification_manager:
|
|
|
+ return
|
|
|
+
|
|
|
+ token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
|
|
|
+ timestamp = details.get('timestamp', datetime.now(timezone.utc))
|
|
|
+ time_str = timestamp.strftime('%H:%M:%S')
|
|
|
+
|
|
|
+ from src.utils.token_display_formatter import get_formatter
|
|
|
+ formatter = get_formatter()
|
|
|
+
|
|
|
+ if change_type == 'opened':
|
|
|
+ side = details['side'].upper()
|
|
|
+ size = details['size']
|
|
|
+ price = details['price']
|
|
|
+
|
|
|
+ message = f"""🚀 <b>Position Opened</b>
|
|
|
+
|
|
|
+📊 <b>Details:</b>
|
|
|
+• Token: {token}
|
|
|
+• Direction: {side}
|
|
|
+• Size: {formatter.format_amount(size, token)}
|
|
|
+• Entry Price: {formatter.format_price_with_symbol(price, token)}
|
|
|
+• Value: {formatter.format_price_with_symbol(size * price)}
|
|
|
+
|
|
|
+⏰ <b>Time:</b> {time_str}
|
|
|
+📱 Use /positions to view all positions"""
|
|
|
+
|
|
|
+ elif change_type == 'closed':
|
|
|
+ side = details['side'].upper()
|
|
|
+ size = details['size']
|
|
|
+ entry_price = details['entry_price']
|
|
|
+ exit_price = details['exit_price']
|
|
|
+ pnl = details['realized_pnl']
|
|
|
+ pnl_emoji = "🟢" if pnl >= 0 else "🔴"
|
|
|
+
|
|
|
+ message = f"""🎯 <b>Position Closed</b>
|
|
|
+
|
|
|
+📊 <b>Details:</b>
|
|
|
+• Token: {token}
|
|
|
+• Direction: {side}
|
|
|
+• Size: {formatter.format_amount(size, token)}
|
|
|
+• Entry: {formatter.format_price_with_symbol(entry_price, token)}
|
|
|
+• Exit: {formatter.format_price_with_symbol(exit_price, token)}
|
|
|
+
|
|
|
+{pnl_emoji} <b>P&L:</b> {formatter.format_price_with_symbol(pnl)}
|
|
|
+
|
|
|
+⏰ <b>Time:</b> {time_str}
|
|
|
+📊 Use /stats to view performance"""
|
|
|
+
|
|
|
+ elif change_type in ['increased', 'decreased']:
|
|
|
+ side = details['side'].upper()
|
|
|
+ old_size = details['old_size']
|
|
|
+ new_size = details['new_size']
|
|
|
+ size_diff = details['size_diff']
|
|
|
+ emoji = "📈" if change_type == 'increased' else "📉"
|
|
|
+
|
|
|
+ message = f"""{emoji} <b>Position {change_type.title()}</b>
|
|
|
+
|
|
|
+📊 <b>Details:</b>
|
|
|
+• Token: {token}
|
|
|
+• Direction: {side}
|
|
|
+• Previous Size: {formatter.format_amount(old_size, token)}
|
|
|
+• New Size: {formatter.format_amount(new_size, token)}
|
|
|
+• Change: {formatter.format_amount(size_diff, token)}
|
|
|
+
|
|
|
+⏰ <b>Time:</b> {time_str}
|
|
|
+📈 Use /positions to view current status"""
|
|
|
+ else:
|
|
|
+ return
|
|
|
+
|
|
|
+ await self.notification_manager.send_generic_notification(message.strip())
|
|
|
+ logger.debug(f"📨 Sent {change_type} notification for {symbol}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error sending notification for {symbol}: {e}")
|