123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490 |
- #!/usr/bin/env python3
- """
- Simplified Position Tracker
- Focuses only on:
- 1. Detecting position changes (opened/closed/size changed)
- 2. Sending notifications
- 3. Managing pending stop losses
- Reuses existing trades table and managers.
- """
- import logging
- import asyncio
- from datetime import datetime, timezone
- from typing import Optional, Dict, Any, List
logger = logging.getLogger(__name__)


class SimplePositionTracker:
    """Simplified position tracking focused on notifications and pending SLs.

    One call to :meth:`check_all_position_changes` compares the positions
    reported by the trading engine with the open positions stored by the stats
    object, records opened / closed / resized / flipped positions through the
    stats managers, sends a notification for each change, then processes
    pending stop losses and orphaned pending trades.
    """

    # Position sizes whose magnitude is at or below this are treated as flat.
    POSITION_EPSILON = 1e-9
    # Smallest size difference that is reported as a size change.
    SIZE_CHANGE_TOLERANCE = 1e-6
    # Age (hours) after which a still-pending trade without a position is
    # considered orphaned.
    STALE_PENDING_HOURS = 1

    def __init__(self, trading_engine, notification_manager):
        """Store the collaborators used by every check.

        Args:
            trading_engine: Object providing ``get_stats``, ``get_positions``,
                ``get_orders``, ``get_market_data`` and the awaitable
                ``execute_stop_loss_order``.
            notification_manager: Object providing the awaitable
                ``send_generic_notification``; a falsy value disables
                notifications.
        """
        self.trading_engine = trading_engine
        self.notification_manager = notification_manager

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _to_float(value: Any, default: float = 0.0) -> float:
        """Coerce ``value`` to float; return ``default`` for None/unparseable input."""
        if value is None:
            return default
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _token_from_symbol(symbol: str) -> str:
        """Return the base token of a symbol such as ``BTC/USDC:USDC`` or ``BTC:USDC``."""
        return symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]

    @staticmethod
    def _normalized_side(exchange_pos: Dict) -> str:
        """Return the position's ``side`` field lower-cased, or ``''`` when absent/None."""
        return str(exchange_pos.get('side') or '').lower()

    def _has_open_size(self, pos: Dict) -> bool:
        """Return True when the position's ``contracts`` field is meaningfully non-zero."""
        return abs(self._to_float(pos.get('contracts'))) > self.POSITION_EPSILON

    def _first_nonzero_price(self, exchange_pos: Dict, keys) -> float:
        """Return the first non-zero price found under ``keys`` (in order), else 0.0."""
        for key in keys:
            price = self._to_float(exchange_pos.get(key))
            if price:
                return price
        return 0.0

    # ------------------------------------------------------------------
    # Main entry point
    # ------------------------------------------------------------------

    async def check_all_position_changes(self):
        """Main method - check all positions for changes and send notifications.

        Errors are logged and never propagated to the caller.
        """
        try:
            stats = self.trading_engine.get_stats()
            if not stats:
                logger.warning("TradingStats not available")
                return

            # NOTE(review): a None result is treated as "no positions". If the
            # engine returns None on a fetch failure, every DB position would
            # be reported as closed - confirm the engine's contract.
            exchange_positions = self.trading_engine.get_positions() or []

            # DB positions are the trades currently stored as open.
            db_positions = stats.get_open_positions() or []

            # Lookup maps keyed by symbol; flat exchange entries are ignored.
            exchange_map = {
                pos['symbol']: pos
                for pos in exchange_positions
                if self._has_open_size(pos)
            }
            db_map = {pos['symbol']: pos for pos in db_positions}

            for symbol in exchange_map.keys() | db_map.keys():
                await self._check_symbol_position_change(
                    symbol, exchange_map.get(symbol), db_map.get(symbol), stats
                )

            # Handle pending stop losses
            await self._handle_pending_stop_losses(stats)

            # Handle orphaned pending trades (orders cancelled before filling)
            await self._handle_orphaned_pending_trades(stats)

        except Exception as e:
            logger.error(f"❌ Error checking position changes: {e}")

    async def _check_symbol_position_change(self, symbol: str, exchange_pos: Optional[Dict],
                                            db_pos: Optional[Dict], stats) -> None:
        """Dispatch one symbol to the opened / closed / size-change handler."""
        try:
            current_time = datetime.now(timezone.utc)

            if exchange_pos and not db_pos:
                # Case 1: New position (exchange has, DB doesn't)
                await self._handle_position_opened(symbol, exchange_pos, stats, current_time)
            elif db_pos and not exchange_pos:
                # Case 2: Position closed (DB has, exchange doesn't)
                await self._handle_position_closed(symbol, db_pos, stats, current_time)
            elif exchange_pos and db_pos:
                # Case 3: Both exist - size change or side flip
                await self._handle_position_size_change(
                    symbol, exchange_pos, db_pos, stats, current_time
                )
            # Case 4: Both None - no action needed

        except Exception as e:
            logger.error(f"❌ Error checking position change for {symbol}: {e}")

    # ------------------------------------------------------------------
    # Position change handlers
    # ------------------------------------------------------------------

    async def _handle_position_opened(self, symbol: str, exchange_pos: Dict, stats, timestamp: datetime):
        """Record a position that exists on the exchange but not in the DB.

        Creates a trade lifecycle of type ``external_detected``, marks it as
        opened with the exchange's entry price and size, and sends an
        ``opened`` notification.
        """
        try:
            contracts = self._to_float(exchange_pos.get('contracts'))
            size = abs(contracts)

            # Prefer the explicit side field; fall back to the contract sign
            # when it is missing, None or an unexpected value.
            ccxt_side = self._normalized_side(exchange_pos)
            if ccxt_side in ('long', 'short'):
                side = ccxt_side
            else:
                side = 'long' if contracts > 0 else 'short'
                logger.warning(f"⚠️ Using contract sign fallback for {symbol}: side={side}, ccxt_side='{ccxt_side}'")
            order_side = 'buy' if side == 'long' else 'sell'

            # Entry price, falling back to the mark price when unavailable.
            entry_price = self._first_nonzero_price(
                exchange_pos, ('entryPrice', 'entryPx', 'markPrice', 'markPx')
            )
            if not entry_price:
                logger.error(f"❌ Cannot determine entry price for {symbol}")
                return

            # Create trade lifecycle using existing manager
            lifecycle_id = stats.create_trade_lifecycle(
                symbol=symbol,
                side=order_side,
                entry_order_id=f"external_position_{timestamp.strftime('%Y%m%d_%H%M%S')}",
                trade_type='external_detected'
            )
            if not lifecycle_id:
                logger.error(f"❌ Could not create trade lifecycle for detected position {symbol}")
                return

            # Update to position_opened using existing manager
            success = stats.update_trade_position_opened(
                lifecycle_id=lifecycle_id,
                entry_price=entry_price,
                entry_amount=size,
                exchange_fill_id=f"position_detected_{timestamp.isoformat()}"
            )
            if not success:
                logger.error(f"❌ Could not mark lifecycle {lifecycle_id} as opened for {symbol}")
                return

            logger.info(f"🚀 NEW POSITION: {symbol} {side.upper()} {size} @ {entry_price}")

            await self._send_position_notification('opened', symbol, {
                'side': side,
                'size': size,
                'price': entry_price,
                'timestamp': timestamp
            })

        except Exception as e:
            logger.error(f"❌ Error handling position opened for {symbol}: {e}")

    async def _handle_position_closed(self, symbol: str, db_pos: Dict, stats, timestamp: datetime):
        """Record that a DB position no longer exists on the exchange.

        The exit price is estimated from the ticker's ``last`` price (falling
        back to the entry price), realized PnL is derived from it, the trade is
        marked closed, a ``closed`` notification is sent, pending stop losses
        for the symbol are cancelled and the trade is migrated to the
        aggregated stats.
        """
        try:
            lifecycle_id = db_pos['trade_lifecycle_id']
            entry_price = self._to_float(db_pos.get('entry_price'))
            position_side = db_pos.get('position_side')
            size = self._to_float(db_pos.get('current_position_size'))

            # Estimate exit price (could be improved with recent fills)
            exit_price = entry_price  # Fallback
            market_data = self.trading_engine.get_market_data(symbol)
            ticker = market_data.get('ticker') if market_data else None
            if ticker:
                exit_price = self._to_float(ticker.get('last'), default=entry_price)

            # Realized PnL; stays 0 when the stored side is unknown.
            realized_pnl = 0.0
            if position_side == 'long':
                realized_pnl = size * (exit_price - entry_price)
            elif position_side == 'short':
                realized_pnl = size * (entry_price - exit_price)

            # Update to position_closed using existing manager
            success = stats.update_trade_position_closed(
                lifecycle_id=lifecycle_id,
                exit_price=exit_price,
                realized_pnl=realized_pnl,
                exchange_fill_id=f"position_closed_detected_{timestamp.isoformat()}"
            )
            if not success:
                logger.error(f"❌ Could not mark lifecycle {lifecycle_id} as closed for {symbol}")
                return

            # A missing side must not abort the clean-up steps below.
            side_label = str(position_side or 'unknown').upper()
            logger.info(f"🎯 POSITION CLOSED: {symbol} {side_label} PnL: {realized_pnl:.2f}")

            await self._send_position_notification('closed', symbol, {
                'side': position_side,
                'size': size,
                'entry_price': entry_price,
                'exit_price': exit_price,
                'realized_pnl': realized_pnl,
                'timestamp': timestamp
            })

            # Clear any pending stop losses for this symbol
            stats.order_manager.cancel_pending_stop_losses_by_symbol(symbol, 'cancelled_position_closed')

            # Migrate trade to aggregated stats and clean up
            stats.migrate_trade_to_aggregated_stats(lifecycle_id)

        except Exception as e:
            logger.error(f"❌ Error handling position closed for {symbol}: {e}")

    async def _handle_position_size_change(self, symbol: str, exchange_pos: Dict,
                                           db_pos: Dict, stats, timestamp: datetime):
        """Handle position size changes and position flips.

        A side mismatch between DB and exchange is handled as "close the old
        position, open the new one". Otherwise a size difference of at least
        ``SIZE_CHANGE_TOLERANCE`` is stored and reported as ``increased`` or
        ``decreased``.
        """
        try:
            contracts = self._to_float(exchange_pos.get('contracts'))
            exchange_size = abs(contracts)
            db_size = self._to_float(db_pos.get('current_position_size'))
            db_position_side = db_pos.get('position_side')

            # Determine current exchange position side
            ccxt_side = self._normalized_side(exchange_pos)
            if ccxt_side in ('long', 'short'):
                exchange_position_side = ccxt_side
            else:
                # Fallback to contract sign
                exchange_position_side = 'long' if contracts > 0 else 'short'
                logger.warning(f"⚠️ Using contract sign fallback for side detection: {symbol}")

            # Check for POSITION FLIP (LONG ↔ SHORT)
            if db_position_side != exchange_position_side:
                old_label = str(db_position_side or 'unknown').upper()
                logger.info(f"🔄 POSITION FLIP DETECTED: {symbol} {old_label} → {exchange_position_side.upper()}")

                # Handle as: close old position + open new position
                await self._handle_position_closed(symbol, db_pos, stats, timestamp)
                await self._handle_position_opened(symbol, exchange_pos, stats, timestamp)
                return

            # Check if size actually changed (with small tolerance)
            if abs(exchange_size - db_size) < self.SIZE_CHANGE_TOLERANCE:
                return  # No meaningful change

            lifecycle_id = db_pos['trade_lifecycle_id']

            # Update position size using existing manager
            success = stats.trade_manager.update_trade_market_data(
                lifecycle_id, current_position_size=exchange_size
            )
            if not success:
                logger.error(f"❌ Could not store new position size for {symbol} (lifecycle {lifecycle_id})")
                return

            change_type = 'increased' if exchange_size > db_size else 'decreased'
            size_diff = abs(exchange_size - db_size)

            logger.info(f"📊 POSITION {change_type.upper()}: {symbol} {db_size} → {exchange_size}")

            await self._send_position_notification(change_type, symbol, {
                'side': db_position_side,
                'old_size': db_size,
                'new_size': exchange_size,
                'size_diff': size_diff,
                'timestamp': timestamp
            })

        except Exception as e:
            logger.error(f"❌ Error handling position size change for {symbol}: {e}")

    # ------------------------------------------------------------------
    # Pending stop losses
    # ------------------------------------------------------------------

    async def _handle_pending_stop_losses(self, stats):
        """Handle pending stop losses - place orders for positions that need them.

        Exchange positions are fetched once per pass; a stop-loss order is
        requested for every pending trade whose symbol still has an open
        position. A failure on one trade does not stop the others.
        """
        try:
            pending_sl_trades = stats.get_pending_stop_loss_activations() or []
            if not pending_sl_trades:
                return

            # Fetched once: the answer is the same for every trade in this pass.
            exchange_positions = self.trading_engine.get_positions() or []
            open_symbols = {
                pos.get('symbol')
                for pos in exchange_positions
                if pos.get('symbol') and self._has_open_size(pos)
            }

            for trade in pending_sl_trades:
                symbol = trade.get('symbol')
                try:
                    stop_price = trade['stop_loss_price']

                    if symbol not in open_symbols:
                        # Nothing to protect; the pending SL itself is cleared
                        # by the position-closed handling.
                        logger.info(f"🗑️ Clearing pending SL for non-existent position: {symbol}")
                        continue

                    result = await self.trading_engine.execute_stop_loss_order(
                        token=self._token_from_symbol(symbol),
                        stop_price=stop_price
                    )

                    if not (result and result.get('success')):
                        logger.warning(f"⚠️ Failed to place pending SL for {symbol} @ {stop_price}")
                        continue

                    order_details = result.get('order_placed_details') or {}
                    if order_details.get('exchange_order_id'):
                        # The execute_stop_loss_order already links the SL to the trade
                        logger.info(f"✅ Placed pending SL: {symbol} @ {stop_price}")
                    else:
                        logger.warning(f"⚠️ SL placed for {symbol} but no exchange_order_id returned")

                except Exception as e:
                    logger.error(f"❌ Error handling pending SL for {symbol}: {e}")

        except Exception as e:
            logger.error(f"❌ Error handling pending stop losses: {e}")

    # ------------------------------------------------------------------
    # Orphaned pending trades
    # ------------------------------------------------------------------

    def _orphan_cancel_reason(self, trade: Dict[str, Any], exchange_order_ids,
                              exchange_position_symbols, now: datetime) -> Optional[str]:
        """Return the cancel reason for an orphaned pending trade, or None to keep it."""
        # Function-scope import: the module header only imports datetime/timezone.
        from datetime import timedelta

        lifecycle_id = trade['trade_lifecycle_id']
        symbol = trade['symbol']
        entry_order_id = trade.get('entry_order_id')
        has_position = symbol in exchange_position_symbols

        # Case 1: Entry order ID exists but order is no longer on exchange
        if entry_order_id and entry_order_id not in exchange_order_ids:
            # A position may have been opened even though the order disappeared.
            if has_position:
                return None
            logger.debug(f"🗑️ Pending trade {lifecycle_id[:8]} for {symbol}: entry order {entry_order_id} no longer exists and no position opened")
            return "entry_order_cancelled_no_position"

        # Case 2: No entry order ID and no position (shouldn't happen but safety check)
        if not entry_order_id and not has_position:
            logger.debug(f"🗑️ Pending trade {lifecycle_id[:8]} for {symbol}: no entry order ID and no position")
            return "no_entry_order_no_position"

        # Case 3: Very old pending trade (safety net for other edge cases)
        created_raw = trade.get('timestamp')
        if not created_raw:
            return None
        try:
            if isinstance(created_raw, datetime):
                created_at = created_raw
            else:
                created_at = datetime.fromisoformat(created_raw.replace('Z', '+00:00'))
        except (ValueError, TypeError, AttributeError) as e:
            logger.warning(f"Could not parse timestamp for pending trade {lifecycle_id}: {e}")
            return None

        if created_at.tzinfo is None:
            # Naive timestamps cannot be compared with an aware "now".
            # NOTE(review): assumes stored naive timestamps are UTC - confirm.
            created_at = created_at.replace(tzinfo=timezone.utc)

        is_stale = now - created_at > timedelta(hours=self.STALE_PENDING_HOURS)
        if is_stale and not has_position:
            logger.debug(f"🗑️ Pending trade {lifecycle_id[:8]} for {symbol}: very old ({created_at}) with no position")
            return "old_pending_trade_no_position"
        return None

    async def _handle_orphaned_pending_trades(self, stats):
        """Handle trades stuck in 'pending' status due to cancelled orders.

        Each orphaned trade is cancelled through ``stats``, a cancellation
        notification is sent, and the trade is migrated to the aggregated
        stats. A failure on one trade does not stop the others.
        """
        try:
            pending_trades = stats.get_trades_by_status('pending')
            if not pending_trades:
                return

            logger.debug(f"🔍 Checking {len(pending_trades)} pending trades for orphaned status")

            # Get current exchange orders for comparison
            exchange_orders = self.trading_engine.get_orders() or []
            exchange_order_ids = {order.get('id') for order in exchange_orders if order.get('id')}

            # Get current exchange positions
            exchange_positions = self.trading_engine.get_positions() or []
            exchange_position_symbols = {
                pos.get('symbol')
                for pos in exchange_positions
                if pos.get('symbol') and self._has_open_size(pos)
            }

            now = datetime.now(timezone.utc)

            for trade in pending_trades:
                try:
                    cancel_reason = self._orphan_cancel_reason(
                        trade, exchange_order_ids, exchange_position_symbols, now
                    )
                    if not cancel_reason:
                        continue

                    lifecycle_id = trade['trade_lifecycle_id']
                    symbol = trade['symbol']

                    if not stats.update_trade_cancelled(lifecycle_id, reason=cancel_reason):
                        logger.error(f"❌ Failed to cancel orphaned pending trade: {lifecycle_id}")
                        continue

                    logger.info(f"🗑️ Cancelled orphaned pending trade: {symbol} (Lifecycle: {lifecycle_id[:8]}) - {cancel_reason}")

                    # Send a notification about the cancelled trade
                    await self._send_trade_cancelled_notification(symbol, cancel_reason, trade)

                    # Migrate cancelled trade to aggregated stats
                    stats.migrate_trade_to_aggregated_stats(lifecycle_id)

                except Exception as e:
                    logger.error(f"❌ Error processing pending trade {trade.get('trade_lifecycle_id', 'unknown')}: {e}")

        except Exception as e:
            logger.error(f"❌ Error handling orphaned pending trades: {e}")

    # ------------------------------------------------------------------
    # Notifications
    # ------------------------------------------------------------------

    async def _send_trade_cancelled_notification(self, symbol: str, cancel_reason: str, trade: Dict[str, Any]):
        """Send notification for cancelled trade (no-op without a notification manager)."""
        try:
            if not self.notification_manager:
                return

            token = self._token_from_symbol(symbol)
            lifecycle_id = trade['trade_lifecycle_id']

            # Map the internal reason code to a user-friendly sentence;
            # unknown codes are shown as-is.
            reason_map = {
                'entry_order_cancelled_no_position': 'Entry order was cancelled before filling',
                'no_entry_order_no_position': 'No entry order and no position opened',
                'old_pending_trade_no_position': 'Trade was pending too long without execution'
            }
            user_reason = reason_map.get(cancel_reason, cancel_reason)

            message = "\n".join((
                "❌ <b>Trade Cancelled</b>",
                "📊 <b>Details:</b>",
                f"• Token: {token}",
                f"• Trade ID: {lifecycle_id[:8]}...",
                f"• Reason: {user_reason}",
                "ℹ️ <b>Status:</b> Trade was automatically cancelled due to order issues",
                "📱 Use /positions to view current positions",
            ))

            await self.notification_manager.send_generic_notification(message)
            logger.debug(f"📨 Sent cancellation notification for {symbol}")

        except Exception as e:
            logger.error(f"❌ Error sending cancellation notification for {symbol}: {e}")

    async def _send_position_notification(self, change_type: str, symbol: str, details: Dict[str, Any]):
        """Send position change notification.

        Args:
            change_type: ``'opened'``, ``'closed'``, ``'increased'`` or
                ``'decreased'``; any other value sends nothing.
            symbol: Market symbol; only its base token is shown.
            details: Values for the message. All types read ``side`` and
                ``timestamp``; ``opened`` reads ``size``/``price``; ``closed``
                reads ``size``/``entry_price``/``exit_price``/``realized_pnl``;
                size changes read ``old_size``/``new_size``/``size_diff``.
        """
        try:
            if not self.notification_manager:
                return

            token = self._token_from_symbol(symbol)
            timestamp = details.get('timestamp', datetime.now(timezone.utc))
            time_str = timestamp.strftime('%H:%M:%S')
            # An unknown side must not prevent the notification from going out.
            side = str(details.get('side') or 'unknown').upper()

            from src.utils.token_display_formatter import get_formatter
            formatter = get_formatter()

            if change_type == 'opened':
                size = details['size']
                price = details['price']
                lines = (
                    "🚀 <b>Position Opened</b>",
                    "📊 <b>Details:</b>",
                    f"• Token: {token}",
                    f"• Direction: {side}",
                    f"• Size: {formatter.format_amount(size, token)}",
                    f"• Entry Price: {formatter.format_price_with_symbol(price, token)}",
                    f"• Value: {formatter.format_price_with_symbol(size * price)}",
                    f"⏰ <b>Time:</b> {time_str}",
                    "📱 Use /positions to view all positions",
                )

            elif change_type == 'closed':
                size = details['size']
                entry_price = details['entry_price']
                exit_price = details['exit_price']
                pnl = details['realized_pnl']
                pnl_emoji = "🟢" if pnl >= 0 else "🔴"
                lines = (
                    "🎯 <b>Position Closed</b>",
                    "📊 <b>Details:</b>",
                    f"• Token: {token}",
                    f"• Direction: {side}",
                    f"• Size: {formatter.format_amount(size, token)}",
                    f"• Entry: {formatter.format_price_with_symbol(entry_price, token)}",
                    f"• Exit: {formatter.format_price_with_symbol(exit_price, token)}",
                    f"{pnl_emoji} <b>P&L:</b> {formatter.format_price_with_symbol(pnl)}",
                    f"⏰ <b>Time:</b> {time_str}",
                    "📊 Use /stats to view performance",
                )

            elif change_type in ('increased', 'decreased'):
                old_size = details['old_size']
                new_size = details['new_size']
                size_diff = details['size_diff']
                emoji = "📈" if change_type == 'increased' else "📉"
                lines = (
                    f"{emoji} <b>Position {change_type.title()}</b>",
                    "📊 <b>Details:</b>",
                    f"• Token: {token}",
                    f"• Direction: {side}",
                    f"• Previous Size: {formatter.format_amount(old_size, token)}",
                    f"• New Size: {formatter.format_amount(new_size, token)}",
                    f"• Change: {formatter.format_amount(size_diff, token)}",
                    f"⏰ <b>Time:</b> {time_str}",
                    "📈 Use /positions to view current status",
                )
            else:
                return

            await self.notification_manager.send_generic_notification("\n".join(lines))
            logger.debug(f"📨 Sent {change_type} notification for {symbol}")

        except Exception as e:
            logger.error(f"❌ Error sending notification for {symbol}: {e}")
|