diff --git a/anthropic_telegram_inference_bot.py b/anthropic_telegram_inference_bot.py index 294f860..13a487e 100644 --- a/anthropic_telegram_inference_bot.py +++ b/anthropic_telegram_inference_bot.py @@ -3,25 +3,48 @@ import json import logging from anthropic import Anthropic, APIError, RateLimitError from base_telegram_inference_bot import BaseTelegramInferenceBot -from telegram_helper import TelegramHelper +from telegram_helper import TelegramHelper # Used in main, not class class AnthropicTelegramInferenceBot(BaseTelegramInferenceBot): - def __init__(self): - super().__init__() - self.anthropic_client = Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY")) - - # Initialize with the small model by default - self.small_model_name = os.environ.get("ANTHROPIC_SMALL_MODEL", "claude-3-haiku-20240307") - self.small_model_max_tokens = os.environ.get("ANTHROPIC_SMALL_MODEL_MAX_TOKENS", "2048") - self.large_model_name = os.environ.get("ANTHROPIC_LARGE_MODEL", "claude-3-opus-20240229") - self.large_model_max_tokens = os.environ.get("ANTHROPIC_LARGE_MODEL_MAX_TOKENS", "4096") + DEFAULT_SMALL_MODEL_NAME = "claude-3-haiku-20240307" + DEFAULT_SMALL_MODEL_MAX_TOKENS = "2048" + DEFAULT_LARGE_MODEL_NAME = "claude-3-opus-20240229" + DEFAULT_LARGE_MODEL_MAX_TOKENS = "4096" + def __init__( + self, + anthropic_client: Anthropic | None = None, + api_key: str | None = None, + small_model_name: str | None = None, + small_model_max_tokens: str | None = None, + large_model_name: str | None = None, + large_model_max_tokens: str | None = None, + system_prompt_content: str | None = None, + system_prompt_path: str | None = None + ): + super().__init__(system_prompt_content=system_prompt_content, system_prompt_path=system_prompt_path) + + if anthropic_client: + self.anthropic_client = anthropic_client + else: + _api_key = api_key or os.environ.get("ANTHROPIC_API_KEY") + if not _api_key: + raise ValueError("Anthropic API key must be provided either via argument or ANTHROPIC_API_KEY environment variable.") + self.anthropic_client = Anthropic(api_key=_api_key) + + self.small_model_name = small_model_name or os.environ.get("ANTHROPIC_SMALL_MODEL") or self.DEFAULT_SMALL_MODEL_NAME + self.small_model_max_tokens_str = small_model_max_tokens or os.environ.get("ANTHROPIC_SMALL_MODEL_MAX_TOKENS") or self.DEFAULT_SMALL_MODEL_MAX_TOKENS + self.large_model_name = large_model_name or os.environ.get("ANTHROPIC_LARGE_MODEL") or self.DEFAULT_LARGE_MODEL_NAME + self.large_model_max_tokens_str = large_model_max_tokens or os.environ.get("ANTHROPIC_LARGE_MODEL_MAX_TOKENS") or self.DEFAULT_LARGE_MODEL_MAX_TOKENS + + # Initialize with the small model by default self._configure_model_and_tokens( self.small_model_name, - self.small_model_max_tokens + self.small_model_max_tokens_str, + default_max_tokens=int(self.DEFAULT_SMALL_MODEL_MAX_TOKENS) # pass int for default ) - def _configure_model_and_tokens(self, model_name, max_tokens_str, default_max_tokens=2048): # Default max_tokens adjusted for typical "small" + def _configure_model_and_tokens(self, model_name: str, max_tokens_str: str, default_max_tokens: int = 2048): self.model = model_name try: self.max_tokens = int(max_tokens_str) if max_tokens_str is not None else default_max_tokens @@ -65,17 +88,19 @@ class AnthropicTelegramInferenceBot(BaseTelegramInferenceBot): def _format_tool_response_for_anthropic(self, tool_response_data): if isinstance(tool_response_data, str): + # Wrap plain string in a list of text blocks if not already structured return [{"type": "text", "text": tool_response_data}] + elif isinstance(tool_response_data, list) and all(isinstance(item, dict) and "type" in item for item in tool_response_data): + # Already a list of content blocks + return tool_response_data elif isinstance(tool_response_data, (dict, list)): + # Attempt to JSON dump other dicts/lists if not already in content block format try: - is_valid_block_list = isinstance(tool_response_data, list) and all(isinstance(item, dict) and "type" in item for item in tool_response_data) - if is_valid_block_list: - return tool_response_data - else: - return [{"type": "text", "text": json.dumps(tool_response_data)}] + return [{"type": "text", "text": json.dumps(tool_response_data)}] except (TypeError, json.JSONDecodeError): - return [{"type": "text", "text": str(tool_response_data)}] + return [{"type": "text", "text": str(tool_response_data)}] # Fallback to string else: + # Fallback for other types (int, float, etc.) return [{"type": "text", "text": str(tool_response_data)}] async def handle_message(self, user_id, user_message): @@ -87,14 +112,14 @@ class AnthropicTelegramInferenceBot(BaseTelegramInferenceBot): MAX_TOOL_ITERATIONS = 5 tool_use_count = 0 - assistant_response_content = "" + assistant_response_content = "" while tool_use_count < MAX_TOOL_ITERATIONS: response = self.get_chat_response(current_turn_messages) if not response or not response.content: logging.error("No valid response content from Anthropic LLM.") - self.conversation_history[user_id] = current_turn_messages + self.conversation_history[user_id] = current_turn_messages # Save current state return "Error: Could not get a valid response from the LLM." assistant_current_turn_content_blocks = response.content @@ -111,23 +136,22 @@ class AnthropicTelegramInferenceBot(BaseTelegramInferenceBot): assistant_response_content = "".join(text_parts_from_assistant) if not tool_calls_from_response: - break + break tool_results_for_model = [] for tool_call in tool_calls_from_response: tool_name = tool_call.name - tool_input = tool_call.input + tool_input = tool_call.input tool_use_id = tool_call.id logging.info(f"Attempting to call Anthropic tool: {tool_name} with input: {tool_input}") try: - tool_response_data = self.call_tool(tool_name, tool_input) + tool_response_data = self.call_tool(tool_name, tool_input) tool_result_content_block = self._format_tool_response_for_anthropic(tool_response_data) - tool_results_for_model.append({ "type": "tool_result", "tool_use_id": tool_use_id, - "content": tool_result_content_block + "content": tool_result_content_block }) except Exception as e: logging.error(f"Error calling tool {tool_name}: {e}") @@ -135,14 +159,18 @@ class AnthropicTelegramInferenceBot(BaseTelegramInferenceBot): "type": "tool_result", "tool_use_id": tool_use_id, "content": [{"type": "text", "text": f"Error executing tool {tool_name}: {str(e)}"}], - "is_error": True + "is_error": True }) - current_turn_messages.append({"role": "user", "content": tool_results_for_model}) + current_turn_messages.append({"role": "user", "content": tool_results_for_model}) # Anthropic expects tool results as a user message tool_use_count += 1 if tool_use_count >= MAX_TOOL_ITERATIONS: logging.warning(f"Max tool iterations ({MAX_TOOL_ITERATIONS}) reached for Anthropic.") + # Update assistant_response_content with any text from the last assistant turn before breaking + if not assistant_response_content and text_parts_from_assistant: + assistant_response_content = "".join(text_parts_from_assistant) + assistant_response_content += "\n[Max tool iterations reached]" break self.conversation_history[user_id] = current_turn_messages @@ -153,70 +181,89 @@ class AnthropicTelegramInferenceBot(BaseTelegramInferenceBot): if assistant_response_content: return assistant_response_content else: + # Fallback if no text parts were found but there was an assistant message if current_turn_messages: last_message_in_turn = current_turn_messages[-1] + # Check if the last message content has text blocks (Anthropic specific structure) if last_message_in_turn.get("role") == "assistant" and isinstance(last_message_in_turn.get("content"), list): for block in reversed(last_message_in_turn["content"]): - if block.type == "text": - return block.text - return "No textual response from assistant." - + if block.type == "text" and hasattr(block, 'text') and block.text: + return block.text # Return the first non-empty text found from the end + return "No textual response generated by the assistant after processing." # More informative default async def start(self): logging.info("Anthropic Bot started") - async def clear_conversation_history(self, user_id): - super().clear_conversation_history(user_id) - logging.info(f"Cleared conversation history for Anthropic bot, user {user_id}") + # clear_conversation_history is inherited from BaseTelegramInferenceBot and calls super().clear_conversation_history + # No need to override if the base implementation is sufficient, unless specific logging is needed. + # async def clear_conversation_history(self, user_id): + # super().clear_conversation_history(user_id) + # logging.info(f"Cleared conversation history for Anthropic bot, user {user_id}") async def abort_processing(self, user_id): + # This abort is a soft abort, as actual Anthropic API call is synchronous within handle_message + # It primarily clears state and prevents further processing in the bot's loop if any. if user_id in self.processing_status: - self.processing_status[user_id]["processing"] = False - await self.clear_conversation_history(user_id) - return "Processing aborted and conversation cleared." - else: - await self.clear_conversation_history(user_id) - return "No active processing found to abort. Conversation cleared." + self.processing_status[user_id]["processing"] = False # Mark as not processing + # self.clear_processing_status(user_id) # Use base class method to remove entry + # Clearing history might be too aggressive for a simple abort, depends on desired UX + # For now, let's just stop processing and clear the flag. + # Consider if conversation history should be cleared here or if that is a separate user action. + # super().clear_conversation_history(user_id) # Moved to be less aggressive + logging.info(f"Abort requested for user {user_id}. Processing flag cleared.") + return "Processing aborted. You can send a new message or /clear the conversation." async def switch_model(self): - # Ensure ANTHROPIC_SMALL_MODEL and ANTHROPIC_LARGE_MODEL related env vars are loaded in __init__ - # or ensure they are freshly checked here if they can change during runtime (less common for model names). - # For this implementation, we rely on the values stored during __init__. - if not self.small_model_name or not self.large_model_name: logging.warning("Small or Large model names for Anthropic are not defined. Cannot switch model.") return f"Model switching not fully configured. Currently using {self.model}." - if self.model == self.small_model_name: + current_is_small = self.model == self.small_model_name + current_is_large = self.model == self.large_model_name + + if current_is_small: target_model = self.large_model_name - target_max_tokens = self.large_model_max_tokens - # Use default large max_tokens if specific one isn't set or invalid - default_max_tokens_for_large = "4096" - elif self.model == self.large_model_name: + target_max_tokens_str = self.large_model_max_tokens_str + default_target_max_tokens = int(self.DEFAULT_LARGE_MODEL_MAX_TOKENS) + elif current_is_large: target_model = self.small_model_name - target_max_tokens = self.small_model_max_tokens - # Use default small max_tokens if specific one isn't set or invalid - default_max_tokens_for_large = "2048" + target_max_tokens_str = self.small_model_max_tokens_str + default_target_max_tokens = int(self.DEFAULT_SMALL_MODEL_MAX_TOKENS) else: - # Current model is neither the designated small nor large, switch to small as a reset - logging.warning(f"Current model {self.model} is neither the configured small nor large model. Switching to small model.") + logging.warning(f"Current model {self.model} is unrecognized. Switching to default small model.") target_model = self.small_model_name - target_max_tokens = self.small_model_max_tokens - default_max_tokens_for_large = "2048" + target_max_tokens_str = self.small_model_max_tokens_str + default_target_max_tokens = int(self.DEFAULT_SMALL_MODEL_MAX_TOKENS) - - self._configure_model_and_tokens(target_model, target_max_tokens, default_max_tokens=int(default_max_tokens_for_large)) # Pass appropriate default + self._configure_model_and_tokens(target_model, target_max_tokens_str, default_max_tokens=default_target_max_tokens) logging.info(f"Switched Anthropic model to: {self.model}") - return f"Switched to Anthropic model: {self.model} (Max Tokens: {self.max_tokens})"#Provide token info + return f"Switched to Anthropic model: {self.model} (Max Tokens: {self.max_tokens})" + +# The main function is for standalone execution and basic testing, not part of the class itself. +# It's good practice to update it to reflect changes if you use it for quick tests. +# For unit tests, we'll instantiate the class with mocked dependencies. def main(): - if not os.environ.get("ANTHROPIC_API_KEY"): - logging.error("FATAL: ANTHROPIC_API_KEY environment variable not set.") - return - logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') - bot = AnthropicTelegramInferenceBot() + # Example of how to instantiate with new constructor (assuming API key is in ENV for this example) + # For real tests, you'd mock Anthropic() or pass a mock client. + try: + # These would typically come from a config file or CLI args in a real app if not ENV + # For this example, we rely on ENV or defaults being handled by constructor if not provided. + bot = AnthropicTelegramInferenceBot( + api_key=os.environ.get("ANTHROPIC_API_KEY") # Explicitly pass, or let constructor handle ENV + ) + except ValueError as e: + logging.error(f"Failed to initialize bot: {e}") + return + except Exception as e: # Catch any other init errors + logging.error(f"An unexpected error occurred during bot initialization: {e}") + return + + # TelegramHelper also updated, ensure it's instantiated correctly for this main context. + # For this basic main, we might not pass all configurable paths to TelegramHelper, + # letting them use defaults. telegram_helper = TelegramHelper(bot) telegram_helper.run()