|
@@ -0,0 +1,395 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+"""
|
|
|
+RSI Monitor - Monitors RSI vs RSI_SMA crossovers and sends notifications.
|
|
|
+"""
|
|
|
+
|
|
|
+import logging
|
|
|
+import asyncio
|
|
|
+import numpy as np
|
|
|
+from typing import Optional, Dict, Any, List
|
|
|
+from datetime import datetime, timezone
|
|
|
+from src.config.config import Config
|
|
|
+from src.clients.hyperliquid_client import HyperliquidClient
|
|
|
+from src.notifications.notification_manager import NotificationManager
|
|
|
+
|
|
|
+try:
|
|
|
+ import talib
|
|
|
+except ImportError:
|
|
|
+ logging.error("talib is required for RSI calculations. Install with: pip install talib")
|
|
|
+ raise
|
|
|
+
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
+
|
|
|
+class RsiMonitor:
|
|
|
+ """Monitors RSI vs RSI_SMA crossovers and sends notifications."""
|
|
|
+
|
|
|
+ def __init__(self, hyperliquid_client: HyperliquidClient, notification_manager: NotificationManager):
|
|
|
+ """
|
|
|
+ Initialize the RSI monitor.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ hyperliquid_client: Client for fetching market data
|
|
|
+ notification_manager: Manager for sending notifications
|
|
|
+ """
|
|
|
+ self.client = hyperliquid_client
|
|
|
+ self.notification_manager = notification_manager
|
|
|
+
|
|
|
+ # RSI configuration from config
|
|
|
+ self.enabled = Config.RSI_NOTIFICATION_ENABLED
|
|
|
+ self.timeframe = Config.RSI_TIMEFRAME
|
|
|
+ self.rsi_period = Config.RSI_PERIOD
|
|
|
+ self.rsi_sma_period = Config.RSI_SMA_PERIOD
|
|
|
+
|
|
|
+ # State tracking to avoid duplicate notifications
|
|
|
+ self.last_crossover_state: Dict[str, Optional[str]] = {} # symbol -> 'above' or 'below'
|
|
|
+ self.last_notification_time: Dict[str, datetime] = {} # symbol -> last notification time
|
|
|
+
|
|
|
+ # New candle tracking - only calculate when there's a new candle
|
|
|
+ self.last_candle_timestamps: Dict[str, int] = {} # symbol -> last candle timestamp (UTC)
|
|
|
+
|
|
|
+ # Minimum time between notifications for same symbol (in seconds)
|
|
|
+ self.notification_cooldown = 300 # 5 minutes
|
|
|
+
|
|
|
+ logger.info(f"🔧 RSI Monitor initialized: {self.timeframe} timeframe, RSI({self.rsi_period}), SMA({self.rsi_sma_period})")
|
|
|
+
|
|
|
+ async def has_new_candle(self, symbol: str) -> bool:
|
|
|
+ """
|
|
|
+ Check if there's a new candle for the given symbol and timeframe.
|
|
|
+ Only returns True if there's actually a new candle since last check.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ symbol: Trading symbol (e.g., 'BTC/USDC:USDC')
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ True if new candle detected, False otherwise
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # Get just the latest candle to check timestamp
|
|
|
+ latest_candles = self.client.get_candle_data(symbol, self.timeframe, limit=1)
|
|
|
+
|
|
|
+ if not latest_candles or len(latest_candles) == 0:
|
|
|
+ logger.warning(f"⚠️ No candle data available for {symbol}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ # Get the timestamp of the latest candle
|
|
|
+ latest_timestamp = int(latest_candles[0][0]) # candle[0] is timestamp
|
|
|
+
|
|
|
+ # Check if this is a new candle
|
|
|
+ last_known_timestamp = self.last_candle_timestamps.get(symbol, 0)
|
|
|
+
|
|
|
+ if latest_timestamp > last_known_timestamp:
|
|
|
+ # New candle detected
|
|
|
+ self.last_candle_timestamps[symbol] = latest_timestamp
|
|
|
+
|
|
|
+ # Convert timestamp to readable format for logging
|
|
|
+ candle_time = datetime.fromtimestamp(latest_timestamp / 1000, tz=timezone.utc)
|
|
|
+ logger.info(f"🕯️ New {self.timeframe} candle detected for {symbol} at {candle_time.strftime('%Y-%m-%d %H:%M:%S')} UTC")
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ # No new candle
|
|
|
+ logger.debug(f"🔄 No new candle for {symbol} (latest: {latest_timestamp}, known: {last_known_timestamp})")
|
|
|
+ return False
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error checking for new candle {symbol}: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ async def check_rsi_crossover(self, symbol: str) -> Optional[Dict[str, Any]]:
|
|
|
+ """
|
|
|
+ Check for RSI vs RSI_SMA crossovers for a given symbol.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ symbol: Trading symbol (e.g., 'BTC/USDC:USDC')
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Dictionary with crossover info if detected, None otherwise
|
|
|
+ """
|
|
|
+ if not self.enabled:
|
|
|
+ return None
|
|
|
+
|
|
|
+ try:
|
|
|
+ # Get candle data - need enough candles for RSI + SMA calculation
|
|
|
+ # RSI needs at least rsi_period + 1, SMA needs rsi_sma_period on top of that
|
|
|
+ required_candles = self.rsi_period + self.rsi_sma_period + 20 # Extra buffer
|
|
|
+
|
|
|
+ logger.debug(f"📊 Fetching {required_candles} candles for {symbol} ({self.timeframe})")
|
|
|
+ candles = self.client.get_candle_data(symbol, self.timeframe, limit=required_candles)
|
|
|
+
|
|
|
+ if not candles or len(candles) < required_candles:
|
|
|
+ logger.warning(f"⚠️ Insufficient candle data for {symbol}: got {len(candles) if candles else 0}, need {required_candles}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ # Extract close prices
|
|
|
+ close_prices = np.array([float(candle[4]) for candle in candles]) # candle[4] is close price
|
|
|
+
|
|
|
+ # Calculate RSI using talib
|
|
|
+ rsi_values = talib.RSI(close_prices, timeperiod=self.rsi_period)
|
|
|
+
|
|
|
+ # Calculate RSI SMA (Simple Moving Average of RSI)
|
|
|
+ rsi_sma_values = talib.SMA(rsi_values, timeperiod=self.rsi_sma_period)
|
|
|
+
|
|
|
+ # Get the latest values (skip NaN values)
|
|
|
+ valid_indices = ~(np.isnan(rsi_values) | np.isnan(rsi_sma_values))
|
|
|
+ if not np.any(valid_indices):
|
|
|
+ logger.warning(f"⚠️ No valid RSI/RSI_SMA values for {symbol}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ # Get last few valid values to detect crossover
|
|
|
+ valid_rsi = rsi_values[valid_indices]
|
|
|
+ valid_rsi_sma = rsi_sma_values[valid_indices]
|
|
|
+
|
|
|
+ if len(valid_rsi) < 2:
|
|
|
+ logger.warning(f"⚠️ Not enough valid RSI data for crossover detection: {symbol}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ # Current and previous values
|
|
|
+ current_rsi = valid_rsi[-1]
|
|
|
+ current_rsi_sma = valid_rsi_sma[-1]
|
|
|
+ prev_rsi = valid_rsi[-2]
|
|
|
+ prev_rsi_sma = valid_rsi_sma[-2]
|
|
|
+
|
|
|
+ logger.debug(f"📈 {symbol} RSI: {current_rsi:.2f} (prev: {prev_rsi:.2f}), RSI_SMA: {current_rsi_sma:.2f} (prev: {prev_rsi_sma:.2f})")
|
|
|
+
|
|
|
+ # Detect crossover
|
|
|
+ crossover_detected = None
|
|
|
+
|
|
|
+ # RSI crosses above RSI_SMA (bullish signal)
|
|
|
+ if prev_rsi <= prev_rsi_sma and current_rsi > current_rsi_sma:
|
|
|
+ crossover_detected = {
|
|
|
+ 'type': 'bullish',
|
|
|
+ 'direction': 'above',
|
|
|
+ 'current_rsi': current_rsi,
|
|
|
+ 'current_rsi_sma': current_rsi_sma,
|
|
|
+ 'prev_rsi': prev_rsi,
|
|
|
+ 'prev_rsi_sma': prev_rsi_sma,
|
|
|
+ 'timestamp': datetime.now()
|
|
|
+ }
|
|
|
+ logger.info(f"🟢 RSI Bullish Crossover detected for {symbol}: RSI {current_rsi:.2f} > RSI_SMA {current_rsi_sma:.2f}")
|
|
|
+
|
|
|
+ # RSI crosses below RSI_SMA (bearish signal)
|
|
|
+ elif prev_rsi >= prev_rsi_sma and current_rsi < current_rsi_sma:
|
|
|
+ crossover_detected = {
|
|
|
+ 'type': 'bearish',
|
|
|
+ 'direction': 'below',
|
|
|
+ 'current_rsi': current_rsi,
|
|
|
+ 'current_rsi_sma': current_rsi_sma,
|
|
|
+ 'prev_rsi': prev_rsi,
|
|
|
+ 'prev_rsi_sma': prev_rsi_sma,
|
|
|
+ 'timestamp': datetime.now()
|
|
|
+ }
|
|
|
+ logger.info(f"🔴 RSI Bearish Crossover detected for {symbol}: RSI {current_rsi:.2f} < RSI_SMA {current_rsi_sma:.2f}")
|
|
|
+
|
|
|
+ return crossover_detected
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error checking RSI crossover for {symbol}: {e}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ async def should_send_notification(self, symbol: str, crossover_type: str) -> bool:
|
|
|
+ """
|
|
|
+ Check if we should send a notification based on cooldown and state.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ symbol: Trading symbol
|
|
|
+ crossover_type: 'bullish' or 'bearish'
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ True if notification should be sent
|
|
|
+ """
|
|
|
+ now = datetime.now()
|
|
|
+
|
|
|
+ # Check if this is a different crossover type than last time
|
|
|
+ last_state = self.last_crossover_state.get(symbol)
|
|
|
+ if last_state == crossover_type:
|
|
|
+ logger.debug(f"🔄 Duplicate crossover state for {symbol}: {crossover_type}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ # Check cooldown period
|
|
|
+ last_notification = self.last_notification_time.get(symbol)
|
|
|
+ if last_notification:
|
|
|
+ time_since_last = (now - last_notification).total_seconds()
|
|
|
+ if time_since_last < self.notification_cooldown:
|
|
|
+ logger.debug(f"⏳ Cooldown active for {symbol}: {time_since_last:.0f}s < {self.notification_cooldown}s")
|
|
|
+ return False
|
|
|
+
|
|
|
+ return True
|
|
|
+
|
|
|
+ async def send_crossover_notification(self, symbol: str, crossover_info: Dict[str, Any]):
|
|
|
+ """
|
|
|
+ Send notification for RSI crossover.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ symbol: Trading symbol
|
|
|
+ crossover_info: Crossover details from check_rsi_crossover
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # Extract token from symbol
|
|
|
+ token = symbol.split('/')[0] if '/' in symbol else symbol.split(':')[0]
|
|
|
+
|
|
|
+ crossover_type = crossover_info['type']
|
|
|
+ direction = crossover_info['direction']
|
|
|
+ current_rsi = crossover_info['current_rsi']
|
|
|
+ current_rsi_sma = crossover_info['current_rsi_sma']
|
|
|
+ timestamp = crossover_info['timestamp']
|
|
|
+
|
|
|
+ # Determine emoji and message tone
|
|
|
+ if crossover_type == 'bullish':
|
|
|
+ main_emoji = "🟢"
|
|
|
+ signal_emoji = "📈"
|
|
|
+ signal_text = "BULLISH"
|
|
|
+ color_text = "GREEN"
|
|
|
+ action_hint = "Consider LONG position"
|
|
|
+ else: # bearish
|
|
|
+ main_emoji = "🔴"
|
|
|
+ signal_emoji = "📉"
|
|
|
+ signal_text = "BEARISH"
|
|
|
+ color_text = "RED"
|
|
|
+ action_hint = "Consider SHORT position"
|
|
|
+
|
|
|
+ message = f"""
|
|
|
+{main_emoji} <b>RSI CROSSOVER ALERT</b>
|
|
|
+
|
|
|
+{signal_emoji} <b>{signal_text} Signal Detected:</b>
|
|
|
+• Token: {token}
|
|
|
+• Timeframe: {self.timeframe}
|
|
|
+• RSI crossed {direction.upper()} RSI_SMA
|
|
|
+
|
|
|
+📊 <b>Current Values:</b>
|
|
|
+• RSI({self.rsi_period}): {current_rsi:.2f}
|
|
|
+• RSI_SMA({self.rsi_sma_period}): {current_rsi_sma:.2f}
|
|
|
+• Difference: {current_rsi - current_rsi_sma:+.2f}
|
|
|
+
|
|
|
+🎯 <b>Signal Strength:</b> {color_text} {signal_emoji}
|
|
|
+💡 <b>Suggestion:</b> {action_hint}
|
|
|
+
|
|
|
+⏰ <b>Time:</b> {timestamp.strftime('%H:%M:%S')}
|
|
|
+
|
|
|
+📱 <b>Quick Actions:</b>
|
|
|
+• /market {token} - View market data
|
|
|
+• /price {token} - Current price
|
|
|
+• /long {token} [amount] - Open long
|
|
|
+• /short {token} [amount] - Open short
|
|
|
+ """
|
|
|
+
|
|
|
+ await self.notification_manager.send_generic_notification(message.strip())
|
|
|
+
|
|
|
+ # Update state tracking
|
|
|
+ self.last_crossover_state[symbol] = crossover_type
|
|
|
+ self.last_notification_time[symbol] = timestamp
|
|
|
+
|
|
|
+ logger.info(f"🔔 RSI crossover notification sent: {token} {signal_text} ({direction})")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error sending RSI crossover notification: {e}")
|
|
|
+
|
|
|
+ async def monitor_symbol(self, symbol: str):
|
|
|
+ """
|
|
|
+ Monitor a single symbol for RSI crossovers.
|
|
|
+ Only calculates RSI if there's a new candle.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ symbol: Trading symbol to monitor
|
|
|
+ """
|
|
|
+ if not self.enabled:
|
|
|
+ return
|
|
|
+
|
|
|
+ try:
|
|
|
+ # First check if there's a new candle - if not, skip calculation
|
|
|
+ if not await self.has_new_candle(symbol):
|
|
|
+ logger.debug(f"🔄 No new candle for {symbol}, skipping RSI calculation")
|
|
|
+ return
|
|
|
+
|
|
|
+ logger.debug(f"🔍 New candle detected - checking RSI crossover for {symbol}")
|
|
|
+
|
|
|
+ # Check for crossover (this will fetch full candle data for calculation)
|
|
|
+ crossover_info = await self.check_rsi_crossover(symbol)
|
|
|
+
|
|
|
+ if crossover_info:
|
|
|
+ crossover_type = crossover_info['type']
|
|
|
+
|
|
|
+ # Check if we should send notification
|
|
|
+ if await self.should_send_notification(symbol, crossover_type):
|
|
|
+ await self.send_crossover_notification(symbol, crossover_info)
|
|
|
+ else:
|
|
|
+ logger.debug(f"🔕 Skipping notification for {symbol} ({crossover_type}) - cooldown or duplicate")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error monitoring RSI for {symbol}: {e}")
|
|
|
+
|
|
|
+ async def monitor_symbols_for_new_candles(self, symbols: List[str]):
|
|
|
+ """
|
|
|
+ Efficiently monitor multiple symbols for RSI crossovers.
|
|
|
+ Only calculates RSI for symbols that have new candles.
|
|
|
+ This method is optimized for integration with MarketMonitor heartbeat.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ symbols: List of trading symbols to monitor
|
|
|
+ """
|
|
|
+ if not self.enabled:
|
|
|
+ logger.debug("📊 RSI monitoring is disabled")
|
|
|
+ return
|
|
|
+
|
|
|
+ if not symbols:
|
|
|
+ logger.debug("⚠️ No symbols provided for RSI monitoring")
|
|
|
+ return
|
|
|
+
|
|
|
+ # First pass: Check which symbols have new candles (lightweight)
|
|
|
+ symbols_with_new_candles = []
|
|
|
+ for symbol in symbols:
|
|
|
+ try:
|
|
|
+ if await self.has_new_candle(symbol):
|
|
|
+ symbols_with_new_candles.append(symbol)
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"⚠️ Error checking new candle for {symbol}: {e}")
|
|
|
+ continue
|
|
|
+
|
|
|
+ if not symbols_with_new_candles:
|
|
|
+ logger.debug(f"🔄 No new candles detected for any of {len(symbols)} monitored symbols")
|
|
|
+ return
|
|
|
+
|
|
|
+ logger.info(f"🕯️ New candles detected for {len(symbols_with_new_candles)}/{len(symbols)} symbols: {symbols_with_new_candles}")
|
|
|
+
|
|
|
+ # Second pass: Only calculate RSI for symbols with new candles
|
|
|
+ for symbol in symbols_with_new_candles:
|
|
|
+ try:
|
|
|
+ logger.debug(f"🔍 Calculating RSI crossover for {symbol} (new candle)")
|
|
|
+
|
|
|
+ # Check for crossover (this will fetch full candle data for calculation)
|
|
|
+ crossover_info = await self.check_rsi_crossover(symbol)
|
|
|
+
|
|
|
+ if crossover_info:
|
|
|
+ crossover_type = crossover_info['type']
|
|
|
+
|
|
|
+ # Check if we should send notification
|
|
|
+ if await self.should_send_notification(symbol, crossover_type):
|
|
|
+ await self.send_crossover_notification(symbol, crossover_info)
|
|
|
+ else:
|
|
|
+ logger.debug(f"🔕 Skipping notification for {symbol} ({crossover_type}) - cooldown or duplicate")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ Error calculating RSI for {symbol}: {e}")
|
|
|
+ continue
|
|
|
+
|
|
|
+ async def monitor_symbols(self, symbols: List[str]):
|
|
|
+ """
|
|
|
+ Monitor multiple symbols for RSI crossovers (legacy method).
|
|
|
+ For backward compatibility - calls the new efficient method.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ symbols: List of trading symbols to monitor
|
|
|
+ """
|
|
|
+ await self.monitor_symbols_for_new_candles(symbols)
|
|
|
+
|
|
|
+ def get_status(self) -> Dict[str, Any]:
|
|
|
+ """Get current status of RSI monitor."""
|
|
|
+ return {
|
|
|
+ 'enabled': self.enabled,
|
|
|
+ 'timeframe': self.timeframe,
|
|
|
+ 'rsi_period': self.rsi_period,
|
|
|
+ 'rsi_sma_period': self.rsi_sma_period,
|
|
|
+ 'notification_cooldown': self.notification_cooldown,
|
|
|
+ 'monitored_symbols': list(self.last_crossover_state.keys()),
|
|
|
+ 'last_states': dict(self.last_crossover_state),
|
|
|
+ 'tracked_candle_timestamps': dict(self.last_candle_timestamps),
|
|
|
+ 'symbols_with_candle_data': len(self.last_candle_timestamps)
|
|
|
+ }
|