Files
cyclop/tools/log_tool.py
T
2025-06-02 17:01:03 -05:00

106 lines
5.5 KiB
Python

# tools/log_tool.py
import logging
import os
import sys
from collections import deque
from datetime import datetime, timedelta

from .base_tool import BaseTool
from .metrics import metrics
class LogTool(BaseTool):
    """Tool that returns the contents of a log file as a single string.

    It advertises one callable function, ``get_log_contents``, through
    :meth:`get_functions` and dispatches it through :meth:`execute`.
    Failures are reported as human-readable strings in the return value
    instead of being raised.

    NOTE(review): ``BaseTool.__init__`` is not invoked by ``__init__`` here;
    confirm against ``BaseTool`` that this is intentional.
    """

    # strptime format of the timestamp every log record is expected to start
    # with. Exposed as a class attribute so a subclass can override it.
    EXPECTED_LOG_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S,%f'
    # Text that separates the leading timestamp from the rest of a record.
    TIMESTAMP_SEPARATOR = ' - '
    # Number of trailing lines returned when an invalid line_count is given.
    DEFAULT_LINE_COUNT = 150

    def __init__(self, log_file_path=None, logger=None):
        """Store the log file location and the logger to report through.

        Args:
            log_file_path: Path of the log file to read. Falls back to
                ``'logs/output.log'`` when omitted or falsy.
            logger: Logger used for this tool's own diagnostics. Falls back
                to ``logging.getLogger(__name__)`` when omitted or falsy.
        """
        self.configured_log_file_path = log_file_path or 'logs/output.log'
        self.logger = logger or logging.getLogger(__name__)
        # A NullHandler is attached only when the logger has no handlers of
        # its own at all.
        if not self.logger.handlers:
            self.logger.addHandler(logging.NullHandler())
        self.logger.info(
            "LogTool initialized. Log file path: %s",
            self.configured_log_file_path,
        )

    def clear(self):
        """Do nothing besides logging; LogTool keeps no state to reset."""
        self.logger.debug("LogTool clear called, no action taken.")

    def get_functions(self):
        """Return the function-calling schema for the functions of this tool.

        Returns:
            A list with a single schema dict describing ``get_log_contents``
            and its optional integer ``line_count`` parameter.
        """
        return [
            {
                "type": "function",
                "function": {
                    "name": "get_log_contents",
                    "description": "Get the contents of the log file. If line_count is not provided, it attempts to return logs from the last 24 hours based on timestamp.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "line_count": {
                                "type": "integer",
                                "description": "Number of lines from the end of the log file to retrieve. If omitted, logs from last 24 hours are returned."
                            }
                        },
                        "required": []  # line_count is optional
                    }
                }
            }
        ]

    @metrics.measure
    def execute(self, function_name, **kwargs):
        """Run the named tool function and return its textual result.

        Args:
            function_name: Name of the function to run; only
                ``"get_log_contents"`` is recognised.
            **kwargs: Arguments of the function. Only ``line_count`` is
                read; anything else is ignored.

        Returns:
            The selected log text, or an error message string when the
            function name is unknown or the log could not be read.
        """
        self.logger.info(
            "Executing LogTool function: %s with args: %s", function_name, kwargs
        )
        if function_name == "get_log_contents":
            # A missing line_count becomes None, which selects the
            # last-24-hours behaviour in _get_log_contents.
            return self._get_log_contents(line_count=kwargs.get("line_count"))

        error_message = f"Unknown function: {function_name}"
        self.logger.error(error_message)
        return error_message

    @metrics.measure
    def _get_log_contents(self, line_count=None):
        """Return part of the configured log file as one string.

        Args:
            line_count: Number of lines to return from the end of the file.
                ``None`` selects the records of the last 24 hours instead.
                Any other value that is not a positive integer falls back
                to ``DEFAULT_LINE_COUNT`` lines.

        Returns:
            The selected lines joined together, or an error message string
            when the file does not exist or cannot be read.
        """
        path = self.configured_log_file_path
        self.logger.info(
            "Attempting to get log contents from: %s. Line count: %s",
            path,
            line_count if line_count is not None else 'Last 24 hours',
        )
        if not os.path.exists(path):
            error_message = f"Log file does not exist at path: {path}"
            self.logger.error(error_message)
            return error_message

        try:
            # errors='replace' keeps one undecodable byte from making the
            # whole log unreadable.
            with open(path, 'r', encoding='utf-8', errors='replace') as log_file:
                # Both helpers consume the file lazily, so they have to run
                # while it is still open.
                if line_count is not None:
                    selected = self._tail_lines(log_file, line_count)
                else:
                    selected = self._recent_lines(log_file)
            return "".join(selected)
        except Exception as e:
            # Broad on purpose: the caller receives a message string rather
            # than an exception, whatever went wrong while reading.
            error_message = f"An error occurred while reading the log file '{path}': {e}"
            self.logger.error(error_message, exc_info=True)
            return error_message

    def _tail_lines(self, lines, line_count):
        """Return the last ``line_count`` items of ``lines``.

        ``lines`` is consumed once and at most ``line_count`` items are held
        in memory at a time.
        """
        # bool is a subclass of int, so it has to be rejected explicitly.
        is_valid = (
            isinstance(line_count, int)
            and not isinstance(line_count, bool)
            and line_count > 0
        )
        if not is_valid:
            self.logger.warning(
                "Invalid line_count '%s' provided, defaulting to fetch last %d lines.",
                line_count,
                self.DEFAULT_LINE_COUNT,
            )
            line_count = self.DEFAULT_LINE_COUNT

        # deque rejects a maxlen above sys.maxsize; clamping keeps absurdly
        # large requests meaning "the whole file".
        tail = deque(lines, maxlen=min(line_count, sys.maxsize))
        self.logger.info(
            "Returning last %d lines based on line_count: %s", len(tail), line_count
        )
        return tail

    def _recent_lines(self, lines):
        """Return the lines of ``lines`` belonging to the last 24 hours.

        A line that starts with a parsable timestamp opens a record, which is
        kept when that timestamp is newer than 24 hours. A line without one
        is treated as a continuation (e.g. a traceback) and shares the fate
        of the record before it; such lines ahead of the first record are
        dropped.
        """
        timestamp_format = self.EXPECTED_LOG_TIMESTAMP_FORMAT
        self.logger.info(
            "Filtering logs for the last 24 hours. Expected timestamp format: %s",
            timestamp_format,
        )
        # Naive local time, compared against the naive timestamps parsed
        # from the file.
        cutoff = datetime.now() - timedelta(days=1)

        kept = []
        keep_record = False
        total = 0
        without_timestamp = 0
        for line in lines:
            total += 1
            timestamp_str = line.partition(self.TIMESTAMP_SEPARATOR)[0]
            try:
                keep_record = datetime.strptime(timestamp_str, timestamp_format) > cutoff
            except ValueError:
                # Not the start of a record: keep_record is left as set by
                # the record this line continues.
                without_timestamp += 1
            if keep_record:
                kept.append(line)

        # One summary line instead of one message per unparsable line, so
        # reading the log does not itself flood the log.
        self.logger.debug(
            "Read %d total lines from log file; %d had no parsable leading timestamp.",
            total,
            without_timestamp,
        )
        self.logger.info("Returning %d lines from the last 24 hours.", len(kept))
        return kept