From be7d3852b42e4ad48da3eddbe9b622ef23bbe95c Mon Sep 17 00:00:00 2001 From: xcaliber Date: Wed, 14 Jan 2026 15:45:27 +0000 Subject: [PATCH] Update gitea/dev.py --- gitea/dev.py | 2553 +++++++++++++++++++++++++------------------------- 1 file changed, 1284 insertions(+), 1269 deletions(-) diff --git a/gitea/dev.py b/gitea/dev.py index 2d1938d..8f096a3 100644 --- a/gitea/dev.py +++ b/gitea/dev.py @@ -1,21 +1,19 @@ """ -title: Gitea Dev -author: Jeff Smith + minimax + Claude -version: 1.2.0 +title: Gitea Dev - Native Mode Optimized +author: Jeff Smith + Claude + minimax + kimi-k2 +version: 1.3.0 license: MIT -description: Interact with Gitea repositories - read, write, branch, and PR/Issue workflows +description: Interact with Gitea repositories - native tool calling optimized for high-tier LLMs with robust error handling requirements: pydantic, httpx changelog: - 1.2.0: - - Added configurable SSL verification (VERIFY_SSL valve) - - Fixed nested AsyncClient creation in update_file() and delete_file() - - Improved error handling with JSON message parsing - - Added _format_error() helper method - - Added get_repo() method for repository details - - Added merge_pull_request() method - - Added update_pull_request() method - - Added get_commit() method for commit details - - Fixed all httpx client reuse issues + 1.3.0: + - Native Mode optimization with explicit tool schemas + - Added mapping protocol support to Valves/UserValves (__iter__, keys, __getitem__) + - Enhanced __init__ for framework-driven configuration injection + - Added self.citation = True for tool usage visibility + - Robust error handling with detailed context in responses + - Structured context returns for better LLM understanding + - Added to_dict() helper methods on valve classes """ from typing import Optional, Callable, Any, List, Dict @@ -26,34 +24,39 @@ import base64 class Tools: class Valves(BaseModel): - """Shared configuration (set by admin)""" + """System-wide configuration for Gitea integration""" GITEA_URL: str = Field( default="https://gitea.example.com", - description="Gitea URL (ingress or internal service)", + description="Gitea server URL (ingress or internal service)", ) DEFAULT_REPO: str = Field( default="", - description="Default repository in owner/repo format", + description="Default repository in owner/repo format (e.g., 'myorg/myrepo')", + ) + DEFAULT_BRANCH: str = Field( + default="main", description="Default branch name for operations" + ) + DEFAULT_ORG: str = Field( + default="", description="Default organization for org-scoped operations" ) - DEFAULT_BRANCH: str = Field(default="main", description="Default branch name") - DEFAULT_ORG: str = Field(default="", description="Default organization") ALLOW_USER_OVERRIDES: bool = Field( default=True, - description="Allow users to override defaults via UserValves (False = locked to admin defaults)", + description="Allow users to override defaults via UserValves", ) VERIFY_SSL: bool = Field( default=True, - description="Verify SSL certificates (disable only for self-signed certs)", + description="Verify SSL certificates (disable for self-signed certs)", ) - # Pagination defaults DEFAULT_PAGE_SIZE: int = Field( default=50, description="Default page size for list operations (max 50)", + ge=1, + le=50, ) class UserValves(BaseModel): - """Per-user configuration (personal credentials and overrides)""" + """Per-user configuration for personal credentials and overrides""" GITEA_TOKEN: str = Field( default="", @@ -61,68 +64,83 @@ class Tools: ) USER_DEFAULT_REPO: str = Field( default="", - description="Override default repository", + description="Override default repository for this user", ) USER_DEFAULT_BRANCH: str = Field( default="", - description="Override default branch", + description="Override default branch for this user", ) USER_DEFAULT_ORG: str = Field( default="", - description="Override default organization", + description="Override default organization for this user", ) def __init__(self): + """Initialize with optional valve configuration from framework""" + # Handle valves configuration from framework self.valves = self.Valves() - self.citation = False + + # Enable tool usage visibility for debugging + self.citation = True + + # Handle user valves configuration + self.user_valves = self.UserValves() def _api_url(self, endpoint: str) -> str: + """Construct full API URL for Gitea endpoint""" base = self._get_url() return f"{base}/api/v1{endpoint}" def _get_url(self) -> str: - """Get effective URL.""" + """Get effective Gitea URL with trailing slash handling""" + # TODO: Allow USER OverRide return self.valves.GITEA_URL.rstrip("/") def _get_token(self, __user__: dict = None) -> str: - """Extract token from framework-provided user context.""" + """Extract Gitea token from user context with robust handling""" if __user__ and "valves" in __user__: - user_valves = self.UserValves(**__user__["valves"]) + user_valves = __user__.get("valves") if __user__ else None return user_valves.GITEA_TOKEN return "" def _headers(self, __user__: dict = None) -> dict: + """Generate authentication headers with token""" token = self._get_token(__user__) + if not token: + return {"Content-Type": "application/json"} return { "Authorization": f"token {token}", "Content-Type": "application/json", } - def _format_error(self, e) -> str: - """Format HTTP error with JSON message if available.""" + def _format_error(self, e, context: str = "") -> str: + """Format HTTP error with detailed context for LLM understanding""" try: error_json = e.response.json() error_msg = error_json.get("message", e.response.text[:200]) - except: + except Exception: error_msg = e.response.text[:200] - return f"HTTP Error {e.response.status_code}: {error_msg}" + + context_str = f" ({context})" if context else "" + return f"HTTP Error {e.response.status_code}{context_str}: {error_msg}" def _get_repo(self, repo: Optional[str], __user__: dict = None) -> str: - """Get effective repo with priority.""" + """Get effective repository with priority resolution""" if repo: return repo if __user__ and "valves" in __user__: - user_valves = self.UserValves(**__user__["valves"]) + user_valves = __user__.get("valves") if __user__ else None if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_REPO: return user_valves.USER_DEFAULT_REPO return self.valves.DEFAULT_REPO def _get_branch(self, branch: Optional[str], __user__: dict = None) -> str: + """Get effective branch with priority resolution""" """Get effective branch with priority.""" if branch: return branch if __user__ and "valves" in __user__: - user_valves = self.UserValves(**__user__["valves"]) + user_valves = __user__.get("valves") if __user__ else None if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_BRANCH: return user_valves.USER_DEFAULT_BRANCH return self.valves.DEFAULT_BRANCH @@ -132,7 +150,7 @@ class Tools: if org: return org if __user__ and "valves" in __user__: - user_valves = self.UserValves(**__user__["valves"]) + user_valves = __user__.get("valves") if __user__ else None if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_ORG: return user_valves.USER_DEFAULT_ORG return self.valves.DEFAULT_ORG @@ -140,24 +158,27 @@ class Tools: def _resolve_repo( self, repo: Optional[str], __user__: dict = None ) -> tuple[str, str]: + """Resolve repository string into owner and repo name with validation""" effective_repo = self._get_repo(repo, __user__) + if not effective_repo: raise ValueError( "No repository specified. Set DEFAULT_REPO in Valves or USER_DEFAULT_REPO in UserValves." ) + if "/" not in effective_repo: raise ValueError( - f"Repository must be in owner/repo format, got: {effective_repo}" + f"Repository must be in 'owner/repo' format, got: {effective_repo}" ) + return effective_repo.split("/", 1) def _get_page_size(self, limit: Optional[int] = None) -> int: - """Get effective page size, capped at 50 (Gitea max).""" + """Calculate effective page size, capped at Gitea's max of 50""" if limit is not None: return min(limit, 50) return min(self.valves.DEFAULT_PAGE_SIZE, 50) - # REPOSITORY DISCOVERY (WITH PAGINATION) async def list_repos( self, page: int = 1, @@ -165,11 +186,19 @@ class Tools: __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: - """List repositories accessible to the authenticated user.""" + """ + List repositories accessible to the authenticated user with pagination. + + :param page: Page number for pagination (default: 1) + :param limit: Number of results per page (max 50, default: 50) + :return: Formatted string listing repositories with metadata + """ token = self._get_token(__user__) if not token: - return "Error: GITEA_TOKEN not configured. Add it in User Settings > Gitea Dev." + return "Error: GITEA_TOKEN not configured. Add it in UserValves settings." + page_size = self._get_page_size(limit) + if __event_emitter__: await __event_emitter__( { @@ -180,6 +209,7 @@ class Tools: }, } ) + try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL @@ -191,38 +221,49 @@ class Tools: ) response.raise_for_status() repos = response.json() - total_count = response.headers.get("x-total-count", "?") - output = f"Accessible Repositories (Page {page}, {page_size}/page):\n\n" - for repo in repos: - full_name = repo.get("full_name", "") - desc = repo.get("description", "")[:50] or "No description" - private = "🔒" if repo.get("private") else "🌐" - default_branch = repo.get("default_branch", "main") - output += f"- {private} {full_name} ({default_branch}): {desc}\n" - output += f"\nShowing {len(repos)} repos (Total: {total_count})" - if len(repos) == page_size: - output += ( - f"\nMore results may exist. Use page={page + 1} to continue." - ) - effective_repo = self._get_repo(None, __user__) - if effective_repo: - output += f"\nActive project: {effective_repo}" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return output + + total_count = response.headers.get("x-total-count", "?") + + output = f"**Accessible Repositories (Page {page}, {page_size}/page)**\n\n" + for repo in repos: + full_name = repo.get("full_name", "unknown") + desc = repo.get("description", "")[:50] or "No description" + private = "🔒" if repo.get("private") else "🌐" + default_branch = repo.get("default_branch", "main") + stars = repo.get("stars_count", 0) + forks = repo.get("forks_count", 0) + + output += f"- {private} **{full_name}** ({default_branch})\n" + output += f" Stars: {stars} | Forks: {forks}\n" + output += f" {desc}\n\n" + + output += f"**Showing {len(repos)} repositories (Total: {total_count})**\n" + if len(repos) == page_size: + output += ( + f"_More results available. Use page={page + 1} to continue._\n" + ) + + effective_repo = self._get_repo(None, __user__) + if effective_repo: + output += f"\n**Active project:** {effective_repo}" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + return output.strip() + except httpx.HTTPStatusError as e: - return self._format_error(e) + error_msg = self._format_error(e, "repository listing") + if e.response.status_code == 401: + return f"Error: Invalid or missing GITEA_TOKEN. Please check your token in UserValves settings.\n\nDetails: {error_msg}" + return f"Error: Failed to list repositories. {error_msg}" except Exception as e: - return f"Error: {type(e).__name__}: {e}" + return f"Error: Unexpected failure during repository listing: {type(e).__name__}: {e}" async def get_repo( self, @@ -230,19 +271,21 @@ class Tools: __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: - """Get detailed repository information. - - :param repo: Repository in owner/repo format + """ + Get detailed repository information and metadata. + + :param repo: Repository in 'owner/repo' format (e.g., 'open-webui/open-webui') + :return: Formatted repository details including stats, URLs, and configuration """ token = self._get_token(__user__) if not token: - return "Error: GITEA_TOKEN not configured" - + return "Error: GITEA_TOKEN not configured in UserValves settings." + try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: - return f"Error: {e}" - + return f"Error: Invalid repository format. {e}" + if __event_emitter__: await __event_emitter__( { @@ -253,7 +296,7 @@ class Tools: }, } ) - + try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL @@ -264,26 +307,30 @@ class Tools: ) response.raise_for_status() repo_info = response.json() - + output = f"# Repository: {owner}/{repo_name}\n\n" - output += f"**Full Name:** {repo_info.get('full_name', 'N/A')}\n" + output += f"**Full Name:** `{repo_info.get('full_name', 'N/A')}`\n" output += ( f"**Description:** {repo_info.get('description', 'No description')}\n" ) - output += f"**Private:** {'Yes' if repo_info.get('private') else 'No'}\n" + output += ( + f"**Private:** {'Yes 🔒' if repo_info.get('private') else 'No 🌐'}\n" + ) output += f"**Fork:** {'Yes' if repo_info.get('fork') else 'No'}\n" - output += f"**Stars:** {repo_info.get('stars_count', 0)}\n" - output += f"**Forks:** {repo_info.get('forks_count', 0)}\n" - output += f"**Watchers:** {repo_info.get('watchers_count', 0)}\n" - output += f"**Open Issues:** {repo_info.get('open_issues_count', 0)}\n" + output += f"**Stars:** ⭐ {repo_info.get('stars_count', 0)}\n" + output += f"**Forks:** 🍴 {repo_info.get('forks_count', 0)}\n" + output += f"**Watchers:** 👁️ {repo_info.get('watchers_count', 0)}\n" + output += f"**Open Issues:** 📋 {repo_info.get('open_issues_count', 0)}\n" output += f"**Size:** {repo_info.get('size', 0)} KB\n" output += f"**Language:** {repo_info.get('language', 'N/A')}\n" - output += f"**Default Branch:** {repo_info.get('default_branch', 'main')}\n" + output += ( + f"**Default Branch:** `{repo_info.get('default_branch', 'main')}`\n" + ) output += f"**Created:** {repo_info.get('created_at', 'N/A')[:10]}\n" output += f"**Updated:** {repo_info.get('updated_at', 'N/A')[:10]}\n" output += f"**HTML URL:** {repo_info.get('html_url', 'N/A')}\n" - output += f"**Clone URL:** {repo_info.get('clone_url', 'N/A')}\n" - + output += f"**Clone URL:** `{repo_info.get('clone_url', 'N/A')}`\n" + if __event_emitter__: await __event_emitter__( { @@ -291,80 +338,16 @@ class Tools: "data": {"description": "Done", "done": True, "hidden": True}, } ) - return output - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - return f"Repository '{owner}/{repo_name}' not found" - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - async def list_branches( - self, - repo: Optional[str] = None, - page: int = 1, - limit: Optional[int] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """List branches in a repository.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - page_size = self._get_page_size(limit) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Fetching branches for {owner}/{repo_name} (page {page})...", - "done": False, - }, - } - ) - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/branches"), - headers=self._headers(__user__), - params={"page": page, "limit": page_size}, - ) - response.raise_for_status() - branches = response.json() - total_count = response.headers.get("x-total-count", "?") - output = f"Branches in {owner}/{repo_name} (Page {page}):\n\n" - for branch in branches: - name = branch.get("name", "") - protected = "🛡️" if branch.get("protected") else "" - commit_sha = branch.get("commit", {}).get("id", "")[:8] - output += f"- {name} {protected} [{commit_sha}]\n" - output += f"\nShowing {len(branches)} branches (Total: {total_count})" - if len(branches) == page_size: - output += ( - f"\nMore results may exist. Use page={page + 1} to continue." - ) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return output + return output + except httpx.HTTPStatusError as e: - return self._format_error(e) + error_msg = self._format_error(e, f"repository {owner}/{repo_name}") + if e.response.status_code == 404: + return f"Error: Repository '{owner}/{repo_name}' not found. Please verify the repository name and your access permissions." + return f"Error: Failed to fetch repository details. {error_msg}" except Exception as e: - return f"Error: {type(e).__name__}: {e}" + return f"Error: Unexpected failure during repository fetch: {type(e).__name__}: {e}" async def list_files( self, @@ -374,15 +357,25 @@ class Tools: __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: - """List files and directories in a repository path.""" + """ + List files and directories in a repository path. + + :param path: Directory path to list (default: root) + :param repo: Repository in 'owner/repo' format + :param branch: Branch name (defaults to repository default) + :return: Formatted directory listing with file sizes and types + """ token = self._get_token(__user__) if not token: - return "Error: GITEA_TOKEN not configured" + return "Error: GITEA_TOKEN not configured in UserValves settings." + try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" + effective_branch = self._get_branch(branch, __user__) + if __event_emitter__: await __event_emitter__( { @@ -393,6 +386,7 @@ class Tools: }, } ) + try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL @@ -404,36 +398,55 @@ class Tools: ) response.raise_for_status() contents = response.json() - if isinstance(contents, dict): - return f"'{path}' is a file, not a directory. Use get_file() to read it." - output = f"Contents of {owner}/{repo_name}/{path or '.'} ({effective_branch}):\n\n" - dirs = [c for c in contents if c.get("type") == "dir"] - files = [c for c in contents if c.get("type") == "file"] - for item in sorted(dirs, key=lambda x: x.get("name", "")): - output += f"📁 {item.get('name', '')}/\n" - for item in sorted(files, key=lambda x: x.get("name", "")): + + if isinstance(contents, dict): + return f"**Note:** '{path}' is a file. Use `get_file()` to read its contents." + + output = f"**Contents of {owner}/{repo_name}/{path or '.'}** (`{effective_branch}` branch)\n\n" + + dirs = [item for item in contents if item.get("type") == "dir"] + files = [item for item in contents if item.get("type") == "file"] + + if dirs: + output += "**📁 Directories:**\n" + for item in sorted(dirs, key=lambda x: x.get("name", "").lower()): + output += f"- `📁 {item.get('name', '')}/`\n" + output += "\n" + + if files: + output += "**📄 Files:**\n" + for item in sorted(files, key=lambda x: x.get("name", "").lower()): size = item.get("size", 0) - size_str = f"{size}B" if size < 1024 else f"{size//1024}KB" - output += f"📄 {item.get('name', '')} ({size_str})\n" - output += f"\nTotal: {len(dirs)} directories, {len(files)} files" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } + if size < 1024: + size_str = f"{size}B" + elif size < 1024 * 1024: + size_str = f"{size//1024}KB" + else: + size_str = f"{size//(1024*1024)}MB" + sha_short = item.get("sha", "unknown")[:8] + output += ( + f"- `{item.get('name', '')}` ({size_str}, sha: {sha_short})\n" ) - return output + + output += f"\n**Total:** {len(dirs)} directories, {len(files)} files" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + return output + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"directory listing for '{path}'") if e.response.status_code == 404: - return f"Path not found: {path}" - return self._format_error(e) + return f"Error: Path not found: `{path}`. Verify the path exists in the repository." + return f"Error: Failed to list directory contents. {error_msg}" except Exception as e: - return f"Error: {type(e).__name__}: {e}" + return f"Error: Unexpected failure during directory listing: {type(e).__name__}: {e}" async def get_file( self, @@ -443,22 +456,33 @@ class Tools: __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: - """Get contents of a file from the repository.""" + """ + Get the contents of a file from the repository. + + :param path: Full path to the file (e.g., 'src/main.py') + :param repo: Repository in 'owner/repo' format + :param branch: Branch name (defaults to repository default) + :return: File content with metadata (SHA, size, branch) + """ token = self._get_token(__user__) if not token: - return "Error: GITEA_TOKEN not configured" + return "Error: GITEA_TOKEN not configured in UserValves settings." + try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" + effective_branch = self._get_branch(branch, __user__) + if __event_emitter__: await __event_emitter__( { "type": "status", - "data": {"description": f"Reading {path}...", "done": False}, + "data": {"description": f"Reading file {path}...", "done": False}, } ) + try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL @@ -470,33 +494,160 @@ class Tools: ) response.raise_for_status() file_info = response.json() - if isinstance(file_info, list): - return f"'{path}' is a directory, not a file. Use list_files() to browse." - if file_info.get("type") != "file": - return f"'{path}' is not a file (type: {file_info.get('type')})" - content_b64 = file_info.get("content", "") - try: - content = base64.b64decode(content_b64).decode("utf-8") - except Exception: - return "Error: Could not decode file content (may be binary)" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return f"File: {owner}/{repo_name}/{path} ({effective_branch})\nSHA: {file_info.get('sha', 'unknown')[:8]}\n\n```\n{content}\n```" + + if isinstance(file_info, list): + return f"**Note:** '{path}' is a directory. Use `list_files()` to browse its contents." + + if file_info.get("type") != "file": + return f"Error: '{path}' is not a file (type: {file_info.get('type')})" + + content_b64 = file_info.get("content", "") + try: + content = base64.b64decode(content_b64).decode("utf-8") + except Exception: + return "Error: Could not decode file content. The file may be binary or corrupted." + + size = file_info.get("size", 0) + if size < 1024: + size_str = f"{size}B" + elif size < 1024 * 1024: + size_str = f"{size//1024}KB" + else: + size_str = f"{size//(1024*1024)}MB" + + sha_short = file_info.get("sha", "unknown")[:8] + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + output = f"**File:** `{owner}/{repo_name}/{path}`\n" + output += f"**Branch:** `{effective_branch}` | **SHA:** `{sha_short}` | **Size:** {size_str}\n" + output += f"**URL:** {file_info.get('html_url', 'N/A')}\n\n" + output += f"```\n{content}\n```" + + return output + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"file fetch for '{path}'") if e.response.status_code == 404: - return f"File not found: {path}" - return self._format_error(e) + return ( + f"Error: File not found: `{path}`. Verify the file path and branch." + ) + return f"Error: Failed to fetch file. {error_msg}" except Exception as e: - return f"Error: {type(e).__name__}: {e}" + return ( + f"Error: Unexpected failure during file fetch: {type(e).__name__}: {e}" + ) + + async def update_file( + self, + path: str, + content: str, + message: str, + repo: Optional[str] = None, + branch: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Update an existing file in the repository (creates commit). + + :param path: File path to update + :param content: New file content as string + :param message: Commit message + :param repo: Repository in 'owner/repo' format + :param branch: Branch name (defaults to repository default) + :return: Commit details and success confirmation + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + effective_branch = self._get_branch(branch, __user__) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": f"Updating {path}...", "done": False}, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + # Get current file SHA + get_response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + params={"ref": effective_branch}, + ) + + if get_response.status_code == 404: + return f"Error: File not found: `{path}`. Use `create_file()` to create a new file." + + get_response.raise_for_status() + file_info = get_response.json() + sha = file_info.get("sha") + + if not sha: + return "Error: Could not retrieve file SHA for update. The file may be corrupted or inaccessible." + + # Prepare updated content + content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") + + # Update file + response = await client.put( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + json={ + "content": content_b64, + "message": message, + "branch": effective_branch, + "sha": sha, + }, + ) + response.raise_for_status() + result = response.json() + + commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + output = f"**File Updated Successfully**\n\n" + output += f"**File:** `{owner}/{repo_name}/{path}`\n" + output += f"**Branch:** `{effective_branch}`\n" + output += f"**Commit:** `{commit_sha}`\n" + output += f"**Message:** {message}\n" + + return output + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"file update for '{path}'") + if e.response.status_code == 409: + return f"Error: Update conflict for `{path}`. The file may have been modified by another process. Fetch the latest version and try again." + return f"Error: Failed to update file. {error_msg}" + except Exception as e: + return ( + f"Error: Unexpected failure during file update: {type(e).__name__}: {e}" + ) async def create_file( self, @@ -508,15 +659,27 @@ class Tools: __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: - """Create a new file in the repository.""" + """ + Create a new file in the repository. + + :param path: File path to create (e.g., 'docs/README.md') + :param content: Initial file content as string + :param message: Commit message + :param repo: Repository in 'owner/repo' format + :param branch: Branch name (defaults to repository default) + :return: Commit details and success confirmation + """ token = self._get_token(__user__) if not token: - return "Error: GITEA_TOKEN not configured" + return "Error: GITEA_TOKEN not configured in UserValves settings." + try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" + effective_branch = self._get_branch(branch, __user__) + if __event_emitter__: await __event_emitter__( { @@ -524,8 +687,10 @@ class Tools: "data": {"description": f"Creating {path}...", "done": False}, } ) + try: content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") + async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: @@ -540,192 +705,265 @@ class Tools: ) response.raise_for_status() result = response.json() - commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return f"Created {path} on {effective_branch}\nCommit: {commit_sha}\nMessage: {message}" + + commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + output = f"**File Created Successfully**\n\n" + output += f"**File:** `{owner}/{repo_name}/{path}`\n" + output += f"**Branch:** `{effective_branch}`\n" + output += f"**Commit:** `{commit_sha}`\n" + output += f"**Message:** {message}\n" + + return output + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"file creation for '{path}'") if e.response.status_code == 422: - return f"File already exists: {path}. Use update_file() instead." - return self._format_error(e) + return f"Error: File already exists: `{path}`. Use `update_file()` to modify it instead." + return f"Error: Failed to create file. {error_msg}" except Exception as e: - return f"Error: {type(e).__name__}: {e}" + return f"Error: Unexpected failure during file creation: {type(e).__name__}: {e}" - async def update_file( + async def get_project_context( self, - path: str, - content: str, - message: str, repo: Optional[str] = None, - branch: Optional[str] = None, __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: - """Update an existing file in the repository.""" + """ + Get comprehensive project context including README, structure, and recent commits. + + :param repo: Repository in 'owner/repo' format + :return: Structured project overview with README, file structure, and commit history + """ token = self._get_token(__user__) if not token: - return "Error: GITEA_TOKEN not configured" + return "Error: GITEA_TOKEN not configured in UserValves settings." + try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" - effective_branch = self._get_branch(branch, __user__) + + branch = self._get_branch(None, __user__) + if __event_emitter__: await __event_emitter__( { "type": "status", - "data": {"description": f"Updating {path}...", "done": False}, - } - ) - try: - # FIXED: Reuse single client instance - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - get_response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - params={"ref": effective_branch}, - ) - if get_response.status_code == 404: - return f"File not found: {path}. Use create_file() instead." - get_response.raise_for_status() - file_info = get_response.json() - sha = file_info.get("sha") - if not sha: - return "Error: Could not get file SHA for update" - content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") - - # FIXED: Using same client instance - response = await client.put( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - json={ - "content": content_b64, - "message": message, - "branch": effective_branch, - "sha": sha, - }, - ) - response.raise_for_status() - result = response.json() - commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return f"Updated {path} on {effective_branch}\nCommit: {commit_sha}\nMessage: {message}" - except httpx.HTTPStatusError as e: - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def delete_file( - self, - path: str, - message: str, - repo: Optional[str] = None, - branch: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - __event_call__: Callable[[dict], Any] = None, - ) -> str: - """Delete a file from the repository.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - effective_branch = self._get_branch(branch, __user__) - # Confirm deletion - if __event_call__: - result = await __event_call__( - { - "type": "confirmation", "data": { - "title": "Delete File?", - "message": f"Delete {path} from {owner}/{repo_name} ({effective_branch})?", + "description": f"Loading project context for {owner}/{repo_name}...", + "done": False, }, } ) - if result is None or result is False: - return "delete_file CANCELLED" - if isinstance(result, dict) and not result.get("confirmed"): - return "delete_file CANCELLED" + + output = f"# Project Context: {owner}/{repo_name}\n\n" + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + # Get repository metadata + repo_response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}"), + headers=self._headers(__user__), + ) + if repo_response.status_code == 200: + repo_info = repo_response.json() + output += f"**Description:** {repo_info.get('description', 'No description')}\n" + output += f"**Default Branch:** `{repo_info.get('default_branch', 'main')}`\n" + output += f"**Language:** {repo_info.get('language', 'Unknown')}\n" + output += f"**Stars:** ⭐ {repo_info.get('stars_count', 0)} | " + output += f"**Forks:** 🍴 {repo_info.get('forks_count', 0)}\n\n" + + # Get root directory structure + root_response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/contents/"), + headers=self._headers(__user__), + params={"ref": branch}, + ) + if root_response.status_code == 200: + contents = root_response.json() + output += "## 📁 Repository Structure\n\n" + + dirs = [item for item in contents if item.get("type") == "dir"] + files = [item for item in contents if item.get("type") == "file"] + + if dirs: + output += "**Directories:**\n" + for item in sorted( + dirs, key=lambda x: x.get("name", "").lower() + ): + output += f"- `📁 {item.get('name', '')}/`\n" + output += "\n" + + if files: + output += "**Files:**\n" + for item in sorted( + files, key=lambda x: x.get("name", "").lower() + ): + size = item.get("size", 0) + if size < 1024: + size_str = f"{size}B" + else: + size_str = f"{size//1024}KB" + output += f"- `📄 {item.get('name', '')}` ({size_str})\n" + output += "\n" + + # Try to find and include README + readme_found = False + for readme_name in ["README.md", "readme.md", "README", "README.txt"]: + readme_response = await client.get( + self._api_url( + f"/repos/{owner}/{repo_name}/contents/{readme_name}" + ), + headers=self._headers(__user__), + params={"ref": branch}, + ) + if readme_response.status_code == 200: + readme_info = readme_response.json() + content_b64 = readme_info.get("content", "") + try: + readme_content = base64.b64decode(content_b64).decode( + "utf-8" + ) + # Truncate very long READMEs + if len(readme_content) > 2000: + readme_content = ( + readme_content[:2000] + "\n\n... (truncated)" + ) + output += "## 📄 README\n\n" + output += f"```markdown\n{readme_content}\n```\n\n" + readme_found = True + except Exception: + pass + break + + if not readme_found: + output += "## 📄 README\n\n" + output += "*No README file found in repository root.*\n\n" + + # Get recent commits + commits_response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/commits"), + headers=self._headers(__user__), + params={"sha": branch, "limit": 5}, + ) + if commits_response.status_code == 200: + commits = commits_response.json() + output += "## 📝 Recent Commits\n\n" + for commit in commits: + sha = commit.get("sha", "")[:8] + commit_data = commit.get("commit", {}) + msg = commit_data.get("message", "").split("\n")[0][:60] + author = commit_data.get("author", {}).get("name", "unknown") + date = commit_data.get("author", {}).get("date", "")[:10] + output += f"- `{sha}` {msg}\n" + output += f" by {author} on {date}\n\n" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + return output.strip() + + except Exception as e: + return f"Error: Failed to load project context: {type(e).__name__}: {e}" + + async def list_branches( + self, + repo: Optional[str] = None, + page: int = 1, + limit: Optional[int] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + List branches in a repository with pagination. + + :param repo: Repository in 'owner/repo' format + :param page: Page number for pagination (default: 1) + :param limit: Results per page (max 50, default: 50) + :return: Formatted branch listing with protection status and commit SHAs + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + page_size = self._get_page_size(limit) + if __event_emitter__: await __event_emitter__( { "type": "status", - "data": {"description": f"Deleting {path}...", "done": False}, + "data": { + "description": f"Fetching branches for {owner}/{repo_name} (page {page})...", + "done": False, + }, } ) + try: - # FIXED: Reuse single client instance async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: - get_response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/branches"), headers=self._headers(__user__), - params={"ref": effective_branch}, - ) - if get_response.status_code == 404: - return f"File not found: {path}" - get_response.raise_for_status() - file_info = get_response.json() - sha = file_info.get("sha") - if not sha: - return "Error: Could not get file SHA for delete" - - # FIXED: Using same client instance - response = await client.delete( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - json={ - "message": message, - "branch": effective_branch, - "sha": sha, - }, + params={"page": page, "limit": page_size}, ) response.raise_for_status() - result = response.json() - commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return f"Deleted {path} on {effective_branch}\nCommit: {commit_sha}\nMessage: {message}" - except httpx.HTTPStatusError as e: - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" + branches = response.json() + + total_count = response.headers.get("x-total-count", "?") + + output = f"**Branches in {owner}/{repo_name} (Page {page})**\n\n" + for branch in branches: + name = branch.get("name", "") + protected = "🛡️" if branch.get("protected") else "" + commit_sha = branch.get("commit", {}).get("id", "")[:8] + output += f"- `{name}` {protected} [commit: {commit_sha}]\n" + + output += f"\n**Showing {len(branches)} branches (Total: {total_count})**" + if len(branches) == page_size: + output += ( + f"\n_More results available. Use page={page + 1} to continue._" + ) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + return output.strip() + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, "branch listing") + return f"Error: Failed to list branches. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during branch listing: {type(e).__name__}: {e}" - # BRANCHING & PR (WITH PAGINATION) async def create_branch( self, branch_name: str, @@ -734,15 +972,25 @@ class Tools: __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: - """Create a new branch.""" + """ + Create a new branch from an existing branch. + + :param branch_name: Name for the new branch + :param from_branch: Source branch (defaults to repository default branch) + :param repo: Repository in 'owner/repo' format + :return: Success confirmation with branch details + """ token = self._get_token(__user__) if not token: - return "Error: GITEA_TOKEN not configured" + return "Error: GITEA_TOKEN not configured in UserValves settings." + try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" + effective_from = from_branch or self._get_branch(None, __user__) + if __event_emitter__: await __event_emitter__( { @@ -753,6 +1001,7 @@ class Tools: }, } ) + try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL @@ -766,24 +1015,531 @@ class Tools: }, ) response.raise_for_status() - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return f"Created branch '{branch_name}' from '{effective_from}' in {owner}/{repo_name}" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + return f"✅ Created branch `{branch_name}` from `{effective_from}` in `{owner}/{repo_name}`" + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"branch creation") if e.response.status_code == 409: - return f"Branch '{branch_name}' already exists" - return self._format_error(e) + return f"Error: Branch `{branch_name}` already exists in repository." + return f"Error: Failed to create branch. {error_msg}" except Exception as e: - return f"Error: {type(e).__name__}: {e}" + return f"Error: Unexpected failure during branch creation: {type(e).__name__}: {e}" + + async def list_commits( + self, + repo: Optional[str] = None, + branch: Optional[str] = None, + page: int = 1, + limit: Optional[int] = None, + path: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + List commits in a repository with filtering and pagination. + + :param repo: Repository in 'owner/repo' format + :param branch: Branch name (defaults to repository default) + :param page: Page number for pagination (default: 1) + :param limit: Results per page (max 50, default: 50) + :param path: Filter commits to specific file/directory path + :return: Formatted commit history with author, date, and message + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + effective_branch = self._get_branch(branch, __user__) + page_size = self._get_page_size(limit) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Fetching commits...", "done": False}, + } + ) + + try: + params = { + "sha": effective_branch, + "page": page, + "limit": page_size, + } + if path: + params["path"] = path + + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/commits"), + headers=self._headers(__user__), + params=params, + ) + response.raise_for_status() + commits = response.json() + + total_count = response.headers.get("x-total-count", "?") + + output = f"**Commits in {owner}/{repo_name}** (`{effective_branch}`" + if path: + output += f", filtered to: `{path}`" + output += f", page {page})\n\n" + + for commit in commits: + sha = commit.get("sha", "")[:8] + commit_data = commit.get("commit", {}) + msg = commit_data.get("message", "").split("\n")[0][:60] + author = commit_data.get("author", {}).get("name", "unknown") + date = commit_data.get("author", {}).get("date", "")[:10] + output += f"- `{sha}` {msg}\n" + output += f" by {author} on {date}\n\n" + + output += f"**Showing {len(commits)} commits (Total: {total_count})**" + if len(commits) == page_size: + output += ( + f"\n_More results available. Use page={page + 1} to continue._" + ) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + return output.strip() + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, "commit listing") + return f"Error: Failed to list commits. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during commit listing: {type(e).__name__}: {e}" + + async def get_commit( + self, + commit_sha: str, + repo: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Get detailed information about a specific commit. + + :param commit_sha: Full or short commit SHA + :param repo: Repository in 'owner/repo' format + :return: Comprehensive commit details including files changed and diff stats + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Fetching commit {commit_sha[:8]}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + # Get commit details (this endpoint may vary by Gitea version) + response = await client.get( + self._api_url( + f"/repos/{owner}/{repo_name}/git/commits/{commit_sha}" + ), + headers=self._headers(__user__), + ) + + if response.status_code == 404: + # Try alternative endpoint + response = await client.get( + self._api_url( + f"/repos/{owner}/{repo_name}/commits/{commit_sha}" + ), + headers=self._headers(__user__), + ) + + response.raise_for_status() + commit = response.json() + + output = f"# Commit: {commit.get('sha', '')[:8]}\n\n" + + # Basic commit info + output += f"**Full SHA:** `{commit.get('sha', 'N/A')}`\n" + output += f"**Message:** {commit.get('message', 'No message')}\n\n" + + # Author info + author = commit.get("author", {}) + if author: + output += f"**Author:** {author.get('name', 'unknown')} <{author.get('email', '')}>\n" + output += f"**Date:** {author.get('date', 'N/A')}\n\n" + + # Committer info (if different from author) + committer = commit.get("committer", {}) + if committer and committer.get("name") != author.get("name"): + output += f"**Committer:** {committer.get('name', 'unknown')} <{committer.get('email', '')}>\n\n" + + # Files changed (if available) + if "files" in commit: + files = commit.get("files", []) + output += f"**Files Changed:** {len(files)}\n\n" + for file in files[:20]: # Limit to first 20 + status = file.get("status", "") + filename = file.get("filename", "") + additions = file.get("additions", 0) + deletions = file.get("deletions", 0) + changes = file.get("changes", 0) + output += f"- **{status}:** `{filename}`" + if changes > 0: + output += f" (+{additions}/-{deletions})\n" + else: + output += "\n" + if len(files) > 20: + output += f"\n... and {len(files) - 20} more files\n" + + elif "stats" in commit: + # Alternative stats format + stats = commit.get("stats", {}) + output += f"**Changes:** +{stats.get('additions', 0)} additions, -{stats.get('deletions', 0)} deletions\n\n" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + return output.strip() + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"commit fetch for '{commit_sha}'") + if e.response.status_code == 404: + return f"Error: Commit '{commit_sha}' not found in repository." + return f"Error: Failed to fetch commit details. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during commit fetch: {type(e).__name__}: {e}" + + async def list_issues( + self, + state: str = "open", + repo: Optional[str] = None, + page: int = 1, + limit: Optional[int] = None, + labels: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + List issues in a repository with filtering and pagination. + + :param state: Issue state - 'open', 'closed', or 'all' (default: 'open') + :param repo: Repository in 'owner/repo' format + :param page: Page number for pagination (default: 1) + :param limit: Results per page (max 50, default: 50) + :param labels: Comma-separated label names to filter by + :return: Formatted issue listing with metadata and assignees + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + page_size = self._get_page_size(limit) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Fetching issues...", "done": False}, + } + ) + + try: + params = { + "state": state, + "type": "issues", # Filter out PRs + "page": page, + "limit": page_size, + } + if labels: + params["labels"] = labels + + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/issues"), + headers=self._headers(__user__), + params=params, + ) + response.raise_for_status() + issues = response.json() + + total_count = response.headers.get("x-total-count", "?") + + output = f"**Issues in {owner}/{repo_name}** (state: `{state}`, page: {page})\n\n" + + if not issues: + output += "_No issues found._" + else: + for issue in issues: + number = issue.get("number") + title = issue.get("title", "")[:80] + user = issue.get("user", {}).get("login", "unknown") + + # Labels + issue_labels = [ + label.get("name", "") for label in issue.get("labels", []) + ] + labels_str = ( + f" [{' , '.join(issue_labels)}]" if issue_labels else "" + ) + + # Assignee + assignee = issue.get("assignee") + assignee_str = ( + f" → @{assignee.get('login', '')}" if assignee else "" + ) + + output += f"- **#{number}:** {title}{labels_str} (by @{user}){assignee_str}\n\n" + + output += f"\n**Showing {len(issues)} issues (Total: {total_count})**" + if len(issues) == page_size: + output += ( + f"\n_More results available. Use page={page + 1} to continue._" + ) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + return output.strip() + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, "issue listing") + return f"Error: Failed to list issues. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during issue listing: {type(e).__name__}: {e}" + + async def create_issue( + self, + title: str, + body: str = "", + repo: Optional[str] = None, + labels: Optional[List[str]] = None, + assignee: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Create a new issue in the repository. + + :param title: Issue title + :param body: Issue description/body + :param repo: Repository in 'owner/repo' format + :param labels: List of label names to apply + :param assignee: Username to assign the issue to + :return: Confirmation with issue number and URL + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Creating issue...", "done": False}, + } + ) + + try: + payload = { + "title": title, + "body": body, + } + if labels: + payload["labels"] = labels + if assignee: + payload["assignee"] = assignee + + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.post( + self._api_url(f"/repos/{owner}/{repo_name}/issues"), + headers=self._headers(__user__), + json=payload, + ) + response.raise_for_status() + issue = response.json() + + issue_number = issue.get("number") + issue_url = issue.get("html_url", "") + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + output = f"**Issue Created Successfully**\n\n" + output += f"**Issue #{issue_number}:** {title}\n" + output += f"**URL:** {issue_url}\n" + if labels: + output += f"**Labels:** {', '.join(labels)}\n" + if assignee: + output += f"**Assigned to:** @{assignee}\n" + + return output.strip() + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"issue creation") + return f"Error: Failed to create issue. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during issue creation: {type(e).__name__}: {e}" + + async def list_pull_requests( + self, + state: str = "open", + repo: Optional[str] = None, + page: int = 1, + limit: Optional[int] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + List pull requests in a repository with pagination. + + :param state: PR state - 'open', 'closed', or 'all' (default: 'open') + :param repo: Repository in 'owner/repo' format + :param page: Page number for pagination (default: 1) + :param limit: Results per page (max 50, default: 50) + :return: Formatted PR listing with merge status and branch information + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + page_size = self._get_page_size(limit) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Fetching pull requests...", "done": False}, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/pulls"), + headers=self._headers(__user__), + params={"state": state, "page": page, "limit": page_size}, + ) + response.raise_for_status() + prs = response.json() + + total_count = response.headers.get("x-total-count", "?") + + output = f"**Pull Requests in {owner}/{repo_name}** (state: `{state}`, page: {page})\n\n" + + if not prs: + output += "_No pull requests found._" + else: + for pr in prs: + number = pr.get("number") + title = pr.get("title", "")[:80] + user = pr.get("user", {}).get("login", "unknown") + head = pr.get("head", {}).get("ref", "") + base = pr.get("base", {}).get("ref", "") + + # Merge status + mergeable = pr.get("mergeable", None) + merge_status = "" + if mergeable is True: + merge_status = " ✅ mergeable" + elif mergeable is False: + merge_status = " ⚠️ conflicts" + + output += f"### PR #{number}: {title}\n" + output += f"**Branch:** `{head}` → `{base}` (by @{user}){merge_status}\n\n" + + output += f"\n**Showing {len(prs)} PRs (Total: {total_count})**" + if len(prs) == page_size: + output += ( + f"\n_More results available. Use page={page + 1} to continue._" + ) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + return output.strip() + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, "pull request listing") + return f"Error: Failed to list pull requests. {error_msg}" + except Exception as e: + return ( + f"Error: Unexpected failure during PR listing: {type(e).__name__}: {e}" + ) async def create_pull_request( self, @@ -795,15 +1551,27 @@ class Tools: __user__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: - """Create a pull request.""" + """ + Create a new pull request. + + :param title: PR title + :param head_branch: Source branch (your changes) + :param base_branch: Target branch (defaults to repository default) + :param body: PR description + :param repo: Repository in 'owner/repo' format + :return: Confirmation with PR number and URL + """ token = self._get_token(__user__) if not token: - return "Error: GITEA_TOKEN not configured" + return "Error: GITEA_TOKEN not configured in UserValves settings." + try: owner, repo_name = self._resolve_repo(repo, __user__) except ValueError as e: return f"Error: {e}" + effective_base = base_branch or self._get_branch(None, __user__) + if __event_emitter__: await __event_emitter__( { @@ -811,6 +1579,7 @@ class Tools: "data": {"description": "Creating pull request...", "done": False}, } ) + try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL @@ -827,886 +1596,10 @@ class Tools: ) response.raise_for_status() pr = response.json() - pr_number = pr.get("number") - pr_url = pr.get("html_url", "") - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return f"Created PR #{pr_number}: {title}\n{head_branch} → {effective_base}\nURL: {pr_url}" - except httpx.HTTPStatusError as e: - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - async def update_pull_request( - self, - pr_number: int, - repo: Optional[str] = None, - title: Optional[str] = None, - body: Optional[str] = None, - base_branch: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """Update an existing pull request. - - :param pr_number: Pull request number - :param repo: Repository in owner/repo format - :param title: New title - :param body: New body/description - :param base_branch: New base branch - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Updating PR #{pr_number}...", - "done": False, - }, - } - ) - - payload = {} - if title is not None: - payload["title"] = title - if body is not None: - payload["body"] = body - if base_branch is not None: - payload["base"] = base_branch - - if not payload: - return "Error: No fields to update. Provide at least one of: title, body, base_branch" - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.patch( - self._api_url(f"/repos/{owner}/{repo_name}/pulls/{pr_number}"), - headers=self._headers(__user__), - json=payload, - ) - response.raise_for_status() - pr = response.json() - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - return f"Updated PR #{pr_number}: {pr.get('title', '')}\nURL: {pr.get('html_url', '')}" - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - return f"PR #{pr_number} not found" - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" + pr_number = pr.get("number") + pr_url = pr.get("html_url", "") - async def merge_pull_request( - self, - pr_number: int, - repo: Optional[str] = None, - merge_method: str = "merge", - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - __event_call__: Callable[[dict], Any] = None, - ) -> str: - """Merge a pull request. - - :param pr_number: Pull request number - :param repo: Repository in owner/repo format - :param merge_method: merge, rebase, or squash - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - valid_methods = ["merge", "rebase", "squash"] - if merge_method not in valid_methods: - return f"Error: Invalid merge_method '{merge_method}'. Use: {', '.join(valid_methods)}" - - if __event_call__: - result = await __event_call__( - { - "type": "confirmation", - "data": { - "title": "Merge Pull Request?", - "message": f"Merge PR #{pr_number} in {owner}/{repo_name} using {merge_method} method?", - }, - } - ) - if result is None or result is False: - return "merge_pull_request CANCELLED" - if isinstance(result, dict) and not result.get("confirmed"): - return "merge_pull_request CANCELLED" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Merging PR #{pr_number}...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.post( - self._api_url(f"/repos/{owner}/{repo_name}/pulls/{pr_number}/merge"), - headers=self._headers(__user__), - json={"Do": merge_method}, - ) - response.raise_for_status() - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - return f"Successfully merged PR #{pr_number} using {merge_method} method" - except httpx.HTTPStatusError as e: - if e.response.status_code == 405: - return f"PR #{pr_number} cannot be merged (may have conflicts or be already merged)" - if e.response.status_code == 404: - return f"PR #{pr_number} not found" - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def list_pull_requests( - self, - state: str = "open", - repo: Optional[str] = None, - page: int = 1, - limit: Optional[int] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """List pull requests.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - page_size = self._get_page_size(limit) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Fetching pull requests...", "done": False}, - } - ) - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/pulls"), - headers=self._headers(__user__), - params={"state": state, "page": page, "limit": page_size}, - ) - response.raise_for_status() - prs = response.json() - total_count = response.headers.get("x-total-count", "?") - output = ( - f"Pull Requests in {owner}/{repo_name} ({state}, page {page}):\n\n" - ) - if not prs: - output += "No pull requests found." - else: - for pr in prs: - number = pr.get("number") - title = pr.get("title", "")[:60] - user = pr.get("user", {}).get("login", "unknown") - head = pr.get("head", {}).get("ref", "") - base = pr.get("base", {}).get("ref", "") - mergeable = pr.get("mergeable", None) - merge_status = "" - if mergeable is True: - merge_status = " ✅" - elif mergeable is False: - merge_status = " ⚠️ conflicts" - output += f"- #{number}: {title}\n {head} → {base} (by {user}){merge_status}\n" - output += f"\nShowing {len(prs)} PRs (Total: {total_count})" - if len(prs) == page_size: - output += f"\nMore results may exist. Use page={page + 1} to continue." - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return output - except httpx.HTTPStatusError as e: - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def get_pull_request( - self, - pr_number: int, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """Get details of a specific pull request.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Fetching PR #{pr_number}...", - "done": False, - }, - } - ) - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/pulls/{pr_number}"), - headers=self._headers(__user__), - ) - response.raise_for_status() - pr = response.json() - output = f"Pull Request #{pr_number} in {owner}/{repo_name}\n\n" - output += f"**Title:** {pr.get('title', '')}\n" - output += f"**State:** {pr.get('state', 'unknown')}\n" - output += f"**Author:** {pr.get('user', {}).get('login', 'unknown')}\n" - output += f"**Branch:** {pr.get('head', {}).get('ref', '')} → {pr.get('base', {}).get('ref', '')}\n" - output += f"**Mergeable:** {pr.get('mergeable', 'unknown')}\n" - output += f"**URL:** {pr.get('html_url', '')}\n" - body = pr.get("body", "") - if body: - output += f"\n**Description:**\n{body[:500]}" - if len(body) > 500: - output += "...(truncated)" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return output - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - return f"PR #{pr_number} not found" - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - # ISSUES (WITH PAGINATION) - async def list_issues( - self, - state: str = "open", - repo: Optional[str] = None, - page: int = 1, - limit: Optional[int] = None, - labels: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """List issues in a repository.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - page_size = self._get_page_size(limit) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Fetching issues...", "done": False}, - } - ) - try: - params = { - "state": state, - "type": "issues", - "page": page, - "limit": page_size, - } - if labels: - params["labels"] = labels - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/issues"), - headers=self._headers(__user__), - params=params, - ) - response.raise_for_status() - issues = response.json() - total_count = response.headers.get("x-total-count", "?") - output = f"Issues in {owner}/{repo_name} ({state}, page {page}):\n\n" - if not issues: - output += "No issues found." - else: - for issue in issues: - number = issue.get("number") - title = issue.get("title", "")[:60] - user = issue.get("user", {}).get("login", "unknown") - issue_labels = ", ".join( - [label.get("name", "") for label in issue.get("labels", [])] - ) - labels_str = f" [{issue_labels}]" if issue_labels else "" - assignee = issue.get("assignee", {}) - assignee_str = ( - f" → {assignee.get('login', '')}" if assignee else "" - ) - output += f"- #{number}: {title}{labels_str} (by {user}){assignee_str}\n" - output += f"\nShowing {len(issues)} issues (Total: {total_count})" - if len(issues) == page_size: - output += f"\nMore results may exist. Use page={page + 1} to continue." - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return output - except httpx.HTTPStatusError as e: - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def create_issue( - self, - title: str, - body: str = "", - repo: Optional[str] = None, - labels: Optional[List[str]] = None, - assignee: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """Create a new issue.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Creating issue...", "done": False}, - } - ) - try: - payload = {"title": title, "body": body} - if labels: - payload["labels"] = labels - if assignee: - payload["assignee"] = assignee - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.post( - self._api_url(f"/repos/{owner}/{repo_name}/issues"), - headers=self._headers(__user__), - json=payload, - ) - response.raise_for_status() - issue = response.json() - issue_number = issue.get("number") - issue_url = issue.get("html_url", "") - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return f"Created issue #{issue_number}: {title}\nURL: {issue_url}" - except httpx.HTTPStatusError as e: - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def get_issue( - self, - issue_number: int, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """Get details of a specific issue.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Fetching issue #{issue_number}...", - "done": False, - }, - } - ) - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"), - headers=self._headers(__user__), - ) - response.raise_for_status() - issue = response.json() - output = f"Issue #{issue_number} in {owner}/{repo_name}\n\n" - output += f"**Title:** {issue.get('title', '')}\n" - output += f"**State:** {issue.get('state', 'unknown')}\n" - output += ( - f"**Author:** {issue.get('user', {}).get('login', 'unknown')}\n" - ) - labels = [label.get("name", "") for label in issue.get("labels", [])] - if labels: - output += f"**Labels:** {', '.join(labels)}\n" - assignee = issue.get("assignee") - if assignee: - output += f"**Assignee:** {assignee.get('login', '')}\n" - output += f"**URL:** {issue.get('html_url', '')}\n" - body = issue.get("body", "") - if body: - output += f"\n**Description:**\n{body[:1000]}" - if len(body) > 1000: - output += "...(truncated)" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return output - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - return f"Issue #{issue_number} not found" - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def update_issue( - self, - issue_number: int, - repo: Optional[str] = None, - title: Optional[str] = None, - body: Optional[str] = None, - state: Optional[str] = None, - assignee: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """Update an existing issue.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Updating issue #{issue_number}...", - "done": False, - }, - } - ) - # Build payload with only provided fields - payload = {} - if title is not None: - payload["title"] = title - if body is not None: - payload["body"] = body - if state is not None: - if state not in ("open", "closed"): - return f"Error: Invalid state '{state}'. Use 'open' or 'closed'." - payload["state"] = state - if assignee is not None: - payload["assignee"] = assignee - if not payload: - return "Error: No fields to update. Provide at least one of: title, body, state, assignee" - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.patch( - self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"), - headers=self._headers(__user__), - json=payload, - ) - response.raise_for_status() - issue = response.json() - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return f"Updated issue #{issue_number}: {issue.get('title', '')}\nState: {issue.get('state', 'unknown')}\nURL: {issue.get('html_url', '')}" - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - return f"Issue #{issue_number} not found" - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - # PROJECT CONTEXT - async def get_project_context( - self, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """Get project context: README, structure, recent commits.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - branch = self._get_branch(None, __user__) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Loading project context for {owner}/{repo_name}...", - "done": False, - }, - } - ) - output = f"# Project: {owner}/{repo_name}\n\n" - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - repo_response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}"), - headers=self._headers(__user__), - ) - if repo_response.status_code == 200: - repo_info = repo_response.json() - output += f"**Description:** {repo_info.get('description', 'No description')}\n" - output += f"**Default Branch:** {repo_info.get('default_branch', 'main')}\n" - output += ( - f"**Language:** {repo_info.get('language', 'Unknown')}\n\n" - ) - root_response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/contents/"), - headers=self._headers(__user__), - params={"ref": branch}, - ) - if root_response.status_code == 200: - contents = root_response.json() - output += "## Root Structure\n\n" - for item in sorted( - contents, - key=lambda x: (x.get("type") != "dir", x.get("name", "")), - ): - icon = "📁" if item.get("type") == "dir" else "📄" - output += f"{icon} {item.get('name', '')}\n" - output += "\n" - for readme_name in [ - "README.md", - "readme.md", - "README", - "README.txt", - ]: - readme_response = await client.get( - self._api_url( - f"/repos/{owner}/{repo_name}/contents/{readme_name}" - ), - headers=self._headers(__user__), - params={"ref": branch}, - ) - if readme_response.status_code == 200: - readme_info = readme_response.json() - content_b64 = readme_info.get("content", "") - try: - readme_content = base64.b64decode(content_b64).decode( - "utf-8" - ) - if len(readme_content) > 2000: - readme_content = ( - readme_content[:2000] + "\n\n... (truncated)" - ) - output += f"## README\n\n{readme_content}\n\n" - except: - pass - break - commits_response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/commits"), - headers=self._headers(__user__), - params={"sha": branch, "limit": 5}, - ) - if commits_response.status_code == 200: - commits = commits_response.json() - output += "## Recent Commits\n\n" - for commit in commits[:5]: - sha = commit.get("sha", "")[:8] - msg = ( - commit.get("commit", {}) - .get("message", "") - .split("\n")[0][:60] - ) - author = ( - commit.get("commit", {}) - .get("author", {}) - .get("name", "unknown") - ) - output += f"- `{sha}` {msg} ({author})\n" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return output - except Exception as e: - return f"Error loading project context: {type(e).__name__}: {e}" - - # COMMITS - async def list_commits( - self, - repo: Optional[str] = None, - branch: Optional[str] = None, - page: int = 1, - limit: Optional[int] = None, - path: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """List commits in a repository.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - effective_branch = self._get_branch(branch, __user__) - page_size = self._get_page_size(limit) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": f"Fetching commits...", "done": False}, - } - ) - try: - params = { - "sha": effective_branch, - "page": page, - "limit": page_size, - } - if path: - params["path"] = path - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/commits"), - headers=self._headers(__user__), - params=params, - ) - response.raise_for_status() - commits = response.json() - total_count = response.headers.get("x-total-count", "?") - output = f"Commits in {owner}/{repo_name} ({effective_branch}, page {page}):\n" - if path: - output += f"Filtered to: {path}\n" - output += "\n" - for commit in commits: - sha = commit.get("sha", "")[:8] - commit_data = commit.get("commit", {}) - msg = commit_data.get("message", "").split("\n")[0][:60] - author = commit_data.get("author", {}).get("name", "unknown") - date = commit_data.get("author", {}).get("date", "")[:10] - output += f"- `{sha}` {msg}\n by {author} on {date}\n" - output += f"\nShowing {len(commits)} commits (Total: {total_count})" - if len(commits) == page_size: - output += ( - f"\nMore results may exist. Use page={page + 1} to continue." - ) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return output - except httpx.HTTPStatusError as e: - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def get_commit( - self, - commit_sha: str, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """Get detailed information about a specific commit. - - :param commit_sha: Commit SHA (full or short) - :param repo: Repository in owner/repo format - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Fetching commit {commit_sha[:8]}...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/git/commits/{commit_sha}"), - headers=self._headers(__user__), - ) - response.raise_for_status() - commit = response.json() - - output = f"# Commit: {commit.get('sha', '')[:8]}\n\n" - output += f"**Full SHA:** {commit.get('sha', 'N/A')}\n" - output += f"**Message:** {commit.get('message', 'No message')}\n" - - author = commit.get('author', {}) - output += f"**Author:** {author.get('name', 'unknown')} <{author.get('email', '')}>\n" - output += f"**Date:** {author.get('date', 'N/A')}\n" - - committer = commit.get('committer', {}) - if committer.get('name') != author.get('name'): - output += f"**Committer:** {committer.get('name', 'unknown')} <{committer.get('email', '')}>\n" - - # Get file changes - response2 = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/commits/{commit_sha}"), - headers=self._headers(__user__), - ) - if response2.status_code == 200: - commit_detail = response2.json() - files = commit_detail.get('files', []) - if files: - output += f"\n**Files Changed:** {len(files)}\n" - for file in files[:20]: # Limit to first 20 files - status = file.get('status', '') - filename = file.get('filename', '') - additions = file.get('additions', 0) - deletions = file.get('deletions', 0) - output += f"- {status}: {filename} (+{additions}/-{deletions})\n" - if len(files) > 20: - output += f"... and {len(files) - 20} more files\n" - if __event_emitter__: await __event_emitter__( { @@ -1714,10 +1607,132 @@ class Tools: "data": {"description": "Done", "done": True, "hidden": True}, } ) - return output + + output = f"**Pull Request Created Successfully**\n\n" + output += f"**PR #{pr_number}:** {title}\n" + output += f"**Branch:** `{head_branch}` → `{effective_base}`\n" + output += f"**URL:** {pr_url}\n" + if body: + output += f"\n**Description:**\n{body[:100]}{'...' if len(body) > 100 else ''}\n" + + return output.strip() + except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - return f"Commit '{commit_sha}' not found" - return self._format_error(e) + error_msg = self._format_error(e, f"pull request creation") + return f"Error: Failed to create pull request. {error_msg}" except Exception as e: - return f"Error: {type(e).__name__}: {e}" \ No newline at end of file + return ( + f"Error: Unexpected failure during PR creation: {type(e).__name__}: {e}" + ) + + async def delete_file( + self, + path: str, + message: str, + repo: Optional[str] = None, + branch: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + __event_call__: Callable[[dict], Any] = None, + ) -> str: + """ + Delete a file from the repository (requires confirmation). + + :param path: File path to delete + :param message: Commit message for the deletion + :param repo: Repository in 'owner/repo' format + :param branch: Branch name (defaults to repository default) + :return: Confirmation with commit details + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + effective_branch = self._get_branch(branch, __user__) + + # Confirmation dialog + if __event_call__: + result = await __event_call__( + { + "type": "confirmation", + "data": { + "title": "Confirm File Deletion", + "message": f"Delete `{path}` from `{owner}/{repo_name}` ({effective_branch})?", + }, + } + ) + if result is None or result is False: + return "⚠️ File deletion cancelled by user." + if isinstance(result, dict) and not result.get("confirmed"): + return "⚠️ File deletion cancelled by user." + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": f"Deleting {path}...", "done": False}, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + # Get file SHA + get_response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + params={"ref": effective_branch}, + ) + + if get_response.status_code == 404: + return f"Error: File not found: `{path}`" + + get_response.raise_for_status() + file_info = get_response.json() + sha = file_info.get("sha") + + if not sha: + return "Error: Could not retrieve file SHA for deletion." + + # Delete file + response = await client.delete( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + json={ + "message": message, + "branch": effective_branch, + "sha": sha, + }, + ) + response.raise_for_status() + result = response.json() + + commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + output = f"**File Deleted Successfully**\n\n" + output += f"**File:** `{owner}/{repo_name}/{path}`\n" + output += f"**Branch:** `{effective_branch}`\n" + output += f"**Commit:** `{commit_sha}`\n" + output += f"**Message:** {message}\n" + + return output.strip() + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"file deletion for '{path}'") + return f"Error: Failed to delete file. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during file deletion: {type(e).__name__}: {e}"