diff --git a/telegram_helper.py b/telegram_helper.py
index 3c73328..f54e117 100644
--- a/telegram_helper.py
+++ b/telegram_helper.py
@@ -3,59 +3,68 @@ import logging
import sys
import asyncio
import time
+from typing import TypedDict, Union, TypeAlias
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes, CallbackQueryHandler
from browse_command import browse_command, button_callback
+class MessageHandlerLogicResult(TypedDict):
+ success: bool
+ response_text: Union[str, None]
+ error_message: Union[str, None]
+
+LogicResult: TypeAlias = MessageHandlerLogicResult
+
class TelegramHelper:
CLAUDE_REBOOT_TARGET = 'claude'
HTML_QUOTE_BLOCK_START = '
Thinking...'
HTML_QUOTE_BLOCK_END = '
'
DEFAULT_REBOOT_CLAUDE_FILE = '.reboot_claude'
DEFAULT_REBOOT_FILE = '.doreboot'
+ CHUNK_MESSAGE_SLEEP_DURATION = 0.1
- def __init__(self, bot, reboot_claude_file_path: str | None = None, reboot_file_path: str | None = None):
+ def __init__(self, bot,
+ reboot_claude_file_path: str | None = None,
+ reboot_file_path: str | None = None,
+ chunk_message_sleep_duration: float | None = None):
self.bot = bot
self.telegram_bot_token = os.getenv('TELEGRAM_BOT_TOKEN')
self.start_time = time.time()
self.reboot_claude_file = reboot_claude_file_path or self.DEFAULT_REBOOT_CLAUDE_FILE
self.reboot_file = reboot_file_path or self.DEFAULT_REBOOT_FILE
+ self.chunk_message_sleep_duration = chunk_message_sleep_duration if chunk_message_sleep_duration is not None else self.CHUNK_MESSAGE_SLEEP_DURATION
- # --- Start Command ---
- async def _start_logic(self) -> str: # New logic method
+ async def _start_logic(self) -> str:
await self.bot.start()
return "Hello! I'm your AI assistant. How can I help you today?"
- async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified
+ async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
response_message = await self._start_logic()
await update.message.reply_text(response_message)
- # --- Clear Command ---
- async def _clear_logic(self, user_id: int) -> str: # New logic method
+ async def _clear_logic(self, user_id: int) -> str:
self.bot.clear_conversation_history(user_id)
return "Conversation history cleared. Let's start fresh!"
- async def clear(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified
+ async def clear(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_id = update.effective_user.id
response_message = await self._clear_logic(user_id)
await update.message.reply_text(response_message)
- # --- Status Command ---
- async def _status_logic(self) -> str: # New logic method
+ async def _status_logic(self) -> str:
return await self.bot.get_bot_status()
- async def status(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified
+ async def status(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
response_message = await self._status_logic()
await update.message.reply_text(response_message)
- # --- Switch Command ---
- async def _switch_logic(self) -> str: # New logic method
+ async def _switch_logic(self) -> str:
if hasattr(self.bot, 'switch_model'):
return await self.bot.switch_model()
else:
return "Model switching is not supported for this bot."
- async def switch(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified
+ async def switch(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
response_message = await self._switch_logic()
await update.message.reply_text(response_message)
@@ -71,80 +80,136 @@ class TelegramHelper:
reply_markup=reply_markup
)
- async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ async def _handle_message_logic(self, user_id: int, user_message: str) -> LogicResult:
try:
- user_id = update.effective_user.id
- user_message = update.message.text
+ response = await self.bot.handle_message(user_id, user_message)
+ processed_response = response.replace("", self.HTML_QUOTE_BLOCK_START).replace("", self.HTML_QUOTE_BLOCK_END)
+ return LogicResult(success=True, response_text=processed_response, error_message=None)
+ except Exception as e:
+ logging.error(f"Error in _handle_message_logic for user {user_id}: {str(e)}")
+ return LogicResult(success=False, response_text=None, error_message=str(e))
- logging.info(f"Message from user {user_id}: {user_message}")
+ async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ user_id = update.effective_user.id
+ user_message = update.message.text
+ chat_id = update.effective_chat.id
+ status_message_obj = None
- status_message_obj = await update.message.reply_text("Processing your request...", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Abort", callback_data='abort')]]))
+ try:
+ status_message_obj = await update.message.reply_text(
+ "Processing your request...",
+ reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Abort", callback_data='abort')]])
+ )
self.bot.set_processing_status(user_id, status_message_obj.message_id)
- response = await self.bot.handle_message(user_id, user_message)
+ logic_result = await self._handle_message_logic(user_id, user_message)
- await context.bot.delete_message(chat_id=update.effective_chat.id, message_id=status_message_obj.message_id)
+ if status_message_obj:
+ try:
+ await context.bot.delete_message(chat_id=chat_id, message_id=status_message_obj.message_id)
+ except Exception as e_del:
+ logging.warning(f"Failed to delete status message: {e_del}")
self.bot.clear_processing_status(user_id)
- response = response.replace("", self.HTML_QUOTE_BLOCK_START).replace("", self.HTML_QUOTE_BLOCK_END)
-
- if len(response) > 4096:
- chunks = [response[i:i + 4096] for i in range(0, len(response), 4096)]
- for chunk in chunks:
- await update.message.reply_text(chunk)
- await asyncio.sleep(0.1)
+ if logic_result["success"]:
+ response_text = logic_result["response_text"]
+ if response_text:
+ if len(response_text) > 4096:
+ chunks = [response_text[i:i + 4096] for i in range(0, len(response_text), 4096)]
+ for chunk in chunks:
+ await update.message.reply_text(chunk)
+ await asyncio.sleep(self.chunk_message_sleep_duration)
+ else:
+ await update.message.reply_text(response_text)
+ else:
+ logging.warning("Successful logic result but no response text.")
+ await update.message.reply_text("Something went unexpectedly well, but I have nothing to say.")
else:
- await update.message.reply_text(response)
+ await update.message.reply_text("Sorry, an error occurred while processing your request.")
except Exception as e:
- logging.error(f"An error occurred: {str(e)}")
- await update.message.reply_text("Sorry, an error occurred while processing your request.")
+ logging.error(f"Outer error in handle_message for user {user_id}: {str(e)}")
+ if status_message_obj and self.bot.processing_status.get(user_id):
+ self.bot.clear_processing_status(user_id)
+ try:
+ await update.message.reply_text("Sorry, an unexpected error occurred with the bot.")
+ except Exception as e_reply:
+ logging.error(f"Failed to send error reply: {e_reply}")
- # --- Abort Processing (Callback) ---
- async def _abort_processing_logic(self, user_id: int) -> str: # New logic method
+ async def _abort_processing_logic(self, user_id: int) -> str:
return await self.bot.abort_processing(user_id)
- async def abort_processing(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified
+ async def abort_processing(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
query = update.callback_query
- await query.answer() # Telegram specific interaction
-
+ await query.answer()
user_id = query.from_user.id
- response_text = await self._abort_processing_logic(user_id) # Call logic method
-
- await query.edit_message_text(text=response_text) # Telegram specific interaction
+ response_text = await self._abort_processing_logic(user_id)
+ await query.edit_message_text(text=response_text)
async def reboot(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
- user_message = update.message.text.split()
- if len(user_message) > 1 and user_message[1].lower() == self.CLAUDE_REBOOT_TARGET:
- open(self.reboot_claude_file, 'w').close()
+ user_message_parts = update.message.text.split()
+ chat_id_to_write = str(update.effective_chat.id) if update and update.effective_chat else ""
+
+ self._reboot_logic(user_message_parts, chat_id_to_write) # Call to new logic method
if update:
await update.message.reply_text("Rebooting the bot...")
logging.info("Received reboot command. Exiting process...")
-
- reboot_f_path = self.reboot_file
- if not os.path.exists(reboot_f_path):
- with open(reboot_f_path, 'w') as f:
- chat_id_to_write = str(update.effective_chat.id) if update and update.effective_chat else ""
- f.write(chat_id_to_write)
- sys.exit(0)
+ sys.exit(0) # This will be hard to test directly
- async def check_doreboot_file(self, application: Application):
- reboot_f_path = self.reboot_file
- if os.path.exists(reboot_f_path):
- with open(reboot_f_path, 'r') as f:
- chat_id = f.read().strip()
- if chat_id:
- try:
- await application.bot.send_message(chat_id=chat_id, text="The application has finished initializing.")
- except Exception as e:
- logging.error(f"Failed to send reboot notification to chat_id {chat_id}: {e}")
- os.remove(reboot_f_path)
+ # New internal logic method for reboot preparations
+ def _reboot_logic(self, user_message_parts: list[str], chat_id_to_write: str) -> None:
+ if len(user_message_parts) > 1 and user_message_parts[1].lower() == self.CLAUDE_REBOOT_TARGET:
+ try:
+ with open(self.reboot_claude_file, 'w') as f:
+ f.write("") # Ensure file is created/truncated
+ logging.info(f"Created Claude reboot file: {self.reboot_claude_file}")
+ except IOError as e:
+ logging.error(f"Failed to create Claude reboot file {self.reboot_claude_file}: {e}")
+
+ if not os.path.exists(self.reboot_file):
+ try:
+ with open(self.reboot_file, 'w') as f:
+ f.write(chat_id_to_write)
+ logging.info(f"Created main reboot file: {self.reboot_file}")
+ except IOError as e:
+ logging.error(f"Failed to create main reboot file {self.reboot_file}: {e}")
+ else:
+ logging.info(f"Main reboot file {self.reboot_file} already exists.")
+
+ async def _check_doreboot_file_logic(self) -> str | None:
+ """Checks for the reboot file and returns the chat_id if found, then removes the file."""
+ if os.path.exists(self.reboot_file):
+ chat_id = None
+ try:
+ with open(self.reboot_file, 'r') as f:
+ chat_id = f.read().strip()
+ os.remove(self.reboot_file)
+ logging.info(f"Removed reboot file: {self.reboot_file}")
+ return chat_id
+ except IOError as e:
+ logging.error(f"Error processing reboot file {self.reboot_file}: {e}")
+ # If we read chat_id but failed to remove, still return chat_id to attempt notification
+ if chat_id is not None:
+ return chat_id
+ return None
+
+ async def check_doreboot_file(self, application: Application) -> None:
+ """Checks for reboot file and sends notification if applicable."""
+ chat_id = await self._check_doreboot_file_logic()
+ if chat_id:
+ try:
+ await application.bot.send_message(chat_id=chat_id, text="The application has finished initializing.")
+ logging.info(f"Sent reboot notification to chat_id: {chat_id}")
+ except Exception as e:
+ logging.error(f"Failed to send reboot notification to chat_id {chat_id}: {e}")
async def browse(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
+ # For testing TelegramHelper, browse_command will be mocked or its effects on self.bot asserted.
await browse_command(update, context, self.bot)
def run(self):
+ # Consider allowing injection of a pre-built application for advanced test cases.
application = Application.builder().token(self.telegram_bot_token).build()
application.add_handler(CommandHandler("start", self.start))
@@ -160,9 +225,11 @@ class TelegramHelper:
logging.info("Bot is running...")
loop = asyncio.get_event_loop()
- if loop.is_running():
+ # These pragma: no cover comments are hints for test coverage tools to ignore these lines if needed,
+ # as testing both branches of is_running() can be environment-dependent.
+ if loop.is_running(): # pragma: no cover
loop.create_task(self.check_doreboot_file(application))
- else:
+ else: # pragma: no cover
asyncio.run(self.check_doreboot_file(application))
application.run_polling()