#!/usr/bin/env python3
"""
Telegram Bot for Hyperliquid Trading
This module provides a Telegram interface for manual Hyperliquid trading
with comprehensive statistics tracking and phone-friendly controls.
"""
import logging
import asyncio
import re
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, CallbackQueryHandler, ContextTypes, MessageHandler, filters
from hyperliquid_client import HyperliquidClient
from trading_stats import TradingStats
from config import Config
from alarm_manager import AlarmManager
from logging_config import setup_logging, cleanup_logs, format_log_stats
# Set up logging using the new configuration system
logger = setup_logging().getChild(__name__)
class TelegramTradingBot:
"""Telegram trading bot for manual trading operations."""
def __init__(self):
"""Initialize the Telegram trading bot."""
self.client = HyperliquidClient()
self.application = None
self.order_monitoring_task = None
self.last_filled_orders = set()
self.alarms = [] # List to store price alarms
self.bot_heartbeat_seconds = getattr(Config, 'BOT_HEARTBEAT_SECONDS', 10)
self.external_trade_timestamps = set() # Track external trade timestamps to avoid duplicates
self.last_position_check = {} # Track last position state for comparison
self._position_tracker = {} # For enhanced position tracking
self.stats = None
self.version = "Unknown" # Will be set by launcher
# Initialize stats
self._initialize_stats()
def _initialize_stats(self):
"""Initialize stats with current balance."""
try:
balance = self.client.get_balance()
if balance and balance.get('total'):
# Get USDC balance as the main balance
usdc_balance = float(balance['total'].get('USDC', 0))
self.stats.set_initial_balance(usdc_balance)
except Exception as e:
logger.error(f"Could not initialize stats: {e}")
def is_authorized(self, chat_id: str) -> bool:
"""Check if the chat ID is authorized to use the bot."""
return str(chat_id) == str(Config.TELEGRAM_CHAT_ID)
async def send_message(self, text: str, parse_mode: str = 'HTML') -> None:
"""Send a message to the authorized chat."""
if self.application and Config.TELEGRAM_CHAT_ID:
try:
await self.application.bot.send_message(
chat_id=Config.TELEGRAM_CHAT_ID,
text=text,
parse_mode=parse_mode
)
except Exception as e:
logger.error(f"Failed to send message: {e}")
async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /start command."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
welcome_text = f"""
๐ค Welcome to Hyperliquid Trading Bot
๐ฑ Quick Actions:
โข Trading: /long BTC 100 or /short ETH 50
โข Exit: /exit BTC (closes position)
โข Info: /balance, /positions, /orders
๐ Market Data:
โข /market - Detailed market overview
โข /price - Quick price check
โก Quick Commands:
โข /balance - Account balance
โข /positions - Open positions
โข /orders - Active orders
โข /market - Market data & prices
๐ Trading:
โข /long BTC 100 - Long position
โข /short ETH 50 - Short position
โข /exit BTC - Close position
โข /coo BTC - Cancel open orders
๐ก๏ธ Risk Management:
โข Enabled: {risk_enabled}
โข Auto Stop Loss: {stop_loss}%
โข /sl BTC 44000 - Manual stop loss
โข /tp BTC 50000 - Take profit order
๐ Performance & Analytics:
โข /stats - Complete trading statistics
โข /performance - Token performance ranking & detailed stats
โข /daily - Daily performance (last 10 days)
โข /weekly - Weekly performance (last 10 weeks)
โข /monthly - Monthly performance (last 10 months)
โข /risk - Sharpe ratio, drawdown, VaR
โข /version - Bot version & system information
โข /trades - Recent trade history
๐ Price Alerts:
โข /alarm - List all active alarms
โข /alarm BTC 50000 - Set alarm for BTC at $50,000
โข /alarm BTC - Show all BTC alarms
โข /alarm 3 - Remove alarm ID 3
๐ Automatic Monitoring:
โข Real-time order fill alerts
โข Position opened/closed notifications
โข P&L calculations on trade closure
โข Price alarm triggers
โข External trade detection & sync
โข Auto stats synchronization
โข {heartbeat}-second monitoring interval
๐ Universal Trade Tracking:
โข Bot trades: Full logging & notifications
โข Platform trades: Auto-detected & synced
โข Mobile app trades: Monitored & recorded
โข API trades: Tracked & included in stats
Type /help for detailed command information.
๐ Order Monitoring:
โข /monitoring - View monitoring status
โข /logs - View log file statistics and cleanup
โ๏ธ Configuration:
โข Symbol: {symbol}
โข Default Token: {symbol}
โข Network: {network}
๐ก๏ธ Safety Features:
โข All trades logged automatically
โข Comprehensive performance tracking
โข Real-time balance monitoring
โข Risk metrics calculation
๐ฑ Mobile Optimized:
โข Quick action buttons
โข Instant notifications
โข Clean, readable layout
โข One-tap commands
๐ก Quick Access:
โข /commands or /c - One-tap button menu for all commands
โข Buttons below for instant access to key functions
For support, contact your bot administrator.
""".format(
symbol=Config.DEFAULT_TRADING_TOKEN,
network="Testnet" if Config.HYPERLIQUID_TESTNET else "Mainnet",
risk_enabled=Config.RISK_MANAGEMENT_ENABLED,
stop_loss=Config.STOP_LOSS_PERCENTAGE,
heartbeat=Config.BOT_HEARTBEAT_SECONDS
)
keyboard = [
[
InlineKeyboardButton("๐ฐ Balance", callback_data="balance"),
InlineKeyboardButton("๐ Stats", callback_data="stats")
],
[
InlineKeyboardButton("๐ Positions", callback_data="positions"),
InlineKeyboardButton("๐ Orders", callback_data="orders")
],
[
InlineKeyboardButton("๐ต Price", callback_data="price"),
InlineKeyboardButton("๐ Market", callback_data="market")
],
[
InlineKeyboardButton("๐ Recent Trades", callback_data="trades"),
InlineKeyboardButton("โ๏ธ Help", callback_data="help")
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(welcome_text, parse_mode='HTML', reply_markup=reply_markup)
async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /help command."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
help_text = """
๐ง Hyperliquid Trading Bot - Complete Guide
๐ผ Account Management:
โข /balance - Show account balance
โข /positions - Show open positions
โข /orders - Show open orders
๐ Market Data:
โข /market - Detailed market data (default token)
โข /market BTC - Market data for specific token
โข /price - Quick price check (default token)
โข /price SOL - Price for specific token
๐ Perps Trading:
โข /long BTC 100 - Long BTC with $100 USDC (Market Order)
โข /long BTC 100 45000 - Long BTC with $100 USDC at $45,000 (Limit Order)
โข /short ETH 50 - Short ETH with $50 USDC (Market Order)
โข /short ETH 50 3500 - Short ETH with $50 USDC at $3,500 (Limit Order)
โข /exit BTC - Close BTC position with Market Order
๐ก๏ธ Risk Management:
โข /sl BTC 44000 - Set stop loss for BTC at $44,000
โข /tp BTC 50000 - Set take profit for BTC at $50,000
๐จ Automatic Stop Loss:
โข Enabled: {risk_enabled}
โข Stop Loss: {stop_loss}% (automatic execution)
โข Monitoring: Every {heartbeat} seconds
๐ Order Management:
โข /orders - Show all open orders
โข /orders BTC - Show open orders for BTC only
โข /coo BTC - Cancel all open orders for BTC
๐ Statistics & Analytics:
โข /stats - Complete trading statistics
โข /performance - Win rate, profit factor, etc.
โข /risk - Sharpe ratio, drawdown, VaR
โข /version - Bot version & system information
โข /trades - Recent trade history
๐ Price Alerts:
โข /alarm - List all active alarms
โข /alarm BTC 50000 - Set alarm for BTC at $50,000
โข /alarm BTC - Show all BTC alarms
โข /alarm 3 - Remove alarm ID 3
๐ Order Monitoring:
โข /monitoring - View monitoring status
โข /logs - View log file statistics and cleanup
โ๏ธ Configuration:
โข Symbol: {symbol}
โข Default Token: {symbol}
โข Network: {network}
๐ก๏ธ Safety Features:
โข All trades logged automatically
โข Comprehensive performance tracking
โข Real-time balance monitoring
โข Risk metrics calculation
๐ฑ Mobile Optimized:
โข Quick action buttons
โข Instant notifications
โข Clean, readable layout
โข One-tap commands
For support, contact your bot administrator.
""".format(
symbol=Config.DEFAULT_TRADING_TOKEN,
network="Testnet" if Config.HYPERLIQUID_TESTNET else "Mainnet",
risk_enabled=Config.RISK_MANAGEMENT_ENABLED,
stop_loss=Config.STOP_LOSS_PERCENTAGE,
heartbeat=Config.BOT_HEARTBEAT_SECONDS
)
await update.message.reply_text(help_text, parse_mode='HTML')
async def commands_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /commands and /c command with quick action buttons."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
commands_text = """
๐ฑ Quick Commands
Tap any button below for instant access to bot functions:
๐ก Pro Tip: These buttons work the same as typing the commands manually, but faster!
"""
keyboard = [
[
InlineKeyboardButton("๐ฐ Balance", callback_data="balance"),
InlineKeyboardButton("๐ Positions", callback_data="positions")
],
[
InlineKeyboardButton("๐ Orders", callback_data="orders"),
InlineKeyboardButton("๐ Stats", callback_data="stats")
],
[
InlineKeyboardButton("๐ต Price", callback_data="price"),
InlineKeyboardButton("๐ Market", callback_data="market")
],
[
InlineKeyboardButton("๐ Performance", callback_data="performance"),
InlineKeyboardButton("๐ Alarms", callback_data="alarm")
],
[
InlineKeyboardButton("๐
Daily", callback_data="daily"),
InlineKeyboardButton("๐ Weekly", callback_data="weekly")
],
[
InlineKeyboardButton("๐ Monthly", callback_data="monthly"),
InlineKeyboardButton("๐ Trades", callback_data="trades")
],
[
InlineKeyboardButton("๐ Monitoring", callback_data="monitoring"),
InlineKeyboardButton("๐ Logs", callback_data="logs")
],
[
InlineKeyboardButton("โ๏ธ Help", callback_data="help")
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(commands_text, parse_mode='HTML', reply_markup=reply_markup)
async def stats_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /stats command."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
# Get current balance for stats
balance = self.client.get_balance()
current_balance = 0
if balance and balance.get('total'):
current_balance = float(balance['total'].get('USDC', 0))
stats_message = self.stats.format_stats_message(current_balance)
await update.message.reply_text(stats_message, parse_mode='HTML')
async def trades_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /trades command."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
recent_trades = self.stats.get_recent_trades(10)
if not recent_trades:
await update.message.reply_text("๐ No trades recorded yet.")
return
trades_text = "๐ Recent Trades\n\n"
for trade in reversed(recent_trades[-5:]): # Show last 5 trades
timestamp = datetime.fromisoformat(trade['timestamp']).strftime('%m/%d %H:%M')
side_emoji = "๐ข" if trade['side'] == 'buy' else "๐ด"
trades_text += f"{side_emoji} {trade['side'].upper()} {trade['amount']} {trade['symbol']}\n"
trades_text += f" ๐ฐ ${trade['price']:,.2f} | ๐ต ${trade['value']:,.2f}\n"
trades_text += f" ๐
{timestamp}\n\n"
await update.message.reply_text(trades_text, parse_mode='HTML')
async def balance_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /balance command."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
balance = self.client.get_balance()
if balance:
balance_text = "๐ฐ Account Balance\n\n"
# CCXT balance structure includes 'free', 'used', and 'total'
total_balance = balance.get('total', {})
free_balance = balance.get('free', {})
used_balance = balance.get('used', {})
if total_balance:
total_value = 0
available_value = 0
# Display individual assets
for asset, amount in total_balance.items():
if float(amount) > 0:
free_amount = float(free_balance.get(asset, 0))
used_amount = float(used_balance.get(asset, 0))
balance_text += f"๐ต {asset}:\n"
balance_text += f" ๐ Total: {amount}\n"
balance_text += f" โ
Available: {free_amount}\n"
if used_amount > 0:
balance_text += f" ๐ In Use: {used_amount}\n"
balance_text += "\n"
# Calculate totals for USDC (main trading currency)
if asset == 'USDC':
total_value += float(amount)
available_value += free_amount
# Summary section
balance_text += f"๐ผ Portfolio Summary:\n"
balance_text += f" ๐ฐ Total Value: ${total_value:,.2f}\n"
balance_text += f" ๐ Available for Trading: ${available_value:,.2f}\n"
if total_value - available_value > 0:
balance_text += f" ๐ In Active Use: ${total_value - available_value:,.2f}\n"
# Add P&L summary
basic_stats = self.stats.get_basic_stats()
if basic_stats['initial_balance'] > 0:
pnl = total_value - basic_stats['initial_balance']
pnl_percent = (pnl / basic_stats['initial_balance']) * 100
balance_text += f"\n๐ Performance:\n"
balance_text += f" ๐ต P&L: ${pnl:,.2f} ({pnl_percent:+.2f}%)\n"
balance_text += f" ๐ Initial: ${basic_stats['initial_balance']:,.2f}"
else:
balance_text += "๐ญ No balance data available"
else:
balance_text = "โ Could not fetch balance data"
await update.message.reply_text(balance_text, parse_mode='HTML')
async def positions_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /positions command."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
positions = self.client.get_positions()
if positions is not None: # Successfully fetched (could be empty list)
positions_text = "๐ Open Positions\n\n"
# Filter for actual open positions
open_positions = [p for p in positions if float(p.get('contracts', 0)) != 0]
if open_positions:
total_unrealized = 0
total_position_value = 0
for position in open_positions:
symbol = position.get('symbol', 'Unknown')
contracts = float(position.get('contracts', 0))
unrealized_pnl = float(position.get('unrealizedPnl', 0))
entry_price = float(position.get('entryPx', 0))
# Calculate position value and P&L percentage
position_value = abs(contracts) * entry_price
pnl_percentage = (unrealized_pnl / position_value * 100) if position_value > 0 else 0
pnl_emoji = "๐ข" if unrealized_pnl >= 0 else "๐ด"
# Extract token name for cleaner display
token = symbol.split('/')[0] if '/' in symbol else symbol
position_type = "LONG" if contracts > 0 else "SHORT"
positions_text += f"๐ {token} ({position_type})\n"
positions_text += f" ๐ Size: {abs(contracts):.6f} {token}\n"
positions_text += f" ๐ฐ Entry: ${entry_price:,.2f}\n"
positions_text += f" ๐ต Value: ${position_value:,.2f}\n"
positions_text += f" {pnl_emoji} P&L: ${unrealized_pnl:,.2f} ({pnl_percentage:+.2f}%)\n\n"
total_unrealized += unrealized_pnl
total_position_value += position_value
# Calculate overall P&L percentage
total_pnl_percentage = (total_unrealized / total_position_value * 100) if total_position_value > 0 else 0
total_pnl_emoji = "๐ข" if total_unrealized >= 0 else "๐ด"
positions_text += f"๐ผ Total Portfolio:\n"
positions_text += f" ๐ต Total Value: ${total_position_value:,.2f}\n"
positions_text += f" {total_pnl_emoji} Total P&L: ${total_unrealized:,.2f} ({total_pnl_percentage:+.2f}%)"
else:
positions_text += "๐ญ No open positions currently\n\n"
positions_text += "๐ Ready to start trading!\n"
positions_text += "Use /buy or /sell commands to open positions."
else:
# Actual API error
positions_text = "โ Could not fetch positions data\n\n"
positions_text += "๐ Please try again in a moment.\n"
positions_text += "If the issue persists, check your connection."
await update.message.reply_text(positions_text, parse_mode='HTML')
async def orders_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /orders command with optional token filter."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
# Check if token filter is provided
token_filter = None
if context.args and len(context.args) >= 1:
token_filter = context.args[0].upper()
orders = self.client.get_open_orders()
if orders is not None: # Successfully fetched (could be empty list)
if token_filter:
orders_text = f"๐ Open Orders - {token_filter}\n\n"
# Filter orders for specific token
target_symbol = f"{token_filter}/USDC:USDC"
filtered_orders = [order for order in orders if order.get('symbol') == target_symbol]
else:
orders_text = "๐ All Open Orders\n\n"
filtered_orders = orders
if filtered_orders and len(filtered_orders) > 0:
for order in filtered_orders:
symbol = order.get('symbol', 'Unknown')
side = order.get('side', 'Unknown')
amount = order.get('amount', 0)
price = order.get('price', 0)
order_id = order.get('id', 'Unknown')
# Extract token from symbol for display
token = symbol.split('/')[0] if '/' in symbol else symbol
side_emoji = "๐ข" if side.lower() == 'buy' else "๐ด"
orders_text += f"{side_emoji} {token}\n"
orders_text += f" ๐ {side.upper()} {amount} @ ${price:,.2f}\n"
orders_text += f" ๐ต Value: ${float(amount) * float(price):,.2f}\n"
orders_text += f" ๐ ID: {order_id}
\n\n"
# Add helpful commands
if token_filter:
orders_text += f"๐ก Quick Actions:\n"
orders_text += f"โข /coo {token_filter}
- Cancel all {token_filter} orders\n"
orders_text += f"โข /orders
- View all orders"
else:
orders_text += f"๐ก Filter by token: /orders BTC
, /orders ETH
"
else:
if token_filter:
orders_text += f"๐ญ No open orders for {token_filter}\n\n"
orders_text += f"๐ก No pending {token_filter} orders found.\n"
orders_text += f"Use /long {token_filter} 100
or /short {token_filter} 100
to create new orders."
else:
orders_text += "๐ญ No open orders currently\n\n"
orders_text += "๐ก All clear! No pending orders.\n"
orders_text += "Use /long or /short commands to place new orders."
else:
# Actual API error
orders_text = "โ Could not fetch orders data\n\n"
orders_text += "๐ Please try again in a moment.\n"
orders_text += "If the issue persists, check your connection."
await update.message.reply_text(orders_text, parse_mode='HTML')
async def market_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /market command."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
# Check if token is provided as argument
if context.args and len(context.args) >= 1:
token = context.args[0].upper()
else:
token = Config.DEFAULT_TRADING_TOKEN
# Convert token to full symbol format for API
symbol = f"{token}/USDC:USDC"
market_data = self.client.get_market_data(symbol)
if market_data and market_data.get('ticker'):
try:
ticker = market_data['ticker']
orderbook = market_data.get('orderbook', {})
# Safely extract ticker data with fallbacks
current_price = float(ticker.get('last') or 0)
high_24h = float(ticker.get('high') or 0)
low_24h = float(ticker.get('low') or 0)
volume_24h = ticker.get('baseVolume') or ticker.get('volume') or 'N/A'
market_text = f"๐ Market Data - {token}\n\n"
if current_price > 0:
market_text += f"๐ต Current Price: ${current_price:,.2f}\n"
else:
market_text += f"๐ต Current Price: N/A\n"
if high_24h > 0:
market_text += f"๐ 24h High: ${high_24h:,.2f}\n"
else:
market_text += f"๐ 24h High: N/A\n"
if low_24h > 0:
market_text += f"๐ 24h Low: ${low_24h:,.2f}\n"
else:
market_text += f"๐ 24h Low: N/A\n"
market_text += f"๐ 24h Volume: {volume_24h}\n\n"
# Handle orderbook data safely
if orderbook and orderbook.get('bids') and orderbook.get('asks'):
try:
bids = orderbook.get('bids', [])
asks = orderbook.get('asks', [])
if bids and asks and len(bids) > 0 and len(asks) > 0:
best_bid = float(bids[0][0]) if bids[0][0] else 0
best_ask = float(asks[0][0]) if asks[0][0] else 0
if best_bid > 0 and best_ask > 0:
spread = best_ask - best_bid
spread_percent = (spread / best_ask * 100) if best_ask > 0 else 0
market_text += f"๐ข Best Bid: ${best_bid:,.2f}\n"
market_text += f"๐ด Best Ask: ${best_ask:,.2f}\n"
market_text += f"๐ Spread: ${spread:.2f} ({spread_percent:.3f}%)\n"
else:
market_text += f"๐ Orderbook: Data unavailable\n"
else:
market_text += f"๐ Orderbook: No orders available\n"
except (IndexError, ValueError, TypeError) as e:
market_text += f"๐ Orderbook: Error parsing data\n"
else:
market_text += f"๐ Orderbook: Not available\n"
# Add usage hint
market_text += f"\n๐ก Usage: /market {token}
or /market
for default"
except (ValueError, TypeError) as e:
market_text = f"โ Error parsing market data\n\n"
market_text += f"๐ง Raw data received but couldn't parse values.\n"
market_text += f"๐ Please try again or contact support if this persists."
else:
market_text = f"โ Could not fetch market data for {token}\n\n"
market_text += f"๐ Please try again in a moment.\n"
market_text += f"๐ Check your network connection.\n"
market_text += f"๐ก API may be temporarily unavailable.\n\n"
market_text += f"๐ก Usage: /market BTC
, /market ETH
, /market SOL
, etc."
await update.message.reply_text(market_text, parse_mode='HTML')
async def price_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /price command."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
# Check if token is provided as argument
if context.args and len(context.args) >= 1:
token = context.args[0].upper()
else:
token = Config.DEFAULT_TRADING_TOKEN
# Convert token to full symbol format for API
symbol = f"{token}/USDC:USDC"
market_data = self.client.get_market_data(symbol)
if market_data and market_data.get('ticker'):
try:
ticker = market_data['ticker']
price_value = ticker.get('last')
if price_value is not None:
price = float(price_value)
price_text = f"๐ต {token}: ${price:,.2f}"
# Add timestamp
timestamp = datetime.now().strftime('%H:%M:%S')
price_text += f"\nโฐ Updated: {timestamp}"
# Add usage hint
price_text += f"\n๐ก Usage: /price {symbol}
or /price
for default"
else:
price_text = f"๐ต {symbol}: Price not available\nโ ๏ธ Data temporarily unavailable"
except (ValueError, TypeError) as e:
price_text = f"โ Error parsing price for {symbol}\n๐ง Please try again"
else:
price_text = f"โ Could not fetch price for {symbol}\n๐ Please try again in a moment\n\n"
price_text += f"๐ก Usage: /price BTC
, /price ETH
, /price SOL
, etc."
await update.message.reply_text(price_text, parse_mode='HTML')
async def button_callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle inline keyboard button presses."""
query = update.callback_query
await query.answer()
if not self.is_authorized(query.message.chat_id):
await query.edit_message_text("โ Unauthorized access.")
return
callback_data = query.data
# Handle trading confirmations
if callback_data.startswith('confirm_long_'):
parts = callback_data.split('_')
token = parts[2]
usdc_amount = float(parts[3])
price = float(parts[4])
is_limit = len(parts) > 5 and parts[5] == 'limit'
await self._execute_long_order(query, token, usdc_amount, price, is_limit)
return
elif callback_data.startswith('confirm_short_'):
parts = callback_data.split('_')
token = parts[2]
usdc_amount = float(parts[3])
price = float(parts[4])
is_limit = len(parts) > 5 and parts[5] == 'limit'
await self._execute_short_order(query, token, usdc_amount, price, is_limit)
return
elif callback_data.startswith('confirm_exit_'):
parts = callback_data.split('_')
token = parts[2]
exit_side = parts[3]
contracts = float(parts[4])
price = float(parts[5])
await self._execute_exit_order(query, token, exit_side, contracts, price)
return
elif callback_data.startswith('confirm_coo_'):
parts = callback_data.split('_')
token = parts[2]
await self._execute_coo(query, token)
return
elif callback_data.startswith('confirm_sl_'):
parts = callback_data.split('_')
token = parts[2]
exit_side = parts[3]
contracts = float(parts[4])
price = float(parts[5])
await self._execute_sl_order(query, token, exit_side, contracts, price)
return
elif callback_data.startswith('confirm_tp_'):
parts = callback_data.split('_')
token = parts[2]
exit_side = parts[3]
contracts = float(parts[4])
price = float(parts[5])
await self._execute_tp_order(query, token, exit_side, contracts, price)
return
elif callback_data == 'cancel_order':
await query.edit_message_text("โ Order cancelled.")
return
# Create a fake update object for reusing command handlers
fake_update = Update(
update_id=update.update_id,
message=query.message,
callback_query=query
)
# Handle regular button callbacks
if callback_data == "balance":
await self.balance_command(fake_update, context)
elif callback_data == "stats":
await self.stats_command(fake_update, context)
elif callback_data == "positions":
await self.positions_command(fake_update, context)
elif callback_data == "orders":
await self.orders_command(fake_update, context)
elif callback_data == "market":
await self.market_command(fake_update, context)
elif callback_data == "price":
await self.price_command(fake_update, context)
elif callback_data == "trades":
await self.trades_command(fake_update, context)
elif callback_data == "help":
await self.help_command(fake_update, context)
elif callback_data == "performance":
await self.performance_command(fake_update, context)
elif callback_data == "alarm":
await self.alarm_command(fake_update, context)
elif callback_data == "daily":
await self.daily_command(fake_update, context)
elif callback_data == "weekly":
await self.weekly_command(fake_update, context)
elif callback_data == "monthly":
await self.monthly_command(fake_update, context)
elif callback_data == "monitoring":
await self.monitoring_command(fake_update, context)
elif callback_data == "logs":
await self.logs_command(fake_update, context)
async def _execute_long_order(self, query, token: str, usdc_amount: float, price: float, is_limit: bool):
"""Execute a long order."""
symbol = f"{token}/USDC:USDC"
try:
await query.edit_message_text("โณ Opening long position...")
# Calculate token amount based on USDC value and price
token_amount = usdc_amount / price
# Place order (limit or market)
if is_limit:
order = self.client.place_limit_order(symbol, 'buy', token_amount, price)
else:
order = self.client.place_market_order(symbol, 'buy', token_amount)
if order:
# Record the trade in stats
order_id = order.get('id', 'N/A')
actual_price = order.get('average', price) # Use actual fill price if available
action_type = self.stats.record_trade_with_enhanced_tracking(symbol, 'buy', token_amount, actual_price, order_id, "bot")
success_message = f"""
โ
Long Position {'Placed' if is_limit else 'Opened'} Successfully!
๐ Order Details:
โข Token: {token}
โข Direction: LONG (Buy)
โข Amount: {token_amount:.6f} {token}
โข Price: ${price:,.2f}
โข USDC Value: ~${usdc_amount:,.2f}
โข Order Type: {'Limit' if is_limit else 'Market'} Order
โข Order ID: {order_id}
๐ Your {'limit order has been placed' if is_limit else 'long position is now active'}!
"""
await query.edit_message_text(success_message, parse_mode='HTML')
logger.info(f"Long {'limit order placed' if is_limit else 'position opened'}: {token_amount:.6f} {token} @ ${price} ({'Limit' if is_limit else 'Market'})")
else:
await query.edit_message_text(f"โ Failed to {'place limit order' if is_limit else 'open long position'}. Please try again.")
except Exception as e:
error_message = f"โ Error {'placing limit order' if is_limit else 'opening long position'}: {str(e)}"
await query.edit_message_text(error_message)
logger.error(f"Error in long order: {e}")
async def _execute_short_order(self, query, token: str, usdc_amount: float, price: float, is_limit: bool):
"""Execute a short order."""
symbol = f"{token}/USDC:USDC"
try:
await query.edit_message_text("โณ Opening short position...")
# Calculate token amount based on USDC value and price
token_amount = usdc_amount / price
# Place order (limit or market)
if is_limit:
order = self.client.place_limit_order(symbol, 'sell', token_amount, price)
else:
order = self.client.place_market_order(symbol, 'sell', token_amount)
if order:
# Record the trade in stats
order_id = order.get('id', 'N/A')
actual_price = order.get('average', price) # Use actual fill price if available
action_type = self.stats.record_trade_with_enhanced_tracking(symbol, 'sell', token_amount, actual_price, order_id, "bot")
success_message = f"""
โ
Short Position {'Placed' if is_limit else 'Opened'} Successfully!
๐ Order Details:
โข Token: {token}
โข Direction: SHORT (Sell)
โข Amount: {token_amount:.6f} {token}
โข Price: ${price:,.2f}
โข USDC Value: ~${usdc_amount:,.2f}
โข Order Type: {'Limit' if is_limit else 'Market'} Order
โข Order ID: {order_id}
๐ Your {'limit order has been placed' if is_limit else 'short position is now active'}!
"""
await query.edit_message_text(success_message, parse_mode='HTML')
logger.info(f"Short {'limit order placed' if is_limit else 'position opened'}: {token_amount:.6f} {token} @ ${price} ({'Limit' if is_limit else 'Market'})")
else:
await query.edit_message_text(f"โ Failed to {'place limit order' if is_limit else 'open short position'}. Please try again.")
except Exception as e:
error_message = f"โ Error {'placing limit order' if is_limit else 'opening short position'}: {str(e)}"
await query.edit_message_text(error_message)
logger.error(f"Error in short order: {e}")
async def _execute_exit_order(self, query, token: str, exit_side: str, contracts: float, price: float):
"""Execute an exit order."""
symbol = f"{token}/USDC:USDC"
try:
await query.edit_message_text("โณ Closing position...")
# Place market order to close position
order = self.client.place_market_order(symbol, exit_side, contracts)
if order:
# Record the trade in stats
order_id = order.get('id', 'N/A')
actual_price = order.get('average', price) # Use actual fill price if available
action_type = self.stats.record_trade_with_enhanced_tracking(symbol, exit_side, contracts, actual_price, order_id, "bot")
position_type = "LONG" if exit_side == "sell" else "SHORT"
success_message = f"""
โ
Position Closed Successfully!
๐ Exit Details:
โข Token: {token}
โข Position Closed: {position_type}
โข Exit Side: {exit_side.upper()}
โข Amount: {contracts} {token}
โข Est. Price: ~${price:,.2f}
โข Order Type: Market Order
โข Order ID: {order_id}
๐ฏ Position Summary:
โข Status: CLOSED
โข Exit Value: ~${contracts * price:,.2f}
๐ Use /stats to see updated performance metrics.
"""
await query.edit_message_text(success_message, parse_mode='HTML')
logger.info(f"Position closed: {exit_side} {contracts} {token} @ ~${price}")
else:
await query.edit_message_text("โ Failed to close position. Please try again.")
except Exception as e:
error_message = f"โ Error closing position: {str(e)}"
await query.edit_message_text(error_message)
logger.error(f"Error closing position: {e}")
async def _execute_coo(self, query, token: str):
"""Execute cancel open orders for a specific token."""
symbol = f"{token}/USDC:USDC"
try:
await query.edit_message_text("โณ Cancelling all orders...")
# Get current orders for this token
all_orders = self.client.get_open_orders()
if all_orders is None:
await query.edit_message_text(f"โ Could not fetch orders to cancel {token} orders")
return
# Filter orders for the specific token
token_orders = [order for order in all_orders if order.get('symbol') == symbol]
if not token_orders:
await query.edit_message_text(f"๐ญ No open orders found for {token}")
return
# Cancel each order
cancelled_orders = []
failed_orders = []
for order in token_orders:
order_id = order.get('id')
if order_id:
try:
success = self.client.cancel_order(order_id, symbol)
if success:
cancelled_orders.append(order)
else:
failed_orders.append(order)
except Exception as e:
logger.error(f"Failed to cancel order {order_id}: {e}")
failed_orders.append(order)
# Create result message
result_message = f"""
โ
Cancel Orders Results
๐ Summary:
โข Token: {token}
โข Cancelled: {len(cancelled_orders)} orders
โข Failed: {len(failed_orders)} orders
โข Total Attempted: {len(token_orders)} orders
"""
if cancelled_orders:
result_message += f"\n๐๏ธ Successfully Cancelled:\n"
for order in cancelled_orders:
side = order.get('side', 'Unknown')
amount = order.get('amount', 0)
price = order.get('price', 0)
side_emoji = "๐ข" if side.lower() == 'buy' else "๐ด"
result_message += f"{side_emoji} {side.upper()} {amount} @ ${price:,.2f}\n"
if failed_orders:
result_message += f"\nโ Failed to Cancel:\n"
for order in failed_orders:
side = order.get('side', 'Unknown')
amount = order.get('amount', 0)
price = order.get('price', 0)
order_id = order.get('id', 'Unknown')
side_emoji = "๐ข" if side.lower() == 'buy' else "๐ด"
result_message += f"{side_emoji} {side.upper()} {amount} @ ${price:,.2f} (ID: {order_id})\n"
if len(cancelled_orders) == len(token_orders):
result_message += f"\n๐ All {token} orders successfully cancelled!"
elif len(cancelled_orders) > 0:
result_message += f"\nโ ๏ธ Some orders cancelled. Check failed orders above."
else:
result_message += f"\nโ Could not cancel any {token} orders."
await query.edit_message_text(result_message, parse_mode='HTML')
logger.info(f"COO executed for {token}: {len(cancelled_orders)}/{len(token_orders)} orders cancelled")
except Exception as e:
error_message = f"โ Error cancelling {token} orders: {str(e)}"
await query.edit_message_text(error_message)
logger.error(f"Error in COO execution: {e}")
async def _execute_sl_order(self, query, token: str, exit_side: str, contracts: float, price: float):
"""Execute a stop loss order."""
symbol = f"{token}/USDC:USDC"
try:
await query.edit_message_text("โณ Setting stop loss...")
# Place stop loss order
order = self.client.place_stop_loss_order(symbol, exit_side, contracts, price)
if order:
# Record the trade in stats
order_id = order.get('id', 'N/A')
actual_price = order.get('average', price) # Use actual fill price if available
action_type = self.stats.record_trade_with_enhanced_tracking(symbol, exit_side, contracts, actual_price, order_id, "bot")
position_type = "LONG" if exit_side == "sell" else "SHORT"
success_message = f"""
โ
Stop Loss Order Set Successfully!
๐ Stop Loss Details:
โข Token: {token}
โข Position: {position_type}
โข Size: {contracts} contracts
โข Stop Price: ${price:,.2f}
โข Action: {exit_side.upper()} (Close {position_type})
โข Amount: {contracts} {token}
โข Order Type: Limit Order
โข Order ID: {order_id}
๐ฏ Stop Loss Execution:
โข Status: SET
โข Exit Value: ~${contracts * price:,.2f}
๐ Use /stats to see updated performance metrics.
"""
await query.edit_message_text(success_message, parse_mode='HTML')
logger.info(f"Stop loss set: {exit_side} {contracts} {token} @ ${price}")
else:
await query.edit_message_text("โ Failed to set stop loss. Please try again.")
except Exception as e:
error_message = f"โ Error setting stop loss: {str(e)}"
await query.edit_message_text(error_message)
logger.error(f"Error setting stop loss: {e}")
async def _execute_tp_order(self, query, token: str, exit_side: str, contracts: float, price: float):
"""Execute a take profit order."""
symbol = f"{token}/USDC:USDC"
try:
await query.edit_message_text("โณ Setting take profit...")
# Place take profit order
order = self.client.place_take_profit_order(symbol, exit_side, contracts, price)
if order:
# Record the trade in stats
order_id = order.get('id', 'N/A')
actual_price = order.get('average', price) # Use actual fill price if available
action_type = self.stats.record_trade_with_enhanced_tracking(symbol, exit_side, contracts, actual_price, order_id, "bot")
position_type = "LONG" if exit_side == "sell" else "SHORT"
success_message = f"""
โ
Take Profit Order Set Successfully!
๐ Take Profit Details:
โข Token: {token}
โข Position: {position_type}
โข Size: {contracts} contracts
โข Target Price: ${price:,.2f}
โข Action: {exit_side.upper()} (Close {position_type})
โข Amount: {contracts} {token}
โข Order Type: Limit Order
โข Order ID: {order_id}
๐ฏ Take Profit Execution:
โข Status: SET
โข Exit Value: ~${contracts * price:,.2f}
๐ Use /stats to see updated performance metrics.
"""
await query.edit_message_text(success_message, parse_mode='HTML')
logger.info(f"Take profit set: {exit_side} {contracts} {token} @ ${price}")
else:
await query.edit_message_text("โ Failed to set take profit. Please try again.")
except Exception as e:
error_message = f"โ Error setting take profit: {str(e)}"
await query.edit_message_text(error_message)
logger.error(f"Error setting take profit: {e}")
async def unknown_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle unknown commands."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
await update.message.reply_text(
"โ Unknown command. Use /help to see available commands or tap the buttons in /start."
)
def setup_handlers(self):
"""Set up command handlers for the bot."""
if not self.application:
return
# Command handlers
self.application.add_handler(CommandHandler("start", self.start_command))
self.application.add_handler(CommandHandler("help", self.help_command))
self.application.add_handler(CommandHandler("commands", self.commands_command))
self.application.add_handler(CommandHandler("c", self.commands_command))
self.application.add_handler(CommandHandler("balance", self.balance_command))
self.application.add_handler(CommandHandler("positions", self.positions_command))
self.application.add_handler(CommandHandler("orders", self.orders_command))
self.application.add_handler(CommandHandler("market", self.market_command))
self.application.add_handler(CommandHandler("price", self.price_command))
self.application.add_handler(CommandHandler("stats", self.stats_command))
self.application.add_handler(CommandHandler("trades", self.trades_command))
self.application.add_handler(CommandHandler("long", self.long_command))
self.application.add_handler(CommandHandler("short", self.short_command))
self.application.add_handler(CommandHandler("exit", self.exit_command))
self.application.add_handler(CommandHandler("coo", self.coo_command))
self.application.add_handler(CommandHandler("sl", self.sl_command))
self.application.add_handler(CommandHandler("tp", self.tp_command))
self.application.add_handler(CommandHandler("monitoring", self.monitoring_command))
self.application.add_handler(CommandHandler("alarm", self.alarm_command))
self.application.add_handler(CommandHandler("logs", self.logs_command))
self.application.add_handler(CommandHandler("performance", self.performance_command))
self.application.add_handler(CommandHandler("daily", self.daily_command))
self.application.add_handler(CommandHandler("weekly", self.weekly_command))
self.application.add_handler(CommandHandler("monthly", self.monthly_command))
self.application.add_handler(CommandHandler("risk", self.risk_command))
self.application.add_handler(CommandHandler("version", self.version_command))
# Callback query handler for inline keyboards
self.application.add_handler(CallbackQueryHandler(self.button_callback))
# Handle unknown commands
self.application.add_handler(MessageHandler(filters.COMMAND, self.unknown_command))
async def run(self):
"""Run the Telegram bot."""
if not Config.TELEGRAM_BOT_TOKEN:
logger.error("โ TELEGRAM_BOT_TOKEN not configured")
return
if not Config.TELEGRAM_CHAT_ID:
logger.error("โ TELEGRAM_CHAT_ID not configured")
return
try:
# Create application
self.application = Application.builder().token(Config.TELEGRAM_BOT_TOKEN).build()
# Set up handlers
self.setup_handlers()
logger.info("๐ Starting Telegram trading bot...")
# Initialize the application
await self.application.initialize()
# Send startup notification
await self.send_message(
f"๐ค Manual Trading Bot v{self.version} Started\n\n"
f"โ
Connected to Hyperliquid {'Testnet' if Config.HYPERLIQUID_TESTNET else 'Mainnet'}\n"
f"๐ Default Symbol: {Config.DEFAULT_TRADING_TOKEN}\n"
f"๐ฑ Manual trading ready!\n"
f"๐ Order monitoring: Active ({Config.BOT_HEARTBEAT_SECONDS}s interval)\n"
f"๐ External trade monitoring: Active\n"
f"๐ Price alarms: Active\n"
f"๐ Auto stats sync: Enabled\n"
f"๐ Logging: {'File + Console' if Config.LOG_TO_FILE else 'Console only'}\n"
f"โฐ Started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
"Use /start for quick actions or /help for all commands."
)
# Perform initial log cleanup
try:
cleanup_logs(days_to_keep=30)
logger.info("๐งน Initial log cleanup completed")
except Exception as e:
logger.warning(f"โ ๏ธ Initial log cleanup failed: {e}")
# Start the application
await self.application.start()
# Start order monitoring
await self.start_order_monitoring()
# Start polling for updates manually
logger.info("๐ Starting update polling...")
# Get updates in a loop
last_update_id = 0
while True:
try:
# Get updates from Telegram
updates = await self.application.bot.get_updates(
offset=last_update_id + 1,
timeout=30,
allowed_updates=None
)
# Process each update
for update in updates:
last_update_id = update.update_id
# Process the update through the application
await self.application.process_update(update)
except Exception as e:
logger.error(f"Error processing updates: {e}")
await asyncio.sleep(5) # Wait before retrying
except asyncio.CancelledError:
logger.info("๐ Bot polling cancelled")
raise
except Exception as e:
logger.error(f"โ Error in telegram bot: {e}")
raise
finally:
# Clean shutdown
try:
await self.stop_order_monitoring()
if self.application:
await self.application.stop()
await self.application.shutdown()
except Exception as e:
logger.error(f"Error during shutdown: {e}")
async def long_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /long command for opening long positions."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
try:
if not context.args or len(context.args) < 2:
await update.message.reply_text(
"โ Usage: /long [token] [USDC amount] [price (optional)]\n"
"Examples:\n"
"โข /long BTC 100 - Market order\n"
"โข /long BTC 100 45000 - Limit order at $45,000"
)
return
token = context.args[0].upper()
usdc_amount = float(context.args[1])
# Check if price is provided for limit order
limit_price = None
if len(context.args) >= 3:
limit_price = float(context.args[2])
order_type = "Limit"
order_description = f"at ${limit_price:,.2f}"
else:
order_type = "Market"
order_description = "at current market price"
# Convert token to full symbol format for Hyperliquid
symbol = f"{token}/USDC:USDC"
# Get current market price to calculate amount and for display
market_data = self.client.get_market_data(symbol)
if not market_data:
await update.message.reply_text(f"โ Could not fetch price for {token}")
return
current_price = float(market_data['ticker'].get('last', 0))
if current_price <= 0:
await update.message.reply_text(f"โ Invalid price for {token}")
return
# Calculate token amount based on price (market or limit)
calculation_price = limit_price if limit_price else current_price
token_amount = usdc_amount / calculation_price
# Create confirmation message
confirmation_text = f"""
๐ข Long Position Confirmation
๐ Order Details:
โข Token: {token}
โข Direction: LONG (Buy)
โข USDC Value: ${usdc_amount:,.2f}
โข Current Price: ${current_price:,.2f}
โข Order Type: {order_type} Order
โข Token Amount: {token_amount:.6f} {token}
๐ฏ Execution:
โข Will buy {token_amount:.6f} {token} {order_description}
โข Est. Value: ${token_amount * calculation_price:,.2f}
โ ๏ธ Are you sure you want to open this long position?
"""
# Use limit_price for callback if provided, otherwise current_price
callback_price = limit_price if limit_price else current_price
callback_data = f"confirm_long_{token}_{usdc_amount}_{callback_price}"
if limit_price:
callback_data += "_limit"
keyboard = [
[
InlineKeyboardButton("โ
Confirm Long", callback_data=callback_data),
InlineKeyboardButton("โ Cancel", callback_data="cancel_order")
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(confirmation_text, parse_mode='HTML', reply_markup=reply_markup)
except ValueError:
await update.message.reply_text("โ Invalid USDC amount or price. Please use numbers only.")
except Exception as e:
await update.message.reply_text(f"โ Error processing long command: {e}")
async def short_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /short command for opening short positions."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
try:
if not context.args or len(context.args) < 2:
await update.message.reply_text(
"โ Usage: /short [token] [USDC amount] [price (optional)]\n"
"Examples:\n"
"โข /short BTC 100 - Market order\n"
"โข /short BTC 100 46000 - Limit order at $46,000"
)
return
token = context.args[0].upper()
usdc_amount = float(context.args[1])
# Check if price is provided for limit order
limit_price = None
if len(context.args) >= 3:
limit_price = float(context.args[2])
order_type = "Limit"
order_description = f"at ${limit_price:,.2f}"
else:
order_type = "Market"
order_description = "at current market price"
# Convert token to full symbol format for Hyperliquid
symbol = f"{token}/USDC:USDC"
# Get current market price to calculate amount and for display
market_data = self.client.get_market_data(symbol)
if not market_data:
await update.message.reply_text(f"โ Could not fetch price for {token}")
return
current_price = float(market_data['ticker'].get('last', 0))
if current_price <= 0:
await update.message.reply_text(f"โ Invalid price for {token}")
return
# Calculate token amount based on price (market or limit)
calculation_price = limit_price if limit_price else current_price
token_amount = usdc_amount / calculation_price
# Create confirmation message
confirmation_text = f"""
๐ด Short Position Confirmation
๐ Order Details:
โข Token: {token}
โข Direction: SHORT (Sell)
โข USDC Value: ${usdc_amount:,.2f}
โข Current Price: ${current_price:,.2f}
โข Order Type: {order_type} Order
โข Token Amount: {token_amount:.6f} {token}
๐ฏ Execution:
โข Will sell {token_amount:.6f} {token} {order_description}
โข Est. Value: ${token_amount * calculation_price:,.2f}
โ ๏ธ Are you sure you want to open this short position?
"""
# Use limit_price for callback if provided, otherwise current_price
callback_price = limit_price if limit_price else current_price
callback_data = f"confirm_short_{token}_{usdc_amount}_{callback_price}"
if limit_price:
callback_data += "_limit"
keyboard = [
[
InlineKeyboardButton("โ
Confirm Short", callback_data=callback_data),
InlineKeyboardButton("โ Cancel", callback_data="cancel_order")
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(confirmation_text, parse_mode='HTML', reply_markup=reply_markup)
except ValueError:
await update.message.reply_text("โ Invalid USDC amount or price. Please use numbers only.")
except Exception as e:
await update.message.reply_text(f"โ Error processing short command: {e}")
async def exit_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /exit command for closing positions."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
try:
if not context.args or len(context.args) < 1:
await update.message.reply_text(
"โ Usage: /exit [token]\n"
"Example: /exit BTC"
)
return
token = context.args[0].upper()
symbol = f"{token}/USDC:USDC"
# Get current positions to find the position for this token
positions = self.client.get_positions()
if positions is None:
await update.message.reply_text(f"โ Could not fetch positions to check {token} position")
return
# Find the position for this token
current_position = None
for position in positions:
if position.get('symbol') == symbol and float(position.get('contracts', 0)) != 0:
current_position = position
break
if not current_position:
await update.message.reply_text(f"๐ญ No open position found for {token}")
return
# Extract position details
contracts = float(current_position.get('contracts', 0))
entry_price = float(current_position.get('entryPx', 0))
unrealized_pnl = float(current_position.get('unrealizedPnl', 0))
# Determine position direction and exit details
if contracts > 0:
position_type = "LONG"
exit_side = "sell"
exit_emoji = "๐ด"
else:
position_type = "SHORT"
exit_side = "buy"
exit_emoji = "๐ข"
contracts = abs(contracts) # Make positive for display
# Get current market price
market_data = self.client.get_market_data(symbol)
if not market_data:
await update.message.reply_text(f"โ Could not fetch current price for {token}")
return
current_price = float(market_data['ticker'].get('last', 0))
if current_price <= 0:
await update.message.reply_text(f"โ Invalid current price for {token}")
return
# Calculate estimated exit value
exit_value = contracts * current_price
# Create confirmation message
pnl_emoji = "๐ข" if unrealized_pnl >= 0 else "๐ด"
confirmation_text = f"""
{exit_emoji} Exit Position Confirmation
๐ Position Details:
โข Token: {token}
โข Position: {position_type}
โข Size: {contracts} contracts
โข Entry Price: ${entry_price:,.2f}
โข Current Price: ${current_price:,.2f}
โข {pnl_emoji} Unrealized P&L: ${unrealized_pnl:,.2f}
๐ฏ Exit Order:
โข Action: {exit_side.upper()} (Close {position_type})
โข Amount: {contracts} {token}
โข Est. Value: ~${exit_value:,.2f}
โข Order Type: Market Order
โ ๏ธ Are you sure you want to close this {position_type} position?
This will place a market {exit_side} order to close your entire {token} position.
"""
keyboard = [
[
InlineKeyboardButton(f"โ
Close {position_type}", callback_data=f"confirm_exit_{token}_{exit_side}_{contracts}_{current_price}"),
InlineKeyboardButton("โ Cancel", callback_data="cancel_order")
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(confirmation_text, parse_mode='HTML', reply_markup=reply_markup)
except ValueError:
await update.message.reply_text("โ Invalid token format. Please use token symbols like BTC, ETH, etc.")
except Exception as e:
await update.message.reply_text(f"โ Error processing exit command: {e}")
async def coo_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /coo (cancel open orders) command for a specific token."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
try:
if not context.args or len(context.args) < 1:
await update.message.reply_text(
"โ Usage: /coo [token]\n"
"Example: /coo BTC\n\n"
"This command cancels ALL open orders for the specified token."
)
return
token = context.args[0].upper()
symbol = f"{token}/USDC:USDC"
# Get current orders for this token
all_orders = self.client.get_open_orders()
if all_orders is None:
await update.message.reply_text(f"โ Could not fetch orders to cancel {token} orders")
return
# Filter orders for the specific token
token_orders = [order for order in all_orders if order.get('symbol') == symbol]
if not token_orders:
await update.message.reply_text(f"๐ญ No open orders found for {token}")
return
# Create confirmation message with order details
confirmation_text = f"""
โ ๏ธ Cancel All {token} Orders
๐ Orders to Cancel:
"""
total_value = 0
for order in token_orders:
side = order.get('side', 'Unknown')
amount = order.get('amount', 0)
price = order.get('price', 0)
order_id = order.get('id', 'Unknown')
side_emoji = "๐ข" if side.lower() == 'buy' else "๐ด"
order_value = float(amount) * float(price)
total_value += order_value
confirmation_text += f"{side_emoji} {side.upper()} {amount} @ ${price:,.2f} (${order_value:,.2f})\n"
confirmation_text += f"""
๐ฐ Total Value: ${total_value:,.2f}
๐ข Orders Count: {len(token_orders)}
โ ๏ธ Are you sure you want to cancel ALL {token} orders?
This action cannot be undone.
"""
keyboard = [
[
InlineKeyboardButton(f"โ
Cancel All {token}", callback_data=f"confirm_coo_{token}"),
InlineKeyboardButton("โ Keep Orders", callback_data="cancel_order")
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(confirmation_text, parse_mode='HTML', reply_markup=reply_markup)
except ValueError:
await update.message.reply_text("โ Invalid token format. Please use token symbols like BTC, ETH, etc.")
except Exception as e:
await update.message.reply_text(f"โ Error processing cancel orders command: {e}")
async def sl_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /sl (stop loss) command for setting stop loss orders."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
try:
if not context.args or len(context.args) < 2:
await update.message.reply_text(
"โ Usage: /sl [token] [price]\n"
"Example: /sl BTC 44000\n\n"
"This creates a stop loss order at the specified price."
)
return
token = context.args[0].upper()
stop_price = float(context.args[1])
symbol = f"{token}/USDC:USDC"
# Get current positions to find the position for this token
positions = self.client.get_positions()
if positions is None:
await update.message.reply_text(f"โ Could not fetch positions to check {token} position")
return
# Find the position for this token
current_position = None
for position in positions:
if position.get('symbol') == symbol and float(position.get('contracts', 0)) != 0:
current_position = position
break
if not current_position:
await update.message.reply_text(f"๐ญ No open position found for {token}\n\nYou need an open position to set a stop loss.")
return
# Extract position details
contracts = float(current_position.get('contracts', 0))
entry_price = float(current_position.get('entryPx', 0))
unrealized_pnl = float(current_position.get('unrealizedPnl', 0))
# Determine position direction and validate stop loss price
if contracts > 0:
# Long position - stop loss should be below entry price
position_type = "LONG"
exit_side = "sell"
exit_emoji = "๐ด"
contracts_abs = contracts
if stop_price >= entry_price:
await update.message.reply_text(
f"โ ๏ธ Stop loss price should be BELOW entry price for long positions\n\n"
f"๐ Your {token} LONG position:\n"
f"โข Entry Price: ${entry_price:,.2f}\n"
f"โข Stop Price: ${stop_price:,.2f} โ\n\n"
f"๐ก Try a lower price like: /sl {token} {entry_price * 0.95:.0f}"
)
return
else:
# Short position - stop loss should be above entry price
position_type = "SHORT"
exit_side = "buy"
exit_emoji = "๐ข"
contracts_abs = abs(contracts)
if stop_price <= entry_price:
await update.message.reply_text(
f"โ ๏ธ Stop loss price should be ABOVE entry price for short positions\n\n"
f"๐ Your {token} SHORT position:\n"
f"โข Entry Price: ${entry_price:,.2f}\n"
f"โข Stop Price: ${stop_price:,.2f} โ\n\n"
f"๐ก Try a higher price like: /sl {token} {entry_price * 1.05:.0f}"
)
return
# Get current market price for reference
market_data = self.client.get_market_data(symbol)
current_price = 0
if market_data:
current_price = float(market_data['ticker'].get('last', 0))
# Calculate estimated P&L at stop loss
if contracts > 0: # Long position
pnl_at_stop = (stop_price - entry_price) * contracts_abs
else: # Short position
pnl_at_stop = (entry_price - stop_price) * contracts_abs
# Create confirmation message
pnl_emoji = "๐ข" if pnl_at_stop >= 0 else "๐ด"
confirmation_text = f"""
๐ Stop Loss Order Confirmation
๐ Position Details:
โข Token: {token}
โข Position: {position_type}
โข Size: {contracts_abs} contracts
โข Entry Price: ${entry_price:,.2f}
โข Current Price: ${current_price:,.2f}
๐ฏ Stop Loss Order:
โข Stop Price: ${stop_price:,.2f}
โข Action: {exit_side.upper()} (Close {position_type})
โข Amount: {contracts_abs} {token}
โข Order Type: Limit Order
โข {pnl_emoji} Est. P&L: ${pnl_at_stop:,.2f}
โ ๏ธ Are you sure you want to set this stop loss?
This will place a limit {exit_side} order at ${stop_price:,.2f} to protect your {position_type} position.
"""
keyboard = [
[
InlineKeyboardButton(f"โ
Set Stop Loss", callback_data=f"confirm_sl_{token}_{exit_side}_{contracts_abs}_{stop_price}"),
InlineKeyboardButton("โ Cancel", callback_data="cancel_order")
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(confirmation_text, parse_mode='HTML', reply_markup=reply_markup)
except ValueError:
await update.message.reply_text("โ Invalid price format. Please use numbers only.")
except Exception as e:
await update.message.reply_text(f"โ Error processing stop loss command: {e}")
async def tp_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /tp (take profit) command for setting take profit orders."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
try:
if not context.args or len(context.args) < 2:
await update.message.reply_text(
"โ Usage: /tp [token] [price]\n"
"Example: /tp BTC 50000\n\n"
"This creates a take profit order at the specified price."
)
return
token = context.args[0].upper()
profit_price = float(context.args[1])
symbol = f"{token}/USDC:USDC"
# Get current positions to find the position for this token
positions = self.client.get_positions()
if positions is None:
await update.message.reply_text(f"โ Could not fetch positions to check {token} position")
return
# Find the position for this token
current_position = None
for position in positions:
if position.get('symbol') == symbol and float(position.get('contracts', 0)) != 0:
current_position = position
break
if not current_position:
await update.message.reply_text(f"๐ญ No open position found for {token}\n\nYou need an open position to set a take profit.")
return
# Extract position details
contracts = float(current_position.get('contracts', 0))
entry_price = float(current_position.get('entryPx', 0))
unrealized_pnl = float(current_position.get('unrealizedPnl', 0))
# Determine position direction and validate take profit price
if contracts > 0:
# Long position - take profit should be above entry price
position_type = "LONG"
exit_side = "sell"
exit_emoji = "๐ด"
contracts_abs = contracts
if profit_price <= entry_price:
await update.message.reply_text(
f"โ ๏ธ Take profit price should be ABOVE entry price for long positions\n\n"
f"๐ Your {token} LONG position:\n"
f"โข Entry Price: ${entry_price:,.2f}\n"
f"โข Take Profit: ${profit_price:,.2f} โ\n\n"
f"๐ก Try a higher price like: /tp {token} {entry_price * 1.05:.0f}"
)
return
else:
# Short position - take profit should be below entry price
position_type = "SHORT"
exit_side = "buy"
exit_emoji = "๐ข"
contracts_abs = abs(contracts)
if profit_price >= entry_price:
await update.message.reply_text(
f"โ ๏ธ Take profit price should be BELOW entry price for short positions\n\n"
f"๐ Your {token} SHORT position:\n"
f"โข Entry Price: ${entry_price:,.2f}\n"
f"โข Take Profit: ${profit_price:,.2f} โ\n\n"
f"๐ก Try a lower price like: /tp {token} {entry_price * 0.95:.0f}"
)
return
# Get current market price for reference
market_data = self.client.get_market_data(symbol)
current_price = 0
if market_data:
current_price = float(market_data['ticker'].get('last', 0))
# Calculate estimated P&L at take profit
if contracts > 0: # Long position
pnl_at_tp = (profit_price - entry_price) * contracts_abs
else: # Short position
pnl_at_tp = (entry_price - profit_price) * contracts_abs
# Create confirmation message
pnl_emoji = "๐ข" if pnl_at_tp >= 0 else "๐ด"
confirmation_text = f"""
๐ฏ Take Profit Order Confirmation
๐ Position Details:
โข Token: {token}
โข Position: {position_type}
โข Size: {contracts_abs} contracts
โข Entry Price: ${entry_price:,.2f}
โข Current Price: ${current_price:,.2f}
๐ฐ Take Profit Order:
โข Target Price: ${profit_price:,.2f}
โข Action: {exit_side.upper()} (Close {position_type})
โข Amount: {contracts_abs} {token}
โข Order Type: Limit Order
โข {pnl_emoji} Est. P&L: ${pnl_at_tp:,.2f}
โ ๏ธ Are you sure you want to set this take profit?
This will place a limit {exit_side} order at ${profit_price:,.2f} to capture profits from your {position_type} position.
"""
keyboard = [
[
InlineKeyboardButton(f"โ
Set Take Profit", callback_data=f"confirm_tp_{token}_{exit_side}_{contracts_abs}_{profit_price}"),
InlineKeyboardButton("โ Cancel", callback_data="cancel_order")
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(confirmation_text, parse_mode='HTML', reply_markup=reply_markup)
except ValueError:
await update.message.reply_text("โ Invalid price format. Please use numbers only.")
except Exception as e:
await update.message.reply_text(f"โ Error processing take profit command: {e}")
async def start_order_monitoring(self):
"""Start the order monitoring background task."""
if self.monitoring_active:
return
self.monitoring_active = True
logger.info("๐ Starting order monitoring...")
# Initialize tracking data
await self._initialize_order_tracking()
# Start monitoring loop
asyncio.create_task(self._order_monitoring_loop())
async def stop_order_monitoring(self):
"""Stop the order monitoring background task."""
self.monitoring_active = False
logger.info("โน๏ธ Stopping order monitoring...")
async def _initialize_order_tracking(self):
"""Initialize order and position tracking."""
try:
# Get current open orders to initialize tracking
orders = self.client.get_open_orders()
if orders:
self.last_known_orders = {order.get('id') for order in orders if order.get('id')}
logger.info(f"๐ Initialized tracking with {len(self.last_known_orders)} open orders")
# Get current positions for P&L tracking
positions = self.client.get_positions()
if positions:
for position in positions:
symbol = position.get('symbol')
contracts = float(position.get('contracts', 0))
entry_price = float(position.get('entryPx', 0))
if symbol and contracts != 0:
self.last_known_positions[symbol] = {
'contracts': contracts,
'entry_price': entry_price
}
logger.info(f"๐ Initialized tracking with {len(self.last_known_positions)} positions")
except Exception as e:
logger.error(f"โ Error initializing order tracking: {e}")
async def _order_monitoring_loop(self):
"""Main monitoring loop that runs every Config.BOT_HEARTBEAT_SECONDS seconds."""
while self.monitoring_active:
try:
await self._check_order_fills()
await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS) # Use configurable interval
except asyncio.CancelledError:
logger.info("๐ Order monitoring cancelled")
break
except Exception as e:
logger.error(f"โ Error in order monitoring loop: {e}")
await asyncio.sleep(Config.BOT_HEARTBEAT_SECONDS) # Continue monitoring even if there's an error
async def _check_order_fills(self):
"""Check for filled orders and send notifications."""
try:
# Get current orders and positions
current_orders = self.client.get_open_orders() or []
current_positions = self.client.get_positions() or []
# Get current order IDs
current_order_ids = {order.get('id') for order in current_orders if order.get('id')}
# Find filled orders (orders that were in last_known_orders but not in current_orders)
filled_order_ids = self.last_known_orders - current_order_ids
if filled_order_ids:
logger.info(f"๐ฏ Detected {len(filled_order_ids)} filled orders")
await self._process_filled_orders(filled_order_ids, current_positions)
# Update tracking data
self.last_known_orders = current_order_ids
await self._update_position_tracking(current_positions)
# Check price alarms
await self._check_price_alarms()
# Check external trades (trades made outside the bot)
await self._check_external_trades()
# Check stop losses (if risk management is enabled)
if Config.RISK_MANAGEMENT_ENABLED:
await self._check_stop_losses(current_positions)
except Exception as e:
logger.error(f"โ Error checking order fills: {e}")
async def _check_price_alarms(self):
"""Check all active price alarms."""
try:
# Get all active alarms
active_alarms = self.alarm_manager.get_all_active_alarms()
if not active_alarms:
return
# Get unique tokens from alarms
tokens_to_check = list(set(alarm['token'] for alarm in active_alarms))
# Fetch current prices for all tokens
price_data = {}
for token in tokens_to_check:
symbol = f"{token}/USDC:USDC"
market_data = self.client.get_market_data(symbol)
if market_data and market_data.get('ticker'):
current_price = market_data['ticker'].get('last')
if current_price is not None:
price_data[token] = float(current_price)
# Check alarms against current prices
triggered_alarms = self.alarm_manager.check_alarms(price_data)
# Send notifications for triggered alarms
for alarm in triggered_alarms:
await self._send_alarm_notification(alarm)
except Exception as e:
logger.error(f"โ Error checking price alarms: {e}")
async def _send_alarm_notification(self, alarm: Dict[str, Any]):
"""Send notification for triggered alarm."""
try:
message = self.alarm_manager.format_triggered_alarm(alarm)
await self.send_message(message)
logger.info(f"๐ข Sent alarm notification: {alarm['token']} ID {alarm['id']} @ ${alarm['triggered_price']}")
except Exception as e:
logger.error(f"โ Error sending alarm notification: {e}")
async def _check_external_trades(self):
"""Check for trades made outside the Telegram bot and update stats."""
try:
# Get recent fills from Hyperliquid
recent_fills = self.client.get_recent_fills()
if not recent_fills:
return
# Initialize last processed time if first run
if self.last_processed_trade_time is None:
# Set to current time minus 1 hour to catch recent activity
self.last_processed_trade_time = (datetime.now() - timedelta(hours=1)).isoformat()
# Filter for new trades since last check
new_trades = []
latest_trade_time = self.last_processed_trade_time
for fill in recent_fills:
fill_time = fill.get('timestamp')
if fill_time:
# Convert timestamps to comparable format
try:
# Convert fill_time to string if it's not already
if isinstance(fill_time, (int, float)):
# Assume it's a unix timestamp
fill_time_str = datetime.fromtimestamp(fill_time / 1000 if fill_time > 1e10 else fill_time).isoformat()
else:
fill_time_str = str(fill_time)
# Compare as strings
if fill_time_str > self.last_processed_trade_time:
new_trades.append(fill)
if fill_time_str > latest_trade_time:
latest_trade_time = fill_time_str
except Exception as timestamp_error:
logger.warning(f"โ ๏ธ Error processing timestamp {fill_time}: {timestamp_error}")
continue
if not new_trades:
return
# Process new trades
for trade in new_trades:
await self._process_external_trade(trade)
# Update last processed time
self.last_processed_trade_time = latest_trade_time
if new_trades:
logger.info(f"๐ Processed {len(new_trades)} external trades")
except Exception as e:
logger.error(f"โ Error checking external trades: {e}")
async def _process_external_trade(self, trade: Dict[str, Any]):
"""Process an individual external trade and determine if it's opening or closing a position."""
try:
# Extract trade information
symbol = trade.get('symbol', '')
side = trade.get('side', '')
amount = float(trade.get('amount', 0))
price = float(trade.get('price', 0))
trade_id = trade.get('id', 'external')
timestamp = trade.get('timestamp', '')
if not all([symbol, side, amount, price]):
return
# Record trade in stats and get action type using enhanced tracking
action_type = self.stats.record_trade_with_enhanced_tracking(symbol, side, amount, price, trade_id, "external")
# Send enhanced notification based on action type
await self._send_enhanced_trade_notification(symbol, side, amount, price, action_type, timestamp)
logger.info(f"๐ Processed external trade: {side} {amount} {symbol} @ ${price} ({action_type})")
except Exception as e:
logger.error(f"โ Error processing external trade: {e}")
async def _send_enhanced_trade_notification(self, symbol: str, side: str, amount: float, price: float, action_type: str, timestamp: str = None):
"""Send enhanced trade notification based on position action type."""
try:
token = symbol.split('/')[0] if '/' in symbol else symbol
position = self.stats.get_enhanced_position_state(symbol)
if timestamp is None:
time_str = datetime.now().strftime('%H:%M:%S')
else:
try:
time_obj = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
time_str = time_obj.strftime('%H:%M:%S')
except:
time_str = "Unknown"
# Handle different action types
if action_type in ['long_opened', 'short_opened']:
await self._send_position_opened_notification(token, side, amount, price, action_type, time_str)
elif action_type in ['long_increased', 'short_increased']:
await self._send_position_increased_notification(token, side, amount, price, position, action_type, time_str)
elif action_type in ['long_reduced', 'short_reduced']:
pnl_data = self.stats.calculate_enhanced_position_pnl(symbol, amount, price)
await self._send_position_reduced_notification(token, side, amount, price, position, pnl_data, action_type, time_str)
elif action_type in ['long_closed', 'short_closed']:
pnl_data = self.stats.calculate_enhanced_position_pnl(symbol, amount, price)
await self._send_position_closed_notification(token, side, amount, price, position, pnl_data, action_type, time_str)
elif action_type in ['long_closed_and_short_opened', 'short_closed_and_long_opened']:
await self._send_position_flipped_notification(token, side, amount, price, action_type, time_str)
else:
# Fallback to generic notification
await self._send_external_trade_notification({
'symbol': symbol,
'side': side,
'amount': amount,
'price': price,
'timestamp': timestamp or datetime.now().isoformat()
})
except Exception as e:
logger.error(f"โ Error sending enhanced trade notification: {e}")
async def _send_position_opened_notification(self, token: str, side: str, amount: float, price: float, action_type: str, time_str: str):
"""Send notification for newly opened position."""
position_type = "LONG" if action_type == 'long_opened' else "SHORT"
side_emoji = "๐ข" if side.lower() == 'buy' else "๐ด"
trade_value = amount * price
message = f"""
๐ Position Opened
๐ New {position_type} Position:
โข Token: {token}
โข Direction: {position_type}
โข Entry Size: {amount} {token}
โข Entry Price: ${price:,.2f}
โข Position Value: ${trade_value:,.2f}
{side_emoji} Trade Details:
โข Side: {side.upper()}
โข Order Type: Market/Limit
โข Status: OPENED โ
โฐ Time: {time_str}
๐ Note: New {position_type} position established
๐ Use /positions to view current holdings
"""
await self.send_message(message.strip())
logger.info(f"๐ข Position opened: {token} {position_type} {amount} @ ${price}")
async def _send_position_increased_notification(self, token: str, side: str, amount: float, price: float, position: Dict, action_type: str, time_str: str):
"""Send notification for position increase (additional entry)."""
position_type = "LONG" if action_type == 'long_increased' else "SHORT"
side_emoji = "๐ข" if side.lower() == 'buy' else "๐ด"
total_size = abs(position['contracts'])
avg_entry = position['avg_entry_price']
entry_count = position['entry_count']
total_value = total_size * avg_entry
message = f"""
๐ Position Increased
๐ {position_type} Position Updated:
โข Token: {token}
โข Direction: {position_type}
โข Added Size: {amount} {token} @ ${price:,.2f}
โข New Total Size: {total_size} {token}
โข Average Entry: ${avg_entry:,.2f}
{side_emoji} Position Summary:
โข Total Value: ${total_value:,.2f}
โข Entry Points: {entry_count}
โข Last Entry: ${price:,.2f}
โข Status: INCREASED โฌ๏ธ
โฐ Time: {time_str}
๐ก Strategy: Multiple entry averaging
๐ Use /positions for complete position details
"""
await self.send_message(message.strip())
logger.info(f"๐ข Position increased: {token} {position_type} +{amount} @ ${price} (total: {total_size})")
async def _send_position_reduced_notification(self, token: str, side: str, amount: float, price: float, position: Dict, pnl_data: Dict, action_type: str, time_str: str):
"""Send notification for partial position close."""
position_type = "LONG" if action_type == 'long_reduced' else "SHORT"
remaining_size = abs(position['contracts'])
avg_entry = pnl_data.get('avg_entry_price', position['avg_entry_price'])
pnl = pnl_data['pnl']
pnl_percent = pnl_data['pnl_percent']
pnl_emoji = "๐ข" if pnl >= 0 else "๐ด"
partial_value = amount * price
message = f"""
๐ Position Partially Closed
๐ {position_type} Partial Exit:
โข Token: {token}
โข Direction: {position_type}
โข Closed Size: {amount} {token}
โข Exit Price: ${price:,.2f}
โข Remaining Size: {remaining_size} {token}
{pnl_emoji} Partial P&L:
โข Entry Price: ${avg_entry:,.2f}
โข Exit Value: ${partial_value:,.2f}
โข P&L: ${pnl:,.2f} ({pnl_percent:+.2f}%)
โข Result: {"PROFIT" if pnl >= 0 else "LOSS"}
๐ฐ Position Status:
โข Status: PARTIALLY CLOSED ๐
โข Take Profit Strategy: Active
โฐ Time: {time_str}
๐ Use /positions to view remaining position
"""
await self.send_message(message.strip())
logger.info(f"๐ข Position reduced: {token} {position_type} -{amount} @ ${price} P&L: ${pnl:.2f}")
async def _send_position_closed_notification(self, token: str, side: str, amount: float, price: float, position: Dict, pnl_data: Dict, action_type: str, time_str: str):
"""Send notification for fully closed position."""
position_type = "LONG" if action_type == 'long_closed' else "SHORT"
avg_entry = pnl_data.get('avg_entry_price', position['avg_entry_price'])
pnl = pnl_data['pnl']
pnl_percent = pnl_data['pnl_percent']
pnl_emoji = "๐ข" if pnl >= 0 else "๐ด"
entry_count = position.get('entry_count', 1)
exit_value = amount * price
message = f"""
๐ฏ Position Fully Closed
๐ {position_type} Position Summary:
โข Token: {token}
โข Direction: {position_type}
โข Total Size: {amount} {token}
โข Average Entry: ${avg_entry:,.2f}
โข Exit Price: ${price:,.2f}
โข Exit Value: ${exit_value:,.2f}
{pnl_emoji} Total P&L:
โข P&L: ${pnl:,.2f} ({pnl_percent:+.2f}%)
โข Result: {"PROFIT" if pnl >= 0 else "LOSS"}
โข Entry Points Used: {entry_count}
โ
Trade Complete:
โข Status: FULLY CLOSED ๐ฏ
โข Position: FLAT
โฐ Time: {time_str}
๐ Use /stats to view updated performance
"""
await self.send_message(message.strip())
logger.info(f"๐ข Position closed: {token} {position_type} {amount} @ ${price} Total P&L: ${pnl:.2f}")
async def _send_position_flipped_notification(self, token: str, side: str, amount: float, price: float, action_type: str, time_str: str):
"""Send notification for position flip (close and reverse)."""
if action_type == 'long_closed_and_short_opened':
old_type = "LONG"
new_type = "SHORT"
else:
old_type = "SHORT"
new_type = "LONG"
message = f"""
๐ Position Flipped
๐ Direction Change:
โข Token: {token}
โข Previous: {old_type} position
โข New: {new_type} position
โข Size: {amount} {token}
โข Price: ${price:,.2f}
๐ฏ Trade Summary:
โข {old_type} position: CLOSED โ
โข {new_type} position: OPENED ๐
โข Flip Price: ${price:,.2f}
โข Status: POSITION REVERSED
โฐ Time: {time_str}
๐ก Strategy: Directional change
๐ Use /positions to view new position
"""
await self.send_message(message.strip())
logger.info(f"๐ข Position flipped: {token} {old_type} -> {new_type} @ ${price}")
async def monitoring_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /monitoring command to show monitoring status."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
# Get alarm statistics
alarm_stats = self.alarm_manager.get_statistics()
status_text = f"""
๐ System Monitoring Status
๐ Order Monitoring:
โข Active: {'โ
Yes' if self.monitoring_active else 'โ No'}
โข Check Interval: {Config.BOT_HEARTBEAT_SECONDS} seconds
โข Orders Tracked: {len(self.last_known_orders)}
โข Positions Tracked: {len(self.last_known_positions)}
๐ Price Alarms:
โข Active Alarms: {alarm_stats['total_active']}
โข Triggered Today: {alarm_stats['total_triggered']}
โข Tokens Monitored: {alarm_stats['tokens_tracked']}
โข Next Alarm ID: {alarm_stats['next_id']}
๐ External Trade Monitoring:
โข Last Check: {self.last_processed_trade_time or 'Not started'}
โข Auto Stats Update: โ
Enabled
โข External Notifications: โ
Enabled
๐ก๏ธ Risk Management:
โข Automatic Stop Loss: {'โ
Enabled' if Config.RISK_MANAGEMENT_ENABLED else 'โ Disabled'}
โข Stop Loss Threshold: {Config.STOP_LOSS_PERCENTAGE}%
โข Position Monitoring: {'โ
Active' if Config.RISK_MANAGEMENT_ENABLED else 'โ Inactive'}
๐ Notifications:
โข ๐ Position Opened/Increased
โข ๐ Position Partially/Fully Closed
โข ๐ฏ P&L Calculations
โข ๐ Price Alarm Triggers
โข ๐ External Trade Detection
โข ๐ Automatic Stop Loss Triggers
โฐ Last Check: {datetime.now().strftime('%H:%M:%S')}
๐ก Monitoring Features:
โข Real-time order fill detection
โข Automatic P&L calculation
โข Position change tracking
โข Price alarm monitoring
โข External trade monitoring
โข Auto stats synchronization
โข Instant Telegram notifications
"""
if alarm_stats['token_breakdown']:
status_text += f"\n\n๐ Active Alarms by Token:\n"
for token, count in alarm_stats['token_breakdown'].items():
status_text += f"โข {token}: {count} alarm{'s' if count != 1 else ''}\n"
await update.message.reply_text(status_text.strip(), parse_mode='HTML')
async def alarm_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /alarm command for price alerts."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
try:
if not context.args or len(context.args) == 0:
# No arguments - list all alarms
alarms = self.alarm_manager.get_all_active_alarms()
message = self.alarm_manager.format_alarm_list(alarms)
await update.message.reply_text(message, parse_mode='HTML')
return
elif len(context.args) == 1:
arg = context.args[0]
# Check if argument is a number (alarm ID to remove)
try:
alarm_id = int(arg)
# Remove alarm by ID
if self.alarm_manager.remove_alarm(alarm_id):
await update.message.reply_text(f"โ
Alarm ID {alarm_id} has been removed.")
else:
await update.message.reply_text(f"โ Alarm ID {alarm_id} not found.")
return
except ValueError:
# Not a number, treat as token
token = arg.upper()
alarms = self.alarm_manager.get_alarms_by_token(token)
message = self.alarm_manager.format_alarm_list(alarms, f"{token} Price Alarms")
await update.message.reply_text(message, parse_mode='HTML')
return
elif len(context.args) == 2:
# Set new alarm: /alarm TOKEN PRICE
token = context.args[0].upper()
target_price = float(context.args[1])
# Get current market price
symbol = f"{token}/USDC:USDC"
market_data = self.client.get_market_data(symbol)
if not market_data or not market_data.get('ticker'):
await update.message.reply_text(f"โ Could not fetch current price for {token}")
return
current_price = float(market_data['ticker'].get('last', 0))
if current_price <= 0:
await update.message.reply_text(f"โ Invalid current price for {token}")
return
# Create the alarm
alarm = self.alarm_manager.create_alarm(token, target_price, current_price)
# Format confirmation message
direction_emoji = "๐" if alarm['direction'] == 'above' else "๐"
price_diff = abs(target_price - current_price)
price_diff_percent = (price_diff / current_price) * 100
message = f"""
โ
Price Alarm Created
๐ Alarm Details:
โข Alarm ID: {alarm['id']}
โข Token: {token}
โข Target Price: ${target_price:,.2f}
โข Current Price: ${current_price:,.2f}
โข Direction: {alarm['direction'].upper()}
{direction_emoji} Alert Condition:
Will trigger when {token} price moves {alarm['direction']} ${target_price:,.2f}
๐ฐ Price Difference:
โข Distance: ${price_diff:,.2f} ({price_diff_percent:.2f}%)
โข Status: ACTIVE โ
โฐ Created: {datetime.now().strftime('%H:%M:%S')}
๐ก The alarm will be checked every {Config.BOT_HEARTBEAT_SECONDS} seconds and you'll receive a notification when triggered.
"""
await update.message.reply_text(message.strip(), parse_mode='HTML')
else:
# Too many arguments
await update.message.reply_text(
"โ Invalid usage. Examples:\n\n"
"โข /alarm
- List all alarms\n"
"โข /alarm BTC
- List BTC alarms\n"
"โข /alarm BTC 50000
- Set alarm for BTC at $50,000\n"
"โข /alarm 3
- Remove alarm ID 3",
parse_mode='HTML'
)
except ValueError:
await update.message.reply_text("โ Invalid price format. Please use numbers only.")
except Exception as e:
error_message = f"โ Error processing alarm command: {str(e)}"
await update.message.reply_text(error_message)
logger.error(f"Error in alarm command: {e}")
async def logs_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /logs command to show log file statistics and cleanup options."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
try:
# Check for cleanup argument
if context.args and len(context.args) >= 1:
if context.args[0].lower() == 'cleanup':
# Get days parameter (default 30)
days_to_keep = 30
if len(context.args) >= 2:
try:
days_to_keep = int(context.args[1])
except ValueError:
await update.message.reply_text("โ Invalid number of days. Using default (30).")
# Perform cleanup
await update.message.reply_text(f"๐งน Cleaning up log files older than {days_to_keep} days...")
cleanup_logs(days_to_keep)
await update.message.reply_text(f"โ
Log cleanup completed!")
return
# Show log statistics
log_stats_text = format_log_stats()
# Add additional info
status_text = f"""
๐ System Logging Status
{log_stats_text}
๐ Log Configuration:
โข Log Level: {Config.LOG_LEVEL}
โข Heartbeat Interval: {Config.BOT_HEARTBEAT_SECONDS}s
โข Bot Uptime: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
๐ก Log Management:
โข /logs cleanup
- Clean old logs (30 days)
โข /logs cleanup 7
- Clean logs older than 7 days
โข Log rotation happens automatically
โข Old backups are removed automatically
๐ง Configuration:
โข Rotation Type: {Config.LOG_ROTATION_TYPE}
โข Max Size: {Config.LOG_MAX_SIZE_MB}MB (size rotation)
โข Backup Count: {Config.LOG_BACKUP_COUNT}
"""
await update.message.reply_text(status_text.strip(), parse_mode='HTML')
except Exception as e:
error_message = f"โ Error processing logs command: {str(e)}"
await update.message.reply_text(error_message)
logger.error(f"Error in logs command: {e}")
async def performance_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /performance command to show token performance ranking or detailed stats."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
try:
# Check if specific token is requested
if context.args and len(context.args) >= 1:
# Detailed performance for specific token
token = context.args[0].upper()
await self._show_token_performance(update, token)
else:
# Show token performance ranking
await self._show_performance_ranking(update)
except Exception as e:
error_message = f"โ Error processing performance command: {str(e)}"
await update.message.reply_text(error_message)
logger.error(f"Error in performance command: {e}")
async def _show_performance_ranking(self, update: Update):
"""Show token performance ranking (compressed view)."""
token_performance = self.stats.get_token_performance()
if not token_performance:
await update.message.reply_text(
"๐ Token Performance\n\n"
"๐ญ No trading data available yet.\n\n"
"๐ก Performance tracking starts after your first completed trades.\n"
"Use /long or /short to start trading!",
parse_mode='HTML'
)
return
# Sort tokens by total P&L (best to worst)
sorted_tokens = sorted(
token_performance.items(),
key=lambda x: x[1]['total_pnl'],
reverse=True
)
performance_text = "๐ Token Performance Ranking\n\n"
# Add ranking with emojis
for i, (token, stats) in enumerate(sorted_tokens, 1):
# Ranking emoji
if i == 1:
rank_emoji = "๐ฅ"
elif i == 2:
rank_emoji = "๐ฅ"
elif i == 3:
rank_emoji = "๐ฅ"
else:
rank_emoji = f"#{i}"
# P&L emoji
pnl_emoji = "๐ข" if stats['total_pnl'] >= 0 else "๐ด"
# Format the line
performance_text += f"{rank_emoji} {token}\n"
performance_text += f" {pnl_emoji} P&L: ${stats['total_pnl']:,.2f} ({stats['pnl_percentage']:+.1f}%)\n"
performance_text += f" ๐ Trades: {stats['completed_trades']}"
# Add win rate if there are completed trades
if stats['completed_trades'] > 0:
performance_text += f" | Win: {stats['win_rate']:.0f}%"
performance_text += "\n\n"
# Add summary
total_pnl = sum(stats['total_pnl'] for stats in token_performance.values())
total_trades = sum(stats['completed_trades'] for stats in token_performance.values())
total_pnl_emoji = "๐ข" if total_pnl >= 0 else "๐ด"
performance_text += f"๐ผ Portfolio Summary:\n"
performance_text += f" {total_pnl_emoji} Total P&L: ${total_pnl:,.2f}\n"
performance_text += f" ๐ Tokens Traded: {len(token_performance)}\n"
performance_text += f" ๐ Completed Trades: {total_trades}\n\n"
performance_text += f"๐ก Usage: /performance BTC
for detailed {Config.DEFAULT_TRADING_TOKEN} stats"
await update.message.reply_text(performance_text.strip(), parse_mode='HTML')
async def _show_token_performance(self, update: Update, token: str):
"""Show detailed performance for a specific token."""
token_stats = self.stats.get_token_detailed_stats(token)
# Check if token has any data
if token_stats.get('total_trades', 0) == 0:
await update.message.reply_text(
f"๐ {token} Performance\n\n"
f"๐ญ No trading history found for {token}.\n\n"
f"๐ก Start trading {token} with:\n"
f"โข /long {token} 100
\n"
f"โข /short {token} 100
\n\n"
f"๐ Use /performance
to see all token rankings.",
parse_mode='HTML'
)
return
# Check if there's a message (no completed trades)
if 'message' in token_stats and token_stats.get('completed_trades', 0) == 0:
await update.message.reply_text(
f"๐ {token} Performance\n\n"
f"{token_stats['message']}\n\n"
f"๐ Current Activity:\n"
f"โข Total Trades: {token_stats['total_trades']}\n"
f"โข Buy Orders: {token_stats.get('buy_trades', 0)}\n"
f"โข Sell Orders: {token_stats.get('sell_trades', 0)}\n"
f"โข Volume: ${token_stats.get('total_volume', 0):,.2f}\n\n"
f"๐ก Complete some trades to see P&L statistics!\n"
f"๐ Use /performance
to see all token rankings.",
parse_mode='HTML'
)
return
# Detailed stats display
pnl_emoji = "๐ข" if token_stats['total_pnl'] >= 0 else "๐ด"
performance_text = f"""
๐ {token} Detailed Performance
๐ฐ P&L Summary:
โข {pnl_emoji} Total P&L: ${token_stats['total_pnl']:,.2f} ({token_stats['pnl_percentage']:+.2f}%)
โข ๐ต Total Volume: ${token_stats['completed_volume']:,.2f}
โข ๐ Expectancy: ${token_stats['expectancy']:,.2f}
๐ Trading Activity:
โข Total Trades: {token_stats['total_trades']}
โข Completed: {token_stats['completed_trades']}
โข Buy Orders: {token_stats['buy_trades']}
โข Sell Orders: {token_stats['sell_trades']}
๐ Performance Metrics:
โข Win Rate: {token_stats['win_rate']:.1f}%
โข Profit Factor: {token_stats['profit_factor']:.2f}
โข Wins: {token_stats['total_wins']} | Losses: {token_stats['total_losses']}
๐ก Best/Worst:
โข Largest Win: ${token_stats['largest_win']:,.2f}
โข Largest Loss: ${token_stats['largest_loss']:,.2f}
โข Avg Win: ${token_stats['avg_win']:,.2f}
โข Avg Loss: ${token_stats['avg_loss']:,.2f}
"""
# Add recent trades if available
if token_stats.get('recent_trades'):
performance_text += f"\n๐ Recent Trades:\n"
for trade in token_stats['recent_trades'][-3:]: # Last 3 trades
trade_time = datetime.fromisoformat(trade['timestamp']).strftime('%m/%d %H:%M')
side_emoji = "๐ข" if trade['side'] == 'buy' else "๐ด"
pnl_display = f" | P&L: ${trade.get('pnl', 0):.2f}" if trade.get('pnl', 0) != 0 else ""
performance_text += f"โข {side_emoji} {trade['side'].upper()} ${trade['value']:,.0f} @ {trade_time}{pnl_display}\n"
performance_text += f"\n๐ Use /performance
to see all token rankings"
await update.message.reply_text(performance_text.strip(), parse_mode='HTML')
async def daily_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /daily command to show daily performance stats."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
try:
daily_stats = self.stats.get_daily_stats(10)
if not daily_stats:
await update.message.reply_text(
"๐
Daily Performance\n\n"
"๐ญ No daily performance data available yet.\n\n"
"๐ก Daily stats are calculated from completed trades.\n"
"Start trading to see daily performance!",
parse_mode='HTML'
)
return
daily_text = "๐
Daily Performance (Last 10 Days)\n\n"
total_pnl = 0
total_trades = 0
trading_days = 0
for day_stats in daily_stats:
if day_stats['has_trades']:
# Day with completed trades
pnl_emoji = "๐ข" if day_stats['pnl'] >= 0 else "๐ด"
daily_text += f"๐ {day_stats['date_formatted']}\n"
daily_text += f" {pnl_emoji} P&L: ${day_stats['pnl']:,.2f} ({day_stats['pnl_pct']:+.1f}%)\n"
daily_text += f" ๐ Trades: {day_stats['trades']}\n\n"
total_pnl += day_stats['pnl']
total_trades += day_stats['trades']
trading_days += 1
else:
# Day with no trades
daily_text += f"๐ {day_stats['date_formatted']}\n"
daily_text += f" ๐ญ No completed trades\n\n"
# Add summary
if trading_days > 0:
total_pnl_emoji = "๐ข" if total_pnl >= 0 else "๐ด"
daily_text += f"๐ผ 10-Day Summary:\n"
daily_text += f" {total_pnl_emoji} Total P&L: ${total_pnl:,.2f}\n"
daily_text += f" ๐ Total Trades: {total_trades}\n"
daily_text += f" ๐ Trading Days: {trading_days}/10\n"
daily_text += f" ๐ Avg per Trading Day: ${total_pnl/trading_days:,.2f}"
else:
daily_text += f"๐ผ 10-Day Summary:\n"
daily_text += f" ๐ญ No completed trades in the last 10 days\n"
daily_text += f" ๐ก Start trading to see daily performance!"
await update.message.reply_text(daily_text.strip(), parse_mode='HTML')
except Exception as e:
error_message = f"โ Error processing daily command: {str(e)}"
await update.message.reply_text(error_message)
logger.error(f"Error in daily command: {e}")
async def weekly_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /weekly command to show weekly performance stats."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
try:
weekly_stats = self.stats.get_weekly_stats(10)
if not weekly_stats:
await update.message.reply_text(
"๐ Weekly Performance\n\n"
"๐ญ No weekly performance data available yet.\n\n"
"๐ก Weekly stats are calculated from completed trades.\n"
"Start trading to see weekly performance!",
parse_mode='HTML'
)
return
weekly_text = "๐ Weekly Performance (Last 10 Weeks)\n\n"
total_pnl = 0
total_trades = 0
trading_weeks = 0
for week_stats in weekly_stats:
if week_stats['has_trades']:
# Week with completed trades
pnl_emoji = "๐ข" if week_stats['pnl'] >= 0 else "๐ด"
weekly_text += f"๐ {week_stats['week_formatted']}\n"
weekly_text += f" {pnl_emoji} P&L: ${week_stats['pnl']:,.2f} ({week_stats['pnl_pct']:+.1f}%)\n"
weekly_text += f" ๐ Trades: {week_stats['trades']}\n\n"
total_pnl += week_stats['pnl']
total_trades += week_stats['trades']
trading_weeks += 1
else:
# Week with no trades
weekly_text += f"๐ {week_stats['week_formatted']}\n"
weekly_text += f" ๐ญ No completed trades\n\n"
# Add summary
if trading_weeks > 0:
total_pnl_emoji = "๐ข" if total_pnl >= 0 else "๐ด"
weekly_text += f"๐ผ 10-Week Summary:\n"
weekly_text += f" {total_pnl_emoji} Total P&L: ${total_pnl:,.2f}\n"
weekly_text += f" ๐ Total Trades: {total_trades}\n"
weekly_text += f" ๐ Trading Weeks: {trading_weeks}/10\n"
weekly_text += f" ๐ Avg per Trading Week: ${total_pnl/trading_weeks:,.2f}"
else:
weekly_text += f"๐ผ 10-Week Summary:\n"
weekly_text += f" ๐ญ No completed trades in the last 10 weeks\n"
weekly_text += f" ๐ก Start trading to see weekly performance!"
await update.message.reply_text(weekly_text.strip(), parse_mode='HTML')
except Exception as e:
error_message = f"โ Error processing weekly command: {str(e)}"
await update.message.reply_text(error_message)
logger.error(f"Error in weekly command: {e}")
async def monthly_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /monthly command to show monthly performance stats."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
try:
monthly_stats = self.stats.get_monthly_stats(10)
if not monthly_stats:
await update.message.reply_text(
"๐ Monthly Performance\n\n"
"๐ญ No monthly performance data available yet.\n\n"
"๐ก Monthly stats are calculated from completed trades.\n"
"Start trading to see monthly performance!",
parse_mode='HTML'
)
return
monthly_text = "๐ Monthly Performance (Last 10 Months)\n\n"
total_pnl = 0
total_trades = 0
trading_months = 0
for month_stats in monthly_stats:
if month_stats['has_trades']:
# Month with completed trades
pnl_emoji = "๐ข" if month_stats['pnl'] >= 0 else "๐ด"
monthly_text += f"๐
{month_stats['month_formatted']}\n"
monthly_text += f" {pnl_emoji} P&L: ${month_stats['pnl']:,.2f} ({month_stats['pnl_pct']:+.1f}%)\n"
monthly_text += f" ๐ Trades: {month_stats['trades']}\n\n"
total_pnl += month_stats['pnl']
total_trades += month_stats['trades']
trading_months += 1
else:
# Month with no trades
monthly_text += f"๐
{month_stats['month_formatted']}\n"
monthly_text += f" ๐ญ No completed trades\n\n"
# Add summary
if trading_months > 0:
total_pnl_emoji = "๐ข" if total_pnl >= 0 else "๐ด"
monthly_text += f"๐ผ 10-Month Summary:\n"
monthly_text += f" {total_pnl_emoji} Total P&L: ${total_pnl:,.2f}\n"
monthly_text += f" ๐ Total Trades: {total_trades}\n"
monthly_text += f" ๐ Trading Months: {trading_months}/10\n"
monthly_text += f" ๐ Avg per Trading Month: ${total_pnl/trading_months:,.2f}"
else:
monthly_text += f"๐ผ 10-Month Summary:\n"
monthly_text += f" ๐ญ No completed trades in the last 10 months\n"
monthly_text += f" ๐ก Start trading to see monthly performance!"
await update.message.reply_text(monthly_text.strip(), parse_mode='HTML')
except Exception as e:
error_message = f"โ Error processing monthly command: {str(e)}"
await update.message.reply_text(error_message)
logger.error(f"Error in monthly command: {e}")
async def risk_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /risk command to show advanced risk metrics."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
try:
# Get current balance for context
balance = self.client.get_balance()
current_balance = 0
if balance and balance.get('total'):
current_balance = float(balance['total'].get('USDC', 0))
# Get risk metrics and basic stats
risk_metrics = self.stats.get_risk_metrics()
basic_stats = self.stats.get_basic_stats()
# Check if we have enough data for risk calculations
if basic_stats['completed_trades'] < 2:
await update.message.reply_text(
"๐ Risk Analysis\n\n"
"๐ญ Insufficient Data\n\n"
f"โข Current completed trades: {basic_stats['completed_trades']}\n"
f"โข Required for risk analysis: 2+ trades\n"
f"โข Daily balance snapshots: {len(self.stats.data.get('daily_balances', []))}\n\n"
"๐ก To enable risk analysis:\n"
"โข Complete more trades to generate returns data\n"
"โข Bot automatically records daily balance snapshots\n"
"โข Risk metrics will be available after sufficient trading history\n\n"
"๐ Use /stats for current performance metrics",
parse_mode='HTML'
)
return
# Format the risk analysis message
risk_text = f"""
๐ Risk Analysis & Advanced Metrics
๐ฏ Risk-Adjusted Performance:
โข Sharpe Ratio: {risk_metrics['sharpe_ratio']:.3f}
โข Sortino Ratio: {risk_metrics['sortino_ratio']:.3f}
โข Annual Volatility: {risk_metrics['volatility']:.2f}%
๐ Drawdown Analysis:
โข Maximum Drawdown: {risk_metrics['max_drawdown']:.2f}%
โข Value at Risk (95%): {risk_metrics['var_95']:.2f}%
๐ฐ Portfolio Context:
โข Current Balance: ${current_balance:,.2f}
โข Initial Balance: ${basic_stats['initial_balance']:,.2f}
โข Total P&L: ${basic_stats['total_pnl']:,.2f}
โข Days Active: {basic_stats['days_active']}
๐ Risk Interpretation:
"""
# Add interpretive guidance
sharpe = risk_metrics['sharpe_ratio']
if sharpe > 2.0:
risk_text += "โข ๐ข Excellent risk-adjusted returns (Sharpe > 2.0)\n"
elif sharpe > 1.0:
risk_text += "โข ๐ก Good risk-adjusted returns (Sharpe > 1.0)\n"
elif sharpe > 0.5:
risk_text += "โข ๐ Moderate risk-adjusted returns (Sharpe > 0.5)\n"
elif sharpe > 0:
risk_text += "โข ๐ด Poor risk-adjusted returns (Sharpe > 0)\n"
else:
risk_text += "โข โซ Negative risk-adjusted returns (Sharpe < 0)\n"
max_dd = risk_metrics['max_drawdown']
if max_dd < 5:
risk_text += "โข ๐ข Low maximum drawdown (< 5%)\n"
elif max_dd < 15:
risk_text += "โข ๐ก Moderate maximum drawdown (< 15%)\n"
elif max_dd < 30:
risk_text += "โข ๐ High maximum drawdown (< 30%)\n"
else:
risk_text += "โข ๐ด Very High maximum drawdown (> 30%)\n"
volatility = risk_metrics['volatility']
if volatility < 10:
risk_text += "โข ๐ข Low portfolio volatility (< 10%)\n"
elif volatility < 25:
risk_text += "โข ๐ก Moderate portfolio volatility (< 25%)\n"
elif volatility < 50:
risk_text += "โข ๐ High portfolio volatility (< 50%)\n"
else:
risk_text += "โข ๐ด Very High portfolio volatility (> 50%)\n"
risk_text += f"""
๐ก Risk Definitions:
โข Sharpe Ratio: Risk-adjusted return (excess return / volatility)
โข Sortino Ratio: Return / downside volatility (focuses on bad volatility)
โข Max Drawdown: Largest peak-to-trough decline
โข VaR 95%: Maximum expected loss 95% of the time
โข Volatility: Annualized standard deviation of returns
๐ Data Based On:
โข Completed Trades: {basic_stats['completed_trades']}
โข Daily Balance Records: {len(self.stats.data.get('daily_balances', []))}
โข Trading Period: {basic_stats['days_active']} days
๐ Use /stats for trading performance metrics
"""
await update.message.reply_text(risk_text.strip(), parse_mode='HTML')
except Exception as e:
error_message = f"โ Error processing risk command: {str(e)}"
await update.message.reply_text(error_message)
logger.error(f"Error in risk command: {e}")
async def version_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle the /version command to show bot version and system info."""
if not self.is_authorized(update.effective_chat.id):
await update.message.reply_text("โ Unauthorized access.")
return
try:
# Get system info
import platform
import sys
from datetime import datetime
uptime_info = "Unknown"
try:
# Try to get process uptime if available
import psutil
process = psutil.Process()
create_time = datetime.fromtimestamp(process.create_time())
uptime = datetime.now() - create_time
days = uptime.days
hours, remainder = divmod(uptime.seconds, 3600)
minutes, _ = divmod(remainder, 60)
uptime_info = f"{days}d {hours}h {minutes}m"
except ImportError:
# psutil not available, skip uptime
pass
# Get stats info
basic_stats = self.stats.get_basic_stats()
version_text = f"""
๐ค Trading Bot Version & System Info
๐ฑ Bot Information:
โข Version: {self.version}
โข Network: {'Testnet' if Config.HYPERLIQUID_TESTNET else 'Mainnet'}
โข Uptime: {uptime_info}
โข Default Token: {Config.DEFAULT_TRADING_TOKEN}
๐ป System Information:
โข Python: {sys.version.split()[0]}
โข Platform: {platform.system()} {platform.release()}
โข Architecture: {platform.machine()}
๐ Trading Stats:
โข Total Orders: {basic_stats['total_trades']}
โข Completed Trades: {basic_stats['completed_trades']}
โข Days Active: {basic_stats['days_active']}
โข Start Date: {basic_stats['start_date']}
๐ Monitoring Status:
โข Order Monitoring: {'โ
Active' if self.order_monitoring_task and not self.order_monitoring_task.done() else 'โ Inactive'}
โข External Trades: โ
Active
โข Price Alarms: โ
Active ({len(self.alarms)} active)
โข Risk Management: {'โ
Enabled' if Config.RISK_MANAGEMENT_ENABLED else 'โ Disabled'}
โฐ Current Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
await update.message.reply_text(version_text.strip(), parse_mode='HTML')
except Exception as e:
error_message = f"โ Error processing version command: {str(e)}"
await update.message.reply_text(error_message)
logger.error(f"Error in version command: {e}")
def _get_position_state(self, symbol: str) -> Dict[str, Any]:
"""Get current position state for a symbol."""
if symbol not in self.position_tracker:
self.position_tracker[symbol] = {
'contracts': 0.0,
'avg_entry_price': 0.0,
'total_cost_basis': 0.0,
'entry_count': 0,
'entry_history': [], # List of {price, amount, timestamp}
'last_update': datetime.now().isoformat()
}
return self.position_tracker[symbol]
def _update_position_state(self, symbol: str, side: str, amount: float, price: float, timestamp: str = None):
"""Update position state with a new trade."""
if timestamp is None:
timestamp = datetime.now().isoformat()
position = self._get_position_state(symbol)
if side.lower() == 'buy':
# Adding to long position or reducing short position
if position['contracts'] >= 0:
# Opening/adding to long position
new_cost = amount * price
old_cost = position['total_cost_basis']
old_contracts = position['contracts']
position['contracts'] += amount
position['total_cost_basis'] += new_cost
position['avg_entry_price'] = position['total_cost_basis'] / position['contracts'] if position['contracts'] > 0 else 0
position['entry_count'] += 1
position['entry_history'].append({
'price': price,
'amount': amount,
'timestamp': timestamp,
'side': 'buy'
})
logger.info(f"๐ Position updated: {symbol} LONG {position['contracts']:.6f} @ avg ${position['avg_entry_price']:.2f}")
return 'long_opened' if old_contracts == 0 else 'long_increased'
else:
# Reducing short position
reduction = min(amount, abs(position['contracts']))
position['contracts'] += reduction
if position['contracts'] >= 0:
# Short position fully closed or flipped to long
if position['contracts'] == 0:
self._reset_position_state(symbol)
return 'short_closed'
else:
# Flipped to long - need to track new long position
remaining_amount = amount - reduction
position['contracts'] = remaining_amount
position['total_cost_basis'] = remaining_amount * price
position['avg_entry_price'] = price
return 'short_closed_and_long_opened'
else:
return 'short_reduced'
elif side.lower() == 'sell':
# Adding to short position or reducing long position
if position['contracts'] <= 0:
# Opening/adding to short position
position['contracts'] -= amount
position['entry_count'] += 1
position['entry_history'].append({
'price': price,
'amount': amount,
'timestamp': timestamp,
'side': 'sell'
})
logger.info(f"๐ Position updated: {symbol} SHORT {abs(position['contracts']):.6f} @ ${price:.2f}")
return 'short_opened' if position['contracts'] == -amount else 'short_increased'
else:
# Reducing long position
reduction = min(amount, position['contracts'])
position['contracts'] -= reduction
# Adjust cost basis proportionally
if position['contracts'] > 0:
reduction_ratio = reduction / (position['contracts'] + reduction)
position['total_cost_basis'] *= (1 - reduction_ratio)
return 'long_reduced'
else:
# Long position fully closed
if position['contracts'] == 0:
self._reset_position_state(symbol)
return 'long_closed'
else:
# Flipped to short
remaining_amount = amount - reduction
position['contracts'] = -remaining_amount
return 'long_closed_and_short_opened'
position['last_update'] = timestamp
return 'unknown'
def _reset_position_state(self, symbol: str):
"""Reset position state when position is fully closed."""
if symbol in self.position_tracker:
del self.position_tracker[symbol]
def _calculate_position_pnl(self, symbol: str, exit_amount: float, exit_price: float) -> Dict[str, float]:
"""Calculate P&L for a position exit."""
position = self._get_position_state(symbol)
if position['contracts'] == 0:
return {'pnl': 0.0, 'pnl_percent': 0.0}
avg_entry = position['avg_entry_price']
if position['contracts'] > 0: # Long position
pnl = exit_amount * (exit_price - avg_entry)
else: # Short position
pnl = exit_amount * (avg_entry - exit_price)
cost_basis = exit_amount * avg_entry
pnl_percent = (pnl / cost_basis * 100) if cost_basis > 0 else 0
return {
'pnl': pnl,
'pnl_percent': pnl_percent,
'avg_entry_price': avg_entry
}
async def _send_external_trade_notification(self, trade: Dict[str, Any]):
"""Send generic notification for external trades (fallback)."""
try:
symbol = trade.get('symbol', '')
side = trade.get('side', '')
amount = float(trade.get('amount', 0))
price = float(trade.get('price', 0))
timestamp = trade.get('timestamp', '')
# Extract token from symbol
token = symbol.split('/')[0] if '/' in symbol else symbol
# Format timestamp
try:
trade_time = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
time_str = trade_time.strftime('%H:%M:%S')
except:
time_str = "Unknown"
# Determine trade type and emoji
side_emoji = "๐ข" if side.lower() == 'buy' else "๐ด"
trade_value = amount * price
message = f"""
๐ External Trade Detected
๐ Trade Details:
โข Token: {token}
โข Side: {side.upper()}
โข Amount: {amount} {token}
โข Price: ${price:,.2f}
โข Value: ${trade_value:,.2f}
{side_emoji} Source: External Platform Trade
โฐ Time: {time_str}
๐ Note: This trade was executed outside the Telegram bot
๐ Stats have been automatically updated
"""
await self.send_message(message.strip())
logger.info(f"๐ข Sent generic external trade notification: {side} {amount} {token}")
except Exception as e:
logger.error(f"โ Error sending external trade notification: {e}")
async def _check_stop_losses(self, current_positions: list):
"""Check all positions for stop loss triggers and execute automatic exits."""
try:
if not current_positions:
return
stop_loss_triggers = []
for position in current_positions:
symbol = position.get('symbol')
contracts = float(position.get('contracts', 0))
entry_price = float(position.get('entryPx', 0))
if not symbol or contracts == 0 or entry_price == 0:
continue
# Get current market price
market_data = self.client.get_market_data(symbol)
if not market_data or not market_data.get('ticker'):
continue
current_price = float(market_data['ticker'].get('last', 0))
if current_price == 0:
continue
# Calculate current P&L percentage
if contracts > 0: # Long position
pnl_percent = ((current_price - entry_price) / entry_price) * 100
else: # Short position
pnl_percent = ((entry_price - current_price) / entry_price) * 100
# Check if stop loss should trigger
if pnl_percent <= -Config.STOP_LOSS_PERCENTAGE:
token = symbol.split('/')[0] if '/' in symbol else symbol
stop_loss_triggers.append({
'symbol': symbol,
'token': token,
'contracts': contracts,
'entry_price': entry_price,
'current_price': current_price,
'pnl_percent': pnl_percent
})
# Execute stop losses
for trigger in stop_loss_triggers:
await self._execute_automatic_stop_loss(trigger)
except Exception as e:
logger.error(f"โ Error checking stop losses: {e}")
async def _execute_automatic_stop_loss(self, trigger: Dict[str, Any]):
"""Execute an automatic stop loss order."""
try:
symbol = trigger['symbol']
token = trigger['token']
contracts = trigger['contracts']
entry_price = trigger['entry_price']
current_price = trigger['current_price']
pnl_percent = trigger['pnl_percent']
# Determine the exit side (opposite of position)
exit_side = 'sell' if contracts > 0 else 'buy'
contracts_abs = abs(contracts)
# Send notification before executing
await self._send_stop_loss_notification(trigger, "triggered")
# Execute the stop loss order (market order for immediate execution)
try:
if exit_side == 'sell':
order = self.client.create_market_sell_order(symbol, contracts_abs)
else:
order = self.client.create_market_buy_order(symbol, contracts_abs)
if order:
logger.info(f"๐ Stop loss executed: {token} {exit_side} {contracts_abs} @ ${current_price}")
# Record the trade in stats and update position tracking
action_type = self.stats.record_trade_with_enhanced_tracking(symbol, exit_side, contracts_abs, current_price, order.get('id', 'stop_loss'), "auto_stop_loss")
# Send success notification
await self._send_stop_loss_notification(trigger, "executed", order)
else:
logger.error(f"โ Stop loss order failed for {token}")
await self._send_stop_loss_notification(trigger, "failed")
except Exception as order_error:
logger.error(f"โ Stop loss order execution failed for {token}: {order_error}")
await self._send_stop_loss_notification(trigger, "failed", error=str(order_error))
except Exception as e:
logger.error(f"โ Error executing automatic stop loss: {e}")
async def _send_stop_loss_notification(self, trigger: Dict[str, Any], status: str, order: Dict = None, error: str = None):
"""Send notification for stop loss events."""
try:
token = trigger['token']
contracts = trigger['contracts']
entry_price = trigger['entry_price']
current_price = trigger['current_price']
pnl_percent = trigger['pnl_percent']
position_type = "LONG" if contracts > 0 else "SHORT"
contracts_abs = abs(contracts)
if status == "triggered":
title = "๐ Stop Loss Triggered"
status_text = f"Stop loss triggered at {Config.STOP_LOSS_PERCENTAGE}% loss"
emoji = "๐จ"
elif status == "executed":
title = "โ
Stop Loss Executed"
status_text = "Position closed automatically"
emoji = "๐"
elif status == "failed":
title = "โ Stop Loss Failed"
status_text = f"Stop loss execution failed{': ' + error if error else ''}"
emoji = "โ ๏ธ"
else:
return
# Calculate loss
loss_value = contracts_abs * abs(current_price - entry_price)
message = f"""
{title}
{emoji} Risk Management Alert
๐ Position Details:
โข Token: {token}
โข Direction: {position_type}
โข Size: {contracts_abs} contracts
โข Entry Price: ${entry_price:,.2f}
โข Current Price: ${current_price:,.2f}
๐ด Loss Details:
โข Loss: ${loss_value:,.2f} ({pnl_percent:.2f}%)
โข Stop Loss Threshold: {Config.STOP_LOSS_PERCENTAGE}%
๐ Action: {status_text}
โฐ Time: {datetime.now().strftime('%H:%M:%S')}
"""
if order and status == "executed":
order_id = order.get('id', 'N/A')
message += f"\n๐ Order ID: {order_id}"
await self.send_message(message.strip())
logger.info(f"๐ข Sent stop loss notification: {token} {status}")
except Exception as e:
logger.error(f"โ Error sending stop loss notification: {e}")
async def _process_filled_orders(self, filled_order_ids: set, current_positions: list):
"""Process filled orders using enhanced position tracking."""
try:
# For bot-initiated orders, we'll detect changes in position size
# and send appropriate notifications using the enhanced system
# This method will be triggered when orders placed through the bot are filled
# The external trade monitoring will handle trades made outside the bot
# Update position tracking based on current positions
await self._update_position_tracking(current_positions)
except Exception as e:
logger.error(f"โ Error processing filled orders: {e}")
async def _update_position_tracking(self, current_positions: list):
"""Update the legacy position tracking data for compatibility."""
new_position_map = {}
for position in current_positions:
symbol = position.get('symbol')
contracts = float(position.get('contracts', 0))
entry_price = float(position.get('entryPx', 0))
if symbol and contracts != 0:
new_position_map[symbol] = {
'contracts': contracts,
'entry_price': entry_price
}
# Also update our enhanced position tracker if not already present
if symbol not in self.position_tracker:
self._get_position_state(symbol)
self.position_tracker[symbol]['contracts'] = contracts
self.position_tracker[symbol]['avg_entry_price'] = entry_price
self.position_tracker[symbol]['total_cost_basis'] = contracts * entry_price
self.last_known_positions = new_position_map
async def main_async():
"""Async main entry point for the Telegram bot."""
try:
# Validate configuration
if not Config.validate():
logger.error("โ Configuration validation failed!")
return
if not Config.TELEGRAM_ENABLED:
logger.error("โ Telegram is not enabled in configuration")
return
# Create and run the bot
bot = TelegramTradingBot()
await bot.run()
except KeyboardInterrupt:
logger.info("๐ Bot stopped by user")
except Exception as e:
logger.error(f"โ Unexpected error: {e}")
raise
def main():
"""Main entry point for the Telegram bot."""
try:
# Check if we're already in an asyncio context
try:
loop = asyncio.get_running_loop()
# If we get here, we're already in an asyncio context
logger.error("โ Cannot run main() from within an asyncio context. Use main_async() instead.")
return
except RuntimeError:
# No running loop, safe to use asyncio.run()
pass
# Run the async main function
asyncio.run(main_async())
except Exception as e:
logger.error(f"โ Failed to start telegram bot: {e}")
raise
if __name__ == "__main__":
main()