Files
cyclop/telegram_helper.py
T

127 lines
5.8 KiB
Python

import os
import logging
import sys
import asyncio
import time
import git
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes, CallbackQueryHandler
from browse_command import browse_command, button_callback
class TelegramHelper:
    """Telegram front end that forwards commands and text messages to a bot object.

    The wrapped ``bot`` is used through these members (all visible in this
    class): ``start()``, ``clear(user_id)``, ``status()``,
    ``handle_message(user_id, text)``, ``abort_processing(user_id)``, the
    item-assignable ``processing_status`` container and, optionally,
    ``switch_model()``.
    """

    # Marker written by reboot() and consumed by check_doreboot_file() on the
    # next start; it holds the chat id that should be notified (may be empty).
    DOREBOOT_FILE = "./.doreboot"
    # Empty flag file created by "/reboot claude". Its consumer is not part of
    # this class.
    REBOOT_CLAUDE_FILE = "./.reboot_claude"

    def __init__(self, bot):
        """Store the wrapped bot and read the token from ``TELEGRAM_BOT_TOKEN``.

        Args:
            bot: Backend object that does the actual work (see class docstring).
        """
        self.bot = bot
        # None when the environment variable is unset.
        self.telegram_bot_token = os.getenv('TELEGRAM_BOT_TOKEN')
        # Repository at the current working directory and the construction
        # time; neither is read inside this class.
        self.repo = git.Repo(".")
        self.start_time = time.time()

    @staticmethod
    def _abort_markup() -> InlineKeyboardMarkup:
        """Build the single-button "Abort" keyboard attached to status messages."""
        return InlineKeyboardMarkup(
            [[InlineKeyboardButton("Abort", callback_data='abort')]]
        )

    async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Handle /start: start the wrapped bot and greet the user."""
        await self.bot.start()
        await update.message.reply_text(
            "Hello! I'm your AI assistant. How can I help you today?"
        )

    async def clear(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Handle /clear: ask the wrapped bot to clear this user's history."""
        user_id = update.effective_user.id
        await self.bot.clear(user_id)
        await update.message.reply_text("Conversation history cleared. Let's start fresh!")

    async def status(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Handle /status: reply with whatever ``bot.status()`` returns."""
        status_message = await self.bot.status()
        await update.message.reply_text(status_message)

    async def switch(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Handle /switch: call ``bot.switch_model()`` when the bot provides it."""
        if hasattr(self.bot, 'switch_model'):
            status_message = await self.bot.switch_model()
            await update.message.reply_text(status_message)
        else:
            await update.message.reply_text("Model switching is not supported for this bot.")

    async def update_status_message(self, context: ContextTypes.DEFAULT_TYPE, chat_id: int, message_id: int, status: str):
        """Rewrite an existing status message, keeping its "Abort" button.

        Args:
            context: Handler context whose ``bot`` performs the edit.
            chat_id: Chat that contains the status message.
            message_id: Id of the status message to edit.
            status: Text shown after ``"Current status: "``.
        """
        await context.bot.edit_message_text(
            chat_id=chat_id,
            message_id=message_id,
            text=f"Current status: {status}",
            reply_markup=self._abort_markup(),
        )

    async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Forward a plain text message to the wrapped bot and reply with its answer.

        A temporary "Processing your request..." message with an "Abort"
        button is shown while the bot works and deleted once the answer is
        available. Any failure is logged and reported to the user with a
        generic apology.
        """
        try:
            user_id = update.effective_user.id
            user_message = update.message.text
            logging.info(f"Message from user {user_id}: {user_message}")

            status_message = await update.message.reply_text(
                "Processing your request...",
                reply_markup=self._abort_markup(),
            )
            self.bot.processing_status[user_id] = {
                "processing": True,
                "message_id": status_message.message_id,
            }
            try:
                response = await self.bot.handle_message(user_id, user_message)
            finally:
                # Always drop the bookkeeping entry, even when the bot raised,
                # and tolerate it having been removed already in the meantime.
                try:
                    del self.bot.processing_status[user_id]
                except KeyError:
                    pass

            await context.bot.delete_message(
                chat_id=update.effective_chat.id,
                message_id=status_message.message_id,
            )
            await update.message.reply_text(response)
        except Exception as e:
            # Top-level handler boundary: log with traceback, tell the user.
            logging.exception("An error occurred: %s", e)
            await update.message.reply_text("Sorry, an error occurred while processing your request.")

    async def abort_processing(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Handle the "Abort" button: ask the bot to abort and show its result text."""
        query = update.callback_query
        await query.answer()
        user_id = query.from_user.id
        result = await self.bot.abort_processing(user_id)
        await query.edit_message_text(text=result)

    async def reboot(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Handle /reboot: leave marker files behind and exit the process.

        ``/reboot claude`` additionally creates the empty ``.reboot_claude``
        flag file. The ``.doreboot`` marker (written only if it does not exist
        yet) records the chat to notify after the next start. ``update`` may be
        ``None``; in that case no reply is sent and the marker is left empty.

        Raises:
            SystemExit: Always, with exit status 0.
        """
        message = update.message if update else None
        # Split the command text to look for the optional 'claude' argument.
        command_parts = message.text.split() if message and message.text else []
        if len(command_parts) > 1 and command_parts[1].lower() == 'claude':
            with open(self.REBOOT_CLAUDE_FILE, 'w'):
                pass  # Only the file's existence matters.

        if message:
            await message.reply_text("Rebooting the bot...")
        logging.info("Received reboot command. Exiting process...")

        if not os.path.exists(self.DOREBOOT_FILE):
            chat = update.effective_chat if update else None
            with open(self.DOREBOOT_FILE, 'w') as f:
                f.write(str(chat.id) if chat else "")
        sys.exit(0)

    async def check_doreboot_file(self, application: Application):
        """Send the post-reboot notification, if a ``.doreboot`` marker exists.

        The marker is removed even when the notification cannot be delivered,
        so a stale chat id never survives into later restarts.

        Args:
            application: Application whose ``bot`` sends the notification.
        """
        if not os.path.exists(self.DOREBOOT_FILE):
            return
        with open(self.DOREBOOT_FILE, 'r') as f:
            chat_id = f.read().strip()
        try:
            if chat_id:
                await application.bot.send_message(chat_id=chat_id, text="The application has finished initializing.")
        except Exception:
            # Best effort: a failed notification must not prevent startup.
            logging.exception("Could not send the post-reboot notification to chat %s", chat_id)
        finally:
            os.remove(self.DOREBOOT_FILE)

    async def browse(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
        """Handle /browse by delegating to ``browse_command``."""
        await browse_command(update, context, self.bot)

    def run(self):
        """Build the application, register all handlers and start polling.

        ``check_doreboot_file`` is registered as the ``post_init`` callback so
        the post-reboot notification goes out once the application has been
        initialized, without touching the event loop by hand.
        """
        application = (
            Application.builder()
            .token(self.telegram_bot_token)
            .post_init(self.check_doreboot_file)
            .build()
        )

        application.add_handler(CommandHandler("start", self.start))
        application.add_handler(CommandHandler("clear", self.clear))
        application.add_handler(CommandHandler("switch", self.switch))
        application.add_handler(CommandHandler("status", self.status))
        application.add_handler(CommandHandler("reboot", self.reboot))
        application.add_handler(CommandHandler("browse", self.browse))
        application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
        application.add_handler(CallbackQueryHandler(self.abort_processing, pattern='^abort$'))
        application.add_handler(CallbackQueryHandler(button_callback, pattern='^(browse|file):'))

        logging.info("Bot is running...")
        # NOTE: the periodic commit-checking task is intentionally not scheduled.
        application.run_polling()