123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
import asyncio
import logging
import os
import sqlite3
from contextlib import closing
from datetime import datetime, timezone, timedelta
from typing import Dict, List, Optional, Any

from ..clients.hyperliquid_client import HyperliquidClient
from ..notifications.notification_manager import NotificationManager
logger = logging.getLogger(__name__)


class PendingOrdersManager:
    """
    Manages pending stop loss orders from /long commands with sl: parameter.

    Requests are persisted in a SQLite table. A background task polls the
    exchange client for open positions; once a position on the requested side
    reaches at least ``SIZE_TOLERANCE`` of the requested size, a reduce-only
    stop order is submitted and the row is marked ``placed``. Rows that are
    still ``pending`` after their expiry time are marked ``expired``.
    """

    DEFAULT_DB_PATH = "data/pending_orders.db"
    POLL_INTERVAL_SECONDS = 5    # delay between polling passes
    ERROR_BACKOFF_SECONDS = 10   # delay after a pass that raised
    SIZE_TOLERANCE = 0.95        # position may be up to 5% smaller than requested

    def __init__(self, hl_client: "HyperliquidClient",
                 notification_manager: "NotificationManager",
                 db_path: str = DEFAULT_DB_PATH):
        """
        Store collaborators and make sure the backing table exists.

        Args:
            hl_client: Exchange client; this class calls ``get_positions()``
                and awaits ``place_order(...)`` on it.
            notification_manager: Object whose ``send_notification(message)``
                coroutine is awaited for user-facing messages.
            db_path: Location of the SQLite file. Defaults to the path the
                manager has always used.
        """
        self.hl_client = hl_client
        self.notification_manager = notification_manager
        self.db_path = db_path
        self.is_running = False
        # Strong reference to the polling task: the event loop only keeps weak
        # references, so an unreferenced task may be garbage-collected mid-run.
        self._monitor_task: Optional[asyncio.Task] = None

        # Initialize database
        self._init_database()

    def _connect(self):
        """Return a context manager yielding a connection that is closed on exit.

        ``sqlite3.Connection`` used directly in a ``with`` statement only
        commits or rolls back; it does not close the connection. Wrapping it
        in ``closing`` releases the handle deterministically.
        """
        return closing(sqlite3.connect(self.db_path))

    def _init_database(self):
        """Create the database directory and the ``pending_stop_loss`` table if missing.

        Failures are logged rather than raised, matching the rest of the class.
        """
        try:
            # sqlite3 cannot create missing parent directories on its own.
            parent_dir = os.path.dirname(self.db_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

            with self._connect() as conn, conn:
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS pending_stop_loss (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        symbol TEXT NOT NULL,
                        stop_price REAL NOT NULL,
                        size REAL NOT NULL,
                        side TEXT NOT NULL,
                        created_at TEXT NOT NULL,
                        expires_at TEXT,
                        status TEXT DEFAULT 'pending',
                        order_id TEXT,
                        placed_at TEXT
                    )
                """)
            logger.info("Pending orders database initialized")
        except Exception as e:
            logger.error(f"Error initializing pending orders database: {e}")

    async def start(self):
        """Start the background polling task; a no-op when already running."""
        if self.is_running:
            return

        self.is_running = True
        logger.info("Starting pending orders manager")

        # A loop from a previous start() may still be sleeping after stop();
        # it will see is_running flip back to True and carry on, so only spawn
        # a new task when no live one exists.
        existing = getattr(self, "_monitor_task", None)
        if existing is None or existing.done():
            self._monitor_task = asyncio.create_task(self._monitoring_loop())

    async def stop(self):
        """Ask the polling loop to exit after its current iteration."""
        self.is_running = False
        logger.info("Stopping pending orders manager")

    async def add_pending_stop_loss(self, symbol: str, stop_price: float, size: float, side: str, expires_hours: int = 24):
        """
        Persist a stop loss request and notify the user.

        Args:
            symbol: Coin name, compared against the ``coin`` field of positions.
            stop_price: Trigger price for the stop order.
            size: Expected position size; the order is placed once the open
                position reaches ``SIZE_TOLERANCE`` of this value.
            side: Position side; compared case-insensitively with
                ``'long'`` / ``'short'``.
            expires_hours: Hours until the request is marked ``expired``.

        Errors are logged and swallowed; nothing is returned.
        """
        try:
            created_at = datetime.now(timezone.utc)
            expires_at = created_at + timedelta(hours=expires_hours)

            with self._connect() as conn, conn:
                conn.execute("""
                    INSERT INTO pending_stop_loss
                    (symbol, stop_price, size, side, created_at, expires_at)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (symbol, stop_price, size, side, created_at.isoformat(), expires_at.isoformat()))

            logger.info(f"Added pending stop loss: {symbol} {side} {size} @ ${stop_price}")

            message = (
                f"⏳ Pending Stop Loss Added\n"
                f"Token: {symbol}\n"
                f"Side: {side}\n"
                f"Size: {size}\n"
                f"Stop Price: ${stop_price:.4f}\n"
                f"Expires: {expires_hours}h"
            )
            await self.notification_manager.send_notification(message)

        except Exception as e:
            logger.error(f"Error adding pending stop loss: {e}")

    async def _monitoring_loop(self):
        """Poll for placeable and expired requests until ``is_running`` is cleared."""
        while self.is_running:
            try:
                await self._check_pending_orders()
                await self._cleanup_expired_orders()
                await asyncio.sleep(self.POLL_INTERVAL_SECONDS)
            except Exception as e:
                logger.error(f"Error in pending orders monitoring loop: {e}")
                await asyncio.sleep(self.ERROR_BACKOFF_SECONDS)

    async def _check_pending_orders(self):
        """Place a stop loss for every pending request whose position now exists."""
        try:
            # NOTE(review): called without await, so get_positions() is assumed
            # to be synchronous — confirm against HyperliquidClient.
            positions = self.hl_client.get_positions()
            if not positions:
                return

            # coin -> signed size (positive = long, negative = short)
            open_positions = {}
            for position in positions:
                signed_size = float(position.get('szi', '0'))
                coin = position.get('coin', '')
                if signed_size != 0 and coin:
                    open_positions[coin] = signed_size

            with self._connect() as conn:
                pending_orders = conn.execute("""
                    SELECT id, symbol, stop_price, size, side
                    FROM pending_stop_loss
                    WHERE status = 'pending'
                """).fetchall()

            for pending_id, symbol, stop_price, size, side in pending_orders:
                signed_size = open_positions.get(symbol)
                if signed_size is None:
                    continue

                position_side = 'long' if signed_size > 0 else 'short'
                large_enough = abs(signed_size) >= abs(size) * self.SIZE_TOLERANCE
                if position_side == side.lower() and large_enough:
                    await self._place_stop_loss_order(pending_id, symbol, stop_price,
                                                      signed_size, side)

        except Exception as e:
            logger.error(f"Error checking pending orders: {e}")

    @staticmethod
    def _extract_order_id(order_result: Any) -> Optional[str]:
        """Return the order id from an accepted order response, else ``None``.

        An order counts as accepted when its first status entry is either
        ``resting`` (the normal outcome for a stop order) or ``filled``.
        NOTE(review): response shape taken from the parsing this class already
        did plus Hyperliquid's documented order response — confirm that
        HyperliquidClient.place_order returns it unmodified.
        """
        if not isinstance(order_result, dict):
            return None
        response = order_result.get('response')
        if not isinstance(response, dict) or response.get('type') != 'order':
            return None
        statuses = (response.get('data') or {}).get('statuses') or []
        if not statuses or not isinstance(statuses[0], dict):
            return None
        accepted = statuses[0].get('resting') or statuses[0].get('filled')
        if not isinstance(accepted, dict):
            return None
        return str(accepted.get('oid', ''))

    async def _place_stop_loss_order(self, pending_id: int, symbol: str, stop_price: float,
                                     position_size: float, side: str):
        """
        Submit a reduce-only stop order and mark the request as placed.

        Args:
            pending_id: Row id in ``pending_stop_loss``.
            symbol: Coin to place the order on.
            stop_price: Trigger price passed as ``stop_px``.
            position_size: Signed size of the open position; its absolute
                value is used as the order size.
            side: Position side; the order is sent on the opposite side.

        The row is only updated when the exchange reports the order as
        accepted, so a rejected order is retried on the next polling pass.
        Errors are logged and swallowed.
        """
        try:
            # Determine stop loss side (opposite of position)
            sl_side = 'sell' if side.lower() == 'long' else 'buy'
            sl_size = abs(position_size)

            order_result = await self.hl_client.place_order(
                symbol=symbol,
                side=sl_side,
                size=sl_size,
                order_type='stop',
                stop_px=stop_price,
                reduce_only=True
            )

            order_id = self._extract_order_id(order_result)
            if order_id is None:
                detail = order_result
                if isinstance(order_result, dict) and 'response' in order_result:
                    detail = order_result['response']
                logger.error(f"Failed to place stop loss for {symbol}: {detail}")
                return

            # Record the placement first so the next poll does not resubmit.
            with self._connect() as conn, conn:
                conn.execute("""
                    UPDATE pending_stop_loss
                    SET status = 'placed', order_id = ?, placed_at = ?
                    WHERE id = ?
                """, (order_id, datetime.now(timezone.utc).isoformat(), pending_id))

            message = (
                f"✅ Stop Loss Placed\n"
                f"Token: {symbol}\n"
                f"Size: {sl_size:.4f}\n"
                f"Stop Price: ${stop_price:.4f}\n"
                f"Order ID: {order_id}"
            )
            await self.notification_manager.send_notification(message)
            logger.info(f"Placed stop loss: {symbol} {sl_size} @ ${stop_price}")

        except Exception as e:
            logger.error(f"Error placing stop loss order for {symbol}: {e}")

    async def _cleanup_expired_orders(self):
        """Mark pending requests whose ``expires_at`` has passed as ``expired``."""
        try:
            # Timestamps are stored as UTC ISO-8601 text, so they are compared
            # as strings against the current UTC time in the same format.
            now_iso = datetime.now(timezone.utc).isoformat()

            with self._connect() as conn, conn:
                expired_orders = conn.execute("""
                    SELECT id, symbol, stop_price FROM pending_stop_loss
                    WHERE status = 'pending' AND expires_at < ?
                """, (now_iso,)).fetchall()

                if expired_orders:
                    conn.execute("""
                        UPDATE pending_stop_loss
                        SET status = 'expired'
                        WHERE status = 'pending' AND expires_at < ?
                    """, (now_iso,))

            for _pending_id, symbol, stop_price in expired_orders:
                logger.info(f"Expired pending stop loss: {symbol} @ ${stop_price}")

        except Exception as e:
            logger.error(f"Error cleaning up expired orders: {e}")

    async def get_pending_orders(self) -> List[Dict]:
        """
        Return every stored request, newest first, regardless of status.

        Returns:
            A list of dicts with keys ``symbol``, ``stop_price``, ``size``,
            ``side``, ``created_at``, ``expires_at`` and ``status``; an empty
            list if the query fails.
        """
        columns = ('symbol', 'stop_price', 'size', 'side',
                   'created_at', 'expires_at', 'status')
        try:
            with self._connect() as conn:
                rows = conn.execute("""
                    SELECT symbol, stop_price, size, side, created_at, expires_at, status
                    FROM pending_stop_loss
                    ORDER BY created_at DESC
                """).fetchall()

            return [dict(zip(columns, row)) for row in rows]

        except Exception as e:
            logger.error(f"Error getting pending orders: {e}")
            return []

    async def cancel_pending_order(self, symbol: str) -> bool:
        """
        Cancel all pending requests for ``symbol``.

        Every pending row for the symbol is cancelled, so no leftover request
        can place a stop loss after the user asked for cancellation.

        Returns:
            True if at least one request was cancelled, False if none was
            pending or the update failed.
        """
        try:
            with self._connect() as conn, conn:
                cancelled_count = conn.execute("""
                    UPDATE pending_stop_loss
                    SET status = 'cancelled'
                    WHERE symbol = ? AND status = 'pending'
                """, (symbol,)).rowcount

            if cancelled_count > 0:
                logger.info(f"Cancelled pending stop loss for {symbol}")
                return True

            return False

        except Exception as e:
            logger.error(f"Error cancelling pending order for {symbol}: {e}")
            return False
|