""" title: Gitea Dev - Native Mode Optimized author: Jeff Smith + Claude + minimax + kimi-k2 version: 1.4.0 license: MIT description: Interact with Gitea repositories - native tool calling optimized for high-tier LLMs with robust error handling requirements: pydantic, httpx changelog: 1.4.0: - Added CRUD operations for Issues (get, update, close, reopen, delete, comments) - Added CRUD operations for Pull Requests (get, update, merge, comments) 1.3.0: - Native Mode optimization with explicit tool schemas - Added mapping protocol support to Valves/UserValves (__iter__, keys, __getitem__) - Enhanced __init__ for framework-driven configuration injection - Added self.citation = True for tool usage visibility - Robust error handling with detailed context in responses - Structured context returns for better LLM understanding - Added to_dict() helper methods on valve classes """ from typing import Optional, Callable, Any, List, Dict from pydantic import BaseModel, Field import httpx import base64 class Tools: class Valves(BaseModel): """System-wide configuration for Gitea integration""" GITEA_URL: str = Field( default="https://gitea.example.com", description="Gitea server URL (ingress or internal service)", ) DEFAULT_REPO: str = Field( default="", description="Default repository in owner/repo format (e.g., 'myorg/myrepo')", ) DEFAULT_BRANCH: str = Field( default="main", description="Default branch name for operations" ) DEFAULT_ORG: str = Field( default="", description="Default organization for org-scoped operations" ) ALLOW_USER_OVERRIDES: bool = Field( default=True, description="Allow users to override defaults via UserValves", ) VERIFY_SSL: bool = Field( default=True, description="Verify SSL certificates (disable for self-signed certs)", ) DEFAULT_PAGE_SIZE: int = Field( default=50, description="Default page size for list operations (max 50)", ge=1, le=50, ) class UserValves(BaseModel): """Per-user configuration for personal credentials and overrides""" GITEA_TOKEN: str = Field( default="", description="Your Gitea API token (Settings > Applications > Generate Token)", ) USER_DEFAULT_REPO: str = Field( default="", description="Override default repository for this user", ) USER_DEFAULT_BRANCH: str = Field( default="", description="Override default branch for this user", ) USER_DEFAULT_ORG: str = Field( default="", description="Override default organization for this user", ) def __init__(self): """Initialize with optional valve configuration from framework""" # Handle valves configuration from framework self.valves = self.Valves() # Enable tool usage visibility for debugging self.citation = True # Handle user valves configuration self.user_valves = self.UserValves() def _api_url(self, endpoint: str) -> str: """Construct full API URL for Gitea endpoint""" base = self._get_url() return f"{base}/api/v1{endpoint}" def _get_url(self) -> str: """Get effective Gitea URL with trailing slash handling""" # TODO: Allow USER OverRide return self.valves.GITEA_URL.rstrip("/") def _get_token(self, __user__: dict = None) -> str: """Extract Gitea token from user context with robust handling""" if __user__ and "valves" in __user__: user_valves = __user__.get("valves") if __user__ else None return user_valves.GITEA_TOKEN return "" def _headers(self, __user__: dict = None) -> dict: """Generate authentication headers with token""" token = self._get_token(__user__) if not token: return {"Content-Type": "application/json"} return { "Authorization": f"token {token}", "Content-Type": "application/json", } def _format_error(self, e, context: str = "") -> str: """Format HTTP error with detailed context for LLM understanding""" try: error_json = e.response.json() error_msg = error_json.get("message", e.response.text[:200]) except Exception: error_msg = e.response.text[:200] context_str = f" ({context})" if context else "" return f"HTTP Error {e.response.status_code}{context_str}: {error_msg}" def _get_repo(self, repo: Optional[str], __user__: dict = None) -> str: """Get effective repository with priority resolution""" if repo: return repo if __user__ and "valves" in __user__: user_valves = __user__.get("valves") if __user__ else None if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_REPO: return user_valves.USER_DEFAULT_REPO return self.valves.DEFAULT_REPO def _get_branch(self, branch: Optional[str], __user__: dict = None) -> str: """Get effective branch with priority resolution""" """Get effective branch with priority.""" if branch: return branch if __user__ and "valves" in __user__: user_valves = __user__.get("valves") if __user__ else None if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_BRANCH: return user_valves.USER_DEFAULT_BRANCH return self.valves.DEFAULT_BRANCH def _get_org(self, org: Optional[str], __user__: dict = None) -> str: """Get effective org with priority.""" if org: return org if __user__ and "valves" in __user__: user_valves = __user__.get("valves") if __user__ else None if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_ORG: return user_valves.USER_DEFAULT_ORG return self.valves.DEFAULT_ORG def _resolve_repo( self, repo: Optional[str], __user__: dict = None ) -> tuple[str, str]: """Resolve repository string into owner and repo name with validation""" effective_repo = self._get_repo(repo, __user__) if not effective_repo: raise ValueError( "No repository specified. Set DEFAULT_REPO in Valves or USER_DEFAULT_REPO in UserValves." ) if "/" not in effective_repo: raise ValueError( f"Repository must be in 'owner/repo' format, got: {effective_repo}" ) return effective_repo.split("/", 1) def _get_page_size(self, limit: Optional[int] = None) -> int: """Calculate effective page size, capped at Gitea's max of 50""" if limit is not None: return min(limit, 50) return min(self.valves.DEFAULT_PAGE_SIZE, 50) async def list_repos( self, page: int = 1, limit: Optional[int] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ List repositories accessible to the authenticated user with pagination. :param page: Page number for pagination (default: 1) :param limit: Number of results per page (max 50, default: 50) :return: Formatted string listing repositories with metadata """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured. Add it in UserValves settings." page_size = self._get_page_size(limit) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Fetching repositories (page {page})...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.get( self._api_url("/user/repos"), headers=self._headers(__user__), params={"page": page, "limit": page_size}, ) response.raise_for_status() repos = response.json() total_count = response.headers.get("x-total-count", "?") output = f"**Accessible Repositories (Page {page}, {page_size}/page)**\n\n" for repo in repos: full_name = repo.get("full_name", "unknown") desc = repo.get("description", "")[:50] or "No description" private = "šŸ”’" if repo.get("private") else "🌐" default_branch = repo.get("default_branch", "main") stars = repo.get("stars_count", 0) forks = repo.get("forks_count", 0) output += f"- {private} **{full_name}** ({default_branch})\n" output += f" Stars: {stars} | Forks: {forks}\n" output += f" {desc}\n\n" output += f"**Showing {len(repos)} repositories (Total: {total_count})**\n" if len(repos) == page_size: output += ( f"_More results available. Use page={page + 1} to continue._\n" ) effective_repo = self._get_repo(None, __user__) if effective_repo: output += f"\n**Active project:** {effective_repo}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) return output.strip() except httpx.HTTPStatusError as e: error_msg = self._format_error(e, "repository listing") if e.response.status_code == 401: return f"Error: Invalid or missing GITEA_TOKEN. Please check your token in UserValves settings.\n\nDetails: {error_msg}" return f"Error: Failed to list repositories. {error_msg}" except Exception as e: return f"Error: Unexpected failure during repository listing: {type(e).__name__}: {e}" async def get_repo( self, repo: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Get detailed repository information and metadata. :param repo: Repository in 'owner/repo' format (e.g., 'open-webui/open-webui') :return: Formatted repository details including stats, URLs, and configuration """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: Invalid repository format. {e}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Fetching repository {owner}/{repo_name}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.get( self._api_url(f"/repos/{owner}/{repo_name}"), headers=self._headers(__user__), ) response.raise_for_status() repo_info = response.json() output = f"# Repository: {owner}/{repo_name}\n\n" output += f"**Full Name:** `{repo_info.get('full_name', 'N/A')}`\n" output += ( f"**Description:** {repo_info.get('description', 'No description')}\n" ) output += ( f"**Private:** {'Yes šŸ”’' if repo_info.get('private') else 'No 🌐'}\n" ) output += f"**Fork:** {'Yes' if repo_info.get('fork') else 'No'}\n" output += f"**Stars:** ⭐ {repo_info.get('stars_count', 0)}\n" output += f"**Forks:** šŸ“ {repo_info.get('forks_count', 0)}\n" output += f"**Watchers:** šŸ‘ļø {repo_info.get('watchers_count', 0)}\n" output += f"**Open Issues:** šŸ“‹ {repo_info.get('open_issues_count', 0)}\n" output += f"**Size:** {repo_info.get('size', 0)} KB\n" output += f"**Language:** {repo_info.get('language', 'N/A')}\n" output += ( f"**Default Branch:** `{repo_info.get('default_branch', 'main')}`\n" ) output += f"**Created:** {repo_info.get('created_at', 'N/A')[:10]}\n" output += f"**Updated:** {repo_info.get('updated_at', 'N/A')[:10]}\n" output += f"**HTML URL:** {repo_info.get('html_url', 'N/A')}\n" output += f"**Clone URL:** `{repo_info.get('clone_url', 'N/A')}`\n" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) return output except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"repository {owner}/{repo_name}") if e.response.status_code == 404: return f"Error: Repository '{owner}/{repo_name}' not found. Please verify the repository name and your access permissions." return f"Error: Failed to fetch repository details. {error_msg}" except Exception as e: return f"Error: Unexpected failure during repository fetch: {type(e).__name__}: {e}" async def list_files( self, path: str = "", repo: Optional[str] = None, branch: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ List files and directories in a repository path. :param path: Directory path to list (default: root) :param repo: Repository in 'owner/repo' format :param branch: Branch name (defaults to repository default) :return: Formatted directory listing with file sizes and types """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" effective_branch = self._get_branch(branch, __user__) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Listing {path or 'root'}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.get( self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._headers(__user__), params={"ref": effective_branch}, ) response.raise_for_status() contents = response.json() if isinstance(contents, dict): return f"**Note:** '{path}' is a file. Use `get_file()` to read its contents." output = f"**Contents of {owner}/{repo_name}/{path or '.'}** (`{effective_branch}` branch)\n\n" dirs = [item for item in contents if item.get("type") == "dir"] files = [item for item in contents if item.get("type") == "file"] if dirs: output += "**šŸ“ Directories:**\n" for item in sorted(dirs, key=lambda x: x.get("name", "").lower()): output += f"- `šŸ“ {item.get('name', '')}/`\n" output += "\n" if files: output += "**šŸ“„ Files:**\n" for item in sorted(files, key=lambda x: x.get("name", "").lower()): size = item.get("size", 0) if size < 1024: size_str = f"{size}B" elif size < 1024 * 1024: size_str = f"{size//1024}KB" else: size_str = f"{size//(1024*1024)}MB" sha_short = item.get("sha", "unknown")[:8] output += ( f"- `{item.get('name', '')}` ({size_str}, sha: {sha_short})\n" ) output += f"\n**Total:** {len(dirs)} directories, {len(files)} files" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) return output except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"directory listing for '{path}'") if e.response.status_code == 404: return f"Error: Path not found: `{path}`. Verify the path exists in the repository." return f"Error: Failed to list directory contents. {error_msg}" except Exception as e: return f"Error: Unexpected failure during directory listing: {type(e).__name__}: {e}" async def get_file( self, path: str, repo: Optional[str] = None, branch: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Get the contents of a file from the repository. :param path: Full path to the file (e.g., 'src/main.py') :param repo: Repository in 'owner/repo' format :param branch: Branch name (defaults to repository default) :return: File content with metadata (SHA, size, branch) """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" effective_branch = self._get_branch(branch, __user__) if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": f"Reading file {path}...", "done": False}, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.get( self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._headers(__user__), params={"ref": effective_branch}, ) response.raise_for_status() file_info = response.json() if isinstance(file_info, list): return f"**Note:** '{path}' is a directory. Use `list_files()` to browse its contents." if file_info.get("type") != "file": return f"Error: '{path}' is not a file (type: {file_info.get('type')})" content_b64 = file_info.get("content", "") try: content = base64.b64decode(content_b64).decode("utf-8") except Exception: return "Error: Could not decode file content. The file may be binary or corrupted." size = file_info.get("size", 0) if size < 1024: size_str = f"{size}B" elif size < 1024 * 1024: size_str = f"{size//1024}KB" else: size_str = f"{size//(1024*1024)}MB" sha_short = file_info.get("sha", "unknown")[:8] if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) output = f"**File:** `{owner}/{repo_name}/{path}`\n" output += f"**Branch:** `{effective_branch}` | **SHA:** `{sha_short}` | **Size:** {size_str}\n" output += f"**URL:** {file_info.get('html_url', 'N/A')}\n\n" output += f"```\n{content}\n```" return output except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"file fetch for '{path}'") if e.response.status_code == 404: return ( f"Error: File not found: `{path}`. Verify the file path and branch." ) return f"Error: Failed to fetch file. {error_msg}" except Exception as e: return ( f"Error: Unexpected failure during file fetch: {type(e).__name__}: {e}" ) async def update_file( self, path: str, content: str, message: str, repo: Optional[str] = None, branch: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Update an existing file in the repository (creates commit). :param path: File path to update :param content: New file content as string :param message: Commit message :param repo: Repository in 'owner/repo' format :param branch: Branch name (defaults to repository default) :return: Commit details and success confirmation """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" effective_branch = self._get_branch(branch, __user__) if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": f"Updating {path}...", "done": False}, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: # Get current file SHA get_response = await client.get( self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._headers(__user__), params={"ref": effective_branch}, ) if get_response.status_code == 404: return f"Error: File not found: `{path}`. Use `create_file()` to create a new file." get_response.raise_for_status() file_info = get_response.json() sha = file_info.get("sha") if not sha: return "Error: Could not retrieve file SHA for update. The file may be corrupted or inaccessible." # Prepare updated content content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") # Update file response = await client.put( self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._headers(__user__), json={ "content": content_b64, "message": message, "branch": effective_branch, "sha": sha, }, ) response.raise_for_status() result = response.json() commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) output = f"**File Updated Successfully**\n\n" output += f"**File:** `{owner}/{repo_name}/{path}`\n" output += f"**Branch:** `{effective_branch}`\n" output += f"**Commit:** `{commit_sha}`\n" output += f"**Message:** {message}\n" return output except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"file update for '{path}'") if e.response.status_code == 409: return f"Error: Update conflict for `{path}`. The file may have been modified by another process. Fetch the latest version and try again." return f"Error: Failed to update file. {error_msg}" except Exception as e: return ( f"Error: Unexpected failure during file update: {type(e).__name__}: {e}" ) async def create_file( self, path: str, content: str, message: str, repo: Optional[str] = None, branch: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Create a new file in the repository. :param path: File path to create (e.g., 'docs/README.md') :param content: Initial file content as string :param message: Commit message :param repo: Repository in 'owner/repo' format :param branch: Branch name (defaults to repository default) :return: Commit details and success confirmation """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" effective_branch = self._get_branch(branch, __user__) if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": f"Creating {path}...", "done": False}, } ) try: content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.post( self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._headers(__user__), json={ "content": content_b64, "message": message, "branch": effective_branch, }, ) response.raise_for_status() result = response.json() commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) output = f"**File Created Successfully**\n\n" output += f"**File:** `{owner}/{repo_name}/{path}`\n" output += f"**Branch:** `{effective_branch}`\n" output += f"**Commit:** `{commit_sha}`\n" output += f"**Message:** {message}\n" return output except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"file creation for '{path}'") if e.response.status_code == 422: return f"Error: File already exists: `{path}`. Use `update_file()` to modify it instead." return f"Error: Failed to create file. {error_msg}" except Exception as e: return f"Error: Unexpected failure during file creation: {type(e).__name__}: {e}" async def get_project_context( self, repo: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Get comprehensive project context including README, structure, and recent commits. :param repo: Repository in 'owner/repo' format :return: Structured project overview with README, file structure, and commit history """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" branch = self._get_branch(None, __user__) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Loading project context for {owner}/{repo_name}...", "done": False, }, } ) output = f"# Project Context: {owner}/{repo_name}\n\n" try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: # Get repository metadata repo_response = await client.get( self._api_url(f"/repos/{owner}/{repo_name}"), headers=self._headers(__user__), ) if repo_response.status_code == 200: repo_info = repo_response.json() output += f"**Description:** {repo_info.get('description', 'No description')}\n" output += f"**Default Branch:** `{repo_info.get('default_branch', 'main')}`\n" output += f"**Language:** {repo_info.get('language', 'Unknown')}\n" output += f"**Stars:** ⭐ {repo_info.get('stars_count', 0)} | " output += f"**Forks:** šŸ“ {repo_info.get('forks_count', 0)}\n\n" # Get root directory structure root_response = await client.get( self._api_url(f"/repos/{owner}/{repo_name}/contents/"), headers=self._headers(__user__), params={"ref": branch}, ) if root_response.status_code == 200: contents = root_response.json() output += "## šŸ“ Repository Structure\n\n" dirs = [item for item in contents if item.get("type") == "dir"] files = [item for item in contents if item.get("type") == "file"] if dirs: output += "**Directories:**\n" for item in sorted( dirs, key=lambda x: x.get("name", "").lower() ): output += f"- `šŸ“ {item.get('name', '')}/`\n" output += "\n" if files: output += "**Files:**\n" for item in sorted( files, key=lambda x: x.get("name", "").lower() ): size = item.get("size", 0) if size < 1024: size_str = f"{size}B" else: size_str = f"{size//1024}KB" output += f"- `šŸ“„ {item.get('name', '')}` ({size_str})\n" output += "\n" # Try to find and include README readme_found = False for readme_name in ["README.md", "readme.md", "README", "README.txt"]: readme_response = await client.get( self._api_url( f"/repos/{owner}/{repo_name}/contents/{readme_name}" ), headers=self._headers(__user__), params={"ref": branch}, ) if readme_response.status_code == 200: readme_info = readme_response.json() content_b64 = readme_info.get("content", "") try: readme_content = base64.b64decode(content_b64).decode( "utf-8" ) # Truncate very long READMEs if len(readme_content) > 2000: readme_content = ( readme_content[:2000] + "\n\n... (truncated)" ) output += "## šŸ“„ README\n\n" output += f"```markdown\n{readme_content}\n```\n\n" readme_found = True except Exception: pass break if not readme_found: output += "## šŸ“„ README\n\n" output += "*No README file found in repository root.*\n\n" # Get recent commits commits_response = await client.get( self._api_url(f"/repos/{owner}/{repo_name}/commits"), headers=self._headers(__user__), params={"sha": branch, "limit": 5}, ) if commits_response.status_code == 200: commits = commits_response.json() output += "## šŸ“ Recent Commits\n\n" for commit in commits: sha = commit.get("sha", "")[:8] commit_data = commit.get("commit", {}) msg = commit_data.get("message", "").split("\n")[0][:60] author = commit_data.get("author", {}).get("name", "unknown") date = commit_data.get("author", {}).get("date", "")[:10] output += f"- `{sha}` {msg}\n" output += f" by {author} on {date}\n\n" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) return output.strip() except Exception as e: return f"Error: Failed to load project context: {type(e).__name__}: {e}" async def list_branches( self, repo: Optional[str] = None, page: int = 1, limit: Optional[int] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ List branches in a repository with pagination. :param repo: Repository in 'owner/repo' format :param page: Page number for pagination (default: 1) :param limit: Results per page (max 50, default: 50) :return: Formatted branch listing with protection status and commit SHAs """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" page_size = self._get_page_size(limit) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Fetching branches for {owner}/{repo_name} (page {page})...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.get( self._api_url(f"/repos/{owner}/{repo_name}/branches"), headers=self._headers(__user__), params={"page": page, "limit": page_size}, ) response.raise_for_status() branches = response.json() total_count = response.headers.get("x-total-count", "?") output = f"**Branches in {owner}/{repo_name} (Page {page})**\n\n" for branch in branches: name = branch.get("name", "") protected = "šŸ›”ļø" if branch.get("protected") else "" commit_sha = branch.get("commit", {}).get("id", "")[:8] output += f"- `{name}` {protected} [commit: {commit_sha}]\n" output += f"\n**Showing {len(branches)} branches (Total: {total_count})**" if len(branches) == page_size: output += ( f"\n_More results available. Use page={page + 1} to continue._" ) if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) return output.strip() except httpx.HTTPStatusError as e: error_msg = self._format_error(e, "branch listing") return f"Error: Failed to list branches. {error_msg}" except Exception as e: return f"Error: Unexpected failure during branch listing: {type(e).__name__}: {e}" async def create_branch( self, branch_name: str, from_branch: Optional[str] = None, repo: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Create a new branch from an existing branch. :param branch_name: Name for the new branch :param from_branch: Source branch (defaults to repository default branch) :param repo: Repository in 'owner/repo' format :return: Success confirmation with branch details """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" effective_from = from_branch or self._get_branch(None, __user__) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Creating branch {branch_name}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.post( self._api_url(f"/repos/{owner}/{repo_name}/branches"), headers=self._headers(__user__), json={ "new_branch_name": branch_name, "old_branch_name": effective_from, }, ) response.raise_for_status() if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) return f"āœ… Created branch `{branch_name}` from `{effective_from}` in `{owner}/{repo_name}`" except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"branch creation") if e.response.status_code == 409: return f"Error: Branch `{branch_name}` already exists in repository." return f"Error: Failed to create branch. {error_msg}" except Exception as e: return f"Error: Unexpected failure during branch creation: {type(e).__name__}: {e}" async def list_commits( self, repo: Optional[str] = None, branch: Optional[str] = None, page: int = 1, limit: Optional[int] = None, path: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ List commits in a repository with filtering and pagination. :param repo: Repository in 'owner/repo' format :param branch: Branch name (defaults to repository default) :param page: Page number for pagination (default: 1) :param limit: Results per page (max 50, default: 50) :param path: Filter commits to specific file/directory path :return: Formatted commit history with author, date, and message """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" effective_branch = self._get_branch(branch, __user__) page_size = self._get_page_size(limit) if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Fetching commits...", "done": False}, } ) try: params = { "sha": effective_branch, "page": page, "limit": page_size, } if path: params["path"] = path async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.get( self._api_url(f"/repos/{owner}/{repo_name}/commits"), headers=self._headers(__user__), params=params, ) response.raise_for_status() commits = response.json() total_count = response.headers.get("x-total-count", "?") output = f"**Commits in {owner}/{repo_name}** (`{effective_branch}`" if path: output += f", filtered to: `{path}`" output += f", page {page})\n\n" for commit in commits: sha = commit.get("sha", "")[:8] commit_data = commit.get("commit", {}) msg = commit_data.get("message", "").split("\n")[0][:60] author = commit_data.get("author", {}).get("name", "unknown") date = commit_data.get("author", {}).get("date", "")[:10] output += f"- `{sha}` {msg}\n" output += f" by {author} on {date}\n\n" output += f"**Showing {len(commits)} commits (Total: {total_count})**" if len(commits) == page_size: output += ( f"\n_More results available. Use page={page + 1} to continue._" ) if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) return output.strip() except httpx.HTTPStatusError as e: error_msg = self._format_error(e, "commit listing") return f"Error: Failed to list commits. {error_msg}" except Exception as e: return f"Error: Unexpected failure during commit listing: {type(e).__name__}: {e}" async def get_commit( self, commit_sha: str, repo: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Get detailed information about a specific commit. :param commit_sha: Full or short commit SHA :param repo: Repository in 'owner/repo' format :return: Comprehensive commit details including files changed and diff stats """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Fetching commit {commit_sha[:8]}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: # Get commit details (this endpoint may vary by Gitea version) response = await client.get( self._api_url( f"/repos/{owner}/{repo_name}/git/commits/{commit_sha}" ), headers=self._headers(__user__), ) if response.status_code == 404: # Try alternative endpoint response = await client.get( self._api_url( f"/repos/{owner}/{repo_name}/commits/{commit_sha}" ), headers=self._headers(__user__), ) response.raise_for_status() commit = response.json() output = f"# Commit: {commit.get('sha', '')[:8]}\n\n" # Basic commit info output += f"**Full SHA:** `{commit.get('sha', 'N/A')}`\n" output += f"**Message:** {commit.get('message', 'No message')}\n\n" # Author info author = commit.get("author", {}) if author: output += f"**Author:** {author.get('name', 'unknown')} <{author.get('email', '')}>\n" output += f"**Date:** {author.get('date', 'N/A')}\n\n" # Committer info (if different from author) committer = commit.get("committer", {}) if committer and committer.get("name") != author.get("name"): output += f"**Committer:** {committer.get('name', 'unknown')} <{committer.get('email', '')}>\n\n" # Files changed (if available) if "files" in commit: files = commit.get("files", []) output += f"**Files Changed:** {len(files)}\n\n" for file in files[:20]: # Limit to first 20 status = file.get("status", "") filename = file.get("filename", "") additions = file.get("additions", 0) deletions = file.get("deletions", 0) changes = file.get("changes", 0) output += f"- **{status}:** `{filename}`" if changes > 0: output += f" (+{additions}/-{deletions})\n" else: output += "\n" if len(files) > 20: output += f"\n... and {len(files) - 20} more files\n" elif "stats" in commit: # Alternative stats format stats = commit.get("stats", {}) output += f"**Changes:** +{stats.get('additions', 0)} additions, -{stats.get('deletions', 0)} deletions\n\n" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) return output.strip() except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"commit fetch for '{commit_sha}'") if e.response.status_code == 404: return f"Error: Commit '{commit_sha}' not found in repository." return f"Error: Failed to fetch commit details. {error_msg}" except Exception as e: return f"Error: Unexpected failure during commit fetch: {type(e).__name__}: {e}" async def list_issues( self, state: str = "open", repo: Optional[str] = None, page: int = 1, limit: Optional[int] = None, labels: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ List issues in a repository with filtering and pagination. :param state: Issue state - 'open', 'closed', or 'all' (default: 'open') :param repo: Repository in 'owner/repo' format :param page: Page number for pagination (default: 1) :param limit: Results per page (max 50, default: 50) :param labels: Comma-separated label names to filter by :return: Formatted issue listing with metadata and assignees """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" page_size = self._get_page_size(limit) if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Fetching issues...", "done": False}, } ) try: params = { "state": state, "type": "issues", # Filter out PRs "page": page, "limit": page_size, } if labels: params["labels"] = labels async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.get( self._api_url(f"/repos/{owner}/{repo_name}/issues"), headers=self._headers(__user__), params=params, ) response.raise_for_status() issues = response.json() total_count = response.headers.get("x-total-count", "?") output = f"**Issues in {owner}/{repo_name}** (state: `{state}`, page: {page})\n\n" if not issues: output += "_No issues found._" else: for issue in issues: number = issue.get("number") title = issue.get("title", "")[:80] user = issue.get("user", {}).get("login", "unknown") # Labels issue_labels = [ label.get("name", "") for label in issue.get("labels", []) ] labels_str = ( f" [{' , '.join(issue_labels)}]" if issue_labels else "" ) # Assignee assignee = issue.get("assignee") assignee_str = ( f" → @{assignee.get('login', '')}" if assignee else "" ) output += f"- **#{number}:** {title}{labels_str} (by @{user}){assignee_str}\n\n" output += f"\n**Showing {len(issues)} issues (Total: {total_count})**" if len(issues) == page_size: output += ( f"\n_More results available. Use page={page + 1} to continue._" ) if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) return output.strip() except httpx.HTTPStatusError as e: error_msg = self._format_error(e, "issue listing") return f"Error: Failed to list issues. {error_msg}" except Exception as e: return f"Error: Unexpected failure during issue listing: {type(e).__name__}: {e}" async def get_issue( self, issue_number: int, repo: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Get detailed information about a specific issue. :param issue_number: Issue number to retrieve :param repo: Repository in 'owner/repo' format :return: Comprehensive issue details including title, body, labels, and comments count """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Fetching issue #{issue_number}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.get( self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"), headers=self._headers(__user__), ) response.raise_for_status() issue = response.json() # Extract issue data title = issue.get("title", "No title") body = issue.get("body", "") state = issue.get("state", "unknown") user = issue.get("user", {}).get("login", "unknown") created_at = issue.get("created_at", "")[:10] updated_at = issue.get("updated_at", "")[:10] comments_count = issue.get("comments", 0) html_url = issue.get("html_url", "") # Labels issue_labels = [ label.get("name", "") for label in issue.get("labels", []) ] labels_str = ", ".join(issue_labels) if issue_labels else "None" # Assignee assignee = issue.get("assignee") assignee_str = ( f"@{assignee.get('login', 'unknown')}" if assignee else "Unassigned" ) # Build output output = f"# Issue #{issue_number}: {title}\n\n" output += f"**State:** {state.upper()}\n" output += f"**Author:** @{user}\n" output += f"**Assignee:** {assignee_str}\n" output += f"**Labels:** {labels_str}\n" output += f"**Created:** {created_at}\n" output += f"**Updated:** {updated_at}\n" output += f"**Comments:** {comments_count}\n" output += f"**URL:** {html_url}\n\n" if body: output += f"## Description\n\n{body}\n\n" else: output += "_No description provided._\n\n" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) return output.strip() except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"issue #{issue_number}") if e.response.status_code == 404: return f"Error: Issue #{issue_number} not found in {owner}/{repo_name}." return f"Error: Failed to fetch issue. {error_msg}" except Exception as e: return f"Error: Unexpected failure during issue fetch: {type(e).__name__}: {e}" async def update_issue( self, issue_number: int, repo: Optional[str] = None, title: Optional[str] = None, body: Optional[str] = None, state: Optional[str] = None, assignee: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Update an existing issue with new title, body, state, or assignee. :param issue_number: Issue number to update :param repo: Repository in 'owner/repo' format :param title: New title (optional) :param body: New description body (optional) :param state: New state - "open" or "closed" (optional) :param assignee: Username to assign (optional, use empty string to unassign) :return: Confirmation with updated issue details """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" # Validate state if provided if state and state not in ("open", "closed"): return "Error: State must be 'open' or 'closed'." if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Updating issue #{issue_number}...", "done": False, }, } ) try: payload = {} if title is not None: payload["title"] = title if body is not None: payload["body"] = body if state is not None: payload["state"] = state if assignee is not None: payload["assignee"] = assignee if assignee else "" async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.patch( self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"), headers=self._headers(__user__), json=payload, ) response.raise_for_status() issue = response.json() updated_title = issue.get("title", "No title") updated_state = issue.get("state", "unknown") html_url = issue.get("html_url", "") if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) output = f"**Issue #{issue_number} Updated Successfully**\n\n" output += f"**Title:** {updated_title}\n" output += f"**State:** {updated_state.upper()}\n" output += f"**URL:** {html_url}\n" output += f"\nChanges applied: {', '.join(payload.keys())}\n" return output.strip() except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"issue #{issue_number} update") return f"Error: Failed to update issue. {error_msg}" except Exception as e: return f"Error: Unexpected failure during issue update: {type(e).__name__}: {e}" async def close_issue( self, issue_number: int, repo: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Close an open issue. :param issue_number: Issue number to close :param repo: Repository in 'owner/repo' format :return: Confirmation of issue closure """ return await self.update_issue(issue_number, repo, state="closed", __user__=__user__, __event_emitter__=__event_emitter__) async def reopen_issue( self, issue_number: int, repo: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Reopen a closed issue. :param issue_number: Issue number to reopen :param repo: Repository in 'owner/repo' format :return: Confirmation of issue reopening """ return await self.update_issue(issue_number, repo, state="open", __user__=__user__, __event_emitter__=__event_emitter__) async def delete_issue( self, issue_number: int, repo: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, __event_call__: Callable[[dict], Any] = None, ) -> str: """ Delete an issue from the repository. :param issue_number: Issue number to delete :param repo: Repository in 'owner/repo' format :return: Confirmation of issue deletion """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" # Confirmation dialog if __event_call__: result = await __event_call__( { "type": "confirmation", "data": { "title": "Confirm Issue Deletion", "message": f"Delete issue #{issue_number} from `{owner}/{repo_name}`?", }, } ) if result is None or result is False: return "āš ļø Issue deletion cancelled by user." if isinstance(result, dict) and not result.get("confirmed"): return "āš ļø Issue deletion cancelled by user." if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Deleting issue #{issue_number}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.delete( self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"), headers=self._headers(__user__), ) response.raise_for_status() if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) return f"**Issue #{issue_number} Deleted Successfully** from `{owner}/{repo_name}`" except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"issue #{issue_number} deletion") if e.response.status_code == 404: return f"Error: Issue #{issue_number} not found in {owner}/{repo_name}." return f"Error: Failed to delete issue. {error_msg}" except Exception as e: return f"Error: Unexpected failure during issue deletion: {type(e).__name__}: {e}" async def list_issue_comments( self, issue_number: int, repo: Optional[str] = None, page: int = 1, limit: Optional[int] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ List all comments on an issue. :param issue_number: Issue number to get comments for :param repo: Repository in 'owner/repo' format :param page: Page number for pagination (default: 1) :param limit: Results per page (max 50, default: 50) :return: Formatted comment listing with author and date """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" page_size = self._get_page_size(limit) if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Fetching comments...", "done": False}, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.get( self._api_url( f"/repos/{owner}/{repo_name}/issues/{issue_number}/comments" ), headers=self._headers(__user__), params={"page": page, "limit": page_size}, ) response.raise_for_status() comments = response.json() total_count = response.headers.get("x-total-count", "?") output = f"**Comments on Issue #{issue_number}** ({owner}/{repo_name})\n\n" if not comments: output += "_No comments yet._" else: for comment in comments: comment_id = comment.get("id", 0) user = comment.get("user", {}).get("login", "unknown") created_at = comment.get("created_at", "")[:10] body = comment.get("body", "No content") output += f"**Comment by @{user}** (ID: {comment_id}, {created_at})\n" output += f"{body}\n\n" output += "---\n\n" output += f"\n**Showing {len(comments)} comments (Total: {total_count})**" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) return output.strip() except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"issue #{issue_number} comments") if e.response.status_code == 404: return f"Error: Issue #{issue_number} not found in {owner}/{repo_name}." return f"Error: Failed to list issue comments. {error_msg}" except Exception as e: return f"Error: Unexpected failure during comment listing: {type(e).__name__}: {e}" async def create_issue_comment( self, issue_number: int, body: str, repo: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Add a comment to an issue. :param issue_number: Issue number to comment on :param body: Comment content :param repo: Repository in 'owner/repo' format :return: Confirmation with comment details """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" if not body or not body.strip(): return "Error: Comment body cannot be empty." if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Adding comment...", "done": False}, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.post( self._api_url( f"/repos/{owner}/{repo_name}/issues/{issue_number}/comments" ), headers=self._headers(__user__), json={"body": body}, ) response.raise_for_status() comment = response.json() comment_id = comment.get("id", 0) html_url = comment.get("html_url", "") if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) output = f"**Comment Added Successfully**\n\n" output += f"**Issue:** #{issue_number}\n" output += f"**Comment ID:** {comment_id}\n" output += f"**URL:** {html_url}\n\n" output += f"**Content:**\n{body}\n" return output.strip() except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"comment on issue #{issue_number}") if e.response.status_code == 404: return f"Error: Issue #{issue_number} not found in {owner}/{repo_name}." return f"Error: Failed to create comment. {error_msg}" except Exception as e: return f"Error: Unexpected failure during comment creation: {type(e).__name__}: {e}" async def update_issue_comment( self, comment_id: int, body: str, repo: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Update an existing comment on an issue. :param comment_id: Comment ID to update :param body: New comment content :param repo: Repository in 'owner/repo' format :return: Confirmation with updated comment details """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" if not body or not body.strip(): return "Error: Comment body cannot be empty." if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Updating comment...", "done": False}, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.patch( self._api_url(f"/repos/{owner}/{repo_name}/issues/comments/{comment_id}"), headers=self._headers(__user__), json={"body": body}, ) response.raise_for_status() comment = response.json() updated_body = comment.get("body", "No content") updated_at = comment.get("updated_at", "")[:10] if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) output = f"**Comment {comment_id} Updated Successfully**\n\n" output += f"**Repository:** {owner}/{repo_name}\n" output += f"**Updated:** {updated_at}\n\n" output += f"**New Content:**\n{updated_body}\n" return output.strip() except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"comment #{comment_id}") if e.response.status_code == 404: return f"Error: Comment #{comment_id} not found in {owner}/{repo_name}." return f"Error: Failed to update comment. {error_msg}" except Exception as e: return f"Error: Unexpected failure during comment update: {type(e).__name__}: {e}" async def delete_issue_comment( self, comment_id: int, repo: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, __event_call__: Callable[[dict], Any] = None, ) -> str: """ Delete a comment from an issue. :param comment_id: Comment ID to delete :param repo: Repository in 'owner/repo' format :return: Confirmation of comment deletion """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" # Confirmation dialog if __event_call__: result = await __event_call__( { "type": "confirmation", "data": { "title": "Confirm Comment Deletion", "message": f"Delete comment #{comment_id} from `{owner}/{repo_name}`?", }, } ) if result is None or result is False: return "āš ļø Comment deletion cancelled by user." if isinstance(result, dict) and not result.get("confirmed"): return "āš ļø Comment deletion cancelled by user." if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Deleting comment #{comment_id}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.delete( self._api_url(f"/repos/{owner}/{repo_name}/issues/comments/{comment_id}"), headers=self._headers(__user__), ) response.raise_for_status() if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) return f"**Comment #{comment_id} Deleted Successfully** from `{owner}/{repo_name}`" except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"comment #{comment_id} deletion") if e.response.status_code == 404: return f"Error: Comment #{comment_id} not found in {owner}/{repo_name}." return f"Error: Failed to delete comment. {error_msg}" except Exception as e: return f"Error: Unexpected failure during comment deletion: {type(e).__name__}: {e}" async def create_issue( self, title: str, body: str = "", repo: Optional[str] = None, labels: Optional[List[str]] = None, assignee: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Create a new issue in the repository. :param title: Issue title :param body: Issue description/body :param repo: Repository in 'owner/repo' format :param labels: List of label names to apply :param assignee: Username to assign the issue to :return: Confirmation with issue number and URL """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Creating issue...", "done": False}, } ) try: payload = { "title": title, "body": body, } if labels: payload["labels"] = labels if assignee: payload["assignee"] = assignee async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.post( self._api_url(f"/repos/{owner}/{repo_name}/issues"), headers=self._headers(__user__), json=payload, ) response.raise_for_status() issue = response.json() issue_number = issue.get("number") issue_url = issue.get("html_url", "") if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) output = f"**Issue Created Successfully**\n\n" output += f"**Issue #{issue_number}:** {title}\n" output += f"**URL:** {issue_url}\n" if labels: output += f"**Labels:** {', '.join(labels)}\n" if assignee: output += f"**Assigned to:** @{assignee}\n" return output.strip() except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"issue creation") return f"Error: Failed to create issue. {error_msg}" except Exception as e: return f"Error: Unexpected failure during issue creation: {type(e).__name__}: {e}" async def list_pull_requests( self, state: str = "open", repo: Optional[str] = None, page: int = 1, limit: Optional[int] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ List pull requests in a repository with pagination. :param state: PR state - 'open', 'closed', or 'all' (default: 'open') :param repo: Repository in 'owner/repo' format :param page: Page number for pagination (default: 1) :param limit: Results per page (max 50, default: 50) :return: Formatted PR listing with merge status and branch information """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" page_size = self._get_page_size(limit) if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Fetching pull requests...", "done": False}, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.get( self._api_url(f"/repos/{owner}/{repo_name}/pulls"), headers=self._headers(__user__), params={"state": state, "page": page, "limit": page_size}, ) response.raise_for_status() prs = response.json() total_count = response.headers.get("x-total-count", "?") output = f"**Pull Requests in {owner}/{repo_name}** (state: `{state}`, page: {page})\n\n" if not prs: output += "_No pull requests found._" else: for pr in prs: number = pr.get("number") title = pr.get("title", "")[:80] user = pr.get("user", {}).get("login", "unknown") head = pr.get("head", {}).get("ref", "") base = pr.get("base", {}).get("ref", "") # Merge status mergeable = pr.get("mergeable", None) merge_status = "" if mergeable is True: merge_status = " āœ… mergeable" elif mergeable is False: merge_status = " āš ļø conflicts" output += f"### PR #{number}: {title}\n" output += f"**Branch:** `{head}` → `{base}` (by @{user}){merge_status}\n\n" output += f"\n**Showing {len(prs)} PRs (Total: {total_count})**" if len(prs) == page_size: output += ( f"\n_More results available. Use page={page + 1} to continue._" ) if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) return output.strip() except httpx.HTTPStatusError as e: error_msg = self._format_error(e, "pull request listing") return f"Error: Failed to list pull requests. {error_msg}" except Exception as e: return ( f"Error: Unexpected failure during PR listing: {type(e).__name__}: {e}" ) async def get_pull_request( self, pr_number: int, repo: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Get detailed information about a specific pull request. :param pr_number: Pull request number to retrieve :param repo: Repository in 'owner/repo' format :return: Comprehensive PR details including title, body, branches, and merge status """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Fetching PR #{pr_number}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.get( self._api_url(f"/repos/{owner}/{repo_name}/pulls/{pr_number}"), headers=self._headers(__user__), ) response.raise_for_status() pr = response.json() # Extract PR data title = pr.get("title", "No title") body = pr.get("body", "") state = pr.get("state", "unknown") user = pr.get("user", {}).get("login", "unknown") created_at = pr.get("created_at", "")[:10] updated_at = pr.get("updated_at", "")[:10] head_ref = pr.get("head", {}).get("ref", "") base_ref = pr.get("base", {}).get("ref", "") mergeable = pr.get("mergeable", None) merged = pr.get("merged", False) html_url = pr.get("html_url", "") # Build output output = f"# PR #{pr_number}: {title}\n\n" output += f"**State:** {state.upper()}{' (MERGED)' if merged else ''}\n" output += f"**Author:** @{user}\n" output += f"**Branches:** `{head_ref}` → `{base_ref}`\n" output += f"**Created:** {created_at}\n" output += f"**Updated:** {updated_at}\n" output += f"**URL:** {html_url}\n" # Merge status if mergeable is True: output += "**Merge Status:** āœ… Mergeable\n" elif mergeable is False: output += "**Merge Status:** āš ļø Has conflicts\n" else: output += "**Merge Status:** Unknown\n" output += "\n" if body: output += f"## Description\n\n{body}\n\n" else: output += "_No description provided._\n\n" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) return output.strip() except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"PR #{pr_number}") if e.response.status_code == 404: return f"Error: PR #{pr_number} not found in {owner}/{repo_name}." return f"Error: Failed to fetch pull request. {error_msg}" except Exception as e: return f"Error: Unexpected failure during PR fetch: {type(e).__name__}: {e}" async def update_pull_request( self, pr_number: int, repo: Optional[str] = None, title: Optional[str] = None, body: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Update an existing pull request with new title or body. :param pr_number: Pull request number to update :param repo: Repository in 'owner/repo' format :param title: New title (optional) :param body: New description body (optional) :return: Confirmation with updated PR details """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Updating PR #{pr_number}...", "done": False, }, } ) try: payload = {} if title is not None: payload["title"] = title if body is not None: payload["body"] = body async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.patch( self._api_url(f"/repos/{owner}/{repo_name}/pulls/{pr_number}"), headers=self._headers(__user__), json=payload, ) response.raise_for_status() pr = response.json() updated_title = pr.get("title", "No title") html_url = pr.get("html_url", "") if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) output = f"**PR #{pr_number} Updated Successfully**\n\n" output += f"**Title:** {updated_title}\n" output += f"**URL:** {html_url}\n" if payload: output += f"\nChanges applied: {', '.join(payload.keys())}\n" return output.strip() except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"PR #{pr_number} update") return f"Error: Failed to update pull request. {error_msg}" except Exception as e: return f"Error: Unexpected failure during PR update: {type(e).__name__}: {e}" async def merge_pull_request( self, pr_number: int, repo: Optional[str] = None, merge_strategy: str = "merge", __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Merge a pull request using the specified strategy. :param pr_number: Pull request number to merge :param repo: Repository in 'owner/repo' format :param merge_strategy: Merge strategy - "merge", "rebase", or "squash" (default: "merge") :return: Confirmation of merge or error details """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" # Validate merge strategy if merge_strategy not in ("merge", "rebase", "squash"): return "Error: Merge strategy must be 'merge', 'rebase', or 'squash'." if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Merging PR #{pr_number}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=60.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.post( self._api_url( f"/repos/{owner}/{repo_name}/pulls/{pr_number}/merge" ), headers=self._headers(__user__), json={"merge_strategy": merge_strategy}, ) if response.status_code == 405: return "Error: PR cannot be merged. Check if it's already merged or has conflicts." response.raise_for_status() result = response.json() if response.text else {} merged = result.get("merged", True) commit_sha = result.get("merge_commit", {}).get("sha", "")[:8] if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) if merged: return f"**PR #{pr_number} Merged Successfully**\n\n" output += f"**Strategy:** {merge_strategy.upper()}\n" if commit_sha: output += f"**Merge Commit:** `{commit_sha}`\n" output += f"\nāœ… The pull request has been merged into {owner}/{repo_name}." else: return f"**PR #{pr_number} Merge Result:**\n\n{result}\n" except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"PR #{pr_number} merge") if e.response.status_code == 405: return f"Error: PR #{pr_number} cannot be merged. It may already be merged or have merge conflicts." return f"Error: Failed to merge PR. {error_msg}" except Exception as e: return f"Error: Unexpected failure during PR merge: {type(e).__name__}: {e}" async def list_pull_request_comments( self, pr_number: int, repo: Optional[str] = None, page: int = 1, limit: Optional[int] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ List all reviews/comments on a pull request. :param pr_number: PR number to get comments/reviews for :param repo: Repository in 'owner/repo' format :param page: Page number for pagination (default: 1) :param limit: Results per page (max 50, default: 50) :return: Formatted review/comment listing with author and content """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" page_size = self._get_page_size(limit) if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Fetching PR reviews...", "done": False}, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: # Try reviews endpoint first (Gitea uses /pulls/{index}/reviews) response = await client.get( self._api_url( f"/repos/{owner}/{repo_name}/pulls/{pr_number}/reviews" ), headers=self._headers(__user__), params={"page": page, "limit": page_size}, ) # Fallback to issue comments if reviews endpoint not available if response.status_code == 404: response = await client.get( self._api_url( f"/repos/{owner}/{repo_name}/issues/{pr_number}/comments" ), headers=self._headers(__user__), params={"page": page, "limit": page_size}, ) response.raise_for_status() comments = response.json() total_count = response.headers.get("x-total-count", "?") output = f"**Reviews/Comments on PR #{pr_number}** ({owner}/{repo_name})\n\n" if not comments: output += "_No reviews or comments yet._" else: for comment in comments: comment_id = comment.get("id", 0) user = comment.get("user", {}).get("login", "unknown") created_at = comment.get("created_at", "")[:10] body = comment.get("body", "No content") # Review-specific fields state = comment.get("state", "") if state: output += f"**Review by @{user}** - {state} ({created_at})\n" else: output += f"**Comment by @{user}** ({created_at})\n" output += f"{body}\n\n" output += "---\n\n" output += f"\n**Showing {len(comments)} items (Total: {total_count})**" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) return output.strip() except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"PR #{pr_number} reviews") if e.response.status_code == 404: return f"Error: PR #{pr_number} not found in {owner}/{repo_name}." return f"Error: Failed to list PR reviews. {error_msg}" except Exception as e: return f"Error: Unexpected failure during review listing: {type(e).__name__}: {e}" async def create_pull_request_comment( self, pr_number: int, body: str, repo: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Add a review comment to a pull request. :param pr_number: PR number to comment on :param body: Comment content :param repo: Repository in 'owner/repo' format :return: Confirmation with comment details """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" if not body or not body.strip(): return "Error: Comment body cannot be empty." if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Adding review comment...", "done": False}, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.post( self._api_url( f"/repos/{owner}/{repo_name}/pulls/{pr_number}/reviews" ), headers=self._headers(__user__), json={ "body": body, "event": "COMMENT", # Just add a comment, don't approve/reject }, ) # Fallback to issue comments if reviews endpoint not available if response.status_code == 404: response = await client.post( self._api_url( f"/repos/{owner}/{repo_name}/issues/{pr_number}/comments" ), headers=self._headers(__user__), json={"body": body}, ) response.raise_for_status() comment = response.json() comment_id = comment.get("id", 0) html_url = comment.get("html_url", "") if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) output = f"**Review Comment Added Successfully**\n\n" output += f"**PR:** #{pr_number}\n" output += f"**Comment ID:** {comment_id}\n" output += f"**URL:** {html_url}\n\n" output += f"**Content:**\n{body}\n" return output.strip() except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"comment on PR #{pr_number}") if e.response.status_code == 404: return f"Error: PR #{pr_number} not found in {owner}/{repo_name}." return f"Error: Failed to create review comment. {error_msg}" except Exception as e: return f"Error: Unexpected failure during comment creation: {type(e).__name__}: {e}" async def update_pull_request_comment( self, comment_id: int, body: str, repo: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Update an existing review comment on a pull request. :param comment_id: Comment ID to update :param body: New comment content :param repo: Repository in 'owner/repo' format :return: Confirmation with updated comment details """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" if not body or not body.strip(): return "Error: Comment body cannot be empty." if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Updating review comment...", "done": False}, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.patch( self._api_url( f"/repos/{owner}/{repo_name}/pulls/comments/{comment_id}" ), headers=self._headers(__user__), json={"body": body}, ) # Fallback to issue comments if reviews endpoint not available if response.status_code == 404: response = await client.patch( self._api_url( f"/repos/{owner}/{repo_name}/issues/comments/{comment_id}" ), headers=self._headers(__user__), json={"body": body}, ) response.raise_for_status() comment = response.json() updated_body = comment.get("body", "No content") updated_at = comment.get("updated_at", "")[:10] if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) output = f"**Review Comment {comment_id} Updated Successfully**\n\n" output += f"**Repository:** {owner}/{repo_name}\n" output += f"**Updated:** {updated_at}\n\n" output += f"**New Content:**\n{updated_body}\n" return output.strip() except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"comment #{comment_id}") if e.response.status_code == 404: return f"Error: Comment #{comment_id} not found in {owner}/{repo_name}." return f"Error: Failed to update review comment. {error_msg}" except Exception as e: return f"Error: Unexpected failure during comment update: {type(e).__name__}: {e}" async def delete_pull_request_comment( self, comment_id: int, repo: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, __event_call__: Callable[[dict], Any] = None, ) -> str: """ delete a review comment from a pull request. :param comment_id: Comment ID to delete :param repo: Repository in 'owner/repo' format :return: Confirmation of comment deletion """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" # Confirmation dialog if __event_call__: result = await __event_call__( { "type": "confirmation", "data": { "title": "Confirm Review Comment Deletion", "message": f"Delete review comment #{comment_id} from `{owner}/{repo_name}`?", }, } ) if result is None or result is False: return "āš ļø Comment deletion cancelled by user." if isinstance(result, dict) and not result.get("confirmed"): return "āš ļø Comment deletion cancelled by user." if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Deleting review comment #{comment_id}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: # Try pull request comments endpoint first response = await client.delete( self._api_url( f"/repos/{owner}/{repo_name}/pulls/comments/{comment_id}" ), headers=self._headers(__user__), ) # Fallback to issue comments if reviews endpoint not available if response.status_code == 404: response = await client.delete( self._api_url( f"/repos/{owner}/{repo_name}/issues/comments/{comment_id}" ), headers=self._headers(__user__), ) response.raise_for_status() if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) return f"**Review Comment #{comment_id} Deleted Successfully** from `{owner}/{repo_name}`" except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"comment #{comment_id} deletion") if e.response.status_code == 404: return f"Error: Comment #{comment_id} not found in {owner}/{repo_name}." return f"Error: Failed to delete review comment. {error_msg}" except Exception as e: return f"Error: Unexpected failure during comment deletion: {type(e).__name__}: {e}" async def create_pull_request( self, title: str, head_branch: str, base_branch: Optional[str] = None, body: str = "", repo: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Create a new pull request. :param title: PR title :param head_branch: Source branch (your changes) :param base_branch: Target branch (defaults to repository default) :param body: PR description :param repo: Repository in 'owner/repo' format :return: Confirmation with PR number and URL """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" effective_base = base_branch or self._get_branch(None, __user__) if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Creating pull request...", "done": False}, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.post( self._api_url(f"/repos/{owner}/{repo_name}/pulls"), headers=self._headers(__user__), json={ "title": title, "head": head_branch, "base": effective_base, "body": body, }, ) response.raise_for_status() pr = response.json() pr_number = pr.get("number") pr_url = pr.get("html_url", "") if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) output = f"**Pull Request Created Successfully**\n\n" output += f"**PR #{pr_number}:** {title}\n" output += f"**Branch:** `{head_branch}` → `{effective_base}`\n" output += f"**URL:** {pr_url}\n" if body: output += f"\n**Description:**\n{body[:100]}{'...' if len(body) > 100 else ''}\n" return output.strip() except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"pull request creation") return f"Error: Failed to create pull request. {error_msg}" except Exception as e: return ( f"Error: Unexpected failure during PR creation: {type(e).__name__}: {e}" ) async def delete_file( self, path: str, message: str, repo: Optional[str] = None, branch: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, __event_call__: Callable[[dict], Any] = None, ) -> str: """ Delete a file from the repository (requires confirmation). :param path: File path to delete :param message: Commit message for the deletion :param repo: Repository in 'owner/repo' format :param branch: Branch name (defaults to repository default) :return: Confirmation with commit details """ token = self._get_token(__user__) if not token: return "Error: GITEA_TOKEN not configured in UserValves settings." try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" effective_branch = self._get_branch(branch, __user__) # Confirmation dialog if __event_call__: result = await __event_call__( { "type": "confirmation", "data": { "title": "Confirm File Deletion", "message": f"Delete `{path}` from `{owner}/{repo_name}` ({effective_branch})?", }, } ) if result is None or result is False: return "āš ļø File deletion cancelled by user." if isinstance(result, dict) and not result.get("confirmed"): return "āš ļø File deletion cancelled by user." if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": f"Deleting {path}...", "done": False}, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: # Get file SHA get_response = await client.get( self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._headers(__user__), params={"ref": effective_branch}, ) if get_response.status_code == 404: return f"Error: File not found: `{path}`" get_response.raise_for_status() file_info = get_response.json() sha = file_info.get("sha") if not sha: return "Error: Could not retrieve file SHA for deletion." # Delete file response = await client.delete( self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._headers(__user__), json={ "message": message, "branch": effective_branch, "sha": sha, }, ) response.raise_for_status() result = response.json() commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) output = f"**File Deleted Successfully**\n\n" output += f"**File:** `{owner}/{repo_name}/{path}`\n" output += f"**Branch:** `{effective_branch}`\n" output += f"**Commit:** `{commit_sha}`\n" output += f"**Message:** {message}\n" return output.strip() except httpx.HTTPStatusError as e: error_msg = self._format_error(e, f"file deletion for '{path}'") return f"Error: Failed to delete file. {error_msg}" except Exception as e: return f"Error: Unexpected failure during file deletion: {type(e).__name__}: {e}"