simple_position_tracker.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490
  1. #!/usr/bin/env python3
  2. """
  3. Simplified Position Tracker
  4. Focuses only on:
  5. 1. Detecting position changes (opened/closed/size changed)
  6. 2. Sending notifications
  7. 3. Managing pending stop losses
  8. Reuses existing trades table and managers.
  9. """
  10. import logging
  11. import asyncio
  12. from datetime import datetime, timezone
  13. from typing import Optional, Dict, Any, List
  14. logger = logging.getLogger(__name__)
  15. class SimplePositionTracker:
  16. """Simplified position tracking focused on notifications and pending SLs."""
  17. def __init__(self, trading_engine, notification_manager):
  18. self.trading_engine = trading_engine
  19. self.notification_manager = notification_manager
  20. async def check_all_position_changes(self):
  21. """Main method - check all positions for changes and send notifications."""
  22. try:
  23. stats = self.trading_engine.get_stats()
  24. if not stats:
  25. logger.warning("TradingStats not available")
  26. return
  27. # Get current exchange positions
  28. exchange_positions = self.trading_engine.get_positions() or []
  29. # Get current DB positions (trades with status='position_opened')
  30. db_positions = stats.get_open_positions()
  31. # Create lookup maps
  32. exchange_map = {pos['symbol']: pos for pos in exchange_positions if abs(float(pos.get('contracts', 0))) > 1e-9}
  33. db_map = {pos['symbol']: pos for pos in db_positions}
  34. all_symbols = set(exchange_map.keys()) | set(db_map.keys())
  35. for symbol in all_symbols:
  36. await self._check_symbol_position_change(symbol, exchange_map.get(symbol), db_map.get(symbol), stats)
  37. # Handle pending stop losses
  38. await self._handle_pending_stop_losses(stats)
  39. # Handle orphaned pending trades (orders cancelled before filling)
  40. await self._handle_orphaned_pending_trades(stats)
  41. except Exception as e:
  42. logger.error(f"❌ Error checking position changes: {e}")
  43. async def _check_symbol_position_change(self, symbol: str, exchange_pos: Optional[Dict],
  44. db_pos: Optional[Dict], stats) -> None:
  45. """Check position changes for a single symbol."""
  46. try:
  47. current_time = datetime.now(timezone.utc)
  48. # Case 1: New position (exchange has, DB doesn't)
  49. if exchange_pos and not db_pos:
  50. await self._handle_position_opened(symbol, exchange_pos, stats, current_time)
  51. # Case 2: Position closed (DB has, exchange doesn't)
  52. elif db_pos and not exchange_pos:
  53. await self._handle_position_closed(symbol, db_pos, stats, current_time)
  54. # Case 3: Position size changed (both exist, different sizes)
  55. elif exchange_pos and db_pos:
  56. await self._handle_position_size_change(symbol, exchange_pos, db_pos, stats, current_time)
  57. # Case 4: Both None - no action needed
  58. except Exception as e:
  59. logger.error(f"❌ Error checking position change for {symbol}: {e}")
  60. async def _handle_position_opened(self, symbol: str, exchange_pos: Dict, stats, timestamp: datetime):
  61. """Handle new position detection."""
  62. try:
  63. contracts = float(exchange_pos.get('contracts', 0))
  64. size = abs(contracts)
  65. # Use CCXT's side field first (more reliable), fallback to contract sign
  66. ccxt_side = exchange_pos.get('side', '').lower()
  67. if ccxt_side == 'long':
  68. side, order_side = 'long', 'buy'
  69. elif ccxt_side == 'short':
  70. side, order_side = 'short', 'sell'
  71. else:
  72. # Fallback to contract sign (less reliable but better than nothing)
  73. side = 'long' if contracts > 0 else 'short'
  74. order_side = 'buy' if side == 'long' else 'sell'
  75. logger.warning(f"⚠️ Using contract sign fallback for {symbol}: side={side}, ccxt_side='{ccxt_side}'")
  76. # Get entry price from exchange
  77. entry_price = float(exchange_pos.get('entryPrice', 0)) or float(exchange_pos.get('entryPx', 0))
  78. if not entry_price:
  79. entry_price = float(exchange_pos.get('markPrice', 0)) or float(exchange_pos.get('markPx', 0))
  80. if not entry_price:
  81. logger.error(f"❌ Cannot determine entry price for {symbol}")
  82. return
  83. # Create trade lifecycle using existing manager
  84. lifecycle_id = stats.create_trade_lifecycle(
  85. symbol=symbol,
  86. side=order_side,
  87. entry_order_id=f"external_position_{timestamp.strftime('%Y%m%d_%H%M%S')}",
  88. trade_type='external_detected'
  89. )
  90. if lifecycle_id:
  91. # Update to position_opened using existing manager
  92. success = stats.update_trade_position_opened(
  93. lifecycle_id=lifecycle_id,
  94. entry_price=entry_price,
  95. entry_amount=size,
  96. exchange_fill_id=f"position_detected_{timestamp.isoformat()}"
  97. )
  98. if success:
  99. logger.info(f"🚀 NEW POSITION: {symbol} {side.upper()} {size} @ {entry_price}")
  100. # Send notification
  101. await self._send_position_notification('opened', symbol, {
  102. 'side': side,
  103. 'size': size,
  104. 'price': entry_price,
  105. 'timestamp': timestamp
  106. })
  107. except Exception as e:
  108. logger.error(f"❌ Error handling position opened for {symbol}: {e}")
  109. async def _handle_position_closed(self, symbol: str, db_pos: Dict, stats, timestamp: datetime):
  110. """Handle position closure detection."""
  111. try:
  112. lifecycle_id = db_pos['trade_lifecycle_id']
  113. entry_price = db_pos.get('entry_price', 0)
  114. position_side = db_pos.get('position_side')
  115. size = db_pos.get('current_position_size', 0)
  116. # Estimate exit price (could be improved with recent fills)
  117. market_data = self.trading_engine.get_market_data(symbol)
  118. exit_price = entry_price # Fallback
  119. if market_data and market_data.get('ticker'):
  120. exit_price = float(market_data['ticker'].get('last', exit_price))
  121. # Calculate realized PnL
  122. realized_pnl = 0
  123. if position_side == 'long':
  124. realized_pnl = size * (exit_price - entry_price)
  125. elif position_side == 'short':
  126. realized_pnl = size * (entry_price - exit_price)
  127. # Update to position_closed using existing manager
  128. success = stats.update_trade_position_closed(
  129. lifecycle_id=lifecycle_id,
  130. exit_price=exit_price,
  131. realized_pnl=realized_pnl,
  132. exchange_fill_id=f"position_closed_detected_{timestamp.isoformat()}"
  133. )
  134. if success:
  135. logger.info(f"🎯 POSITION CLOSED: {symbol} {position_side.upper()} PnL: {realized_pnl:.2f}")
  136. # Send notification
  137. await self._send_position_notification('closed', symbol, {
  138. 'side': position_side,
  139. 'size': size,
  140. 'entry_price': entry_price,
  141. 'exit_price': exit_price,
  142. 'realized_pnl': realized_pnl,
  143. 'timestamp': timestamp
  144. })
  145. # Clear any pending stop losses for this symbol
  146. stats.order_manager.cancel_pending_stop_losses_by_symbol(symbol, 'cancelled_position_closed')
  147. # Migrate trade to aggregated stats and clean up
  148. stats.migrate_trade_to_aggregated_stats(lifecycle_id)
  149. except Exception as e:
  150. logger.error(f"❌ Error handling position closed for {symbol}: {e}")
  151. async def _handle_position_size_change(self, symbol: str, exchange_pos: Dict,
  152. db_pos: Dict, stats, timestamp: datetime):
  153. """Handle position size changes and position flips."""
  154. try:
  155. exchange_size = abs(float(exchange_pos.get('contracts', 0)))
  156. db_size = db_pos.get('current_position_size', 0)
  157. db_position_side = db_pos.get('position_side')
  158. # Determine current exchange position side
  159. ccxt_side = exchange_pos.get('side', '').lower()
  160. if ccxt_side == 'long':
  161. exchange_position_side = 'long'
  162. elif ccxt_side == 'short':
  163. exchange_position_side = 'short'
  164. else:
  165. # Fallback to contract sign
  166. contracts = float(exchange_pos.get('contracts', 0))
  167. exchange_position_side = 'long' if contracts > 0 else 'short'
  168. logger.warning(f"⚠️ Using contract sign fallback for side detection: {symbol}")
  169. # Check for POSITION FLIP (LONG ↔ SHORT)
  170. if db_position_side != exchange_position_side:
  171. logger.info(f"🔄 POSITION FLIP DETECTED: {symbol} {db_position_side.upper()} → {exchange_position_side.upper()}")
  172. # Handle as: close old position + open new position
  173. await self._handle_position_closed(symbol, db_pos, stats, timestamp)
  174. await self._handle_position_opened(symbol, exchange_pos, stats, timestamp)
  175. return
  176. # Check if size actually changed (with small tolerance)
  177. if abs(exchange_size - db_size) < 1e-6:
  178. return # No meaningful change
  179. lifecycle_id = db_pos['trade_lifecycle_id']
  180. entry_price = db_pos.get('entry_price', 0)
  181. # Update position size using existing manager
  182. success = stats.trade_manager.update_trade_market_data(
  183. lifecycle_id, current_position_size=exchange_size
  184. )
  185. if success:
  186. change_type = 'increased' if exchange_size > db_size else 'decreased'
  187. size_diff = abs(exchange_size - db_size)
  188. logger.info(f"📊 POSITION {change_type.upper()}: {symbol} {db_size} → {exchange_size}")
  189. # Send notification
  190. await self._send_position_notification(change_type, symbol, {
  191. 'side': db_position_side,
  192. 'old_size': db_size,
  193. 'new_size': exchange_size,
  194. 'size_diff': size_diff,
  195. 'timestamp': timestamp
  196. })
  197. except Exception as e:
  198. logger.error(f"❌ Error handling position size change for {symbol}: {e}")
  199. async def _handle_pending_stop_losses(self, stats):
  200. """Handle pending stop losses - place orders for positions that need them."""
  201. try:
  202. # Get positions with pending SLs using existing manager
  203. pending_sl_trades = stats.get_pending_stop_loss_activations()
  204. for trade in pending_sl_trades:
  205. symbol = trade['symbol']
  206. stop_price = trade['stop_loss_price']
  207. position_side = trade['position_side']
  208. lifecycle_id = trade['trade_lifecycle_id']
  209. try:
  210. # Check if position still exists on exchange
  211. exchange_positions = self.trading_engine.get_positions() or []
  212. position_exists = any(
  213. pos['symbol'] == symbol and abs(float(pos.get('contracts', 0))) > 1e-9
  214. for pos in exchange_positions
  215. )
  216. if position_exists:
  217. # Place stop loss order
  218. sl_side = 'sell' if position_side == 'long' else 'buy'
  219. token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
  220. result = await self.trading_engine.execute_stop_loss_order(
  221. token=token,
  222. stop_price=stop_price
  223. )
  224. if result and result.get('success'):
  225. exchange_order_id = result.get('order_placed_details', {}).get('exchange_order_id')
  226. if exchange_order_id:
  227. # The execute_stop_loss_order already links the SL to the trade
  228. logger.info(f"✅ Placed pending SL: {symbol} @ {stop_price}")
  229. else:
  230. logger.warning(f"⚠️ SL placed for {symbol} but no exchange_order_id returned")
  231. else:
  232. # Position doesn't exist, clear pending SL
  233. logger.info(f"🗑️ Clearing pending SL for non-existent position: {symbol}")
  234. # This will be handled by position closed detection
  235. except Exception as e:
  236. logger.error(f"❌ Error handling pending SL for {symbol}: {e}")
  237. except Exception as e:
  238. logger.error(f"❌ Error handling pending stop losses: {e}")
  239. async def _handle_orphaned_pending_trades(self, stats):
  240. """Handle trades stuck in 'pending' status due to cancelled orders."""
  241. try:
  242. # Get all pending trades
  243. pending_trades = stats.get_trades_by_status('pending')
  244. if not pending_trades:
  245. return
  246. logger.debug(f"🔍 Checking {len(pending_trades)} pending trades for orphaned status")
  247. # Get current exchange orders for comparison
  248. exchange_orders = self.trading_engine.get_orders() or []
  249. exchange_order_ids = {order.get('id') for order in exchange_orders if order.get('id')}
  250. # Get current exchange positions
  251. exchange_positions = self.trading_engine.get_positions() or []
  252. exchange_position_symbols = {
  253. pos.get('symbol') for pos in exchange_positions
  254. if pos.get('symbol') and abs(float(pos.get('contracts', 0))) > 1e-9
  255. }
  256. for trade in pending_trades:
  257. try:
  258. lifecycle_id = trade['trade_lifecycle_id']
  259. symbol = trade['symbol']
  260. entry_order_id = trade.get('entry_order_id')
  261. # Check if this trade should be cancelled
  262. should_cancel = False
  263. cancel_reason = ""
  264. # Case 1: Entry order ID exists but order is no longer on exchange
  265. if entry_order_id and entry_order_id not in exchange_order_ids:
  266. # Check if a position was opened (even if order disappeared)
  267. if symbol not in exchange_position_symbols:
  268. should_cancel = True
  269. cancel_reason = "entry_order_cancelled_no_position"
  270. logger.debug(f"🗑️ Pending trade {lifecycle_id[:8]} for {symbol}: entry order {entry_order_id} no longer exists and no position opened")
  271. # Case 2: No entry order ID but no position exists (shouldn't happen but safety check)
  272. elif not entry_order_id and symbol not in exchange_position_symbols:
  273. should_cancel = True
  274. cancel_reason = "no_entry_order_no_position"
  275. logger.debug(f"🗑️ Pending trade {lifecycle_id[:8]} for {symbol}: no entry order ID and no position")
  276. # Case 3: Check if trade is very old (safety net for other edge cases)
  277. else:
  278. from datetime import datetime, timezone, timedelta
  279. created_at_str = trade.get('timestamp')
  280. if created_at_str:
  281. try:
  282. created_at = datetime.fromisoformat(created_at_str.replace('Z', '+00:00'))
  283. if datetime.now(timezone.utc) - created_at > timedelta(hours=1):
  284. # Very old pending trade, likely orphaned
  285. if symbol not in exchange_position_symbols:
  286. should_cancel = True
  287. cancel_reason = "old_pending_trade_no_position"
  288. logger.debug(f"🗑️ Pending trade {lifecycle_id[:8]} for {symbol}: very old ({created_at}) with no position")
  289. except (ValueError, TypeError) as e:
  290. logger.warning(f"Could not parse timestamp for pending trade {lifecycle_id}: {e}")
  291. # Cancel the orphaned trade
  292. if should_cancel:
  293. success = stats.update_trade_cancelled(lifecycle_id, reason=cancel_reason)
  294. if success:
  295. logger.info(f"🗑️ Cancelled orphaned pending trade: {symbol} (Lifecycle: {lifecycle_id[:8]}) - {cancel_reason}")
  296. # Send a notification about the cancelled trade
  297. await self._send_trade_cancelled_notification(symbol, cancel_reason, trade)
  298. # Migrate cancelled trade to aggregated stats
  299. stats.migrate_trade_to_aggregated_stats(lifecycle_id)
  300. else:
  301. logger.error(f"❌ Failed to cancel orphaned pending trade: {lifecycle_id}")
  302. except Exception as e:
  303. logger.error(f"❌ Error processing pending trade {trade.get('trade_lifecycle_id', 'unknown')}: {e}")
  304. except Exception as e:
  305. logger.error(f"❌ Error handling orphaned pending trades: {e}")
  306. async def _send_trade_cancelled_notification(self, symbol: str, cancel_reason: str, trade: Dict[str, Any]):
  307. """Send notification for cancelled trade."""
  308. try:
  309. if not self.notification_manager:
  310. return
  311. token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
  312. lifecycle_id = trade['trade_lifecycle_id']
  313. # Create user-friendly reason
  314. reason_map = {
  315. 'entry_order_cancelled_no_position': 'Entry order was cancelled before filling',
  316. 'no_entry_order_no_position': 'No entry order and no position opened',
  317. 'old_pending_trade_no_position': 'Trade was pending too long without execution'
  318. }
  319. user_reason = reason_map.get(cancel_reason, cancel_reason)
  320. message = f"""❌ <b>Trade Cancelled</b>
  321. 📊 <b>Details:</b>
  322. • Token: {token}
  323. • Trade ID: {lifecycle_id[:8]}...
  324. • Reason: {user_reason}
  325. ℹ️ <b>Status:</b> Trade was automatically cancelled due to order issues
  326. 📱 Use /positions to view current positions"""
  327. await self.notification_manager.send_generic_notification(message.strip())
  328. logger.debug(f"📨 Sent cancellation notification for {symbol}")
  329. except Exception as e:
  330. logger.error(f"❌ Error sending cancellation notification for {symbol}: {e}")
  331. async def _send_position_notification(self, change_type: str, symbol: str, details: Dict[str, Any]):
  332. """Send position change notification."""
  333. try:
  334. if not self.notification_manager:
  335. return
  336. token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
  337. timestamp = details.get('timestamp', datetime.now(timezone.utc))
  338. time_str = timestamp.strftime('%H:%M:%S')
  339. from src.utils.token_display_formatter import get_formatter
  340. formatter = get_formatter()
  341. if change_type == 'opened':
  342. side = details['side'].upper()
  343. size = details['size']
  344. price = details['price']
  345. message = f"""🚀 <b>Position Opened</b>
  346. 📊 <b>Details:</b>
  347. • Token: {token}
  348. • Direction: {side}
  349. • Size: {formatter.format_amount(size, token)}
  350. • Entry Price: {formatter.format_price_with_symbol(price, token)}
  351. • Value: {formatter.format_price_with_symbol(size * price)}
  352. ⏰ <b>Time:</b> {time_str}
  353. 📱 Use /positions to view all positions"""
  354. elif change_type == 'closed':
  355. side = details['side'].upper()
  356. size = details['size']
  357. entry_price = details['entry_price']
  358. exit_price = details['exit_price']
  359. pnl = details['realized_pnl']
  360. pnl_emoji = "🟢" if pnl >= 0 else "🔴"
  361. message = f"""🎯 <b>Position Closed</b>
  362. 📊 <b>Details:</b>
  363. • Token: {token}
  364. • Direction: {side}
  365. • Size: {formatter.format_amount(size, token)}
  366. • Entry: {formatter.format_price_with_symbol(entry_price, token)}
  367. • Exit: {formatter.format_price_with_symbol(exit_price, token)}
  368. {pnl_emoji} <b>P&L:</b> {formatter.format_price_with_symbol(pnl)}
  369. ⏰ <b>Time:</b> {time_str}
  370. 📊 Use /stats to view performance"""
  371. elif change_type in ['increased', 'decreased']:
  372. side = details['side'].upper()
  373. old_size = details['old_size']
  374. new_size = details['new_size']
  375. size_diff = details['size_diff']
  376. emoji = "📈" if change_type == 'increased' else "📉"
  377. message = f"""{emoji} <b>Position {change_type.title()}</b>
  378. 📊 <b>Details:</b>
  379. • Token: {token}
  380. • Direction: {side}
  381. • Previous Size: {formatter.format_amount(old_size, token)}
  382. • New Size: {formatter.format_amount(new_size, token)}
  383. • Change: {formatter.format_amount(size_diff, token)}
  384. ⏰ <b>Time:</b> {time_str}
  385. 📈 Use /positions to view current status"""
  386. else:
  387. return
  388. await self.notification_manager.send_generic_notification(message.strip())
  389. logger.debug(f"📨 Sent {change_type} notification for {symbol}")
  390. except Exception as e:
  391. logger.error(f"❌ Error sending notification for {symbol}: {e}")