diff --git a/gitea/dev_broken.py b/gitea/dev_broken.py deleted file mode 100644 index b15fe90..0000000 --- a/gitea/dev_broken.py +++ /dev/null @@ -1,1723 +0,0 @@ -""" -title: Gitea Dev -author: Jeff Smith + minimax + Claude -version: 1.2.0 -license: MIT -description: Interact with Gitea repositories - read, write, branch, and PR/Issue workflows -requirements: pydantic, httpx -changelog: - 1.2.0: - - Added configurable SSL verification (VERIFY_SSL valve) - - Fixed nested AsyncClient creation in update_file() and delete_file() - - Improved error handling with JSON message parsing - - Added _format_error() helper method - - Added get_repo() method for repository details - - Added merge_pull_request() method - - Added update_pull_request() method - - Added get_commit() method for commit details - - Fixed all httpx client reuse issues -""" - -from typing import Optional, Callable, Any, List, Dict -from pydantic import BaseModel, Field -import httpx -import base64 - - -class Tools: - class Valves(BaseModel): - """Shared configuration (set by admin)""" - - GITEA_URL: str = Field( - default="https://gitea.example.com", - description="Gitea URL (ingress or internal service)", - ) - DEFAULT_REPO: str = Field( - default="", - description="Default repository in owner/repo format", - ) - DEFAULT_BRANCH: str = Field(default="main", description="Default branch name") - DEFAULT_ORG: str = Field(default="", description="Default organization") - ALLOW_USER_OVERRIDES: bool = Field( - default=True, - description="Allow users to override defaults via UserValves (False = locked to admin defaults)", - ) - VERIFY_SSL: bool = Field( - default=True, - description="Verify SSL certificates (disable only for self-signed certs)", - ) - # Pagination defaults - DEFAULT_PAGE_SIZE: int = Field( - default=50, - description="Default page size for list operations (max 50)", - ) - - class UserValves(BaseModel): - """Per-user configuration (personal credentials and overrides)""" - - GITEA_TOKEN: str = Field( - default="", - description="Your Gitea API token (Settings > Applications > Generate Token)", - ) - USER_DEFAULT_REPO: str = Field( - default="", - description="Override default repository", - ) - USER_DEFAULT_BRANCH: str = Field( - default="", - description="Override default branch", - ) - USER_DEFAULT_ORG: str = Field( - default="", - description="Override default organization", - ) - - def __init__(self): - self.valves = self.Valves() - self.citation = False - - def _api_url(self, endpoint: str) -> str: - base = self._get_url() - return f"{base}/api/v1{endpoint}" - - def _get_url(self) -> str: - """Get effective URL.""" - return self.valves.GITEA_URL.rstrip("/") - - def _get_token(self, __user__: dict = None) -> str: - """Extract token from framework-provided user context.""" - if __user__ and "valves" in __user__: - user_valves = self.UserValves(**__user__["valves"]) - return user_valves.GITEA_TOKEN - return "" - - def _headers(self, __user__: dict = None) -> dict: - token = self._get_token(__user__) - return { - "Authorization": f"token {token}", - "Content-Type": "application/json", - } - - def _format_error(self, e: httpx.HTTPStatusError) -> str: - """Format HTTP error with JSON message if available.""" - try: - error_json = e.response.json() - error_msg = error_json.get("message", e.response.text[:200]) - except: - error_msg = e.response.text[:200] - return f"HTTP Error {e.response.status_code}: {error_msg}" - - def _get_repo(self, repo: Optional[str], __user__: dict = None) -> str: - """Get effective repo with priority.""" - if repo: - return repo - if __user__ and "valves" in __user__: - user_valves = self.UserValves(**__user__["valves"]) - if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_REPO: - return user_valves.USER_DEFAULT_REPO - return self.valves.DEFAULT_REPO - - def _get_branch(self, branch: Optional[str], __user__: dict = None) -> str: - """Get effective branch with priority.""" - if branch: - return branch - if __user__ and "valves" in __user__: - user_valves = self.UserValves(**__user__["valves"]) - if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_BRANCH: - return user_valves.USER_DEFAULT_BRANCH - return self.valves.DEFAULT_BRANCH - - def _get_org(self, org: Optional[str], __user__: dict = None) -> str: - """Get effective org with priority.""" - if org: - return org - if __user__ and "valves" in __user__: - user_valves = self.UserValves(**__user__["valves"]) - if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_ORG: - return user_valves.USER_DEFAULT_ORG - return self.valves.DEFAULT_ORG - - def _resolve_repo( - self, repo: Optional[str], __user__: dict = None - ) -> tuple[str, str]: - effective_repo = self._get_repo(repo, __user__) - if not effective_repo: - raise ValueError( - "No repository specified. Set DEFAULT_REPO in Valves or USER_DEFAULT_REPO in UserValves." - ) - if "/" not in effective_repo: - raise ValueError( - f"Repository must be in owner/repo format, got: {effective_repo}" - ) - return effective_repo.split("/", 1) - - def _get_page_size(self, limit: Optional[int] = None) -> int: - """Get effective page size, capped at 50 (Gitea max).""" - if limit is not None: - return min(limit, 50) - return min(self.valves.DEFAULT_PAGE_SIZE, 50) - - # REPOSITORY DISCOVERY (WITH PAGINATION) - async def list_repos( - self, - page: int = 1, - limit: Optional[int] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """List repositories accessible to the authenticated user.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured. Add it in User Settings > Gitea Dev." - page_size = self._get_page_size(limit) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Fetching repositories (page {page})...", - "done": False, - }, - } - ) - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url("/user/repos"), - headers=self._headers(__user__), - params={"page": page, "limit": page_size}, - ) - response.raise_for_status() - repos = response.json() - total_count = response.headers.get("x-total-count", "?") - output = f"Accessible Repositories (Page {page}, {page_size}/page):\n\n" - for repo in repos: - full_name = repo.get("full_name", "") - desc = repo.get("description", "")[:50] or "No description" - private = "🔒" if repo.get("private") else "🌐" - default_branch = repo.get("default_branch", "main") - output += f"- {private} {full_name} ({default_branch}): {desc}\n" - output += f"\nShowing {len(repos)} repos (Total: {total_count})" - if len(repos) == page_size: - output += ( - f"\nMore results may exist. Use page={page + 1} to continue." - ) - effective_repo = self._get_repo(None, __user__) - if effective_repo: - output += f"\nActive project: {effective_repo}" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return output - except httpx.HTTPStatusError as e: - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def get_repo( - self, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """Get detailed repository information. - - :param repo: Repository in owner/repo format - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Fetching repository {owner}/{repo_name}...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}"), - headers=self._headers(__user__), - ) - response.raise_for_status() - repo_info = response.json() - - output = f"# Repository: {owner}/{repo_name}\n\n" - output += f"**Full Name:** {repo_info.get('full_name', 'N/A')}\n" - output += ( - f"**Description:** {repo_info.get('description', 'No description')}\n" - ) - output += f"**Private:** {'Yes' if repo_info.get('private') else 'No'}\n" - output += f"**Fork:** {'Yes' if repo_info.get('fork') else 'No'}\n" - output += f"**Stars:** {repo_info.get('stars_count', 0)}\n" - output += f"**Forks:** {repo_info.get('forks_count', 0)}\n" - output += f"**Watchers:** {repo_info.get('watchers_count', 0)}\n" - output += f"**Open Issues:** {repo_info.get('open_issues_count', 0)}\n" - output += f"**Size:** {repo_info.get('size', 0)} KB\n" - output += f"**Language:** {repo_info.get('language', 'N/A')}\n" - output += f"**Default Branch:** {repo_info.get('default_branch', 'main')}\n" - output += f"**Created:** {repo_info.get('created_at', 'N/A')[:10]}\n" - output += f"**Updated:** {repo_info.get('updated_at', 'N/A')[:10]}\n" - output += f"**HTML URL:** {repo_info.get('html_url', 'N/A')}\n" - output += f"**Clone URL:** {repo_info.get('clone_url', 'N/A')}\n" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - return output - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - return f"Repository '{owner}/{repo_name}' not found" - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def list_branches( - self, - repo: Optional[str] = None, - page: int = 1, - limit: Optional[int] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """List branches in a repository.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - page_size = self._get_page_size(limit) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Fetching branches for {owner}/{repo_name} (page {page})...", - "done": False, - }, - } - ) - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/branches"), - headers=self._headers(__user__), - params={"page": page, "limit": page_size}, - ) - response.raise_for_status() - branches = response.json() - total_count = response.headers.get("x-total-count", "?") - output = f"Branches in {owner}/{repo_name} (Page {page}):\n\n" - for branch in branches: - name = branch.get("name", "") - protected = "🛡️" if branch.get("protected") else "" - commit_sha = branch.get("commit", {}).get("id", "")[:8] - output += f"- {name} {protected} [{commit_sha}]\n" - output += f"\nShowing {len(branches)} branches (Total: {total_count})" - if len(branches) == page_size: - output += ( - f"\nMore results may exist. Use page={page + 1} to continue." - ) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return output - except httpx.HTTPStatusError as e: - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def list_files( - self, - path: str = "", - repo: Optional[str] = None, - branch: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """List files and directories in a repository path.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - effective_branch = self._get_branch(branch, __user__) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Listing {path or 'root'}...", - "done": False, - }, - } - ) - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - params={"ref": effective_branch}, - ) - response.raise_for_status() - contents = response.json() - if isinstance(contents, dict): - return f"'{path}' is a file, not a directory. Use get_file() to read it." - output = f"Contents of {owner}/{repo_name}/{path or '.'} ({effective_branch}):\n\n" - dirs = [c for c in contents if c.get("type") == "dir"] - files = [c for c in contents if c.get("type") == "file"] - for item in sorted(dirs, key=lambda x: x.get("name", "")): - output += f"📁 {item.get('name', '')}/\n" - for item in sorted(files, key=lambda x: x.get("name", "")): - size = item.get("size", 0) - size_str = f"{size}B" if size < 1024 else f"{size//1024}KB" - output += f"📄 {item.get('name', '')} ({size_str})\n" - output += f"\nTotal: {len(dirs)} directories, {len(files)} files" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return output - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - return f"Path not found: {path}" - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def get_file( - self, - path: str, - repo: Optional[str] = None, - branch: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """Get contents of a file from the repository.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - effective_branch = self._get_branch(branch, __user__) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": f"Reading {path}...", "done": False}, - } - ) - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - params={"ref": effective_branch}, - ) - response.raise_for_status() - file_info = response.json() - if isinstance(file_info, list): - return f"'{path}' is a directory, not a file. Use list_files() to browse." - if file_info.get("type") != "file": - return f"'{path}' is not a file (type: {file_info.get('type')})" - content_b64 = file_info.get("content", "") - try: - content = base64.b64decode(content_b64).decode("utf-8") - except Exception: - return "Error: Could not decode file content (may be binary)" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return f"File: {owner}/{repo_name}/{path} ({effective_branch})\nSHA: {file_info.get('sha', 'unknown')[:8]}\n\n```\n{content}\n```" - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - return f"File not found: {path}" - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def create_file( - self, - path: str, - content: str, - message: str, - repo: Optional[str] = None, - branch: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """Create a new file in the repository.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - effective_branch = self._get_branch(branch, __user__) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": f"Creating {path}...", "done": False}, - } - ) - try: - content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.post( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - json={ - "content": content_b64, - "message": message, - "branch": effective_branch, - }, - ) - response.raise_for_status() - result = response.json() - commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return f"Created {path} on {effective_branch}\nCommit: {commit_sha}\nMessage: {message}" - except httpx.HTTPStatusError as e: - if e.response.status_code == 422: - return f"File already exists: {path}. Use update_file() instead." - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def update_file( - self, - path: str, - content: str, - message: str, - repo: Optional[str] = None, - branch: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """Update an existing file in the repository.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - effective_branch = self._get_branch(branch, __user__) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": f"Updating {path}...", "done": False}, - } - ) - try: - # FIXED: Reuse single client instance - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - get_response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - params={"ref": effective_branch}, - ) - if get_response.status_code == 404: - return f"File not found: {path}. Use create_file() instead." - get_response.raise_for_status() - file_info = get_response.json() - sha = file_info.get("sha") - if not sha: - return "Error: Could not get file SHA for update" - content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") - - # FIXED: Using same client instance - response = await client.put( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - json={ - "content": content_b64, - "message": message, - "branch": effective_branch, - "sha": sha, - }, - ) - response.raise_for_status() - result = response.json() - commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return f"Updated {path} on {effective_branch}\nCommit: {commit_sha}\nMessage: {message}" - except httpx.HTTPStatusError as e: - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def delete_file( - self, - path: str, - message: str, - repo: Optional[str] = None, - branch: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - __event_call__: Callable[[dict], Any] = None, - ) -> str: - """Delete a file from the repository.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - effective_branch = self._get_branch(branch, __user__) - # Confirm deletion - if __event_call__: - result = await __event_call__( - { - "type": "confirmation", - "data": { - "title": "Delete File?", - "message": f"Delete {path} from {owner}/{repo_name} ({effective_branch})?", - }, - } - ) - if result is None or result is False: - return "delete_file CANCELLED" - if isinstance(result, dict) and not result.get("confirmed"): - return "delete_file CANCELLED" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": f"Deleting {path}...", "done": False}, - } - ) - try: - # FIXED: Reuse single client instance - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - get_response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - params={"ref": effective_branch}, - ) - if get_response.status_code == 404: - return f"File not found: {path}" - get_response.raise_for_status() - file_info = get_response.json() - sha = file_info.get("sha") - if not sha: - return "Error: Could not get file SHA for delete" - - # FIXED: Using same client instance - response = await client.delete( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - json={ - "message": message, - "branch": effective_branch, - "sha": sha, - }, - ) - response.raise_for_status() - result = response.json() - commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return f"Deleted {path} on {effective_branch}\nCommit: {commit_sha}\nMessage: {message}" - except httpx.HTTPStatusError as e: - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - # BRANCHING & PR (WITH PAGINATION) - async def create_branch( - self, - branch_name: str, - from_branch: Optional[str] = None, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """Create a new branch.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - effective_from = from_branch or self._get_branch(None, __user__) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Creating branch {branch_name}...", - "done": False, - }, - } - ) - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.post( - self._api_url(f"/repos/{owner}/{repo_name}/branches"), - headers=self._headers(__user__), - json={ - "new_branch_name": branch_name, - "old_branch_name": effective_from, - }, - ) - response.raise_for_status() - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return f"Created branch '{branch_name}' from '{effective_from}' in {owner}/{repo_name}" - except httpx.HTTPStatusError as e: - if e.response.status_code == 409: - return f"Branch '{branch_name}' already exists" - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def create_pull_request( - self, - title: str, - head_branch: str, - base_branch: Optional[str] = None, - body: str = "", - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """Create a pull request.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - effective_base = base_branch or self._get_branch(None, __user__) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Creating pull request...", "done": False}, - } - ) - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.post( - self._api_url(f"/repos/{owner}/{repo_name}/pulls"), - headers=self._headers(__user__), - json={ - "title": title, - "head": head_branch, - "base": effective_base, - "body": body, - }, - ) - response.raise_for_status() - pr = response.json() - pr_number = pr.get("number") - pr_url = pr.get("html_url", "") - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return f"Created PR #{pr_number}: {title}\n{head_branch} → {effective_base}\nURL: {pr_url}" - except httpx.HTTPStatusError as e: - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def update_pull_request( - self, - pr_number: int, - repo: Optional[str] = None, - title: Optional[str] = None, - body: Optional[str] = None, - base_branch: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """Update an existing pull request. - - :param pr_number: Pull request number - :param repo: Repository in owner/repo format - :param title: New title - :param body: New body/description - :param base_branch: New base branch - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Updating PR #{pr_number}...", - "done": False, - }, - } - ) - - payload = {} - if title is not None: - payload["title"] = title - if body is not None: - payload["body"] = body - if base_branch is not None: - payload["base"] = base_branch - - if not payload: - return "Error: No fields to update. Provide at least one of: title, body, base_branch" - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.patch( - self._api_url(f"/repos/{owner}/{repo_name}/pulls/{pr_number}"), - headers=self._headers(__user__), - json=payload, - ) - response.raise_for_status() - pr = response.json() - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - return f"Updated PR #{pr_number}: {pr.get('title', '')}\nURL: {pr.get('html_url', '')}" - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - return f"PR #{pr_number} not found" - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def merge_pull_request( - self, - pr_number: int, - repo: Optional[str] = None, - merge_method: str = "merge", - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - __event_call__: Callable[[dict], Any] = None, - ) -> str: - """Merge a pull request. - - :param pr_number: Pull request number - :param repo: Repository in owner/repo format - :param merge_method: merge, rebase, or squash - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - valid_methods = ["merge", "rebase", "squash"] - if merge_method not in valid_methods: - return f"Error: Invalid merge_method '{merge_method}'. Use: {', '.join(valid_methods)}" - - if __event_call__: - result = await __event_call__( - { - "type": "confirmation", - "data": { - "title": "Merge Pull Request?", - "message": f"Merge PR #{pr_number} in {owner}/{repo_name} using {merge_method} method?", - }, - } - ) - if result is None or result is False: - return "merge_pull_request CANCELLED" - if isinstance(result, dict) and not result.get("confirmed"): - return "merge_pull_request CANCELLED" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Merging PR #{pr_number}...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.post( - self._api_url(f"/repos/{owner}/{repo_name}/pulls/{pr_number}/merge"), - headers=self._headers(__user__), - json={"Do": merge_method}, - ) - response.raise_for_status() - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - return f"Successfully merged PR #{pr_number} using {merge_method} method" - except httpx.HTTPStatusError as e: - if e.response.status_code == 405: - return f"PR #{pr_number} cannot be merged (may have conflicts or be already merged)" - if e.response.status_code == 404: - return f"PR #{pr_number} not found" - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def list_pull_requests( - self, - state: str = "open", - repo: Optional[str] = None, - page: int = 1, - limit: Optional[int] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """List pull requests.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - page_size = self._get_page_size(limit) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Fetching pull requests...", "done": False}, - } - ) - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/pulls"), - headers=self._headers(__user__), - params={"state": state, "page": page, "limit": page_size}, - ) - response.raise_for_status() - prs = response.json() - total_count = response.headers.get("x-total-count", "?") - output = ( - f"Pull Requests in {owner}/{repo_name} ({state}, page {page}):\n\n" - ) - if not prs: - output += "No pull requests found." - else: - for pr in prs: - number = pr.get("number") - title = pr.get("title", "")[:60] - user = pr.get("user", {}).get("login", "unknown") - head = pr.get("head", {}).get("ref", "") - base = pr.get("base", {}).get("ref", "") - mergeable = pr.get("mergeable", None) - merge_status = "" - if mergeable is True: - merge_status = " ✅" - elif mergeable is False: - merge_status = " ⚠️ conflicts" - output += f"- #{number}: {title}\n {head} → {base} (by {user}){merge_status}\n" - output += f"\nShowing {len(prs)} PRs (Total: {total_count})" - if len(prs) == page_size: - output += f"\nMore results may exist. Use page={page + 1} to continue." - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return output - except httpx.HTTPStatusError as e: - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def get_pull_request( - self, - pr_number: int, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """Get details of a specific pull request.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Fetching PR #{pr_number}...", - "done": False, - }, - } - ) - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/pulls/{pr_number}"), - headers=self._headers(__user__), - ) - response.raise_for_status() - pr = response.json() - output = f"Pull Request #{pr_number} in {owner}/{repo_name}\n\n" - output += f"**Title:** {pr.get('title', '')}\n" - output += f"**State:** {pr.get('state', 'unknown')}\n" - output += f"**Author:** {pr.get('user', {}).get('login', 'unknown')}\n" - output += f"**Branch:** {pr.get('head', {}).get('ref', '')} → {pr.get('base', {}).get('ref', '')}\n" - output += f"**Mergeable:** {pr.get('mergeable', 'unknown')}\n" - output += f"**URL:** {pr.get('html_url', '')}\n" - body = pr.get("body", "") - if body: - output += f"\n**Description:**\n{body[:500]}" - if len(body) > 500: - output += "...(truncated)" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return output - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - return f"PR #{pr_number} not found" - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - # ISSUES (WITH PAGINATION) - async def list_issues( - self, - state: str = "open", - repo: Optional[str] = None, - page: int = 1, - limit: Optional[int] = None, - labels: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """List issues in a repository.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - page_size = self._get_page_size(limit) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Fetching issues...", "done": False}, - } - ) - try: - params = { - "state": state, - "type": "issues", - "page": page, - "limit": page_size, - } - if labels: - params["labels"] = labels - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/issues"), - headers=self._headers(__user__), - params=params, - ) - response.raise_for_status() - issues = response.json() - total_count = response.headers.get("x-total-count", "?") - output = f"Issues in {owner}/{repo_name} ({state}, page {page}):\n\n" - if not issues: - output += "No issues found." - else: - for issue in issues: - number = issue.get("number") - title = issue.get("title", "")[:60] - user = issue.get("user", {}).get("login", "unknown") - issue_labels = ", ".join( - [label.get("name", "") for label in issue.get("labels", [])] - ) - labels_str = f" [{issue_labels}]" if issue_labels else "" - assignee = issue.get("assignee", {}) - assignee_str = ( - f" → {assignee.get('login', '')}" if assignee else "" - ) - output += f"- #{number}: {title}{labels_str} (by {user}){assignee_str}\n" - output += f"\nShowing {len(issues)} issues (Total: {total_count})" - if len(issues) == page_size: - output += f"\nMore results may exist. Use page={page + 1} to continue." - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return output - except httpx.HTTPStatusError as e: - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def create_issue( - self, - title: str, - body: str = "", - repo: Optional[str] = None, - labels: Optional[List[str]] = None, - assignee: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """Create a new issue.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Creating issue...", "done": False}, - } - ) - try: - payload = {"title": title, "body": body} - if labels: - payload["labels"] = labels - if assignee: - payload["assignee"] = assignee - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.post( - self._api_url(f"/repos/{owner}/{repo_name}/issues"), - headers=self._headers(__user__), - json=payload, - ) - response.raise_for_status() - issue = response.json() - issue_number = issue.get("number") - issue_url = issue.get("html_url", "") - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return f"Created issue #{issue_number}: {title}\nURL: {issue_url}" - except httpx.HTTPStatusError as e: - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def get_issue( - self, - issue_number: int, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """Get details of a specific issue.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Fetching issue #{issue_number}...", - "done": False, - }, - } - ) - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"), - headers=self._headers(__user__), - ) - response.raise_for_status() - issue = response.json() - output = f"Issue #{issue_number} in {owner}/{repo_name}\n\n" - output += f"**Title:** {issue.get('title', '')}\n" - output += f"**State:** {issue.get('state', 'unknown')}\n" - output += ( - f"**Author:** {issue.get('user', {}).get('login', 'unknown')}\n" - ) - labels = [label.get("name", "") for label in issue.get("labels", [])] - if labels: - output += f"**Labels:** {', '.join(labels)}\n" - assignee = issue.get("assignee") - if assignee: - output += f"**Assignee:** {assignee.get('login', '')}\n" - output += f"**URL:** {issue.get('html_url', '')}\n" - body = issue.get("body", "") - if body: - output += f"\n**Description:**\n{body[:1000]}" - if len(body) > 1000: - output += "...(truncated)" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return output - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - return f"Issue #{issue_number} not found" - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def update_issue( - self, - issue_number: int, - repo: Optional[str] = None, - title: Optional[str] = None, - body: Optional[str] = None, - state: Optional[str] = None, - assignee: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """Update an existing issue.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Updating issue #{issue_number}...", - "done": False, - }, - } - ) - # Build payload with only provided fields - payload = {} - if title is not None: - payload["title"] = title - if body is not None: - payload["body"] = body - if state is not None: - if state not in ("open", "closed"): - return f"Error: Invalid state '{state}'. Use 'open' or 'closed'." - payload["state"] = state - if assignee is not None: - payload["assignee"] = assignee - if not payload: - return "Error: No fields to update. Provide at least one of: title, body, state, assignee" - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.patch( - self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"), - headers=self._headers(__user__), - json=payload, - ) - response.raise_for_status() - issue = response.json() - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return f"Updated issue #{issue_number}: {issue.get('title', '')}\nState: {issue.get('state', 'unknown')}\nURL: {issue.get('html_url', '')}" - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - return f"Issue #{issue_number} not found" - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - # PROJECT CONTEXT - async def get_project_context( - self, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """Get project context: README, structure, recent commits.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - branch = self._get_branch(None, __user__) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Loading project context for {owner}/{repo_name}...", - "done": False, - }, - } - ) - output = f"# Project: {owner}/{repo_name}\n\n" - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - repo_response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}"), - headers=self._headers(__user__), - ) - if repo_response.status_code == 200: - repo_info = repo_response.json() - output += f"**Description:** {repo_info.get('description', 'No description')}\n" - output += f"**Default Branch:** {repo_info.get('default_branch', 'main')}\n" - output += ( - f"**Language:** {repo_info.get('language', 'Unknown')}\n\n" - ) - root_response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/contents/"), - headers=self._headers(__user__), - params={"ref": branch}, - ) - if root_response.status_code == 200: - contents = root_response.json() - output += "## Root Structure\n\n" - for item in sorted( - contents, - key=lambda x: (x.get("type") != "dir", x.get("name", "")), - ): - icon = "📁" if item.get("type") == "dir" else "📄" - output += f"{icon} {item.get('name', '')}\n" - output += "\n" - for readme_name in [ - "README.md", - "readme.md", - "README", - "README.txt", - ]: - readme_response = await client.get( - self._api_url( - f"/repos/{owner}/{repo_name}/contents/{readme_name}" - ), - headers=self._headers(__user__), - params={"ref": branch}, - ) - if readme_response.status_code == 200: - readme_info = readme_response.json() - content_b64 = readme_info.get("content", "") - try: - readme_content = base64.b64decode(content_b64).decode( - "utf-8" - ) - if len(readme_content) > 2000: - readme_content = ( - readme_content[:2000] + "\n\n... (truncated)" - ) - output += f"## README\n\n{readme_content}\n\n" - except: - pass - break - commits_response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/commits"), - headers=self._headers(__user__), - params={"sha": branch, "limit": 5}, - ) - if commits_response.status_code == 200: - commits = commits_response.json() - output += "## Recent Commits\n\n" - for commit in commits[:5]: - sha = commit.get("sha", "")[:8] - msg = ( - commit.get("commit", {}) - .get("message", "") - .split("\n")[0][:60] - ) - author = ( - commit.get("commit", {}) - .get("author", {}) - .get("name", "unknown") - ) - output += f"- `{sha}` {msg} ({author})\n" - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return output - except Exception as e: - return f"Error loading project context: {type(e).__name__}: {e}" - - # COMMITS - async def list_commits( - self, - repo: Optional[str] = None, - branch: Optional[str] = None, - page: int = 1, - limit: Optional[int] = None, - path: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """List commits in a repository.""" - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - effective_branch = self._get_branch(branch, __user__) - page_size = self._get_page_size(limit) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": f"Fetching commits...", "done": False}, - } - ) - try: - params = { - "sha": effective_branch, - "page": page, - "limit": page_size, - } - if path: - params["path"] = path - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/commits"), - headers=self._headers(__user__), - params=params, - ) - response.raise_for_status() - commits = response.json() - total_count = response.headers.get("x-total-count", "?") - output = f"Commits in {owner}/{repo_name} ({effective_branch}, page {page}):\n" - if path: - output += f"Filtered to: {path}\n" - output += "\n" - for commit in commits: - sha = commit.get("sha", "")[:8] - commit_data = commit.get("commit", {}) - msg = commit_data.get("message", "").split("\n")[0][:60] - author = commit_data.get("author", {}).get("name", "unknown") - date = commit_data.get("author", {}).get("date", "")[:10] - output += f"- `{sha}` {msg}\n by {author} on {date}\n" - output += f"\nShowing {len(commits)} commits (Total: {total_count})" - if len(commits) == page_size: - output += ( - f"\nMore results may exist. Use page={page + 1} to continue." - ) - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Done", - "done": True, - "hidden": True, - }, - } - ) - return output - except httpx.HTTPStatusError as e: - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" - - async def get_commit( - self, - commit_sha: str, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """Get detailed information about a specific commit. - - :param commit_sha: Commit SHA (full or short) - :param repo: Repository in owner/repo format - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured" - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Fetching commit {commit_sha[:8]}...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/git/commits/{commit_sha}"), - headers=self._headers(__user__), - ) - response.raise_for_status() - commit = response.json() - - output = f"# Commit: {commit.get('sha', '')[:8]}\n\n" - output += f"**Full SHA:** {commit.get('sha', 'N/A')}\n" - output += f"**Message:** {commit.get('message', 'No message')}\n" - - author = commit.get('author', {}) - output += f"**Author:** {author.get('name', 'unknown')} <{author.get('email', '')}>\n" - output += f"**Date:** {author.get('date', 'N/A')}\n" - - committer = commit.get('committer', {}) - if committer.get('name') != author.get('name'): - output += f"**Committer:** {committer.get('name', 'unknown')} <{committer.get('email', '')}>\n" - - # Get file changes - response2 = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/commits/{commit_sha}"), - headers=self._headers(__user__), - ) - if response2.status_code == 200: - commit_detail = response2.json() - files = commit_detail.get('files', []) - if files: - output += f"\n**Files Changed:** {len(files)}\n" - for file in files[:20]: # Limit to first 20 files - status = file.get('status', '') - filename = file.get('filename', '') - additions = file.get('additions', 0) - deletions = file.get('deletions', 0) - output += f"- {status}: {filename} (+{additions}/-{deletions})\n" - if len(files) > 20: - output += f"... and {len(files) - 20} more files\n" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - return output - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - return f"Commit '{commit_sha}' not found" - return self._format_error(e) - except Exception as e: - return f"Error: {type(e).__name__}: {e}" \ No newline at end of file