Merge pull request #235 from bucolucas/docs/add-tools-readme

Add RepoIndexTool and expand tools README
This commit is contained in:
2025-08-09 18:42:19 -05:00
committed by GitHub
2 changed files with 285 additions and 0 deletions
+57
View File
@@ -0,0 +1,57 @@
# Tools Overview
This repository contains a set of pluggable tools that the assistant can use to operate on itself and on the surrounding GitHub repository. Each tool follows a simple interface defined by BaseTool and exposes a set of callable functions that higher-level orchestration can invoke.
## Architecture
- BaseTool: All tools subclass BaseTool (defined in tools/base_tool.py) and implement:
  - get_functions(): returns a list of function specs (name, description, JSON schema for parameters)
  - execute(function_name, **kwargs): dispatches calls to concrete implementations
  - clear(): resets transient state
- Discovery: Tools are simple Python modules under tools/. They can be imported and registered by the host application. Each tool is self-contained and may use environment variables for configuration.
## Available tools
- GitHubTool (tools/github_tool.py)
  - Rich integration with the GitHub REST API for repository tasks.
  - Examples of capabilities: read_file, list_files, search_code, create_branch, commit_file, commit_file_patch, create_pull_request, PR review helpers, issues and project boards, branch utilities, and more.
- GitHubCIHelper (tools/github_ci_tool.py)
  - Focused helpers for GitHub Actions CI: discover PR workflow runs, fetch job logs, and parse unittest failure blocks from logs.
- LogTool (tools/log_tool.py)
  - Reads the local logs/output.log file. Supports tailing by line count or filtering to the last 24 hours using a timestamp-aware parser.
- StandaloneLLMTool (tools/standalone_llm_tool.py)
  - Bridges to external LLMs or a separate copilot service.
  - Functions: call_external_llm (uses OPENAI_API_KEY), call_external_copilot (uses COPILOT_API_URL).
- RepoIndexTool (tools/repo_index_tool.py) [NEW]
  - Quickly builds a lightweight index of repository paths via the GitHub Contents API to aid navigation, discovery, and targeted reads.
  - Functions:
    - get_repo_tree(path="", ref="main", max_depth=3, include_files=True, include_dirs=True)
    - find_files(pattern, path="", ref="main", max_results=50)
    - get_file_head(path, ref="main", max_bytes=4096)
## Adding a new tool
1) Create a new module under tools/ and subclass BaseTool.
2) Implement get_functions() to describe your function signatures and parameters.
3) Implement execute() to route to internal methods and return structured results or error strings.
4) Prefer dependency injection and env vars over hardcoding. Reuse a shared requests.Session where practical.
5) Log responsibly using the logging module; do not print directly. Attach a NullHandler by default to avoid handler warnings.
6) Avoid storing secrets in memory; prefer short-lived per-request usage. Implement clear() to drop state.
## Environment variables (common)
- GITHUB_TOKEN: required for GitHub API access in most tools.
- GITHUB_REPOSITORY: owner/repo used by GitHubTool and RepoIndexTool.
- OPENAI_API_KEY: used by StandaloneLLMTool.
- COPILOT_API_URL: used by StandaloneLLMTool for external copilot calls.
## Notes
- Tools should be defensive and return clear error messages on failures.
- Keep function results concise and JSON-serializable where possible.
- If your tool fetches large data, consider pagination or size limits and expose parameters for control.
+228
View File
@@ -0,0 +1,228 @@
import base64
import fnmatch
import logging
import os
from typing import List, Dict, Any, Optional
from urllib.parse import quote

import requests

from .base_tool import BaseTool
class RepoIndexTool(BaseTool):
    """Lightweight repository index and discovery helper using the GitHub Contents API.

    Functions exposed through ``execute``:
    - get_repo_tree(path="", ref="main", max_depth=3, include_files=True, include_dirs=True)
    - find_files(pattern, path="", ref="main", max_results=50)
    - get_file_head(path, ref="main", max_bytes=4096)

    Every function returns JSON-serializable data on success and a
    human-readable error string on failure.
    """

    def __init__(self, session: Optional[requests.Session] = None, token: Optional[str] = None,
                 repo: Optional[str] = None, base_url: Optional[str] = None,
                 logger: Optional[logging.Logger] = None):
        """Configure the tool from explicit arguments, falling back to env vars.

        Args:
            session: HTTP session to use. An injected session is used as-is;
                auth headers are only installed on a session created here.
            token: GitHub token; defaults to the GITHUB_TOKEN env var.
            repo: Repository as 'owner/repo'; defaults to GITHUB_REPOSITORY.
            base_url: API root; defaults to "https://api.github.com".
            logger: Logger to use; defaults to this module's logger.

        Raises:
            ValueError: If no token or no repository can be resolved.
        """
        self.base_url = base_url if base_url else "https://api.github.com"
        self._token = token if token else os.environ.get("GITHUB_TOKEN")
        self._repo = repo if repo else os.environ.get("GITHUB_REPOSITORY")
        if not self._token:
            raise ValueError("GitHub token must be provided either as an argument or via GITHUB_TOKEN env var.")
        if not self._repo:
            raise ValueError("GitHub repository (e.g., 'owner/repo') must be provided either as an argument or via GITHUB_REPOSITORY env var.")
        if session:
            self.session = session
        else:
            self.session = requests.Session()
            self.session.headers.update({
                "Authorization": f"token {self._token}",
                "Accept": "application/vnd.github.v3+json"
            })
        self.logger = logger if logger else logging.getLogger(__name__)
        # A NullHandler avoids "no handlers could be found" warnings when the
        # host application has not configured logging.
        if not self.logger.handlers:
            self.logger.addHandler(logging.NullHandler())

    def clear(self):
        """Reset transient state; this tool currently keeps none."""
        self.logger.debug("RepoIndexTool.clear called; no state to reset.")

    def get_functions(self):
        """Return the function specs (name, description, JSON-schema parameters) this tool exposes."""
        return [
            {
                "type": "function",
                "function": {
                    "name": "get_repo_tree",
                    "description": "List repository paths under a directory with depth control via the Contents API.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "path": {"type": "string", "description": "Path to start from (directory or file). Use '' or '.' for repo root.", "default": ""},
                            "ref": {"type": "string", "description": "Branch, tag, or commit SHA to query.", "default": "main"},
                            "max_depth": {"type": "integer", "description": "Maximum directory depth to recurse (0 = only the provided path).", "default": 3},
                            "include_files": {"type": "boolean", "description": "Include file entries in results.", "default": True},
                            "include_dirs": {"type": "boolean", "description": "Include directory entries in results.", "default": True}
                        },
                        "required": []
                    }
                },
                "_tags": ["read"]
            },
            {
                "type": "function",
                "function": {
                    "name": "find_files",
                    "description": "Find files matching a glob pattern starting under a path.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "pattern": {"type": "string", "description": "Glob pattern (e.g., **/*.py, *.md) applied to full repository path."},
                            "path": {"type": "string", "description": "Directory to start search from.", "default": ""},
                            "ref": {"type": "string", "description": "Branch, tag, or commit SHA to query.", "default": "main"},
                            "max_results": {"type": "integer", "description": "Maximum number of results to return.", "default": 50}
                        },
                        "required": ["pattern"]
                    }
                },
                "_tags": ["read"]
            },
            {
                "type": "function",
                "function": {
                    "name": "get_file_head",
                    "description": "Return the first N bytes of a file (decoded as text) for a quick preview.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "path": {"type": "string", "description": "Path to the file in the repository."},
                            "ref": {"type": "string", "description": "Branch, tag, or commit SHA to query.", "default": "main"},
                            "max_bytes": {"type": "integer", "description": "Maximum number of bytes from the beginning of the file to return.", "default": 4096}
                        },
                        "required": ["path"]
                    }
                },
                "_tags": ["read"]
            }
        ]

    def execute(self, function_name, **kwargs):
        """Dispatch ``function_name`` to its private implementation.

        Only names advertised by ``get_functions()`` are dispatched, so other
        private attributes (helpers, ``__init__`` via "_init__", ...) cannot be
        reached through this entry point.

        Returns:
            The implementation's result, or an error string if the name is
            unknown or the implementation raises.
        """
        self.logger.info(f"Executing RepoIndexTool function: {function_name} with args: {kwargs}")
        advertised = {spec["function"]["name"] for spec in self.get_functions()}
        handler = None
        if isinstance(function_name, str) and function_name in advertised:
            handler = getattr(self, f"_{function_name}", None)
        if not callable(handler):
            msg = f"Unknown function: {function_name}"
            self.logger.error(msg)
            return msg
        try:
            return handler(**kwargs)
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            self.logger.error(f"Error executing {function_name}: {e}", exc_info=True)
            return f"Error during {function_name} execution: {str(e)}"

    # Private implementations

    def _contents_url(self, path: str) -> str:
        """Build the Contents API URL for ``path`` ('' means the repo root).

        Surrounding slashes are dropped and the path is percent-encoded so
        characters such as '#' or '?' in file names are not read as URL syntax.
        """
        root = f"{self.base_url}/repos/{self._repo}/contents"
        clean = (path or "").strip("/")
        return f"{root}/{quote(clean)}" if clean else root

    def _get_repo_tree(self, path: str = "", ref: str = "main", max_depth: int = 3,
                       include_files: bool = True, include_dirs: bool = True):
        """List entries under ``path`` at ``ref``, recursing up to ``max_depth`` levels.

        Returns:
            A list of entry dicts sorted directories-first and then by path,
            or an error string if the listing fails.
        """
        start_path = (path or "").strip("/")
        if start_path == ".":
            start_path = ""
        self.logger.info(f"Building repo tree from path='{start_path or '/'}', ref='{ref}', max_depth={max_depth}")
        results: List[Dict[str, Any]] = []
        try:
            self._collect_entries(start_path, ref, depth=max_depth, include_files=include_files,
                                  include_dirs=include_dirs, out=results)
        except requests.HTTPError as e:
            # e.response may be None, so it is checked before any attribute access.
            response = e.response
            detail = f"{response.status_code} - {response.text}" if response is not None else str(e)
            return f"HTTP error while listing '{start_path or '/'}' at '{ref}': {detail}"
        except Exception as e:
            return f"Error while building repo tree from '{start_path or '/'}' at '{ref}': {str(e)}"
        # Sort results for stability: directories first, then files, lexicographically by path
        results.sort(key=lambda e: (0 if e.get('type') == 'dir' else 1, e.get('path') or ''))
        return results

    def _collect_entries(self, path: str, ref: str, depth: int, include_files: bool,
                         include_dirs: bool, out: List[Dict[str, Any]]):
        """Append entries for ``path`` (and its descendants while depth > 0) to ``out``.

        A missing path (HTTP 404) is logged and skipped; other HTTP errors
        propagate as ``requests.HTTPError`` via ``raise_for_status``.
        """
        self.logger.debug(f"Listing '{path or '/'}' with depth={depth}")
        resp = self.session.get(self._contents_url(path), params={"ref": ref})
        if resp.status_code == 404:
            self.logger.warning(f"Path not found: '{path or '/'}' at ref '{ref}'")
            return
        resp.raise_for_status()
        data = resp.json()

        # A single file comes back as a dict rather than a list.
        if isinstance(data, dict) and data.get('type') == 'file':
            if include_files:
                out.append(self._entry_from_item(data))
            return
        if not isinstance(data, list):
            # Unexpected payload
            self.logger.debug(f"Unexpected payload for path '{path}': {type(data)}")
            return

        if include_dirs:
            # Use the requested path itself: deriving it from the first child
            # mislabels the repo root with that child's name.
            dir_path = (path or "").strip("/")
            out.append({
                "name": dir_path.rsplit("/", 1)[-1] if dir_path else "/",
                "path": dir_path,
                "type": "dir"
            })
        if depth <= 0:
            # depth == 0 means "only the provided path": do not list children.
            return

        for item in data:
            item_type = item.get('type')
            if item_type == 'file':
                if include_files:
                    out.append(self._entry_from_item(item))
            elif item_type == 'dir':
                sub_path = item.get('path')
                # Skip entries without a path; recursing on '' would re-list the root.
                if sub_path:
                    self._collect_entries(sub_path, ref, depth=depth - 1,
                                          include_files=include_files, include_dirs=include_dirs, out=out)
            else:
                # symlink, submodule, etc. We include a minimal record.
                out.append({"name": item.get('name'), "path": item.get('path'), "type": item_type or 'unknown'})

    def _entry_from_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """Reduce a Contents API item to the name/path/type/size/sha fields this tool reports."""
        return {
            "name": item.get('name'),
            "path": item.get('path'),
            "type": item.get('type'),
            "size": item.get('size'),
            "sha": item.get('sha')
        }

    def _find_files(self, pattern: str, path: str = "", ref: str = "main", max_results: int = 50):
        """Return up to ``max_results`` file entries whose full path matches ``pattern``.

        Matching is case-sensitive on every platform. A leading '**/' also
        matches files directly under the repo root, so '**/*.py' finds
        'setup.py' as well as 'pkg/mod.py'.

        Returns:
            A list of entry dicts, or an error string if the listing fails.
        """
        # The search walks a tree of depth 10 to avoid huge traversals; narrow
        # ``path`` to reach deeper directories.
        self.logger.info(f"Finding files matching pattern='{pattern}' under '{path or '/'}' at ref '{ref}' (max_results={max_results})")
        if max_results <= 0:
            return []
        tree = self._get_repo_tree(path=path, ref=ref, max_depth=10, include_files=True, include_dirs=False)
        if isinstance(tree, str):
            return tree  # error string
        patterns = [pattern]
        if pattern.startswith("**/"):
            # fnmatch requires the literal '/', which root-level files lack.
            patterns.append(pattern[3:])
        matches = []
        for entry in tree:
            if entry.get('type') != 'file':
                continue
            candidate = entry.get('path') or ''
            if any(fnmatch.fnmatchcase(candidate, pat) for pat in patterns):
                matches.append(entry)
                if len(matches) >= max_results:
                    break
        return matches

    def _get_file_head(self, path: str, ref: str = "main", max_bytes: int = 4096):
        """Return the first ``max_bytes`` bytes of a file, decoded as UTF-8 text.

        Undecodable bytes, including a multi-byte character cut at the
        truncation point, are shown as U+FFFD.

        Returns:
            The decoded text, or an error string if the path is missing, is
            not a file, or cannot be read.
        """
        self.logger.info(f"Fetching head of file '{path}' at ref '{ref}', up to {max_bytes} bytes")
        resp = self.session.get(self._contents_url(path), params={"ref": ref})
        if resp.status_code == 404:
            return f"Error: File '{path}' not found at ref '{ref}'."
        if resp.status_code != 200:
            return f"Error reading file '{path}' at ref '{ref}': {resp.status_code} - {resp.text}"
        data = resp.json()
        # Directories come back as a list, so guard before calling .get().
        if not isinstance(data, dict) or data.get('type') != 'file':
            return f"Error: Path '{path}' is not a file."
        encoding = data.get('encoding') or 'base64'
        if encoding != 'base64':
            # NOTE(review): seen for files too large to be inlined by the
            # Contents API - confirm against the GitHub docs.
            return f"Error: Content of '{path}' is not available inline (encoding '{encoding}')."
        try:
            raw = base64.b64decode(data.get('content') or '')
            # Truncate the raw bytes before decoding so the limit is in bytes.
            return raw[:max(0, max_bytes)].decode('utf-8', errors='replace')
        except Exception as e:
            return f"Error decoding content for '{path}': {str(e)}"
}