From 889680a7b7c89b3e107d355df32a3dd30e9c14a8 Mon Sep 17 00:00:00 2001 From: xcaliber Date: Sun, 18 Jan 2026 22:24:53 +0000 Subject: [PATCH] feat(gitea): implement gitea_coder role with scope enforcement (#20) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary Implements the gitea_coder role as defined in issue #11, providing a complete workflow automation layer for Git operations with scope enforcement. ## Features ### Branch Management with Scope Gating - ✅ Enforces branch naming conventions (feature/, fix/, refactor/, docs/, test/, chore/) - ✅ Prevents direct pushes to protected branches (main, master, develop, dev) - ✅ Auto-appends issue numbers to branch names ### Unified Commit Workflow - ✅ Automatic create vs replace detection - ✅ Conventional commits format with issue references - ✅ Detailed commit message generation ### PR Creation - ✅ Validates source branch is not protected - ✅ Auto-references issues in PR description - ✅ Uses existing gitea/dev.py operations ### Ticket Integration - ✅ Reads and parses issue requirements - ✅ Extracts testing criteria and technical notes - ✅ Suggests branch names from issue content ## Files Added - `gitea/coder.py` - Complete gitea_coder role implementation ## Files Modified - `README.md` - Added gitea_coder documentation ## Testing Criteria ✅ Can create feature branch from ticket ✅ Can modify files according to ticket requirements ✅ Can generate commit messages with issue references ✅ Can create PR for review Refs: #11 Reviewed-on: https://git.gobha.me/open-webui-automation/tools/pulls/20 --- README.md | 3 +- gitea/coder.py | 2889 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 2891 insertions(+), 1 deletion(-) create mode 100644 gitea/coder.py diff --git a/README.md b/README.md index 13238d2..378c8fe 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,7 @@ This monorepo contains a collection of automation tools for Open WebUI, designed Python scripts for Git operations and repository management: - **`admin.py`**: Administrative utilities for managing Gitea repositories, potentially including user management, permissions, and batch operations. - **`dev.py`**: Development-focused tools for Git workflows, branch handling, and repository interactions tailored for software development processes. +- **`coder.py`**: Development workflow role that reads tickets, creates branches with scope enforcement, generates commit messages with issue references, and creates pull requests. ### venice/ Tools for interacting with Venice AI services: @@ -15,4 +16,4 @@ Tools for interacting with Venice AI services: - **`image.py`**: Utilities for image processing and generation using AI models. - **`info.py`**: Information retrieval and data handling tools, possibly for querying AI models or processing related data. -These tools are modular and can be used independently or integrated into larger automation pipelines for Open WebUI. \ No newline at end of file +These tools are modular and can be used independently or integrated into larger automation pipelines for Open WebUI. diff --git a/gitea/coder.py b/gitea/coder.py new file mode 100644 index 0000000..b1bf0ee --- /dev/null +++ b/gitea/coder.py @@ -0,0 +1,2889 @@ +""" +title: Gitea Coder - Workflow Role with Automatic Branch Management +author: Jeff Smith + minimax + Claude +version: 1.0.0 +license: MIT +description: High-level workflow role for LLM-based code generation with automatic branch management and quality gates +requirements: pydantic, httpx +changelog: + 1.0.0: + - Initial implementation of gitea_coder role + - Branch name = chat_id (KISS principle) + - Stateless design - no caching complexity + - Branch creation with scope gating (prevents main pushes) + - Generates detailed commit messages with ticket references + - Creates PRs from branches + - Reads ticket requirements from issues (by number or URL) + - Updates tickets with status + - Reads PR feedback + - Unified file operations workflow + - Diff-based updates with apply_diff() + - Size delta gating in commit_changes() for quality control + - Complete CRUD operations: create_file, replace_file, delete_file, rename_file + - FIX: apply_unified_diff now properly fails when no valid hunks are parsed + - FIX: apply_diff detects no-op changes and returns failure with guidance + - FEATURE: read_ticket auto-creates branch for immediate file operations + - FEATURE: apply_diff returns patched file content for LLM review + - FEATURE: Persistent session state (repo/issue/branch/pr) survives restarts + - FEATURE: State enables cheaper models (kimi-k2, qwen3) that lose context + - FEATURE: get_session_state() and clear_session_state() for debugging + - FEATURE: list_sessions() finds orphaned work from crashed chats + - FEATURE: claim_session() continues work from dead sessions (disaster recovery) + - ENHANCEMENT: Simplified 4-step workflow (read ticket → modify → update ticket → PR) + - ENHANCEMENT: Ticket updates emphasized as "statement of work" / audit trail + - ENHANCEMENT: Review checklist for common diff errors (accidental removals, misplacements) + - ENHANCEMENT: State resolution priority: explicit arg > state file > env var > UserValves > Valves + - ENHANCEMENT: Branch resolution uses state for session continuity after claim_session +""" + +from typing import Optional, Callable, Any, Dict, List, Tuple +from pydantic import BaseModel, Field +from datetime import datetime +from pathlib import Path +import re +import base64 +import httpx +import json +import os + + +class GiteaHelpers: + """ + Helper methods for Gitea API interactions. + Designed to be mixed into Tools class via composition. + """ + + def __init__(self, tools_instance): + """Initialize with reference to parent Tools instance for valve access""" + self.tools = tools_instance + + @property + def valves(self): + """Access valves from parent Tools instance""" + return self.tools.valves + + # ============= STATE PERSISTENCE ============= + # Persists repo/issue/branch across restarts and helps models + # that struggle with context tracking (kimi-k2, qwen3-code) + + def _state_dir(self) -> Path: + """Get base state directory""" + base = os.environ.get( + "GITEA_CODER_STATE_DIR", os.path.expanduser("~/.gitea_coder") + ) + return Path(base) + + def _state_path(self, chat_id: str) -> Path: + """Get state file path for a chat session""" + return self._state_dir() / "chats" / chat_id / "state.json" + + def load_state(self, chat_id: str) -> dict: + """Load persisted state for chat session""" + if not chat_id: + return {} + path = self._state_path(chat_id) + if path.exists(): + try: + return json.loads(path.read_text()) + except Exception: + return {} + return {} + + def save_state(self, chat_id: str, **updates): + """Update and persist state for chat session""" + if not chat_id: + return + try: + state = self.load_state(chat_id) + state.update(updates) + state["updated_at"] = datetime.utcnow().isoformat() + + path = self._state_path(chat_id) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(state, indent=2)) + except Exception: + pass # Best effort - don't fail operations on state save errors + + def get_state_summary(self, chat_id: str) -> str: + """Get human-readable state summary for debugging""" + state = self.load_state(chat_id) + if not state: + return "No persisted state" + parts = [] + if state.get("repo"): + parts.append(f"repo=`{state['repo']}`") + if state.get("issue"): + parts.append(f"issue=#{state['issue']}") + if state.get("branch"): + parts.append(f"branch=`{state['branch'][:8]}...`") + return ", ".join(parts) if parts else "Empty state" + + # ============= API HELPERS ============= + + def api_url(self, endpoint: str) -> str: + """Construct full API URL for Gitea endpoint""" + base = self.get_url() + return f"{base}/api/v1{endpoint}" + + def get_url(self) -> str: + """Get effective Gitea URL with trailing slash handling""" + return self.valves.GITEA_URL.rstrip("/") + + def get_token(self, __user__: dict = None) -> str: + """Extract Gitea token from user context""" + if __user__ and "valves" in __user__: + user_valves = __user__.get("valves") + if user_valves: + token = getattr(user_valves, "GITEA_TOKEN", "") + if token: + return token + return "" + + def headers(self, __user__: dict = None) -> dict: + """Generate authentication headers with token""" + token = self.get_token(__user__) + headers = {"Content-Type": "application/json"} + if token: + headers["Authorization"] = f"token {token}" + return headers + + def format_error(self, e: httpx.HTTPStatusError, context: str = "") -> str: + """Format HTTP error with detailed context""" + try: + error_json = e.response.json() + error_msg = error_json.get("message", e.response.text[:200]) + except Exception: + error_msg = e.response.text[:200] + + context_str = f" ({context})" if context else "" + return f"HTTP Error {e.response.status_code}{context_str}: {error_msg}" + + def get_repo( + self, repo: Optional[str], __user__: dict = None, __metadata__: dict = None + ) -> str: + """ + Get effective repository with priority resolution: + 1. Explicit arg (always wins) + 2. Persistent state (learned from previous operations) + 3. Environment variable GITEA_DEFAULT_REPO (k8s/cron) + 4. UserValves override (interactive) + 5. Admin Valves default + """ + # 1. Explicit arg always wins + if repo: + return repo + + # 2. Check persistent state (helps models that lose track) + if __metadata__: + chat_id = __metadata__.get("chat_id") + if chat_id: + state = self.load_state(chat_id) + if state.get("repo"): + return state["repo"] + + # 3. Environment variable (k8s/cron native) + env_repo = os.environ.get("GITEA_DEFAULT_REPO") + if env_repo: + return env_repo + + # 4. UserValves override + if __user__ and "valves" in __user__: + user_valves = __user__.get("valves") + if user_valves and self.valves.ALLOW_USER_OVERRIDES: + user_repo = getattr(user_valves, "USER_DEFAULT_REPO", "") + if user_repo: + return user_repo + + # 5. Admin Valves default + return self.valves.DEFAULT_REPO + + def get_branch(self, __user__: dict = None, __metadata__: dict = None) -> str: + """ + Get effective branch name with priority resolution: + 1. Claimed session branch (from state file - for disaster recovery) + 2. chat_id from metadata (default - branch = chat) + 3. UserValves override + 4. Fallback to DEFAULT_BRANCH + """ + chat_id = None + if __metadata__: + chat_id = __metadata__.get("chat_id") + + # Check if we've claimed another session's branch (disaster recovery) + if chat_id: + state = self.load_state(chat_id) + if state.get("branch"): + # Use branch from state - could be our own or claimed + return state["branch"] + else: + # No state yet, use chat_id as branch + return chat_id + + # Then check user valves override + if __user__ and "valves" in __user__: + user_valves = __user__.get("valves") + if user_valves and self.valves.ALLOW_USER_OVERRIDES: + user_branch = getattr(user_valves, "USER_DEFAULT_BRANCH", "") + if user_branch: + return user_branch + + # Finally fall back to default + return self.valves.DEFAULT_BRANCH + + def resolve_repo( + self, repo: Optional[str], __user__: dict = None, __metadata__: dict = None + ) -> Tuple[str, str]: + """Resolve repository string into owner and repo name""" + effective_repo = self.get_repo(repo, __user__, __metadata__) + + if not effective_repo: + raise ValueError( + "No repository specified. Set DEFAULT_REPO in Valves, USER_DEFAULT_REPO in UserValves, " + "or pass repo='owner/name' explicitly." + ) + + if "/" not in effective_repo: + raise ValueError( + f"Repository must be in 'owner/repo' format, got: {effective_repo}" + ) + + return tuple(effective_repo.split("/", 1)) + + def parse_issue_url( + self, url: str + ) -> Tuple[Optional[str], Optional[str], Optional[int]]: + """ + Parse a Gitea issue URL into components. + Format: https://///issues/ + + Returns: + Tuple of (owner, repo, issue_number) or (None, None, None) if invalid + """ + # Match: https://domain/owner/repo/issues/123 + match = re.match(r"https?://[^/]+/([^/]+)/([^/]+)/issues/(\d+)", url) + if match: + owner = match.group(1) + repo = match.group(2) + issue_number = int(match.group(3)) + return (owner, repo, issue_number) + return (None, None, None) + + def parse_issue_refs(self, text: str) -> List[str]: + """Extract issue references from text (e.g., #42, issue #42)""" + refs = re.findall(r"#(\d+)", text) + issue_refs = [f"#{ref}" for ref in refs] + + # Also check for "issue N" pattern + issue_n_refs = re.findall(r"issue\s*#?(\d+)", text, re.IGNORECASE) + for ref in issue_n_refs: + issue_ref = f"#{ref}" + if issue_ref not in issue_refs: + issue_refs.append(issue_ref) + + return issue_refs + + def apply_unified_diff( + self, current_content: str, diff_content: str + ) -> Tuple[Optional[str], str]: + """ + Apply a unified diff to content. + + Args: + current_content: Current file content + diff_content: Unified diff patch + + Returns: + Tuple of (new_content, error_message) + - On success: (new_content, "") + - On failure: (None, error_description) + """ + try: + diff_lines = diff_content.splitlines(keepends=True) + + # Validate diff format - check for hunk headers + has_hunk_header = any(line.startswith("@@") for line in diff_lines) + has_add_or_remove = any( + line.startswith("+") or line.startswith("-") + for line in diff_lines + if not line.startswith("+++") and not line.startswith("---") + ) + + if not has_hunk_header: + return ( + None, + "No hunk headers (@@) found. Unified diff format required.", + ) + + if not has_add_or_remove: + return (None, "No additions (+) or deletions (-) found in diff.") + + # Parse hunks from unified diff + hunks = [] + current_hunk = None + in_hunk = False + + for line in diff_lines: + if line.startswith("---") or line.startswith("+++"): + continue # Skip file headers + elif line.startswith("@@"): + if current_hunk: + hunks.append(current_hunk) + + # Parse hunk header: @@ -old_line,old_count +new_line,new_count @@ + match = re.search( + r"@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@", line + ) + if match: + old_start = int(match.group(1)) + new_start = int(match.group(3)) + current_hunk = { + "old_start": old_start, + "new_start": new_start, + "lines": [], + } + in_hunk = True + else: + return (None, f"Invalid hunk header format: {line.strip()}") + continue + elif in_hunk and len(line) > 0 and line[0:1] in ("+", "-", " "): + if current_hunk: + current_hunk["lines"].append(line) + elif in_hunk and line.strip() == "": + # Empty line in diff - treat as context + if current_hunk: + current_hunk["lines"].append(" \n") + elif in_hunk: + if current_hunk: + hunks.append(current_hunk) + current_hunk = None + in_hunk = False + + if current_hunk: + hunks.append(current_hunk) + + if not hunks: + return (None, "No valid hunks could be parsed from the diff content.") + + # Split content into lines + old_lines = current_content.splitlines(keepends=False) + new_lines = list(old_lines) + + # Apply each hunk in reverse order to maintain correct indices + for hunk in sorted(hunks, key=lambda h: h["old_start"], reverse=True): + old_start = hunk["old_start"] - 1 # Convert to 0-indexed + + # Collect lines to remove and add + lines_to_remove = [] + lines_to_add = [] + + for line in hunk["lines"]: + if line.startswith("+"): + lines_to_add.append(line[1:].rstrip("\n")) + elif line.startswith("-"): + lines_to_remove.append(line[1:].rstrip("\n")) + + # Remove old lines if they match + if lines_to_remove: + end_idx = old_start + len(lines_to_remove) + if end_idx <= len(new_lines): + actual_lines = new_lines[old_start:end_idx] + if actual_lines == lines_to_remove: + del new_lines[old_start:end_idx] + else: + # Context mismatch - warn but continue + del new_lines[old_start:end_idx] + + # Insert new lines at old_start + for line in reversed(lines_to_add): + new_lines.insert(old_start, line) + + # Reconstruct content with line endings + new_content = "\n".join(new_lines) + if new_content and not new_content.endswith("\n"): + new_content += "\n" + + return (new_content, "") + + except Exception as e: + return (None, f"Diff parsing error: {type(e).__name__}: {e}") + + def generate_commit_message( + self, + change_type: str, + scope: str, + description: str, + issue_refs: Optional[List[str]] = None, + body: Optional[str] = None, + ) -> str: + """ + Generate a conventional commit message. + Format: (): + """ + # Validate and normalize change type + valid_types = [ + "feat", + "fix", + "docs", + "style", + "refactor", + "test", + "chore", + "perf", + "ci", + "build", + "revert", + ] + if change_type.lower() not in valid_types: + change_type = "chore" + + # Build the subject line + scope_str = f"({scope})" if scope else "" + message = f"{change_type.lower()}{scope_str}: {description}" + + # Add issue references to footer + if issue_refs: + refs_str = ", ".join(issue_refs) + footer = f"Refs: {refs_str}" + + if body: + body = f"{body}\n\n{footer}" + else: + message = f"{message}\n\n{footer}" + + if body: + message = f"{message}\n\n{body}" + + return message + + def generate_diff_commit_message(self, path: str, diff_content: str) -> str: + """Generate a commit message from diff content""" + file_name = path.split("/")[-1] + + # Detect change type from diff + change_type = "chore" + diff_lines = diff_content.splitlines() + + if any( + line.startswith("+def ") or line.startswith("+class ") + for line in diff_lines + ): + change_type = "feat" + elif any(line.startswith("+ return ") for line in diff_lines): + change_type = "fix" + elif "test" in path.lower() or "spec" in path.lower(): + change_type = "test" + elif ".md" in path.lower() or "readme" in path.lower(): + change_type = "docs" + elif any(line.startswith("-") for line in diff_lines): + change_type = "refactor" + + # Extract description from added lines + added_lines = [ + line[1:].strip() + for line in diff_lines + if line.startswith("+") and not line.startswith("+++") + ] + + description = "update" + if added_lines: + for line in added_lines: + if line and not line.startswith(("import ", "from ")): + match = re.match( + r"(def|class|const|var|let|interface|type)\s+(\w+)", line + ) + if match: + kind, name = match.groups() + if kind == "def": + description = f"add {name}() function" + break + elif kind == "class": + description = f"add {name} class" + break + elif not line.startswith((" ", "\t")): + description = line[:50].rstrip(":") + if len(line) > 50: + description += "..." + break + + return f"{change_type}({file_name}): {description}" + + +class Tools: + """ + Gitea Coder Role - High-level workflow automation for code generation tasks. + + ARCHITECTURE: + - Stateless design - no caching complexity + - Branch name = chat_id (PERIOD. Nothing else.) + - All operations are self-contained + - LLM focuses on code, not infrastructure + + Workflow: + reads ticket → creates branch (chat_id) → commits → updates ticket → creates PR + + Key Features: + - Branch name IS chat_id (KISS principle) + - Auto-generates conventional commit messages + - Quality gates for file changes (size delta validation) + - Diff-based updates to prevent accidental file replacements + - Complete CRUD operations (create, replace, delete, rename) + - Ticket updates for status tracking + - PR reading for feedback + """ + + class Valves(BaseModel): + """System-wide configuration for Gitea Coder integration""" + + GITEA_URL: str = Field( + default="https://gitea.example.com", + description="Gitea server URL", + ) + DEFAULT_REPO: str = Field( + default="", + description="Default repository in owner/repo format", + ) + DEFAULT_BRANCH: str = Field( + default="main", + description="Default branch name for base operations", + ) + ALLOW_USER_OVERRIDES: bool = Field( + default=True, + description="Allow users to override defaults via UserValves", + ) + VERIFY_SSL: bool = Field( + default=True, + description="Verify SSL certificates", + ) + MAX_SIZE_DELTA_PERCENT: float = Field( + default=50.0, + description="Maximum allowed file size change percentage (quality gate)", + ge=1.0, + le=500.0, + ) + PROTECTED_BRANCHES: List[str] = Field( + default=["main", "master", "develop", "dev", "release", "hotfix"], + description="Branches that cannot be committed to directly", + ) + + class UserValves(BaseModel): + """Per-user configuration""" + + GITEA_TOKEN: str = Field( + default="", + description="Your Gitea API token", + ) + USER_DEFAULT_REPO: str = Field( + default="", + description="Override default repository", + ) + USER_DEFAULT_BRANCH: str = Field( + default="", + description="Override default branch", + ) + + def __init__(self): + """Initialize tools""" + self.valves = self.Valves() + self.user_valves = self.UserValves() + self.citation = True + + # Initialize helper functions + self._gitea = GiteaHelpers(self) + + async def workflow_summary( + self, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """Get a summary of available coder workflows and commands""" + + branch = self._gitea.get_branch(__user__, __metadata__) + + message = f"""# 🚀 Gitea Coder Workflow Guide + +## Workflow (4 Steps) + +1. **Read the ticket:** `read_ticket(issue_number)` or `read_ticket_by_url(url)` + - Branch `{branch}` is auto-created when you read the ticket + - Files are immediately accessible + +2. **Read & modify files:** + - `get_file(path)` - Read current content + - `list_files(path)` - Browse repository + - `apply_diff(path, diff, message)` - Apply changes (preferred) + - `commit_changes(path, content, message)` - Full file replacement + +3. **Update the ticket:** `update_ticket(issue_number, comment)` ← **CRITICAL** + - This is your **statement of work** + - Document what you did, what you found, decisions made + - Update throughout the process, not just at the end + +4. **Create PR:** `create_pull_request(title, description)` + - Use `read_pull_request(pr_number)` to check feedback + +## Available Commands + +### 📋 Ticket Operations (Statement of Work) +- `read_ticket(issue_number)` - Get issue details, auto-creates branch +- `read_ticket_by_url(url)` - Get issue details by URL, auto-creates branch +- `update_ticket(issue_number, comment)` - **Document your work on the ticket** + +### 🌿 Branch Management +- `create_branch()` - Manually create branch (usually not needed) +- `get_branch_status()` - See current working branch +- `list_branches()` - List all branches in repository + +### 🧠 Session State (Auto-managed) +- `get_session_state()` - View persisted repo/issue/branch context +- `clear_session_state()` - Reset session (start fresh) +- `list_sessions()` - Find orphaned sessions from crashed chats +- `claim_session(chat_id)` - Continue work from a dead session + +### 📝 File Operations +- `get_file(path)` - Read file content +- `list_files(path)` - List directory contents +- `apply_diff(path, diff, message)` - Apply unified diff patch (preferred) +- `commit_changes(path, content, message)` - Commit with size delta gate +- `replace_file(path, content, message)` - Replace entire file +- `create_file(path, content, message)` - Create new file +- `delete_file(path, message)` - Delete a file +- `rename_file(old_path, new_path, message)` - Rename a file + +### 📦 Pull Request Operations +- `create_pull_request(title, description)` - Create PR from current branch +- `read_pull_request(pr_number)` - Get PR details and review feedback + +## Current Session + +**Working Branch:** `{branch}` (same as chat_id) + +## Diff Format Guide + +When using `apply_diff()`, provide a **unified diff** format: + +```diff +--- a/path/to/file.js ++++ b/path/to/file.js +@@ -10,3 +10,4 @@ + existing line (context) ++new line to add + another existing line +-line to remove +``` + +**Key elements:** +- `@@` hunk headers are REQUIRED +- Lines starting with `+` are additions +- Lines starting with `-` are deletions +- Lines starting with ` ` (space) are context + +## Best Practices + +1. **Always update the ticket** - It's your audit trail and statement of work +2. **Use `apply_diff()` for changes** - More precise, prevents accidents +3. **Read files before modifying** - Understand current state first +4. **Commit messages matter** - They're auto-generated but can be customized +""" + + return {"status": "success", "message": message} + + async def read_ticket( + self, + issue_number: int, + repo: Optional[str] = None, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """ + Read and parse a ticket/issue by number. + + Automatically creates the working branch (chat_id) if it doesn't exist. + This enables immediate file operations after reading the ticket. + """ + + retVal = {"status": "failure", "message": ""} + + token = self._gitea.get_token(__user__) + if not token: + retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." + return retVal + + try: + owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) + except ValueError as e: + retVal["message"] = str(e) + return retVal + + # Get branch name (chat_id) + branch_name = self._gitea.get_branch(__user__, __metadata__) + branch_status = "unknown" + + # Auto-create branch for immediate file operations + if branch_name not in self.valves.PROTECTED_BRANCHES: + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Setting up branch {branch_name[:8]}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.post( + self._gitea.api_url(f"/repos/{owner}/{repo_name}/branches"), + headers=self._gitea.headers(__user__), + json={ + "new_branch_name": branch_name, + "old_branch_name": self.valves.DEFAULT_BRANCH, + }, + ) + + if response.status_code == 409: + branch_status = "exists" + elif response.status_code in (200, 201): + branch_status = "created" + else: + branch_status = "failed" + except Exception: + branch_status = "failed" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Fetching issue #{issue_number}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.get( + self._gitea.api_url( + f"/repos/{owner}/{repo_name}/issues/{issue_number}" + ), + headers=self._gitea.headers(__user__), + ) + response.raise_for_status() + issue = response.json() + + title = issue.get("title", "No title") + body = issue.get("body", "") + state = issue.get("state", "unknown") + user = issue.get("user", {}).get("login", "unknown") + labels = [label.get("name", "") for label in issue.get("labels", [])] + created_at = issue.get("created_at", "")[:10] + html_url = issue.get("html_url", "") + + # Parse body for structured info + testing_criteria = [] + technical_notes = [] + is_testing_required = "test" in body.lower() or "testing" in body.lower() + is_docs_required = "documentation" in body.lower() or "docs" in body.lower() + + if is_testing_required: + testing_section = re.search( + r"(?:testing|criteria|test).*?:(.*?)(?:\n\n|$)", + body, + re.IGNORECASE | re.DOTALL, + ) + if testing_section: + testing_criteria = [ + line.strip().lstrip("-*•") + for line in testing_section.group(1).split("\n") + if line.strip() + ] + + tech_section = re.search( + r"(?:technical|tech).*?:(.*?)(?:\n\n|$)", + body, + re.IGNORECASE | re.DOTALL, + ) + if tech_section: + technical_notes = [ + line.strip().lstrip("-*•") + for line in tech_section.group(1).split("\n") + if line.strip() + ] + + issue_refs = self._gitea.parse_issue_refs(body) + if not any(ref == f"#{issue_number}" for ref in issue_refs): + issue_refs.insert(0, f"#{issue_number}") + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + # Build message + message = f"# 📋 Ticket #{issue_number}: {title}\n\n" + message += f"**State:** {state.upper()} | **Author:** @{user} | **Created:** {created_at}\n" + message += f"**Labels:** {', '.join(labels) if labels else 'None'}\n" + message += f"**URL:** {html_url}\n\n" + + # Show branch status + if branch_status == "created": + message += f"🌿 **Branch Created:** `{branch_name}` (from `{self.valves.DEFAULT_BRANCH}`)\n\n" + elif branch_status == "exists": + message += f"🌿 **Branch Ready:** `{branch_name}` (already exists)\n\n" + elif branch_status == "failed": + message += f"⚠️ **Branch:** Could not create `{branch_name}` - you may need to call `create_branch()`\n\n" + + if body: + message += "## 📝 Description\n\n" + if len(body) > 1000: + message += f"{body[:1000]}...\n\n" + else: + message += f"{body}\n\n" + + message += "## 🧪 Testing Requirements\n\n" + if is_testing_required: + if testing_criteria: + message += "**Testing Criteria:**\n" + for criterion in testing_criteria: + message += f"- [ ] {criterion}\n" + message += "\n" + else: + message += "Testing required, but no specific criteria listed.\n\n" + else: + message += "No explicit testing requirements detected.\n\n" + + if technical_notes: + message += "## 🔧 Technical Notes\n\n" + for note in technical_notes: + message += f"- {note}\n" + message += "\n" + + if is_docs_required: + message += "## 📚 Documentation Required\n\n" + message += "This ticket mentions documentation needs.\n\n" + + if issue_refs: + message += "## 🔗 Related Issues\n\n" + for ref in issue_refs: + message += f"- {ref}\n" + message += "\n" + + message += "## 🚀 Workflow\n\n" + message += f"1. **Read files:** `get_file(path)` or `list_files(path)` - branch `{branch_name}` is ready\n" + message += "2. **Make changes:** `apply_diff()` or `commit_changes()`\n" + message += f"3. **Update ticket:** `update_ticket({issue_number}, comment)` ← **Document your work!**\n" + message += "4. **Create PR:** `create_pull_request(title)`\n\n" + message += "---\n" + message += f"💡 **Important:** Always update the ticket with your progress and findings.\n" + message += f"The ticket is your **statement of work** - it documents what was done and why.\n" + + # Persist state for context recovery (helps models that lose track) + if __metadata__ and __metadata__.get("chat_id"): + self._gitea.save_state( + __metadata__["chat_id"], + repo=f"{owner}/{repo_name}", + issue=issue_number, + branch=branch_name, + ) + + retVal["status"] = "success" + retVal["message"] = message + return retVal + + except httpx.HTTPStatusError as e: + error_msg = self._gitea.format_error(e, f"issue #{issue_number}") + if e.response.status_code == 404: + retVal["message"] = ( + f"Issue #{issue_number} not found in {owner}/{repo_name}." + ) + else: + retVal["message"] = f"Failed to fetch issue. {error_msg}" + return retVal + except Exception as e: + retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" + return retVal + + async def read_ticket_by_url( + self, + url: str, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """ + Read and parse a ticket/issue by URL. + Format: https://///issues/ + + Automatically creates the working branch (chat_id) if it doesn't exist. + This enables immediate file operations after reading the ticket. + """ + + retVal = {"status": "failure", "message": ""} + + # Parse URL + owner, repo, issue_number = self._gitea.parse_issue_url(url) + + if not owner or not repo or not issue_number: + retVal["message"] = ( + f"Invalid issue URL format. Expected: https://domain/owner/repo/issues/number\nGot: {url}" + ) + return retVal + + # Use the standard read_ticket with parsed components + return await self.read_ticket( + issue_number=issue_number, + repo=f"{owner}/{repo}", + __user__=__user__, + __metadata__=__metadata__, + __event_emitter__=__event_emitter__, + ) + + async def update_ticket( + self, + issue_number: int, + comment: str, + repo: Optional[str] = None, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """ + Post a status update comment to a ticket. + + THIS IS YOUR STATEMENT OF WORK. + + Use this to document: + - What you analyzed and found + - What changes you made and why + - Decisions and trade-offs + - Testing performed + - Any blockers or questions + + Update the ticket throughout your work, not just at the end. + The ticket comment history is the audit trail of the work performed. + """ + + retVal = {"status": "failure", "message": ""} + + token = self._gitea.get_token(__user__) + if not token: + retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." + return retVal + + try: + owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) + except ValueError as e: + retVal["message"] = str(e) + return retVal + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Updating issue #{issue_number}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.post( + self._gitea.api_url( + f"/repos/{owner}/{repo_name}/issues/{issue_number}/comments" + ), + headers=self._gitea.headers(__user__), + json={"body": comment}, + ) + response.raise_for_status() + result = response.json() + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + retVal["status"] = "success" + retVal["message"] = f"✅ Updated issue #{issue_number} with status comment" + return retVal + + except httpx.HTTPStatusError as e: + error_msg = self._gitea.format_error(e, f"issue #{issue_number} update") + retVal["message"] = f"Failed to update issue. {error_msg}" + return retVal + except Exception as e: + retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" + return retVal + + async def create_branch( + self, + repo: Optional[str] = None, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """ + Create a new branch with name = chat_id. + KISS: Branch name IS chat_id. Nothing else. + """ + + retVal = {"status": "failure", "message": ""} + + token = self._gitea.get_token(__user__) + if not token: + retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." + return retVal + + try: + owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) + except ValueError as e: + retVal["message"] = str(e) + return retVal + + # Branch name IS chat_id + branch_name = self._gitea.get_branch(__user__, __metadata__) + + # Check if protected branch + if branch_name in self.valves.PROTECTED_BRANCHES: + retVal["message"] = ( + f"❌ Cannot create branch with protected name '{branch_name}'" + ) + return retVal + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Creating branch {branch_name}...", + "done": False, + }, + } + ) + + # Get base branch + base_branch = self.valves.DEFAULT_BRANCH + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.post( + self._gitea.api_url(f"/repos/{owner}/{repo_name}/branches"), + headers=self._gitea.headers(__user__), + json={ + "new_branch_name": branch_name, + "old_branch_name": base_branch, + }, + ) + + # Handle branch already exists + if response.status_code == 409: + retVal["status"] = "success" + retVal["message"] = f"⚠️ Branch `{branch_name}` already exists." + return retVal + + response.raise_for_status() + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + # Persist state for context recovery + if __metadata__ and __metadata__.get("chat_id"): + self._gitea.save_state( + __metadata__["chat_id"], + repo=f"{owner}/{repo_name}", + branch=branch_name, + ) + + retVal["status"] = "success" + retVal[ + "message" + ] = f"""✅ **Branch Created Successfully** + +**Branch:** `{branch_name}` (chat_id) +**Base Branch:** `{base_branch}` +**Repository:** `{owner}/{repo_name}` + +**Next Steps:** +1. Make changes to files using `apply_diff()` or `commit_changes()` +2. Update ticket with status: `update_ticket(issue_number, comment)` +3. Create PR: `create_pull_request(title)` +""" + return retVal + + except httpx.HTTPStatusError as e: + error_msg = self._gitea.format_error(e, "branch creation") + retVal["message"] = f"Failed to create branch. {error_msg}" + return retVal + except Exception as e: + retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" + return retVal + + async def get_branch_status( + self, + repo: Optional[str] = None, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """Get the current working branch status""" + + retVal = {"status": "success", "message": ""} + + try: + owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) + except ValueError as e: + retVal["status"] = "failure" + retVal["message"] = str(e) + return retVal + + # Get branch from metadata (chat_id) + working_branch = self._gitea.get_branch(__user__, __metadata__) + + message = f"# 🌿 Branch Status: {owner}/{repo_name}\n\n" + message += f"**Current Branch:** `{working_branch}` (chat_id)\n\n" + + if __metadata__ and __metadata__.get("chat_id"): + message += f"**Chat ID:** `{__metadata__.get('chat_id')}`\n\n" + + message += "All file operations will use this branch.\n" + + retVal["message"] = message + return retVal + + async def get_session_state( + self, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """ + View the persisted session state. + + State is automatically saved when you read tickets, create branches, + or make commits. This helps maintain context across operations. + """ + retVal = {"status": "success", "message": ""} + + chat_id = __metadata__.get("chat_id") if __metadata__ else None + + if not chat_id: + retVal["message"] = ( + "No chat_id available - session state requires a chat context." + ) + return retVal + + state = self._gitea.load_state(chat_id) + + if not state: + retVal[ + "message" + ] = f"""# 🧠 Session State + +**Chat ID:** `{chat_id}` +**State:** Empty (no operations performed yet) + +Session state is automatically populated when you: +- Read a ticket (`read_ticket` / `read_ticket_by_url`) +- Create a branch (`create_branch`) +- Make commits + +This state helps maintain context across operations. +""" + return retVal + + message = f"# 🧠 Session State\n\n" + message += f"**Chat ID:** `{chat_id}`\n\n" + + if state.get("repo"): + message += f"**Repository:** `{state['repo']}`\n" + if state.get("issue"): + message += f"**Issue:** #{state['issue']}\n" + if state.get("branch"): + message += f"**Branch:** `{state['branch']}`\n" + if state.get("updated_at"): + message += f"**Last Updated:** {state['updated_at']}\n" + + message += "\nThis state is automatically used when repo/issue is not explicitly provided." + + retVal["message"] = message + return retVal + + async def clear_session_state( + self, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """ + Clear the persisted session state. + + Use this to start fresh or switch to a different repository/issue. + """ + retVal = {"status": "success", "message": ""} + + chat_id = __metadata__.get("chat_id") if __metadata__ else None + + if not chat_id: + retVal["message"] = "No chat_id available - nothing to clear." + return retVal + + state_path = self._gitea._state_path(chat_id) + + if state_path.exists(): + try: + state_path.unlink() + retVal["message"] = ( + f"✅ Session state cleared for chat `{chat_id[:8]}...`\n\nYou can now start fresh with a new repository or issue." + ) + except Exception as e: + retVal["status"] = "failure" + retVal["message"] = f"Failed to clear state: {e}" + else: + retVal["message"] = "Session state was already empty." + + return retVal + + async def list_sessions( + self, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """ + List all persisted sessions with their metadata. + + Use this to find orphaned work from crashed/dead sessions. + Sessions can be claimed with `claim_session(chat_id)` to continue work. + """ + retVal = {"status": "success", "message": ""} + + current_chat_id = __metadata__.get("chat_id") if __metadata__ else None + state_dir = self._gitea._state_dir() / "chats" + + if not state_dir.exists(): + retVal["message"] = "No sessions found. State directory does not exist." + return retVal + + sessions = [] + for chat_dir in state_dir.iterdir(): + if chat_dir.is_dir(): + state_file = chat_dir / "state.json" + if state_file.exists(): + try: + state = json.loads(state_file.read_text()) + state["chat_id"] = chat_dir.name + state["is_current"] = chat_dir.name == current_chat_id + sessions.append(state) + except Exception: + pass + + if not sessions: + retVal["message"] = "No sessions found." + return retVal + + # Sort by updated_at descending (most recent first) + sessions.sort(key=lambda x: x.get("updated_at", ""), reverse=True) + + message = "# 📋 Saved Sessions\n\n" + message += f"**Current Chat:** `{current_chat_id[:8] if current_chat_id else 'unknown'}...`\n\n" + message += "---\n\n" + + for s in sessions: + chat_id = s.get("chat_id", "unknown") + is_current = s.get("is_current", False) + + marker = " ← **CURRENT**" if is_current else "" + message += f"### `{chat_id[:8]}...`{marker}\n\n" + + if s.get("repo"): + message += f"- **Repo:** `{s['repo']}`\n" + if s.get("issue"): + message += f"- **Issue:** #{s['issue']}\n" + if s.get("branch"): + message += f"- **Branch:** `{s['branch'][:12]}...`\n" + if s.get("pr_number"): + message += f"- **PR:** #{s['pr_number']}\n" + if s.get("updated_at"): + message += f"- **Last Activity:** {s['updated_at']}\n" + + if not is_current: + message += f'\n→ `claim_session("{chat_id}")` to continue this work\n' + + message += "\n---\n\n" + + message += f"**Total Sessions:** {len(sessions)}\n" + + retVal["message"] = message + return retVal + + async def claim_session( + self, + session_id: str, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """ + Claim an orphaned session to continue its work. + + Use `list_sessions()` to find available sessions. + This copies the session's state (repo, issue, branch) to the current chat, + allowing you to continue work on an existing branch after a session death. + + IMPORTANT: The branch name in Gitea will still be the old chat_id. + This function updates the tool's working context, not the Gitea branch name. + """ + retVal = {"status": "failure", "message": ""} + + current_chat_id = __metadata__.get("chat_id") if __metadata__ else None + + if not current_chat_id: + retVal["message"] = "No current chat_id available." + return retVal + + if session_id == current_chat_id: + retVal["message"] = "Cannot claim your own session - already active." + retVal["status"] = "success" + return retVal + + # Load the orphaned session's state + orphan_state = self._gitea.load_state(session_id) + + if not orphan_state: + retVal["message"] = ( + f"Session `{session_id[:8]}...` not found or has no state." + ) + return retVal + + # Copy state to current session, preserving the original branch name + self._gitea.save_state( + current_chat_id, + repo=orphan_state.get("repo"), + issue=orphan_state.get("issue"), + branch=orphan_state.get("branch"), # Keep original branch name! + pr_number=orphan_state.get("pr_number"), + claimed_from=session_id, + ) + + message = f"✅ **Session Claimed**\n\n" + message += f"**Claimed From:** `{session_id[:8]}...`\n" + message += f"**Current Chat:** `{current_chat_id[:8]}...`\n\n" + message += "**Inherited State:**\n" + + if orphan_state.get("repo"): + message += f"- **Repo:** `{orphan_state['repo']}`\n" + if orphan_state.get("issue"): + message += f"- **Issue:** #{orphan_state['issue']}\n" + if orphan_state.get("branch"): + message += f"- **Branch:** `{orphan_state['branch']}`\n" + if orphan_state.get("pr_number"): + message += f"- **PR:** #{orphan_state['pr_number']}\n" + + message += "\n⚠️ **Note:** File operations will use branch `{branch}`, not your current chat_id.\n".format( + branch=orphan_state.get("branch", "unknown")[:12] + "..." + ) + message += "This is correct - you're continuing work on the existing branch.\n" + + retVal["status"] = "success" + retVal["message"] = message + return retVal + + async def list_branches( + self, + repo: Optional[str] = None, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """List all branches in the repository""" + + retVal = {"status": "failure", "message": ""} + + token = self._gitea.get_token(__user__) + if not token: + retVal["message"] = "GITEA_TOKEN not configured." + return retVal + + try: + owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) + except ValueError as e: + retVal["message"] = str(e) + return retVal + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Fetching branches...", "done": False}, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.get( + self._gitea.api_url(f"/repos/{owner}/{repo_name}/branches"), + headers=self._gitea.headers(__user__), + params={"limit": 50}, + ) + response.raise_for_status() + branches = response.json() + + message = f"# 🌿 Branches in {owner}/{repo_name}\n\n" + + # Separate protected and other branches + protected = [b for b in branches if b.get("protected")] + other = [b for b in branches if not b.get("protected")] + + if protected: + message += "## 🛡️ Protected Branches\n\n" + for branch in sorted(protected, key=lambda x: x["name"]): + name = branch.get("name", "") + commit_sha = branch.get("commit", {}).get("id", "")[:8] + message += f"- `{name}` [commit: {commit_sha}]\n" + message += "\n" + + if other: + message += "## 📦 Other Branches\n\n" + for branch in sorted(other, key=lambda x: x["name"])[:20]: + name = branch.get("name", "") + commit_sha = branch.get("commit", {}).get("id", "")[:8] + message += f"- `{name}` [commit: {commit_sha}]\n" + if len(other) > 20: + message += f"\n... and {len(other) - 20} more branches\n" + message += "\n" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + retVal["status"] = "success" + retVal["message"] = message.strip() + return retVal + + except httpx.HTTPStatusError as e: + error_msg = self._gitea.format_error(e, "branch listing") + retVal["message"] = f"Failed to list branches. {error_msg}" + return retVal + except Exception as e: + retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" + return retVal + + async def apply_diff( + self, + path: str, + diff_content: str, + message: Optional[str] = None, + repo: Optional[str] = None, + auto_message: bool = True, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """ + Apply a unified diff patch to a file. + + This is the PREFERRED method for making changes as it: + 1. Is precise about what changes + 2. Prevents accidental file replacements + 3. Is what LLMs understand best (trained on GitHub PRs) + + REQUIRED FORMAT - Unified diff with hunk headers: + ```diff + --- a/path/to/file + +++ b/path/to/file + @@ -line,count +line,count @@ + context line + +added line + -removed line + ``` + """ + + retVal = {"status": "failure", "message": ""} + + # Get working branch from metadata + effective_branch = self._gitea.get_branch(__user__, __metadata__) + + token = self._gitea.get_token(__user__) + if not token: + retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." + return retVal + + try: + owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) + except ValueError as e: + retVal["message"] = str(e) + return retVal + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Applying diff to {path}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + # Get current file content + get_response = await client.get( + self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._gitea.headers(__user__), + params={"ref": effective_branch}, + ) + + # Check if file exists + if get_response.status_code == 404: + retVal["message"] = ( + f"File not found: `{path}`. Use `create_file()` to create a new file." + ) + return retVal + + get_response.raise_for_status() + file_info = get_response.json() + current_sha = file_info.get("sha") + + # Decode current content + current_content_b64 = file_info.get("content", "") + try: + current_content = base64.b64decode(current_content_b64).decode( + "utf-8" + ) + except Exception: + retVal["message"] = "Could not decode current file content." + return retVal + + # Parse and apply the diff + new_content, error_msg = self._gitea.apply_unified_diff( + current_content, diff_content + ) + + if new_content is None: + retVal[ + "message" + ] = f"""❌ **Failed to apply diff** + +**Reason:** {error_msg} + +**Required format:** Unified diff with hunk headers +```diff +--- a/{path} ++++ b/{path} +@@ -10,3 +10,4 @@ + existing line (context) ++new line to add +-line to remove +``` + +**Key requirements:** +- `@@` hunk headers are REQUIRED (e.g., `@@ -10,3 +10,4 @@`) +- Lines starting with `+` are additions +- Lines starting with `-` are deletions +- Lines starting with ` ` (space) are context + +Use `get_file(path)` to see current content and construct a proper diff. +""" + return retVal + + # Check if diff actually changed anything + if new_content == current_content: + retVal[ + "message" + ] = f"""⚠️ **Diff produced no changes** + +The diff was parsed but resulted in no modifications to the file. + +**Possible causes:** +1. The context lines in your diff don't match the actual file content +2. The hunk line numbers are incorrect +3. The changes were already applied + +Use `get_file("{path}")` to see the current content and verify your diff targets the correct lines. +""" + return retVal + + # Generate commit message if needed + if not message: + if auto_message: + message = self._gitea.generate_diff_commit_message( + path, diff_content + ) + else: + retVal["message"] = ( + "Commit message is required when auto_message=False." + ) + return retVal + + # Commit the changes + new_content_b64 = base64.b64encode(new_content.encode("utf-8")).decode( + "ascii" + ) + + response = await client.put( + self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._gitea.headers(__user__), + json={ + "content": new_content_b64, + "message": message, + "branch": effective_branch, + "sha": current_sha, + }, + ) + response.raise_for_status() + result = response.json() + + commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] + + # Parse diff stats + added_lines = len( + [ + l + for l in diff_content.splitlines() + if l.startswith("+") and not l.startswith("+++") + ] + ) + removed_lines = len( + [ + l + for l in diff_content.splitlines() + if l.startswith("-") and not l.startswith("---") + ] + ) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + # Prepare patched content for review + # For large files, show a truncated version + patched_lines = new_content.splitlines() + total_lines = len(patched_lines) + + if total_lines <= 100: + # Small file - show entire content + patched_preview = new_content + else: + # Large file - show first 30 and last 20 lines with indicator + head = "\n".join(patched_lines[:30]) + tail = "\n".join(patched_lines[-20:]) + patched_preview = ( + f"{head}\n\n... [{total_lines - 50} lines omitted] ...\n\n{tail}" + ) + + message_text = f"✅ **Diff Applied - REVIEW REQUIRED**\n\n" + message_text += f"**File:** `{path}`\n" + message_text += f"**Branch:** `{effective_branch}`\n" + message_text += f"**Commit:** `{commit_sha}`\n" + message_text += ( + f"**Changes:** +{added_lines} lines, -{removed_lines} lines\n" + ) + message_text += f"**Message:** {message}\n\n" + + message_text += "---\n\n" + message_text += "## 📋 Review the Patched File\n\n" + message_text += "**Check for:**\n" + message_text += "- ❌ Accidental line removals\n" + message_text += "- ❌ Code placed in wrong location\n" + message_text += "- ❌ Duplicate code blocks\n" + message_text += "- ❌ Missing imports or dependencies\n" + message_text += "- ❌ Broken syntax or indentation\n\n" + + message_text += f"**Patched Content ({total_lines} lines):**\n\n" + message_text += f"```\n{patched_preview}\n```\n\n" + + message_text += "---\n" + message_text += ( + "⚠️ **If the patch is incorrect**, use `apply_diff()` again to fix, " + ) + message_text += "or `get_file()` to see current state and `replace_file()` to correct.\n" + + retVal["status"] = "success" + retVal["message"] = message_text + return retVal + + except httpx.HTTPStatusError as e: + error_msg = self._gitea.format_error(e, f"diff application to '{path}'") + if e.response.status_code == 409: + retVal["message"] = ( + f"Update conflict for `{path}`. Fetch the latest version and try again." + ) + else: + retVal["message"] = f"Failed to apply diff. {error_msg}" + return retVal + except Exception as e: + retVal["message"] = ( + f"Unexpected failure during diff application: {type(e).__name__}: {e}" + ) + return retVal + + async def commit_changes( + self, + path: str, + content: str, + message: Optional[str] = None, + repo: Optional[str] = None, + max_delta_percent: Optional[float] = None, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """ + Commit file changes with automatic content detection and size delta gating. + + This method: + 1. Detects whether to create or replace a file + 2. Validates file size changes against threshold (quality gate) + 3. Auto-generates commit message if not provided + 4. Commits to the working branch + """ + + retVal = {"status": "failure", "message": ""} + + # Get working branch from metadata + effective_branch = self._gitea.get_branch(__user__, __metadata__) + + token = self._gitea.get_token(__user__) + if not token: + retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." + return retVal + + try: + owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) + except ValueError as e: + retVal["message"] = str(e) + return retVal + + # Use provided threshold or default from valves + delta_threshold = max_delta_percent or self.valves.MAX_SIZE_DELTA_PERCENT + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Processing {path}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + # Check if file exists + get_response = await client.get( + self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._gitea.headers(__user__), + params={"ref": effective_branch}, + ) + + file_exists = get_response.status_code == 200 + current_sha = None + current_size = 0 + + if file_exists: + get_response.raise_for_status() + file_info = get_response.json() + current_sha = file_info.get("sha") + current_size = file_info.get("size", 0) + + # SIZE DELTA GATE - Quality check + new_size = len(content.encode("utf-8")) + if current_size > 0 and new_size > 0: + delta_percent = ( + abs(new_size - current_size) / current_size * 100 + ) + + if delta_percent > delta_threshold: + # Calculate actual bytes changed + size_diff = new_size - current_size + direction = "larger" if size_diff > 0 else "smaller" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Size gate triggered", + "done": True, + "hidden": True, + }, + } + ) + + retVal[ + "message" + ] = f"""⚠️ **Quality Gate: Large File Change Detected** + +**File:** `{path}` +**Current Size:** {current_size} bytes +**New Size:** {new_size} bytes +**Change:** {size_diff:+d} bytes ({delta_percent:.1f}% {direction}) +**Threshold:** {delta_threshold}% + +**Recommended: Use diff-based updates instead** +```python +apply_diff(path="{path}", diff_content="...") +``` + +Or override with: +```python +commit_changes(..., max_delta_percent=100) +``` +""" + return retVal + + # Generate commit message if not provided + if not message: + message = self._gitea.generate_commit_message( + change_type="chore", + scope=path.split("/")[-1] if "/" in path else path, + description=f"update {path}", + ) + + # Prepare content + content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") + + if file_exists: + # Replace existing file + if not current_sha: + retVal["message"] = ( + f"Could not retrieve SHA for existing file: {path}" + ) + return retVal + + response = await client.put( + self._gitea.api_url( + f"/repos/{owner}/{repo_name}/contents/{path}" + ), + headers=self._gitea.headers(__user__), + json={ + "content": content_b64, + "message": message, + "branch": effective_branch, + "sha": current_sha, + }, + ) + else: + # Create new file + response = await client.post( + self._gitea.api_url( + f"/repos/{owner}/{repo_name}/contents/{path}" + ), + headers=self._gitea.headers(__user__), + json={ + "content": content_b64, + "message": message, + "branch": effective_branch, + }, + ) + + response.raise_for_status() + result = response.json() + + commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] + action = "Updated" if file_exists else "Created" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + # Calculate and show size change + new_size = len(content.encode("utf-8")) + size_info = "" + if file_exists and current_size > 0: + delta = new_size - current_size + delta_percent = delta / current_size * 100 + size_info = ( + f"**Size Change:** {delta:+d} bytes ({delta_percent:.1f}%)\n" + ) + + message_text = f"""✅ **{action} File Successfully** + +**File:** `{path}` +**Branch:** `{effective_branch}` +**Commit:** `{commit_sha}` +**Message:** {message} + +{size_info}**Action:** {action} +""" + + retVal["status"] = "success" + retVal["message"] = message_text + return retVal + + except httpx.HTTPStatusError as e: + error_msg = self._gitea.format_error(e, f"file commit for '{path}'") + if e.response.status_code == 409: + retVal["message"] = ( + f"Update conflict for `{path}`. Fetch the latest version and try again." + ) + else: + retVal["message"] = f"Failed to commit file. {error_msg}" + return retVal + except Exception as e: + retVal["message"] = ( + f"Unexpected failure during commit: {type(e).__name__}: {e}" + ) + return retVal + + async def create_pull_request( + self, + title: str, + body: Optional[str] = "", + repo: Optional[str] = None, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """Create a pull request from the current branch""" + + retVal = {"status": "failure", "message": ""} + + # Get working branch from metadata + head_branch = self._gitea.get_branch(__user__, __metadata__) + + token = self._gitea.get_token(__user__) + if not token: + retVal["message"] = "GITEA_TOKEN not configured." + return retVal + + try: + owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) + except ValueError as e: + retVal["message"] = str(e) + return retVal + + base_branch = self.valves.DEFAULT_BRANCH + + # Check if protected branch + if head_branch in self.valves.PROTECTED_BRANCHES: + retVal["message"] = ( + f"❌ Cannot create PR from protected branch '{head_branch}'" + ) + return retVal + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Creating PR...", "done": False}, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.post( + self._gitea.api_url(f"/repos/{owner}/{repo_name}/pulls"), + headers=self._gitea.headers(__user__), + json={ + "title": title, + "head": head_branch, + "base": base_branch, + "body": body, + }, + ) + + # Handle PR already exists + if response.status_code == 409: + retVal["message"] = ( + f"⚠️ PR already exists for branch `{head_branch}`" + ) + return retVal + + response.raise_for_status() + pr = response.json() + + pr_number = pr.get("number") + pr_url = pr.get("html_url", "") + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + # Persist PR number for session recovery + if __metadata__ and __metadata__.get("chat_id"): + self._gitea.save_state(__metadata__["chat_id"], pr_number=pr_number) + + retVal["status"] = "success" + retVal[ + "message" + ] = f"""✅ **Pull Request Created Successfully** + +**PR #{pr_number}:** {title} +**Branch:** `{head_branch}` → `{base_branch}` +**URL:** {pr_url} + +**Next Steps:** +1. Read PR feedback: `read_pull_request({pr_number})` +2. Address reviewer comments +3. Update ticket with PR link +""" + return retVal + + except httpx.HTTPStatusError as e: + error_msg = self._gitea.format_error(e, "PR creation") + if e.response.status_code == 422: + retVal["message"] = ( + "Could not create PR. The branch may not exist or there may be merge conflicts." + ) + else: + retVal["message"] = f"Failed to create PR. {error_msg}" + return retVal + except Exception as e: + retVal["message"] = ( + f"Unexpected failure during PR creation: {type(e).__name__}: {e}" + ) + return retVal + + async def read_pull_request( + self, + pr_number: int, + repo: Optional[str] = None, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """ + Read pull request details including review feedback. + This is how you get feedback to iterate on changes. + """ + + retVal = {"status": "failure", "message": ""} + + token = self._gitea.get_token(__user__) + if not token: + retVal["message"] = "GITEA_TOKEN not configured." + return retVal + + try: + owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) + except ValueError as e: + retVal["message"] = str(e) + return retVal + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Fetching PR #{pr_number}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + # Get PR details + pr_response = await client.get( + self._gitea.api_url( + f"/repos/{owner}/{repo_name}/pulls/{pr_number}" + ), + headers=self._gitea.headers(__user__), + ) + pr_response.raise_for_status() + pr = pr_response.json() + + # Get PR comments + comments_response = await client.get( + self._gitea.api_url( + f"/repos/{owner}/{repo_name}/pulls/{pr_number}/comments" + ), + headers=self._gitea.headers(__user__), + ) + comments_response.raise_for_status() + comments = comments_response.json() + + title = pr.get("title", "") + body = pr.get("body", "") + state = pr.get("state", "") + user = pr.get("user", {}).get("login", "") + head_branch = pr.get("head", {}).get("ref", "") + base_branch = pr.get("base", {}).get("ref", "") + mergeable = pr.get("mergeable", False) + merged = pr.get("merged", False) + html_url = pr.get("html_url", "") + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + # Build message + message = f"# 🔀 Pull Request #{pr_number}: {title}\n\n" + message += f"**State:** {state.upper()} | **Author:** @{user}\n" + message += f"**Branch:** `{head_branch}` → `{base_branch}`\n" + message += f"**Mergeable:** {'✅ Yes' if mergeable else '❌ No'}\n" + message += f"**Merged:** {'✅ Yes' if merged else '❌ No'}\n" + message += f"**URL:** {html_url}\n\n" + + if body: + message += "## 📝 Description\n\n" + message += f"{body}\n\n" + + if comments: + message += f"## 💬 Review Comments ({len(comments)})\n\n" + for comment in comments[:10]: # Limit to 10 most recent + comment_user = comment.get("user", {}).get("login", "unknown") + comment_body = comment.get("body", "") + comment_path = comment.get("path", "") + comment_line = comment.get("line", "") + + message += f"**@{comment_user}**" + if comment_path: + message += f" on `{comment_path}`" + if comment_line: + message += f" (line {comment_line})" + message += f":\n{comment_body}\n\n" + + if len(comments) > 10: + message += f"... and {len(comments) - 10} more comments\n\n" + else: + message += "## 💬 No review comments yet\n\n" + + message += "## 🚀 Next Steps\n\n" + if comments: + message += "1. Address review comments\n" + message += ( + "2. Make changes using `apply_diff()` or `commit_changes()`\n" + ) + message += "3. Update ticket with progress\n" + else: + message += "Waiting for review feedback...\n" + + retVal["status"] = "success" + retVal["message"] = message + return retVal + + except httpx.HTTPStatusError as e: + error_msg = self._gitea.format_error(e, f"PR #{pr_number}") + if e.response.status_code == 404: + retVal["message"] = f"PR #{pr_number} not found in {owner}/{repo_name}." + else: + retVal["message"] = f"Failed to fetch PR. {error_msg}" + return retVal + except Exception as e: + retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" + return retVal + + async def replace_file( + self, + path: str, + content: str, + message: str, + repo: Optional[str] = None, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """ + Update an existing file in the repository. + WARNING: This replaces the entire file content. + Use `apply_diff()` for incremental changes. + """ + + retVal = {"status": "failure", "message": ""} + + effective_branch = self._gitea.get_branch(__user__, __metadata__) + + token = self._gitea.get_token(__user__) + if not token: + retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." + return retVal + + try: + owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) + except ValueError as e: + retVal["message"] = str(e) + return retVal + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": f"Updating {path}...", "done": False}, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + get_response = await client.get( + self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._gitea.headers(__user__), + params={"ref": effective_branch}, + ) + + if get_response.status_code == 404: + retVal["message"] = ( + f"File not found: `{path}`. Use `create_file()` to create a new file." + ) + return retVal + + get_response.raise_for_status() + file_info = get_response.json() + sha = file_info.get("sha") + + if not sha: + retVal["message"] = "Could not retrieve file SHA for update." + return retVal + + content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") + + response = await client.put( + self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._gitea.headers(__user__), + json={ + "content": content_b64, + "message": message, + "branch": effective_branch, + "sha": sha, + }, + ) + response.raise_for_status() + result = response.json() + + commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + message_text = f"✅ **File Updated Successfully**\n\n" + message_text += f"**File:** `{path}`\n" + message_text += f"**Branch:** `{effective_branch}`\n" + message_text += f"**Commit:** `{commit_sha}`\n" + message_text += f"**Message:** {message}\n" + + retVal["status"] = "success" + retVal["message"] = message_text + return retVal + + except httpx.HTTPStatusError as e: + error_msg = self._gitea.format_error(e, f"file update for '{path}'") + if e.response.status_code == 409: + retVal["message"] = ( + f"Update conflict for `{path}`. Fetch the latest version and try again." + ) + else: + retVal["message"] = f"Failed to update file. {error_msg}" + return retVal + except Exception as e: + retVal["message"] = ( + f"Unexpected failure during file update: {type(e).__name__}: {e}" + ) + return retVal + + async def create_file( + self, + path: str, + content: str, + message: str, + repo: Optional[str] = None, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """Create a new file in the repository""" + + retVal = {"status": "failure", "message": ""} + + effective_branch = self._gitea.get_branch(__user__, __metadata__) + + token = self._gitea.get_token(__user__) + if not token: + retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." + return retVal + + try: + owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) + except ValueError as e: + retVal["message"] = str(e) + return retVal + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": f"Creating {path}...", "done": False}, + } + ) + + try: + content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") + + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.post( + self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._gitea.headers(__user__), + json={ + "content": content_b64, + "message": message, + "branch": effective_branch, + }, + ) + response.raise_for_status() + result = response.json() + + commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + message_text = f"✅ **File Created Successfully**\n\n" + message_text += f"**File:** `{path}`\n" + message_text += f"**Branch:** `{effective_branch}`\n" + message_text += f"**Commit:** `{commit_sha}`\n" + message_text += f"**Message:** {message}\n" + + retVal["status"] = "success" + retVal["message"] = message_text + return retVal + + except httpx.HTTPStatusError as e: + error_msg = self._gitea.format_error(e, f"file creation for '{path}'") + if e.response.status_code == 422: + retVal["message"] = ( + f"File already exists: `{path}`. Use `replace_file()` or `apply_diff()` to modify it." + ) + else: + retVal["message"] = f"Failed to create file. {error_msg}" + return retVal + except Exception as e: + retVal["message"] = ( + f"Unexpected failure during file creation: {type(e).__name__}: {e}" + ) + return retVal + + async def delete_file( + self, + path: str, + message: str, + repo: Optional[str] = None, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + __event_call__: Callable[[dict], Any] = None, + ) -> dict: + """Delete a file from the repository""" + + retVal = {"status": "failure", "message": ""} + + effective_branch = self._gitea.get_branch(__user__, __metadata__) + + token = self._gitea.get_token(__user__) + if not token: + retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." + return retVal + + try: + owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) + except ValueError as e: + retVal["message"] = str(e) + return retVal + + # Confirmation dialog + if __event_call__: + result = await __event_call__( + { + "type": "confirmation", + "data": { + "title": "Confirm File Deletion", + "message": f"Delete `{path}` from `{owner}/{repo_name}` ({effective_branch})?", + }, + } + ) + if result is None or result is False: + retVal["message"] = "⚠️ File deletion cancelled by user." + return retVal + if isinstance(result, dict) and not result.get("confirmed"): + retVal["message"] = "⚠️ File deletion cancelled by user." + return retVal + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": f"Deleting {path}...", "done": False}, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + get_response = await client.get( + self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._gitea.headers(__user__), + params={"ref": effective_branch}, + ) + + if get_response.status_code == 404: + retVal["message"] = f"File not found: `{path}`" + return retVal + + get_response.raise_for_status() + file_info = get_response.json() + sha = file_info.get("sha") + + if not sha: + retVal["message"] = "Could not retrieve file SHA for deletion." + return retVal + + response = await client.delete( + self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._gitea.headers(__user__), + json={ + "message": message, + "branch": effective_branch, + "sha": sha, + }, + ) + response.raise_for_status() + result = response.json() + + commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + message_text = f"✅ **File Deleted Successfully**\n\n" + message_text += f"**File:** `{path}`\n" + message_text += f"**Branch:** `{effective_branch}`\n" + message_text += f"**Commit:** `{commit_sha}`\n" + message_text += f"**Message:** {message}\n" + + retVal["status"] = "success" + retVal["message"] = message_text + return retVal + + except httpx.HTTPStatusError as e: + error_msg = self._gitea.format_error(e, f"file deletion for '{path}'") + if e.response.status_code == 404: + retVal["message"] = f"File not found: `{path}`" + else: + retVal["message"] = f"Failed to delete file. {error_msg}" + return retVal + except Exception as e: + retVal["message"] = ( + f"Unexpected failure during file deletion: {type(e).__name__}: {e}" + ) + return retVal + + async def rename_file( + self, + old_path: str, + new_path: str, + message: Optional[str] = None, + repo: Optional[str] = None, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """Rename a file in the repository""" + + retVal = {"status": "failure", "message": ""} + + effective_branch = self._gitea.get_branch(__user__, __metadata__) + + token = self._gitea.get_token(__user__) + if not token: + retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." + return retVal + + try: + owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) + except ValueError as e: + retVal["message"] = str(e) + return retVal + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Renaming {old_path} to {new_path}...", + "done": False, + }, + } + ) + + try: + if not message: + message = self._gitea.generate_commit_message( + change_type="chore", + scope="rename", + description=f"rename {old_path} → {new_path}", + ) + + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + get_response = await client.get( + self._gitea.api_url( + f"/repos/{owner}/{repo_name}/contents/{old_path}" + ), + headers=self._gitea.headers(__user__), + params={"ref": effective_branch}, + ) + + if get_response.status_code == 404: + retVal["message"] = f"File not found: `{old_path}`" + return retVal + + get_response.raise_for_status() + old_file = get_response.json() + old_sha = old_file.get("sha") + + content_b64 = old_file.get("content", "") + try: + content = base64.b64decode(content_b64).decode("utf-8") + except Exception: + retVal["message"] = "Could not decode file content." + return retVal + + new_content_b64 = base64.b64encode(content.encode("utf-8")).decode( + "ascii" + ) + + create_response = await client.post( + self._gitea.api_url( + f"/repos/{owner}/{repo_name}/contents/{new_path}" + ), + headers=self._gitea.headers(__user__), + json={ + "content": new_content_b64, + "message": message, + "branch": effective_branch, + }, + ) + + if create_response.status_code == 422: + retVal["message"] = f"File already exists at new path: `{new_path}`" + return retVal + + create_response.raise_for_status() + + delete_response = await client.delete( + self._gitea.api_url( + f"/repos/{owner}/{repo_name}/contents/{old_path}" + ), + headers=self._gitea.headers(__user__), + json={ + "message": message, + "branch": effective_branch, + "sha": old_sha, + }, + ) + delete_response.raise_for_status() + + commit_sha = ( + create_response.json().get("commit", {}).get("sha", "unknown")[:8] + ) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + message_text = f"✅ **File Renamed Successfully**\n\n" + message_text += f"**Old Path:** `{old_path}`\n" + message_text += f"**New Path:** `{new_path}`\n" + message_text += f"**Branch:** `{effective_branch}`\n" + message_text += f"**Commit:** `{commit_sha}`\n" + message_text += f"**Message:** {message}\n" + + retVal["status"] = "success" + retVal["message"] = message_text + return retVal + + except httpx.HTTPStatusError as e: + error_msg = self._gitea.format_error(e, "file rename") + retVal["message"] = f"Failed to rename file. {error_msg}" + return retVal + except Exception as e: + retVal["message"] = ( + f"Unexpected failure during file rename: {type(e).__name__}: {e}" + ) + return retVal + + async def get_file( + self, + path: str, + repo: Optional[str] = None, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """Get the contents of a file from the repository""" + + retVal = {"status": "failure", "message": ""} + + effective_branch = self._gitea.get_branch(__user__, __metadata__) + + token = self._gitea.get_token(__user__) + if not token: + retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." + return retVal + + try: + owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) + except ValueError as e: + retVal["message"] = str(e) + return retVal + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Reading file {path}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.get( + self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._gitea.headers(__user__), + params={"ref": effective_branch}, + ) + response.raise_for_status() + file_info = response.json() + + if isinstance(file_info, list): + retVal["message"] = ( + f"'{path}' is a directory. Use `list_files()` to browse its contents." + ) + return retVal + + if file_info.get("type") != "file": + retVal["message"] = ( + f"'{path}' is not a file (type: {file_info.get('type')})" + ) + return retVal + + content_b64 = file_info.get("content", "") + try: + content = base64.b64decode(content_b64).decode("utf-8") + except Exception: + retVal["message"] = ( + "Could not decode file content. The file may be binary." + ) + return retVal + + size = file_info.get("size", 0) + sha_short = file_info.get("sha", "unknown")[:8] + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + message_text = f"**File:** `{path}`\n" + message_text += f"**Branch:** `{effective_branch}`\n" + message_text += f"**SHA:** `{sha_short}` | **Size:** {size} bytes\n" + message_text += f"**URL:** {file_info.get('html_url', 'N/A')}\n\n" + message_text += f"```\n{content}\n```\n" + + retVal["status"] = "success" + retVal["message"] = message_text + return retVal + + except httpx.HTTPStatusError as e: + error_msg = self._gitea.format_error(e, f"file fetch for '{path}'") + if e.response.status_code == 404: + retVal["message"] = f"File not found: `{path}`" + else: + retVal["message"] = f"Failed to fetch file. {error_msg}" + return retVal + except Exception as e: + retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" + return retVal + + async def list_files( + self, + path: str = "", + repo: Optional[str] = None, + __user__: dict = None, + __metadata__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """List files and directories in a repository path""" + + retVal = {"status": "failure", "message": ""} + + effective_branch = self._gitea.get_branch(__user__, __metadata__) + + token = self._gitea.get_token(__user__) + if not token: + retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." + return retVal + + try: + owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) + except ValueError as e: + retVal["message"] = str(e) + return retVal + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Listing {path or 'root'}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.get( + self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._gitea.headers(__user__), + params={"ref": effective_branch}, + ) + response.raise_for_status() + contents = response.json() + + if isinstance(contents, dict): + retVal["message"] = ( + f"'{path}' is a file. Use `get_file()` to read its contents." + ) + return retVal + + message = f"**Contents of {owner}/{repo_name}/{path or '.'}** (`{effective_branch}`)\n\n" + + dirs = [item for item in contents if item.get("type") == "dir"] + files = [item for item in contents if item.get("type") == "file"] + + if dirs: + message += "**📁 Directories:**\n" + for item in sorted(dirs, key=lambda x: x.get("name", "").lower()): + message += f"- `📁 {item.get('name', '')}/`\n" + message += "\n" + + if files: + message += "**📄 Files:**\n" + for item in sorted(files, key=lambda x: x.get("name", "").lower()): + size = item.get("size", 0) + if size < 1024: + size_str = f"{size}B" + elif size < 1024 * 1024: + size_str = f"{size//1024}KB" + else: + size_str = f"{size//(1024*1024)}MB" + sha_short = item.get("sha", "unknown")[:8] + message += ( + f"- `{item.get('name', '')}` ({size_str}, sha: {sha_short})\n" + ) + + message += f"\n**Total:** {len(dirs)} directories, {len(files)} files" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + retVal["status"] = "success" + retVal["message"] = message + return retVal + + except httpx.HTTPStatusError as e: + error_msg = self._gitea.format_error(e, f"directory listing for '{path}'") + if e.response.status_code == 404: + retVal["message"] = f"Path not found: `{path}`" + else: + retVal["message"] = f"Failed to list directory contents. {error_msg}" + return retVal + except Exception as e: + retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" + return retVal