# tools/log_tool.py
from . base_tool import BaseTool
import logging
import os
from datetime import datetime , timedelta
class LogTool(BaseTool):
    """Tool that returns the contents of a log file.

    It exposes a single callable function, ``get_log_contents``, described by
    :meth:`get_functions` and dispatched by :meth:`execute`. The caller either
    asks for the last ``line_count`` lines or, when ``line_count`` is omitted,
    for the lines whose leading timestamp falls within the last 24 hours.

    All failures are reported as a returned error string (and logged); the
    public methods do not raise for a missing or unreadable log file.
    """

    # strptime format of the timestamp expected at the start of every log line
    # (the layout of logging's default ``asctime``: "2024-08-17 14:08:28,123").
    # The 24-hour filter reads this attribute, so a subclass can override it.
    EXPECTED_LOG_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S,%f'

    # Number of trailing lines returned when an unusable line_count is given.
    DEFAULT_LINE_COUNT = 150

    def __init__(self, log_file_path=None, logger=None):
        """Store the log file path and the logger used for diagnostics.

        Args:
            log_file_path: Path of the log file to read. Falls back to
                ``'logs/output.log'`` when empty or ``None``.
            logger: Logger for this tool's own messages. Falls back to the
                module logger when not supplied.
        """
        self.configured_log_file_path = log_file_path or 'logs/output.log'
        self.logger = logger or logging.getLogger(__name__)
        # Attach a no-op handler when the logger has none of its own.
        if not self.logger.handlers:
            self.logger.addHandler(logging.NullHandler())
        self.logger.info(
            "LogTool initialized. Log file path: %s",
            self.configured_log_file_path,
        )

    def clear(self):
        """Reset tool state. LogTool keeps none, so this only logs the call."""
        self.logger.debug("LogTool clear called, no action taken.")

    def get_functions(self):
        """Return the function-calling schema for ``get_log_contents``."""
        return [
            {
                "type": "function",
                "function": {
                    "name": "get_log_contents",
                    "description": "Get the contents of the log file. If line_count is not provided, it attempts to return logs from the last 24 hours based on timestamp.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "line_count": {
                                "type": "integer",
                                "description": "Number of lines from the end of the log file to retrieve. If omitted, logs from last 24 hours are returned.",
                            },
                        },
                        "required": [],  # line_count is optional
                    },
                },
            },
        ]

    def execute(self, function_name, **kwargs):
        """Dispatch a function call by name.

        Args:
            function_name: Name of the function to run; only
                ``"get_log_contents"`` is recognised.
            **kwargs: Arguments for the function. ``line_count`` is optional.

        Returns:
            The log text, or an error message string for an unknown function
            name or a read failure.
        """
        self.logger.info(
            "Executing LogTool function: %s with args: %s", function_name, kwargs
        )
        if function_name == "get_log_contents":
            # A missing line_count arrives as None, which selects the 24-hour filter.
            return self._get_log_contents(line_count=kwargs.get("line_count"))

        error_message = f"Unknown function: {function_name}"
        self.logger.error(error_message)
        return error_message

    def _get_log_contents(self, line_count=None):
        """Read the log file and return the requested portion as one string.

        Args:
            line_count: Number of trailing lines to return. ``None`` selects
                the lines timestamped within the last 24 hours instead.

        Returns:
            The selected lines concatenated, or an error message string when
            the file is missing or cannot be read.
        """
        path = self.configured_log_file_path
        self.logger.info(
            "Attempting to get log contents from: %s. Line count: %s",
            path,
            line_count if line_count is not None else 'Last 24 hours',
        )

        if not os.path.exists(path):
            error_message = f"Log file does not exist at path: {path}"
            self.logger.error(error_message)
            return error_message

        try:
            with open(path, 'r', encoding='utf-8') as log_file:
                log_lines = log_file.readlines()
            self.logger.debug("Read %d total lines from log file.", len(log_lines))

            if line_count is not None:
                selected = self._select_last_lines(log_lines, line_count)
            else:
                selected = self._select_last_24_hours(log_lines)

            # Lines still carry their own newline, so join without a separator.
            return "".join(selected)
        except Exception as e:
            # Boundary handler: report any read/decode failure as an error string.
            error_message = f"An error occurred while reading the log file '{path}': {e}"
            self.logger.error(error_message, exc_info=True)
            return error_message

    def _select_last_lines(self, log_lines, line_count):
        """Return the last ``line_count`` lines, or the default count if it is invalid."""
        # bool is a subclass of int; reject it so True/False do not act as 1/0.
        is_valid = (
            isinstance(line_count, int)
            and not isinstance(line_count, bool)
            and line_count > 0
        )
        if not is_valid:
            self.logger.warning(
                "Invalid line_count '%s' provided, defaulting to fetch last %d lines.",
                line_count,
                self.DEFAULT_LINE_COUNT,
            )
            line_count = self.DEFAULT_LINE_COUNT

        selected = log_lines[-line_count:]
        self.logger.info(
            "Returning last %d lines based on line_count: %s",
            len(selected),
            line_count,
        )
        return selected

    def _select_last_24_hours(self, log_lines):
        """Return the lines whose leading timestamp is newer than 24 hours ago.

        Lines that do not start with a timestamp in
        ``EXPECTED_LOG_TIMESTAMP_FORMAT`` are skipped (strict filtering).
        """
        timestamp_format = self.EXPECTED_LOG_TIMESTAMP_FORMAT
        self.logger.info(
            "Filtering logs for the last 24 hours. Expected timestamp format: %s",
            timestamp_format,
        )
        cutoff = datetime.now() - timedelta(days=1)

        recent_lines = []
        for line in log_lines:
            # The timestamp is everything before the first " - " separator.
            timestamp_str = line.split(' - ', 1)[0]
            try:
                log_time = datetime.strptime(timestamp_str, timestamp_format)
            except ValueError as e:
                self.logger.debug(
                    "Skipping line due to timestamp parse error ('%s') or format mismatch: %s...",
                    e,
                    line.strip()[:100],
                )
                continue
            if log_time > cutoff:
                recent_lines.append(line)

        self.logger.info(
            "Returning %d lines from the last 24 hours.", len(recent_lines)
        )
        return recent_lines