From 72b460bcfb69312edc78df4d0d0ff2ad542ca4e0 Mon Sep 17 00:00:00 2001 From: cyclop-bot <178948048+cyclop-bot@users.noreply.github.com> Date: Mon, 2 Jun 2025 14:43:51 -0500 Subject: [PATCH] fix: Correct formatting for Anthropic bot update --- anthropic_telegram_inference_bot.py | 224 +++++++++++++++++++++++++++- 1 file changed, 223 insertions(+), 1 deletion(-) diff --git a/anthropic_telegram_inference_bot.py b/anthropic_telegram_inference_bot.py index 449f842..116b00a 100644 --- a/anthropic_telegram_inference_bot.py +++ b/anthropic_telegram_inference_bot.py @@ -1 +1,223 @@ -import os\nimport json\nimport logging\nfrom anthropic import Anthropic\nfrom base_telegram_inference_bot import BaseTelegramInferenceBot\nfrom telegram_helper import TelegramHelper\n\n# logging.basicConfig(level=logging.INFO) # Usually configured in main execution script\n\nclass AnthropicTelegramInferenceBot(BaseTelegramInferenceBot):\n def __init__(self):\n super().__init__()\n self.anthropic_client = Anthropic(api_key=os.environ.get(\"ANTHROPIC_API_KEY\"))\n # Note: default_headers for max_tokens with older models might be needed.\n # For Claude 3.5 Sonnet, max_tokens is a top-level param in messages.create\n \n # Configure model and tokens. Using Sonnet 3.5 as default.\n # ANTHROPIC_MODEL and ANTHROPIC_MAX_TOKENS would be new ENVs.\n self._configure_model_and_tokens(\n os.environ.get(\"ANTHROPIC_MODEL\", \"claude-3-5-sonnet-20240620\"),\n os.environ.get(\"ANTHROPIC_MAX_TOKENS\", \"4096\") # Default max tokens for Sonnet 3.5\n )\n\n def _configure_model_and_tokens(self, model_name, max_tokens_str, default_max_tokens=4096):\n self.model = model_name if model_name else \"claude-3-5-sonnet-20240620\"\n try:\n # Anthropic\'s max_tokens is an integer.\n self.max_tokens = int(max_tokens_str) if max_tokens_str is not None else default_max_tokens\n except ValueError:\n logging.error(f\"Invalid value for Anthropic max_tokens: {max_tokens_str}. Using default {default_max_tokens}.\")\n self.max_tokens = default_max_tokens\n logging.info(f\"Configured to use Anthropic model: {self.model} with max_tokens: {self.max_tokens}\")\n\n def get_system_prompt_description(self) -> str:\n system_prompt_path = os.getenv(\"SYSTEM_PROMPT_PATH\")\n if system_prompt_path and os.path.isfile(system_prompt_path):\n return f\"System Prompt File: {os.path.basename(system_prompt_path)}\"\n elif system_prompt_path:\n return f\"System Prompt File: {os.path.basename(system_prompt_path)} (Not found at path: {system_prompt_path})\"\n else:\n return \"System Prompt File: Not configured (SYSTEM_PROMPT_PATH not set).\"\n\n def get_llm_description(self) -> str:\n return f\"LLM: {self.model}, Max Tokens: {self.max_tokens}\"\n\n def get_chat_response(self, messages_history):\n # Anthropic expects messages in a specific format.\n # Filter out any non-compliant messages or transform if necessary.\n # For simplicity, assume messages_history is already compliant.\n \n # Ensure self.system_prompt is a string, not None.\n current_system_prompt = self.system_prompt if self.system_prompt else \"\"\n\n anthropic_tools = []\n if hasattr(self, \'functions\') and self.functions:\n anthropic_tools = [\n {\n \"name\": function[\'name\'],\n \"description\": function[\'description\'],\n \"input_schema\": function[\'parameters\'] if function[\'parameters\'] not in [None, {}] else {\"type\": \"object\", \"properties\": {}}\n }\n for function in self.functions\n ]\n \n try:\n response = self.anthropic_client.messages.create(\n model=self.model,\n system=current_system_prompt, # System prompt is a top-level parameter\n messages=messages_history, # User/assistant turns\n max_tokens=self.max_tokens, # Use configured max_tokens\n tools=anthropic_tools if anthropic_tools else None,\n tool_choice={\"type\": \"auto\"} if anthropic_tools else None\n )\n return response\n except Exception as e:\n logging.error(f\"Anthropic API call failed: {e}\")\n # Consider how to propagate this error: re-raise or return a specific error structure\n raise # Re-raising for now\n\n async def handle_message(self, user_id, user_message):\n if user_id not in self.conversation_history:\n self.conversation_history[user_id] = []\n # Anthropic doesn\'t use a \"system\" role message in the main `messages` list\n # It\'s passed as a separate `system` parameter to `messages.create`.\n # So, no need to add system_prompt to conversation_history here.\n\n self.conversation_history[user_id].append({\"role\": \"user\", \"content\": user_message})\n \n # Make a mutable copy for this turn, as we\'ll be appending tool calls and responses\n current_turn_messages = list(self.conversation_history[user_id])\n\n MAX_TOOL_ITERATIONS = 5\n tool_use_count = 0\n assistant_response_content = \"\" # Stores the textual part of the assistant's response\n\n while tool_use_count < MAX_TOOL_ITERATIONS:\n response = self.get_chat_response(current_turn_messages)\n\n if not response or not response.content:\n logging.error(\"No valid response content from Anthropic LLM.\")\n # Update conversation history with this attempt before returning an error message\n self.conversation_history[user_id] = current_turn_messages\n return \"Error: Could not get a valid response from the LLM.\"\n\n # Process response.content, which is a list of blocks (e.g., text, tool_use)\n # The assistant's complete turn (text and tool requests) forms one message.\n assistant_current_turn_content_blocks = response.content\n current_turn_messages.append({\"role\": \"assistant\", \"content\": assistant_current_turn_content_blocks})\n\n # Extract text parts and tool calls from the assistant's last message\n text_parts_from_assistant = []\n tool_calls_from_response = []\n for block in assistant_current_turn_content_blocks:\n if block.type == \"text\":\n text_parts_from_assistant.append(block.text)\n elif block.type == \"tool_use\":\n tool_calls_from_response.append(block)\n \n assistant_response_content = \"\".join(text_parts_from_assistant) # Update with latest text\n\n if not tool_calls_from_response: # No tools called, this is the final response from assistant\n break \n\n # If there are tool calls, prepare and add their results\n tool_results_for_model = []\n for tool_call in tool_calls_from_response:\n tool_name = tool_call.name\n tool_input = tool_call.input # This is a dict\n tool_use_id = tool_call.id\n \n logging.info(f\"Attempting to call Anthropic tool: {tool_name} with input: {tool_input}\")\n try:\n # self.call_tool should ideally return a JSON-serializable object or a string.\n # Anthropic expects tool result content to be a list of content blocks (e.g. text, image) or string.\n # For simplicity, we'll assume call_tool returns a string or a dict/list that can be made into a text block.\n tool_response_data = self.call_tool(tool_name, tool_input) \n \n # Format the tool response for Anthropic\n if isinstance(tool_response_data, str):\n tool_result_content_block = [{\"type\": \"text\", \"text\": tool_response_data}]\n elif isinstance(tool_response_data, dict) or isinstance(tool_response_data, list):\n # If it's already a list of blocks, use directly. Otherwise, wrap in text block.\n try:\n # Attempt to treat as pre-formatted list of blocks\n # This is unlikely unless call_tool is specifically designed for Anthropic output\n # A more robust way would be to json.dumps dict/list and put in a text block\n tool_result_content_block = tool_response_data if isinstance(tool_response_data, list) and all(isinstance(i, dict) and \"type\" in i for i in tool_response_data) else [{\"type\": \"text\", \"text\": json.dumps(tool_response_data)}]\n except TypeError:\ # Not easily serializable to JSON in a text block\n tool_result_content_block = [{\"type\": \"text\", \"text\": str(tool_response_data)}]\n else:\n tool_result_content_block = [{\"type\": \"text\", \"text\": str(tool_response_data)}]\n\n tool_results_for_model.append({\n \"type\": \"tool_result\",\n \"tool_use_id\": tool_use_id,\n \"content\": tool_result_content_block \n })\n except Exception as e:\n logging.error(f\"Error calling tool {tool_name}: {e}\")\n tool_results_for_model.append({\n \"type\": \"tool_result\",\n \"tool_use_id\": tool_use_id,\n \"content\": [{\"type\": \"text\", \"text\": f\"Error executing tool {tool_name}: {str(e)}\"}],\n \"is_error\": True \n })\n \n # Add tool results to the history for the next call to the model\n # This message has role \"user\" and content is a list of tool_result blocks.\n current_turn_messages.append({\"role\": \"user\", \"content\": tool_results_for_model})\n \n tool_use_count += 1\n if tool_use_count >= MAX_TOOL_ITERATIONS:\n logging.warning(f\"Max tool iterations ({MAX_TOOL_ITERATIONS}) reached for Anthropic.\")\n # The assistant_response_content already holds the text from the turn *before* this iteration failed to complete tools.\n break\n \n # Persist the full turn history (including successful tool exchanges) \n self.conversation_history[user_id] = current_turn_messages\n \n # Manage conversation history length\n if len(self.conversation_history[user_id]) > 20: # Consider a more robust token-based limit\n # A more sophisticated truncation would ensure the message list integrity (e.g. not cutting mid-tool-exchange)\n self.conversation_history[user_id] = self.conversation_history[user_id][-20:]\n\n # Return the last accumulated textual content from the assistant. This would be the text from the\n # assistant's turn before breaking the loop (either due to no more tools, or max iterations).\n if assistant_response_content:\n return assistant_response_content\n else:\n # Fallback: if assistant_response_content is empty (e.g. error on first API call, or only tool_use)\n # Try to get the *very last* text block from the *very last* assistant message in history.\n if current_turn_messages:\n last_message = current_turn_messages[-1]\n if last_message[\"role\"] == \"assistant\" and isinstance(last_message[\"content\"], list):\n for block in reversed(last_message[\"content\"]):\n if block.type == \"text\":\n return block.text\n return \"No textual response from assistant.\"\n\n\n async def start(self):\n logging.info(\"Anthropic Bot started\")\n # super().start() if applicable\n\n async def clear(self, user_id):\n super().clear_conversation(user_id)\n logging.info(f\"Cleared conversation history for Anthropic bot, user {user_id}\")\n\n # status() method is inherited from BaseTelegramInferenceBot\n\n async def abort_processing(self, user_id):\n if user_id in self.processing_status: # Relies on processing_status from Base\n self.processing_status[user_id][\"processing\"] = False\n await self.clear(user_id)\n return \"Processing aborted and conversation cleared.\"\n else:\n await self.clear(user_id)\n return \"No active processing found to abort. Conversation cleared.\"\n\n async def switch_model(self):\n primary_model = os.environ.get(\"ANTHROPIC_MODEL\", \"claude-3-5-sonnet-20240620\")\n primary_max_tokens = os.environ.get(\"ANTHROPIC_MAX_TOKENS\", \"4096\")\n \n secondary_model_env = os.environ.get(\"ANTHROPIC_SECONDARY_MODEL\")\n secondary_max_tokens_env = os.environ.get(\"ANTHROPIC_SECONDARY_MAX_TOKENS\")\n\n if not secondary_model_env: \n logging.warning(\"ANTHROPIC_SECONDARY_MODEL not defined. Cannot switch model.\")\n return f\"Model switching not configured. Currently using {self.model}.\"\n\n # Determine target model and tokens\n if self.model == primary_model:\n target_model = secondary_model_env\n # Use secondary token ENV if set, else a reasonable default, or primary's if makes sense\n target_max_tokens = secondary_max_tokens_env if secondary_max_tokens_env else \"2048\" \n else: # Assumes current is secondary or an unknown model, switch to primary\n target_model = primary_model\n target_max_tokens = primary_max_tokens\n \n self._configure_model_and_tokens(target_model, target_max_tokens)\n logging.info(f\"Switched Anthropic model to: {self.model}\")\n return f\"Switched to Anthropic model: {self.model}\"\n\ndef main():\n if not os.environ.get(\"ANTHROPIC_API_KEY\"):\n logging.error(\"FATAL: ANTHROPIC_API_KEY environment variable not set.\")\n return\n\n logging.basicConfig(level=logging.INFO, format=\'%(asctime)s - %(name)s - %(levelname)s - %(message)s\')\n \n bot = AnthropicTelegramInferenceBot()\n telegram_helper = TelegramHelper(bot)\n telegram_helper.run()\n\nif __name__ == \'__main__\':\n main()\n \ No newline at end of file +import os +import json +import logging +from anthropic import Anthropic +from base_telegram_inference_bot import BaseTelegramInferenceBot +from telegram_helper import TelegramHelper + +# logging.basicConfig(level=logging.INFO) # Usually configured in main execution script + +class AnthropicTelegramInferenceBot(BaseTelegramInferenceBot): + def __init__(self): + super().__init__() + self.anthropic_client = Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY")) + # Note: default_headers for max_tokens with older models might be needed. + # For Claude 3.5 Sonnet, max_tokens is a top-level param in messages.create + + # Configure model and tokens. Using Sonnet 3.5 as default. + # ANTHROPIC_MODEL and ANTHROPIC_MAX_TOKENS would be new ENVs. + self._configure_model_and_tokens( + os.environ.get("ANTHROPIC_MODEL", "claude-3-5-sonnet-20240620"), + os.environ.get("ANTHROPIC_MAX_TOKENS", "4096") # Default max tokens for Sonnet 3.5 + ) + + def _configure_model_and_tokens(self, model_name, max_tokens_str, default_max_tokens=4096): + self.model = model_name if model_name else "claude-3-5-sonnet-20240620" + try: + # Anthropic's max_tokens is an integer. + self.max_tokens = int(max_tokens_str) if max_tokens_str is not None else default_max_tokens + except ValueError: + logging.error(f"Invalid value for Anthropic max_tokens: {max_tokens_str}. Using default {default_max_tokens}.") + self.max_tokens = default_max_tokens + logging.info(f"Configured to use Anthropic model: {self.model} with max_tokens: {self.max_tokens}") + + def get_system_prompt_description(self) -> str: + system_prompt_path = os.getenv("SYSTEM_PROMPT_PATH") + if system_prompt_path and os.path.isfile(system_prompt_path): + return f"System Prompt File: {os.path.basename(system_prompt_path)}" + elif system_prompt_path: + return f"System Prompt File: {os.path.basename(system_prompt_path)} (Not found at path: {system_prompt_path})" + else: + return "System Prompt File: Not configured (SYSTEM_PROMPT_PATH not set)." + + def get_llm_description(self) -> str: + return f"LLM: {self.model}, Max Tokens: {self.max_tokens}" + + def get_chat_response(self, messages_history): + current_system_prompt = self.system_prompt if self.system_prompt else "" + anthropic_tools = [] + if hasattr(self, 'functions') and self.functions: + anthropic_tools = [ + { + "name": function['name'], + "description": function['description'], + "input_schema": function['parameters'] if function['parameters'] not in [None, {}] else {"type": "object", "properties": {}} + } + for function in self.functions + ] + + try: + response = self.anthropic_client.messages.create( + model=self.model, + system=current_system_prompt, + messages=messages_history, + max_tokens=self.max_tokens, + tools=anthropic_tools if anthropic_tools else None, + tool_choice={"type": "auto"} if anthropic_tools else None + ) + return response + except Exception as e: + logging.error(f"Anthropic API call failed: {e}") + raise + + async def handle_message(self, user_id, user_message): + if user_id not in self.conversation_history: + self.conversation_history[user_id] = [] + + self.conversation_history[user_id].append({"role": "user", "content": user_message}) + current_turn_messages = list(self.conversation_history[user_id]) + + MAX_TOOL_ITERATIONS = 5 + tool_use_count = 0 + assistant_response_content = "" + + while tool_use_count < MAX_TOOL_ITERATIONS: + response = self.get_chat_response(current_turn_messages) + + if not response or not response.content: + logging.error("No valid response content from Anthropic LLM.") + self.conversation_history[user_id] = current_turn_messages # Persist what we have + return "Error: Could not get a valid response from the LLM." + + assistant_current_turn_content_blocks = response.content + current_turn_messages.append({"role": "assistant", "content": assistant_current_turn_content_blocks}) + + text_parts_from_assistant = [] + tool_calls_from_response = [] + for block in assistant_current_turn_content_blocks: + if block.type == "text": + text_parts_from_assistant.append(block.text) + elif block.type == "tool_use": + tool_calls_from_response.append(block) + + assistant_response_content = "".join(text_parts_from_assistant) + + if not tool_calls_from_response: + break + + tool_results_for_model = [] + for tool_call in tool_calls_from_response: + tool_name = tool_call.name + tool_input = tool_call.input + tool_use_id = tool_call.id + + logging.info(f"Attempting to call Anthropic tool: {tool_name} with input: {tool_input}") + try: + tool_response_data = self.call_tool(tool_name, tool_input) + + if isinstance(tool_response_data, str): + tool_result_content_block = [{"type": "text", "text": tool_response_data}] + elif isinstance(tool_response_data, dict) or isinstance(tool_response_data, list): + try: + # If tool_response_data is already a list of Anthropic content blocks, use as is. + # Otherwise, dump to JSON string and wrap in a text block. + is_valid_block_list = isinstance(tool_response_data, list) and all(isinstance(item, dict) and "type" in item for item in tool_response_data) + if is_valid_block_list: + tool_result_content_block = tool_response_data + else: + tool_result_content_block = [{"type": "text", "text": json.dumps(tool_response_data)}] + except (TypeError, json.JSONDecodeError): # Not easily serializable or not a valid block list + tool_result_content_block = [{"type": "text", "text": str(tool_response_data)}] + else: # bool, int, float, None, etc. + tool_result_content_block = [{"type": "text", "text": str(tool_response_data)}] + + tool_results_for_model.append({ + "type": "tool_result", + "tool_use_id": tool_use_id, + "content": tool_result_content_block + }) + except Exception as e: + logging.error(f"Error calling tool {tool_name}: {e}") + tool_results_for_model.append({ + "type": "tool_result", + "tool_use_id": tool_use_id, + "content": [{"type": "text", "text": f"Error executing tool {tool_name}: {str(e)}"}], + "is_error": True + }) + + current_turn_messages.append({"role": "user", "content": tool_results_for_model}) + + tool_use_count += 1 + if tool_use_count >= MAX_TOOL_ITERATIONS: + logging.warning(f"Max tool iterations ({MAX_TOOL_ITERATIONS}) reached for Anthropic.") + break + + self.conversation_history[user_id] = current_turn_messages + + if len(self.conversation_history[user_id]) > 20: + self.conversation_history[user_id] = self.conversation_history[user_id][-20:] + + if assistant_response_content: # Text from the last successful assistant turn (or before max iterations) + return assistant_response_content + else: # Fallback if no text content was generated by assistant (e.g. initial error, or only tool use) + if current_turn_messages: + # Try to get the *very last* text block from the *very last* assistant message in history. + last_message_in_turn = current_turn_messages[-1] + if last_message_in_turn.get("role") == "assistant" and isinstance(last_message_in_turn.get("content"), list): + for block in reversed(last_message_in_turn["content"]): + if block.type == "text": + return block.text + return "No textual response from assistant." + + + async def start(self): + logging.info("Anthropic Bot started") + + async def clear(self, user_id): + super().clear_conversation(user_id) + logging.info(f"Cleared conversation history for Anthropic bot, user {user_id}") + + async def abort_processing(self, user_id): + if user_id in self.processing_status: + self.processing_status[user_id]["processing"] = False + await self.clear(user_id) + return "Processing aborted and conversation cleared." + else: + await self.clear(user_id) + return "No active processing found to abort. Conversation cleared." + + async def switch_model(self): + primary_model = os.environ.get("ANTHROPIC_MODEL", "claude-3-5-sonnet-20240620") + primary_max_tokens = os.environ.get("ANTHROPIC_MAX_TOKENS", "4096") + + secondary_model_env = os.environ.get("ANTHROPIC_SECONDARY_MODEL") + secondary_max_tokens_env = os.environ.get("ANTHROPIC_SECONDARY_MAX_TOKENS") + + if not secondary_model_env: + logging.warning("ANTHROPIC_SECONDARY_MODEL not defined. Cannot switch model.") + return f"Model switching not configured. Currently using {self.model}." + + if self.model == primary_model: + target_model = secondary_model_env + target_max_tokens = secondary_max_tokens_env if secondary_max_tokens_env else "2048" + else: + target_model = primary_model + target_max_tokens = primary_max_tokens + + self._configure_model_and_tokens(target_model, target_max_tokens) + logging.info(f"Switched Anthropic model to: {self.model}") + return f"Switched to Anthropic model: {self.model}" + +def main(): + if not os.environ.get("ANTHROPIC_API_KEY"): + logging.error("FATAL: ANTHROPIC_API_KEY environment variable not set.") + return + + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') + + bot = AnthropicTelegramInferenceBot() + telegram_helper = TelegramHelper(bot) + telegram_helper.run() + +if __name__ == '__main__': + main()