Files
cyclop/telegram_helper.py
T

167 lines
7.9 KiB
Python

import os
import logging
import sys
import asyncio
import time
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes, CallbackQueryHandler
from browse_command import browse_command, button_callback
class TelegramHelper:
CLAUDE_REBOOT_TARGET = 'claude'
HTML_QUOTE_BLOCK_START = '<blockquote expandable><b>Thinking...</b>'
HTML_QUOTE_BLOCK_END = '</blockquote>'
DEFAULT_REBOOT_CLAUDE_FILE = '.reboot_claude'
DEFAULT_REBOOT_FILE = '.doreboot'
def __init__(self, bot, reboot_claude_file_path: str | None = None, reboot_file_path: str | None = None):
self.bot = bot
self.telegram_bot_token = os.getenv('TELEGRAM_BOT_TOKEN')
self.start_time = time.time()
self.reboot_claude_file = reboot_claude_file_path or self.DEFAULT_REBOOT_CLAUDE_FILE
self.reboot_file = reboot_file_path or self.DEFAULT_REBOOT_FILE
# --- Start Command ---
async def _start_logic(self) -> str: # New logic method
await self.bot.start()
return "Hello! I'm your AI assistant. How can I help you today?"
async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified
response_message = await self._start_logic()
await update.message.reply_text(response_message)
# --- Clear Command ---
async def _clear_logic(self, user_id: int) -> str: # New logic method
self.bot.clear_conversation_history(user_id)
return "Conversation history cleared. Let's start fresh!"
async def clear(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified
user_id = update.effective_user.id
response_message = await self._clear_logic(user_id) # Await was missing for async consistency, though _clear_logic is not async yet, it calls a sync method.
# For consistency with other _logic methods, making it async.
# --- Status Command ---
async def _status_logic(self) -> str: # New logic method
return await self.bot.get_bot_status()
async def status(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified
response_message = await self._status_logic()
await update.message.reply_text(response_message)
# --- Switch Command ---
async def _switch_logic(self) -> str: # New logic method
if hasattr(self.bot, 'switch_model'):
return await self.bot.switch_model()
else:
return "Model switching is not supported for this bot."
async def switch(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified
response_message = await self._switch_logic()
await update.message.reply_text(response_message)
async def update_status_message(self, context: ContextTypes.DEFAULT_TYPE, chat_id: int, message_id: int, status: str):
keyboard = [
[InlineKeyboardButton("Abort", callback_data='abort')]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await context.bot.edit_message_text(
chat_id=chat_id,
message_id=message_id,
text=f"Current status: {status}",
reply_markup=reply_markup
)
async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
try:
user_id = update.effective_user.id
user_message = update.message.text
logging.info(f"Message from user {user_id}: {user_message}")
status_message_obj = await update.message.reply_text("Processing your request...", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Abort", callback_data='abort')]]))
self.bot.set_processing_status(user_id, status_message_obj.message_id)
response = await self.bot.handle_message(user_id, user_message)
await context.bot.delete_message(chat_id=update.effective_chat.id, message_id=status_message_obj.message_id)
self.bot.clear_processing_status(user_id)
response = response.replace("<think>", self.HTML_QUOTE_BLOCK_START).replace("</think>", self.HTML_QUOTE_BLOCK_END)
if len(response) > 4096:
chunks = [response[i:i + 4096] for i in range(0, len(response), 4096)]
for chunk in chunks:
await update.message.reply_text(chunk)
await asyncio.sleep(0.1)
else:
await update.message.reply_text(response)
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
await update.message.reply_text("Sorry, an error occurred while processing your request.")
async def abort_processing(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
query = update.callback_query
await query.answer()
user_id = query.from_user.id
result = await self.bot.abort_processing(user_id)
await query.edit_message_text(text=result)
async def reboot(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_message = update.message.text.split()
if len(user_message) > 1 and user_message[1].lower() == self.CLAUDE_REBOOT_TARGET:
open(self.reboot_claude_file, 'w').close()
if update:
await update.message.reply_text("Rebooting the bot...")
logging.info("Received reboot command. Exiting process...")
reboot_f_path = self.reboot_file
if not os.path.exists(reboot_f_path):
with open(reboot_f_path, 'w') as f:
# If update is None (e.g. called programmatically without a Telegram context for reboot),
# we should handle this. For now, assuming update is present if this handler is called by Telegram.
# Testability of this part needs care due to sys.exit()
chat_id_to_write = str(update.effective_chat.id) if update and update.effective_chat else ""
f.write(chat_id_to_write)
sys.exit(0)
async def check_doreboot_file(self, application: Application):
reboot_f_path = self.reboot_file
if os.path.exists(reboot_f_path):
with open(reboot_f_path, 'r') as f:
chat_id = f.read().strip()
if chat_id:
try:
await application.bot.send_message(chat_id=chat_id, text="The application has finished initializing.")
except Exception as e:
logging.error(f"Failed to send reboot notification to chat_id {chat_id}: {e}")
os.remove(reboot_f_path)
async def browse(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
await browse_command(update, context, self.bot)
def run(self):
application = Application.builder().token(self.telegram_bot_token).build()
application.add_handler(CommandHandler("start", self.start))
application.add_handler(CommandHandler("clear", self.clear))
application.add_handler(CommandHandler("switch", self.switch))
application.add_handler(CommandHandler("status", self.status))
application.add_handler(CommandHandler("reboot", self.reboot))
application.add_handler(CommandHandler("browse", self.browse))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
application.add_handler(CallbackQueryHandler(self.abort_processing, pattern='^abort$'))
application.add_handler(CallbackQueryHandler(button_callback, pattern='^(browse|file):'))
logging.info("Bot is running...")
loop = asyncio.get_event_loop()
if loop.is_running():
loop.create_task(self.check_doreboot_file(application))
else:
asyncio.run(self.check_doreboot_file(application)) # Fallback if loop not running (e.g. tests)
application.run_polling()