import json import os import logging from base_telegram_inference_bot import BaseTelegramInferenceBot # Assuming this base class exists from telegram_helper import TelegramHelper # Assuming this helper class exists from openai import OpenAI # Ensure basic logging is configured if not done elsewhere # logging.basicConfig(level=logging.INFO) # Example: You might have a more sophisticated setup class ChatGPTTelegramInferenceBot(BaseTelegramInferenceBot): def __init__(self): super().__init__() self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY")) self._configure_model_and_tokens( os.environ.get("OPENAI_SMALL_MODEL"), # Default model os.environ.get("OPENAI_SMALL_MODEL_MAX_TOKENS") # Default tokens ) def _configure_model_and_tokens(self, model_name, max_tokens_str, default_max_tokens=1000): self.model = model_name try: self.max_tokens = int(max_tokens_str) if max_tokens_str is not None else default_max_tokens except ValueError: logging.error(f"Invalid value for max_tokens: {max_tokens_str}. Using default {default_max_tokens}.") self.max_tokens = default_max_tokens logging.info(f"Configured to use model: {self.model} with max_tokens: {self.max_tokens}") def get_chat_response(self, messages): try: response = self.client.chat.completions.create( model=self.model, messages=messages, # The system prompt is expected to be part of messages here tools=self.functions if hasattr(self, 'functions') and self.functions else None, tool_choice="auto" if hasattr(self, 'functions') and self.functions else None, max_tokens=self.max_tokens ) return response except Exception as e: logging.error(f"OpenAI API call failed: {e}") raise async def handle_message(self, user_id, user_message): if user_id not in self.conversation_history: self.conversation_history[user_id] = [] if hasattr(self, 'system_prompt') and self.system_prompt: self.conversation_history[user_id].append({"role": "system", "content": self.system_prompt}) self.conversation_history[user_id].append({"role": "user", "content": user_message}) messages = self.conversation_history[user_id] response = self.get_chat_response(messages) tool_calls = [] for message_part in response.choices: if message_part.finish_reason == "tool_calls": tool_calls.extend(message_part.message.tool_calls) messages.append(response.choices[0].message) tool_use_count = 0 while len(tool_calls) > 0 and tool_use_count < 500: tool_use_results = [] while len(tool_calls) > 0: tool_call_message = tool_calls.pop(0) tool_call_id = tool_call_message.id tool_call = tool_call_message.function tool_response = self.call_tool(tool_call.name, tool_call.arguments) try: tool_use_results.append({"role": "tool", "tool_call_id": tool_call_id, "name":tool_call.name, "content": str(tool_response) }) except (TypeError, ValueError) as e: logging.error(f"Failed to serialize tool response: {e}") tool_use_results.append({"role": "function", "name": tool_call.name, "content": "Serialization error"}) messages.extend(tool_use_results) response = self.get_chat_response(messages) for message_part in response.choices: if message_part.finish_reason == "tool_calls": tool_calls.extend(message_part.message.tool_calls) messages.append(response.choices[0].message) tool_use_count += 1 if len(self.conversation_history[user_id]) > 20: self.conversation_history[user_id] = self.conversation_history[user_id][-20:] return messages[-1].content async def start(self): logging.info("Bot started") # Potentially call super().start() if it exists and does something async def clear(self, user_id): super().clear_conversation(user_id) async def status(self): return f"Currently using: {self.model}, Max Tokens: {self.max_tokens}" async def abort_processing(self, user_id): # This depends on how processing_status is managed, likely in BaseTelegramInferenceBot if hasattr(self, 'processing_status') and user_id in self.processing_status: self.processing_status[user_id]["processing"] = False # Example await self.clear(user_id) # Clearing conversation on abort might be desired return "Processing aborted and conversation cleared." else: # If not tracking processing_status here, just clear for safety await self.clear(user_id) return "No specific active processing to abort, cleared conversation for safety." async def switch_model(self): current_small_model = os.environ.get("OPENAI_SMALL_MODEL") current_large_model = os.environ.get("OPENAI_LARGE_MODEL") if self.model == current_small_model: target_model = current_large_model target_max_tokens = os.environ.get("OPENAI_LARGE_MODEL_MAX_TOKENS") else: target_model = current_small_model target_max_tokens = os.environ.get("OPENAI_SMALL_MODEL_MAX_TOKENS") self._configure_model_and_tokens(target_model, target_max_tokens) return f"Switched to model: {self.model}" def main(): # Ensure OPENAI_API_KEY and other environment variables are set if not os.environ.get("OPENAI_API_KEY"): logging.error("FATAL: OPENAI_API_KEY environment variable not set.") return bot = ChatGPTTelegramInferenceBot() telegram_helper = TelegramHelper(bot) telegram_helper.run() if __name__ == '__main__': logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') main()