import asyncio
import logging
import os
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone, timedelta
from typing import Dict, Iterator, List, Optional, Any

from ..clients.hyperliquid_client import HyperliquidClient
from ..notifications.notification_manager import NotificationManager

logger = logging.getLogger(__name__)


class PendingOrdersManager:
    """
    Manages pending stop loss orders from /long commands with sl: parameter.
    Places stop loss orders when positions are opened.

    Pending requests are stored in a SQLite table (``pending_stop_loss``).
    A background task polls the exchange client for open positions and, once
    a position matching a pending request exists, submits the stop loss and
    marks the row as ``placed``. Requests that pass their expiry time are
    marked ``expired``.
    """

    # Seconds between two polls of the positions / pending table.
    POLL_INTERVAL_SECONDS = 5
    # Seconds to wait after an unexpected error in the monitoring loop.
    ERROR_BACKOFF_SECONDS = 10
    # A position counts as "opened" once it reaches this fraction of the
    # requested size (allows a 5% shortfall).
    SIZE_TOLERANCE = 0.95

    def __init__(self, hl_client: HyperliquidClient,
                 notification_manager: NotificationManager,
                 db_path: str = "data/pending_orders.db"):
        """
        Create the manager and make sure the backing database exists.

        Args:
            hl_client: Exchange client; ``get_positions()`` and the awaitable
                ``place_order(...)`` are used.
            notification_manager: Object whose awaitable
                ``send_notification(message)`` delivers user messages.
            db_path: Location of the SQLite file holding pending orders.
        """
        self.hl_client = hl_client
        self.notification_manager = notification_manager
        self.db_path = db_path
        self.is_running = False
        # Strong reference to the polling task: the event loop itself only
        # keeps weak references to tasks.
        self._monitor_task: Optional["asyncio.Task[None]"] = None

        # Initialize database
        self._init_database()

    @contextmanager
    def _connection(self) -> Iterator[sqlite3.Connection]:
        """
        Yield a SQLite connection that is committed on success, rolled back
        on error, and always closed afterwards.

        ``with sqlite3.connect(...)`` on its own only manages the
        transaction; it does not close the connection.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def _init_database(self):
        """Initialize pending orders database (errors are logged, not raised)."""
        try:
            # sqlite3 creates the file but not its parent directory.
            db_dir = os.path.dirname(self.db_path)
            if db_dir:
                os.makedirs(db_dir, exist_ok=True)

            with self._connection() as conn:
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS pending_stop_loss (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        symbol TEXT NOT NULL,
                        stop_price REAL NOT NULL,
                        size REAL NOT NULL,
                        side TEXT NOT NULL,
                        created_at TEXT NOT NULL,
                        expires_at TEXT,
                        status TEXT DEFAULT 'pending',
                        order_id TEXT,
                        placed_at TEXT
                    )
                """)

            logger.info("Pending orders database initialized")

        except Exception as e:
            logger.error(f"Error initializing pending orders database: {e}")

    async def start(self):
        """Start pending orders manager (no-op when already running)."""
        if self.is_running:
            return

        self.is_running = True
        logger.info("Starting pending orders manager")

        # Start monitoring loop. If a previous loop has not yet noticed an
        # earlier stop() it simply keeps going, so never run two at once.
        if self._monitor_task is None or self._monitor_task.done():
            self._monitor_task = asyncio.create_task(self._monitoring_loop())

    async def stop(self):
        """
        Stop pending orders manager.

        The monitoring loop finishes its current iteration and exits the
        next time it checks ``is_running``; it is not cancelled mid-flight.
        """
        self.is_running = False
        logger.info("Stopping pending orders manager")

    async def add_pending_stop_loss(self, symbol: str, stop_price: float,
                                    size: float, side: str,
                                    expires_hours: int = 24):
        """
        Add a pending stop loss order and notify the user.

        Args:
            symbol: Coin the stop loss is for.
            stop_price: Trigger price of the stop loss.
            size: Position size the stop loss is waiting for.
            side: Side of the position being protected ('long' or 'short',
                compared case-insensitively later on).
            expires_hours: Hours after which the request is marked expired.

        Errors are logged and swallowed; nothing is returned.
        """
        try:
            created_at = datetime.now(timezone.utc)
            expires_at = created_at + timedelta(hours=expires_hours)

            with self._connection() as conn:
                conn.execute("""
                    INSERT INTO pending_stop_loss
                    (symbol, stop_price, size, side, created_at, expires_at)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (symbol, stop_price, size, side,
                      created_at.isoformat(), expires_at.isoformat()))

            logger.info(f"Added pending stop loss: {symbol} {side} {size} @ ${stop_price}")

            message = (
                f"⏳ Pending Stop Loss Added\n"
                f"Token: {symbol}\n"
                f"Side: {side}\n"
                f"Size: {size}\n"
                f"Stop Price: ${stop_price:.4f}\n"
                f"Expires: {expires_hours}h"
            )
            await self.notification_manager.send_notification(message)

        except Exception as e:
            logger.error(f"Error adding pending stop loss: {e}")

    async def _monitoring_loop(self):
        """Main monitoring loop: place due stop losses, expire stale ones."""
        while self.is_running:
            try:
                await self._check_pending_orders()
                await self._cleanup_expired_orders()
                await asyncio.sleep(self.POLL_INTERVAL_SECONDS)

            except Exception as e:
                logger.error(f"Error in pending orders monitoring loop: {e}")
                await asyncio.sleep(self.ERROR_BACKOFF_SECONDS)

    async def _check_pending_orders(self):
        """
        Check if any pending orders should be placed.

        A pending request is acted on when an open position exists for its
        symbol, on the same side, with an absolute size of at least
        ``SIZE_TOLERANCE`` times the requested size.
        """
        try:
            # Get current positions
            positions = self.hl_client.get_positions()
            if not positions:
                return

            open_positions: Dict[str, Dict[str, Any]] = {}
            for position in positions:
                size = float(position.get('szi', '0'))
                symbol = position.get('coin', '')
                if size != 0 and symbol:
                    open_positions[symbol] = {
                        'size': size,
                        'side': 'long' if size > 0 else 'short'
                    }

            # Read the pending rows first and release the connection before
            # awaiting any exchange call.
            with self._connection() as conn:
                pending_orders = conn.execute("""
                    SELECT id, symbol, stop_price, size, side
                    FROM pending_stop_loss
                    WHERE status = 'pending'
                """).fetchall()

            for pending_id, symbol, stop_price, size, side in pending_orders:
                current_pos = open_positions.get(symbol)
                if current_pos is None:
                    continue

                side_matches = current_pos['side'] == side.lower()
                size_reached = (abs(current_pos['size'])
                                >= abs(size) * self.SIZE_TOLERANCE)
                if side_matches and size_reached:
                    await self._place_stop_loss_order(
                        pending_id, symbol, stop_price,
                        current_pos['size'], side)

        except Exception as e:
            logger.error(f"Error checking pending orders: {e}")

    @staticmethod
    def _extract_order_id(order_result: Any) -> Optional[str]:
        """
        Return the exchange order id if ``order_result`` reports an accepted order.

        The first entry of ``response.data.statuses`` is inspected. Both a
        ``resting`` entry (order accepted and waiting, the usual outcome for
        a stop order) and a ``filled`` entry count as success. Anything else
        (error status, malformed or textual response) yields ``None``.
        An accepted order without a readable ``oid`` yields ``''``.
        """
        if not isinstance(order_result, dict):
            return None

        response = order_result.get('response')
        if not isinstance(response, dict) or response.get('type') != 'order':
            return None

        data = response.get('data')
        statuses = data.get('statuses') if isinstance(data, dict) else None
        if not isinstance(statuses, list) or not statuses:
            return None

        first_status = statuses[0]
        if not isinstance(first_status, dict):
            return None

        for outcome in ('resting', 'filled'):
            details = first_status.get(outcome)
            if details:
                if isinstance(details, dict):
                    return str(details.get('oid', ''))
                return ''
        return None

    async def _place_stop_loss_order(self, pending_id: int, symbol: str,
                                     stop_price: float, position_size: float,
                                     side: str):
        """
        Place stop loss order on exchange and record the outcome.

        Args:
            pending_id: Row id in ``pending_stop_loss``.
            symbol: Coin to place the order for.
            stop_price: Trigger price.
            position_size: Signed size of the open position; its absolute
                value becomes the order size.
            side: Side of the position being protected.

        On success the row is marked ``placed`` and a notification is sent.
        On failure the row stays ``pending`` so the next poll retries.
        """
        try:
            # Determine stop loss side (opposite of position)
            sl_side = 'sell' if side.lower() == 'long' else 'buy'
            sl_size = abs(position_size)

            # Place stop loss order
            order_result = await self.hl_client.place_order(
                symbol=symbol,
                side=sl_side,
                size=sl_size,
                order_type='stop',
                stop_px=stop_price,
                reduce_only=True
            )

            order_id = self._extract_order_id(order_result)
            if order_id is None:
                logger.error(f"Failed to place stop loss for {symbol}: {order_result}")
                return

            # Persist before notifying, so a notification failure cannot
            # leave the row pending and trigger a duplicate order.
            with self._connection() as conn:
                conn.execute("""
                    UPDATE pending_stop_loss
                    SET status = 'placed', order_id = ?, placed_at = ?
                    WHERE id = ?
                """, (order_id, datetime.now(timezone.utc).isoformat(), pending_id))

            logger.info(f"Placed stop loss: {symbol} {sl_size} @ ${stop_price}")

            message = (
                f"✅ Stop Loss Placed\n"
                f"Token: {symbol}\n"
                f"Size: {sl_size:.4f}\n"
                f"Stop Price: ${stop_price:.4f}\n"
                f"Order ID: {order_id}"
            )
            await self.notification_manager.send_notification(message)

        except Exception as e:
            logger.error(f"Error placing stop loss order for {symbol}: {e}")

    async def _cleanup_expired_orders(self):
        """Mark pending orders whose ``expires_at`` has passed as expired."""
        try:
            # Timestamps are stored as UTC ISO-8601 text and compared as text.
            now_iso = datetime.now(timezone.utc).isoformat()

            with self._connection() as conn:
                # Get expired orders
                expired_orders = conn.execute("""
                    SELECT id, symbol, stop_price FROM pending_stop_loss
                    WHERE status = 'pending' AND expires_at < ?
                """, (now_iso,)).fetchall()

                if expired_orders:
                    # Mark as expired
                    conn.execute("""
                        UPDATE pending_stop_loss
                        SET status = 'expired'
                        WHERE status = 'pending' AND expires_at < ?
                    """, (now_iso,))

            for _pending_id, symbol, stop_price in expired_orders:
                logger.info(f"Expired pending stop loss: {symbol} @ ${stop_price}")

        except Exception as e:
            logger.error(f"Error cleaning up expired orders: {e}")

    async def get_pending_orders(self) -> List[Dict[str, Any]]:
        """
        Get all stored orders (every status), newest first.

        Returns:
            One dict per row with the keys ``symbol``, ``stop_price``,
            ``size``, ``side``, ``created_at``, ``expires_at`` and
            ``status``; an empty list if the query fails.
        """
        columns = ('symbol', 'stop_price', 'size', 'side',
                   'created_at', 'expires_at', 'status')
        try:
            with self._connection() as conn:
                rows = conn.execute("""
                    SELECT symbol, stop_price, size, side, created_at, expires_at, status
                    FROM pending_stop_loss
                    ORDER BY created_at DESC
                """).fetchall()

            return [dict(zip(columns, row)) for row in rows]

        except Exception as e:
            logger.error(f"Error getting pending orders: {e}")
            return []

    async def cancel_pending_order(self, symbol: str) -> bool:
        """
        Cancel pending order for symbol.

        Every row for ``symbol`` that is still ``pending`` is marked
        ``cancelled``, so no leftover duplicate can be placed later.

        Returns:
            True if at least one pending order was cancelled, False if there
            was none or the update failed.
        """
        try:
            with self._connection() as conn:
                cursor = conn.execute("""
                    UPDATE pending_stop_loss
                    SET status = 'cancelled'
                    WHERE symbol = ? AND status = 'pending'
                """, (symbol,))
                cancelled_count = cursor.rowcount

            if cancelled_count > 0:
                logger.info(f"Cancelled pending stop loss for {symbol}")
                return True

            return False

        except Exception as e:
            logger.error(f"Error cancelling pending order for {symbol}: {e}")
            return False