From 21a9df4d79f598b6ba89301fe40e32f6184f335d Mon Sep 17 00:00:00 2001 From: bucolucas Date: Mon, 19 Aug 2024 10:24:17 -0500 Subject: [PATCH] Add OpenAI-based Telegram bot --- chatgpt_telegram_inference_bot.py | 230 ++++++++++++++++++++++++++++++ 1 file changed, 230 insertions(+) create mode 100644 chatgpt_telegram_inference_bot.py diff --git a/chatgpt_telegram_inference_bot.py b/chatgpt_telegram_inference_bot.py new file mode 100644 index 0000000..70724ef --- /dev/null +++ b/chatgpt_telegram_inference_bot.py @@ -0,0 +1,230 @@ +import json +import os +import importlib +import inspect +import logging +import asyncio +from telegram import error as TelegramErrors, Update, __version__ as telegram_version, InlineKeyboardButton, InlineKeyboardMarkup +from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes, CallbackQueryHandler +from dotenv import load_dotenv +from tools.base_tool import BaseTool +from tools.metrics_tool import MetricsTool +import openai + +# Load environment variables +load_dotenv() + +openai.api_key = os.environ.get("OPENAI_API_KEY") + +# Set up logging to console and file +logging.basicConfig(level=logging.WARNING, handlers=[ + logging.StreamHandler(), + logging.FileHandler('logs/output.log', mode='a') +]) + +# Set up Telegram bot +TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN') + +# Load system prompt +with open("prompts/developer_prompt.txt", "r") as file: + system_prompt = file.read().strip() + +# Dictionary to store conversation history for each user +conversation_history = {} + +# Dictionary to store processing status for each user +processing_status = {} + +# Load tools +tools = [MetricsTool()] # Add MetricsTool instance +tools_dir = os.path.join(os.path.dirname(__file__), 'tools') +for filename in os.listdir(tools_dir): + if filename.endswith('.py') and filename not in ['__init__.py', 'base_tool.py', 'metrics_tool.py']: + module_name = f'tools.{filename[:-3]}' + module = importlib.import_module(module_name) + for name, obj in inspect.getmembers(module): + if inspect.isclass(obj) and issubclass(obj, BaseTool) and obj != BaseTool: + tools.append(obj()) + +# Collect all function definitions +functions = [] +for tool in tools: + functions.extend(tool.get_functions()) + +async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + logging.info("Bot started") + await update.message.reply_text( + "Hello! I'm your AI assistant. How can I help you today? You can send me images and then ask questions about them." + ) + +async def clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + user_id = update.effective_user.id + if user_id in conversation_history: + del conversation_history[user_id] + for tool in tools: + tool.clear() + + logging.info(f"Cleared conversation history and image for user {user_id}") + await update.message.reply_text("Conversation history and image cleared. Let's start fresh!") + +async def update_status_message(context: ContextTypes.DEFAULT_TYPE, chat_id: int, message_id: int, status: str): + keyboard = [ + [InlineKeyboardButton("Abort", callback_data='abort')] + ] + reply_markup = InlineKeyboardMarkup(keyboard) + await context.bot.edit_message_text( + chat_id=chat_id, + message_id=message_id, + text=f"Current status: {status}", + reply_markup=reply_markup + ) + +async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + try: + user_id = update.effective_user.id + user_message = update.message.text + + logging.info(f"Message from user {user_id}: {user_message}") + + if user_id not in conversation_history: + conversation_history[user_id] = [] + + conversation_history[user_id].append({"role": "user", "content": user_message}) + + # Send initial status message + status_message = await update.message.reply_text("Processing your request...", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Abort", callback_data='abort')]])) + processing_status[user_id] = {"processing": True, "message_id": status_message.message_id} + + messages = conversation_history[user_id] + + response = get_chat_response(messages) + tool_calls = response.get('function_calls', []) + assistant_message = response.get('content', '') + messages.append({"role": "assistant", "content": assistant_message}) + + toolUseCount = 0 + + previous_function_name = "" + + while len(tool_calls) > 0 and toolUseCount < 50 and processing_status[user_id]["processing"]: + tool_use_results = [] + for tool_call in tool_calls: + function_name = tool_call['name'] + + if previous_function_name != function_name: + await update_status_message(context, update.effective_chat.id, status_message.message_id, f"Using tool: {function_name}") + previous_function_name = function_name + + tool_response = call_tool(tool_call) + tool_use_results.append({"name": function_name, "content": json.dumps(tool_response)}) + + formatted_result = {"role": "function", "content": json.dumps(tool_use_results)} + + messages.append(formatted_result) + + response = get_chat_response(messages) + tool_calls = response.get('function_calls', []) + assistant_message = response.get('content', '') + messages.append({"role": "assistant", "content": assistant_message}) + + toolUseCount += 1 + + if toolUseCount == 0: + conversation_history[user_id].append({"role": "assistant", "content": assistant_message}) + + if len(conversation_history[user_id]) > 20: + conversation_history[user_id] = conversation_history[user_id][-20:] + + # Remove the status message + await context.bot.delete_message(chat_id=update.effective_chat.id, message_id=status_message.message_id) + del processing_status[user_id] + try: + await update.message.reply_text(assistant_message) + except TelegramErrors.BadRequest as e: + logging.error(f"An error occurred when trying to send a message in telegram: {str(e)}") + + except Exception as e: + logging.error(f"An error occurred: {str(e)}") + await update.message.reply_text("Sorry, an error occurred while processing your request.") + +def call_tool(function_call): + function_name = function_call['name'] + function_args = json.loads(function_call['arguments']) + for tool in tools: + if function_name in [f["name"] for f in tool.get_functions()]: + return tool.execute(function_name, **function_args) + +def get_chat_response(messages): + return get_openai_response(messages) + +def get_openai_response(messages): + try: + openai_functions = [ + { + "name": function['name'], + "description": function['description'], + "parameters": function['parameters'] if function['parameters'] not in [None, {}] else {"type": "object", "properties": {"param1": {"type": "string", "description": "Unnecessary"}}, "required": []} + } + for function in functions + ] + + response = openai.ChatCompletion.create( + model="gpt-4o", # Using "gpt-4o" as specified + messages=[{"role": "system", "content": system_prompt}] + messages, + functions=openai_functions, + function_call="auto", + max_tokens=4096 # Using 4096 as specified for "gpt-4o" + ) + + message = response['choices'][0]['message'] + content = message.get('content', '') + function_call = message.get('function_call') + + if function_call: + return { + 'content': content, + 'function_calls': [function_call] + } + else: + return {'content': content} + + except Exception as e: + logging.error(f"An error occurred: {str(e)}") + return None + +async def status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + await update.message.reply_text("Currently using gpt-4o") + +async def abort_processing(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + query = update.callback_query + await query.answer() + + user_id = query.from_user.id + if user_id in processing_status: + processing_status[user_id]["processing"] = False + await context.bot.edit_message_text( + chat_id=query.message.chat_id, + message_id=query.message.message_id, + text="Processing aborted." + ) + await clear(update, context) + else: + await query.edit_message_text(text="No active processing to abort.") + +def main() -> None: + # Create the Application and pass it your bot's token + application = Application.builder().token(TELEGRAM_BOT_TOKEN).build() + + # Add handlers + application.add_handler(CommandHandler("start", start)) + application.add_handler(CommandHandler("clear", clear)) + application.add_handler(CommandHandler("status", status)) + application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) + application.add_handler(CallbackQueryHandler(abort_processing, pattern='^abort$')) + + # Start the Bot + logging.info("Bot is running...") + application.run_polling() + +if __name__ == '__main__': + main() \ No newline at end of file