Files
cyclop/telegram_inference_bot.py
T

187 lines
6.8 KiB
Python
Raw Normal View History

2024-08-17 13:00:37 -05:00
import json
2024-08-16 12:43:59 -05:00
import os
import importlib
import inspect
import logging
2024-08-16 12:43:59 -05:00
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
from dotenv import load_dotenv
from tools.base_tool import BaseTool
2024-08-18 13:20:33 -05:00
from ai_providers import create_ai_provider, AnthropicProvider, OpenAIProvider
2024-08-16 12:43:59 -05:00
# Load environment variables
load_dotenv()
# Set up logging to console and file
2024-08-18 07:35:52 -05:00
logging.basicConfig(level=logging.WARNING, handlers=[
logging.StreamHandler(),
logging.FileHandler('logs/output.log', mode='a')
])
2024-08-16 12:43:59 -05:00
# Set up Telegram bot
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')
# Load system prompt
with open("prompts/developer_prompt.txt", "r") as file:
system_prompt = file.read().strip()
2024-08-16 12:43:59 -05:00
# Dictionary to store conversation history for each user
conversation_history = {}
# Load tools
tools = []
tools_dir = os.path.join(os.path.dirname(__file__), 'tools')
for filename in os.listdir(tools_dir):
if filename.endswith('.py') and filename != '__init__.py' and filename != 'base_tool.py':
module_name = f'tools.{filename[:-3]}'
module = importlib.import_module(module_name)
for name, obj in inspect.getmembers(module):
if inspect.isclass(obj) and issubclass(obj, BaseTool) and obj != BaseTool:
tools.append(obj())
# Collect all function definitions
functions = []
for tool in tools:
functions.extend(tool.get_functions())
2024-08-18 12:54:11 -05:00
# Initialize AI provider
ai_provider = create_ai_provider("anthropic")
2024-08-16 12:43:59 -05:00
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
logging.info("Bot started")
2024-08-17 09:28:17 -05:00
await update.message.reply_text("Hello! I'm your AI assistant. How can I help you today? You can send me images and then ask questions about them.")
2024-08-16 12:43:59 -05:00
async def clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_id = update.effective_user.id
if user_id in conversation_history:
del conversation_history[user_id]
for tool in tools:
tool.clear()
logging.info(f"Cleared conversation history and image for user {user_id}")
2024-08-17 09:28:17 -05:00
await update.message.reply_text("Conversation history and image cleared. Let's start fresh!")
2024-08-16 12:43:59 -05:00
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
try:
user_id = update.effective_user.id
user_message = update.message.text
logging.info(f"Message from user {user_id}: {user_message}")
2024-08-16 12:43:59 -05:00
if user_id not in conversation_history:
conversation_history[user_id] = []
conversation_history[user_id].append({"role": "user", "content": user_message})
2024-08-18 08:55:22 -05:00
messages = conversation_history[user_id]
2024-08-16 12:43:59 -05:00
2024-08-18 12:54:11 -05:00
response = ai_provider.get_chat_response([{"role": "system", "content": system_prompt}] + messages)
tool_calls = ai_provider.format_tool_calls(response)
2024-08-16 12:43:59 -05:00
2024-08-18 10:40:59 -05:00
toolUseCount = 0
2024-08-16 12:43:59 -05:00
2024-08-18 10:40:59 -05:00
while len(tool_calls) > 0 and toolUseCount < 50:
tool_call = tool_calls.pop(0)
function_name = tool_call.name
tool_response = call_tool(tool_call)
2024-08-18 12:16:03 -05:00
formatted_result = ai_provider.format_tool_result(tool_call, tool_response)
2024-08-18 12:16:03 -05:00
messages.append(formatted_result)
2024-08-18 10:40:59 -05:00
2024-08-18 12:54:11 -05:00
response = ai_provider.get_chat_response([{"role": "system", "content": system_prompt}] + messages)
tool_calls = ai_provider.format_tool_calls(response)
2024-08-18 10:40:59 -05:00
toolUseCount += 1
2024-08-18 12:54:11 -05:00
if toolUseCount == 0:
assistant_reply = ai_provider.format_assistant_reply(response)
2024-08-18 12:16:03 -05:00
conversation_history[user_id].append({"role": "assistant", "content": assistant_reply})
2024-08-18 10:40:59 -05:00
if len(conversation_history[user_id]) > 20:
conversation_history[user_id] = conversation_history[user_id][-20:]
await update.message.reply_text(ai_provider.get_reply_text(response))
2024-08-16 12:43:59 -05:00
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
2024-08-16 12:43:59 -05:00
await update.message.reply_text("Sorry, an error occurred while processing your request.")
2024-08-18 07:35:52 -05:00
def call_tool(function_call):
2024-08-18 12:54:11 -05:00
function_name = function_call.name
function_args = json.loads(function_call.arguments if hasattr(function_call, 'arguments') else json.dumps(function_call.input))
2024-08-17 13:00:37 -05:00
for tool in tools:
if function_name in [f["name"] for f in tool.get_functions()]:
2024-08-18 12:54:11 -05:00
return tool.execute(function_name, **function_args)
2024-08-18 08:55:22 -05:00
async def switch(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
2024-08-18 12:54:11 -05:00
global ai_provider
model = ai_provider.switch_model()
logging.info(f"Switched to model: {model}")
await update.message.reply_text(f"Switched to model: {model}")
2024-08-18 07:35:52 -05:00
2024-08-18 10:40:59 -05:00
async def switch_providers(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
2024-08-18 12:16:03 -05:00
await clear(update, context)
2024-08-18 12:54:11 -05:00
global ai_provider
new_provider_name = "openai" if isinstance(ai_provider, AnthropicProvider) else "anthropic"
ai_provider = create_ai_provider(new_provider_name)
logging.info(f"Switched to {new_provider_name} provider")
await update.message.reply_text(f"Switched to {new_provider_name} provider")
2024-08-18 07:35:52 -05:00
async def status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
global ai_provider, conversation_history, tools
provider_name = ai_provider.__class__.__name__
model = ai_provider.get_model()
total_conversations = sum(len(history) for history in conversation_history.values())
active_users = len(conversation_history)
available_tools = [tool.__class__.__name__ for tool in tools]
status_message = f"""
🤖 Bot Status Report 🤖
AI Provider: {provider_name}
Current Model: {model}
📊 Usage Statistics:
• Total Conversations: {total_conversations}
• Active Users: {active_users}
🛠 Available Tools ({len(available_tools)}):
{', '.join(available_tools)}
💡 Commands:
• /start - Start the bot
• /clear - Clear conversation history
• /switch - Switch AI model (OpenAI only)
• /toggle - Toggle between AI providers
• /status - Show this status report
🔧 System Info:
• Python version: {os.sys.version.split()[0]}
• Telegram Bot API version: {Application.VERSION}
"""
logging.info("Status command executed")
await update.message.reply_text(status_message)
2024-08-18 07:35:52 -05:00
2024-08-16 12:43:59 -05:00
def main() -> None:
# Create the Application and pass it your bot's token
application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
# Add handlers
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("clear", clear))
2024-08-18 07:35:52 -05:00
application.add_handler(CommandHandler("switch", switch))
2024-08-18 10:40:59 -05:00
application.add_handler(CommandHandler("toggle", switch_providers))
2024-08-18 07:35:52 -05:00
application.add_handler(CommandHandler("status", status))
2024-08-16 12:43:59 -05:00
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
# Start the Bot
logging.info("Bot is running...")
2024-08-16 12:43:59 -05:00
application.run_polling()
if __name__ == '__main__':
2024-08-17 13:00:37 -05:00
main()