Refactor anthropic_telegram_inference_bot.py: Improve tool response formatting, enhance API error handling, and align with BaseTelegramInferenceBot.

This commit is contained in:
cyclop-bot
2025-06-02 14:56:47 -05:00
parent 1f5fbc3db8
commit 6a54035c51
+31 -45
View File
@@ -1,45 +1,29 @@
import os
import json
import logging
from anthropic import Anthropic
from anthropic import Anthropic, APIError, RateLimitError
from base_telegram_inference_bot import BaseTelegramInferenceBot
from telegram_helper import TelegramHelper
# logging.basicConfig(level=logging.INFO) # Usually configured in main execution script
class AnthropicTelegramInferenceBot(BaseTelegramInferenceBot):
def __init__(self):
super().__init__()
self.anthropic_client = Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
# Note: default_headers for max_tokens with older models might be needed.
# For Claude 3.5 Sonnet, max_tokens is a top-level param in messages.create
# Configure model and tokens. Using Sonnet 3.5 as default.
# ANTHROPIC_MODEL and ANTHROPIC_MAX_TOKENS would be new ENVs.
self._configure_model_and_tokens(
os.environ.get("ANTHROPIC_MODEL", "claude-3-5-sonnet-20240620"),
os.environ.get("ANTHROPIC_MAX_TOKENS", "4096") # Default max tokens for Sonnet 3.5
os.environ.get("ANTHROPIC_MAX_TOKENS", "4096")
)
def _configure_model_and_tokens(self, model_name, max_tokens_str, default_max_tokens=4096):
self.model = model_name if model_name else "claude-3-5-sonnet-20240620"
try:
# Anthropic's max_tokens is an integer.
self.max_tokens = int(max_tokens_str) if max_tokens_str is not None else default_max_tokens
except ValueError:
logging.error(f"Invalid value for Anthropic max_tokens: {max_tokens_str}. Using default {default_max_tokens}.")
self.max_tokens = default_max_tokens
logging.info(f"Configured to use Anthropic model: {self.model} with max_tokens: {self.max_tokens}")
def get_system_prompt_description(self) -> str:
system_prompt_path = os.getenv("SYSTEM_PROMPT_PATH")
if system_prompt_path and os.path.isfile(system_prompt_path):
return f"System Prompt File: {os.path.basename(system_prompt_path)}"
elif system_prompt_path:
return f"System Prompt File: {os.path.basename(system_prompt_path)} (Not found at path: {system_prompt_path})"
else:
return "System Prompt File: Not configured (SYSTEM_PROMPT_PATH not set)."
def get_llm_description(self) -> str:
return f"LLM: {self.model}, Max Tokens: {self.max_tokens}"
@@ -66,9 +50,27 @@ class AnthropicTelegramInferenceBot(BaseTelegramInferenceBot):
tool_choice={"type": "auto"} if anthropic_tools else None
)
return response
except Exception as e:
logging.error(f"Anthropic API call failed: {e}")
except (APIError, RateLimitError) as e:
logging.error(f"Anthropic API error: {e}")
raise
except Exception as e:
logging.error(f"An unexpected error occurred during Anthropic API call: {e}")
raise
def _format_tool_response_for_anthropic(self, tool_response_data):
if isinstance(tool_response_data, str):
return [{"type": "text", "text": tool_response_data}]
elif isinstance(tool_response_data, (dict, list)):
try:
is_valid_block_list = isinstance(tool_response_data, list) and all(isinstance(item, dict) and "type" in item for item in tool_response_data)
if is_valid_block_list:
return tool_response_data
else:
return [{"type": "text", "text": json.dumps(tool_response_data)}]
except (TypeError, json.JSONDecodeError):
return [{"type": "text", "text": str(tool_response_data)}]
else:
return [{"type": "text", "text": str(tool_response_data)}]
async def handle_message(self, user_id, user_message):
if user_id not in self.conversation_history:
@@ -86,7 +88,7 @@ class AnthropicTelegramInferenceBot(BaseTelegramInferenceBot):
if not response or not response.content:
logging.error("No valid response content from Anthropic LLM.")
self.conversation_history[user_id] = current_turn_messages # Persist what we have
self.conversation_history[user_id] = current_turn_messages
return "Error: Could not get a valid response from the LLM."
assistant_current_turn_content_blocks = response.content
@@ -114,22 +116,7 @@ class AnthropicTelegramInferenceBot(BaseTelegramInferenceBot):
logging.info(f"Attempting to call Anthropic tool: {tool_name} with input: {tool_input}")
try:
tool_response_data = self.call_tool(tool_name, tool_input)
if isinstance(tool_response_data, str):
tool_result_content_block = [{"type": "text", "text": tool_response_data}]
elif isinstance(tool_response_data, dict) or isinstance(tool_response_data, list):
try:
# If tool_response_data is already a list of Anthropic content blocks, use as is.
# Otherwise, dump to JSON string and wrap in a text block.
is_valid_block_list = isinstance(tool_response_data, list) and all(isinstance(item, dict) and "type" in item for item in tool_response_data)
if is_valid_block_list:
tool_result_content_block = tool_response_data
else:
tool_result_content_block = [{"type": "text", "text": json.dumps(tool_response_data)}]
except (TypeError, json.JSONDecodeError): # Not easily serializable or not a valid block list
tool_result_content_block = [{"type": "text", "text": str(tool_response_data)}]
else: # bool, int, float, None, etc.
tool_result_content_block = [{"type": "text", "text": str(tool_response_data)}]
tool_result_content_block = self._format_tool_response_for_anthropic(tool_response_data)
tool_results_for_model.append({
"type": "tool_result",
@@ -157,11 +144,10 @@ class AnthropicTelegramInferenceBot(BaseTelegramInferenceBot):
if len(self.conversation_history[user_id]) > 20:
self.conversation_history[user_id] = self.conversation_history[user_id][-20:]
if assistant_response_content: # Text from the last successful assistant turn (or before max iterations)
if assistant_response_content:
return assistant_response_content
else: # Fallback if no text content was generated by assistant (e.g. initial error, or only tool use)
else:
if current_turn_messages:
# Try to get the *very last* text block from the *very last* assistant message in history.
last_message_in_turn = current_turn_messages[-1]
if last_message_in_turn.get("role") == "assistant" and isinstance(last_message_in_turn.get("content"), list):
for block in reversed(last_message_in_turn["content"]):
@@ -173,17 +159,17 @@ class AnthropicTelegramInferenceBot(BaseTelegramInferenceBot):
async def start(self):
logging.info("Anthropic Bot started")
async def clear(self, user_id):
super().clear_conversation(user_id)
async def clear_conversation_history(self, user_id):
super().clear_conversation_history(user_id)
logging.info(f"Cleared conversation history for Anthropic bot, user {user_id}")
async def abort_processing(self, user_id):
if user_id in self.processing_status:
self.processing_status[user_id]["processing"] = False
await self.clear(user_id)
await self.clear_conversation_history(user_id)
return "Processing aborted and conversation cleared."
else:
await self.clear(user_id)
await self.clear_conversation_history(user_id)
return "No active processing found to abort. Conversation cleared."
async def switch_model(self):
@@ -200,7 +186,7 @@ class AnthropicTelegramInferenceBot(BaseTelegramInferenceBot):
if self.model == primary_model:
target_model = secondary_model_env
target_max_tokens = secondary_max_tokens_env if secondary_max_tokens_env else "2048"
else:
else:
target_model = primary_model
target_max_tokens = primary_max_tokens