206 lines
9.2 KiB
Python
206 lines
9.2 KiB
Python
import json
|
|
import os
|
|
import importlib
|
|
import inspect
|
|
import tempfile
|
|
import base64
|
|
from telegram import Update
|
|
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
|
|
from openai import OpenAI
|
|
from dotenv import load_dotenv
|
|
from tools.base_tool import BaseTool
|
|
|
|
# Load environment variables
|
|
load_dotenv()
|
|
|
|
client = OpenAI()
|
|
|
|
GPT_4O = "gpt-4o"
|
|
GPT_4O_MINI = "gpt-4o-mini"
|
|
system_prompt = """Alright, imagine you're a savvy developer with a trusty toolkit. Your mission: to manage a repository like a maestro conducting an orchestra. You don't just wield tools; you dance with them. Let's craft a persona that embodies the essence of a repository wizard:
|
|
|
|
Organized Explorer: You know your way around the file system. You can list files in a directory with ease, and if a file's content is what you seek, reading it is a breeze.
|
|
|
|
Branch Botanist: Branches are your garden. You plant new ones, name them creatively, and make sure they stem from the right place. You keep an eye out for the SHA of the latest commit just for good measure.
|
|
|
|
Persistent Committer: Committing changes is your thrill. You've mastered the art of committing files with purpose, leaving behind a trail of meaningful messages.
|
|
|
|
Pull Request Protagonist: The stage is set for collaboration. You create pull requests with compelling titles and bodies, ensuring your contributions are seen and valued.
|
|
|
|
Code Detective: Whether it's tracking changes or searching for specific code, your investigative skills are top-notch. Histories and queries bend to your inquisitive will.
|
|
|
|
Guardian of the Current: You're always aware of your current branch, and can pivot as needed. Setting and getting the current branch is second nature to you.
|
|
|
|
Archaeologist of Commits: Need to dig up an old file version? No problem. You retrieve file contents from specific commit SHAs like unearthing hidden treasures.
|
|
|
|
Branch Cartographer: Charting out all branches helps you understand the lay of the land. You list them to keep track of your project's evolving terrain.
|
|
|
|
Imagine the possibilities—each tool a note in your grand symphony of repository management. No need to memorize; just embody the spirit of curiosity, precision, and orchestration. Ready to dive in?
|
|
"""
|
|
# Set up Telegram bot
|
|
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')
|
|
|
|
# Dictionary to store conversation history for each user
|
|
conversation_history = {}
|
|
|
|
# Dictionary to store the last image file for each user
|
|
user_images = {}
|
|
|
|
# Load tools
|
|
tools = []
|
|
tools_dir = os.path.join(os.path.dirname(__file__), 'tools')
|
|
for filename in os.listdir(tools_dir):
|
|
if filename.endswith('.py') and filename != '__init__.py' and filename != 'base_tool.py':
|
|
module_name = f'tools.{filename[:-3]}'
|
|
module = importlib.import_module(module_name)
|
|
for name, obj in inspect.getmembers(module):
|
|
if inspect.isclass(obj) and issubclass(obj, BaseTool) and obj != BaseTool:
|
|
tools.append(obj())
|
|
|
|
# Collect all function definitions
|
|
functions = []
|
|
for tool in tools:
|
|
functions.extend(tool.get_functions())
|
|
|
|
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
await update.message.reply_text("Hello! I'm your AI assistant. How can I help you today? You can send me images and then ask questions about them.")
|
|
|
|
async def clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
user_id = update.effective_user.id
|
|
if user_id in conversation_history:
|
|
del conversation_history[user_id]
|
|
if user_id in user_images:
|
|
os.remove(user_images[user_id])
|
|
del user_images[user_id]
|
|
await update.message.reply_text("Conversation history and image cleared. Let's start fresh!")
|
|
|
|
async def handle_image(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
user_id = update.effective_user.id
|
|
|
|
# Get the largest available photo
|
|
photo = max(update.message.photo, key=lambda x: x.file_size)
|
|
|
|
# Download the photo
|
|
photo_file = await context.bot.get_file(photo.file_id)
|
|
|
|
# Create a temporary file to store the image
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file:
|
|
await photo_file.download_to_drive(custom_path=temp_file.name)
|
|
user_images[user_id] = temp_file.name
|
|
|
|
await update.message.reply_text("I've received your image. What would you like to know about it?")
|
|
|
|
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
try:
|
|
user_id = update.effective_user.id
|
|
user_message = update.message.text
|
|
|
|
# Initialize conversation history for new users
|
|
if user_id not in conversation_history:
|
|
conversation_history[user_id] = []
|
|
|
|
# Add user message to conversation history
|
|
conversation_history[user_id].append({"role": "user", "content": user_message})
|
|
|
|
# Prepare messages for OpenAI API
|
|
messages = [{"role": "system", "content": system_prompt}] + conversation_history[user_id]
|
|
|
|
# Check if there's an image to process
|
|
if user_id in user_images:
|
|
with open(user_images[user_id], "rb") as image_file:
|
|
response = client.chat.completions.create(
|
|
model=GPT_4O_MINI,
|
|
messages=[
|
|
{
|
|
"role": "user",
|
|
"content": [
|
|
{"type": "text", "text": user_message},
|
|
{
|
|
"type": "image_url",
|
|
"image_url": {
|
|
"url": f"data:image/jpeg;base64,{base64.b64encode(image_file.read()).decode('utf-8')}"
|
|
}
|
|
},
|
|
],
|
|
}
|
|
],
|
|
max_tokens=16384
|
|
)
|
|
# Remove the temporary image file
|
|
os.remove(user_images[user_id])
|
|
del user_images[user_id]
|
|
else:
|
|
# Call OpenAI API for inference (text-only)
|
|
response = get_chat_response(client, messages, 4096, GPT_4O)
|
|
|
|
# Extract the assistant's reply
|
|
assistant_message = response.choices[0].message
|
|
toolUseCount = 0
|
|
if hasattr(assistant_message, 'function_call') and assistant_message.function_call:
|
|
while hasattr(assistant_message, 'function_call') and assistant_message.function_call and toolUseCount < 10:
|
|
tool_response = call_tool(assistant_message.function_call, messages)
|
|
|
|
conversation_history[user_id].append({"role": "function", "name": assistant_message.function_call.name, "content": json.dumps(tool_response)})
|
|
messages.append({
|
|
"role": "function",
|
|
"name": assistant_message.function_call.name,
|
|
"content": json.dumps(tool_response)
|
|
})
|
|
|
|
# Call API again to get the final response
|
|
assistant_message = get_chat_response(client, messages, 4096, GPT_4O).choices[0].message
|
|
if not hasattr(assistant_message, 'function_call') or not assistant_message.function_call:
|
|
assistant_reply = assistant_message.content
|
|
conversation_history[user_id].append({"role": "assistant", "content": assistant_reply})
|
|
else:
|
|
assistant_reply = assistant_message.content
|
|
# Add assistant's reply to conversation history
|
|
conversation_history[user_id].append({"role": "assistant", "content": assistant_reply})
|
|
|
|
|
|
|
|
# Trim conversation history if it gets too long (e.g., keep last 10 messages)
|
|
if len(conversation_history[user_id]) > 10:
|
|
conversation_history[user_id] = conversation_history[user_id][-10:]
|
|
|
|
# Send the reply back to the user
|
|
await update.message.reply_text(assistant_reply)
|
|
|
|
except Exception as e:
|
|
print(f"An error occurred: {str(e)}")
|
|
await update.message.reply_text("Sorry, an error occurred while processing your request.")
|
|
|
|
def call_tool(function_call, messages):
|
|
# Execute the function
|
|
function_name = function_call.name
|
|
function_args = function_call.arguments
|
|
for tool in tools:
|
|
if function_name in [f["name"] for f in tool.get_functions()]:
|
|
return tool.execute(function_name, **eval(function_args))
|
|
|
|
def get_chat_response(client, messages, max_tokens, model):
|
|
response = client.chat.completions.create(
|
|
model=model,
|
|
messages=messages,
|
|
functions=functions,
|
|
function_call="auto",
|
|
max_tokens=max_tokens
|
|
)
|
|
return response
|
|
|
|
def main() -> None:
|
|
# Create the Application and pass it your bot's token
|
|
application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
|
|
|
|
# Add handlers
|
|
application.add_handler(CommandHandler("start", start))
|
|
application.add_handler(CommandHandler("clear", clear))
|
|
application.add_handler(MessageHandler(filters.PHOTO, handle_image))
|
|
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
|
|
|
|
# Start the Bot
|
|
print("Bot is running...")
|
|
application.run_polling()
|
|
|
|
if __name__ == '__main__':
|
|
main() |