position_tracker.py 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648
  1. import asyncio
  2. import logging
  3. import uuid
  4. from typing import Dict, List, Optional, Any
  5. from datetime import datetime, timezone
  6. from ..clients.hyperliquid_client import HyperliquidClient
  7. from ..notifications.notification_manager import NotificationManager
  8. from ..config.config import Config
  9. logger = logging.getLogger(__name__)
  10. class PositionTracker:
  11. """
  12. Simplified position tracker that mirrors exchange state.
  13. Monitors for position changes and saves stats when positions close.
  14. """
  15. def __init__(self, hl_client: HyperliquidClient, notification_manager: NotificationManager):
  16. self.hl_client = hl_client
  17. self.notification_manager = notification_manager
  18. self.trading_stats = None # Will be lazy loaded
  19. # Track current positions
  20. self.current_positions: Dict[str, Dict] = {}
  21. self.is_running = False
  22. # Market data cache to prevent overfetching
  23. # Previously: Every 5s cycle fetched market data 2-4 times per symbol
  24. # Now: Market data is cached for 30s, reducing API calls by ~75%
  25. self._market_data_cache: Dict[str, Dict] = {} # symbol -> {data, timestamp}
  26. self._cache_ttl_seconds = 30 # Cache market data for 30 seconds
  27. async def start(self):
  28. """Start position tracking"""
  29. if self.is_running:
  30. return
  31. self.is_running = True
  32. logger.info("🔄 Starting position tracker")
  33. try:
  34. # Initialize current positions
  35. logger.info("📊 Initializing current positions...")
  36. await self._update_current_positions()
  37. logger.info(f"✅ Position tracker initialized with {len(self.current_positions)} open positions")
  38. # Start monitoring loop
  39. logger.info("🔄 Starting position monitoring loop...")
  40. asyncio.create_task(self._monitoring_loop())
  41. logger.info("✅ Position tracker started successfully")
  42. except Exception as e:
  43. logger.error(f"❌ Error starting position tracker: {e}", exc_info=True)
  44. self.is_running = False
  45. raise
  46. async def stop(self):
  47. """Stop position tracking"""
  48. self.is_running = False
  49. logger.info("Stopping position tracker")
  50. async def _monitoring_loop(self):
  51. """Main monitoring loop"""
  52. logger.info(f"🔄 Position tracker monitoring loop started (heartbeat: {Config.BOT_HEARTBEAT_SECONDS}s)")
  53. loop_count = 0
  54. while self.is_running:
  55. try:
  56. loop_count += 1
  57. logger.debug(f"📊 Position tracker loop #{loop_count} - checking for position changes...")
  58. await self._check_position_changes()
  59. # Clean stale cache entries every 12 loops (60 seconds with 5s heartbeat)
  60. if loop_count % 12 == 0:
  61. self._clear_stale_cache_entries()
  62. logger.info(f"📊 Position tracker alive - loop #{loop_count}, {len(self.current_positions)} positions tracked, {len(self._market_data_cache)} cached symbols")
  63. await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS) # Use config heartbeat
  64. except Exception as e:
  65. logger.error(f"❌ Error in position tracking loop #{loop_count}: {e}", exc_info=True)
  66. await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS)
  67. logger.info("🛑 Position tracker monitoring loop stopped")
  68. async def _check_position_changes(self):
  69. """Check for any position changes"""
  70. try:
  71. previous_positions = self.current_positions.copy()
  72. await self._update_current_positions()
  73. # Compare with previous positions (simple exchange state tracking)
  74. await self._process_position_changes(previous_positions, self.current_positions)
  75. # Simple database sync check (once per cycle)
  76. await self._sync_database_once()
  77. # Update database with current market data for open positions
  78. await self._update_database_market_data()
  79. except Exception as e:
  80. logger.error(f"Error checking position changes: {e}")
  81. async def _sync_database_once(self):
  82. """Simple bidirectional sync: close database positions that don't exist on exchange,
  83. and create database records for exchange positions that don't exist in database"""
  84. try:
  85. if self.trading_stats is None:
  86. from ..stats.trading_stats import TradingStats
  87. self.trading_stats = TradingStats()
  88. open_trades = self.trading_stats.get_open_positions()
  89. # PART 1: Close database positions that don't exist on exchange
  90. for trade in open_trades:
  91. symbol = trade.get('symbol', '')
  92. if not symbol:
  93. continue
  94. token = symbol.split('/')[0] if '/' in symbol else symbol
  95. # If database position doesn't exist on exchange, close it
  96. if token not in self.current_positions:
  97. # Create simulated position object from database data
  98. entry_price = float(trade.get('entry_price', 0))
  99. amount = float(trade.get('amount', 0))
  100. side = trade.get('side', '').lower()
  101. simulated_position = {
  102. 'size': -amount if side == 'sell' else amount, # sell=short(negative), buy=long(positive)
  103. 'entry_px': entry_price,
  104. 'unrealized_pnl': 0, # Will be calculated
  105. 'margin_used': 0,
  106. 'max_leverage': 1,
  107. 'current_leverage': 1,
  108. 'return_on_equity': 0
  109. }
  110. # Reuse existing position closed handler - consistent behavior!
  111. await self._handle_position_closed(token, simulated_position)
  112. # PART 2: Create database records for exchange positions that don't exist in database
  113. # Get current open trades after potential closures above
  114. current_open_trades = self.trading_stats.get_open_positions()
  115. database_tokens = {trade.get('symbol', '').split('/')[0] for trade in current_open_trades if trade.get('symbol')}
  116. for token, position_data in self.current_positions.items():
  117. if token not in database_tokens:
  118. logger.info(f"🔄 Found exchange position for {token} with no database record - creating trade record")
  119. # Create new trade record using existing process_trade_complete_cycle method
  120. # but we'll need to handle this differently since we don't have entry/exit
  121. # Instead, we'll create a manual position record
  122. full_symbol = f"{token}/USDC:USDC"
  123. size = abs(position_data['size'])
  124. side = 'sell' if position_data['size'] < 0 else 'buy' # sell=short, buy=long
  125. entry_price = position_data['entry_px']
  126. # Create a trade lifecycle record for this existing position
  127. lifecycle_id = str(uuid.uuid4())
  128. timestamp = datetime.now(timezone.utc).isoformat()
  129. # Insert into trades table
  130. query = """
  131. INSERT INTO trades (
  132. trade_lifecycle_id, symbol, side, amount, price, value,
  133. entry_price, current_position_size, position_side, status,
  134. position_opened_at, timestamp, updated_at, trade_type
  135. ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  136. """
  137. position_side = 'short' if side == 'sell' else 'long'
  138. value = size * entry_price
  139. params = (
  140. lifecycle_id, full_symbol, side, size, entry_price, value,
  141. entry_price, size, position_side, 'position_opened',
  142. timestamp, timestamp, timestamp, 'sync_detected'
  143. )
  144. self.trading_stats.db_manager._execute_query(query, params)
  145. logger.info(f"✅ Created database record for {token} position: {side} {size} @ ${entry_price}")
  146. except Exception as e:
  147. logger.error(f"Error syncing database: {e}")
  148. import traceback
  149. traceback.print_exc()
  150. async def _update_database_market_data(self):
  151. """Update database with current market data for open positions"""
  152. try:
  153. # Lazy load TradingStats if needed
  154. if self.trading_stats is None:
  155. from ..stats.trading_stats import TradingStats
  156. self.trading_stats = TradingStats()
  157. # Get open trades from database
  158. open_trades = self.trading_stats.get_open_positions()
  159. for trade in open_trades:
  160. try:
  161. symbol = trade.get('symbol', '')
  162. if not symbol:
  163. continue
  164. # Extract token from symbol (e.g., "BTC/USDC:USDC" -> "BTC")
  165. token = symbol.split('/')[0] if '/' in symbol else symbol
  166. # Find corresponding exchange position
  167. if token in self.current_positions:
  168. pos_data = self.current_positions[token]
  169. # Convert exchange ROE from decimal to percentage
  170. roe_percentage = pos_data['return_on_equity'] * 100
  171. # Get current leverage from database to compare
  172. old_leverage = trade.get('leverage', 0)
  173. new_leverage = pos_data['current_leverage']
  174. # Get current market price for mark price and position value calculation
  175. current_mark_price = 0.0
  176. try:
  177. market_data = self._get_cached_market_data(symbol)
  178. if market_data and market_data.get('ticker'):
  179. current_mark_price = float(market_data['ticker'].get('last', 0))
  180. except Exception as e:
  181. logger.debug(f"Could not fetch current market price for {symbol}: {e}")
  182. # Fallback to entry price if we can't get current market price
  183. if current_mark_price <= 0:
  184. current_mark_price = pos_data['entry_px']
  185. # Calculate position value (size * current price)
  186. position_size = abs(pos_data['size'])
  187. position_value = position_size * current_mark_price
  188. # Update database with live market data including position value
  189. self.trading_stats.update_trade_market_data(
  190. trade_lifecycle_id=trade['trade_lifecycle_id'],
  191. current_position_size=position_size,
  192. unrealized_pnl=pos_data['unrealized_pnl'],
  193. roe_percentage=roe_percentage,
  194. mark_price=current_mark_price,
  195. position_value=position_value,
  196. margin_used=pos_data['margin_used'],
  197. leverage=new_leverage # Use current leverage, not max leverage
  198. )
  199. # Log leverage changes
  200. if old_leverage and abs(old_leverage - new_leverage) > 0.1:
  201. logger.info(f"📊 Database updated - Leverage changed for {symbol}: {old_leverage:.1f}x → {new_leverage:.1f}x, "
  202. f"Position Value: ${position_value:,.2f}")
  203. else:
  204. logger.debug(f"Updated market data for {symbol}: leverage={new_leverage:.1f}x, ROE={roe_percentage:.2f}%, "
  205. f"mark_price=${current_mark_price:.4f}, value=${position_value:,.2f}")
  206. except Exception as e:
  207. logger.warning(f"Error updating market data for trade {trade.get('trade_lifecycle_id', 'unknown')}: {e}")
  208. continue
  209. except Exception as e:
  210. logger.error(f"Error updating database market data: {e}")
  211. async def _update_current_positions(self):
  212. """Update current positions from exchange"""
  213. try:
  214. logger.debug("🔍 Fetching positions from Hyperliquid client...")
  215. positions = self.hl_client.get_positions()
  216. # Distinguish between API failure (None) and legitimate empty positions ([])
  217. if positions is None:
  218. logger.warning("📊 API failure - could not fetch positions from exchange!")
  219. # Don't clear positions during API failures - keep last known state to avoid false "position opened" notifications
  220. if not self.current_positions:
  221. # Only clear if we truly have no tracked positions (e.g., first startup)
  222. self.current_positions = {}
  223. else:
  224. logger.info(f"📊 Keeping last known positions during API failure: {list(self.current_positions.keys())}")
  225. return
  226. elif not positions: # Empty list [] - legitimately no positions
  227. logger.info("📊 No open positions on exchange - clearing position tracker state")
  228. self.current_positions = {}
  229. return
  230. logger.info(f"📊 Raw positions data from exchange: {len(positions)} positions")
  231. # Log first position structure for debugging
  232. #if positions:
  233. # logger.info(f"📊 Sample position structure: {positions[0]}")
  234. logger.debug(f"📊 Processing {len(positions)} positions from exchange...")
  235. new_positions = {}
  236. for i, position in enumerate(positions):
  237. logger.debug(f"📊 Processing position {i+1}: {position}")
  238. # Access nested position data from info.position
  239. position_data = position.get('info', {}).get('position', {})
  240. if not position_data:
  241. logger.warning(f"📊 Position {i+1} has no info.position data: {position}")
  242. continue
  243. size = float(position_data.get('szi', '0'))
  244. logger.debug(f"📊 Position {i+1} size: {size}")
  245. if size != 0: # Only include open positions
  246. symbol = position_data.get('coin', '')
  247. if symbol:
  248. # Get actual current leverage from leverage object
  249. leverage_info = position_data.get('leverage', {})
  250. if isinstance(leverage_info, dict) and 'value' in leverage_info:
  251. current_leverage = float(leverage_info['value'])
  252. logger.debug(f"Using current leverage {current_leverage}x for {symbol} (max: {position_data.get('maxLeverage', 'N/A')}x)")
  253. else:
  254. current_leverage = float(position_data.get('maxLeverage', '1'))
  255. logger.debug(f"Fallback to max leverage {current_leverage}x for {symbol} (no current leverage data)")
  256. new_positions[symbol] = {
  257. 'size': size,
  258. 'entry_px': float(position_data.get('entryPx', '0')),
  259. 'unrealized_pnl': float(position_data.get('unrealizedPnl', '0')),
  260. 'margin_used': float(position_data.get('marginUsed', '0')),
  261. 'max_leverage': float(position_data.get('maxLeverage', '1')),
  262. 'current_leverage': current_leverage, # Add current leverage
  263. 'return_on_equity': float(position_data.get('returnOnEquity', '0'))
  264. }
  265. # Log position state changes
  266. had_positions_before = len(self.current_positions) > 0
  267. getting_positions_now = len(new_positions) > 0
  268. if had_positions_before and not getting_positions_now:
  269. logger.info("📊 All positions have been closed on exchange")
  270. elif not had_positions_before and getting_positions_now:
  271. logger.info(f"📊 New positions detected: {list(new_positions.keys())}")
  272. elif had_positions_before and getting_positions_now:
  273. logger.debug(f"✅ Updated current positions: {len(new_positions)} open positions ({list(new_positions.keys())})")
  274. else:
  275. logger.debug(f"✅ Confirmed no open positions on exchange")
  276. self.current_positions = new_positions
  277. except Exception as e:
  278. logger.error(f"❌ Error updating current positions: {e}", exc_info=True)
  279. # Don't clear positions on exception - keep last known state to avoid false notifications
  280. logger.info(f"📊 Keeping last known positions during exception: {list(self.current_positions.keys()) if self.current_positions else 'none'}")
  281. async def _process_position_changes(self, previous: Dict, current: Dict):
  282. """Process changes between previous and current positions"""
  283. # Find new positions
  284. for symbol in current:
  285. if symbol not in previous:
  286. await self._handle_position_opened(symbol, current[symbol])
  287. # Find closed positions
  288. for symbol in previous:
  289. if symbol not in current:
  290. await self._handle_position_closed(symbol, previous[symbol])
  291. # Find changed positions
  292. for symbol in current:
  293. if symbol in previous:
  294. await self._handle_position_changed(symbol, previous[symbol], current[symbol])
  295. async def _handle_position_opened(self, symbol: str, position: Dict):
  296. """Handle new position opened"""
  297. try:
  298. size = position['size']
  299. side = "Long" if size > 0 else "Short"
  300. message = (
  301. f"🟢 Position Opened\n"
  302. f"Token: {symbol}\n"
  303. f"Side: {side}\n"
  304. f"Size: {abs(size):.4f}\n"
  305. f"Entry: ${position['entry_px']:.4f}\n"
  306. f"Leverage: {position.get('current_leverage', position['max_leverage']):.1f}x\n\n"
  307. f"💡 Use /positions to see current positions"
  308. )
  309. await self.notification_manager.send_generic_notification(message)
  310. logger.info(f"Position opened: {symbol} {side} {abs(size)}")
  311. except Exception as e:
  312. logger.error(f"Error handling position opened for {symbol}: {e}")
  313. async def _handle_position_closed(self, symbol: str, position: Dict):
  314. """Handle position closed - find and close the corresponding database trade"""
  315. try:
  316. # Lazy load TradingStats if needed
  317. if self.trading_stats is None:
  318. from ..stats.trading_stats import TradingStats
  319. self.trading_stats = TradingStats()
  320. # Construct full symbol format (symbol here is just token name like "BTC")
  321. full_symbol = f"{symbol}/USDC:USDC"
  322. # Find the open trade in database for this symbol
  323. open_trade = self.trading_stats.get_trade_by_symbol_and_status(full_symbol, 'position_opened')
  324. if not open_trade:
  325. logger.warning(f"No open trade found in database for {full_symbol} - position was closed on exchange but no database record")
  326. return
  327. lifecycle_id = open_trade['trade_lifecycle_id']
  328. entry_price = position['entry_px']
  329. size = abs(position['size'])
  330. side = "Long" if position['size'] > 0 else "Short"
  331. # Get current market price for exit calculation
  332. market_data = self._get_cached_market_data(full_symbol)
  333. if not market_data:
  334. logger.error(f"Could not get market data for {full_symbol}")
  335. return
  336. current_price = float(market_data.get('ticker', {}).get('last', 0))
  337. # Calculate realized PnL
  338. if side == "Long":
  339. realized_pnl = (current_price - entry_price) * size
  340. else:
  341. realized_pnl = (entry_price - current_price) * size
  342. # Close the trade in database
  343. success = await self.trading_stats.update_trade_position_closed(
  344. lifecycle_id=lifecycle_id,
  345. exit_price=current_price,
  346. realized_pnl=realized_pnl,
  347. exchange_fill_id="position_tracker_detected_closure"
  348. )
  349. if success:
  350. # Migrate to aggregated stats (token_stats table, etc.)
  351. self.trading_stats.migrate_trade_to_aggregated_stats(lifecycle_id)
  352. # Send clean notification
  353. pnl_emoji = "🟢" if realized_pnl >= 0 else "🔴"
  354. message = (
  355. f"{pnl_emoji} Position Closed\n"
  356. f"Token: {symbol}\n"
  357. f"Side: {side}\n"
  358. f"Size: {size:.4f}\n"
  359. f"Entry: ${entry_price:.4f}\n"
  360. f"Exit: ${current_price:.4f}\n"
  361. f"PnL: ${realized_pnl:.3f}\n\n"
  362. f"💡 Use /positions to see current positions"
  363. )
  364. await self.notification_manager.send_generic_notification(message)
  365. logger.info(f"Position closed: {symbol} {side} PnL: ${realized_pnl:.3f}")
  366. else:
  367. logger.error(f"Failed to close trade {lifecycle_id} for {symbol}")
  368. except Exception as e:
  369. logger.error(f"Error handling position closed for {symbol}: {e}")
  370. async def _handle_position_changed(self, symbol: str, previous: Dict, current: Dict):
  371. """Handle position size, direction, or leverage changes"""
  372. try:
  373. prev_size = previous['size']
  374. curr_size = current['size']
  375. prev_leverage = previous.get('current_leverage', 0)
  376. curr_leverage = current.get('current_leverage', 0)
  377. # Check if position reversed (long to short or vice versa)
  378. if (prev_size > 0 and curr_size < 0) or (prev_size < 0 and curr_size > 0):
  379. # Position reversed - close old, open new
  380. await self._handle_position_closed(symbol, previous)
  381. await self._handle_position_opened(symbol, current)
  382. return
  383. # Check if leverage changed
  384. if abs(prev_leverage - curr_leverage) > 0.1: # Threshold to avoid noise
  385. logger.info(f"📊 Leverage changed for {symbol}: {prev_leverage:.1f}x → {curr_leverage:.1f}x")
  386. # Optional: Send notification for significant leverage changes
  387. if abs(prev_leverage - curr_leverage) >= 1.0: # Only notify for changes >= 1x
  388. side = "Long" if curr_size > 0 else "Short"
  389. change_direction = "Increased" if curr_leverage > prev_leverage else "Decreased"
  390. message = (
  391. f"⚖️ Leverage {change_direction}\n"
  392. f"Token: {symbol}\n"
  393. f"Side: {side}\n"
  394. f"Leverage: {prev_leverage:.1f}x → {curr_leverage:.1f}x\n\n"
  395. f"💡 Use /positions to see current positions"
  396. )
  397. await self.notification_manager.send_generic_notification(message)
  398. # Check if position size changed significantly
  399. size_change = abs(curr_size) - abs(prev_size)
  400. # Get current market price for more accurate value calculation
  401. try:
  402. full_symbol = f"{symbol}/USDC:USDC"
  403. market_data = self._get_cached_market_data(full_symbol)
  404. current_market_price = float(market_data.get('ticker', {}).get('last', current['entry_px'])) if market_data else current['entry_px']
  405. except Exception:
  406. current_market_price = current['entry_px'] # Fallback to entry price
  407. # Calculate change value using current market price
  408. change_value = abs(size_change) * current_market_price
  409. # Get formatter to determine token category and appropriate thresholds
  410. try:
  411. from src.utils.token_display_formatter import get_formatter
  412. formatter = get_formatter()
  413. # Use the existing token classification system to determine threshold
  414. price_decimals = await formatter.get_token_price_decimal_places(symbol)
  415. amount_decimals = await formatter.get_token_amount_decimal_places(symbol)
  416. # Determine quantity threshold based on token characteristics
  417. # Higher precision tokens (like BTC, ETH) need smaller quantity thresholds
  418. if price_decimals <= 2: # Major tokens like BTC, ETH (high value)
  419. quantity_threshold = 0.0001
  420. elif price_decimals <= 4: # Mid-tier tokens
  421. quantity_threshold = 0.001
  422. else: # Lower-value tokens (meme coins, etc.)
  423. quantity_threshold = 0.01
  424. # Also set minimum value threshold based on token category
  425. min_value_threshold = 1.0 # Minimum $1 change for any token
  426. except Exception as e:
  427. logger.debug(f"Could not get token formatting info for {symbol}, using defaults: {e}")
  428. quantity_threshold = 0.001
  429. min_value_threshold = 1.0
  430. price_decimals = 4 # Default for fallback logging
  431. # Trigger notification if either:
  432. # 1. Quantity change exceeds token-specific threshold, OR
  433. # 2. Value change exceeds minimum value threshold
  434. should_notify = (abs(size_change) > quantity_threshold or
  435. change_value > min_value_threshold)
  436. if should_notify:
  437. change_type = "Increased" if size_change > 0 else "Decreased"
  438. side = "Long" if curr_size > 0 else "Short"
  439. # Use formatter for consistent display
  440. try:
  441. formatted_new_size = await formatter.format_amount(abs(curr_size), symbol)
  442. formatted_change = await formatter.format_amount(abs(size_change), symbol)
  443. formatted_value_change = await formatter.format_price_with_symbol(change_value)
  444. formatted_current_price = await formatter.format_price_with_symbol(current_market_price, symbol)
  445. except Exception:
  446. # Fallback formatting
  447. formatted_new_size = f"{abs(curr_size):.4f}"
  448. formatted_change = f"{abs(size_change):.4f}"
  449. formatted_value_change = f"${change_value:.2f}"
  450. formatted_current_price = f"${current_market_price:.4f}"
  451. message = (
  452. f"🔄 Position {change_type}\n"
  453. f"Token: {symbol}\n"
  454. f"Side: {side}\n"
  455. f"New Size: {formatted_new_size}\n"
  456. f"Change: {'+' if size_change > 0 else ''}{formatted_change}\n"
  457. f"Value Change: {formatted_value_change}\n"
  458. f"Current Price: {formatted_current_price}\n\n"
  459. f"💡 Use /positions to see current positions"
  460. )
  461. await self.notification_manager.send_generic_notification(message)
  462. logger.info(f"Position changed: {symbol} {change_type} by {size_change:.6f} (${change_value:.2f}) "
  463. f"threshold: {quantity_threshold} qty or ${min_value_threshold} value")
  464. else:
  465. # Log when changes don't meet threshold (debug level to avoid spam)
  466. logger.debug(f"Position size changed for {symbol} but below notification threshold: "
  467. f"{size_change:.6f} quantity (${change_value:.2f} value), "
  468. f"thresholds: {quantity_threshold} qty or ${min_value_threshold} value "
  469. f"(price_decimals: {price_decimals if 'price_decimals' in locals() else 'unknown'})")
  470. except Exception as e:
  471. logger.error(f"Error handling position change for {symbol}: {e}")
  472. async def _save_position_stats(self, symbol: str, side: str, size: float,
  473. entry_price: float, exit_price: float, pnl: float):
  474. """Save position statistics to database using existing TradingStats interface"""
  475. try:
  476. # Lazy load TradingStats to avoid circular imports
  477. if self.trading_stats is None:
  478. from ..stats.trading_stats import TradingStats
  479. self.trading_stats = TradingStats()
  480. # Use the existing process_trade_complete_cycle method
  481. lifecycle_id = self.trading_stats.process_trade_complete_cycle(
  482. symbol=symbol,
  483. side=side.lower(),
  484. entry_price=entry_price,
  485. exit_price=exit_price,
  486. amount=size,
  487. timestamp=datetime.now(timezone.utc).isoformat()
  488. )
  489. logger.info(f"Saved stats for {symbol}: PnL ${pnl:.3f}, lifecycle_id: {lifecycle_id}")
  490. except Exception as e:
  491. logger.error(f"Error saving position stats for {symbol}: {e}")
  492. def _get_cached_market_data(self, symbol: str) -> Optional[Dict[str, Any]]:
  493. """Get market data from cache if fresh, otherwise fetch and cache it."""
  494. now = datetime.now(timezone.utc)
  495. # Check if we have cached data for this symbol
  496. if symbol in self._market_data_cache:
  497. cached_entry = self._market_data_cache[symbol]
  498. cache_age = (now - cached_entry['timestamp']).total_seconds()
  499. # Return cached data if it's still fresh
  500. if cache_age < self._cache_ttl_seconds:
  501. logger.debug(f"📋 Using cached market data for {symbol} (age: {cache_age:.1f}s)")
  502. return cached_entry['data']
  503. else:
  504. logger.debug(f"🗑️ Cached data for {symbol} is stale (age: {cache_age:.1f}s), will fetch fresh data")
  505. # Fetch fresh market data
  506. logger.debug(f"🔄 Fetching fresh market data for {symbol}")
  507. market_data = self.hl_client.get_market_data(symbol)
  508. if market_data:
  509. # Cache the fresh data
  510. self._market_data_cache[symbol] = {
  511. 'data': market_data,
  512. 'timestamp': now
  513. }
  514. logger.debug(f"💾 Cached market data for {symbol}")
  515. return market_data
  516. def _clear_stale_cache_entries(self):
  517. """Remove stale entries from market data cache."""
  518. now = datetime.now(timezone.utc)
  519. stale_symbols = []
  520. for symbol, cached_entry in self._market_data_cache.items():
  521. cache_age = (now - cached_entry['timestamp']).total_seconds()
  522. if cache_age >= self._cache_ttl_seconds:
  523. stale_symbols.append(symbol)
  524. for symbol in stale_symbols:
  525. del self._market_data_cache[symbol]
  526. logger.debug(f"🗑️ Removed stale cache entry for {symbol}")
  527. if stale_symbols:
  528. logger.debug(f"🧹 Cleaned {len(stale_symbols)} stale cache entries")