Kaynağa Gözat

Refactor authorization checks in management commands for improved security

- Updated the _is_authorized method in InfoCommandsBase to check authorization based on effective_chat instead of effective_user.
- Refactored management commands to utilize the new authorization logic, enhancing code consistency and security.
- Simplified unauthorized access responses by using a common reply method across commands.
Carles Sentis 1 hafta önce
ebeveyn
işleme
6c1b0e76f7
3 değiştirilmiş dosya ile 44 ekleme ve 30 silme
  1. 10 2
      src/commands/info/base.py
  2. 33 27
      src/commands/management_commands.py
  3. 1 1
      trading_bot.py

+ 10 - 2
src/commands/info/base.py

@@ -16,9 +16,17 @@ class InfoCommandsBase:
 
     def _is_authorized(self, update: Update) -> bool:
         """Check if the user is authorized to use the command."""
-        if not update or not update.effective_user:
+        if not update or not update.effective_chat:
             return False
-        return True  # Add your authorization logic here
+        
+        from src.config.config import Config
+        chat_id = update.effective_chat.id
+        authorized = str(chat_id) == str(Config.TELEGRAM_CHAT_ID)
+        
+        if not authorized:
+            logger.warning(f"Unauthorized access attempt by chat_id: {chat_id}")
+        
+        return authorized
 
     async def _reply(self, update: Update, text: str, **kwargs) -> Optional[Message]:
         """Common reply method for all commands."""

+ 33 - 27
src/commands/management_commands.py

@@ -43,17 +43,16 @@ class ManagementCommands(InfoCommandsBase):
         self.monitoring_coordinator = monitoring_coordinator
         self.alarm_manager = AlarmManager()
     
-    def _is_authorized(self, chat_id: str) -> bool:
-        """Check if the chat ID is authorized."""
-        return str(chat_id) == str(Config.TELEGRAM_CHAT_ID)
+
     
     async def monitoring_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
         """Handle the /monitoring command."""
-        chat_id = update.effective_chat.id
-        if not self._is_authorized(chat_id):
-            await context.bot.send_message(chat_id=chat_id, text="❌ Unauthorized access.")
+        if not self._is_authorized(update):
+            await self._reply(update, "❌ Unauthorized access.")
             return
         
+        chat_id = update.effective_chat.id
+        
         # Get alarm statistics
         alarm_stats = self.alarm_manager.get_statistics()
         
@@ -131,11 +130,12 @@ class ManagementCommands(InfoCommandsBase):
     
     async def alarm_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
         """Handle the /alarm command."""
-        chat_id = update.effective_chat.id
-        if not self._is_authorized(chat_id):
-            await context.bot.send_message(chat_id=chat_id, text="❌ Unauthorized access.")
+        if not self._is_authorized(update):
+            await self._reply(update, "❌ Unauthorized access.")
             return
         
+        chat_id = update.effective_chat.id
+        
         try:
             if not context.args or len(context.args) == 0:
                 # No arguments - list all alarms
@@ -238,11 +238,12 @@ Will trigger when {token} price moves {alarm['direction']} {target_price_str}
     
     async def logs_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
         """Handle the /logs command."""
-        chat_id = update.effective_chat.id
-        if not self._is_authorized(chat_id):
-            await context.bot.send_message(chat_id=chat_id, text="❌ Unauthorized access.")
+        if not self._is_authorized(update):
+            await self._reply(update, "❌ Unauthorized access.")
             return
         
+        chat_id = update.effective_chat.id
+        
         try:
             logs_dir = "logs"
             
@@ -358,11 +359,12 @@ Will trigger when {token} price moves {alarm['direction']} {target_price_str}
     
     async def debug_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
         """Handle the /debug command."""
-        chat_id = update.effective_chat.id
-        if not self._is_authorized(chat_id):
-            await context.bot.send_message(chat_id=chat_id, text="❌ Unauthorized access.")
+        if not self._is_authorized(update):
+            await self._reply(update, "❌ Unauthorized access.")
             return
         
+        chat_id = update.effective_chat.id
+        
         try:
             # Get monitoring status
             monitoring_status = await self.monitoring_coordinator.get_monitoring_status()
@@ -431,11 +433,12 @@ Will trigger when {token} price moves {alarm['direction']} {target_price_str}
     
     async def version_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
         """Handle the /version command."""
-        chat_id = update.effective_chat.id
-        if not self._is_authorized(chat_id):
-            await context.bot.send_message(chat_id=chat_id, text="❌ Unauthorized access.")
+        if not self._is_authorized(update):
+            await self._reply(update, "❌ Unauthorized access.")
             return
         
+        chat_id = update.effective_chat.id
+        
         try:
             # Get monitoring status
             monitoring_status = await self.monitoring_coordinator.get_monitoring_status()
@@ -498,10 +501,11 @@ Will trigger when {token} price moves {alarm['direction']} {target_price_str}
     
     async def keyboard_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
         """Handle the /keyboard command to show the main keyboard."""
-        chat_id = update.effective_chat.id
-        if not self._is_authorized(chat_id):
-            await context.bot.send_message(chat_id=chat_id, text="❌ Unauthorized access.")
+        if not self._is_authorized(update):
+            await self._reply(update, "❌ Unauthorized access.")
             return
+        
+        chat_id = update.effective_chat.id
 
         # Define default keyboard layout
         default_keyboard = [
@@ -625,10 +629,11 @@ Will trigger when {token} price moves {alarm['direction']} {target_price_str}
 
     async def deposit_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
         """Handle the /deposit command to record a deposit."""
-        chat_id = update.effective_chat.id
-        if not self._is_authorized(chat_id):
-            await context.bot.send_message(chat_id=chat_id, text="❌ Unauthorized access.")
+        if not self._is_authorized(update):
+            await self._reply(update, "❌ Unauthorized access.")
             return
+        
+        chat_id = update.effective_chat.id
 
         try:
             # Parse arguments
@@ -682,10 +687,11 @@ Will trigger when {token} price moves {alarm['direction']} {target_price_str}
 
     async def withdrawal_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
         """Handle the /withdrawal command to record a withdrawal."""
-        chat_id = update.effective_chat.id
-        if not self._is_authorized(chat_id):
-            await context.bot.send_message(chat_id=chat_id, text="❌ Unauthorized access.")
+        if not self._is_authorized(update):
+            await self._reply(update, "❌ Unauthorized access.")
             return
+        
+        chat_id = update.effective_chat.id
 
         try:
             # Parse arguments

+ 1 - 1
trading_bot.py

@@ -14,7 +14,7 @@ from datetime import datetime
 from pathlib import Path
 
 # Bot version
-BOT_VERSION = "2.6.310"
+BOT_VERSION = "2.6.311"
 
 # Add src directory to Python path
 sys.path.insert(0, str(Path(__file__).parent / "src"))