Files
cyclop/telegram_inference_bot.py
T
2024-08-18 10:40:59 -05:00

233 lines
8.4 KiB
Python

import json
import os
import importlib
import inspect
import logging
import anthropic
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
from openai import OpenAI
from dotenv import load_dotenv
from tools.base_tool import BaseTool
# Read settings (API keys, bot token) from a .env file into the process
# environment before any client below is constructed.
load_dotenv()

# Provider clients. OpenAI() is created with no explicit arguments, while the
# Anthropic client is handed its key from the environment explicitly.
openai_client = OpenAI()
anthropic_client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))

# OpenAI model identifiers and the max_tokens value requested for each one.
GPT_4O = "gpt-4o"
GPT_4O_MINI = "gpt-4o-mini"
model_max_tokens = {GPT_4O: 4096, GPT_4O_MINI: 16384}

# Runtime toggles; the `switch` and `switch_providers` handlers flip them.
use_smart_model = True  # True -> GPT_4O, False -> GPT_4O_MINI
use_anthropic = False  # True -> Anthropic client, False -> OpenAI client
# Set up logging to console and file.
# FileHandler opens its target file as soon as it is constructed and raises
# if the parent directory is missing, so create the directory first.
os.makedirs('logs', exist_ok=True)
# NOTE: with level=WARNING the logging.info(...) calls made elsewhere in this
# module are filtered out; only warnings and errors reach the handlers.
logging.basicConfig(level=logging.WARNING, handlers=[
    logging.StreamHandler(),
    logging.FileHandler('logs/output.log', mode='a'),
])
# Set up Telegram bot. The token is None when the variable is not set.
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')

# Load system prompt. The encoding is given explicitly so the prompt is
# decoded the same way on every platform instead of using the locale default.
with open("prompts/developer_prompt.txt", "r", encoding="utf-8") as file:
    system_prompt = file.read().strip()

# Dictionary to store conversation history for each user
# (user id -> list of chat message dicts).
conversation_history = {}
# Load tools: import every module in the tools/ directory next to this file
# and instantiate each BaseTool subclass found in it.
tools = []
tools_dir = os.path.join(os.path.dirname(__file__), 'tools')
_non_tool_files = {'__init__.py', 'base_tool.py'}
_loaded_tool_classes = set()
# os.listdir() returns entries in arbitrary order; sorting keeps the order of
# `tools` (and therefore of `functions`) the same on every run.
for filename in sorted(os.listdir(tools_dir)):
    if not filename.endswith('.py') or filename in _non_tool_files:
        continue
    module_name = f'tools.{filename[:-3]}'
    module = importlib.import_module(module_name)
    for name, obj in inspect.getmembers(module, inspect.isclass):
        if not issubclass(obj, BaseTool) or obj is BaseTool:
            continue
        # A tool class that one tool module imports from another appears in
        # the members of both modules; instantiate it only once.
        if obj in _loaded_tool_classes:
            continue
        _loaded_tool_classes.add(obj)
        tools.append(obj())

# Collect all function definitions exposed by the loaded tools.
functions = [function for tool in tools for function in tool.get_functions()]
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Log that the bot was started and reply with the greeting message."""
    logging.info("Bot started")
    greeting = (
        "Hello! I'm your AI assistant. How can I help you today? "
        "You can send me images and then ask questions about them."
    )
    await update.message.reply_text(greeting)
async def clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Drop the sender's stored conversation and call ``clear()`` on every tool.

    The tools live in a single module-level list, so ``clear()`` is invoked on
    all of them no matter which user sent the command.
    """
    uid = update.effective_user.id
    # Forget the stored history; a user without any history is simply a no-op.
    conversation_history.pop(uid, None)
    for loaded_tool in tools:
        loaded_tool.clear()
    logging.info(f"Cleared conversation history and image for user {uid}")
    await update.message.reply_text("Conversation history and image cleared. Let's start fresh!")
# Upper bound on tool invocations while answering one user message, so a model
# that keeps requesting tools cannot keep the handler looping forever.
_MAX_TOOL_CALLS_PER_MESSAGE = 50
# Number of most recent messages kept per user.
_MAX_HISTORY_MESSAGES = 20


def _split_model_response(response):
    """Return ``(reply_text, tool_calls)`` extracted from a provider response.

    ``reply_text`` is always a string (possibly empty) and ``tool_calls`` is
    always a list (possibly empty), whichever provider produced ``response``.

    Raises:
        RuntimeError: if ``response`` is None.
    """
    if response is None:
        # get_claude_response() returns None after logging an API failure.
        raise RuntimeError("The model provider returned no response")

    if use_anthropic:
        text_parts = []
        tool_calls = []
        for block in response.content:
            if block.type == "tool_use":
                tool_calls.append(block)
            else:
                text = getattr(block, "text", None)
                if text:
                    text_parts.append(text)
        return "".join(text_parts), tool_calls

    message = response.choices[0].message
    # The attribute is present on every message and is None when the model did
    # not request a function, so test the value rather than using hasattr().
    function_call = getattr(message, "function_call", None)
    tool_calls = [] if function_call is None else [function_call]
    return message.content or "", tool_calls


async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Answer one text message, running any tools the model asks for.

    The user's text is appended to that user's history, the selected provider
    is queried, and every requested tool call is executed through
    ``call_tool`` with its JSON-encoded result appended to the history before
    the provider is queried again. The final text is stored as the assistant
    turn, the history is trimmed to the most recent messages, and the text is
    sent back to the chat. Any exception is logged and answered with a generic
    apology.

    NOTE: ``get_chat_response`` is a plain synchronous function, so the event
    loop is blocked while a provider request is in flight.
    NOTE(review): tool results are stored with role "function" for both
    providers; confirm that the Anthropic API accepts that role.
    """
    try:
        user_id = update.effective_user.id
        user_message = update.message.text
        logging.info(f"Message from user {user_id}: {user_message}")

        history = conversation_history.setdefault(user_id, [])
        history.append({"role": "user", "content": user_message})

        reply_text, tool_calls = _split_model_response(get_chat_response(history))

        tool_use_count = 0
        while tool_calls and tool_use_count < _MAX_TOOL_CALLS_PER_MESSAGE:
            tool_call = tool_calls.pop(0)
            tool_response = call_tool(tool_call)
            # `history` is the very list stored in conversation_history, so a
            # single append records the result both for the next request and
            # for later turns.
            history.append({
                "role": "function",
                "name": tool_call.name,
                "content": json.dumps(tool_response),
            })
            reply_text, requested = _split_model_response(get_chat_response(history))
            tool_calls.extend(requested)
            tool_use_count += 1

        if not reply_text:
            # Nothing to send (for example the tool-call limit was reached);
            # use the error path instead of storing or sending an empty reply.
            raise RuntimeError("The model produced no text reply")

        # Store plain text rather than the provider's message object so the
        # history can be sent back to the provider on the next turn.
        history.append({"role": "assistant", "content": reply_text})
        if len(history) > _MAX_HISTORY_MESSAGES:
            conversation_history[user_id] = history[-_MAX_HISTORY_MESSAGES:]
        await update.message.reply_text(reply_text)
    except Exception as e:
        # logging.exception keeps the original message and adds the traceback.
        logging.exception(f"An error occurred: {str(e)}")
        await update.message.reply_text("Sorry, an error occurred while processing your request.")
def call_tool(function_call):
    """Run the tool function requested by the model and return its result.

    Args:
        function_call: the provider's request object. It must have a ``name``
            attribute. Arguments are taken from ``arguments`` (a JSON string,
            as OpenAI function calls carry) when that attribute is present,
            otherwise from ``input`` (an already decoded mapping, as Anthropic
            ``tool_use`` blocks carry). A missing or empty value means "no
            arguments".

    Returns:
        Whatever the matching tool's ``execute`` returns. If no loaded tool
        exposes a function with that name, a warning is logged and a dict with
        an ``"error"`` key is returned so the caller can report the problem
        back to the model instead of sending ``null``.

    Raises:
        json.JSONDecodeError: if ``arguments`` is not valid JSON.
    """
    function_name = function_call.name
    raw_arguments = getattr(function_call, "arguments", None)
    if raw_arguments is not None:
        # An empty string stands for "no arguments"; json.loads("") would raise.
        function_args = json.loads(raw_arguments) if raw_arguments else {}
    else:
        function_args = getattr(function_call, "input", None) or {}

    for tool in tools:
        if any(f["name"] == function_name for f in tool.get_functions()):
            return tool.execute(function_name, **function_args)

    logging.warning(f"No tool provides function {function_name}")
    return {"error": f"Unknown function: {function_name}"}
def get_chat_response(messages):
    """Send ``messages`` to whichever provider is currently selected.

    Returns the selected provider function's return value unchanged.
    """
    if use_anthropic:
        return get_claude_response(messages)
    return get_openai_response(messages)
def get_openai_response(messages):
    """Request a chat completion from OpenAI for ``messages``.

    The system prompt is prepended to ``messages`` (the list itself is not
    modified). The model is GPT_4O or GPT_4O_MINI depending on
    ``use_smart_model``, and ``max_tokens`` comes from ``model_max_tokens``.

    Returns:
        The response object returned by ``openai_client``.
    """
    model = GPT_4O if use_smart_model else GPT_4O_MINI
    request = {
        "model": model,
        "messages": [{"role": "system", "content": system_prompt}] + messages,
        "max_tokens": model_max_tokens[model],
    }
    # Only advertise functions when at least one tool is loaded: the API
    # rejects an empty `functions` list.
    if functions:
        request["functions"] = functions
        request["function_call"] = "auto"
    return openai_client.chat.completions.create(**request)
def get_claude_response(messages):
    """Request a reply from Anthropic for ``messages``.

    Each entry of ``functions`` is converted to Anthropic's tool format
    (``name`` / ``description`` / ``input_schema``). Only the ``role`` and
    ``content`` of every message are forwarded.

    Returns:
        The response object returned by ``anthropic_client``, or None if
        building or sending the request raised; the error is logged with its
        traceback.
    """
    anthropic_tools = []
    for function in functions:
        # A function that declares no parameters gets an empty object schema
        # rather than an invented placeholder property the tool would not
        # accept as an argument.
        schema = function.get('parameters') or {"type": "object", "properties": {}}
        anthropic_tools.append({
            "name": function['name'],
            "description": function.get('description', ''),
            "input_schema": schema,
        })

    try:
        request = {
            "model": "claude-3-sonnet-20240229",
            "system": system_prompt,
            "messages": [{"role": m["role"], "content": m["content"]} for m in messages],
            "max_tokens": 4096,
        }
        # Leave `tools` out entirely when no tool is loaded.
        if anthropic_tools:
            request["tools"] = anthropic_tools
        return anthropic_client.messages.create(**request)
    except Exception as e:
        logging.exception(f"An error occurred: {str(e)}")
        return None
async def switch(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Flip ``use_smart_model`` and tell the user which OpenAI model is active."""
    global use_smart_model
    use_smart_model = not use_smart_model
    if use_smart_model:
        active_model = GPT_4O
    else:
        active_model = GPT_4O_MINI
    # The same text goes to the log and to the chat.
    announcement = f"Switched to model: {active_model}"
    logging.info(announcement)
    await update.message.reply_text(announcement)
async def switch_providers(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Flip ``use_anthropic`` and tell the user which provider is now in use."""
    global use_anthropic
    use_anthropic = not use_anthropic
    if use_anthropic:
        notice = "Using Anthropic"
    else:
        notice = "Using OpenAI"
    # The same text goes to the log and to the chat.
    logging.info(notice)
    await update.message.reply_text(notice)
async def status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Tell the user which model answers messages under the current toggles."""
    if use_anthropic:
        # Report the model id that get_claude_response() actually requests.
        await update.message.reply_text("Currently using claude-3-sonnet-20240229")
    else:
        model = GPT_4O if use_smart_model else GPT_4O_MINI
        await update.message.reply_text(f"Currently using: {model}")
def main() -> None:
    """Build the Telegram application, register the handlers and start polling."""
    application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()

    # Slash commands, registered in this order.
    command_table = (
        ("start", start),
        ("clear", clear),
        ("switch", switch),
        ("toggle", switch_providers),
        ("status", status),
    )
    for command_name, callback in command_table:
        application.add_handler(CommandHandler(command_name, callback))

    # Text messages that are not commands go to the chat handler.
    plain_text = filters.TEXT & ~filters.COMMAND
    application.add_handler(MessageHandler(plain_text, handle_message))

    logging.info("Bot is running...")
    application.run_polling()


if __name__ == '__main__':
    main()