diff --git a/tools/log_tool.py b/tools/log_tool.py index 4a34557..bcadf04 100644 --- a/tools/log_tool.py +++ b/tools/log_tool.py @@ -1,5 +1,4 @@ # tools/log_tool.py - from .base_tool import BaseTool from .metrics import metrics import logging @@ -7,48 +6,39 @@ import os from datetime import datetime, timedelta class LogTool(BaseTool): - def __init__(self): - # Set up logging - self.logger = logging.getLogger(__name__) - self.logger.setLevel(logging.INFO) - - # Create a file handler - file_handler = logging.FileHandler('log_tool.log') - file_handler.setLevel(logging.INFO) - - # Create a console handler - console_handler = logging.StreamHandler() - console_handler.setLevel(logging.INFO) - - # Create a formatting for the logs - formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') - file_handler.setFormatter(formatter) - console_handler.setFormatter(formatter) - - # Add the handlers to the logger - self.logger.addHandler(file_handler) - self.logger.addHandler(console_handler) + # Default log format string that _get_log_contents expects for time-based filtering. + # Making it a class variable so it's visible and could be overridden by a subclass if needed, + # though the parser is still hardcoded in this version. + EXPECTED_LOG_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S,%f' + + def __init__(self, log_file_path=None, logger=None): + self.configured_log_file_path = log_file_path if log_file_path else 'logs/output.log' + self.logger = logger if logger else logging.getLogger(__name__) + if not self.logger.handlers: + self.logger.addHandler(logging.NullHandler()) + self.logger.info(f"LogTool initialized. Log file path: {self.configured_log_file_path}") def clear(self): + # No specific state to clear for LogTool in this version. + self.logger.debug("LogTool clear called, no action taken.") pass def get_functions(self): - return [ { "type": "function", "function": { "name": "get_log_contents", - "description": "Get the contents of the log file.", + "description": "Get the contents of the log file. If line_count is not provided, it attempts to return logs from the last 24 hours based on timestamp.", "parameters": { "type": "object", "properties": { "line_count": { "type": "integer", - "description": "Number of lines from the end of the log file to retrieve" + "description": "Number of lines from the end of the log file to retrieve. If omitted, logs from last 24 hours are returned." } }, - "required": [] + "required": [] # line_count is optional } } } @@ -56,37 +46,60 @@ class LogTool(BaseTool): @metrics.measure def execute(self, function_name, **kwargs): - self.logger.info(f"Executing: {function_name}") - + self.logger.info(f"Executing LogTool function: {function_name} with args: {kwargs}") if function_name == "get_log_contents": - return self._get_log_contents(kwargs.get("line_count")) + # kwargs.get("line_count") will be None if not provided, which is handled by _get_log_contents + return self._get_log_contents(line_count=kwargs.get("line_count")) else: error_message = f"Unknown function: {function_name}" self.logger.error(error_message) return error_message @metrics.measure - def _get_log_contents(self, line_count=150): - log_file_path = 'logs/output.log' + def _get_log_contents(self, line_count=None): # Default line_count is None to trigger 24h logic if not specified + self.logger.info(f"Attempting to get log contents from: {self.configured_log_file_path}. Line count: {line_count if line_count is not None else 'Last 24 hours'}") - if not os.path.exists(log_file_path): - error_message = "Log file does not exist." + if not os.path.exists(self.configured_log_file_path): + error_message = f"Log file does not exist at path: {self.configured_log_file_path}" self.logger.error(error_message) return error_message try: - with open(log_file_path, 'r') as log_file: + with open(self.configured_log_file_path, 'r', encoding='utf-8') as log_file: log_lines = log_file.readlines() + self.logger.debug(f"Read {len(log_lines)} total lines from log file.") - if line_count is not None: - log_lines = log_lines[-line_count:] - else: - now = datetime.now() - twenty_four_hours_ago = now - timedelta(days=1) - log_lines = [line for line in log_lines if datetime.strptime(line.split(' - ')[0], '%Y-%m-%d %H:%M:%S,%f') > twenty_four_hours_ago] + if line_count is not None: + # Ensure line_count is positive if specified, otherwise could lead to unexpected slicing + if not isinstance(line_count, int) or line_count <= 0: + self.logger.warning(f"Invalid line_count '{line_count}' provided, defaulting to fetch last 150 lines.") + line_count = 150 # Default to a sensible number if invalid value provided + log_lines = log_lines[-line_count:] + self.logger.info(f"Returning last {len(log_lines)} lines based on line_count: {line_count}") + else: + # Default to last 24 hours if line_count is explicitly None or not provided + self.logger.info(f"Filtering logs for the last 24 hours. Expected timestamp format: {self.EXPECTED_LOG_TIMESTAMP_FORMAT}") + now = datetime.now() + twenty_four_hours_ago = now - timedelta(days=1) + + filtered_lines = [] + for line in log_lines: + try: + # Attempt to parse timestamp from the beginning of the line + timestamp_str = line.split(' - ', 1)[0] + log_time = datetime.strptime(timestamp_str, self.EXPECTED_LOG_TIMESTAMP_FORMAT) + if log_time > twenty_four_hours_ago: + filtered_lines.append(line) + except (ValueError, IndexError) as e: + # This means the line doesn't start with a parsable timestamp in the expected format. + # Depending on requirements, these lines might be skipped or included. + # For strict 24-hour filtering, we skip them. + self.logger.debug(f"Skipping line due to timestamp parse error ('{e}') or format mismatch: {line.strip()[:100]}...") + log_lines = filtered_lines + self.logger.info(f"Returning {len(log_lines)} lines from the last 24 hours.") - return "".join(log_lines) + return "".join(log_lines) except Exception as e: - error_message = f"An error occurred while reading the log file: {e}" - self.logger.error(error_message) - return error_message \ No newline at end of file + error_message = f"An error occurred while reading the log file '{self.configured_log_file_path}': {e}" + self.logger.error(error_message, exc_info=True) + return error_message