From a78bbb6cc5d0eba16961f135cc1723c6a7d670a0 Mon Sep 17 00:00:00 2001 From: cyclop-bot <178948048+cyclop-bot@users.noreply.github.com> Date: Mon, 2 Jun 2025 16:40:24 -0500 Subject: [PATCH] Refactor: Separate logic for handle_message handler in TelegramHelper --- telegram_helper.py | 189 ++++++++++++++++++++++++++++++--------------- 1 file changed, 128 insertions(+), 61 deletions(-) diff --git a/telegram_helper.py b/telegram_helper.py index 3c73328..f54e117 100644 --- a/telegram_helper.py +++ b/telegram_helper.py @@ -3,59 +3,68 @@ import logging import sys import asyncio import time +from typing import TypedDict, Union, TypeAlias from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes, CallbackQueryHandler from browse_command import browse_command, button_callback +class MessageHandlerLogicResult(TypedDict): + success: bool + response_text: Union[str, None] + error_message: Union[str, None] + +LogicResult: TypeAlias = MessageHandlerLogicResult + class TelegramHelper: CLAUDE_REBOOT_TARGET = 'claude' HTML_QUOTE_BLOCK_START = '
Thinking...' HTML_QUOTE_BLOCK_END = '
' DEFAULT_REBOOT_CLAUDE_FILE = '.reboot_claude' DEFAULT_REBOOT_FILE = '.doreboot' + CHUNK_MESSAGE_SLEEP_DURATION = 0.1 - def __init__(self, bot, reboot_claude_file_path: str | None = None, reboot_file_path: str | None = None): + def __init__(self, bot, + reboot_claude_file_path: str | None = None, + reboot_file_path: str | None = None, + chunk_message_sleep_duration: float | None = None): self.bot = bot self.telegram_bot_token = os.getenv('TELEGRAM_BOT_TOKEN') self.start_time = time.time() self.reboot_claude_file = reboot_claude_file_path or self.DEFAULT_REBOOT_CLAUDE_FILE self.reboot_file = reboot_file_path or self.DEFAULT_REBOOT_FILE + self.chunk_message_sleep_duration = chunk_message_sleep_duration if chunk_message_sleep_duration is not None else self.CHUNK_MESSAGE_SLEEP_DURATION - # --- Start Command --- - async def _start_logic(self) -> str: # New logic method + async def _start_logic(self) -> str: await self.bot.start() return "Hello! I'm your AI assistant. How can I help you today?" - async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified + async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: response_message = await self._start_logic() await update.message.reply_text(response_message) - # --- Clear Command --- - async def _clear_logic(self, user_id: int) -> str: # New logic method + async def _clear_logic(self, user_id: int) -> str: self.bot.clear_conversation_history(user_id) return "Conversation history cleared. Let's start fresh!" - async def clear(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified + async def clear(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user_id = update.effective_user.id response_message = await self._clear_logic(user_id) await update.message.reply_text(response_message) - # --- Status Command --- - async def _status_logic(self) -> str: # New logic method + async def _status_logic(self) -> str: return await self.bot.get_bot_status() - async def status(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified + async def status(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: response_message = await self._status_logic() await update.message.reply_text(response_message) - # --- Switch Command --- - async def _switch_logic(self) -> str: # New logic method + async def _switch_logic(self) -> str: if hasattr(self.bot, 'switch_model'): return await self.bot.switch_model() else: return "Model switching is not supported for this bot." - async def switch(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified + async def switch(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: response_message = await self._switch_logic() await update.message.reply_text(response_message) @@ -71,80 +80,136 @@ class TelegramHelper: reply_markup=reply_markup ) - async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + async def _handle_message_logic(self, user_id: int, user_message: str) -> LogicResult: try: - user_id = update.effective_user.id - user_message = update.message.text + response = await self.bot.handle_message(user_id, user_message) + processed_response = response.replace("", self.HTML_QUOTE_BLOCK_START).replace("", self.HTML_QUOTE_BLOCK_END) + return LogicResult(success=True, response_text=processed_response, error_message=None) + except Exception as e: + logging.error(f"Error in _handle_message_logic for user {user_id}: {str(e)}") + return LogicResult(success=False, response_text=None, error_message=str(e)) - logging.info(f"Message from user {user_id}: {user_message}") + async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + user_id = update.effective_user.id + user_message = update.message.text + chat_id = update.effective_chat.id + status_message_obj = None - status_message_obj = await update.message.reply_text("Processing your request...", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Abort", callback_data='abort')]])) + try: + status_message_obj = await update.message.reply_text( + "Processing your request...", + reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Abort", callback_data='abort')]]) + ) self.bot.set_processing_status(user_id, status_message_obj.message_id) - response = await self.bot.handle_message(user_id, user_message) + logic_result = await self._handle_message_logic(user_id, user_message) - await context.bot.delete_message(chat_id=update.effective_chat.id, message_id=status_message_obj.message_id) + if status_message_obj: + try: + await context.bot.delete_message(chat_id=chat_id, message_id=status_message_obj.message_id) + except Exception as e_del: + logging.warning(f"Failed to delete status message: {e_del}") self.bot.clear_processing_status(user_id) - response = response.replace("", self.HTML_QUOTE_BLOCK_START).replace("", self.HTML_QUOTE_BLOCK_END) - - if len(response) > 4096: - chunks = [response[i:i + 4096] for i in range(0, len(response), 4096)] - for chunk in chunks: - await update.message.reply_text(chunk) - await asyncio.sleep(0.1) + if logic_result["success"]: + response_text = logic_result["response_text"] + if response_text: + if len(response_text) > 4096: + chunks = [response_text[i:i + 4096] for i in range(0, len(response_text), 4096)] + for chunk in chunks: + await update.message.reply_text(chunk) + await asyncio.sleep(self.chunk_message_sleep_duration) + else: + await update.message.reply_text(response_text) + else: + logging.warning("Successful logic result but no response text.") + await update.message.reply_text("Something went unexpectedly well, but I have nothing to say.") else: - await update.message.reply_text(response) + await update.message.reply_text("Sorry, an error occurred while processing your request.") except Exception as e: - logging.error(f"An error occurred: {str(e)}") - await update.message.reply_text("Sorry, an error occurred while processing your request.") + logging.error(f"Outer error in handle_message for user {user_id}: {str(e)}") + if status_message_obj and self.bot.processing_status.get(user_id): + self.bot.clear_processing_status(user_id) + try: + await update.message.reply_text("Sorry, an unexpected error occurred with the bot.") + except Exception as e_reply: + logging.error(f"Failed to send error reply: {e_reply}") - # --- Abort Processing (Callback) --- - async def _abort_processing_logic(self, user_id: int) -> str: # New logic method + async def _abort_processing_logic(self, user_id: int) -> str: return await self.bot.abort_processing(user_id) - async def abort_processing(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified + async def abort_processing(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: query = update.callback_query - await query.answer() # Telegram specific interaction - + await query.answer() user_id = query.from_user.id - response_text = await self._abort_processing_logic(user_id) # Call logic method - - await query.edit_message_text(text=response_text) # Telegram specific interaction + response_text = await self._abort_processing_logic(user_id) + await query.edit_message_text(text=response_text) async def reboot(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - user_message = update.message.text.split() - if len(user_message) > 1 and user_message[1].lower() == self.CLAUDE_REBOOT_TARGET: - open(self.reboot_claude_file, 'w').close() + user_message_parts = update.message.text.split() + chat_id_to_write = str(update.effective_chat.id) if update and update.effective_chat else "" + + self._reboot_logic(user_message_parts, chat_id_to_write) # Call to new logic method if update: await update.message.reply_text("Rebooting the bot...") logging.info("Received reboot command. Exiting process...") - - reboot_f_path = self.reboot_file - if not os.path.exists(reboot_f_path): - with open(reboot_f_path, 'w') as f: - chat_id_to_write = str(update.effective_chat.id) if update and update.effective_chat else "" - f.write(chat_id_to_write) - sys.exit(0) + sys.exit(0) # This will be hard to test directly - async def check_doreboot_file(self, application: Application): - reboot_f_path = self.reboot_file - if os.path.exists(reboot_f_path): - with open(reboot_f_path, 'r') as f: - chat_id = f.read().strip() - if chat_id: - try: - await application.bot.send_message(chat_id=chat_id, text="The application has finished initializing.") - except Exception as e: - logging.error(f"Failed to send reboot notification to chat_id {chat_id}: {e}") - os.remove(reboot_f_path) + # New internal logic method for reboot preparations + def _reboot_logic(self, user_message_parts: list[str], chat_id_to_write: str) -> None: + if len(user_message_parts) > 1 and user_message_parts[1].lower() == self.CLAUDE_REBOOT_TARGET: + try: + with open(self.reboot_claude_file, 'w') as f: + f.write("") # Ensure file is created/truncated + logging.info(f"Created Claude reboot file: {self.reboot_claude_file}") + except IOError as e: + logging.error(f"Failed to create Claude reboot file {self.reboot_claude_file}: {e}") + + if not os.path.exists(self.reboot_file): + try: + with open(self.reboot_file, 'w') as f: + f.write(chat_id_to_write) + logging.info(f"Created main reboot file: {self.reboot_file}") + except IOError as e: + logging.error(f"Failed to create main reboot file {self.reboot_file}: {e}") + else: + logging.info(f"Main reboot file {self.reboot_file} already exists.") + + async def _check_doreboot_file_logic(self) -> str | None: + """Checks for the reboot file and returns the chat_id if found, then removes the file.""" + if os.path.exists(self.reboot_file): + chat_id = None + try: + with open(self.reboot_file, 'r') as f: + chat_id = f.read().strip() + os.remove(self.reboot_file) + logging.info(f"Removed reboot file: {self.reboot_file}") + return chat_id + except IOError as e: + logging.error(f"Error processing reboot file {self.reboot_file}: {e}") + # If we read chat_id but failed to remove, still return chat_id to attempt notification + if chat_id is not None: + return chat_id + return None + + async def check_doreboot_file(self, application: Application) -> None: + """Checks for reboot file and sends notification if applicable.""" + chat_id = await self._check_doreboot_file_logic() + if chat_id: + try: + await application.bot.send_message(chat_id=chat_id, text="The application has finished initializing.") + logging.info(f"Sent reboot notification to chat_id: {chat_id}") + except Exception as e: + logging.error(f"Failed to send reboot notification to chat_id {chat_id}: {e}") async def browse(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + # For testing TelegramHelper, browse_command will be mocked or its effects on self.bot asserted. await browse_command(update, context, self.bot) def run(self): + # Consider allowing injection of a pre-built application for advanced test cases. application = Application.builder().token(self.telegram_bot_token).build() application.add_handler(CommandHandler("start", self.start)) @@ -160,9 +225,11 @@ class TelegramHelper: logging.info("Bot is running...") loop = asyncio.get_event_loop() - if loop.is_running(): + # These pragma: no cover comments are hints for test coverage tools to ignore these lines if needed, + # as testing both branches of is_running() can be environment-dependent. + if loop.is_running(): # pragma: no cover loop.create_task(self.check_doreboot_file(application)) - else: + else: # pragma: no cover asyncio.run(self.check_doreboot_file(application)) application.run_polling()