Update github_tool.py
This commit is contained in:
+234
-17
@@ -356,66 +356,283 @@ class GitHubTool(BaseTool):
|
||||
self.logger.error(error_message)
|
||||
return error_message
|
||||
|
||||
@metrics.measure
|
||||
@metrics.measure
|
||||
def _read_file(self, path):
|
||||
# ... (rest of the method remains unchanged)
|
||||
self.logger.info(f"Reading file: {path} from branch: {self.current_branch}")
|
||||
url = f"{self.base_url}/repos/{self.repo}/contents/{path}"
|
||||
response = requests.get(url, headers=self.headers, params={"ref": self.current_branch})
|
||||
if response.status_code == 200:
|
||||
content = response.json()["content"]
|
||||
decoded_content = base64.b64decode(content).decode('utf-8')
|
||||
self.logger.info(f"Successfully read file: {path}")
|
||||
return decoded_content
|
||||
else:
|
||||
error_message = f"Error reading file: {response.status_code}"
|
||||
self.logger.error(error_message)
|
||||
return error_message
|
||||
|
||||
@metrics.measure
|
||||
def _create_branch(self, branch_name, base_branch):
|
||||
# ... (rest of the method remains unchanged)
|
||||
self.logger.info(f"Creating branch: {branch_name} from base: {base_branch}")
|
||||
url = f"{self.base_url}/repos/{self.repo}/git/refs"
|
||||
response = requests.get(f"{url}/heads/{base_branch}", headers=self.headers)
|
||||
if response.status_code != 200:
|
||||
error_message = f"Error getting base branch: {response.status_code}"
|
||||
self.logger.error(error_message)
|
||||
return error_message
|
||||
|
||||
sha = response.json()["object"]["sha"]
|
||||
data = {
|
||||
"ref": f"refs/heads/{branch_name}",
|
||||
"sha": sha
|
||||
}
|
||||
response = requests.post(url, headers=self.headers, json=data)
|
||||
if response.status_code == 201:
|
||||
self.current_branch = branch_name
|
||||
success_message = f"Branch '{branch_name}' created successfully and set as current branch"
|
||||
self.logger.info(success_message)
|
||||
return success_message
|
||||
else:
|
||||
error_message = f"Error creating branch: {response.status_code}"
|
||||
self.logger.error(error_message)
|
||||
return error_message
|
||||
|
||||
@metrics.measure
|
||||
def _commit_file(self, file_path, content, commit_message):
|
||||
# ... (rest of the method remains unchanged)
|
||||
self.logger.info(f"Committing file: {file_path} to branch: {self.current_branch}")
|
||||
if self.current_branch == "main":
|
||||
error_message = "Cannot commit directly to main branch"
|
||||
self.logger.error(error_message)
|
||||
return error_message
|
||||
|
||||
url = f"{self.base_url}/repos/{self.repo}/contents/{file_path}"
|
||||
|
||||
self.logger.info("Checking if file already exists")
|
||||
response = requests.get(url, headers=self.headers, params={"ref": self.current_branch})
|
||||
|
||||
data = {
|
||||
"message": commit_message,
|
||||
"content": base64.b64encode(content.encode()).decode(),
|
||||
"branch": self.current_branch
|
||||
}
|
||||
|
||||
if response.status_code == 200:
|
||||
self.logger.info("File exists, updating")
|
||||
file_sha = response.json()["sha"]
|
||||
data["sha"] = file_sha
|
||||
else:
|
||||
self.logger.info("File does not exist, creating new file")
|
||||
|
||||
response = requests.put(url, headers=self.headers, json=data)
|
||||
|
||||
if response.status_code in [200, 201]:
|
||||
success_message = f"File committed successfully to branch '{self.current_branch}'"
|
||||
self.logger.info(success_message)
|
||||
return success_message
|
||||
else:
|
||||
error_message = f"Error committing file: {response.status_code}\nResponse: {response.text}"
|
||||
self.logger.error(error_message)
|
||||
return error_message
|
||||
|
||||
@metrics.measure
|
||||
def _create_pull_request(self, title, body, base):
|
||||
# ... (rest of the method remains unchanged)
|
||||
self.logger.info(f"Creating pull request: {title} from {self.current_branch} to {base}")
|
||||
url = f"{self.base_url}/repos/{self.repo}/pulls"
|
||||
data = {
|
||||
"title": title,
|
||||
"body": body,
|
||||
"head": self.current_branch,
|
||||
"base": base
|
||||
}
|
||||
response = requests.post(url, headers=self.headers, json=data)
|
||||
if response.status_code == 201:
|
||||
success_message = f"Pull request created successfully: {response.json()['html_url']}"
|
||||
self.logger.info(success_message)
|
||||
return success_message
|
||||
else:
|
||||
error_message = f"Error creating pull request: {response.status_code}\nResponse: {response.text}"
|
||||
self.logger.error(error_message)
|
||||
return error_message
|
||||
|
||||
@metrics.measure
|
||||
def _get_branch_sha(self, branch):
|
||||
# ... (rest of the method remains unchanged)
|
||||
url = f"{self.base_url}/repos/{self.repo}/git/refs/heads/{branch}"
|
||||
response = requests.get(url, headers=self.headers)
|
||||
if response.status_code == 200:
|
||||
return response.json()["object"]["sha"]
|
||||
else:
|
||||
return f"Error getting branch SHA: {response.status_code}"
|
||||
|
||||
@metrics.measure
|
||||
def _list_files(self, path):
|
||||
# ... (rest of the method remains unchanged)
|
||||
self.logger.info(f"Listing files in: {path} on branch: {self.current_branch}")
|
||||
url = f"{self.base_url}/repos/{self.repo}/contents/{path}"
|
||||
response = requests.get(url, headers=self.headers, params={"ref": self.current_branch})
|
||||
if response.status_code == 200:
|
||||
files = [item["name"] for item in response.json() if item["type"] == "file"]
|
||||
directories = [item["name"] for item in response.json() if item["type"] == "dir"]
|
||||
self.logger.info(f"Successfully listed files and directories in {path}")
|
||||
return {"files": files, "directories": directories}
|
||||
else:
|
||||
error_message = f"Error listing files: {response.status_code}"
|
||||
self.logger.error(error_message)
|
||||
return error_message
|
||||
|
||||
@metrics.measure
|
||||
def _search_code(self, query):
|
||||
# ... (rest of the method remains unchanged)
|
||||
self.logger.info(f"Searching code with query: {query}")
|
||||
url = f"{self.base_url}/search/code"
|
||||
params = {
|
||||
"q": f"{query} repo:{self.repo}",
|
||||
"per_page": 10
|
||||
}
|
||||
response = requests.get(url, headers=self.headers, params=params)
|
||||
if response.status_code == 200:
|
||||
results = [{"file": item["path"], "url": item["html_url"]} for item in response.json()["items"]]
|
||||
self.logger.info(f"Successfully searched code. Found {len(results)} results.")
|
||||
return results
|
||||
else:
|
||||
error_message = f"Error searching code: {response.status_code}"
|
||||
self.logger.error(error_message)
|
||||
return error_message
|
||||
|
||||
@metrics.measure
|
||||
def _get_commit_history(self, file_path, num_commits):
|
||||
# ... (rest of the method remains unchanged)
|
||||
self.logger.info(f"Getting commit history for file: {file_path}, number of commits: {num_commits}")
|
||||
url = f"{self.base_url}/repos/{self.repo}/commits"
|
||||
params = {
|
||||
"path": file_path,
|
||||
"per_page": num_commits
|
||||
}
|
||||
response = requests.get(url, headers=self.headers, params=params)
|
||||
if response.status_code == 200:
|
||||
commits = [{"sha": commit["sha"], "message": commit["commit"]["message"], "date": commit["commit"]["author"]["date"]} for commit in response.json()]
|
||||
self.logger.info(f"Successfully retrieved commit history. Found {len(commits)} commits.")
|
||||
return commits
|
||||
else:
|
||||
error_message = f"Error getting commit history: {response.status_code}"
|
||||
self.logger.error(error_message)
|
||||
return error_message
|
||||
|
||||
@metrics.measure
|
||||
def _get_current_branch(self):
|
||||
# ... (rest of the method remains unchanged)
|
||||
self.logger.info(f"Getting current branch: {self.current_branch}")
|
||||
return self.current_branch
|
||||
|
||||
@metrics.measure
|
||||
def _set_current_branch(self, branch_name):
|
||||
# ... (rest of the method remains unchanged)
|
||||
self.logger.info(f"Setting current branch from {self.current_branch} to {branch_name}")
|
||||
self.current_branch = branch_name
|
||||
return f"Current branch set to: {self.current_branch}"
|
||||
|
||||
@metrics.measure
|
||||
def _get_file_at_commit(self, file_path, commit_sha):
|
||||
# ... (rest of the method remains unchanged)
|
||||
self.logger.info(f"Getting file: {file_path} at commit: {commit_sha}")
|
||||
url = f"{self.base_url}/repos/{self.repo}/contents/{file_path}"
|
||||
response = requests.get(url, headers=self.headers, params={"ref": commit_sha})
|
||||
if response.status_code == 200:
|
||||
content = response.json()["content"]
|
||||
decoded_content = base64.b64decode(content).decode('utf-8')
|
||||
self.logger.info(f"Successfully retrieved file at commit")
|
||||
return decoded_content
|
||||
else:
|
||||
error_message = f"Error reading file at commit: {response.status_code}"
|
||||
self.logger.error(error_message)
|
||||
return error_message
|
||||
|
||||
@metrics.measure
|
||||
def _list_branches(self, per_page=100, all_pages=True):
|
||||
# ... (rest of the method remains unchanged)
|
||||
self.logger.info(f"Listing branches. Per page: {per_page}, All pages: {all_pages}")
|
||||
url = f"{self.base_url}/repos/{self.repo}/branches"
|
||||
params = {"per_page": min(per_page, 100)} # GitHub API max is 100 per page
|
||||
all_branches = []
|
||||
|
||||
while url:
|
||||
self.logger.info(f"Fetching branches from: {url}")
|
||||
response = requests.get(url, headers=self.headers, params=params)
|
||||
if response.status_code != 200:
|
||||
error_message = f"Error listing branches: {response.status_code}"
|
||||
self.logger.error(error_message)
|
||||
return error_message
|
||||
|
||||
branches = [branch["name"] for branch in response.json()]
|
||||
all_branches.extend(branches)
|
||||
self.logger.info(f"Fetched {len(branches)} branches")
|
||||
|
||||
if not all_pages:
|
||||
break
|
||||
|
||||
# Check if there's a next page
|
||||
url = response.links.get('next', {}).get('url')
|
||||
if url:
|
||||
params = {} # Remove per_page for subsequent requests
|
||||
|
||||
self.logger.info(f"Successfully listed all branches. Total: {len(all_branches)}")
|
||||
return all_branches
|
||||
|
||||
@metrics.measure
|
||||
def _approve_pull_request(self, pull_number):
|
||||
# ... (rest of the method remains unchanged)
|
||||
self.logger.info(f"Approving pull request: {pull_number}")
|
||||
url = f"{self.base_url}/repos/{self.repo}/pulls/{pull_number}/reviews"
|
||||
data = {
|
||||
"event": "APPROVE"
|
||||
}
|
||||
response = requests.post(url, headers=self.headers, json=data)
|
||||
if response.status_code == 200:
|
||||
success_message = f"Pull request {pull_number} approved successfully"
|
||||
self.logger.info(success_message)
|
||||
return success_message
|
||||
else:
|
||||
error_message = f"Error approving pull request: {response.status_code}\nResponse: {response.text}"
|
||||
self.logger.error(error_message)
|
||||
return error_message
|
||||
|
||||
@metrics.measure
|
||||
def _close_pull_request(self, pull_number):
|
||||
# ... (rest of the method remains unchanged)
|
||||
self.logger.info(f"Closing pull request: {pull_number}")
|
||||
url = f"{self.base_url}/repos/{self.repo}/pulls/{pull_number}"
|
||||
data = {
|
||||
"state": "closed"
|
||||
}
|
||||
response = requests.patch(url, headers=self.headers, json=data)
|
||||
if response.status_code == 200:
|
||||
success_message = f"Pull request {pull_number} closed successfully"
|
||||
self.logger.info(success_message)
|
||||
return success_message
|
||||
else:
|
||||
error_message = f"Error closing pull request: {response.status_code}\nResponse: {response.text}"
|
||||
self.logger.error(error_message)
|
||||
return error_message
|
||||
|
||||
@metrics.measure
|
||||
def _merge_pull_request(self, pull_number, commit_title, commit_message, merge_method):
|
||||
# ... (rest of the method remains unchanged)
|
||||
self.logger.info(f"Merging pull request: {pull_number}")
|
||||
url = f"{self.base_url}/repos/{self.repo}/pulls/{pull_number}/merge"
|
||||
data = {
|
||||
"commit_title": commit_title,
|
||||
"commit_message": commit_message,
|
||||
"merge_method": merge_method
|
||||
}
|
||||
response = requests.put(url, headers=self.headers, json=data)
|
||||
if response.status_code == 200:
|
||||
success_message = f"Pull request {pull_number} merged successfully"
|
||||
self.logger.info(success_message)
|
||||
return success_message
|
||||
else:
|
||||
error_message = f"Error merging pull request: {response.status_code}\nResponse: {response.text}"
|
||||
self.logger.error(error_message)
|
||||
return error_message
|
||||
|
||||
@metrics.measure
|
||||
def _delete_branch(self, branch_name):
|
||||
# ... (rest of the method remains unchanged)
|
||||
self.logger.info(f"Deleting branch: {branch_name}")
|
||||
url = f"{self.base_url}/repos/{self.repo}/git/refs/heads/{branch_name}"
|
||||
response = requests.delete(url, headers=self.headers)
|
||||
if response.status_code == 204:
|
||||
success_message = f"Branch {branch_name} deleted successfully"
|
||||
self.logger.info(success_message)
|
||||
return success_message
|
||||
else:
|
||||
error_message = f"Error deleting branch: {response.status_code}\nResponse: {response.text}"
|
||||
self.logger.error(error_message)
|
||||
return error_message
|
||||
|
||||
Reference in New Issue
Block a user