pending_orders_manager.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. import asyncio
  2. import logging
  3. from typing import Dict, List, Optional, Any
  4. from datetime import datetime, timezone, timedelta
  5. import sqlite3
  6. from ..clients.hyperliquid_client import HyperliquidClient
  7. from ..notifications.notification_manager import NotificationManager
  8. logger = logging.getLogger(__name__)
  9. class PendingOrdersManager:
  10. """
  11. Manages pending stop loss orders from /long commands with sl: parameter.
  12. Places stop loss orders when positions are opened.
  13. """
  14. def __init__(self, hl_client: HyperliquidClient, notification_manager: NotificationManager):
  15. self.hl_client = hl_client
  16. self.notification_manager = notification_manager
  17. self.db_path = "data/pending_orders.db"
  18. self.is_running = False
  19. # Initialize database
  20. self._init_database()
  21. def _init_database(self):
  22. """Initialize pending orders database"""
  23. try:
  24. with sqlite3.connect(self.db_path) as conn:
  25. conn.execute("""
  26. CREATE TABLE IF NOT EXISTS pending_stop_loss (
  27. id INTEGER PRIMARY KEY AUTOINCREMENT,
  28. symbol TEXT NOT NULL,
  29. stop_price REAL NOT NULL,
  30. size REAL NOT NULL,
  31. side TEXT NOT NULL,
  32. created_at TEXT NOT NULL,
  33. expires_at TEXT,
  34. status TEXT DEFAULT 'pending',
  35. order_id TEXT,
  36. placed_at TEXT
  37. )
  38. """)
  39. conn.commit()
  40. logger.info("Pending orders database initialized")
  41. except Exception as e:
  42. logger.error(f"Error initializing pending orders database: {e}")
  43. async def start(self):
  44. """Start pending orders manager"""
  45. if self.is_running:
  46. return
  47. self.is_running = True
  48. logger.info("Starting pending orders manager")
  49. # Start monitoring loop
  50. asyncio.create_task(self._monitoring_loop())
  51. async def stop(self):
  52. """Stop pending orders manager"""
  53. self.is_running = False
  54. logger.info("Stopping pending orders manager")
  55. async def add_pending_stop_loss(self, symbol: str, stop_price: float, size: float, side: str, expires_hours: int = 24):
  56. """Add a pending stop loss order"""
  57. try:
  58. created_at = datetime.now(timezone.utc)
  59. expires_at = created_at + timedelta(hours=expires_hours)
  60. with sqlite3.connect(self.db_path) as conn:
  61. conn.execute("""
  62. INSERT INTO pending_stop_loss
  63. (symbol, stop_price, size, side, created_at, expires_at)
  64. VALUES (?, ?, ?, ?, ?, ?)
  65. """, (symbol, stop_price, size, side, created_at.isoformat(), expires_at.isoformat()))
  66. conn.commit()
  67. logger.info(f"Added pending stop loss: {symbol} {side} {size} @ ${stop_price}")
  68. message = (
  69. f"⏳ Pending Stop Loss Added\n"
  70. f"Token: {symbol}\n"
  71. f"Side: {side}\n"
  72. f"Size: {size}\n"
  73. f"Stop Price: ${stop_price:.4f}\n"
  74. f"Expires: {expires_hours}h"
  75. )
  76. await self.notification_manager.send_notification(message)
  77. except Exception as e:
  78. logger.error(f"Error adding pending stop loss: {e}")
  79. async def _monitoring_loop(self):
  80. """Main monitoring loop"""
  81. while self.is_running:
  82. try:
  83. await self._check_pending_orders()
  84. await self._cleanup_expired_orders()
  85. await asyncio.sleep(5) # Check every 5 seconds
  86. except Exception as e:
  87. logger.error(f"Error in pending orders monitoring loop: {e}")
  88. await asyncio.sleep(10)
  89. async def _check_pending_orders(self):
  90. """Check if any pending orders should be placed"""
  91. try:
  92. # Get current positions
  93. positions = self.hl_client.get_positions()
  94. if not positions:
  95. return
  96. current_positions = {}
  97. for position in positions:
  98. size = float(position.get('szi', '0'))
  99. if size != 0:
  100. symbol = position.get('coin', '')
  101. if symbol:
  102. current_positions[symbol] = {
  103. 'size': size,
  104. 'side': 'long' if size > 0 else 'short'
  105. }
  106. # Check pending orders against current positions
  107. with sqlite3.connect(self.db_path) as conn:
  108. cursor = conn.execute("""
  109. SELECT id, symbol, stop_price, size, side
  110. FROM pending_stop_loss
  111. WHERE status = 'pending'
  112. """)
  113. pending_orders = cursor.fetchall()
  114. for order_id, symbol, stop_price, size, side in pending_orders:
  115. if symbol in current_positions:
  116. current_pos = current_positions[symbol]
  117. # Check if position matches pending order
  118. if (current_pos['side'] == side.lower() and
  119. abs(current_pos['size']) >= abs(size) * 0.95): # Allow 5% tolerance
  120. await self._place_stop_loss_order(order_id, symbol, stop_price,
  121. current_pos['size'], side)
  122. except Exception as e:
  123. logger.error(f"Error checking pending orders: {e}")
  124. async def _place_stop_loss_order(self, pending_id: int, symbol: str, stop_price: float,
  125. position_size: float, side: str):
  126. """Place stop loss order on exchange"""
  127. try:
  128. # Determine stop loss side (opposite of position)
  129. sl_side = 'sell' if side.lower() == 'long' else 'buy'
  130. sl_size = abs(position_size)
  131. # Place stop loss order
  132. order_result = await self.hl_client.place_order(
  133. symbol=symbol,
  134. side=sl_side,
  135. size=sl_size,
  136. order_type='stop',
  137. stop_px=stop_price,
  138. reduce_only=True
  139. )
  140. if order_result and 'response' in order_result:
  141. response = order_result['response']
  142. if response.get('type') == 'order' and response.get('data', {}).get('statuses', [{}])[0].get('filled'):
  143. # Order placed successfully
  144. order_id = str(response.get('data', {}).get('statuses', [{}])[0].get('resting', {}).get('oid', ''))
  145. # Update database
  146. with sqlite3.connect(self.db_path) as conn:
  147. conn.execute("""
  148. UPDATE pending_stop_loss
  149. SET status = 'placed', order_id = ?, placed_at = ?
  150. WHERE id = ?
  151. """, (order_id, datetime.now(timezone.utc).isoformat(), pending_id))
  152. conn.commit()
  153. message = (
  154. f"✅ Stop Loss Placed\n"
  155. f"Token: {symbol}\n"
  156. f"Size: {sl_size:.4f}\n"
  157. f"Stop Price: ${stop_price:.4f}\n"
  158. f"Order ID: {order_id}"
  159. )
  160. await self.notification_manager.send_notification(message)
  161. logger.info(f"Placed stop loss: {symbol} {sl_size} @ ${stop_price}")
  162. else:
  163. logger.error(f"Failed to place stop loss for {symbol}: {response}")
  164. except Exception as e:
  165. logger.error(f"Error placing stop loss order for {symbol}: {e}")
  166. async def _cleanup_expired_orders(self):
  167. """Remove expired pending orders"""
  168. try:
  169. current_time = datetime.now(timezone.utc)
  170. with sqlite3.connect(self.db_path) as conn:
  171. # Get expired orders
  172. cursor = conn.execute("""
  173. SELECT id, symbol, stop_price FROM pending_stop_loss
  174. WHERE status = 'pending' AND expires_at < ?
  175. """, (current_time.isoformat(),))
  176. expired_orders = cursor.fetchall()
  177. if expired_orders:
  178. # Mark as expired
  179. conn.execute("""
  180. UPDATE pending_stop_loss
  181. SET status = 'expired'
  182. WHERE status = 'pending' AND expires_at < ?
  183. """, (current_time.isoformat(),))
  184. conn.commit()
  185. for order_id, symbol, stop_price in expired_orders:
  186. logger.info(f"Expired pending stop loss: {symbol} @ ${stop_price}")
  187. except Exception as e:
  188. logger.error(f"Error cleaning up expired orders: {e}")
  189. async def get_pending_orders(self) -> List[Dict]:
  190. """Get all pending orders"""
  191. try:
  192. with sqlite3.connect(self.db_path) as conn:
  193. cursor = conn.execute("""
  194. SELECT symbol, stop_price, size, side, created_at, expires_at, status
  195. FROM pending_stop_loss
  196. ORDER BY created_at DESC
  197. """)
  198. orders = cursor.fetchall()
  199. return [
  200. {
  201. 'symbol': row[0],
  202. 'stop_price': row[1],
  203. 'size': row[2],
  204. 'side': row[3],
  205. 'created_at': row[4],
  206. 'expires_at': row[5],
  207. 'status': row[6]
  208. }
  209. for row in orders
  210. ]
  211. except Exception as e:
  212. logger.error(f"Error getting pending orders: {e}")
  213. return []
  214. async def cancel_pending_order(self, symbol: str) -> bool:
  215. """Cancel pending order for symbol"""
  216. try:
  217. with sqlite3.connect(self.db_path) as conn:
  218. cursor = conn.execute("""
  219. SELECT id FROM pending_stop_loss
  220. WHERE symbol = ? AND status = 'pending'
  221. """, (symbol,))
  222. order = cursor.fetchone()
  223. if order:
  224. conn.execute("""
  225. UPDATE pending_stop_loss
  226. SET status = 'cancelled'
  227. WHERE id = ?
  228. """, (order[0],))
  229. conn.commit()
  230. logger.info(f"Cancelled pending stop loss for {symbol}")
  231. return True
  232. return False
  233. except Exception as e:
  234. logger.error(f"Error cancelling pending order for {symbol}: {e}")
  235. return False