Files
cyclop/telegram_inference_bot.py
T

263 lines
9.8 KiB
Python
Raw Normal View History

"""
telegram_inference_bot.py
This module implements a Telegram bot that processes user messages and
images, interacts with the OpenAI API to provide intelligent responses,
and manages conversation history and temporary file storage for images.
It supports the following commands:
- /start: Initializes the bot and welcomes the user.
- /clear: Clears the user's conversation history and any uploaded images.
"""
2024-08-17 13:00:37 -05:00
import json
2024-08-16 12:43:59 -05:00
import os
import importlib
import inspect
2024-08-17 09:28:17 -05:00
import tempfile
import base64
import logging
2024-08-16 12:43:59 -05:00
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
from openai import OpenAI
from dotenv import load_dotenv
from tools.base_tool import BaseTool
# Load environment variables
load_dotenv()
client = OpenAI()
2024-08-17 09:28:17 -05:00
GPT_4O = "gpt-4o"
GPT_4O_MINI = "gpt-4o-mini"
2024-08-17 13:00:37 -05:00
class StringFilter(logging.Filter):
def __init__(self, strings_to_filter):
super().__init__()
self.strings_to_filter = strings_to_filter
def filter(self, record):
return not any(s in record.getMessage() for s in self.strings_to_filter)
strings_to_filter = ['unwanted_string_1', 'unwanted_string_2'] # Change these to the specific strings you want to filter out
# Set up logging to console and file
logging.basicConfig(level=logging.INFO, handlers=[
logging.StreamHandler(),
logging.FileHandler('logs/output.log', mode='a')
])
logging.getLogger().addFilter(StringFilter(strings_to_filter))
2024-08-16 12:43:59 -05:00
# Set up Telegram bot
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')
# Load system prompt
with open("prompts/developer_prompt.txt", "r") as file:
system_prompt = file.read().strip()
2024-08-16 12:43:59 -05:00
# Dictionary to store conversation history for each user
conversation_history = {}
2024-08-17 09:28:17 -05:00
# Dictionary to store the last image file for each user
user_images = {}
2024-08-16 12:43:59 -05:00
# Load tools
tools = []
tools_dir = os.path.join(os.path.dirname(__file__), 'tools')
for filename in os.listdir(tools_dir):
if filename.endswith('.py') and filename != '__init__.py' and filename != 'base_tool.py':
module_name = f'tools.{filename[:-3]}'
module = importlib.import_module(module_name)
for name, obj in inspect.getmembers(module):
if inspect.isclass(obj) and issubclass(obj, BaseTool) and obj != BaseTool:
tools.append(obj())
# Collect all function definitions
functions = []
for tool in tools:
functions.extend(tool.get_functions())
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""
Handles the /start command. Sends a welcome message to the user.
Args:
update (Update): Incoming update containing the message.
context (ContextTypes.DEFAULT_TYPE): Context for the callback.
"""
logging.info("Bot started")
2024-08-17 09:28:17 -05:00
await update.message.reply_text("Hello! I'm your AI assistant. How can I help you today? You can send me images and then ask questions about them.")
2024-08-16 12:43:59 -05:00
async def clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""
Handles the /clear command. Clears the user's conversation history
and any uploaded images.
Args:
update (Update): Incoming update containing the message.
context (ContextTypes.DEFAULT_TYPE): Context for the callback.
"""
2024-08-16 12:43:59 -05:00
user_id = update.effective_user.id
if user_id in conversation_history:
del conversation_history[user_id]
2024-08-17 09:28:17 -05:00
if user_id in user_images:
os.remove(user_images[user_id])
del user_images[user_id]
logging.info(f"Cleared conversation history and image for user {user_id}")
2024-08-17 09:28:17 -05:00
await update.message.reply_text("Conversation history and image cleared. Let's start fresh!")
async def handle_image(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""
Handles image uploads from users. Downloads the largest photo to
a temporary file for further processing.
Args:
update (Update): Incoming update containing the image.
context (ContextTypes.DEFAULT_TYPE): Context for the callback.
"""
2024-08-17 09:28:17 -05:00
user_id = update.effective_user.id
# Get the largest available photo
photo = max(update.message.photo, key=lambda x: x.file_size)
# Download the photo
photo_file = await context.bot.get_file(photo.file_id)
# Create a temporary file to store the image
with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file:
await photo_file.download_to_drive(custom_path=temp_file.name)
user_images[user_id] = temp_file.name
logging.info(f"Received image from user {user_id}")
2024-08-17 09:28:17 -05:00
await update.message.reply_text("I've received your image. What would you like to know about it?")
2024-08-16 12:43:59 -05:00
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""
Processes incoming text messages from users, calls the OpenAI API
for inference, and sends the assistant's reply back to the user.
Args:
update (Update): Incoming update containing the text message.
context (ContextTypes.DEFAULT_TYPE): Context for the callback.
"""
2024-08-16 12:43:59 -05:00
try:
user_id = update.effective_user.id
user_message = update.message.text
logging.info(f"Message from user {user_id}: {user_message}")
2024-08-16 12:43:59 -05:00
# Initialize conversation history for new users
if user_id not in conversation_history:
conversation_history[user_id] = []
# Add user message to conversation history
conversation_history[user_id].append({"role": "user", "content": user_message})
# Prepare messages for OpenAI API
2024-08-17 13:00:37 -05:00
messages = [{"role": "system", "content": system_prompt}] + conversation_history[user_id]
2024-08-16 12:43:59 -05:00
2024-08-17 09:28:17 -05:00
# Check if there's an image to process
if user_id in user_images:
with open(user_images[user_id], "rb") as image_file:
response = client.chat.completions.create(
model=GPT_4O_MINI,
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": user_message},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{base64.b64encode(image_file.read()).decode('utf-8')}"
}
},
],
}
],
2024-08-17 13:00:37 -05:00
max_tokens=16384
2024-08-17 09:28:17 -05:00
)
# Remove the temporary image file
os.remove(user_images[user_id])
del user_images[user_id]
else:
# Call OpenAI API for inference (text-only)
response = get_chat_response(client, messages, 4096, GPT_4O)
2024-08-16 12:43:59 -05:00
# Extract the assistant's reply
assistant_message = response.choices[0].message
2024-08-17 13:00:37 -05:00
toolUseCount = 0
2024-08-17 09:28:17 -05:00
if hasattr(assistant_message, 'function_call') and assistant_message.function_call:
2024-08-17 18:50:30 -05:00
while hasattr(assistant_message, 'function_call') and assistant_message.function_call and toolUseCount < 50: # Todo: put amount in env
2024-08-17 13:00:37 -05:00
tool_response = call_tool(assistant_message.function_call, messages)
conversation_history[user_id].append({"role": "function", "name": assistant_message.function_call.name, "content": json.dumps(tool_response)})
messages.append({
"role": "function",
"name": assistant_message.function_call.name,
"content": json.dumps(tool_response)
})
# Call API again to get the final response
assistant_message = get_chat_response(client, messages, 4096, GPT_4O).choices[0].message
2024-08-17 13:00:37 -05:00
if not hasattr(assistant_message, 'function_call') or not assistant_message.function_call:
assistant_reply = assistant_message.content
conversation_history[user_id].append({"role": "assistant", "content": assistant_reply})
2024-08-16 12:43:59 -05:00
else:
assistant_reply = assistant_message.content
2024-08-17 13:00:37 -05:00
# Add assistant's reply to conversation history
conversation_history[user_id].append({"role": "assistant", "content": assistant_reply})
2024-08-16 12:43:59 -05:00
# Trim conversation history if it gets too long (e.g., keep last 10 messages)
if len(conversation_history[user_id]) > 10:
conversation_history[user_id] = conversation_history[user_id][-10:]
# Send the reply back to the user
await update.message.reply_text(assistant_reply)
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
2024-08-16 12:43:59 -05:00
await update.message.reply_text("Sorry, an error occurred while processing your request.")
2024-08-17 13:00:37 -05:00
def call_tool(function_call, messages):
"""
Executes a specific tool function based on the function call
provided by the assistant.
Args:
function_call: The function call object containing the function name
and arguments.
messages: The conversation messages to pass to the tool.
Returns:
The response from the executed tool function.
"""
# implementation...
2024-08-17 13:00:37 -05:00
def get_chat_response(client, messages, max_tokens, model):
"""
Calls the OpenAI API to get a chat response based on the messages.
Args:
client: The OpenAI client instance.
messages: The messages to send to the API.
max_tokens: Maximum number of tokens for the response.
model: The model to use for the API call.
Returns:
The response from the OpenAI API.
"""
# implementation...
2024-08-17 13:00:37 -05:00
2024-08-16 12:43:59 -05:00
def main() -> None:
"""
Main function that initializes the Telegram bot, sets up the
command handlers, and starts polling for updates.
"""
# implementation...
2024-08-16 12:43:59 -05:00
if __name__ == '__main__':
2024-08-17 13:00:37 -05:00
main()