Merge pull request #208 from bucolucas/feature/add-ci-tool-json-definition

Feature: Add JSON tool definition and BaseTool integration for GitHubCIHelper
This commit is contained in:
2025-06-02 18:41:37 -05:00
committed by GitHub
+274 -168
View File
@@ -4,293 +4,399 @@ import zipfile
import io import io
import re import re
import logging import logging
from .base_tool import BaseTool # Added
from .metrics import metrics # Added
# Configure logging for the tool # Configure logging for the tool - This will be handled by the logger instance now
logger = logging.getLogger(__name__) # logger = logging.getLogger(__name__) # Commented out or removed
class GitHubCIHelper: class GitHubCIHelper(BaseTool): # Inherits from BaseTool
""" """
A helper class to interact with GitHub Actions CI, A helper class to interact with GitHub Actions CI,
specifically for fetching and analyzing test logs. specifically for fetching and analyzing test logs.
""" """
def __init__(self, repo_owner: str, repo_name: str, github_token: str = None): def __init__(self, repo_owner: str, repo_name: str, github_token: str = None, session=None, logger_instance=None): # Added session and logger_instance
""" """
Initializes the GitHubCIHelper. Initializes the GitHubCIHelper.
Args: Args:
repo_owner (str): The owner of the GitHub repository (e.g., '''bucolucas'''). repo_owner (str): The owner of the GitHub repository (e.g., \'\'\'bucolucas\'\'\').
repo_name (str): The name of the GitHub repository (e.g., '''cyclop'''). repo_name (str): The name of the GitHub repository (e.g., \'\'\'cyclop\'\'\').
github_token (str, optional): A GitHub Personal Access Token (PAT) github_token (str, optional): A GitHub Personal Access Token (PAT)
for API authentication. Recommended for for API authentication. Recommended for
private repos or higher rate limits. private repos or higher rate limits.
Can also be set via GITHUB_TOKEN env var. Can also be set via GITHUB_TOKEN env var.
session (requests.Session, optional): An external requests session to use.
logger_instance (logging.Logger, optional): An external logger instance.
""" """
self.repo_owner = repo_owner self.repo_owner = repo_owner
self.repo_name = repo_name self.repo_name = repo_name
self.base_url = f"https://api.github.com/repos/{self.repo_owner}/{self.repo_name}" self.base_url = f"https://api.github.com/repos/{self.repo_owner}/{self.repo_name}"
self.github_token = github_token or os.environ.get('GITHUB_TOKEN') self._token = github_token or os.environ.get(\'GITHUB_TOKEN\') # Renamed to _token for consistency
self.headers = { self.headers = {
"Accept": "application/vnd.github.v3+json" "Accept": "application/vnd.github.v3+json"
} }
if self.github_token: if self._token: # Use self._token
self.headers["Authorization"] = f"token {self.github_token}" self.headers["Authorization"] = f"token {self._token}"
def _make_request(self, method: str, url: str, **kwargs): if session:
self.session = session
else:
self.session = requests.Session()
# Headers are applied per-request in _make_request or directly where self.session is used if needed globally for session
self.logger = logger_instance if logger_instance else logging.getLogger(__name__)
if not self.logger.handlers:
self.logger.addHandler(logging.NullHandler())
def get_functions(self):
return [
{
"type": "function",
"function": {
"name": "get_pr_workflow_runs",
"description": "Gets all workflow runs associated with a specific pull request.",
"parameters": {
"type": "object",
"properties": {
"pull_request_number": {"type": "integer", "description": "The number of the pull request."}
},
"required": ["pull_request_number"]
}
}
},
{
"type": "function",
"function": {
"name": "get_latest_failed_run_for_pr",
"description": "Gets the latest failed workflow run for a specific pull request and workflow name.",
"parameters": {
"type": "object",
"properties": {
"pull_request_number": {"type": "integer", "description": "The number of the pull request."},
"workflow_name": {"type": "string", "description": "The display name of the workflow (e.g., \'\'\'Python CI\'\'\').", "default": "Python CI"}
},
"required": ["pull_request_number"]
}
}
},
{
"type": "function",
"function": {
"name": "get_job_logs_for_run",
"description": "Downloads and returns the logs for a specific job within a workflow run.",
"parameters": {
"type": "object",
"properties": {
"run_id": {"type": "integer", "description": "The ID of the workflow run."},
"job_name": {"type": "string", "description": "The name of the job (e.g., \'\'\'test\'\'\').", "default": "test"}
},
"required": ["run_id"]
}
}
},
{
"type": "function",
"function": {
"name": "parse_unittest_failures_from_log",
"description": "Parses unittest failure details from log content. This is a basic parser and might need adjustments based on specific log formats.",
"parameters": {
"type": "object",
"properties": {
"log_content": {"type": "string", "description": "The string content of the job log."}
},
"required": ["log_content"]
}
}
}
]
@metrics.measure
def execute(self, function_name, **kwargs):
self.logger.info(f"Executing GitHub CI Helper function: {function_name} with args: {kwargs}")
# Dispatch to the appropriate public method
if hasattr(self, function_name) and callable(getattr(self, function_name)) and not function_name.startswith("_"):
method = getattr(self, function_name)
try:
return method(**kwargs)
except Exception as e:
self.logger.error(f"Error executing {function_name}: {e}", exc_info=True)
return f"Error during {function_name} execution: {str(e)}"
else:
error_message = f"Unknown or private function: {function_name}"
self.logger.error(error_message)
return error_message
def clear(self):
"""Clears any sensitive state if necessary. For this tool, it\'s a no-op but present for interface consistency."""
self.logger.info("GitHubCIHelper state cleared (no specific state to clear).")
@metrics.measure
def _make_request(self, method: str, url: str, **kwargs): # Added @metrics.measure
"""Helper function for making HTTP requests.""" """Helper function for making HTTP requests."""
try: try:
response = requests.request(method, url, headers=self.headers, **kwargs) # Use self.session instead of requests directly
response.raise_for_status() # Raise an exception for bad status codes response = self.session.request(method, url, headers=self.headers, **kwargs)
if response.content: response.raise_for_status()
if response.content and response.headers.get(\'Content-Type\', \'\').startswith(\'application/json\'):
return response.json() return response.json()
elif response.content: # For non-JSON content like zip files or plain text logs
return response
return None return None
except requests.exceptions.HTTPError as e: except requests.exceptions.HTTPError as e:
logger.error(f"HTTP error occurred: {e} - {e.response.text}") self.logger.error(f"HTTP error occurred: {e} - {e.response.text if e.response else \'No response text\'}") # Use self.logger
raise raise
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error(f"Request failed: {e}") self.logger.error(f"Request failed: {e}") # Use self.logger
raise raise
def get_pr_workflow_runs(self, pull_request_number: int): @metrics.measure
def get_pr_workflow_runs(self, pull_request_number: int): # Added @metrics.measure
""" """
Gets all workflow runs associated with a specific pull request. Gets all workflow runs associated with a specific pull request.
Args:
pull_request_number (int): The number of the pull request.
Returns:
list: A list of workflow runs, or None if an error occurs.
""" """
# First, get the head SHA of the PR self.logger.info(f"Getting workflow runs for PR #{pull_request_number}")
pr_url = f"{self.base_url}/pulls/{pull_request_number}" pr_url = f"{self.base_url}/pulls/{pull_request_number}"
pr_data = self._make_request("GET", pr_url) try:
if not pr_data or 'head' not in pr_data or 'sha' not in pr_data['head']: pr_response = self._make_request("GET", pr_url) # this returns a response object or parsed JSON
logger.error(f"Could not get head SHA for PR {pull_request_number}") pr_data = pr_response if isinstance(pr_response, dict) else pr_response.json() # Ensure pr_data is dict
if not pr_data or \'head\' not in pr_data or \'sha\' not in pr_data[\'head\']:
self.logger.error(f"Could not get head SHA for PR {pull_request_number}. Response: {pr_data}")
return None return None
head_sha = pr_data['head']['sha'] head_sha = pr_data[\'head\'][\'sha\']
# Then, get workflow runs for that commit SHA
runs_url = f"{self.base_url}/actions/runs?event=pull_request&head_sha={head_sha}" runs_url = f"{self.base_url}/actions/runs?event=pull_request&head_sha={head_sha}"
runs_data = self._make_request("GET", runs_url) runs_response = self._make_request("GET", runs_url)
return runs_data.get("workflow_runs") if runs_data else None runs_data = runs_response if isinstance(runs_response, dict) else runs_response.json()
def get_latest_failed_run_for_pr(self, pull_request_number: int, workflow_name: str = "Python CI"): return runs_data.get("workflow_runs") if runs_data else None
except Exception as e:
self.logger.error(f"Failed to get PR workflow runs for PR {pull_request_number}: {e}", exc_info=True)
return None
@metrics.measure
def get_latest_failed_run_for_pr(self, pull_request_number: int, workflow_name: str = "Python CI"): # Added @metrics.measure
""" """
Gets the latest failed workflow run for a specific pull request and workflow name. Gets the latest failed workflow run for a specific pull request and workflow name.
Args:
pull_request_number (int): The number of the pull request.
workflow_name (str): The display name of the workflow (e.g., "Python CI").
Returns:
dict: The workflow run object if a failed run is found, otherwise None.
""" """
self.logger.info(f"Getting latest failed run for PR #{pull_request_number}, workflow: \'{workflow_name}\'")
runs = self.get_pr_workflow_runs(pull_request_number) runs = self.get_pr_workflow_runs(pull_request_number)
if not runs: if not runs:
self.logger.info(f"No runs found for PR #{pull_request_number} to check for failures.")
return None return None
for run in sorted(runs, key=lambda r: r['created_at'], reverse=True): for run in sorted(runs, key=lambda r: r[\'created_at\'], reverse=True):
if run['name'] == workflow_name and run['conclusion'] == 'failure': if run[\'name\'] == workflow_name and run[\'conclusion\'] == \'failure\':
self.logger.info(f"Found failed run {run[\'id\']} for workflow \'{workflow_name}\' in PR #{pull_request_number}")
return run return run
logger.info(f"No failed run for workflow '{workflow_name}' found for PR {pull_request_number}") self.logger.info(f"No failed run for workflow \'{workflow_name}\' found for PR #{pull_request_number}") # Use self.logger
return None return None
def get_job_logs_for_run(self, run_id: int, job_name: str = "test"): @metrics.measure
def get_job_logs_for_run(self, run_id: int, job_name: str = "test"): # Added @metrics.measure
""" """
Downloads and returns the logs for a specific job within a workflow run. Downloads and returns the logs for a specific job within a workflow run.
Args:
run_id (int): The ID of the workflow run.
job_name (str): The name of the job (e.g., "test").
Returns:
str: The job logs as a string, or None if an error occurs or job not found.
""" """
self.logger.info(f"Getting job logs for run ID {run_id}, job name \'{job_name}\'")
jobs_url = f"{self.base_url}/actions/runs/{run_id}/jobs" jobs_url = f"{self.base_url}/actions/runs/{run_id}/jobs"
jobs_data = self._make_request("GET", jobs_url) target_job = None # Initialize target_job here to ensure it\'s defined for later logging
try:
jobs_response = self._make_request("GET", jobs_url)
jobs_data = jobs_response if isinstance(jobs_response, dict) else jobs_response.json()
if not jobs_data or "jobs" not in jobs_data: if not jobs_data or "jobs" not in jobs_data:
logger.error(f"Could not retrieve jobs for run ID {run_id}") self.logger.error(f"Could not retrieve jobs for run ID {run_id}. Response: {jobs_data}")
return None return None
target_job = None
for job in jobs_data["jobs"]: for job in jobs_data["jobs"]:
if job["name"] == job_name: if job["name"] == job_name:
target_job = job target_job = job
break break
if not target_job: if not target_job:
logger.error(f"Job '{job_name}' not found in run ID {run_id}") self.logger.error(f"Job \'{job_name}\' not found in run ID {run_id}")
return None return None
if target_job['status'] != 'completed': if target_job[\'status\'] != \'completed\':
logger.info(f"Job '{job_name}' in run ID {run_id} has not completed. Status: {target_job['status']}") self.logger.info(f"Job \'{job_name}\' in run ID {run_id} has not completed. Status: {target_job[\'status\']}")
# You might want to handle this differently, e.g., wait or return specific status return f"Job \'{job_name}\' not yet completed (status: {target_job[\'status\']}). Logs may be unavailable."
return None
logs_url = f"{self.base_url}/actions/jobs/{target_job['id']}/logs" logs_url = f"{self.base_url}/actions/jobs/{target_job[\'id\']}/logs"
logger.info(f"Attempting to download logs from: {logs_url}") self.logger.info(f"Attempting to download logs from: {logs_url}")
try: log_response = self.session.get(logs_url, headers=self.headers, allow_redirects=True, stream=True)
# Logs are often zipped, GitHub API redirects to a download URL for the zip log_response.raise_for_status()
response = requests.get(logs_url, headers=self.headers, allow_redirects=True, stream=True)
response.raise_for_status()
# Check if the content is a zip file if \'application/zip\' in log_response.headers.get(\'Content-Type\', \'\'):
if 'application/zip' in response.headers.get('Content-Type', '''): self.logger.info(f"Received zip file for logs of job ID {target_job[\'id\']}.")
with zipfile.ZipFile(io.BytesIO(response.content)) as zf: with zipfile.ZipFile(io.BytesIO(log_response.content)) as zf:
# Assuming there's one log file in the zip, or a specific named one log_file_names = [name for name in zf.namelist() if not name.endswith(\'/\')]
# List files in zip to find the actual log file name
log_file_names = [name for name in zf.namelist() if not name.endswith('/')] # Exclude directories
if not log_file_names: if not log_file_names:
logger.error(f"No files found in the downloaded log zip for job ID {target_job['id']}.") self.logger.error(f"No files found in the downloaded log zip for job ID {target_job[\'id\']}.")
return None return None
# Heuristic: try to find the most relevant log file. actual_log_file_name = log_file_names[0]
# Often it's just job_name.txt or contains 'stdout'.
# For a single file zip, this is simple.
actual_log_file_name = log_file_names[0] # Default to first file
for name in log_file_names: for name in log_file_names:
if job_name in name or "step" in name: # Or other heuristic # Improved heuristic for log file name
if job_name in name or "test" in name.lower() or "log" in name.lower() or "out" in name.lower() or "step" in name:
actual_log_file_name = name actual_log_file_name = name
break break
logger.info(f"Extracting log file: {actual_log_file_name} from zip.") self.logger.info(f"Extracting log file: {actual_log_file_name} from zip for job ID {target_job[\'id\']}.")
with zf.open(actual_log_file_name) as log_file: with zf.open(actual_log_file_name) as log_file:
return log_file.read().decode('utf-8') return log_file.read().decode(\'utf-8\')
else: else:
# If not a zip, assume it's plain text (less common for full logs now) self.logger.info(f"Received plain text logs for job ID {target_job[\'id\']}.")
return response.text return log_response.text
except requests.exceptions.HTTPError as e: except requests.exceptions.HTTPError as e:
logger.error(f"HTTP error downloading logs for job ID {target_job['id']}: {e} - {e.response.text}") self.logger.error(f"HTTP error downloading logs for job ID {target_job.get(\'id\', \'unknown\') if target_job else \'unknown\'}: {e} - {e.response.text if e.response else \'No response text\'}", exc_info=True)
# GitHub might return 404 if logs expired or job ID is wrong if e.response and e.response.status_code == 404:
if e.response.status_code == 404: self.logger.error("Log download URL might be invalid or logs expired.")
logger.error("Log download URL might be invalid or logs expired.") return f"Error downloading logs: {e}"
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error(f"Request failed downloading logs for job ID {target_job['id']}: {e}") self.logger.error(f"Request failed downloading logs for job ID {target_job.get(\'id\', \'unknown\') if target_job else \'unknown\'}: {e}", exc_info=True)
return f"Error during log download request: {e}"
except zipfile.BadZipFile: except zipfile.BadZipFile:
logger.error(f"Failed to unzip logs for job ID {target_job['id']}. Content was: {response.text[:500]}...") # Log beginning of content self.logger.error(f"Failed to unzip logs for job ID {target_job.get(\'id\', \'unknown\') if target_job else \'unknown\'}.", exc_info=True)
# Adding response text for BadZipFile can be risky if it's large binary data.
# Consider logging only a snippet or specific headers if this occurs frequently.
return "Failed to unzip logs."
except Exception as e: except Exception as e:
logger.error(f"An unexpected error occurred while processing logs for job {target_job['id']}: {e}") self.logger.error(f"An unexpected error occurred while processing logs for job {target_job.get(\'id\', \'unknown\') if target_job else \'unknown\'}: {e}", exc_info=True)
return None return f"Unexpected error processing logs: {e}"
def parse_unittest_failures_from_log(self, log_content: str):
@metrics.measure
def parse_unittest_failures_from_log(self, log_content: str): # Added @metrics.measure
""" """
Parses unittest failure details from log content. Parses unittest failure details from log content.
This is a basic parser and might need adjustments based on specific log formats.
Args:
log_content (str): The string content of the job log.
Returns:
list: A list of strings, where each string is a block of a failed test's output.
""" """
if not log_content: if not log_content:
self.logger.info("Log content is empty, no failures to parse.")
return [] return []
failures = [] self.logger.info(f"Parsing unittest failures from log content (length: {len(log_content)}).")
# Regex to find the start of a failure/error block (e.g., "FAIL: test_something (module.TestClass)")
# and capture everything until the next blank line or the standard "Ran X tests in Ys" line. # Regex to capture standard unittest failure blocks
# This regex is complex and might need refinement. # It looks for "FAIL:" or "ERROR:", the test name (e.g., test_my_method), the class (e.g., my_module.MyTestClass),
# It looks for "FAIL:" or "ERROR:", captures the test name, then everything until "---" or "Ran x tests" # then a line of hyphens, "Traceback (most recent call last):", and the traceback details.
# It stops before the next failure block or common summary lines.
failure_pattern = re.compile( failure_pattern = re.compile(
r"^(FAIL|ERROR): ([^\n]+)\n(.*?)(?=\n-{70}\n|Ran \d+ tests in|^-{70}\nFAIL:|\nERROR:|\Z)", r"^(FAIL|ERROR): (.*?)\s*\\((.*?)\\)\s*\n-{5,}\\s*\nTraceback \\(most recent call last\\):\\s*\n(.*?)(?=\n(?:FAIL:|ERROR:)|\n-{5,}\\s*\nRan \\d+ tests? in|\\Z)",
re.DOTALL | re.MULTILINE re.DOTALL | re.MULTILINE
) )
matches = failure_pattern.finditer(log_content) failures = []
for match in matches: for match in failure_pattern.finditer(log_content):
failure_type = match.group(1) # FAIL or ERROR failure_type = match.group(1) # FAIL or ERROR
test_name_line = match.group(2).strip() # The line with the test name test_name = match.group(2).strip() # e.g., test_specific_behavior
details = match.group(3).strip() # The traceback and details test_module_class = match.group(3).strip() # e.g., tests.test_module.TestMyFeature
failures.append(f"{failure_type}: {test_name_line}\n{details}") traceback_details = match.group(4).strip() # The actual traceback
# Reconstruct a readable failure block
failure_block = (
f"{failure_type}: {test_name} ({test_module_class})\n"
f"---------------------\n"
f"Traceback (most recent call last):\n"
f"{traceback_details}"
)
failures.append(failure_block)
if failures:
self.logger.info(f"Parsed {len(failures)} failure blocks using primary regex.")
return failures
# Fallback: A more general pattern if the above doesn't match (e.g., due to slight variations in formatting)
# This looks for "FAIL:" or "ERROR:", a line for the test name, then captures content until common separators.
general_failure_pattern = re.compile(
r"^(FAIL|ERROR): ([^\n]+)\n(.*?)(?=\n(?:FAIL:|ERROR:)|\n-{20,}\n|Ran \\d+ tests? in|\\Z)",
re.DOTALL | re.MULTILINE
)
for match in general_failure_pattern.finditer(log_content):
failure_type = match.group(1)
test_header = match.group(2).strip()
details = match.group(3).strip()
full_block = f"{failure_type}: {test_header}\n{details}"
# Avoid adding essentially duplicate or overly broad captures if specific ones exist
if not any(f.startswith(f"{failure_type}: {test_header}") for f in failures):
failures.append(full_block)
if failures: # Check if fallback added anything
self.logger.info(f"Parsed {len(failures)} failure blocks using general fallback regex.")
return failures
# Last resort: if specific "FAILURES!!!" section is found, often this contains a summary.
# This might be too broad or not structured enough, but better than nothing.
if "FAILURES!!!" in log_content or "SUMMARY OF FAILURES" in log_content:
summary_marker = "FAILURES!!!" if "FAILURES!!!" in log_content else "SUMMARY OF FAILURES"
start_index = log_content.find(summary_marker)
if start_index != -1:
# Try to find a reasonable end for this summary block
end_pattern = re.compile(r"Ran \\d+ tests? in [\\d\\.]+s|\n-{70,}")
end_match = end_pattern.search(log_content, start_index)
end_index = end_match.start() if end_match else len(log_content)
failure_summary_block = log_content[start_index:end_index].strip()
if failure_summary_block:
failures.append(f"FAILURE SUMMARY BLOCK:\n{failure_summary_block}")
self.logger.info("Captured a general failure summary block.")
return failures
# Fallback or simpler pattern if the above is too greedy or misses things:
# Look for lines starting with "FAIL:" or "ERROR:" and the traceback sections.
# This is highly dependent on the exact output structure of `unittest`.
if not failures: if not failures:
# A simpler approach: find "Traceback (most recent call last):" self.logger.info("No specific unittest failure blocks parsed with available patterns.")
# and collect lines until a clear separator or end of section.
# This is less precise about associating the error with a specific test name from the header.
traceback_pattern = re.compile(r"Traceback \(most recent call last\):.*?\n(.*?\n)(?=\n[A-Z]+:|\n-{70}\n|Ran \d+ tests in|\Z)", re.DOTALL | re.MULTILINE)
tb_matches = traceback_pattern.finditer(log_content)
for i, tb_match in enumerate(tb_matches):
# Try to find a preceding FAIL/ERROR line if possible
# This part is tricky without more context from the log structure
context_before_tb = log_content[:tb_match.start()]
related_test_name = f"Unknown Test {i+1}"
last_fail_error_lines = re.findall(r"^(?:FAIL|ERROR): (.*)\s*$", context_before_tb, re.MULTILINE)
if last_fail_error_lines:
related_test_name = last_fail_error_lines[-1]
failures.append(f"FAILURE/ERROR (from Traceback {i+1} - Test: {related_test_name}):\n{tb_match.group(0).strip()}")
if not failures and "FAILURES" in log_content: # A very generic check
logger.warning("Found 'FAILURES' in log but couldn't parse specific test blocks. Returning raw log segment if possible.")
# Try to return a segment of the log if it seems to contain failures but parsing failed.
fail_summary_match = re.search(r"FAILURES\s*={70,}", log_content, re.MULTILINE)
if fail_summary_match:
failures.append(log_content[fail_summary_match.start():])
return failures return failures
# --- Example Usage (Illustrative) --- # --- Example Usage (Illustrative) ---
if __name__ == "__main__": if __name__ == "__main__":
# This example assumes you have GITHUB_TOKEN environment variable set # This example assumes you have GITHUB_TOKEN environment variable set
# And that 'requests' is installed. # And that \'requests\' is installed.
# Replace with your actual repo owner, name, and PR number. # Replace with your actual repo owner, name, and PR number.
# PR #206 from user's previous message. pr_number = 206 # Example PR
pr_number = 206 repo_owner = "bucolucas" # Example owner
repo_owner = "bucolucas" repo_name = "cyclop" # Example repo
repo_name = "cyclop"
# Setup basic logging for the example # Setup basic logging for the example
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') # In a real app, logger would be configured externally
logging.basicConfig(level=logging.INFO, format=\'%(asctime)s - %(name)s - %(levelname)s - %(message)s\')
example_logger = logging.getLogger("GitHubCIHelperExample")
helper = GitHubCIHelper(repo_owner, repo_name)
logger.info(f"Looking for failed runs for PR #{pr_number} in {repo_owner}/{repo_name}") # Pass the logger to the helper
# Assuming your workflow file is named "Python CI.yml" which results in "Python CI" display name. helper = GitHubCIHelper(repo_owner, repo_name, logger_instance=example_logger)
# And the job name within that workflow is "test".
example_logger.info(f"Looking for failed runs for PR #{pr_number} in {repo_owner}/{repo_name}")
failed_run = helper.get_latest_failed_run_for_pr(pull_request_number=pr_number, workflow_name="Python CI") failed_run = helper.get_latest_failed_run_for_pr(pull_request_number=pr_number, workflow_name="Python CI")
if failed_run: if failed_run:
logger.info(f"Found failed run: ID {failed_run['id']}, Status {failed_run['conclusion']}") example_logger.info(f"Found failed run: ID {failed_run[\'id\']}, Status {failed_run[\'conclusion\']}")
logger.info(f"Attempting to download logs for job 'test' in run {failed_run['id']}...") example_logger.info(f"Attempting to download logs for job \'test\' in run {failed_run[\'id\']}...")
# The job name 'test' comes from the workflow file: log_content = helper.get_job_logs_for_run(run_id=failed_run[\'id\'], job_name="test")
# jobs:
# test: <-- this is the job name
# runs-on: self-hosted
log_content = helper.get_job_logs_for_run(run_id=failed_run['id'], job_name="test")
if log_content: if isinstance(log_content, str) and not log_content.startswith("Error") and not log_content.startswith("Job") and not log_content.startswith("Failed"):
logger.info(f"Successfully downloaded logs (length: {len(log_content)} characters).") example_logger.info(f"Successfully downloaded logs (length: {len(log_content)} characters).")
# print("\n--- Full Log Content (first 1000 chars) ---")
# print(log_content[:1000])
# print("\n--- End of Log Snippet ---")
logger.info("\n--- Parsing unittest failures ---") example_logger.info("\n--- Parsing unittest failures ---")
failures = helper.parse_unittest_failures_from_log(log_content) failures = helper.parse_unittest_failures_from_log(log_content)
if failures: if failures:
for i, failure_details in enumerate(failures): for i, failure_details in enumerate(failures):
print(f"\nFailure {i+1}:\n{failure_details}") print(f"\nFailure {i+1}:\n{failure_details}")
else: else:
print("No specific unittest failures parsed by the tool, or tests might have passed within the job despite job failure.") print("No specific unittest failures parsed by the tool.")
print("This can happen if the job fails for reasons other than Python test failures (e.g., setup error, script error).") # Consider logging the beginning of the log if parsing fails, for debugging the regexes
print("Or, the log parsing regex might need adjustment for your specific unittest output format.") # print(f"Log start:\n{log_content[:2000]}")
elif log_content is None:
example_logger.error("Could not retrieve log content (returned None).")
else: # If it\'s an error message string from the function itself
example_logger.error(f"Failed to get/process logs: {log_content}")
else: else:
logger.error("Could not retrieve or process log content.") example_logger.info(f"No failed \'Python CI\' workflow run found for PR #{pr_number} or the PR doesn\'t exist/no runs yet.")
else:
logger.info(f"No failed 'Python CI' workflow run found for PR #{pr_number} or the PR doesn't exist/no runs yet.")
# Example of how to use if you know the run ID directly:
# known_run_id = 123456789 # replace with an actual run_id
# log_content = helper.get_job_logs_for_run(known_run_id, job_name="test")
# if log_content:
# failures = helper.parse_unittest_failures_from_log(log_content)
# ...