diff --git a/telegram_helper.py b/telegram_helper.py index f54e117..2bdbae6 100644 --- a/telegram_helper.py +++ b/telegram_helper.py @@ -3,7 +3,7 @@ import logging import sys import asyncio import time -from typing import TypedDict, Union, TypeAlias +from typing import TypedDict, Union, TypeAlias, List # Added List for type hint from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes, CallbackQueryHandler from browse_command import browse_command, button_callback @@ -146,70 +146,86 @@ class TelegramHelper: response_text = await self._abort_processing_logic(user_id) await query.edit_message_text(text=response_text) - async def reboot(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - user_message_parts = update.message.text.split() - chat_id_to_write = str(update.effective_chat.id) if update and update.effective_chat else "" - - self._reboot_logic(user_message_parts, chat_id_to_write) # Call to new logic method - - if update: - await update.message.reply_text("Rebooting the bot...") - logging.info("Received reboot command. Exiting process...") - sys.exit(0) # This will be hard to test directly - - # New internal logic method for reboot preparations - def _reboot_logic(self, user_message_parts: list[str], chat_id_to_write: str) -> None: + # --- Reboot Command --- + def _reboot_logic(self, user_message_parts: List[str], chat_id_to_write: str) -> None: + """Handles the logic for creating reboot files.""" if len(user_message_parts) > 1 and user_message_parts[1].lower() == self.CLAUDE_REBOOT_TARGET: try: with open(self.reboot_claude_file, 'w') as f: - f.write("") # Ensure file is created/truncated - logging.info(f"Created Claude reboot file: {self.reboot_claude_file}") + f.write("") # Create/truncate the file + logging.info(f"Created/truncated Claude reboot file: {self.reboot_claude_file}") except IOError as e: - logging.error(f"Failed to create Claude reboot file {self.reboot_claude_file}: {e}") + logging.error(f"Failed to create/truncate Claude reboot file {self.reboot_claude_file}: {e}") + # Create the main reboot file if it doesn't exist if not os.path.exists(self.reboot_file): try: with open(self.reboot_file, 'w') as f: f.write(chat_id_to_write) - logging.info(f"Created main reboot file: {self.reboot_file}") + logging.info(f"Created main reboot file: {self.reboot_file} with chat_id.") except IOError as e: logging.error(f"Failed to create main reboot file {self.reboot_file}: {e}") else: - logging.info(f"Main reboot file {self.reboot_file} already exists.") + logging.info(f"Main reboot file {self.reboot_file} already exists. Not overwriting chat_id.") - async def _check_doreboot_file_logic(self) -> str | None: - """Checks for the reboot file and returns the chat_id if found, then removes the file.""" + async def reboot(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Handles the /reboot command, triggers file creation and exits.""" + user_message_parts = update.message.text.split() + chat_id_str = str(update.effective_chat.id) if update and update.effective_chat else "" + + self._reboot_logic(user_message_parts, chat_id_str) + + if update: + try: + await update.message.reply_text("Rebooting the bot...") + except Exception as e_reply: + logging.error(f"Failed to send reboot reply: {e_reply}") + + logging.info("Initiating shutdown for reboot...") + sys.exit(0) # This part is not directly testable for completion in unit tests + + # --- Check Doreboot File --- + async def _check_doreboot_file_logic(self) -> Union[str, None]: + """Checks for the reboot file, reads chat_id, removes file, and returns chat_id.""" if os.path.exists(self.reboot_file): chat_id = None try: with open(self.reboot_file, 'r') as f: chat_id = f.read().strip() - os.remove(self.reboot_file) - logging.info(f"Removed reboot file: {self.reboot_file}") + # Attempt to remove the file after reading + try: + os.remove(self.reboot_file) + logging.info(f"Successfully read and removed reboot file: {self.reboot_file}") + except OSError as e_remove: + logging.error(f"Failed to remove reboot file {self.reboot_file} after reading: {e_remove}") + # Still return chat_id if read was successful, to attempt notification return chat_id - except IOError as e: - logging.error(f"Error processing reboot file {self.reboot_file}: {e}") - # If we read chat_id but failed to remove, still return chat_id to attempt notification - if chat_id is not None: - return chat_id - return None + except IOError as e_read: + logging.error(f"Error reading reboot file {self.reboot_file}: {e_read}") + # If reading failed, attempt to remove anyway if it exists, to prevent stale files + if os.path.exists(self.reboot_file): + try: + os.remove(self.reboot_file) + logging.warning(f"Removed reboot file {self.reboot_file} after a read error.") + except OSError as e_remove_after_fail: + logging.error(f"Failed to remove reboot file {self.reboot_file} even after a read error: {e_remove_after_fail}") + return None # Reading failed + return None # File does not exist async def check_doreboot_file(self, application: Application) -> None: - """Checks for reboot file and sends notification if applicable.""" + """Checks for reboot file using logic method and sends notification if applicable.""" chat_id = await self._check_doreboot_file_logic() if chat_id: try: await application.bot.send_message(chat_id=chat_id, text="The application has finished initializing.") - logging.info(f"Sent reboot notification to chat_id: {chat_id}") + logging.info(f"Sent reboot initialization notification to chat_id: {chat_id}") except Exception as e: - logging.error(f"Failed to send reboot notification to chat_id {chat_id}: {e}") + logging.error(f"Failed to send reboot initialization notification to chat_id {chat_id}: {e}") async def browse(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - # For testing TelegramHelper, browse_command will be mocked or its effects on self.bot asserted. await browse_command(update, context, self.bot) def run(self): - # Consider allowing injection of a pre-built application for advanced test cases. application = Application.builder().token(self.telegram_bot_token).build() application.add_handler(CommandHandler("start", self.start)) @@ -225,8 +241,6 @@ class TelegramHelper: logging.info("Bot is running...") loop = asyncio.get_event_loop() - # These pragma: no cover comments are hints for test coverage tools to ignore these lines if needed, - # as testing both branches of is_running() can be environment-dependent. if loop.is_running(): # pragma: no cover loop.create_task(self.check_doreboot_file(application)) else: # pragma: no cover