"""Telegram bot that forwards user messages (and optional images) to OpenAI.

Text messages are answered by GPT-4o with access to the function-calling
tools discovered in the ``tools`` package.  A photo sent by a user is stored
in a temporary file and attached to that user's next text message, which is
then answered by GPT-4o-mini without tools or conversation history.
"""

import json
import os
import importlib
import inspect
import tempfile
import base64
import logging

from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
from openai import OpenAI
from dotenv import load_dotenv

from tools.base_tool import BaseTool

# Load environment variables
load_dotenv()

client = OpenAI()

GPT_4O = "gpt-4o"
GPT_4O_MINI = "gpt-4o-mini"

# Upper bound on consecutive tool calls while answering a single message.
# Todo: put amount in env
MAX_TOOL_CALLS = 50

# Number of most recent history entries kept per user.
MAX_HISTORY_MESSAGES = 10

LOG_FILE = 'logs/output.log'


class StringFilter(logging.Filter):
    """Logging filter that drops records containing any of the given substrings."""

    def __init__(self, strings_to_filter):
        """Store the substrings whose presence suppresses a log record."""
        super().__init__()
        self.strings_to_filter = strings_to_filter

    def filter(self, record):
        """Return False (drop the record) if its message contains a filtered string."""
        message = record.getMessage()
        return not any(string in message for string in self.strings_to_filter)


unwanted_strings = ['unwanted_string1', 'unwanted_string2']  # Add other strings you want to filter out here

# Set up logging to console and file.
# FileHandler does not create missing parent directories, so make sure it exists.
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
_log_handlers = [
    logging.StreamHandler(),
    logging.FileHandler(LOG_FILE, mode='a'),
]
# Filters attached to a logger only see records issued directly on that
# logger; attaching to the handlers also covers records propagated from
# library loggers.
_string_filter = StringFilter(unwanted_strings)
for _handler in _log_handlers:
    _handler.addFilter(_string_filter)
logging.basicConfig(level=logging.INFO, handlers=_log_handlers)

# Set up Telegram bot
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')

# Load system prompt
with open("prompts/developer_prompt.txt", "r", encoding="utf-8") as file:
    system_prompt = file.read().strip()

# Dictionary to store conversation history for each user
conversation_history = {}

# Dictionary to store the last image file (a temp-file path) for each user
user_images = {}

# Load tools: instantiate every BaseTool subclass found in the tools package.
tools = []
tools_dir = os.path.join(os.path.dirname(__file__), 'tools')
# sorted() keeps tool discovery order deterministic across platforms.
for filename in sorted(os.listdir(tools_dir)):
    if filename.endswith('.py') and filename != '__init__.py' and filename != 'base_tool.py':
        module_name = f'tools.{filename[:-3]}'
        module = importlib.import_module(module_name)
        for name, obj in inspect.getmembers(module):
            if inspect.isclass(obj) and issubclass(obj, BaseTool) and obj != BaseTool:
                tools.append(obj())

# Collect all function definitions
functions = []
for tool in tools:
    functions.extend(tool.get_functions())


def _discard_user_image(user_id) -> None:
    """Forget the pending image of *user_id* and delete its temp file, if any."""
    image_path = user_images.pop(user_id, None)
    if image_path is None:
        return
    try:
        os.remove(image_path)
    except FileNotFoundError:
        # Already gone (e.g. the OS cleaned the temp dir); nothing to do.
        pass


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start: greet the user."""
    logging.info("Bot started")
    await update.message.reply_text("Hello! I'm your AI assistant. How can I help you today? You can send me images and then ask questions about them.")


async def clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /clear: drop the user's conversation history and pending image."""
    user_id = update.effective_user.id
    conversation_history.pop(user_id, None)
    _discard_user_image(user_id)
    logging.info(f"Cleared conversation history and image for user {user_id}")
    await update.message.reply_text("Conversation history and image cleared. Let's start fresh!")


async def handle_image(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle a photo: store it in a temp file to be used with the next text message."""
    user_id = update.effective_user.id

    # Get the largest available photo (file_size may be missing on some sizes)
    photo = max(update.message.photo, key=lambda x: x.file_size or 0)

    # Download the photo
    photo_file = await context.bot.get_file(photo.file_id)

    # Reserve a temp-file path, then close the handle before the download
    # writes to that path.
    with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file:
        image_path = temp_file.name
    await photo_file.download_to_drive(custom_path=image_path)

    # A newer image replaces the previous one; delete the old temp file
    # instead of leaking it.
    _discard_user_image(user_id)
    user_images[user_id] = image_path

    logging.info(f"Received image from user {user_id}")
    await update.message.reply_text("I've received your image. What would you like to know about it?")


async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle a text message: query OpenAI, run requested tools, and reply.

    If the user has a pending image, the message and image are sent to
    GPT-4o-mini on their own.  Otherwise the system prompt and conversation
    history are sent to GPT-4o, and function calls requested by the model are
    executed until it produces a plain reply or MAX_TOOL_CALLS is reached.
    Any exception is logged and reported to the user with a generic message.
    """
    try:
        user_id = update.effective_user.id
        user_message = update.message.text
        logging.info(f"Message from user {user_id}: {user_message}")

        # Initialize conversation history for new users
        history = conversation_history.setdefault(user_id, [])

        # Add user message to conversation history
        history.append({"role": "user", "content": user_message})

        # Prepare messages for OpenAI API (a new list, so tool messages
        # appended below do not alias the stored history)
        messages = [{"role": "system", "content": system_prompt}] + history

        # Check if there's an image to process
        if user_id in user_images:
            with open(user_images[user_id], "rb") as image_file:
                encoded_image = base64.b64encode(image_file.read()).decode('utf-8')

            response = client.chat.completions.create(
                model=GPT_4O_MINI,
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": user_message},
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:image/jpeg;base64,{encoded_image}"
                                },
                            },
                        ],
                    }
                ],
                max_tokens=16384,
            )

            # Remove the temporary image file (after it has been closed)
            _discard_user_image(user_id)
        else:
            # Call OpenAI API for inference (text-only)
            response = get_chat_response(client, messages, 4096, GPT_4O)

        # Extract the assistant's reply
        assistant_message = response.choices[0].message

        tool_use_count = 0
        while getattr(assistant_message, 'function_call', None):
            if tool_use_count >= MAX_TOOL_CALLS:
                # Routed to the generic error reply below rather than looping forever.
                raise RuntimeError(f"Tool call limit of {MAX_TOOL_CALLS} reached")

            function_call = assistant_message.function_call
            tool_response = call_tool(function_call, messages)
            tool_use_count += 1

            function_message = {
                "role": "function",
                "name": function_call.name,
                "content": json.dumps(tool_response),
            }
            history.append(function_message)
            messages.append(dict(function_message))

            # Call API again to get the next (possibly final) response
            assistant_message = get_chat_response(client, messages, 4096, GPT_4O).choices[0].message

        assistant_reply = assistant_message.content

        # Add assistant's reply to conversation history
        history.append({"role": "assistant", "content": assistant_reply})

        # Trim conversation history if it gets too long (e.g., keep last 10 messages)
        if len(history) > MAX_HISTORY_MESSAGES:
            conversation_history[user_id] = history[-MAX_HISTORY_MESSAGES:]

        # Send the reply back to the user
        await update.message.reply_text(assistant_reply)

    except Exception as e:
        # Top-level handler boundary: log with traceback and tell the user.
        logging.exception(f"An error occurred: {str(e)}")
        await update.message.reply_text("Sorry, an error occurred while processing your request.")


def call_tool(function_call, messages):
    """Execute the tool function requested by the model and return its result.

    Args:
        function_call: Object with ``name`` and ``arguments`` attributes, where
            ``arguments`` is a JSON-encoded object of keyword arguments.
        messages: Unused; kept for backward compatibility with callers.

    Returns:
        Whatever the matching tool's ``execute`` returns, or None when no
        loaded tool exposes a function with that name.

    Raises:
        json.JSONDecodeError: If ``arguments`` is not valid JSON.
    """
    function_name = function_call.name
    raw_arguments = function_call.arguments

    # The arguments are produced by the model and must be treated as
    # untrusted data: parse them as JSON, never eval() them.
    function_args = json.loads(raw_arguments) if raw_arguments else {}

    for tool in tools:
        if any(f["name"] == function_name for f in tool.get_functions()):
            return tool.execute(function_name, **function_args)

    logging.warning(f"No tool found for function {function_name}")
    return None


def get_chat_response(client, messages, max_tokens, model):
    """Request a chat completion with all tool functions available to the model.

    Args:
        client: OpenAI client used to issue the request.
        messages: Chat messages to send.
        max_tokens: Maximum number of tokens to generate.
        model: Model name.

    Returns:
        The response object returned by ``client.chat.completions.create``.
    """
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        functions=functions,
        function_call="auto",
        max_tokens=max_tokens,
    )
    return response


def main() -> None:
    """Build the Telegram application, register handlers, and start polling."""
    if not TELEGRAM_BOT_TOKEN:
        raise RuntimeError("TELEGRAM_BOT_TOKEN environment variable is not set")

    # Create the Application and pass it your bot's token
    application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()

    # Add handlers
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("clear", clear))
    application.add_handler(MessageHandler(filters.PHOTO, handle_image))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

    # Start the Bot
    logging.info("Bot is running...")
    application.run_polling()


if __name__ == '__main__':
    main()