diff --git a/anthropic_telegram_inference_bot.py b/anthropic_telegram_inference_bot.py index bb8c5c2..238163b 100644 --- a/anthropic_telegram_inference_bot.py +++ b/anthropic_telegram_inference_bot.py @@ -1,232 +1,104 @@ -import json import os -import importlib -import inspect +import json import logging -import asyncio -from telegram import error as TelegramErrors, Update, __version__ as telegram_version, InlineKeyboardButton, InlineKeyboardMarkup -from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes, CallbackQueryHandler -from dotenv import load_dotenv -from tools.base_tool import BaseTool -from tools.metrics_tool import MetricsTool from anthropic import Anthropic +from base_inference_bot import BaseInferenceBot +from telegram_helper import TelegramHelper -# Load environment variables -load_dotenv() +class AnthropicTelegramInferenceBot(BaseInferenceBot): + def __init__(self): + super().__init__() + self.anthropic_client = Anthropic( + api_key=os.environ.get("ANTHROPIC_API_KEY"), + default_headers={"anthropic-beta": "max-tokens-3-5-sonnet-2024-07-15"} + ) -anthropic_client = Anthropic( - api_key=os.environ.get("ANTHROPIC_API_KEY"), - default_headers={"anthropic-beta": "max-tokens-3-5-sonnet-2024-07-15"} -) - -# Set up logging to console and file -logging.basicConfig(level=logging.WARNING, handlers=[ - logging.StreamHandler(), - logging.FileHandler('logs/output.log', mode='a') -]) - -# Set up Telegram bot -TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN') - -# Load system prompt -with open("prompts/developer_prompt.txt", "r") as file: - system_prompt = file.read().strip() - -# Dictionary to store conversation history for each user -conversation_history = {} - -# Dictionary to store processing status for each user -processing_status = {} - -# Load tools -tools = [MetricsTool()] # Add MetricsTool instance -tools_dir = os.path.join(os.path.dirname(__file__), 'tools') -for filename in os.listdir(tools_dir): - if filename.endswith('.py') and filename not in ['__init__.py', 'base_tool.py', 'metrics_tool.py']: - module_name = f'tools.{filename[:-3]}' - module = importlib.import_module(module_name) - for name, obj in inspect.getmembers(module): - if inspect.isclass(obj) and issubclass(obj, BaseTool) and obj != BaseTool: - tools.append(obj()) - -# Collect all function definitions -functions = [] -for tool in tools: - functions.extend(tool.get_functions()) - -async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - logging.info("Bot started") - await update.message.reply_text( - "Hello! I'm your AI assistant. How can I help you today? You can send me images and then ask questions about them." - ) - -async def clear(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - user_id = update.effective_user.id - if user_id in conversation_history: - del conversation_history[user_id] - for tool in tools: - tool.clear() - - logging.info(f"Cleared conversation history and image for user {user_id}") - await update.message.reply_text("Conversation history and image cleared. Let's start fresh!") - -async def update_status_message(context: ContextTypes.DEFAULT_TYPE, chat_id: int, message_id: int, status: str): - keyboard = [ - [InlineKeyboardButton("Abort", callback_data='abort')] - ] - reply_markup = InlineKeyboardMarkup(keyboard) - await context.bot.edit_message_text( - chat_id=chat_id, - message_id=message_id, - text=f"Current status: {status}", - reply_markup=reply_markup - ) - -async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - try: - user_id = update.effective_user.id - user_message = update.message.text - - logging.info(f"Message from user {user_id}: {user_message}") - - if user_id not in conversation_history: - conversation_history[user_id] = [] - - conversation_history[user_id].append({"role": "user", "content": user_message}) - - # Send initial status message - status_message = await update.message.reply_text("Processing your request...", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Abort", callback_data='abort')]])) - processing_status[user_id] = {"processing": True, "message_id": status_message.message_id} - - messages = conversation_history[user_id] - - response = get_chat_response(messages) - tool_calls = [] - fullMessage = [] - for message_part in response.content: - fullMessage.append(message_part) - if message_part.type == "tool_use": - tool_calls.append(message_part) - messages.append({"role": "assistant", "content": fullMessage}) - - toolUseCount = 0 - - previous_function_name = "" - - while len(tool_calls) > 0 and toolUseCount < 50 and processing_status[user_id]["processing"]: - tool_use_results = [] - while len(tool_calls) > 0: - tool_call = tool_calls.pop(0) - function_name = tool_call.name - - if previous_function_name != function_name: - await update_status_message(context, update.effective_chat.id, status_message.message_id, f"Using tool: {function_name}") - previous_function_name = function_name - - tool_response = call_tool(tool_call) - tool_use_results.append({"type": "tool_result", "tool_use_id": tool_call.id, "content": json.dumps(tool_response)}) - - formatted_result = {} - - formatted_result = {"role": "user", "content":tool_use_results} - - messages.append(formatted_result) - - response = get_chat_response(messages) - fullMessage = [] - for message_part in response.content: - fullMessage.append(message_part) - if message_part.type == "tool_use": - tool_calls.append(message_part) - messages.append({"role": "assistant", "content": fullMessage}) - - toolUseCount += 1 - - if (toolUseCount == 0): - assistant_reply = response.content - conversation_history[user_id].append({"role": "assistant", "content": assistant_reply}) - - if len(conversation_history[user_id]) > 20: - conversation_history[user_id] = conversation_history[user_id][-20:] - - # Remove the status message - await context.bot.delete_message(chat_id=update.effective_chat.id, message_id=status_message.message_id) - del processing_status[user_id] - try: - await update.message.reply_text(messages[-1]["content"][0].text) - except TelegramErrors.BadRequest as e: - logging.error(f"An error occurred when trying to send a message in telegram: {str(e)}") - - except Exception as e: - logging.error(f"An error occurred: {str(e)}") - await update.message.reply_text("Sorry, an error occurred while processing your request.") - -def call_tool(function_call): - function_name = function_call.name - function_args = json.dumps(function_call.input) - for tool in tools: - if function_name in [f["name"] for f in tool.get_functions()]: - return tool.execute(function_name, **json.loads(function_args)) - -def get_chat_response(messages): - return get_claude_response(messages) - -def get_claude_response(messages): - anthropic_tools = [ + def get_chat_response(self, messages): + anthropic_tools = [ { "name": function['name'], "description": function['description'], "input_schema": function['parameters'] if function['parameters'] not in [None, {}] else {"type": "object", "properties": {"param1": {"type": "string", "description": "Unnecessary"}}, "required": []} } - for function in functions + for function in self.functions ] - try: - response = anthropic_client.messages.create( - model="claude-3-5-sonnet-20240620", - system=system_prompt, - messages=messages, - max_tokens=8192, - tools=anthropic_tools - ) - except Exception as e: - logging.error(f"An error occurred: {str(e)}") - return None - - return response + try: + response = self.anthropic_client.messages.create( + model="claude-3-5-sonnet-20240620", + system=self.system_prompt, + messages=messages, + max_tokens=8192, + tools=anthropic_tools + ) + except Exception as e: + logging.error(f"An error occurred: {str(e)}") + return None + + return response -async def status(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - await update.message.reply_text("Currently using claude-3-5-sonnet-20240620") + async def handle_message(self, user_id, user_message): + if user_id not in self.conversation_history: + self.conversation_history[user_id] = [] -async def abort_processing(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - query = update.callback_query - await query.answer() - - user_id = query.from_user.id - if user_id in processing_status: - processing_status[user_id]["processing"] = False - await context.bot.edit_message_text( - chat_id=query.message.chat_id, - message_id=query.message.message_id, - text="Processing aborted." - ) - await clear(update, context) - else: - await query.edit_message_text(text="No active processing to abort.") + self.conversation_history[user_id].append({"role": "user", "content": user_message}) + messages = self.conversation_history[user_id] -def main() -> None: - # Create the Application and pass it your bot's token - application = Application.builder().token(TELEGRAM_BOT_TOKEN).build() + response = self.get_chat_response(messages) + tool_calls = [] + full_message = [] + for message_part in response.content: + full_message.append(message_part) + if message_part.type == "tool_use": + tool_calls.append(message_part) + messages.append({"role": "assistant", "content": full_message}) - # Add handlers - application.add_handler(CommandHandler("start", start)) - application.add_handler(CommandHandler("clear", clear)) - application.add_handler(CommandHandler("status", status)) - application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) - application.add_handler(CallbackQueryHandler(abort_processing, pattern='^abort$')) + tool_use_count = 0 + while len(tool_calls) > 0 and tool_use_count < 50: + tool_use_results = [] + for tool_call in tool_calls: + tool_response = self.call_tool(tool_call) + tool_use_results.append({"type": "tool_result", "tool_use_id": tool_call.id, "content": json.dumps(tool_response)}) - # Start the Bot - logging.info("Bot is running...") - application.run_polling() + messages.append({"role": "user", "content": tool_use_results}) + + response = self.get_chat_response(messages) + full_message = [] + tool_calls = [] + for message_part in response.content: + full_message.append(message_part) + if message_part.type == "tool_use": + tool_calls.append(message_part) + messages.append({"role": "assistant", "content": full_message}) + + tool_use_count += 1 + + if len(self.conversation_history[user_id]) > 20: + self.conversation_history[user_id] = self.conversation_history[user_id][-20:] + + return messages[-1]["content"][0].text + + async def start(self): + logging.info("Bot started") + + async def clear(self, user_id): + super().clear_conversation(user_id) + logging.info(f"Cleared conversation history and image for user {user_id}") + + async def status(self): + return "Currently using claude-3-5-sonnet-20240620" + + async def abort_processing(self, user_id): + if user_id in self.processing_status: + self.processing_status[user_id]["processing"] = False + await self.clear(user_id) + return "Processing aborted." + else: + return "No active processing to abort." + +def main(): + bot = AnthropicTelegramInferenceBot() + telegram_helper = TelegramHelper(bot) + telegram_helper.run() if __name__ == '__main__': main() \ No newline at end of file