From bfb25678d3672b6aefc7671c3978b005c1c3e2b5 Mon Sep 17 00:00:00 2001 From: xcaliber Date: Thu, 15 Jan 2026 13:53:04 +0000 Subject: [PATCH] fix(merge_pull_request): change merge_strategy to do parameter for Gitea API Gitea API expects {"do": "merge"} instead of {"merge_strategy": "merge"}. This was causing HTTP 422 errors when attempting to merge PRs. Refs: #5 --- gitea/dev.py | 3031 +------------------------------------------------- 1 file changed, 2 insertions(+), 3029 deletions(-) diff --git a/gitea/dev.py b/gitea/dev.py index 3140e51..bca4d03 100644 --- a/gitea/dev.py +++ b/gitea/dev.py @@ -1,3034 +1,7 @@ -""" -title: Gitea Dev - Native Mode Optimized -author: Jeff Smith + Claude + minimax + kimi-k2 -version: 1.4.0 -license: MIT -description: Interact with Gitea repositories - native tool calling optimized for high-tier LLMs with robust error handling -requirements: pydantic, httpx -changelog: - 1.4.0: - - Added CRUD operations for Issues (get, update, close, reopen, delete, comments) - - Added CRUD operations for Pull Requests (get, update, merge, comments) - 1.3.0: - - Native Mode optimization with explicit tool schemas - - Added mapping protocol support to Valves/UserValves (__iter__, keys, __getitem__) - - Enhanced __init__ for framework-driven configuration injection - - Added self.citation = True for tool usage visibility - - Robust error handling with detailed context in responses - - Structured context returns for better LLM understanding - - Added to_dict() helper methods on valve classes -""" - -from typing import Optional, Callable, Any, List, Dict -from pydantic import BaseModel, Field -import httpx -import base64 - - -class Tools: - class Valves(BaseModel): - """System-wide configuration for Gitea integration""" - - GITEA_URL: str = Field( - default="https://gitea.example.com", - description="Gitea server URL (ingress or internal service)", - ) - DEFAULT_REPO: str = Field( - default="", - description="Default repository in owner/repo format (e.g., 'myorg/myrepo')", - ) - DEFAULT_BRANCH: str = Field( - default="main", description="Default branch name for operations" - ) - DEFAULT_ORG: str = Field( - default="", description="Default organization for org-scoped operations" - ) - ALLOW_USER_OVERRIDES: bool = Field( - default=True, - description="Allow users to override defaults via UserValves", - ) - VERIFY_SSL: bool = Field( - default=True, - description="Verify SSL certificates (disable for self-signed certs)", - ) - DEFAULT_PAGE_SIZE: int = Field( - default=50, - description="Default page size for list operations (max 50)", - ge=1, - le=50, - ) - - class UserValves(BaseModel): - """Per-user configuration for personal credentials and overrides""" - - GITEA_TOKEN: str = Field( - default="", - description="Your Gitea API token (Settings > Applications > Generate Token)", - ) - USER_DEFAULT_REPO: str = Field( - default="", - description="Override default repository for this user", - ) - USER_DEFAULT_BRANCH: str = Field( - default="", - description="Override default branch for this user", - ) - USER_DEFAULT_ORG: str = Field( - default="", - description="Override default organization for this user", - ) - - def __init__(self): - """Initialize with optional valve configuration from framework""" - # Handle valves configuration from framework - self.valves = self.Valves() - - # Enable tool usage visibility for debugging - self.citation = True - - # Handle user valves configuration - self.user_valves = self.UserValves() - - def _api_url(self, endpoint: str) -> str: - """Construct full API URL for Gitea endpoint""" - base = self._get_url() - return f"{base}/api/v1{endpoint}" - - def _get_url(self) -> str: - """Get effective Gitea URL with trailing slash handling""" - # TODO: Allow USER OverRide - return self.valves.GITEA_URL.rstrip("/") - - def _get_token(self, __user__: dict = None) -> str: - """Extract Gitea token from user context with robust handling""" - if __user__ and "valves" in __user__: - user_valves = __user__.get("valves") if __user__ else None - return user_valves.GITEA_TOKEN - return "" - - def _headers(self, __user__: dict = None) -> dict: - """Generate authentication headers with token""" - token = self._get_token(__user__) - if not token: - return {"Content-Type": "application/json"} - return { - "Authorization": f"token {token}", - "Content-Type": "application/json", - } - - def _format_error(self, e, context: str = "") -> str: - """Format HTTP error with detailed context for LLM understanding""" - try: - error_json = e.response.json() - error_msg = error_json.get("message", e.response.text[:200]) - except Exception: - error_msg = e.response.text[:200] - - context_str = f" ({context})" if context else "" - return f"HTTP Error {e.response.status_code}{context_str}: {error_msg}" - - def _get_repo(self, repo: Optional[str], __user__: dict = None) -> str: - """Get effective repository with priority resolution""" - if repo: - return repo - if __user__ and "valves" in __user__: - user_valves = __user__.get("valves") if __user__ else None - if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_REPO: - return user_valves.USER_DEFAULT_REPO - return self.valves.DEFAULT_REPO - - def _get_branch(self, branch: Optional[str], __user__: dict = None) -> str: - """Get effective branch with priority resolution""" - if branch: - return branch - if __user__ and "valves" in __user__: - user_valves = __user__.get("valves") if __user__ else None - if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_BRANCH: - return user_valves.USER_DEFAULT_BRANCH - return self.valves.DEFAULT_BRANCH - - def _get_org(self, org: Optional[str], __user__: dict = None) -> str: - """Get effective org with priority.""" - if org: - return org - if __user__ and "valves" in __user__: - user_valves = __user__.get("valves") if __user__ else None - if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_ORG: - return user_valves.USER_DEFAULT_ORG - return self.valves.DEFAULT_ORG - - def _resolve_repo( - self, repo: Optional[str], __user__: dict = None - ) -> tuple[str, str]: - """Resolve repository string into owner and repo name with validation""" - effective_repo = self._get_repo(repo, __user__) - - if not effective_repo: - raise ValueError( - "No repository specified. Set DEFAULT_REPO in Valves or USER_DEFAULT_REPO in UserValves." - ) - - if "/" not in effective_repo: - raise ValueError( - f"Repository must be in 'owner/repo' format, got: {effective_repo}" - ) - - return effective_repo.split("/", 1) - - def _get_page_size(self, limit: Optional[int] = None) -> int: - """Calculate effective page size, capped at Gitea's max of 50""" - if limit is not None: - return min(limit, 50) - return min(self.valves.DEFAULT_PAGE_SIZE, 50) - - async def list_repos( - self, - page: int = 1, - limit: Optional[int] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - List repositories accessible to the authenticated user with pagination. - - :param page: Page number for pagination (default: 1) - :param limit: Number of results per page (max 50, default: 50) - :return: Formatted string listing repositories with metadata - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured. Add it in UserValves settings." - - page_size = self._get_page_size(limit) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Fetching repositories (page {page})...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url("/user/repos"), - headers=self._headers(__user__), - params={"page": page, "limit": page_size}, - ) - response.raise_for_status() - repos = response.json() - - total_count = response.headers.get("x-total-count", "?") - - output = f"**Accessible Repositories (Page {page}, {page_size}/page)**\n\n" - for repo in repos: - full_name = repo.get("full_name", "unknown") - desc = repo.get("description", "")[:50] or "No description" - private = "šŸ”’" if repo.get("private") else "🌐" - default_branch = repo.get("default_branch", "main") - stars = repo.get("stars_count", 0) - forks = repo.get("forks_count", 0) - - output += f"- {private} **{full_name}** ({default_branch})\n" - output += f" Stars: {stars} | Forks: {forks}\n" - output += f" {desc}\n\n" - - output += f"**Showing {len(repos)} repositories (Total: {total_count})**\n" - if len(repos) == page_size: - output += ( - f"_More results available. Use page={page + 1} to continue._\n" - ) - - effective_repo = self._get_repo(None, __user__) - if effective_repo: - output += f"\n**Active project:** {effective_repo}" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, "repository listing") - if e.response.status_code == 401: - return f"Error: Invalid or missing GITEA_TOKEN. Please check your token in UserValves settings.\n\nDetails: {error_msg}" - return f"Error: Failed to list repositories. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during repository listing: {type(e).__name__}: {e}" - - async def get_repo( - self, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Get detailed repository information and metadata. - - :param repo: Repository in 'owner/repo' format (e.g., 'open-webui/open-webui') - :return: Formatted repository details including stats, URLs, and configuration - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: Invalid repository format. {e}" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Fetching repository {owner}/{repo_name}...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}"), - headers=self._headers(__user__), - ) - response.raise_for_status() - repo_info = response.json() - - output = f"# Repository: {owner}/{repo_name}\n\n" - output += f"**Full Name:** `{repo_info.get('full_name', 'N/A')}`\n" - output += ( - f"**Description:** {repo_info.get('description', 'No description')}\n" - ) - output += ( - f"**Private:** {'Yes šŸ”’' if repo_info.get('private') else 'No 🌐'}\n" - ) - output += f"**Fork:** {'Yes' if repo_info.get('fork') else 'No'}\n" - output += f"**Stars:** ⭐ {repo_info.get('stars_count', 0)}\n" - output += f"**Forks:** šŸ“ {repo_info.get('forks_count', 0)}\n" - output += f"**Watchers:** šŸ‘ļø {repo_info.get('watchers_count', 0)}\n" - output += f"**Open Issues:** šŸ“‹ {repo_info.get('open_issues_count', 0)}\n" - output += f"**Size:** {repo_info.get('size', 0)} KB\n" - output += f"**Language:** {repo_info.get('language', 'N/A')}\n" - output += ( - f"**Default Branch:** `{repo_info.get('default_branch', 'main')}`\n" - ) - output += f"**Created:** {repo_info.get('created_at', 'N/A')[:10]}\n" - output += f"**Updated:** {repo_info.get('updated_at', 'N/A')[:10]}\n" - output += f"**HTML URL:** {repo_info.get('html_url', 'N/A')}\n" - output += f"**Clone URL:** `{repo_info.get('clone_url', 'N/A')}`\n" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - return output - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"repository {owner}/{repo_name}") - if e.response.status_code == 404: - return f"Error: Repository '{owner}/{repo_name}' not found. Please verify the repository name and your access permissions." - return f"Error: Failed to fetch repository details. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during repository fetch: {type(e).__name__}: {e}" - - async def list_files( - self, - path: str = "", - repo: Optional[str] = None, - branch: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - List files and directories in a repository path. - - :param path: Directory path to list (default: root) - :param repo: Repository in 'owner/repo' format - :param branch: Branch name (defaults to repository default) - :return: Formatted directory listing with file sizes and types - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - effective_branch = self._get_branch(branch, __user__) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Listing {path or 'root'}...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - params={"ref": effective_branch}, - ) - response.raise_for_status() - contents = response.json() - - if isinstance(contents, dict): - return f"**Note:** '{path}' is a file. Use `get_file()` to read its contents." - - output = f"**Contents of {owner}/{repo_name}/{path or '.'}** (`{effective_branch}` branch)\n\n" - - dirs = [item for item in contents if item.get("type") == "dir"] - files = [item for item in contents if item.get("type") == "file"] - - if dirs: - output += "**šŸ“ Directories:**\n" - for item in sorted(dirs, key=lambda x: x.get("name", "").lower()): - output += f"- `šŸ“ {item.get('name', '')}/`\n" - output += "\n" - - if files: - output += "**šŸ“„ Files:**\n" - for item in sorted(files, key=lambda x: x.get("name", "").lower()): - size = item.get("size", 0) - if size < 1024: - size_str = f"{size}B" - elif size < 1024 * 1024: - size_str = f"{size//1024}KB" - else: - size_str = f"{size//(1024*1024)}MB" - sha_short = item.get("sha", "unknown")[:8] - output += ( - f"- `{item.get('name', '')}` ({size_str}, sha: {sha_short})\n" - ) - - output += f"\n**Total:** {len(dirs)} directories, {len(files)} files" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - return output - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"directory listing for '{path}'") - if e.response.status_code == 404: - return f"Error: Path not found: `{path}`. Verify the path exists in the repository." - return f"Error: Failed to list directory contents. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during directory listing: {type(e).__name__}: {e}" - - async def get_file( - self, - path: str, - repo: Optional[str] = None, - branch: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Get the contents of a file from the repository. - - :param path: Full path to the file (e.g., 'src/main.py') - :param repo: Repository in 'owner/repo' format - :param branch: Branch name (defaults to repository default) - :return: File content with metadata (SHA, size, branch) - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - effective_branch = self._get_branch(branch, __user__) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": f"Reading file {path}...", "done": False}, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - params={"ref": effective_branch}, - ) - response.raise_for_status() - file_info = response.json() - - if isinstance(file_info, list): - return f"**Note:** '{path}' is a directory. Use `list_files()` to browse its contents." - - if file_info.get("type") != "file": - return f"Error: '{path}' is not a file (type: {file_info.get('type')})" - - content_b64 = file_info.get("content", "") - try: - content = base64.b64decode(content_b64).decode("utf-8") - except Exception: - return "Error: Could not decode file content. The file may be binary or corrupted." - - size = file_info.get("size", 0) - if size < 1024: - size_str = f"{size}B" - elif size < 1024 * 1024: - size_str = f"{size//1024}KB" - else: - size_str = f"{size//(1024*1024)}MB" - - sha_short = file_info.get("sha", "unknown")[:8] - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - output = f"**File:** `{owner}/{repo_name}/{path}`\n" - output += f"**Branch:** `{effective_branch}` | **SHA:** `{sha_short}` | **Size:** {size_str}\n" - output += f"**URL:** {file_info.get('html_url', 'N/A')}\n\n" - output += f"```\n{content}\n```" - - return output - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"file fetch for '{path}'") - if e.response.status_code == 404: - return ( - f"Error: File not found: `{path}`. Verify the file path and branch." - ) - return f"Error: Failed to fetch file. {error_msg}" - except Exception as e: - return ( - f"Error: Unexpected failure during file fetch: {type(e).__name__}: {e}" - ) - - async def update_file( - self, - path: str, - content: str, - message: str, - repo: Optional[str] = None, - branch: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Update an existing file in the repository (creates commit). - - :param path: File path to update - :param content: New file content as string - :param message: Commit message - :param repo: Repository in 'owner/repo' format - :param branch: Branch name (defaults to repository default) - :return: Commit details and success confirmation - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - effective_branch = self._get_branch(branch, __user__) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": f"Updating {path}...", "done": False}, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - # Get current file SHA - get_response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - params={"ref": effective_branch}, - ) - - if get_response.status_code == 404: - return f"Error: File not found: `{path}`. Use `create_file()` to create a new file." - - get_response.raise_for_status() - file_info = get_response.json() - sha = file_info.get("sha") - - if not sha: - return "Error: Could not retrieve file SHA for update. The file may be corrupted or inaccessible." - - # Prepare updated content - content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") - - # Update file - response = await client.put( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - json={ - "content": content_b64, - "message": message, - "branch": effective_branch, - "sha": sha, - }, - ) - response.raise_for_status() - result = response.json() - - commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - output = f"**File Updated Successfully**\n\n" - output += f"**File:** `{owner}/{repo_name}/{path}`\n" - output += f"**Branch:** `{effective_branch}`\n" - output += f"**Commit:** `{commit_sha}`\n" - output += f"**Message:** {message}\n" - - return output - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"file update for '{path}'") - if e.response.status_code == 409: - return f"Error: Update conflict for `{path}`. The file may have been modified by another process. Fetch the latest version and try again." - return f"Error: Failed to update file. {error_msg}" - except Exception as e: - return ( - f"Error: Unexpected failure during file update: {type(e).__name__}: {e}" - ) - - async def create_file( - self, - path: str, - content: str, - message: str, - repo: Optional[str] = None, - branch: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Create a new file in the repository. - - :param path: File path to create (e.g., 'docs/README.md') - :param content: Initial file content as string - :param message: Commit message - :param repo: Repository in 'owner/repo' format - :param branch: Branch name (defaults to repository default) - :return: Commit details and success confirmation - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - effective_branch = self._get_branch(branch, __user__) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": f"Creating {path}...", "done": False}, - } - ) - - try: - content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") - - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.post( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - json={ - "content": content_b64, - "message": message, - "branch": effective_branch, - }, - ) - response.raise_for_status() - result = response.json() - - commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - output = f"**File Created Successfully**\n\n" - output += f"**File:** `{owner}/{repo_name}/{path}`\n" - output += f"**Branch:** `{effective_branch}`\n" - output += f"**Commit:** `{commit_sha}`\n" - output += f"**Message:** {message}\n" - - return output - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"file creation for '{path}'") - if e.response.status_code == 422: - return f"Error: File already exists: `{path}`. Use `update_file()` to modify it instead." - return f"Error: Failed to create file. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during file creation: {type(e).__name__}: {e}" - - async def get_project_context( - self, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Get comprehensive project context including README, structure, and recent commits. - - :param repo: Repository in 'owner/repo' format - :return: Structured project overview with README, file structure, and commit history - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - branch = self._get_branch(None, __user__) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Loading project context for {owner}/{repo_name}...", - "done": False, - }, - } - ) - - output = f"# Project Context: {owner}/{repo_name}\n\n" - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - # Get repository metadata - repo_response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}"), - headers=self._headers(__user__), - ) - if repo_response.status_code == 200: - repo_info = repo_response.json() - output += f"**Description:** {repo_info.get('description', 'No description')}\n" - output += f"**Default Branch:** `{repo_info.get('default_branch', 'main')}`\n" - output += f"**Language:** {repo_info.get('language', 'Unknown')}\n" - output += f"**Stars:** ⭐ {repo_info.get('stars_count', 0)} | " - output += f"**Forks:** šŸ“ {repo_info.get('forks_count', 0)}\n\n" - - # Get root directory structure - root_response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/contents/"), - headers=self._headers(__user__), - params={"ref": branch}, - ) - if root_response.status_code == 200: - contents = root_response.json() - output += "## šŸ“ Repository Structure\n\n" - - dirs = [item for item in contents if item.get("type") == "dir"] - files = [item for item in contents if item.get("type") == "file"] - - if dirs: - output += "**Directories:**\n" - for item in sorted( - dirs, key=lambda x: x.get("name", "").lower() - ): - output += f"- `šŸ“ {item.get('name', '')}/`\n" - output += "\n" - - if files: - output += "**Files:**\n" - for item in sorted( - files, key=lambda x: x.get("name", "").lower() - ): - size = item.get("size", 0) - if size < 1024: - size_str = f"{size}B" - else: - size_str = f"{size//1024}KB" - output += f"- `šŸ“„ {item.get('name', '')}` ({size_str})\n" - output += "\n" - - # Try to find and include README - readme_found = False - for readme_name in ["README.md", "readme.md", "README", "README.txt"]: - readme_response = await client.get( - self._api_url( - f"/repos/{owner}/{repo_name}/contents/{readme_name}" - ), - headers=self._headers(__user__), - params={"ref": branch}, - ) - if readme_response.status_code == 200: - readme_info = readme_response.json() - content_b64 = readme_info.get("content", "") - try: - readme_content = base64.b64decode(content_b64).decode( - "utf-8" - ) - # Truncate very long READMEs - if len(readme_content) > 2000: - readme_content = ( - readme_content[:2000] + "\n\n... (truncated)" - ) - output += "## šŸ“„ README\n\n" - output += f"```markdown\n{readme_content}\n```\n\n" - readme_found = True - except Exception: - pass - break - - if not readme_found: - output += "## šŸ“„ README\n\n" - output += "*No README file found in repository root.*\n\n" - - # Get recent commits - commits_response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/commits"), - headers=self._headers(__user__), - params={"sha": branch, "limit": 5}, - ) - if commits_response.status_code == 200: - commits = commits_response.json() - output += "## šŸ“ Recent Commits\n\n" - for commit in commits: - sha = commit.get("sha", "")[:8] - commit_data = commit.get("commit", {}) - msg = commit_data.get("message", "").split("\n")[0][:60] - author = commit_data.get("author", {}).get("name", "unknown") - date = commit_data.get("author", {}).get("date", "")[:10] - output += f"- `{sha}` {msg}\n" - output += f" by {author} on {date}\n\n" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - return output.strip() - - except Exception as e: - return f"Error: Failed to load project context: {type(e).__name__}: {e}" - - async def list_branches( - self, - repo: Optional[str] = None, - page: int = 1, - limit: Optional[int] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - List branches in a repository with pagination. - - :param repo: Repository in 'owner/repo' format - :param page: Page number for pagination (default: 1) - :param limit: Results per page (max 50, default: 50) - :return: Formatted branch listing with protection status and commit SHAs - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - page_size = self._get_page_size(limit) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Fetching branches for {owner}/{repo_name} (page {page})...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/branches"), - headers=self._headers(__user__), - params={"page": page, "limit": page_size}, - ) - response.raise_for_status() - branches = response.json() - - total_count = response.headers.get("x-total-count", "?") - - output = f"**Branches in {owner}/{repo_name} (Page {page})**\n\n" - for branch in branches: - name = branch.get("name", "") - protected = "šŸ›”ļø" if branch.get("protected") else "" - commit_sha = branch.get("commit", {}).get("id", "")[:8] - output += f"- `{name}` {protected} [commit: {commit_sha}]\n" - - output += f"\n**Showing {len(branches)} branches (Total: {total_count})**" - if len(branches) == page_size: - output += ( - f"\n_More results available. Use page={page + 1} to continue._" - ) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, "branch listing") - return f"Error: Failed to list branches. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during branch listing: {type(e).__name__}: {e}" - - async def create_branch( - self, - branch_name: str, - from_branch: Optional[str] = None, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Create a new branch from an existing branch. - - :param branch_name: Name for the new branch - :param from_branch: Source branch (defaults to repository default branch) - :param repo: Repository in 'owner/repo' format - :return: Success confirmation with branch details - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - effective_from = from_branch or self._get_branch(None, __user__) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Creating branch {branch_name}...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.post( - self._api_url(f"/repos/{owner}/{repo_name}/branches"), - headers=self._headers(__user__), - json={ - "new_branch_name": branch_name, - "old_branch_name": effective_from, - }, - ) - response.raise_for_status() - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - return f"āœ… Created branch `{branch_name}` from `{effective_from}` in `{owner}/{repo_name}`" - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"branch creation") - if e.response.status_code == 409: - return f"Error: Branch `{branch_name}` already exists in repository." - return f"Error: Failed to create branch. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during branch creation: {type(e).__name__}: {e}" - - async def list_commits( - self, - repo: Optional[str] = None, - branch: Optional[str] = None, - page: int = 1, - limit: Optional[int] = None, - path: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - List commits in a repository with filtering and pagination. - - :param repo: Repository in 'owner/repo' format - :param branch: Branch name (defaults to repository default) - :param page: Page number for pagination (default: 1) - :param limit: Results per page (max 50, default: 50) - :param path: Filter commits to specific file/directory path - :return: Formatted commit history with author, date, and message - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - effective_branch = self._get_branch(branch, __user__) - page_size = self._get_page_size(limit) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Fetching commits...", "done": False}, - } - ) - - try: - params = { - "sha": effective_branch, - "page": page, - "limit": page_size, - } - if path: - params["path"] = path - - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/commits"), - headers=self._headers(__user__), - params=params, - ) - response.raise_for_status() - commits = response.json() - - total_count = response.headers.get("x-total-count", "?") - - output = f"**Commits in {owner}/{repo_name}** (`{effective_branch}`" - if path: - output += f", filtered to: `{path}`" - output += f", page {page})\n\n" - - for commit in commits: - sha = commit.get("sha", "")[:8] - commit_data = commit.get("commit", {}) - msg = commit_data.get("message", "").split("\n")[0][:60] - author = commit_data.get("author", {}).get("name", "unknown") - date = commit_data.get("author", {}).get("date", "")[:10] - output += f"- `{sha}` {msg}\n" - output += f" by {author} on {date}\n\n" - - output += f"**Showing {len(commits)} commits (Total: {total_count})**" - if len(commits) == page_size: - output += ( - f"\n_More results available. Use page={page + 1} to continue._" - ) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, "commit listing") - return f"Error: Failed to list commits. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during commit listing: {type(e).__name__}: {e}" - - async def get_commit( - self, - commit_sha: str, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Get detailed information about a specific commit. - - :param commit_sha: Full or short commit SHA - :param repo: Repository in 'owner/repo' format - :return: Comprehensive commit details including files changed and diff stats - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Fetching commit {commit_sha[:8]}...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - # Get commit details (this endpoint may vary by Gitea version) - response = await client.get( - self._api_url( - f"/repos/{owner}/{repo_name}/git/commits/{commit_sha}" - ), - headers=self._headers(__user__), - ) - - if response.status_code == 404: - # Try alternative endpoint - response = await client.get( - self._api_url( - f"/repos/{owner}/{repo_name}/commits/{commit_sha}" - ), - headers=self._headers(__user__), - ) - - response.raise_for_status() - commit = response.json() - - output = f"# Commit: {commit.get('sha', '')[:8]}\n\n" - - # Basic commit info - output += f"**Full SHA:** `{commit.get('sha', 'N/A')}`\n" - output += f"**Message:** {commit.get('message', 'No message')}\n\n" - - # Author info - author = commit.get("author", {}) - if author: - output += f"**Author:** {author.get('name', 'unknown')} <{author.get('email', '')}>\n" - output += f"**Date:** {author.get('date', 'N/A')}\n\n" - - # Committer info (if different from author) - committer = commit.get("committer", {}) - if committer and committer.get("name") != author.get("name"): - output += f"**Committer:** {committer.get('name', 'unknown')} <{committer.get('email', '')}>\n\n" - - # Files changed (if available) - if "files" in commit: - files = commit.get("files", []) - output += f"**Files Changed:** {len(files)}\n\n" - for file in files[:20]: # Limit to first 20 - status = file.get("status", "") - filename = file.get("filename", "") - additions = file.get("additions", 0) - deletions = file.get("deletions", 0) - changes = file.get("changes", 0) - output += f"- **{status}:** `{filename}`" - if changes > 0: - output += f" (+{additions}/-{deletions})\n" - else: - output += "\n" - if len(files) > 20: - output += f"\n... and {len(files) - 20} more files\n" - - elif "stats" in commit: - # Alternative stats format - stats = commit.get("stats", {}) - output += f"**Changes:** +{stats.get('additions', 0)} additions, -{stats.get('deletions', 0)} deletions\n\n" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"commit fetch for '{commit_sha}'") - if e.response.status_code == 404: - return f"Error: Commit '{commit_sha}' not found in repository." - return f"Error: Failed to fetch commit details. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during commit fetch: {type(e).__name__}: {e}" - - async def list_issues( - self, - state: str = "open", - repo: Optional[str] = None, - page: int = 1, - limit: Optional[int] = None, - labels: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - List issues in a repository with filtering and pagination. - - :param state: Issue state - 'open', 'closed', or 'all' (default: 'open') - :param repo: Repository in 'owner/repo' format - :param page: Page number for pagination (default: 1) - :param limit: Results per page (max 50, default: 50) - :param labels: Comma-separated label names to filter by - :return: Formatted issue listing with metadata and assignees - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - page_size = self._get_page_size(limit) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Fetching issues...", "done": False}, - } - ) - - try: - params = { - "state": state, - "type": "issues", # Filter out PRs - "page": page, - "limit": page_size, - } - if labels: - params["labels"] = labels - - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/issues"), - headers=self._headers(__user__), - params=params, - ) - response.raise_for_status() - issues = response.json() - - total_count = response.headers.get("x-total-count", "?") - - output = f"**Issues in {owner}/{repo_name}** (state: `{state}`, page: {page})\n\n" - - if not issues: - output += "_No issues found._" - else: - for issue in issues: - number = issue.get("number") - title = issue.get("title", "")[:80] - user = issue.get("user", {}).get("login", "unknown") - - # Labels - issue_labels = [ - label.get("name", "") for label in issue.get("labels", []) - ] - labels_str = ( - f" [{' , '.join(issue_labels)}]" if issue_labels else "" - ) - - # Assignee - assignee = issue.get("assignee") - assignee_str = ( - f" → @{assignee.get('login', '')}" if assignee else "" - ) - - output += f"- **#{number}:** {title}{labels_str} (by @{user}){assignee_str}\n\n" - - output += f"\n**Showing {len(issues)} issues (Total: {total_count})**" - if len(issues) == page_size: - output += ( - f"\n_More results available. Use page={page + 1} to continue._" - ) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, "issue listing") - return f"Error: Failed to list issues. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during issue listing: {type(e).__name__}: {e}" - - async def get_issue( - self, - issue_number: int, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Get detailed information about a specific issue. - - :param issue_number: Issue number to retrieve - :param repo: Repository in 'owner/repo' format - :return: Comprehensive issue details including title, body, labels, and comments count - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Fetching issue #{issue_number}...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"), - headers=self._headers(__user__), - ) - response.raise_for_status() - issue = response.json() - - # Extract issue data - title = issue.get("title", "No title") - body = issue.get("body", "") - state = issue.get("state", "unknown") - user = issue.get("user", {}).get("login", "unknown") - created_at = issue.get("created_at", "")[:10] - updated_at = issue.get("updated_at", "")[:10] - comments_count = issue.get("comments", 0) - html_url = issue.get("html_url", "") - - # Labels - issue_labels = [label.get("name", "") for label in issue.get("labels", [])] - labels_str = ", ".join(issue_labels) if issue_labels else "None" - - # Assignee - assignee = issue.get("assignee") - assignee_str = ( - f"@{assignee.get('login', 'unknown')}" if assignee else "Unassigned" - ) - - # Build output - output = f"# Issue #{issue_number}: {title}\n\n" - output += f"**State:** {state.upper()}\n" - output += f"**Author:** @{user}\n" - output += f"**Assignee:** {assignee_str}\n" - output += f"**Labels:** {labels_str}\n" - output += f"**Created:** {created_at}\n" - output += f"**Updated:** {updated_at}\n" - output += f"**Comments:** {comments_count}\n" - output += f"**URL:** {html_url}\n\n" - - if body: - output += f"## Description\n\n{body}\n\n" - else: - output += "_No description provided._\n\n" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"issue #{issue_number}") - if e.response.status_code == 404: - return f"Error: Issue #{issue_number} not found in {owner}/{repo_name}." - return f"Error: Failed to fetch issue. {error_msg}" - except Exception as e: - return ( - f"Error: Unexpected failure during issue fetch: {type(e).__name__}: {e}" - ) - - async def update_issue( - self, - issue_number: int, - repo: Optional[str] = None, - title: Optional[str] = None, - body: Optional[str] = None, - state: Optional[str] = None, - assignee: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Update an existing issue with new title, body, state, or assignee. - - :param issue_number: Issue number to update - :param repo: Repository in 'owner/repo' format - :param title: New title (optional) - :param body: New description body (optional) - :param state: New state - "open" or "closed" (optional) - :param assignee: Username to assign (optional, use empty string to unassign) - :return: Confirmation with updated issue details - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - # Validate state if provided - if state and state not in ("open", "closed"): - return "Error: State must be 'open' or 'closed'." - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Updating issue #{issue_number}...", - "done": False, - }, - } - ) - - try: - payload = {} - if title is not None: - payload["title"] = title - if body is not None: - payload["body"] = body - if state is not None: - payload["state"] = state - if assignee is not None: - payload["assignee"] = assignee if assignee else "" - - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.patch( - self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"), - headers=self._headers(__user__), - json=payload, - ) - response.raise_for_status() - issue = response.json() - - updated_title = issue.get("title", "No title") - updated_state = issue.get("state", "unknown") - html_url = issue.get("html_url", "") - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - output = f"**Issue #{issue_number} Updated Successfully**\n\n" - output += f"**Title:** {updated_title}\n" - output += f"**State:** {updated_state.upper()}\n" - output += f"**URL:** {html_url}\n" - output += f"\nChanges applied: {', '.join(payload.keys())}\n" - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"issue #{issue_number} update") - return f"Error: Failed to update issue. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during issue update: {type(e).__name__}: {e}" - - async def close_issue( - self, - issue_number: int, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Close an open issue. - - :param issue_number: Issue number to close - :param repo: Repository in 'owner/repo' format - :return: Confirmation of issue closure - """ - return await self.update_issue( - issue_number, - repo, - state="closed", - __user__=__user__, - __event_emitter__=__event_emitter__, - ) - - async def reopen_issue( - self, - issue_number: int, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Reopen a closed issue. - - :param issue_number: Issue number to reopen - :param repo: Repository in 'owner/repo' format - :return: Confirmation of issue reopening - """ - return await self.update_issue( - issue_number, - repo, - state="open", - __user__=__user__, - __event_emitter__=__event_emitter__, - ) - - async def delete_issue( - self, - issue_number: int, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - __event_call__: Callable[[dict], Any] = None, - ) -> str: - """ - Delete an issue from the repository. - - :param issue_number: Issue number to delete - :param repo: Repository in 'owner/repo' format - :return: Confirmation of issue deletion - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - # Confirmation dialog - if __event_call__: - result = await __event_call__( - { - "type": "confirmation", - "data": { - "title": "Confirm Issue Deletion", - "message": f"Delete issue #{issue_number} from `{owner}/{repo_name}`?", - }, - } - ) - if result is None or result is False: - return "āš ļø Issue deletion cancelled by user." - if isinstance(result, dict) and not result.get("confirmed"): - return "āš ļø Issue deletion cancelled by user." - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Deleting issue #{issue_number}...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.delete( - self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"), - headers=self._headers(__user__), - ) - response.raise_for_status() - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - return f"**Issue #{issue_number} Deleted Successfully** from `{owner}/{repo_name}`" - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"issue #{issue_number} deletion") - if e.response.status_code == 404: - return f"Error: Issue #{issue_number} not found in {owner}/{repo_name}." - return f"Error: Failed to delete issue. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during issue deletion: {type(e).__name__}: {e}" - - async def list_issue_comments( - self, - issue_number: int, - repo: Optional[str] = None, - page: int = 1, - limit: Optional[int] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - List all comments on an issue. - - :param issue_number: Issue number to get comments for - :param repo: Repository in 'owner/repo' format - :param page: Page number for pagination (default: 1) - :param limit: Results per page (max 50, default: 50) - :return: Formatted comment listing with author and date - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - page_size = self._get_page_size(limit) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Fetching comments...", "done": False}, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url( - f"/repos/{owner}/{repo_name}/issues/{issue_number}/comments" - ), - headers=self._headers(__user__), - params={"page": page, "limit": page_size}, - ) - response.raise_for_status() - comments = response.json() - - total_count = response.headers.get("x-total-count", "?") - - output = f"**Comments on Issue #{issue_number}** ({owner}/{repo_name})\n\n" - - if not comments: - output += "_No comments yet._" - else: - for comment in comments: - comment_id = comment.get("id", 0) - user = comment.get("user", {}).get("login", "unknown") - created_at = comment.get("created_at", "")[:10] - body = comment.get("body", "No content") - - output += ( - f"**Comment by @{user}** (ID: {comment_id}, {created_at})\n" - ) - output += f"{body}\n\n" - output += "---\n\n" - - output += f"\n**Showing {len(comments)} comments (Total: {total_count})**" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"issue #{issue_number} comments") - if e.response.status_code == 404: - return f"Error: Issue #{issue_number} not found in {owner}/{repo_name}." - return f"Error: Failed to list issue comments. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during comment listing: {type(e).__name__}: {e}" - - async def create_issue_comment( - self, - issue_number: int, - body: str, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Add a comment to an issue. - - :param issue_number: Issue number to comment on - :param body: Comment content - :param repo: Repository in 'owner/repo' format - :return: Confirmation with comment details - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - if not body or not body.strip(): - return "Error: Comment body cannot be empty." - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Adding comment...", "done": False}, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.post( - self._api_url( - f"/repos/{owner}/{repo_name}/issues/{issue_number}/comments" - ), - headers=self._headers(__user__), - json={"body": body}, - ) - response.raise_for_status() - comment = response.json() - - comment_id = comment.get("id", 0) - html_url = comment.get("html_url", "") - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - output = f"**Comment Added Successfully**\n\n" - output += f"**Issue:** #{issue_number}\n" - output += f"**Comment ID:** {comment_id}\n" - output += f"**URL:** {html_url}\n\n" - output += f"**Content:**\n{body}\n" - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"comment on issue #{issue_number}") - if e.response.status_code == 404: - return f"Error: Issue #{issue_number} not found in {owner}/{repo_name}." - return f"Error: Failed to create comment. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during comment creation: {type(e).__name__}: {e}" - - async def update_issue_comment( - self, - comment_id: int, - body: str, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Update an existing comment on an issue. - - :param comment_id: Comment ID to update - :param body: New comment content - :param repo: Repository in 'owner/repo' format - :return: Confirmation with updated comment details - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - if not body or not body.strip(): - return "Error: Comment body cannot be empty." - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Updating comment...", "done": False}, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.patch( - self._api_url( - f"/repos/{owner}/{repo_name}/issues/comments/{comment_id}" - ), - headers=self._headers(__user__), - json={"body": body}, - ) - response.raise_for_status() - comment = response.json() - - updated_body = comment.get("body", "No content") - updated_at = comment.get("updated_at", "")[:10] - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - output = f"**Comment {comment_id} Updated Successfully**\n\n" - output += f"**Repository:** {owner}/{repo_name}\n" - output += f"**Updated:** {updated_at}\n\n" - output += f"**New Content:**\n{updated_body}\n" - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"comment #{comment_id}") - if e.response.status_code == 404: - return f"Error: Comment #{comment_id} not found in {owner}/{repo_name}." - return f"Error: Failed to update comment. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during comment update: {type(e).__name__}: {e}" - - async def delete_issue_comment( - self, - comment_id: int, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - __event_call__: Callable[[dict], Any] = None, - ) -> str: - """ - Delete a comment from an issue. - - :param comment_id: Comment ID to delete - :param repo: Repository in 'owner/repo' format - :return: Confirmation of comment deletion - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - # Confirmation dialog - if __event_call__: - result = await __event_call__( - { - "type": "confirmation", - "data": { - "title": "Confirm Comment Deletion", - "message": f"Delete comment #{comment_id} from `{owner}/{repo_name}`?", - }, - } - ) - if result is None or result is False: - return "āš ļø Comment deletion cancelled by user." - if isinstance(result, dict) and not result.get("confirmed"): - return "āš ļø Comment deletion cancelled by user." - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Deleting comment #{comment_id}...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.delete( - self._api_url( - f"/repos/{owner}/{repo_name}/issues/comments/{comment_id}" - ), - headers=self._headers(__user__), - ) - response.raise_for_status() - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - return f"**Comment #{comment_id} Deleted Successfully** from `{owner}/{repo_name}`" - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"comment #{comment_id} deletion") - if e.response.status_code == 404: - return f"Error: Comment #{comment_id} not found in {owner}/{repo_name}." - return f"Error: Failed to delete comment. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during comment deletion: {type(e).__name__}: {e}" - - async def create_issue( - self, - title: str, - body: str = "", - repo: Optional[str] = None, - labels: Optional[List[str]] = None, - assignee: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Create a new issue in the repository. - - :param title: Issue title - :param body: Issue description/body - :param repo: Repository in 'owner/repo' format - :param labels: List of label names to apply - :param assignee: Username to assign the issue to - :return: Confirmation with issue number and URL - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Creating issue...", "done": False}, - } - ) - - try: - payload = { - "title": title, - "body": body, - } - if labels: - payload["labels"] = labels - if assignee: - payload["assignee"] = assignee - - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.post( - self._api_url(f"/repos/{owner}/{repo_name}/issues"), - headers=self._headers(__user__), - json=payload, - ) - response.raise_for_status() - issue = response.json() - - issue_number = issue.get("number") - issue_url = issue.get("html_url", "") - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - output = f"**Issue Created Successfully**\n\n" - output += f"**Issue #{issue_number}:** {title}\n" - output += f"**URL:** {issue_url}\n" - if labels: - output += f"**Labels:** {', '.join(labels)}\n" - if assignee: - output += f"**Assigned to:** @{assignee}\n" - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"issue creation") - return f"Error: Failed to create issue. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during issue creation: {type(e).__name__}: {e}" - - async def list_pull_requests( - self, - state: str = "open", - repo: Optional[str] = None, - page: int = 1, - limit: Optional[int] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - List pull requests in a repository with pagination. - - :param state: PR state - 'open', 'closed', or 'all' (default: 'open') - :param repo: Repository in 'owner/repo' format - :param page: Page number for pagination (default: 1) - :param limit: Results per page (max 50, default: 50) - :return: Formatted PR listing with merge status and branch information - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - page_size = self._get_page_size(limit) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Fetching pull requests...", "done": False}, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/pulls"), - headers=self._headers(__user__), - params={"state": state, "page": page, "limit": page_size}, - ) - response.raise_for_status() - prs = response.json() - - total_count = response.headers.get("x-total-count", "?") - - output = f"**Pull Requests in {owner}/{repo_name}** (state: `{state}`, page: {page})\n\n" - - if not prs: - output += "_No pull requests found._" - else: - for pr in prs: - number = pr.get("number") - title = pr.get("title", "")[:80] - user = pr.get("user", {}).get("login", "unknown") - head = pr.get("head", {}).get("ref", "") - base = pr.get("base", {}).get("ref", "") - - # Merge status - mergeable = pr.get("mergeable", None) - merge_status = "" - if mergeable is True: - merge_status = " āœ… mergeable" - elif mergeable is False: - merge_status = " āš ļø conflicts" - - output += f"### PR #{number}: {title}\n" - output += f"**Branch:** `{head}` → `{base}` (by @{user}){merge_status}\n\n" - - output += f"\n**Showing {len(prs)} PRs (Total: {total_count})**" - if len(prs) == page_size: - output += ( - f"\n_More results available. Use page={page + 1} to continue._" - ) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, "pull request listing") - return f"Error: Failed to list pull requests. {error_msg}" - except Exception as e: - return ( - f"Error: Unexpected failure during PR listing: {type(e).__name__}: {e}" - ) - - async def get_pull_request( - self, - pr_number: int, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Get detailed information about a specific pull request. - - :param pr_number: Pull request number to retrieve - :param repo: Repository in 'owner/repo' format - :return: Comprehensive PR details including title, body, branches, and merge status - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Fetching PR #{pr_number}...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/pulls/{pr_number}"), - headers=self._headers(__user__), - ) - response.raise_for_status() - pr = response.json() - - # Extract PR data - title = pr.get("title", "No title") - body = pr.get("body", "") - state = pr.get("state", "unknown") - user = pr.get("user", {}).get("login", "unknown") - created_at = pr.get("created_at", "")[:10] - updated_at = pr.get("updated_at", "")[:10] - head_ref = pr.get("head", {}).get("ref", "") - base_ref = pr.get("base", {}).get("ref", "") - mergeable = pr.get("mergeable", None) - merged = pr.get("merged", False) - html_url = pr.get("html_url", "") - - # Build output - output = f"# PR #{pr_number}: {title}\n\n" - output += f"**State:** {state.upper()}{' (MERGED)' if merged else ''}\n" - output += f"**Author:** @{user}\n" - output += f"**Branches:** `{head_ref}` → `{base_ref}`\n" - output += f"**Created:** {created_at}\n" - output += f"**Updated:** {updated_at}\n" - output += f"**URL:** {html_url}\n" - - # Merge status - if mergeable is True: - output += "**Merge Status:** āœ… Mergeable\n" - elif mergeable is False: - output += "**Merge Status:** āš ļø Has conflicts\n" - else: - output += "**Merge Status:** Unknown\n" - - output += "\n" - if body: - output += f"## Description\n\n{body}\n\n" - else: - output += "_No description provided._\n\n" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"PR #{pr_number}") - if e.response.status_code == 404: - return f"Error: PR #{pr_number} not found in {owner}/{repo_name}." - return f"Error: Failed to fetch pull request. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during PR fetch: {type(e).__name__}: {e}" - - async def update_pull_request( - self, - pr_number: int, - repo: Optional[str] = None, - title: Optional[str] = None, - body: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Update an existing pull request with new title or body. - - :param pr_number: Pull request number to update - :param repo: Repository in 'owner/repo' format - :param title: New title (optional) - :param body: New description body (optional) - :return: Confirmation with updated PR details - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Updating PR #{pr_number}...", - "done": False, - }, - } - ) - - try: - payload = {} - if title is not None: - payload["title"] = title - if body is not None: - payload["body"] = body - - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.patch( - self._api_url(f"/repos/{owner}/{repo_name}/pulls/{pr_number}"), - headers=self._headers(__user__), - json=payload, - ) - response.raise_for_status() - pr = response.json() - - updated_title = pr.get("title", "No title") - html_url = pr.get("html_url", "") - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - output = f"**PR #{pr_number} Updated Successfully**\n\n" - output += f"**Title:** {updated_title}\n" - output += f"**URL:** {html_url}\n" - if payload: - output += f"\nChanges applied: {', '.join(payload.keys())}\n" - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"PR #{pr_number} update") - return f"Error: Failed to update pull request. {error_msg}" - except Exception as e: - return ( - f"Error: Unexpected failure during PR update: {type(e).__name__}: {e}" - ) - - async def merge_pull_request( - self, - pr_number: int, - repo: Optional[str] = None, - merge_strategy: str = "merge", - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Merge a pull request using the specified strategy. - - :param pr_number: Pull request number to merge - :param repo: Repository in 'owner/repo' format - :param merge_strategy: Merge strategy - "merge", "rebase", or "squash" (default: "merge") - :return: Confirmation of merge or error details - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - # Validate merge strategy - if merge_strategy not in ("merge", "rebase", "squash"): - return "Error: Merge strategy must be 'merge', 'rebase', or 'squash'." - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Merging PR #{pr_number}...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=60.0, verify=self.valves.VERIFY_SSL - ) as client: response = await client.post( self._api_url( f"/repos/{owner}/{repo_name}/pulls/{pr_number}/merge" ), headers=self._headers(__user__), - json={"merge_strategy": merge_strategy}, - ) - - if response.status_code == 405: - return "Error: PR cannot be merged. Check if it's already merged or has conflicts." - - response.raise_for_status() - result = response.json() if response.text else {} - - merged = result.get("merged", True) - commit_sha = result.get("merge_commit", {}).get("sha", "")[:8] - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - if merged: - output = f"**PR #{pr_number} Merged Successfully**\n\n" - output += f"**Strategy:** {merge_strategy.upper()}\n" - if commit_sha: - output += f"**Merge Commit:** `{commit_sha}`\n" - output += ( - f"\nāœ… The pull request has been merged into {owner}/{repo_name}." - ) - return output - else: - return f"**PR #{pr_number} Merge Result:**\n\n{result}\n" - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"PR #{pr_number} merge") - if e.response.status_code == 405: - return f"Error: PR #{pr_number} cannot be merged. It may already be merged or have merge conflicts." - return f"Error: Failed to merge PR. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during PR merge: {type(e).__name__}: {e}" - - async def list_pull_request_comments( - self, - pr_number: int, - repo: Optional[str] = None, - page: int = 1, - limit: Optional[int] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - List all reviews/comments on a pull request. - - :param pr_number: PR number to get comments/reviews for - :param repo: Repository in 'owner/repo' format - :param page: Page number for pagination (default: 1) - :param limit: Results per page (max 50, default: 50) - :return: Formatted review/comment listing with author and content - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - page_size = self._get_page_size(limit) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Fetching PR reviews...", "done": False}, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - # Try reviews endpoint first (Gitea uses /pulls/{index}/reviews) - response = await client.get( - self._api_url( - f"/repos/{owner}/{repo_name}/pulls/{pr_number}/reviews" - ), - headers=self._headers(__user__), - params={"page": page, "limit": page_size}, - ) - - # Fallback to issue comments if reviews endpoint not available - if response.status_code == 404: - response = await client.get( - self._api_url( - f"/repos/{owner}/{repo_name}/issues/{pr_number}/comments" - ), - headers=self._headers(__user__), - params={"page": page, "limit": page_size}, - ) - - response.raise_for_status() - comments = response.json() - - total_count = response.headers.get("x-total-count", "?") - - output = ( - f"**Reviews/Comments on PR #{pr_number}** ({owner}/{repo_name})\n\n" - ) - - if not comments: - output += "_No reviews or comments yet._" - else: - for comment in comments: - comment_id = comment.get("id", 0) - user = comment.get("user", {}).get("login", "unknown") - created_at = comment.get("created_at", "")[:10] - body = comment.get("body", "No content") - - # Review-specific fields - state = comment.get("state", "") - if state: - output += f"**Review by @{user}** - {state} ({created_at})\n" - else: - output += f"**Comment by @{user}** ({created_at})\n" - - output += f"{body}\n\n" - output += "---\n\n" - - output += f"\n**Showing {len(comments)} items (Total: {total_count})**" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"PR #{pr_number} reviews") - if e.response.status_code == 404: - return f"Error: PR #{pr_number} not found in {owner}/{repo_name}." - return f"Error: Failed to list PR reviews. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during review listing: {type(e).__name__}: {e}" - - async def create_pull_request_comment( - self, - pr_number: int, - body: str, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Add a review comment to a pull request. - - :param pr_number: PR number to comment on - :param body: Comment content - :param repo: Repository in 'owner/repo' format - :return: Confirmation with comment details - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - if not body or not body.strip(): - return "Error: Comment body cannot be empty." - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Adding review comment...", "done": False}, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.post( - self._api_url( - f"/repos/{owner}/{repo_name}/pulls/{pr_number}/reviews" - ), - headers=self._headers(__user__), - json={ - "body": body, - "event": "COMMENT", # Just add a comment, don't approve/reject - }, - ) - - # Fallback to issue comments if reviews endpoint not available - if response.status_code == 404: - response = await client.post( - self._api_url( - f"/repos/{owner}/{repo_name}/issues/{pr_number}/comments" - ), - headers=self._headers(__user__), - json={"body": body}, - ) - - response.raise_for_status() - comment = response.json() - - comment_id = comment.get("id", 0) - html_url = comment.get("html_url", "") - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - output = f"**Review Comment Added Successfully**\n\n" - output += f"**PR:** #{pr_number}\n" - output += f"**Comment ID:** {comment_id}\n" - output += f"**URL:** {html_url}\n\n" - output += f"**Content:**\n{body}\n" - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"comment on PR #{pr_number}") - if e.response.status_code == 404: - return f"Error: PR #{pr_number} not found in {owner}/{repo_name}." - return f"Error: Failed to create review comment. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during comment creation: {type(e).__name__}: {e}" - - async def update_pull_request_comment( - self, - comment_id: int, - body: str, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Update an existing review comment on a pull request. - - :param comment_id: Comment ID to update - :param body: New comment content - :param repo: Repository in 'owner/repo' format - :return: Confirmation with updated comment details - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - if not body or not body.strip(): - return "Error: Comment body cannot be empty." - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Updating review comment...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.patch( - self._api_url( - f"/repos/{owner}/{repo_name}/pulls/comments/{comment_id}" - ), - headers=self._headers(__user__), - json={"body": body}, - ) - - # Fallback to issue comments if reviews endpoint not available - if response.status_code == 404: - response = await client.patch( - self._api_url( - f"/repos/{owner}/{repo_name}/issues/comments/{comment_id}" - ), - headers=self._headers(__user__), - json={"body": body}, - ) - - response.raise_for_status() - comment = response.json() - - updated_body = comment.get("body", "No content") - updated_at = comment.get("updated_at", "")[:10] - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - output = f"**Review Comment {comment_id} Updated Successfully**\n\n" - output += f"**Repository:** {owner}/{repo_name}\n" - output += f"**Updated:** {updated_at}\n\n" - output += f"**New Content:**\n{updated_body}\n" - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"comment #{comment_id}") - if e.response.status_code == 404: - return f"Error: Comment #{comment_id} not found in {owner}/{repo_name}." - return f"Error: Failed to update review comment. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during comment update: {type(e).__name__}: {e}" - - async def delete_pull_request_comment( - self, - comment_id: int, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - __event_call__: Callable[[dict], Any] = None, - ) -> str: - """ - delete a review comment from a pull request. - - :param comment_id: Comment ID to delete - :param repo: Repository in 'owner/repo' format - :return: Confirmation of comment deletion - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - # Confirmation dialog - if __event_call__: - result = await __event_call__( - { - "type": "confirmation", - "data": { - "title": "Confirm Review Comment Deletion", - "message": f"Delete review comment #{comment_id} from `{owner}/{repo_name}`?", - }, - } - ) - if result is None or result is False: - return "āš ļø Comment deletion cancelled by user." - if isinstance(result, dict) and not result.get("confirmed"): - return "āš ļø Comment deletion cancelled by user." - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Deleting review comment #{comment_id}...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - # Try pull request comments endpoint first - response = await client.delete( - self._api_url( - f"/repos/{owner}/{repo_name}/pulls/comments/{comment_id}" - ), - headers=self._headers(__user__), - ) - - # Fallback to issue comments if reviews endpoint not available - if response.status_code == 404: - response = await client.delete( - self._api_url( - f"/repos/{owner}/{repo_name}/issues/comments/{comment_id}" - ), - headers=self._headers(__user__), - ) - - response.raise_for_status() - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - return f"**Review Comment #{comment_id} Deleted Successfully** from `{owner}/{repo_name}`" - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"comment #{comment_id} deletion") - if e.response.status_code == 404: - return f"Error: Comment #{comment_id} not found in {owner}/{repo_name}." - return f"Error: Failed to delete review comment. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during comment deletion: {type(e).__name__}: {e}" - - async def create_pull_request( - self, - title: str, - head_branch: str, - base_branch: Optional[str] = None, - body: str = "", - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Create a new pull request. - - :param title: PR title - :param head_branch: Source branch (your changes) - :param base_branch: Target branch (defaults to repository default) - :param body: PR description - :param repo: Repository in 'owner/repo' format - :return: Confirmation with PR number and URL - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - effective_base = base_branch or self._get_branch(None, __user__) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Creating pull request...", "done": False}, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.post( - self._api_url(f"/repos/{owner}/{repo_name}/pulls"), - headers=self._headers(__user__), - json={ - "title": title, - "head": head_branch, - "base": effective_base, - "body": body, - }, - ) - response.raise_for_status() - pr = response.json() - - pr_number = pr.get("number") - pr_url = pr.get("html_url", "") - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - output = f"**Pull Request Created Successfully**\n\n" - output += f"**PR #{pr_number}:** {title}\n" - output += f"**Branch:** `{head_branch}` → `{effective_base}`\n" - output += f"**URL:** {pr_url}\n" - if body: - output += f"\n**Description:**\n{body[:100]}{'...' if len(body) > 100 else ''}\n" - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"pull request creation") - return f"Error: Failed to create pull request. {error_msg}" - except Exception as e: - return ( - f"Error: Unexpected failure during PR creation: {type(e).__name__}: {e}" - ) - - async def delete_file( - self, - path: str, - message: str, - repo: Optional[str] = None, - branch: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - __event_call__: Callable[[dict], Any] = None, - ) -> str: - """ - Delete a file from the repository (requires confirmation). - - :param path: File path to delete - :param message: Commit message for the deletion - :param repo: Repository in 'owner/repo' format - :param branch: Branch name (defaults to repository default) - :return: Confirmation with commit details - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - effective_branch = self._get_branch(branch, __user__) - - # Confirmation dialog - if __event_call__: - result = await __event_call__( - { - "type": "confirmation", - "data": { - "title": "Confirm File Deletion", - "message": f"Delete `{path}` from `{owner}/{repo_name}` ({effective_branch})?", - }, - } - ) - if result is None or result is False: - return "āš ļø File deletion cancelled by user." - if isinstance(result, dict) and not result.get("confirmed"): - return "āš ļø File deletion cancelled by user." - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": f"Deleting {path}...", "done": False}, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - # Get file SHA - get_response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - params={"ref": effective_branch}, - ) - - if get_response.status_code == 404: - return f"Error: File not found: `{path}`" - - get_response.raise_for_status() - file_info = get_response.json() - sha = file_info.get("sha") - - if not sha: - return "Error: Could not retrieve file SHA for deletion." - - # Delete file - response = await client.delete( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - json={ - "message": message, - "branch": effective_branch, - "sha": sha, - }, - ) - response.raise_for_status() - result = response.json() - - commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - output = f"**File Deleted Successfully**\n\n" - output += f"**File:** `{owner}/{repo_name}/{path}`\n" - output += f"**Branch:** `{effective_branch}`\n" - output += f"**Commit:** `{commit_sha}`\n" - output += f"**Message:** {message}\n" - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"file deletion for '{path}'") - return f"Error: Failed to delete file. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during file deletion: {type(e).__name__}: {e}" + json={"do": merge_strategy}, + ) \ No newline at end of file