diff --git a/gitea/dev.py b/gitea/dev.py new file mode 100644 index 0000000..26ab69d --- /dev/null +++ b/gitea/dev.py @@ -0,0 +1,1342 @@ +""" +title: Gitea Dev +author: Jeff Smith + minimax + Claude +version: 1.1.0 +license: MIT +description: Interact with Gitea repositories - read, write, branch, and PR/Issue workflows +requirements: pydantic, httpx +""" + +from typing import Optional, Callable, Any, List, Dict +from pydantic import BaseModel, Field +import httpx +import base64 + + +class Tools: + class Valves(BaseModel): + """Shared configuration (set by admin)""" + + GITEA_URL: str = Field( + default="https://gitea.example.com", + description="Gitea URL (ingress or internal service)", + ) + DEFAULT_REPO: str = Field( + default="", + description="Default repository in owner/repo format", + ) + DEFAULT_BRANCH: str = Field(default="main", description="Default branch name") + DEFAULT_ORG: str = Field(default="", description="Default organization") + ALLOW_USER_OVERRIDES: bool = Field( + default=True, + description="Allow users to override defaults via UserValves (False = locked to admin defaults)", + ) + # Pagination defaults + DEFAULT_PAGE_SIZE: int = Field( + default=50, + description="Default page size for list operations (max 50)", + ) + + class UserValves(BaseModel): + """Per-user configuration (personal credentials and overrides)""" + + GITEA_TOKEN: str = Field( + default="", + description="Your Gitea API token (Settings > Applications > Generate Token)", + ) + USER_DEFAULT_REPO: str = Field( + default="", + description="Override default repository", + ) + USER_DEFAULT_BRANCH: str = Field( + default="", + description="Override default branch", + ) + USER_DEFAULT_ORG: str = Field( + default="", + description="Override default organization", + ) + + def __init__(self): + self.valves = self.Valves() + self.citation = False + + def _api_url(self, endpoint: str) -> str: + base = self._get_url() + return f"{base}/api/v1{endpoint}" + + def _get_url(self) -> str: + """Get effective URL.""" + return self.valves.GITEA_URL.rstrip("/") + + def _get_token(self, __user__: dict = None) -> str: + """Extract token from framework-provided user context.""" + if __user__ and "valves" in __user__: + user_valves = self.UserValves(**__user__["valves"]) + return user_valves.GITEA_TOKEN + return "" + + def _headers(self, __user__: dict = None) -> dict: + token = self._get_token(__user__) + return { + "Authorization": f"token {token}", + "Content-Type": "application/json", + } + + def _get_repo(self, repo: Optional[str], __user__: dict = None) -> str: + """Get effective repo with priority.""" + if repo: + return repo + if __user__ and "valves" in __user__: + user_valves = self.UserValves(**__user__["valves"]) + if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_REPO: + return user_valves.USER_DEFAULT_REPO + return self.valves.DEFAULT_REPO + + def _get_branch(self, branch: Optional[str], __user__: dict = None) -> str: + """Get effective branch with priority.""" + if branch: + return branch + if __user__ and "valves" in __user__: + user_valves = self.UserValves(**__user__["valves"]) + if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_BRANCH: + return user_valves.USER_DEFAULT_BRANCH + return self.valves.DEFAULT_BRANCH + + def _get_org(self, org: Optional[str], __user__: dict = None) -> str: + """Get effective org with priority.""" + if org: + return org + if __user__ and "valves" in __user__: + user_valves = self.UserValves(**__user__["valves"]) + if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_ORG: + return user_valves.USER_DEFAULT_ORG + return self.valves.DEFAULT_ORG + + def _resolve_repo( + self, repo: Optional[str], __user__: dict = None + ) -> tuple[str, str]: + effective_repo = self._get_repo(repo, __user__) + if not effective_repo: + raise ValueError( + "No repository specified. Set DEFAULT_REPO in Valves or USER_DEFAULT_REPO in UserValves." + ) + if "/" not in effective_repo: + raise ValueError( + f"Repository must be in owner/repo format, got: {effective_repo}" + ) + return effective_repo.split("/", 1) + + def _get_page_size(self, limit: Optional[int] = None) -> int: + """Get effective page size, capped at 50 (Gitea max).""" + if limit is not None: + return min(limit, 50) + return min(self.valves.DEFAULT_PAGE_SIZE, 50) + + # REPOSITORY DISCOVERY (WITH PAGINATION) + async def list_repos( + self, + page: int = 1, + limit: Optional[int] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """List repositories accessible to the authenticated user.""" + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured. Add it in User Settings > Gitea Dev." + page_size = self._get_page_size(limit) + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Fetching repositories (page {page})...", + "done": False, + }, + } + ) + try: + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: + response = await client.get( + self._api_url("/user/repos"), + headers=self._headers(__user__), + params={"page": page, "limit": page_size}, + ) + response.raise_for_status() + repos = response.json() + total_count = response.headers.get("x-total-count", "?") + output = f"Accessible Repositories (Page {page}, {page_size}/page):\n\n" + for repo in repos: + full_name = repo.get("full_name", "") + desc = repo.get("description", "")[:50] or "No description" + private = "🔒" if repo.get("private") else "🌐" + default_branch = repo.get("default_branch", "main") + output += f"- {private} {full_name} ({default_branch}): {desc}\n" + output += f"\nShowing {len(repos)} repos (Total: {total_count})" + if len(repos) == page_size: + output += ( + f"\nMore results may exist. Use page={page + 1} to continue." + ) + effective_repo = self._get_repo(None, __user__) + if effective_repo: + output += f"\nActive project: {effective_repo}" + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Done", + "done": True, + "hidden": True, + }, + } + ) + return output + except httpx.HTTPStatusError as e: + return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}" + except Exception as e: + return f"Error: {type(e).__name__}: {e}" + + async def list_branches( + self, + repo: Optional[str] = None, + page: int = 1, + limit: Optional[int] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """List branches in a repository.""" + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured" + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + page_size = self._get_page_size(limit) + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Fetching branches for {owner}/{repo_name} (page {page})...", + "done": False, + }, + } + ) + try: + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/branches"), + headers=self._headers(__user__), + params={"page": page, "limit": page_size}, + ) + response.raise_for_status() + branches = response.json() + total_count = response.headers.get("x-total-count", "?") + output = f"Branches in {owner}/{repo_name} (Page {page}):\n\n" + for branch in branches: + name = branch.get("name", "") + protected = "🛡️" if branch.get("protected") else "" + commit_sha = branch.get("commit", {}).get("id", "")[:8] + output += f"- {name} {protected} [{commit_sha}]\n" + output += f"\nShowing {len(branches)} branches (Total: {total_count})" + if len(branches) == page_size: + output += ( + f"\nMore results may exist. Use page={page + 1} to continue." + ) + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Done", + "done": True, + "hidden": True, + }, + } + ) + return output + except httpx.HTTPStatusError as e: + return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}" + except Exception as e: + return f"Error: {type(e).__name__}: {e}" + + async def list_files( + self, + path: str = "", + repo: Optional[str] = None, + branch: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """List files and directories in a repository path.""" + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured" + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + effective_branch = self._get_branch(branch, __user__) + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Listing {path or 'root'}...", + "done": False, + }, + } + ) + try: + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + params={"ref": effective_branch}, + ) + response.raise_for_status() + contents = response.json() + if isinstance(contents, dict): + return f"'{path}' is a file, not a directory. Use get_file() to read it." + output = f"Contents of {owner}/{repo_name}/{path or '.'} ({effective_branch}):\n\n" + dirs = [c for c in contents if c.get("type") == "dir"] + files = [c for c in contents if c.get("type") == "file"] + for item in sorted(dirs, key=lambda x: x.get("name", "")): + output += f"📁 {item.get('name', '')}/\n" + for item in sorted(files, key=lambda x: x.get("name", "")): + size = item.get("size", 0) + size_str = f"{size}B" if size < 1024 else f"{size//1024}KB" + output += f"📄 {item.get('name', '')} ({size_str})\n" + output += f"\nTotal: {len(dirs)} directories, {len(files)} files" + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Done", + "done": True, + "hidden": True, + }, + } + ) + return output + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + return f"Path not found: {path}" + return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}" + except Exception as e: + return f"Error: {type(e).__name__}: {e}" + + async def get_file( + self, + path: str, + repo: Optional[str] = None, + branch: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """Get contents of a file from the repository.""" + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured" + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + effective_branch = self._get_branch(branch, __user__) + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": f"Reading {path}...", "done": False}, + } + ) + try: + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + params={"ref": effective_branch}, + ) + response.raise_for_status() + file_info = response.json() + if isinstance(file_info, list): + return f"'{path}' is a directory, not a file. Use list_files() to browse." + if file_info.get("type") != "file": + return f"'{path}' is not a file (type: {file_info.get('type')})" + content_b64 = file_info.get("content", "") + try: + content = base64.b64decode(content_b64).decode("utf-8") + except Exception: + return "Error: Could not decode file content (may be binary)" + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Done", + "done": True, + "hidden": True, + }, + } + ) + return f"File: {owner}/{repo_name}/{path} ({effective_branch})\nSHA: {file_info.get('sha', 'unknown')[:8]}\n\n```\n{content}\n```" + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + return f"File not found: {path}" + return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}" + except Exception as e: + return f"Error: {type(e).__name__}: {e}" + + async def create_file( + self, + path: str, + content: str, + message: str, + repo: Optional[str] = None, + branch: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """Create a new file in the repository.""" + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured" + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + effective_branch = self._get_branch(branch, __user__) + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": f"Creating {path}...", "done": False}, + } + ) + try: + content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: + response = await client.post( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + json={ + "content": content_b64, + "message": message, + "branch": effective_branch, + }, + ) + response.raise_for_status() + result = response.json() + commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Done", + "done": True, + "hidden": True, + }, + } + ) + return f"Created {path} on {effective_branch}\nCommit: {commit_sha}\nMessage: {message}" + except httpx.HTTPStatusError as e: + if e.response.status_code == 422: + return f"File already exists: {path}. Use update_file() instead." + return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}" + except Exception as e: + return f"Error: {type(e).__name__}: {e}" + + async def update_file( + self, + path: str, + content: str, + message: str, + repo: Optional[str] = None, + branch: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """Update an existing file in the repository.""" + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured" + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + effective_branch = self._get_branch(branch, __user__) + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": f"Updating {path}...", "done": False}, + } + ) + try: + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: + get_response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + params={"ref": effective_branch}, + ) + if get_response.status_code == 404: + return f"File not found: {path}. Use create_file() instead." + get_response.raise_for_status() + file_info = get_response.json() + sha = file_info.get("sha") + if not sha: + return "Error: Could not get file SHA for update" + content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: + response = await client.put( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + json={ + "content": content_b64, + "message": message, + "branch": effective_branch, + "sha": sha, + }, + ) + response.raise_for_status() + result = response.json() + commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Done", + "done": True, + "hidden": True, + }, + } + ) + return f"Updated {path} on {effective_branch}\nCommit: {commit_sha}\nMessage: {message}" + except httpx.HTTPStatusError as e: + return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}" + except Exception as e: + return f"Error: {type(e).__name__}: {e}" + + async def delete_file( + self, + path: str, + message: str, + repo: Optional[str] = None, + branch: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + __event_call__: Callable[[dict], Any] = None, + ) -> str: + """Delete a file from the repository.""" + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured" + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + effective_branch = self._get_branch(branch, __user__) + # Confirm deletion + if __event_call__: + result = await __event_call__( + { + "type": "confirmation", + "data": { + "title": "Delete File?", + "message": f"Delete {path} from {owner}/{repo_name} ({effective_branch})?", + }, + } + ) + if result is None or result is False: + return "delete_file CANCELLED" + if isinstance(result, dict) and not result.get("confirmed"): + return "delete_file CANCELLED" + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": f"Deleting {path}...", "done": False}, + } + ) + try: + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: + get_response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + params={"ref": effective_branch}, + ) + if get_response.status_code == 404: + return f"File not found: {path}" + get_response.raise_for_status() + file_info = get_response.json() + sha = file_info.get("sha") + if not sha: + return "Error: Could not get file SHA for delete" + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: + response = await client.delete( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + json={ + "message": message, + "branch": effective_branch, + "sha": sha, + }, + ) + response.raise_for_status() + result = response.json() + commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Done", + "done": True, + "hidden": True, + }, + } + ) + return f"Deleted {path} on {effective_branch}\nCommit: {commit_sha}\nMessage: {message}" + except httpx.HTTPStatusError as e: + return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}" + except Exception as e: + return f"Error: {type(e).__name__}: {e}" + + # BRANCHING & PR (WITH PAGINATION) + async def create_branch( + self, + branch_name: str, + from_branch: Optional[str] = None, + repo: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """Create a new branch.""" + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured" + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + effective_from = from_branch or self._get_branch(None, __user__) + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Creating branch {branch_name}...", + "done": False, + }, + } + ) + try: + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: + response = await client.post( + self._api_url(f"/repos/{owner}/{repo_name}/branches"), + headers=self._headers(__user__), + json={ + "new_branch_name": branch_name, + "old_branch_name": effective_from, + }, + ) + response.raise_for_status() + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Done", + "done": True, + "hidden": True, + }, + } + ) + return f"Created branch '{branch_name}' from '{effective_from}' in {owner}/{repo_name}" + except httpx.HTTPStatusError as e: + if e.response.status_code == 409: + return f"Branch '{branch_name}' already exists" + return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}" + except Exception as e: + return f"Error: {type(e).__name__}: {e}" + + async def create_pull_request( + self, + title: str, + head_branch: str, + base_branch: Optional[str] = None, + body: str = "", + repo: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """Create a pull request.""" + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured" + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + effective_base = base_branch or self._get_branch(None, __user__) + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Creating pull request...", "done": False}, + } + ) + try: + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: + response = await client.post( + self._api_url(f"/repos/{owner}/{repo_name}/pulls"), + headers=self._headers(__user__), + json={ + "title": title, + "head": head_branch, + "base": effective_base, + "body": body, + }, + ) + response.raise_for_status() + pr = response.json() + pr_number = pr.get("number") + pr_url = pr.get("html_url", "") + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Done", + "done": True, + "hidden": True, + }, + } + ) + return f"Created PR #{pr_number}: {title}\n{head_branch} → {effective_base}\nURL: {pr_url}" + except httpx.HTTPStatusError as e: + return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}" + except Exception as e: + return f"Error: {type(e).__name__}: {e}" + + async def list_pull_requests( + self, + state: str = "open", + repo: Optional[str] = None, + page: int = 1, + limit: Optional[int] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """List pull requests.""" + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured" + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + page_size = self._get_page_size(limit) + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Fetching pull requests...", "done": False}, + } + ) + try: + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/pulls"), + headers=self._headers(__user__), + params={"state": state, "page": page, "limit": page_size}, + ) + response.raise_for_status() + prs = response.json() + total_count = response.headers.get("x-total-count", "?") + output = ( + f"Pull Requests in {owner}/{repo_name} ({state}, page {page}):\n\n" + ) + if not prs: + output += "No pull requests found." + else: + for pr in prs: + number = pr.get("number") + title = pr.get("title", "")[:60] + user = pr.get("user", {}).get("login", "unknown") + head = pr.get("head", {}).get("ref", "") + base = pr.get("base", {}).get("ref", "") + mergeable = pr.get("mergeable", None) + merge_status = "" + if mergeable is True: + merge_status = " ✅" + elif mergeable is False: + merge_status = " ⚠️ conflicts" + output += f"- #{number}: {title}\n {head} → {base} (by {user}){merge_status}\n" + output += f"\nShowing {len(prs)} PRs (Total: {total_count})" + if len(prs) == page_size: + output += f"\nMore results may exist. Use page={page + 1} to continue." + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Done", + "done": True, + "hidden": True, + }, + } + ) + return output + except httpx.HTTPStatusError as e: + return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}" + except Exception as e: + return f"Error: {type(e).__name__}: {e}" + + async def get_pull_request( + self, + pr_number: int, + repo: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """Get details of a specific pull request.""" + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured" + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Fetching PR #{pr_number}...", + "done": False, + }, + } + ) + try: + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/pulls/{pr_number}"), + headers=self._headers(__user__), + ) + response.raise_for_status() + pr = response.json() + output = f"Pull Request #{pr_number} in {owner}/{repo_name}\n\n" + output += f"**Title:** {pr.get('title', '')}\n" + output += f"**State:** {pr.get('state', 'unknown')}\n" + output += f"**Author:** {pr.get('user', {}).get('login', 'unknown')}\n" + output += f"**Branch:** {pr.get('head', {}).get('ref', '')} → {pr.get('base', {}).get('ref', '')}\n" + output += f"**Mergeable:** {pr.get('mergeable', 'unknown')}\n" + output += f"**URL:** {pr.get('html_url', '')}\n" + body = pr.get("body", "") + if body: + output += f"\n**Description:**\n{body[:500]}" + if len(body) > 500: + output += "...(truncated)" + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Done", + "done": True, + "hidden": True, + }, + } + ) + return output + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + return f"PR #{pr_number} not found" + return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}" + except Exception as e: + return f"Error: {type(e).__name__}: {e}" + + # ISSUES (WITH PAGINATION) + async def list_issues( + self, + state: str = "open", + repo: Optional[str] = None, + page: int = 1, + limit: Optional[int] = None, + labels: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """List issues in a repository.""" + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured" + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + page_size = self._get_page_size(limit) + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Fetching issues...", "done": False}, + } + ) + try: + params = { + "state": state, + "type": "issues", + "page": page, + "limit": page_size, + } + if labels: + params["labels"] = labels + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/issues"), + headers=self._headers(__user__), + params=params, + ) + response.raise_for_status() + issues = response.json() + total_count = response.headers.get("x-total-count", "?") + output = f"Issues in {owner}/{repo_name} ({state}, page {page}):\n\n" + if not issues: + output += "No issues found." + else: + for issue in issues: + number = issue.get("number") + title = issue.get("title", "")[:60] + user = issue.get("user", {}).get("login", "unknown") + issue_labels = ", ".join( + [label.get("name", "") for label in issue.get("labels", [])] + ) + labels_str = f" [{issue_labels}]" if issue_labels else "" + assignee = issue.get("assignee", {}) + assignee_str = ( + f" → {assignee.get('login', '')}" if assignee else "" + ) + output += f"- #{number}: {title}{labels_str} (by {user}){assignee_str}\n" + output += f"\nShowing {len(issues)} issues (Total: {total_count})" + if len(issues) == page_size: + output += f"\nMore results may exist. Use page={page + 1} to continue." + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Done", + "done": True, + "hidden": True, + }, + } + ) + return output + except httpx.HTTPStatusError as e: + return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}" + except Exception as e: + return f"Error: {type(e).__name__}: {e}" + + async def create_issue( + self, + title: str, + body: str = "", + repo: Optional[str] = None, + labels: Optional[List[str]] = None, + assignee: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """Create a new issue.""" + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured" + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Creating issue...", "done": False}, + } + ) + try: + payload = {"title": title, "body": body} + if labels: + payload["labels"] = labels + if assignee: + payload["assignee"] = assignee + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: + response = await client.post( + self._api_url(f"/repos/{owner}/{repo_name}/issues"), + headers=self._headers(__user__), + json=payload, + ) + response.raise_for_status() + issue = response.json() + issue_number = issue.get("number") + issue_url = issue.get("html_url", "") + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Done", + "done": True, + "hidden": True, + }, + } + ) + return f"Created issue #{issue_number}: {title}\nURL: {issue_url}" + except httpx.HTTPStatusError as e: + return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}" + except Exception as e: + return f"Error: {type(e).__name__}: {e}" + + async def get_issue( + self, + issue_number: int, + repo: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """Get details of a specific issue.""" + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured" + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Fetching issue #{issue_number}...", + "done": False, + }, + } + ) + try: + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"), + headers=self._headers(__user__), + ) + response.raise_for_status() + issue = response.json() + output = f"Issue #{issue_number} in {owner}/{repo_name}\n\n" + output += f"**Title:** {issue.get('title', '')}\n" + output += f"**State:** {issue.get('state', 'unknown')}\n" + output += ( + f"**Author:** {issue.get('user', {}).get('login', 'unknown')}\n" + ) + labels = [label.get("name", "") for label in issue.get("labels", [])] + if labels: + output += f"**Labels:** {', '.join(labels)}\n" + assignee = issue.get("assignee") + if assignee: + output += f"**Assignee:** {assignee.get('login', '')}\n" + output += f"**URL:** {issue.get('html_url', '')}\n" + body = issue.get("body", "") + if body: + output += f"\n**Description:**\n{body[:1000]}" + if len(body) > 1000: + output += "...(truncated)" + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Done", + "done": True, + "hidden": True, + }, + } + ) + return output + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + return f"Issue #{issue_number} not found" + return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}" + except Exception as e: + return f"Error: {type(e).__name__}: {e}" + + async def update_issue( + self, + issue_number: int, + repo: Optional[str] = None, + title: Optional[str] = None, + body: Optional[str] = None, + state: Optional[str] = None, + assignee: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """Update an existing issue.""" + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured" + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Updating issue #{issue_number}...", + "done": False, + }, + } + ) + # Build payload with only provided fields + payload = {} + if title is not None: + payload["title"] = title + if body is not None: + payload["body"] = body + if state is not None: + if state not in ("open", "closed"): + return f"Error: Invalid state '{state}'. Use 'open' or 'closed'." + payload["state"] = state + if assignee is not None: + payload["assignee"] = assignee + if not payload: + return "Error: No fields to update. Provide at least one of: title, body, state, assignee" + try: + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: + response = await client.patch( + self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"), + headers=self._headers(__user__), + json=payload, + ) + response.raise_for_status() + issue = response.json() + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Done", + "done": True, + "hidden": True, + }, + } + ) + return f"Updated issue #{issue_number}: {issue.get('title', '')}\nState: {issue.get('state', 'unknown')}\nURL: {issue.get('html_url', '')}" + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + return f"Issue #{issue_number} not found" + return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}" + except Exception as e: + return f"Error: {type(e).__name__}: {e}" + + # PROJECT CONTEXT + async def get_project_context( + self, + repo: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """Get project context: README, structure, recent commits.""" + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured" + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + branch = self._get_branch(None, __user__) + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Loading project context for {owner}/{repo_name}...", + "done": False, + }, + } + ) + output = f"# Project: {owner}/{repo_name}\n\n" + try: + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: + repo_response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}"), + headers=self._headers(__user__), + ) + if repo_response.status_code == 200: + repo_info = repo_response.json() + output += f"**Description:** {repo_info.get('description', 'No description')}\n" + output += f"**Default Branch:** {repo_info.get('default_branch', 'main')}\n" + output += ( + f"**Language:** {repo_info.get('language', 'Unknown')}\n\n" + ) + root_response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/contents/"), + headers=self._headers(__user__), + params={"ref": branch}, + ) + if root_response.status_code == 200: + contents = root_response.json() + output += "## Root Structure\n\n" + for item in sorted( + contents, + key=lambda x: (x.get("type") != "dir", x.get("name", "")), + ): + icon = "📁" if item.get("type") == "dir" else "📄" + output += f"{icon} {item.get('name', '')}\n" + output += "\n" + for readme_name in [ + "README.md", + "readme.md", + "README", + "README.txt", + ]: + readme_response = await client.get( + self._api_url( + f"/repos/{owner}/{repo_name}/contents/{readme_name}" + ), + headers=self._headers(__user__), + params={"ref": branch}, + ) + if readme_response.status_code == 200: + readme_info = readme_response.json() + content_b64 = readme_info.get("content", "") + try: + readme_content = base64.b64decode(content_b64).decode( + "utf-8" + ) + if len(readme_content) > 2000: + readme_content = ( + readme_content[:2000] + "\n\n... (truncated)" + ) + output += f"## README\n\n{readme_content}\n\n" + except: + pass + break + commits_response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/commits"), + headers=self._headers(__user__), + params={"sha": branch, "limit": 5}, + ) + if commits_response.status_code == 200: + commits = commits_response.json() + output += "## Recent Commits\n\n" + for commit in commits[:5]: + sha = commit.get("sha", "")[:8] + msg = ( + commit.get("commit", {}) + .get("message", "") + .split("\n")[0][:60] + ) + author = ( + commit.get("commit", {}) + .get("author", {}) + .get("name", "unknown") + ) + output += f"- `{sha}` {msg} ({author})\n" + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Done", + "done": True, + "hidden": True, + }, + } + ) + return output + except Exception as e: + return f"Error loading project context: {type(e).__name__}: {e}" + + # COMMITS (NEW) + async def list_commits( + self, + repo: Optional[str] = None, + branch: Optional[str] = None, + page: int = 1, + limit: Optional[int] = None, + path: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """List commits in a repository.""" + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured" + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + effective_branch = self._get_branch(branch, __user__) + page_size = self._get_page_size(limit) + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": f"Fetching commits...", "done": False}, + } + ) + try: + params = { + "sha": effective_branch, + "page": page, + "limit": page_size, + } + if path: + params["path"] = path + async with httpx.AsyncClient(timeout=30.0, verify=False) as client: + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/commits"), + headers=self._headers(__user__), + params=params, + ) + response.raise_for_status() + commits = response.json() + total_count = response.headers.get("x-total-count", "?") + output = f"Commits in {owner}/{repo_name} ({effective_branch}, page {page}):\n" + if path: + output += f"Filtered to: {path}\n" + output += "\n" + for commit in commits: + sha = commit.get("sha", "")[:8] + commit_data = commit.get("commit", {}) + msg = commit_data.get("message", "").split("\n")[0][:60] + author = commit_data.get("author", {}).get("name", "unknown") + date = commit_data.get("author", {}).get("date", "")[:10] + output += f"- `{sha}` {msg}\n by {author} on {date}\n" + output += f"\nShowing {len(commits)} commits (Total: {total_count})" + if len(commits) == page_size: + output += ( + f"\nMore results may exist. Use page={page + 1} to continue." + ) + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Done", + "done": True, + "hidden": True, + }, + } + ) + return output + except httpx.HTTPStatusError as e: + return f"HTTP Error {e.response.status_code}: {e.response.text[:200]}" + except Exception as e: + return f"Error: {type(e).__name__}: {e}" \ No newline at end of file