Files
tools/gitea/dev.py
2026-01-13 18:23:29 +00:00

1342 lines
56 KiB
Python

"""
title: Gitea Dev
author: Jeff Smith + minimax + Claude
version: 1.1.0
license: MIT
description: Interact with Gitea repositories - read, write, branch, and PR/Issue workflows
requirements: pydantic, httpx
"""
from typing import Optional, Callable, Any, List, Dict
from pydantic import BaseModel, Field
import httpx
import base64
class Tools:
class Valves(BaseModel):
    """Shared configuration (set by admin)"""

    # Base address of the Gitea instance; _get_url() strips trailing slashes.
    GITEA_URL: str = Field(
        description="Gitea URL (ingress or internal service)",
        default="https://gitea.example.com",
    )

    # Fallback repository used when a call names none ("owner/repo").
    DEFAULT_REPO: str = Field(
        description="Default repository in owner/repo format",
        default="",
    )

    # Fallback branch and organization.
    DEFAULT_BRANCH: str = Field(description="Default branch name", default="main")
    DEFAULT_ORG: str = Field(description="Default organization", default="")

    # When False, the USER_DEFAULT_* values in UserValves are ignored.
    ALLOW_USER_OVERRIDES: bool = Field(
        description="Allow users to override defaults via UserValves (False = locked to admin defaults)",
        default=True,
    )

    # Pagination defaults; _get_page_size() caps the effective value at 50.
    DEFAULT_PAGE_SIZE: int = Field(
        description="Default page size for list operations (max 50)",
        default=50,
    )
class UserValves(BaseModel):
    """Per-user configuration (personal credentials and overrides)"""

    # Personal API token; sent in the Authorization header by _headers().
    GITEA_TOKEN: str = Field(
        description="Your Gitea API token (Settings > Applications > Generate Token)",
        default="",
    )

    # Optional per-user replacements for the admin defaults. An empty string
    # means "no override"; they only apply when Valves.ALLOW_USER_OVERRIDES is set.
    USER_DEFAULT_REPO: str = Field(description="Override default repository", default="")
    USER_DEFAULT_BRANCH: str = Field(description="Override default branch", default="")
    USER_DEFAULT_ORG: str = Field(description="Override default organization", default="")
def __init__(self):
self.valves = self.Valves()
self.citation = False
def _api_url(self, endpoint: str) -> str:
base = self._get_url()
return f"{base}/api/v1{endpoint}"
def _get_url(self) -> str:
"""Get effective URL."""
return self.valves.GITEA_URL.rstrip("/")
def _get_token(self, __user__: dict = None) -> str:
"""Extract token from framework-provided user context."""
if __user__ and "valves" in __user__:
user_valves = self.UserValves(**__user__["valves"])
return user_valves.GITEA_TOKEN
return ""
def _headers(self, __user__: dict = None) -> dict:
token = self._get_token(__user__)
return {
"Authorization": f"token {token}",
"Content-Type": "application/json",
}
def _get_repo(self, repo: Optional[str], __user__: dict = None) -> str:
"""Get effective repo with priority."""
if repo:
return repo
if __user__ and "valves" in __user__:
user_valves = self.UserValves(**__user__["valves"])
if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_REPO:
return user_valves.USER_DEFAULT_REPO
return self.valves.DEFAULT_REPO
def _get_branch(self, branch: Optional[str], __user__: dict = None) -> str:
"""Get effective branch with priority."""
if branch:
return branch
if __user__ and "valves" in __user__:
user_valves = self.UserValves(**__user__["valves"])
if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_BRANCH:
return user_valves.USER_DEFAULT_BRANCH
return self.valves.DEFAULT_BRANCH
def _get_org(self, org: Optional[str], __user__: dict = None) -> str:
"""Get effective org with priority."""
if org:
return org
if __user__ and "valves" in __user__:
user_valves = self.UserValves(**__user__["valves"])
if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_ORG:
return user_valves.USER_DEFAULT_ORG
return self.valves.DEFAULT_ORG
def _resolve_repo(
self, repo: Optional[str], __user__: dict = None
) -> tuple[str, str]:
effective_repo = self._get_repo(repo, __user__)
if not effective_repo:
raise ValueError(
"No repository specified. Set DEFAULT_REPO in Valves or USER_DEFAULT_REPO in UserValves."
)
if "/" not in effective_repo:
raise ValueError(
f"Repository must be in owner/repo format, got: {effective_repo}"
)
return effective_repo.split("/", 1)
def _get_page_size(self, limit: Optional[int] = None) -> int:
"""Get effective page size, capped at 50 (Gitea max)."""
if limit is not None:
return min(limit, 50)
return min(self.valves.DEFAULT_PAGE_SIZE, 50)
# REPOSITORY DISCOVERY (WITH PAGINATION)
async def list_repos(
    self,
    page: int = 1,
    limit: Optional[int] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    List repositories accessible to the authenticated user.

    :param page: Page number to fetch (defaults to 1).
    :param limit: Repositories per page; resolved through _get_page_size().
    :return: Human-readable listing, or an error message string.
    """
    if not self._get_token(__user__):
        return "Error: GITEA_TOKEN not configured. Add it in User Settings > Gitea Dev."
    per_page = self._get_page_size(limit)

    async def report(data: dict) -> None:
        # Status updates are optional: nothing is sent without an emitter.
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": data})

    await report(
        {"description": f"Fetching repositories (page {page})...", "done": False}
    )
    try:
        # NOTE(review): verify=False turns off TLS certificate verification.
        async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
            response = await client.get(
                self._api_url("/user/repos"),
                headers=self._headers(__user__),
                params={"page": page, "limit": per_page},
            )
            response.raise_for_status()
            repos = response.json()

        total = response.headers.get("x-total-count", "?")
        chunks = [f"Accessible Repositories (Page {page}, {per_page}/page):\n\n"]
        for entry in repos:
            summary = entry.get("description", "")[:50] or "No description"
            visibility = "🔒" if entry.get("private") else "🌐"
            chunks.append(
                f"- {visibility} {entry.get('full_name', '')} "
                f"({entry.get('default_branch', 'main')}): {summary}\n"
            )
        chunks.append(f"\nShowing {len(repos)} repos (Total: {total})")
        # A full page suggests there may be another one behind it.
        if len(repos) == per_page:
            chunks.append(f"\nMore results may exist. Use page={page + 1} to continue.")
        active = self._get_repo(None, __user__)
        if active:
            chunks.append(f"\nActive project: {active}")

        await report({"description": "Done", "done": True, "hidden": True})
        return "".join(chunks)
    except httpx.HTTPStatusError as err:
        return f"HTTP Error {err.response.status_code}: {err.response.text[:200]}"
    except Exception as err:
        return f"Error: {type(err).__name__}: {err}"
async def list_branches(
    self,
    repo: Optional[str] = None,
    page: int = 1,
    limit: Optional[int] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    List branches in a repository.

    :param repo: Repository in owner/repo format; defaults to the configured one.
    :param page: Page number to fetch (defaults to 1).
    :param limit: Branches per page; resolved through _get_page_size().
    :return: Human-readable listing, or an error message string.
    """
    if not self._get_token(__user__):
        return "Error: GITEA_TOKEN not configured"
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as problem:
        return f"Error: {problem}"
    per_page = self._get_page_size(limit)

    async def report(data: dict) -> None:
        # Status updates are optional: nothing is sent without an emitter.
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": data})

    await report(
        {
            "description": f"Fetching branches for {owner}/{repo_name} (page {page})...",
            "done": False,
        }
    )
    try:
        # NOTE(review): verify=False turns off TLS certificate verification.
        async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
            response = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/branches"),
                headers=self._headers(__user__),
                params={"page": page, "limit": per_page},
            )
            response.raise_for_status()
            branches = response.json()

        total = response.headers.get("x-total-count", "?")
        chunks = [f"Branches in {owner}/{repo_name} (Page {page}):\n\n"]
        for entry in branches:
            shield = "🛡️" if entry.get("protected") else ""
            short_sha = entry.get("commit", {}).get("id", "")[:8]
            chunks.append(f"- {entry.get('name', '')} {shield} [{short_sha}]\n")
        chunks.append(f"\nShowing {len(branches)} branches (Total: {total})")
        # A full page suggests there may be another one behind it.
        if len(branches) == per_page:
            chunks.append(f"\nMore results may exist. Use page={page + 1} to continue.")

        await report({"description": "Done", "done": True, "hidden": True})
        return "".join(chunks)
    except httpx.HTTPStatusError as err:
        return f"HTTP Error {err.response.status_code}: {err.response.text[:200]}"
    except Exception as err:
        return f"Error: {type(err).__name__}: {err}"
async def list_files(
    self,
    path: str = "",
    repo: Optional[str] = None,
    branch: Optional[str] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    List files and directories in a repository path.

    :param path: Directory path inside the repository; "" means the root.
    :param repo: Repository in owner/repo format; defaults to the configured one.
    :param branch: Branch to read from; defaults to the configured one.
    :return: Directories first, then files with sizes, or an error message string.
    """
    if not self._get_token(__user__):
        return "Error: GITEA_TOKEN not configured"
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as problem:
        return f"Error: {problem}"
    ref = self._get_branch(branch, __user__)

    async def report(data: dict) -> None:
        # Status updates are optional: nothing is sent without an emitter.
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": data})

    def by_name(entry: dict) -> str:
        return entry.get("name", "")

    await report({"description": f"Listing {path or 'root'}...", "done": False})
    try:
        # NOTE(review): verify=False turns off TLS certificate verification.
        async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
            response = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
                headers=self._headers(__user__),
                params={"ref": ref},
            )
            response.raise_for_status()
            entries = response.json()

        # The contents endpoint answers with one object for a file, a list for a directory.
        if isinstance(entries, dict):
            return f"'{path}' is a file, not a directory. Use get_file() to read it."

        folders = [e for e in entries if e.get("type") == "dir"]
        documents = [e for e in entries if e.get("type") == "file"]
        chunks = [f"Contents of {owner}/{repo_name}/{path or '.'} ({ref}):\n\n"]
        chunks.extend(f"📁 {e.get('name', '')}/\n" for e in sorted(folders, key=by_name))
        for e in sorted(documents, key=by_name):
            size = e.get("size", 0)
            if size < 1024:
                shown = f"{size}B"
            else:
                shown = f"{size//1024}KB"
            chunks.append(f"📄 {e.get('name', '')} ({shown})\n")
        chunks.append(f"\nTotal: {len(folders)} directories, {len(documents)} files")

        await report({"description": "Done", "done": True, "hidden": True})
        return "".join(chunks)
    except httpx.HTTPStatusError as err:
        if err.response.status_code == 404:
            return f"Path not found: {path}"
        return f"HTTP Error {err.response.status_code}: {err.response.text[:200]}"
    except Exception as err:
        return f"Error: {type(err).__name__}: {err}"
async def get_file(
    self,
    path: str,
    repo: Optional[str] = None,
    branch: Optional[str] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Get contents of a file from the repository.

    :param path: File path inside the repository.
    :param repo: Repository in owner/repo format; defaults to the configured one.
    :param branch: Branch to read from; defaults to the configured one.
    :return: Header with short SHA plus the file text in a code fence, or an error message string.
    """
    if not self._get_token(__user__):
        return "Error: GITEA_TOKEN not configured"
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as problem:
        return f"Error: {problem}"
    ref = self._get_branch(branch, __user__)

    async def report(data: dict) -> None:
        # Status updates are optional: nothing is sent without an emitter.
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": data})

    await report({"description": f"Reading {path}...", "done": False})
    try:
        # NOTE(review): verify=False turns off TLS certificate verification.
        async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
            response = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
                headers=self._headers(__user__),
                params={"ref": ref},
            )
            response.raise_for_status()
            file_info = response.json()

        # A list answer means the path is a directory.
        if isinstance(file_info, list):
            return f"'{path}' is a directory, not a file. Use list_files() to browse."
        if file_info.get("type") != "file":
            return f"'{path}' is not a file (type: {file_info.get('type')})"

        encoded = file_info.get("content", "")
        try:
            # The payload is base64; anything that is not valid UTF-8 is reported as binary.
            text = base64.b64decode(encoded).decode("utf-8")
        except Exception:
            return "Error: Could not decode file content (may be binary)"

        await report({"description": "Done", "done": True, "hidden": True})
        return f"File: {owner}/{repo_name}/{path} ({ref})\nSHA: {file_info.get('sha', 'unknown')[:8]}\n\n```\n{text}\n```"
    except httpx.HTTPStatusError as err:
        if err.response.status_code == 404:
            return f"File not found: {path}"
        return f"HTTP Error {err.response.status_code}: {err.response.text[:200]}"
    except Exception as err:
        return f"Error: {type(err).__name__}: {err}"
async def create_file(
    self,
    path: str,
    content: str,
    message: str,
    repo: Optional[str] = None,
    branch: Optional[str] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Create a new file in the repository.

    :param path: File path to create inside the repository.
    :param content: Text content of the new file (sent base64-encoded as UTF-8).
    :param message: Commit message.
    :param repo: Repository in owner/repo format; defaults to the configured one.
    :param branch: Branch to commit to; defaults to the configured one.
    :return: Confirmation with the short commit SHA, or an error message string.
    """
    if not self._get_token(__user__):
        return "Error: GITEA_TOKEN not configured"
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as problem:
        return f"Error: {problem}"
    target_branch = self._get_branch(branch, __user__)

    async def report(data: dict) -> None:
        # Status updates are optional: nothing is sent without an emitter.
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": data})

    await report({"description": f"Creating {path}...", "done": False})
    try:
        body = {
            "content": base64.b64encode(content.encode("utf-8")).decode("ascii"),
            "message": message,
            "branch": target_branch,
        }
        # NOTE(review): verify=False turns off TLS certificate verification.
        async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
            response = await client.post(
                self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
                headers=self._headers(__user__),
                json=body,
            )
            response.raise_for_status()
            created = response.json()

        short_sha = created.get("commit", {}).get("sha", "unknown")[:8]
        await report({"description": "Done", "done": True, "hidden": True})
        return f"Created {path} on {target_branch}\nCommit: {short_sha}\nMessage: {message}"
    except httpx.HTTPStatusError as err:
        # A 422 from this endpoint is reported as "file already exists".
        if err.response.status_code == 422:
            return f"File already exists: {path}. Use update_file() instead."
        return f"HTTP Error {err.response.status_code}: {err.response.text[:200]}"
    except Exception as err:
        return f"Error: {type(err).__name__}: {err}"
async def update_file(
    self,
    path: str,
    content: str,
    message: str,
    repo: Optional[str] = None,
    branch: Optional[str] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Update an existing file in the repository.

    The current blob SHA is looked up first because the update request must
    carry it; both requests share one HTTP client.

    :param path: Path of the file to overwrite.
    :param content: New text content (sent base64-encoded as UTF-8).
    :param message: Commit message.
    :param repo: Repository in owner/repo format; defaults to the configured one.
    :param branch: Branch to commit to; defaults to the configured one.
    :return: Confirmation with the short commit SHA, or an error message string.
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured"
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"
    effective_branch = self._get_branch(branch, __user__)

    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {"description": f"Updating {path}...", "done": False},
            }
        )
    try:
        url = self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}")
        headers = self._headers(__user__)
        # NOTE(review): verify=False turns off TLS certificate verification.
        async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
            get_response = await client.get(
                url, headers=headers, params={"ref": effective_branch}
            )
            if get_response.status_code == 404:
                return f"File not found: {path}. Use create_file() instead."
            get_response.raise_for_status()
            file_info = get_response.json()
            # A directory path answers with a list, which has no SHA to update.
            if isinstance(file_info, list):
                return f"'{path}' is a directory, not a file. Use list_files() to browse."
            sha = file_info.get("sha")
            if not sha:
                return "Error: Could not get file SHA for update"

            content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii")
            response = await client.put(
                url,
                headers=headers,
                json={
                    "content": content_b64,
                    "message": message,
                    "branch": effective_branch,
                    "sha": sha,
                },
            )
            response.raise_for_status()
            result = response.json()

        commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Done", "done": True, "hidden": True},
                }
            )
        return f"Updated {path} on {effective_branch}\nCommit: {commit_sha}\nMessage: {message}"
    except httpx.HTTPStatusError as e:
        return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
    except Exception as e:
        return f"Error: {type(e).__name__}: {e}"
async def delete_file(
    self,
    path: str,
    message: str,
    repo: Optional[str] = None,
    branch: Optional[str] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
    __event_call__: Callable[[dict], Any] = None,
) -> str:
    """
    Delete a file from the repository.

    When a confirmation callback is available the user is asked first; a
    ``None``/``False`` answer, or a dict without a truthy "confirmed" entry,
    cancels the deletion.

    :param path: Path of the file to delete.
    :param message: Commit message.
    :param repo: Repository in owner/repo format; defaults to the configured one.
    :param branch: Branch to commit to; defaults to the configured one.
    :return: Confirmation with the short commit SHA, "delete_file CANCELLED",
        or an error message string.
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured"
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"
    effective_branch = self._get_branch(branch, __user__)

    # Confirm deletion
    if __event_call__:
        answer = await __event_call__(
            {
                "type": "confirmation",
                "data": {
                    "title": "Delete File?",
                    "message": f"Delete {path} from {owner}/{repo_name} ({effective_branch})?",
                },
            }
        )
        if answer is None or answer is False:
            return "delete_file CANCELLED"
        if isinstance(answer, dict) and not answer.get("confirmed"):
            return "delete_file CANCELLED"

    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {"description": f"Deleting {path}...", "done": False},
            }
        )
    try:
        url = self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}")
        headers = self._headers(__user__)
        # NOTE(review): verify=False turns off TLS certificate verification.
        async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
            get_response = await client.get(
                url, headers=headers, params={"ref": effective_branch}
            )
            if get_response.status_code == 404:
                return f"File not found: {path}"
            get_response.raise_for_status()
            file_info = get_response.json()
            # A directory path answers with a list, which has no SHA to delete.
            if isinstance(file_info, list):
                return f"'{path}' is a directory, not a file. Use list_files() to browse."
            sha = file_info.get("sha")
            if not sha:
                return "Error: Could not get file SHA for delete"

            # httpx's delete() shortcut takes no request body, so the JSON
            # payload has to go through the generic request() method.
            response = await client.request(
                "DELETE",
                url,
                headers=headers,
                json={
                    "message": message,
                    "branch": effective_branch,
                    "sha": sha,
                },
            )
            response.raise_for_status()
            outcome = response.json()

        commit_sha = outcome.get("commit", {}).get("sha", "unknown")[:8]
        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Done", "done": True, "hidden": True},
                }
            )
        return f"Deleted {path} on {effective_branch}\nCommit: {commit_sha}\nMessage: {message}"
    except httpx.HTTPStatusError as e:
        return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
    except Exception as e:
        return f"Error: {type(e).__name__}: {e}"
# BRANCHING & PR (WITH PAGINATION)
async def create_branch(
    self,
    branch_name: str,
    from_branch: Optional[str] = None,
    repo: Optional[str] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Create a new branch.

    :param branch_name: Name of the branch to create.
    :param from_branch: Branch to start from; defaults to the configured branch.
    :param repo: Repository in owner/repo format; defaults to the configured one.
    :return: Confirmation message, or an error message string.
    """
    if not self._get_token(__user__):
        return "Error: GITEA_TOKEN not configured"
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as problem:
        return f"Error: {problem}"
    source_branch = from_branch or self._get_branch(None, __user__)

    async def report(data: dict) -> None:
        # Status updates are optional: nothing is sent without an emitter.
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": data})

    await report({"description": f"Creating branch {branch_name}...", "done": False})
    try:
        body = {"new_branch_name": branch_name, "old_branch_name": source_branch}
        # NOTE(review): verify=False turns off TLS certificate verification.
        async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
            response = await client.post(
                self._api_url(f"/repos/{owner}/{repo_name}/branches"),
                headers=self._headers(__user__),
                json=body,
            )
            response.raise_for_status()

        await report({"description": "Done", "done": True, "hidden": True})
        return f"Created branch '{branch_name}' from '{source_branch}' in {owner}/{repo_name}"
    except httpx.HTTPStatusError as err:
        # A 409 from this endpoint is reported as "branch already exists".
        if err.response.status_code == 409:
            return f"Branch '{branch_name}' already exists"
        return f"HTTP Error {err.response.status_code}: {err.response.text[:200]}"
    except Exception as err:
        return f"Error: {type(err).__name__}: {err}"
async def create_pull_request(
    self,
    title: str,
    head_branch: str,
    base_branch: Optional[str] = None,
    body: str = "",
    repo: Optional[str] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Create a pull request.

    :param title: Pull request title.
    :param head_branch: Branch containing the changes.
    :param base_branch: Branch to merge into; defaults to the configured branch.
    :param body: Pull request description.
    :param repo: Repository in owner/repo format; defaults to the configured one.
    :return: Confirmation with PR number, branches and URL, or an error message string.
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured"
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"
    effective_base = base_branch or self._get_branch(None, __user__)

    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {"description": "Creating pull request...", "done": False},
            }
        )
    try:
        # NOTE(review): verify=False turns off TLS certificate verification.
        async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
            response = await client.post(
                self._api_url(f"/repos/{owner}/{repo_name}/pulls"),
                headers=self._headers(__user__),
                json={
                    "title": title,
                    "head": head_branch,
                    "base": effective_base,
                    "body": body,
                },
            )
            response.raise_for_status()
            pr = response.json()

        pr_number = pr.get("number")
        pr_url = pr.get("html_url", "")
        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Done", "done": True, "hidden": True},
                }
            )
        # Separate head and base so the two branch names do not run together.
        return f"Created PR #{pr_number}: {title}\n{head_branch} -> {effective_base}\nURL: {pr_url}"
    except httpx.HTTPStatusError as e:
        return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
    except Exception as e:
        return f"Error: {type(e).__name__}: {e}"
async def list_pull_requests(
    self,
    state: str = "open",
    repo: Optional[str] = None,
    page: int = 1,
    limit: Optional[int] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    List pull requests.

    :param state: State filter passed to the API (defaults to "open").
    :param repo: Repository in owner/repo format; defaults to the configured one.
    :param page: Page number to fetch (defaults to 1).
    :param limit: Pull requests per page; resolved through _get_page_size().
    :return: Human-readable listing, or an error message string.
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured"
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"
    page_size = self._get_page_size(limit)

    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {"description": "Fetching pull requests...", "done": False},
            }
        )
    try:
        # NOTE(review): verify=False turns off TLS certificate verification.
        async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
            response = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/pulls"),
                headers=self._headers(__user__),
                params={"state": state, "page": page, "limit": page_size},
            )
            response.raise_for_status()
            prs = response.json()

        total_count = response.headers.get("x-total-count", "?")
        parts = [f"Pull Requests in {owner}/{repo_name} ({state}, page {page}):\n\n"]
        if not prs:
            parts.append("No pull requests found.")
        else:
            for pr in prs:
                number = pr.get("number")
                title = (pr.get("title") or "")[:60]
                # "or {}" guards against JSON nulls for the nested objects.
                user = (pr.get("user") or {}).get("login", "unknown")
                head = (pr.get("head") or {}).get("ref", "")
                base = (pr.get("base") or {}).get("ref", "")
                # Only an explicit False is flagged; True or unknown adds nothing.
                merge_status = " ⚠️ conflicts" if pr.get("mergeable") is False else ""
                parts.append(
                    f"- #{number}: {title}\n {head} -> {base} (by {user}){merge_status}\n"
                )
            parts.append(f"\nShowing {len(prs)} PRs (Total: {total_count})")
            # A full page suggests there may be another one behind it.
            if len(prs) == page_size:
                parts.append(
                    f"\nMore results may exist. Use page={page + 1} to continue."
                )

        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Done", "done": True, "hidden": True},
                }
            )
        return "".join(parts)
    except httpx.HTTPStatusError as e:
        return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
    except Exception as e:
        return f"Error: {type(e).__name__}: {e}"
async def get_pull_request(
    self,
    pr_number: int,
    repo: Optional[str] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Get details of a specific pull request.

    :param pr_number: Number of the pull request.
    :param repo: Repository in owner/repo format; defaults to the configured one.
    :return: Title, state, author, branches, mergeability, URL and up to 500
        characters of the description, or an error message string.
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured"
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"

    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": f"Fetching PR #{pr_number}...",
                    "done": False,
                },
            }
        )
    try:
        # NOTE(review): verify=False turns off TLS certificate verification.
        async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
            response = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/pulls/{pr_number}"),
                headers=self._headers(__user__),
            )
            response.raise_for_status()
            pr = response.json()

        # "or {}" guards against JSON nulls for the nested objects.
        author = (pr.get("user") or {}).get("login", "unknown")
        head = (pr.get("head") or {}).get("ref", "")
        base = (pr.get("base") or {}).get("ref", "")
        parts = [
            f"Pull Request #{pr_number} in {owner}/{repo_name}\n\n",
            f"**Title:** {pr.get('title', '')}\n",
            f"**State:** {pr.get('state', 'unknown')}\n",
            f"**Author:** {author}\n",
            # Separate head and base so the two branch names do not run together.
            f"**Branch:** {head} -> {base}\n",
            f"**Mergeable:** {pr.get('mergeable', 'unknown')}\n",
            f"**URL:** {pr.get('html_url', '')}\n",
        ]
        body = pr.get("body", "")
        if body:
            parts.append(f"\n**Description:**\n{body[:500]}")
            if len(body) > 500:
                parts.append("...(truncated)")

        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Done", "done": True, "hidden": True},
                }
            )
        return "".join(parts)
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            return f"PR #{pr_number} not found"
        return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
    except Exception as e:
        return f"Error: {type(e).__name__}: {e}"
# ISSUES (WITH PAGINATION)
async def list_issues(
    self,
    state: str = "open",
    repo: Optional[str] = None,
    page: int = 1,
    limit: Optional[int] = None,
    labels: Optional[str] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    List issues in a repository.

    :param state: State filter passed to the API (defaults to "open").
    :param repo: Repository in owner/repo format; defaults to the configured one.
    :param page: Page number to fetch (defaults to 1).
    :param limit: Issues per page; resolved through _get_page_size().
    :param labels: Optional label filter, forwarded unchanged as the "labels" query parameter.
    :return: Human-readable listing, or an error message string.
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured"
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"
    page_size = self._get_page_size(limit)

    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {"description": "Fetching issues...", "done": False},
            }
        )
    try:
        params = {
            "state": state,
            "type": "issues",
            "page": page,
            "limit": page_size,
        }
        if labels:
            params["labels"] = labels
        # NOTE(review): verify=False turns off TLS certificate verification.
        async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
            response = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/issues"),
                headers=self._headers(__user__),
                params=params,
            )
            response.raise_for_status()
            issues = response.json()

        total_count = response.headers.get("x-total-count", "?")
        parts = [f"Issues in {owner}/{repo_name} ({state}, page {page}):\n\n"]
        if not issues:
            parts.append("No issues found.")
        else:
            for issue in issues:
                number = issue.get("number")
                title = (issue.get("title") or "")[:60]
                # "or {}" / "or []" guard against JSON nulls.
                user = (issue.get("user") or {}).get("login", "unknown")
                issue_labels = ", ".join(
                    label.get("name", "") for label in issue.get("labels") or []
                )
                labels_str = f" [{issue_labels}]" if issue_labels else ""
                assignee_login = (issue.get("assignee") or {}).get("login", "")
                # Labelled suffix so the assignee does not fuse with "(by ...)".
                assignee_str = (
                    f" (assigned to {assignee_login})" if assignee_login else ""
                )
                parts.append(
                    f"- #{number}: {title}{labels_str} (by {user}){assignee_str}\n"
                )
            parts.append(f"\nShowing {len(issues)} issues (Total: {total_count})")
            # A full page suggests there may be another one behind it.
            if len(issues) == page_size:
                parts.append(
                    f"\nMore results may exist. Use page={page + 1} to continue."
                )

        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Done", "done": True, "hidden": True},
                }
            )
        return "".join(parts)
    except httpx.HTTPStatusError as e:
        return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
    except Exception as e:
        return f"Error: {type(e).__name__}: {e}"
async def create_issue(
    self,
    title: str,
    body: str = "",
    repo: Optional[str] = None,
    labels: Optional[List[str]] = None,
    assignee: Optional[str] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Create a new issue.

    :param title: Issue title.
    :param body: Issue description.
    :param repo: Repository in owner/repo format; defaults to the configured one.
    :param labels: Optional labels, forwarded unchanged in the request body.
    :param assignee: Optional login of the user to assign.
    :return: Confirmation with issue number and URL, or an error message string.
    """
    if not self._get_token(__user__):
        return "Error: GITEA_TOKEN not configured"
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as problem:
        return f"Error: {problem}"

    async def report(data: dict) -> None:
        # Status updates are optional: nothing is sent without an emitter.
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": data})

    await report({"description": "Creating issue...", "done": False})
    try:
        # Optional fields are only sent when the caller supplied them.
        # NOTE(review): labels are passed through as given - confirm whether
        # the API expects label names or numeric label IDs here.
        request_body = {"title": title, "body": body}
        for key, value in (("labels", labels), ("assignee", assignee)):
            if value:
                request_body[key] = value

        # NOTE(review): verify=False turns off TLS certificate verification.
        async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
            response = await client.post(
                self._api_url(f"/repos/{owner}/{repo_name}/issues"),
                headers=self._headers(__user__),
                json=request_body,
            )
            response.raise_for_status()
            created = response.json()

        number = created.get("number")
        link = created.get("html_url", "")
        await report({"description": "Done", "done": True, "hidden": True})
        return f"Created issue #{number}: {title}\nURL: {link}"
    except httpx.HTTPStatusError as err:
        return f"HTTP Error {err.response.status_code}: {err.response.text[:200]}"
    except Exception as err:
        return f"Error: {type(err).__name__}: {err}"
async def get_issue(
    self,
    issue_number: int,
    repo: Optional[str] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Get details of a specific issue.

    :param issue_number: Number of the issue.
    :param repo: Repository in owner/repo format; defaults to the configured one.
    :return: Title, state, author, labels, assignee, URL and up to 1000
        characters of the description, or an error message string.
    """
    if not self._get_token(__user__):
        return "Error: GITEA_TOKEN not configured"
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as problem:
        return f"Error: {problem}"

    async def report(data: dict) -> None:
        # Status updates are optional: nothing is sent without an emitter.
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": data})

    await report({"description": f"Fetching issue #{issue_number}...", "done": False})
    try:
        # NOTE(review): verify=False turns off TLS certificate verification.
        async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
            response = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"),
                headers=self._headers(__user__),
            )
            response.raise_for_status()
            issue = response.json()

        chunks = [
            f"Issue #{issue_number} in {owner}/{repo_name}\n\n",
            f"**Title:** {issue.get('title', '')}\n",
            f"**State:** {issue.get('state', 'unknown')}\n",
            f"**Author:** {issue.get('user', {}).get('login', 'unknown')}\n",
        ]
        label_names = [entry.get("name", "") for entry in issue.get("labels", [])]
        if label_names:
            chunks.append(f"**Labels:** {', '.join(label_names)}\n")
        assigned = issue.get("assignee")
        if assigned:
            chunks.append(f"**Assignee:** {assigned.get('login', '')}\n")
        chunks.append(f"**URL:** {issue.get('html_url', '')}\n")

        description = issue.get("body", "")
        if description:
            # Long descriptions are cut at 1000 characters and marked as such.
            chunks.append(f"\n**Description:**\n{description[:1000]}")
            if len(description) > 1000:
                chunks.append("...(truncated)")

        await report({"description": "Done", "done": True, "hidden": True})
        return "".join(chunks)
    except httpx.HTTPStatusError as err:
        if err.response.status_code == 404:
            return f"Issue #{issue_number} not found"
        return f"HTTP Error {err.response.status_code}: {err.response.text[:200]}"
    except Exception as err:
        return f"Error: {type(err).__name__}: {err}"
async def update_issue(
    self,
    issue_number: int,
    repo: Optional[str] = None,
    title: Optional[str] = None,
    body: Optional[str] = None,
    state: Optional[str] = None,
    assignee: Optional[str] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Update an existing issue.

    Only the fields that are provided (not ``None``) are sent.

    :param issue_number: Number of the issue to update.
    :param repo: Repository in owner/repo format; defaults to the configured one.
    :param title: New title.
    :param body: New description.
    :param state: New state; must be "open" or "closed".
    :param assignee: Login of the user to assign.
    :return: Confirmation with title, state and URL, or an error message string.
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured"
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"

    # Validate and build the payload before announcing any progress, so a
    # rejected call never leaves an unfinished status behind.
    if state is not None and state not in ("open", "closed"):
        return f"Error: Invalid state '{state}'. Use 'open' or 'closed'."
    candidates = {"title": title, "body": body, "state": state, "assignee": assignee}
    payload = {key: value for key, value in candidates.items() if value is not None}
    if not payload:
        return "Error: No fields to update. Provide at least one of: title, body, state, assignee"

    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": f"Updating issue #{issue_number}...",
                    "done": False,
                },
            }
        )
    try:
        # NOTE(review): verify=False turns off TLS certificate verification.
        async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
            response = await client.patch(
                self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"),
                headers=self._headers(__user__),
                json=payload,
            )
            response.raise_for_status()
            issue = response.json()

        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Done", "done": True, "hidden": True},
                }
            )
        return f"Updated issue #{issue_number}: {issue.get('title', '')}\nState: {issue.get('state', 'unknown')}\nURL: {issue.get('html_url', '')}"
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            return f"Issue #{issue_number} not found"
        return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
    except Exception as e:
        return f"Error: {type(e).__name__}: {e}"
# PROJECT CONTEXT
async def get_project_context(
    self,
    repo: Optional[str] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """Build a Markdown overview of a repository.

    The report is assembled from up to four kinds of API calls: repository
    metadata, the root directory listing, the first README variant that
    exists, and the five most recent commits. A section is left out when
    its request does not return HTTP 200, so a partially readable
    repository still produces a partial report.

    Args:
        repo: Repository identifier forwarded to ``self._resolve_repo``.
        __user__: Framework-provided user context, forwarded to the token,
            repository, branch and header helpers.
        __event_emitter__: Optional async callable that receives
            ``status`` events while the report is being built.

    Returns:
        The Markdown report, or a string starting with ``Error`` when the
        token is missing, the repository cannot be resolved, or any
        request/parsing step raises.
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured"
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"
    # This method takes no branch argument, so the helper chooses the branch.
    branch = self._get_branch(None, __user__)
    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": f"Loading project context for {owner}/{repo_name}...",
                    "done": False,
                },
            }
        )
    # Collect the pieces and join once instead of repeated string +=.
    sections = [f"# Project: {owner}/{repo_name}\n\n"]
    repo_endpoint = f"/repos/{owner}/{repo_name}"
    try:
        headers = self._headers(__user__)
        # NOTE(review): verify=False disables TLS certificate verification;
        # kept as in the original code - confirm this is intended.
        async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
            # --- Repository metadata -------------------------------------
            repo_response = await client.get(
                self._api_url(repo_endpoint),
                headers=headers,
            )
            if repo_response.status_code == 200:
                repo_info = repo_response.json()
                # `or` (not a .get default) so that keys that are present
                # but empty/null still fall back to the placeholder text.
                description = repo_info.get("description") or "No description"
                language = repo_info.get("language") or "Unknown"
                sections.append(f"**Description:** {description}\n")
                sections.append(
                    f"**Default Branch:** {repo_info.get('default_branch', 'main')}\n"
                )
                sections.append(f"**Language:** {language}\n\n")

            # --- Root directory listing ----------------------------------
            root_response = await client.get(
                self._api_url(f"{repo_endpoint}/contents/"),
                headers=headers,
                params={"ref": branch},
            )
            if root_response.status_code == 200:
                contents = root_response.json()
                sections.append("## Root Structure\n\n")
                # Directories first, then files, each group sorted by name.
                for item in sorted(
                    contents,
                    key=lambda x: (x.get("type") != "dir", x.get("name", "")),
                ):
                    icon = "📁" if item.get("type") == "dir" else "📄"
                    sections.append(f"{icon} {item.get('name', '')}\n")
                sections.append("\n")

            # --- README (first variant that exists) ----------------------
            for readme_name in ("README.md", "readme.md", "README", "README.txt"):
                readme_response = await client.get(
                    self._api_url(f"{repo_endpoint}/contents/{readme_name}"),
                    headers=headers,
                    params={"ref": branch},
                )
                if readme_response.status_code != 200:
                    continue
                readme_info = readme_response.json()
                if not isinstance(readme_info, dict):
                    # Not a single-file payload (e.g. a list); try the next name.
                    continue
                content_b64 = readme_info.get("content") or ""
                try:
                    readme_content = base64.b64decode(content_b64).decode("utf-8")
                except ValueError:
                    # binascii.Error (bad base64) and UnicodeDecodeError are
                    # both ValueError subclasses. Report the problem instead
                    # of silently dropping the README.
                    sections.append(
                        f"## README\n\n(Could not decode {readme_name} as UTF-8 text)\n\n"
                    )
                    break
                if len(readme_content) > 2000:
                    readme_content = readme_content[:2000] + "\n\n... (truncated)"
                sections.append(f"## README\n\n{readme_content}\n\n")
                break

            # --- Recent commits ------------------------------------------
            commits_response = await client.get(
                self._api_url(f"{repo_endpoint}/commits"),
                headers=headers,
                params={"sha": branch, "limit": 5},
            )
            if commits_response.status_code == 200:
                commits = commits_response.json()
                sections.append("## Recent Commits\n\n")
                for commit in commits[:5]:
                    # `or` guards: a null nested object must not abort the
                    # whole report via the outer exception handler.
                    commit_data = commit.get("commit") or {}
                    sha = (commit.get("sha") or "")[:8]
                    # First line of the message, capped at 60 characters.
                    msg = (commit_data.get("message") or "").split("\n")[0][:60]
                    author = (commit_data.get("author") or {}).get("name", "unknown")
                    sections.append(f"- `{sha}` {msg} ({author})\n")

        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": "Done",
                        "done": True,
                        "hidden": True,
                    },
                }
            )
        return "".join(sections)
    except Exception as e:
        # Tool boundary: surface any failure as text rather than raising.
        return f"Error loading project context: {type(e).__name__}: {e}"
# COMMITS (NEW)
async def list_commits(
    self,
    repo: Optional[str] = None,
    branch: Optional[str] = None,
    page: int = 1,
    limit: Optional[int] = None,
    path: Optional[str] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """List commits in a repository, one page at a time.

    Args:
        repo: Repository identifier forwarded to ``self._resolve_repo``.
        branch: Branch forwarded to ``self._get_branch``; the result is
            sent as the ``sha`` query parameter.
        page: 1-based page number. Values that are not integers or are
            below 1 are treated as page 1.
        limit: Requested page size, forwarded to ``self._get_page_size``.
        path: When truthy, sent as the ``path`` query parameter and echoed
            in the output as the active filter.
        __user__: Framework-provided user context, forwarded to the token,
            repository, branch and header helpers.
        __event_emitter__: Optional async callable that receives
            ``status`` events.

    Returns:
        A text listing (short SHA, first message line, author, date) with
        a paging footer, or a string starting with ``Error``/``HTTP Error``
        on failure.
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured"
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"
    effective_branch = self._get_branch(branch, __user__)
    page_size = self._get_page_size(limit)
    # Normalise the page up front: a string page would otherwise fail on
    # `page + 1` after the request already succeeded, and page <= 0 would
    # yield a misleading "page 0 ... Use page=1" footer.
    try:
        page = max(1, int(page))
    except (TypeError, ValueError):
        page = 1
    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {"description": "Fetching commits...", "done": False},
            }
        )
    try:
        params = {
            "sha": effective_branch,
            "page": page,
            "limit": page_size,
        }
        if path:
            params["path"] = path
        # NOTE(review): verify=False disables TLS certificate verification;
        # kept as in the original code - confirm this is intended.
        async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
            response = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/commits"),
                headers=self._headers(__user__),
                params=params,
            )
            response.raise_for_status()
            commits = response.json()
            total_count = response.headers.get("x-total-count", "?")

        lines = [
            f"Commits in {owner}/{repo_name} ({effective_branch}, page {page}):\n"
        ]
        if path:
            lines.append(f"Filtered to: {path}\n")
        lines.append("\n")
        for commit in commits:
            # `or` guards keep one null nested field from aborting the list.
            commit_data = commit.get("commit") or {}
            author_data = commit_data.get("author") or {}
            sha = (commit.get("sha") or "")[:8]
            # First line of the message, capped at 60 characters.
            msg = (commit_data.get("message") or "").split("\n")[0][:60]
            author = author_data.get("name", "unknown")
            # Keep only the leading 10 characters of the date string.
            date = (author_data.get("date") or "")[:10]
            lines.append(f"- `{sha}` {msg}\n  by {author} on {date}\n")
        lines.append(f"\nShowing {len(commits)} commits (Total: {total_count})")

        # Prefer the reported total when it is numeric: a full page is only
        # a hint, and is wrong when the last page happens to be exactly full.
        if total_count.isdigit():
            seen = (page - 1) * page_size + len(commits)
            has_more = bool(commits) and seen < int(total_count)
        else:
            has_more = len(commits) == page_size
        if has_more:
            lines.append(
                f"\nMore results may exist. Use page={page + 1} to continue."
            )

        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": "Done",
                        "done": True,
                        "hidden": True,
                    },
                }
            )
        return "".join(lines)
    except httpx.HTTPStatusError as e:
        return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
    except Exception as e:
        # Tool boundary: surface any failure as text rather than raising.
        return f"Error: {type(e).__name__}: {e}"