From 1f5fbc3db8a83ba08cc233abfe16333ee9046059 Mon Sep 17 00:00:00 2001 From: cyclop-bot <178948048+cyclop-bot@users.noreply.github.com> Date: Mon, 2 Jun 2025 14:56:32 -0500 Subject: [PATCH] Refactor gemini_telegram_inference_bot.py to use OpenAICompatibleInferenceBot. --- gemini_telegram_inference_bot.py | 159 ++----------------------------- 1 file changed, 7 insertions(+), 152 deletions(-) diff --git a/gemini_telegram_inference_bot.py b/gemini_telegram_inference_bot.py index fccde2f..5c5f549 100644 --- a/gemini_telegram_inference_bot.py +++ b/gemini_telegram_inference_bot.py @@ -1,166 +1,27 @@ -import json import os import logging -from base_telegram_inference_bot import BaseTelegramInferenceBot -from telegram_helper import TelegramHelper # This import might be unused if main() is removed or TelegramHelper is not directly instantiated here. from openai import OpenAI +from openai_compatible_inference_bot import OpenAICompatibleInferenceBot +from telegram_helper import TelegramHelper -# logging.basicConfig(level=logging.INFO) # Usually configured in main execution script - -class GeminiTelegramInferenceBot(BaseTelegramInferenceBot): +class GeminiTelegramInferenceBot(OpenAICompatibleInferenceBot): def __init__(self): super().__init__() self.client = OpenAI(api_key=os.environ.get("GEMINI_API_KEY"), base_url=os.environ.get("GEMINI_API_BASE_URL")) self._configure_model_and_tokens( - os.environ.get("GEMINI_SMALL_MODEL"), + os.environ.get("GEMINI_SMALL_MODEL", "gemini-pro"), os.environ.get("GEMINI_SMALL_MODEL_MAX_TOKENS") ) - def _configure_model_and_tokens(self, model_name, max_tokens_str, default_max_tokens=1000): - self.model = model_name if model_name else "default-gemini-model" # Ensure model has a default - try: - self.max_tokens = int(max_tokens_str) if max_tokens_str is not None else default_max_tokens - except ValueError: - logging.error(f"Invalid value for max_tokens: {max_tokens_str}. Using default {default_max_tokens}.") - self.max_tokens = default_max_tokens - logging.info(f"Configured to use model: {self.model} with max_tokens: {self.max_tokens}") - - def get_system_prompt_description(self) -> str: - system_prompt_path = os.getenv("SYSTEM_PROMPT_PATH") - if system_prompt_path and os.path.isfile(system_prompt_path): - return f"System Prompt File: {os.path.basename(system_prompt_path)}" - elif system_prompt_path: # Path is set but file not found - return f"System Prompt File: {os.path.basename(system_prompt_path)} (Not found at path: {system_prompt_path})" - else: # Path not set - return "System Prompt File: Not configured (SYSTEM_PROMPT_PATH not set)." - - def get_llm_description(self) -> str: - return f"LLM: {self.model}, Max Tokens: {self.max_tokens}" - - def get_chat_response(self, messages): - try: - response = self.client.chat.completions.create( - model=self.model, - messages=messages, - tools=self.functions if hasattr(self, 'functions') and self.functions else None, - tool_choice="auto" if hasattr(self, 'functions') and self.functions else None, - max_tokens=self.max_tokens - ) - return response - except Exception as e: - logging.error(f"Gemini API call failed: {e}") - # Return a more structured error or re-raise a custom exception - # For now, re-raising to be handled by the caller - raise - - async def handle_message(self, user_id, user_message): - if user_id not in self.conversation_history: - self.conversation_history[user_id] = [] - if hasattr(self, 'system_prompt') and self.system_prompt: - self.conversation_history[user_id].append({"role": "system", "content": self.system_prompt}) - - self.conversation_history[user_id].append({"role": "user", "content": user_message}) - messages = self.conversation_history[user_id] - - response = self.get_chat_response(messages) - - # Ensure response.choices[0].message exists before appending - if response.choices and response.choices[0].message: - messages.append(response.choices[0].message) # Append the assistant's response message - else: - logging.error("No valid response choice message from LLM.") - return "Error: Could not get a valid response from the LLM." - - tool_calls_from_response = [] - if response.choices[0].message.tool_calls: - tool_calls_from_response.extend(response.choices[0].message.tool_calls) - - tool_use_count = 0 - MAX_TOOL_ITERATIONS = 5 # Define a max to prevent infinite loops more explicitly - - while tool_calls_from_response and tool_use_count < MAX_TOOL_ITERATIONS: - tool_results_for_model = [] # Results to be sent back to the model - - for tool_call in tool_calls_from_response: - tool_call_id = tool_call.id - function_to_call = tool_call.function - - logging.info(f"Attempting to call tool: {function_to_call.name} with args: {function_to_call.arguments}") - try: - tool_response_content = self.call_tool(function_to_call.name, function_to_call.arguments) - # Ensure tool_response_content is a string for the API - if not isinstance(tool_response_content, str): - tool_response_content = json.dumps(tool_response_content) - except Exception as e: - logging.error(f"Error calling tool {function_to_call.name}: {e}") - tool_response_content = f"Error executing tool {function_to_call.name}: {str(e)}" - - tool_results_for_model.append({ - "role": "tool", - "tool_call_id": tool_call_id, - "name": function_to_call.name, - "content": tool_response_content - }) - - messages.extend(tool_results_for_model) # Add tool responses to message history - - # Get new response from model based on tool execution results - response = self.get_chat_response(messages) - if not (response.choices and response.choices[0].message): - logging.error("No valid response choice message from LLM after tool call.") - return "Error: Could not get a valid response from the LLM after tool call." - - messages.append(response.choices[0].message) # Append new assistant message - - # Check for new tool calls - tool_calls_from_response = [] # Reset for this iteration - if response.choices[0].message.tool_calls: - tool_calls_from_response.extend(response.choices[0].message.tool_calls) - - tool_use_count += 1 - if tool_use_count >= MAX_TOOL_ITERATIONS and tool_calls_from_response: - logging.warning(f"Max tool iterations ({MAX_TOOL_ITERATIONS}) reached. Returning last assistant message.") - # May need to return a message indicating this to user - - # Conversation history management - if len(self.conversation_history[user_id]) > 2000: # Assuming this limit is for messages, not tokens - self.conversation_history[user_id] = self.conversation_history[user_id][-2000:] - - # Return the latest assistant content - final_assistant_message = messages[-1] - return final_assistant_message.content if final_assistant_message.role == "assistant" and final_assistant_message.content else "No content in final message." - - - async def start(self): - logging.info("Gemini Bot started") - # super().start() if Base class start() has common logic - - async def clear(self, user_id): - super().clear_conversation(user_id) # Calls base class method - - # status() method is inherited from BaseTelegramInferenceBot - - async def abort_processing(self, user_id): - if user_id in self.processing_status: - self.processing_status[user_id]["processing"] = False - # It's good practice to also clear the conversation for an aborted state - await self.clear(user_id) - return "Processing aborted and conversation cleared." - else: - # If no specific status, clearing conversation is a safe default - await self.clear(user_id) - return "No active processing found to abort. Conversation cleared." - async def switch_model(self): - current_small_model = os.environ.get("GEMINI_SMALL_MODEL") - current_large_model = os.environ.get("GEMINI_LARGE_MODEL") + current_small_model = os.environ.get("GEMINI_SMALL_MODEL", "gemini-pro") + current_large_model = os.environ.get("GEMINI_LARGE_MODEL", "gemini-1.5-pro-latest") - # Default to small model if current model is not recognized or if it's the large one if self.model == current_large_model or self.model != current_small_model : target_model = current_small_model target_max_tokens = os.environ.get("GEMINI_SMALL_MODEL_MAX_TOKENS") - else: # Current is small, switch to large + else: target_model = current_large_model target_max_tokens = os.environ.get("GEMINI_LARGE_MODEL_MAX_TOKENS") @@ -168,20 +29,14 @@ class GeminiTelegramInferenceBot(BaseTelegramInferenceBot): logging.info(f"Switched to model: {self.model}") return f"Switched to model: {self.model}" -# The main() function and if __name__ == '__main__': block are for standalone execution. -# If this bot is imported as a module, these might not be necessary or might be handled differently. -# For now, keeping them as they were. def main(): if not os.environ.get("GEMINI_API_KEY"): logging.error("FATAL: GEMINI_API_KEY environment variable not set.") return - # Configure logging here if it's the main entry point logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') bot = GeminiTelegramInferenceBot() - # The instantiation of TelegramHelper and running it implies this file can be an entry point. - # If it's purely a module, this main() would be removed. telegram_helper = TelegramHelper(bot) telegram_helper.run()