Update gitea/dev.py

This commit is contained in:
2026-01-13 19:15:27 +00:00
parent d96c439c06
commit 2a3b615a3b

View File

@@ -1,10 +1,21 @@
"""
title: Gitea Dev
author: Jeff Smith + minimax + Claude
version: 1.1.0
version: 1.2.0
license: MIT
description: Interact with Gitea repositories - read, write, branch, and PR/Issue workflows
requirements: pydantic, httpx
changelog:
1.2.0:
- Added configurable SSL verification (VERIFY_SSL valve)
- Fixed nested AsyncClient creation in update_file() and delete_file()
- Improved error handling with JSON message parsing
- Added _format_error() helper method
- Added get_repo() method for repository details
- Added merge_pull_request() method
- Added update_pull_request() method
- Added get_commit() method for commit details
- Fixed all httpx client reuse issues
"""
from typing import Optional, Callable, Any, List, Dict
@@ -31,6 +42,10 @@ class Tools:
default=True,
description="Allow users to override defaults via UserValves (False = locked to admin defaults)",
)
VERIFY_SSL: bool = Field(
default=True,
description="Verify SSL certificates (disable only for self-signed certs)",
)
# Pagination defaults
DEFAULT_PAGE_SIZE: int = Field(
default=50,
@@ -83,6 +98,15 @@ class Tools:
"Content-Type": "application/json",
}
def _format_error(self, e: httpx.HTTPStatusError) -> str:
"""Format HTTP error with JSON message if available."""
try:
error_json = e.response.json()
error_msg = error_json.get("message", e.response.text[:200])
except:
error_msg = e.response.text[:200]
return f"HTTP Error {e.response.status_code}: {error_msg}"
def _get_repo(self, repo: Optional[str], __user__: dict = None) -> str:
"""Get effective repo with priority."""
if repo:
@@ -157,7 +181,9 @@ class Tools:
}
)
try:
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url("/user/repos"),
headers=self._headers(__user__),
@@ -194,7 +220,82 @@ class Tools:
)
return output
except httpx.HTTPStatusError as e:
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def get_repo(
self,
repo: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Get detailed repository information.
:param repo: Repository in owner/repo format
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
try:
owner, repo_name = self._resolve_repo(repo, __user__)
except ValueError as e:
return f"Error: {e}"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Fetching repository {owner}/{repo_name}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}"),
headers=self._headers(__user__),
)
response.raise_for_status()
repo_info = response.json()
output = f"# Repository: {owner}/{repo_name}\n\n"
output += f"**Full Name:** {repo_info.get('full_name', 'N/A')}\n"
output += (
f"**Description:** {repo_info.get('description', 'No description')}\n"
)
output += f"**Private:** {'Yes' if repo_info.get('private') else 'No'}\n"
output += f"**Fork:** {'Yes' if repo_info.get('fork') else 'No'}\n"
output += f"**Stars:** {repo_info.get('stars_count', 0)}\n"
output += f"**Forks:** {repo_info.get('forks_count', 0)}\n"
output += f"**Watchers:** {repo_info.get('watchers_count', 0)}\n"
output += f"**Open Issues:** {repo_info.get('open_issues_count', 0)}\n"
output += f"**Size:** {repo_info.get('size', 0)} KB\n"
output += f"**Language:** {repo_info.get('language', 'N/A')}\n"
output += f"**Default Branch:** {repo_info.get('default_branch', 'main')}\n"
output += f"**Created:** {repo_info.get('created_at', 'N/A')[:10]}\n"
output += f"**Updated:** {repo_info.get('updated_at', 'N/A')[:10]}\n"
output += f"**HTML URL:** {repo_info.get('html_url', 'N/A')}\n"
output += f"**Clone URL:** {repo_info.get('clone_url', 'N/A')}\n"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return output
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"Repository '{owner}/{repo_name}' not found"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
@@ -226,7 +327,9 @@ class Tools:
}
)
try:
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/branches"),
headers=self._headers(__user__),
@@ -259,7 +362,7 @@ class Tools:
)
return output
except httpx.HTTPStatusError as e:
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
@@ -291,7 +394,9 @@ class Tools:
}
)
try:
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
@@ -326,7 +431,7 @@ class Tools:
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"Path not found: {path}"
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
@@ -355,7 +460,9 @@ class Tools:
}
)
try:
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
@@ -387,7 +494,7 @@ class Tools:
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"File not found: {path}"
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
@@ -419,7 +526,9 @@ class Tools:
)
try:
content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii")
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.post(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
@@ -447,7 +556,7 @@ class Tools:
except httpx.HTTPStatusError as e:
if e.response.status_code == 422:
return f"File already exists: {path}. Use update_file() instead."
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
@@ -478,7 +587,10 @@ class Tools:
}
)
try:
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
# FIXED: Reuse single client instance
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
get_response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
@@ -492,34 +604,35 @@ class Tools:
if not sha:
return "Error: Could not get file SHA for update"
content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii")
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
response = await client.put(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
json={
"content": content_b64,
"message": message,
"branch": effective_branch,
"sha": sha,
},
# FIXED: Using same client instance
response = await client.put(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
json={
"content": content_b64,
"message": message,
"branch": effective_branch,
"sha": sha,
},
)
response.raise_for_status()
result = response.json()
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": "Done",
"done": True,
"hidden": True,
},
}
)
response.raise_for_status()
result = response.json()
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": "Done",
"done": True,
"hidden": True,
},
}
)
return f"Updated {path} on {effective_branch}\nCommit: {commit_sha}\nMessage: {message}"
return f"Updated {path} on {effective_branch}\nCommit: {commit_sha}\nMessage: {message}"
except httpx.HTTPStatusError as e:
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
@@ -565,7 +678,10 @@ class Tools:
}
)
try:
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
# FIXED: Reuse single client instance
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
get_response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
@@ -578,33 +694,34 @@ class Tools:
sha = file_info.get("sha")
if not sha:
return "Error: Could not get file SHA for delete"
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
response = await client.delete(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
json={
"message": message,
"branch": effective_branch,
"sha": sha,
},
# FIXED: Using same client instance
response = await client.delete(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
json={
"message": message,
"branch": effective_branch,
"sha": sha,
},
)
response.raise_for_status()
result = response.json()
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": "Done",
"done": True,
"hidden": True,
},
}
)
response.raise_for_status()
result = response.json()
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": "Done",
"done": True,
"hidden": True,
},
}
)
return f"Deleted {path} on {effective_branch}\nCommit: {commit_sha}\nMessage: {message}"
return f"Deleted {path} on {effective_branch}\nCommit: {commit_sha}\nMessage: {message}"
except httpx.HTTPStatusError as e:
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
@@ -637,7 +754,9 @@ class Tools:
}
)
try:
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.post(
self._api_url(f"/repos/{owner}/{repo_name}/branches"),
headers=self._headers(__user__),
@@ -662,7 +781,7 @@ class Tools:
except httpx.HTTPStatusError as e:
if e.response.status_code == 409:
return f"Branch '{branch_name}' already exists"
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
@@ -693,7 +812,9 @@ class Tools:
}
)
try:
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.post(
self._api_url(f"/repos/{owner}/{repo_name}/pulls"),
headers=self._headers(__user__),
@@ -721,7 +842,163 @@ class Tools:
)
return f"Created PR #{pr_number}: {title}\n{head_branch}{effective_base}\nURL: {pr_url}"
except httpx.HTTPStatusError as e:
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def update_pull_request(
self,
pr_number: int,
repo: Optional[str] = None,
title: Optional[str] = None,
body: Optional[str] = None,
base_branch: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Update an existing pull request.
:param pr_number: Pull request number
:param repo: Repository in owner/repo format
:param title: New title
:param body: New body/description
:param base_branch: New base branch
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
try:
owner, repo_name = self._resolve_repo(repo, __user__)
except ValueError as e:
return f"Error: {e}"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Updating PR #{pr_number}...",
"done": False,
},
}
)
payload = {}
if title is not None:
payload["title"] = title
if body is not None:
payload["body"] = body
if base_branch is not None:
payload["base"] = base_branch
if not payload:
return "Error: No fields to update. Provide at least one of: title, body, base_branch"
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.patch(
self._api_url(f"/repos/{owner}/{repo_name}/pulls/{pr_number}"),
headers=self._headers(__user__),
json=payload,
)
response.raise_for_status()
pr = response.json()
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return f"Updated PR #{pr_number}: {pr.get('title', '')}\nURL: {pr.get('html_url', '')}"
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"PR #{pr_number} not found"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def merge_pull_request(
self,
pr_number: int,
repo: Optional[str] = None,
merge_method: str = "merge",
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
__event_call__: Callable[[dict], Any] = None,
) -> str:
"""Merge a pull request.
:param pr_number: Pull request number
:param repo: Repository in owner/repo format
:param merge_method: merge, rebase, or squash
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
try:
owner, repo_name = self._resolve_repo(repo, __user__)
except ValueError as e:
return f"Error: {e}"
valid_methods = ["merge", "rebase", "squash"]
if merge_method not in valid_methods:
return f"Error: Invalid merge_method '{merge_method}'. Use: {', '.join(valid_methods)}"
if __event_call__:
result = await __event_call__(
{
"type": "confirmation",
"data": {
"title": "Merge Pull Request?",
"message": f"Merge PR #{pr_number} in {owner}/{repo_name} using {merge_method} method?",
},
}
)
if result is None or result is False:
return "merge_pull_request CANCELLED"
if isinstance(result, dict) and not result.get("confirmed"):
return "merge_pull_request CANCELLED"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Merging PR #{pr_number}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.post(
self._api_url(f"/repos/{owner}/{repo_name}/pulls/{pr_number}/merge"),
headers=self._headers(__user__),
json={"Do": merge_method},
)
response.raise_for_status()
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return f"Successfully merged PR #{pr_number} using {merge_method} method"
except httpx.HTTPStatusError as e:
if e.response.status_code == 405:
return f"PR #{pr_number} cannot be merged (may have conflicts or be already merged)"
if e.response.status_code == 404:
return f"PR #{pr_number} not found"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
@@ -751,7 +1028,9 @@ class Tools:
}
)
try:
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/pulls"),
headers=self._headers(__user__),
@@ -778,7 +1057,7 @@ class Tools:
merge_status = ""
elif mergeable is False:
merge_status = " ⚠️ conflicts"
output += f"- #{number}: {title}\n {head}{base} (by {user}){merge_status}\n"
output += f"- #{number}: {title}\n {head}{base} (by {user}){merge_status}\n"
output += f"\nShowing {len(prs)} PRs (Total: {total_count})"
if len(prs) == page_size:
output += f"\nMore results may exist. Use page={page + 1} to continue."
@@ -795,7 +1074,7 @@ class Tools:
)
return output
except httpx.HTTPStatusError as e:
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
@@ -825,7 +1104,9 @@ class Tools:
}
)
try:
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/pulls/{pr_number}"),
headers=self._headers(__user__),
@@ -859,7 +1140,7 @@ class Tools:
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"PR #{pr_number} not found"
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
@@ -899,7 +1180,9 @@ class Tools:
}
if labels:
params["labels"] = labels
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/issues"),
headers=self._headers(__user__),
@@ -941,7 +1224,7 @@ class Tools:
)
return output
except httpx.HTTPStatusError as e:
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
@@ -976,7 +1259,9 @@ class Tools:
payload["labels"] = labels
if assignee:
payload["assignee"] = assignee
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.post(
self._api_url(f"/repos/{owner}/{repo_name}/issues"),
headers=self._headers(__user__),
@@ -999,7 +1284,7 @@ class Tools:
)
return f"Created issue #{issue_number}: {title}\nURL: {issue_url}"
except httpx.HTTPStatusError as e:
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
@@ -1029,7 +1314,9 @@ class Tools:
}
)
try:
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"),
headers=self._headers(__user__),
@@ -1069,7 +1356,7 @@ class Tools:
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"Issue #{issue_number} not found"
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
@@ -1117,7 +1404,9 @@ class Tools:
if not payload:
return "Error: No fields to update. Provide at least one of: title, body, state, assignee"
try:
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.patch(
self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"),
headers=self._headers(__user__),
@@ -1140,7 +1429,7 @@ class Tools:
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"Issue #{issue_number} not found"
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
@@ -1172,7 +1461,9 @@ class Tools:
)
output = f"# Project: {owner}/{repo_name}\n\n"
try:
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
repo_response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}"),
headers=self._headers(__user__),
@@ -1263,7 +1554,7 @@ class Tools:
except Exception as e:
return f"Error loading project context: {type(e).__name__}: {e}"
# COMMITS (NEW)
# COMMITS
async def list_commits(
self,
repo: Optional[str] = None,
@@ -1299,7 +1590,9 @@ class Tools:
}
if path:
params["path"] = path
async with httpx.AsyncClient(timeout=30.0, verify=False) as client:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/commits"),
headers=self._headers(__user__),
@@ -1318,7 +1611,7 @@ class Tools:
msg = commit_data.get("message", "").split("\n")[0][:60]
author = commit_data.get("author", {}).get("name", "unknown")
date = commit_data.get("author", {}).get("date", "")[:10]
output += f"- `{sha}` {msg}\n by {author} on {date}\n"
output += f"- `{sha}` {msg}\n by {author} on {date}\n"
output += f"\nShowing {len(commits)} commits (Total: {total_count})"
if len(commits) == page_size:
output += (
@@ -1337,6 +1630,94 @@ class Tools:
)
return output
except httpx.HTTPStatusError as e:
return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def get_commit(
self,
commit_sha: str,
repo: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Get detailed information about a specific commit.
:param commit_sha: Commit SHA (full or short)
:param repo: Repository in owner/repo format
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
try:
owner, repo_name = self._resolve_repo(repo, __user__)
except ValueError as e:
return f"Error: {e}"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Fetching commit {commit_sha[:8]}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/git/commits/{commit_sha}"),
headers=self._headers(__user__),
)
response.raise_for_status()
commit = response.json()
output = f"# Commit: {commit.get('sha', '')[:8]}\n\n"
output += f"**Full SHA:** {commit.get('sha', 'N/A')}\n"
output += f"**Message:** {commit.get('message', 'No message')}\n"
author = commit.get('author', {})
output += f"**Author:** {author.get('name', 'unknown')} <{author.get('email', '')}>\n"
output += f"**Date:** {author.get('date', 'N/A')}\n"
committer = commit.get('committer', {})
if committer.get('name') != author.get('name'):
output += f"**Committer:** {committer.get('name', 'unknown')} <{committer.get('email', '')}>\n"
# Get file changes
response2 = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/commits/{commit_sha}"),
headers=self._headers(__user__),
)
if response2.status_code == 200:
commit_detail = response2.json()
files = commit_detail.get('files', [])
if files:
output += f"\n**Files Changed:** {len(files)}\n"
for file in files[:20]: # Limit to first 20 files
status = file.get('status', '')
filename = file.get('filename', '')
additions = file.get('additions', 0)
deletions = file.get('deletions', 0)
output += f"- {status}: {filename} (+{additions}/-{deletions})\n"
if len(files) > 20:
output += f"... and {len(files) - 20} more files\n"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return output
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"Commit '{commit_sha}' not found"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"