1342 lines
56 KiB
Python
1342 lines
56 KiB
Python
"""
|
|
title: Gitea Dev
|
|
author: Jeff Smith + minimax + Claude
|
|
version: 1.1.0
|
|
license: MIT
|
|
description: Interact with Gitea repositories - read, write, branch, and PR/Issue workflows
|
|
requirements: pydantic, httpx
|
|
"""
|
|
|
|
from typing import Optional, Callable, Any, List, Dict
|
|
from pydantic import BaseModel, Field
|
|
import httpx
|
|
import base64
|
|
|
|
|
|
class Tools:
|
|
class Valves(BaseModel):
|
|
"""Shared configuration (set by admin)"""
|
|
|
|
GITEA_URL: str = Field(
|
|
default="https://gitea.example.com",
|
|
description="Gitea URL (ingress or internal service)",
|
|
)
|
|
DEFAULT_REPO: str = Field(
|
|
default="",
|
|
description="Default repository in owner/repo format",
|
|
)
|
|
DEFAULT_BRANCH: str = Field(default="main", description="Default branch name")
|
|
DEFAULT_ORG: str = Field(default="", description="Default organization")
|
|
ALLOW_USER_OVERRIDES: bool = Field(
|
|
default=True,
|
|
description="Allow users to override defaults via UserValves (False = locked to admin defaults)",
|
|
)
|
|
# Pagination defaults
|
|
DEFAULT_PAGE_SIZE: int = Field(
|
|
default=50,
|
|
description="Default page size for list operations (max 50)",
|
|
)
|
|
|
|
class UserValves(BaseModel):
|
|
"""Per-user configuration (personal credentials and overrides)"""
|
|
|
|
GITEA_TOKEN: str = Field(
|
|
default="",
|
|
description="Your Gitea API token (Settings > Applications > Generate Token)",
|
|
)
|
|
USER_DEFAULT_REPO: str = Field(
|
|
default="",
|
|
description="Override default repository",
|
|
)
|
|
USER_DEFAULT_BRANCH: str = Field(
|
|
default="",
|
|
description="Override default branch",
|
|
)
|
|
USER_DEFAULT_ORG: str = Field(
|
|
default="",
|
|
description="Override default organization",
|
|
)
|
|
|
|
def __init__(self):
|
|
self.valves = self.Valves()
|
|
self.citation = False
|
|
|
|
def _api_url(self, endpoint: str) -> str:
|
|
base = self._get_url()
|
|
return f"{base}/api/v1{endpoint}"
|
|
|
|
def _get_url(self) -> str:
|
|
"""Get effective URL."""
|
|
return self.valves.GITEA_URL.rstrip("/")
|
|
|
|
def _get_token(self, __user__: dict = None) -> str:
|
|
"""Extract token from framework-provided user context."""
|
|
if __user__ and "valves" in __user__:
|
|
user_valves = self.UserValves(**__user__["valves"])
|
|
return user_valves.GITEA_TOKEN
|
|
return ""
|
|
|
|
def _headers(self, __user__: dict = None) -> dict:
|
|
token = self._get_token(__user__)
|
|
return {
|
|
"Authorization": f"token {token}",
|
|
"Content-Type": "application/json",
|
|
}
|
|
|
|
def _get_repo(self, repo: Optional[str], __user__: dict = None) -> str:
|
|
"""Get effective repo with priority."""
|
|
if repo:
|
|
return repo
|
|
if __user__ and "valves" in __user__:
|
|
user_valves = self.UserValves(**__user__["valves"])
|
|
if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_REPO:
|
|
return user_valves.USER_DEFAULT_REPO
|
|
return self.valves.DEFAULT_REPO
|
|
|
|
def _get_branch(self, branch: Optional[str], __user__: dict = None) -> str:
|
|
"""Get effective branch with priority."""
|
|
if branch:
|
|
return branch
|
|
if __user__ and "valves" in __user__:
|
|
user_valves = self.UserValves(**__user__["valves"])
|
|
if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_BRANCH:
|
|
return user_valves.USER_DEFAULT_BRANCH
|
|
return self.valves.DEFAULT_BRANCH
|
|
|
|
def _get_org(self, org: Optional[str], __user__: dict = None) -> str:
|
|
"""Get effective org with priority."""
|
|
if org:
|
|
return org
|
|
if __user__ and "valves" in __user__:
|
|
user_valves = self.UserValves(**__user__["valves"])
|
|
if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_ORG:
|
|
return user_valves.USER_DEFAULT_ORG
|
|
return self.valves.DEFAULT_ORG
|
|
|
|
def _resolve_repo(
|
|
self, repo: Optional[str], __user__: dict = None
|
|
) -> tuple[str, str]:
|
|
effective_repo = self._get_repo(repo, __user__)
|
|
if not effective_repo:
|
|
raise ValueError(
|
|
"No repository specified. Set DEFAULT_REPO in Valves or USER_DEFAULT_REPO in UserValves."
|
|
)
|
|
if "/" not in effective_repo:
|
|
raise ValueError(
|
|
f"Repository must be in owner/repo format, got: {effective_repo}"
|
|
)
|
|
return effective_repo.split("/", 1)
|
|
|
|
def _get_page_size(self, limit: Optional[int] = None) -> int:
|
|
"""Get effective page size, capped at 50 (Gitea max)."""
|
|
if limit is not None:
|
|
return min(limit, 50)
|
|
return min(self.valves.DEFAULT_PAGE_SIZE, 50)
|
|
|
|
# REPOSITORY DISCOVERY (WITH PAGINATION)
|
|
async def list_repos(
|
|
self,
|
|
page: int = 1,
|
|
limit: Optional[int] = None,
|
|
__user__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> str:
|
|
"""List repositories accessible to the authenticated user."""
|
|
token = self._get_token(__user__)
|
|
if not token:
|
|
return "Error: GITEA_TOKEN not configured. Add it in User Settings > Gitea Dev."
|
|
page_size = self._get_page_size(limit)
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": f"Fetching repositories (page {page})...",
|
|
"done": False,
|
|
},
|
|
}
|
|
)
|
|
try:
|
|
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
|
|
response = await client.get(
|
|
self._api_url("/user/repos"),
|
|
headers=self._headers(__user__),
|
|
params={"page": page, "limit": page_size},
|
|
)
|
|
response.raise_for_status()
|
|
repos = response.json()
|
|
total_count = response.headers.get("x-total-count", "?")
|
|
output = f"Accessible Repositories (Page {page}, {page_size}/page):\n\n"
|
|
for repo in repos:
|
|
full_name = repo.get("full_name", "")
|
|
desc = repo.get("description", "")[:50] or "No description"
|
|
private = "🔒" if repo.get("private") else "🌐"
|
|
default_branch = repo.get("default_branch", "main")
|
|
output += f"- {private} {full_name} ({default_branch}): {desc}\n"
|
|
output += f"\nShowing {len(repos)} repos (Total: {total_count})"
|
|
if len(repos) == page_size:
|
|
output += (
|
|
f"\nMore results may exist. Use page={page + 1} to continue."
|
|
)
|
|
effective_repo = self._get_repo(None, __user__)
|
|
if effective_repo:
|
|
output += f"\nActive project: {effective_repo}"
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": "Done",
|
|
"done": True,
|
|
"hidden": True,
|
|
},
|
|
}
|
|
)
|
|
return output
|
|
except httpx.HTTPStatusError as e:
|
|
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
|
|
except Exception as e:
|
|
return f"Error: {type(e).__name__}: {e}"
|
|
|
|
async def list_branches(
|
|
self,
|
|
repo: Optional[str] = None,
|
|
page: int = 1,
|
|
limit: Optional[int] = None,
|
|
__user__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> str:
|
|
"""List branches in a repository."""
|
|
token = self._get_token(__user__)
|
|
if not token:
|
|
return "Error: GITEA_TOKEN not configured"
|
|
try:
|
|
owner, repo_name = self._resolve_repo(repo, __user__)
|
|
except ValueError as e:
|
|
return f"Error: {e}"
|
|
page_size = self._get_page_size(limit)
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": f"Fetching branches for {owner}/{repo_name} (page {page})...",
|
|
"done": False,
|
|
},
|
|
}
|
|
)
|
|
try:
|
|
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
|
|
response = await client.get(
|
|
self._api_url(f"/repos/{owner}/{repo_name}/branches"),
|
|
headers=self._headers(__user__),
|
|
params={"page": page, "limit": page_size},
|
|
)
|
|
response.raise_for_status()
|
|
branches = response.json()
|
|
total_count = response.headers.get("x-total-count", "?")
|
|
output = f"Branches in {owner}/{repo_name} (Page {page}):\n\n"
|
|
for branch in branches:
|
|
name = branch.get("name", "")
|
|
protected = "🛡️" if branch.get("protected") else ""
|
|
commit_sha = branch.get("commit", {}).get("id", "")[:8]
|
|
output += f"- {name} {protected} [{commit_sha}]\n"
|
|
output += f"\nShowing {len(branches)} branches (Total: {total_count})"
|
|
if len(branches) == page_size:
|
|
output += (
|
|
f"\nMore results may exist. Use page={page + 1} to continue."
|
|
)
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": "Done",
|
|
"done": True,
|
|
"hidden": True,
|
|
},
|
|
}
|
|
)
|
|
return output
|
|
except httpx.HTTPStatusError as e:
|
|
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
|
|
except Exception as e:
|
|
return f"Error: {type(e).__name__}: {e}"
|
|
|
|
async def list_files(
|
|
self,
|
|
path: str = "",
|
|
repo: Optional[str] = None,
|
|
branch: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> str:
|
|
"""List files and directories in a repository path."""
|
|
token = self._get_token(__user__)
|
|
if not token:
|
|
return "Error: GITEA_TOKEN not configured"
|
|
try:
|
|
owner, repo_name = self._resolve_repo(repo, __user__)
|
|
except ValueError as e:
|
|
return f"Error: {e}"
|
|
effective_branch = self._get_branch(branch, __user__)
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": f"Listing {path or 'root'}...",
|
|
"done": False,
|
|
},
|
|
}
|
|
)
|
|
try:
|
|
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
|
|
response = await client.get(
|
|
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
|
|
headers=self._headers(__user__),
|
|
params={"ref": effective_branch},
|
|
)
|
|
response.raise_for_status()
|
|
contents = response.json()
|
|
if isinstance(contents, dict):
|
|
return f"'{path}' is a file, not a directory. Use get_file() to read it."
|
|
output = f"Contents of {owner}/{repo_name}/{path or '.'} ({effective_branch}):\n\n"
|
|
dirs = [c for c in contents if c.get("type") == "dir"]
|
|
files = [c for c in contents if c.get("type") == "file"]
|
|
for item in sorted(dirs, key=lambda x: x.get("name", "")):
|
|
output += f"📁 {item.get('name', '')}/\n"
|
|
for item in sorted(files, key=lambda x: x.get("name", "")):
|
|
size = item.get("size", 0)
|
|
size_str = f"{size}B" if size < 1024 else f"{size//1024}KB"
|
|
output += f"📄 {item.get('name', '')} ({size_str})\n"
|
|
output += f"\nTotal: {len(dirs)} directories, {len(files)} files"
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": "Done",
|
|
"done": True,
|
|
"hidden": True,
|
|
},
|
|
}
|
|
)
|
|
return output
|
|
except httpx.HTTPStatusError as e:
|
|
if e.response.status_code == 404:
|
|
return f"Path not found: {path}"
|
|
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
|
|
except Exception as e:
|
|
return f"Error: {type(e).__name__}: {e}"
|
|
|
|
async def get_file(
|
|
self,
|
|
path: str,
|
|
repo: Optional[str] = None,
|
|
branch: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> str:
|
|
"""Get contents of a file from the repository."""
|
|
token = self._get_token(__user__)
|
|
if not token:
|
|
return "Error: GITEA_TOKEN not configured"
|
|
try:
|
|
owner, repo_name = self._resolve_repo(repo, __user__)
|
|
except ValueError as e:
|
|
return f"Error: {e}"
|
|
effective_branch = self._get_branch(branch, __user__)
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": f"Reading {path}...", "done": False},
|
|
}
|
|
)
|
|
try:
|
|
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
|
|
response = await client.get(
|
|
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
|
|
headers=self._headers(__user__),
|
|
params={"ref": effective_branch},
|
|
)
|
|
response.raise_for_status()
|
|
file_info = response.json()
|
|
if isinstance(file_info, list):
|
|
return f"'{path}' is a directory, not a file. Use list_files() to browse."
|
|
if file_info.get("type") != "file":
|
|
return f"'{path}' is not a file (type: {file_info.get('type')})"
|
|
content_b64 = file_info.get("content", "")
|
|
try:
|
|
content = base64.b64decode(content_b64).decode("utf-8")
|
|
except Exception:
|
|
return "Error: Could not decode file content (may be binary)"
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": "Done",
|
|
"done": True,
|
|
"hidden": True,
|
|
},
|
|
}
|
|
)
|
|
return f"File: {owner}/{repo_name}/{path} ({effective_branch})\nSHA: {file_info.get('sha', 'unknown')[:8]}\n\n```\n{content}\n```"
|
|
except httpx.HTTPStatusError as e:
|
|
if e.response.status_code == 404:
|
|
return f"File not found: {path}"
|
|
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
|
|
except Exception as e:
|
|
return f"Error: {type(e).__name__}: {e}"
|
|
|
|
async def create_file(
|
|
self,
|
|
path: str,
|
|
content: str,
|
|
message: str,
|
|
repo: Optional[str] = None,
|
|
branch: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> str:
|
|
"""Create a new file in the repository."""
|
|
token = self._get_token(__user__)
|
|
if not token:
|
|
return "Error: GITEA_TOKEN not configured"
|
|
try:
|
|
owner, repo_name = self._resolve_repo(repo, __user__)
|
|
except ValueError as e:
|
|
return f"Error: {e}"
|
|
effective_branch = self._get_branch(branch, __user__)
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": f"Creating {path}...", "done": False},
|
|
}
|
|
)
|
|
try:
|
|
content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii")
|
|
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
|
|
response = await client.post(
|
|
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
|
|
headers=self._headers(__user__),
|
|
json={
|
|
"content": content_b64,
|
|
"message": message,
|
|
"branch": effective_branch,
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
result = response.json()
|
|
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": "Done",
|
|
"done": True,
|
|
"hidden": True,
|
|
},
|
|
}
|
|
)
|
|
return f"Created {path} on {effective_branch}\nCommit: {commit_sha}\nMessage: {message}"
|
|
except httpx.HTTPStatusError as e:
|
|
if e.response.status_code == 422:
|
|
return f"File already exists: {path}. Use update_file() instead."
|
|
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
|
|
except Exception as e:
|
|
return f"Error: {type(e).__name__}: {e}"
|
|
|
|
async def update_file(
|
|
self,
|
|
path: str,
|
|
content: str,
|
|
message: str,
|
|
repo: Optional[str] = None,
|
|
branch: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> str:
|
|
"""Update an existing file in the repository."""
|
|
token = self._get_token(__user__)
|
|
if not token:
|
|
return "Error: GITEA_TOKEN not configured"
|
|
try:
|
|
owner, repo_name = self._resolve_repo(repo, __user__)
|
|
except ValueError as e:
|
|
return f"Error: {e}"
|
|
effective_branch = self._get_branch(branch, __user__)
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": f"Updating {path}...", "done": False},
|
|
}
|
|
)
|
|
try:
|
|
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
|
|
get_response = await client.get(
|
|
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
|
|
headers=self._headers(__user__),
|
|
params={"ref": effective_branch},
|
|
)
|
|
if get_response.status_code == 404:
|
|
return f"File not found: {path}. Use create_file() instead."
|
|
get_response.raise_for_status()
|
|
file_info = get_response.json()
|
|
sha = file_info.get("sha")
|
|
if not sha:
|
|
return "Error: Could not get file SHA for update"
|
|
content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii")
|
|
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
|
|
response = await client.put(
|
|
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
|
|
headers=self._headers(__user__),
|
|
json={
|
|
"content": content_b64,
|
|
"message": message,
|
|
"branch": effective_branch,
|
|
"sha": sha,
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
result = response.json()
|
|
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": "Done",
|
|
"done": True,
|
|
"hidden": True,
|
|
},
|
|
}
|
|
)
|
|
return f"Updated {path} on {effective_branch}\nCommit: {commit_sha}\nMessage: {message}"
|
|
except httpx.HTTPStatusError as e:
|
|
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
|
|
except Exception as e:
|
|
return f"Error: {type(e).__name__}: {e}"
|
|
|
|
async def delete_file(
|
|
self,
|
|
path: str,
|
|
message: str,
|
|
repo: Optional[str] = None,
|
|
branch: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
__event_call__: Callable[[dict], Any] = None,
|
|
) -> str:
|
|
"""Delete a file from the repository."""
|
|
token = self._get_token(__user__)
|
|
if not token:
|
|
return "Error: GITEA_TOKEN not configured"
|
|
try:
|
|
owner, repo_name = self._resolve_repo(repo, __user__)
|
|
except ValueError as e:
|
|
return f"Error: {e}"
|
|
effective_branch = self._get_branch(branch, __user__)
|
|
# Confirm deletion
|
|
if __event_call__:
|
|
result = await __event_call__(
|
|
{
|
|
"type": "confirmation",
|
|
"data": {
|
|
"title": "Delete File?",
|
|
"message": f"Delete {path} from {owner}/{repo_name} ({effective_branch})?",
|
|
},
|
|
}
|
|
)
|
|
if result is None or result is False:
|
|
return "delete_file CANCELLED"
|
|
if isinstance(result, dict) and not result.get("confirmed"):
|
|
return "delete_file CANCELLED"
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": f"Deleting {path}...", "done": False},
|
|
}
|
|
)
|
|
try:
|
|
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
|
|
get_response = await client.get(
|
|
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
|
|
headers=self._headers(__user__),
|
|
params={"ref": effective_branch},
|
|
)
|
|
if get_response.status_code == 404:
|
|
return f"File not found: {path}"
|
|
get_response.raise_for_status()
|
|
file_info = get_response.json()
|
|
sha = file_info.get("sha")
|
|
if not sha:
|
|
return "Error: Could not get file SHA for delete"
|
|
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
|
|
response = await client.delete(
|
|
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
|
|
headers=self._headers(__user__),
|
|
json={
|
|
"message": message,
|
|
"branch": effective_branch,
|
|
"sha": sha,
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
result = response.json()
|
|
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": "Done",
|
|
"done": True,
|
|
"hidden": True,
|
|
},
|
|
}
|
|
)
|
|
return f"Deleted {path} on {effective_branch}\nCommit: {commit_sha}\nMessage: {message}"
|
|
except httpx.HTTPStatusError as e:
|
|
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
|
|
except Exception as e:
|
|
return f"Error: {type(e).__name__}: {e}"
|
|
|
|
# BRANCHING & PR (WITH PAGINATION)
|
|
async def create_branch(
|
|
self,
|
|
branch_name: str,
|
|
from_branch: Optional[str] = None,
|
|
repo: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> str:
|
|
"""Create a new branch."""
|
|
token = self._get_token(__user__)
|
|
if not token:
|
|
return "Error: GITEA_TOKEN not configured"
|
|
try:
|
|
owner, repo_name = self._resolve_repo(repo, __user__)
|
|
except ValueError as e:
|
|
return f"Error: {e}"
|
|
effective_from = from_branch or self._get_branch(None, __user__)
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": f"Creating branch {branch_name}...",
|
|
"done": False,
|
|
},
|
|
}
|
|
)
|
|
try:
|
|
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
|
|
response = await client.post(
|
|
self._api_url(f"/repos/{owner}/{repo_name}/branches"),
|
|
headers=self._headers(__user__),
|
|
json={
|
|
"new_branch_name": branch_name,
|
|
"old_branch_name": effective_from,
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": "Done",
|
|
"done": True,
|
|
"hidden": True,
|
|
},
|
|
}
|
|
)
|
|
return f"Created branch '{branch_name}' from '{effective_from}' in {owner}/{repo_name}"
|
|
except httpx.HTTPStatusError as e:
|
|
if e.response.status_code == 409:
|
|
return f"Branch '{branch_name}' already exists"
|
|
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
|
|
except Exception as e:
|
|
return f"Error: {type(e).__name__}: {e}"
|
|
|
|
async def create_pull_request(
|
|
self,
|
|
title: str,
|
|
head_branch: str,
|
|
base_branch: Optional[str] = None,
|
|
body: str = "",
|
|
repo: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> str:
|
|
"""Create a pull request."""
|
|
token = self._get_token(__user__)
|
|
if not token:
|
|
return "Error: GITEA_TOKEN not configured"
|
|
try:
|
|
owner, repo_name = self._resolve_repo(repo, __user__)
|
|
except ValueError as e:
|
|
return f"Error: {e}"
|
|
effective_base = base_branch or self._get_branch(None, __user__)
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Creating pull request...", "done": False},
|
|
}
|
|
)
|
|
try:
|
|
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
|
|
response = await client.post(
|
|
self._api_url(f"/repos/{owner}/{repo_name}/pulls"),
|
|
headers=self._headers(__user__),
|
|
json={
|
|
"title": title,
|
|
"head": head_branch,
|
|
"base": effective_base,
|
|
"body": body,
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
pr = response.json()
|
|
pr_number = pr.get("number")
|
|
pr_url = pr.get("html_url", "")
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": "Done",
|
|
"done": True,
|
|
"hidden": True,
|
|
},
|
|
}
|
|
)
|
|
return f"Created PR #{pr_number}: {title}\n{head_branch} → {effective_base}\nURL: {pr_url}"
|
|
except httpx.HTTPStatusError as e:
|
|
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
|
|
except Exception as e:
|
|
return f"Error: {type(e).__name__}: {e}"
|
|
|
|
async def list_pull_requests(
|
|
self,
|
|
state: str = "open",
|
|
repo: Optional[str] = None,
|
|
page: int = 1,
|
|
limit: Optional[int] = None,
|
|
__user__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> str:
|
|
"""List pull requests."""
|
|
token = self._get_token(__user__)
|
|
if not token:
|
|
return "Error: GITEA_TOKEN not configured"
|
|
try:
|
|
owner, repo_name = self._resolve_repo(repo, __user__)
|
|
except ValueError as e:
|
|
return f"Error: {e}"
|
|
page_size = self._get_page_size(limit)
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Fetching pull requests...", "done": False},
|
|
}
|
|
)
|
|
try:
|
|
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
|
|
response = await client.get(
|
|
self._api_url(f"/repos/{owner}/{repo_name}/pulls"),
|
|
headers=self._headers(__user__),
|
|
params={"state": state, "page": page, "limit": page_size},
|
|
)
|
|
response.raise_for_status()
|
|
prs = response.json()
|
|
total_count = response.headers.get("x-total-count", "?")
|
|
output = (
|
|
f"Pull Requests in {owner}/{repo_name} ({state}, page {page}):\n\n"
|
|
)
|
|
if not prs:
|
|
output += "No pull requests found."
|
|
else:
|
|
for pr in prs:
|
|
number = pr.get("number")
|
|
title = pr.get("title", "")[:60]
|
|
user = pr.get("user", {}).get("login", "unknown")
|
|
head = pr.get("head", {}).get("ref", "")
|
|
base = pr.get("base", {}).get("ref", "")
|
|
mergeable = pr.get("mergeable", None)
|
|
merge_status = ""
|
|
if mergeable is True:
|
|
merge_status = " ✅"
|
|
elif mergeable is False:
|
|
merge_status = " ⚠️ conflicts"
|
|
output += f"- #{number}: {title}\n {head} → {base} (by {user}){merge_status}\n"
|
|
output += f"\nShowing {len(prs)} PRs (Total: {total_count})"
|
|
if len(prs) == page_size:
|
|
output += f"\nMore results may exist. Use page={page + 1} to continue."
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": "Done",
|
|
"done": True,
|
|
"hidden": True,
|
|
},
|
|
}
|
|
)
|
|
return output
|
|
except httpx.HTTPStatusError as e:
|
|
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
|
|
except Exception as e:
|
|
return f"Error: {type(e).__name__}: {e}"
|
|
|
|
async def get_pull_request(
|
|
self,
|
|
pr_number: int,
|
|
repo: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> str:
|
|
"""Get details of a specific pull request."""
|
|
token = self._get_token(__user__)
|
|
if not token:
|
|
return "Error: GITEA_TOKEN not configured"
|
|
try:
|
|
owner, repo_name = self._resolve_repo(repo, __user__)
|
|
except ValueError as e:
|
|
return f"Error: {e}"
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": f"Fetching PR #{pr_number}...",
|
|
"done": False,
|
|
},
|
|
}
|
|
)
|
|
try:
|
|
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
|
|
response = await client.get(
|
|
self._api_url(f"/repos/{owner}/{repo_name}/pulls/{pr_number}"),
|
|
headers=self._headers(__user__),
|
|
)
|
|
response.raise_for_status()
|
|
pr = response.json()
|
|
output = f"Pull Request #{pr_number} in {owner}/{repo_name}\n\n"
|
|
output += f"**Title:** {pr.get('title', '')}\n"
|
|
output += f"**State:** {pr.get('state', 'unknown')}\n"
|
|
output += f"**Author:** {pr.get('user', {}).get('login', 'unknown')}\n"
|
|
output += f"**Branch:** {pr.get('head', {}).get('ref', '')} → {pr.get('base', {}).get('ref', '')}\n"
|
|
output += f"**Mergeable:** {pr.get('mergeable', 'unknown')}\n"
|
|
output += f"**URL:** {pr.get('html_url', '')}\n"
|
|
body = pr.get("body", "")
|
|
if body:
|
|
output += f"\n**Description:**\n{body[:500]}"
|
|
if len(body) > 500:
|
|
output += "...(truncated)"
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": "Done",
|
|
"done": True,
|
|
"hidden": True,
|
|
},
|
|
}
|
|
)
|
|
return output
|
|
except httpx.HTTPStatusError as e:
|
|
if e.response.status_code == 404:
|
|
return f"PR #{pr_number} not found"
|
|
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
|
|
except Exception as e:
|
|
return f"Error: {type(e).__name__}: {e}"
|
|
|
|
# ISSUES (WITH PAGINATION)
|
|
async def list_issues(
|
|
self,
|
|
state: str = "open",
|
|
repo: Optional[str] = None,
|
|
page: int = 1,
|
|
limit: Optional[int] = None,
|
|
labels: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> str:
|
|
"""List issues in a repository."""
|
|
token = self._get_token(__user__)
|
|
if not token:
|
|
return "Error: GITEA_TOKEN not configured"
|
|
try:
|
|
owner, repo_name = self._resolve_repo(repo, __user__)
|
|
except ValueError as e:
|
|
return f"Error: {e}"
|
|
page_size = self._get_page_size(limit)
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Fetching issues...", "done": False},
|
|
}
|
|
)
|
|
try:
|
|
params = {
|
|
"state": state,
|
|
"type": "issues",
|
|
"page": page,
|
|
"limit": page_size,
|
|
}
|
|
if labels:
|
|
params["labels"] = labels
|
|
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
|
|
response = await client.get(
|
|
self._api_url(f"/repos/{owner}/{repo_name}/issues"),
|
|
headers=self._headers(__user__),
|
|
params=params,
|
|
)
|
|
response.raise_for_status()
|
|
issues = response.json()
|
|
total_count = response.headers.get("x-total-count", "?")
|
|
output = f"Issues in {owner}/{repo_name} ({state}, page {page}):\n\n"
|
|
if not issues:
|
|
output += "No issues found."
|
|
else:
|
|
for issue in issues:
|
|
number = issue.get("number")
|
|
title = issue.get("title", "")[:60]
|
|
user = issue.get("user", {}).get("login", "unknown")
|
|
issue_labels = ", ".join(
|
|
[label.get("name", "") for label in issue.get("labels", [])]
|
|
)
|
|
labels_str = f" [{issue_labels}]" if issue_labels else ""
|
|
assignee = issue.get("assignee", {})
|
|
assignee_str = (
|
|
f" → {assignee.get('login', '')}" if assignee else ""
|
|
)
|
|
output += f"- #{number}: {title}{labels_str} (by {user}){assignee_str}\n"
|
|
output += f"\nShowing {len(issues)} issues (Total: {total_count})"
|
|
if len(issues) == page_size:
|
|
output += f"\nMore results may exist. Use page={page + 1} to continue."
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": "Done",
|
|
"done": True,
|
|
"hidden": True,
|
|
},
|
|
}
|
|
)
|
|
return output
|
|
except httpx.HTTPStatusError as e:
|
|
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
|
|
except Exception as e:
|
|
return f"Error: {type(e).__name__}: {e}"
|
|
|
|
async def create_issue(
|
|
self,
|
|
title: str,
|
|
body: str = "",
|
|
repo: Optional[str] = None,
|
|
labels: Optional[List[str]] = None,
|
|
assignee: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> str:
|
|
"""Create a new issue."""
|
|
token = self._get_token(__user__)
|
|
if not token:
|
|
return "Error: GITEA_TOKEN not configured"
|
|
try:
|
|
owner, repo_name = self._resolve_repo(repo, __user__)
|
|
except ValueError as e:
|
|
return f"Error: {e}"
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Creating issue...", "done": False},
|
|
}
|
|
)
|
|
try:
|
|
payload = {"title": title, "body": body}
|
|
if labels:
|
|
payload["labels"] = labels
|
|
if assignee:
|
|
payload["assignee"] = assignee
|
|
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
|
|
response = await client.post(
|
|
self._api_url(f"/repos/{owner}/{repo_name}/issues"),
|
|
headers=self._headers(__user__),
|
|
json=payload,
|
|
)
|
|
response.raise_for_status()
|
|
issue = response.json()
|
|
issue_number = issue.get("number")
|
|
issue_url = issue.get("html_url", "")
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": "Done",
|
|
"done": True,
|
|
"hidden": True,
|
|
},
|
|
}
|
|
)
|
|
return f"Created issue #{issue_number}: {title}\nURL: {issue_url}"
|
|
except httpx.HTTPStatusError as e:
|
|
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
|
|
except Exception as e:
|
|
return f"Error: {type(e).__name__}: {e}"
|
|
|
|
async def get_issue(
|
|
self,
|
|
issue_number: int,
|
|
repo: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> str:
|
|
"""Get details of a specific issue."""
|
|
token = self._get_token(__user__)
|
|
if not token:
|
|
return "Error: GITEA_TOKEN not configured"
|
|
try:
|
|
owner, repo_name = self._resolve_repo(repo, __user__)
|
|
except ValueError as e:
|
|
return f"Error: {e}"
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": f"Fetching issue #{issue_number}...",
|
|
"done": False,
|
|
},
|
|
}
|
|
)
|
|
try:
|
|
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
|
|
response = await client.get(
|
|
self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"),
|
|
headers=self._headers(__user__),
|
|
)
|
|
response.raise_for_status()
|
|
issue = response.json()
|
|
output = f"Issue #{issue_number} in {owner}/{repo_name}\n\n"
|
|
output += f"**Title:** {issue.get('title', '')}\n"
|
|
output += f"**State:** {issue.get('state', 'unknown')}\n"
|
|
output += (
|
|
f"**Author:** {issue.get('user', {}).get('login', 'unknown')}\n"
|
|
)
|
|
labels = [label.get("name", "") for label in issue.get("labels", [])]
|
|
if labels:
|
|
output += f"**Labels:** {', '.join(labels)}\n"
|
|
assignee = issue.get("assignee")
|
|
if assignee:
|
|
output += f"**Assignee:** {assignee.get('login', '')}\n"
|
|
output += f"**URL:** {issue.get('html_url', '')}\n"
|
|
body = issue.get("body", "")
|
|
if body:
|
|
output += f"\n**Description:**\n{body[:1000]}"
|
|
if len(body) > 1000:
|
|
output += "...(truncated)"
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": "Done",
|
|
"done": True,
|
|
"hidden": True,
|
|
},
|
|
}
|
|
)
|
|
return output
|
|
except httpx.HTTPStatusError as e:
|
|
if e.response.status_code == 404:
|
|
return f"Issue #{issue_number} not found"
|
|
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
|
|
except Exception as e:
|
|
return f"Error: {type(e).__name__}: {e}"
|
|
|
|
async def update_issue(
|
|
self,
|
|
issue_number: int,
|
|
repo: Optional[str] = None,
|
|
title: Optional[str] = None,
|
|
body: Optional[str] = None,
|
|
state: Optional[str] = None,
|
|
assignee: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> str:
|
|
"""Update an existing issue."""
|
|
token = self._get_token(__user__)
|
|
if not token:
|
|
return "Error: GITEA_TOKEN not configured"
|
|
try:
|
|
owner, repo_name = self._resolve_repo(repo, __user__)
|
|
except ValueError as e:
|
|
return f"Error: {e}"
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": f"Updating issue #{issue_number}...",
|
|
"done": False,
|
|
},
|
|
}
|
|
)
|
|
# Build payload with only provided fields
|
|
payload = {}
|
|
if title is not None:
|
|
payload["title"] = title
|
|
if body is not None:
|
|
payload["body"] = body
|
|
if state is not None:
|
|
if state not in ("open", "closed"):
|
|
return f"Error: Invalid state '{state}'. Use 'open' or 'closed'."
|
|
payload["state"] = state
|
|
if assignee is not None:
|
|
payload["assignee"] = assignee
|
|
if not payload:
|
|
return "Error: No fields to update. Provide at least one of: title, body, state, assignee"
|
|
try:
|
|
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
|
|
response = await client.patch(
|
|
self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"),
|
|
headers=self._headers(__user__),
|
|
json=payload,
|
|
)
|
|
response.raise_for_status()
|
|
issue = response.json()
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": "Done",
|
|
"done": True,
|
|
"hidden": True,
|
|
},
|
|
}
|
|
)
|
|
return f"Updated issue #{issue_number}: {issue.get('title', '')}\nState: {issue.get('state', 'unknown')}\nURL: {issue.get('html_url', '')}"
|
|
except httpx.HTTPStatusError as e:
|
|
if e.response.status_code == 404:
|
|
return f"Issue #{issue_number} not found"
|
|
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
|
|
except Exception as e:
|
|
return f"Error: {type(e).__name__}: {e}"
|
|
|
|
# PROJECT CONTEXT
|
|
async def get_project_context(
|
|
self,
|
|
repo: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> str:
|
|
"""Get project context: README, structure, recent commits."""
|
|
token = self._get_token(__user__)
|
|
if not token:
|
|
return "Error: GITEA_TOKEN not configured"
|
|
try:
|
|
owner, repo_name = self._resolve_repo(repo, __user__)
|
|
except ValueError as e:
|
|
return f"Error: {e}"
|
|
branch = self._get_branch(None, __user__)
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": f"Loading project context for {owner}/{repo_name}...",
|
|
"done": False,
|
|
},
|
|
}
|
|
)
|
|
output = f"# Project: {owner}/{repo_name}\n\n"
|
|
try:
|
|
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
|
|
repo_response = await client.get(
|
|
self._api_url(f"/repos/{owner}/{repo_name}"),
|
|
headers=self._headers(__user__),
|
|
)
|
|
if repo_response.status_code == 200:
|
|
repo_info = repo_response.json()
|
|
output += f"**Description:** {repo_info.get('description', 'No description')}\n"
|
|
output += f"**Default Branch:** {repo_info.get('default_branch', 'main')}\n"
|
|
output += (
|
|
f"**Language:** {repo_info.get('language', 'Unknown')}\n\n"
|
|
)
|
|
root_response = await client.get(
|
|
self._api_url(f"/repos/{owner}/{repo_name}/contents/"),
|
|
headers=self._headers(__user__),
|
|
params={"ref": branch},
|
|
)
|
|
if root_response.status_code == 200:
|
|
contents = root_response.json()
|
|
output += "## Root Structure\n\n"
|
|
for item in sorted(
|
|
contents,
|
|
key=lambda x: (x.get("type") != "dir", x.get("name", "")),
|
|
):
|
|
icon = "📁" if item.get("type") == "dir" else "📄"
|
|
output += f"{icon} {item.get('name', '')}\n"
|
|
output += "\n"
|
|
for readme_name in [
|
|
"README.md",
|
|
"readme.md",
|
|
"README",
|
|
"README.txt",
|
|
]:
|
|
readme_response = await client.get(
|
|
self._api_url(
|
|
f"/repos/{owner}/{repo_name}/contents/{readme_name}"
|
|
),
|
|
headers=self._headers(__user__),
|
|
params={"ref": branch},
|
|
)
|
|
if readme_response.status_code == 200:
|
|
readme_info = readme_response.json()
|
|
content_b64 = readme_info.get("content", "")
|
|
try:
|
|
readme_content = base64.b64decode(content_b64).decode(
|
|
"utf-8"
|
|
)
|
|
if len(readme_content) > 2000:
|
|
readme_content = (
|
|
readme_content[:2000] + "\n\n... (truncated)"
|
|
)
|
|
output += f"## README\n\n{readme_content}\n\n"
|
|
except:
|
|
pass
|
|
break
|
|
commits_response = await client.get(
|
|
self._api_url(f"/repos/{owner}/{repo_name}/commits"),
|
|
headers=self._headers(__user__),
|
|
params={"sha": branch, "limit": 5},
|
|
)
|
|
if commits_response.status_code == 200:
|
|
commits = commits_response.json()
|
|
output += "## Recent Commits\n\n"
|
|
for commit in commits[:5]:
|
|
sha = commit.get("sha", "")[:8]
|
|
msg = (
|
|
commit.get("commit", {})
|
|
.get("message", "")
|
|
.split("\n")[0][:60]
|
|
)
|
|
author = (
|
|
commit.get("commit", {})
|
|
.get("author", {})
|
|
.get("name", "unknown")
|
|
)
|
|
output += f"- `{sha}` {msg} ({author})\n"
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": "Done",
|
|
"done": True,
|
|
"hidden": True,
|
|
},
|
|
}
|
|
)
|
|
return output
|
|
except Exception as e:
|
|
return f"Error loading project context: {type(e).__name__}: {e}"
|
|
|
|
# COMMITS (NEW)
|
|
async def list_commits(
|
|
self,
|
|
repo: Optional[str] = None,
|
|
branch: Optional[str] = None,
|
|
page: int = 1,
|
|
limit: Optional[int] = None,
|
|
path: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> str:
|
|
"""List commits in a repository."""
|
|
token = self._get_token(__user__)
|
|
if not token:
|
|
return "Error: GITEA_TOKEN not configured"
|
|
try:
|
|
owner, repo_name = self._resolve_repo(repo, __user__)
|
|
except ValueError as e:
|
|
return f"Error: {e}"
|
|
effective_branch = self._get_branch(branch, __user__)
|
|
page_size = self._get_page_size(limit)
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": f"Fetching commits...", "done": False},
|
|
}
|
|
)
|
|
try:
|
|
params = {
|
|
"sha": effective_branch,
|
|
"page": page,
|
|
"limit": page_size,
|
|
}
|
|
if path:
|
|
params["path"] = path
|
|
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
|
|
response = await client.get(
|
|
self._api_url(f"/repos/{owner}/{repo_name}/commits"),
|
|
headers=self._headers(__user__),
|
|
params=params,
|
|
)
|
|
response.raise_for_status()
|
|
commits = response.json()
|
|
total_count = response.headers.get("x-total-count", "?")
|
|
output = f"Commits in {owner}/{repo_name} ({effective_branch}, page {page}):\n"
|
|
if path:
|
|
output += f"Filtered to: {path}\n"
|
|
output += "\n"
|
|
for commit in commits:
|
|
sha = commit.get("sha", "")[:8]
|
|
commit_data = commit.get("commit", {})
|
|
msg = commit_data.get("message", "").split("\n")[0][:60]
|
|
author = commit_data.get("author", {}).get("name", "unknown")
|
|
date = commit_data.get("author", {}).get("date", "")[:10]
|
|
output += f"- `{sha}` {msg}\n by {author} on {date}\n"
|
|
output += f"\nShowing {len(commits)} commits (Total: {total_count})"
|
|
if len(commits) == page_size:
|
|
output += (
|
|
f"\nMore results may exist. Use page={page + 1} to continue."
|
|
)
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": "Done",
|
|
"done": True,
|
|
"hidden": True,
|
|
},
|
|
}
|
|
)
|
|
return output
|
|
except httpx.HTTPStatusError as e:
|
|
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
|
|
except Exception as e:
|
|
return f"Error: {type(e).__name__}: {e}" |