424 lines
21 KiB
Python
424 lines
21 KiB
Python
import requests
|
|
import os
|
|
import zipfile
|
|
import io
|
|
import re
|
|
import logging
|
|
from .base_tool import BaseTool # Added
|
|
from .metrics import metrics # Added
|
|
|
|
# Logging is configured per instance: GitHubCIHelper accepts an optional
# `logger_instance` argument, so no module-level logger is defined here.
|
|
|
|
class GitHubCIHelper(BaseTool):
    """
    A helper class to interact with GitHub Actions CI,
    specifically for fetching and analyzing test logs.

    Public methods are exposed to callers through ``get_functions`` (their
    schemas) and ``execute`` (name-based dispatch). The lookup methods report
    failure by returning ``None`` or a human-readable error string rather than
    raising, so callers must inspect the returned value.
    """

    # Maximum number of characters of each argument's repr that ``execute`` logs.
    _LOG_ARG_PREVIEW_CHARS = 200

    # GitHub Actions prefixes each log line with an ISO-8601 UTC timestamp
    # (optionally preceded by a BOM on the first line). It must be removed
    # before line-anchored patterns such as ``^FAIL:`` can match.
    _TIMESTAMP_PREFIX = re.compile(
        r"^\ufeff?\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z ?",
        re.MULTILINE,
    )

    # Standard unittest failure block: "FAIL:"/"ERROR:" header with the test
    # name and "(module.Class)", a dashed rule, the "Traceback" line, then the
    # traceback body. The header is confined to a single line so an unrelated
    # earlier "ERROR:" line cannot swallow text up to a later header. The body
    # stops before the next block (with or without its "=====" rule), before
    # the closing "Ran N tests" rule, or at end of input.
    _FAILURE_PATTERN = re.compile(
        r"^(FAIL|ERROR): ([^\n]*?)[ \t]*\(([^\n]*?)\)[ \t\r]*\n"
        r"-{5,}[ \t\r]*\n"
        r"Traceback \(most recent call last\):[ \t\r]*\n"
        r"(.*?)(?=\n={5,}\s*\n(?:FAIL|ERROR): "
        r"|\n(?:FAIL:|ERROR:)"
        r"|\n-{5,}\s*\nRan \d+ tests? in"
        r"|\Z)",
        re.DOTALL | re.MULTILINE,
    )

    # Looser fallback: any "FAIL:"/"ERROR:" header line followed by free text.
    _GENERAL_FAILURE_PATTERN = re.compile(
        r"^(FAIL|ERROR): ([^\n]+)\n"
        r"(.*?)(?=\n(?:FAIL:|ERROR:)|\n-{20,}\n|Ran \d+ tests? in|\Z)",
        re.DOTALL | re.MULTILINE,
    )

    # End markers for the last-resort "failure summary" capture.
    _SUMMARY_END_PATTERN = re.compile(r"Ran \d+ tests? in [\d\.]+s|\n-{70,}")

    def __init__(self, repo_owner: str, repo_name: str, github_token: str = None, session=None, logger_instance=None):
        """
        Initializes the GitHubCIHelper.

        Args:
            repo_owner (str): The owner of the GitHub repository (e.g., "bucolucas").
            repo_name (str): The name of the GitHub repository (e.g., "cyclop").
            github_token (str, optional): A GitHub Personal Access Token (PAT)
                for API authentication. Recommended for private repos or higher
                rate limits. Falls back to the GITHUB_TOKEN env var when omitted.
            session (requests.Session, optional): An external requests session to use.
                A new ``requests.Session`` is created when omitted.
            logger_instance (logging.Logger, optional): An external logger instance.
                Defaults to ``logging.getLogger(__name__)``.
        """
        self.repo_owner = repo_owner
        self.repo_name = repo_name
        self.base_url = f"https://api.github.com/repos/{self.repo_owner}/{self.repo_name}"
        self._token = github_token or os.environ.get('GITHUB_TOKEN')

        # Headers are passed explicitly on every request rather than being
        # installed on the (possibly shared) session.
        self.headers = {
            "Accept": "application/vnd.github.v3+json"
        }
        if self._token:
            self.headers["Authorization"] = f"token {self._token}"

        self.session = session or requests.Session()

        self.logger = logger_instance or logging.getLogger(__name__)
        if not self.logger.handlers:
            # Ensure the logger has at least one handler attached.
            self.logger.addHandler(logging.NullHandler())

    @staticmethod
    def _function_spec(name, description, properties, required):
        """Build one function-calling schema entry in the shape returned by get_functions."""
        return {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": {
                    "type": "object",
                    "properties": properties,
                    "required": required
                }
            }
        }

    def get_functions(self):
        """Return the function-calling schemas for the public methods of this tool."""
        return [
            self._function_spec(
                "get_pr_workflow_runs",
                "Gets all workflow runs associated with a specific pull request.",
                {
                    "pull_request_number": {"type": "integer", "description": "The number of the pull request."}
                },
                ["pull_request_number"],
            ),
            self._function_spec(
                "get_latest_failed_run_for_pr",
                "Gets the latest failed workflow run for a specific pull request and workflow name.",
                {
                    "pull_request_number": {"type": "integer", "description": "The number of the pull request."},
                    "workflow_name": {"type": "string", "description": "The display name of the workflow (e.g., '''Python CI''').", "default": "Python CI"}
                },
                ["pull_request_number"],
            ),
            self._function_spec(
                "get_job_logs_for_run",
                "Downloads and returns the logs for a specific job within a workflow run.",
                {
                    "run_id": {"type": "integer", "description": "The ID of the workflow run."},
                    "job_name": {"type": "string", "description": "The name of the job (e.g., '''test''').", "default": "test"}
                },
                ["run_id"],
            ),
            self._function_spec(
                "parse_unittest_failures_from_log",
                "Parses unittest failure details from log content. This is a basic parser and might need adjustments based on specific log formats.",
                {
                    "log_content": {"type": "string", "description": "The string content of the job log."}
                },
                ["log_content"],
            ),
        ]

    @classmethod
    def _summarize_kwargs(cls, kwargs):
        """
        Render kwargs like ``repr(dict)`` but clip each oversized value.

        Arguments such as ``log_content`` can be very large; logging them in
        full would flood the log with the whole CI output.
        """
        limit = cls._LOG_ARG_PREVIEW_CHARS
        parts = []
        for key, value in kwargs.items():
            text = repr(value)
            if len(text) > limit:
                text = f"{text[:limit]}... <{len(text) - limit} more chars>"
            parts.append(f"{key!r}: {text}")
        return "{" + ", ".join(parts) + "}"

    @metrics.measure
    def execute(self, function_name, **kwargs):
        """
        Dispatch ``function_name`` to the public method of the same name.

        Args:
            function_name: Name of a public (non-underscore) callable attribute.
            **kwargs: Keyword arguments forwarded to that method.

        Returns:
            The method's return value, or an error string when the name is
            unknown/private or when the method raises.
        """
        self.logger.info(
            f"Executing GitHub CI Helper function: {function_name} with args: {self._summarize_kwargs(kwargs)}"
        )

        method = None
        if isinstance(function_name, str) and not function_name.startswith("_"):
            method = getattr(self, function_name, None)

        if not callable(method):
            error_message = f"Unknown or private function: {function_name}"
            self.logger.error(error_message)
            return error_message

        try:
            return method(**kwargs)
        except Exception as e:
            # Boundary handler: report the failure to the caller as text.
            self.logger.error(f"Error executing {function_name}: {e}", exc_info=True)
            return f"Error during {function_name} execution: {str(e)}"

    def clear(self):
        """Clears any sensitive state if necessary. For this tool, it's a no-op but present for interface consistency."""
        self.logger.info("GitHubCIHelper state cleared (no specific state to clear).")

    @staticmethod
    def _error_body(error):
        """
        Return the body text of the response attached to a requests error.

        The check must be ``is not None``: a ``requests.Response`` evaluates
        as falsy for every 4xx/5xx status, so a plain truthiness test would
        always discard the body of the very responses that raised the error.
        """
        response = getattr(error, "response", None)
        return response.text if response is not None else 'No response text'

    @staticmethod
    def _as_json(payload):
        """
        Normalise the result of ``_make_request`` to parsed JSON.

        ``_make_request`` returns already-parsed JSON, a raw response object,
        or ``None`` for an empty body; ``None`` is passed through unchanged.
        """
        if payload is None or isinstance(payload, (dict, list)):
            return payload
        return payload.json()

    @staticmethod
    def _job_label(target_job):
        """Return the job ID for log messages, or 'unknown' when no job was resolved."""
        return target_job.get('id', 'unknown') if target_job else 'unknown'

    @metrics.measure
    def _make_request(self, method: str, url: str, **kwargs):
        """
        Helper function for making HTTP requests through ``self.session``.

        Returns:
            Parsed JSON when the response has a body with a JSON content type,
            the raw response object for any other non-empty body (zip files,
            plain text logs), or ``None`` when the body is empty.

        Raises:
            requests.exceptions.HTTPError: For 4xx/5xx responses (logged first).
            requests.exceptions.RequestException: For other request failures
                (logged first).
        """
        try:
            response = self.session.request(method, url, headers=self.headers, **kwargs)
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            self.logger.error(f"HTTP error occurred: {e} - {self._error_body(e)}")
            raise
        except requests.exceptions.RequestException as e:
            self.logger.error(f"Request failed: {e}")
            raise

        if not response.content:
            return None
        if response.headers.get('Content-Type', '').startswith('application/json'):
            return response.json()
        return response

    @metrics.measure
    def get_pr_workflow_runs(self, pull_request_number: int):
        """
        Gets all workflow runs associated with a specific pull request.

        The pull request is fetched to obtain its head SHA, then workflow runs
        for the ``pull_request`` event at that SHA are listed.

        Returns:
            The ``workflow_runs`` list from the API response, or ``None`` when
            the head SHA cannot be determined or any request fails.
        """
        self.logger.info(f"Getting workflow runs for PR #{pull_request_number}")
        pr_url = f"{self.base_url}/pulls/{pull_request_number}"
        try:
            pr_data = self._as_json(self._make_request("GET", pr_url))

            head = pr_data.get('head') if isinstance(pr_data, dict) else None
            head_sha = head.get('sha') if isinstance(head, dict) else None
            if not head_sha:
                self.logger.error(f"Could not get head SHA for PR {pull_request_number}. Response: {pr_data}")
                return None

            runs_url = f"{self.base_url}/actions/runs?event=pull_request&head_sha={head_sha}"
            runs_data = self._as_json(self._make_request("GET", runs_url))

            return runs_data.get("workflow_runs") if runs_data else None
        except Exception as e:
            # Failures are reported to the caller as ``None``.
            self.logger.error(f"Failed to get PR workflow runs for PR {pull_request_number}: {e}", exc_info=True)
            return None

    @metrics.measure
    def get_latest_failed_run_for_pr(self, pull_request_number: int, workflow_name: str = "Python CI"):
        """
        Gets the latest failed workflow run for a specific pull request and workflow name.

        Returns:
            The run dict with the greatest ``created_at`` among runs whose
            ``name`` equals ``workflow_name`` and whose ``conclusion`` is
            ``"failure"``, or ``None`` when there is no such run.
        """
        self.logger.info(f"Getting latest failed run for PR #{pull_request_number}, workflow: '{workflow_name}'")
        runs = self.get_pr_workflow_runs(pull_request_number)
        if not runs:
            self.logger.info(f"No runs found for PR #{pull_request_number} to check for failures.")
            return None

        failed_runs = (
            run for run in runs
            if run.get('name') == workflow_name and run.get('conclusion') == 'failure'
        )
        # ``created_at`` values are compared as strings.
        latest = max(failed_runs, key=lambda r: r.get('created_at') or "", default=None)
        if latest is not None:
            self.logger.info(f"Found failed run {latest['id']} for workflow '{workflow_name}' in PR #{pull_request_number}")
            return latest

        self.logger.info(f"No failed run for workflow '{workflow_name}' found for PR #{pull_request_number}")
        return None

    @staticmethod
    def _looks_like_job_log(member_name, job_name):
        """Heuristic: does this zip member name look like the log of ``job_name``?"""
        lowered = member_name.lower()
        return (
            job_name in member_name
            or "test" in lowered
            or "log" in lowered
            or "out" in lowered
            or "step" in member_name
        )

    def _extract_log_from_zip(self, payload, job_name, job_id):
        """
        Pick the most plausible log file from a zip archive and return its text.

        Returns ``None`` when the archive contains no files. Raises
        ``zipfile.BadZipFile`` when ``payload`` is not a valid archive.
        """
        with zipfile.ZipFile(io.BytesIO(payload)) as archive:
            members = [name for name in archive.namelist() if not name.endswith('/')]
            if not members:
                self.logger.error(f"No files found in the downloaded log zip for job ID {job_id}.")
                return None

            # First member matching the heuristic, otherwise the first file.
            chosen = next(
                (name for name in members if self._looks_like_job_log(name, job_name)),
                members[0],
            )
            self.logger.info(f"Extracting log file: {chosen} from zip for job ID {job_id}.")
            with archive.open(chosen) as log_file:
                # Undecodable bytes are replaced instead of discarding the
                # whole log over a single bad byte.
                return log_file.read().decode('utf-8', errors='replace')

    @metrics.measure
    def get_job_logs_for_run(self, run_id: int, job_name: str = "test"):
        """
        Downloads and returns the logs for a specific job within a workflow run.

        Returns:
            The log text on success; ``None`` when the jobs list cannot be
            read, the job is not found, or a zip archive is empty; otherwise a
            human-readable message string (job not completed, download error,
            bad zip, unexpected error).
        """
        self.logger.info(f"Getting job logs for run ID {run_id}, job name '{job_name}'")
        jobs_url = f"{self.base_url}/actions/runs/{run_id}/jobs"
        target_job = None  # Defined up front so the except handlers can reference it.
        try:
            jobs_data = self._as_json(self._make_request("GET", jobs_url))
            if not jobs_data or "jobs" not in jobs_data:
                self.logger.error(f"Could not retrieve jobs for run ID {run_id}. Response: {jobs_data}")
                return None

            target_job = next((job for job in jobs_data["jobs"] if job["name"] == job_name), None)
            if not target_job:
                self.logger.error(f"Job '{job_name}' not found in run ID {run_id}")
                return None

            if target_job['status'] != 'completed':
                self.logger.info(f"Job '{job_name}' in run ID {run_id} has not completed. Status: {target_job['status']}")
                return f"Job '{job_name}' not yet completed (status: {target_job['status']}). Logs may be unavailable."

            logs_url = f"{self.base_url}/actions/jobs/{target_job['id']}/logs"
            self.logger.info(f"Attempting to download logs from: {logs_url}")

            log_response = self.session.get(logs_url, headers=self.headers, allow_redirects=True, stream=True)
            log_response.raise_for_status()

            if 'application/zip' in log_response.headers.get('Content-Type', ''):
                self.logger.info(f"Received zip file for logs of job ID {target_job['id']}.")
                return self._extract_log_from_zip(log_response.content, job_name, target_job['id'])

            self.logger.info(f"Received plain text logs for job ID {target_job['id']}.")
            return log_response.text

        except requests.exceptions.HTTPError as e:
            self.logger.error(
                f"HTTP error downloading logs for job ID {self._job_label(target_job)}: {e} - {self._error_body(e)}",
                exc_info=True,
            )
            # ``is not None`` is required: an error response is falsy.
            if e.response is not None and e.response.status_code == 404:
                self.logger.error("Log download URL might be invalid or logs expired.")
            return f"Error downloading logs: {e}"
        except requests.exceptions.RequestException as e:
            self.logger.error(f"Request failed downloading logs for job ID {self._job_label(target_job)}: {e}", exc_info=True)
            return f"Error during log download request: {e}"
        except zipfile.BadZipFile:
            # The payload is deliberately not logged: it may be large binary data.
            self.logger.error(f"Failed to unzip logs for job ID {self._job_label(target_job)}.", exc_info=True)
            return "Failed to unzip logs."
        except Exception as e:
            self.logger.error(f"An unexpected error occurred while processing logs for job {self._job_label(target_job)}: {e}", exc_info=True)
            return f"Unexpected error processing logs: {e}"

    @metrics.measure
    def parse_unittest_failures_from_log(self, log_content: str):
        """
        Parses unittest failure details from log content.

        Leading per-line timestamps are removed first, then three strategies
        are tried in order and the first one that yields results wins:

        1. Standard unittest ``FAIL:``/``ERROR:`` blocks with a traceback.
        2. A looser ``FAIL:``/``ERROR:`` header followed by free text.
        3. A single block starting at ``FAILURES!!!`` or ``SUMMARY OF FAILURES``.

        Returns:
            A list of failure description strings; empty when nothing matched
            or ``log_content`` is empty.
        """
        if not log_content:
            self.logger.info("Log content is empty, no failures to parse.")
            return []

        self.logger.info(f"Parsing unittest failures from log content (length: {len(log_content)}).")

        # Without this, the line-anchored patterns never match timestamped logs.
        log_text = self._TIMESTAMP_PREFIX.sub("", log_content)

        failures = []
        for match in self._FAILURE_PATTERN.finditer(log_text):
            failure_type = match.group(1)               # FAIL or ERROR
            test_name = match.group(2).strip()          # e.g., test_specific_behavior
            test_module_class = match.group(3).strip()  # e.g., tests.test_module.TestMyFeature
            traceback_details = match.group(4).strip()  # The actual traceback

            # Reconstruct a readable failure block.
            failures.append(
                f"{failure_type}: {test_name} ({test_module_class})\n"
                f"---------------------\n"
                f"Traceback (most recent call last):\n"
                f"{traceback_details}"
            )

        if failures:
            self.logger.info(f"Parsed {len(failures)} failure blocks using primary regex.")
            return failures

        # Fallback for slight formatting variations (e.g. no traceback line).
        for match in self._GENERAL_FAILURE_PATTERN.finditer(log_text):
            failure_type = match.group(1)
            test_header = match.group(2).strip()
            details = match.group(3).strip()
            heading = f"{failure_type}: {test_header}"
            # Skip a header that has already been captured.
            if not any(existing.startswith(heading) for existing in failures):
                failures.append(f"{heading}\n{details}")

        if failures:
            self.logger.info(f"Parsed {len(failures)} failure blocks using general fallback regex.")
            return failures

        # Last resort: capture a summary section if one is present. This is
        # unstructured, but better than returning nothing.
        for summary_marker in ("FAILURES!!!", "SUMMARY OF FAILURES"):
            start_index = log_text.find(summary_marker)
            if start_index == -1:
                continue
            end_match = self._SUMMARY_END_PATTERN.search(log_text, start_index)
            end_index = end_match.start() if end_match else len(log_text)
            failure_summary_block = log_text[start_index:end_index].strip()
            if failure_summary_block:
                failures.append(f"FAILURE SUMMARY BLOCK:\n{failure_summary_block}")
                self.logger.info("Captured a general failure summary block.")
                return failures
            break

        if not failures:
            self.logger.info("No specific unittest failure blocks parsed with available patterns.")

        return failures
|
|
|
|
|
|
# --- Example Usage (Illustrative) ---
|
|
if __name__ == "__main__":
    # Illustrative example: find the latest failed "Python CI" run for a pull
    # request, download the "test" job log and print the parsed failures.
    # The helper reads the GITHUB_TOKEN environment variable when no token is
    # passed. Replace the values below with your own repo owner, name, and
    # PR number.
    pr_number = 206           # Example PR
    repo_owner = "bucolucas"  # Example owner
    repo_name = "cyclop"      # Example repo

    # Prefixes of the message strings get_job_logs_for_run returns instead of
    # log text. "Unexpected error" must be listed too, otherwise that failure
    # message would be parsed as if it were a downloaded log.
    non_log_prefixes = ("Error", "Job", "Failed", "Unexpected error")

    # Setup basic logging for the example.
    # In a real app, the logger would be configured externally.
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    example_logger = logging.getLogger("GitHubCIHelperExample")

    # Pass the logger to the helper.
    helper = GitHubCIHelper(repo_owner, repo_name, logger_instance=example_logger)

    example_logger.info(f"Looking for failed runs for PR #{pr_number} in {repo_owner}/{repo_name}")
    failed_run = helper.get_latest_failed_run_for_pr(pull_request_number=pr_number, workflow_name="Python CI")

    if failed_run:
        example_logger.info(f"Found failed run: ID {failed_run['id']}, Status {failed_run['conclusion']}")
        example_logger.info(f"Attempting to download logs for job 'test' in run {failed_run['id']}...")

        log_content = helper.get_job_logs_for_run(run_id=failed_run['id'], job_name="test")

        if log_content is None:
            example_logger.error("Could not retrieve log content (returned None).")
        elif isinstance(log_content, str) and not log_content.startswith(non_log_prefixes):
            example_logger.info(f"Successfully downloaded logs (length: {len(log_content)} characters).")

            example_logger.info("\n--- Parsing unittest failures ---")
            failures = helper.parse_unittest_failures_from_log(log_content)
            if failures:
                for i, failure_details in enumerate(failures):
                    print(f"\nFailure {i+1}:\n{failure_details}")
            else:
                print("No specific unittest failures parsed by the tool.")
                # When debugging the regexes, printing log_content[:2000]
                # here shows the start of the log.
        else:
            # An error message string produced by the helper itself.
            example_logger.error(f"Failed to get/process logs: {log_content}")
    else:
        example_logger.info(f"No failed 'Python CI' workflow run found for PR #{pr_number} or the PR doesn't exist/no runs yet.")
|
|
|