187 lines
6.8 KiB
Python
187 lines
6.8 KiB
Python
import importlib
import inspect
import json
import logging
import os
import sys

import telegram
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes

from ai_providers import create_ai_provider, AnthropicProvider, OpenAIProvider
from tools.base_tool import BaseTool
|
|
|
|
# Pull settings from a local .env file into the process environment before
# any configuration below reads them with os.getenv.
load_dotenv()
|
|
|
|
# Send log records to both the console and an append-mode log file.
# The log directory is created up front: logging.FileHandler opens its file
# as soon as it is constructed and raises FileNotFoundError when the parent
# directory does not exist.
# NOTE(review): with the root level at WARNING, the logging.info(...) calls
# made elsewhere in this file are filtered out -- confirm that is intended.
os.makedirs('logs', exist_ok=True)
logging.basicConfig(
    level=logging.WARNING,
    handlers=[
        logging.StreamHandler(),
        # Explicit UTF-8 so non-ASCII text in log records is encoded the same
        # way on every platform instead of following the locale default.
        logging.FileHandler('logs/output.log', mode='a', encoding='utf-8'),
    ],
)
|
|
|
|
# Telegram bot token, read from the environment (None when the variable is unset).
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')

# System prompt placed in front of every conversation sent to the AI provider.
# The file is decoded as UTF-8 explicitly so the result does not depend on the
# platform's locale encoding.
with open("prompts/developer_prompt.txt", "r", encoding="utf-8") as prompt_file:
    system_prompt = prompt_file.read().strip()

# Per-user conversation history: maps a Telegram user id to that user's list
# of chat messages.
conversation_history = {}
|
|
|
|
# Discover tools: import every module in the tools/ directory next to this
# file and instantiate each BaseTool subclass found in it.
tools = []
tools_dir = os.path.join(os.path.dirname(__file__), 'tools')
_seen_tool_classes = set()
# Sorted so tools are registered in the same order on every run; os.listdir
# returns directory entries in arbitrary order.
for filename in sorted(os.listdir(tools_dir)):
    if not filename.endswith('.py') or filename in ('__init__.py', 'base_tool.py'):
        continue
    module = importlib.import_module(f'tools.{filename[:-3]}')
    for _, obj in inspect.getmembers(module, inspect.isclass):
        if not issubclass(obj, BaseTool) or obj is BaseTool:
            continue
        # getmembers also reports classes a module merely imported, so a tool
        # class visible from several modules would otherwise be instantiated
        # once per module.
        if obj in _seen_tool_classes:
            continue
        _seen_tool_classes.add(obj)
        tools.append(obj())
|
|
|
|
# Flatten the function definitions advertised by every loaded tool into one list.
functions = [definition for tool in tools for definition in tool.get_functions()]

# Active AI backend; the bot starts on the Anthropic provider.
ai_provider = create_ai_provider("anthropic")
|
|
|
|
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start by replying with the bot's greeting."""
    logging.info("Bot started")
    greeting = (
        "Hello! I'm your AI assistant. How can I help you today? "
        "You can send me images and then ask questions about them."
    )
    await update.message.reply_text(greeting)
|
|
|
|
async def clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /clear: forget the caller's conversation and reset every tool.

    NOTE(review): the original indentation was lost; the tool reset is assumed
    to run unconditionally rather than only when a history existed -- confirm.
    """
    user_id = update.effective_user.id
    # pop() with a default removes the history when present and is a no-op otherwise.
    conversation_history.pop(user_id, None)
    for registered_tool in tools:
        registered_tool.clear()

    logging.info(f"Cleared conversation history and image for user {user_id}")
    await update.message.reply_text("Conversation history and image cleared. Let's start fresh!")
|
|
|
|
def _with_system_prompt(messages):
    """Return a new message list: the system prompt followed by *messages*."""
    return [{"role": "system", "content": system_prompt}] + messages


async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Answer a text message, letting the AI provider call tools along the way.

    The user's text is appended to that user's history and the history
    (prefixed with the system prompt) is sent to the provider. While the
    provider keeps requesting tools (at most 50 rounds), the first requested
    tool is run and its formatted result is fed back. The text of the final
    response is sent to the chat. Any failure is logged with its traceback and
    answered with a generic apology.
    """
    message = update.message
    user = update.effective_user
    # A text filter can also match edited messages and channel posts, for
    # which update.message (or the user) is None; there is nothing to answer.
    if message is None or user is None:
        return

    try:
        user_id = user.id
        user_message = message.text
        logging.info(f"Message from user {user_id}: {user_message}")

        # `history` is the stored list itself, so the tool results appended
        # below become part of the saved conversation.
        history = conversation_history.setdefault(user_id, [])
        history.append({"role": "user", "content": user_message})

        response = ai_provider.get_chat_response(_with_system_prompt(history))
        tool_calls = ai_provider.format_tool_calls(response)

        max_tool_rounds = 50  # hard stop so a tool-happy model cannot loop forever
        tool_use_count = 0
        while tool_calls and tool_use_count < max_tool_rounds:
            # Only the first requested call is executed per round; the list is
            # replaced by the calls of the follow-up response.
            tool_call = tool_calls.pop(0)
            tool_response = call_tool(tool_call)
            history.append(ai_provider.format_tool_result(tool_call, tool_response))

            response = ai_provider.get_chat_response(_with_system_prompt(history))
            tool_calls = ai_provider.format_tool_calls(response)
            tool_use_count += 1

        # The assistant reply is recorded only when no tool was involved.
        if tool_use_count == 0:
            assistant_reply = ai_provider.format_assistant_reply(response)
            history.append({"role": "assistant", "content": assistant_reply})

        # Keep at most the 20 most recent entries per user.
        if len(history) > 20:
            conversation_history[user_id] = history[-20:]

        await message.reply_text(ai_provider.get_reply_text(response))
    except Exception as e:
        # logging.exception keeps the traceback, which a plain error message loses.
        logging.exception("An error occurred: %s", e)
        await message.reply_text("Sorry, an error occurred while processing your request.")
|
|
|
|
def call_tool(function_call, available_tools=None):
    """Run the tool function requested by the AI provider and return its result.

    Args:
        function_call: Provider tool-call object. It must have a ``name`` and
            either ``arguments`` (a JSON string) or ``input`` (an
            already-decoded mapping of keyword arguments).
        available_tools: Tools to search; defaults to the module-level
            ``tools`` list.

    Returns:
        Whatever the matching tool's ``execute`` returns, or ``None`` when no
        tool declares a function with that name.
    """
    function_name = function_call.name
    if hasattr(function_call, 'arguments'):
        raw_arguments = function_call.arguments
        # An argument-less call may arrive as an empty string, which
        # json.loads rejects; treat it as "no arguments".
        function_args = json.loads(raw_arguments) if raw_arguments else {}
    else:
        # Round-trip through JSON so the tool receives plain data that is
        # independent of the provider's response object.
        function_args = json.loads(json.dumps(function_call.input))

    candidates = tools if available_tools is None else available_tools
    for tool in candidates:
        if any(spec["name"] == function_name for spec in tool.get_functions()):
            return tool.execute(function_name, **function_args)

    # Make the miss visible instead of silently handing None back to the caller.
    logging.warning("No tool provides function %r", function_name)
    return None
|
|
|
|
async def switch(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /switch: ask the active provider to change model and report the result."""
    new_model = ai_provider.switch_model()
    notice = f"Switched to model: {new_model}"
    logging.info(notice)
    await update.message.reply_text(notice)
|
|
|
|
async def switch_providers(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /toggle: reset the caller's state, then flip between the two providers.

    NOTE(review): only the calling user's history is cleared although the
    provider is shared by all users -- confirm that other users' stored
    messages remain valid for the new provider.
    """
    global ai_provider

    await clear(update, context)
    if isinstance(ai_provider, AnthropicProvider):
        new_provider_name = "openai"
    else:
        new_provider_name = "anthropic"
    ai_provider = create_ai_provider(new_provider_name)

    notice = f"Switched to {new_provider_name} provider"
    logging.info(notice)
    await update.message.reply_text(notice)
|
|
|
|
async def status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /status: reply with provider, usage, tool and version information."""
    provider_name = ai_provider.__class__.__name__
    model = ai_provider.get_model()

    # Despite the label used in the report, this is the number of stored
    # messages summed over all users.
    total_conversations = sum(len(history) for history in conversation_history.values())
    active_users = len(conversation_history)

    available_tools = [tool.__class__.__name__ for tool in tools]

    python_version = sys.version.split()[0]
    # Application has no VERSION attribute; the library publishes the Bot API
    # version it supports as telegram.__bot_api_version__. Fall back to a
    # placeholder rather than failing the whole command if it is absent.
    bot_api_version = getattr(telegram, "__bot_api_version__", "unknown")

    # Leading and trailing empty entries reproduce the newline that opens and
    # closes the report.
    report_lines = [
        "",
        "🤖 Bot Status Report 🤖",
        "",
        f"AI Provider: {provider_name}",
        f"Current Model: {model}",
        "",
        "📊 Usage Statistics:",
        f"• Total Conversations: {total_conversations}",
        f"• Active Users: {active_users}",
        "",
        f"🛠 Available Tools ({len(available_tools)}):",
        f"{', '.join(available_tools)}",
        "",
        "💡 Commands:",
        "• /start - Start the bot",
        "• /clear - Clear conversation history",
        "• /switch - Switch AI model (OpenAI only)",
        "• /toggle - Toggle between AI providers",
        "• /status - Show this status report",
        "",
        "🔧 System Info:",
        f"• Python version: {python_version}",
        f"• Telegram Bot API version: {bot_api_version}",
        "",
    ]
    status_message = "\n".join(report_lines)

    logging.info("Status command executed")
    await update.message.reply_text(status_message)
|
|
|
|
def main() -> None:
    """Build the Telegram application, register every handler and start polling."""
    application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()

    # Slash commands and the coroutine that serves each one.
    command_table = (
        ("start", start),
        ("clear", clear),
        ("switch", switch),
        ("toggle", switch_providers),
        ("status", status),
    )
    for command_name, callback in command_table:
        application.add_handler(CommandHandler(command_name, callback))

    # Any text that is not a command is treated as a chat message for the AI.
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

    logging.info("Bot is running...")
    application.run_polling()


if __name__ == '__main__':
    main()