"""Fetch and analyse GitHub Actions CI logs (tools/github_ci_tool.py).

The module exposes :class:`GitHubCIHelper`, which can

* list the workflow runs attached to a pull request,
* pick the most recent failed run of a given workflow,
* download the log of one job of that run, and
* extract ``unittest`` failure/error blocks from the log text.

Run the module as a script for an illustrative end-to-end example.
"""
import io
import logging
import os
import re
import zipfile
from typing import Optional

import requests

# Configure logging for the tool
logger = logging.getLogger(__name__)

# GitHub Actions prefixes every line of a downloaded log with a UTC timestamp
# such as "2024-01-31T12:34:56.1234567Z ". It has to be removed before the
# line-anchored patterns below can recognise unittest output.
_LOG_TIMESTAMP_RE = re.compile(
    r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z ?",
    re.MULTILINE,
)

# One unittest report block: the "FAIL:/ERROR:" header line followed by its
# details. The details stop at whichever comes first:
#   * the "=" * 70 line that introduces the next report block,
#   * another FAIL:/ERROR: header,
#   * the "-" * 70 + "Ran N test(s) in ..." footer,
#   * the end of the log.
# A bare "-" * 70 line is deliberately NOT a terminator: unittest prints one
# directly under every header, so stopping there glues neighbouring failures
# together and loses the second traceback.
_FAILURE_BLOCK_RE = re.compile(
    r"^(FAIL|ERROR): ([^\n]+)\n"
    r"(.*?)"
    r"(?=^={70}$|^(?:FAIL|ERROR): |\n-{70}\nRan \d+ tests? in|^Ran \d+ tests? in|\Z)",
    re.DOTALL | re.MULTILINE,
)

# Fallback: a bare traceback that is not introduced by a FAIL:/ERROR: header.
_TRACEBACK_RE = re.compile(
    r"Traceback \(most recent call last\):.*?\n(.*?\n)"
    r"(?=\n[A-Z]+:|\n-{70}\n|Ran \d+ tests? in|\Z)",
    re.DOTALL | re.MULTILINE,
)

# Header lines only, used to label fallback tracebacks with a test name.
_FAILURE_HEADER_RE = re.compile(r"^(?:FAIL|ERROR): (.*)\s*$", re.MULTILINE)

# Last resort: a "FAILURES" banner followed by a long "=" rule.
_FAILURES_BANNER_RE = re.compile(r"FAILURES\s*={70,}", re.MULTILINE)


def _normalise_log(log_content: str) -> str:
    """Return *log_content* in the shape the parsing patterns expect.

    Strips a leading byte-order mark, converts CRLF line endings to LF,
    removes the per-line GitHub timestamp prefix and guarantees a trailing
    newline (the traceback fallback only matches newline-terminated text).
    """
    text = log_content.lstrip("\ufeff").replace("\r\n", "\n")
    text = _LOG_TIMESTAMP_RE.sub("", text)
    return text if text.endswith("\n") else text + "\n"


class GitHubCIHelper:
    """
    A helper class to interact with GitHub Actions CI,
    specifically for fetching and analyzing test logs.
    """

    def __init__(
        self,
        repo_owner: str,
        repo_name: str,
        github_token: Optional[str] = None,
        timeout: float = 30.0,
    ):
        """
        Initializes the GitHubCIHelper.

        Args:
            repo_owner (str): The owner of the GitHub repository (e.g., 'bucolucas').
            repo_name (str): The name of the GitHub repository (e.g., 'cyclop').
            github_token (str, optional): A GitHub Personal Access Token (PAT)
                                          for API authentication. Recommended for
                                          private repos or higher rate limits.
                                          Can also be set via GITHUB_TOKEN env var.
            timeout (float, optional): Seconds to wait for each HTTP request
                                       before giving up. Without a timeout a
                                       stalled connection would block forever.
        """
        self.repo_owner = repo_owner
        self.repo_name = repo_name
        self.base_url = f"https://api.github.com/repos/{self.repo_owner}/{self.repo_name}"
        self.github_token = github_token or os.environ.get('GITHUB_TOKEN')
        self.timeout = timeout
        self.headers = {
            "Accept": "application/vnd.github.v3+json"
        }
        if self.github_token:
            self.headers["Authorization"] = f"token {self.github_token}"

    def _make_request(self, method: str, url: str, **kwargs):
        """
        Sends an authenticated request and returns the decoded JSON body.

        Args:
            method (str): HTTP verb, e.g. "GET".
            url (str): Absolute URL to call.
            **kwargs: Forwarded to ``requests.request``. A ``headers`` entry is
                      merged on top of the helper's default headers and
                      ``timeout`` defaults to the helper's timeout.

        Returns:
            The parsed JSON payload, or None when the response body is empty.

        Raises:
            requests.exceptions.RequestException: On transport errors and on
                non-2xx responses (logged, then re-raised).
        """
        kwargs.setdefault("timeout", self.timeout)
        # Merge instead of passing both, which would raise TypeError for a
        # duplicated keyword argument.
        headers = {**self.headers, **(kwargs.pop("headers", None) or {})}
        try:
            response = requests.request(method, url, headers=headers, **kwargs)
            response.raise_for_status()  # Raise an exception for bad status codes
            if response.content:
                return response.json()
            return None
        except requests.exceptions.HTTPError as e:
            logger.error("HTTP error occurred: %s - %s", e, e.response.text)
            raise
        except requests.exceptions.RequestException as e:
            logger.error("Request failed: %s", e)
            raise

    def get_pr_workflow_runs(self, pull_request_number: int):
        """
        Gets all workflow runs associated with a specific pull request.

        Args:
            pull_request_number (int): The number of the pull request.

        Returns:
            list: The ``pull_request``-triggered workflow runs for the PR's head
            commit (first page, up to 100), or None if the head SHA or the run
            list is missing from the API response.

        Raises:
            requests.exceptions.RequestException: If either API call fails.
        """
        # First, get the head SHA of the PR
        pr_data = self._make_request("GET", f"{self.base_url}/pulls/{pull_request_number}")
        head_sha = ((pr_data or {}).get("head") or {}).get("sha")
        if not head_sha:
            logger.error("Could not get head SHA for PR %s", pull_request_number)
            return None

        # Then, get workflow runs for that commit SHA
        runs_data = self._make_request(
            "GET",
            f"{self.base_url}/actions/runs",
            params={"event": "pull_request", "head_sha": head_sha, "per_page": 100},
        )
        return runs_data.get("workflow_runs") if runs_data else None

    def get_latest_failed_run_for_pr(self, pull_request_number: int, workflow_name: str = "Python CI"):
        """
        Gets the latest failed workflow run for a specific pull request and workflow name.

        Args:
            pull_request_number (int): The number of the pull request.
            workflow_name (str): The display name of the workflow (e.g., "Python CI").

        Returns:
            dict: The workflow run object if a failed run is found, otherwise None.

        Raises:
            requests.exceptions.RequestException: If the GitHub API call fails.
        """
        runs = self.get_pr_workflow_runs(pull_request_number)
        if not runs:
            return None

        failed_runs = [
            run for run in runs
            if run.get("name") == workflow_name and run.get("conclusion") == "failure"
        ]
        if not failed_runs:
            logger.info(
                "No failed run for workflow '%s' found for PR %s",
                workflow_name, pull_request_number,
            )
            return None
        # created_at is an ISO-8601 string, so lexicographic order is
        # chronological order.
        return max(failed_runs, key=lambda run: run["created_at"])

    @staticmethod
    def _extract_log_from_zip(archive_bytes: bytes, job_name: str, job_id):
        """
        Returns the text of the most relevant log file inside a zipped log archive.

        The first member whose name contains *job_name* or "step" wins;
        otherwise the first file in the archive is used.

        Returns:
            str: The decoded log, or None when the archive contains no files.

        Raises:
            zipfile.BadZipFile: If *archive_bytes* is not a valid zip archive.
        """
        with zipfile.ZipFile(io.BytesIO(archive_bytes)) as zf:
            # Exclude directory entries
            log_file_names = [name for name in zf.namelist() if not name.endswith('/')]
            if not log_file_names:
                logger.error("No files found in the downloaded log zip for job ID %s.", job_id)
                return None

            actual_log_file_name = next(
                (name for name in log_file_names if job_name in name or "step" in name),
                log_file_names[0],
            )
            logger.info("Extracting log file: %s from zip.", actual_log_file_name)
            with zf.open(actual_log_file_name) as log_file:
                # CI output may contain arbitrary bytes; never lose the whole
                # log because of one undecodable character.
                return log_file.read().decode('utf-8', errors='replace')

    def get_job_logs_for_run(self, run_id: int, job_name: str = "test"):
        """
        Downloads and returns the logs for a specific job within a workflow run.

        Args:
            run_id (int): The ID of the workflow run.
            job_name (str): The name of the job (e.g., "test").

        Returns:
            str: The job logs as a string, or None if the job is not found, has
            not completed, or the log download/extraction fails.

        Raises:
            requests.exceptions.RequestException: If listing the run's jobs
                fails. Errors while downloading the log itself are logged and
                reported as None instead.
        """
        jobs_data = self._make_request(
            "GET",
            f"{self.base_url}/actions/runs/{run_id}/jobs",
            params={"per_page": 100},
        )
        if not jobs_data or "jobs" not in jobs_data:
            logger.error("Could not retrieve jobs for run ID %s", run_id)
            return None

        target_job = next((job for job in jobs_data["jobs"] if job["name"] == job_name), None)
        if not target_job:
            logger.error("Job '%s' not found in run ID %s", job_name, run_id)
            return None

        if target_job['status'] != 'completed':
            logger.info(
                "Job '%s' in run ID %s has not completed. Status: %s",
                job_name, run_id, target_job['status'],
            )
            # You might want to handle this differently, e.g., wait or return specific status
            return None

        job_id = target_job['id']
        logs_url = f"{self.base_url}/actions/jobs/{job_id}/logs"
        logger.info("Attempting to download logs from: %s", logs_url)

        try:
            # The API answers with a redirect to the actual download location.
            # The body is read in full right away, so streaming is not used.
            response = requests.get(
                logs_url,
                headers=self.headers,
                allow_redirects=True,
                timeout=self.timeout,
            )
            response.raise_for_status()

            if 'application/zip' in response.headers.get('Content-Type', ''):
                return self._extract_log_from_zip(response.content, job_name, job_id)
            # If not a zip, assume it's plain text
            return response.text

        except requests.exceptions.HTTPError as e:
            logger.error(
                "HTTP error downloading logs for job ID %s: %s - %s",
                job_id, e, e.response.text,
            )
            # GitHub might return 404 if logs expired or job ID is wrong
            if e.response.status_code == 404:
                logger.error("Log download URL might be invalid or logs expired.")
        except requests.exceptions.RequestException as e:
            logger.error("Request failed downloading logs for job ID %s: %s", job_id, e)
        except zipfile.BadZipFile:
            # Log the beginning of the content to help diagnose what came back
            logger.error(
                "Failed to unzip logs for job ID %s. Content was: %s...",
                job_id, response.text[:500],
            )
        except Exception as e:
            # Best-effort API: report the problem (with traceback) and fall
            # through to the None return instead of crashing the caller.
            logger.exception(
                "An unexpected error occurred while processing logs for job %s: %s",
                job_id, e,
            )
        return None

    def parse_unittest_failures_from_log(self, log_content: str):
        """
        Parses unittest failure details from log content.
        This is a basic parser and might need adjustments based on specific log formats.

        The log is first normalised (timestamp prefixes, BOM and CRLF removed).
        Three strategies are then tried in order, stopping at the first that
        yields a result:

        1. ``FAIL:`` / ``ERROR:`` report blocks as printed by ``unittest``;
           each block becomes one entry, so consecutive failures are kept
           separate.
        2. Bare ``Traceback (most recent call last):`` sections.
        3. The raw log tail starting at a ``FAILURES ====...`` banner.

        Args:
            log_content (str): The string content of the job log.

        Returns:
            list: A list of strings, where each string is a block of a failed test's output.
        """
        if not log_content:
            return []

        text = _normalise_log(log_content)

        failures = [
            f"{match.group(1)}: {match.group(2).strip()}\n{match.group(3).strip()}"
            for match in _FAILURE_BLOCK_RE.finditer(text)
        ]
        if failures:
            return failures

        # Fallback: collect tracebacks and label each with the closest
        # preceding FAIL:/ERROR: header line, when there is one.
        for index, tb_match in enumerate(_TRACEBACK_RE.finditer(text), start=1):
            preceding_headers = _FAILURE_HEADER_RE.findall(text[:tb_match.start()])
            related_test_name = preceding_headers[-1] if preceding_headers else f"Unknown Test {index}"
            failures.append(
                f"FAILURE/ERROR (from Traceback {index} - Test: {related_test_name}):\n"
                f"{tb_match.group(0).strip()}"
            )
        if failures:
            return failures

        if "FAILURES" in text:  # A very generic check
            logger.warning(
                "Found 'FAILURES' in log but couldn't parse specific test blocks. "
                "Returning raw log segment if possible."
            )
            fail_summary_match = _FAILURES_BANNER_RE.search(text)
            if fail_summary_match:
                failures.append(text[fail_summary_match.start():])

        return failures


# --- Example Usage (Illustrative) ---
def main() -> None:
    """Look up the latest failed "Python CI" run of an example PR and print its test failures.

    Assumes the GITHUB_TOKEN environment variable is set and that 'requests'
    is installed. Replace the repository owner, name and PR number below with
    your own.
    """
    pr_number = 206
    repo_owner = "bucolucas"
    repo_name = "cyclop"

    # Setup basic logging for the example
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    helper = GitHubCIHelper(repo_owner, repo_name)

    logger.info("Looking for failed runs for PR #%s in %s/%s", pr_number, repo_owner, repo_name)
    try:
        # Assuming the workflow's display name is "Python CI"
        # and the job name within that workflow is "test".
        failed_run = helper.get_latest_failed_run_for_pr(pull_request_number=pr_number, workflow_name="Python CI")

        if not failed_run:
            logger.info(
                "No failed 'Python CI' workflow run found for PR #%s or the PR doesn't exist/no runs yet.",
                pr_number,
            )
            return

        logger.info("Found failed run: ID %s, Status %s", failed_run['id'], failed_run['conclusion'])
        logger.info("Attempting to download logs for job 'test' in run %s...", failed_run['id'])

        # The job name 'test' comes from the workflow file:
        # jobs:
        #   test:  <-- this is the job name
        #     runs-on: self-hosted
        log_content = helper.get_job_logs_for_run(run_id=failed_run['id'], job_name="test")
    except requests.exceptions.RequestException:
        # The helper has already logged the details; a missing PR or a
        # network problem surfaces here rather than as a None result.
        logger.error("GitHub API request failed for PR #%s; see the error above.", pr_number)
        return

    if not log_content:
        logger.error("Could not retrieve or process log content.")
        return

    logger.info("Successfully downloaded logs (length: %s characters).", len(log_content))

    logger.info("\n--- Parsing unittest failures ---")
    failures = helper.parse_unittest_failures_from_log(log_content)
    if failures:
        for i, failure_details in enumerate(failures):
            print(f"\nFailure {i+1}:\n{failure_details}")
    else:
        print("No specific unittest failures parsed by the tool, or tests might have passed within the job despite job failure.")
        print("This can happen if the job fails for reasons other than Python test failures (e.g., setup error, script error).")
        print("Or, the log parsing regex might need adjustment for your specific unittest output format.")

    # If the run ID is already known, the lookup can be skipped:
    #   log_content = helper.get_job_logs_for_run(123456789, job_name="test")
    #   failures = helper.parse_unittest_failures_from_log(log_content)


if __name__ == "__main__":
    main()