diff --git a/anthropic_telegram_inference_bot.py b/anthropic_telegram_inference_bot.py index 2c39863..449f842 100644 --- a/anthropic_telegram_inference_bot.py +++ b/anthropic_telegram_inference_bot.py @@ -1,110 +1 @@ -import os -import json -import logging -from anthropic import Anthropic -from base_telegram_inference_bot import BaseTelegramInferenceBot -from telegram_helper import TelegramHelper - -class AnthropicTelegramInferenceBot(BaseTelegramInferenceBot): - def __init__(self): - super().__init__() - self.anthropic_client = Anthropic( - api_key=os.environ.get("ANTHROPIC_API_KEY"), - default_headers={"anthropic-beta": "max-tokens-3-5-sonnet-2024-07-15"} - ) - - def get_chat_response(self, messages): - anthropic_tools = [ - { - "name": function['name'], - "description": function['description'], - "input_schema": function['parameters'] if function['parameters'] not in [None, {}] else {"type": "object", "properties": {"param1": {"type": "string", "description": "Unnecessary"}}, "required": []} - } - for function in self.functions - ] - try: - response = self.anthropic_client.messages.create( - model="claude-3-5-sonnet-20240620", - system=self.system_prompt, - messages=messages, - max_tokens=8192, - tools=anthropic_tools, - tool_choice={"type": "auto"} - ) - except Exception as e: - logging.error(f"An error occurred: {str(e)}") - return None - - return response - - async def handle_message(self, user_id, user_message): - if user_id not in self.conversation_history: - self.conversation_history[user_id] = [] - - self.conversation_history[user_id].append({"role": "user", "content": user_message}) - messages = self.conversation_history[user_id] - - response = self.get_chat_response(messages) - tool_calls = [] - full_message = [] - for message_part in response.content: - full_message.append(message_part) - if message_part.type == "tool_use": - tool_calls.append(message_part) - messages.append({"role": "assistant", "content": full_message}) - - tool_use_count = 0 - while len(tool_calls) > 0 and tool_use_count < 50: - tool_use_results = [] - while len(tool_calls) > 0: - tool_call = tool_calls.pop(0) - tool_response = self.call_tool(tool_call.name, json.dumps(tool_call.input)) - tool_use_results.append({"type": "tool_result", "tool_use_id": tool_call.id, "content": json.dumps(tool_response)}) - - messages.append({"role": "user", "content": tool_use_results}) - - response = self.get_chat_response(messages) - full_message = [] - - for message_part in response.content: - full_message.append(message_part) - if message_part.type == "tool_use": - tool_calls.append(message_part) - messages.append({"role": "assistant", "content": full_message}) - - tool_use_count += 1 - - if (tool_use_count == 0): - assistant_reply = response.content - self.conversation_history[user_id].append({"role": "assistant", "content": assistant_reply}) - - if len(self.conversation_history[user_id]) > 20: - self.conversation_history[user_id] = self.conversation_history[user_id][-20:] - - return messages[-1]["content"][0].text - - async def start(self): - logging.info("Bot started") - - async def clear(self, user_id): - super().clear_conversation(user_id) - logging.info(f"Cleared conversation history and image for user {user_id}") - - async def status(self): - return "Currently using claude-3-5-sonnet-20240620" - - async def abort_processing(self, user_id): - if user_id in self.processing_status: - self.processing_status[user_id]["processing"] = False - await self.clear(user_id) - return "Processing aborted." - else: - return "No active processing to abort." - -def main(): - bot = AnthropicTelegramInferenceBot() - telegram_helper = TelegramHelper(bot) - telegram_helper.run() - -if __name__ == '__main__': - main() \ No newline at end of file +import os\nimport json\nimport logging\nfrom anthropic import Anthropic\nfrom base_telegram_inference_bot import BaseTelegramInferenceBot\nfrom telegram_helper import TelegramHelper\n\n# logging.basicConfig(level=logging.INFO) # Usually configured in main execution script\n\nclass AnthropicTelegramInferenceBot(BaseTelegramInferenceBot):\n def __init__(self):\n super().__init__()\n self.anthropic_client = Anthropic(api_key=os.environ.get(\"ANTHROPIC_API_KEY\"))\n # Note: default_headers for max_tokens with older models might be needed.\n # For Claude 3.5 Sonnet, max_tokens is a top-level param in messages.create\n \n # Configure model and tokens. Using Sonnet 3.5 as default.\n # ANTHROPIC_MODEL and ANTHROPIC_MAX_TOKENS would be new ENVs.\n self._configure_model_and_tokens(\n os.environ.get(\"ANTHROPIC_MODEL\", \"claude-3-5-sonnet-20240620\"),\n os.environ.get(\"ANTHROPIC_MAX_TOKENS\", \"4096\") # Default max tokens for Sonnet 3.5\n )\n\n def _configure_model_and_tokens(self, model_name, max_tokens_str, default_max_tokens=4096):\n self.model = model_name if model_name else \"claude-3-5-sonnet-20240620\"\n try:\n # Anthropic\'s max_tokens is an integer.\n self.max_tokens = int(max_tokens_str) if max_tokens_str is not None else default_max_tokens\n except ValueError:\n logging.error(f\"Invalid value for Anthropic max_tokens: {max_tokens_str}. Using default {default_max_tokens}.\")\n self.max_tokens = default_max_tokens\n logging.info(f\"Configured to use Anthropic model: {self.model} with max_tokens: {self.max_tokens}\")\n\n def get_system_prompt_description(self) -> str:\n system_prompt_path = os.getenv(\"SYSTEM_PROMPT_PATH\")\n if system_prompt_path and os.path.isfile(system_prompt_path):\n return f\"System Prompt File: {os.path.basename(system_prompt_path)}\"\n elif system_prompt_path:\n return f\"System Prompt File: {os.path.basename(system_prompt_path)} (Not found at path: {system_prompt_path})\"\n else:\n return \"System Prompt File: Not configured (SYSTEM_PROMPT_PATH not set).\"\n\n def get_llm_description(self) -> str:\n return f\"LLM: {self.model}, Max Tokens: {self.max_tokens}\"\n\n def get_chat_response(self, messages_history):\n # Anthropic expects messages in a specific format.\n # Filter out any non-compliant messages or transform if necessary.\n # For simplicity, assume messages_history is already compliant.\n \n # Ensure self.system_prompt is a string, not None.\n current_system_prompt = self.system_prompt if self.system_prompt else \"\"\n\n anthropic_tools = []\n if hasattr(self, \'functions\') and self.functions:\n anthropic_tools = [\n {\n \"name\": function[\'name\'],\n \"description\": function[\'description\'],\n \"input_schema\": function[\'parameters\'] if function[\'parameters\'] not in [None, {}] else {\"type\": \"object\", \"properties\": {}}\n }\n for function in self.functions\n ]\n \n try:\n response = self.anthropic_client.messages.create(\n model=self.model,\n system=current_system_prompt, # System prompt is a top-level parameter\n messages=messages_history, # User/assistant turns\n max_tokens=self.max_tokens, # Use configured max_tokens\n tools=anthropic_tools if anthropic_tools else None,\n tool_choice={\"type\": \"auto\"} if anthropic_tools else None\n )\n return response\n except Exception as e:\n logging.error(f\"Anthropic API call failed: {e}\")\n # Consider how to propagate this error: re-raise or return a specific error structure\n raise # Re-raising for now\n\n async def handle_message(self, user_id, user_message):\n if user_id not in self.conversation_history:\n self.conversation_history[user_id] = []\n # Anthropic doesn\'t use a \"system\" role message in the main `messages` list\n # It\'s passed as a separate `system` parameter to `messages.create`.\n # So, no need to add system_prompt to conversation_history here.\n\n self.conversation_history[user_id].append({\"role\": \"user\", \"content\": user_message})\n \n # Make a mutable copy for this turn, as we\'ll be appending tool calls and responses\n current_turn_messages = list(self.conversation_history[user_id])\n\n MAX_TOOL_ITERATIONS = 5\n tool_use_count = 0\n assistant_response_content = \"\" # Stores the textual part of the assistant's response\n\n while tool_use_count < MAX_TOOL_ITERATIONS:\n response = self.get_chat_response(current_turn_messages)\n\n if not response or not response.content:\n logging.error(\"No valid response content from Anthropic LLM.\")\n # Update conversation history with this attempt before returning an error message\n self.conversation_history[user_id] = current_turn_messages\n return \"Error: Could not get a valid response from the LLM.\"\n\n # Process response.content, which is a list of blocks (e.g., text, tool_use)\n # The assistant's complete turn (text and tool requests) forms one message.\n assistant_current_turn_content_blocks = response.content\n current_turn_messages.append({\"role\": \"assistant\", \"content\": assistant_current_turn_content_blocks})\n\n # Extract text parts and tool calls from the assistant's last message\n text_parts_from_assistant = []\n tool_calls_from_response = []\n for block in assistant_current_turn_content_blocks:\n if block.type == \"text\":\n text_parts_from_assistant.append(block.text)\n elif block.type == \"tool_use\":\n tool_calls_from_response.append(block)\n \n assistant_response_content = \"\".join(text_parts_from_assistant) # Update with latest text\n\n if not tool_calls_from_response: # No tools called, this is the final response from assistant\n break \n\n # If there are tool calls, prepare and add their results\n tool_results_for_model = []\n for tool_call in tool_calls_from_response:\n tool_name = tool_call.name\n tool_input = tool_call.input # This is a dict\n tool_use_id = tool_call.id\n \n logging.info(f\"Attempting to call Anthropic tool: {tool_name} with input: {tool_input}\")\n try:\n # self.call_tool should ideally return a JSON-serializable object or a string.\n # Anthropic expects tool result content to be a list of content blocks (e.g. text, image) or string.\n # For simplicity, we'll assume call_tool returns a string or a dict/list that can be made into a text block.\n tool_response_data = self.call_tool(tool_name, tool_input) \n \n # Format the tool response for Anthropic\n if isinstance(tool_response_data, str):\n tool_result_content_block = [{\"type\": \"text\", \"text\": tool_response_data}]\n elif isinstance(tool_response_data, dict) or isinstance(tool_response_data, list):\n # If it's already a list of blocks, use directly. Otherwise, wrap in text block.\n try:\n # Attempt to treat as pre-formatted list of blocks\n # This is unlikely unless call_tool is specifically designed for Anthropic output\n # A more robust way would be to json.dumps dict/list and put in a text block\n tool_result_content_block = tool_response_data if isinstance(tool_response_data, list) and all(isinstance(i, dict) and \"type\" in i for i in tool_response_data) else [{\"type\": \"text\", \"text\": json.dumps(tool_response_data)}]\n except TypeError:\ # Not easily serializable to JSON in a text block\n tool_result_content_block = [{\"type\": \"text\", \"text\": str(tool_response_data)}]\n else:\n tool_result_content_block = [{\"type\": \"text\", \"text\": str(tool_response_data)}]\n\n tool_results_for_model.append({\n \"type\": \"tool_result\",\n \"tool_use_id\": tool_use_id,\n \"content\": tool_result_content_block \n })\n except Exception as e:\n logging.error(f\"Error calling tool {tool_name}: {e}\")\n tool_results_for_model.append({\n \"type\": \"tool_result\",\n \"tool_use_id\": tool_use_id,\n \"content\": [{\"type\": \"text\", \"text\": f\"Error executing tool {tool_name}: {str(e)}\"}],\n \"is_error\": True \n })\n \n # Add tool results to the history for the next call to the model\n # This message has role \"user\" and content is a list of tool_result blocks.\n current_turn_messages.append({\"role\": \"user\", \"content\": tool_results_for_model})\n \n tool_use_count += 1\n if tool_use_count >= MAX_TOOL_ITERATIONS:\n logging.warning(f\"Max tool iterations ({MAX_TOOL_ITERATIONS}) reached for Anthropic.\")\n # The assistant_response_content already holds the text from the turn *before* this iteration failed to complete tools.\n break\n \n # Persist the full turn history (including successful tool exchanges) \n self.conversation_history[user_id] = current_turn_messages\n \n # Manage conversation history length\n if len(self.conversation_history[user_id]) > 20: # Consider a more robust token-based limit\n # A more sophisticated truncation would ensure the message list integrity (e.g. not cutting mid-tool-exchange)\n self.conversation_history[user_id] = self.conversation_history[user_id][-20:]\n\n # Return the last accumulated textual content from the assistant. This would be the text from the\n # assistant's turn before breaking the loop (either due to no more tools, or max iterations).\n if assistant_response_content:\n return assistant_response_content\n else:\n # Fallback: if assistant_response_content is empty (e.g. error on first API call, or only tool_use)\n # Try to get the *very last* text block from the *very last* assistant message in history.\n if current_turn_messages:\n last_message = current_turn_messages[-1]\n if last_message[\"role\"] == \"assistant\" and isinstance(last_message[\"content\"], list):\n for block in reversed(last_message[\"content\"]):\n if block.type == \"text\":\n return block.text\n return \"No textual response from assistant.\"\n\n\n async def start(self):\n logging.info(\"Anthropic Bot started\")\n # super().start() if applicable\n\n async def clear(self, user_id):\n super().clear_conversation(user_id)\n logging.info(f\"Cleared conversation history for Anthropic bot, user {user_id}\")\n\n # status() method is inherited from BaseTelegramInferenceBot\n\n async def abort_processing(self, user_id):\n if user_id in self.processing_status: # Relies on processing_status from Base\n self.processing_status[user_id][\"processing\"] = False\n await self.clear(user_id)\n return \"Processing aborted and conversation cleared.\"\n else:\n await self.clear(user_id)\n return \"No active processing found to abort. Conversation cleared.\"\n\n async def switch_model(self):\n primary_model = os.environ.get(\"ANTHROPIC_MODEL\", \"claude-3-5-sonnet-20240620\")\n primary_max_tokens = os.environ.get(\"ANTHROPIC_MAX_TOKENS\", \"4096\")\n \n secondary_model_env = os.environ.get(\"ANTHROPIC_SECONDARY_MODEL\")\n secondary_max_tokens_env = os.environ.get(\"ANTHROPIC_SECONDARY_MAX_TOKENS\")\n\n if not secondary_model_env: \n logging.warning(\"ANTHROPIC_SECONDARY_MODEL not defined. Cannot switch model.\")\n return f\"Model switching not configured. Currently using {self.model}.\"\n\n # Determine target model and tokens\n if self.model == primary_model:\n target_model = secondary_model_env\n # Use secondary token ENV if set, else a reasonable default, or primary's if makes sense\n target_max_tokens = secondary_max_tokens_env if secondary_max_tokens_env else \"2048\" \n else: # Assumes current is secondary or an unknown model, switch to primary\n target_model = primary_model\n target_max_tokens = primary_max_tokens\n \n self._configure_model_and_tokens(target_model, target_max_tokens)\n logging.info(f\"Switched Anthropic model to: {self.model}\")\n return f\"Switched to Anthropic model: {self.model}\"\n\ndef main():\n if not os.environ.get(\"ANTHROPIC_API_KEY\"):\n logging.error(\"FATAL: ANTHROPIC_API_KEY environment variable not set.\")\n return\n\n logging.basicConfig(level=logging.INFO, format=\'%(asctime)s - %(name)s - %(levelname)s - %(message)s\')\n \n bot = AnthropicTelegramInferenceBot()\n telegram_helper = TelegramHelper(bot)\n telegram_helper.run()\n\nif __name__ == \'__main__\':\n main()\n \ No newline at end of file