diff --git a/tools/github_tool.py b/tools/github_tool.py index a14a063..7720fa8 100644 --- a/tools/github_tool.py +++ b/tools/github_tool.py @@ -7,42 +7,41 @@ import base64 import logging class GitHubTool(BaseTool): - def __init__(self): - self.base_url = "https://api.github.com" - self.token = os.environ.get("GITHUB_TOKEN") + def __init__(self, session=None, token=None, repo=None, base_url=None, initial_branch="main", logger=None): + self.base_url = base_url if base_url else "https://api.github.com" + self._token = token if token else os.environ.get("GITHUB_TOKEN") + self._repo = repo if repo else os.environ.get("GITHUB_REPOSITORY") - self.headers = { - "Authorization": f"token {self.token}", - "Accept": "application/vnd.github.v3+json" - } - self.repo = os.environ.get("GITHUB_REPOSITORY") - self.current_branch = "main" # Default to main branch + if not self._token: + # In a real scenario, might raise an error or operate in a degraded mode. + # For this tool, token is essential. + raise ValueError("GitHub token must be provided either as an argument or via GITHUB_TOKEN env var.") + if not self._repo: + raise ValueError("GitHub repository (e.g., 'owner/repo') must be provided either as an argument or via GITHUB_REPOSITORY env var.") - # Set up logging - self.logger = logging.getLogger(__name__) - self.logger.setLevel(logging.INFO) + if session: + self.session = session + else: + self.session = requests.Session() + self.session.headers.update({ + "Authorization": f"token {self._token}", + "Accept": "application/vnd.github.v3+json" + }) - # Create a file handler - file_handler = logging.FileHandler('github_tool.log') - file_handler.setLevel(logging.INFO) - - # Create a console handler - console_handler = logging.StreamHandler() - console_handler.setLevel(logging.INFO) - - # Create a formatting for the logs - formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') - file_handler.setFormatter(formatter) - console_handler.setFormatter(formatter) - - # Add the handlers to the logger - self.logger.addHandler(file_handler) - self.logger.addHandler(console_handler) + self.current_branch = initial_branch + + # Use provided logger or get a new one for the module + # The application using this tool should configure the logging handlers and formatting. + self.logger = logger if logger else logging.getLogger(__name__) + # If no handlers are configured by the application, add a NullHandler + # to prevent "No handler found" warnings if the tool logs something. + if not self.logger.handlers: + self.logger.addHandler(logging.NullHandler()) def clear(self): - if (self.current_branch != "main"): + if self.current_branch != "main": self._set_current_branch("main") - pass + self.logger.info(f"GitHubTool state cleared. Current branch is {self.current_branch}") def get_functions(self): return [ @@ -372,7 +371,7 @@ class GitHubTool(BaseTool): } } }, - { # New function definition for get_pull_request_general_comments + { "type": "function", "function": { "name": "get_pull_request_general_comments", @@ -485,7 +484,6 @@ class GitHubTool(BaseTool): } } }, - # New functions for PR review { "type": "function", "function": { @@ -581,740 +579,679 @@ class GitHubTool(BaseTool): } ] - @metrics.measure def execute(self, function_name, **kwargs): - self.logger.info(f"Executing: {function_name}") - - if function_name == "read_file": - return self._read_file(kwargs["path"]) - elif function_name == "create_branch": - return self._create_branch(kwargs["branch_name"], kwargs.get("base_branch", "main")) - elif function_name == "commit_file": - return self._commit_file(kwargs["file_path"], kwargs["content"], kwargs["commit_message"]) - elif function_name == "create_pull_request": - return self._create_pull_request(kwargs["title"], kwargs["body"], kwargs.get("base", "main")) - elif function_name == "list_files": - return self._list_files(kwargs["path"]) - elif function_name == "search_code": - return self._search_code(kwargs["query"]) - elif function_name == "get_commit_history": - return self._get_commit_history(kwargs["file_path"], kwargs.get("num_commits", 10)) - elif function_name == "view_commit_details_for_file": - return self._view_commit_details_for_file(kwargs["file_path"], kwargs.get("num_commits", 10)) - elif function_name == "get_current_branch": - return self._get_current_branch() - elif function_name == "set_current_branch": - return self._set_current_branch(kwargs["branch_name"]) - elif function_name == "get_file_at_commit": - return self._get_file_at_commit(kwargs["file_path"], kwargs["commit_sha"]) - elif function_name == "list_branches": - return self._list_branches(kwargs.get("per_page", 100), kwargs.get("all_pages", True)) - elif function_name == "get_branch_sha": - return self._get_branch_sha(kwargs["branch"]) - elif function_name == "approve_pull_request": - return self._approve_pull_request(kwargs["pull_number"]) - elif function_name == "close_pull_request": - return self._close_pull_request(kwargs["pull_number"]) - elif function_name == "merge_pull_request": - return self._merge_pull_request(kwargs["pull_number"], kwargs.get("commit_title", "Merge pull request"), - kwargs.get("commit_message", ""), kwargs.get("merge_method", "merge")) - elif function_name == "delete_branch": - return self._delete_branch(kwargs["branch_name"]) - elif function_name == "get_issue_details": - return self._get_issue_details(kwargs["issue_number"]) - elif function_name == "create_issue": - return self._create_issue(kwargs["title"], kwargs["body"], kwargs.get("labels", [])) - elif function_name == "list_issues": - return self._list_issues(kwargs.get("state", "open"), kwargs.get("per_page", 30), kwargs.get("page", 1)) - elif function_name == "add_issue_comment": - return self._add_issue_comment(kwargs["issue_number"], kwargs["comment"]) - elif function_name == "get_issue_comments": - return self._get_issue_comments(kwargs["issue_number"]) - elif function_name == "get_pull_request_general_comments": # New dispatch entry - return self._get_pull_request_general_comments(kwargs["pull_number"]) - elif function_name == "create_project_board": - return self._create_project_board(kwargs["name"], kwargs.get("body", "")) - elif function_name == "create_project_column": - return self._create_project_column(kwargs["project_id"], kwargs["column_name"]) - elif function_name == "create_project_card": - return self._create_project_card(kwargs["column_id"], kwargs["note"]) - elif function_name == "move_project_card": - return self._move_project_card(kwargs["card_id"], kwargs["position"], kwargs["column_id"]) - elif function_name == "link_issue_to_project_card": - return self._link_issue_to_project_card(kwargs["card_id"], kwargs["content_id"], kwargs["content_type"]) - elif function_name == "list_project_boards": - return self._list_project_boards() - elif function_name == "view_project_board_items": - return self._view_project_board_items(kwargs["project_id"]) - # New function dispatching - elif function_name == "get_pull_request_details": - return self._get_pull_request_details(kwargs["pull_number"]) - elif function_name == "get_pull_request_diff": - return self._get_pull_request_diff(kwargs["pull_number"]) - elif function_name == "get_pull_request_files": - return self._get_pull_request_files(kwargs["pull_number"]) - elif function_name == "create_pull_request_review_comment": - return self._create_pull_request_review_comment(kwargs["pull_number"], kwargs["body"], kwargs["commit_id"], - kwargs["path"], kwargs["position"], kwargs.get("side", "RIGHT"), - kwargs.get("start_line"), kwargs.get("start_side")) - elif function_name == "list_pull_request_review_comments": - return self._list_pull_request_review_comments(kwargs["pull_number"]) - elif function_name == "submit_pull_request_review": - return self._submit_pull_request_review(kwargs["pull_number"], kwargs["event"], kwargs.get("body")) + self.logger.info(f"Executing GitHub Tool function: {function_name} with args: {kwargs}") + # Dispatch to the appropriate private method + method_name = f"_{function_name}" + if hasattr(self, method_name): + method = getattr(self, method_name) + try: + return method(**kwargs) # Ensure only expected args are passed if method signature is strict + except Exception as e: + self.logger.error(f"Error executing {method_name}: {e}", exc_info=True) + return f"Error during {function_name} execution: {str(e)}" else: error_message = f"Unknown function: {function_name}" self.logger.error(error_message) return error_message + # Private methods for each function, using self.session for HTTP requests + @metrics.measure def _read_file(self, path): self.logger.info(f"Reading file: {path} from branch: {self.current_branch}") - url = f"{self.base_url}/repos/{self.repo}/contents/{path}" - response = requests.get(url, headers=self.headers, params={"ref": self.current_branch}) + url = f"{self.base_url}/repos/{self._repo}/contents/{path}" + response = self.session.get(url, params={"ref": self.current_branch}) if response.status_code == 200: content = response.json()["content"] decoded_content = base64.b64decode(content).decode('utf-8') self.logger.info(f"Successfully read file: {path}") return decoded_content else: - error_message = f"Error reading file: {response.status_code}" + error_message = f"Error reading file ({path}): {response.status_code} - {response.text}" self.logger.error(error_message) return error_message @metrics.measure - def _create_branch(self, branch_name, base_branch): + def _create_branch(self, branch_name, base_branch="main"): self.logger.info(f"Creating branch: {branch_name} from base: {base_branch}") - url = f"{self.base_url}/repos/{self.repo}/git/refs" - response = requests.get(f"{url}/heads/{base_branch}", headers=self.headers) - if response.status_code != 200: - error_message = f"Error getting base branch: {response.status_code}" + # Get SHA of base branch + ref_url = f"{self.base_url}/repos/{self._repo}/git/refs/heads/{base_branch}" + response_sha = self.session.get(ref_url) + if response_sha.status_code != 200: + error_message = f"Error getting base branch SHA ({base_branch}): {response_sha.status_code} - {response_sha.text}" self.logger.error(error_message) return error_message - - sha = response.json()["object"]["sha"] - data = { - "ref": f"refs/heads/{branch_name}", - "sha": sha - } - response = requests.post(url, headers=self.headers, json=data) - if response.status_code == 201: + sha = response_sha.json()["object"]["sha"] + + # Create new branch + create_ref_url = f"{self.base_url}/repos/{self._repo}/git/refs" + data = {"ref": f"refs/heads/{branch_name}", "sha": sha} + response_create = self.session.post(create_ref_url, json=data) + if response_create.status_code == 201: self.current_branch = branch_name - success_message = f"Branch '{branch_name}' created successfully and set as current branch" + success_message = f"Branch '{branch_name}' created successfully from '{base_branch}' and set as current branch." self.logger.info(success_message) return success_message else: - error_message = f"Error creating branch: {response.status_code}" + error_message = f"Error creating branch '{branch_name}': {response_create.status_code} - {response_create.text}" self.logger.error(error_message) return error_message @metrics.measure def _commit_file(self, file_path, content, commit_message): - self.logger.info(f"Committing file: {file_path} to branch: {self.current_branch}") + self.logger.info(f"Committing file: {file_path} to branch: {self.current_branch} with message: '{commit_message}'") if self.current_branch == "main": - error_message = "Cannot commit directly to main branch" - self.logger.error(error_message) + error_message = "Action directly to main branch is not allowed. Please create and switch to a new branch first." + self.logger.warning(error_message) return error_message - url = f"{self.base_url}/repos/{self.repo}/contents/{file_path}" - - self.logger.info("Checking if file already exists") - response = requests.get(url, headers=self.headers, params={"ref": self.current_branch}) - + url = f"{self.base_url}/repos/{self._repo}/contents/{file_path}" + encoded_content = base64.b64encode(content.encode('utf-8')).decode('utf-8') data = { "message": commit_message, - "content": base64.b64encode(content.encode()).decode(), + "content": encoded_content, "branch": self.current_branch } - if response.status_code == 200: - self.logger.info("File exists, updating") - file_sha = response.json()["sha"] - data["sha"] = file_sha + # Check if file exists to get its SHA for update + self.logger.info(f"Checking if file '{file_path}' exists on branch '{self.current_branch}'") + get_response = self.session.get(url, params={"ref": self.current_branch}) + if get_response.status_code == 200: + data["sha"] = get_response.json()["sha"] + self.logger.info(f"File '{file_path}' exists, will update.") + elif get_response.status_code == 404: + self.logger.info(f"File '{file_path}' does not exist, will create.") else: - self.logger.info("File does not exist, creating new file") + error_message = f"Error checking file existence for '{file_path}': {get_response.status_code} - {get_response.text}" + self.logger.error(error_message) + return error_message - response = requests.put(url, headers=self.headers, json=data) - - if response.status_code in [200, 201]: - success_message = f"File committed successfully to branch '{self.current_branch}'" + response = self.session.put(url, json=data) + if response.status_code in [200, 201]: # 200 for update, 201 for create + commit_sha = response.json().get("commit", {}).get("sha", "N/A") + success_message = f"File '{file_path}' committed successfully to branch '{self.current_branch}'. Commit SHA: {commit_sha}" self.logger.info(success_message) return success_message else: - error_message = f"Error committing file: {response.status_code}\nResponse: {response.text}" + error_message = f"Error committing file '{file_path}': {response.status_code} - {response.text}" self.logger.error(error_message) return error_message @metrics.measure - def _create_pull_request(self, title, body, base): - self.logger.info(f"Creating pull request: {title} from {self.current_branch} to {base}") - url = f"{self.base_url}/repos/{self.repo}/pulls" - data = { - "title": title, - "body": body, - "head": self.current_branch, - "base": base - } - response = requests.post(url, headers=self.headers, json=data) + def _create_pull_request(self, title, body, base="main"): + self.logger.info(f"Creating pull request: '{title}' from branch '{self.current_branch}' to '{base}'") + if self.current_branch == base: + error_message = f"Cannot create a pull request from branch '{self.current_branch}' to itself ('{base}')." + self.logger.warning(error_message) + return error_message + + url = f"{self.base_url}/repos/{self._repo}/pulls" + data = {"title": title, "body": body, "head": self.current_branch, "base": base} + response = self.session.post(url, json=data) if response.status_code == 201: - success_message = f"Pull request created successfully: {response.json()['html_url']}" + pr_html_url = response.json().get("html_url", "N/A") + pr_number = response.json().get("number", "N/A") + success_message = f"Pull request '{title}' created successfully: {pr_html_url} (Number: {pr_number})" self.logger.info(success_message) return success_message else: - error_message = f"Error creating pull request: {response.status_code}\nResponse: {response.text}" + error_message = f"Error creating pull request: {response.status_code} - {response.text}" self.logger.error(error_message) return error_message @metrics.measure def _get_branch_sha(self, branch): - url = f"{self.base_url}/repos/{self.repo}/git/refs/heads/{branch}" - response = requests.get(url, headers=self.headers) - if response.status_code == 200: - return response.json()["object"]["sha"] - else: - return f"Error getting branch SHA: {response.status_code}" + self.logger.info(f"Getting SHA for branch: {branch}") + url = f"{self.base_url}/repos/{self._repo}/git/refs/heads/{branch}" + response = self.session.get(url) + if response.status_code == 200: + sha = response.json()["object"]["sha"] + self.logger.info(f"SHA for branch '{branch}' is {sha}") + return sha + else: + error_message = f"Error getting SHA for branch '{branch}': {response.status_code} - {response.text}" + self.logger.error(error_message) + return error_message @metrics.measure def _list_files(self, path): - self.logger.info(f"Listing files in: {path} on branch: {self.current_branch}") - url = f"{self.base_url}/repos/{self.repo}/contents/{path}" - response = requests.get(url, headers=self.headers, params={"ref": self.current_branch}) + self.logger.info(f"Listing files in path: '{path}' on branch: '{self.current_branch}'") + url = f"{self.base_url}/repos/{self._repo}/contents/{path.strip('/')}" # Ensure no leading/trailing slashes for consistency + response = self.session.get(url, params={"ref": self.current_branch}) if response.status_code == 200: - files = [{"type": "file", "name": item["name"]} for item in response.json() if item["type"] == "file"] - directories = [{"type": "directory", "name": item["name"]} for item in response.json() if item["type"] == "dir"] - self.logger.info(f"Successfully listed files and directories in {path}") - files.extend(directories) - return files + items = response.json() + results = [] + if isinstance(items, list): # It's a directory listing + for item in items: + results.append({"name": item["name"], "type": item["type"], "path": item["path"]}) + elif isinstance(items, dict) and 'type' in items: # It's a single file response + results.append({"name": items["name"], "type": items["type"], "path": items["path"]}) + self.logger.info(f"Successfully listed {len(results)} items in '{path}'.") + return results + elif response.status_code == 404: + self.logger.warning(f"Path '{path}' not found on branch '{self.current_branch}'.") + return f"Error: Path '{path}' not found." else: - error_message = f"Error listing files: {response.status_code}" + error_message = f"Error listing files in '{path}': {response.status_code} - {response.text}" self.logger.error(error_message) return error_message @metrics.measure def _search_code(self, query): - self.logger.info(f"Searching code with query: {query}") + self.logger.info(f"Searching code with query: '{query}' in repo: '{self._repo}'") url = f"{self.base_url}/search/code" - params = { - "q": f"{query} repo:{self.repo}", - "per_page": 10 - } - response = requests.get(url, headers=self.headers, params=params) + params = {"q": f"{query} repo:{self._repo}"} + response = self.session.get(url, params=params) if response.status_code == 200: - results = [{"file": item["path"], "url": item["html_url"]} for item in response.json()["items"]] - self.logger.info(f"Successfully searched code. Found {len(results)} results.") + search_results = response.json().get("items", []) + results = [{"path": item["path"], "url": item["html_url"]} for item in search_results] + self.logger.info(f"Code search for '{query}' found {len(results)} items.") return results else: - error_message = f"Error searching code: {response.status_code}" + error_message = f"Error searching code for '{query}': {response.status_code} - {response.text}" self.logger.error(error_message) return error_message @metrics.measure - def _get_commit_history(self, file_path, num_commits): - self.logger.info(f"Getting commit history for file: {file_path}, number of commits: {num_commits}") - url = f"{self.base_url}/repos/{self.repo}/commits" - params = { - "path": file_path, - "per_page": num_commits - } - response = requests.get(url, headers=self.headers, params=params) + def _get_commit_history(self, file_path, num_commits=10): + self.logger.info(f"Getting last {num_commits} commit(s) for file: '{file_path}' on branch '{self.current_branch}'") + url = f"{self.base_url}/repos/{self._repo}/commits" + params = {"path": file_path, "sha": self.current_branch, "per_page": num_commits} + response = self.session.get(url, params=params) if response.status_code == 200: - commits = [{"sha": commit["sha"], "message": commit["commit"]["message"], "date": commit["commit"]["author"]["date"]} for commit in response.json()] - self.logger.info(f"Successfully retrieved commit history. Found {len(commits)} commits.") + commits_data = response.json() + commits = [{ + "sha": commit["sha"], + "message": commit["commit"]["message"], + "author": commit["commit"]["author"]["name"], + "date": commit["commit"]["author"]["date"] + } for commit in commits_data] + self.logger.info(f"Successfully retrieved {len(commits)} commit(s) for '{file_path}'.") return commits else: - error_message = f"Error getting commit history: {response.status_code}" + error_message = f"Error getting commit history for '{file_path}': {response.status_code} - {response.text}" self.logger.error(error_message) return error_message @metrics.measure - def _view_commit_details_for_file(self, file_path, num_commits): - self.logger.info(f"Viewing commit details for file: {file_path}, number of commits: {num_commits} (via _get_commit_history)") + def _view_commit_details_for_file(self, file_path, num_commits=10): + # This function is essentially the same as get_commit_history based on its description. + self.logger.info(f"Viewing commit details for file '{file_path}' (last {num_commits} commits) - using _get_commit_history.") return self._get_commit_history(file_path, num_commits) @metrics.measure def _get_current_branch(self): - self.logger.info(f"Getting current branch: {self.current_branch}") + self.logger.info(f"Current branch is: {self.current_branch}") return self.current_branch @metrics.measure def _set_current_branch(self, branch_name): - self.logger.info(f"Setting current branch from {self.current_branch} to {branch_name}") + self.logger.info(f"Attempting to set current branch to: {branch_name}") + # Check if branch exists by trying to get its SHA + sha_info = self._get_branch_sha(branch_name) + if isinstance(sha_info, str) and sha_info.startswith("Error getting SHA"): # Crude check for error string + error_message = f"Cannot set current branch: Branch '{branch_name}' not found or error accessing it. Details: {sha_info}" + self.logger.warning(error_message) + return error_message + self.current_branch = branch_name - return f"Current branch set to: {self.current_branch}" + success_message = f"Current branch set to: {self.current_branch}" + self.logger.info(success_message) + return success_message @metrics.measure def _get_file_at_commit(self, file_path, commit_sha): - self.logger.info(f"Getting file: {file_path} at commit: {commit_sha}") - url = f"{self.base_url}/repos/{self.repo}/contents/{file_path}" - response = requests.get(url, headers=self.headers, params={"ref": commit_sha}) + self.logger.info(f"Getting file '{file_path}' at commit SHA: {commit_sha}") + url = f"{self.base_url}/repos/{self._repo}/contents/{file_path}" + response = self.session.get(url, params={"ref": commit_sha}) if response.status_code == 200: content = response.json()["content"] decoded_content = base64.b64decode(content).decode('utf-8') - self.logger.info(f"Successfully retrieved file at commit") + self.logger.info(f"Successfully retrieved file '{file_path}' at commit {commit_sha}.") return decoded_content else: - error_message = f"Error reading file at commit: {response.status_code}" + error_message = f"Error reading file '{file_path}' at commit {commit_sha}: {response.status_code} - {response.text}" self.logger.error(error_message) return error_message @metrics.measure def _list_branches(self, per_page=100, all_pages=True): - self.logger.info(f"Listing branches. Per page: {per_page}, All pages: {all_pages}") - url = f"{self.base_url}/repos/{self.repo}/branches" - params = {"per_page": min(per_page, 100)} # GitHub API max is 100 per page - all_branches = [] - + self.logger.info(f"Listing branches for repo '{self._repo}'. Per_page={per_page}, All_pages={all_pages}") + url = f"{self.base_url}/repos/{self._repo}/branches" + params = {"per_page": min(per_page, 100)} # Respect GitHub API limit + branches_list = [] + page = 1 while url: - self.logger.info(f"Fetching branches from: {url}") - response = requests.get(url, headers=self.headers, params=params) + self.logger.debug(f"Fetching page {page} from {url} with params {params if page==1 else {}}") + response = self.session.get(url, params=params if page == 1 else None) # params only for first page if paginating via links if response.status_code != 200: - error_message = f"Error listing branches: {response.status_code}" + error_message = f"Error listing branches: {response.status_code} - {response.text}" self.logger.error(error_message) return error_message + + current_page_branches = [branch["name"] for branch in response.json()] + branches_list.extend(current_page_branches) + self.logger.debug(f"Fetched {len(current_page_branches)} branches on page {page}.") - branches = [branch["name"] for branch in response.json()] - all_branches.extend(branches) - self.logger.info(f"Fetched {len(branches)} branches") - - if not all_pages: + if not all_pages or not response.links.get("next"): break + url = response.links["next"]["url"] + page += 1 + params = {} # Clear params for subsequent calls using a link that includes them - # Check if there's a next page - url = response.links.get('next', {}).get('url') - if url: - params = {} # Remove per_page for subsequent requests - - self.logger.info(f"Successfully listed all branches. Total: {len(all_branches)}") - return all_branches + self.logger.info(f"Successfully listed {len(branches_list)} branches.") + return branches_list @metrics.measure def _approve_pull_request(self, pull_number): - self.logger.info(f"Approving pull request: {pull_number}") - url = f"{self.base_url}/repos/{self.repo}/pulls/{pull_number}/reviews" + self.logger.info(f"Approving pull request #{pull_number}") + url = f"{self.base_url}/repos/{self._repo}/pulls/{pull_number}/reviews" data = {"event": "APPROVE"} - response = requests.post(url, headers=self.headers, json=data) + response = self.session.post(url, json=data) if response.status_code == 200: - success_message = f"Pull request {pull_number} approved successfully" + success_message = f"Pull request #{pull_number} approved successfully." self.logger.info(success_message) return success_message else: - error_message = f"Error approving pull request: {response.status_code}\nResponse: {response.text}" + error_message = f"Error approving pull request #{pull_number}: {response.status_code} - {response.text}" self.logger.error(error_message) return error_message @metrics.measure def _close_pull_request(self, pull_number): - self.logger.info(f"Closing pull request: {pull_number}") - url = f"{self.base_url}/repos/{self.repo}/pulls/{pull_number}" + self.logger.info(f"Closing pull request #{pull_number}") + url = f"{self.base_url}/repos/{self._repo}/pulls/{pull_number}" data = {"state": "closed"} - response = requests.patch(url, headers=self.headers, json=data) + response = self.session.patch(url, json=data) # Use PATCH for update if response.status_code == 200: - success_message = f"Pull request {pull_number} closed successfully" + success_message = f"Pull request #{pull_number} closed successfully." self.logger.info(success_message) return success_message else: - error_message = f"Error closing pull request: {response.status_code}\nResponse: {response.text}" + error_message = f"Error closing pull request #{pull_number}: {response.status_code} - {response.text}" self.logger.error(error_message) return error_message @metrics.measure - def _merge_pull_request(self, pull_number, commit_title, commit_message, merge_method): - self.logger.info(f"Merging pull request: {pull_number}") - url = f"{self.base_url}/repos/{self.repo}/pulls/{pull_number}/merge" + def _merge_pull_request(self, pull_number, commit_title="Merge pull request", commit_message="", merge_method="merge"): + self.logger.info(f"Merging pull request #{pull_number} using method '{merge_method}'") + url = f"{self.base_url}/repos/{self._repo}/pulls/{pull_number}/merge" data = {"commit_title": commit_title, "commit_message": commit_message, "merge_method": merge_method} - response = requests.put(url, headers=self.headers, json=data) + response = self.session.put(url, json=data) if response.status_code == 200: - success_message = f"Pull request {pull_number} merged successfully" + success_message = f"Pull request #{pull_number} merged successfully." self.logger.info(success_message) return success_message + elif response.status_code == 405: # Method Not Allowed (e.g., PR not mergeable) + error_message = f"Error merging pull request #{pull_number}: Not mergeable. {response.json().get('message', response.text)}" + self.logger.warning(error_message) + return error_message + elif response.status_code == 409: # Conflict + error_message = f"Error merging pull request #{pull_number}: Merge conflict. {response.json().get('message', response.text)}" + self.logger.warning(error_message) + return error_message else: - error_message = f"Error merging pull request: {response.status_code}\nResponse: {response.text}" + error_message = f"Error merging pull request #{pull_number}: {response.status_code} - {response.text}" self.logger.error(error_message) return error_message @metrics.measure def _delete_branch(self, branch_name): self.logger.info(f"Deleting branch: {branch_name}") - url = f"{self.base_url}/repos/{self.repo}/git/refs/heads/{branch_name}" - response = requests.delete(url, headers=self.headers) + if branch_name == "main" or (hasattr(self, 'default_branch') and branch_name == self.default_branch) : + # Add a check for a configurable default branch if necessary + error_message = f"Cannot delete protected branch: {branch_name}" + self.logger.warning(error_message) + return error_message + + url = f"{self.base_url}/repos/{self._repo}/git/refs/heads/{branch_name}" + response = self.session.delete(url) if response.status_code == 204: - success_message = f"Branch {branch_name} deleted successfully" + success_message = f"Branch '{branch_name}' deleted successfully." self.logger.info(success_message) + if self.current_branch == branch_name: + self.current_branch = "main" # Or some other default + self.logger.info(f"Current branch was {branch_name}, reset to {self.current_branch}.") return success_message else: - error_message = f"Error deleting branch: {response.status_code}\nResponse: {response.text}" + error_message = f"Error deleting branch '{branch_name}': {response.status_code} - {response.text}" self.logger.error(error_message) return error_message + @metrics.measure def _get_issue_details(self, issue_number): - self.logger.info(f"Getting details for issue: {issue_number}") - url = f"{self.base_url}/repos/{self.repo}/issues/{issue_number}" - response = requests.get(url, headers=self.headers) + self.logger.info(f"Getting details for issue #{issue_number}") + url = f"{self.base_url}/repos/{self._repo}/issues/{issue_number}" + response = self.session.get(url) if response.status_code == 200: - issue_data = response.json() - issue_details = { - "number": issue_data["number"], - "title": issue_data["title"], - "state": issue_data["state"], - "body": issue_data["body"], - "created_at": issue_data["created_at"], - "updated_at": issue_data["updated_at"], - "labels": [label["name"] for label in issue_data["labels"]], - "assignees": [assignee["login"] for assignee in issue_data["assignees"]], - "comments": issue_data["comments"] - } - self.logger.info(f"Successfully retrieved details for issue {issue_number}") - return issue_details + self.logger.info(f"Successfully retrieved details for issue #{issue_number}.") + return response.json() # Return raw JSON data for now else: - error_message = f"Error getting issue details: {response.status_code}\nResponse: {response.text}" + error_message = f"Error getting details for issue #{issue_number}: {response.status_code} - {response.text}" self.logger.error(error_message) return error_message - + @metrics.measure def _create_issue(self, title, body, labels=None): - self.logger.info(f"Creating issue: {title}") - url = f"{self.base_url}/repos/{self.repo}/issues" - data = { - "title": title, - "body": body - } - if labels: - data["labels"] = labels - response = requests.post(url, headers=self.headers, json=data) + self.logger.info(f"Creating new issue with title: '{title}'") + url = f"{self.base_url}/repos/{self._repo}/issues" + data = {"title": title, "body": body} + if labels: # Ensure labels is a list of strings + data["labels"] = labels if isinstance(labels, list) else [labels] + + response = self.session.post(url, json=data) if response.status_code == 201: - issue = response.json() - success_message = f"Issue created successfully: {issue['html_url']}" + issue_html_url = response.json().get("html_url", "N/A") + issue_number = response.json().get("number", "N/A") + success_message = f"Issue '{title}' created successfully: {issue_html_url} (Number: {issue_number})" self.logger.info(success_message) return success_message else: - error_message = f"Error creating issue: {response.status_code}\nResponse: {response.text}" + error_message = f"Error creating issue '{title}': {response.status_code} - {response.text}" self.logger.error(error_message) return error_message @metrics.measure def _list_issues(self, state="open", per_page=30, page=1): - self.logger.info(f"Listing issues. State: {state}, Per page: {per_page}, Page: {page}") - url = f"{self.base_url}/repos/{self.repo}/issues" - params = { - "state": state, - "per_page": per_page, - "page": page - } - response = requests.get(url, headers=self.headers, params=params) + self.logger.info(f"Listing issues with state: {state}, per_page: {per_page}, page: {page}") + url = f"{self.base_url}/repos/{self._repo}/issues" + params = {"state": state, "per_page": per_page, "page": page} + response = self.session.get(url, params=params) if response.status_code == 200: - issues = [{ - "number": issue["number"], - "title": issue["title"], - "state": issue["state"], - "created_at": issue["created_at"], - "url": issue["html_url"] - } for issue in response.json()] - self.logger.info(f"Successfully listed issues. Found {len(issues)} issues.") - return issues + issues_data = response.json() + self.logger.info(f"Successfully listed {len(issues_data)} issues.") + # Return a summary or full data based on needs + return [{ "title": i["title"], "number": i["number"], "state": i["state"], "url": i["html_url"] } for i in issues_data] else: - error_message = f"Error listing issues: {response.status_code}\nResponse: {response.text}" + error_message = f"Error listing issues: {response.status_code} - {response.text}" self.logger.error(error_message) return error_message - + @metrics.measure def _add_issue_comment(self, issue_number, comment): - self.logger.info(f"Adding comment to issue: {issue_number}") - url = f"{self.base_url}/repos/{self.repo}/issues/{issue_number}/comments" + self.logger.info(f"Adding comment to issue #{issue_number}: '{comment[:50]}...'") + url = f"{self.base_url}/repos/{self._repo}/issues/{issue_number}/comments" data = {"body": comment} - response = requests.post(url, headers=self.headers, json=data) + response = self.session.post(url, json=data) if response.status_code == 201: - comment_data = response.json() - success_message = f"Comment added successfully to issue {issue_number}: {comment_data['html_url']}" + comment_html_url = response.json().get("html_url", "N/A") + success_message = f"Comment added to issue #{issue_number} successfully: {comment_html_url}" self.logger.info(success_message) return success_message else: - error_message = f"Error adding comment to issue: {response.status_code}\nResponse: {response.text}" + error_message = f"Error adding comment to issue #{issue_number}: {response.status_code} - {response.text}" self.logger.error(error_message) return error_message @metrics.measure def _get_issue_comments(self, issue_number): - self.logger.info(f"Getting comments for issue: {issue_number}") - url = f"{self.base_url}/repos/{self.repo}/issues/{issue_number}/comments" - response = requests.get(url, headers=self.headers) + self.logger.info(f"Getting comments for issue #{issue_number}") + url = f"{self.base_url}/repos/{self._repo}/issues/{issue_number}/comments" + response = self.session.get(url) if response.status_code == 200: - comments = [{ - "id": comment["id"], - "user": comment["user"]["login"], - "body": comment["body"], - "created_at": comment["created_at"], - "updated_at": comment["updated_at"] - } for comment in response.json()] - self.logger.info(f"Successfully retrieved comments for issue {issue_number}. Found {len(comments)} comments.") - return comments + comments_data = response.json() + self.logger.info(f"Successfully retrieved {len(comments_data)} comments for issue #{issue_number}.") + # Return summary or full data + return [{ "user": c["user"]["login"], "body": c["body"], "created_at": c["created_at"] } for c in comments_data] else: - error_message = f"Error getting issue comments: {response.status_code}\nResponse: {response.text}" + error_message = f"Error getting comments for issue #{issue_number}: {response.status_code} - {response.text}" self.logger.error(error_message) return error_message - # New method for PR general comments @metrics.measure def _get_pull_request_general_comments(self, pull_number): - self.logger.info(f"Getting general comments for pull request: {pull_number}") - # Pull request comments are treated as issue comments in the GitHub API - # Re-use the existing _get_issue_comments method + self.logger.info(f"Getting general comments for pull request #{pull_number}") + # In GitHub API, PR comments (general, not review comments on lines) are issue comments. + # The PR is also an issue, so use the issue comments endpoint. return self._get_issue_comments(issue_number=pull_number) @metrics.measure def _create_project_board(self, name, body=None): - url = f"{self.base_url}/repos/{self.repo}/projects" - data = {"name": name, "body": body} - response = requests.post(url, headers=self.headers, json=data) + self.logger.info(f"Creating project board: '{name}'") + url = f"{self.base_url}/repos/{self._repo}/projects" + headers = self.session.headers.copy() # Get existing session headers + headers["Accept"] = "application/vnd.github.inertia-preview+json" # Required for Projects API + data = {"name": name} + if body: data["body"] = body + response = self.session.post(url, headers=headers, json=data) if response.status_code == 201: - project = response.json() - success_message = f"Project board '{name}' created successfully." + project_data = response.json() + success_message = f"Project board '{name}' created successfully with ID: {project_data['id']}" self.logger.info(success_message) - return { - "status": "success", - "status_code": response.status_code, - "message": success_message, - "data": project - } + return project_data # Return full project data else: - error_message = f"Error creating project board: {response.status_code}" + error_message = f"Error creating project board '{name}': {response.status_code} - {response.text}" self.logger.error(error_message) - return { - "status": "error", - "status_code": response.status_code, - "message": error_message, - "response": response.text - } + return error_message @metrics.measure def _create_project_column(self, project_id, column_name): + self.logger.info(f"Creating column '{column_name}' for project ID: {project_id}") url = f"{self.base_url}/projects/{project_id}/columns" + headers = self.session.headers.copy() + headers["Accept"] = "application/vnd.github.inertia-preview+json" data = {"name": column_name} - response = requests.post(url, headers=self.headers, json=data) + response = self.session.post(url, headers=headers, json=data) if response.status_code == 201: - column = response.json() - success_message = f"Column '{column_name}' created successfully in project {project_id}." + column_data = response.json() + success_message = f"Column '{column_name}' created successfully for project {project_id} with ID: {column_data['id']}" self.logger.info(success_message) - return { - "status": "success", - "status_code": response.status_code, - "message": success_message, - "data": column - } + return column_data else: - error_message = f"Error creating project column: {response.status_code}" + error_message = f"Error creating column '{column_name}' for project {project_id}: {response.status_code} - {response.text}" self.logger.error(error_message) - return { - "status": "error", - "status_code": response.status_code, - "message": error_message, - "response": response.text - } + return error_message @metrics.measure - def _create_project_card(self, column_id, note): + def _create_project_card(self, column_id, note=None, content_id=None, content_type=None): + self.logger.info(f"Creating card in column ID: {column_id}") url = f"{self.base_url}/projects/columns/{column_id}/cards" - data = {"note": note} - response = requests.post(url, headers=self.headers, json=data) + headers = self.session.headers.copy() + headers["Accept"] = "application/vnd.github.inertia-preview+json" + data = {} + if note: + data["note"] = note + if content_id and content_type: + data["content_id"] = content_id + data["content_type"] = content_type + elif (content_id and not content_type) or (not content_id and content_type): + err = "Both content_id and content_type must be provided to link content to a project card." + self.logger.warning(err) + return err + + if not data: + return "Error: Card must have a note or content to link." + + response = self.session.post(url, headers=headers, json=data) if response.status_code == 201: - card = response.json() - success_message = f"Card created successfully in column {column_id}." + card_data = response.json() + success_message = f"Card created successfully in column {column_id} with ID: {card_data['id']}" self.logger.info(success_message) - return { - "status": "success", - "status_code": response.status_code, - "message": success_message, - "data": card - } + return card_data else: - error_message = f"Error creating project card: {response.status_code}" + error_message = f"Error creating card in column {column_id}: {response.status_code} - {response.text}" self.logger.error(error_message) - return { - "status": "error", - "status_code": response.status_code, - "message": error_message, - "response": response.text - } + return error_message @metrics.measure - def _move_project_card(self, card_id, position, column_id): + def _move_project_card(self, card_id, position, column_id=None): + self.logger.info(f"Moving card ID: {card_id} to position: {position}" + (f" in column ID: {column_id}" if column_id else "")) url = f"{self.base_url}/projects/columns/cards/{card_id}/moves" - data = {"position": position, "column_id": column_id} - response = requests.post(url, headers=self.headers, json=data) - if response.status_code == 201: - success_message = f"Card {card_id} moved successfully." + headers = self.session.headers.copy() + headers["Accept"] = "application/vnd.github.inertia-preview+json" + data = {"position": position} + if column_id: + data["column_id"] = column_id + + response = self.session.post(url, headers=headers, json=data) + if response.status_code == 201: # Successful move returns 201 with empty body + success_message = f"Card {card_id} moved successfully to position {position}" + (f" in column {column_id}" if column_id else ".") self.logger.info(success_message) - return { - "status": "success", - "status_code": response.status_code, - "message": success_message - } + return success_message # Return success message as body is empty else: - error_message = f"Error moving project card: {response.status_code}" + error_message = f"Error moving card {card_id}: {response.status_code} - {response.text}" self.logger.error(error_message) - return { - "status": "error", - "status_code": response.status_code, - "message": error_message, - "response": response.text - } + return error_message + # _link_issue_to_project_card is effectively handled by _create_project_card if content_id and content_type are passed. + # The API used to have a separate link endpoint, but now it is part of card creation/update. + # For updating an existing card to link an issue, one would PATCH the card's content_id/content_type. + # Let's assume the function intends to update an existing card if it's a separate function. + # However, the provided API spec for `link_issue_to_project_card` uses PATCH on card_id, so let's implement that. @metrics.measure def _link_issue_to_project_card(self, card_id, content_id, content_type): - url = f"{self.base_url}/projects/columns/cards/{card_id}" + self.logger.info(f"Linking content_id {content_id} (type: {content_type}) to card_id {card_id}") + url = f"{self.base_url}/projects/cards/{card_id}" # Note: API docs suggest /projects/columns/cards/{card_id} or /projects/cards/{card_id} + # Using /projects/cards/{card_id} as it seems more general for card update. + headers = self.session.headers.copy() + headers["Accept"] = "application/vnd.github.inertia-preview+json" data = {"content_id": content_id, "content_type": content_type} - response = requests.patch(url, headers=self.headers, json=data) + + response = self.session.patch(url, headers=headers, json=data) if response.status_code == 200: - success_message = f"Issue/PR linked to card {card_id} successfully." + updated_card = response.json() + success_message = f"{content_type} {content_id} linked to card {card_id} successfully." self.logger.info(success_message) - return { - "status": "success", - "status_code": response.status_code, - "message": success_message - } + return updated_card else: - error_message = f"Error linking issue/PR to project card: {response.status_code}" + error_message = f"Error linking {content_type} {content_id} to card {card_id}: {response.status_code} - {response.text}" self.logger.error(error_message) - return { - "status": "error", - "status_code": response.status_code, - "message": error_message, - "response": response.text - } - + return error_message + @metrics.measure def _list_project_boards(self): - self.logger.info("Fetching project boards...") - url = f"{self.base_url}/repos/{self.repo}/projects" - response = requests.get(url, headers=self.headers) + self.logger.info(f"Listing project boards for repo: {self._repo}") + url = f"{self.base_url}/repos/{self._repo}/projects" + headers = self.session.headers.copy() + headers["Accept"] = "application/vnd.github.inertia-preview+json" + response = self.session.get(url, headers=headers) if response.status_code == 200: - boards = response.json() - self.logger.info(f"Successfully fetched {len(boards)} project boards.") - return boards + projects_data = response.json() + self.logger.info(f"Successfully listed {len(projects_data)} project boards.") + return projects_data else: - error_message = f"Error fetching project boards: {response.status_code}" + error_message = f"Error listing project boards: {response.status_code} - {response.text}" self.logger.error(error_message) return error_message @metrics.measure def _view_project_board_items(self, project_id): - self.logger.info(f"Fetching items for project board ID: {project_id}...") + self.logger.info(f"Viewing items for project ID: {project_id}") columns_url = f"{self.base_url}/projects/{project_id}/columns" - columns_response = requests.get(columns_url, headers=self.headers) - if columns_response.status_code == 200: - columns = columns_response.json() - items = [] - for column in columns: - column_id = column["id"] - column_name = column["name"] - cards_url = f"{self.base_url}/projects/columns/{column_id}/cards" - cards_response = requests.get(cards_url, headers=self.headers) - if cards_response.status_code == 200: - cards = cards_response.json() - items.append({"column": column_name, "cards": cards}) - else: - self.logger.error(f"Error fetching cards for column {column_id}: {cards_response.status_code}") - items.append({"column": column_name, "cards": "Error fetching cards"}) - self.logger.info(f"Successfully fetched items for project board ID: {project_id}.") - return items - else: - error_message = f"Error fetching columns for project board: {columns_response.status_code}" + headers = self.session.headers.copy() + headers["Accept"] = "application/vnd.github.inertia-preview+json" + + columns_response = self.session.get(columns_url, headers=headers) + if columns_response.status_code != 200: + error_message = f"Error fetching columns for project {project_id}: {columns_response.status_code} - {columns_response.text}" self.logger.error(error_message) return error_message + + columns_data = columns_response.json() + project_items = [] + for column in columns_data: + column_info = {"id": column["id"], "name": column["name"], "cards": []} + cards_url = column["cards_url"] + cards_response = self.session.get(cards_url, headers=headers) + if cards_response.status_code == 200: + column_info["cards"] = cards_response.json() + else: + self.logger.error(f"Error fetching cards for column {column['id']}('{column['name']}'): {cards_response.status_code} - {cards_response.text}") + column_info["cards"] = "Error fetching cards" + project_items.append(column_info) + + self.logger.info(f"Successfully retrieved items for project ID: {project_id}.") + return project_items - # New functions for PR review capabilities @metrics.measure def _get_pull_request_details(self, pull_number): - self.logger.info(f"Getting details for pull request: {pull_number}") - url = f"{self.base_url}/repos/{self.repo}/pulls/{pull_number}" - response = requests.get(url, headers=self.headers) + self.logger.info(f"Getting details for PR #{pull_number}") + url = f"{self.base_url}/repos/{self._repo}/pulls/{pull_number}" + response = self.session.get(url) if response.status_code == 200: - self.logger.info(f"Successfully retrieved details for PR {pull_number}") + self.logger.info(f"Successfully retrieved details for PR #{pull_number}.") return response.json() else: - error_message = f"Error getting pull request details: {response.status_code}\nResponse: {response.text}" + error_message = f"Error getting details for PR #{pull_number}: {response.status_code} - {response.text}" self.logger.error(error_message) return error_message @metrics.measure def _get_pull_request_diff(self, pull_number): - self.logger.info(f"Getting diff for pull request: {pull_number}") - url = f"{self.base_url}/repos/{self.repo}/pulls/{pull_number}" - headers = self.headers.copy() - headers["Accept"] = "application/vnd.github.v3.diff" - response = requests.get(url, headers=headers) + self.logger.info(f"Getting diff for PR #{pull_number}") + url = f"{self.base_url}/repos/{self._repo}/pulls/{pull_number}" + diff_headers = self.session.headers.copy() + diff_headers["Accept"] = "application/vnd.github.diff" + response = self.session.get(url, headers=diff_headers) if response.status_code == 200: - self.logger.info(f"Successfully retrieved diff for PR {pull_number}") + self.logger.info(f"Successfully retrieved diff for PR #{pull_number}.") return response.text else: - error_message = f"Error getting pull request diff: {response.status_code}\nResponse: {response.text}" + error_message = f"Error getting diff for PR #{pull_number}: {response.status_code} - {response.text}" self.logger.error(error_message) return error_message @metrics.measure def _get_pull_request_files(self, pull_number): - self.logger.info(f"Getting files for pull request: {pull_number}") - url = f"{self.base_url}/repos/{self.repo}/pulls/{pull_number}/files" - response = requests.get(url, headers=self.headers) + self.logger.info(f"Getting files for PR #{pull_number}") + url = f"{self.base_url}/repos/{self._repo}/pulls/{pull_number}/files" + response = self.session.get(url) if response.status_code == 200: - self.logger.info(f"Successfully retrieved files for PR {pull_number}") + self.logger.info(f"Successfully retrieved files for PR #{pull_number}.") return response.json() else: - error_message = f"Error getting pull request files: {response.status_code}\nResponse: {response.text}" + error_message = f"Error getting files for PR #{pull_number}: {response.status_code} - {response.text}" self.logger.error(error_message) return error_message @metrics.measure def _create_pull_request_review_comment(self, pull_number, body, commit_id, path, position, side="RIGHT", start_line=None, start_side=None): - self.logger.info(f"Adding review comment to PR {pull_number} on file {path} at position {position}") - url = f"{self.base_url}/repos/{self.repo}/pulls/{pull_number}/comments" - data = { - "body": body, - "commit_id": commit_id, - "path": path, - "position": position, - "side": side - } - if start_line is not None: - data["start_line"] = start_line - if start_side is not None: - data["start_side"] = start_side - - response = requests.post(url, headers=self.headers, json=data) + self.logger.info(f"Creating review comment on PR #{pull_number}, file '{path}', position {position}") + url = f"{self.base_url}/repos/{self._repo}/pulls/{pull_number}/comments" + data = {"body": body, "commit_id": commit_id, "path": path, "position": position, "side": side} + if start_line is not None: data["start_line"] = start_line + if start_side is not None: data["start_side"] = start_side + + response = self.session.post(url, json=data) if response.status_code == 201: - success_message = f"Comment added to PR {pull_number} successfully: {response.json()['html_url']}" + comment_url = response.json().get("html_url", "N/A") + success_message = f"Review comment created successfully on PR #{pull_number}: {comment_url}" self.logger.info(success_message) return success_message else: - error_message = f"Error creating pull request review comment: {response.status_code}\nResponse: {response.text}" + error_message = f"Error creating review comment on PR #{pull_number}: {response.status_code} - {response.text}" self.logger.error(error_message) return error_message @metrics.measure def _list_pull_request_review_comments(self, pull_number): - self.logger.info(f"Listing review comments for pull request: {pull_number}") - url = f"{self.base_url}/repos/{self.repo}/pulls/{pull_number}/comments" - response = requests.get(url, headers=self.headers) + self.logger.info(f"Listing review comments for PR #{pull_number}") + url = f"{self.base_url}/repos/{self._repo}/pulls/{pull_number}/comments" + response = self.session.get(url) if response.status_code == 200: - self.logger.info(f"Successfully retrieved review comments for PR {pull_number}") + self.logger.info(f"Successfully retrieved review comments for PR #{pull_number}.") return response.json() else: - error_message = f"Error listing pull request review comments: {response.status_code}\nResponse: {response.text}" + error_message = f"Error listing review comments for PR #{pull_number}: {response.status_code} - {response.text}" self.logger.error(error_message) return error_message @metrics.measure def _submit_pull_request_review(self, pull_number, event, body=None): - self.logger.info(f"Submitting review for pull request {pull_number} with event: {event}") - url = f"{self.base_url}/repos/{self.repo}/pulls/{pull_number}/reviews" - data = {"event": event} - if body: - data["body"] = body - response = requests.post(url, headers=self.headers, json=data) + self.logger.info(f"Submitting '{event}' review for PR #{pull_number}") + url = f"{self.base_url}/repos/{self._repo}/pulls/{pull_number}/reviews" + data = {"event": event.upper()} # Ensure event is uppercase as per API + if body: data["body"] = body + + response = self.session.post(url, json=data) if response.status_code == 200: - success_message = f"Review submitted for PR {pull_number} successfully." + review_url = response.json().get("html_url", "N/A") + success_message = f"Review ({event}) submitted successfully for PR #{pull_number}: {review_url}" self.logger.info(success_message) return success_message else: - error_message = f"Error submitting pull request review: {response.status_code}\nResponse: {response.text}" + error_message = f"Error submitting review for PR #{pull_number}: {response.status_code} - {response.text}" self.logger.error(error_message) return error_message