Refactor: Improve testability of LogTool

This commit is contained in:
cyclop-bot
2025-06-02 17:01:03 -05:00
parent bd6cb21806
commit 5c0b39a858
+55 -42
View File
@@ -1,5 +1,4 @@
# tools/log_tool.py # tools/log_tool.py
from .base_tool import BaseTool from .base_tool import BaseTool
from .metrics import metrics from .metrics import metrics
import logging import logging
@@ -7,48 +6,39 @@ import os
from datetime import datetime, timedelta from datetime import datetime, timedelta
class LogTool(BaseTool): class LogTool(BaseTool):
def __init__(self): # Default log format string that _get_log_contents expects for time-based filtering.
# Set up logging # Making it a class variable so it's visible and could be overridden by a subclass if needed,
self.logger = logging.getLogger(__name__) # though the parser is still hardcoded in this version.
self.logger.setLevel(logging.INFO) EXPECTED_LOG_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S,%f'
# Create a file handler def __init__(self, log_file_path=None, logger=None):
file_handler = logging.FileHandler('log_tool.log') self.configured_log_file_path = log_file_path if log_file_path else 'logs/output.log'
file_handler.setLevel(logging.INFO) self.logger = logger if logger else logging.getLogger(__name__)
if not self.logger.handlers:
# Create a console handler self.logger.addHandler(logging.NullHandler())
console_handler = logging.StreamHandler() self.logger.info(f"LogTool initialized. Log file path: {self.configured_log_file_path}")
console_handler.setLevel(logging.INFO)
# Create a formatting for the logs
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def clear(self): def clear(self):
# No specific state to clear for LogTool in this version.
self.logger.debug("LogTool clear called, no action taken.")
pass pass
def get_functions(self): def get_functions(self):
return [ return [
{ {
"type": "function", "type": "function",
"function": { "function": {
"name": "get_log_contents", "name": "get_log_contents",
"description": "Get the contents of the log file.", "description": "Get the contents of the log file. If line_count is not provided, it attempts to return logs from the last 24 hours based on timestamp.",
"parameters": { "parameters": {
"type": "object", "type": "object",
"properties": { "properties": {
"line_count": { "line_count": {
"type": "integer", "type": "integer",
"description": "Number of lines from the end of the log file to retrieve" "description": "Number of lines from the end of the log file to retrieve. If omitted, logs from last 24 hours are returned."
} }
}, },
"required": [] "required": [] # line_count is optional
} }
} }
} }
@@ -56,37 +46,60 @@ class LogTool(BaseTool):
@metrics.measure @metrics.measure
def execute(self, function_name, **kwargs): def execute(self, function_name, **kwargs):
self.logger.info(f"Executing: {function_name}") self.logger.info(f"Executing LogTool function: {function_name} with args: {kwargs}")
if function_name == "get_log_contents": if function_name == "get_log_contents":
return self._get_log_contents(kwargs.get("line_count")) # kwargs.get("line_count") will be None if not provided, which is handled by _get_log_contents
return self._get_log_contents(line_count=kwargs.get("line_count"))
else: else:
error_message = f"Unknown function: {function_name}" error_message = f"Unknown function: {function_name}"
self.logger.error(error_message) self.logger.error(error_message)
return error_message return error_message
@metrics.measure @metrics.measure
def _get_log_contents(self, line_count=150): def _get_log_contents(self, line_count=None): # Default line_count is None to trigger 24h logic if not specified
log_file_path = 'logs/output.log' self.logger.info(f"Attempting to get log contents from: {self.configured_log_file_path}. Line count: {line_count if line_count is not None else 'Last 24 hours'}")
if not os.path.exists(log_file_path): if not os.path.exists(self.configured_log_file_path):
error_message = "Log file does not exist." error_message = f"Log file does not exist at path: {self.configured_log_file_path}"
self.logger.error(error_message) self.logger.error(error_message)
return error_message return error_message
try: try:
with open(log_file_path, 'r') as log_file: with open(self.configured_log_file_path, 'r', encoding='utf-8') as log_file:
log_lines = log_file.readlines() log_lines = log_file.readlines()
self.logger.debug(f"Read {len(log_lines)} total lines from log file.")
if line_count is not None: if line_count is not None:
log_lines = log_lines[-line_count:] # Ensure line_count is positive if specified, otherwise could lead to unexpected slicing
else: if not isinstance(line_count, int) or line_count <= 0:
now = datetime.now() self.logger.warning(f"Invalid line_count '{line_count}' provided, defaulting to fetch last 150 lines.")
twenty_four_hours_ago = now - timedelta(days=1) line_count = 150 # Default to a sensible number if invalid value provided
log_lines = [line for line in log_lines if datetime.strptime(line.split(' - ')[0], '%Y-%m-%d %H:%M:%S,%f') > twenty_four_hours_ago] log_lines = log_lines[-line_count:]
self.logger.info(f"Returning last {len(log_lines)} lines based on line_count: {line_count}")
else:
# Default to last 24 hours if line_count is explicitly None or not provided
self.logger.info(f"Filtering logs for the last 24 hours. Expected timestamp format: {self.EXPECTED_LOG_TIMESTAMP_FORMAT}")
now = datetime.now()
twenty_four_hours_ago = now - timedelta(days=1)
return "".join(log_lines) filtered_lines = []
for line in log_lines:
try:
# Attempt to parse timestamp from the beginning of the line
timestamp_str = line.split(' - ', 1)[0]
log_time = datetime.strptime(timestamp_str, self.EXPECTED_LOG_TIMESTAMP_FORMAT)
if log_time > twenty_four_hours_ago:
filtered_lines.append(line)
except (ValueError, IndexError) as e:
# This means the line doesn't start with a parsable timestamp in the expected format.
# Depending on requirements, these lines might be skipped or included.
# For strict 24-hour filtering, we skip them.
self.logger.debug(f"Skipping line due to timestamp parse error ('{e}') or format mismatch: {line.strip()[:100]}...")
log_lines = filtered_lines
self.logger.info(f"Returning {len(log_lines)} lines from the last 24 hours.")
return "".join(log_lines)
except Exception as e: except Exception as e:
error_message = f"An error occurred while reading the log file: {e}" error_message = f"An error occurred while reading the log file '{self.configured_log_file_path}': {e}"
self.logger.error(error_message) self.logger.error(error_message, exc_info=True)
return error_message return error_message