# Source listing metadata: 297 lines, 14 KiB, Python
import requests
|
|
import os
|
|
import zipfile
|
|
import io
|
|
import re
|
|
import logging
|
|
|
|
# Configure logging for the tool
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class GitHubCIHelper:
|
|
"""
|
|
A helper class to interact with GitHub Actions CI,
|
|
specifically for fetching and analyzing test logs.
|
|
"""
|
|
def __init__(self, repo_owner: str, repo_name: str, github_token: str = None):
|
|
"""
|
|
Initializes the GitHubCIHelper.
|
|
|
|
Args:
|
|
repo_owner (str): The owner of the GitHub repository (e.g., '''bucolucas''').
|
|
repo_name (str): The name of the GitHub repository (e.g., '''cyclop''').
|
|
github_token (str, optional): A GitHub Personal Access Token (PAT)
|
|
for API authentication. Recommended for
|
|
private repos or higher rate limits.
|
|
Can also be set via GITHUB_TOKEN env var.
|
|
"""
|
|
self.repo_owner = repo_owner
|
|
self.repo_name = repo_name
|
|
self.base_url = f"https://api.github.com/repos/{self.repo_owner}/{self.repo_name}"
|
|
self.github_token = github_token or os.environ.get('GITHUB_TOKEN')
|
|
self.headers = {
|
|
"Accept": "application/vnd.github.v3+json"
|
|
}
|
|
if self.github_token:
|
|
self.headers["Authorization"] = f"token {self.github_token}"
|
|
|
|
def _make_request(self, method: str, url: str, **kwargs):
|
|
"""Helper function for making HTTP requests."""
|
|
try:
|
|
response = requests.request(method, url, headers=self.headers, **kwargs)
|
|
response.raise_for_status() # Raise an exception for bad status codes
|
|
if response.content:
|
|
return response.json()
|
|
return None
|
|
except requests.exceptions.HTTPError as e:
|
|
logger.error(f"HTTP error occurred: {e} - {e.response.text}")
|
|
raise
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"Request failed: {e}")
|
|
raise
|
|
|
|
def get_pr_workflow_runs(self, pull_request_number: int):
|
|
"""
|
|
Gets all workflow runs associated with a specific pull request.
|
|
|
|
Args:
|
|
pull_request_number (int): The number of the pull request.
|
|
|
|
Returns:
|
|
list: A list of workflow runs, or None if an error occurs.
|
|
"""
|
|
# First, get the head SHA of the PR
|
|
pr_url = f"{self.base_url}/pulls/{pull_request_number}"
|
|
pr_data = self._make_request("GET", pr_url)
|
|
if not pr_data or 'head' not in pr_data or 'sha' not in pr_data['head']:
|
|
logger.error(f"Could not get head SHA for PR {pull_request_number}")
|
|
return None
|
|
head_sha = pr_data['head']['sha']
|
|
|
|
# Then, get workflow runs for that commit SHA
|
|
runs_url = f"{self.base_url}/actions/runs?event=pull_request&head_sha={head_sha}"
|
|
runs_data = self._make_request("GET", runs_url)
|
|
return runs_data.get("workflow_runs") if runs_data else None
|
|
|
|
def get_latest_failed_run_for_pr(self, pull_request_number: int, workflow_name: str = "Python CI"):
|
|
"""
|
|
Gets the latest failed workflow run for a specific pull request and workflow name.
|
|
|
|
Args:
|
|
pull_request_number (int): The number of the pull request.
|
|
workflow_name (str): The display name of the workflow (e.g., "Python CI").
|
|
|
|
Returns:
|
|
dict: The workflow run object if a failed run is found, otherwise None.
|
|
"""
|
|
runs = self.get_pr_workflow_runs(pull_request_number)
|
|
if not runs:
|
|
return None
|
|
|
|
for run in sorted(runs, key=lambda r: r['created_at'], reverse=True):
|
|
if run['name'] == workflow_name and run['conclusion'] == 'failure':
|
|
return run
|
|
logger.info(f"No failed run for workflow '{workflow_name}' found for PR {pull_request_number}")
|
|
return None
|
|
|
|
def get_job_logs_for_run(self, run_id: int, job_name: str = "test"):
|
|
"""
|
|
Downloads and returns the logs for a specific job within a workflow run.
|
|
|
|
Args:
|
|
run_id (int): The ID of the workflow run.
|
|
job_name (str): The name of the job (e.g., "test").
|
|
|
|
Returns:
|
|
str: The job logs as a string, or None if an error occurs or job not found.
|
|
"""
|
|
jobs_url = f"{self.base_url}/actions/runs/{run_id}/jobs"
|
|
jobs_data = self._make_request("GET", jobs_url)
|
|
|
|
if not jobs_data or "jobs" not in jobs_data:
|
|
logger.error(f"Could not retrieve jobs for run ID {run_id}")
|
|
return None
|
|
|
|
target_job = None
|
|
for job in jobs_data["jobs"]:
|
|
if job["name"] == job_name:
|
|
target_job = job
|
|
break
|
|
|
|
if not target_job:
|
|
logger.error(f"Job '{job_name}' not found in run ID {run_id}")
|
|
return None
|
|
|
|
if target_job['status'] != 'completed':
|
|
logger.info(f"Job '{job_name}' in run ID {run_id} has not completed. Status: {target_job['status']}")
|
|
# You might want to handle this differently, e.g., wait or return specific status
|
|
return None
|
|
|
|
|
|
logs_url = f"{self.base_url}/actions/jobs/{target_job['id']}/logs"
|
|
logger.info(f"Attempting to download logs from: {logs_url}")
|
|
|
|
try:
|
|
# Logs are often zipped, GitHub API redirects to a download URL for the zip
|
|
response = requests.get(logs_url, headers=self.headers, allow_redirects=True, stream=True)
|
|
response.raise_for_status()
|
|
|
|
# Check if the content is a zip file
|
|
if 'application/zip' in response.headers.get('Content-Type', '''):
|
|
with zipfile.ZipFile(io.BytesIO(response.content)) as zf:
|
|
# Assuming there's one log file in the zip, or a specific named one
|
|
# List files in zip to find the actual log file name
|
|
log_file_names = [name for name in zf.namelist() if not name.endswith('/')] # Exclude directories
|
|
if not log_file_names:
|
|
logger.error(f"No files found in the downloaded log zip for job ID {target_job['id']}.")
|
|
return None
|
|
|
|
# Heuristic: try to find the most relevant log file.
|
|
# Often it's just job_name.txt or contains 'stdout'.
|
|
# For a single file zip, this is simple.
|
|
actual_log_file_name = log_file_names[0] # Default to first file
|
|
for name in log_file_names:
|
|
if job_name in name or "step" in name: # Or other heuristic
|
|
actual_log_file_name = name
|
|
break
|
|
|
|
logger.info(f"Extracting log file: {actual_log_file_name} from zip.")
|
|
with zf.open(actual_log_file_name) as log_file:
|
|
return log_file.read().decode('utf-8')
|
|
else:
|
|
# If not a zip, assume it's plain text (less common for full logs now)
|
|
return response.text
|
|
|
|
except requests.exceptions.HTTPError as e:
|
|
logger.error(f"HTTP error downloading logs for job ID {target_job['id']}: {e} - {e.response.text}")
|
|
# GitHub might return 404 if logs expired or job ID is wrong
|
|
if e.response.status_code == 404:
|
|
logger.error("Log download URL might be invalid or logs expired.")
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"Request failed downloading logs for job ID {target_job['id']}: {e}")
|
|
except zipfile.BadZipFile:
|
|
logger.error(f"Failed to unzip logs for job ID {target_job['id']}. Content was: {response.text[:500]}...") # Log beginning of content
|
|
except Exception as e:
|
|
logger.error(f"An unexpected error occurred while processing logs for job {target_job['id']}: {e}")
|
|
return None
|
|
|
|
def parse_unittest_failures_from_log(self, log_content: str):
|
|
"""
|
|
Parses unittest failure details from log content.
|
|
This is a basic parser and might need adjustments based on specific log formats.
|
|
|
|
Args:
|
|
log_content (str): The string content of the job log.
|
|
|
|
Returns:
|
|
list: A list of strings, where each string is a block of a failed test's output.
|
|
"""
|
|
if not log_content:
|
|
return []
|
|
|
|
failures = []
|
|
# Regex to find the start of a failure/error block (e.g., "FAIL: test_something (module.TestClass)")
|
|
# and capture everything until the next blank line or the standard "Ran X tests in Ys" line.
|
|
# This regex is complex and might need refinement.
|
|
# It looks for "FAIL:" or "ERROR:", captures the test name, then everything until "---" or "Ran x tests"
|
|
failure_pattern = re.compile(
|
|
r"^(FAIL|ERROR): ([^\n]+)\n(.*?)(?=\n-{70}\n|Ran \d+ tests in|^-{70}\nFAIL:|\nERROR:|\Z)",
|
|
re.DOTALL | re.MULTILINE
|
|
)
|
|
|
|
matches = failure_pattern.finditer(log_content)
|
|
for match in matches:
|
|
failure_type = match.group(1) # FAIL or ERROR
|
|
test_name_line = match.group(2).strip() # The line with the test name
|
|
details = match.group(3).strip() # The traceback and details
|
|
failures.append(f"{failure_type}: {test_name_line}\n{details}")
|
|
|
|
# Fallback or simpler pattern if the above is too greedy or misses things:
|
|
# Look for lines starting with "FAIL:" or "ERROR:" and the traceback sections.
|
|
# This is highly dependent on the exact output structure of `unittest`.
|
|
if not failures:
|
|
# A simpler approach: find "Traceback (most recent call last):"
|
|
# and collect lines until a clear separator or end of section.
|
|
# This is less precise about associating the error with a specific test name from the header.
|
|
traceback_pattern = re.compile(r"Traceback \(most recent call last\):.*?\n(.*?\n)(?=\n[A-Z]+:|\n-{70}\n|Ran \d+ tests in|\Z)", re.DOTALL | re.MULTILINE)
|
|
tb_matches = traceback_pattern.finditer(log_content)
|
|
for i, tb_match in enumerate(tb_matches):
|
|
# Try to find a preceding FAIL/ERROR line if possible
|
|
# This part is tricky without more context from the log structure
|
|
context_before_tb = log_content[:tb_match.start()]
|
|
related_test_name = f"Unknown Test {i+1}"
|
|
last_fail_error_lines = re.findall(r"^(?:FAIL|ERROR): (.*)\s*$", context_before_tb, re.MULTILINE)
|
|
if last_fail_error_lines:
|
|
related_test_name = last_fail_error_lines[-1]
|
|
|
|
failures.append(f"FAILURE/ERROR (from Traceback {i+1} - Test: {related_test_name}):\n{tb_match.group(0).strip()}")
|
|
|
|
|
|
if not failures and "FAILURES" in log_content: # A very generic check
|
|
logger.warning("Found 'FAILURES' in log but couldn't parse specific test blocks. Returning raw log segment if possible.")
|
|
# Try to return a segment of the log if it seems to contain failures but parsing failed.
|
|
fail_summary_match = re.search(r"FAILURES\s*={70,}", log_content, re.MULTILINE)
|
|
if fail_summary_match:
|
|
failures.append(log_content[fail_summary_match.start():])
|
|
|
|
|
|
return failures
|
|
|
|
# --- Example Usage (Illustrative) ---
def main(
    pr_number: int = 206,
    repo_owner: str = "bucolucas",
    repo_name: str = "cyclop",
    workflow_name: str = "Python CI",
    job_name: str = "test",
) -> int:
    """
    Fetch the latest failed CI run for a pull request and print its unittest failures.

    The GitHub token is taken from the GITHUB_TOKEN environment variable if it
    is set, and the 'requests' package must be installed. Replace the defaults
    with your actual repo owner, repo name and PR number.

    Args:
        pr_number (int): The pull request to inspect.
        repo_owner (str): Owner of the GitHub repository.
        repo_name (str): Name of the GitHub repository.
        workflow_name (str): Display name of the workflow to look for.
        job_name (str): Name of the job inside the workflow, i.e. the key
            under ``jobs:`` in the workflow file.

    Returns:
        int: 0 when the script ran to completion, 1 when a GitHub API request
        failed.
    """
    # Setup basic logging for the example
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    )

    helper = GitHubCIHelper(repo_owner, repo_name)
    logger.info(f"Looking for failed runs for PR #{pr_number} in {repo_owner}/{repo_name}")

    try:
        failed_run = helper.get_latest_failed_run_for_pr(
            pull_request_number=pr_number,
            workflow_name=workflow_name,
        )
        if not failed_run:
            logger.info(
                f"No failed '{workflow_name}' workflow run found for PR #{pr_number} "
                "or the PR doesn't exist/no runs yet."
            )
            return 0

        logger.info(f"Found failed run: ID {failed_run['id']}, Status {failed_run['conclusion']}")
        logger.info(f"Attempting to download logs for job '{job_name}' in run {failed_run['id']}...")
        log_content = helper.get_job_logs_for_run(run_id=failed_run['id'], job_name=job_name)
    except requests.exceptions.RequestException as exc:
        # The helper re-raises API errors (missing PR, bad token, network
        # failure); report them instead of ending in a raw traceback.
        logger.error(f"GitHub API request failed: {exc}")
        return 1

    if not log_content:
        logger.error("Could not retrieve or process log content.")
        return 0

    logger.info(f"Successfully downloaded logs (length: {len(log_content)} characters).")
    logger.info("\n--- Parsing unittest failures ---")
    failures = helper.parse_unittest_failures_from_log(log_content)
    if failures:
        for i, failure_details in enumerate(failures):
            print(f"\nFailure {i+1}:\n{failure_details}")
    else:
        print("No specific unittest failures parsed by the tool, or tests might have passed within the job despite job failure.")
        print("This can happen if the job fails for reasons other than Python test failures (e.g., setup error, script error).")
        print("Or, the log parsing regex might need adjustment for your specific unittest output format.")
    return 0

    # If the run ID is already known, the PR lookup can be skipped:
    #   log_content = helper.get_job_logs_for_run(known_run_id, job_name="test")
    #   failures = helper.parse_unittest_failures_from_log(log_content)


if __name__ == "__main__":
    raise SystemExit(main())
|