Refactor: Separate logic for reboot and check_doreboot_file in TelegramHelper

This commit is contained in:
cyclop-bot
2025-06-02 16:41:02 -05:00
parent a78bbb6cc5
commit 0c1a0d1e5b
+50 -36
View File
@@ -3,7 +3,7 @@ import logging
import sys import sys
import asyncio import asyncio
import time import time
from typing import TypedDict, Union, TypeAlias from typing import TypedDict, Union, TypeAlias, List # Added List for type hint
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes, CallbackQueryHandler from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes, CallbackQueryHandler
from browse_command import browse_command, button_callback from browse_command import browse_command, button_callback
@@ -146,70 +146,86 @@ class TelegramHelper:
response_text = await self._abort_processing_logic(user_id) response_text = await self._abort_processing_logic(user_id)
await query.edit_message_text(text=response_text) await query.edit_message_text(text=response_text)
async def reboot(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # --- Reboot Command ---
user_message_parts = update.message.text.split() def _reboot_logic(self, user_message_parts: List[str], chat_id_to_write: str) -> None:
chat_id_to_write = str(update.effective_chat.id) if update and update.effective_chat else "" """Handles the logic for creating reboot files."""
self._reboot_logic(user_message_parts, chat_id_to_write) # Call to new logic method
if update:
await update.message.reply_text("Rebooting the bot...")
logging.info("Received reboot command. Exiting process...")
sys.exit(0) # This will be hard to test directly
# New internal logic method for reboot preparations
def _reboot_logic(self, user_message_parts: list[str], chat_id_to_write: str) -> None:
if len(user_message_parts) > 1 and user_message_parts[1].lower() == self.CLAUDE_REBOOT_TARGET: if len(user_message_parts) > 1 and user_message_parts[1].lower() == self.CLAUDE_REBOOT_TARGET:
try: try:
with open(self.reboot_claude_file, 'w') as f: with open(self.reboot_claude_file, 'w') as f:
f.write("") # Ensure file is created/truncated f.write("") # Create/truncate the file
logging.info(f"Created Claude reboot file: {self.reboot_claude_file}") logging.info(f"Created/truncated Claude reboot file: {self.reboot_claude_file}")
except IOError as e: except IOError as e:
logging.error(f"Failed to create Claude reboot file {self.reboot_claude_file}: {e}") logging.error(f"Failed to create/truncate Claude reboot file {self.reboot_claude_file}: {e}")
# Create the main reboot file if it doesn't exist
if not os.path.exists(self.reboot_file): if not os.path.exists(self.reboot_file):
try: try:
with open(self.reboot_file, 'w') as f: with open(self.reboot_file, 'w') as f:
f.write(chat_id_to_write) f.write(chat_id_to_write)
logging.info(f"Created main reboot file: {self.reboot_file}") logging.info(f"Created main reboot file: {self.reboot_file} with chat_id.")
except IOError as e: except IOError as e:
logging.error(f"Failed to create main reboot file {self.reboot_file}: {e}") logging.error(f"Failed to create main reboot file {self.reboot_file}: {e}")
else: else:
logging.info(f"Main reboot file {self.reboot_file} already exists.") logging.info(f"Main reboot file {self.reboot_file} already exists. Not overwriting chat_id.")
async def _check_doreboot_file_logic(self) -> str | None: async def reboot(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Checks for the reboot file and returns the chat_id if found, then removes the file.""" """Handles the /reboot command, triggers file creation and exits."""
user_message_parts = update.message.text.split()
chat_id_str = str(update.effective_chat.id) if update and update.effective_chat else ""
self._reboot_logic(user_message_parts, chat_id_str)
if update:
try:
await update.message.reply_text("Rebooting the bot...")
except Exception as e_reply:
logging.error(f"Failed to send reboot reply: {e_reply}")
logging.info("Initiating shutdown for reboot...")
sys.exit(0) # This part is not directly testable for completion in unit tests
# --- Check Doreboot File ---
async def _check_doreboot_file_logic(self) -> Union[str, None]:
"""Checks for the reboot file, reads chat_id, removes file, and returns chat_id."""
if os.path.exists(self.reboot_file): if os.path.exists(self.reboot_file):
chat_id = None chat_id = None
try: try:
with open(self.reboot_file, 'r') as f: with open(self.reboot_file, 'r') as f:
chat_id = f.read().strip() chat_id = f.read().strip()
os.remove(self.reboot_file) # Attempt to remove the file after reading
logging.info(f"Removed reboot file: {self.reboot_file}") try:
os.remove(self.reboot_file)
logging.info(f"Successfully read and removed reboot file: {self.reboot_file}")
except OSError as e_remove:
logging.error(f"Failed to remove reboot file {self.reboot_file} after reading: {e_remove}")
# Still return chat_id if read was successful, to attempt notification
return chat_id return chat_id
except IOError as e: except IOError as e_read:
logging.error(f"Error processing reboot file {self.reboot_file}: {e}") logging.error(f"Error reading reboot file {self.reboot_file}: {e_read}")
# If we read chat_id but failed to remove, still return chat_id to attempt notification # If reading failed, attempt to remove anyway if it exists, to prevent stale files
if chat_id is not None: if os.path.exists(self.reboot_file):
return chat_id try:
return None os.remove(self.reboot_file)
logging.warning(f"Removed reboot file {self.reboot_file} after a read error.")
except OSError as e_remove_after_fail:
logging.error(f"Failed to remove reboot file {self.reboot_file} even after a read error: {e_remove_after_fail}")
return None # Reading failed
return None # File does not exist
async def check_doreboot_file(self, application: Application) -> None: async def check_doreboot_file(self, application: Application) -> None:
"""Checks for reboot file and sends notification if applicable.""" """Checks for reboot file using logic method and sends notification if applicable."""
chat_id = await self._check_doreboot_file_logic() chat_id = await self._check_doreboot_file_logic()
if chat_id: if chat_id:
try: try:
await application.bot.send_message(chat_id=chat_id, text="The application has finished initializing.") await application.bot.send_message(chat_id=chat_id, text="The application has finished initializing.")
logging.info(f"Sent reboot notification to chat_id: {chat_id}") logging.info(f"Sent reboot initialization notification to chat_id: {chat_id}")
except Exception as e: except Exception as e:
logging.error(f"Failed to send reboot notification to chat_id {chat_id}: {e}") logging.error(f"Failed to send reboot initialization notification to chat_id {chat_id}: {e}")
async def browse(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def browse(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
# For testing TelegramHelper, browse_command will be mocked or its effects on self.bot asserted.
await browse_command(update, context, self.bot) await browse_command(update, context, self.bot)
def run(self): def run(self):
# Consider allowing injection of a pre-built application for advanced test cases.
application = Application.builder().token(self.telegram_bot_token).build() application = Application.builder().token(self.telegram_bot_token).build()
application.add_handler(CommandHandler("start", self.start)) application.add_handler(CommandHandler("start", self.start))
@@ -225,8 +241,6 @@ class TelegramHelper:
logging.info("Bot is running...") logging.info("Bot is running...")
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
# These pragma: no cover comments are hints for test coverage tools to ignore these lines if needed,
# as testing both branches of is_running() can be environment-dependent.
if loop.is_running(): # pragma: no cover if loop.is_running(): # pragma: no cover
loop.create_task(self.check_doreboot_file(application)) loop.create_task(self.check_doreboot_file(application))
else: # pragma: no cover else: # pragma: no cover