# File: cyclop/telegram_inference_bot.py
# (Python source; 151 lines, 5.9 KiB as reported by the file viewer)

import json
import os
import importlib
import inspect
import logging
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
from dotenv import load_dotenv
from tools.base_tool import BaseTool
from ai_providers import create_ai_provider, AnthropicProvider, OpenAIProvider
# Load environment variables
load_dotenv()

# Set up logging to console and file.
# logging.FileHandler does not create missing parent directories, so make
# sure the log directory exists before the handler opens its file.
os.makedirs('logs', exist_ok=True)
# NOTE(review): with level=WARNING the logging.info(...) calls made by the
# handlers in this file are not emitted -- confirm that this is intended.
logging.basicConfig(
    level=logging.WARNING,
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('logs/output.log', mode='a'),
    ],
)

# Set up Telegram bot. os.getenv() yields None when the variable is unset.
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')

# Load system prompt. The encoding is pinned so the text does not depend on
# the platform's locale default.
with open("prompts/developer_prompt.txt", "r", encoding="utf-8") as prompt_file:
    system_prompt = prompt_file.read().strip()

# Dictionary to store conversation history for each user: keyed by the
# Telegram user id, each value is that user's list of message entries.
conversation_history = {}
def _load_tools(directory):
    """Instantiate every concrete ``BaseTool`` subclass found in *directory*.

    Each ``*.py`` file in *directory* (except ``__init__.py`` and
    ``base_tool.py``) is imported as ``tools.<stem>`` and scanned for
    classes deriving from ``BaseTool``.

    Args:
        directory: Path of the directory holding the tool modules.

    Returns:
        A list with one instance per discovered tool class, ordered by
        module file name and then by class name.
    """
    loaded = []
    seen_classes = set()
    # os.listdir() returns entries in arbitrary order; sorting keeps the
    # resulting tool order stable between runs and machines.
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith('.py') or filename in ('__init__.py', 'base_tool.py'):
            continue
        module = importlib.import_module(f'tools.{filename[:-3]}')
        for _, candidate in inspect.getmembers(module, inspect.isclass):
            if candidate is BaseTool or not issubclass(candidate, BaseTool):
                continue
            # A tool class imported into another tool module shows up in
            # both modules; instantiate it only once. Abstract classes
            # cannot be instantiated at all, so they are skipped.
            if candidate in seen_classes or inspect.isabstract(candidate):
                continue
            seen_classes.add(candidate)
            loaded.append(candidate())
    return loaded


# Load tools
tools_dir = os.path.join(os.path.dirname(__file__), 'tools')
tools = _load_tools(tools_dir)

# Collect all function definitions
# NOTE(review): `functions` is not referenced by the rest of the code visible
# in this file -- confirm whether the AI provider is supposed to receive it.
functions = [definition for tool in tools for definition in tool.get_functions()]

# Initialize AI provider
ai_provider = create_ai_provider("anthropic")
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the ``/start`` command: log the event and greet the user.

    Args:
        update: Incoming Telegram update; the greeting is sent as a reply
            to ``update.message``.
        context: Handler context (unused).
    """
    greeting = (
        "Hello! I'm your AI assistant. How can I help you today? "
        "You can send me images and then ask questions about them."
    )
    logging.info("Bot started")
    await update.message.reply_text(greeting)
async def clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the ``/clear`` command: forget the caller's conversation.

    Removes the calling user's entry from ``conversation_history``, calls
    ``clear()`` on every loaded tool and confirms to the user.

    Args:
        update: Incoming Telegram update identifying the user and the chat
            message to reply to.
        context: Handler context (unused).
    """
    user_id = update.effective_user.id
    # pop() with a default does nothing when the user has no stored history.
    conversation_history.pop(user_id, None)
    # The tool objects are module-level and therefore shared by all users.
    # Presumably this is what drops a stored image; the tool implementations
    # are not visible here.
    for loaded_tool in tools:
        loaded_tool.clear()
    logging.info(f"Cleared conversation history and image for user {user_id}")
    await update.message.reply_text("Conversation history and image cleared. Let's start fresh!")
# Upper bound on tool-call round trips performed for a single user message.
MAX_TOOL_ROUNDS = 50
# Number of most recent history entries kept per user.
MAX_HISTORY_MESSAGES = 20


async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle a plain-text message: query the AI provider and send its reply.

    The user's text is appended to that user's conversation history and the
    history, prefixed with the system prompt, is sent to the current
    provider. While the provider's response contains tool calls (at most
    ``MAX_TOOL_ROUNDS`` rounds), the first pending call is executed through
    ``call_tool``, its formatted result is appended to the history and the
    provider is queried again. The history is then trimmed to the last
    ``MAX_HISTORY_MESSAGES`` entries and the reply text is sent to the chat.

    Any exception is logged with its traceback and answered with a generic
    apology message instead of propagating.

    Args:
        update: Incoming Telegram update carrying the user and the text.
        context: Handler context (unused).
    """
    try:
        user_id = update.effective_user.id
        user_message = update.message.text
        logging.info(f"Message from user {user_id}: {user_message}")

        # `messages` is the stored history list itself, so everything
        # appended to it below is persisted for the next turn as well.
        messages = conversation_history.setdefault(user_id, [])
        messages.append({"role": "user", "content": user_message})

        # NOTE(review): the provider calls are not awaited, so they look
        # synchronous and would block the event loop while they run --
        # confirm against the provider implementation.
        response = ai_provider.get_chat_response(
            [{"role": "system", "content": system_prompt}] + messages
        )
        tool_calls = ai_provider.format_tool_calls(response)

        tool_rounds = 0
        while tool_calls and tool_rounds < MAX_TOOL_ROUNDS:
            # Only the first pending call is executed; the remaining ones are
            # replaced by whatever the follow-up response asks for.
            tool_call = tool_calls.pop(0)
            tool_response = call_tool(tool_call)
            messages.append(ai_provider.format_tool_result(tool_call, tool_response))
            response = ai_provider.get_chat_response(
                [{"role": "system", "content": system_prompt}] + messages
            )
            tool_calls = ai_provider.format_tool_calls(response)
            tool_rounds += 1

        # NOTE(review): the assistant reply is stored only when no tool was
        # used; after tool use the final answer never reaches the history.
        # Kept as-is -- confirm whether this is intentional.
        if tool_rounds == 0:
            assistant_reply = ai_provider.format_assistant_reply(response)
            messages.append({"role": "assistant", "content": assistant_reply})

        if len(messages) > MAX_HISTORY_MESSAGES:
            conversation_history[user_id] = messages[-MAX_HISTORY_MESSAGES:]

        await update.message.reply_text(ai_provider.get_reply_text(response))
    except Exception as e:
        # logging.exception records the traceback in addition to the message.
        logging.exception(f"An error occurred: {str(e)}")
        await update.message.reply_text("Sorry, an error occurred while processing your request.")
def call_tool(function_call):
    """Execute the tool function requested by *function_call*.

    Args:
        function_call: Tool-call object with a ``name`` attribute and either
            an ``arguments`` attribute holding a JSON-encoded object
            (presumably OpenAI-style calls) or an ``input`` attribute holding
            the arguments as a JSON-serialisable mapping (presumably
            Anthropic-style calls).

    Returns:
        Whatever the matching tool's ``execute`` returns, or ``None`` when no
        loaded tool advertises a function with that name.

    Raises:
        json.JSONDecodeError: If ``arguments`` is non-empty but not valid JSON.
    """
    function_name = function_call.name
    if hasattr(function_call, 'arguments'):
        raw_arguments = function_call.arguments
        # An empty or missing argument string means "no arguments";
        # json.loads() would raise on it.
        function_args = json.loads(raw_arguments) if raw_arguments else {}
    else:
        # The JSON round trip yields a detached, JSON-normalised copy of the
        # provider's argument mapping.
        function_args = json.loads(json.dumps(function_call.input))

    for tool in tools:
        if any(spec["name"] == function_name for spec in tool.get_functions()):
            return tool.execute(function_name, **function_args)

    # Make the miss visible instead of silently falling off the end.
    logging.warning("No loaded tool provides function %r", function_name)
    return None
async def switch(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the ``/switch`` command: change the current provider's model.

    Calls ``switch_model()`` on the active provider and reports the value it
    returns both to the log and to the user.

    Args:
        update: Incoming Telegram update; the confirmation is sent as a
            reply to ``update.message``.
        context: Handler context (unused).
    """
    # The provider object is only read here, never rebound, so no `global`
    # declaration is required.
    new_model = ai_provider.switch_model()
    confirmation = f"Switched to model: {new_model}"
    logging.info(confirmation)
    await update.message.reply_text(confirmation)
async def switch_providers(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the ``/toggle`` command: swap between the two AI providers.

    First runs ``clear`` for the calling user, then replaces the
    module-level ``ai_provider`` with an "openai" provider when the current
    one is an ``AnthropicProvider`` and with an "anthropic" provider
    otherwise, and finally confirms the switch.

    Args:
        update: Incoming Telegram update, forwarded to ``clear`` and used
            for the confirmation reply.
        context: Handler context, forwarded to ``clear``.
    """
    global ai_provider

    # NOTE(review): only the caller's history is cleared, while the provider
    # is shared by all users -- other users keep history produced with the
    # previous provider. Confirm whether that matters.
    await clear(update, context)

    if isinstance(ai_provider, AnthropicProvider):
        new_provider_name = "openai"
    else:
        new_provider_name = "anthropic"
    ai_provider = create_ai_provider(new_provider_name)

    notice = f"Switched to {new_provider_name} provider"
    logging.info(notice)
    await update.message.reply_text(notice)
async def status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the ``/status`` command: report the active provider and model.

    Args:
        update: Incoming Telegram update; the report is sent as a reply to
            ``update.message``.
        context: Handler context (unused).
    """
    provider_class_name = ai_provider.__class__.__name__
    current_model = ai_provider.get_model()
    await update.message.reply_text(f"Currently using {provider_class_name}: {current_model}")
def main() -> None:
    """Build the Telegram application, register the handlers and poll.

    The bot token is taken from the module-level ``TELEGRAM_BOT_TOKEN``.
    """
    # Create the Application and pass it the bot's token.
    application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()

    # Command name -> handler coroutine, registered in this order.
    command_table = (
        ("start", start),
        ("clear", clear),
        ("switch", switch),
        ("toggle", switch_providers),
        ("status", status),
    )
    for command, callback in command_table:
        application.add_handler(CommandHandler(command, callback))

    # Text messages that are not commands go to the chat handler.
    plain_text = filters.TEXT & ~filters.COMMAND
    application.add_handler(MessageHandler(plain_text, handle_message))

    # Start the bot.
    logging.info("Bot is running...")
    application.run_polling()


if __name__ == '__main__':
    main()