# Source file: cyclop/tools/github_ci_tool.py (355 lines, 19 KiB, Python)

import requests
import os
import zipfile
import io
import re
import logging
from .base_tool import BaseTool
# Logging is configured per instance in GitHubCIHelper.__init__; this module
# intentionally defines no module-level logger.
class GitHubCIHelper(BaseTool):
    """
    A helper class to interact with GitHub Actions CI,
    specifically for fetching and analyzing test logs.

    All API calls target ``https://api.github.com/repos/<owner>/<repo>`` and
    are sent through ``self.session`` with ``self.headers``. The public
    methods listed by :meth:`get_functions` can be invoked by name through
    :meth:`execute`.
    """

    # The patterns below are compiled once at class creation instead of on
    # every parse call.

    # A standard unittest failure block:
    #   FAIL: test_name (module.Class)
    #   ----------------------------------------------------------------------
    #   Traceback (most recent call last):
    #   <traceback lines>
    # The traceback capture stops before the next FAIL/ERROR header (and the
    # "=====" separator line that precedes it), before the "Ran N tests in"
    # summary, or at the end of the log.
    _FAILURE_BLOCK_RE = re.compile(
        r"^(FAIL|ERROR): (.*?)\s*\((.*?)\)\s*\n-{5,}\s*\n"
        r"Traceback \(most recent call last\):\s*\n(.*?)"
        r"(?=\n(?:={5,}\s*\n)?(?:FAIL:|ERROR:)|\n-{5,}\s*\nRan \d+ tests? in|\Z)",
        re.DOTALL | re.MULTILINE,
    )

    # Looser fallback: a FAIL/ERROR header line followed by anything up to a
    # common separator.
    _GENERAL_FAILURE_RE = re.compile(
        r"^(FAIL|ERROR): ([^\n]+)\n(.*?)"
        r"(?=\n(?:={5,}\s*\n)?(?:FAIL:|ERROR:)|\n-{20,}\n|Ran \d+ tests? in|\Z)",
        re.DOTALL | re.MULTILINE,
    )

    # End marker for a "FAILURES!!!" / "SUMMARY OF FAILURES" section.
    _SUMMARY_END_RE = re.compile(r"Ran \d+ tests? in [\d.]+s|\n-{70,}")

    def __init__(self, repo_owner: str = None, repo_name: str = None, github_token: str = None, session=None, logger_instance=None):
        """
        Set up the logger, target repository, auth headers and HTTP session.

        Args:
            repo_owner: Repository owner. Used only together with ``repo_name``.
            repo_name: Repository name. Used only together with ``repo_owner``.
            github_token: API token; falls back to the ``GITHUB_TOKEN``
                environment variable when not given.
            session: Object used to issue HTTP requests; a new
                ``requests.Session`` is created when omitted.
            logger_instance: Logger to use instead of the default one.

        Raises:
            ValueError: If owner/name are not both provided and
                ``GITHUB_REPOSITORY`` is unset or not in ``owner/repo`` form.
        """
        # Logger setup: prefer logger_instance, then a logger already present
        # on the instance (e.g. set by BaseTool), then a default one.
        if logger_instance:
            self.logger = logger_instance
        elif not hasattr(self, 'logger') or self.logger is None:
            self.logger = logging.getLogger(__name__ + '.' + self.__class__.__name__)
            # Ensure logger has handlers to prevent "No handlers could be found"
            if not self.logger.handlers:
                self.logger.addHandler(logging.NullHandler())

        # Repo owner and name handling
        if repo_owner and repo_name:
            self.repo_owner = repo_owner
            self.repo_name = repo_name
        else:
            github_repository = os.environ.get("GITHUB_REPOSITORY")
            if not github_repository or '/' not in github_repository:
                self.logger.error("GITHUB_REPOSITORY environment variable not set or in incorrect format (expected 'owner/repo') and repo_owner/repo_name not provided during __init__.")
                raise ValueError("GITHUB_REPOSITORY environment variable not set or in incorrect format (expected 'owner/repo') and repo_owner/repo_name not provided.")
            self.repo_owner, self.repo_name = github_repository.split('/', 1)
            self.logger.info(f"Initialized repo_owner/repo_name from GITHUB_REPOSITORY: {self.repo_owner}/{self.repo_name}")

        self.base_url = f"https://api.github.com/repos/{self.repo_owner}/{self.repo_name}"

        self._token = github_token or os.environ.get("GITHUB_TOKEN")
        if not self._token:
            self.logger.warning("GitHub token not provided directly or via GITHUB_TOKEN. API requests may be rate-limited or fail for private resources.")

        self.headers = {
            "Accept": "application/vnd.github.v3+json"
        }
        if self._token:
            self.headers["Authorization"] = f"token {self._token}"

        self.session = session if session else requests.Session()
        self.logger.info(f"GitHubCIHelper initialized for {self.repo_owner}/{self.repo_name}.")

    def get_functions(self):
        """Return the function-call schemas describing this tool's public methods."""
        return [
            {
                "type": "function",
                "function": {
                    "name": "get_pr_workflow_runs",
                    "description": "Gets all workflow runs associated with a specific pull request.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "pull_request_number": {"type": "integer", "description": "The number of the pull request."}
                        },
                        "required": ["pull_request_number"]
                    }
                },
                "_tags": ["read"]
            },
            {
                "type": "function",
                "function": {
                    "name": "get_latest_failed_run_for_pr",
                    "description": "Gets the latest failed workflow run for a specific pull request and workflow name.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "pull_request_number": {"type": "integer", "description": "The number of the pull request."},
                            "workflow_name": {"type": "string", "description": "The display name of the workflow (e.g., '''Python CI''').", "default": "Python CI"}
                        },
                        "required": ["pull_request_number"]
                    }
                },
                "_tags": ["read"]
            },
            {
                "type": "function",
                "function": {
                    "name": "get_job_logs_for_run",
                    "description": "Downloads and returns the logs for a specific job within a workflow run.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "run_id": {"type": "integer", "description": "The ID of the workflow run."},
                            "job_name": {"type": "string", "description": "The name of the job (e.g., '''test''').", "default": "test"}
                        },
                        "required": ["run_id"]
                    }
                },
                "_tags": ["read"]
            },
            {
                "type": "function",
                "function": {
                    "name": "parse_unittest_failures_from_log",
                    "description": "Parses unittest failure details from log content. This is a basic parser and might need adjustments based on specific log formats.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "log_content": {"type": "string", "description": "The string content of the job log."}
                        },
                        "required": ["log_content"]
                    }
                },
                "_tags": ["read"]
            }
        ]

    def execute(self, function_name, **kwargs):
        """
        Dispatch ``function_name`` to the public method of the same name.

        Returns:
            The method's return value, or an error string when the name is
            unknown/private or the method raises.
        """
        self.logger.info(f"Executing GitHub CI Helper function: {function_name} with args: {kwargs}")

        method = None
        # Names starting with "_" are never dispatched; a non-string name is
        # reported as unknown instead of raising.
        if isinstance(function_name, str) and not function_name.startswith("_"):
            method = getattr(self, function_name, None)

        if not callable(method):
            error_message = f"Unknown or private function: {function_name}"
            self.logger.error(error_message)
            return error_message

        try:
            return method(**kwargs)
        except Exception as e:
            # Tool boundary: report the failure as text rather than propagate.
            self.logger.error(f"Error executing {function_name}: {e}", exc_info=True)
            return f"Error during {function_name} execution: {str(e)}"

    def clear(self):
        """Clears any sensitive state if necessary. For this tool, it's a no-op but present for interface consistency."""
        self.logger.info("GitHubCIHelper state cleared (no specific state to clear).")

    @staticmethod
    def _payload_as_dict(payload):
        """Normalize a ``_make_request`` result: pass through ``None``/dict, else call ``.json()``."""
        if payload is None or isinstance(payload, dict):
            return payload
        return payload.json()

    def _make_request(self, method: str, url: str, **kwargs):
        """
        Helper function for making HTTP requests.

        Returns:
            Parsed JSON when the response has content and a JSON content type,
            the raw response object for other non-empty content (zip files,
            plain text logs), or ``None`` for an empty body.

        Raises:
            requests.exceptions.RequestException: Re-raised after logging
                (includes ``HTTPError`` from ``raise_for_status``).
        """
        # NOTE: no timeout is applied here; pass ``timeout=...`` via kwargs if needed.
        try:
            response = self.session.request(method, url, headers=self.headers, **kwargs)
            response.raise_for_status()
            if response.content and response.headers.get("Content-Type", "").startswith("application/json"):
                return response.json()
            elif response.content:  # For non-JSON content like zip files or plain text logs
                return response
            return None
        except requests.exceptions.HTTPError as e:
            # A Response is falsy for 4xx/5xx statuses, so compare against None
            # explicitly or the error body would never be logged.
            body = e.response.text if e.response is not None else 'No response text'
            self.logger.error(f"HTTP error occurred: {e} - {body}")
            raise
        except requests.exceptions.RequestException as e:
            self.logger.error(f"Request failed: {e}")
            raise

    def get_pr_workflow_runs(self, pull_request_number: int):
        """
        Gets all workflow runs associated with a specific pull request.

        Looks up the pull request's head SHA, then lists ``pull_request``
        event runs for that SHA.

        Returns:
            The ``workflow_runs`` value from the API response, or ``None`` when
            the head SHA or runs cannot be retrieved (errors are logged).
        """
        self.logger.info(f"Getting workflow runs for PR #{pull_request_number}")
        pr_url = f"{self.base_url}/pulls/{pull_request_number}"
        try:
            pr_data = self._payload_as_dict(self._make_request("GET", pr_url))
            if not pr_data or "head" not in pr_data or "sha" not in pr_data["head"]:
                self.logger.error(f"Could not get head SHA for PR {pull_request_number}. Response: {pr_data}")
                return None
            head_sha = pr_data["head"]["sha"]

            runs_url = f"{self.base_url}/actions/runs?event=pull_request&head_sha={head_sha}"
            runs_data = self._payload_as_dict(self._make_request("GET", runs_url))
            return runs_data.get("workflow_runs") if runs_data else None
        except Exception as e:
            self.logger.error(f"Failed to get PR workflow runs for PR {pull_request_number}: {e}", exc_info=True)
            return None

    def get_latest_failed_run_for_pr(self, pull_request_number: int, workflow_name: str = "Python CI"):
        """
        Gets the latest failed workflow run for a specific pull request and workflow name.

        Returns:
            The most recently created run whose ``name`` equals
            ``workflow_name`` and whose ``conclusion`` is ``"failure"``, or
            ``None`` when there is no such run.
        """
        self.logger.info(f"Getting latest failed run for PR #{pull_request_number}, workflow: '{workflow_name}'")
        runs = self.get_pr_workflow_runs(pull_request_number)
        if not runs:
            self.logger.info(f"No runs found for PR #{pull_request_number} to check for failures.")
            return None

        # Newest first, so the first match is the latest failure.
        for run in sorted(runs, key=lambda r: r["created_at"], reverse=True):
            if run["name"] == workflow_name and run["conclusion"] == "failure":
                self.logger.info(f"Found failed run {run['id']} for workflow '{workflow_name}' in PR #{pull_request_number}")
                return run

        self.logger.info(f"No failed run for workflow '{workflow_name}' found for PR #{pull_request_number}")
        return None

    def _extract_log_from_zip(self, zip_bytes, job_name, job_id):
        """Pick the most likely log member of a zip archive and return its text (``None`` if the archive has no files)."""
        with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
            log_file_names = [name for name in zf.namelist() if not name.endswith('/')]
            if not log_file_names:
                self.logger.error(f"No files found in the downloaded log zip for job ID {job_id}.")
                return None

            # Heuristic: first member whose name looks log-like, else the first file.
            actual_log_file_name = next(
                (
                    name for name in log_file_names
                    if job_name in name or "test" in name.lower() or "log" in name.lower() or "out" in name.lower() or "step" in name
                ),
                log_file_names[0],
            )
            self.logger.info(f"Extracting log file: {actual_log_file_name} from zip for job ID {job_id}.")
            with zf.open(actual_log_file_name) as log_file:
                # Replace undecodable bytes instead of failing the whole download.
                return log_file.read().decode("utf-8", errors="replace")

    def get_job_logs_for_run(self, run_id: int, job_name: str = "test"):
        """
        Downloads and returns the logs for a specific job within a workflow run.

        Returns:
            The log text on success; ``None`` when the jobs list, the named job
            or the log file cannot be found; or a descriptive message string
            when the job has not completed or the download/unzip fails.
        """
        self.logger.info(f"Getting job logs for run ID {run_id}, job name '{job_name}'")
        jobs_url = f"{self.base_url}/actions/runs/{run_id}/jobs"
        target_job = None
        job_label = 'unknown'  # Used in error messages before/after the job is resolved
        try:
            jobs_data = self._payload_as_dict(self._make_request("GET", jobs_url))
            if not jobs_data or "jobs" not in jobs_data:
                self.logger.error(f"Could not retrieve jobs for run ID {run_id}. Response: {jobs_data}")
                return None

            target_job = next((job for job in jobs_data["jobs"] if job["name"] == job_name), None)
            if not target_job:
                self.logger.error(f"Job '{job_name}' not found in run ID {run_id}")
                return None
            job_label = target_job.get('id', 'unknown')

            if target_job["status"] != "completed":
                self.logger.info(f"Job '{job_name}' in run ID {run_id} has not completed. Status: {target_job['status']}")
                return f"Job '{job_name}' not yet completed (status: {target_job['status']}). Logs may be unavailable."

            logs_url = f"{self.base_url}/actions/jobs/{target_job['id']}/logs"
            self.logger.info(f"Attempting to download logs from: {logs_url}")
            log_response = self.session.get(logs_url, headers=self.headers, allow_redirects=True, stream=True)
            log_response.raise_for_status()

            if 'application/zip' in log_response.headers.get('Content-Type', ''):
                self.logger.info(f"Received zip file for logs of job ID {target_job['id']}.")
                return self._extract_log_from_zip(log_response.content, job_name, target_job['id'])

            self.logger.info(f"Received plain text logs for job ID {target_job['id']}.")
            return log_response.text
        except requests.exceptions.HTTPError as e:
            # A Response is falsy for 4xx/5xx statuses; test against None so the
            # body and the 404 hint are actually reported.
            has_response = e.response is not None
            body = e.response.text if has_response else 'No response text'
            self.logger.error(f"HTTP error downloading logs for job ID {job_label}: {e} - {body}", exc_info=True)
            if has_response and e.response.status_code == 404:
                self.logger.error("Log download URL might be invalid or logs expired.")
            return f"Error downloading logs: {e}"
        except requests.exceptions.RequestException as e:
            self.logger.error(f"Request failed downloading logs for job ID {job_label}: {e}", exc_info=True)
            return f"Error during log download request: {e}"
        except zipfile.BadZipFile:
            # The response body is deliberately not logged: it may be large binary data.
            self.logger.error(f"Failed to unzip logs for job ID {job_label}.", exc_info=True)
            return "Failed to unzip logs."
        except Exception as e:
            self.logger.error(f"An unexpected error occurred while processing logs for job {job_label}: {e}", exc_info=True)
            return f"Unexpected error processing logs: {e}"

    def parse_unittest_failures_from_log(self, log_content: str):
        """
        Parses unittest failure details from log content.

        Three strategies are tried in order and the first that yields results
        wins: standard ``FAIL:``/``ERROR:`` blocks with a traceback, a looser
        header-plus-body pattern, and finally a raw ``FAILURES!!!`` /
        ``SUMMARY OF FAILURES`` section.

        Returns:
            A list of failure-block strings; empty when nothing was recognized
            or ``log_content`` is empty.
        """
        if not log_content:
            self.logger.info("Log content is empty, no failures to parse.")
            return []

        self.logger.info(f"Parsing unittest failures from log content (length: {len(log_content)}).")

        failures = []
        for match in self._FAILURE_BLOCK_RE.finditer(log_content):
            failure_type = match.group(1)                # FAIL or ERROR
            test_name = match.group(2).strip()           # e.g., test_specific_behavior
            test_module_class = match.group(3).strip()   # e.g., tests.test_module.TestMyFeature
            traceback_details = match.group(4).strip()   # The actual traceback
            # Reconstruct a readable failure block
            failures.append(
                f"{failure_type}: {test_name} ({test_module_class})\n"
                f"---------------------\n"
                f"Traceback (most recent call last):\n"
                f"{traceback_details}"
            )
        if failures:
            self.logger.info(f"Parsed {len(failures)} failure blocks using primary regex.")
            return failures

        # Fallback for slight formatting variations.
        for match in self._GENERAL_FAILURE_RE.finditer(log_content):
            failure_type = match.group(1)
            test_header = match.group(2).strip()
            details = match.group(3).strip()
            # Skip blocks whose header was already captured.
            if not any(f.startswith(f"{failure_type}: {test_header}") for f in failures):
                failures.append(f"{failure_type}: {test_header}\n{details}")
        if failures:
            self.logger.info(f"Parsed {len(failures)} failure blocks using general fallback regex.")
            return failures

        # Last resort: capture a summary section verbatim. This may be broad or
        # unstructured, but is better than returning nothing.
        if "FAILURES!!!" in log_content or "SUMMARY OF FAILURES" in log_content:
            summary_marker = "FAILURES!!!" if "FAILURES!!!" in log_content else "SUMMARY OF FAILURES"
            start_index = log_content.find(summary_marker)
            if start_index != -1:
                end_match = self._SUMMARY_END_RE.search(log_content, start_index)
                end_index = end_match.start() if end_match else len(log_content)
                failure_summary_block = log_content[start_index:end_index].strip()
                if failure_summary_block:
                    failures.append(f"FAILURE SUMMARY BLOCK:\n{failure_summary_block}")
                    self.logger.info("Captured a general failure summary block.")
                    return failures

        if not failures:
            self.logger.info("No specific unittest failure blocks parsed with available patterns.")
        return failures