From d9bc69b5da0e33df974477336b0800c95682b775 Mon Sep 17 00:00:00 2001 From: cyclop-bot <178948048+cyclop-bot@users.noreply.github.com> Date: Mon, 2 Jun 2025 18:35:11 -0500 Subject: [PATCH 1/3] feat: Add tool definition and BaseTool integration to GitHubCIHelper Integrates GitHubCIHelper with the BaseTool framework by: - Adding get_functions() to define tool's capabilities in JSON format. - Adding an execute() method for dispatching calls to its public methods. - Updating __init__ for logger and session dependency injection. - Applying @metrics.measure decorator to public methods. - Enhancing logging, error handling, and regex for log parsing. --- tools/github_ci_tool.py | 297 +--------------------------------------- 1 file changed, 1 insertion(+), 296 deletions(-) diff --git a/tools/github_ci_tool.py b/tools/github_ci_tool.py index d63bbbf..e1154e4 100644 --- a/tools/github_ci_tool.py +++ b/tools/github_ci_tool.py @@ -1,296 +1 @@ -import requests -import os -import zipfile -import io -import re -import logging - -# Configure logging for the tool -logger = logging.getLogger(__name__) - -class GitHubCIHelper: - """ - A helper class to interact with GitHub Actions CI, - specifically for fetching and analyzing test logs. - """ - def __init__(self, repo_owner: str, repo_name: str, github_token: str = None): - """ - Initializes the GitHubCIHelper. - - Args: - repo_owner (str): The owner of the GitHub repository (e.g., '''bucolucas'''). - repo_name (str): The name of the GitHub repository (e.g., '''cyclop'''). - github_token (str, optional): A GitHub Personal Access Token (PAT) - for API authentication. Recommended for - private repos or higher rate limits. - Can also be set via GITHUB_TOKEN env var. - """ - self.repo_owner = repo_owner - self.repo_name = repo_name - self.base_url = f"https://api.github.com/repos/{self.repo_owner}/{self.repo_name}" - self.github_token = github_token or os.environ.get('GITHUB_TOKEN') - self.headers = { - "Accept": "application/vnd.github.v3+json" - } - if self.github_token: - self.headers["Authorization"] = f"token {self.github_token}" - - def _make_request(self, method: str, url: str, **kwargs): - """Helper function for making HTTP requests.""" - try: - response = requests.request(method, url, headers=self.headers, **kwargs) - response.raise_for_status() # Raise an exception for bad status codes - if response.content: - return response.json() - return None - except requests.exceptions.HTTPError as e: - logger.error(f"HTTP error occurred: {e} - {e.response.text}") - raise - except requests.exceptions.RequestException as e: - logger.error(f"Request failed: {e}") - raise - - def get_pr_workflow_runs(self, pull_request_number: int): - """ - Gets all workflow runs associated with a specific pull request. - - Args: - pull_request_number (int): The number of the pull request. - - Returns: - list: A list of workflow runs, or None if an error occurs. - """ - # First, get the head SHA of the PR - pr_url = f"{self.base_url}/pulls/{pull_request_number}" - pr_data = self._make_request("GET", pr_url) - if not pr_data or 'head' not in pr_data or 'sha' not in pr_data['head']: - logger.error(f"Could not get head SHA for PR {pull_request_number}") - return None - head_sha = pr_data['head']['sha'] - - # Then, get workflow runs for that commit SHA - runs_url = f"{self.base_url}/actions/runs?event=pull_request&head_sha={head_sha}" - runs_data = self._make_request("GET", runs_url) - return runs_data.get("workflow_runs") if runs_data else None - - def get_latest_failed_run_for_pr(self, pull_request_number: int, workflow_name: str = "Python CI"): - """ - Gets the latest failed workflow run for a specific pull request and workflow name. - - Args: - pull_request_number (int): The number of the pull request. - workflow_name (str): The display name of the workflow (e.g., "Python CI"). - - Returns: - dict: The workflow run object if a failed run is found, otherwise None. - """ - runs = self.get_pr_workflow_runs(pull_request_number) - if not runs: - return None - - for run in sorted(runs, key=lambda r: r['created_at'], reverse=True): - if run['name'] == workflow_name and run['conclusion'] == 'failure': - return run - logger.info(f"No failed run for workflow '{workflow_name}' found for PR {pull_request_number}") - return None - - def get_job_logs_for_run(self, run_id: int, job_name: str = "test"): - """ - Downloads and returns the logs for a specific job within a workflow run. - - Args: - run_id (int): The ID of the workflow run. - job_name (str): The name of the job (e.g., "test"). - - Returns: - str: The job logs as a string, or None if an error occurs or job not found. - """ - jobs_url = f"{self.base_url}/actions/runs/{run_id}/jobs" - jobs_data = self._make_request("GET", jobs_url) - - if not jobs_data or "jobs" not in jobs_data: - logger.error(f"Could not retrieve jobs for run ID {run_id}") - return None - - target_job = None - for job in jobs_data["jobs"]: - if job["name"] == job_name: - target_job = job - break - - if not target_job: - logger.error(f"Job '{job_name}' not found in run ID {run_id}") - return None - - if target_job['status'] != 'completed': - logger.info(f"Job '{job_name}' in run ID {run_id} has not completed. Status: {target_job['status']}") - # You might want to handle this differently, e.g., wait or return specific status - return None - - - logs_url = f"{self.base_url}/actions/jobs/{target_job['id']}/logs" - logger.info(f"Attempting to download logs from: {logs_url}") - - try: - # Logs are often zipped, GitHub API redirects to a download URL for the zip - response = requests.get(logs_url, headers=self.headers, allow_redirects=True, stream=True) - response.raise_for_status() - - # Check if the content is a zip file - if 'application/zip' in response.headers.get('Content-Type', '''): - with zipfile.ZipFile(io.BytesIO(response.content)) as zf: - # Assuming there's one log file in the zip, or a specific named one - # List files in zip to find the actual log file name - log_file_names = [name for name in zf.namelist() if not name.endswith('/')] # Exclude directories - if not log_file_names: - logger.error(f"No files found in the downloaded log zip for job ID {target_job['id']}.") - return None - - # Heuristic: try to find the most relevant log file. - # Often it's just job_name.txt or contains 'stdout'. - # For a single file zip, this is simple. - actual_log_file_name = log_file_names[0] # Default to first file - for name in log_file_names: - if job_name in name or "step" in name: # Or other heuristic - actual_log_file_name = name - break - - logger.info(f"Extracting log file: {actual_log_file_name} from zip.") - with zf.open(actual_log_file_name) as log_file: - return log_file.read().decode('utf-8') - else: - # If not a zip, assume it's plain text (less common for full logs now) - return response.text - - except requests.exceptions.HTTPError as e: - logger.error(f"HTTP error downloading logs for job ID {target_job['id']}: {e} - {e.response.text}") - # GitHub might return 404 if logs expired or job ID is wrong - if e.response.status_code == 404: - logger.error("Log download URL might be invalid or logs expired.") - except requests.exceptions.RequestException as e: - logger.error(f"Request failed downloading logs for job ID {target_job['id']}: {e}") - except zipfile.BadZipFile: - logger.error(f"Failed to unzip logs for job ID {target_job['id']}. Content was: {response.text[:500]}...") # Log beginning of content - except Exception as e: - logger.error(f"An unexpected error occurred while processing logs for job {target_job['id']}: {e}") - return None - - def parse_unittest_failures_from_log(self, log_content: str): - """ - Parses unittest failure details from log content. - This is a basic parser and might need adjustments based on specific log formats. - - Args: - log_content (str): The string content of the job log. - - Returns: - list: A list of strings, where each string is a block of a failed test's output. - """ - if not log_content: - return [] - - failures = [] - # Regex to find the start of a failure/error block (e.g., "FAIL: test_something (module.TestClass)") - # and capture everything until the next blank line or the standard "Ran X tests in Ys" line. - # This regex is complex and might need refinement. - # It looks for "FAIL:" or "ERROR:", captures the test name, then everything until "---" or "Ran x tests" - failure_pattern = re.compile( - r"^(FAIL|ERROR): ([^\n]+)\n(.*?)(?=\n-{70}\n|Ran \d+ tests in|^-{70}\nFAIL:|\nERROR:|\Z)", - re.DOTALL | re.MULTILINE - ) - - matches = failure_pattern.finditer(log_content) - for match in matches: - failure_type = match.group(1) # FAIL or ERROR - test_name_line = match.group(2).strip() # The line with the test name - details = match.group(3).strip() # The traceback and details - failures.append(f"{failure_type}: {test_name_line}\n{details}") - - # Fallback or simpler pattern if the above is too greedy or misses things: - # Look for lines starting with "FAIL:" or "ERROR:" and the traceback sections. - # This is highly dependent on the exact output structure of `unittest`. - if not failures: - # A simpler approach: find "Traceback (most recent call last):" - # and collect lines until a clear separator or end of section. - # This is less precise about associating the error with a specific test name from the header. - traceback_pattern = re.compile(r"Traceback \(most recent call last\):.*?\n(.*?\n)(?=\n[A-Z]+:|\n-{70}\n|Ran \d+ tests in|\Z)", re.DOTALL | re.MULTILINE) - tb_matches = traceback_pattern.finditer(log_content) - for i, tb_match in enumerate(tb_matches): - # Try to find a preceding FAIL/ERROR line if possible - # This part is tricky without more context from the log structure - context_before_tb = log_content[:tb_match.start()] - related_test_name = f"Unknown Test {i+1}" - last_fail_error_lines = re.findall(r"^(?:FAIL|ERROR): (.*)\s*$", context_before_tb, re.MULTILINE) - if last_fail_error_lines: - related_test_name = last_fail_error_lines[-1] - - failures.append(f"FAILURE/ERROR (from Traceback {i+1} - Test: {related_test_name}):\n{tb_match.group(0).strip()}") - - - if not failures and "FAILURES" in log_content: # A very generic check - logger.warning("Found 'FAILURES' in log but couldn't parse specific test blocks. Returning raw log segment if possible.") - # Try to return a segment of the log if it seems to contain failures but parsing failed. - fail_summary_match = re.search(r"FAILURES\s*={70,}", log_content, re.MULTILINE) - if fail_summary_match: - failures.append(log_content[fail_summary_match.start():]) - - - return failures - -# --- Example Usage (Illustrative) --- -if __name__ == "__main__": - # This example assumes you have GITHUB_TOKEN environment variable set - # And that 'requests' is installed. - # Replace with your actual repo owner, name, and PR number. - # PR #206 from user's previous message. - pr_number = 206 - repo_owner = "bucolucas" - repo_name = "cyclop" - - # Setup basic logging for the example - logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') - - helper = GitHubCIHelper(repo_owner, repo_name) - - logger.info(f"Looking for failed runs for PR #{pr_number} in {repo_owner}/{repo_name}") - # Assuming your workflow file is named "Python CI.yml" which results in "Python CI" display name. - # And the job name within that workflow is "test". - failed_run = helper.get_latest_failed_run_for_pr(pull_request_number=pr_number, workflow_name="Python CI") - - if failed_run: - logger.info(f"Found failed run: ID {failed_run['id']}, Status {failed_run['conclusion']}") - logger.info(f"Attempting to download logs for job 'test' in run {failed_run['id']}...") - - # The job name 'test' comes from the workflow file: - # jobs: - # test: <-- this is the job name - # runs-on: self-hosted - log_content = helper.get_job_logs_for_run(run_id=failed_run['id'], job_name="test") - - if log_content: - logger.info(f"Successfully downloaded logs (length: {len(log_content)} characters).") - # print("\n--- Full Log Content (first 1000 chars) ---") - # print(log_content[:1000]) - # print("\n--- End of Log Snippet ---") - - logger.info("\n--- Parsing unittest failures ---") - failures = helper.parse_unittest_failures_from_log(log_content) - if failures: - for i, failure_details in enumerate(failures): - print(f"\nFailure {i+1}:\n{failure_details}") - else: - print("No specific unittest failures parsed by the tool, or tests might have passed within the job despite job failure.") - print("This can happen if the job fails for reasons other than Python test failures (e.g., setup error, script error).") - print("Or, the log parsing regex might need adjustment for your specific unittest output format.") - - else: - logger.error("Could not retrieve or process log content.") - else: - logger.info(f"No failed 'Python CI' workflow run found for PR #{pr_number} or the PR doesn't exist/no runs yet.") - - # Example of how to use if you know the run ID directly: - # known_run_id = 123456789 # replace with an actual run_id - # log_content = helper.get_job_logs_for_run(known_run_id, job_name="test") - # if log_content: - # failures = helper.parse_unittest_failures_from_log(log_content) - # ... +import requests\nimport os\nimport zipfile\nimport io\nimport re\nimport logging\nfrom .base_tool import BaseTool # Added\nfrom .metrics import metrics # Added\n\n# Configure logging for the tool - This will be handled by the logger instance now\n# logger = logging.getLogger(__name__) # Commented out or removed\n\nclass GitHubCIHelper(BaseTool): # Inherits from BaseTool\n """\n A helper class to interact with GitHub Actions CI,\n specifically for fetching and analyzing test logs.\n """\n def __init__(self, repo_owner: str, repo_name: str, github_token: str = None, session=None, logger_instance=None): # Added session and logger_instance\n """\n Initializes the GitHubCIHelper.\n\n Args:\n repo_owner (str): The owner of the GitHub repository (e.g., \'\'\'bucolucas\'\'\').\n repo_name (str): The name of the GitHub repository (e.g., \'\'\'cyclop\'\'\').\n github_token (str, optional): A GitHub Personal Access Token (PAT)\n for API authentication. Recommended for\n private repos or higher rate limits.\n Can also be set via GITHUB_TOKEN env var.\n session (requests.Session, optional): An external requests session to use.\n logger_instance (logging.Logger, optional): An external logger instance.\n """\n self.repo_owner = repo_owner\n self.repo_name = repo_name\n self.base_url = f"https://api.github.com/repos/{self.repo_owner}/{self.repo_name}"\n self._token = github_token or os.environ.get(\'GITHUB_TOKEN\') # Renamed to _token for consistency\n \n self.headers = {\n "Accept": "application/vnd.github.v3+json"\n }\n if self._token: # Use self._token\n self.headers["Authorization"] = f"token {self._token}"\n\n if session:\n self.session = session\n else:\n self.session = requests.Session()\n # Headers are applied per-request in _make_request or directly where self.session is used if needed globally for session\n\n self.logger = logger_instance if logger_instance else logging.getLogger(__name__)\n if not self.logger.handlers:\n self.logger.addHandler(logging.NullHandler())\n\n def get_functions(self):\n return [\n {\n "type": "function",\n "function": {\n "name": "get_pr_workflow_runs",\n "description": "Gets all workflow runs associated with a specific pull request.",\n "parameters": {\n "type": "object",\n "properties": {\n "pull_request_number": {"type": "integer", "description": "The number of the pull request."}\n },\n "required": ["pull_request_number"]\n }\n }\n },\n {\n "type": "function",\n "function": {\n "name": "get_latest_failed_run_for_pr",\n "description": "Gets the latest failed workflow run for a specific pull request and workflow name.",\n "parameters": {\n "type": "object",\n "properties": {\n "pull_request_number": {"type": "integer", "description": "The number of the pull request."},\n "workflow_name": {"type": "string", "description": "The display name of the workflow (e.g., \'\'\'Python CI\'\'\').", "default": "Python CI"}\n },\n "required": ["pull_request_number"]\n }\n }\n },\n {\n "type": "function",\n "function": {\n "name": "get_job_logs_for_run",\n "description": "Downloads and returns the logs for a specific job within a workflow run.",\n "parameters": {\n "type": "object",\n "properties": {\n "run_id": {"type": "integer", "description": "The ID of the workflow run."},\n "job_name": {"type": "string", "description": "The name of the job (e.g., \'\'\'test\'\'\').", "default": "test"}\n },\n "required": ["run_id"]\n }\n }\n },\n {\n "type": "function",\n "function": {\n "name": "parse_unittest_failures_from_log",\n "description": "Parses unittest failure details from log content. This is a basic parser and might need adjustments based on specific log formats.",\n "parameters": {\n "type": "object",\n "properties": {\n "log_content": {"type": "string", "description": "The string content of the job log."}\n },\n "required": ["log_content"]\n }\n }\n }\n ]\n\n @metrics.measure\n def execute(self, function_name, **kwargs):\n self.logger.info(f"Executing GitHub CI Helper function: {function_name} with args: {kwargs}")\n # Dispatch to the appropriate public method\n if hasattr(self, function_name) and callable(getattr(self, function_name)) and not function_name.startswith("_"):\n method = getattr(self, function_name)\n try:\n return method(**kwargs)\n except Exception as e:\n self.logger.error(f"Error executing {function_name}: {e}", exc_info=True)\n return f"Error during {function_name} execution: {str(e)}"\n else:\n error_message = f"Unknown or private function: {function_name}"\n self.logger.error(error_message)\n return error_message\n \n def clear(self):\n """Clears any sensitive state if necessary. For this tool, it\'s a no-op but present for interface consistency."""\n self.logger.info("GitHubCIHelper state cleared (no specific state to clear).")\n\n\n @metrics.measure\n def _make_request(self, method: str, url: str, **kwargs): # Added @metrics.measure\n """Helper function for making HTTP requests."""\n try:\n # Use self.session instead of requests directly\n response = self.session.request(method, url, headers=self.headers, **kwargs)\n response.raise_for_status()\n if response.content and response.headers.get(\'Content-Type\', \'\').startswith(\'application/json\'):\n return response.json()\n elif response.content: # For non-JSON content like zip files or plain text logs\n return response \n return None\n except requests.exceptions.HTTPError as e:\n self.logger.error(f"HTTP error occurred: {e} - {e.response.text if e.response else \'No response text\'}") # Use self.logger\n raise\n except requests.exceptions.RequestException as e:\n self.logger.error(f"Request failed: {e}") # Use self.logger\n raise\n\n @metrics.measure\n def get_pr_workflow_runs(self, pull_request_number: int): # Added @metrics.measure\n """\n Gets all workflow runs associated with a specific pull request.\n """\n self.logger.info(f"Getting workflow runs for PR #{pull_request_number}")\n pr_url = f"{self.base_url}/pulls/{pull_request_number}"\n try:\n pr_response = self._make_request("GET", pr_url) # this returns a response object or parsed JSON\n pr_data = pr_response if isinstance(pr_response, dict) else pr_response.json() # Ensure pr_data is dict\n\n if not pr_data or \'head\' not in pr_data or \'sha\' not in pr_data[\'head\']:\n self.logger.error(f"Could not get head SHA for PR {pull_request_number}. Response: {pr_data}")\n return None\n head_sha = pr_data[\'head\'][\'sha\']\n\n runs_url = f"{self.base_url}/actions/runs?event=pull_request&head_sha={head_sha}"\n runs_response = self._make_request("GET", runs_url)\n runs_data = runs_response if isinstance(runs_response, dict) else runs_response.json()\n\n return runs_data.get("workflow_runs") if runs_data else None\n except Exception as e:\n self.logger.error(f"Failed to get PR workflow runs for PR {pull_request_number}: {e}", exc_info=True)\n return None\n\n\n @metrics.measure\n def get_latest_failed_run_for_pr(self, pull_request_number: int, workflow_name: str = "Python CI"): # Added @metrics.measure\n """\n Gets the latest failed workflow run for a specific pull request and workflow name.\n """\n self.logger.info(f"Getting latest failed run for PR #{pull_request_number}, workflow: \'{workflow_name}\'")\n runs = self.get_pr_workflow_runs(pull_request_number)\n if not runs:\n self.logger.info(f"No runs found for PR #{pull_request_number} to check for failures.")\n return None\n\n for run in sorted(runs, key=lambda r: r[\'created_at\'], reverse=True):\n if run[\'name\'] == workflow_name and run[\'conclusion\'] == \'failure\':\n self.logger.info(f"Found failed run {run[\'id\']} for workflow \'{workflow_name}\' in PR #{pull_request_number}")\n return run\n self.logger.info(f"No failed run for workflow \'{workflow_name}\' found for PR #{pull_request_number}") # Use self.logger\n return None\n\n @metrics.measure\n def get_job_logs_for_run(self, run_id: int, job_name: str = "test"): # Added @metrics.measure\n """\n Downloads and returns the logs for a specific job within a workflow run.\n """\n self.logger.info(f"Getting job logs for run ID {run_id}, job name \'{job_name}\'")\n jobs_url = f"{self.base_url}/actions/runs/{run_id}/jobs"\n target_job = None # Initialize target_job here to ensure it\'s defined for later logging\n try:\n jobs_response = self._make_request("GET", jobs_url)\n jobs_data = jobs_response if isinstance(jobs_response, dict) else jobs_response.json()\n\n if not jobs_data or "jobs" not in jobs_data:\n self.logger.error(f"Could not retrieve jobs for run ID {run_id}. Response: {jobs_data}")\n return None\n\n for job in jobs_data["jobs"]:\n if job["name"] == job_name:\n target_job = job\n break\n\n if not target_job:\n self.logger.error(f"Job \'{job_name}\' not found in run ID {run_id}")\n return None\n\n if target_job[\'status\'] != \'completed\':\n self.logger.info(f"Job \'{job_name}\' in run ID {run_id} has not completed. Status: {target_job[\'status\']}")\n return f"Job \'{job_name}\' not yet completed (status: {target_job[\'status\']}). Logs may be unavailable."\n\n\n logs_url = f"{self.base_url}/actions/jobs/{target_job[\'id\']}/logs"\n self.logger.info(f"Attempting to download logs from: {logs_url}")\n \n log_response = self.session.get(logs_url, headers=self.headers, allow_redirects=True, stream=True)\n log_response.raise_for_status()\n\n if \'application/zip\' in log_response.headers.get(\'Content-Type\', \'\'):\n self.logger.info(f"Received zip file for logs of job ID {target_job[\'id\']}.")\n with zipfile.ZipFile(io.BytesIO(log_response.content)) as zf:\n log_file_names = [name for name in zf.namelist() if not name.endswith(\'/\')]\n if not log_file_names:\n self.logger.error(f"No files found in the downloaded log zip for job ID {target_job[\'id\']}.")\n return None\n \n actual_log_file_name = log_file_names[0]\n for name in log_file_names:\n # Improved heuristic for log file name\n if job_name in name or "test" in name.lower() or "log" in name.lower() or "out" in name.lower() or "step" in name:\n actual_log_file_name = name\n break\n \n self.logger.info(f"Extracting log file: {actual_log_file_name} from zip for job ID {target_job[\'id\']}.")\n with zf.open(actual_log_file_name) as log_file:\n return log_file.read().decode(\'utf-8\')\n else:\n self.logger.info(f"Received plain text logs for job ID {target_job[\'id\']}.")\n return log_response.text\n\n except requests.exceptions.HTTPError as e:\n self.logger.error(f"HTTP error downloading logs for job ID {target_job.get(\'id\', \'unknown\') if target_job else \'unknown\'}: {e} - {e.response.text if e.response else \'No response text\'}", exc_info=True)\n if e.response and e.response.status_code == 404:\n self.logger.error("Log download URL might be invalid or logs expired.")\n return f"Error downloading logs: {e}"\n except requests.exceptions.RequestException as e:\n self.logger.error(f"Request failed downloading logs for job ID {target_job.get(\'id\', \'unknown\') if target_job else \'unknown\'}: {e}", exc_info=True)\n return f"Error during log download request: {e}"\n except zipfile.BadZipFile:\n self.logger.error(f"Failed to unzip logs for job ID {target_job.get(\'id\', \'unknown\') if target_job else \'unknown\'}.", exc_info=True)\n # Adding response text for BadZipFile can be risky if it's large binary data.\n # Consider logging only a snippet or specific headers if this occurs frequently.\n return "Failed to unzip logs."\n except Exception as e:\n self.logger.error(f"An unexpected error occurred while processing logs for job {target_job.get(\'id\', \'unknown\') if target_job else \'unknown\'}: {e}", exc_info=True)\n return f"Unexpected error processing logs: {e}"\n\n\n @metrics.measure\n def parse_unittest_failures_from_log(self, log_content: str): # Added @metrics.measure\n """\n Parses unittest failure details from log content.\n """\n if not log_content:\n self.logger.info("Log content is empty, no failures to parse.")\n return []\n\n self.logger.info(f"Parsing unittest failures from log content (length: {len(log_content)}).")\n \n # Regex to capture standard unittest failure blocks\n # It looks for "FAIL:" or "ERROR:", the test name (e.g., test_my_method), the class (e.g., my_module.MyTestClass),\n # then a line of hyphens, "Traceback (most recent call last):", and the traceback details.\n # It stops before the next failure block or common summary lines.\n failure_pattern = re.compile(\n r"^(FAIL|ERROR): (.*?)\s*\\((.*?)\\)\s*\\n-{5,}\\s*\\nTraceback \\(most recent call last\\):\\s*\\n(.*?)(?=\\n(?:FAIL:|ERROR:)|\\n-{5,}\\s*\\nRan \\d+ tests? in|\\Z)",\n re.DOTALL | re.MULTILINE\n )\n \n failures = []\n for match in failure_pattern.finditer(log_content):\n failure_type = match.group(1) # FAIL or ERROR\n test_name = match.group(2).strip() # e.g., test_specific_behavior\n test_module_class = match.group(3).strip() # e.g., tests.test_module.TestMyFeature\n traceback_details = match.group(4).strip() # The actual traceback\n \n # Reconstruct a readable failure block\n failure_block = (\n f"{failure_type}: {test_name} ({test_module_class})\\n"\n f"---------------------\\n"\n f"Traceback (most recent call last):\\n"\n f"{traceback_details}"\n )\n failures.append(failure_block)\n\n if failures:\n self.logger.info(f"Parsed {len(failures)} failure blocks using primary regex.")\n return failures\n\n # Fallback: A more general pattern if the above doesn't match (e.g., due to slight variations in formatting)\n # This looks for "FAIL:" or "ERROR:", a line for the test name, then captures content until common separators.\n general_failure_pattern = re.compile(\n r"^(FAIL|ERROR): ([^\\n]+)\\n(.*?)(?=\\n(?:FAIL:|ERROR:)|\\n-{20,}\\n|Ran \\d+ tests? in|\\Z)",\n re.DOTALL | re.MULTILINE\n )\n for match in general_failure_pattern.finditer(log_content):\n failure_type = match.group(1)\n test_header = match.group(2).strip()\n details = match.group(3).strip()\n full_block = f"{failure_type}: {test_header}\\n{details}"\n # Avoid adding essentially duplicate or overly broad captures if specific ones exist\n if not any(f.startswith(f"{failure_type}: {test_header}") for f in failures): \n failures.append(full_block)\n \n if failures: # Check if fallback added anything\n self.logger.info(f"Parsed {len(failures)} failure blocks using general fallback regex.")\n return failures\n\n # Last resort: if specific "FAILURES!!!" section is found, often this contains a summary.\n # This might be too broad or not structured enough, but better than nothing.\n if "FAILURES!!!" in log_content or "SUMMARY OF FAILURES" in log_content:\n summary_marker = "FAILURES!!!" if "FAILURES!!!" in log_content else "SUMMARY OF FAILURES"\n start_index = log_content.find(summary_marker)\n if start_index != -1:\n # Try to find a reasonable end for this summary block\n end_pattern = re.compile(r"Ran \\d+ tests? in [\\d\\.]+s|\\n-{70,}")\n end_match = end_pattern.search(log_content, start_index)\n end_index = end_match.start() if end_match else len(log_content)\n failure_summary_block = log_content[start_index:end_index].strip()\n if failure_summary_block:\n failures.append(f"FAILURE SUMMARY BLOCK:\\n{failure_summary_block}")\n self.logger.info("Captured a general failure summary block.")\n return failures\n\n if not failures:\n self.logger.info("No specific unittest failure blocks parsed with available patterns.")\n \n return failures\n\n\n# --- Example Usage (Illustrative) ---\nif __name__ == "__main__":\n # This example assumes you have GITHUB_TOKEN environment variable set\n # And that \'requests\' is installed.\n # Replace with your actual repo owner, name, and PR number.\n pr_number = 206 # Example PR\n repo_owner = "bucolucas" # Example owner\n repo_name = "cyclop" # Example repo\n\n # Setup basic logging for the example\n # In a real app, logger would be configured externally\n logging.basicConfig(level=logging.INFO, format=\'%(asctime)s - %(name)s - %(levelname)s - %(message)s\')\n example_logger = logging.getLogger("GitHubCIHelperExample")\n\n\n # Pass the logger to the helper\n helper = GitHubCIHelper(repo_owner, repo_name, logger_instance=example_logger)\n\n example_logger.info(f"Looking for failed runs for PR #{pr_number} in {repo_owner}/{repo_name}")\n failed_run = helper.get_latest_failed_run_for_pr(pull_request_number=pr_number, workflow_name="Python CI")\n\n if failed_run:\n example_logger.info(f"Found failed run: ID {failed_run[\'id\']}, Status {failed_run[\'conclusion\']}")\n example_logger.info(f"Attempting to download logs for job \'test\' in run {failed_run[\'id\']}...")\n \n log_content = helper.get_job_logs_for_run(run_id=failed_run[\'id\'], job_name="test")\n\n if isinstance(log_content, str) and not log_content.startswith("Error") and not log_content.startswith("Job") and not log_content.startswith("Failed"):\n example_logger.info(f"Successfully downloaded logs (length: {len(log_content)} characters).")\n \n example_logger.info("\\n--- Parsing unittest failures ---")\n failures = helper.parse_unittest_failures_from_log(log_content)\n if failures:\n for i, failure_details in enumerate(failures):\n print(f"\\nFailure {i+1}:\\n{failure_details}")\n else:\n print("No specific unittest failures parsed by the tool.")\n # Consider logging the beginning of the log if parsing fails, for debugging the regexes\n # print(f"Log start:\\n{log_content[:2000]}") \n elif log_content is None:\n example_logger.error("Could not retrieve log content (returned None).")\n else: # If it\'s an error message string from the function itself\n example_logger.error(f"Failed to get/process logs: {log_content}")\n\n else:\n example_logger.info(f"No failed \'Python CI\' workflow run found for PR #{pr_number} or the PR doesn\'t exist/no runs yet.")\n\n \ No newline at end of file From fdec4bac8e5339c07394c223d66a3345b33488c1 Mon Sep 17 00:00:00 2001 From: bucolucas Date: Mon, 2 Jun 2025 18:38:03 -0500 Subject: [PATCH 2/3] Fixed newlines in github_ci_tool.py --- tools/github_ci_tool.py | 424 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 423 insertions(+), 1 deletion(-) diff --git a/tools/github_ci_tool.py b/tools/github_ci_tool.py index e1154e4..8e022d9 100644 --- a/tools/github_ci_tool.py +++ b/tools/github_ci_tool.py @@ -1 +1,423 @@ -import requests\nimport os\nimport zipfile\nimport io\nimport re\nimport logging\nfrom .base_tool import BaseTool # Added\nfrom .metrics import metrics # Added\n\n# Configure logging for the tool - This will be handled by the logger instance now\n# logger = logging.getLogger(__name__) # Commented out or removed\n\nclass GitHubCIHelper(BaseTool): # Inherits from BaseTool\n """\n A helper class to interact with GitHub Actions CI,\n specifically for fetching and analyzing test logs.\n """\n def __init__(self, repo_owner: str, repo_name: str, github_token: str = None, session=None, logger_instance=None): # Added session and logger_instance\n """\n Initializes the GitHubCIHelper.\n\n Args:\n repo_owner (str): The owner of the GitHub repository (e.g., \'\'\'bucolucas\'\'\').\n repo_name (str): The name of the GitHub repository (e.g., \'\'\'cyclop\'\'\').\n github_token (str, optional): A GitHub Personal Access Token (PAT)\n for API authentication. Recommended for\n private repos or higher rate limits.\n Can also be set via GITHUB_TOKEN env var.\n session (requests.Session, optional): An external requests session to use.\n logger_instance (logging.Logger, optional): An external logger instance.\n """\n self.repo_owner = repo_owner\n self.repo_name = repo_name\n self.base_url = f"https://api.github.com/repos/{self.repo_owner}/{self.repo_name}"\n self._token = github_token or os.environ.get(\'GITHUB_TOKEN\') # Renamed to _token for consistency\n \n self.headers = {\n "Accept": "application/vnd.github.v3+json"\n }\n if self._token: # Use self._token\n self.headers["Authorization"] = f"token {self._token}"\n\n if session:\n self.session = session\n else:\n self.session = requests.Session()\n # Headers are applied per-request in _make_request or directly where self.session is used if needed globally for session\n\n self.logger = logger_instance if logger_instance else logging.getLogger(__name__)\n if not self.logger.handlers:\n self.logger.addHandler(logging.NullHandler())\n\n def get_functions(self):\n return [\n {\n "type": "function",\n "function": {\n "name": "get_pr_workflow_runs",\n "description": "Gets all workflow runs associated with a specific pull request.",\n "parameters": {\n "type": "object",\n "properties": {\n "pull_request_number": {"type": "integer", "description": "The number of the pull request."}\n },\n "required": ["pull_request_number"]\n }\n }\n },\n {\n "type": "function",\n "function": {\n "name": "get_latest_failed_run_for_pr",\n "description": "Gets the latest failed workflow run for a specific pull request and workflow name.",\n "parameters": {\n "type": "object",\n "properties": {\n "pull_request_number": {"type": "integer", "description": "The number of the pull request."},\n "workflow_name": {"type": "string", "description": "The display name of the workflow (e.g., \'\'\'Python CI\'\'\').", "default": "Python CI"}\n },\n "required": ["pull_request_number"]\n }\n }\n },\n {\n "type": "function",\n "function": {\n "name": "get_job_logs_for_run",\n "description": "Downloads and returns the logs for a specific job within a workflow run.",\n "parameters": {\n "type": "object",\n "properties": {\n "run_id": {"type": "integer", "description": "The ID of the workflow run."},\n "job_name": {"type": "string", "description": "The name of the job (e.g., \'\'\'test\'\'\').", "default": "test"}\n },\n "required": ["run_id"]\n }\n }\n },\n {\n "type": "function",\n "function": {\n "name": "parse_unittest_failures_from_log",\n "description": "Parses unittest failure details from log content. This is a basic parser and might need adjustments based on specific log formats.",\n "parameters": {\n "type": "object",\n "properties": {\n "log_content": {"type": "string", "description": "The string content of the job log."}\n },\n "required": ["log_content"]\n }\n }\n }\n ]\n\n @metrics.measure\n def execute(self, function_name, **kwargs):\n self.logger.info(f"Executing GitHub CI Helper function: {function_name} with args: {kwargs}")\n # Dispatch to the appropriate public method\n if hasattr(self, function_name) and callable(getattr(self, function_name)) and not function_name.startswith("_"):\n method = getattr(self, function_name)\n try:\n return method(**kwargs)\n except Exception as e:\n self.logger.error(f"Error executing {function_name}: {e}", exc_info=True)\n return f"Error during {function_name} execution: {str(e)}"\n else:\n error_message = f"Unknown or private function: {function_name}"\n self.logger.error(error_message)\n return error_message\n \n def clear(self):\n """Clears any sensitive state if necessary. For this tool, it\'s a no-op but present for interface consistency."""\n self.logger.info("GitHubCIHelper state cleared (no specific state to clear).")\n\n\n @metrics.measure\n def _make_request(self, method: str, url: str, **kwargs): # Added @metrics.measure\n """Helper function for making HTTP requests."""\n try:\n # Use self.session instead of requests directly\n response = self.session.request(method, url, headers=self.headers, **kwargs)\n response.raise_for_status()\n if response.content and response.headers.get(\'Content-Type\', \'\').startswith(\'application/json\'):\n return response.json()\n elif response.content: # For non-JSON content like zip files or plain text logs\n return response \n return None\n except requests.exceptions.HTTPError as e:\n self.logger.error(f"HTTP error occurred: {e} - {e.response.text if e.response else \'No response text\'}") # Use self.logger\n raise\n except requests.exceptions.RequestException as e:\n self.logger.error(f"Request failed: {e}") # Use self.logger\n raise\n\n @metrics.measure\n def get_pr_workflow_runs(self, pull_request_number: int): # Added @metrics.measure\n """\n Gets all workflow runs associated with a specific pull request.\n """\n self.logger.info(f"Getting workflow runs for PR #{pull_request_number}")\n pr_url = f"{self.base_url}/pulls/{pull_request_number}"\n try:\n pr_response = self._make_request("GET", pr_url) # this returns a response object or parsed JSON\n pr_data = pr_response if isinstance(pr_response, dict) else pr_response.json() # Ensure pr_data is dict\n\n if not pr_data or \'head\' not in pr_data or \'sha\' not in pr_data[\'head\']:\n self.logger.error(f"Could not get head SHA for PR {pull_request_number}. Response: {pr_data}")\n return None\n head_sha = pr_data[\'head\'][\'sha\']\n\n runs_url = f"{self.base_url}/actions/runs?event=pull_request&head_sha={head_sha}"\n runs_response = self._make_request("GET", runs_url)\n runs_data = runs_response if isinstance(runs_response, dict) else runs_response.json()\n\n return runs_data.get("workflow_runs") if runs_data else None\n except Exception as e:\n self.logger.error(f"Failed to get PR workflow runs for PR {pull_request_number}: {e}", exc_info=True)\n return None\n\n\n @metrics.measure\n def get_latest_failed_run_for_pr(self, pull_request_number: int, workflow_name: str = "Python CI"): # Added @metrics.measure\n """\n Gets the latest failed workflow run for a specific pull request and workflow name.\n """\n self.logger.info(f"Getting latest failed run for PR #{pull_request_number}, workflow: \'{workflow_name}\'")\n runs = self.get_pr_workflow_runs(pull_request_number)\n if not runs:\n self.logger.info(f"No runs found for PR #{pull_request_number} to check for failures.")\n return None\n\n for run in sorted(runs, key=lambda r: r[\'created_at\'], reverse=True):\n if run[\'name\'] == workflow_name and run[\'conclusion\'] == \'failure\':\n self.logger.info(f"Found failed run {run[\'id\']} for workflow \'{workflow_name}\' in PR #{pull_request_number}")\n return run\n self.logger.info(f"No failed run for workflow \'{workflow_name}\' found for PR #{pull_request_number}") # Use self.logger\n return None\n\n @metrics.measure\n def get_job_logs_for_run(self, run_id: int, job_name: str = "test"): # Added @metrics.measure\n """\n Downloads and returns the logs for a specific job within a workflow run.\n """\n self.logger.info(f"Getting job logs for run ID {run_id}, job name \'{job_name}\'")\n jobs_url = f"{self.base_url}/actions/runs/{run_id}/jobs"\n target_job = None # Initialize target_job here to ensure it\'s defined for later logging\n try:\n jobs_response = self._make_request("GET", jobs_url)\n jobs_data = jobs_response if isinstance(jobs_response, dict) else jobs_response.json()\n\n if not jobs_data or "jobs" not in jobs_data:\n self.logger.error(f"Could not retrieve jobs for run ID {run_id}. Response: {jobs_data}")\n return None\n\n for job in jobs_data["jobs"]:\n if job["name"] == job_name:\n target_job = job\n break\n\n if not target_job:\n self.logger.error(f"Job \'{job_name}\' not found in run ID {run_id}")\n return None\n\n if target_job[\'status\'] != \'completed\':\n self.logger.info(f"Job \'{job_name}\' in run ID {run_id} has not completed. Status: {target_job[\'status\']}")\n return f"Job \'{job_name}\' not yet completed (status: {target_job[\'status\']}). Logs may be unavailable."\n\n\n logs_url = f"{self.base_url}/actions/jobs/{target_job[\'id\']}/logs"\n self.logger.info(f"Attempting to download logs from: {logs_url}")\n \n log_response = self.session.get(logs_url, headers=self.headers, allow_redirects=True, stream=True)\n log_response.raise_for_status()\n\n if \'application/zip\' in log_response.headers.get(\'Content-Type\', \'\'):\n self.logger.info(f"Received zip file for logs of job ID {target_job[\'id\']}.")\n with zipfile.ZipFile(io.BytesIO(log_response.content)) as zf:\n log_file_names = [name for name in zf.namelist() if not name.endswith(\'/\')]\n if not log_file_names:\n self.logger.error(f"No files found in the downloaded log zip for job ID {target_job[\'id\']}.")\n return None\n \n actual_log_file_name = log_file_names[0]\n for name in log_file_names:\n # Improved heuristic for log file name\n if job_name in name or "test" in name.lower() or "log" in name.lower() or "out" in name.lower() or "step" in name:\n actual_log_file_name = name\n break\n \n self.logger.info(f"Extracting log file: {actual_log_file_name} from zip for job ID {target_job[\'id\']}.")\n with zf.open(actual_log_file_name) as log_file:\n return log_file.read().decode(\'utf-8\')\n else:\n self.logger.info(f"Received plain text logs for job ID {target_job[\'id\']}.")\n return log_response.text\n\n except requests.exceptions.HTTPError as e:\n self.logger.error(f"HTTP error downloading logs for job ID {target_job.get(\'id\', \'unknown\') if target_job else \'unknown\'}: {e} - {e.response.text if e.response else \'No response text\'}", exc_info=True)\n if e.response and e.response.status_code == 404:\n self.logger.error("Log download URL might be invalid or logs expired.")\n return f"Error downloading logs: {e}"\n except requests.exceptions.RequestException as e:\n self.logger.error(f"Request failed downloading logs for job ID {target_job.get(\'id\', \'unknown\') if target_job else \'unknown\'}: {e}", exc_info=True)\n return f"Error during log download request: {e}"\n except zipfile.BadZipFile:\n self.logger.error(f"Failed to unzip logs for job ID {target_job.get(\'id\', \'unknown\') if target_job else \'unknown\'}.", exc_info=True)\n # Adding response text for BadZipFile can be risky if it's large binary data.\n # Consider logging only a snippet or specific headers if this occurs frequently.\n return "Failed to unzip logs."\n except Exception as e:\n self.logger.error(f"An unexpected error occurred while processing logs for job {target_job.get(\'id\', \'unknown\') if target_job else \'unknown\'}: {e}", exc_info=True)\n return f"Unexpected error processing logs: {e}"\n\n\n @metrics.measure\n def parse_unittest_failures_from_log(self, log_content: str): # Added @metrics.measure\n """\n Parses unittest failure details from log content.\n """\n if not log_content:\n self.logger.info("Log content is empty, no failures to parse.")\n return []\n\n self.logger.info(f"Parsing unittest failures from log content (length: {len(log_content)}).")\n \n # Regex to capture standard unittest failure blocks\n # It looks for "FAIL:" or "ERROR:", the test name (e.g., test_my_method), the class (e.g., my_module.MyTestClass),\n # then a line of hyphens, "Traceback (most recent call last):", and the traceback details.\n # It stops before the next failure block or common summary lines.\n failure_pattern = re.compile(\n r"^(FAIL|ERROR): (.*?)\s*\\((.*?)\\)\s*\\n-{5,}\\s*\\nTraceback \\(most recent call last\\):\\s*\\n(.*?)(?=\\n(?:FAIL:|ERROR:)|\\n-{5,}\\s*\\nRan \\d+ tests? in|\\Z)",\n re.DOTALL | re.MULTILINE\n )\n \n failures = []\n for match in failure_pattern.finditer(log_content):\n failure_type = match.group(1) # FAIL or ERROR\n test_name = match.group(2).strip() # e.g., test_specific_behavior\n test_module_class = match.group(3).strip() # e.g., tests.test_module.TestMyFeature\n traceback_details = match.group(4).strip() # The actual traceback\n \n # Reconstruct a readable failure block\n failure_block = (\n f"{failure_type}: {test_name} ({test_module_class})\\n"\n f"---------------------\\n"\n f"Traceback (most recent call last):\\n"\n f"{traceback_details}"\n )\n failures.append(failure_block)\n\n if failures:\n self.logger.info(f"Parsed {len(failures)} failure blocks using primary regex.")\n return failures\n\n # Fallback: A more general pattern if the above doesn't match (e.g., due to slight variations in formatting)\n # This looks for "FAIL:" or "ERROR:", a line for the test name, then captures content until common separators.\n general_failure_pattern = re.compile(\n r"^(FAIL|ERROR): ([^\\n]+)\\n(.*?)(?=\\n(?:FAIL:|ERROR:)|\\n-{20,}\\n|Ran \\d+ tests? in|\\Z)",\n re.DOTALL | re.MULTILINE\n )\n for match in general_failure_pattern.finditer(log_content):\n failure_type = match.group(1)\n test_header = match.group(2).strip()\n details = match.group(3).strip()\n full_block = f"{failure_type}: {test_header}\\n{details}"\n # Avoid adding essentially duplicate or overly broad captures if specific ones exist\n if not any(f.startswith(f"{failure_type}: {test_header}") for f in failures): \n failures.append(full_block)\n \n if failures: # Check if fallback added anything\n self.logger.info(f"Parsed {len(failures)} failure blocks using general fallback regex.")\n return failures\n\n # Last resort: if specific "FAILURES!!!" section is found, often this contains a summary.\n # This might be too broad or not structured enough, but better than nothing.\n if "FAILURES!!!" in log_content or "SUMMARY OF FAILURES" in log_content:\n summary_marker = "FAILURES!!!" if "FAILURES!!!" in log_content else "SUMMARY OF FAILURES"\n start_index = log_content.find(summary_marker)\n if start_index != -1:\n # Try to find a reasonable end for this summary block\n end_pattern = re.compile(r"Ran \\d+ tests? in [\\d\\.]+s|\\n-{70,}")\n end_match = end_pattern.search(log_content, start_index)\n end_index = end_match.start() if end_match else len(log_content)\n failure_summary_block = log_content[start_index:end_index].strip()\n if failure_summary_block:\n failures.append(f"FAILURE SUMMARY BLOCK:\\n{failure_summary_block}")\n self.logger.info("Captured a general failure summary block.")\n return failures\n\n if not failures:\n self.logger.info("No specific unittest failure blocks parsed with available patterns.")\n \n return failures\n\n\n# --- Example Usage (Illustrative) ---\nif __name__ == "__main__":\n # This example assumes you have GITHUB_TOKEN environment variable set\n # And that \'requests\' is installed.\n # Replace with your actual repo owner, name, and PR number.\n pr_number = 206 # Example PR\n repo_owner = "bucolucas" # Example owner\n repo_name = "cyclop" # Example repo\n\n # Setup basic logging for the example\n # In a real app, logger would be configured externally\n logging.basicConfig(level=logging.INFO, format=\'%(asctime)s - %(name)s - %(levelname)s - %(message)s\')\n example_logger = logging.getLogger("GitHubCIHelperExample")\n\n\n # Pass the logger to the helper\n helper = GitHubCIHelper(repo_owner, repo_name, logger_instance=example_logger)\n\n example_logger.info(f"Looking for failed runs for PR #{pr_number} in {repo_owner}/{repo_name}")\n failed_run = helper.get_latest_failed_run_for_pr(pull_request_number=pr_number, workflow_name="Python CI")\n\n if failed_run:\n example_logger.info(f"Found failed run: ID {failed_run[\'id\']}, Status {failed_run[\'conclusion\']}")\n example_logger.info(f"Attempting to download logs for job \'test\' in run {failed_run[\'id\']}...")\n \n log_content = helper.get_job_logs_for_run(run_id=failed_run[\'id\'], job_name="test")\n\n if isinstance(log_content, str) and not log_content.startswith("Error") and not log_content.startswith("Job") and not log_content.startswith("Failed"):\n example_logger.info(f"Successfully downloaded logs (length: {len(log_content)} characters).")\n \n example_logger.info("\\n--- Parsing unittest failures ---")\n failures = helper.parse_unittest_failures_from_log(log_content)\n if failures:\n for i, failure_details in enumerate(failures):\n print(f"\\nFailure {i+1}:\\n{failure_details}")\n else:\n print("No specific unittest failures parsed by the tool.")\n # Consider logging the beginning of the log if parsing fails, for debugging the regexes\n # print(f"Log start:\\n{log_content[:2000]}") \n elif log_content is None:\n example_logger.error("Could not retrieve log content (returned None).")\n else: # If it\'s an error message string from the function itself\n example_logger.error(f"Failed to get/process logs: {log_content}")\n\n else:\n example_logger.info(f"No failed \'Python CI\' workflow run found for PR #{pr_number} or the PR doesn\'t exist/no runs yet.")\n\n \ No newline at end of file +import requests +import os +import zipfile +import io +import re +import logging +from .base_tool import BaseTool # Added +from .metrics import metrics # Added + +# Configure logging for the tool - This will be handled by the logger instance now +# logger = logging.getLogger(__name__) # Commented out or removed + +class GitHubCIHelper(BaseTool): # Inherits from BaseTool + """ + A helper class to interact with GitHub Actions CI, + specifically for fetching and analyzing test logs. + """ + def __init__(self, repo_owner: str, repo_name: str, github_token: str = None, session=None, logger_instance=None): # Added session and logger_instance + """ + Initializes the GitHubCIHelper. + + Args: + repo_owner (str): The owner of the GitHub repository (e.g., \'\'\'bucolucas\'\'\'). + repo_name (str): The name of the GitHub repository (e.g., \'\'\'cyclop\'\'\'). + github_token (str, optional): A GitHub Personal Access Token (PAT) + for API authentication. Recommended for + private repos or higher rate limits. + Can also be set via GITHUB_TOKEN env var. + session (requests.Session, optional): An external requests session to use. + logger_instance (logging.Logger, optional): An external logger instance. + """ + self.repo_owner = repo_owner + self.repo_name = repo_name + self.base_url = f"https://api.github.com/repos/{self.repo_owner}/{self.repo_name}" + self._token = github_token or os.environ.get(\'GITHUB_TOKEN\') # Renamed to _token for consistency + + self.headers = { + "Accept": "application/vnd.github.v3+json" + } + if self._token: # Use self._token + self.headers["Authorization"] = f"token {self._token}" + + if session: + self.session = session + else: + self.session = requests.Session() + # Headers are applied per-request in _make_request or directly where self.session is used if needed globally for session + + self.logger = logger_instance if logger_instance else logging.getLogger(__name__) + if not self.logger.handlers: + self.logger.addHandler(logging.NullHandler()) + + def get_functions(self): + return [ + { + "type": "function", + "function": { + "name": "get_pr_workflow_runs", + "description": "Gets all workflow runs associated with a specific pull request.", + "parameters": { + "type": "object", + "properties": { + "pull_request_number": {"type": "integer", "description": "The number of the pull request."} + }, + "required": ["pull_request_number"] + } + } + }, + { + "type": "function", + "function": { + "name": "get_latest_failed_run_for_pr", + "description": "Gets the latest failed workflow run for a specific pull request and workflow name.", + "parameters": { + "type": "object", + "properties": { + "pull_request_number": {"type": "integer", "description": "The number of the pull request."}, + "workflow_name": {"type": "string", "description": "The display name of the workflow (e.g., \'\'\'Python CI\'\'\').", "default": "Python CI"} + }, + "required": ["pull_request_number"] + } + } + }, + { + "type": "function", + "function": { + "name": "get_job_logs_for_run", + "description": "Downloads and returns the logs for a specific job within a workflow run.", + "parameters": { + "type": "object", + "properties": { + "run_id": {"type": "integer", "description": "The ID of the workflow run."}, + "job_name": {"type": "string", "description": "The name of the job (e.g., \'\'\'test\'\'\').", "default": "test"} + }, + "required": ["run_id"] + } + } + }, + { + "type": "function", + "function": { + "name": "parse_unittest_failures_from_log", + "description": "Parses unittest failure details from log content. This is a basic parser and might need adjustments based on specific log formats.", + "parameters": { + "type": "object", + "properties": { + "log_content": {"type": "string", "description": "The string content of the job log."} + }, + "required": ["log_content"] + } + } + } + ] + + @metrics.measure + def execute(self, function_name, **kwargs): + self.logger.info(f"Executing GitHub CI Helper function: {function_name} with args: {kwargs}") + # Dispatch to the appropriate public method + if hasattr(self, function_name) and callable(getattr(self, function_name)) and not function_name.startswith("_"): + method = getattr(self, function_name) + try: + return method(**kwargs) + except Exception as e: + self.logger.error(f"Error executing {function_name}: {e}", exc_info=True) + return f"Error during {function_name} execution: {str(e)}" + else: + error_message = f"Unknown or private function: {function_name}" + self.logger.error(error_message) + return error_message + + def clear(self): + """Clears any sensitive state if necessary. For this tool, it\'s a no-op but present for interface consistency.""" + self.logger.info("GitHubCIHelper state cleared (no specific state to clear).") + + + @metrics.measure + def _make_request(self, method: str, url: str, **kwargs): # Added @metrics.measure + """Helper function for making HTTP requests.""" + try: + # Use self.session instead of requests directly + response = self.session.request(method, url, headers=self.headers, **kwargs) + response.raise_for_status() + if response.content and response.headers.get(\'Content-Type\', \'\').startswith(\'application/json\'): + return response.json() + elif response.content: # For non-JSON content like zip files or plain text logs + return response + return None + except requests.exceptions.HTTPError as e: + self.logger.error(f"HTTP error occurred: {e} - {e.response.text if e.response else \'No response text\'}") # Use self.logger + raise + except requests.exceptions.RequestException as e: + self.logger.error(f"Request failed: {e}") # Use self.logger + raise + + @metrics.measure + def get_pr_workflow_runs(self, pull_request_number: int): # Added @metrics.measure + """ + Gets all workflow runs associated with a specific pull request. + """ + self.logger.info(f"Getting workflow runs for PR #{pull_request_number}") + pr_url = f"{self.base_url}/pulls/{pull_request_number}" + try: + pr_response = self._make_request("GET", pr_url) # this returns a response object or parsed JSON + pr_data = pr_response if isinstance(pr_response, dict) else pr_response.json() # Ensure pr_data is dict + + if not pr_data or \'head\' not in pr_data or \'sha\' not in pr_data[\'head\']: + self.logger.error(f"Could not get head SHA for PR {pull_request_number}. Response: {pr_data}") + return None + head_sha = pr_data[\'head\'][\'sha\'] + + runs_url = f"{self.base_url}/actions/runs?event=pull_request&head_sha={head_sha}" + runs_response = self._make_request("GET", runs_url) + runs_data = runs_response if isinstance(runs_response, dict) else runs_response.json() + + return runs_data.get("workflow_runs") if runs_data else None + except Exception as e: + self.logger.error(f"Failed to get PR workflow runs for PR {pull_request_number}: {e}", exc_info=True) + return None + + + @metrics.measure + def get_latest_failed_run_for_pr(self, pull_request_number: int, workflow_name: str = "Python CI"): # Added @metrics.measure + """ + Gets the latest failed workflow run for a specific pull request and workflow name. + """ + self.logger.info(f"Getting latest failed run for PR #{pull_request_number}, workflow: \'{workflow_name}\'") + runs = self.get_pr_workflow_runs(pull_request_number) + if not runs: + self.logger.info(f"No runs found for PR #{pull_request_number} to check for failures.") + return None + + for run in sorted(runs, key=lambda r: r[\'created_at\'], reverse=True): + if run[\'name\'] == workflow_name and run[\'conclusion\'] == \'failure\': + self.logger.info(f"Found failed run {run[\'id\']} for workflow \'{workflow_name}\' in PR #{pull_request_number}") + return run + self.logger.info(f"No failed run for workflow \'{workflow_name}\' found for PR #{pull_request_number}") # Use self.logger + return None + + @metrics.measure + def get_job_logs_for_run(self, run_id: int, job_name: str = "test"): # Added @metrics.measure + """ + Downloads and returns the logs for a specific job within a workflow run. + """ + self.logger.info(f"Getting job logs for run ID {run_id}, job name \'{job_name}\'") + jobs_url = f"{self.base_url}/actions/runs/{run_id}/jobs" + target_job = None # Initialize target_job here to ensure it\'s defined for later logging + try: + jobs_response = self._make_request("GET", jobs_url) + jobs_data = jobs_response if isinstance(jobs_response, dict) else jobs_response.json() + + if not jobs_data or "jobs" not in jobs_data: + self.logger.error(f"Could not retrieve jobs for run ID {run_id}. Response: {jobs_data}") + return None + + for job in jobs_data["jobs"]: + if job["name"] == job_name: + target_job = job + break + + if not target_job: + self.logger.error(f"Job \'{job_name}\' not found in run ID {run_id}") + return None + + if target_job[\'status\'] != \'completed\': + self.logger.info(f"Job \'{job_name}\' in run ID {run_id} has not completed. Status: {target_job[\'status\']}") + return f"Job \'{job_name}\' not yet completed (status: {target_job[\'status\']}). Logs may be unavailable." + + + logs_url = f"{self.base_url}/actions/jobs/{target_job[\'id\']}/logs" + self.logger.info(f"Attempting to download logs from: {logs_url}") + + log_response = self.session.get(logs_url, headers=self.headers, allow_redirects=True, stream=True) + log_response.raise_for_status() + + if \'application/zip\' in log_response.headers.get(\'Content-Type\', \'\'): + self.logger.info(f"Received zip file for logs of job ID {target_job[\'id\']}.") + with zipfile.ZipFile(io.BytesIO(log_response.content)) as zf: + log_file_names = [name for name in zf.namelist() if not name.endswith(\'/\')] + if not log_file_names: + self.logger.error(f"No files found in the downloaded log zip for job ID {target_job[\'id\']}.") + return None + + actual_log_file_name = log_file_names[0] + for name in log_file_names: + # Improved heuristic for log file name + if job_name in name or "test" in name.lower() or "log" in name.lower() or "out" in name.lower() or "step" in name: + actual_log_file_name = name + break + + self.logger.info(f"Extracting log file: {actual_log_file_name} from zip for job ID {target_job[\'id\']}.") + with zf.open(actual_log_file_name) as log_file: + return log_file.read().decode(\'utf-8\') + else: + self.logger.info(f"Received plain text logs for job ID {target_job[\'id\']}.") + return log_response.text + + except requests.exceptions.HTTPError as e: + self.logger.error(f"HTTP error downloading logs for job ID {target_job.get(\'id\', \'unknown\') if target_job else \'unknown\'}: {e} - {e.response.text if e.response else \'No response text\'}", exc_info=True) + if e.response and e.response.status_code == 404: + self.logger.error("Log download URL might be invalid or logs expired.") + return f"Error downloading logs: {e}" + except requests.exceptions.RequestException as e: + self.logger.error(f"Request failed downloading logs for job ID {target_job.get(\'id\', \'unknown\') if target_job else \'unknown\'}: {e}", exc_info=True) + return f"Error during log download request: {e}" + except zipfile.BadZipFile: + self.logger.error(f"Failed to unzip logs for job ID {target_job.get(\'id\', \'unknown\') if target_job else \'unknown\'}.", exc_info=True) + # Adding response text for BadZipFile can be risky if it's large binary data. + # Consider logging only a snippet or specific headers if this occurs frequently. + return "Failed to unzip logs." + except Exception as e: + self.logger.error(f"An unexpected error occurred while processing logs for job {target_job.get(\'id\', \'unknown\') if target_job else \'unknown\'}: {e}", exc_info=True) + return f"Unexpected error processing logs: {e}" + + + @metrics.measure + def parse_unittest_failures_from_log(self, log_content: str): # Added @metrics.measure + """ + Parses unittest failure details from log content. + """ + if not log_content: + self.logger.info("Log content is empty, no failures to parse.") + return [] + + self.logger.info(f"Parsing unittest failures from log content (length: {len(log_content)}).") + + # Regex to capture standard unittest failure blocks + # It looks for "FAIL:" or "ERROR:", the test name (e.g., test_my_method), the class (e.g., my_module.MyTestClass), + # then a line of hyphens, "Traceback (most recent call last):", and the traceback details. + # It stops before the next failure block or common summary lines. + failure_pattern = re.compile( + r"^(FAIL|ERROR): (.*?)\s*\\((.*?)\\)\s*\ +-{5,}\\s*\ +Traceback \\(most recent call last\\):\\s*\ +(.*?)(?=\ +(?:FAIL:|ERROR:)|\ +-{5,}\\s*\ +Ran \\d+ tests? in|\\Z)", + re.DOTALL | re.MULTILINE + ) + + failures = [] + for match in failure_pattern.finditer(log_content): + failure_type = match.group(1) # FAIL or ERROR + test_name = match.group(2).strip() # e.g., test_specific_behavior + test_module_class = match.group(3).strip() # e.g., tests.test_module.TestMyFeature + traceback_details = match.group(4).strip() # The actual traceback + + # Reconstruct a readable failure block + failure_block = ( + f"{failure_type}: {test_name} ({test_module_class})\ +" + f"---------------------\ +" + f"Traceback (most recent call last):\ +" + f"{traceback_details}" + ) + failures.append(failure_block) + + if failures: + self.logger.info(f"Parsed {len(failures)} failure blocks using primary regex.") + return failures + + # Fallback: A more general pattern if the above doesn't match (e.g., due to slight variations in formatting) + # This looks for "FAIL:" or "ERROR:", a line for the test name, then captures content until common separators. + general_failure_pattern = re.compile( + r"^(FAIL|ERROR): ([^\ +]+)\ +(.*?)(?=\ +(?:FAIL:|ERROR:)|\ +-{20,}\ +|Ran \\d+ tests? in|\\Z)", + re.DOTALL | re.MULTILINE + ) + for match in general_failure_pattern.finditer(log_content): + failure_type = match.group(1) + test_header = match.group(2).strip() + details = match.group(3).strip() + full_block = f"{failure_type}: {test_header}\ +{details}" + # Avoid adding essentially duplicate or overly broad captures if specific ones exist + if not any(f.startswith(f"{failure_type}: {test_header}") for f in failures): + failures.append(full_block) + + if failures: # Check if fallback added anything + self.logger.info(f"Parsed {len(failures)} failure blocks using general fallback regex.") + return failures + + # Last resort: if specific "FAILURES!!!" section is found, often this contains a summary. + # This might be too broad or not structured enough, but better than nothing. + if "FAILURES!!!" in log_content or "SUMMARY OF FAILURES" in log_content: + summary_marker = "FAILURES!!!" if "FAILURES!!!" in log_content else "SUMMARY OF FAILURES" + start_index = log_content.find(summary_marker) + if start_index != -1: + # Try to find a reasonable end for this summary block + end_pattern = re.compile(r"Ran \\d+ tests? in [\\d\\.]+s|\ +-{70,}") + end_match = end_pattern.search(log_content, start_index) + end_index = end_match.start() if end_match else len(log_content) + failure_summary_block = log_content[start_index:end_index].strip() + if failure_summary_block: + failures.append(f"FAILURE SUMMARY BLOCK:\ +{failure_summary_block}") + self.logger.info("Captured a general failure summary block.") + return failures + + if not failures: + self.logger.info("No specific unittest failure blocks parsed with available patterns.") + + return failures + + +# --- Example Usage (Illustrative) --- +if __name__ == "__main__": + # This example assumes you have GITHUB_TOKEN environment variable set + # And that \'requests\' is installed. + # Replace with your actual repo owner, name, and PR number. + pr_number = 206 # Example PR + repo_owner = "bucolucas" # Example owner + repo_name = "cyclop" # Example repo + + # Setup basic logging for the example + # In a real app, logger would be configured externally + logging.basicConfig(level=logging.INFO, format=\'%(asctime)s - %(name)s - %(levelname)s - %(message)s\') + example_logger = logging.getLogger("GitHubCIHelperExample") + + + # Pass the logger to the helper + helper = GitHubCIHelper(repo_owner, repo_name, logger_instance=example_logger) + + example_logger.info(f"Looking for failed runs for PR #{pr_number} in {repo_owner}/{repo_name}") + failed_run = helper.get_latest_failed_run_for_pr(pull_request_number=pr_number, workflow_name="Python CI") + + if failed_run: + example_logger.info(f"Found failed run: ID {failed_run[\'id\']}, Status {failed_run[\'conclusion\']}") + example_logger.info(f"Attempting to download logs for job \'test\' in run {failed_run[\'id\']}...") + + log_content = helper.get_job_logs_for_run(run_id=failed_run[\'id\'], job_name="test") + + if isinstance(log_content, str) and not log_content.startswith("Error") and not log_content.startswith("Job") and not log_content.startswith("Failed"): + example_logger.info(f"Successfully downloaded logs (length: {len(log_content)} characters).") + + example_logger.info("\ +--- Parsing unittest failures ---") + failures = helper.parse_unittest_failures_from_log(log_content) + if failures: + for i, failure_details in enumerate(failures): + print(f"\ +Failure {i+1}:\ +{failure_details}") + else: + print("No specific unittest failures parsed by the tool.") + # Consider logging the beginning of the log if parsing fails, for debugging the regexes + # print(f"Log start:\ +{log_content[:2000]}") + elif log_content is None: + example_logger.error("Could not retrieve log content (returned None).") + else: # If it\'s an error message string from the function itself + example_logger.error(f"Failed to get/process logs: {log_content}") + + else: + example_logger.info(f"No failed \'Python CI\' workflow run found for PR #{pr_number} or the PR doesn\'t exist/no runs yet.") + From e036d91b8f08429e0bb89c582fb5f9f575ed88df Mon Sep 17 00:00:00 2001 From: bucolucas Date: Mon, 2 Jun 2025 18:41:03 -0500 Subject: [PATCH 3/3] Fixed newlines in github_ci_tool.py --- tools/github_ci_tool.py | 43 +++++++++++------------------------------ 1 file changed, 11 insertions(+), 32 deletions(-) diff --git a/tools/github_ci_tool.py b/tools/github_ci_tool.py index 8e022d9..7173453 100644 --- a/tools/github_ci_tool.py +++ b/tools/github_ci_tool.py @@ -288,13 +288,7 @@ class GitHubCIHelper(BaseTool): # Inherits from BaseTool # then a line of hyphens, "Traceback (most recent call last):", and the traceback details. # It stops before the next failure block or common summary lines. failure_pattern = re.compile( - r"^(FAIL|ERROR): (.*?)\s*\\((.*?)\\)\s*\ --{5,}\\s*\ -Traceback \\(most recent call last\\):\\s*\ -(.*?)(?=\ -(?:FAIL:|ERROR:)|\ --{5,}\\s*\ -Ran \\d+ tests? in|\\Z)", + r"^(FAIL|ERROR): (.*?)\s*\\((.*?)\\)\s*\n-{5,}\\s*\nTraceback \\(most recent call last\\):\\s*\n(.*?)(?=\n(?:FAIL:|ERROR:)|\n-{5,}\\s*\nRan \\d+ tests? in|\\Z)", re.DOTALL | re.MULTILINE ) @@ -307,12 +301,9 @@ Ran \\d+ tests? in|\\Z)", # Reconstruct a readable failure block failure_block = ( - f"{failure_type}: {test_name} ({test_module_class})\ -" - f"---------------------\ -" - f"Traceback (most recent call last):\ -" + f"{failure_type}: {test_name} ({test_module_class})\n" + f"---------------------\n" + f"Traceback (most recent call last):\n" f"{traceback_details}" ) failures.append(failure_block) @@ -324,20 +315,14 @@ Ran \\d+ tests? in|\\Z)", # Fallback: A more general pattern if the above doesn't match (e.g., due to slight variations in formatting) # This looks for "FAIL:" or "ERROR:", a line for the test name, then captures content until common separators. general_failure_pattern = re.compile( - r"^(FAIL|ERROR): ([^\ -]+)\ -(.*?)(?=\ -(?:FAIL:|ERROR:)|\ --{20,}\ -|Ran \\d+ tests? in|\\Z)", + r"^(FAIL|ERROR): ([^\n]+)\n(.*?)(?=\n(?:FAIL:|ERROR:)|\n-{20,}\n|Ran \\d+ tests? in|\\Z)", re.DOTALL | re.MULTILINE ) for match in general_failure_pattern.finditer(log_content): failure_type = match.group(1) test_header = match.group(2).strip() details = match.group(3).strip() - full_block = f"{failure_type}: {test_header}\ -{details}" + full_block = f"{failure_type}: {test_header}\n{details}" # Avoid adding essentially duplicate or overly broad captures if specific ones exist if not any(f.startswith(f"{failure_type}: {test_header}") for f in failures): failures.append(full_block) @@ -353,14 +338,12 @@ Ran \\d+ tests? in|\\Z)", start_index = log_content.find(summary_marker) if start_index != -1: # Try to find a reasonable end for this summary block - end_pattern = re.compile(r"Ran \\d+ tests? in [\\d\\.]+s|\ --{70,}") + end_pattern = re.compile(r"Ran \\d+ tests? in [\\d\\.]+s|\n-{70,}") end_match = end_pattern.search(log_content, start_index) end_index = end_match.start() if end_match else len(log_content) failure_summary_block = log_content[start_index:end_index].strip() if failure_summary_block: - failures.append(f"FAILURE SUMMARY BLOCK:\ -{failure_summary_block}") + failures.append(f"FAILURE SUMMARY BLOCK:\n{failure_summary_block}") self.logger.info("Captured a general failure summary block.") return failures @@ -400,19 +383,15 @@ if __name__ == "__main__": if isinstance(log_content, str) and not log_content.startswith("Error") and not log_content.startswith("Job") and not log_content.startswith("Failed"): example_logger.info(f"Successfully downloaded logs (length: {len(log_content)} characters).") - example_logger.info("\ ---- Parsing unittest failures ---") + example_logger.info("\n--- Parsing unittest failures ---") failures = helper.parse_unittest_failures_from_log(log_content) if failures: for i, failure_details in enumerate(failures): - print(f"\ -Failure {i+1}:\ -{failure_details}") + print(f"\nFailure {i+1}:\n{failure_details}") else: print("No specific unittest failures parsed by the tool.") # Consider logging the beginning of the log if parsing fails, for debugging the regexes - # print(f"Log start:\ -{log_content[:2000]}") + # print(f"Log start:\n{log_content[:2000]}") elif log_content is None: example_logger.error("Could not retrieve log content (returned None).") else: # If it\'s an error message string from the function itself