Files
cyclop/telegram_helper.py
T

169 lines
7.7 KiB
Python

import os
import logging
import sys
import asyncio
import time
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes, CallbackQueryHandler
from browse_command import browse_command, button_callback
class TelegramHelper:
CLAUDE_REBOOT_TARGET = 'claude'
HTML_QUOTE_BLOCK_START = '<blockquote expandable><b>Thinking...</b>'
HTML_QUOTE_BLOCK_END = '</blockquote>'
DEFAULT_REBOOT_CLAUDE_FILE = '.reboot_claude'
DEFAULT_REBOOT_FILE = '.doreboot'
def __init__(self, bot, reboot_claude_file_path: str | None = None, reboot_file_path: str | None = None):
self.bot = bot
self.telegram_bot_token = os.getenv('TELEGRAM_BOT_TOKEN')
self.start_time = time.time()
self.reboot_claude_file = reboot_claude_file_path or self.DEFAULT_REBOOT_CLAUDE_FILE
self.reboot_file = reboot_file_path or self.DEFAULT_REBOOT_FILE
# --- Start Command ---
async def _start_logic(self) -> str: # New logic method
await self.bot.start()
return "Hello! I'm your AI assistant. How can I help you today?"
async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified
response_message = await self._start_logic()
await update.message.reply_text(response_message)
# --- Clear Command ---
async def _clear_logic(self, user_id: int) -> str: # New logic method
self.bot.clear_conversation_history(user_id)
return "Conversation history cleared. Let's start fresh!"
async def clear(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified
user_id = update.effective_user.id
response_message = await self._clear_logic(user_id)
await update.message.reply_text(response_message)
# --- Status Command ---
async def _status_logic(self) -> str: # New logic method
return await self.bot.get_bot_status()
async def status(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified
response_message = await self._status_logic()
await update.message.reply_text(response_message)
# --- Switch Command ---
async def _switch_logic(self) -> str: # New logic method
if hasattr(self.bot, 'switch_model'):
return await self.bot.switch_model()
else:
return "Model switching is not supported for this bot."
async def switch(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified
response_message = await self._switch_logic()
await update.message.reply_text(response_message)
async def update_status_message(self, context: ContextTypes.DEFAULT_TYPE, chat_id: int, message_id: int, status: str):
keyboard = [
[InlineKeyboardButton("Abort", callback_data='abort')]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await context.bot.edit_message_text(
chat_id=chat_id,
message_id=message_id,
text=f"Current status: {status}",
reply_markup=reply_markup
)
async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
try:
user_id = update.effective_user.id
user_message = update.message.text
logging.info(f"Message from user {user_id}: {user_message}")
status_message_obj = await update.message.reply_text("Processing your request...", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Abort", callback_data='abort')]]))
self.bot.set_processing_status(user_id, status_message_obj.message_id)
response = await self.bot.handle_message(user_id, user_message)
await context.bot.delete_message(chat_id=update.effective_chat.id, message_id=status_message_obj.message_id)
self.bot.clear_processing_status(user_id)
response = response.replace("<think>", self.HTML_QUOTE_BLOCK_START).replace("</think>", self.HTML_QUOTE_BLOCK_END)
if len(response) > 4096:
chunks = [response[i:i + 4096] for i in range(0, len(response), 4096)]
for chunk in chunks:
await update.message.reply_text(chunk)
await asyncio.sleep(0.1)
else:
await update.message.reply_text(response)
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
await update.message.reply_text("Sorry, an error occurred while processing your request.")
# --- Abort Processing (Callback) ---
async def _abort_processing_logic(self, user_id: int) -> str: # New logic method
return await self.bot.abort_processing(user_id)
async def abort_processing(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Modified
query = update.callback_query
await query.answer() # Telegram specific interaction
user_id = query.from_user.id
response_text = await self._abort_processing_logic(user_id) # Call logic method
await query.edit_message_text(text=response_text) # Telegram specific interaction
async def reboot(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_message = update.message.text.split()
if len(user_message) > 1 and user_message[1].lower() == self.CLAUDE_REBOOT_TARGET:
open(self.reboot_claude_file, 'w').close()
if update:
await update.message.reply_text("Rebooting the bot...")
logging.info("Received reboot command. Exiting process...")
reboot_f_path = self.reboot_file
if not os.path.exists(reboot_f_path):
with open(reboot_f_path, 'w') as f:
chat_id_to_write = str(update.effective_chat.id) if update and update.effective_chat else ""
f.write(chat_id_to_write)
sys.exit(0)
async def check_doreboot_file(self, application: Application):
reboot_f_path = self.reboot_file
if os.path.exists(reboot_f_path):
with open(reboot_f_path, 'r') as f:
chat_id = f.read().strip()
if chat_id:
try:
await application.bot.send_message(chat_id=chat_id, text="The application has finished initializing.")
except Exception as e:
logging.error(f"Failed to send reboot notification to chat_id {chat_id}: {e}")
os.remove(reboot_f_path)
async def browse(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
await browse_command(update, context, self.bot)
def run(self):
application = Application.builder().token(self.telegram_bot_token).build()
application.add_handler(CommandHandler("start", self.start))
application.add_handler(CommandHandler("clear", self.clear))
application.add_handler(CommandHandler("switch", self.switch))
application.add_handler(CommandHandler("status", self.status))
application.add_handler(CommandHandler("reboot", self.reboot))
application.add_handler(CommandHandler("browse", self.browse))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
application.add_handler(CallbackQueryHandler(self.abort_processing, pattern='^abort$'))
application.add_handler(CallbackQueryHandler(button_callback, pattern='^(browse|file):'))
logging.info("Bot is running...")
loop = asyncio.get_event_loop()
if loop.is_running():
loop.create_task(self.check_doreboot_file(application))
else:
asyncio.run(self.check_doreboot_file(application))
application.run_polling()