Revert github tool refactoring

This commit is contained in:
2024-08-20 12:24:33 -05:00
parent 6c465d4afc
commit 819445ec35
21 changed files with 766 additions and 1307 deletions
+764 -35
View File
@@ -1,49 +1,778 @@
# tools/github_tool.py
from .base_tool import BaseTool
from .metrics import metrics
import requests
import os import os
import json import base64
import logging import logging
logging.basicConfig(level=logging.DEBUG) class GitHubTool(BaseTool):
logger = logging.getLogger(__name__)
class GitHubTool:
def __init__(self): def __init__(self):
self.base_url = "https://api.github.com" self.base_url = "https://api.github.com"
self.token = os.environ.get("GITHUB_TOKEN") self.token = os.environ.get("GITHUB_TOKEN")
self.headers = {
"Authorization": f"token {self.token}",
"Accept": "application/vnd.github.v3+json"
}
self.repo = os.environ.get("GITHUB_REPOSITORY") self.repo = os.environ.get("GITHUB_REPOSITORY")
self.current_branch = "main" # Default to main branch self.current_branch = "main" # Default to main branch
self.instances = {} # Set up logging
self.functions = [] self.logger = logging.getLogger(__name__)
logger.debug(f"Initialized GitHubTool with repo: {self.repo}") self.logger.setLevel(logging.INFO)
self._load_functions()
def _load_functions(self): # Create a file handler
logger.debug("Starting to load functions") file_handler = logging.FileHandler('github_tool.log')
function_dir = os.path.join(os.path.dirname(__file__), "github_tool_functions") file_handler.setLevel(logging.INFO)
for filename in os.listdir(function_dir):
if filename.endswith(".py") and filename != "__init__.py":
logger.debug(f"Processing file: {filename}")
function_name = filename[:-3]
module = __import__(f"tools.github_tool_functions.{function_name}", fromlist=[function_name.capitalize()])
class_name = getattr(module, function_name.capitalize())
self.instances[function_name] = class_name(self.base_url, self.token, self.repo, self.current_branch)
with open(os.path.join(function_dir, filename), 'r') as f: # Create a console handler
content = f.read() console_handler = logging.StreamHandler()
json_start = content.find('{') console_handler.setLevel(logging.INFO)
json_end = content.rfind('}') + 1
if json_start != -1 and json_end != -1:
json_def = json.loads(content[json_start:json_end])
self.functions.append(json_def)
logger.debug(f"Loaded function: {json_def['name']}")
logger.debug(f"Loaded {len(self.functions)} functions")
def execute(self, function_name, **kwargs): # Create a formatting for the logs
if function_name in self.instances: formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
return self.instances[function_name](**kwargs) file_handler.setFormatter(formatter)
else: console_handler.setFormatter(formatter)
error_message = f"Unknown function: {function_name}"
return {"error": error_message} # Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def clear(self):
if (self.current_branch != "main"):
self._set_current_branch("main")
pass
def get_functions(self): def get_functions(self):
return self.functions return [
{
"name": "read_file",
"description": "Read a file from the repository",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the file in the repository"
}
},
"required": ["path"]
}
},
{
"name": "create_branch",
"description": "Create a new branch in the repository",
"parameters": {
"type": "object",
"properties": {
"branch_name": {
"type": "string",
"description": "Name of the new branch"
},
"base_branch": {
"type": "string",
"description": "Name of the base branch",
"default": "main"
}
},
"required": ["branch_name"]
}
},
{
"name": "commit_file",
"description": "Commit a file to a branch (not main)",
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the file in the repository"
},
"commit_message": {
"type": "string",
"description": "Commit message"
},
"content": {
"type": "string",
"description": "Content of the file"
}
},
"required": ["file_path", "commit_message", "content"]
}
},
{
"name": "create_pull_request",
"description": "Create a pull request",
"parameters": {
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "Title of the pull request"
},
"body": {
"type": "string",
"description": "Body of the pull request"
},
"base": {
"type": "string",
"description": "The name of the branch you want the changes pulled into",
"default": "main"
}
},
"required": ["title", "body"]
}
},
{
"name": "list_files",
"description": "List files in a directory of the repository",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the directory in the repository"
}
},
"required": ["path"]
}
},
{
"name": "search_code",
"description": "Search for code in the repository",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query"
}
},
"required": ["query"]
}
},
{
"name": "get_commit_history",
"description": "Get commit history for a file",
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the file in the repository"
},
"num_commits": {
"type": "integer",
"description": "Number of commits to retrieve",
"default": 10
}
},
"required": ["file_path"]
}
},
{
"name": "get_branch_sha",
"description": "Get the SHA of the latest commit on a branch",
"parameters": {
"type": "object",
"properties": {
"branch": {
"type": "string",
"description": "Name of the branch"
}
},
"required": ["branch"]
}
},
{
"name": "get_current_branch",
"description": "Get the name of the current branch",
"parameters": {}
},
{
"name": "set_current_branch",
"description": "Set the current branch",
"parameters": {
"type": "object",
"properties": {
"branch_name": {
"type": "string",
"description": "Name of the branch to set as current"
}
},
"required": ["branch_name"]
}
},
{
"name": "get_file_at_commit",
"description": "Get the contents of a file at a specific commit",
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the file in the repository"
},
"commit_sha": {
"type": "string",
"description": "SHA of the commit to retrieve the file from"
}
},
"required": ["file_path", "commit_sha"]
}
},
{
"name": "list_branches",
"description": "List all branches in the repository",
"parameters": {
"type": "object",
"properties": {
"per_page": {
"type": "integer",
"description": "Number of branches to return per page (max 100)",
"default": 100
},
"all_pages": {
"type": "boolean",
"description": "Whether to fetch all pages of results",
"default": True
}
}
}
},
{
"name": "approve_pull_request",
"description": "Approve a pull request",
"parameters": {
"type": "object",
"properties": {
"pull_number": {
"type": "integer",
"description": "The number of the pull request"
}
},
"required": ["pull_number"]
}
},
{
"name": "close_pull_request",
"description": "Close a pull request",
"parameters": {
"type": "object",
"properties": {
"pull_number": {
"type": "integer",
"description": "The number of the pull request"
}
},
"required": ["pull_number"]
}
},
{
"name": "merge_pull_request",
"description": "Merge a pull request",
"parameters": {
"type": "object",
"properties": {
"pull_number": {
"type": "integer",
"description": "The number of the pull request"
},
"commit_title": {
"type": "string",
"description": "Title for the automatic commit message",
"default": "Merge pull request"
},
"commit_message": {
"type": "string",
"description": "Extra detail to append to automatic commit message",
"default": ""
},
"merge_method": {
"type": "string",
"description": "Merge method to use",
"enum": ["merge", "squash", "rebase"],
"default": "merge"
}
},
"required": ["pull_number"]
}
},
{
"name": "delete_branch",
"description": "Delete a branch",
"parameters": {
"type": "object",
"properties": {
"branch_name": {
"type": "string",
"description": "Name of the branch to delete"
}
},
"required": ["branch_name"]
}
},
{
"name": "get_issue_details",
"description": "Get details of a specific issue",
"parameters": {
"type": "object",
"properties": {
"issue_number": {
"type": "integer",
"description": "The number of the issue"
}
},
"required": ["issue_number"]
}
},
{
"name": "create_issue",
"description": "Create a new issue in the repository",
"parameters": {
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "Title of the issue"
},
"body": {
"type": "string",
"description": "Body of the issue"
},
"labels": {
"type": "array",
"items": {
"type": "string"
},
"description": "Labels to apply to the issue"
}
},
"required": ["title", "body"]
}
},
{
"name": "list_issues",
"description": "List issues in the repository",
"parameters": {
"type": "object",
"properties": {
"state": {
"type": "string",
"enum": ["open", "closed", "all"],
"default": "open",
"description": "State of the issues to retrieve"
},
"per_page": {
"type": "integer",
"default": 30,
"description": "Number of issues to return per page"
},
"page": {
"type": "integer",
"default": 1,
"description": "Page number of the results to fetch"
}
}
}
}
]
@metrics.measure
def execute(self, function_name, **kwargs):
self.logger.info(f"Executing: {function_name}")
if function_name == "read_file":
return self._read_file(kwargs["path"])
elif function_name == "create_branch":
return self._create_branch(kwargs["branch_name"], kwargs.get("base_branch", "main"))
elif function_name == "commit_file":
return self._commit_file(kwargs["file_path"], kwargs["content"], kwargs["commit_message"])
elif function_name == "create_pull_request":
return self._create_pull_request(kwargs["title"], kwargs["body"], kwargs.get("base", "main"))
elif function_name == "list_files":
return self._list_files(kwargs["path"])
elif function_name == "search_code":
return self._search_code(kwargs["query"])
elif function_name == "get_commit_history":
return self._get_commit_history(kwargs["file_path"], kwargs.get("num_commits", 10))
elif function_name == "get_current_branch":
return self._get_current_branch()
elif function_name == "set_current_branch":
return self._set_current_branch(kwargs["branch_name"])
elif function_name == "get_file_at_commit":
return self._get_file_at_commit(kwargs["file_path"], kwargs["commit_sha"])
elif function_name == "list_branches":
return self._list_branches(kwargs.get("per_page", 100), kwargs.get("all_pages", True))
elif function_name == "get_branch_sha":
return self._get_branch_sha(kwargs["branch"])
elif function_name == "approve_pull_request":
return self._approve_pull_request(kwargs["pull_number"])
elif function_name == "close_pull_request":
return self._close_pull_request(kwargs["pull_number"])
elif function_name == "merge_pull_request":
return self._merge_pull_request(kwargs["pull_number"], kwargs.get("commit_title", "Merge pull request"),
kwargs.get("commit_message", ""), kwargs.get("merge_method", "merge"))
elif function_name == "delete_branch":
return self._delete_branch(kwargs["branch_name"])
elif function_name == "get_issue_details":
return self._get_issue_details(kwargs["issue_number"])
elif function_name == "create_issue":
return self._create_issue(kwargs["title"], kwargs["body"], kwargs.get("labels", []))
elif function_name == "list_issues":
return self._list_issues(kwargs.get("state", "open"), kwargs.get("per_page", 30), kwargs.get("page", 1))
else:
error_message = f"Unknown function: {function_name}"
self.logger.error(error_message)
return error_message
@metrics.measure
def _read_file(self, path):
self.logger.info(f"Reading file: {path} from branch: {self.current_branch}")
url = f"{self.base_url}/repos/{self.repo}/contents/{path}"
response = requests.get(url, headers=self.headers, params={"ref": self.current_branch})
if response.status_code == 200:
content = response.json()["content"]
decoded_content = base64.b64decode(content).decode('utf-8')
self.logger.info(f"Successfully read file: {path}")
return decoded_content
else:
error_message = f"Error reading file: {response.status_code}"
self.logger.error(error_message)
return error_message
@metrics.measure
def _create_branch(self, branch_name, base_branch):
self.logger.info(f"Creating branch: {branch_name} from base: {base_branch}")
url = f"{self.base_url}/repos/{self.repo}/git/refs"
response = requests.get(f"{url}/heads/{base_branch}", headers=self.headers)
if response.status_code != 200:
error_message = f"Error getting base branch: {response.status_code}"
self.logger.error(error_message)
return error_message
sha = response.json()["object"]["sha"]
data = {
"ref": f"refs/heads/{branch_name}",
"sha": sha
}
response = requests.post(url, headers=self.headers, json=data)
if response.status_code == 201:
self.current_branch = branch_name
success_message = f"Branch '{branch_name}' created successfully and set as current branch"
self.logger.info(success_message)
return success_message
else:
error_message = f"Error creating branch: {response.status_code}"
self.logger.error(error_message)
return error_message
@metrics.measure
def _commit_file(self, file_path, content, commit_message):
self.logger.info(f"Committing file: {file_path} to branch: {self.current_branch}")
if self.current_branch == "main":
error_message = "Cannot commit directly to main branch"
self.logger.error(error_message)
return error_message
url = f"{self.base_url}/repos/{self.repo}/contents/{file_path}"
self.logger.info("Checking if file already exists")
response = requests.get(url, headers=self.headers, params={"ref": self.current_branch})
data = {
"message": commit_message,
"content": base64.b64encode(content.encode()).decode(),
"branch": self.current_branch
}
if response.status_code == 200:
self.logger.info("File exists, updating")
file_sha = response.json()["sha"]
data["sha"] = file_sha
else:
self.logger.info("File does not exist, creating new file")
response = requests.put(url, headers=self.headers, json=data)
if response.status_code in [200, 201]:
success_message = f"File committed successfully to branch '{self.current_branch}'"
self.logger.info(success_message)
return success_message
else:
error_message = f"Error committing file: {response.status_code}\nResponse: {response.text}"
self.logger.error(error_message)
return error_message
@metrics.measure
def _create_pull_request(self, title, body, base):
self.logger.info(f"Creating pull request: {title} from {self.current_branch} to {base}")
url = f"{self.base_url}/repos/{self.repo}/pulls"
data = {
"title": title,
"body": body,
"head": self.current_branch,
"base": base
}
response = requests.post(url, headers=self.headers, json=data)
if response.status_code == 201:
success_message = f"Pull request created successfully: {response.json()['html_url']}"
self.logger.info(success_message)
return success_message
else:
error_message = f"Error creating pull request: {response.status_code}\nResponse: {response.text}"
self.logger.error(error_message)
return error_message
@metrics.measure
def _get_branch_sha(self, branch):
url = f"{self.base_url}/repos/{self.repo}/git/refs/heads/{branch}"
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
return response.json()["object"]["sha"]
else:
return f"Error getting branch SHA: {response.status_code}"
@metrics.measure
def _list_files(self, path):
self.logger.info(f"Listing files in: {path} on branch: {self.current_branch}")
url = f"{self.base_url}/repos/{self.repo}/contents/{path}"
response = requests.get(url, headers=self.headers, params={"ref": self.current_branch})
if response.status_code == 200:
files = [item["name"] for item in response.json() if item["type"] == "file"]
directories = [item["name"] for item in response.json() if item["type"] == "dir"]
self.logger.info(f"Successfully listed files and directories in {path}")
return {"files": files, "directories": directories}
else:
error_message = f"Error listing files: {response.status_code}"
self.logger.error(error_message)
return error_message
@metrics.measure
def _search_code(self, query):
self.logger.info(f"Searching code with query: {query}")
url = f"{self.base_url}/search/code"
params = {
"q": f"{query} repo:{self.repo}",
"per_page": 10
}
response = requests.get(url, headers=self.headers, params=params)
if response.status_code == 200:
results = [{"file": item["path"], "url": item["html_url"]} for item in response.json()["items"]]
self.logger.info(f"Successfully searched code. Found {len(results)} results.")
return results
else:
error_message = f"Error searching code: {response.status_code}"
self.logger.error(error_message)
return error_message
@metrics.measure
def _get_commit_history(self, file_path, num_commits):
self.logger.info(f"Getting commit history for file: {file_path}, number of commits: {num_commits}")
url = f"{self.base_url}/repos/{self.repo}/commits"
params = {
"path": file_path,
"per_page": num_commits
}
response = requests.get(url, headers=self.headers, params=params)
if response.status_code == 200:
commits = [{"sha": commit["sha"], "message": commit["commit"]["message"], "date": commit["commit"]["author"]["date"]} for commit in response.json()]
self.logger.info(f"Successfully retrieved commit history. Found {len(commits)} commits.")
return commits
else:
error_message = f"Error getting commit history: {response.status_code}"
self.logger.error(error_message)
return error_message
@metrics.measure
def _get_current_branch(self):
self.logger.info(f"Getting current branch: {self.current_branch}")
return self.current_branch
@metrics.measure
def _set_current_branch(self, branch_name):
self.logger.info(f"Setting current branch from {self.current_branch} to {branch_name}")
self.current_branch = branch_name
return f"Current branch set to: {self.current_branch}"
@metrics.measure
def _get_file_at_commit(self, file_path, commit_sha):
self.logger.info(f"Getting file: {file_path} at commit: {commit_sha}")
url = f"{self.base_url}/repos/{self.repo}/contents/{file_path}"
response = requests.get(url, headers=self.headers, params={"ref": commit_sha})
if response.status_code == 200:
content = response.json()["content"]
decoded_content = base64.b64decode(content).decode('utf-8')
self.logger.info(f"Successfully retrieved file at commit")
return decoded_content
else:
error_message = f"Error reading file at commit: {response.status_code}"
self.logger.error(error_message)
return error_message
@metrics.measure
def _list_branches(self, per_page=100, all_pages=True):
self.logger.info(f"Listing branches. Per page: {per_page}, All pages: {all_pages}")
url = f"{self.base_url}/repos/{self.repo}/branches"
params = {"per_page": min(per_page, 100)} # GitHub API max is 100 per page
all_branches = []
while url:
self.logger.info(f"Fetching branches from: {url}")
response = requests.get(url, headers=self.headers, params=params)
if response.status_code != 200:
error_message = f"Error listing branches: {response.status_code}"
self.logger.error(error_message)
return error_message
branches = [branch["name"] for branch in response.json()]
all_branches.extend(branches)
self.logger.info(f"Fetched {len(branches)} branches")
if not all_pages:
break
# Check if there's a next page
url = response.links.get('next', {}).get('url')
if url:
params = {} # Remove per_page for subsequent requests
self.logger.info(f"Successfully listed all branches. Total: {len(all_branches)}")
return all_branches
@metrics.measure
def _approve_pull_request(self, pull_number):
self.logger.info(f"Approving pull request: {pull_number}")
url = f"{self.base_url}/repos/{self.repo}/pulls/{pull_number}/reviews"
data = {
"event": "APPROVE"
}
response = requests.post(url, headers=self.headers, json=data)
if response.status_code == 200:
success_message = f"Pull request {pull_number} approved successfully"
self.logger.info(success_message)
return success_message
else:
error_message = f"Error approving pull request: {response.status_code}\nResponse: {response.text}"
self.logger.error(error_message)
return error_message
@metrics.measure
def _close_pull_request(self, pull_number):
self.logger.info(f"Closing pull request: {pull_number}")
url = f"{self.base_url}/repos/{self.repo}/pulls/{pull_number}"
data = {
"state": "closed"
}
response = requests.patch(url, headers=self.headers, json=data)
if response.status_code == 200:
success_message = f"Pull request {pull_number} closed successfully"
self.logger.info(success_message)
return success_message
else:
error_message = f"Error closing pull request: {response.status_code}\nResponse: {response.text}"
self.logger.error(error_message)
return error_message
@metrics.measure
def _merge_pull_request(self, pull_number, commit_title, commit_message, merge_method):
self.logger.info(f"Merging pull request: {pull_number}")
url = f"{self.base_url}/repos/{self.repo}/pulls/{pull_number}/merge"
data = {
"commit_title": commit_title,
"commit_message": commit_message,
"merge_method": merge_method
}
response = requests.put(url, headers=self.headers, json=data)
if response.status_code == 200:
success_message = f"Pull request {pull_number} merged successfully"
self.logger.info(success_message)
return success_message
else:
error_message = f"Error merging pull request: {response.status_code}\nResponse: {response.text}"
self.logger.error(error_message)
return error_message
@metrics.measure
def _delete_branch(self, branch_name):
self.logger.info(f"Deleting branch: {branch_name}")
url = f"{self.base_url}/repos/{self.repo}/git/refs/heads/{branch_name}"
response = requests.delete(url, headers=self.headers)
if response.status_code == 204:
success_message = f"Branch {branch_name} deleted successfully"
self.logger.info(success_message)
return success_message
else:
error_message = f"Error deleting branch: {response.status_code}\nResponse: {response.text}"
self.logger.error(error_message)
return error_message
@metrics.measure
def _get_issue_details(self, issue_number):
self.logger.info(f"Getting details for issue: {issue_number}")
url = f"{self.base_url}/repos/{self.repo}/issues/{issue_number}"
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
issue_data = response.json()
issue_details = {
"number": issue_data["number"],
"title": issue_data["title"],
"state": issue_data["state"],
"body": issue_data["body"],
"created_at": issue_data["created_at"],
"updated_at": issue_data["updated_at"],
"labels": [label["name"] for label in issue_data["labels"]],
"assignees": [assignee["login"] for assignee in issue_data["assignees"]],
"comments": issue_data["comments"]
}
self.logger.info(f"Successfully retrieved details for issue {issue_number}")
return issue_details
else:
error_message = f"Error getting issue details: {response.status_code}\nResponse: {response.text}"
self.logger.error(error_message)
return error_message
@metrics.measure
def _create_issue(self, title, body, labels=None):
self.logger.info(f"Creating issue: {title}")
url = f"{self.base_url}/repos/{self.repo}/issues"
data = {
"title": title,
"body": body
}
if labels:
data["labels"] = labels
response = requests.post(url, headers=self.headers, json=data)
if response.status_code == 201:
issue = response.json()
success_message = f"Issue created successfully: {issue['html_url']}"
self.logger.info(success_message)
return success_message
else:
error_message = f"Error creating issue: {response.status_code}\nResponse: {response.text}"
self.logger.error(error_message)
return error_message
@metrics.measure
def _list_issues(self, state="open", per_page=30, page=1):
self.logger.info(f"Listing issues. State: {state}, Per page: {per_page}, Page: {page}")
url = f"{self.base_url}/repos/{self.repo}/issues"
params = {
"state": state,
"per_page": per_page,
"page": page
}
response = requests.get(url, headers=self.headers, params=params)
if response.status_code == 200:
issues = [{
"number": issue["number"],
"title": issue["title"],
"state": issue["state"],
"created_at": issue["created_at"],
"url": issue["html_url"]
} for issue in response.json()]
self.logger.info(f"Successfully listed issues. Found {len(issues)} issues.")
return issues
else:
error_message = f"Error listing issues: {response.status_code}\nResponse: {response.text}"
self.logger.error(error_message)
return error_message
@@ -1,64 +0,0 @@
import requests
import logging
class ApprovePullRequest:
def __init__(self, base_url, token, repo):
self.base_url = base_url
self.headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
}
self.repo = repo
# Set up logging
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# Create a file handler
file_handler = logging.FileHandler('approve_pull_request.log')
file_handler.setLevel(logging.INFO)
# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Create a formatting for the logs
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def __call__(self, pull_number):
self.logger.info(f"Approving pull request: {pull_number}")
url = f"{self.base_url}/repos/{self.repo}/pulls/{pull_number}/reviews"
data = {
"event": "APPROVE"
}
response = requests.post(url, headers=self.headers, json=data)
if response.status_code == 200:
success_message = f"Pull request {pull_number} approved successfully"
self.logger.info(success_message)
return success_message
else:
error_message = f"Error approving pull request: {response.status_code}\nResponse: {response.text}"
self.logger.error(error_message)
return error_message
# JSON definition for the approve_pull_request function
approve_pull_request_definition = {
"name": "approve_pull_request",
"description": "Approve a pull request",
"parameters": {
"type": "object",
"properties": {
"pull_number": {
"type": "integer",
"description": "The number of the pull request"
}
},
"required": ["pull_number"]
}
}
@@ -1,64 +0,0 @@
import requests
import logging
class ClosePullRequest:
def __init__(self, base_url, token, repo):
self.base_url = base_url
self.headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
}
self.repo = repo
# Set up logging
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# Create a file handler
file_handler = logging.FileHandler('close_pull_request.log')
file_handler.setLevel(logging.INFO)
# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Create a formatting for the logs
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levellevel)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def __call__(self, pull_number):
self.logger.info(f"Closing pull request: {pull_number}")
url = f"{self.base_url}/repos/{self.repo}/pulls/{pull_number}"
data = {
"state": "closed"
}
response = requests.patch(url, headers=this.headers, json=data)
if response.status_code == 200:
success_message = f"Pull request {pull_number} closed successfully"
self.logger.info(success_message)
return success_message
else:
error_message = f"Error closing pull request: {response.status_code}\nResponse: {response.text}"
self.logger.error(error_message)
return error_message
# JSON definition for the close_pull_request function
close_pull_request_definition = {
"name": "close_pull_request",
"description": "Close a pull request",
"parameters": {
"type": "object",
"properties": {
"pull_number": {
"type": "integer",
"description": "The number of the pull request"
}
},
"required": ["pull_number"]
}
}
@@ -1,94 +0,0 @@
import base64
import requests
import logging
class CommitFile:
def __init__(self, base_url, token, repo, current_branch):
self.base_url = base_url
self.headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
}
self.repo = repo
self.current_branch = current_branch
# Set up logging
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# Create a file handler
file_handler = logging.FileHandler('commit_file.log')
file_handler.setLevel(logging.INFO)
# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Create a formatting for the logs
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def __call__(self, file_path, content, commit_message):
self.logger.info(f"Committing file: {file_path} to branch: {self.current_branch}")
if self.current_branch == "main":
error_message = "Cannot commit directly to main branch"
self.logger.error(error_message)
return error_message
url = f"{self.base_url}/repos/{self.repo}/contents/{file_path}"
self.logger.info("Checking if file already exists")
response = requests.get(url, headers=self.headers, params={"ref": self.current_branch})
data = {
"message": commit_message,
"content": base64.b64encode(content.encode()).decode(),
"branch": self.current_branch
}
if response.status_code == 200:
self.logger.info("File exists, updating")
file_sha = response.json()["sha"]
data["sha"] = file_sha
else:
self.logger.info("File does not exist, creating new file")
response = requests.put(url, headers=self.headers, json=data)
if response.status_code in [200, 201]:
success_message = f"File committed successfully to branch '{self.current_branch}'"
self.logger.info(success_message)
return success_message
else:
error_message = f"Error committing file: {response.status_code}\nResponse: {response.text}"
self.logger.error(error_message)
return error_message
# JSON definition for the commit_file function
commit_file_definition = {
"name": "commit_file",
"description": "Commit a file to a branch (not main)",
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the file in the repository"
},
"commit_message": {
"type": "string",
"description": "Commit message"
},
"content": {
"type": "string",
"description": "Content of the file"
}
},
"required": ["file_path", "commit_message", "content"]
}
}
@@ -1,58 +0,0 @@
import requests
import logging
class CreateBranch:
def __init__(self, base_url, token, repo, current_branch):
self.base_url = base_url
self.headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
}
self.repo = repo
self.current_branch = current_branch
# Set up logging
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# Create a file handler
file_handler = logging.FileHandler('create_branch.log')
file_handler.setLevel(logging.INFO)
# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Create a formatting for the logs
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def __call__(self, branch_name, base_branch="main"):
self.logger.info(f"Creating branch: {branch_name} from base: {base_branch}")
url = f"{self.base_url}/repos/{self.repo}/git/refs"
response = requests.get(f"{url}/heads/{base_branch}", headers=self.headers)
if response.status_code != 200:
error_message = f"Error getting base branch: {response.status_code}"
self.logger.error(error_message)
return error_message
sha = response.json()["object"]["sha"]
data = {
"ref": f"refs/heads/{branch_name}",
"sha": sha
}
response = requests.post(url, headers=self.headers, json=data)
if response.status_code == 201:
self.current_branch = branch_name
success_message = f"Branch '{branch_name}' created successfully and set as current branch"
self.logger.info(success_message)
return success_message
else:
error_message = f"Error creating branch: {response.status_code}"
self.logger.error(error_message)
return error_message
@@ -1,79 +0,0 @@
import requests
import logging
class CreateIssue:
def __init__(self, base_url, token, repo):
self.base_url = base_url
self.headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
}
self.repo = repo
# Set up logging
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# Create a file handler
file_handler = logging.FileHandler('create_issue.log')
file_handler.setLevel(logging.INFO)
# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Create a formatting for the logs
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levellevel)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def __call__(self, title, body, labels=None):
self.logger.info(f"Creating issue: {title}")
url = f"{self.base_url}/repos/{self.repo}/issues"
data = {
"title": title,
"body": body
}
if labels:
data["labels"] = labels
response = requests.post(url, headers=self.headers, json=data)
if response.status_code == 201:
issue = response.json()
success_message = f"Issue created successfully: {issue['html_url']}"
self.logger.info(success_message)
return success_message
else:
error_message = f"Error creating issue: {response.status_code}\nResponse: {response.text}"
self.logger.error(error_message)
return error_message
# JSON definition for the create_issue function
create_issue_definition = {
"name": "create_issue",
"description": "Create a new issue in the repository",
"parameters": {
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "Title of the issue"
},
"body": {
"type": "string",
"description": "Body of the issue"
},
"labels": {
"type": "array",
"items": {
"type": "string"
},
"description": "Labels to apply to the issue"
}
},
"required": ["title", "body"]
}
}
@@ -1,77 +0,0 @@
import requests
import logging
class CreatePullRequest:
def __init__(self, base_url, token, repo, current_branch):
self.base_url = base_url
self.headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
}
self.repo = repo
self.current_branch = current_branch
# Set up logging
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# Create a file handler
file_handler = logging.FileHandler('create_pull_request.log')
file_handler.setLevel(logging.INFO)
# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Create a formatting for the logs
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def __call__(self, title, body, base="main"):
self.logger.info(f"Creating pull request: {title} from {self.current_branch} to {base}")
url = f"{self.base_url}/repos/{self.repo}/pulls"
data = {
"title": title,
"body": body,
"head": self.current_branch,
"base": base
}
response = requests.post(url, headers=self.headers, json=data)
if response.status_code == 201:
success_message = f"Pull request created successfully: {response.json()['html_url']}"
self.logger.info(success_message)
return success_message
else:
error_message = f"Error creating pull request: {response.status_code}\nResponse: {response.text}"
self.logger.error(error_message)
return error_message
# JSON definition for the create_pull_request function
create_pull_request_definition = {
"name": "create_pull_request",
"description": "Create a pull request",
"parameters": {
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "Title of the pull request"
},
"body": {
"type": "string",
"description": "Body of the pull request"
},
"base": {
"type": "string",
"description": "The name of the branch you want the changes pulled into",
"default": "main"
}
},
"required": ["title", "body"]
}
}
@@ -1,61 +0,0 @@
import requests
import logging
class DeleteBranch:
def __init__(self, base_url, token, repo):
self.base_url = base_url
self.headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
}
self.repo = repo
# Set up logging
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# Create a file handler
file_handler = logging.FileHandler('delete_branch.log')
file_handler.setLevel(logging.INFO)
# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Create a formatting for the logs
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levellevel)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def __call__(self, branch_name):
self.logger.info(f"Deleting branch: {branch_name}")
url = f"{self.base_url}/repos/{self.repo}/git/refs/heads/{branch_name}"
response = requests.delete(url, headers=self.headers)
if response.status_code == 204:
success_message = f"Branch {branch_name} deleted successfully"
self.logger.info(success_message)
return success_message
else:
error_message = f"Error deleting branch: {response.status_code}\nResponse: {response.text}"
self.logger.error(error_message)
return error_message
# JSON definition for the delete_branch function
delete_branch_definition = {
"name": "delete_branch",
"description": "Delete a branch",
"parameters": {
"type": "object",
"properties": {
"branch_name": {
"type": "string",
"description": "Name of the branch to delete"
}
},
"required": ["branch_name"]
}
}
@@ -1,56 +0,0 @@
import requests
import logging
class GetBranchSHA:
def __init__(self, base_url, token, repo):
self.base_url = base_url
self.headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
}
self.repo = repo
# Set up logging
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# Create a file handler
file_handler = logging.FileHandler('get_branch_sha.log')
file_handler.setLevel(logging.INFO)
# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel=logging.INFO)
# Create a formatting for the logs
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def __call__(self, branch):
url = f"{self.base_url}/repos/{self.repo}/git/refs/heads/{branch}"
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
return response.json()["object"]["sha"]
else:
return f"Error getting branch SHA: {response.status_code}"
# JSON definition for the get_branch_sha function
get_branch_sha_definition = {
"name": "get_branch_sha",
"description": "Get the SHA of the latest commit on a branch",
"parameters": {
"type": "object",
"properties": {
"branch": {
"type": "string",
"description": "Name of the branch"
}
},
"required": ["branch"]
}
}
@@ -1,70 +0,0 @@
import requests
import logging
class GetCommitHistory:
def __init__(self, base_url, token, repo):
self.base_url = base_url
self.headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
}
self.repo = repo
# Set up logging
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# Create a file handler
file_handler = logging.FileHandler('get_commit_history.log')
file_handler.setLevel(logging.INFO)
# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Create a formatting for the logs
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def __call__(self, file_path, num_commits=10):
self.logger.info(f"Getting commit history for file: {file_path}, number of commits: {num_commits}")
url = f"{self.base_url}/repos/{self.repo}/commits"
params = {
"path": file_path,
"per_page": num_commits
}
response = requests.get(url, headers=self.headers, params=params)
if response.status_code == 200:
commits = [{"sha": commit["sha"], "message": commit["commit"]["message"], "date": commit["commit"]["author"]["date"]} for commit in response.json()]
self.logger.info(f"Successfully retrieved commit history. Found {len(commits)} commits.")
return commits
else:
error_message = f"Error getting commit history: {response.status_code}"
self.logger.error(error_message)
return error_message
# JSON definition for the get_commit_history function
get_commit_history_definition = {
"name": "get_commit_history",
"description": "Get commit history for a file",
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the file in the repository"
},
"num_commits": {
"type": "integer",
"description": "Number of commits to retrieve",
"default": 10
}
},
"required": ["file_path"]
}
}
@@ -1,37 +0,0 @@
import logging
class GetCurrentBranch:
def __init__(self, current_branch):
self.current_branch = current_branch
# Set up logging
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# Create a file handler
file_handler = logging.FileHandler('get_current_branch.log')
file_handler.setLevel(logging.INFO)
# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Create a formatting for the logs
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def __call__(self):
self.logger.info(f"Getting current branch: {self.current_branch}")
return self.current_branch
# JSON definition for the get_current_branch function
get_current_branch_definition = {
"name": "get_current_branch",
"description": "Get the name of the current branch",
"parameters": {}
}
@@ -1,67 +0,0 @@
import base64
import requests
import logging
class GetFileAtCommit:
def __init__(self, base_url, token, repo):
self.base_url = base_url
self.headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
}
self.repo = repo
# Set up logging
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# Create a file handler
file_handler = logging.FileHandler('get_file_at_commit.log')
file_handler.setLevel(logging.INFO)
# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Create a formatting for the logs
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def __call__(self, file_path, commit_sha):
self.logger.info(f"Getting file: {file_path} at commit: {commit_sha}")
url = f"{self.base_url}/repos/{self.repo}/contents/{file_path}"
response = requests.get(url, headers=self.headers, params={"ref": commit_sha})
if response.status_code == 200:
content = response.json()["content"]
decoded_content = base64.b64decode(content).decode('utf-8')
self.logger.info(f"Successfully retrieved file at commit")
return decoded_content
else:
error_message = f"Error reading file at commit: {response.status_code}"
self.logger.error(error_message)
return error_message
# JSON definition for the get_file_at_commit function
get_file_at_commit_definition = {
"name": "get_file_at_commit",
"description": "Get the contents of a file at a specific commit",
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the file in the repository"
},
"commit_sha": {
"type": "string",
"description": "SHA of the commit to retrieve the file from"
}
},
"required": ["file_path", "commit_sha"]
}
}
@@ -1,72 +0,0 @@
import requests
import logging
class GetIssueDetails:
def __init__(self, base_url, token, repo):
self.base_url = base_url
self.headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
}
self.repo = repo
# Set up logging
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# Create a file handler
file_handler = logging.FileHandler('get_issue_details.log')
file_handler.setLevel(logging.INFO)
# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Create a formatting for the logs
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levellevel)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def __call__(self, issue_number):
self.logger.info(f"Getting details for issue: {issue_number}")
url = f"{self.base_url}/repos/{self.repo}/issues/{issue_number}"
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
issue_data = response.json()
issue_details = {
"number": issue_data["number"],
"title": issue_data["title"],
"state": issue_data["state"],
"body": issue_data["body"],
"created_at": issue_data["created_at"],
"updated_at": issue_data["updated_at"],
"labels": [label["name"] for label in issue_data["labels"]],
"assignees": [assignee["login"] for assignee in issue_data["assignees"]],
"comments": issue_data["comments"]
}
self.logger.info(f"Successfully retrieved details for issue {issue_number}")
return issue_details
else:
error_message = f"Error getting issue details: {response.status_code}\nResponse: {response.text}"
self.logger.error(error_message)
return error_message
# JSON definition for the get_issue_details function
get_issue_details_definition = {
"name": "get_issue_details",
"description": "Get details of a specific issue",
"parameters": {
"type": "object",
"properties": {
"issue_number": {
"type": "integer",
"description": "The number of the issue"
}
},
"required": ["issue_number"]
}
}
@@ -1,82 +0,0 @@
import requests
import logging
class ListBranches:
def __init__(self, base_url, token, repo):
self.base_url = base_url
self.headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
}
self.repo = repo
# Set up logging
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# Create a file handler
file_handler = logging.FileHandler('list_branches.log')
file_handler.setLevel(logging.INFO)
# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Create a formatting for the logs
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def __call__(self, per_page=100, all_pages=True):
self.logger.info(f"Listing branches. Per page: {per_page}, All pages: {all_pages}")
url = f"{self.base_url}/repos/{self.repo}/branches"
params = {"per_page": min(per_page, 100)} # GitHub API max is 100 per page
all_branches = []
while url:
self.logger.info(f"Fetching branches from: {url}")
response = requests.get(url, headers=self.headers, params=params)
if response.status_code != 200:
error_message = f"Error listing branches: {response.status_code}"
self.logger.error(error_message)
return error_message
branches = [branch["name"] for branch in response.json()]
all_branches.extend(branches)
self.logger.info(f"Fetched {len(branches)} branches")
if not all_pages:
break
# Check if there's a next page
url = response.links.get('next', {}).get('url')
if url:
params = {} # Remove per_page for subsequent requests
self.logger.info(f"Successfully listed all branches. Total: {len(all_branches)}")
return all_branches
# JSON definition for the list_branches function
list_branches_definition = {
"name": "list_branches",
"description": "List all branches in the repository",
"parameters": {
"type": "object",
"properties": {
"per_page": {
"type": "integer",
"description": "Number of branches to return per page (max 100)",
"default": 100
},
"all_pages": {
"type": "boolean",
"description": "Whether to fetch all pages of results",
"default": True
}
}
}
}
-63
View File
@@ -1,63 +0,0 @@
import requests
import logging
class ListFiles:
def __init__(self, base_url, token, repo, current_branch):
self.base_url = base_url
self.headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
}
self.repo = repo
self.current_branch = current_branch
# Set up logging
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# Create a file handler
file_handler = logging.FileHandler('list_files.log')
file_handler.setLevel(logging.INFO)
# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Create a formatting for the logs
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def __call__(self, path):
self.logger.info(f"Listing files in: {path} on branch: {self.current_branch}")
url = f"{self.base_url}/repos/{self.repo}/contents/{path}"
response = requests.get(url, headers=self.headers, params={"ref": self.current_branch})
if response.status_code == 200:
files = [item["name"] for item in response.json() if item["type"] == "file"]
directories = [item["name"] for item in response.json() if item["type"] == "dir"]
self.logger.info(f"Successfully listed files and directories in {path}")
return {"files": files, "directories": directories}
else:
error_message = f"Error listing files: {response.status_code}"
self.logger.error(error_message)
return error_message
# JSON definition for the list_files function
list_files_definition = {
"name": "list_files",
"description": "List files in a directory of the repository",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the directory in the repository"
}
},
"required": ["path"]
}
}
@@ -1,83 +0,0 @@
import requests
import logging
class ListIssues:
def __init__(self, base_url, token, repo):
self.base_url = base_url
self.headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
}
self.repo = repo
# Set up logging
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# Create a file handler
file_handler = logging.FileHandler('list_issues.log')
file_handler.setLevel(logging.INFO)
# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Create a formatting for the logs
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levellevel)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def __call__(self, state="open", per_page=30, page=1):
self.logger.info(f"Listing issues. State: {state}, Per page: {per_page}, Page: {page}")
url = f"{self.base_url}/repos/{self.repo}/issues"
params = {
"state": state,
"per_page": per_page,
"page": page
}
response = requests.get(url, headers=this.headers, params=params)
if response.status_code == 200:
issues = [{
"number": issue["number"],
"title": issue["title"],
"state": issue["state"],
"created_at": issue["created_at"],
"url": issue["html_url"]
} for issue in response.json()]
self.logger.info(f"Successfully listed issues. Found {len(issues)} issues.")
return issues
else:
error_message = f"Error listing issues: {response.status_code}\nResponse: {response.text}"
self.logger.error(error_message)
return error_message
# JSON definition for the list_issues function
list_issues_definition = {
"name": "list_issues",
"description": "List issues in the repository",
"parameters": {
"type": "object",
"properties": {
"state": {
"type": "string",
"enum": ["open", "closed", "all"],
"default": "open",
"description": "State of the issues to retrieve"
},
"per_page": {
"type": "integer",
"default": 30,
"description": "Number of issues to return per page"
},
"page": {
"type": "integer",
"default": 1,
"description": "Page number of the results to fetch"
}
}
}
}
@@ -1,82 +0,0 @@
import requests
import logging
class MergePullRequest:
def __init__(self, base_url, token, repo):
self.base_url = base_url
self.headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
}
self.repo = repo
# Set up logging
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# Create a file handler
file_handler = logging.FileHandler('merge_pull_request.log')
file_handler.setLevel(logging.INFO)
# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Create a formatting for the logs
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levellevel)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def __call__(self, pull_number, commit_title, commit_message, merge_method):
self.logger.info(f"Merging pull request: {pull_number}")
url = f"{self.base_url}/repos/{self.repo}/pulls/{pull_number}/merge"
data = {
"commit_title": commit_title,
"commit_message": commit_message,
"merge_method": merge_method
}
response =.requests.put(url, headers=self.headers, json=data)
if response.status_code == 200:
success_message = f"Pull request {pull_number} merged successfully"
self.logger.info(success_message)
return success_message
else:
error_message = f"Error merging pull request: {response.status_code}\nResponse: {response.text}"
self.logger.error(error_message)
return error_message
# JSON definition for the merge_pull_request function
merge_pull_request_definition = {
"name": "merge_pull_request",
"description": "Merge a pull request",
"parameters": {
"type": "object",
"properties": {
"pull_number": {
"type": "integer",
"description": "The number of the pull request"
},
"commit_title": {
"type": "string",
"description": "Title for the automatic commit message",
"default": "Merge pull request"
},
"commit_message": {
"type": "string",
"description": "Extra detail to append to automatic commit message",
"default": ""
},
"merge_method": {
"type": "string",
"description": "Merge method to use",
"enum": ["merge", "squash", "rebase"],
"default": "merge"
}
},
"required": ["pull_number"]
}
}
-49
View File
@@ -1,49 +0,0 @@
import base64
import requests
import os
import logging
class ReadFile:
def __init__(self, base_url, token, repo, current_branch):
self.base_url = base_url
self.headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
}
self.repo = repo
self.current_branch = current_branch
# Set up logging
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# Create a file handler
file_handler = logging.FileHandler('read_file.log')
file_handler.setLevel(logging.INFO)
# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Create a formatting for the logs
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def __call__(self, path):
self.logger.info(f"Reading file: {path} from branch: {self.current_branch}")
url = f"{self.base_url}/repos/{self.repo}/contents/{path}"
response = requests.get(url, headers=self.headers, params={"ref": self.current_branch})
if response.status_code == 200:
content = response.json()["content"]
decoded_content = base64.b64decode(content).decode('utf-8')
self.logger.info(f"Successfully read file: {path}")
return decoded_content
else:
error_message = f"Error reading file: {response.status_code}"
self.logger.error(error_message)
return error_message
@@ -1,65 +0,0 @@
import requests
import logging
class SearchCode:
def __init__(self, base_url, token, repo):
self.base_url = base_url
self.headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
}
self.repo = repo
# Set up logging
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# Create a file handler
file_handler = logging.FileHandler('search_code.log')
file_handler.setLevel(logging.INFO)
# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Create a formatting for the logs
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def __call__(self, query):
self.logger.info(f"Searching code with query: {query}")
url = f"{self.base_url}/search/code"
params = {
"q": f"{query} repo:{self.repo}",
"per_page": 10
}
response = requests.get(url, headers=self.headers, params=params)
if response.status_code == 200:
results = [{"file": item["path"], "url": item["html_url"]} for item in response.json()["items"]]
self.logger.info(f"Successfully searched code. Found {len(results)} results.")
return results
else:
error_message = f"Error searching code: {response.status_code}"
self.logger.error(error_message)
return error_message
# JSON definition for the search_code function
search_code_definition = {
"name": "search_code",
"description": "Search for code in the repository",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query"
}
},
"required": ["query"]
}
}
@@ -1,47 +0,0 @@
import logging
class SetCurrentBranch:
def __init__(self, current_branch):
self.current_branch = current_branch
# Set up logging
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
# Create a file handler
file_handler = logging.FileHandler('set_current_branch.log')
file_handler.setLevel(logging.INFO)
# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Create a formatting for the logs
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Add the handlers to the logger
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def __call__(self, branch_name):
self.logger.info(f"Setting current branch from {self.current_branch} to {branch_name}")
self.current_branch = branch_name
return f"Current branch set to: {self.current_branch}"
# JSON definition for the set_current_branch function
set_current_branch_definition = {
"name": "set_current_branch",
"description": "Set the current branch",
"parameters": {
"type": "object",
"properties": {
"branch_name": {
"type": "string",
"description": "Name of the branch to set as current"
}
},
"required": ["branch_name"]
}
}