""" title: Gitea Coder - Workflow Role with Automatic Branch Management author: Jeff Smith + minimax + Claude version: 1.0.0 license: MIT description: High-level workflow role for LLM-based code generation with automatic branch management and quality gates requirements: pydantic, httpx changelog: 1.0.0: - Initial implementation of gitea_coder role - Branch name = chat_id (KISS principle) - Stateless design - no caching complexity - Branch creation with scope gating (prevents main pushes) - Generates detailed commit messages with ticket references - Creates PRs from branches - Reads ticket requirements from issues (by number or URL) - Updates tickets with status - Reads PR feedback - Unified file operations workflow - Diff-based updates with apply_diff() - Size delta gating in commit_changes() for quality control - Complete CRUD operations: create_file, replace_file, delete_file, rename_file - FIX: apply_unified_diff now properly fails when no valid hunks are parsed - FIX: apply_diff detects no-op changes and returns failure with guidance - FEATURE: read_ticket auto-creates branch for immediate file operations - FEATURE: apply_diff returns patched file content for LLM review - FEATURE: Persistent session state (repo/issue/branch/pr) survives restarts - FEATURE: State enables cheaper models (kimi-k2, qwen3) that lose context - FEATURE: get_session_state() and clear_session_state() for debugging - FEATURE: list_sessions() finds orphaned work from crashed chats - FEATURE: claim_session() continues work from dead sessions (disaster recovery) - ENHANCEMENT: Simplified 4-step workflow (read ticket → modify → update ticket → PR) - ENHANCEMENT: Ticket updates emphasized as "statement of work" / audit trail - ENHANCEMENT: Review checklist for common diff errors (accidental removals, misplacements) - ENHANCEMENT: State resolution priority: explicit arg > state file > env var > UserValves > Valves - ENHANCEMENT: Branch resolution uses state for session continuity after claim_session """ from typing import Optional, Callable, Any, Dict, List, Tuple from pydantic import BaseModel, Field from datetime import datetime from pathlib import Path import re import base64 import httpx import json import os class GiteaHelpers: """ Helper methods for Gitea API interactions. Designed to be mixed into Tools class via composition. """ def __init__(self, tools_instance): """Initialize with reference to parent Tools instance for valve access""" self.tools = tools_instance @property def valves(self): """Access valves from parent Tools instance""" return self.tools.valves # ============= STATE PERSISTENCE ============= # Persists repo/issue/branch across restarts and helps models # that struggle with context tracking (kimi-k2, qwen3-code) def _state_dir(self) -> Path: """Get base state directory""" base = os.environ.get( "GITEA_CODER_STATE_DIR", os.path.expanduser("~/.gitea_coder") ) return Path(base) def _state_path(self, chat_id: str) -> Path: """Get state file path for a chat session""" return self._state_dir() / "chats" / chat_id / "state.json" def load_state(self, chat_id: str) -> dict: """Load persisted state for chat session""" if not chat_id: return {} path = self._state_path(chat_id) if path.exists(): try: return json.loads(path.read_text()) except Exception: return {} return {} def save_state(self, chat_id: str, **updates): """Update and persist state for chat session""" if not chat_id: return try: state = self.load_state(chat_id) state.update(updates) state["updated_at"] = datetime.utcnow().isoformat() path = self._state_path(chat_id) path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(state, indent=2)) except Exception: pass # Best effort - don't fail operations on state save errors def get_state_summary(self, chat_id: str) -> str: """Get human-readable state summary for debugging""" state = self.load_state(chat_id) if not state: return "No persisted state" parts = [] if state.get("repo"): parts.append(f"repo=`{state['repo']}`") if state.get("issue"): parts.append(f"issue=#{state['issue']}") if state.get("branch"): parts.append(f"branch=`{state['branch'][:8]}...`") return ", ".join(parts) if parts else "Empty state" # ============= API HELPERS ============= def api_url(self, endpoint: str) -> str: """Construct full API URL for Gitea endpoint""" base = self.get_url() return f"{base}/api/v1{endpoint}" def get_url(self) -> str: """Get effective Gitea URL with trailing slash handling""" return self.valves.GITEA_URL.rstrip("/") def get_token(self, __user__: dict = None) -> str: """Extract Gitea token from user context""" if __user__ and "valves" in __user__: user_valves = __user__.get("valves") if user_valves: token = getattr(user_valves, "GITEA_TOKEN", "") if token: return token return "" def headers(self, __user__: dict = None) -> dict: """Generate authentication headers with token""" token = self.get_token(__user__) headers = {"Content-Type": "application/json"} if token: headers["Authorization"] = f"token {token}" return headers def format_error(self, e: httpx.HTTPStatusError, context: str = "") -> str: """Format HTTP error with detailed context""" try: error_json = e.response.json() error_msg = error_json.get("message", e.response.text[:200]) except Exception: error_msg = e.response.text[:200] context_str = f" ({context})" if context else "" return f"HTTP Error {e.response.status_code}{context_str}: {error_msg}" def get_repo( self, repo: Optional[str], __user__: dict = None, __metadata__: dict = None ) -> str: """ Get effective repository with priority resolution: 1. Explicit arg (always wins) 2. Persistent state (learned from previous operations) 3. Environment variable GITEA_DEFAULT_REPO (k8s/cron) 4. UserValves override (interactive) 5. Admin Valves default """ # 1. Explicit arg always wins if repo: return repo # 2. Check persistent state (helps models that lose track) if __metadata__: chat_id = __metadata__.get("chat_id") if chat_id: state = self.load_state(chat_id) if state.get("repo"): return state["repo"] # 3. Environment variable (k8s/cron native) env_repo = os.environ.get("GITEA_DEFAULT_REPO") if env_repo: return env_repo # 4. UserValves override if __user__ and "valves" in __user__: user_valves = __user__.get("valves") if user_valves and self.valves.ALLOW_USER_OVERRIDES: user_repo = getattr(user_valves, "USER_DEFAULT_REPO", "") if user_repo: return user_repo # 5. Admin Valves default return self.valves.DEFAULT_REPO def get_branch(self, __user__: dict = None, __metadata__: dict = None) -> str: """ Get effective branch name with priority resolution: 1. Claimed session branch (from state file - for disaster recovery) 2. chat_id from metadata (default - branch = chat) 3. UserValves override 4. Fallback to DEFAULT_BRANCH """ chat_id = None if __metadata__: chat_id = __metadata__.get("chat_id") # Check if we've claimed another session's branch (disaster recovery) if chat_id: state = self.load_state(chat_id) if state.get("branch"): # Use branch from state - could be our own or claimed return state["branch"] else: # No state yet, use chat_id as branch return chat_id # Then check user valves override if __user__ and "valves" in __user__: user_valves = __user__.get("valves") if user_valves and self.valves.ALLOW_USER_OVERRIDES: user_branch = getattr(user_valves, "USER_DEFAULT_BRANCH", "") if user_branch: return user_branch # Finally fall back to default return self.valves.DEFAULT_BRANCH def resolve_repo( self, repo: Optional[str], __user__: dict = None, __metadata__: dict = None ) -> Tuple[str, str]: """Resolve repository string into owner and repo name""" effective_repo = self.get_repo(repo, __user__, __metadata__) if not effective_repo: raise ValueError( "No repository specified. Set DEFAULT_REPO in Valves, USER_DEFAULT_REPO in UserValves, " "or pass repo='owner/name' explicitly." ) if "/" not in effective_repo: raise ValueError( f"Repository must be in 'owner/repo' format, got: {effective_repo}" ) return tuple(effective_repo.split("/", 1)) def parse_issue_url( self, url: str ) -> Tuple[Optional[str], Optional[str], Optional[int]]: """ Parse a Gitea issue URL into components. Format: https://///issues/ Returns: Tuple of (owner, repo, issue_number) or (None, None, None) if invalid """ # Match: https://domain/owner/repo/issues/123 match = re.match(r"https?://[^/]+/([^/]+)/([^/]+)/issues/(\d+)", url) if match: owner = match.group(1) repo = match.group(2) issue_number = int(match.group(3)) return (owner, repo, issue_number) return (None, None, None) def parse_issue_refs(self, text: str) -> List[str]: """Extract issue references from text (e.g., #42, issue #42)""" refs = re.findall(r"#(\d+)", text) issue_refs = [f"#{ref}" for ref in refs] # Also check for "issue N" pattern issue_n_refs = re.findall(r"issue\s*#?(\d+)", text, re.IGNORECASE) for ref in issue_n_refs: issue_ref = f"#{ref}" if issue_ref not in issue_refs: issue_refs.append(issue_ref) return issue_refs def apply_unified_diff( self, current_content: str, diff_content: str ) -> Tuple[Optional[str], str]: """ Apply a unified diff to content. Args: current_content: Current file content diff_content: Unified diff patch Returns: Tuple of (new_content, error_message) - On success: (new_content, "") - On failure: (None, error_description) """ try: diff_lines = diff_content.splitlines(keepends=True) # Validate diff format - check for hunk headers has_hunk_header = any(line.startswith("@@") for line in diff_lines) has_add_or_remove = any( line.startswith("+") or line.startswith("-") for line in diff_lines if not line.startswith("+++") and not line.startswith("---") ) if not has_hunk_header: return ( None, "No hunk headers (@@) found. Unified diff format required.", ) if not has_add_or_remove: return (None, "No additions (+) or deletions (-) found in diff.") # Parse hunks from unified diff hunks = [] current_hunk = None in_hunk = False for line in diff_lines: if line.startswith("---") or line.startswith("+++"): continue # Skip file headers elif line.startswith("@@"): if current_hunk: hunks.append(current_hunk) # Parse hunk header: @@ -old_line,old_count +new_line,new_count @@ match = re.search( r"@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@", line ) if match: old_start = int(match.group(1)) new_start = int(match.group(3)) current_hunk = { "old_start": old_start, "new_start": new_start, "lines": [], } in_hunk = True else: return (None, f"Invalid hunk header format: {line.strip()}") continue elif in_hunk and len(line) > 0 and line[0:1] in ("+", "-", " "): if current_hunk: current_hunk["lines"].append(line) elif in_hunk and line.strip() == "": # Empty line in diff - treat as context if current_hunk: current_hunk["lines"].append(" \n") elif in_hunk: if current_hunk: hunks.append(current_hunk) current_hunk = None in_hunk = False if current_hunk: hunks.append(current_hunk) if not hunks: return (None, "No valid hunks could be parsed from the diff content.") # Split content into lines old_lines = current_content.splitlines(keepends=False) new_lines = list(old_lines) # Apply each hunk in reverse order to maintain correct indices for hunk in sorted(hunks, key=lambda h: h["old_start"], reverse=True): old_start = hunk["old_start"] - 1 # Convert to 0-indexed # Collect lines to remove and add lines_to_remove = [] lines_to_add = [] for line in hunk["lines"]: if line.startswith("+"): lines_to_add.append(line[1:].rstrip("\n")) elif line.startswith("-"): lines_to_remove.append(line[1:].rstrip("\n")) # Remove old lines if they match if lines_to_remove: end_idx = old_start + len(lines_to_remove) if end_idx <= len(new_lines): actual_lines = new_lines[old_start:end_idx] if actual_lines == lines_to_remove: del new_lines[old_start:end_idx] else: # Context mismatch - warn but continue del new_lines[old_start:end_idx] # Insert new lines at old_start for line in reversed(lines_to_add): new_lines.insert(old_start, line) # Reconstruct content with line endings new_content = "\n".join(new_lines) if new_content and not new_content.endswith("\n"): new_content += "\n" return (new_content, "") except Exception as e: return (None, f"Diff parsing error: {type(e).__name__}: {e}") def generate_commit_message( self, change_type: str, scope: str, description: str, issue_refs: Optional[List[str]] = None, body: Optional[str] = None, ) -> str: """ Generate a conventional commit message. Format: (): """ # Validate and normalize change type valid_types = [ "feat", "fix", "docs", "style", "refactor", "test", "chore", "perf", "ci", "build", "revert", ] if change_type.lower() not in valid_types: change_type = "chore" # Build the subject line scope_str = f"({scope})" if scope else "" message = f"{change_type.lower()}{scope_str}: {description}" # Add issue references to footer if issue_refs: refs_str = ", ".join(issue_refs) footer = f"Refs: {refs_str}" if body: body = f"{body}\n\n{footer}" else: message = f"{message}\n\n{footer}" if body: message = f"{message}\n\n{body}" return message def generate_diff_commit_message(self, path: str, diff_content: str) -> str: """Generate a commit message from diff content""" file_name = path.split("/")[-1] # Detect change type from diff change_type = "chore" diff_lines = diff_content.splitlines() if any( line.startswith("+def ") or line.startswith("+class ") for line in diff_lines ): change_type = "feat" elif any(line.startswith("+ return ") for line in diff_lines): change_type = "fix" elif "test" in path.lower() or "spec" in path.lower(): change_type = "test" elif ".md" in path.lower() or "readme" in path.lower(): change_type = "docs" elif any(line.startswith("-") for line in diff_lines): change_type = "refactor" # Extract description from added lines added_lines = [ line[1:].strip() for line in diff_lines if line.startswith("+") and not line.startswith("+++") ] description = "update" if added_lines: for line in added_lines: if line and not line.startswith(("import ", "from ")): match = re.match( r"(def|class|const|var|let|interface|type)\s+(\w+)", line ) if match: kind, name = match.groups() if kind == "def": description = f"add {name}() function" break elif kind == "class": description = f"add {name} class" break elif not line.startswith((" ", "\t")): description = line[:50].rstrip(":") if len(line) > 50: description += "..." break return f"{change_type}({file_name}): {description}" class Tools: """ Gitea Coder Role - High-level workflow automation for code generation tasks. ARCHITECTURE: - Stateless design - no caching complexity - Branch name = chat_id (PERIOD. Nothing else.) - All operations are self-contained - LLM focuses on code, not infrastructure Workflow: reads ticket → creates branch (chat_id) → commits → updates ticket → creates PR Key Features: - Branch name IS chat_id (KISS principle) - Auto-generates conventional commit messages - Quality gates for file changes (size delta validation) - Diff-based updates to prevent accidental file replacements - Complete CRUD operations (create, replace, delete, rename) - Ticket updates for status tracking - PR reading for feedback """ class Valves(BaseModel): """System-wide configuration for Gitea Coder integration""" GITEA_URL: str = Field( default="https://gitea.example.com", description="Gitea server URL", ) DEFAULT_REPO: str = Field( default="", description="Default repository in owner/repo format", ) DEFAULT_BRANCH: str = Field( default="main", description="Default branch name for base operations", ) ALLOW_USER_OVERRIDES: bool = Field( default=True, description="Allow users to override defaults via UserValves", ) VERIFY_SSL: bool = Field( default=True, description="Verify SSL certificates", ) MAX_SIZE_DELTA_PERCENT: float = Field( default=50.0, description="Maximum allowed file size change percentage (quality gate)", ge=1.0, le=500.0, ) PROTECTED_BRANCHES: List[str] = Field( default=["main", "master", "develop", "dev", "release", "hotfix"], description="Branches that cannot be committed to directly", ) class UserValves(BaseModel): """Per-user configuration""" GITEA_TOKEN: str = Field( default="", description="Your Gitea API token", ) USER_DEFAULT_REPO: str = Field( default="", description="Override default repository", ) USER_DEFAULT_BRANCH: str = Field( default="", description="Override default branch", ) def __init__(self): """Initialize tools""" self.valves = self.Valves() self.user_valves = self.UserValves() self.citation = True # Initialize helper functions self._gitea = GiteaHelpers(self) async def workflow_summary( self, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """Get a summary of available coder workflows and commands""" branch = self._gitea.get_branch(__user__, __metadata__) message = f"""# 🚀 Gitea Coder Workflow Guide ## Workflow (4 Steps) 1. **Read the ticket:** `read_ticket(issue_number)` or `read_ticket_by_url(url)` - Branch `{branch}` is auto-created when you read the ticket - Files are immediately accessible 2. **Read & modify files:** - `get_file(path)` - Read current content - `list_files(path)` - Browse repository - `apply_diff(path, diff, message)` - Apply changes (preferred) - `commit_changes(path, content, message)` - Full file replacement 3. **Update the ticket:** `update_ticket(issue_number, comment)` ← **CRITICAL** - This is your **statement of work** - Document what you did, what you found, decisions made - Update throughout the process, not just at the end 4. **Create PR:** `create_pull_request(title, description)` - Use `read_pull_request(pr_number)` to check feedback ## Available Commands ### 📋 Ticket Operations (Statement of Work) - `read_ticket(issue_number)` - Get issue details, auto-creates branch - `read_ticket_by_url(url)` - Get issue details by URL, auto-creates branch - `update_ticket(issue_number, comment)` - **Document your work on the ticket** ### 🌿 Branch Management - `create_branch()` - Manually create branch (usually not needed) - `get_branch_status()` - See current working branch - `list_branches()` - List all branches in repository ### 🧠 Session State (Auto-managed) - `get_session_state()` - View persisted repo/issue/branch context - `clear_session_state()` - Reset session (start fresh) - `list_sessions()` - Find orphaned sessions from crashed chats - `claim_session(chat_id)` - Continue work from a dead session ### 📝 File Operations - `get_file(path)` - Read file content - `list_files(path)` - List directory contents - `apply_diff(path, diff, message)` - Apply unified diff patch (preferred) - `commit_changes(path, content, message)` - Commit with size delta gate - `replace_file(path, content, message)` - Replace entire file - `create_file(path, content, message)` - Create new file - `delete_file(path, message)` - Delete a file - `rename_file(old_path, new_path, message)` - Rename a file ### 📦 Pull Request Operations - `create_pull_request(title, description)` - Create PR from current branch - `read_pull_request(pr_number)` - Get PR details and review feedback ## Current Session **Working Branch:** `{branch}` (same as chat_id) ## Diff Format Guide When using `apply_diff()`, provide a **unified diff** format: ```diff --- a/path/to/file.js +++ b/path/to/file.js @@ -10,3 +10,4 @@ existing line (context) +new line to add another existing line -line to remove ``` **Key elements:** - `@@` hunk headers are REQUIRED - Lines starting with `+` are additions - Lines starting with `-` are deletions - Lines starting with ` ` (space) are context ## Best Practices 1. **Always update the ticket** - It's your audit trail and statement of work 2. **Use `apply_diff()` for changes** - More precise, prevents accidents 3. **Read files before modifying** - Understand current state first 4. **Commit messages matter** - They're auto-generated but can be customized """ return {"status": "success", "message": message} async def read_ticket( self, issue_number: int, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """ Read and parse a ticket/issue by number. Automatically creates the working branch (chat_id) if it doesn't exist. This enables immediate file operations after reading the ticket. """ retVal = {"status": "failure", "message": ""} token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) except ValueError as e: retVal["message"] = str(e) return retVal # Get branch name (chat_id) branch_name = self._gitea.get_branch(__user__, __metadata__) branch_status = "unknown" # Auto-create branch for immediate file operations if branch_name not in self.valves.PROTECTED_BRANCHES: if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Setting up branch {branch_name[:8]}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.post( self._gitea.api_url(f"/repos/{owner}/{repo_name}/branches"), headers=self._gitea.headers(__user__), json={ "new_branch_name": branch_name, "old_branch_name": self.valves.DEFAULT_BRANCH, }, ) if response.status_code == 409: branch_status = "exists" elif response.status_code in (200, 201): branch_status = "created" else: branch_status = "failed" except Exception: branch_status = "failed" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Fetching issue #{issue_number}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.get( self._gitea.api_url( f"/repos/{owner}/{repo_name}/issues/{issue_number}" ), headers=self._gitea.headers(__user__), ) response.raise_for_status() issue = response.json() title = issue.get("title", "No title") body = issue.get("body", "") state = issue.get("state", "unknown") user = issue.get("user", {}).get("login", "unknown") labels = [label.get("name", "") for label in issue.get("labels", [])] created_at = issue.get("created_at", "")[:10] html_url = issue.get("html_url", "") # Parse body for structured info testing_criteria = [] technical_notes = [] is_testing_required = "test" in body.lower() or "testing" in body.lower() is_docs_required = "documentation" in body.lower() or "docs" in body.lower() if is_testing_required: testing_section = re.search( r"(?:testing|criteria|test).*?:(.*?)(?:\n\n|$)", body, re.IGNORECASE | re.DOTALL, ) if testing_section: testing_criteria = [ line.strip().lstrip("-*•") for line in testing_section.group(1).split("\n") if line.strip() ] tech_section = re.search( r"(?:technical|tech).*?:(.*?)(?:\n\n|$)", body, re.IGNORECASE | re.DOTALL, ) if tech_section: technical_notes = [ line.strip().lstrip("-*•") for line in tech_section.group(1).split("\n") if line.strip() ] issue_refs = self._gitea.parse_issue_refs(body) if not any(ref == f"#{issue_number}" for ref in issue_refs): issue_refs.insert(0, f"#{issue_number}") if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) # Build message message = f"# 📋 Ticket #{issue_number}: {title}\n\n" message += f"**State:** {state.upper()} | **Author:** @{user} | **Created:** {created_at}\n" message += f"**Labels:** {', '.join(labels) if labels else 'None'}\n" message += f"**URL:** {html_url}\n\n" # Show branch status if branch_status == "created": message += f"🌿 **Branch Created:** `{branch_name}` (from `{self.valves.DEFAULT_BRANCH}`)\n\n" elif branch_status == "exists": message += f"🌿 **Branch Ready:** `{branch_name}` (already exists)\n\n" elif branch_status == "failed": message += f"⚠️ **Branch:** Could not create `{branch_name}` - you may need to call `create_branch()`\n\n" if body: message += "## 📝 Description\n\n" if len(body) > 1000: message += f"{body[:1000]}...\n\n" else: message += f"{body}\n\n" message += "## 🧪 Testing Requirements\n\n" if is_testing_required: if testing_criteria: message += "**Testing Criteria:**\n" for criterion in testing_criteria: message += f"- [ ] {criterion}\n" message += "\n" else: message += "Testing required, but no specific criteria listed.\n\n" else: message += "No explicit testing requirements detected.\n\n" if technical_notes: message += "## 🔧 Technical Notes\n\n" for note in technical_notes: message += f"- {note}\n" message += "\n" if is_docs_required: message += "## 📚 Documentation Required\n\n" message += "This ticket mentions documentation needs.\n\n" if issue_refs: message += "## 🔗 Related Issues\n\n" for ref in issue_refs: message += f"- {ref}\n" message += "\n" message += "## 🚀 Workflow\n\n" message += f"1. **Read files:** `get_file(path)` or `list_files(path)` - branch `{branch_name}` is ready\n" message += "2. **Make changes:** `apply_diff()` or `commit_changes()`\n" message += f"3. **Update ticket:** `update_ticket({issue_number}, comment)` ← **Document your work!**\n" message += "4. **Create PR:** `create_pull_request(title)`\n\n" message += "---\n" message += f"💡 **Important:** Always update the ticket with your progress and findings.\n" message += f"The ticket is your **statement of work** - it documents what was done and why.\n" # Persist state for context recovery (helps models that lose track) if __metadata__ and __metadata__.get("chat_id"): self._gitea.save_state( __metadata__["chat_id"], repo=f"{owner}/{repo_name}", issue=issue_number, branch=branch_name, ) retVal["status"] = "success" retVal["message"] = message return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, f"issue #{issue_number}") if e.response.status_code == 404: retVal["message"] = ( f"Issue #{issue_number} not found in {owner}/{repo_name}." ) else: retVal["message"] = f"Failed to fetch issue. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" return retVal async def read_ticket_by_url( self, url: str, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """ Read and parse a ticket/issue by URL. Format: https://///issues/ Automatically creates the working branch (chat_id) if it doesn't exist. This enables immediate file operations after reading the ticket. """ retVal = {"status": "failure", "message": ""} # Parse URL owner, repo, issue_number = self._gitea.parse_issue_url(url) if not owner or not repo or not issue_number: retVal["message"] = ( f"Invalid issue URL format. Expected: https://domain/owner/repo/issues/number\nGot: {url}" ) return retVal # Use the standard read_ticket with parsed components return await self.read_ticket( issue_number=issue_number, repo=f"{owner}/{repo}", __user__=__user__, __metadata__=__metadata__, __event_emitter__=__event_emitter__, ) async def update_ticket( self, issue_number: int, comment: str, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """ Post a status update comment to a ticket. THIS IS YOUR STATEMENT OF WORK. Use this to document: - What you analyzed and found - What changes you made and why - Decisions and trade-offs - Testing performed - Any blockers or questions Update the ticket throughout your work, not just at the end. The ticket comment history is the audit trail of the work performed. """ retVal = {"status": "failure", "message": ""} token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) except ValueError as e: retVal["message"] = str(e) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Updating issue #{issue_number}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.post( self._gitea.api_url( f"/repos/{owner}/{repo_name}/issues/{issue_number}/comments" ), headers=self._gitea.headers(__user__), json={"body": comment}, ) response.raise_for_status() result = response.json() if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) retVal["status"] = "success" retVal["message"] = f"✅ Updated issue #{issue_number} with status comment" return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, f"issue #{issue_number} update") retVal["message"] = f"Failed to update issue. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" return retVal async def create_branch( self, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """ Create a new branch with name = chat_id. KISS: Branch name IS chat_id. Nothing else. """ retVal = {"status": "failure", "message": ""} token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) except ValueError as e: retVal["message"] = str(e) return retVal # Branch name IS chat_id branch_name = self._gitea.get_branch(__user__, __metadata__) # Check if protected branch if branch_name in self.valves.PROTECTED_BRANCHES: retVal["message"] = ( f"❌ Cannot create branch with protected name '{branch_name}'" ) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Creating branch {branch_name}...", "done": False, }, } ) # Get base branch base_branch = self.valves.DEFAULT_BRANCH try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.post( self._gitea.api_url(f"/repos/{owner}/{repo_name}/branches"), headers=self._gitea.headers(__user__), json={ "new_branch_name": branch_name, "old_branch_name": base_branch, }, ) # Handle branch already exists if response.status_code == 409: retVal["status"] = "success" retVal["message"] = f"⚠️ Branch `{branch_name}` already exists." return retVal response.raise_for_status() if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) # Persist state for context recovery if __metadata__ and __metadata__.get("chat_id"): self._gitea.save_state( __metadata__["chat_id"], repo=f"{owner}/{repo_name}", branch=branch_name, ) retVal["status"] = "success" retVal[ "message" ] = f"""✅ **Branch Created Successfully** **Branch:** `{branch_name}` (chat_id) **Base Branch:** `{base_branch}` **Repository:** `{owner}/{repo_name}` **Next Steps:** 1. Make changes to files using `apply_diff()` or `commit_changes()` 2. Update ticket with status: `update_ticket(issue_number, comment)` 3. Create PR: `create_pull_request(title)` """ return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, "branch creation") retVal["message"] = f"Failed to create branch. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" return retVal async def get_branch_status( self, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """Get the current working branch status""" retVal = {"status": "success", "message": ""} try: owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) except ValueError as e: retVal["status"] = "failure" retVal["message"] = str(e) return retVal # Get branch from metadata (chat_id) working_branch = self._gitea.get_branch(__user__, __metadata__) message = f"# 🌿 Branch Status: {owner}/{repo_name}\n\n" message += f"**Current Branch:** `{working_branch}` (chat_id)\n\n" if __metadata__ and __metadata__.get("chat_id"): message += f"**Chat ID:** `{__metadata__.get('chat_id')}`\n\n" message += "All file operations will use this branch.\n" retVal["message"] = message return retVal async def get_session_state( self, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """ View the persisted session state. State is automatically saved when you read tickets, create branches, or make commits. This helps maintain context across operations. """ retVal = {"status": "success", "message": ""} chat_id = __metadata__.get("chat_id") if __metadata__ else None if not chat_id: retVal["message"] = ( "No chat_id available - session state requires a chat context." ) return retVal state = self._gitea.load_state(chat_id) if not state: retVal[ "message" ] = f"""# 🧠 Session State **Chat ID:** `{chat_id}` **State:** Empty (no operations performed yet) Session state is automatically populated when you: - Read a ticket (`read_ticket` / `read_ticket_by_url`) - Create a branch (`create_branch`) - Make commits This state helps maintain context across operations. """ return retVal message = f"# 🧠 Session State\n\n" message += f"**Chat ID:** `{chat_id}`\n\n" if state.get("repo"): message += f"**Repository:** `{state['repo']}`\n" if state.get("issue"): message += f"**Issue:** #{state['issue']}\n" if state.get("branch"): message += f"**Branch:** `{state['branch']}`\n" if state.get("updated_at"): message += f"**Last Updated:** {state['updated_at']}\n" message += "\nThis state is automatically used when repo/issue is not explicitly provided." retVal["message"] = message return retVal async def clear_session_state( self, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """ Clear the persisted session state. Use this to start fresh or switch to a different repository/issue. """ retVal = {"status": "success", "message": ""} chat_id = __metadata__.get("chat_id") if __metadata__ else None if not chat_id: retVal["message"] = "No chat_id available - nothing to clear." return retVal state_path = self._gitea._state_path(chat_id) if state_path.exists(): try: state_path.unlink() retVal["message"] = ( f"✅ Session state cleared for chat `{chat_id[:8]}...`\n\nYou can now start fresh with a new repository or issue." ) except Exception as e: retVal["status"] = "failure" retVal["message"] = f"Failed to clear state: {e}" else: retVal["message"] = "Session state was already empty." return retVal async def list_sessions( self, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """ List all persisted sessions with their metadata. Use this to find orphaned work from crashed/dead sessions. Sessions can be claimed with `claim_session(chat_id)` to continue work. """ retVal = {"status": "success", "message": ""} current_chat_id = __metadata__.get("chat_id") if __metadata__ else None state_dir = self._gitea._state_dir() / "chats" if not state_dir.exists(): retVal["message"] = "No sessions found. State directory does not exist." return retVal sessions = [] for chat_dir in state_dir.iterdir(): if chat_dir.is_dir(): state_file = chat_dir / "state.json" if state_file.exists(): try: state = json.loads(state_file.read_text()) state["chat_id"] = chat_dir.name state["is_current"] = chat_dir.name == current_chat_id sessions.append(state) except Exception: pass if not sessions: retVal["message"] = "No sessions found." return retVal # Sort by updated_at descending (most recent first) sessions.sort(key=lambda x: x.get("updated_at", ""), reverse=True) message = "# 📋 Saved Sessions\n\n" message += f"**Current Chat:** `{current_chat_id[:8] if current_chat_id else 'unknown'}...`\n\n" message += "---\n\n" for s in sessions: chat_id = s.get("chat_id", "unknown") is_current = s.get("is_current", False) marker = " ← **CURRENT**" if is_current else "" message += f"### `{chat_id[:8]}...`{marker}\n\n" if s.get("repo"): message += f"- **Repo:** `{s['repo']}`\n" if s.get("issue"): message += f"- **Issue:** #{s['issue']}\n" if s.get("branch"): message += f"- **Branch:** `{s['branch'][:12]}...`\n" if s.get("pr_number"): message += f"- **PR:** #{s['pr_number']}\n" if s.get("updated_at"): message += f"- **Last Activity:** {s['updated_at']}\n" if not is_current: message += f'\n→ `claim_session("{chat_id}")` to continue this work\n' message += "\n---\n\n" message += f"**Total Sessions:** {len(sessions)}\n" retVal["message"] = message return retVal async def claim_session( self, session_id: str, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """ Claim an orphaned session to continue its work. Use `list_sessions()` to find available sessions. This copies the session's state (repo, issue, branch) to the current chat, allowing you to continue work on an existing branch after a session death. IMPORTANT: The branch name in Gitea will still be the old chat_id. This function updates the tool's working context, not the Gitea branch name. """ retVal = {"status": "failure", "message": ""} current_chat_id = __metadata__.get("chat_id") if __metadata__ else None if not current_chat_id: retVal["message"] = "No current chat_id available." return retVal if session_id == current_chat_id: retVal["message"] = "Cannot claim your own session - already active." retVal["status"] = "success" return retVal # Load the orphaned session's state orphan_state = self._gitea.load_state(session_id) if not orphan_state: retVal["message"] = ( f"Session `{session_id[:8]}...` not found or has no state." ) return retVal # Copy state to current session, preserving the original branch name self._gitea.save_state( current_chat_id, repo=orphan_state.get("repo"), issue=orphan_state.get("issue"), branch=orphan_state.get("branch"), # Keep original branch name! pr_number=orphan_state.get("pr_number"), claimed_from=session_id, ) message = f"✅ **Session Claimed**\n\n" message += f"**Claimed From:** `{session_id[:8]}...`\n" message += f"**Current Chat:** `{current_chat_id[:8]}...`\n\n" message += "**Inherited State:**\n" if orphan_state.get("repo"): message += f"- **Repo:** `{orphan_state['repo']}`\n" if orphan_state.get("issue"): message += f"- **Issue:** #{orphan_state['issue']}\n" if orphan_state.get("branch"): message += f"- **Branch:** `{orphan_state['branch']}`\n" if orphan_state.get("pr_number"): message += f"- **PR:** #{orphan_state['pr_number']}\n" message += "\n⚠️ **Note:** File operations will use branch `{branch}`, not your current chat_id.\n".format( branch=orphan_state.get("branch", "unknown")[:12] + "..." ) message += "This is correct - you're continuing work on the existing branch.\n" retVal["status"] = "success" retVal["message"] = message return retVal async def list_branches( self, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """List all branches in the repository""" retVal = {"status": "failure", "message": ""} token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) except ValueError as e: retVal["message"] = str(e) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Fetching branches...", "done": False}, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.get( self._gitea.api_url(f"/repos/{owner}/{repo_name}/branches"), headers=self._gitea.headers(__user__), params={"limit": 50}, ) response.raise_for_status() branches = response.json() message = f"# 🌿 Branches in {owner}/{repo_name}\n\n" # Separate protected and other branches protected = [b for b in branches if b.get("protected")] other = [b for b in branches if not b.get("protected")] if protected: message += "## 🛡️ Protected Branches\n\n" for branch in sorted(protected, key=lambda x: x["name"]): name = branch.get("name", "") commit_sha = branch.get("commit", {}).get("id", "")[:8] message += f"- `{name}` [commit: {commit_sha}]\n" message += "\n" if other: message += "## 📦 Other Branches\n\n" for branch in sorted(other, key=lambda x: x["name"])[:20]: name = branch.get("name", "") commit_sha = branch.get("commit", {}).get("id", "")[:8] message += f"- `{name}` [commit: {commit_sha}]\n" if len(other) > 20: message += f"\n... and {len(other) - 20} more branches\n" message += "\n" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) retVal["status"] = "success" retVal["message"] = message.strip() return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, "branch listing") retVal["message"] = f"Failed to list branches. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" return retVal async def apply_diff( self, path: str, diff_content: str, message: Optional[str] = None, repo: Optional[str] = None, auto_message: bool = True, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """ Apply a unified diff patch to a file. This is the PREFERRED method for making changes as it: 1. Is precise about what changes 2. Prevents accidental file replacements 3. Is what LLMs understand best (trained on GitHub PRs) REQUIRED FORMAT - Unified diff with hunk headers: ```diff --- a/path/to/file +++ b/path/to/file @@ -line,count +line,count @@ context line +added line -removed line ``` """ retVal = {"status": "failure", "message": ""} # Get working branch from metadata effective_branch = self._gitea.get_branch(__user__, __metadata__) token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) except ValueError as e: retVal["message"] = str(e) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Applying diff to {path}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: # Get current file content get_response = await client.get( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), params={"ref": effective_branch}, ) # Check if file exists if get_response.status_code == 404: retVal["message"] = ( f"File not found: `{path}`. Use `create_file()` to create a new file." ) return retVal get_response.raise_for_status() file_info = get_response.json() current_sha = file_info.get("sha") # Decode current content current_content_b64 = file_info.get("content", "") try: current_content = base64.b64decode(current_content_b64).decode( "utf-8" ) except Exception: retVal["message"] = "Could not decode current file content." return retVal # Parse and apply the diff new_content, error_msg = self._gitea.apply_unified_diff( current_content, diff_content ) if new_content is None: retVal[ "message" ] = f"""❌ **Failed to apply diff** **Reason:** {error_msg} **Required format:** Unified diff with hunk headers ```diff --- a/{path} +++ b/{path} @@ -10,3 +10,4 @@ existing line (context) +new line to add -line to remove ``` **Key requirements:** - `@@` hunk headers are REQUIRED (e.g., `@@ -10,3 +10,4 @@`) - Lines starting with `+` are additions - Lines starting with `-` are deletions - Lines starting with ` ` (space) are context Use `get_file(path)` to see current content and construct a proper diff. """ return retVal # Check if diff actually changed anything if new_content == current_content: retVal[ "message" ] = f"""⚠️ **Diff produced no changes** The diff was parsed but resulted in no modifications to the file. **Possible causes:** 1. The context lines in your diff don't match the actual file content 2. The hunk line numbers are incorrect 3. The changes were already applied Use `get_file("{path}")` to see the current content and verify your diff targets the correct lines. """ return retVal # Generate commit message if needed if not message: if auto_message: message = self._gitea.generate_diff_commit_message( path, diff_content ) else: retVal["message"] = ( "Commit message is required when auto_message=False." ) return retVal # Commit the changes new_content_b64 = base64.b64encode(new_content.encode("utf-8")).decode( "ascii" ) response = await client.put( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), json={ "content": new_content_b64, "message": message, "branch": effective_branch, "sha": current_sha, }, ) response.raise_for_status() result = response.json() commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] # Parse diff stats added_lines = len( [ l for l in diff_content.splitlines() if l.startswith("+") and not l.startswith("+++") ] ) removed_lines = len( [ l for l in diff_content.splitlines() if l.startswith("-") and not l.startswith("---") ] ) if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) # Prepare patched content for review # For large files, show a truncated version patched_lines = new_content.splitlines() total_lines = len(patched_lines) if total_lines <= 100: # Small file - show entire content patched_preview = new_content else: # Large file - show first 30 and last 20 lines with indicator head = "\n".join(patched_lines[:30]) tail = "\n".join(patched_lines[-20:]) patched_preview = ( f"{head}\n\n... [{total_lines - 50} lines omitted] ...\n\n{tail}" ) message_text = f"✅ **Diff Applied - REVIEW REQUIRED**\n\n" message_text += f"**File:** `{path}`\n" message_text += f"**Branch:** `{effective_branch}`\n" message_text += f"**Commit:** `{commit_sha}`\n" message_text += ( f"**Changes:** +{added_lines} lines, -{removed_lines} lines\n" ) message_text += f"**Message:** {message}\n\n" message_text += "---\n\n" message_text += "## 📋 Review the Patched File\n\n" message_text += "**Check for:**\n" message_text += "- ❌ Accidental line removals\n" message_text += "- ❌ Code placed in wrong location\n" message_text += "- ❌ Duplicate code blocks\n" message_text += "- ❌ Missing imports or dependencies\n" message_text += "- ❌ Broken syntax or indentation\n\n" message_text += f"**Patched Content ({total_lines} lines):**\n\n" message_text += f"```\n{patched_preview}\n```\n\n" message_text += "---\n" message_text += ( "⚠️ **If the patch is incorrect**, use `apply_diff()` again to fix, " ) message_text += "or `get_file()` to see current state and `replace_file()` to correct.\n" retVal["status"] = "success" retVal["message"] = message_text return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, f"diff application to '{path}'") if e.response.status_code == 409: retVal["message"] = ( f"Update conflict for `{path}`. Fetch the latest version and try again." ) else: retVal["message"] = f"Failed to apply diff. {error_msg}" return retVal except Exception as e: retVal["message"] = ( f"Unexpected failure during diff application: {type(e).__name__}: {e}" ) return retVal async def commit_changes( self, path: str, content: str, message: Optional[str] = None, repo: Optional[str] = None, max_delta_percent: Optional[float] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """ Commit file changes with automatic content detection and size delta gating. This method: 1. Detects whether to create or replace a file 2. Validates file size changes against threshold (quality gate) 3. Auto-generates commit message if not provided 4. Commits to the working branch """ retVal = {"status": "failure", "message": ""} # Get working branch from metadata effective_branch = self._gitea.get_branch(__user__, __metadata__) token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) except ValueError as e: retVal["message"] = str(e) return retVal # Use provided threshold or default from valves delta_threshold = max_delta_percent or self.valves.MAX_SIZE_DELTA_PERCENT if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Processing {path}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: # Check if file exists get_response = await client.get( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), params={"ref": effective_branch}, ) file_exists = get_response.status_code == 200 current_sha = None current_size = 0 if file_exists: get_response.raise_for_status() file_info = get_response.json() current_sha = file_info.get("sha") current_size = file_info.get("size", 0) # SIZE DELTA GATE - Quality check new_size = len(content.encode("utf-8")) if current_size > 0 and new_size > 0: delta_percent = ( abs(new_size - current_size) / current_size * 100 ) if delta_percent > delta_threshold: # Calculate actual bytes changed size_diff = new_size - current_size direction = "larger" if size_diff > 0 else "smaller" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": "Size gate triggered", "done": True, "hidden": True, }, } ) retVal[ "message" ] = f"""⚠️ **Quality Gate: Large File Change Detected** **File:** `{path}` **Current Size:** {current_size} bytes **New Size:** {new_size} bytes **Change:** {size_diff:+d} bytes ({delta_percent:.1f}% {direction}) **Threshold:** {delta_threshold}% **Recommended: Use diff-based updates instead** ```python apply_diff(path="{path}", diff_content="...") ``` Or override with: ```python commit_changes(..., max_delta_percent=100) ``` """ return retVal # Generate commit message if not provided if not message: message = self._gitea.generate_commit_message( change_type="chore", scope=path.split("/")[-1] if "/" in path else path, description=f"update {path}", ) # Prepare content content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") if file_exists: # Replace existing file if not current_sha: retVal["message"] = ( f"Could not retrieve SHA for existing file: {path}" ) return retVal response = await client.put( self._gitea.api_url( f"/repos/{owner}/{repo_name}/contents/{path}" ), headers=self._gitea.headers(__user__), json={ "content": content_b64, "message": message, "branch": effective_branch, "sha": current_sha, }, ) else: # Create new file response = await client.post( self._gitea.api_url( f"/repos/{owner}/{repo_name}/contents/{path}" ), headers=self._gitea.headers(__user__), json={ "content": content_b64, "message": message, "branch": effective_branch, }, ) response.raise_for_status() result = response.json() commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] action = "Updated" if file_exists else "Created" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) # Calculate and show size change new_size = len(content.encode("utf-8")) size_info = "" if file_exists and current_size > 0: delta = new_size - current_size delta_percent = delta / current_size * 100 size_info = ( f"**Size Change:** {delta:+d} bytes ({delta_percent:.1f}%)\n" ) message_text = f"""✅ **{action} File Successfully** **File:** `{path}` **Branch:** `{effective_branch}` **Commit:** `{commit_sha}` **Message:** {message} {size_info}**Action:** {action} """ retVal["status"] = "success" retVal["message"] = message_text return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, f"file commit for '{path}'") if e.response.status_code == 409: retVal["message"] = ( f"Update conflict for `{path}`. Fetch the latest version and try again." ) else: retVal["message"] = f"Failed to commit file. {error_msg}" return retVal except Exception as e: retVal["message"] = ( f"Unexpected failure during commit: {type(e).__name__}: {e}" ) return retVal async def create_pull_request( self, title: str, body: Optional[str] = "", repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """Create a pull request from the current branch""" retVal = {"status": "failure", "message": ""} # Get working branch from metadata head_branch = self._gitea.get_branch(__user__, __metadata__) token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) except ValueError as e: retVal["message"] = str(e) return retVal base_branch = self.valves.DEFAULT_BRANCH # Check if protected branch if head_branch in self.valves.PROTECTED_BRANCHES: retVal["message"] = ( f"❌ Cannot create PR from protected branch '{head_branch}'" ) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Creating PR...", "done": False}, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.post( self._gitea.api_url(f"/repos/{owner}/{repo_name}/pulls"), headers=self._gitea.headers(__user__), json={ "title": title, "head": head_branch, "base": base_branch, "body": body, }, ) # Handle PR already exists if response.status_code == 409: retVal["message"] = ( f"⚠️ PR already exists for branch `{head_branch}`" ) return retVal response.raise_for_status() pr = response.json() pr_number = pr.get("number") pr_url = pr.get("html_url", "") if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) # Persist PR number for session recovery if __metadata__ and __metadata__.get("chat_id"): self._gitea.save_state(__metadata__["chat_id"], pr_number=pr_number) retVal["status"] = "success" retVal[ "message" ] = f"""✅ **Pull Request Created Successfully** **PR #{pr_number}:** {title} **Branch:** `{head_branch}` → `{base_branch}` **URL:** {pr_url} **Next Steps:** 1. Read PR feedback: `read_pull_request({pr_number})` 2. Address reviewer comments 3. Update ticket with PR link """ return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, "PR creation") if e.response.status_code == 422: retVal["message"] = ( "Could not create PR. The branch may not exist or there may be merge conflicts." ) else: retVal["message"] = f"Failed to create PR. {error_msg}" return retVal except Exception as e: retVal["message"] = ( f"Unexpected failure during PR creation: {type(e).__name__}: {e}" ) return retVal async def read_pull_request( self, pr_number: int, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """ Read pull request details including review feedback. This is how you get feedback to iterate on changes. """ retVal = {"status": "failure", "message": ""} token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) except ValueError as e: retVal["message"] = str(e) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Fetching PR #{pr_number}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: # Get PR details pr_response = await client.get( self._gitea.api_url( f"/repos/{owner}/{repo_name}/pulls/{pr_number}" ), headers=self._gitea.headers(__user__), ) pr_response.raise_for_status() pr = pr_response.json() # Get PR comments comments_response = await client.get( self._gitea.api_url( f"/repos/{owner}/{repo_name}/pulls/{pr_number}/comments" ), headers=self._gitea.headers(__user__), ) comments_response.raise_for_status() comments = comments_response.json() title = pr.get("title", "") body = pr.get("body", "") state = pr.get("state", "") user = pr.get("user", {}).get("login", "") head_branch = pr.get("head", {}).get("ref", "") base_branch = pr.get("base", {}).get("ref", "") mergeable = pr.get("mergeable", False) merged = pr.get("merged", False) html_url = pr.get("html_url", "") if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) # Build message message = f"# 🔀 Pull Request #{pr_number}: {title}\n\n" message += f"**State:** {state.upper()} | **Author:** @{user}\n" message += f"**Branch:** `{head_branch}` → `{base_branch}`\n" message += f"**Mergeable:** {'✅ Yes' if mergeable else '❌ No'}\n" message += f"**Merged:** {'✅ Yes' if merged else '❌ No'}\n" message += f"**URL:** {html_url}\n\n" if body: message += "## 📝 Description\n\n" message += f"{body}\n\n" if comments: message += f"## 💬 Review Comments ({len(comments)})\n\n" for comment in comments[:10]: # Limit to 10 most recent comment_user = comment.get("user", {}).get("login", "unknown") comment_body = comment.get("body", "") comment_path = comment.get("path", "") comment_line = comment.get("line", "") message += f"**@{comment_user}**" if comment_path: message += f" on `{comment_path}`" if comment_line: message += f" (line {comment_line})" message += f":\n{comment_body}\n\n" if len(comments) > 10: message += f"... and {len(comments) - 10} more comments\n\n" else: message += "## 💬 No review comments yet\n\n" message += "## 🚀 Next Steps\n\n" if comments: message += "1. Address review comments\n" message += ( "2. Make changes using `apply_diff()` or `commit_changes()`\n" ) message += "3. Update ticket with progress\n" else: message += "Waiting for review feedback...\n" retVal["status"] = "success" retVal["message"] = message return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, f"PR #{pr_number}") if e.response.status_code == 404: retVal["message"] = f"PR #{pr_number} not found in {owner}/{repo_name}." else: retVal["message"] = f"Failed to fetch PR. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" return retVal async def replace_file( self, path: str, content: str, message: str, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """ Update an existing file in the repository. WARNING: This replaces the entire file content. Use `apply_diff()` for incremental changes. """ retVal = {"status": "failure", "message": ""} effective_branch = self._gitea.get_branch(__user__, __metadata__) token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) except ValueError as e: retVal["message"] = str(e) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": f"Updating {path}...", "done": False}, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: get_response = await client.get( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), params={"ref": effective_branch}, ) if get_response.status_code == 404: retVal["message"] = ( f"File not found: `{path}`. Use `create_file()` to create a new file." ) return retVal get_response.raise_for_status() file_info = get_response.json() sha = file_info.get("sha") if not sha: retVal["message"] = "Could not retrieve file SHA for update." return retVal content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") response = await client.put( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), json={ "content": content_b64, "message": message, "branch": effective_branch, "sha": sha, }, ) response.raise_for_status() result = response.json() commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) message_text = f"✅ **File Updated Successfully**\n\n" message_text += f"**File:** `{path}`\n" message_text += f"**Branch:** `{effective_branch}`\n" message_text += f"**Commit:** `{commit_sha}`\n" message_text += f"**Message:** {message}\n" retVal["status"] = "success" retVal["message"] = message_text return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, f"file update for '{path}'") if e.response.status_code == 409: retVal["message"] = ( f"Update conflict for `{path}`. Fetch the latest version and try again." ) else: retVal["message"] = f"Failed to update file. {error_msg}" return retVal except Exception as e: retVal["message"] = ( f"Unexpected failure during file update: {type(e).__name__}: {e}" ) return retVal async def create_file( self, path: str, content: str, message: str, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """Create a new file in the repository""" retVal = {"status": "failure", "message": ""} effective_branch = self._gitea.get_branch(__user__, __metadata__) token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) except ValueError as e: retVal["message"] = str(e) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": f"Creating {path}...", "done": False}, } ) try: content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.post( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), json={ "content": content_b64, "message": message, "branch": effective_branch, }, ) response.raise_for_status() result = response.json() commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) message_text = f"✅ **File Created Successfully**\n\n" message_text += f"**File:** `{path}`\n" message_text += f"**Branch:** `{effective_branch}`\n" message_text += f"**Commit:** `{commit_sha}`\n" message_text += f"**Message:** {message}\n" retVal["status"] = "success" retVal["message"] = message_text return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, f"file creation for '{path}'") if e.response.status_code == 422: retVal["message"] = ( f"File already exists: `{path}`. Use `replace_file()` or `apply_diff()` to modify it." ) else: retVal["message"] = f"Failed to create file. {error_msg}" return retVal except Exception as e: retVal["message"] = ( f"Unexpected failure during file creation: {type(e).__name__}: {e}" ) return retVal async def delete_file( self, path: str, message: str, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, __event_call__: Callable[[dict], Any] = None, ) -> dict: """Delete a file from the repository""" retVal = {"status": "failure", "message": ""} effective_branch = self._gitea.get_branch(__user__, __metadata__) token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) except ValueError as e: retVal["message"] = str(e) return retVal # Confirmation dialog if __event_call__: result = await __event_call__( { "type": "confirmation", "data": { "title": "Confirm File Deletion", "message": f"Delete `{path}` from `{owner}/{repo_name}` ({effective_branch})?", }, } ) if result is None or result is False: retVal["message"] = "⚠️ File deletion cancelled by user." return retVal if isinstance(result, dict) and not result.get("confirmed"): retVal["message"] = "⚠️ File deletion cancelled by user." return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": f"Deleting {path}...", "done": False}, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: get_response = await client.get( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), params={"ref": effective_branch}, ) if get_response.status_code == 404: retVal["message"] = f"File not found: `{path}`" return retVal get_response.raise_for_status() file_info = get_response.json() sha = file_info.get("sha") if not sha: retVal["message"] = "Could not retrieve file SHA for deletion." return retVal response = await client.delete( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), json={ "message": message, "branch": effective_branch, "sha": sha, }, ) response.raise_for_status() result = response.json() commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) message_text = f"✅ **File Deleted Successfully**\n\n" message_text += f"**File:** `{path}`\n" message_text += f"**Branch:** `{effective_branch}`\n" message_text += f"**Commit:** `{commit_sha}`\n" message_text += f"**Message:** {message}\n" retVal["status"] = "success" retVal["message"] = message_text return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, f"file deletion for '{path}'") if e.response.status_code == 404: retVal["message"] = f"File not found: `{path}`" else: retVal["message"] = f"Failed to delete file. {error_msg}" return retVal except Exception as e: retVal["message"] = ( f"Unexpected failure during file deletion: {type(e).__name__}: {e}" ) return retVal async def rename_file( self, old_path: str, new_path: str, message: Optional[str] = None, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """Rename a file in the repository""" retVal = {"status": "failure", "message": ""} effective_branch = self._gitea.get_branch(__user__, __metadata__) token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) except ValueError as e: retVal["message"] = str(e) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Renaming {old_path} to {new_path}...", "done": False, }, } ) try: if not message: message = self._gitea.generate_commit_message( change_type="chore", scope="rename", description=f"rename {old_path} → {new_path}", ) async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: get_response = await client.get( self._gitea.api_url( f"/repos/{owner}/{repo_name}/contents/{old_path}" ), headers=self._gitea.headers(__user__), params={"ref": effective_branch}, ) if get_response.status_code == 404: retVal["message"] = f"File not found: `{old_path}`" return retVal get_response.raise_for_status() old_file = get_response.json() old_sha = old_file.get("sha") content_b64 = old_file.get("content", "") try: content = base64.b64decode(content_b64).decode("utf-8") except Exception: retVal["message"] = "Could not decode file content." return retVal new_content_b64 = base64.b64encode(content.encode("utf-8")).decode( "ascii" ) create_response = await client.post( self._gitea.api_url( f"/repos/{owner}/{repo_name}/contents/{new_path}" ), headers=self._gitea.headers(__user__), json={ "content": new_content_b64, "message": message, "branch": effective_branch, }, ) if create_response.status_code == 422: retVal["message"] = f"File already exists at new path: `{new_path}`" return retVal create_response.raise_for_status() delete_response = await client.delete( self._gitea.api_url( f"/repos/{owner}/{repo_name}/contents/{old_path}" ), headers=self._gitea.headers(__user__), json={ "message": message, "branch": effective_branch, "sha": old_sha, }, ) delete_response.raise_for_status() commit_sha = ( create_response.json().get("commit", {}).get("sha", "unknown")[:8] ) if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) message_text = f"✅ **File Renamed Successfully**\n\n" message_text += f"**Old Path:** `{old_path}`\n" message_text += f"**New Path:** `{new_path}`\n" message_text += f"**Branch:** `{effective_branch}`\n" message_text += f"**Commit:** `{commit_sha}`\n" message_text += f"**Message:** {message}\n" retVal["status"] = "success" retVal["message"] = message_text return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, "file rename") retVal["message"] = f"Failed to rename file. {error_msg}" return retVal except Exception as e: retVal["message"] = ( f"Unexpected failure during file rename: {type(e).__name__}: {e}" ) return retVal async def get_file( self, path: str, repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """Get the contents of a file from the repository""" retVal = {"status": "failure", "message": ""} effective_branch = self._gitea.get_branch(__user__, __metadata__) token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) except ValueError as e: retVal["message"] = str(e) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Reading file {path}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.get( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), params={"ref": effective_branch}, ) response.raise_for_status() file_info = response.json() if isinstance(file_info, list): retVal["message"] = ( f"'{path}' is a directory. Use `list_files()` to browse its contents." ) return retVal if file_info.get("type") != "file": retVal["message"] = ( f"'{path}' is not a file (type: {file_info.get('type')})" ) return retVal content_b64 = file_info.get("content", "") try: content = base64.b64decode(content_b64).decode("utf-8") except Exception: retVal["message"] = ( "Could not decode file content. The file may be binary." ) return retVal size = file_info.get("size", 0) sha_short = file_info.get("sha", "unknown")[:8] if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) message_text = f"**File:** `{path}`\n" message_text += f"**Branch:** `{effective_branch}`\n" message_text += f"**SHA:** `{sha_short}` | **Size:** {size} bytes\n" message_text += f"**URL:** {file_info.get('html_url', 'N/A')}\n\n" message_text += f"```\n{content}\n```\n" retVal["status"] = "success" retVal["message"] = message_text return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, f"file fetch for '{path}'") if e.response.status_code == 404: retVal["message"] = f"File not found: `{path}`" else: retVal["message"] = f"Failed to fetch file. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" return retVal async def list_files( self, path: str = "", repo: Optional[str] = None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """List files and directories in a repository path""" retVal = {"status": "failure", "message": ""} effective_branch = self._gitea.get_branch(__user__, __metadata__) token = self._gitea.get_token(__user__) if not token: retVal["message"] = "GITEA_TOKEN not configured in UserValves settings." return retVal try: owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__) except ValueError as e: retVal["message"] = str(e) return retVal if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Listing {path or 'root'}...", "done": False, }, } ) try: async with httpx.AsyncClient( timeout=30.0, verify=self.valves.VERIFY_SSL ) as client: response = await client.get( self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), headers=self._gitea.headers(__user__), params={"ref": effective_branch}, ) response.raise_for_status() contents = response.json() if isinstance(contents, dict): retVal["message"] = ( f"'{path}' is a file. Use `get_file()` to read its contents." ) return retVal message = f"**Contents of {owner}/{repo_name}/{path or '.'}** (`{effective_branch}`)\n\n" dirs = [item for item in contents if item.get("type") == "dir"] files = [item for item in contents if item.get("type") == "file"] if dirs: message += "**📁 Directories:**\n" for item in sorted(dirs, key=lambda x: x.get("name", "").lower()): message += f"- `📁 {item.get('name', '')}/`\n" message += "\n" if files: message += "**📄 Files:**\n" for item in sorted(files, key=lambda x: x.get("name", "").lower()): size = item.get("size", 0) if size < 1024: size_str = f"{size}B" elif size < 1024 * 1024: size_str = f"{size//1024}KB" else: size_str = f"{size//(1024*1024)}MB" sha_short = item.get("sha", "unknown")[:8] message += ( f"- `{item.get('name', '')}` ({size_str}, sha: {sha_short})\n" ) message += f"\n**Total:** {len(dirs)} directories, {len(files)} files" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Done", "done": True, "hidden": True}, } ) retVal["status"] = "success" retVal["message"] = message return retVal except httpx.HTTPStatusError as e: error_msg = self._gitea.format_error(e, f"directory listing for '{path}'") if e.response.status_code == 404: retVal["message"] = f"Path not found: `{path}`" else: retVal["message"] = f"Failed to list directory contents. {error_msg}" return retVal except Exception as e: retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}" return retVal