#!/usr/bin/env python3 """ Telegram Bot for Hyperliquid Trading This module provides a Telegram interface for manual Hyperliquid trading with comprehensive statistics tracking and phone-friendly controls. """ import logging import asyncio import re from datetime import datetime, timedelta from typing import Optional, Dict, Any from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import Application, CommandHandler, CallbackQueryHandler, ContextTypes, MessageHandler, filters from hyperliquid_client import HyperliquidClient from trading_stats import TradingStats from config import Config from alarm_manager import AlarmManager from logging_config import setup_logging, cleanup_logs, format_log_stats # Set up logging using the new configuration system logger = setup_logging().getChild(__name__) class TelegramTradingBot: """Telegram bot for manual trading with comprehensive statistics.""" def __init__(self): """Initialize the Telegram trading bot.""" self.client = HyperliquidClient(use_testnet=Config.HYPERLIQUID_TESTNET) self.stats = TradingStats() self.alarm_manager = AlarmManager() self.authorized_chat_id = Config.TELEGRAM_CHAT_ID self.application = None # Order monitoring self.monitoring_active = False self.last_known_orders = set() # Track order IDs we've seen self.last_known_positions = {} # Track position sizes for P&L calculation # External trade monitoring self.last_processed_trade_time = None # Track last processed external trade # Initialize stats with current balance self._initialize_stats() def _initialize_stats(self): """Initialize stats with current balance.""" try: balance = self.client.get_balance() if balance and balance.get('total'): # Get USDC balance as the main balance usdc_balance = float(balance['total'].get('USDC', 0)) self.stats.set_initial_balance(usdc_balance) except Exception as e: logger.error(f"Could not initialize stats: {e}") def is_authorized(self, chat_id: str) -> bool: """Check if the chat ID is authorized to use the bot.""" return str(chat_id) == str(self.authorized_chat_id) async def send_message(self, text: str, parse_mode: str = 'HTML') -> None: """Send a message to the authorized chat.""" if self.application and self.authorized_chat_id: try: await self.application.bot.send_message( chat_id=self.authorized_chat_id, text=text, parse_mode=parse_mode ) except Exception as e: logger.error(f"Failed to send message: {e}") async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle the /start command.""" if not self.is_authorized(update.effective_chat.id): await update.message.reply_text("โŒ Unauthorized access.") return welcome_text = """ ๐Ÿค– Hyperliquid Manual Trading Bot Welcome to your personal trading assistant! Control your Hyperliquid account directly from your phone. ๐Ÿ“ฑ Quick Actions: Tap the buttons below for instant access to key functions. ๐Ÿ’ผ Account Commands: /balance - Account balance /positions - Open positions /orders - Open orders /stats - Trading statistics ๐Ÿ“Š Market Commands: /market - Market data (default token) /market SOL - Market data for SOL /price - Current price (default token) /price BTC - Price for BTC ๐Ÿš€ Perps Trading: โ€ข /long BTC 100 - Long BTC with $100 USDC (Market Order) โ€ข /long BTC 100 45000 - Long BTC with $100 USDC at $45,000 (Limit Order) โ€ข /short ETH 50 - Short ETH with $50 USDC (Market Order) โ€ข /short ETH 50 3500 - Short ETH with $50 USDC at $3,500 (Limit Order) โ€ข /exit BTC - Close BTC position with Market Order ๐Ÿ›ก๏ธ Risk Management: โ€ข /sl BTC 44000 - Set stop loss for BTC at $44,000 โ€ข /tp BTC 50000 - Set take profit for BTC at $50,000 ๐Ÿšจ Automatic Stop Loss: โ€ข Enabled: {risk_enabled} โ€ข Stop Loss: {stop_loss}% (automatic execution) โ€ข Monitoring: Every {heartbeat} seconds ๐Ÿ“‹ Order Management: โ€ข /orders - Show all open orders โ€ข /orders BTC - Show open orders for BTC only โ€ข /coo BTC - Cancel all open orders for BTC ๐Ÿ“ˆ Statistics & Analytics: /stats - Full trading statistics /performance - Performance metrics /risk - Risk analysis ๐Ÿ”” Price Alerts: โ€ข /alarm - List all alarms โ€ข /alarm BTC 50000 - Set alarm for BTC at $50,000 โ€ข /alarm BTC - Show BTC alarms only โ€ข /alarm 3 - Remove alarm ID 3 ๐Ÿ”„ Automatic Monitoring: โ€ข Real-time order fill alerts โ€ข Position opened/closed notifications โ€ข P&L calculations on trade closure โ€ข Price alarm triggers โ€ข External trade detection & sync โ€ข Auto stats synchronization โ€ข {heartbeat}-second monitoring interval ๐Ÿ“Š Universal Trade Tracking: โ€ข Bot trades: Full logging & notifications โ€ข Platform trades: Auto-detected & synced โ€ข Mobile app trades: Monitored & recorded โ€ข API trades: Tracked & included in stats Type /help for detailed command information. ๐Ÿ”„ Order Monitoring: โ€ข /monitoring - View monitoring status โ€ข /logs - View log file statistics and cleanup โš™๏ธ Configuration: โ€ข Symbol: {symbol} โ€ข Default Token: {symbol} โ€ข Network: {network} ๐Ÿ›ก๏ธ Safety Features: โ€ข All trades logged automatically โ€ข Comprehensive performance tracking โ€ข Real-time balance monitoring โ€ข Risk metrics calculation ๐Ÿ“ฑ Mobile Optimized: โ€ข Quick action buttons โ€ข Instant notifications โ€ข Clean, readable layout โ€ข One-tap commands For support, contact your bot administrator. """.format( symbol=Config.DEFAULT_TRADING_TOKEN, network="Testnet" if Config.HYPERLIQUID_TESTNET else "Mainnet", risk_enabled=Config.RISK_MANAGEMENT_ENABLED, stop_loss=Config.STOP_LOSS_PERCENTAGE, heartbeat=Config.BOT_HEARTBEAT_SECONDS ) keyboard = [ [ InlineKeyboardButton("๐Ÿ’ฐ Balance", callback_data="balance"), InlineKeyboardButton("๐Ÿ“Š Stats", callback_data="stats") ], [ InlineKeyboardButton("๐Ÿ“ˆ Positions", callback_data="positions"), InlineKeyboardButton("๐Ÿ“‹ Orders", callback_data="orders") ], [ InlineKeyboardButton("๐Ÿ’ต Price", callback_data="price"), InlineKeyboardButton("๐Ÿ“Š Market", callback_data="market") ], [ InlineKeyboardButton("๐Ÿ”„ Recent Trades", callback_data="trades"), InlineKeyboardButton("โš™๏ธ Help", callback_data="help") ] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text(welcome_text, parse_mode='HTML', reply_markup=reply_markup) async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle the /help command.""" if not self.is_authorized(update.effective_chat.id): await update.message.reply_text("โŒ Unauthorized access.") return help_text = """ ๐Ÿ”ง Hyperliquid Trading Bot - Complete Guide ๐Ÿ’ผ Account Management: โ€ข /balance - Show account balance โ€ข /positions - Show open positions โ€ข /orders - Show open orders ๐Ÿ“Š Market Data: โ€ข /market - Detailed market data (default token) โ€ข /market BTC - Market data for specific token โ€ข /price - Quick price check (default token) โ€ข /price SOL - Price for specific token ๐Ÿš€ Perps Trading: โ€ข /long BTC 100 - Long BTC with $100 USDC (Market Order) โ€ข /long BTC 100 45000 - Long BTC with $100 USDC at $45,000 (Limit Order) โ€ข /short ETH 50 - Short ETH with $50 USDC (Market Order) โ€ข /short ETH 50 3500 - Short ETH with $50 USDC at $3,500 (Limit Order) โ€ข /exit BTC - Close BTC position with Market Order ๐Ÿ›ก๏ธ Risk Management: โ€ข /sl BTC 44000 - Set stop loss for BTC at $44,000 โ€ข /tp BTC 50000 - Set take profit for BTC at $50,000 ๐Ÿšจ Automatic Stop Loss: โ€ข Enabled: {risk_enabled} โ€ข Stop Loss: {stop_loss}% (automatic execution) โ€ข Monitoring: Every {heartbeat} seconds ๐Ÿ“‹ Order Management: โ€ข /orders - Show all open orders โ€ข /orders BTC - Show open orders for BTC only โ€ข /coo BTC - Cancel all open orders for BTC ๐Ÿ“ˆ Statistics & Analytics: โ€ข /stats - Complete trading statistics โ€ข /performance - Win rate, profit factor, etc. โ€ข /risk - Sharpe ratio, drawdown, VaR โ€ข /trades - Recent trade history ๐Ÿ”” Price Alerts: โ€ข /alarm - List all active alarms โ€ข /alarm BTC 50000 - Set alarm for BTC at $50,000 โ€ข /alarm BTC - Show all BTC alarms โ€ข /alarm 3 - Remove alarm ID 3 ๐Ÿ”„ Order Monitoring: โ€ข /monitoring - View monitoring status โ€ข /logs - View log file statistics and cleanup โš™๏ธ Configuration: โ€ข Symbol: {symbol} โ€ข Default Token: {symbol} โ€ข Network: {network} ๐Ÿ›ก๏ธ Safety Features: โ€ข All trades logged automatically โ€ข Comprehensive performance tracking โ€ข Real-time balance monitoring โ€ข Risk metrics calculation ๐Ÿ“ฑ Mobile Optimized: โ€ข Quick action buttons โ€ข Instant notifications โ€ข Clean, readable layout โ€ข One-tap commands For support, contact your bot administrator. """.format( symbol=Config.DEFAULT_TRADING_TOKEN, network="Testnet" if Config.HYPERLIQUID_TESTNET else "Mainnet", risk_enabled=Config.RISK_MANAGEMENT_ENABLED, stop_loss=Config.STOP_LOSS_PERCENTAGE, heartbeat=Config.BOT_HEARTBEAT_SECONDS ) await update.message.reply_text(help_text, parse_mode='HTML') async def stats_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle the /stats command.""" if not self.is_authorized(update.effective_chat.id): await update.message.reply_text("โŒ Unauthorized access.") return # Get current balance for stats balance = self.client.get_balance() current_balance = 0 if balance and balance.get('total'): current_balance = float(balance['total'].get('USDC', 0)) stats_message = self.stats.format_stats_message(current_balance) await update.message.reply_text(stats_message, parse_mode='HTML') async def trades_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle the /trades command.""" if not self.is_authorized(update.effective_chat.id): await update.message.reply_text("โŒ Unauthorized access.") return recent_trades = self.stats.get_recent_trades(10) if not recent_trades: await update.message.reply_text("๐Ÿ“ No trades recorded yet.") return trades_text = "๐Ÿ”„ Recent Trades\n\n" for trade in reversed(recent_trades[-5:]): # Show last 5 trades timestamp = datetime.fromisoformat(trade['timestamp']).strftime('%m/%d %H:%M') side_emoji = "๐ŸŸข" if trade['side'] == 'buy' else "๐Ÿ”ด" trades_text += f"{side_emoji} {trade['side'].upper()} {trade['amount']} {trade['symbol']}\n" trades_text += f" ๐Ÿ’ฐ ${trade['price']:,.2f} | ๐Ÿ’ต ${trade['value']:,.2f}\n" trades_text += f" ๐Ÿ“… {timestamp}\n\n" await update.message.reply_text(trades_text, parse_mode='HTML') async def balance_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle the /balance command.""" if not self.is_authorized(update.effective_chat.id): await update.message.reply_text("โŒ Unauthorized access.") return balance = self.client.get_balance() if balance: balance_text = "๐Ÿ’ฐ Account Balance\n\n" # CCXT balance structure includes 'free', 'used', and 'total' total_balance = balance.get('total', {}) free_balance = balance.get('free', {}) used_balance = balance.get('used', {}) if total_balance: total_value = 0 available_value = 0 # Display individual assets for asset, amount in total_balance.items(): if float(amount) > 0: free_amount = float(free_balance.get(asset, 0)) used_amount = float(used_balance.get(asset, 0)) balance_text += f"๐Ÿ’ต {asset}:\n" balance_text += f" ๐Ÿ“Š Total: {amount}\n" balance_text += f" โœ… Available: {free_amount}\n" if used_amount > 0: balance_text += f" ๐Ÿ”’ In Use: {used_amount}\n" balance_text += "\n" # Calculate totals for USDC (main trading currency) if asset == 'USDC': total_value += float(amount) available_value += free_amount # Summary section balance_text += f"๐Ÿ’ผ Portfolio Summary:\n" balance_text += f" ๐Ÿ’ฐ Total Value: ${total_value:,.2f}\n" balance_text += f" ๐Ÿš€ Available for Trading: ${available_value:,.2f}\n" if total_value - available_value > 0: balance_text += f" ๐Ÿ”’ In Active Use: ${total_value - available_value:,.2f}\n" # Add P&L summary basic_stats = self.stats.get_basic_stats() if basic_stats['initial_balance'] > 0: pnl = total_value - basic_stats['initial_balance'] pnl_percent = (pnl / basic_stats['initial_balance']) * 100 balance_text += f"\n๐Ÿ“Š Performance:\n" balance_text += f" ๐Ÿ’ต P&L: ${pnl:,.2f} ({pnl_percent:+.2f}%)\n" balance_text += f" ๐Ÿ“ˆ Initial: ${basic_stats['initial_balance']:,.2f}" else: balance_text += "๐Ÿ“ญ No balance data available" else: balance_text = "โŒ Could not fetch balance data" await update.message.reply_text(balance_text, parse_mode='HTML') async def positions_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle the /positions command.""" if not self.is_authorized(update.effective_chat.id): await update.message.reply_text("โŒ Unauthorized access.") return positions = self.client.get_positions() if positions is not None: # Successfully fetched (could be empty list) positions_text = "๐Ÿ“ˆ Open Positions\n\n" # Filter for actual open positions open_positions = [p for p in positions if float(p.get('contracts', 0)) != 0] if open_positions: total_unrealized = 0 for position in open_positions: symbol = position.get('symbol', 'Unknown') contracts = float(position.get('contracts', 0)) unrealized_pnl = float(position.get('unrealizedPnl', 0)) entry_price = float(position.get('entryPx', 0)) pnl_emoji = "๐ŸŸข" if unrealized_pnl >= 0 else "๐Ÿ”ด" positions_text += f"๐Ÿ“Š {symbol}\n" positions_text += f" ๐Ÿ“ Size: {contracts} contracts\n" positions_text += f" ๐Ÿ’ฐ Entry: ${entry_price:,.2f}\n" positions_text += f" {pnl_emoji} PnL: ${unrealized_pnl:,.2f}\n\n" total_unrealized += unrealized_pnl positions_text += f"๐Ÿ’ผ Total Unrealized P&L: ${total_unrealized:,.2f}" else: positions_text += "๐Ÿ“ญ No open positions currently\n\n" positions_text += "๐Ÿš€ Ready to start trading!\n" positions_text += "Use /buy or /sell commands to open positions." else: # Actual API error positions_text = "โŒ Could not fetch positions data\n\n" positions_text += "๐Ÿ”„ Please try again in a moment.\n" positions_text += "If the issue persists, check your connection." await update.message.reply_text(positions_text, parse_mode='HTML') async def orders_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle the /orders command with optional token filter.""" if not self.is_authorized(update.effective_chat.id): await update.message.reply_text("โŒ Unauthorized access.") return # Check if token filter is provided token_filter = None if context.args and len(context.args) >= 1: token_filter = context.args[0].upper() orders = self.client.get_open_orders() if orders is not None: # Successfully fetched (could be empty list) if token_filter: orders_text = f"๐Ÿ“‹ Open Orders - {token_filter}\n\n" # Filter orders for specific token target_symbol = f"{token_filter}/USDC:USDC" filtered_orders = [order for order in orders if order.get('symbol') == target_symbol] else: orders_text = "๐Ÿ“‹ All Open Orders\n\n" filtered_orders = orders if filtered_orders and len(filtered_orders) > 0: for order in filtered_orders: symbol = order.get('symbol', 'Unknown') side = order.get('side', 'Unknown') amount = order.get('amount', 0) price = order.get('price', 0) order_id = order.get('id', 'Unknown') # Extract token from symbol for display token = symbol.split('/')[0] if '/' in symbol else symbol side_emoji = "๐ŸŸข" if side.lower() == 'buy' else "๐Ÿ”ด" orders_text += f"{side_emoji} {token}\n" orders_text += f" ๐Ÿ“Š {side.upper()} {amount} @ ${price:,.2f}\n" orders_text += f" ๐Ÿ’ต Value: ${float(amount) * float(price):,.2f}\n" orders_text += f" ๐Ÿ”‘ ID: {order_id}\n\n" # Add helpful commands if token_filter: orders_text += f"๐Ÿ’ก Quick Actions:\n" orders_text += f"โ€ข /coo {token_filter} - Cancel all {token_filter} orders\n" orders_text += f"โ€ข /orders - View all orders" else: orders_text += f"๐Ÿ’ก Filter by token: /orders BTC, /orders ETH" else: if token_filter: orders_text += f"๐Ÿ“ญ No open orders for {token_filter}\n\n" orders_text += f"๐Ÿ’ก No pending {token_filter} orders found.\n" orders_text += f"Use /long {token_filter} 100 or /short {token_filter} 100 to create new orders." else: orders_text += "๐Ÿ“ญ No open orders currently\n\n" orders_text += "๐Ÿ’ก All clear! No pending orders.\n" orders_text += "Use /long or /short commands to place new orders." else: # Actual API error orders_text = "โŒ Could not fetch orders data\n\n" orders_text += "๐Ÿ”„ Please try again in a moment.\n" orders_text += "If the issue persists, check your connection." await update.message.reply_text(orders_text, parse_mode='HTML') async def market_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle the /market command.""" if not self.is_authorized(update.effective_chat.id): await update.message.reply_text("โŒ Unauthorized access.") return # Check if token is provided as argument if context.args and len(context.args) >= 1: token = context.args[0].upper() else: token = Config.DEFAULT_TRADING_TOKEN # Convert token to full symbol format for API symbol = f"{token}/USDC:USDC" market_data = self.client.get_market_data(symbol) if market_data and market_data.get('ticker'): try: ticker = market_data['ticker'] orderbook = market_data.get('orderbook', {}) # Safely extract ticker data with fallbacks current_price = float(ticker.get('last') or 0) high_24h = float(ticker.get('high') or 0) low_24h = float(ticker.get('low') or 0) volume_24h = ticker.get('baseVolume') or ticker.get('volume') or 'N/A' market_text = f"๐Ÿ“Š Market Data - {token}\n\n" if current_price > 0: market_text += f"๐Ÿ’ต Current Price: ${current_price:,.2f}\n" else: market_text += f"๐Ÿ’ต Current Price: N/A\n" if high_24h > 0: market_text += f"๐Ÿ“ˆ 24h High: ${high_24h:,.2f}\n" else: market_text += f"๐Ÿ“ˆ 24h High: N/A\n" if low_24h > 0: market_text += f"๐Ÿ“‰ 24h Low: ${low_24h:,.2f}\n" else: market_text += f"๐Ÿ“‰ 24h Low: N/A\n" market_text += f"๐Ÿ“Š 24h Volume: {volume_24h}\n\n" # Handle orderbook data safely if orderbook and orderbook.get('bids') and orderbook.get('asks'): try: bids = orderbook.get('bids', []) asks = orderbook.get('asks', []) if bids and asks and len(bids) > 0 and len(asks) > 0: best_bid = float(bids[0][0]) if bids[0][0] else 0 best_ask = float(asks[0][0]) if asks[0][0] else 0 if best_bid > 0 and best_ask > 0: spread = best_ask - best_bid spread_percent = (spread / best_ask * 100) if best_ask > 0 else 0 market_text += f"๐ŸŸข Best Bid: ${best_bid:,.2f}\n" market_text += f"๐Ÿ”ด Best Ask: ${best_ask:,.2f}\n" market_text += f"๐Ÿ“ Spread: ${spread:.2f} ({spread_percent:.3f}%)\n" else: market_text += f"๐Ÿ“‹ Orderbook: Data unavailable\n" else: market_text += f"๐Ÿ“‹ Orderbook: No orders available\n" except (IndexError, ValueError, TypeError) as e: market_text += f"๐Ÿ“‹ Orderbook: Error parsing data\n" else: market_text += f"๐Ÿ“‹ Orderbook: Not available\n" # Add usage hint market_text += f"\n๐Ÿ’ก Usage: /market {token} or /market for default" except (ValueError, TypeError) as e: market_text = f"โŒ Error parsing market data\n\n" market_text += f"๐Ÿ”ง Raw data received but couldn't parse values.\n" market_text += f"๐Ÿ“ž Please try again or contact support if this persists." else: market_text = f"โŒ Could not fetch market data for {token}\n\n" market_text += f"๐Ÿ”„ Please try again in a moment.\n" market_text += f"๐ŸŒ Check your network connection.\n" market_text += f"๐Ÿ“ก API may be temporarily unavailable.\n\n" market_text += f"๐Ÿ’ก Usage: /market BTC, /market ETH, /market SOL, etc." await update.message.reply_text(market_text, parse_mode='HTML') async def price_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle the /price command.""" if not self.is_authorized(update.effective_chat.id): await update.message.reply_text("โŒ Unauthorized access.") return # Check if token is provided as argument if context.args and len(context.args) >= 1: token = context.args[0].upper() else: token = Config.DEFAULT_TRADING_TOKEN # Convert token to full symbol format for API symbol = f"{token}/USDC:USDC" market_data = self.client.get_market_data(symbol) if market_data and market_data.get('ticker'): try: ticker = market_data['ticker'] price_value = ticker.get('last') if price_value is not None: price = float(price_value) price_text = f"๐Ÿ’ต {token}: ${price:,.2f}" # Add timestamp timestamp = datetime.now().strftime('%H:%M:%S') price_text += f"\nโฐ Updated: {timestamp}" # Add usage hint price_text += f"\n๐Ÿ’ก Usage: /price {symbol} or /price for default" else: price_text = f"๐Ÿ’ต {symbol}: Price not available\nโš ๏ธ Data temporarily unavailable" except (ValueError, TypeError) as e: price_text = f"โŒ Error parsing price for {symbol}\n๐Ÿ”ง Please try again" else: price_text = f"โŒ Could not fetch price for {symbol}\n๐Ÿ”„ Please try again in a moment\n\n" price_text += f"๐Ÿ’ก Usage: /price BTC, /price ETH, /price SOL, etc." await update.message.reply_text(price_text, parse_mode='HTML') async def button_callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle inline keyboard button presses.""" query = update.callback_query await query.answer() if not self.is_authorized(query.message.chat_id): await query.edit_message_text("โŒ Unauthorized access.") return callback_data = query.data # Handle trading confirmations if callback_data.startswith('confirm_long_'): parts = callback_data.split('_') token = parts[2] usdc_amount = float(parts[3]) price = float(parts[4]) is_limit = len(parts) > 5 and parts[5] == 'limit' await self._execute_long_order(query, token, usdc_amount, price, is_limit) return elif callback_data.startswith('confirm_short_'): parts = callback_data.split('_') token = parts[2] usdc_amount = float(parts[3]) price = float(parts[4]) is_limit = len(parts) > 5 and parts[5] == 'limit' await self._execute_short_order(query, token, usdc_amount, price, is_limit) return elif callback_data.startswith('confirm_exit_'): parts = callback_data.split('_') token = parts[2] exit_side = parts[3] contracts = float(parts[4]) price = float(parts[5]) await self._execute_exit_order(query, token, exit_side, contracts, price) return elif callback_data.startswith('confirm_coo_'): parts = callback_data.split('_') token = parts[2] await self._execute_coo(query, token) return elif callback_data.startswith('confirm_sl_'): parts = callback_data.split('_') token = parts[2] exit_side = parts[3] contracts = float(parts[4]) price = float(parts[5]) await self._execute_sl_order(query, token, exit_side, contracts, price) return elif callback_data.startswith('confirm_tp_'): parts = callback_data.split('_') token = parts[2] exit_side = parts[3] contracts = float(parts[4]) price = float(parts[5]) await self._execute_tp_order(query, token, exit_side, contracts, price) return elif callback_data == 'cancel_order': await query.edit_message_text("โŒ Order cancelled.") return # Create a fake update object for reusing command handlers fake_update = Update( update_id=update.update_id, message=query.message, callback_query=query ) # Handle regular button callbacks if callback_data == "balance": await self.balance_command(fake_update, context) elif callback_data == "stats": await self.stats_command(fake_update, context) elif callback_data == "positions": await self.positions_command(fake_update, context) elif callback_data == "orders": await self.orders_command(fake_update, context) elif callback_data == "market": await self.market_command(fake_update, context) elif callback_data == "price": await self.price_command(fake_update, context) elif callback_data == "trades": await self.trades_command(fake_update, context) elif callback_data == "help": await self.help_command(fake_update, context) async def _execute_long_order(self, query, token: str, usdc_amount: float, price: float, is_limit: bool): """Execute a long order.""" symbol = f"{token}/USDC:USDC" try: await query.edit_message_text("โณ Opening long position...") # Calculate token amount based on USDC value and price token_amount = usdc_amount / price # Place order (limit or market) if is_limit: order = self.client.place_limit_order(symbol, 'buy', token_amount, price) else: order = self.client.place_market_order(symbol, 'buy', token_amount) if order: # Record the trade in stats order_id = order.get('id', 'N/A') actual_price = order.get('average', price) # Use actual fill price if available self.stats.record_trade(symbol, 'buy', token_amount, actual_price, order_id) success_message = f""" โœ… Long Position {'Placed' if is_limit else 'Opened'} Successfully! ๐Ÿ“Š Order Details: โ€ข Token: {token} โ€ข Direction: LONG (Buy) โ€ข Amount: {token_amount:.6f} {token} โ€ข Price: ${price:,.2f} โ€ข USDC Value: ~${usdc_amount:,.2f} โ€ข Order Type: {'Limit' if is_limit else 'Market'} Order โ€ข Order ID: {order_id} ๐Ÿš€ Your {'limit order has been placed' if is_limit else 'long position is now active'}! """ await query.edit_message_text(success_message, parse_mode='HTML') logger.info(f"Long {'limit order placed' if is_limit else 'position opened'}: {token_amount:.6f} {token} @ ${price} ({'Limit' if is_limit else 'Market'})") else: await query.edit_message_text(f"โŒ Failed to {'place limit order' if is_limit else 'open long position'}. Please try again.") except Exception as e: error_message = f"โŒ Error {'placing limit order' if is_limit else 'opening long position'}: {str(e)}" await query.edit_message_text(error_message) logger.error(f"Error in long order: {e}") async def _execute_short_order(self, query, token: str, usdc_amount: float, price: float, is_limit: bool): """Execute a short order.""" symbol = f"{token}/USDC:USDC" try: await query.edit_message_text("โณ Opening short position...") # Calculate token amount based on USDC value and price token_amount = usdc_amount / price # Place order (limit or market) if is_limit: order = self.client.place_limit_order(symbol, 'sell', token_amount, price) else: order = self.client.place_market_order(symbol, 'sell', token_amount) if order: # Record the trade in stats order_id = order.get('id', 'N/A') actual_price = order.get('average', price) # Use actual fill price if available self.stats.record_trade(symbol, 'sell', token_amount, actual_price, order_id) success_message = f""" โœ… Short Position {'Placed' if is_limit else 'Opened'} Successfully! ๐Ÿ“Š Order Details: โ€ข Token: {token} โ€ข Direction: SHORT (Sell) โ€ข Amount: {token_amount:.6f} {token} โ€ข Price: ${price:,.2f} โ€ข USDC Value: ~${usdc_amount:,.2f} โ€ข Order Type: {'Limit' if is_limit else 'Market'} Order โ€ข Order ID: {order_id} ๐Ÿ“‰ Your {'limit order has been placed' if is_limit else 'short position is now active'}! """ await query.edit_message_text(success_message, parse_mode='HTML') logger.info(f"Short {'limit order placed' if is_limit else 'position opened'}: {token_amount:.6f} {token} @ ${price} ({'Limit' if is_limit else 'Market'})") else: await query.edit_message_text(f"โŒ Failed to {'place limit order' if is_limit else 'open short position'}. Please try again.") except Exception as e: error_message = f"โŒ Error {'placing limit order' if is_limit else 'opening short position'}: {str(e)}" await query.edit_message_text(error_message) logger.error(f"Error in short order: {e}") async def _execute_exit_order(self, query, token: str, exit_side: str, contracts: float, price: float): """Execute an exit order.""" symbol = f"{token}/USDC:USDC" try: await query.edit_message_text("โณ Closing position...") # Place market order to close position order = self.client.place_market_order(symbol, exit_side, contracts) if order: # Record the trade in stats order_id = order.get('id', 'N/A') actual_price = order.get('average', price) # Use actual fill price if available self.stats.record_trade(symbol, exit_side, contracts, actual_price, order_id) position_type = "LONG" if exit_side == "sell" else "SHORT" success_message = f""" โœ… Position Closed Successfully! ๐Ÿ“Š Exit Details: โ€ข Token: {token} โ€ข Position Closed: {position_type} โ€ข Exit Side: {exit_side.upper()} โ€ข Amount: {contracts} {token} โ€ข Est. Price: ~${price:,.2f} โ€ข Order Type: Market Order โ€ข Order ID: {order_id} ๐ŸŽฏ Position Summary: โ€ข Status: CLOSED โ€ข Exit Value: ~${contracts * price:,.2f} ๐Ÿ“Š Use /stats to see updated performance metrics. """ await query.edit_message_text(success_message, parse_mode='HTML') logger.info(f"Position closed: {exit_side} {contracts} {token} @ ~${price}") else: await query.edit_message_text("โŒ Failed to close position. Please try again.") except Exception as e: error_message = f"โŒ Error closing position: {str(e)}" await query.edit_message_text(error_message) logger.error(f"Error closing position: {e}") async def _execute_coo(self, query, token: str): """Execute cancel open orders for a specific token.""" symbol = f"{token}/USDC:USDC" try: await query.edit_message_text("โณ Cancelling all orders...") # Get current orders for this token all_orders = self.client.get_open_orders() if all_orders is None: await query.edit_message_text(f"โŒ Could not fetch orders to cancel {token} orders") return # Filter orders for the specific token token_orders = [order for order in all_orders if order.get('symbol') == symbol] if not token_orders: await query.edit_message_text(f"๐Ÿ“ญ No open orders found for {token}") return # Cancel each order cancelled_orders = [] failed_orders = [] for order in token_orders: order_id = order.get('id') if order_id: try: success = self.client.cancel_order(order_id, symbol) if success: cancelled_orders.append(order) else: failed_orders.append(order) except Exception as e: logger.error(f"Failed to cancel order {order_id}: {e}") failed_orders.append(order) # Create result message result_message = f""" โœ… Cancel Orders Results ๐Ÿ“Š Summary: โ€ข Token: {token} โ€ข Cancelled: {len(cancelled_orders)} orders โ€ข Failed: {len(failed_orders)} orders โ€ข Total Attempted: {len(token_orders)} orders """ if cancelled_orders: result_message += f"\n๐Ÿ—‘๏ธ Successfully Cancelled:\n" for order in cancelled_orders: side = order.get('side', 'Unknown') amount = order.get('amount', 0) price = order.get('price', 0) side_emoji = "๐ŸŸข" if side.lower() == 'buy' else "๐Ÿ”ด" result_message += f"{side_emoji} {side.upper()} {amount} @ ${price:,.2f}\n" if failed_orders: result_message += f"\nโŒ Failed to Cancel:\n" for order in failed_orders: side = order.get('side', 'Unknown') amount = order.get('amount', 0) price = order.get('price', 0) order_id = order.get('id', 'Unknown') side_emoji = "๐ŸŸข" if side.lower() == 'buy' else "๐Ÿ”ด" result_message += f"{side_emoji} {side.upper()} {amount} @ ${price:,.2f} (ID: {order_id})\n" if len(cancelled_orders) == len(token_orders): result_message += f"\n๐ŸŽ‰ All {token} orders successfully cancelled!" elif len(cancelled_orders) > 0: result_message += f"\nโš ๏ธ Some orders cancelled. Check failed orders above." else: result_message += f"\nโŒ Could not cancel any {token} orders." await query.edit_message_text(result_message, parse_mode='HTML') logger.info(f"COO executed for {token}: {len(cancelled_orders)}/{len(token_orders)} orders cancelled") except Exception as e: error_message = f"โŒ Error cancelling {token} orders: {str(e)}" await query.edit_message_text(error_message) logger.error(f"Error in COO execution: {e}") async def _execute_sl_order(self, query, token: str, exit_side: str, contracts: float, price: float): """Execute a stop loss order.""" symbol = f"{token}/USDC:USDC" try: await query.edit_message_text("โณ Setting stop loss...") # Place stop loss order order = self.client.place_stop_loss_order(symbol, exit_side, contracts, price) if order: # Record the trade in stats order_id = order.get('id', 'N/A') actual_price = order.get('average', price) # Use actual fill price if available self.stats.record_trade(symbol, exit_side, contracts, actual_price, order_id) position_type = "LONG" if exit_side == "sell" else "SHORT" success_message = f""" โœ… Stop Loss Order Set Successfully! ๐Ÿ“Š Stop Loss Details: โ€ข Token: {token} โ€ข Position: {position_type} โ€ข Size: {contracts} contracts โ€ข Stop Price: ${price:,.2f} โ€ข Action: {exit_side.upper()} (Close {position_type}) โ€ข Amount: {contracts} {token} โ€ข Order Type: Limit Order โ€ข Order ID: {order_id} ๐ŸŽฏ Stop Loss Execution: โ€ข Status: SET โ€ข Exit Value: ~${contracts * price:,.2f} ๐Ÿ“Š Use /stats to see updated performance metrics. """ await query.edit_message_text(success_message, parse_mode='HTML') logger.info(f"Stop loss set: {exit_side} {contracts} {token} @ ${price}") else: await query.edit_message_text("โŒ Failed to set stop loss. Please try again.") except Exception as e: error_message = f"โŒ Error setting stop loss: {str(e)}" await query.edit_message_text(error_message) logger.error(f"Error setting stop loss: {e}") async def _execute_tp_order(self, query, token: str, exit_side: str, contracts: float, price: float): """Execute a take profit order.""" symbol = f"{token}/USDC:USDC" try: await query.edit_message_text("โณ Setting take profit...") # Place take profit order order = self.client.place_take_profit_order(symbol, exit_side, contracts, price) if order: # Record the trade in stats order_id = order.get('id', 'N/A') actual_price = order.get('average', price) # Use actual fill price if available self.stats.record_trade(symbol, exit_side, contracts, actual_price, order_id) position_type = "LONG" if exit_side == "sell" else "SHORT" success_message = f""" โœ… Take Profit Order Set Successfully! ๐Ÿ“Š Take Profit Details: โ€ข Token: {token} โ€ข Position: {position_type} โ€ข Size: {contracts} contracts โ€ข Target Price: ${price:,.2f} โ€ข Action: {exit_side.upper()} (Close {position_type}) โ€ข Amount: {contracts} {token} โ€ข Order Type: Limit Order โ€ข Order ID: {order_id} ๐ŸŽฏ Take Profit Execution: โ€ข Status: SET โ€ข Exit Value: ~${contracts * price:,.2f} ๐Ÿ“Š Use /stats to see updated performance metrics. """ await query.edit_message_text(success_message, parse_mode='HTML') logger.info(f"Take profit set: {exit_side} {contracts} {token} @ ${price}") else: await query.edit_message_text("โŒ Failed to set take profit. Please try again.") except Exception as e: error_message = f"โŒ Error setting take profit: {str(e)}" await query.edit_message_text(error_message) logger.error(f"Error setting take profit: {e}") async def unknown_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle unknown commands.""" if not self.is_authorized(update.effective_chat.id): await update.message.reply_text("โŒ Unauthorized access.") return await update.message.reply_text( "โ“ Unknown command. Use /help to see available commands or tap the buttons in /start." ) def setup_handlers(self): """Set up command handlers for the bot.""" if not self.application: return # Command handlers self.application.add_handler(CommandHandler("start", self.start_command)) self.application.add_handler(CommandHandler("help", self.help_command)) self.application.add_handler(CommandHandler("balance", self.balance_command)) self.application.add_handler(CommandHandler("positions", self.positions_command)) self.application.add_handler(CommandHandler("orders", self.orders_command)) self.application.add_handler(CommandHandler("market", self.market_command)) self.application.add_handler(CommandHandler("price", self.price_command)) self.application.add_handler(CommandHandler("stats", self.stats_command)) self.application.add_handler(CommandHandler("trades", self.trades_command)) self.application.add_handler(CommandHandler("long", self.long_command)) self.application.add_handler(CommandHandler("short", self.short_command)) self.application.add_handler(CommandHandler("exit", self.exit_command)) self.application.add_handler(CommandHandler("coo", self.coo_command)) self.application.add_handler(CommandHandler("sl", self.sl_command)) self.application.add_handler(CommandHandler("tp", self.tp_command)) self.application.add_handler(CommandHandler("monitoring", self.monitoring_command)) self.application.add_handler(CommandHandler("alarm", self.alarm_command)) self.application.add_handler(CommandHandler("logs", self.logs_command)) # Callback query handler for inline keyboards self.application.add_handler(CallbackQueryHandler(self.button_callback)) # Handle unknown commands self.application.add_handler(MessageHandler(filters.COMMAND, self.unknown_command)) async def run(self): """Run the Telegram bot.""" if not Config.TELEGRAM_BOT_TOKEN: logger.error("โŒ TELEGRAM_BOT_TOKEN not configured") return if not Config.TELEGRAM_CHAT_ID: logger.error("โŒ TELEGRAM_CHAT_ID not configured") return try: # Create application self.application = Application.builder().token(Config.TELEGRAM_BOT_TOKEN).build() # Set up handlers self.setup_handlers() logger.info("๐Ÿš€ Starting Telegram trading bot...") # Initialize the application await self.application.initialize() # Send startup notification await self.send_message( "๐Ÿค– Manual Trading Bot Started\n\n" f"โœ… Connected to Hyperliquid {'Testnet' if Config.HYPERLIQUID_TESTNET else 'Mainnet'}\n" f"๐Ÿ“Š Default Symbol: {Config.DEFAULT_TRADING_TOKEN}\n" f"๐Ÿ“ฑ Manual trading ready!\n" f"๐Ÿ”„ Order monitoring: Active ({Config.BOT_HEARTBEAT_SECONDS}s interval)\n" f"๐Ÿ”„ External trade monitoring: Active\n" f"๐Ÿ”” Price alarms: Active\n" f"๐Ÿ“Š Auto stats sync: Enabled\n" f"๐Ÿ“ Logging: {'File + Console' if Config.LOG_TO_FILE else 'Console only'}\n" f"โฐ Started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n" "Use /start for quick actions or /help for all commands." ) # Perform initial log cleanup try: cleanup_logs(days_to_keep=30) logger.info("๐Ÿงน Initial log cleanup completed") except Exception as e: logger.warning(f"โš ๏ธ Initial log cleanup failed: {e}") # Start the application await self.application.start() # Start order monitoring await self.start_order_monitoring() # Start polling for updates manually logger.info("๐Ÿ”„ Starting update polling...") # Get updates in a loop last_update_id = 0 while True: try: # Get updates from Telegram updates = await self.application.bot.get_updates( offset=last_update_id + 1, timeout=30, allowed_updates=None ) # Process each update for update in updates: last_update_id = update.update_id # Process the update through the application await self.application.process_update(update) except Exception as e: logger.error(f"Error processing updates: {e}") await asyncio.sleep(5) # Wait before retrying except asyncio.CancelledError: logger.info("๐Ÿ›‘ Bot polling cancelled") raise except Exception as e: logger.error(f"โŒ Error in telegram bot: {e}") raise finally: # Clean shutdown try: await self.stop_order_monitoring() if self.application: await self.application.stop() await self.application.shutdown() except Exception as e: logger.error(f"Error during shutdown: {e}") async def long_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle the /long command for opening long positions.""" if not self.is_authorized(update.effective_chat.id): await update.message.reply_text("โŒ Unauthorized access.") return try: if not context.args or len(context.args) < 2: await update.message.reply_text( "โŒ Usage: /long [token] [USDC amount] [price (optional)]\n" "Examples:\n" "โ€ข /long BTC 100 - Market order\n" "โ€ข /long BTC 100 45000 - Limit order at $45,000" ) return token = context.args[0].upper() usdc_amount = float(context.args[1]) # Check if price is provided for limit order limit_price = None if len(context.args) >= 3: limit_price = float(context.args[2]) order_type = "Limit" order_description = f"at ${limit_price:,.2f}" else: order_type = "Market" order_description = "at current market price" # Convert token to full symbol format for Hyperliquid symbol = f"{token}/USDC:USDC" # Get current market price to calculate amount and for display market_data = self.client.get_market_data(symbol) if not market_data: await update.message.reply_text(f"โŒ Could not fetch price for {token}") return current_price = float(market_data['ticker'].get('last', 0)) if current_price <= 0: await update.message.reply_text(f"โŒ Invalid price for {token}") return # Calculate token amount based on price (market or limit) calculation_price = limit_price if limit_price else current_price token_amount = usdc_amount / calculation_price # Create confirmation message confirmation_text = f""" ๐ŸŸข Long Position Confirmation ๐Ÿ“Š Order Details: โ€ข Token: {token} โ€ข Direction: LONG (Buy) โ€ข USDC Value: ${usdc_amount:,.2f} โ€ข Current Price: ${current_price:,.2f} โ€ข Order Type: {order_type} Order โ€ข Token Amount: {token_amount:.6f} {token} ๐ŸŽฏ Execution: โ€ข Will buy {token_amount:.6f} {token} {order_description} โ€ข Est. Value: ${token_amount * calculation_price:,.2f} โš ๏ธ Are you sure you want to open this long position? """ # Use limit_price for callback if provided, otherwise current_price callback_price = limit_price if limit_price else current_price callback_data = f"confirm_long_{token}_{usdc_amount}_{callback_price}" if limit_price: callback_data += "_limit" keyboard = [ [ InlineKeyboardButton("โœ… Confirm Long", callback_data=callback_data), InlineKeyboardButton("โŒ Cancel", callback_data="cancel_order") ] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text(confirmation_text, parse_mode='HTML', reply_markup=reply_markup) except ValueError: await update.message.reply_text("โŒ Invalid USDC amount or price. Please use numbers only.") except Exception as e: await update.message.reply_text(f"โŒ Error processing long command: {e}") async def short_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle the /short command for opening short positions.""" if not self.is_authorized(update.effective_chat.id): await update.message.reply_text("โŒ Unauthorized access.") return try: if not context.args or len(context.args) < 2: await update.message.reply_text( "โŒ Usage: /short [token] [USDC amount] [price (optional)]\n" "Examples:\n" "โ€ข /short BTC 100 - Market order\n" "โ€ข /short BTC 100 46000 - Limit order at $46,000" ) return token = context.args[0].upper() usdc_amount = float(context.args[1]) # Check if price is provided for limit order limit_price = None if len(context.args) >= 3: limit_price = float(context.args[2]) order_type = "Limit" order_description = f"at ${limit_price:,.2f}" else: order_type = "Market" order_description = "at current market price" # Convert token to full symbol format for Hyperliquid symbol = f"{token}/USDC:USDC" # Get current market price to calculate amount and for display market_data = self.client.get_market_data(symbol) if not market_data: await update.message.reply_text(f"โŒ Could not fetch price for {token}") return current_price = float(market_data['ticker'].get('last', 0)) if current_price <= 0: await update.message.reply_text(f"โŒ Invalid price for {token}") return # Calculate token amount based on price (market or limit) calculation_price = limit_price if limit_price else current_price token_amount = usdc_amount / calculation_price # Create confirmation message confirmation_text = f""" ๐Ÿ”ด Short Position Confirmation ๐Ÿ“Š Order Details: โ€ข Token: {token} โ€ข Direction: SHORT (Sell) โ€ข USDC Value: ${usdc_amount:,.2f} โ€ข Current Price: ${current_price:,.2f} โ€ข Order Type: {order_type} Order โ€ข Token Amount: {token_amount:.6f} {token} ๐ŸŽฏ Execution: โ€ข Will sell {token_amount:.6f} {token} {order_description} โ€ข Est. Value: ${token_amount * calculation_price:,.2f} โš ๏ธ Are you sure you want to open this short position? """ # Use limit_price for callback if provided, otherwise current_price callback_price = limit_price if limit_price else current_price callback_data = f"confirm_short_{token}_{usdc_amount}_{callback_price}" if limit_price: callback_data += "_limit" keyboard = [ [ InlineKeyboardButton("โœ… Confirm Short", callback_data=callback_data), InlineKeyboardButton("โŒ Cancel", callback_data="cancel_order") ] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text(confirmation_text, parse_mode='HTML', reply_markup=reply_markup) except ValueError: await update.message.reply_text("โŒ Invalid USDC amount or price. Please use numbers only.") except Exception as e: await update.message.reply_text(f"โŒ Error processing short command: {e}") async def exit_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle the /exit command for closing positions.""" if not self.is_authorized(update.effective_chat.id): await update.message.reply_text("โŒ Unauthorized access.") return try: if not context.args or len(context.args) < 1: await update.message.reply_text( "โŒ Usage: /exit [token]\n" "Example: /exit BTC" ) return token = context.args[0].upper() symbol = f"{token}/USDC:USDC" # Get current positions to find the position for this token positions = self.client.get_positions() if positions is None: await update.message.reply_text(f"โŒ Could not fetch positions to check {token} position") return # Find the position for this token current_position = None for position in positions: if position.get('symbol') == symbol and float(position.get('contracts', 0)) != 0: current_position = position break if not current_position: await update.message.reply_text(f"๐Ÿ“ญ No open position found for {token}") return # Extract position details contracts = float(current_position.get('contracts', 0)) entry_price = float(current_position.get('entryPx', 0)) unrealized_pnl = float(current_position.get('unrealizedPnl', 0)) # Determine position direction and exit details if contracts > 0: position_type = "LONG" exit_side = "sell" exit_emoji = "๐Ÿ”ด" else: position_type = "SHORT" exit_side = "buy" exit_emoji = "๐ŸŸข" contracts = abs(contracts) # Make positive for display # Get current market price market_data = self.client.get_market_data(symbol) if not market_data: await update.message.reply_text(f"โŒ Could not fetch current price for {token}") return current_price = float(market_data['ticker'].get('last', 0)) if current_price <= 0: await update.message.reply_text(f"โŒ Invalid current price for {token}") return # Calculate estimated exit value exit_value = contracts * current_price # Create confirmation message pnl_emoji = "๐ŸŸข" if unrealized_pnl >= 0 else "๐Ÿ”ด" confirmation_text = f""" {exit_emoji} Exit Position Confirmation ๐Ÿ“Š Position Details: โ€ข Token: {token} โ€ข Position: {position_type} โ€ข Size: {contracts} contracts โ€ข Entry Price: ${entry_price:,.2f} โ€ข Current Price: ${current_price:,.2f} โ€ข {pnl_emoji} Unrealized P&L: ${unrealized_pnl:,.2f} ๐ŸŽฏ Exit Order: โ€ข Action: {exit_side.upper()} (Close {position_type}) โ€ข Amount: {contracts} {token} โ€ข Est. Value: ~${exit_value:,.2f} โ€ข Order Type: Market Order โš ๏ธ Are you sure you want to close this {position_type} position? This will place a market {exit_side} order to close your entire {token} position. """ keyboard = [ [ InlineKeyboardButton(f"โœ… Close {position_type}", callback_data=f"confirm_exit_{token}_{exit_side}_{contracts}_{current_price}"), InlineKeyboardButton("โŒ Cancel", callback_data="cancel_order") ] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text(confirmation_text, parse_mode='HTML', reply_markup=reply_markup) except ValueError: await update.message.reply_text("โŒ Invalid token format. Please use token symbols like BTC, ETH, etc.") except Exception as e: await update.message.reply_text(f"โŒ Error processing exit command: {e}") async def coo_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle the /coo (cancel open orders) command for a specific token.""" if not self.is_authorized(update.effective_chat.id): await update.message.reply_text("โŒ Unauthorized access.") return try: if not context.args or len(context.args) < 1: await update.message.reply_text( "โŒ Usage: /coo [token]\n" "Example: /coo BTC\n\n" "This command cancels ALL open orders for the specified token." ) return token = context.args[0].upper() symbol = f"{token}/USDC:USDC" # Get current orders for this token all_orders = self.client.get_open_orders() if all_orders is None: await update.message.reply_text(f"โŒ Could not fetch orders to cancel {token} orders") return # Filter orders for the specific token token_orders = [order for order in all_orders if order.get('symbol') == symbol] if not token_orders: await update.message.reply_text(f"๐Ÿ“ญ No open orders found for {token}") return # Create confirmation message with order details confirmation_text = f""" โš ๏ธ Cancel All {token} Orders ๐Ÿ“‹ Orders to Cancel: """ total_value = 0 for order in token_orders: side = order.get('side', 'Unknown') amount = order.get('amount', 0) price = order.get('price', 0) order_id = order.get('id', 'Unknown') side_emoji = "๐ŸŸข" if side.lower() == 'buy' else "๐Ÿ”ด" order_value = float(amount) * float(price) total_value += order_value confirmation_text += f"{side_emoji} {side.upper()} {amount} @ ${price:,.2f} (${order_value:,.2f})\n" confirmation_text += f""" ๐Ÿ’ฐ Total Value: ${total_value:,.2f} ๐Ÿ”ข Orders Count: {len(token_orders)} โš ๏ธ Are you sure you want to cancel ALL {token} orders? This action cannot be undone. """ keyboard = [ [ InlineKeyboardButton(f"โœ… Cancel All {token}", callback_data=f"confirm_coo_{token}"), InlineKeyboardButton("โŒ Keep Orders", callback_data="cancel_order") ] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text(confirmation_text, parse_mode='HTML', reply_markup=reply_markup) except ValueError: await update.message.reply_text("โŒ Invalid token format. Please use token symbols like BTC, ETH, etc.") except Exception as e: await update.message.reply_text(f"โŒ Error processing cancel orders command: {e}") async def sl_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle the /sl (stop loss) command for setting stop loss orders.""" if not self.is_authorized(update.effective_chat.id): await update.message.reply_text("โŒ Unauthorized access.") return try: if not context.args or len(context.args) < 2: await update.message.reply_text( "โŒ Usage: /sl [token] [price]\n" "Example: /sl BTC 44000\n\n" "This creates a stop loss order at the specified price." ) return token = context.args[0].upper() stop_price = float(context.args[1]) symbol = f"{token}/USDC:USDC" # Get current positions to find the position for this token positions = self.client.get_positions() if positions is None: await update.message.reply_text(f"โŒ Could not fetch positions to check {token} position") return # Find the position for this token current_position = None for position in positions: if position.get('symbol') == symbol and float(position.get('contracts', 0)) != 0: current_position = position break if not current_position: await update.message.reply_text(f"๐Ÿ“ญ No open position found for {token}\n\nYou need an open position to set a stop loss.") return # Extract position details contracts = float(current_position.get('contracts', 0)) entry_price = float(current_position.get('entryPx', 0)) unrealized_pnl = float(current_position.get('unrealizedPnl', 0)) # Determine position direction and validate stop loss price if contracts > 0: # Long position - stop loss should be below entry price position_type = "LONG" exit_side = "sell" exit_emoji = "๐Ÿ”ด" contracts_abs = contracts if stop_price >= entry_price: await update.message.reply_text( f"โš ๏ธ Stop loss price should be BELOW entry price for long positions\n\n" f"๐Ÿ“Š Your {token} LONG position:\n" f"โ€ข Entry Price: ${entry_price:,.2f}\n" f"โ€ข Stop Price: ${stop_price:,.2f} โŒ\n\n" f"๐Ÿ’ก Try a lower price like: /sl {token} {entry_price * 0.95:.0f}" ) return else: # Short position - stop loss should be above entry price position_type = "SHORT" exit_side = "buy" exit_emoji = "๐ŸŸข" contracts_abs = abs(contracts) if stop_price <= entry_price: await update.message.reply_text( f"โš ๏ธ Stop loss price should be ABOVE entry price for short positions\n\n" f"๐Ÿ“Š Your {token} SHORT position:\n" f"โ€ข Entry Price: ${entry_price:,.2f}\n" f"โ€ข Stop Price: ${stop_price:,.2f} โŒ\n\n" f"๐Ÿ’ก Try a higher price like: /sl {token} {entry_price * 1.05:.0f}" ) return # Get current market price for reference market_data = self.client.get_market_data(symbol) current_price = 0 if market_data: current_price = float(market_data['ticker'].get('last', 0)) # Calculate estimated P&L at stop loss if contracts > 0: # Long position pnl_at_stop = (stop_price - entry_price) * contracts_abs else: # Short position pnl_at_stop = (entry_price - stop_price) * contracts_abs # Create confirmation message pnl_emoji = "๐ŸŸข" if pnl_at_stop >= 0 else "๐Ÿ”ด" confirmation_text = f""" ๐Ÿ›‘ Stop Loss Order Confirmation ๐Ÿ“Š Position Details: โ€ข Token: {token} โ€ข Position: {position_type} โ€ข Size: {contracts_abs} contracts โ€ข Entry Price: ${entry_price:,.2f} โ€ข Current Price: ${current_price:,.2f} ๐ŸŽฏ Stop Loss Order: โ€ข Stop Price: ${stop_price:,.2f} โ€ข Action: {exit_side.upper()} (Close {position_type}) โ€ข Amount: {contracts_abs} {token} โ€ข Order Type: Limit Order โ€ข {pnl_emoji} Est. P&L: ${pnl_at_stop:,.2f} โš ๏ธ Are you sure you want to set this stop loss? This will place a limit {exit_side} order at ${stop_price:,.2f} to protect your {position_type} position. """ keyboard = [ [ InlineKeyboardButton(f"โœ… Set Stop Loss", callback_data=f"confirm_sl_{token}_{exit_side}_{contracts_abs}_{stop_price}"), InlineKeyboardButton("โŒ Cancel", callback_data="cancel_order") ] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text(confirmation_text, parse_mode='HTML', reply_markup=reply_markup) except ValueError: await update.message.reply_text("โŒ Invalid price format. Please use numbers only.") except Exception as e: await update.message.reply_text(f"โŒ Error processing stop loss command: {e}") async def tp_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle the /tp (take profit) command for setting take profit orders.""" if not self.is_authorized(update.effective_chat.id): await update.message.reply_text("โŒ Unauthorized access.") return try: if not context.args or len(context.args) < 2: await update.message.reply_text( "โŒ Usage: /tp [token] [price]\n" "Example: /tp BTC 50000\n\n" "This creates a take profit order at the specified price." ) return token = context.args[0].upper() profit_price = float(context.args[1]) symbol = f"{token}/USDC:USDC" # Get current positions to find the position for this token positions = self.client.get_positions() if positions is None: await update.message.reply_text(f"โŒ Could not fetch positions to check {token} position") return # Find the position for this token current_position = None for position in positions: if position.get('symbol') == symbol and float(position.get('contracts', 0)) != 0: current_position = position break if not current_position: await update.message.reply_text(f"๐Ÿ“ญ No open position found for {token}\n\nYou need an open position to set a take profit.") return # Extract position details contracts = float(current_position.get('contracts', 0)) entry_price = float(current_position.get('entryPx', 0)) unrealized_pnl = float(current_position.get('unrealizedPnl', 0)) # Determine position direction and validate take profit price if contracts > 0: # Long position - take profit should be above entry price position_type = "LONG" exit_side = "sell" exit_emoji = "๐Ÿ”ด" contracts_abs = contracts if profit_price <= entry_price: await update.message.reply_text( f"โš ๏ธ Take profit price should be ABOVE entry price for long positions\n\n" f"๐Ÿ“Š Your {token} LONG position:\n" f"โ€ข Entry Price: ${entry_price:,.2f}\n" f"โ€ข Take Profit: ${profit_price:,.2f} โŒ\n\n" f"๐Ÿ’ก Try a higher price like: /tp {token} {entry_price * 1.05:.0f}" ) return else: # Short position - take profit should be below entry price position_type = "SHORT" exit_side = "buy" exit_emoji = "๐ŸŸข" contracts_abs = abs(contracts) if profit_price >= entry_price: await update.message.reply_text( f"โš ๏ธ Take profit price should be BELOW entry price for short positions\n\n" f"๐Ÿ“Š Your {token} SHORT position:\n" f"โ€ข Entry Price: ${entry_price:,.2f}\n" f"โ€ข Take Profit: ${profit_price:,.2f} โŒ\n\n" f"๐Ÿ’ก Try a lower price like: /tp {token} {entry_price * 0.95:.0f}" ) return # Get current market price for reference market_data = self.client.get_market_data(symbol) current_price = 0 if market_data: current_price = float(market_data['ticker'].get('last', 0)) # Calculate estimated P&L at take profit if contracts > 0: # Long position pnl_at_tp = (profit_price - entry_price) * contracts_abs else: # Short position pnl_at_tp = (entry_price - profit_price) * contracts_abs # Create confirmation message pnl_emoji = "๐ŸŸข" if pnl_at_tp >= 0 else "๐Ÿ”ด" confirmation_text = f""" ๐ŸŽฏ Take Profit Order Confirmation ๐Ÿ“Š Position Details: โ€ข Token: {token} โ€ข Position: {position_type} โ€ข Size: {contracts_abs} contracts โ€ข Entry Price: ${entry_price:,.2f} โ€ข Current Price: ${current_price:,.2f} ๐Ÿ’ฐ Take Profit Order: โ€ข Target Price: ${profit_price:,.2f} โ€ข Action: {exit_side.upper()} (Close {position_type}) โ€ข Amount: {contracts_abs} {token} โ€ข Order Type: Limit Order โ€ข {pnl_emoji} Est. P&L: ${pnl_at_tp:,.2f} โš ๏ธ Are you sure you want to set this take profit? This will place a limit {exit_side} order at ${profit_price:,.2f} to capture profits from your {position_type} position. """ keyboard = [ [ InlineKeyboardButton(f"โœ… Set Take Profit", callback_data=f"confirm_tp_{token}_{exit_side}_{contracts_abs}_{profit_price}"), InlineKeyboardButton("โŒ Cancel", callback_data="cancel_order") ] ] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text(confirmation_text, parse_mode='HTML', reply_markup=reply_markup) except ValueError: await update.message.reply_text("โŒ Invalid price format. Please use numbers only.") except Exception as e: await update.message.reply_text(f"โŒ Error processing take profit command: {e}") async def start_order_monitoring(self): """Start the order monitoring background task.""" if self.monitoring_active: return self.monitoring_active = True logger.info("๐Ÿ”„ Starting order monitoring...") # Initialize tracking data await self._initialize_order_tracking() # Start monitoring loop asyncio.create_task(self._order_monitoring_loop()) async def stop_order_monitoring(self): """Stop the order monitoring background task.""" self.monitoring_active = False logger.info("โน๏ธ Stopping order monitoring...") async def _initialize_order_tracking(self): """Initialize order and position tracking.""" try: # Get current open orders to initialize tracking orders = self.client.get_open_orders() if orders: self.last_known_orders = {order.get('id') for order in orders if order.get('id')} logger.info(f"๐Ÿ“‹ Initialized tracking with {len(self.last_known_orders)} open orders") # Get current positions for P&L tracking positions = self.client.get_positions() if positions: for position in positions: symbol = position.get('symbol') contracts = float(position.get('contracts', 0)) entry_price = float(position.get('entryPx', 0)) if symbol and contracts != 0: self.last_known_positions[symbol] = { 'contracts': contracts, 'entry_price': entry_price } logger.info(f"๐Ÿ“Š Initialized tracking with {len(self.last_known_positions)} positions") except Exception as e: logger.error(f"โŒ Error initializing order tracking: {e}") async def _order_monitoring_loop(self): """Main monitoring loop that runs every Config.BOT_HEARTBEAT_SECONDS seconds.""" while self.monitoring_active: try: await self._check_order_fills() await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS) # Use configurable interval except asyncio.CancelledError: logger.info("๐Ÿ›‘ Order monitoring cancelled") break except Exception as e: logger.error(f"โŒ Error in order monitoring loop: {e}") await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS) # Continue monitoring even if there's an error async def _check_order_fills(self): """Check for filled orders and send notifications.""" try: # Get current orders and positions current_orders = self.client.get_open_orders() or [] current_positions = self.client.get_positions() or [] # Get current order IDs current_order_ids = {order.get('id') for order in current_orders if order.get('id')} # Find filled orders (orders that were in last_known_orders but not in current_orders) filled_order_ids = self.last_known_orders - current_order_ids if filled_order_ids: logger.info(f"๐ŸŽฏ Detected {len(filled_order_ids)} filled orders") await self._process_filled_orders(filled_order_ids, current_positions) # Update tracking data self.last_known_orders = current_order_ids await self._update_position_tracking(current_positions) # Check price alarms await self._check_price_alarms() # Check external trades (trades made outside the bot) await self._check_external_trades() # Check stop losses (if risk management is enabled) if Config.RISK_MANAGEMENT_ENABLED: await self._check_stop_losses(current_positions) except Exception as e: logger.error(f"โŒ Error checking order fills: {e}") async def _check_price_alarms(self): """Check all active price alarms.""" try: # Get all active alarms active_alarms = self.alarm_manager.get_all_active_alarms() if not active_alarms: return # Get unique tokens from alarms tokens_to_check = list(set(alarm['token'] for alarm in active_alarms)) # Fetch current prices for all tokens price_data = {} for token in tokens_to_check: symbol = f"{token}/USDC:USDC" market_data = self.client.get_market_data(symbol) if market_data and market_data.get('ticker'): current_price = market_data['ticker'].get('last') if current_price is not None: price_data[token] = float(current_price) # Check alarms against current prices triggered_alarms = self.alarm_manager.check_alarms(price_data) # Send notifications for triggered alarms for alarm in triggered_alarms: await self._send_alarm_notification(alarm) except Exception as e: logger.error(f"โŒ Error checking price alarms: {e}") async def _send_alarm_notification(self, alarm: Dict[str, Any]): """Send notification for triggered alarm.""" try: message = self.alarm_manager.format_triggered_alarm(alarm) await self.send_message(message) logger.info(f"๐Ÿ“ข Sent alarm notification: {alarm['token']} ID {alarm['id']} @ ${alarm['triggered_price']}") except Exception as e: logger.error(f"โŒ Error sending alarm notification: {e}") async def _check_external_trades(self): """Check for trades made outside the Telegram bot and update stats.""" try: # Get recent fills from Hyperliquid recent_fills = self.client.get_recent_fills() if not recent_fills: return # Initialize last processed time if first run if self.last_processed_trade_time is None: # Set to current time minus 1 hour to catch recent activity self.last_processed_trade_time = (datetime.now() - timedelta(hours=1)).isoformat() # Filter for new trades since last check new_trades = [] latest_trade_time = self.last_processed_trade_time for fill in recent_fills: fill_time = fill.get('timestamp') if fill_time: # Convert timestamps to comparable format try: # Convert fill_time to string if it's not already if isinstance(fill_time, (int, float)): # Assume it's a unix timestamp fill_time_str = datetime.fromtimestamp(fill_time / 1000 if fill_time > 1e10 else fill_time).isoformat() else: fill_time_str = str(fill_time) # Compare as strings if fill_time_str > self.last_processed_trade_time: new_trades.append(fill) if fill_time_str > latest_trade_time: latest_trade_time = fill_time_str except Exception as timestamp_error: logger.warning(f"โš ๏ธ Error processing timestamp {fill_time}: {timestamp_error}") continue if not new_trades: return # Process new trades for trade in new_trades: await self._process_external_trade(trade) # Update last processed time self.last_processed_trade_time = latest_trade_time if new_trades: logger.info(f"๐Ÿ“Š Processed {len(new_trades)} external trades") except Exception as e: logger.error(f"โŒ Error checking external trades: {e}") async def _process_external_trade(self, trade: Dict[str, Any]): """Process an individual external trade.""" try: # Extract trade information symbol = trade.get('symbol', '') side = trade.get('side', '') amount = float(trade.get('amount', 0)) price = float(trade.get('price', 0)) trade_id = trade.get('id', 'external') timestamp = trade.get('timestamp', '') if not all([symbol, side, amount, price]): return # Record trade in stats self.stats.record_trade(symbol, side, amount, price, trade_id) # Send notification for significant trades await self._send_external_trade_notification(trade) logger.info(f"๐Ÿ“‹ Processed external trade: {side} {amount} {symbol} @ ${price}") except Exception as e: logger.error(f"โŒ Error processing external trade: {e}") async def _send_external_trade_notification(self, trade: Dict[str, Any]): """Send notification for external trades.""" try: symbol = trade.get('symbol', '') side = trade.get('side', '') amount = float(trade.get('amount', 0)) price = float(trade.get('price', 0)) timestamp = trade.get('timestamp', '') # Extract token from symbol token = symbol.split('/')[0] if '/' in symbol else symbol # Format timestamp try: trade_time = datetime.fromisoformat(timestamp.replace('Z', '+00:00')) time_str = trade_time.strftime('%H:%M:%S') except: time_str = "Unknown" # Determine trade type and emoji side_emoji = "๐ŸŸข" if side.lower() == 'buy' else "๐Ÿ”ด" trade_value = amount * price message = f""" ๐Ÿ”„ External Trade Detected ๐Ÿ“Š Trade Details: โ€ข Token: {token} โ€ข Side: {side.upper()} โ€ข Amount: {amount} {token} โ€ข Price: ${price:,.2f} โ€ข Value: ${trade_value:,.2f} {side_emoji} Source: Direct Platform Trade โฐ Time: {time_str} ๐Ÿ“ˆ Note: This trade was executed outside the Telegram bot ๐Ÿ“Š Stats have been automatically updated """ await self.send_message(message.strip()) logger.info(f"๐Ÿ“ข Sent external trade notification: {side} {amount} {token}") except Exception as e: logger.error(f"โŒ Error sending external trade notification: {e}") async def _check_stop_losses(self, current_positions: list): """Check all positions for stop loss triggers and execute automatic exits.""" try: if not current_positions: return stop_loss_triggers = [] for position in current_positions: symbol = position.get('symbol') contracts = float(position.get('contracts', 0)) entry_price = float(position.get('entryPx', 0)) if not symbol or contracts == 0 or entry_price == 0: continue # Get current market price market_data = self.client.get_market_data(symbol) if not market_data or not market_data.get('ticker'): continue current_price = float(market_data['ticker'].get('last', 0)) if current_price == 0: continue # Calculate current P&L percentage if contracts > 0: # Long position pnl_percent = ((current_price - entry_price) / entry_price) * 100 else: # Short position pnl_percent = ((entry_price - current_price) / entry_price) * 100 # Check if stop loss should trigger if pnl_percent <= -Config.STOP_LOSS_PERCENTAGE: token = symbol.split('/')[0] if '/' in symbol else symbol stop_loss_triggers.append({ 'symbol': symbol, 'token': token, 'contracts': contracts, 'entry_price': entry_price, 'current_price': current_price, 'pnl_percent': pnl_percent }) # Execute stop losses for trigger in stop_loss_triggers: await self._execute_automatic_stop_loss(trigger) except Exception as e: logger.error(f"โŒ Error checking stop losses: {e}") async def _execute_automatic_stop_loss(self, trigger: Dict[str, Any]): """Execute an automatic stop loss order.""" try: symbol = trigger['symbol'] token = trigger['token'] contracts = trigger['contracts'] entry_price = trigger['entry_price'] current_price = trigger['current_price'] pnl_percent = trigger['pnl_percent'] # Determine the exit side (opposite of position) exit_side = 'sell' if contracts > 0 else 'buy' contracts_abs = abs(contracts) # Send notification before executing await self._send_stop_loss_notification(trigger, "triggered") # Execute the stop loss order (market order for immediate execution) try: if exit_side == 'sell': order = self.client.create_market_sell_order(symbol, contracts_abs) else: order = self.client.create_market_buy_order(symbol, contracts_abs) if order: logger.info(f"๐Ÿ›‘ Stop loss executed: {token} {exit_side} {contracts_abs} @ ${current_price}") # Record the trade in stats self.stats.record_trade(symbol, exit_side, contracts_abs, current_price, order.get('id', 'stop_loss')) # Send success notification await self._send_stop_loss_notification(trigger, "executed", order) else: logger.error(f"โŒ Stop loss order failed for {token}") await self._send_stop_loss_notification(trigger, "failed") except Exception as order_error: logger.error(f"โŒ Stop loss order execution failed for {token}: {order_error}") await self._send_stop_loss_notification(trigger, "failed", error=str(order_error)) except Exception as e: logger.error(f"โŒ Error executing automatic stop loss: {e}") async def _send_stop_loss_notification(self, trigger: Dict[str, Any], status: str, order: Dict = None, error: str = None): """Send notification for stop loss events.""" try: token = trigger['token'] contracts = trigger['contracts'] entry_price = trigger['entry_price'] current_price = trigger['current_price'] pnl_percent = trigger['pnl_percent'] position_type = "LONG" if contracts > 0 else "SHORT" contracts_abs = abs(contracts) if status == "triggered": title = "๐Ÿ›‘ Stop Loss Triggered" status_text = f"Stop loss triggered at {Config.STOP_LOSS_PERCENTAGE}% loss" emoji = "๐Ÿšจ" elif status == "executed": title = "โœ… Stop Loss Executed" status_text = "Position closed automatically" emoji = "๐Ÿ›‘" elif status == "failed": title = "โŒ Stop Loss Failed" status_text = f"Stop loss execution failed{': ' + error if error else ''}" emoji = "โš ๏ธ" else: return # Calculate loss loss_value = contracts_abs * abs(current_price - entry_price) message = f""" {title} {emoji} Risk Management Alert ๐Ÿ“Š Position Details: โ€ข Token: {token} โ€ข Direction: {position_type} โ€ข Size: {contracts_abs} contracts โ€ข Entry Price: ${entry_price:,.2f} โ€ข Current Price: ${current_price:,.2f} ๐Ÿ”ด Loss Details: โ€ข Loss: ${loss_value:,.2f} ({pnl_percent:.2f}%) โ€ข Stop Loss Threshold: {Config.STOP_LOSS_PERCENTAGE}% ๐Ÿ“‹ Action: {status_text} โฐ Time: {datetime.now().strftime('%H:%M:%S')} """ if order and status == "executed": order_id = order.get('id', 'N/A') message += f"\n๐Ÿ†” Order ID: {order_id}" await self.send_message(message.strip()) logger.info(f"๐Ÿ“ข Sent stop loss notification: {token} {status}") except Exception as e: logger.error(f"โŒ Error sending stop loss notification: {e}") async def _process_filled_orders(self, filled_order_ids: set, current_positions: list): """Process filled orders and determine if they opened or closed positions.""" try: # Create a map of current positions current_position_map = {} for position in current_positions: symbol = position.get('symbol') contracts = float(position.get('contracts', 0)) if symbol: current_position_map[symbol] = contracts # For each symbol, check if position size changed for symbol, old_position_data in self.last_known_positions.items(): old_contracts = old_position_data['contracts'] current_contracts = current_position_map.get(symbol, 0) if old_contracts != current_contracts: # Position changed - determine if it's open or close await self._handle_position_change(symbol, old_position_data, current_contracts) # Check for new positions (symbols not in last_known_positions) for symbol, current_contracts in current_position_map.items(): if symbol not in self.last_known_positions and current_contracts != 0: # New position opened await self._handle_new_position(symbol, current_contracts) except Exception as e: logger.error(f"โŒ Error processing filled orders: {e}") async def _handle_position_change(self, symbol: str, old_position_data: dict, current_contracts: float): """Handle when an existing position changes size.""" old_contracts = old_position_data['contracts'] old_entry_price = old_position_data['entry_price'] # Get current market price market_data = self.client.get_market_data(symbol) current_price = 0 if market_data: current_price = float(market_data['ticker'].get('last', 0)) token = symbol.split('/')[0] if '/' in symbol else symbol if current_contracts == 0 and old_contracts != 0: # Position closed await self._send_close_trade_notification(token, old_contracts, old_entry_price, current_price) elif abs(current_contracts) > abs(old_contracts): # Position increased added_contracts = current_contracts - old_contracts await self._send_open_trade_notification(token, added_contracts, current_price, "increased") elif abs(current_contracts) < abs(old_contracts): # Position decreased (partial close) closed_contracts = old_contracts - current_contracts await self._send_partial_close_notification(token, closed_contracts, old_entry_price, current_price) async def _handle_new_position(self, symbol: str, contracts: float): """Handle when a new position is opened.""" # Get current market price market_data = self.client.get_market_data(symbol) current_price = 0 if market_data: current_price = float(market_data['ticker'].get('last', 0)) token = symbol.split('/')[0] if '/' in symbol else symbol await self._send_open_trade_notification(token, contracts, current_price, "opened") async def _update_position_tracking(self, current_positions: list): """Update the position tracking data.""" new_position_map = {} for position in current_positions: symbol = position.get('symbol') contracts = float(position.get('contracts', 0)) entry_price = float(position.get('entryPx', 0)) if symbol and contracts != 0: new_position_map[symbol] = { 'contracts': contracts, 'entry_price': entry_price } self.last_known_positions = new_position_map async def _send_open_trade_notification(self, token: str, contracts: float, price: float, action: str): """Send notification for opened/increased position.""" position_type = "LONG" if contracts > 0 else "SHORT" contracts_abs = abs(contracts) value = contracts_abs * price if action == "opened": title = "๐Ÿš€ Position Opened" action_text = f"New {position_type} position opened" else: title = "๐Ÿ“ˆ Position Increased" action_text = f"{position_type} position increased" message = f""" {title} ๐Ÿ“Š Trade Details: โ€ข Token: {token} โ€ข Direction: {position_type} โ€ข Size: {contracts_abs} contracts โ€ข Entry Price: ${price:,.2f} โ€ข Value: ${value:,.2f} โœ… Status: {action_text} โฐ Time: {datetime.now().strftime('%H:%M:%S')} ๐Ÿ“ฑ Use /positions to view all positions """ await self.send_message(message.strip()) logger.info(f"๐Ÿ“ข Sent open trade notification: {token} {position_type} {contracts_abs} @ ${price}") async def _send_close_trade_notification(self, token: str, contracts: float, entry_price: float, exit_price: float): """Send notification for closed position with P&L.""" position_type = "LONG" if contracts > 0 else "SHORT" contracts_abs = abs(contracts) # Calculate P&L if contracts > 0: # Long position pnl = (exit_price - entry_price) * contracts_abs else: # Short position pnl = (entry_price - exit_price) * contracts_abs pnl_percent = (pnl / (entry_price * contracts_abs)) * 100 if entry_price > 0 else 0 pnl_emoji = "๐ŸŸข" if pnl >= 0 else "๐Ÿ”ด" exit_value = contracts_abs * exit_price message = f""" ๐ŸŽฏ Position Closed ๐Ÿ“Š Trade Summary: โ€ข Token: {token} โ€ข Direction: {position_type} โ€ข Size: {contracts_abs} contracts โ€ข Entry Price: ${entry_price:,.2f} โ€ข Exit Price: ${exit_price:,.2f} โ€ข Exit Value: ${exit_value:,.2f} {pnl_emoji} Profit & Loss: โ€ข P&L: ${pnl:,.2f} ({pnl_percent:+.2f}%) โ€ข Result: {"PROFIT" if pnl >= 0 else "LOSS"} โœ… Status: Position fully closed โฐ Time: {datetime.now().strftime('%H:%M:%S')} ๐Ÿ“Š Use /stats to view updated performance """ await self.send_message(message.strip()) logger.info(f"๐Ÿ“ข Sent close trade notification: {token} {position_type} P&L: ${pnl:.2f}") async def _send_partial_close_notification(self, token: str, contracts: float, entry_price: float, exit_price: float): """Send notification for partially closed position.""" position_type = "LONG" if contracts > 0 else "SHORT" contracts_abs = abs(contracts) # Calculate P&L for closed portion if contracts > 0: # Long position pnl = (exit_price - entry_price) * contracts_abs else: # Short position pnl = (entry_price - exit_price) * contracts_abs pnl_percent = (pnl / (entry_price * contracts_abs)) * 100 if entry_price > 0 else 0 pnl_emoji = "๐ŸŸข" if pnl >= 0 else "๐Ÿ”ด" message = f""" ๐Ÿ“‰ Position Partially Closed ๐Ÿ“Š Partial Close Details: โ€ข Token: {token} โ€ข Direction: {position_type} โ€ข Closed Size: {contracts_abs} contracts โ€ข Entry Price: ${entry_price:,.2f} โ€ข Exit Price: ${exit_price:,.2f} {pnl_emoji} Partial P&L: โ€ข P&L: ${pnl:,.2f} ({pnl_percent:+.2f}%) โœ… Status: Partial position closed โฐ Time: {datetime.now().strftime('%H:%M:%S')} ๐Ÿ“ˆ Use /positions to view remaining position """ await self.send_message(message.strip()) logger.info(f"๐Ÿ“ข Sent partial close notification: {token} {position_type} Partial P&L: ${pnl:.2f}") async def monitoring_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle the /monitoring command to show monitoring status.""" if not self.is_authorized(update.effective_chat.id): await update.message.reply_text("โŒ Unauthorized access.") return # Get alarm statistics alarm_stats = self.alarm_manager.get_statistics() status_text = f""" ๐Ÿ”„ System Monitoring Status ๐Ÿ“Š Order Monitoring: โ€ข Active: {'โœ… Yes' if self.monitoring_active else 'โŒ No'} โ€ข Check Interval: {Config.BOT_HEARTBEAT_SECONDS} seconds โ€ข Orders Tracked: {len(self.last_known_orders)} โ€ข Positions Tracked: {len(self.last_known_positions)} ๐Ÿ”” Price Alarms: โ€ข Active Alarms: {alarm_stats['total_active']} โ€ข Triggered Today: {alarm_stats['total_triggered']} โ€ข Tokens Monitored: {alarm_stats['tokens_tracked']} โ€ข Next Alarm ID: {alarm_stats['next_id']} ๐Ÿ”„ External Trade Monitoring: โ€ข Last Check: {self.last_processed_trade_time or 'Not started'} โ€ข Auto Stats Update: โœ… Enabled โ€ข External Notifications: โœ… Enabled ๐Ÿ›ก๏ธ Risk Management: โ€ข Automatic Stop Loss: {'โœ… Enabled' if Config.RISK_MANAGEMENT_ENABLED else 'โŒ Disabled'} โ€ข Stop Loss Threshold: {Config.STOP_LOSS_PERCENTAGE}% โ€ข Position Monitoring: {'โœ… Active' if Config.RISK_MANAGEMENT_ENABLED else 'โŒ Inactive'} ๐Ÿ“ˆ Notifications: โ€ข ๐Ÿš€ Position Opened/Increased โ€ข ๐Ÿ“‰ Position Partially/Fully Closed โ€ข ๐ŸŽฏ P&L Calculations โ€ข ๐Ÿ”” Price Alarm Triggers โ€ข ๐Ÿ”„ External Trade Detection โ€ข ๐Ÿ›‘ Automatic Stop Loss Triggers โฐ Last Check: {datetime.now().strftime('%H:%M:%S')} ๐Ÿ’ก Monitoring Features: โ€ข Real-time order fill detection โ€ข Automatic P&L calculation โ€ข Position change tracking โ€ข Price alarm monitoring โ€ข External trade monitoring โ€ข Auto stats synchronization โ€ข Instant Telegram notifications """ if alarm_stats['token_breakdown']: status_text += f"\n\n๐Ÿ“‹ Active Alarms by Token:\n" for token, count in alarm_stats['token_breakdown'].items(): status_text += f"โ€ข {token}: {count} alarm{'s' if count != 1 else ''}\n" await update.message.reply_text(status_text.strip(), parse_mode='HTML') async def alarm_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle the /alarm command for price alerts.""" if not self.is_authorized(update.effective_chat.id): await update.message.reply_text("โŒ Unauthorized access.") return try: if not context.args or len(context.args) == 0: # No arguments - list all alarms alarms = self.alarm_manager.get_all_active_alarms() message = self.alarm_manager.format_alarm_list(alarms) await update.message.reply_text(message, parse_mode='HTML') return elif len(context.args) == 1: arg = context.args[0] # Check if argument is a number (alarm ID to remove) try: alarm_id = int(arg) # Remove alarm by ID if self.alarm_manager.remove_alarm(alarm_id): await update.message.reply_text(f"โœ… Alarm ID {alarm_id} has been removed.") else: await update.message.reply_text(f"โŒ Alarm ID {alarm_id} not found.") return except ValueError: # Not a number, treat as token token = arg.upper() alarms = self.alarm_manager.get_alarms_by_token(token) message = self.alarm_manager.format_alarm_list(alarms, f"{token} Price Alarms") await update.message.reply_text(message, parse_mode='HTML') return elif len(context.args) == 2: # Set new alarm: /alarm TOKEN PRICE token = context.args[0].upper() target_price = float(context.args[1]) # Get current market price symbol = f"{token}/USDC:USDC" market_data = self.client.get_market_data(symbol) if not market_data or not market_data.get('ticker'): await update.message.reply_text(f"โŒ Could not fetch current price for {token}") return current_price = float(market_data['ticker'].get('last', 0)) if current_price <= 0: await update.message.reply_text(f"โŒ Invalid current price for {token}") return # Create the alarm alarm = self.alarm_manager.create_alarm(token, target_price, current_price) # Format confirmation message direction_emoji = "๐Ÿ“ˆ" if alarm['direction'] == 'above' else "๐Ÿ“‰" price_diff = abs(target_price - current_price) price_diff_percent = (price_diff / current_price) * 100 message = f""" โœ… Price Alarm Created ๐Ÿ“Š Alarm Details: โ€ข Alarm ID: {alarm['id']} โ€ข Token: {token} โ€ข Target Price: ${target_price:,.2f} โ€ข Current Price: ${current_price:,.2f} โ€ข Direction: {alarm['direction'].upper()} {direction_emoji} Alert Condition: Will trigger when {token} price moves {alarm['direction']} ${target_price:,.2f} ๐Ÿ’ฐ Price Difference: โ€ข Distance: ${price_diff:,.2f} ({price_diff_percent:.2f}%) โ€ข Status: ACTIVE โœ… โฐ Created: {datetime.now().strftime('%H:%M:%S')} ๐Ÿ’ก The alarm will be checked every {Config.BOT_HEARTBEAT_SECONDS} seconds and you'll receive a notification when triggered. """ await update.message.reply_text(message.strip(), parse_mode='HTML') else: # Too many arguments await update.message.reply_text( "โŒ Invalid usage. Examples:\n\n" "โ€ข /alarm - List all alarms\n" "โ€ข /alarm BTC - List BTC alarms\n" "โ€ข /alarm BTC 50000 - Set alarm for BTC at $50,000\n" "โ€ข /alarm 3 - Remove alarm ID 3", parse_mode='HTML' ) except ValueError: await update.message.reply_text("โŒ Invalid price format. Please use numbers only.") except Exception as e: error_message = f"โŒ Error processing alarm command: {str(e)}" await update.message.reply_text(error_message) logger.error(f"Error in alarm command: {e}") async def logs_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle the /logs command to show log file statistics and cleanup options.""" if not self.is_authorized(update.effective_chat.id): await update.message.reply_text("โŒ Unauthorized access.") return try: # Check for cleanup argument if context.args and len(context.args) >= 1: if context.args[0].lower() == 'cleanup': # Get days parameter (default 30) days_to_keep = 30 if len(context.args) >= 2: try: days_to_keep = int(context.args[1]) except ValueError: await update.message.reply_text("โŒ Invalid number of days. Using default (30).") # Perform cleanup await update.message.reply_text(f"๐Ÿงน Cleaning up log files older than {days_to_keep} days...") cleanup_logs(days_to_keep) await update.message.reply_text(f"โœ… Log cleanup completed!") return # Show log statistics log_stats_text = format_log_stats() # Add additional info status_text = f""" ๐Ÿ“Š System Logging Status {log_stats_text} ๐Ÿ“ˆ Log Configuration: โ€ข Log Level: {Config.LOG_LEVEL} โ€ข Heartbeat Interval: {Config.BOT_HEARTBEAT_SECONDS}s โ€ข Bot Uptime: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ๐Ÿ’ก Log Management: โ€ข /logs cleanup - Clean old logs (30 days) โ€ข /logs cleanup 7 - Clean logs older than 7 days โ€ข Log rotation happens automatically โ€ข Old backups are removed automatically ๐Ÿ”ง Configuration: โ€ข Rotation Type: {Config.LOG_ROTATION_TYPE} โ€ข Max Size: {Config.LOG_MAX_SIZE_MB}MB (size rotation) โ€ข Backup Count: {Config.LOG_BACKUP_COUNT} """ await update.message.reply_text(status_text.strip(), parse_mode='HTML') except Exception as e: error_message = f"โŒ Error processing logs command: {str(e)}" await update.message.reply_text(error_message) logger.error(f"Error in logs command: {e}") async def main_async(): """Async main entry point for the Telegram bot.""" try: # Validate configuration if not Config.validate(): logger.error("โŒ Configuration validation failed!") return if not Config.TELEGRAM_ENABLED: logger.error("โŒ Telegram is not enabled in configuration") return # Create and run the bot bot = TelegramTradingBot() await bot.run() except KeyboardInterrupt: logger.info("๐Ÿ‘‹ Bot stopped by user") except Exception as e: logger.error(f"โŒ Unexpected error: {e}") raise def main(): """Main entry point for the Telegram bot.""" try: # Check if we're already in an asyncio context try: loop = asyncio.get_running_loop() # If we get here, we're already in an asyncio context logger.error("โŒ Cannot run main() from within an asyncio context. Use main_async() instead.") return except RuntimeError: # No running loop, safe to use asyncio.run() pass # Run the async main function asyncio.run(main_async()) except Exception as e: logger.error(f"โŒ Failed to start telegram bot: {e}") raise if __name__ == "__main__": main()