""" title: Gitea Coder - Workflow Role with Automatic Branch Management author: Jeff Smith + minimax + Claude version: 1.0.0 license: MIT description: High-level workflow role for LLM-based code generation with automatic branch management and quality gates requirements: pydantic, httpx changelog: 1.0.0: - Initial implementation of gitea_coder role - Branch name = chat_id (KISS principle) - Stateless design - no caching complexity - Branch creation with scope gating (prevents main pushes) - Generates detailed commit messages with ticket references - Creates PRs from branches - Reads ticket requirements from issues (by number or URL) - Updates tickets with status - Reads PR feedback - Unified file operations workflow - Diff-based updates with apply_diff() - Size delta gating in commit_changes() for quality control - Complete CRUD operations: create_file, replace_file, delete_file, rename_file """ from typing import Optional, Callable, Any, Dict, List, Tuple from pydantic import BaseModel, Field import re import base64 import httpx class GiteaHelpers: """ Helper methods for Gitea API interactions. Designed to be mixed into Tools class via composition. """ def __init__(self, tools_instance): """Initialize with reference to parent Tools instance for valve access""" self.tools = tools_instance @property def valves(self): """Access valves from parent Tools instance""" return self.tools.valves def api_url(self, endpoint: str) -> str: """Construct full API URL for Gitea endpoint""" base = self.get_url() return f"{base}/api/v1{endpoint}" def get_url(self) -> str: """Get effective Gitea URL with trailing slash handling""" return self.valves.GITEA_URL.rstrip("/") def get_token(self, __user__: dict = None) -> str: """Extract Gitea token from user context""" if __user__ and "valves" in __user__: user_valves = __user__.get("valves") if user_valves: token = getattr(user_valves, "GITEA_TOKEN", "") if token: return token return "" def headers(self, __user__: dict = None) -> dict: """Generate authentication headers with token""" token = self.get_token(__user__) headers = {"Content-Type": "application/json"} if token: headers["Authorization"] = f"token {token}" return headers def format_error(self, e: httpx.HTTPStatusError, context: str = "") -> str: """Format HTTP error with detailed context""" try: error_json = e.response.json() error_msg = error_json.get("message", e.response.text[:200]) except Exception: error_msg = e.response.text[:200] context_str = f" ({context})" if context else "" return f"HTTP Error {e.response.status_code}{context_str}: {error_msg}" def get_repo(self, repo: Optional[str], __user__: dict = None) -> str: """Get effective repository with priority resolution""" if repo: return repo if __user__ and "valves" in __user__: user_valves = __user__.get("valves") if user_valves and self.valves.ALLOW_USER_OVERRIDES: user_repo = getattr(user_valves, "USER_DEFAULT_REPO", "") if user_repo: return user_repo return self.valves.DEFAULT_REPO def get_branch(self, __user__: dict = None, __metadata__: dict = None) -> str: """ Get effective branch name. CRITICAL: Branch name IS the chat_id. Nothing else. Priority: chat_id from metadata > user override > default """ # First check metadata for chat_id (becomes branch name) if __metadata__: chat_id = __metadata__.get("chat_id") if chat_id: return chat_id # Then check user valves override if __user__ and "valves" in __user__: user_valves = __user__.get("valves") if user_valves and self.valves.ALLOW_USER_OVERRIDES: user_branch = getattr(user_valves, "USER_DEFAULT_BRANCH", "") if user_branch: return user_branch # Finally fall back to default return self.valves.DEFAULT_BRANCH def resolve_repo(self, repo: Optional[str], __user__: dict = None) -> Tuple[str, str]: """Resolve repository string into owner and repo name""" effective_repo = self.get_repo(repo, __user__) if not effective_repo: raise ValueError( "No repository specified. Set DEFAULT_REPO in Valves or USER_DEFAULT_REPO in UserValves." ) if "/" not in effective_repo: raise ValueError( f"Repository must be in 'owner/repo' format, got: {effective_repo}" ) return tuple(effective_repo.split("/", 1)) def parse_issue_url(self, url: str) -> Tuple[Optional[str], Optional[str], Optional[int]]: """ Parse a Gitea issue URL into components. Format: https://///issues/ Returns: Tuple of (owner, repo, issue_number) or (None, None, None) if invalid """ # Match: https://domain/owner/repo/issues/123 match = re.match(r'https?://[^/]+/([^/]+)/([^/]+)/issues/(\d+)', url) if match: owner = match.group(1) repo = match.group(2) issue_number = int(match.group(3)) return (owner, repo, issue_number) return (None, None, None) def parse_issue_refs(self, text: str) -> List[str]: """Extract issue references from text (e.g., #42, issue #42)""" refs = re.findall(r"#(\d+)", text) issue_refs = [f"#{ref}" for ref in refs] # Also check for "issue N" pattern issue_n_refs = re.findall(r"issue\s*#?(\d+)", text, re.IGNORECASE) for ref in issue_n_refs: issue_ref = f"#{ref}" if issue_ref not in issue_refs: issue_refs.append(issue_ref) return issue_refs def apply_unified_diff(self, current_content: str, diff_content: str) -> Optional[str]: """ Apply a unified diff to content. Args: current_content: Current file content diff_content: Unified diff patch Returns: New content after applying diff, or None if failed """ try: diff_lines = diff_content.splitlines(keepends=True) # Parse hunks from unified diff hunks = [] current_hunk = None in_hunk = False for line in diff_lines: if line.startswith("---") or line.startswith("+++"): continue # Skip file headers elif line.startswith("@@"): if current_hunk: hunks.append(current_hunk) # Parse hunk header: @@ -old_line,old_count +new_line,new_count @@ match = re.search( r"@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@", line ) if match: old_start = int(match.group(1)) new_start = int(match.group(3)) current_hunk = { "old_start": old_start, "new_start": new_start, "lines": [], } in_hunk = True continue elif in_hunk and line[0:1] in ('+', '-', ' '): if current_hunk: current_hunk["lines"].append(line) elif in_hunk: if current_hunk: hunks.append(current_hunk) current_hunk = None in_hunk = False if current_hunk: hunks.append(current_hunk) if not hunks: return current_content # Split content into lines old_lines = current_content.splitlines(keepends=False) new_lines = list(old_lines) # Apply each hunk in reverse order to maintain correct indices for hunk in sorted(hunks, key=lambda h: h["old_start"], reverse=True): old_start = hunk["old_start"] - 1 # Convert to 0-indexed # Collect lines to remove and add lines_to_remove = [] lines_to_add = [] for line in hunk["lines"]: if line.startswith("+"): lines_to_add.append(line[1:].rstrip("\n")) elif line.startswith("-"): lines_to_remove.append(line[1:].rstrip("\n")) # Remove old lines if they match if lines_to_remove: end_idx = old_start + len(lines_to_remove) if end_idx <= len(new_lines): actual_lines = new_lines[old_start:end_idx] if actual_lines == lines_to_remove: del new_lines[old_start:end_idx] # Insert new lines at old_start for line in reversed(lines_to_add): new_lines.insert(old_start, line) # Reconstruct content with line endings new_content = "\n".join(new_lines) if new_content and not new_content.endswith("\n"): new_content += "\n" return new_content except Exception as e: print(f"Diff application warning: {e}") return None def generate_commit_message( self, change_type: str, scope: str, description: str, issue_refs: Optional[List[str]] = None, body: Optional[str] = None, ) -> str: """ Generate a conventional commit message. Format: (): """ # Validate and normalize change type valid_types = [ "feat", "fix", "docs", "style", "refactor", "test", "chore", "perf", "ci", "build", "revert" ] if change_type.lower() not in valid_types: change_type = "chore" # Build the subject line scope_str = f"({scope})" if scope else "" message = f"{change_type.lower()}{scope_str}: {description}" # Add issue references to footer if issue_refs: refs_str = ", ".join(issue_refs) footer = f"Refs: {refs_str}" if body: body = f"{body}\n\n{footer}" else: message = f"{message}\n\n{footer}" if body: message = f"{message}\n\n{body}" return message def generate_diff_commit_message(self, path: str, diff_content: str) -> str: """Generate a commit message from diff content""" file_name = path.split("/")[-1] # Detect change type from diff change_type = "chore" diff_lines = diff_content.splitlines() if any(line.startswith("+def ") or line.startswith("+class ") for line in diff_lines): change_type = "feat" elif any(line.startswith("+ return ") for line in diff_lines): change_type = "fix" elif "test" in path.lower() or "spec" in path.lower(): change_type = "test" elif ".md" in path.lower() or "readme" in path.lower(): change_type = "docs" elif any(line.startswith("-") for line in diff_lines): change_type = "refactor" # Extract description from added lines added_lines = [ line[1:].strip() for line in diff_lines if line.startswith("+") and not line.startswith("+++") ] description = "update" if added_lines: for line in added_lines: if line and not line.startswith(("import ", "from ")): match = re.match(r"(def|class|const|var|let|interface|type)\s+(\w+)", line) if match: kind, name = match.groups() if kind == "def": description = f"add {name}() function" break elif kind == "class": description = f"add {name} class" break elif not line.startswith((" ", "\t")): description = line[:50].rstrip(":") if len(line) > 50: description += "..." break return f"{change_type}({file_name}): {description}" class Tools: """ Gitea Coder Role - High-level workflow automation for code generation tasks. ARCHITECTURE: - Stateless design - no caching complexity - Branch name = chat_id (PERIOD. Nothing else.) - All operations are self-contained - LLM focuses on code, not infrastructure Workflow: reads ticket โ†’ creates branch (chat_id) โ†’ commits โ†’ updates ticket โ†’ creates PR Key Features: - Branch name IS chat_id (KISS principle) - Auto-generates conventional commit messages - Quality gates for file changes (size delta validation) - Diff-based updates to prevent accidental file replacements - Complete CRUD operations (create, replace, delete, rename) - Ticket updates for status tracking - PR reading for feedback """ class Valves(BaseModel): """System-wide configuration for Gitea Coder integration""" GITEA_URL: str = Field( default="https://gitea.example.com", description="Gitea server URL", ) DEFAULT_REPO: str = Field( default="", description="Default repository in owner/repo format", ) DEFAULT_BRANCH: str = Field( default="main", description="Default branch name for base operations", ) ALLOW_USER_OVERRIDES: bool = Field( default=True, description="Allow users to override defaults via UserValves", ) VERIFY_SSL: bool = Field( default=True, description="Verify SSL certificates", ) MAX_SIZE_DELTA_PERCENT: float = Field( default=50.0, description="Maximum allowed file size change percentage (quality gate)", ge=1.0, le=500.0, ) PROTECTED_BRANCHES: List[str] = Field( default=["main", "master", "develop", "dev", "release", "hotfix"], description="Branches that cannot be committed to directly", ) class UserValves(BaseModel): """Per-user configuration""" GITEA_TOKEN: str = Field( default="", description="Your Gitea API token", ) USER_DEFAULT_REPO: str = Field( default="", description="Override default repository", ) USER_DEFAULT_BRANCH: str = Field( default="", description="Override default branch", ) def __init__(self): """Initialize tools""" self.valves = self.Valves() self.user_valves = self.UserValves() self.citation = True # Initialize helper functions self._gitea = GiteaHelpers(self) async def workflow_summary( self, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """Get a summary of available coder workflows and commands""" branch = self._gitea.get_branch(__user__, __metadata__) message = f"""# ๐Ÿš€ Gitea Coder Workflow Guide ## Quick Start 1. **Read the ticket:** `read_ticket(issue_number)` or `read_ticket_by_url(url)` 2. **Create branch:** `create_branch()` - Branch name is chat_id: `{branch}` 3. **Make changes:** `apply_diff()` or `commit_changes()` 4. **Update ticket:** `update_ticket(issue_number, comment)` 5. **Create PR:** `create_pull_request(title)` 6. **Read PR feedback:** `read_pull_request(pr_number)` ## Available Commands ### ๐Ÿ“‹ Ticket Operations - `read_ticket(issue_number)` - Get issue details by number - `read_ticket_by_url(url)` - Get issue details by URL - `update_ticket(issue_number, comment)` - Post status update to ticket ### ๐ŸŒฟ Branch Management - `create_branch()` - Create branch with name = chat_id - `get_branch_status()` - See current working branch - `list_branches()` - List all branches in repository ### ๐Ÿ“ File Operations - `apply_diff(path, diff, message)` - Apply unified diff patch - `commit_changes(path, content, message)` - Commit with size delta gate - `replace_file(path, content, message)` - Replace entire file - `create_file(path, content, message)` - Create new file - `delete_file(path, message)` - Delete a file - `rename_file(old_path, new_path, message)` - Rename a file - `get_file(path)` - Read file content - `list_files(path)` - List directory contents ### ๐Ÿ“ฆ Pull Request Operations - `create_pull_request(title, description)` - Create PR from current branch - `read_pull_request(pr_number)` - Get PR details and review feedback ## Current Session **Working Branch:** `{branch}` (same as chat_id) ## Tips - Branch name = chat_id automatically (no confusion!) - Use diff-based updates for incremental changes - Update tickets with status, not user - Read PR feedback to iterate """ return { "status": "success", "message": message } async def read_ticket( self, issue_number: int, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """Read and parse a ticket/issue by number""" retVal = {"status": "failure", "message": ""} token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__) except ValueError as e: retVal["message"] = str(e) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Fetching issue #{issue_number}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.get( self._gitea.api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"), headers=self._gitea.headers(__user__), ) response.raise_for_status() issue = response.json() title = issue.get("title", "No title") body = issue.get("body", "") state = issue.get("state", "unknown") user = issue.get("user", {}).get("login", "unknown") labels = [label.get("name", "") for label in issue.get("labels", [])] created_at = issue.get("created_at", "")[:10] html_url = issue.get("html_url", "") # Parse body for structured info testing_criteria = [] technical_notes = [] is_testing_required = "test" in body.lower() or "testing" in body.lower() is_docs_required = "documentation" in body.lower() or "docs" in body.lower() if is_testing_required: testing_section = re.search( r"(?:testing|criteria|test).*?:(.*?)(?:\n\n|$)", body, re.IGNORECASE | re.DOTALL, ) if testing_section: testing_criteria = [ line.strip().lstrip("-*โ€ข") for line in testing_section.group(1).split("\n") if line.strip() ] tech_section = re.search( r"(?:technical|tech).*?:(.*?)(?:\n\n|$)", body, re.IGNORECASE | re.DOTALL, ) if tech_section: technical_notes = [ line.strip().lstrip("-*โ€ข") for line in tech_section.group(1).split("\n") if line.strip() ] issue_refs = self._gitea.parse_issue_refs(body) if not any(ref == f"#{issue_number}" for ref in issue_refs): issue_refs.insert(0, f"#{issue_number}") if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) # Build message message = f"# ๐Ÿ“‹ Ticket #{issue_number}: {title}\n\n" message += f"**State:** {state.upper()} | **Author:** @{user} | **Created:** {created_at}\n" message += f"**Labels:** {', '.join(labels) if labels else 'None'}\n" message += f"**URL:** {html_url}\n\n" if body: message += "## ๐Ÿ“ Description\n\n" if len(body) > 1000: message += f"{body[:1000]}...\n\n" else: message += f"{body}\n\n" message += "## ๐Ÿงช Testing Requirements\n\n" if is_testing_required: if testing_criteria: message += "**Testing Criteria:**\n" for criterion in testing_criteria: message += f"- [ ] {criterion}\n" message += "\n" else: message += "Testing required, but no specific criteria listed.\n\n" else: message += "No explicit testing requirements detected.\n\n" if technical_notes: message += "## ๐Ÿ”ง Technical Notes\n\n" for note in technical_notes: message += f"- {note}\n" message += "\n" if is_docs_required: message += "## ๐Ÿ“š Documentation Required\n\n" message += "This ticket mentions documentation needs.\n\n" if issue_refs: message += "## ๐Ÿ”— Related Issues\n\n" for ref in issue_refs: message += f"- {ref}\n" message += "\n" message += "## ๐Ÿš€ Next Steps\n\n" message += "1. Create branch: `create_branch()`\n" message += "2. Make changes using `apply_diff()` or `commit_changes()`\n" message += "3. Update ticket: `update_ticket(issue_number, comment)`\n" message += "4. Create PR: `create_pull_request(title)`\n" retVal["status"] = "success" retVal["message"] = message return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, f"issue #{issue_number}") if e.response.status_code == 404: retVal["message"] = f"Issue #{issue_number} not found in {owner}/{repo_name}." else: retVal["message"] = f"Failed to fetch issue. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" return retVal async def read_ticket_by_url( self, url: str, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """ Read and parse a ticket/issue by URL. Format: https://///issues/ """ retVal = {"status": "failure", "message": ""} # Parse URL owner, repo, issue_number = self._gitea.parse_issue_url(url) if not owner or not repo or not issue_number: retVal["message"] = f"Invalid issue URL format. Expected: https://domain/owner/repo/issues/number\nGot: {url}" return retVal # Use the standard read_ticket with parsed components return await self.read_ticket( issue_number=issue_number, repo=f"{owner}/{repo}", __user__=__user__, __metadata__=__metadata__, __event_emitter__=__event_emitter__, ) async def update_ticket( self, issue_number: int, comment: str, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """ Post a status update comment to a ticket. Status goes on the ticket, not to the user. """ retVal = {"status": "failure", "message": ""} token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__) except ValueError as e: retVal["message"] = str(e) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Updating issue #{issue_number}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.post( self._gitea.api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}/comments"), headers=self._gitea.headers(__user__), json={"body": comment}, ) response.raise_for_status() result = response.json() if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) retVal["status"] = "success" retVal["message"] = f"โœ… Updated issue #{issue_number} with status comment" return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, f"issue #{issue_number} update") retVal["message"] = f"Failed to update issue. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" return retVal async def create_branch( self, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """ Create a new branch with name = chat_id. KISS: Branch name IS chat_id. Nothing else. """ retVal = {"status": "failure", "message": ""} token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__) except ValueError as e: retVal["message"] = str(e) return retVal # Branch name IS chat_id branch_name = self._gitea.get_branch(__user__, __metadata__) # Check if protected branch if branch_name in self.valves.PROTECTED_BRANCHES: retVal["message"] = f"โŒ Cannot create branch with protected name '{branch_name}'" return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Creating branch {branch_name}...", "done": False, }, } ) # Get base branch base_branch = self.valves.DEFAULT_BRANCH try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.post( self._gitea.api_url(f"/repos/{owner}/{repo_name}/branches"), headers=self._gitea.headers(__user__), json={ "new_branch_name": branch_name, "old_branch_name": base_branch, }, ) # Handle branch already exists if response.status_code == 409: retVal["status"] = "success" retVal["message"] = f"โš ๏ธ Branch `{branch_name}` already exists." return retVal response.raise_for_status() if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) retVal["status"] = "success" retVal["message"] = f"""โœ… **Branch Created Successfully** **Branch:** `{branch_name}` (chat_id) **Base Branch:** `{base_branch}` **Repository:** `{owner}/{repo_name}` **Next Steps:** 1. Make changes to files using `apply_diff()` or `commit_changes()` 2. Update ticket with status: `update_ticket(issue_number, comment)` 3. Create PR: `create_pull_request(title)` """ return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, "branch creation") retVal["message"] = f"Failed to create branch. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" return retVal async def get_branch_status( self, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """Get the current working branch status""" retVal = {"status": "success", "message": ""} try: owner, repo_name = self._gitea.resolve_repo(repo, __user__) except ValueError as e: retVal["status"] = "failure" retVal["message"] = str(e) return retVal # Get branch from metadata (chat_id) working_branch = self._gitea.get_branch(__user__, __metadata__) message = f"# ๐ŸŒฟ Branch Status: {owner}/{repo_name}\n\n" message += f"**Current Branch:** `{working_branch}` (chat_id)\n\n" if __metadata__ and __metadata__.get("chat_id"): message += f"**Chat ID:** `{__metadata__.get('chat_id')}`\n\n" message += "All file operations will use this branch.\n" retVal["message"] = message return retVal async def list_branches( self, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """List all branches in the repository""" retVal = {"status": "failure", "message": ""} token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__) except ValueError as e: retVal["message"] = str(e) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Fetching branches...", "done": False}, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.get( self._gitea.api_url(f"/repos/{owner}/{repo_name}/branches"), headers=self._gitea.headers(__user__), params={"limit": 50}, ) response.raise_for_status() branches = response.json() message = f"# ๐ŸŒฟ Branches in {owner}/{repo_name}\n\n" # Separate protected and other branches protected = [b for b in branches if b.get("protected")] other = [b for b in branches if not b.get("protected")] if protected: message += "## ๐Ÿ›ก๏ธ Protected Branches\n\n" for branch in sorted(protected, key=lambda x: x["name"]): name = branch.get("name", "") commit_sha = branch.get("commit", {}).get("id", "")[:8] message += f"- `{name}` [commit: {commit_sha}]\n" message += "\n" if other: message += "## ๐Ÿ“ฆ Other Branches\n\n" for branch in sorted(other, key=lambda x: x["name"])[:20]: name = branch.get("name", "") commit_sha = branch.get("commit", {}).get("id", "")[:8] message += f"- `{name}` [commit: {commit_sha}]\n" if len(other) > 20: message += f"\n... and {len(other) - 20} more branches\n" message += "\n" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) retVal["status"] = "success" retVal["message"] = message.strip() return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, "branch listing") retVal["message"] = f"Failed to list branches. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" return retVal async def apply_diff( self, path: str, diff_content: str, message: Optional[str] = None, repo: Optional[str] = None, auto_message: bool = True, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """ Apply a unified diff patch to a file. This is the PREFERRED method for making changes as it: 1. Is precise about what changes 2. Prevents accidental file replacements 3. Is what LLMs understand best (trained on GitHub PRs) """ retVal = {"status": "failure", "message": ""} # Get working branch from metadata effective_branch = self._gitea.get_branch(__user__, __metadata__) token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__) except ValueError as e: retVal["message"] = str(e) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Applying diff to {path}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: # Get current file content get_response = await client.get( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), params={"ref": effective_branch}, ) # Check if file exists if get_response.status_code == 404: retVal["message"] = f"File not found: `{path}`. Use `create_file()` to create a new file." return retVal get_response.raise_for_status() file_info = get_response.json() current_sha = file_info.get("sha") # Decode current content current_content_b64 = file_info.get("content", "") try: current_content = base64.b64decode(current_content_b64).decode("utf-8") except Exception: retVal["message"] = "Could not decode current file content." return retVal # Parse and apply the diff new_content = self._gitea.apply_unified_diff(current_content, diff_content) if new_content is None: retVal["message"] = "Failed to parse or apply diff. Check the diff format." return retVal # Generate commit message if needed if not message: if auto_message: message = self._gitea.generate_diff_commit_message(path, diff_content) else: retVal["message"] = "Commit message is required when auto_message=False." return retVal # Commit the changes new_content_b64 = base64.b64encode(new_content.encode("utf-8")).decode("ascii") response = await client.put( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), json={ "content": new_content_b64, "message": message, "branch": effective_branch, "sha": current_sha, }, ) response.raise_for_status() result = response.json() commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] # Parse diff stats added_lines = diff_content.count("+") - diff_content.count("+++") removed_lines = diff_content.count("-") - diff_content.count("---") if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) message_text = f"โœ… **Diff Applied Successfully**\n\n" message_text += f"**File:** `{path}`\n" message_text += f"**Branch:** `{effective_branch}`\n" message_text += f"**Commit:** `{commit_sha}`\n" message_text += f"**Changes:** +{added_lines} lines, -{removed_lines} lines\n\n" message_text += f"**Message:** {message}\n" retVal["status"] = "success" retVal["message"] = message_text return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, f"diff application to '{path}'") if e.response.status_code == 409: retVal["message"] = f"Update conflict for `{path}`. Fetch the latest version and try again." else: retVal["message"] = f"Failed to apply diff. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure during diff application: {type(e).__name__}: {e}" return retVal async def commit_changes( self, path: str, content: str, message: Optional[str] = None, repo: Optional[str] = None, max_delta_percent: Optional[float] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """ Commit file changes with automatic content detection and size delta gating. This method: 1. Detects whether to create or replace a file 2. Validates file size changes against threshold (quality gate) 3. Auto-generates commit message if not provided 4. Commits to the working branch """ retVal = {"status": "failure", "message": ""} # Get working branch from metadata effective_branch = self._gitea.get_branch(__user__, __metadata__) token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__) except ValueError as e: retVal["message"] = str(e) return retVal # Use provided threshold or default from valves delta_threshold = max_delta_percent or self.valves.MAX_SIZE_DELTA_PERCENT if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Processing {path}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: # Check if file exists get_response = await client.get( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), params={"ref": effective_branch}, ) file_exists = get_response.status_code == 200 current_sha = None current_size = 0 if file_exists: get_response.raise_for_status() file_info = get_response.json() current_sha = file_info.get("sha") current_size = file_info.get("size", 0) # SIZE DELTA GATE - Quality check new_size = len(content.encode("utf-8")) if current_size > 0 and new_size > 0: delta_percent = abs(new_size - current_size) / current_size * 100 if delta_percent > delta_threshold: # Calculate actual bytes changed size_diff = new_size - current_size direction = "larger" if size_diff > 0 else "smaller" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": "Size gate triggered", "done": True, "hidden": True, }, } ) retVal["message"] = f"""โš ๏ธ **Quality Gate: Large File Change Detected** **File:** `{path}` **Current Size:** {current_size} bytes **New Size:** {new_size} bytes **Change:** {size_diff:+d} bytes ({delta_percent:.1f}% {direction}) **Threshold:** {delta_threshold}% **Recommended: Use diff-based updates instead** ```python apply_diff(path="{path}", diff_content="...") ``` Or override with: ```python commit_changes(..., max_delta_percent=100) ``` """ return retVal # Generate commit message if not provided if not message: message = self._gitea.generate_commit_message( change_type="chore", scope=path.split("/")[-1] if "/" in path else path, description=f"update {path}", ) # Prepare content content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") if file_exists: # Replace existing file if not current_sha: retVal["message"] = f"Could not retrieve SHA for existing file: {path}" return retVal response = await client.put( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), json={ "content": content_b64, "message": message, "branch": effective_branch, "sha": current_sha, }, ) else: # Create new file response = await client.post( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), json={ "content": content_b64, "message": message, "branch": effective_branch, }, ) response.raise_for_status() result = response.json() commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] action = "Updated" if file_exists else "Created" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) # Calculate and show size change new_size = len(content.encode("utf-8")) size_info = "" if file_exists and current_size > 0: delta = new_size - current_size delta_percent = delta / current_size * 100 size_info = f"**Size Change:** {delta:+d} bytes ({delta_percent:.1f}%)\n" message_text = f"""โœ… **{action} File Successfully** **File:** `{path}` **Branch:** `{effective_branch}` **Commit:** `{commit_sha}` **Message:** {message} {size_info}**Action:** {action} """ retVal["status"] = "success" retVal["message"] = message_text return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, f"file commit for '{path}'") if e.response.status_code == 409: retVal["message"] = f"Update conflict for `{path}`. Fetch the latest version and try again." else: retVal["message"] = f"Failed to commit file. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure during commit: {type(e).__name__}: {e}" return retVal async def create_pull_request( self, title: str, body: Optional[str] = "", repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """Create a pull request from the current branch""" retVal = {"status": "failure", "message": ""} # Get working branch from metadata head_branch = self._gitea.get_branch(__user__, __metadata__) token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__) except ValueError as e: retVal["message"] = str(e) return retVal base_branch = self.valves.DEFAULT_BRANCH # Check if protected branch if head_branch in self.valves.PROTECTED_BRANCHES: retVal["message"] = f"โŒ Cannot create PR from protected branch '{head_branch}'" return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Creating PR...", "done": False}, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.post( self._gitea.api_url(f"/repos/{owner}/{repo_name}/pulls"), headers=self._gitea.headers(__user__), json={ "title": title, "head": head_branch, "base": base_branch, "body": body, }, ) # Handle PR already exists if response.status_code == 409: retVal["message"] = f"โš ๏ธ PR already exists for branch `{head_branch}`" return retVal response.raise_for_status() pr = response.json() pr_number = pr.get("number") pr_url = pr.get("html_url", "") if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) retVal["status"] = "success" retVal["message"] = f"""โœ… **Pull Request Created Successfully** **PR #{pr_number}:** {title} **Branch:** `{head_branch}` โ†’ `{base_branch}` **URL:** {pr_url} **Next Steps:** 1. Read PR feedback: `read_pull_request({pr_number})` 2. Address reviewer comments 3. Update ticket with PR link """ return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, "PR creation") if e.response.status_code == 422: retVal["message"] = "Could not create PR. The branch may not exist or there may be merge conflicts." else: retVal["message"] = f"Failed to create PR. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure during PR creation: {type(e).__name__}: {e}" return retVal async def read_pull_request( self, pr_number: int, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """ Read pull request details including review feedback. This is how you get feedback to iterate on changes. """ retVal = {"status": "failure", "message": ""} token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__) except ValueError as e: retVal["message"] = str(e) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Fetching PR #{pr_number}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: # Get PR details pr_response = await client.get( self._gitea.api_url(f"/repos/{owner}/{repo_name}/pulls/{pr_number}"), headers=self._gitea.headers(__user__), ) pr_response.raise_for_status() pr = pr_response.json() # Get PR comments comments_response = await client.get( self._gitea.api_url(f"/repos/{owner}/{repo_name}/pulls/{pr_number}/comments"), headers=self._gitea.headers(__user__), ) comments_response.raise_for_status() comments = comments_response.json() title = pr.get("title", "") body = pr.get("body", "") state = pr.get("state", "") user = pr.get("user", {}).get("login", "") head_branch = pr.get("head", {}).get("ref", "") base_branch = pr.get("base", {}).get("ref", "") mergeable = pr.get("mergeable", False) merged = pr.get("merged", False) html_url = pr.get("html_url", "") if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) # Build message message = f"# ๐Ÿ”€ Pull Request #{pr_number}: {title}\n\n" message += f"**State:** {state.upper()} | **Author:** @{user}\n" message += f"**Branch:** `{head_branch}` โ†’ `{base_branch}`\n" message += f"**Mergeable:** {'โœ… Yes' if mergeable else 'โŒ No'}\n" message += f"**Merged:** {'โœ… Yes' if merged else 'โŒ No'}\n" message += f"**URL:** {html_url}\n\n" if body: message += "## ๐Ÿ“ Description\n\n" message += f"{body}\n\n" if comments: message += f"## ๐Ÿ’ฌ Review Comments ({len(comments)})\n\n" for comment in comments[:10]: # Limit to 10 most recent comment_user = comment.get("user", {}).get("login", "unknown") comment_body = comment.get("body", "") comment_path = comment.get("path", "") comment_line = comment.get("line", "") message += f"**@{comment_user}**" if comment_path: message += f" on `{comment_path}`" if comment_line: message += f" (line {comment_line})" message += f":\n{comment_body}\n\n" if len(comments) > 10: message += f"... and {len(comments) - 10} more comments\n\n" else: message += "## ๐Ÿ’ฌ No review comments yet\n\n" message += "## ๐Ÿš€ Next Steps\n\n" if comments: message += "1. Address review comments\n" message += "2. Make changes using `apply_diff()` or `commit_changes()`\n" message += "3. Update ticket with progress\n" else: message += "Waiting for review feedback...\n" retVal["status"] = "success" retVal["message"] = message return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, f"PR #{pr_number}") if e.response.status_code == 404: retVal["message"] = f"PR #{pr_number} not found in {owner}/{repo_name}." else: retVal["message"] = f"Failed to fetch PR. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" return retVal async def replace_file( self, path: str, content: str, message: str, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """ Update an existing file in the repository. WARNING: This replaces the entire file content. Use `apply_diff()` for incremental changes. """ retVal = {"status": "failure", "message": ""} effective_branch = self._gitea.get_branch(__user__, __metadata__) token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__) except ValueError as e: retVal["message"] = str(e) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": f"Updating {path}...", "done": False}, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: get_response = await client.get( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), params={"ref": effective_branch}, ) if get_response.status_code == 404: retVal["message"] = f"File not found: `{path}`. Use `create_file()` to create a new file." return retVal get_response.raise_for_status() file_info = get_response.json() sha = file_info.get("sha") if not sha: retVal["message"] = "Could not retrieve file SHA for update." return retVal content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") response = await client.put( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), json={ "content": content_b64, "message": message, "branch": effective_branch, "sha": sha, }, ) response.raise_for_status() result = response.json() commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) message_text = f"โœ… **File Updated Successfully**\n\n" message_text += f"**File:** `{path}`\n" message_text += f"**Branch:** `{effective_branch}`\n" message_text += f"**Commit:** `{commit_sha}`\n" message_text += f"**Message:** {message}\n" retVal["status"] = "success" retVal["message"] = message_text return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, f"file update for '{path}'") if e.response.status_code == 409: retVal["message"] = f"Update conflict for `{path}`. Fetch the latest version and try again." else: retVal["message"] = f"Failed to update file. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure during file update: {type(e).__name__}: {e}" return retVal async def create_file( self, path: str, content: str, message: str, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """Create a new file in the repository""" retVal = {"status": "failure", "message": ""} effective_branch = self._gitea.get_branch(__user__, __metadata__) token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__) except ValueError as e: retVal["message"] = str(e) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": f"Creating {path}...", "done": False}, } ) try: content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.post( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), json={ "content": content_b64, "message": message, "branch": effective_branch, }, ) response.raise_for_status() result = response.json() commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) message_text = f"โœ… **File Created Successfully**\n\n" message_text += f"**File:** `{path}`\n" message_text += f"**Branch:** `{effective_branch}`\n" message_text += f"**Commit:** `{commit_sha}`\n" message_text += f"**Message:** {message}\n" retVal["status"] = "success" retVal["message"] = message_text return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, f"file creation for '{path}'") if e.response.status_code == 422: retVal["message"] = f"File already exists: `{path}`. Use `replace_file()` or `apply_diff()` to modify it." else: retVal["message"] = f"Failed to create file. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure during file creation: {type(e).__name__}: {e}" return retVal async def delete_file( self, path: str, message: str, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, __event_call__: Callable[[dict], Any] = None, ) -> dict: """Delete a file from the repository""" retVal = {"status": "failure", "message": ""} effective_branch = self._gitea.get_branch(__user__, __metadata__) token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__) except ValueError as e: retVal["message"] = str(e) return retVal # Confirmation dialog if __event_call__: result = await __event_call__( { "type": "confirmation", "data": { "title": "Confirm File Deletion", "message": f"Delete `{path}` from `{owner}/{repo_name}` ({effective_branch})?", }, } ) if result is None or result is False: retVal["message"] = "โš ๏ธ File deletion cancelled by user." return retVal if isinstance(result, dict) and not result.get("confirmed"): retVal["message"] = "โš ๏ธ File deletion cancelled by user." return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": f"Deleting {path}...", "done": False}, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: get_response = await client.get( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), params={"ref": effective_branch}, ) if get_response.status_code == 404: retVal["message"] = f"File not found: `{path}`" return retVal get_response.raise_for_status() file_info = get_response.json() sha = file_info.get("sha") if not sha: retVal["message"] = "Could not retrieve file SHA for deletion." return retVal response = await client.delete( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), json={ "message": message, "branch": effective_branch, "sha": sha, }, ) response.raise_for_status() result = response.json() commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) message_text = f"โœ… **File Deleted Successfully**\n\n" message_text += f"**File:** `{path}`\n" message_text += f"**Branch:** `{effective_branch}`\n" message_text += f"**Commit:** `{commit_sha}`\n" message_text += f"**Message:** {message}\n" retVal["status"] = "success" retVal["message"] = message_text return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, f"file deletion for '{path}'") if e.response.status_code == 404: retVal["message"] = f"File not found: `{path}`" else: retVal["message"] = f"Failed to delete file. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure during file deletion: {type(e).__name__}: {e}" return retVal async def rename_file( self, old_path: str, new_path: str, message: Optional[str] = None, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """Rename a file in the repository""" retVal = {"status": "failure", "message": ""} effective_branch = self._gitea.get_branch(__user__, __metadata__) token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__) except ValueError as e: retVal["message"] = str(e) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Renaming {old_path} to {new_path}...", "done": False, }, } ) try: if not message: message = self._gitea.generate_commit_message( change_type="chore", scope="rename", description=f"rename {old_path} โ†’ {new_path}", ) async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: get_response = await client.get( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{old_path}"), headers=self._gitea.headers(__user__), params={"ref": effective_branch}, ) if get_response.status_code == 404: retVal["message"] = f"File not found: `{old_path}`" return retVal get_response.raise_for_status() old_file = get_response.json() old_sha = old_file.get("sha") content_b64 = old_file.get("content", "") try: content = base64.b64decode(content_b64).decode("utf-8") except Exception: retVal["message"] = "Could not decode file content." return retVal new_content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") create_response = await client.post( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{new_path}"), headers=self._gitea.headers(__user__), json={ "content": new_content_b64, "message": message, "branch": effective_branch, }, ) if create_response.status_code == 422: retVal["message"] = f"File already exists at new path: `{new_path}`" return retVal create_response.raise_for_status() delete_response = await client.delete( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{old_path}"), headers=self._gitea.headers(__user__), json={ "message": message, "branch": effective_branch, "sha": old_sha, }, ) delete_response.raise_for_status() commit_sha = create_response.json().get("commit", {}).get("sha", "unknown")[:8] if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) message_text = f"โœ… **File Renamed Successfully**\n\n" message_text += f"**Old Path:** `{old_path}`\n" message_text += f"**New Path:** `{new_path}`\n" message_text += f"**Branch:** `{effective_branch}`\n" message_text += f"**Commit:** `{commit_sha}`\n" message_text += f"**Message:** {message}\n" retVal["status"] = "success" retVal["message"] = message_text return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, "file rename") retVal["message"] = f"Failed to rename file. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure during file rename: {type(e).__name__}: {e}" return retVal async def get_file( self, path: str, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """Get the contents of a file from the repository""" retVal = {"status": "failure", "message": ""} effective_branch = self._gitea.get_branch(__user__, __metadata__) token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__) except ValueError as e: retVal["message"] = str(e) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Reading file {path}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.get( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), params={"ref": effective_branch}, ) response.raise_for_status() file_info = response.json() if isinstance(file_info, list): retVal["message"] = f"'{path}' is a directory. Use `list_files()` to browse its contents." return retVal if file_info.get("type") != "file": retVal["message"] = f"'{path}' is not a file (type: {file_info.get('type')})" return retVal content_b64 = file_info.get("content", "") try: content = base64.b64decode(content_b64).decode("utf-8") except Exception: retVal["message"] = "Could not decode file content. The file may be binary." return retVal size = file_info.get("size", 0) sha_short = file_info.get("sha", "unknown")[:8] if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) message_text = f"**File:** `{path}`\n" message_text += f"**Branch:** `{effective_branch}`\n" message_text += f"**SHA:** `{sha_short}` | **Size:** {size} bytes\n" message_text += f"**URL:** {file_info.get('html_url', 'N/A')}\n\n" message_text += f"```\n{content}\n```\n" retVal["status"] = "success" retVal["message"] = message_text return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, f"file fetch for '{path}'") if e.response.status_code == 404: retVal["message"] = f"File not found: `{path}`" else: retVal["message"] = f"Failed to fetch file. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" return retVal async def list_files( self, path: str = "", repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """List files and directories in a repository path""" retVal = {"status": "failure", "message": ""} effective_branch = self._gitea.get_branch(__user__, __metadata__) token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__) except ValueError as e: retVal["message"] = str(e) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Listing {path or 'root'}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.get( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), params={"ref": effective_branch}, ) response.raise_for_status() contents = response.json() if isinstance(contents, dict): retVal["message"] = f"'{path}' is a file. Use `get_file()` to read its contents." return retVal message = f"**Contents of {owner}/{repo_name}/{path or '.'}** (`{effective_branch}`)\n\n" dirs = [item for item in contents if item.get("type") == "dir"] files = [item for item in contents if item.get("type") == "file"] if dirs: message += "**๐Ÿ“ Directories:**\n" for item in sorted(dirs, key=lambda x: x.get("name", "").lower()): message += f"- `๐Ÿ“ {item.get('name', '')}/`\n" message += "\n" if files: message += "**๐Ÿ“„ Files:**\n" for item in sorted(files, key=lambda x: x.get("name", "").lower()): size = item.get("size", 0) if size < 1024: size_str = f"{size}B" elif size < 1024 * 1024: size_str = f"{size//1024}KB" else: size_str = f"{size//(1024*1024)}MB" sha_short = item.get("sha", "unknown")[:8] message += f"- `{item.get('name', '')}` ({size_str}, sha: {sha_short})\n" message += f"\n**Total:** {len(dirs)} directories, {len(files)} files" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) retVal["status"] = "success" retVal["message"] = message return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, f"directory listing for '{path}'") if e.response.status_code == 404: retVal["message"] = f"Path not found: `{path}`" else: retVal["message"] = f"Failed to list directory contents. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" return retVal