Refactor: Separate logic for handle_message handler in TelegramHelper

This commit is contained in:
cyclop-bot
2025-06-02 16:40:24 -05:00
parent f52cee97cd
commit a78bbb6cc5
+118 -51
View File
@@ -3,59 +3,68 @@ import logging
import sys
import asyncio
import time
from typing import TypedDict, Union, TypeAlias
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes, CallbackQueryHandler
from browse_command import browse_command, button_callback
class MessageHandlerLogicResult(TypedDict):
success: bool
response_text: Union[str, None]
error_message: Union[str, None]
LogicResult: TypeAlias = MessageHandlerLogicResult
class TelegramHelper:
CLAUDE_REBOOT_TARGET = 'claude'
HTML_QUOTE_BLOCK_START = '<blockquote expandable><b>Thinking...</b>'
HTML_QUOTE_BLOCK_END = '</blockquote>'
DEFAULT_REBOOT_CLAUDE_FILE = '.reboot_claude'
DEFAULT_REBOOT_FILE = '.doreboot'
CHUNK_MESSAGE_SLEEP_DURATION = 0.1
def __init__(self, bot, reboot_claude_file_path: str | None = None, reboot_file_path: str | None = None):
def __init__(self, bot,
reboot_claude_file_path: str | None = None,
reboot_file_path: str | None = None,
chunk_message_sleep_duration: float | None = None):
self.bot = bot
self.telegram_bot_token = os.getenv('TELEGRAM_BOT_TOKEN')
self.start_time = time.time()
self.reboot_claude_file = reboot_claude_file_path or self.DEFAULT_REBOOT_CLAUDE_FILE
self.reboot_file = reboot_file_path or self.DEFAULT_REBOOT_FILE
self.chunk_message_sleep_duration = chunk_message_sleep_duration if chunk_message_sleep_duration is not None else self.CHUNK_MESSAGE_SLEEP_DURATION
# --- Start Command ---
async def _start_logic(self) -> str: # New logic method
async def _start_logic(self) -> str:
await self.bot.start()
return "Hello! I'm your AI assistant. How can I help you today?"
async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified
async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
response_message = await self._start_logic()
await update.message.reply_text(response_message)
# --- Clear Command ---
async def _clear_logic(self, user_id: int) -> str: # New logic method
async def _clear_logic(self, user_id: int) -> str:
self.bot.clear_conversation_history(user_id)
return "Conversation history cleared. Let's start fresh!"
async def clear(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified
async def clear(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_id = update.effective_user.id
response_message = await self._clear_logic(user_id)
await update.message.reply_text(response_message)
# --- Status Command ---
async def _status_logic(self) -> str: # New logic method
async def _status_logic(self) -> str:
return await self.bot.get_bot_status()
async def status(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified
async def status(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
response_message = await self._status_logic()
await update.message.reply_text(response_message)
# --- Switch Command ---
async def _switch_logic(self) -> str: # New logic method
async def _switch_logic(self) -> str:
if hasattr(self.bot, 'switch_model'):
return await self.bot.switch_model()
else:
return "Model switching is not supported for this bot."
async def switch(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified
async def switch(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
response_message = await self._switch_logic()
await update.message.reply_text(response_message)
@@ -71,80 +80,136 @@ class TelegramHelper:
reply_markup=reply_markup
)
async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
async def _handle_message_logic(self, user_id: int, user_message: str) -> LogicResult:
try:
response = await self.bot.handle_message(user_id, user_message)
processed_response = response.replace("<think>", self.HTML_QUOTE_BLOCK_START).replace("</think>", self.HTML_QUOTE_BLOCK_END)
return LogicResult(success=True, response_text=processed_response, error_message=None)
except Exception as e:
logging.error(f"Error in _handle_message_logic for user {user_id}: {str(e)}")
return LogicResult(success=False, response_text=None, error_message=str(e))
async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_id = update.effective_user.id
user_message = update.message.text
chat_id = update.effective_chat.id
status_message_obj = None
logging.info(f"Message from user {user_id}: {user_message}")
status_message_obj = await update.message.reply_text("Processing your request...", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Abort", callback_data='abort')]]))
try:
status_message_obj = await update.message.reply_text(
"Processing your request...",
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Abort", callback_data='abort')]])
)
self.bot.set_processing_status(user_id, status_message_obj.message_id)
response = await self.bot.handle_message(user_id, user_message)
logic_result = await self._handle_message_logic(user_id, user_message)
await context.bot.delete_message(chat_id=update.effective_chat.id, message_id=status_message_obj.message_id)
if status_message_obj:
try:
await context.bot.delete_message(chat_id=chat_id, message_id=status_message_obj.message_id)
except Exception as e_del:
logging.warning(f"Failed to delete status message: {e_del}")
self.bot.clear_processing_status(user_id)
response = response.replace("<think>", self.HTML_QUOTE_BLOCK_START).replace("</think>", self.HTML_QUOTE_BLOCK_END)
if len(response) > 4096:
chunks = [response[i:i + 4096] for i in range(0, len(response), 4096)]
if logic_result["success"]:
response_text = logic_result["response_text"]
if response_text:
if len(response_text) > 4096:
chunks = [response_text[i:i + 4096] for i in range(0, len(response_text), 4096)]
for chunk in chunks:
await update.message.reply_text(chunk)
await asyncio.sleep(0.1)
await asyncio.sleep(self.chunk_message_sleep_duration)
else:
await update.message.reply_text(response_text)
else:
logging.warning("Successful logic result but no response text.")
await update.message.reply_text("Something went unexpectedly well, but I have nothing to say.")
else:
await update.message.reply_text(response)
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
await update.message.reply_text("Sorry, an error occurred while processing your request.")
# --- Abort Processing (Callback) ---
async def _abort_processing_logic(self, user_id: int) -> str: # New logic method
except Exception as e:
logging.error(f"Outer error in handle_message for user {user_id}: {str(e)}")
if status_message_obj and self.bot.processing_status.get(user_id):
self.bot.clear_processing_status(user_id)
try:
await update.message.reply_text("Sorry, an unexpected error occurred with the bot.")
except Exception as e_reply:
logging.error(f"Failed to send error reply: {e_reply}")
async def _abort_processing_logic(self, user_id: int) -> str:
return await self.bot.abort_processing(user_id)
async def abort_processing(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified
async def abort_processing(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
query = update.callback_query
await query.answer() # Telegram specific interaction
await query.answer()
user_id = query.from_user.id
response_text = await self._abort_processing_logic(user_id) # Call logic method
await query.edit_message_text(text=response_text) # Telegram specific interaction
response_text = await self._abort_processing_logic(user_id)
await query.edit_message_text(text=response_text)
async def reboot(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_message = update.message.text.split()
if len(user_message) > 1 and user_message[1].lower() == self.CLAUDE_REBOOT_TARGET:
open(self.reboot_claude_file, 'w').close()
user_message_parts = update.message.text.split()
chat_id_to_write = str(update.effective_chat.id) if update and update.effective_chat else ""
self._reboot_logic(user_message_parts, chat_id_to_write) # Call to new logic method
if update:
await update.message.reply_text("Rebooting the bot...")
logging.info("Received reboot command. Exiting process...")
sys.exit(0) # This will be hard to test directly
reboot_f_path = self.reboot_file
if not os.path.exists(reboot_f_path):
with open(reboot_f_path, 'w') as f:
chat_id_to_write = str(update.effective_chat.id) if update and update.effective_chat else ""
# New internal logic method for reboot preparations
def _reboot_logic(self, user_message_parts: list[str], chat_id_to_write: str) -> None:
if len(user_message_parts) > 1 and user_message_parts[1].lower() == self.CLAUDE_REBOOT_TARGET:
try:
with open(self.reboot_claude_file, 'w') as f:
f.write("") # Ensure file is created/truncated
logging.info(f"Created Claude reboot file: {self.reboot_claude_file}")
except IOError as e:
logging.error(f"Failed to create Claude reboot file {self.reboot_claude_file}: {e}")
if not os.path.exists(self.reboot_file):
try:
with open(self.reboot_file, 'w') as f:
f.write(chat_id_to_write)
sys.exit(0)
logging.info(f"Created main reboot file: {self.reboot_file}")
except IOError as e:
logging.error(f"Failed to create main reboot file {self.reboot_file}: {e}")
else:
logging.info(f"Main reboot file {self.reboot_file} already exists.")
async def check_doreboot_file(self, application: Application):
reboot_f_path = self.reboot_file
if os.path.exists(reboot_f_path):
with open(reboot_f_path, 'r') as f:
async def _check_doreboot_file_logic(self) -> str | None:
"""Checks for the reboot file and returns the chat_id if found, then removes the file."""
if os.path.exists(self.reboot_file):
chat_id = None
try:
with open(self.reboot_file, 'r') as f:
chat_id = f.read().strip()
os.remove(self.reboot_file)
logging.info(f"Removed reboot file: {self.reboot_file}")
return chat_id
except IOError as e:
logging.error(f"Error processing reboot file {self.reboot_file}: {e}")
# If we read chat_id but failed to remove, still return chat_id to attempt notification
if chat_id is not None:
return chat_id
return None
async def check_doreboot_file(self, application: Application) -> None:
"""Checks for reboot file and sends notification if applicable."""
chat_id = await self._check_doreboot_file_logic()
if chat_id:
try:
await application.bot.send_message(chat_id=chat_id, text="The application has finished initializing.")
logging.info(f"Sent reboot notification to chat_id: {chat_id}")
except Exception as e:
logging.error(f"Failed to send reboot notification to chat_id {chat_id}: {e}")
os.remove(reboot_f_path)
async def browse(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
# For testing TelegramHelper, browse_command will be mocked or its effects on self.bot asserted.
await browse_command(update, context, self.bot)
def run(self):
# Consider allowing injection of a pre-built application for advanced test cases.
application = Application.builder().token(self.telegram_bot_token).build()
application.add_handler(CommandHandler("start", self.start))
@@ -160,9 +225,11 @@ class TelegramHelper:
logging.info("Bot is running...")
loop = asyncio.get_event_loop()
if loop.is_running():
# These pragma: no cover comments are hints for test coverage tools to ignore these lines if needed,
# as testing both branches of is_running() can be environment-dependent.
if loop.is_running(): # pragma: no cover
loop.create_task(self.check_doreboot_file(application))
else:
else: # pragma: no cover
asyncio.run(self.check_doreboot_file(application))
application.run_polling()