Files
tools/gitea/admin.py
2026-01-13 19:08:17 +00:00

2151 lines
77 KiB
Python

"""
title: Gitea Admin
author: Jeff Smith + minimax + Claude
version: 1.3.0
license: MIT
description: Gitea organization management - teams, members, repository settings, and admin operations
requirements: pydantic, httpx
changelog:
1.3.0:
- Fixed create_org() to use correct /orgs endpoint instead of /admin/users
- Added configurable SSL verification (VERIFY_SSL valve)
- Improved error handling with JSON message parsing
- Added repository visibility parameter support
- Corrected admin-only operation documentation
- Enhanced error messages with better context
"""
from typing import Optional, Callable, Any, List, Dict
from pydantic import BaseModel, Field
import httpx
class Tools:
class Valves(BaseModel):
"""Shared configuration (set by admin)"""
GITEA_URL: str = Field(
default="https://gitea.example.com",
description="Gitea URL (requires admin token for most operations)",
)
DEFAULT_ORG: str = Field(
default="",
description="Default organization for team operations",
)
ALLOW_USER_OVERRIDES: bool = Field(
default=True,
description="Allow users to override defaults via UserValves",
)
VERIFY_SSL: bool = Field(
default=True,
description="Verify SSL certificates (disable only for self-signed certs)",
)
class UserValves(BaseModel):
"""Per-user configuration"""
GITEA_TOKEN: str = Field(
default="",
description="Your Gitea API token with admin scope",
)
USER_DEFAULT_ORG: str = Field(
default="",
description="Override default organization",
)
def __init__(self):
self.valves = self.Valves()
self.user_valves = self.UserValves()
self.citation = False
self._team_cache: Dict[str, Dict[str, int]] = {}
def _api_url(self, endpoint: str) -> str:
base = self.valves.GITEA_URL.rstrip("/")
return f"{base}/api/v1{endpoint}"
def _get_token(
self,
__user__: dict = None,
) -> str:
"""Get token - always user-specific for security."""
if __user__ and "valves" in __user__:
return __user__["valves"].GITEA_TOKEN
return ""
def _headers(self, token: str) -> dict:
return {
"Authorization": f"token {token}",
"Content-Type": "application/json",
}
def _get_org(
self,
org: Optional[str],
__user__: dict = None,
) -> str:
"""Get effective org with priority."""
if org:
return org
if self.valves.ALLOW_USER_OVERRIDES:
if __user__ and "valves" in __user__:
user_org = __user__["valves"].USER_DEFAULT_ORG
if user_org:
return user_org
return self.valves.DEFAULT_ORG
def _resolve_repo(self, repo: Optional[str]) -> tuple[str, str]:
if not repo:
raise ValueError("No repository specified")
if "/" not in repo:
raise ValueError(f"Repository must be in owner/repo format, got: {repo}")
return repo.split("/", 1)
def _format_error(self, e: httpx.HTTPStatusError) -> str:
"""Format HTTP error with JSON message if available."""
try:
error_json = e.response.json()
error_msg = error_json.get("message", e.response.text[:200])
except:
error_msg = e.response.text[:200]
return f"HTTP Error {e.response.status_code}: {error_msg}"
async def _get_team_id(
self,
team_name: str,
org: str,
__user__: dict = None,
) -> Optional[int]:
"""Resolve team name to numeric ID."""
cache_key = org.lower()
if cache_key in self._team_cache:
if team_name in self._team_cache[cache_key]:
return self._team_cache[cache_key][team_name]
token = self._get_token(__user__)
if not token:
return None
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/orgs/{org}/teams"),
headers=self._headers(token),
)
response.raise_for_status()
teams = response.json()
if not isinstance(teams, list):
return None
if cache_key not in self._team_cache:
self._team_cache[cache_key] = {}
for team in teams:
name = team.get("name", "")
team_id = team.get("id")
if name and team_id:
self._team_cache[cache_key][name] = team_id
return self._team_cache[cache_key].get(team_name)
except Exception:
return None
async def _invalidate_team_cache(self, org: str):
"""Clear team cache for an org."""
cache_key = org.lower()
if cache_key in self._team_cache:
del self._team_cache[cache_key]
# =========================================================================
# USER MANAGEMENT (ADMIN)
# =========================================================================
async def create_user(
self,
username: str,
email: str,
password: Optional[str] = None,
full_name: Optional[str] = None,
login_name: Optional[str] = None,
source_id: int = 0,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Create a new user (requires admin token).
:param username: Unique username
:param email: User email address
:param password: Password (optional, may be generated)
:param full_name: Full name
:param login_name: Login name
:param source_id: Authentication source ID
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured. Admin token required."
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Creating user {username}...",
"done": False,
},
}
)
payload = {
"username": username,
"email": email,
}
if password:
payload["password"] = password
if full_name:
payload["full_name"] = full_name
if login_name:
payload["login_name"] = login_name
if source_id:
payload["source_id"] = source_id
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.post(
self._api_url("/admin/users"),
headers=self._headers(token),
json=payload,
)
response.raise_for_status()
user = response.json()
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return f"Created user: {username} (ID: {user.get('id', 'unknown')})\nEmail: {email}\nFull Name: {full_name or 'N/A'}"
except httpx.HTTPStatusError as e:
if e.response.status_code == 422:
return f"User '{username}' or email '{email}' already exists"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def delete_user(
self,
username: str,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
__event_call__: Callable[[dict], Any] = None,
) -> str:
"""Delete a user (requires admin token).
:param username: Username to delete
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured. Admin token required."
if __event_call__:
result = await __event_call__(
{
"type": "confirmation",
"data": {
"title": "Delete User?",
"message": f"Permanently delete user '{username}'? This action cannot be undone.",
},
}
)
if result is None or result is False:
return "delete_user CANCELLED"
if isinstance(result, dict) and not result.get("confirmed"):
return "delete_user CANCELLED"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Deleting user {username}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.delete(
self._api_url(f"/admin/users/{username}"),
headers=self._headers(token),
)
if response.status_code == 204:
if __event_emitter__:
await __event_emitter__(
{"type": "status", "data": {"done": True, "hidden": True}}
)
return f"Deleted user: {username}"
response.raise_for_status()
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"User '{username}' not found"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def edit_user(
self,
username: str,
email: Optional[str] = None,
password: Optional[str] = None,
full_name: Optional[str] = None,
website: Optional[str] = None,
location: Optional[str] = None,
active: Optional[bool] = None,
admin: Optional[bool] = None,
allow_create_org: Optional[bool] = None,
allow_git_hook: Optional[bool] = None,
allow_import_local: Optional[bool] = None,
max_repo_creation: Optional[int] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Edit a user (requires admin token).
:param username: Username to edit
:param email: New email
:param password: New password
:param full_name: New full name
:param website: Website URL
:param location: Location
:param active: Set active status
:param admin: Set admin status
:param allow_create_org: Allow organization creation
:param allow_git_hook: Allow git hooks
:param allow_import_local: Allow local imports
:param max_repo_creation: Max repository creation limit (-1 for unlimited)
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured. Admin token required."
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Editing user {username}...",
"done": False,
},
}
)
payload = {}
if email is not None:
payload["email"] = email
if password is not None:
payload["password"] = password
if full_name is not None:
payload["full_name"] = full_name
if website is not None:
payload["website"] = website
if location is not None:
payload["location"] = location
if active is not None:
payload["active"] = active
if admin is not None:
payload["admin"] = admin
if allow_create_org is not None:
payload["allow_create_organization"] = allow_create_org
if allow_git_hook is not None:
payload["allow_git_hook"] = allow_git_hook
if allow_import_local is not None:
payload["allow_import_local"] = allow_import_local
if max_repo_creation is not None:
payload["max_repo_creation"] = max_repo_creation
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.patch(
self._api_url(f"/admin/users/{username}"),
headers=self._headers(token),
json=payload,
)
response.raise_for_status()
user = response.json()
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return f"Updated user: {username}\nEmail: {user.get('email', email or 'N/A')}\nFull Name: {user.get('full_name', full_name or 'N/A')}\nAdmin: {user.get('admin', admin)}"
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"User '{username}' not found"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def list_users(
self,
page: int = 1,
limit: int = 50,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""List all users (requires admin token).
:param page: Page number
:param limit: Items per page
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured. Admin token required."
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Fetching users...", "done": False},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/admin/users?page={page}&limit={limit}"),
headers=self._headers(token),
)
response.raise_for_status()
users = response.json()
output = f"Users (Page {page}, Limit {limit}):\n\n"
for user in users:
login = user.get("login", "")
email = user.get("email", "")
full_name = user.get("full_name", "")
admin = user.get("admin", False)
active = user.get("active", False)
created = user.get("created_at", "")[:10]
admin_badge = " [ADMIN]" if admin else ""
active_badge = "" if active else ""
output += f"- {login}{admin_badge}{active_badge}\n"
if email:
output += f" Email: {email}\n"
if full_name:
output += f" Name: {full_name}\n"
output += f" Created: {created}\n"
output += f"\nTotal: {len(users)} users"
if len(users) == limit:
output += f" (use page={page+1} for more)"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return output
except httpx.HTTPStatusError as e:
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def get_user(
self,
username: str,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Get user details.
:param username: Username to get
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Fetching user {username}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/users/{username}"),
headers=self._headers(token),
)
response.raise_for_status()
user = response.json()
output = f"# User: {username}\n\n"
output += f"**ID:** {user.get('id', 'N/A')}\n"
output += f"**Login:** {user.get('login', 'N/A')}\n"
output += f"**Full Name:** {user.get('full_name', 'N/A')}\n"
output += f"**Email:** {user.get('email', 'N/A')}\n"
output += f"**Avatar URL:** {user.get('avatar_url', 'N/A')}\n"
output += f"**Website:** {user.get('website', 'N/A')}\n"
output += f"**Location:** {user.get('location', 'N/A')}\n"
output += f"**Admin:** {'Yes' if user.get('admin') else 'No'}\n"
output += f"**Active:** {'Yes' if user.get('active') else 'No'}\n"
output += f"**Repos:** {user.get('repo_count', 0)}\n"
output += f"**Followers:** {user.get('followers_count', 0)}\n"
output += f"**Following:** {user.get('following_count', 0)}\n"
output += f"**Created:** {user.get('created_at', 'N/A')[:10]}\n"
output += f"**Updated:** {user.get('updated_at', 'N/A')[:10]}\n"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return output
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"User '{username}' not found"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
# =========================================================================
# ORGANIZATION MANAGEMENT
# =========================================================================
async def create_org(
self,
org_name: str,
full_name: Optional[str] = None,
description: Optional[str] = None,
website: Optional[str] = None,
location: Optional[str] = None,
visibility: str = "public",
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Create a new organization.
:param org_name: Unique organization name
:param full_name: Full display name
:param description: Organization description
:param website: Website URL
:param location: Location
:param visibility: public, limited, or private
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
valid_visibility = ["public", "limited", "private"]
if visibility not in valid_visibility:
return f"Error: Invalid visibility '{visibility}'. Use: {', '.join(valid_visibility)}"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Creating organization {org_name}...",
"done": False,
},
}
)
payload = {
"username": org_name,
"full_name": full_name or org_name,
"description": description or "",
"website": website or "",
"location": location or "",
"visibility": visibility,
}
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.post(
self._api_url("/orgs"), # FIXED: was /admin/users
headers=self._headers(token),
json=payload,
)
response.raise_for_status()
org = response.json()
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return f"Created organization: {org_name}\nFull Name: {full_name or org_name}\nVisibility: {visibility}\nURL: {org.get('html_url', 'N/A')}"
except httpx.HTTPStatusError as e:
if e.response.status_code == 422:
return f"Organization '{org_name}' already exists"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def delete_org(
self,
org: str,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
__event_call__: Callable[[dict], Any] = None,
) -> str:
"""Delete an organization (requires organization owner permission).
:param org: Organization name
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
if __event_call__:
result = await __event_call__(
{
"type": "confirmation",
"data": {
"title": "Delete Organization?",
"message": f"Permanently delete organization '{org}' and all its repositories? This action cannot be undone.",
},
}
)
if result is None or result is False:
return "delete_org CANCELLED"
if isinstance(result, dict) and not result.get("confirmed"):
return "delete_org CANCELLED"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Deleting organization {org}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.delete(
self._api_url(f"/orgs/{org}"),
headers=self._headers(token),
)
if response.status_code == 204:
if __event_emitter__:
await __event_emitter__(
{"type": "status", "data": {"done": True, "hidden": True}}
)
return f"Deleted organization: {org}"
response.raise_for_status()
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"Organization '{org}' not found"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def edit_org(
self,
org: str,
full_name: Optional[str] = None,
description: Optional[str] = None,
website: Optional[str] = None,
location: Optional[str] = None,
visibility: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Edit an organization.
:param org: Organization name
:param full_name: New full name
:param description: New description
:param website: New website
:param location: New location
:param visibility: public, limited, or private
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
if visibility and visibility not in ["public", "limited", "private"]:
return f"Error: Invalid visibility '{visibility}'. Use: public, limited, private"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Editing organization {org}...",
"done": False,
},
}
)
payload = {}
if full_name is not None:
payload["full_name"] = full_name
if description is not None:
payload["description"] = description
if website is not None:
payload["website"] = website
if location is not None:
payload["location"] = location
if visibility is not None:
payload["visibility"] = visibility
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.patch(
self._api_url(f"/orgs/{org}"),
headers=self._headers(token),
json=payload,
)
response.raise_for_status()
org_info = response.json()
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return f"Updated organization: {org}\nFull Name: {org_info.get('full_name', full_name or org)}\nVisibility: {org_info.get('visibility', visibility or 'N/A')}"
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"Organization '{org}' not found"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def list_orgs(
self,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""List organizations the authenticated user belongs to."""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured. Add it in User Settings > Gitea Admin."
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Fetching organizations...", "done": False},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url("/user/orgs"),
headers=self._headers(token),
)
response.raise_for_status()
orgs = response.json()
output = "Your Organizations:\n\n"
for org in orgs:
name = org.get("username", "")
desc = org.get("description", "")[:60] or "No description"
output += f"- {name}: {desc}\n"
effective_org = self._get_org(None, __user__)
if effective_org:
output += f"\nDefault org: {effective_org}"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return output
except httpx.HTTPStatusError as e:
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def get_org(
self,
org: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Get organization details."""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
effective_org = self._get_org(org, __user__)
if not effective_org:
return "Error: Organization not specified and no default set"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Fetching {effective_org}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/orgs/{effective_org}"),
headers=self._headers(token),
)
response.raise_for_status()
org_info = response.json()
output = f"# Organization: {effective_org}\n\n"
output += f"**Full Name:** {org_info.get('full_name', effective_org)}\n"
output += (
f"**Description:** {org_info.get('description', 'No description')}\n"
)
output += f"**Website:** {org_info.get('website', 'N/A')}\n"
output += f"**Location:** {org_info.get('location', 'N/A')}\n"
output += f"**Visibility:** {org_info.get('visibility', 'unknown')}\n"
output += f"**Repos:** {org_info.get('repo_count', 0)}\n"
output += f"**Members:** {org_info.get('members', 0)}\n"
output += f"**Teams:** {org_info.get('teams', 0)}\n"
output += f"**Created:** {org_info.get('created_at', 'N/A')[:10]}\n"
output += f"**Updated:** {org_info.get('updated_at', 'N/A')[:10]}\n"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return output
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"Organization not found: {effective_org}"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def list_org_members(
self,
org: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""List members of an organization.
:param org: Organization name
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
effective_org = self._get_org(org, __user__)
if not effective_org:
return "Error: Organization not specified and no default set"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Fetching members of {effective_org}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/orgs/{effective_org}/members"),
headers=self._headers(token),
)
response.raise_for_status()
members = response.json()
output = f"Members of {effective_org}:\n\n"
for member in members:
login = member.get("login", "")
full_name = member.get("full_name", "") or member.get("username", "")
output += f"- {login} ({full_name})\n"
output += f"\nTotal: {len(members)} members"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return output
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"Organization '{effective_org}' not found or no access"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def add_org_member(
self,
username: str,
org: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Add a member to an organization.
:param username: Username to add
:param org: Organization name
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
effective_org = self._get_org(org, __user__)
if not effective_org:
return "Error: Organization not specified and no default set"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Adding {username} to {effective_org}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.put(
self._api_url(f"/orgs/{effective_org}/members/{username}"),
headers=self._headers(token),
)
if response.status_code == 204:
if __event_emitter__:
await __event_emitter__(
{"type": "status", "data": {"done": True, "hidden": True}}
)
return f"Added {username} to organization '{effective_org}'"
response.raise_for_status()
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"User '{username}' or organization '{effective_org}' not found"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def remove_org_member(
self,
username: str,
org: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Remove a member from an organization.
:param username: Username to remove
:param org: Organization name
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
effective_org = self._get_org(org, __user__)
if not effective_org:
return "Error: Organization not specified and no default set"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Removing {username} from {effective_org}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.delete(
self._api_url(f"/orgs/{effective_org}/members/{username}"),
headers=self._headers(token),
)
if response.status_code == 204:
if __event_emitter__:
await __event_emitter__(
{"type": "status", "data": {"done": True, "hidden": True}}
)
return f"Removed {username} from organization '{effective_org}'"
response.raise_for_status()
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"User '{username}' not in organization '{effective_org}'"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def list_org_repos(
self,
org: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""List repositories in an organization.
:param org: Organization name
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
effective_org = self._get_org(org, __user__)
if not effective_org:
return "Error: Organization not specified and no default set"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Fetching repositories for {effective_org}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/orgs/{effective_org}/repos"),
headers=self._headers(token),
)
response.raise_for_status()
repos = response.json()
output = f"Repositories in {effective_org}:\n\n"
for repo in repos:
full_name = repo.get("full_name", "")
private = "🔒" if repo.get("private") else "🌐"
desc = repo.get("description", "")[:50] or "No description"
stars = repo.get("stars_count", 0)
forks = repo.get("forks_count", 0)
output += f"- {private} {full_name}\n"
output += f" {desc} | ⭐ {stars} | 🍴 {forks}\n"
output += f"\nTotal: {len(repos)} repositories"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return output
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"Organization '{effective_org}' not found"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def create_org_repo(
self,
repo_name: str,
org: Optional[str] = None,
description: str = "",
private: bool = False,
visibility: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Create a repository in an organization.
:param repo_name: Repository name
:param org: Organization name
:param description: Repository description
:param private: Make repository private (deprecated, use visibility)
:param visibility: Repository visibility (public/limited/private)
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
effective_org = self._get_org(org, __user__)
if not effective_org:
return "Error: Organization not specified and no default set"
# Handle both old and new parameters
if visibility:
if visibility not in ["public", "limited", "private"]:
return f"Error: Invalid visibility '{visibility}'. Use: public, limited, private"
is_private = visibility == "private"
else:
is_private = private
visibility = "private" if private else "public"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Creating repository {effective_org}/{repo_name}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.post(
self._api_url(f"/orgs/{effective_org}/repos"),
headers=self._headers(token),
json={
"name": repo_name,
"description": description,
"private": is_private,
"auto_init": True,
},
)
response.raise_for_status()
repo = response.json()
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return f"Created repository: {effective_org}/{repo_name}\nVisibility: {visibility.capitalize()}\nDescription: {description or 'N/A'}\nURL: {repo.get('html_url', 'N/A')}"
except httpx.HTTPStatusError as e:
if e.response.status_code == 422:
return f"Repository '{repo_name}' already exists in {effective_org}"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
# =========================================================================
# TEAMS
# =========================================================================
async def list_teams(
self,
org: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""List teams in an organization."""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
effective_org = self._get_org(org, __user__)
if not effective_org:
return "Error: Organization not specified and no default set"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Fetching teams in {effective_org}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/orgs/{effective_org}/teams"),
headers=self._headers(token),
)
response.raise_for_status()
teams = response.json()
output = f"Teams in {effective_org}:\n\n"
for team in teams:
name = team.get("name", "")
team_id = team.get("id", "?")
desc = team.get("description", "")[:50] or "No description"
members = team.get("members_count", 0)
repos = team.get("repos_count", 0)
permission = team.get("permission", "unknown")
output += f"- {name} (ID: {team_id}): {desc}\n"
output += (
f" Permission: {permission} | {members} members | {repos} repos\n"
)
output += f"\nTotal: {len(teams)} teams"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return output
except httpx.HTTPStatusError as e:
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def create_team(
self,
name: str,
org: Optional[str] = None,
description: str = "",
permission: str = "write",
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Create a new team in an organization."""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
effective_org = self._get_org(org, __user__)
if not effective_org:
return "Error: Organization not specified and no default set"
valid_permissions = ["read", "write", "admin"]
if permission not in valid_permissions:
return f"Error: Invalid permission '{permission}'. Use: {', '.join(valid_permissions)}"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": f"Creating team {name}...", "done": False},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.post(
self._api_url(f"/orgs/{effective_org}/teams"),
headers=self._headers(token),
json={
"name": name,
"description": description,
"permission": permission,
"units": [
"repo.code",
"repo.issues",
"repo.pulls",
"repo.releases",
"repo.wiki",
],
},
)
response.raise_for_status()
team = response.json()
team_id = team.get("id", "unknown")
await self._invalidate_team_cache(effective_org)
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return f"Created team: {name} (ID: {team_id}) in {effective_org}\nPermission: {permission}"
except httpx.HTTPStatusError as e:
if e.response.status_code == 422:
return f"Team '{name}' already exists in {effective_org}"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def delete_team(
self,
team_name: str,
org: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
__event_call__: Callable[[dict], Any] = None,
) -> str:
"""Delete a team from an organization."""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
effective_org = self._get_org(org, __user__)
if not effective_org:
return "Error: Organization not specified and no default set"
team_id = await self._get_team_id(team_name, effective_org, __user__)
if not team_id:
return f"Error: Team '{team_name}' not found in {effective_org}"
if __event_call__:
result = await __event_call__(
{
"type": "confirmation",
"data": {
"title": "Delete Team?",
"message": f"Permanently delete team '{team_name}' (ID: {team_id}) from {effective_org}?",
},
}
)
if result is None or result is False:
return "delete_team CANCELLED"
if isinstance(result, dict) and not result.get("confirmed"):
return "delete_team CANCELLED"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Deleting team {team_name}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.delete(
self._api_url(f"/teams/{team_id}"),
headers=self._headers(token),
)
if response.status_code == 204:
await self._invalidate_team_cache(effective_org)
if __event_emitter__:
await __event_emitter__(
{"type": "status", "data": {"done": True, "hidden": True}}
)
return f"Deleted team '{team_name}' (ID: {team_id}) from {effective_org}"
response.raise_for_status()
except httpx.HTTPStatusError as e:
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def add_team_member(
self,
username: str,
team_name: str,
org: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Add a user to a team."""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
effective_org = self._get_org(org, __user__)
if not effective_org:
return "Error: Organization not specified and no default set"
team_id = await self._get_team_id(team_name, effective_org, __user__)
if not team_id:
return f"Error: Team '{team_name}' not found in {effective_org}. Use list_teams() to see available teams."
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Adding {username} to {team_name}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.put(
self._api_url(f"/teams/{team_id}/members/{username}"),
headers=self._headers(token),
)
if response.status_code == 204:
if __event_emitter__:
await __event_emitter__(
{"type": "status", "data": {"done": True, "hidden": True}}
)
return f"Added {username} to team '{team_name}' (ID: {team_id}) in {effective_org}"
response.raise_for_status()
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"User '{username}' not found"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def remove_team_member(
self,
username: str,
team_name: str,
org: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Remove a user from a team."""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
effective_org = self._get_org(org, __user__)
if not effective_org:
return "Error: Organization not specified and no default set"
team_id = await self._get_team_id(team_name, effective_org, __user__)
if not team_id:
return f"Error: Team '{team_name}' not found in {effective_org}"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Removing {username} from {team_name}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.delete(
self._api_url(f"/teams/{team_id}/members/{username}"),
headers=self._headers(token),
)
if response.status_code == 204:
if __event_emitter__:
await __event_emitter__(
{"type": "status", "data": {"done": True, "hidden": True}}
)
return f"Removed {username} from team '{team_name}' (ID: {team_id}) in {effective_org}"
response.raise_for_status()
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"User '{username}' not in team '{team_name}'"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def list_team_members(
self,
team_name: str,
org: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""List members of a team."""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
effective_org = self._get_org(org, __user__)
if not effective_org:
return "Error: Organization not specified and no default set"
team_id = await self._get_team_id(team_name, effective_org, __user__)
if not team_id:
return f"Error: Team '{team_name}' not found in {effective_org}. Use list_teams() to see available teams."
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Fetching members of {team_name}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/teams/{team_id}/members"),
headers=self._headers(token),
)
response.raise_for_status()
members = response.json()
output = f"Members of '{team_name}' (ID: {team_id}) in {effective_org}:\n\n"
for member in members:
login = member.get("login", "")
full_name = member.get("full_name", "") or member.get("username", "")
is_admin = member.get("is_admin", False)
admin_badge = " [ADMIN]" if is_admin else ""
output += f"- {login} ({full_name}){admin_badge}\n"
output += f"\nTotal: {len(members)} members"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return output
except httpx.HTTPStatusError as e:
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def add_team_repo(
self,
team_name: str,
repo: str,
org: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Add a repository to a team's access list."""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
effective_org = self._get_org(org, __user__)
if not effective_org:
return "Error: Organization not specified"
team_id = await self._get_team_id(team_name, effective_org, __user__)
if not team_id:
return f"Error: Team '{team_name}' not found in {effective_org}"
try:
owner, repo_name = self._resolve_repo(repo)
except ValueError as e:
return f"Error: {e}"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": f"Adding {repo} to team...", "done": False},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.put(
self._api_url(f"/teams/{team_id}/repos/{owner}/{repo_name}"),
headers=self._headers(token),
)
if response.status_code == 204:
if __event_emitter__:
await __event_emitter__(
{"type": "status", "data": {"done": True, "hidden": True}}
)
return f"Added {repo} to team '{team_name}'"
response.raise_for_status()
except httpx.HTTPStatusError as e:
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def remove_team_repo(
self,
team_name: str,
repo: str,
org: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Remove a repository from a team's access list."""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
effective_org = self._get_org(org, __user__)
if not effective_org:
return "Error: Organization not specified"
team_id = await self._get_team_id(team_name, effective_org, __user__)
if not team_id:
return f"Error: Team '{team_name}' not found in {effective_org}"
try:
owner, repo_name = self._resolve_repo(repo)
except ValueError as e:
return f"Error: {e}"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Removing {repo} from team...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.delete(
self._api_url(f"/teams/{team_id}/repos/{owner}/{repo_name}"),
headers=self._headers(token),
)
if response.status_code == 204:
if __event_emitter__:
await __event_emitter__(
{"type": "status", "data": {"done": True, "hidden": True}}
)
return f"Removed {repo} from team '{team_name}'"
response.raise_for_status()
except httpx.HTTPStatusError as e:
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def list_team_repos(
self,
team_name: str,
org: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""List repositories accessible to a team."""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
effective_org = self._get_org(org, __user__)
if not effective_org:
return "Error: Organization not specified"
team_id = await self._get_team_id(team_name, effective_org, __user__)
if not team_id:
return f"Error: Team '{team_name}' not found in {effective_org}"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Fetching repos for {team_name}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/teams/{team_id}/repos"),
headers=self._headers(token),
)
response.raise_for_status()
repos = response.json()
output = f"Repositories for team '{team_name}' (ID: {team_id}):\n\n"
for repo in repos:
full_name = repo.get("full_name", "")
private = "🔒" if repo.get("private") else "🌐"
desc = repo.get("description", "")[:40] or "No description"
output += f"- {private} {full_name}: {desc}\n"
output += f"\nTotal: {len(repos)} repositories"
if __event_emitter__:
await __event_emitter__(
{"type": "status", "data": {"done": True, "hidden": True}}
)
return output
except httpx.HTTPStatusError as e:
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
# =========================================================================
# COLLABORATORS
# =========================================================================
async def add_collaborator(
self,
username: str,
repo: str,
permission: str = "write",
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Add a collaborator to a repository."""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
try:
owner, repo_name = self._resolve_repo(repo)
except ValueError as e:
return f"Error: {e}"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Adding {username} to {owner}/{repo_name}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.put(
self._api_url(
f"/repos/{owner}/{repo_name}/collaborators/{username}"
),
headers=self._headers(token),
json={"permission": permission},
)
response.raise_for_status()
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return (
f"Added {username} to {owner}/{repo_name} with {permission} permission"
)
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"Repository {owner}/{repo_name} or user '{username}' not found"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def remove_collaborator(
self,
username: str,
repo: str,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Remove a collaborator from a repository."""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
try:
owner, repo_name = self._resolve_repo(repo)
except ValueError as e:
return f"Error: {e}"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Removing {username} from {owner}/{repo_name}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.delete(
self._api_url(
f"/repos/{owner}/{repo_name}/collaborators/{username}"
),
headers=self._headers(token),
)
if response.status_code == 204:
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": "Done",
"done": True,
"hidden": True,
},
}
)
return f"Removed {username} from {owner}/{repo_name}"
response.raise_for_status()
except httpx.HTTPStatusError as e:
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def list_collaborators(
self,
repo: str,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""List collaborators on a repository."""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
try:
owner, repo_name = self._resolve_repo(repo)
except ValueError as e:
return f"Error: {e}"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Fetching collaborators for {owner}/{repo_name}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/collaborators"),
headers=self._headers(token),
)
response.raise_for_status()
collabs = response.json()
output = f"Collaborators on {owner}/{repo_name}:\n\n"
for col in collabs:
login = col.get("login", "")
perm = col.get("permissions", {})
role = (
"admin"
if perm.get("admin")
else "write" if perm.get("push") else "read"
)
output += f"- {login} ({role})\n"
output += f"\nTotal: {len(collabs)} collaborators"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return output
except httpx.HTTPStatusError as e:
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
# =========================================================================
# REPOSITORY MANAGEMENT
# =========================================================================
async def create_repo(
self,
repo_name: str,
description: str = "",
private: bool = False,
visibility: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Create a personal repository.
:param repo_name: Repository name
:param description: Repository description
:param private: Make repository private (deprecated, use visibility)
:param visibility: Repository visibility (public/limited/private)
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
# Handle both old and new parameters
if visibility:
if visibility not in ["public", "limited", "private"]:
return f"Error: Invalid visibility '{visibility}'. Use: public, limited, private"
is_private = visibility == "private"
else:
is_private = private
visibility = "private" if private else "public"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Creating repository {repo_name}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.post(
self._api_url("/user/repos"),
headers=self._headers(token),
json={
"name": repo_name,
"description": description,
"private": is_private,
"auto_init": True,
},
)
response.raise_for_status()
repo = response.json()
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return f"Created repository: {repo_name}\nVisibility: {visibility.capitalize()}\nDescription: {description or 'N/A'}\nURL: {repo.get('html_url', 'N/A')}"
except httpx.HTTPStatusError as e:
if e.response.status_code == 422:
return f"Repository '{repo_name}' already exists"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def delete_repo(
self,
repo: str,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
__event_call__: Callable[[dict], Any] = None,
) -> str:
"""Delete a repository.
:param repo: Repository in owner/repo format
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
try:
owner, repo_name = self._resolve_repo(repo)
except ValueError as e:
return f"Error: {e}"
if __event_call__:
result = await __event_call__(
{
"type": "confirmation",
"data": {
"title": "Delete Repository?",
"message": f"Permanently delete repository '{owner}/{repo_name}'? This action cannot be undone.",
},
}
)
if result is None or result is False:
return "delete_repo CANCELLED"
if isinstance(result, dict) and not result.get("confirmed"):
return "delete_repo CANCELLED"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Deleting repository {owner}/{repo_name}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.delete(
self._api_url(f"/repos/{owner}/{repo_name}"),
headers=self._headers(token),
)
if response.status_code == 204:
if __event_emitter__:
await __event_emitter__(
{"type": "status", "data": {"done": True, "hidden": True}}
)
return f"Deleted repository: {owner}/{repo_name}"
response.raise_for_status()
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"Repository '{owner}/{repo_name}' not found"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
async def get_repo(
self,
repo: str,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""Get repository details.
:param repo: Repository in owner/repo format
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured"
try:
owner, repo_name = self._resolve_repo(repo)
except ValueError as e:
return f"Error: {e}"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Fetching repository {owner}/{repo_name}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}"),
headers=self._headers(token),
)
response.raise_for_status()
repo_info = response.json()
output = f"# Repository: {owner}/{repo_name}\n\n"
output += f"**Full Name:** {repo_info.get('full_name', 'N/A')}\n"
output += (
f"**Description:** {repo_info.get('description', 'No description')}\n"
)
output += f"**Private:** {'Yes' if repo_info.get('private') else 'No'}\n"
output += f"**Fork:** {'Yes' if repo_info.get('fork') else 'No'}\n"
output += f"**Stars:** {repo_info.get('stars_count', 0)}\n"
output += f"**Forks:** {repo_info.get('forks_count', 0)}\n"
output += f"**Watchers:** {repo_info.get('watchers_count', 0)}\n"
output += f"**Open Issues:** {repo_info.get('open_issues_count', 0)}\n"
output += f"**Size:** {repo_info.get('size', 0)} KB\n"
output += f"**Default Branch:** {repo_info.get('default_branch', 'main')}\n"
output += f"**Created:** {repo_info.get('created_at', 'N/A')[:10]}\n"
output += f"**Updated:** {repo_info.get('updated_at', 'N/A')[:10]}\n"
output += f"**HTML URL:** {repo_info.get('html_url', 'N/A')}\n"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return output
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return f"Repository '{owner}/{repo_name}' not found"
return self._format_error(e)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"