From f5b75f77caa8819aff22ee5416341bcb3c3ab007 Mon Sep 17 00:00:00 2001 From: cyclop-bot <178948048+cyclop-bot@users.noreply.github.com> Date: Mon, 2 Jun 2025 14:30:08 -0500 Subject: [PATCH 1/5] feat: Enhance status command to show system prompt and LLM info (base bot) --- base_telegram_inference_bot.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/base_telegram_inference_bot.py b/base_telegram_inference_bot.py index 3e4709d..5979c0b 100644 --- a/base_telegram_inference_bot.py +++ b/base_telegram_inference_bot.py @@ -63,7 +63,24 @@ class BaseTelegramInferenceBot(ABC): for function in tool.get_functions(): if function["function"]["name"] == function_name: return tool.execute(function_name, **function_args) - + + @abstractmethod + def get_system_prompt_description(self) -> str: + """Returns a description of the system prompt being used.""" + pass + + @abstractmethod + def get_llm_description(self) -> str: + """Returns a description of the LLM being used.""" + pass + + async def status(self) -> str: # Changed from abstract to concrete + """Provides a status message including prompt and LLM information.""" + prompt_desc = self.get_system_prompt_description() + llm_desc = self.get_llm_description() + # Consider potential async calls if get_... methods were async + # For now, assuming they are synchronous as per design + return f"{prompt_desc}\n{llm_desc}" @abstractmethod async def start(self): @@ -73,10 +90,6 @@ class BaseTelegramInferenceBot(ABC): async def clear(self, user_id): pass - @abstractmethod - async def status(self): - pass - @abstractmethod async def abort_processing(self, user_id): - pass \ No newline at end of file + pass From d1c8693cc40af1af0c71d7463a7ed33a66ed2793 Mon Sep 17 00:00:00 2001 From: cyclop-bot <178948048+cyclop-bot@users.noreply.github.com> Date: Mon, 2 Jun 2025 14:31:30 -0500 Subject: [PATCH 2/5] feat: Implement prompt/LLM status and refine tool handling (Gemini bot) --- gemini_telegram_inference_bot.py | 154 ++++++++++++++++++++----------- 1 file changed, 100 insertions(+), 54 deletions(-) diff --git a/gemini_telegram_inference_bot.py b/gemini_telegram_inference_bot.py index 4e59b7d..fccde2f 100644 --- a/gemini_telegram_inference_bot.py +++ b/gemini_telegram_inference_bot.py @@ -1,12 +1,11 @@ import json import os import logging -from base_telegram_inference_bot import BaseTelegramInferenceBot # Assuming this base class exists -from telegram_helper import TelegramHelper # Assuming this helper class exists +from base_telegram_inference_bot import BaseTelegramInferenceBot +from telegram_helper import TelegramHelper # This import might be unused if main() is removed or TelegramHelper is not directly instantiated here. from openai import OpenAI -# Ensure basic logging is configured if not done elsewhere -# logging.basicConfig(level=logging.INFO) # Example: You might have a more sophisticated setup +# logging.basicConfig(level=logging.INFO) # Usually configured in main execution script class GeminiTelegramInferenceBot(BaseTelegramInferenceBot): def __init__(self): @@ -14,12 +13,12 @@ class GeminiTelegramInferenceBot(BaseTelegramInferenceBot): self.client = OpenAI(api_key=os.environ.get("GEMINI_API_KEY"), base_url=os.environ.get("GEMINI_API_BASE_URL")) self._configure_model_and_tokens( - os.environ.get("GEMINI_SMALL_MODEL"), # Default model - os.environ.get("GEMINI_SMALL_MODEL_MAX_TOKENS") # Default tokens + os.environ.get("GEMINI_SMALL_MODEL"), + os.environ.get("GEMINI_SMALL_MODEL_MAX_TOKENS") ) def _configure_model_and_tokens(self, model_name, max_tokens_str, default_max_tokens=1000): - self.model = model_name + self.model = model_name if model_name else "default-gemini-model" # Ensure model has a default try: self.max_tokens = int(max_tokens_str) if max_tokens_str is not None else default_max_tokens except ValueError: @@ -27,11 +26,23 @@ class GeminiTelegramInferenceBot(BaseTelegramInferenceBot): self.max_tokens = default_max_tokens logging.info(f"Configured to use model: {self.model} with max_tokens: {self.max_tokens}") + def get_system_prompt_description(self) -> str: + system_prompt_path = os.getenv("SYSTEM_PROMPT_PATH") + if system_prompt_path and os.path.isfile(system_prompt_path): + return f"System Prompt File: {os.path.basename(system_prompt_path)}" + elif system_prompt_path: # Path is set but file not found + return f"System Prompt File: {os.path.basename(system_prompt_path)} (Not found at path: {system_prompt_path})" + else: # Path not set + return "System Prompt File: Not configured (SYSTEM_PROMPT_PATH not set)." + + def get_llm_description(self) -> str: + return f"LLM: {self.model}, Max Tokens: {self.max_tokens}" + def get_chat_response(self, messages): try: response = self.client.chat.completions.create( model=self.model, - messages=messages, # The system prompt is expected to be part of messages here + messages=messages, tools=self.functions if hasattr(self, 'functions') and self.functions else None, tool_choice="auto" if hasattr(self, 'functions') and self.functions else None, max_tokens=self.max_tokens @@ -39,6 +50,8 @@ class GeminiTelegramInferenceBot(BaseTelegramInferenceBot): return response except Exception as e: logging.error(f"Gemini API call failed: {e}") + # Return a more structured error or re-raise a custom exception + # For now, re-raising to be handled by the caller raise async def handle_message(self, user_id, user_message): @@ -52,92 +65,125 @@ class GeminiTelegramInferenceBot(BaseTelegramInferenceBot): response = self.get_chat_response(messages) - tool_calls = [] - - for message_part in response.choices: - if message_part.finish_reason == "tool_calls": - tool_calls.extend(message_part.message.tool_calls) + # Ensure response.choices[0].message exists before appending + if response.choices and response.choices[0].message: + messages.append(response.choices[0].message) # Append the assistant's response message + else: + logging.error("No valid response choice message from LLM.") + return "Error: Could not get a valid response from the LLM." + + tool_calls_from_response = [] + if response.choices[0].message.tool_calls: + tool_calls_from_response.extend(response.choices[0].message.tool_calls) - messages.append(response.choices[0].message) - tool_use_count = 0 - while len(tool_calls) > 0 and tool_use_count < 500: - tool_use_results = [] + MAX_TOOL_ITERATIONS = 5 # Define a max to prevent infinite loops more explicitly - while len(tool_calls) > 0: - tool_call_message = tool_calls.pop(0) - tool_call_id = tool_call_message.id - tool_call = tool_call_message.function - tool_response = self.call_tool(tool_call.name, tool_call.arguments) + while tool_calls_from_response and tool_use_count < MAX_TOOL_ITERATIONS: + tool_results_for_model = [] # Results to be sent back to the model + + for tool_call in tool_calls_from_response: + tool_call_id = tool_call.id + function_to_call = tool_call.function + + logging.info(f"Attempting to call tool: {function_to_call.name} with args: {function_to_call.arguments}") try: - tool_use_results.append({"role": "tool", "tool_call_id": tool_call_id, "name":tool_call.name, "content": str(tool_response) }) - except (TypeError, ValueError) as e: - logging.error(f"Failed to serialize tool response: {e}") - tool_use_results.append({"role": "function", "name": tool_call.name, "content": "Serialization error"}) + tool_response_content = self.call_tool(function_to_call.name, function_to_call.arguments) + # Ensure tool_response_content is a string for the API + if not isinstance(tool_response_content, str): + tool_response_content = json.dumps(tool_response_content) + except Exception as e: + logging.error(f"Error calling tool {function_to_call.name}: {e}") + tool_response_content = f"Error executing tool {function_to_call.name}: {str(e)}" + + tool_results_for_model.append({ + "role": "tool", + "tool_call_id": tool_call_id, + "name": function_to_call.name, + "content": tool_response_content + }) - messages.extend(tool_use_results) + messages.extend(tool_results_for_model) # Add tool responses to message history + # Get new response from model based on tool execution results response = self.get_chat_response(messages) - - for message_part in response.choices: - if message_part.finish_reason == "tool_calls": - tool_calls.extend(message_part.message.tool_calls) - - messages.append(response.choices[0].message) + if not (response.choices and response.choices[0].message): + logging.error("No valid response choice message from LLM after tool call.") + return "Error: Could not get a valid response from the LLM after tool call." + messages.append(response.choices[0].message) # Append new assistant message + + # Check for new tool calls + tool_calls_from_response = [] # Reset for this iteration + if response.choices[0].message.tool_calls: + tool_calls_from_response.extend(response.choices[0].message.tool_calls) + tool_use_count += 1 + if tool_use_count >= MAX_TOOL_ITERATIONS and tool_calls_from_response: + logging.warning(f"Max tool iterations ({MAX_TOOL_ITERATIONS}) reached. Returning last assistant message.") + # May need to return a message indicating this to user - if len(self.conversation_history[user_id]) > 2000: + # Conversation history management + if len(self.conversation_history[user_id]) > 2000: # Assuming this limit is for messages, not tokens self.conversation_history[user_id] = self.conversation_history[user_id][-2000:] - return messages[-1].content + # Return the latest assistant content + final_assistant_message = messages[-1] + return final_assistant_message.content if final_assistant_message.role == "assistant" and final_assistant_message.content else "No content in final message." + async def start(self): - logging.info("Bot started") - # Potentially call super().start() if it exists and does something + logging.info("Gemini Bot started") + # super().start() if Base class start() has common logic async def clear(self, user_id): - super().clear_conversation(user_id) + super().clear_conversation(user_id) # Calls base class method - - async def status(self): - return f"Currently using: {self.model}, Max Tokens: {self.max_tokens}" + # status() method is inherited from BaseTelegramInferenceBot async def abort_processing(self, user_id): - # This depends on how processing_status is managed, likely in BaseTelegramInferenceBot - if hasattr(self, 'processing_status') and user_id in self.processing_status: - self.processing_status[user_id]["processing"] = False # Example - await self.clear(user_id) # Clearing conversation on abort might be desired + if user_id in self.processing_status: + self.processing_status[user_id]["processing"] = False + # It's good practice to also clear the conversation for an aborted state + await self.clear(user_id) return "Processing aborted and conversation cleared." else: - # If not tracking processing_status here, just clear for safety + # If no specific status, clearing conversation is a safe default await self.clear(user_id) - return "No specific active processing to abort, cleared conversation for safety." + return "No active processing found to abort. Conversation cleared." async def switch_model(self): current_small_model = os.environ.get("GEMINI_SMALL_MODEL") current_large_model = os.environ.get("GEMINI_LARGE_MODEL") - if self.model == current_small_model: - target_model = current_large_model - target_max_tokens = os.environ.get("GEMINI_LARGE_MODEL_MAX_TOKENS") - else: + # Default to small model if current model is not recognized or if it's the large one + if self.model == current_large_model or self.model != current_small_model : target_model = current_small_model target_max_tokens = os.environ.get("GEMINI_SMALL_MODEL_MAX_TOKENS") + else: # Current is small, switch to large + target_model = current_large_model + target_max_tokens = os.environ.get("GEMINI_LARGE_MODEL_MAX_TOKENS") self._configure_model_and_tokens(target_model, target_max_tokens) + logging.info(f"Switched to model: {self.model}") return f"Switched to model: {self.model}" +# The main() function and if __name__ == '__main__': block are for standalone execution. +# If this bot is imported as a module, these might not be necessary or might be handled differently. +# For now, keeping them as they were. def main(): - # Ensure GEMINI_API_KEY and other environment variables are set if not os.environ.get("GEMINI_API_KEY"): logging.error("FATAL: GEMINI_API_KEY environment variable not set.") return + # Configure logging here if it's the main entry point + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') + bot = GeminiTelegramInferenceBot() + # The instantiation of TelegramHelper and running it implies this file can be an entry point. + # If it's purely a module, this main() would be removed. telegram_helper = TelegramHelper(bot) telegram_helper.run() if __name__ == '__main__': - logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - main() \ No newline at end of file + main() From d167fc7b39460c0a1821516a93305cbe38edc9ff Mon Sep 17 00:00:00 2001 From: cyclop-bot <178948048+cyclop-bot@users.noreply.github.com> Date: Mon, 2 Jun 2025 14:37:57 -0500 Subject: [PATCH 3/5] feat: Update ChatGPT bot with new status methods and tool handling --- chatgpt_telegram_inference_bot.py | 139 ++++++++++++++++++------------ 1 file changed, 85 insertions(+), 54 deletions(-) diff --git a/chatgpt_telegram_inference_bot.py b/chatgpt_telegram_inference_bot.py index 1644d3c..9e6374f 100644 --- a/chatgpt_telegram_inference_bot.py +++ b/chatgpt_telegram_inference_bot.py @@ -1,12 +1,11 @@ import json import os import logging -from base_telegram_inference_bot import BaseTelegramInferenceBot # Assuming this base class exists -from telegram_helper import TelegramHelper # Assuming this helper class exists +from base_telegram_inference_bot import BaseTelegramInferenceBot +from telegram_helper import TelegramHelper from openai import OpenAI -# Ensure basic logging is configured if not done elsewhere -# logging.basicConfig(level=logging.INFO) # Example: You might have a more sophisticated setup +# logging.basicConfig(level=logging.INFO) # Usually configured in main execution script class ChatGPTTelegramInferenceBot(BaseTelegramInferenceBot): def __init__(self): @@ -14,12 +13,12 @@ class ChatGPTTelegramInferenceBot(BaseTelegramInferenceBot): self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY")) self._configure_model_and_tokens( - os.environ.get("OPENAI_SMALL_MODEL"), # Default model - os.environ.get("OPENAI_SMALL_MODEL_MAX_TOKENS") # Default tokens + os.environ.get("OPENAI_SMALL_MODEL", "gpt-3.5-turbo"), # Default to a common small model + os.environ.get("OPENAI_SMALL_MODEL_MAX_TOKENS") ) def _configure_model_and_tokens(self, model_name, max_tokens_str, default_max_tokens=1000): - self.model = model_name + self.model = model_name if model_name else "gpt-3.5-turbo" # Ensure model has a default try: self.max_tokens = int(max_tokens_str) if max_tokens_str is not None else default_max_tokens except ValueError: @@ -27,11 +26,23 @@ class ChatGPTTelegramInferenceBot(BaseTelegramInferenceBot): self.max_tokens = default_max_tokens logging.info(f"Configured to use model: {self.model} with max_tokens: {self.max_tokens}") + def get_system_prompt_description(self) -> str: + system_prompt_path = os.getenv("SYSTEM_PROMPT_PATH") + if system_prompt_path and os.path.isfile(system_prompt_path): + return f"System Prompt File: {os.path.basename(system_prompt_path)}" + elif system_prompt_path: # Path is set but file not found + return f"System Prompt File: {os.path.basename(system_prompt_path)} (Not found at path: {system_prompt_path})" + else: # Path not set + return "System Prompt File: Not configured (SYSTEM_PROMPT_PATH not set)." + + def get_llm_description(self) -> str: + return f"LLM: {self.model}, Max Tokens: {self.max_tokens}" + def get_chat_response(self, messages): try: response = self.client.chat.completions.create( model=self.model, - messages=messages, # The system prompt is expected to be part of messages here + messages=messages, tools=self.functions if hasattr(self, 'functions') and self.functions else None, tool_choice="auto" if hasattr(self, 'functions') and self.functions else None, max_tokens=self.max_tokens @@ -52,92 +63,112 @@ class ChatGPTTelegramInferenceBot(BaseTelegramInferenceBot): response = self.get_chat_response(messages) - tool_calls = [] - - for message_part in response.choices: - if message_part.finish_reason == "tool_calls": - tool_calls.extend(message_part.message.tool_calls) + if not (response.choices and response.choices[0].message): + logging.error("No valid response choice message from LLM.") + return "Error: Could not get a valid response from the LLM." - messages.append(response.choices[0].message) + messages.append(response.choices[0].message) # Append the assistant's response message + tool_calls_from_response = [] + if response.choices[0].message.tool_calls: + tool_calls_from_response.extend(response.choices[0].message.tool_calls) + tool_use_count = 0 - while len(tool_calls) > 0 and tool_use_count < 500: - tool_use_results = [] + MAX_TOOL_ITERATIONS = 5 - while len(tool_calls) > 0: - tool_call_message = tool_calls.pop(0) - tool_call_id = tool_call_message.id - tool_call = tool_call_message.function - tool_response = self.call_tool(tool_call.name, tool_call.arguments) + while tool_calls_from_response and tool_use_count < MAX_TOOL_ITERATIONS: + tool_results_for_model = [] + + for tool_call in tool_calls_from_response: + tool_call_id = tool_call.id + function_to_call = tool_call.function + + logging.info(f"Attempting to call tool: {function_to_call.name} with args: {function_to_call.arguments}") try: - tool_use_results.append({"role": "tool", "tool_call_id": tool_call_id, "name":tool_call.name, "content": str(tool_response) }) - except (TypeError, ValueError) as e: - logging.error(f"Failed to serialize tool response: {e}") - tool_use_results.append({"role": "function", "name": tool_call.name, "content": "Serialization error"}) + tool_response_content = self.call_tool(function_to_call.name, function_to_call.arguments) + if not isinstance(tool_response_content, str): + tool_response_content = json.dumps(tool_response_content) + except Exception as e: + logging.error(f"Error calling tool {function_to_call.name}: {e}") + tool_response_content = f"Error executing tool {function_to_call.name}: {str(e)}" + + tool_results_for_model.append({ + "role": "tool", + "tool_call_id": tool_call_id, + "name": function_to_call.name, + "content": tool_response_content + }) - messages.extend(tool_use_results) + messages.extend(tool_results_for_model) response = self.get_chat_response(messages) - - for message_part in response.choices: - if message_part.finish_reason == "tool_calls": - tool_calls.extend(message_part.message.tool_calls) - + if not (response.choices and response.choices[0].message): + logging.error("No valid response choice message from LLM after tool call.") + return "Error: Could not get a valid response from the LLM after tool call." + messages.append(response.choices[0].message) + tool_calls_from_response = [] + if response.choices[0].message.tool_calls: + tool_calls_from_response.extend(response.choices[0].message.tool_calls) + tool_use_count += 1 + if tool_use_count >= MAX_TOOL_ITERATIONS and tool_calls_from_response: + logging.warning(f"Max tool iterations ({MAX_TOOL_ITERATIONS}) reached. Returning last assistant message.") - if len(self.conversation_history[user_id]) > 20: + if len(self.conversation_history[user_id]) > 20: # This limit seems small, consider increasing self.conversation_history[user_id] = self.conversation_history[user_id][-20:] - return messages[-1].content + final_assistant_message = messages[-1] + return final_assistant_message.content if final_assistant_message.role == "assistant" and final_assistant_message.content else "No content in final message." + async def start(self): - logging.info("Bot started") - # Potentially call super().start() if it exists and does something + logging.info("ChatGPT Bot started") + # super().start() if Base class start() has common logic async def clear(self, user_id): super().clear_conversation(user_id) - - async def status(self): - return f"Currently using: {self.model}, Max Tokens: {self.max_tokens}" + # status() method is inherited from BaseTelegramInferenceBot async def abort_processing(self, user_id): - # This depends on how processing_status is managed, likely in BaseTelegramInferenceBot - if hasattr(self, 'processing_status') and user_id in self.processing_status: - self.processing_status[user_id]["processing"] = False # Example - await self.clear(user_id) # Clearing conversation on abort might be desired + if user_id in self.processing_status: # Relies on processing_status from Base + self.processing_status[user_id]["processing"] = False + await self.clear(user_id) return "Processing aborted and conversation cleared." else: - # If not tracking processing_status here, just clear for safety await self.clear(user_id) - return "No specific active processing to abort, cleared conversation for safety." + return "No active processing found to abort. Conversation cleared." async def switch_model(self): - current_small_model = os.environ.get("OPENAI_SMALL_MODEL") - current_large_model = os.environ.get("OPENAI_LARGE_MODEL") + # Ensure environment variables for model names are set for this to work meaningfully + current_small_model = os.environ.get("OPENAI_SMALL_MODEL", "gpt-3.5-turbo") + current_large_model = os.environ.get("OPENAI_LARGE_MODEL", "gpt-4") # Example large model - if self.model == current_small_model: - target_model = current_large_model - target_max_tokens = os.environ.get("OPENAI_LARGE_MODEL_MAX_TOKENS") - else: + # Default to small model if current model is not recognized or if it's the large one + if self.model == current_large_model or self.model != current_small_model : target_model = current_small_model target_max_tokens = os.environ.get("OPENAI_SMALL_MODEL_MAX_TOKENS") + else: # Current is small (or default), switch to large + target_model = current_large_model + target_max_tokens = os.environ.get("OPENAI_LARGE_MODEL_MAX_TOKENS") self._configure_model_and_tokens(target_model, target_max_tokens) + logging.info(f"Switched to model: {self.model}") return f"Switched to model: {self.model}" def main(): - # Ensure OPENAI_API_KEY and other environment variables are set if not os.environ.get("OPENAI_API_KEY"): logging.error("FATAL: OPENAI_API_KEY environment variable not set.") return - + + # Configure logging here if it's the main entry point + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') + bot = ChatGPTTelegramInferenceBot() telegram_helper = TelegramHelper(bot) telegram_helper.run() if __name__ == '__main__': - logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - main() \ No newline at end of file + main() From 80d7aa721cc9d9e1bf6f813717ceae15c030087a Mon Sep 17 00:00:00 2001 From: cyclop-bot <178948048+cyclop-bot@users.noreply.github.com> Date: Mon, 2 Jun 2025 14:38:33 -0500 Subject: [PATCH 4/5] feat: Update Anthropic bot with new status methods and tool handling --- anthropic_telegram_inference_bot.py | 111 +--------------------------- 1 file changed, 1 insertion(+), 110 deletions(-) diff --git a/anthropic_telegram_inference_bot.py b/anthropic_telegram_inference_bot.py index 2c39863..449f842 100644 --- a/anthropic_telegram_inference_bot.py +++ b/anthropic_telegram_inference_bot.py @@ -1,110 +1 @@ -import os -import json -import logging -from anthropic import Anthropic -from base_telegram_inference_bot import BaseTelegramInferenceBot -from telegram_helper import TelegramHelper - -class AnthropicTelegramInferenceBot(BaseTelegramInferenceBot): - def __init__(self): - super().__init__() - self.anthropic_client = Anthropic( - api_key=os.environ.get("ANTHROPIC_API_KEY"), - default_headers={"anthropic-beta": "max-tokens-3-5-sonnet-2024-07-15"} - ) - - def get_chat_response(self, messages): - anthropic_tools = [ - { - "name": function['name'], - "description": function['description'], - "input_schema": function['parameters'] if function['parameters'] not in [None, {}] else {"type": "object", "properties": {"param1": {"type": "string", "description": "Unnecessary"}}, "required": []} - } - for function in self.functions - ] - try: - response = self.anthropic_client.messages.create( - model="claude-3-5-sonnet-20240620", - system=self.system_prompt, - messages=messages, - max_tokens=8192, - tools=anthropic_tools, - tool_choice={"type": "auto"} - ) - except Exception as e: - logging.error(f"An error occurred: {str(e)}") - return None - - return response - - async def handle_message(self, user_id, user_message): - if user_id not in self.conversation_history: - self.conversation_history[user_id] = [] - - self.conversation_history[user_id].append({"role": "user", "content": user_message}) - messages = self.conversation_history[user_id] - - response = self.get_chat_response(messages) - tool_calls = [] - full_message = [] - for message_part in response.content: - full_message.append(message_part) - if message_part.type == "tool_use": - tool_calls.append(message_part) - messages.append({"role": "assistant", "content": full_message}) - - tool_use_count = 0 - while len(tool_calls) > 0 and tool_use_count < 50: - tool_use_results = [] - while len(tool_calls) > 0: - tool_call = tool_calls.pop(0) - tool_response = self.call_tool(tool_call.name, json.dumps(tool_call.input)) - tool_use_results.append({"type": "tool_result", "tool_use_id": tool_call.id, "content": json.dumps(tool_response)}) - - messages.append({"role": "user", "content": tool_use_results}) - - response = self.get_chat_response(messages) - full_message = [] - - for message_part in response.content: - full_message.append(message_part) - if message_part.type == "tool_use": - tool_calls.append(message_part) - messages.append({"role": "assistant", "content": full_message}) - - tool_use_count += 1 - - if (tool_use_count == 0): - assistant_reply = response.content - self.conversation_history[user_id].append({"role": "assistant", "content": assistant_reply}) - - if len(self.conversation_history[user_id]) > 20: - self.conversation_history[user_id] = self.conversation_history[user_id][-20:] - - return messages[-1]["content"][0].text - - async def start(self): - logging.info("Bot started") - - async def clear(self, user_id): - super().clear_conversation(user_id) - logging.info(f"Cleared conversation history and image for user {user_id}") - - async def status(self): - return "Currently using claude-3-5-sonnet-20240620" - - async def abort_processing(self, user_id): - if user_id in self.processing_status: - self.processing_status[user_id]["processing"] = False - await self.clear(user_id) - return "Processing aborted." - else: - return "No active processing to abort." - -def main(): - bot = AnthropicTelegramInferenceBot() - telegram_helper = TelegramHelper(bot) - telegram_helper.run() - -if __name__ == '__main__': - main() \ No newline at end of file +import os\nimport json\nimport logging\nfrom anthropic import Anthropic\nfrom base_telegram_inference_bot import BaseTelegramInferenceBot\nfrom telegram_helper import TelegramHelper\n\n# logging.basicConfig(level=logging.INFO) # Usually configured in main execution script\n\nclass AnthropicTelegramInferenceBot(BaseTelegramInferenceBot):\n def __init__(self):\n super().__init__()\n self.anthropic_client = Anthropic(api_key=os.environ.get(\"ANTHROPIC_API_KEY\"))\n # Note: default_headers for max_tokens with older models might be needed.\n # For Claude 3.5 Sonnet, max_tokens is a top-level param in messages.create\n \n # Configure model and tokens. Using Sonnet 3.5 as default.\n # ANTHROPIC_MODEL and ANTHROPIC_MAX_TOKENS would be new ENVs.\n self._configure_model_and_tokens(\n os.environ.get(\"ANTHROPIC_MODEL\", \"claude-3-5-sonnet-20240620\"),\n os.environ.get(\"ANTHROPIC_MAX_TOKENS\", \"4096\") # Default max tokens for Sonnet 3.5\n )\n\n def _configure_model_and_tokens(self, model_name, max_tokens_str, default_max_tokens=4096):\n self.model = model_name if model_name else \"claude-3-5-sonnet-20240620\"\n try:\n # Anthropic\'s max_tokens is an integer.\n self.max_tokens = int(max_tokens_str) if max_tokens_str is not None else default_max_tokens\n except ValueError:\n logging.error(f\"Invalid value for Anthropic max_tokens: {max_tokens_str}. Using default {default_max_tokens}.\")\n self.max_tokens = default_max_tokens\n logging.info(f\"Configured to use Anthropic model: {self.model} with max_tokens: {self.max_tokens}\")\n\n def get_system_prompt_description(self) -> str:\n system_prompt_path = os.getenv(\"SYSTEM_PROMPT_PATH\")\n if system_prompt_path and os.path.isfile(system_prompt_path):\n return f\"System Prompt File: {os.path.basename(system_prompt_path)}\"\n elif system_prompt_path:\n return f\"System Prompt File: {os.path.basename(system_prompt_path)} (Not found at path: {system_prompt_path})\"\n else:\n return \"System Prompt File: Not configured (SYSTEM_PROMPT_PATH not set).\"\n\n def get_llm_description(self) -> str:\n return f\"LLM: {self.model}, Max Tokens: {self.max_tokens}\"\n\n def get_chat_response(self, messages_history):\n # Anthropic expects messages in a specific format.\n # Filter out any non-compliant messages or transform if necessary.\n # For simplicity, assume messages_history is already compliant.\n \n # Ensure self.system_prompt is a string, not None.\n current_system_prompt = self.system_prompt if self.system_prompt else \"\"\n\n anthropic_tools = []\n if hasattr(self, \'functions\') and self.functions:\n anthropic_tools = [\n {\n \"name\": function[\'name\'],\n \"description\": function[\'description\'],\n \"input_schema\": function[\'parameters\'] if function[\'parameters\'] not in [None, {}] else {\"type\": \"object\", \"properties\": {}}\n }\n for function in self.functions\n ]\n \n try:\n response = self.anthropic_client.messages.create(\n model=self.model,\n system=current_system_prompt, # System prompt is a top-level parameter\n messages=messages_history, # User/assistant turns\n max_tokens=self.max_tokens, # Use configured max_tokens\n tools=anthropic_tools if anthropic_tools else None,\n tool_choice={\"type\": \"auto\"} if anthropic_tools else None\n )\n return response\n except Exception as e:\n logging.error(f\"Anthropic API call failed: {e}\")\n # Consider how to propagate this error: re-raise or return a specific error structure\n raise # Re-raising for now\n\n async def handle_message(self, user_id, user_message):\n if user_id not in self.conversation_history:\n self.conversation_history[user_id] = []\n # Anthropic doesn\'t use a \"system\" role message in the main `messages` list\n # It\'s passed as a separate `system` parameter to `messages.create`.\n # So, no need to add system_prompt to conversation_history here.\n\n self.conversation_history[user_id].append({\"role\": \"user\", \"content\": user_message})\n \n # Make a mutable copy for this turn, as we\'ll be appending tool calls and responses\n current_turn_messages = list(self.conversation_history[user_id])\n\n MAX_TOOL_ITERATIONS = 5\n tool_use_count = 0\n assistant_response_content = \"\" # Stores the textual part of the assistant's response\n\n while tool_use_count < MAX_TOOL_ITERATIONS:\n response = self.get_chat_response(current_turn_messages)\n\n if not response or not response.content:\n logging.error(\"No valid response content from Anthropic LLM.\")\n # Update conversation history with this attempt before returning an error message\n self.conversation_history[user_id] = current_turn_messages\n return \"Error: Could not get a valid response from the LLM.\"\n\n # Process response.content, which is a list of blocks (e.g., text, tool_use)\n # The assistant's complete turn (text and tool requests) forms one message.\n assistant_current_turn_content_blocks = response.content\n current_turn_messages.append({\"role\": \"assistant\", \"content\": assistant_current_turn_content_blocks})\n\n # Extract text parts and tool calls from the assistant's last message\n text_parts_from_assistant = []\n tool_calls_from_response = []\n for block in assistant_current_turn_content_blocks:\n if block.type == \"text\":\n text_parts_from_assistant.append(block.text)\n elif block.type == \"tool_use\":\n tool_calls_from_response.append(block)\n \n assistant_response_content = \"\".join(text_parts_from_assistant) # Update with latest text\n\n if not tool_calls_from_response: # No tools called, this is the final response from assistant\n break \n\n # If there are tool calls, prepare and add their results\n tool_results_for_model = []\n for tool_call in tool_calls_from_response:\n tool_name = tool_call.name\n tool_input = tool_call.input # This is a dict\n tool_use_id = tool_call.id\n \n logging.info(f\"Attempting to call Anthropic tool: {tool_name} with input: {tool_input}\")\n try:\n # self.call_tool should ideally return a JSON-serializable object or a string.\n # Anthropic expects tool result content to be a list of content blocks (e.g. text, image) or string.\n # For simplicity, we'll assume call_tool returns a string or a dict/list that can be made into a text block.\n tool_response_data = self.call_tool(tool_name, tool_input) \n \n # Format the tool response for Anthropic\n if isinstance(tool_response_data, str):\n tool_result_content_block = [{\"type\": \"text\", \"text\": tool_response_data}]\n elif isinstance(tool_response_data, dict) or isinstance(tool_response_data, list):\n # If it's already a list of blocks, use directly. Otherwise, wrap in text block.\n try:\n # Attempt to treat as pre-formatted list of blocks\n # This is unlikely unless call_tool is specifically designed for Anthropic output\n # A more robust way would be to json.dumps dict/list and put in a text block\n tool_result_content_block = tool_response_data if isinstance(tool_response_data, list) and all(isinstance(i, dict) and \"type\" in i for i in tool_response_data) else [{\"type\": \"text\", \"text\": json.dumps(tool_response_data)}]\n except TypeError:\ # Not easily serializable to JSON in a text block\n tool_result_content_block = [{\"type\": \"text\", \"text\": str(tool_response_data)}]\n else:\n tool_result_content_block = [{\"type\": \"text\", \"text\": str(tool_response_data)}]\n\n tool_results_for_model.append({\n \"type\": \"tool_result\",\n \"tool_use_id\": tool_use_id,\n \"content\": tool_result_content_block \n })\n except Exception as e:\n logging.error(f\"Error calling tool {tool_name}: {e}\")\n tool_results_for_model.append({\n \"type\": \"tool_result\",\n \"tool_use_id\": tool_use_id,\n \"content\": [{\"type\": \"text\", \"text\": f\"Error executing tool {tool_name}: {str(e)}\"}],\n \"is_error\": True \n })\n \n # Add tool results to the history for the next call to the model\n # This message has role \"user\" and content is a list of tool_result blocks.\n current_turn_messages.append({\"role\": \"user\", \"content\": tool_results_for_model})\n \n tool_use_count += 1\n if tool_use_count >= MAX_TOOL_ITERATIONS:\n logging.warning(f\"Max tool iterations ({MAX_TOOL_ITERATIONS}) reached for Anthropic.\")\n # The assistant_response_content already holds the text from the turn *before* this iteration failed to complete tools.\n break\n \n # Persist the full turn history (including successful tool exchanges) \n self.conversation_history[user_id] = current_turn_messages\n \n # Manage conversation history length\n if len(self.conversation_history[user_id]) > 20: # Consider a more robust token-based limit\n # A more sophisticated truncation would ensure the message list integrity (e.g. not cutting mid-tool-exchange)\n self.conversation_history[user_id] = self.conversation_history[user_id][-20:]\n\n # Return the last accumulated textual content from the assistant. This would be the text from the\n # assistant's turn before breaking the loop (either due to no more tools, or max iterations).\n if assistant_response_content:\n return assistant_response_content\n else:\n # Fallback: if assistant_response_content is empty (e.g. error on first API call, or only tool_use)\n # Try to get the *very last* text block from the *very last* assistant message in history.\n if current_turn_messages:\n last_message = current_turn_messages[-1]\n if last_message[\"role\"] == \"assistant\" and isinstance(last_message[\"content\"], list):\n for block in reversed(last_message[\"content\"]):\n if block.type == \"text\":\n return block.text\n return \"No textual response from assistant.\"\n\n\n async def start(self):\n logging.info(\"Anthropic Bot started\")\n # super().start() if applicable\n\n async def clear(self, user_id):\n super().clear_conversation(user_id)\n logging.info(f\"Cleared conversation history for Anthropic bot, user {user_id}\")\n\n # status() method is inherited from BaseTelegramInferenceBot\n\n async def abort_processing(self, user_id):\n if user_id in self.processing_status: # Relies on processing_status from Base\n self.processing_status[user_id][\"processing\"] = False\n await self.clear(user_id)\n return \"Processing aborted and conversation cleared.\"\n else:\n await self.clear(user_id)\n return \"No active processing found to abort. Conversation cleared.\"\n\n async def switch_model(self):\n primary_model = os.environ.get(\"ANTHROPIC_MODEL\", \"claude-3-5-sonnet-20240620\")\n primary_max_tokens = os.environ.get(\"ANTHROPIC_MAX_TOKENS\", \"4096\")\n \n secondary_model_env = os.environ.get(\"ANTHROPIC_SECONDARY_MODEL\")\n secondary_max_tokens_env = os.environ.get(\"ANTHROPIC_SECONDARY_MAX_TOKENS\")\n\n if not secondary_model_env: \n logging.warning(\"ANTHROPIC_SECONDARY_MODEL not defined. Cannot switch model.\")\n return f\"Model switching not configured. Currently using {self.model}.\"\n\n # Determine target model and tokens\n if self.model == primary_model:\n target_model = secondary_model_env\n # Use secondary token ENV if set, else a reasonable default, or primary's if makes sense\n target_max_tokens = secondary_max_tokens_env if secondary_max_tokens_env else \"2048\" \n else: # Assumes current is secondary or an unknown model, switch to primary\n target_model = primary_model\n target_max_tokens = primary_max_tokens\n \n self._configure_model_and_tokens(target_model, target_max_tokens)\n logging.info(f\"Switched Anthropic model to: {self.model}\")\n return f\"Switched to Anthropic model: {self.model}\"\n\ndef main():\n if not os.environ.get(\"ANTHROPIC_API_KEY\"):\n logging.error(\"FATAL: ANTHROPIC_API_KEY environment variable not set.\")\n return\n\n logging.basicConfig(level=logging.INFO, format=\'%(asctime)s - %(name)s - %(levelname)s - %(message)s\')\n \n bot = AnthropicTelegramInferenceBot()\n telegram_helper = TelegramHelper(bot)\n telegram_helper.run()\n\nif __name__ == \'__main__\':\n main()\n \ No newline at end of file From 72b460bcfb69312edc78df4d0d0ff2ad542ca4e0 Mon Sep 17 00:00:00 2001 From: cyclop-bot <178948048+cyclop-bot@users.noreply.github.com> Date: Mon, 2 Jun 2025 14:43:51 -0500 Subject: [PATCH 5/5] fix: Correct formatting for Anthropic bot update --- anthropic_telegram_inference_bot.py | 224 +++++++++++++++++++++++++++- 1 file changed, 223 insertions(+), 1 deletion(-) diff --git a/anthropic_telegram_inference_bot.py b/anthropic_telegram_inference_bot.py index 449f842..116b00a 100644 --- a/anthropic_telegram_inference_bot.py +++ b/anthropic_telegram_inference_bot.py @@ -1 +1,223 @@ -import os\nimport json\nimport logging\nfrom anthropic import Anthropic\nfrom base_telegram_inference_bot import BaseTelegramInferenceBot\nfrom telegram_helper import TelegramHelper\n\n# logging.basicConfig(level=logging.INFO) # Usually configured in main execution script\n\nclass AnthropicTelegramInferenceBot(BaseTelegramInferenceBot):\n def __init__(self):\n super().__init__()\n self.anthropic_client = Anthropic(api_key=os.environ.get(\"ANTHROPIC_API_KEY\"))\n # Note: default_headers for max_tokens with older models might be needed.\n # For Claude 3.5 Sonnet, max_tokens is a top-level param in messages.create\n \n # Configure model and tokens. Using Sonnet 3.5 as default.\n # ANTHROPIC_MODEL and ANTHROPIC_MAX_TOKENS would be new ENVs.\n self._configure_model_and_tokens(\n os.environ.get(\"ANTHROPIC_MODEL\", \"claude-3-5-sonnet-20240620\"),\n os.environ.get(\"ANTHROPIC_MAX_TOKENS\", \"4096\") # Default max tokens for Sonnet 3.5\n )\n\n def _configure_model_and_tokens(self, model_name, max_tokens_str, default_max_tokens=4096):\n self.model = model_name if model_name else \"claude-3-5-sonnet-20240620\"\n try:\n # Anthropic\'s max_tokens is an integer.\n self.max_tokens = int(max_tokens_str) if max_tokens_str is not None else default_max_tokens\n except ValueError:\n logging.error(f\"Invalid value for Anthropic max_tokens: {max_tokens_str}. Using default {default_max_tokens}.\")\n self.max_tokens = default_max_tokens\n logging.info(f\"Configured to use Anthropic model: {self.model} with max_tokens: {self.max_tokens}\")\n\n def get_system_prompt_description(self) -> str:\n system_prompt_path = os.getenv(\"SYSTEM_PROMPT_PATH\")\n if system_prompt_path and os.path.isfile(system_prompt_path):\n return f\"System Prompt File: {os.path.basename(system_prompt_path)}\"\n elif system_prompt_path:\n return f\"System Prompt File: {os.path.basename(system_prompt_path)} (Not found at path: {system_prompt_path})\"\n else:\n return \"System Prompt File: Not configured (SYSTEM_PROMPT_PATH not set).\"\n\n def get_llm_description(self) -> str:\n return f\"LLM: {self.model}, Max Tokens: {self.max_tokens}\"\n\n def get_chat_response(self, messages_history):\n # Anthropic expects messages in a specific format.\n # Filter out any non-compliant messages or transform if necessary.\n # For simplicity, assume messages_history is already compliant.\n \n # Ensure self.system_prompt is a string, not None.\n current_system_prompt = self.system_prompt if self.system_prompt else \"\"\n\n anthropic_tools = []\n if hasattr(self, \'functions\') and self.functions:\n anthropic_tools = [\n {\n \"name\": function[\'name\'],\n \"description\": function[\'description\'],\n \"input_schema\": function[\'parameters\'] if function[\'parameters\'] not in [None, {}] else {\"type\": \"object\", \"properties\": {}}\n }\n for function in self.functions\n ]\n \n try:\n response = self.anthropic_client.messages.create(\n model=self.model,\n system=current_system_prompt, # System prompt is a top-level parameter\n messages=messages_history, # User/assistant turns\n max_tokens=self.max_tokens, # Use configured max_tokens\n tools=anthropic_tools if anthropic_tools else None,\n tool_choice={\"type\": \"auto\"} if anthropic_tools else None\n )\n return response\n except Exception as e:\n logging.error(f\"Anthropic API call failed: {e}\")\n # Consider how to propagate this error: re-raise or return a specific error structure\n raise # Re-raising for now\n\n async def handle_message(self, user_id, user_message):\n if user_id not in self.conversation_history:\n self.conversation_history[user_id] = []\n # Anthropic doesn\'t use a \"system\" role message in the main `messages` list\n # It\'s passed as a separate `system` parameter to `messages.create`.\n # So, no need to add system_prompt to conversation_history here.\n\n self.conversation_history[user_id].append({\"role\": \"user\", \"content\": user_message})\n \n # Make a mutable copy for this turn, as we\'ll be appending tool calls and responses\n current_turn_messages = list(self.conversation_history[user_id])\n\n MAX_TOOL_ITERATIONS = 5\n tool_use_count = 0\n assistant_response_content = \"\" # Stores the textual part of the assistant's response\n\n while tool_use_count < MAX_TOOL_ITERATIONS:\n response = self.get_chat_response(current_turn_messages)\n\n if not response or not response.content:\n logging.error(\"No valid response content from Anthropic LLM.\")\n # Update conversation history with this attempt before returning an error message\n self.conversation_history[user_id] = current_turn_messages\n return \"Error: Could not get a valid response from the LLM.\"\n\n # Process response.content, which is a list of blocks (e.g., text, tool_use)\n # The assistant's complete turn (text and tool requests) forms one message.\n assistant_current_turn_content_blocks = response.content\n current_turn_messages.append({\"role\": \"assistant\", \"content\": assistant_current_turn_content_blocks})\n\n # Extract text parts and tool calls from the assistant's last message\n text_parts_from_assistant = []\n tool_calls_from_response = []\n for block in assistant_current_turn_content_blocks:\n if block.type == \"text\":\n text_parts_from_assistant.append(block.text)\n elif block.type == \"tool_use\":\n tool_calls_from_response.append(block)\n \n assistant_response_content = \"\".join(text_parts_from_assistant) # Update with latest text\n\n if not tool_calls_from_response: # No tools called, this is the final response from assistant\n break \n\n # If there are tool calls, prepare and add their results\n tool_results_for_model = []\n for tool_call in tool_calls_from_response:\n tool_name = tool_call.name\n tool_input = tool_call.input # This is a dict\n tool_use_id = tool_call.id\n \n logging.info(f\"Attempting to call Anthropic tool: {tool_name} with input: {tool_input}\")\n try:\n # self.call_tool should ideally return a JSON-serializable object or a string.\n # Anthropic expects tool result content to be a list of content blocks (e.g. text, image) or string.\n # For simplicity, we'll assume call_tool returns a string or a dict/list that can be made into a text block.\n tool_response_data = self.call_tool(tool_name, tool_input) \n \n # Format the tool response for Anthropic\n if isinstance(tool_response_data, str):\n tool_result_content_block = [{\"type\": \"text\", \"text\": tool_response_data}]\n elif isinstance(tool_response_data, dict) or isinstance(tool_response_data, list):\n # If it's already a list of blocks, use directly. Otherwise, wrap in text block.\n try:\n # Attempt to treat as pre-formatted list of blocks\n # This is unlikely unless call_tool is specifically designed for Anthropic output\n # A more robust way would be to json.dumps dict/list and put in a text block\n tool_result_content_block = tool_response_data if isinstance(tool_response_data, list) and all(isinstance(i, dict) and \"type\" in i for i in tool_response_data) else [{\"type\": \"text\", \"text\": json.dumps(tool_response_data)}]\n except TypeError:\ # Not easily serializable to JSON in a text block\n tool_result_content_block = [{\"type\": \"text\", \"text\": str(tool_response_data)}]\n else:\n tool_result_content_block = [{\"type\": \"text\", \"text\": str(tool_response_data)}]\n\n tool_results_for_model.append({\n \"type\": \"tool_result\",\n \"tool_use_id\": tool_use_id,\n \"content\": tool_result_content_block \n })\n except Exception as e:\n logging.error(f\"Error calling tool {tool_name}: {e}\")\n tool_results_for_model.append({\n \"type\": \"tool_result\",\n \"tool_use_id\": tool_use_id,\n \"content\": [{\"type\": \"text\", \"text\": f\"Error executing tool {tool_name}: {str(e)}\"}],\n \"is_error\": True \n })\n \n # Add tool results to the history for the next call to the model\n # This message has role \"user\" and content is a list of tool_result blocks.\n current_turn_messages.append({\"role\": \"user\", \"content\": tool_results_for_model})\n \n tool_use_count += 1\n if tool_use_count >= MAX_TOOL_ITERATIONS:\n logging.warning(f\"Max tool iterations ({MAX_TOOL_ITERATIONS}) reached for Anthropic.\")\n # The assistant_response_content already holds the text from the turn *before* this iteration failed to complete tools.\n break\n \n # Persist the full turn history (including successful tool exchanges) \n self.conversation_history[user_id] = current_turn_messages\n \n # Manage conversation history length\n if len(self.conversation_history[user_id]) > 20: # Consider a more robust token-based limit\n # A more sophisticated truncation would ensure the message list integrity (e.g. not cutting mid-tool-exchange)\n self.conversation_history[user_id] = self.conversation_history[user_id][-20:]\n\n # Return the last accumulated textual content from the assistant. This would be the text from the\n # assistant's turn before breaking the loop (either due to no more tools, or max iterations).\n if assistant_response_content:\n return assistant_response_content\n else:\n # Fallback: if assistant_response_content is empty (e.g. error on first API call, or only tool_use)\n # Try to get the *very last* text block from the *very last* assistant message in history.\n if current_turn_messages:\n last_message = current_turn_messages[-1]\n if last_message[\"role\"] == \"assistant\" and isinstance(last_message[\"content\"], list):\n for block in reversed(last_message[\"content\"]):\n if block.type == \"text\":\n return block.text\n return \"No textual response from assistant.\"\n\n\n async def start(self):\n logging.info(\"Anthropic Bot started\")\n # super().start() if applicable\n\n async def clear(self, user_id):\n super().clear_conversation(user_id)\n logging.info(f\"Cleared conversation history for Anthropic bot, user {user_id}\")\n\n # status() method is inherited from BaseTelegramInferenceBot\n\n async def abort_processing(self, user_id):\n if user_id in self.processing_status: # Relies on processing_status from Base\n self.processing_status[user_id][\"processing\"] = False\n await self.clear(user_id)\n return \"Processing aborted and conversation cleared.\"\n else:\n await self.clear(user_id)\n return \"No active processing found to abort. Conversation cleared.\"\n\n async def switch_model(self):\n primary_model = os.environ.get(\"ANTHROPIC_MODEL\", \"claude-3-5-sonnet-20240620\")\n primary_max_tokens = os.environ.get(\"ANTHROPIC_MAX_TOKENS\", \"4096\")\n \n secondary_model_env = os.environ.get(\"ANTHROPIC_SECONDARY_MODEL\")\n secondary_max_tokens_env = os.environ.get(\"ANTHROPIC_SECONDARY_MAX_TOKENS\")\n\n if not secondary_model_env: \n logging.warning(\"ANTHROPIC_SECONDARY_MODEL not defined. Cannot switch model.\")\n return f\"Model switching not configured. Currently using {self.model}.\"\n\n # Determine target model and tokens\n if self.model == primary_model:\n target_model = secondary_model_env\n # Use secondary token ENV if set, else a reasonable default, or primary's if makes sense\n target_max_tokens = secondary_max_tokens_env if secondary_max_tokens_env else \"2048\" \n else: # Assumes current is secondary or an unknown model, switch to primary\n target_model = primary_model\n target_max_tokens = primary_max_tokens\n \n self._configure_model_and_tokens(target_model, target_max_tokens)\n logging.info(f\"Switched Anthropic model to: {self.model}\")\n return f\"Switched to Anthropic model: {self.model}\"\n\ndef main():\n if not os.environ.get(\"ANTHROPIC_API_KEY\"):\n logging.error(\"FATAL: ANTHROPIC_API_KEY environment variable not set.\")\n return\n\n logging.basicConfig(level=logging.INFO, format=\'%(asctime)s - %(name)s - %(levelname)s - %(message)s\')\n \n bot = AnthropicTelegramInferenceBot()\n telegram_helper = TelegramHelper(bot)\n telegram_helper.run()\n\nif __name__ == \'__main__\':\n main()\n \ No newline at end of file +import os +import json +import logging +from anthropic import Anthropic +from base_telegram_inference_bot import BaseTelegramInferenceBot +from telegram_helper import TelegramHelper + +# logging.basicConfig(level=logging.INFO) # Usually configured in main execution script + +class AnthropicTelegramInferenceBot(BaseTelegramInferenceBot): + def __init__(self): + super().__init__() + self.anthropic_client = Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY")) + # Note: default_headers for max_tokens with older models might be needed. + # For Claude 3.5 Sonnet, max_tokens is a top-level param in messages.create + + # Configure model and tokens. Using Sonnet 3.5 as default. + # ANTHROPIC_MODEL and ANTHROPIC_MAX_TOKENS would be new ENVs. + self._configure_model_and_tokens( + os.environ.get("ANTHROPIC_MODEL", "claude-3-5-sonnet-20240620"), + os.environ.get("ANTHROPIC_MAX_TOKENS", "4096") # Default max tokens for Sonnet 3.5 + ) + + def _configure_model_and_tokens(self, model_name, max_tokens_str, default_max_tokens=4096): + self.model = model_name if model_name else "claude-3-5-sonnet-20240620" + try: + # Anthropic's max_tokens is an integer. + self.max_tokens = int(max_tokens_str) if max_tokens_str is not None else default_max_tokens + except ValueError: + logging.error(f"Invalid value for Anthropic max_tokens: {max_tokens_str}. Using default {default_max_tokens}.") + self.max_tokens = default_max_tokens + logging.info(f"Configured to use Anthropic model: {self.model} with max_tokens: {self.max_tokens}") + + def get_system_prompt_description(self) -> str: + system_prompt_path = os.getenv("SYSTEM_PROMPT_PATH") + if system_prompt_path and os.path.isfile(system_prompt_path): + return f"System Prompt File: {os.path.basename(system_prompt_path)}" + elif system_prompt_path: + return f"System Prompt File: {os.path.basename(system_prompt_path)} (Not found at path: {system_prompt_path})" + else: + return "System Prompt File: Not configured (SYSTEM_PROMPT_PATH not set)." + + def get_llm_description(self) -> str: + return f"LLM: {self.model}, Max Tokens: {self.max_tokens}" + + def get_chat_response(self, messages_history): + current_system_prompt = self.system_prompt if self.system_prompt else "" + anthropic_tools = [] + if hasattr(self, 'functions') and self.functions: + anthropic_tools = [ + { + "name": function['name'], + "description": function['description'], + "input_schema": function['parameters'] if function['parameters'] not in [None, {}] else {"type": "object", "properties": {}} + } + for function in self.functions + ] + + try: + response = self.anthropic_client.messages.create( + model=self.model, + system=current_system_prompt, + messages=messages_history, + max_tokens=self.max_tokens, + tools=anthropic_tools if anthropic_tools else None, + tool_choice={"type": "auto"} if anthropic_tools else None + ) + return response + except Exception as e: + logging.error(f"Anthropic API call failed: {e}") + raise + + async def handle_message(self, user_id, user_message): + if user_id not in self.conversation_history: + self.conversation_history[user_id] = [] + + self.conversation_history[user_id].append({"role": "user", "content": user_message}) + current_turn_messages = list(self.conversation_history[user_id]) + + MAX_TOOL_ITERATIONS = 5 + tool_use_count = 0 + assistant_response_content = "" + + while tool_use_count < MAX_TOOL_ITERATIONS: + response = self.get_chat_response(current_turn_messages) + + if not response or not response.content: + logging.error("No valid response content from Anthropic LLM.") + self.conversation_history[user_id] = current_turn_messages # Persist what we have + return "Error: Could not get a valid response from the LLM." + + assistant_current_turn_content_blocks = response.content + current_turn_messages.append({"role": "assistant", "content": assistant_current_turn_content_blocks}) + + text_parts_from_assistant = [] + tool_calls_from_response = [] + for block in assistant_current_turn_content_blocks: + if block.type == "text": + text_parts_from_assistant.append(block.text) + elif block.type == "tool_use": + tool_calls_from_response.append(block) + + assistant_response_content = "".join(text_parts_from_assistant) + + if not tool_calls_from_response: + break + + tool_results_for_model = [] + for tool_call in tool_calls_from_response: + tool_name = tool_call.name + tool_input = tool_call.input + tool_use_id = tool_call.id + + logging.info(f"Attempting to call Anthropic tool: {tool_name} with input: {tool_input}") + try: + tool_response_data = self.call_tool(tool_name, tool_input) + + if isinstance(tool_response_data, str): + tool_result_content_block = [{"type": "text", "text": tool_response_data}] + elif isinstance(tool_response_data, dict) or isinstance(tool_response_data, list): + try: + # If tool_response_data is already a list of Anthropic content blocks, use as is. + # Otherwise, dump to JSON string and wrap in a text block. + is_valid_block_list = isinstance(tool_response_data, list) and all(isinstance(item, dict) and "type" in item for item in tool_response_data) + if is_valid_block_list: + tool_result_content_block = tool_response_data + else: + tool_result_content_block = [{"type": "text", "text": json.dumps(tool_response_data)}] + except (TypeError, json.JSONDecodeError): # Not easily serializable or not a valid block list + tool_result_content_block = [{"type": "text", "text": str(tool_response_data)}] + else: # bool, int, float, None, etc. + tool_result_content_block = [{"type": "text", "text": str(tool_response_data)}] + + tool_results_for_model.append({ + "type": "tool_result", + "tool_use_id": tool_use_id, + "content": tool_result_content_block + }) + except Exception as e: + logging.error(f"Error calling tool {tool_name}: {e}") + tool_results_for_model.append({ + "type": "tool_result", + "tool_use_id": tool_use_id, + "content": [{"type": "text", "text": f"Error executing tool {tool_name}: {str(e)}"}], + "is_error": True + }) + + current_turn_messages.append({"role": "user", "content": tool_results_for_model}) + + tool_use_count += 1 + if tool_use_count >= MAX_TOOL_ITERATIONS: + logging.warning(f"Max tool iterations ({MAX_TOOL_ITERATIONS}) reached for Anthropic.") + break + + self.conversation_history[user_id] = current_turn_messages + + if len(self.conversation_history[user_id]) > 20: + self.conversation_history[user_id] = self.conversation_history[user_id][-20:] + + if assistant_response_content: # Text from the last successful assistant turn (or before max iterations) + return assistant_response_content + else: # Fallback if no text content was generated by assistant (e.g. initial error, or only tool use) + if current_turn_messages: + # Try to get the *very last* text block from the *very last* assistant message in history. + last_message_in_turn = current_turn_messages[-1] + if last_message_in_turn.get("role") == "assistant" and isinstance(last_message_in_turn.get("content"), list): + for block in reversed(last_message_in_turn["content"]): + if block.type == "text": + return block.text + return "No textual response from assistant." + + + async def start(self): + logging.info("Anthropic Bot started") + + async def clear(self, user_id): + super().clear_conversation(user_id) + logging.info(f"Cleared conversation history for Anthropic bot, user {user_id}") + + async def abort_processing(self, user_id): + if user_id in self.processing_status: + self.processing_status[user_id]["processing"] = False + await self.clear(user_id) + return "Processing aborted and conversation cleared." + else: + await self.clear(user_id) + return "No active processing found to abort. Conversation cleared." + + async def switch_model(self): + primary_model = os.environ.get("ANTHROPIC_MODEL", "claude-3-5-sonnet-20240620") + primary_max_tokens = os.environ.get("ANTHROPIC_MAX_TOKENS", "4096") + + secondary_model_env = os.environ.get("ANTHROPIC_SECONDARY_MODEL") + secondary_max_tokens_env = os.environ.get("ANTHROPIC_SECONDARY_MAX_TOKENS") + + if not secondary_model_env: + logging.warning("ANTHROPIC_SECONDARY_MODEL not defined. Cannot switch model.") + return f"Model switching not configured. Currently using {self.model}." + + if self.model == primary_model: + target_model = secondary_model_env + target_max_tokens = secondary_max_tokens_env if secondary_max_tokens_env else "2048" + else: + target_model = primary_model + target_max_tokens = primary_max_tokens + + self._configure_model_and_tokens(target_model, target_max_tokens) + logging.info(f"Switched Anthropic model to: {self.model}") + return f"Switched to Anthropic model: {self.model}" + +def main(): + if not os.environ.get("ANTHROPIC_API_KEY"): + logging.error("FATAL: ANTHROPIC_API_KEY environment variable not set.") + return + + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') + + bot = AnthropicTelegramInferenceBot() + telegram_helper = TelegramHelper(bot) + telegram_helper.run() + +if __name__ == '__main__': + main()