Files
cyclop/base_telegram_inference_bot.py
T

139 lines
6.1 KiB
Python
Raw Normal View History

2024-08-19 12:54:13 -05:00
import importlib
2024-08-19 11:34:31 -05:00
import os
import json
2024-08-19 12:54:13 -05:00
import inspect
import logging
2024-08-19 11:34:31 -05:00
from abc import ABC, abstractmethod
2024-08-19 12:54:13 -05:00
from tools.base_tool import BaseTool
2024-08-19 11:34:31 -05:00
class BaseTelegramInferenceBot(ABC):
    """Abstract base class for a Telegram bot backed by an LLM.

    The base class owns the pieces that do not depend on a specific model:

    * per-user conversation history and processing status (dicts keyed by
      user id),
    * the system prompt, read from the file named by ``SYSTEM_PROMPT_PATH``,
    * discovery of tool classes (subclasses of ``BaseTool``) in the ``tools``
      directory next to this module, and dispatch of tool calls to them.

    Subclasses supply the model-specific behaviour through the abstract
    methods.
    """

    def __init__(self):
        """Set up empty per-user state, then load the prompt and the tools."""
        self.conversation_history = {}
        self.processing_status = {}
        self.system_prompt = self.load_system_prompt()
        self.tools, self.functions = self.load_functions()
        logging.info(f'System Prompt: {os.environ.get("SYSTEM_PROMPT_PATH")}')
        logging.info(f'Github Repository: {os.environ.get("GITHUB_REPOSITORY")}')

    def load_system_prompt(self):
        """Return the system prompt text.

        The prompt is read (UTF-8, surrounding whitespace stripped) from the
        file named by the ``SYSTEM_PROMPT_PATH`` environment variable. If the
        variable is unset, the file does not exist, or the file cannot be
        read or decoded, a built-in default prompt is returned and a warning
        is logged.
        """
        default_prompt = "You are a helpful AI assistant."
        system_prompt_path = os.getenv("SYSTEM_PROMPT_PATH")
        if not (system_prompt_path and os.path.isfile(system_prompt_path)):
            logging.warning("SYSTEM_PROMPT_PATH is not set or file does not exist. Using default system prompt.")
            return default_prompt
        try:
            with open(system_prompt_path, "r", encoding="utf-8") as file:
                return file.read().strip()
        except (OSError, UnicodeDecodeError) as e:
            # UnicodeDecodeError is a ValueError, not an OSError: without it a
            # prompt file in another encoding would crash the constructor.
            logging.warning(f"Could not read system prompt file {system_prompt_path}: {e}")
            return default_prompt

    def load_functions(self):
        """Discover tool classes and collect the functions they expose.

        Every ``*.py`` file in the ``tools`` directory beside this module
        (except ``__init__.py`` and ``base_tool.py``) is imported as
        ``tools.<name>``. Each concrete ``BaseTool`` subclass found in those
        modules is instantiated once. Import and instantiation failures are
        logged and skipped so one broken tool does not disable the rest.

        Returns:
            A ``(tools, functions)`` tuple: the tool instances, and the
            concatenation of each tool's ``get_functions()`` result. Both are
            empty lists when the tools directory is missing.
        """
        tools_dir = os.path.join(os.path.dirname(__file__), 'tools')
        # isdir rather than exists: a plain file named "tools" would make
        # os.listdir raise.
        if not os.path.isdir(tools_dir):
            logging.warning(f"Tools directory not found: {tools_dir}")
            return [], []

        tools = []
        # A tool class imported by several tool modules shows up in each of
        # them; remember what was already handled to avoid duplicate tools.
        seen_classes = set()
        excluded_files = ('__init__.py', 'base_tool.py')
        # Sorted so tool order does not depend on the filesystem.
        for filename in sorted(os.listdir(tools_dir)):
            if not filename.endswith('.py') or filename in excluded_files:
                continue
            module_name = f'tools.{filename[:-3]}'
            try:
                module = importlib.import_module(module_name)
                for name, obj in inspect.getmembers(module, inspect.isclass):
                    if obj is BaseTool or not issubclass(obj, BaseTool):
                        continue
                    if obj in seen_classes:
                        continue
                    seen_classes.add(obj)
                    # Abstract intermediate bases cannot be instantiated.
                    if inspect.isabstract(obj):
                        continue
                    try:
                        tools.append(obj())
                    except Exception as e:
                        logging.error(f"Error instantiating tool {name} from {filename}: {e}")
            except Exception as e:
                logging.error(f"Error importing module {module_name}: {e}")

        functions = []
        for tool in tools:
            functions.extend(tool.get_functions())
        return tools, functions

    @abstractmethod
    def get_chat_response(self, messages):
        """Return the model's response for ``messages``.

        The message format and the return type are defined by the subclass.
        """
        pass

    @abstractmethod
    async def handle_message(self, user_id, user_message):
        """Process ``user_message`` sent by ``user_id`` (subclass-defined)."""
        pass

    def clear_conversation_history(self, user_id):
        """Drop the stored history of ``user_id`` and call ``clear()`` on every tool.

        Unknown user ids are ignored. The tools are cleared as a whole; they
        receive no user id.
        """
        self.conversation_history.pop(user_id, None)
        for tool in self.tools:
            tool.clear()

    def set_processing_status(self, user_id: int, message_id: int):
        """Record that ``message_id`` is currently being processed for ``user_id``."""
        self.processing_status[user_id] = {"processing": True, "message_id": message_id}

    def clear_processing_status(self, user_id: int):
        """Remove the processing record of ``user_id``, if there is one."""
        self.processing_status.pop(user_id, None)

    def call_tool(self, function_call_name, function_call_arguments):
        """Run the tool function named ``function_call_name``.

        Args:
            function_call_name: Name matched against
                ``function["function"]["name"]`` of each tool's
                ``get_functions()`` entries.
            function_call_arguments: Keyword arguments for the function, given
                as a dict, a JSON string encoding an object, or ``None``.
                ``None``, an empty/blank string and JSON ``null`` all mean
                "no arguments".

        Returns:
            The result of ``tool.execute(name, **arguments)`` for the first
            tool that exposes the function. Failures are not raised; a
            human-readable error string is returned instead (malformed JSON,
            unsupported argument type, non-object JSON, unknown function, or
            an exception raised by the tool).
        """
        function_name = function_call_name

        if function_call_arguments is None:
            function_args = {}
        elif isinstance(function_call_arguments, dict):
            function_args = function_call_arguments
        elif isinstance(function_call_arguments, str):
            if not function_call_arguments.strip():
                # Argument-less calls may arrive as "" rather than "{}".
                function_args = {}
            else:
                try:
                    function_args = json.loads(function_call_arguments)
                except json.JSONDecodeError as e:
                    logging.error(f"Error decoding function call arguments (string) for {function_call_name}: {e}. Arguments: {function_call_arguments}")
                    return f"Error: Malformed arguments for tool call: {e}"
                if function_args is None:
                    # JSON null is handled like a Python None.
                    function_args = {}
        else:
            logging.error(f"Unexpected type for function_call_arguments for {function_call_name}: {type(function_call_arguments)}. Arguments: {function_call_arguments}")
            return f"Error: Invalid argument type for tool call: {type(function_call_arguments)}"

        for tool in self.tools:
            for function in tool.get_functions():
                if function["function"]["name"] != function_name:
                    continue
                # Valid JSON that is not an object (list, number, ...) cannot
                # be unpacked into keyword arguments.
                if not isinstance(function_args, dict):
                    logging.error(f"Internal error: function_args not a dict for {function_name} before execution. Args: {function_args}")
                    return f"Internal error preparing arguments for tool {function_name}."
                try:
                    return tool.execute(function_name, **function_args)
                except Exception as e:
                    # logging.exception keeps the traceback of the tool failure.
                    logging.exception(f"Error executing tool {function_name} with args {function_args}: {e}")
                    return f"Error executing tool {function_name}: {e}"

        logging.warning(f"Tool function {function_name} not found.")
        return f"Error: Tool function {function_name} not found."

    def get_system_prompt_description(self) -> str:
        """Returns a description of the system prompt being used.

        "Custom" is reported only when ``SYSTEM_PROMPT_PATH`` names an
        existing file, mirroring the condition under which
        ``load_system_prompt`` reads a custom prompt.
        """
        system_prompt_path = os.getenv('SYSTEM_PROMPT_PATH')
        is_custom = bool(system_prompt_path) and os.path.isfile(system_prompt_path)
        return f"System Prompt: {'Custom' if is_custom else 'Default'}"

    @abstractmethod
    def get_llm_description(self) -> str:
        """Returns a description of the LLM being used."""
        pass

    async def get_bot_status(self) -> str:
        """Provides a status message including prompt and LLM information."""
        prompt_desc = self.get_system_prompt_description()
        llm_desc = self.get_llm_description()
        return f"{prompt_desc}\n{llm_desc}"

    @abstractmethod
    async def start(self):
        """Start the bot (subclass-defined)."""
        pass

    @abstractmethod
    async def abort_processing(self, user_id):
        """Abort the processing in progress for ``user_id`` (subclass-defined)."""
        pass

    @abstractmethod
    async def switch_model(self):
        """Switches the underlying model if supported by the bot."""
        pass