From abf618697674a096c9f00d80c1ab0185cc754bcf Mon Sep 17 00:00:00 2001 From: xcaliber Date: Sat, 17 Jan 2026 16:20:53 +0000 Subject: [PATCH] feat(gitea): critical refactor - automatic branch management (v1.1.0) CRITICAL ARCHITECTURAL CHANGE: - LLM no longer manages branch names - system derives from chat_id - All operations automatically use current working branch from chat_id session - Branch parameter removed from all operation functions - Added _get_working_branch() helper for automatic branch detection - Added _generate_branch_name() for system-managed branch naming - Added delete_file() for complete file removal - Added rename_file() for file renaming Complete file operations: - apply_diff() - diff-based updates - commit_changes() - commit with size delta gating - create_file() - create new files - replace_file() - replace entire file - delete_file() - delete files (NEW) - rename_file() - rename files (NEW) - get_file() - read files - list_files() - list directory contents All functions now: - Use __chat_id__ parameter for branch detection - Get working branch from session cache (system-managed) - Never accept branch names from LLM input Refactored workflow: - create_feature_branch() generates branch name from issue_number + scope - All file operations use cached working branch from chat_id - LLM focuses on code/content, not infrastructure Refs: #11 --- open-webui-automation/tools/gitea/coder.py | 2536 ++++++++++++++++++++ 1 file changed, 2536 insertions(+) create mode 100644 open-webui-automation/tools/gitea/coder.py diff --git a/open-webui-automation/tools/gitea/coder.py b/open-webui-automation/tools/gitea/coder.py new file mode 100644 index 0000000..4f02088 --- /dev/null +++ b/open-webui-automation/tools/gitea/coder.py @@ -0,0 +1,2536 @@ +""" +title: Gitea Coder - Workflow Role with Automatic Branch Management +author: Jeff Smith + Claude + minimax +version: 1.1.0 +license: MIT +description: High-level workflow role for LLM-based code generation with 
automatic branch management +requirements: pydantic, httpx +changelog: + 1.1.0: + - CRITICAL: LLM no longer manages branch names - system derives from chat_id + - All operations automatically use current working branch from chat_id session + - Branch parameter removed from operations (system-managed, not LLM-controlled) + - Added delete_file() for complete file removal + - Added rename_file() for file renaming with history + - Added _get_working_branch() helper for automatic branch detection + - Refactored all operations to use working branch from chat_id cache + 1.0.0: + - Initial implementation of gitea_coder role + - Branch creation with scope gating + - Diff-based updates + - Size delta gating +""" + +from typing import Optional, Callable, Any, Dict, List, Tuple +from pydantic import BaseModel, Field +import re +import time +import base64 +import httpx + + +class Tools: + """ + Gitea Coder Role - High-level workflow automation for code generation tasks. + + CRITICAL ARCHITECTURAL CHANGE (v1.1.0): + - LLM is NO LONGER in charge of branch naming + - System derives branch names automatically from chat_id and issue_number + - All operations automatically use the current working branch + - LLM focuses on code/content, not branch management + + Workflow: + reads ticket โ†’ creates branch (system-managed) โ†’ makes changes โ†’ creates PR + + Key Features: + - Automatic branch management (system derives from chat_id) + - Branch scope gating (prevents main/master pushes) + - Enforces branch naming conventions (system-managed) + - Auto-generates conventional commit messages + - Quality gates for file changes (size delta validation) + - Diff-based updates to prevent accidental file replacements + - Session caching for chat_id โ†’ working_branch mapping + """ + + class Valves(BaseModel): + """System-wide configuration for Gitea Coder integration""" + + GITEA_URL: str = Field( + default="https://gitea.example.com", + description="Gitea server URL (ingress or internal 
service)", + ) + DEFAULT_REPO: str = Field( + default="", + description="Default repository in owner/repo format", + ) + DEFAULT_BRANCH: str = Field( + default="main", + description="Default branch name for operations", + ) + DEFAULT_ORG: str = Field( + default="", + description="Default organization for org-scoped operations", + ) + ALLOW_USER_OVERRIDES: bool = Field( + default=True, + description="Allow users to override defaults via UserValves", + ) + VERIFY_SSL: bool = Field( + default=True, + description="Verify SSL certificates (disable for self-signed certs)", + ) + DEFAULT_PAGE_SIZE: int = Field( + default=50, + description="Default page size for list operations (max 50)", + ge=1, + le=50, + ) + # Coder-specific settings + MAX_SIZE_DELTA_PERCENT: float = Field( + default=50.0, + description="Maximum allowed file size change percentage (quality gate)", + ge=1.0, + le=500.0, + ) + PROTECTED_BRANCHES: List[str] = Field( + default=["main", "master", "develop", "dev", "release", "hotfix"], + description="Branches that cannot be committed to directly", + ) + ALLOWED_SCOPES: List[str] = Field( + default=["feature", "fix", "refactor", "docs", "test", "chore", "wip"], + description="Allowed branch scope prefixes", + ) + + class UserValves(BaseModel): + """Per-user configuration for personal credentials and overrides""" + + GITEA_TOKEN: str = Field( + default="", + description="Your Gitea API token", + ) + USER_DEFAULT_REPO: str = Field( + default="", + description="Override default repository for this user", + ) + USER_DEFAULT_BRANCH: str = Field( + default="", + description="Override default branch for this user", + ) + USER_DEFAULT_ORG: str = Field( + default="", + description="Override default organization for this user", + ) + + def __init__(self): + """Initialize with optional valve configuration from framework""" + # Handle valves configuration from framework + self.valves = self.Valves() + + # Enable tool usage visibility for debugging + self.citation = True 
+ + # Handle user valves configuration + self.user_valves = self.UserValves() + + # Session cache: chat_id -> working_branch and metadata (with TTL) + self._session_cache: Dict[str, Tuple[dict, float]] = {} + self._cache_ttl_seconds = 3600 # 1 hour + + # Initialize underlying dev operations (for actual API calls) + self._dev = None + + def _api_url(self, endpoint: str) -> str: + """Construct full API URL for Gitea endpoint""" + base = self._get_url() + return f"{base}/api/v1{endpoint}" + + def _get_url(self) -> str: + """Get effective Gitea URL with trailing slash handling""" + return self.valves.GITEA_URL.rstrip("/") + + def _get_token(self, __user__: dict = None) -> str: + """Extract Gitea token from user context with robust handling""" + if __user__ and "valves" in __user__: + user_valves = __user__.get("valves") + if user_valves: + return user_valves.GITEA_TOKEN + return "" + + def _headers(self, __user__: dict = None) -> dict: + """Generate authentication headers with token""" + token = self._get_token(__user__) + if not token: + return {"Content-Type": "application/json"} + return { + "Authorization": f"token {token}", + "Content-Type": "application/json", + } + + def _format_error(self, e, context: str = "") -> str: + """Format HTTP error with detailed context for LLM understanding""" + try: + error_json = e.response.json() + error_msg = error_json.get("message", e.response.text[:200]) + except Exception: + error_msg = e.response.text[:200] + + context_str = f" ({context})" if context else "" + return f"HTTP Error {e.response.status_code}{context_str}: {error_msg}" + + def _get_repo(self, repo: Optional[str], __user__: dict = None) -> str: + """Get effective repository with priority resolution""" + if repo: + return repo + if __user__ and "valves" in __user__: + user_valves = __user__.get("valves") + if user_valves: + if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_REPO: + return user_valves.USER_DEFAULT_REPO + return 
self.valves.DEFAULT_REPO + + def _get_branch(self, branch: Optional[str], __user__: dict = None) -> str: + """Get effective branch with priority resolution""" + if branch: + return branch + if __user__ and "valves" in __user__: + user_valves = __user__.get("valves") + if user_valves: + if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_BRANCH: + return user_valves.USER_DEFAULT_BRANCH + return self.valves.DEFAULT_BRANCH + + def _get_org(self, org: Optional[str], __user__: dict = None) -> str: + """Get effective org with priority.""" + if org: + return org + if __user__ and "valves" in __user__: + user_valves = __user__.get("valves") + if user_valves: + if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_ORG: + return user_valves.USER_DEFAULT_ORG + return self.valves.DEFAULT_ORG + + def _resolve_repo( + self, repo: Optional[str], __user__: dict = None + ) -> tuple[str, str]: + """Resolve repository string into owner and repo name with validation""" + effective_repo = self._get_repo(repo, __user__) + + if not effective_repo: + raise ValueError( + "No repository specified. Set DEFAULT_REPO in Valves or USER_DEFAULT_REPO in UserValves." 
+ ) + + if "/" not in effective_repo: + raise ValueError( + f"Repository must be in 'owner/repo' format, got: {effective_repo}" + ) + + return effective_repo.split("/", 1) + + def _get_page_size(self, limit: Optional[int] = None) -> int: + """Calculate effective page size, capped at Gitea's max of 50""" + if limit is not None: + return min(limit, 50) + return min(self.valves.DEFAULT_PAGE_SIZE, 50) + + def _get_cached_session(self, chat_id: str) -> Optional[dict]: + """Get cached session data for chat_id with TTL""" + if chat_id and chat_id in self._session_cache: + data, timestamp = self._session_cache[chat_id] + if time.time() - timestamp < self._cache_ttl_seconds: + return data + else: + # Expired, remove from cache + del self._session_cache[chat_id] + return None + + def _set_cached_session(self, chat_id: str, data: dict) -> None: + """Set cached session data for chat_id with TTL""" + if chat_id: + self._session_cache[chat_id] = (data, time.time()) + + def _get_working_branch( + self, + chat_id: Optional[str] = None, + repo: Optional[str] = None, + __user__: dict = None + ) -> str: + """ + Get the current working branch for the session. + + CRITICAL: This is system-managed, NOT controlled by LLM. + + Priority: + 1. Get from chat_id session cache (if chat_id provided) + 2. Fall back to default branch from valves + + Args: + chat_id: Session ID for cache lookup + repo: Repository (used for cache key if chat_id present) + __user__: User context + + Returns: + Current working branch name (system-managed) + """ + # Try to get from session cache first + if chat_id: + session_data = self._get_cached_session(chat_id) + if session_data and "working_branch" in session_data: + return session_data["working_branch"] + + # Fall back to default branch + return self._get_branch(None, __user__) + + def _generate_branch_name(self, issue_number: int, title: str, scope: str = "feature") -> str: + """ + Generate a branch name from issue number and title. 
+
+        CRITICAL: This is system-managed, NOT controlled by LLM.
+
+        Format: <scope>/<issue_number>-<slug>
+        Example: feature/42-add-user-authentication
+
+        Args:
+            issue_number: The ticket/issue number
+            title: Issue title (used to generate slug)
+            scope: Branch scope prefix (feature, fix, etc.)
+
+        Returns:
+            Generated branch name
+        """
+        # Clean up title for branch name
+        # Remove special characters, lowercase, replace spaces with hyphens
+        slug = re.sub(r"[^a-z0-9\s-]", "", title.lower())
+        slug = re.sub(r"[\s-]+", "-", slug)
+        slug = slug.strip("-")
+
+        # Truncate to reasonable length
+        if len(slug) > 30:
+            slug = slug[:30].strip("-")
+
+        return f"{scope}/{issue_number}-{slug}"
+
+    def _validate_branch_name(self, branch_name: str) -> tuple[bool, str]:
+        """
+        Validate branch name against allowed scopes and protected branches.
+
+        CRITICAL: This is system-managed, NOT controlled by LLM.
+        This validation is for system-generated names, not LLM input.
+
+        Returns:
+            tuple: (is_valid, error_message)
+        """
+        # Check if it's a protected branch (direct commit attempt)
+        if branch_name in self.valves.PROTECTED_BRANCHES:
+            return False, (
+                f"Branch '{branch_name}' is protected. "
+                f"Direct commits to protected branches are not allowed. "
+                f"Create a feature branch instead."
+            )
+
+        # Check if it starts with an allowed scope
+        scope_pattern = r"^(" + "|".join(self.valves.ALLOWED_SCOPES) + ")/"
+        if not re.match(scope_pattern, branch_name):
+            allowed = ", ".join([f"{s}/" for s in self.valves.ALLOWED_SCOPES])
+            return False, (
+                f"Branch '{branch_name}' does not follow naming convention. "
+                f"Use format: {allowed} with <issue>-<slug>. 
" + f"Example: feature/42-add-user-auth" + ) + + return True, "" + + def _parse_issue_refs(self, text: str) -> List[str]: + """Extract issue references from text (e.g., #42, issue #42)""" + refs = re.findall(r"#(\d+)", text) + issue_refs = [f"#{ref}" for ref in refs] + + # Also check for "issue N" pattern + issue_n_refs = re.findall(r"issue\s*#?(\d+)", text, re.IGNORECASE) + for ref in issue_n_refs: + issue_ref = f"#{ref}" + if issue_ref not in issue_refs: + issue_refs.append(issue_ref) + + return issue_refs + + def _generate_commit_message( + self, + change_type: str, + scope: str, + description: str, + issue_refs: Optional[List[str]] = None, + body: Optional[str] = None, + ) -> str: + """ + Generate a conventional commit message. + + Format: scope(type): description + + Args: + change_type: Type of change (feat, fix, docs, etc.) + scope: Area of change (file, module, or component) + description: Brief description of changes + issue_refs: List of issue references (e.g., ["#42"]) + body: Optional longer description + + Returns: + Formatted commit message + """ + # Validate and normalize change type + valid_types = [ + "feat", "fix", "docs", "style", "refactor", "test", + "chore", "perf", "ci", "build", "revert" + ] + if change_type.lower() not in valid_types: + change_type = "chore" # Default for unknown types + + # Build the subject line + scope_str = f"({scope})" if scope else "" + message = f"{change_type.lower()}{scope_str}: {description}" + + # Add issue references to body or footer + if issue_refs: + refs_str = ", ".join(issue_refs) + footer = f"Refs: {refs_str}" + + if body: + body = f"{body}\n\n{footer}" + else: + message = f"{message}\n\n{footer}" + + return message + + async def workflow_summary( + self, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Get a summary of available coder workflows and commands. + + CRITICAL: LLM should NOT manage branch names - system handles this. 
+
+        Returns:
+            Markdown-formatted workflow guide
+        """
+        output = """# 🚀 Gitea Coder Workflow Guide
+
+## ⚠️ CRITICAL: Branch Management is AUTOMATIC
+
+**The LLM is NO LONGER in charge of branch naming or branch selection.**
+- Branch names are derived automatically from chat_id and issue_number
+- All operations automatically use the current working branch
+- LLM focuses on CODE and CONTENT, not infrastructure
+
+## Quick Start
+
+1. **Read the ticket:** `read_ticket(issue_number)`
+2. **Create feature branch:** `create_feature_branch(issue_number)` (system derives name)
+3. **Make changes:** `apply_diff()` or `commit_changes()` (uses working branch)
+4. **Create PR:** `create_pull_request()` (auto-detects branch)
+
+## Available Commands
+
+### 📋 Reading Tickets
+- `read_ticket(issue_number)` - Get full issue details
+
+### 🌿 Branch Management (SYSTEM-MANAGED)
+- `create_feature_branch(issue_number, title)` - Creates branch (name derived automatically)
+- `get_branch_status()` - See current working branch (system-managed)
+- `list_my_branches()` - List your branches
+
+### 📝 File Operations (Auto-detect branch from chat_id)
+- `apply_diff(path, diff, message)` - Apply unified diff patch
+- `commit_changes(path, content, message)` - Commit with size delta gate
+- `replace_file(path, content, message)` - Replace entire file
+- `create_file(path, content, message)` - Create new file
+- `delete_file(path, message)` - Delete a file
+- `rename_file(from_path, to_path, message)` - Rename a file
+
+### 🔍 Quality Gates
+- Size delta checks (default: 50% max change)
+- Branch scope validation (system-managed)
+- Protected branch enforcement
+
+### 📦 Pull Requests
+- `create_pull_request(title, description)` - Create PR from current branch
+
+## Branch Naming Convention
+
+**SYSTEM-GENERATED, NOT LLM-CONTROLLED**
+
+Format: `<scope>/<issue-number>-<short-description>`
+
+Examples (generated by system):
+- feature/42-add-user-login
+- fix/37-fix-memory-leak
+- refactor/15-cleanup-api
+- docs/20-update-readme
+
+Allowed scopes: `feature/`, `fix/`, `refactor/`, `docs/`, `test/`, `chore/`, `wip/`
+
+## How It Works
+
+### Before (LLM-managed - PROBLEMATIC):
+```python
+# LLM had to manage branch names - error-prone
+apply_diff(
+    path="src/auth.py",
+    diff="...",
+    branch="feature/42-add-login"  # LLM decides branch name
+)
+```
+
+### After (SYSTEM-managed - SAFE):
+```python
+# LLM focuses on code, system manages branches
+apply_diff(
+    path="src/auth.py",
+    diff="...",
+    # Branch automatically detected from chat_id session
+)
+
+# System derives branch name from chat_id + issue
+create_feature_branch(issue_number=42, title="Add login functionality")
+# → Automatically creates: feature/42-add-login-functionality
+```
+
+## Quality Gates
+
+### Size Delta Gate (commit_changes)
+- Files > 50% size change require diff-based updates
+- Prevents accidental file replacements
+- Configurable threshold in Valves
+
+### Branch Protection
+- Cannot commit directly to: main, master, develop, dev, release, hotfix
+- Create feature branches instead (system-managed)
+
+## Example Workflow
+
+```python
+# Read the ticket
+ticket = read_ticket(42)
+
+# Create branch (SYSTEM derives name from issue)
+create_feature_branch(42, "Add login functionality")
+# → Creates: feature/42-add-login-functionality
+# → Caches branch for session
+
+# Make changes using diff (uses cached branch)
+apply_diff(
+    path="src/auth.py",
+    diff=\"\"\"--- a/src/auth.py
++++ b/src/auth.py
+@@ -10,3 +10,5 @@ class Auth:
++    def login(self, user: str) -> bool:
++        return True
+\"\"\",
+    message="feat(auth): add login method to Auth class"
+)
+
+# Create PR (auto-detects current branch)
+create_pull_request(
+    title="feat(auth): add login method",
+    body="Implements login functionality as specified in #42"
+)
+```
+
+## Tips
+
+- **Don't pass branch parameters** - System auto-detects from chat_id
+- **Use issue numbers** - System derives branch names from issues
+- **Always reference issues** - 
Use `Refs: #42` in commit messages +- **Use diff-based updates** for incremental changes +- **Large changes** should be split into multiple commits +- **Session persistence** - chat_id maintains working branch across conversation +""" + return output + + async def read_ticket( + self, + issue_number: int, + repo: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Read and parse a ticket/issue to understand requirements. + + Args: + issue_number: The issue/ticket number to read + repo: Repository in 'owner/repo' format + + Returns: + Formatted ticket summary with parsed requirements + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Fetching issue #{issue_number}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"), + headers=self._headers(__user__), + ) + response.raise_for_status() + issue = response.json() + + title = issue.get("title", "No title") + body = issue.get("body", "") + state = issue.get("state", "unknown") + user = issue.get("user", {}).get("login", "unknown") + labels = [label.get("name", "") for label in issue.get("labels", [])] + created_at = issue.get("created_at", "")[:10] + html_url = issue.get("html_url", "") + + # Parse body for structured info + testing_criteria = [] + technical_notes = [] + is_testing_required = False + is_docs_required = False + + body_lower = body.lower() + if "test" in body_lower or "testing" in body_lower: + is_testing_required = True + # Try to extract testing criteria + 
testing_section = re.search( + r"(?:testing|criteria|test).*?:(.*?)(?:\n\n|$)", + body, + re.IGNORECASE | re.DOTALL, + ) + if testing_section: + testing_criteria = [ + line.strip().lstrip("-*โ€ข") + for line in testing_section.group(1).split("\n") + if line.strip() + ] + + if "documentation" in body_lower or "docs" in body_lower: + is_docs_required = True + + # Check for technical notes section + tech_section = re.search( + r"(?:technical|tech).*?:(.*?)(?:\n\n|$)", + body, + re.IGNORECASE | re.DOTALL, + ) + if tech_section: + technical_notes = [ + line.strip().lstrip("-*โ€ข") + for line in tech_section.group(1).split("\n") + if line.strip() + ] + + # Extract issue references + issue_refs = self._parse_issue_refs(body) + if not any(ref == f"#{issue_number}" for ref in issue_refs): + issue_refs.insert(0, f"#{issue_number}") + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + output = f"# ๐Ÿ“‹ Ticket #{issue_number}: {title}\n\n" + output += f"**State:** {state.upper()} | **Author:** @{user} | **Created:** {created_at}\n" + output += f"**Labels:** {', '.join(labels) if labels else 'None'}\n" + output += f"**URL:** {html_url}\n\n" + + if body: + output += "## ๐Ÿ“ Description\n\n" + # Truncate very long descriptions + if len(body) > 1000: + output += f"{body[:1000]}...\n\n" + output += "_Description truncated. 
Use `get_issue()` for full content._\n\n" + else: + output += f"{body}\n\n" + else: + output += "_No description provided._\n\n" + + # Testing requirements + output += "## ๐Ÿงช Testing Requirements\n\n" + if is_testing_required: + if testing_criteria: + output += "**Testing Criteria:**\n" + for criterion in testing_criteria: + output += f"- [ ] {criterion}\n" + output += "\n" + else: + output += "Testing required, but no specific criteria listed.\n\n" + else: + output += "No explicit testing requirements detected.\n\n" + + # Technical notes + if technical_notes: + output += "## ๐Ÿ”ง Technical Notes\n\n" + for note in technical_notes: + output += f"- {note}\n" + output += "\n" + + # Documentation check + if is_docs_required: + output += "## ๐Ÿ“š Documentation Required\n\n" + output += "This ticket mentions documentation needs.\n\n" + + # Issue references + if issue_refs: + output += "## ๐Ÿ”— Related Issues\n\n" + for ref in issue_refs: + output += f"- {ref}\n" + output += "\n" + + # Next steps + output += "## ๐Ÿš€ Next Steps\n\n" + output += f"1. Create branch: `create_feature_branch({issue_number}, \"{title[:50]}...\")`\n" + output += "2. Make changes using `apply_diff()` or `commit_changes()`\n" + output += "3. Create PR: `create_pull_request()`\n" + + return output.strip() + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"issue #{issue_number}") + if e.response.status_code == 404: + return f"Error: Issue #{issue_number} not found in {owner}/{repo_name}." + return f"Error: Failed to fetch issue. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during issue fetch: {type(e).__name__}: {e}" + + async def suggest_branch_name( + self, + issue_number: int, + repo: Optional[str] = None, + scope: str = "feature", + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Get the branch name that WILL be created for an issue. 
+ + CRITICAL: This is for INFORMATION only - system generates this automatically. + LLM should NOT use this to manage branches - just for visibility. + + Args: + issue_number: The issue number + repo: Repository in 'owner/repo' format + scope: Branch scope prefix + __user__: User context + + Returns: + The branch name that will be created + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + # Fetch issue title + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"), + headers=self._headers(__user__), + ) + response.raise_for_status() + issue = response.json() + title = issue.get("title", "") + except Exception: + title = "" + + # Generate branch name (system-managed) + branch_name = self._generate_branch_name(issue_number, title, scope) + + # Check if branch exists + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + branch_response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/branches/{branch_name}"), + headers=self._headers(__user__), + ) + if branch_response.status_code == 200: + branch_name += " (already exists)" + except Exception: + pass + + return branch_name + + async def create_feature_branch( + self, + issue_number: int, + title: Optional[str] = None, + repo: Optional[str] = None, + scope: str = "feature", + __user__: dict = None, + __chat_id__: str = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Create a new feature branch for a ticket. + + CRITICAL: Branch name is SYSTEM-GENERATED from issue_number + title. + LLM should NOT pass branch names - system handles this automatically. + + This method: + 1. Fetches issue title if not provided + 2. 
Generates branch name from issue_number + scope (system-managed) + 3. Validates the branch name (system validation) + 4. Creates the branch from the default/base branch + 5. Caches the branch name for the session (chat_id โ†’ working_branch) + + Args: + issue_number: The ticket/issue number + title: Optional title (will fetch from issue if not provided) + repo: Repository in 'owner/repo' format + scope: Branch scope (feature, fix, refactor, etc.) + __chat_id__: Session ID for caching (CRITICAL for branch tracking) + __user__: User context + + Returns: + Branch creation confirmation with system-generated name + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + # Fetch title from issue if not provided + if not title: + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + issue_response = await client.get( + self._api_url( + f"/repos/{owner}/{repo_name}/issues/{issue_number}" + ), + headers=self._headers(__user__), + ) + issue_response.raise_for_status() + issue = issue_response.json() + title = issue.get("title", "") + except Exception as e: + return f"Error: Could not fetch issue #{issue_number}: {e}" + + # CRITICAL: Generate branch name (SYSTEM-MANAGED, NOT LLM-CONTROLLED) + branch_name = self._generate_branch_name(issue_number, title, scope) + + # Validate branch name (system validation) + is_valid, error_msg = self._validate_branch_name(branch_name) + if not is_valid: + return f"โŒ **Branch Validation Failed**\n\n{error_msg}" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Creating branch {branch_name}...", + "done": False, + }, + } + ) + + # Get base branch (with caching) + base_branch = self._get_branch(None, __user__) + if __chat_id__: + cached_base = 
self._get_cached_session(__chat_id__) + if cached_base and "default_branch" in cached_base: + base_branch = cached_base["default_branch"] + else: + # Cache default branch for session + self._set_cached_session(__chat_id__, {"default_branch": base_branch}) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.post( + self._api_url(f"/repos/{owner}/{repo_name}/branches"), + headers=self._headers(__user__), + json={ + "new_branch_name": branch_name, + "old_branch_name": base_branch, + }, + ) + + # Handle branch already exists + if response.status_code == 409: + # Branch exists, that's okay - just use it + pass + + response.raise_for_status() + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + # CRITICAL: Cache the working branch for session (chat_id โ†’ working_branch) + if __chat_id__: + session_data = self._get_cached_session(__chat_id__) or {} + session_data["working_branch"] = branch_name + session_data["default_branch"] = base_branch + session_data["issue_number"] = issue_number + self._set_cached_session(__chat_id__, session_data) + + return f"""โœ… **Feature Branch Created Successfully** + +**Branch:** `{branch_name}` (SYSTEM-GENERATED) +**Base Branch:** `{base_branch}` +**Repository:** `{owner}/{repo_name}` +**Issue:** #{issue_number} +**Session:** {'Active (cached)' if __chat_id__ else 'No session'} + +**Next Steps:** +1. Make changes to files (branch auto-selected from session) +2. Use `apply_diff()` for incremental changes or `commit_changes()` for full replacements +3. Commit with descriptive messages +4. 
Create PR when ready: `create_pull_request()`
+
+**LLM Responsibilities:**
+- ✅ Write code/content
+- ✅ Generate commit messages
+- ✅ Reference issues in PRs
+- ❌ NO branch naming
+- ❌ NO branch selection (system auto-detects)
+
+**System-Managed Branch Name:**
+- Format: `<scope>/<issue-number>-<slug>`
+- Example: `{scope}/{issue_number}-{title[:30].lower().replace(' ', '-')}`
+"""
+        except httpx.HTTPStatusError as e:
+            error_msg = self._format_error(e, "branch creation")
+            if e.response.status_code == 409:
+                # Branch already exists - try to use it
+                if __chat_id__:
+                    session_data = self._get_cached_session(__chat_id__) or {}
+                    session_data["working_branch"] = branch_name
+                    self._set_cached_session(__chat_id__, session_data)
+
+                return f"""⚠️ **Branch Already Exists**
+
+**Branch:** `{branch_name}` (SYSTEM-GENERATED)
+**Repository:** `{owner}/{repo_name}`
+
+The branch already exists. Using it for this session.
+
+**Session cached:** Branch `{branch_name}` is now active.
+
+**Continue with:**
+- `apply_diff()` or `commit_changes()` for changes
+- `create_pull_request()` when ready
+"""
+            return f"Error: Failed to create branch. {error_msg}"
+        except Exception as e:
+            return f"Error: Unexpected failure during branch creation: {type(e).__name__}: {e}"
+
+    async def get_branch_status(
+        self,
+        repo: Optional[str] = None,
+        __user__: dict = None,
+        __chat_id__: str = None,
+        __event_emitter__: Callable[[dict], Any] = None,
+    ) -> str:
+        """
+        Get the current working branch status for the session.
+
+        CRITICAL: Branch is SYSTEM-MANAGED from chat_id cache.
+
+        Args:
+            repo: Repository in 'owner/repo' format
+            __chat_id__: Session ID for cache lookup (CRITICAL)
+            __user__: User context
+
+        Returns:
+            Current branch and cached info
+        """
+        token = self._get_token(__user__)
+        if not token:
+            return "Error: GITEA_TOKEN not configured."
+ + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + # CRITICAL: Get working branch from session cache (system-managed) + working_branch = None + default_branch = None + issue_number = None + + if __chat_id__: + session_data = self._get_cached_session(__chat_id__) + if session_data: + working_branch = session_data.get("working_branch") + default_branch = session_data.get("default_branch") + issue_number = session_data.get("issue_number") + + if not default_branch: + default_branch = self._get_branch(None, __user__) + + output = f"# ๐ŸŒฟ Branch Status: {owner}/{repo_name}\n\n" + output += f"**Default Branch:** `{default_branch}` (system fallback)\n" + + if working_branch: + output += f"**Working Branch:** `{working_branch}` (SYSTEM-MANAGED)\n" + if issue_number: + output += f"**Issue:** #{issue_number}\n" + output += "\n**Session active - branch auto-selected for operations.**\n" + output += "\n**Operations will automatically use:**\n" + output += f"- `apply_diff(path, diff)` โ†’ {working_branch}\n" + output += f"- `commit_changes(path, content)` โ†’ {working_branch}\n" + output += f"- `create_pull_request()` โ†’ {working_branch} โ†’ {default_branch}\n" + else: + output += "\n**No working branch set for this session.**\n" + output += "Create one: `create_feature_branch(issue_number, title)`\n" + + return output + + async def list_my_branches( + self, + repo: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + List all branches in the repository (filtered view). + + Args: + repo: Repository in 'owner/repo' format + __user__: User context + + Returns: + Formatted list of branches + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured." 
+ + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Fetching branches...", "done": False}, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/branches"), + headers=self._headers(__user__), + params={"limit": 50}, + ) + response.raise_for_status() + branches = response.json() + + output = f"# ๐ŸŒฟ Branches in {owner}/{repo_name}\n\n" + + # Separate protected and feature branches + protected = [b for b in branches if b.get("protected")] + feature = [ + b + for b in branches + if not b.get("protected") + and any(b["name"].startswith(f"{s}/") for s in self.valves.ALLOWED_SCOPES) + ] + other = [ + b + for b in branches + if not b.get("protected") + and not any(b["name"].startswith(f"{s}/") for s in self.valves.ALLOWED_SCOPES) + ] + + if protected: + output += "## ๐Ÿ›ก๏ธ Protected Branches\n\n" + for branch in sorted(protected, key=lambda x: x["name"]): + name = branch.get("name", "") + commit_sha = branch.get("commit", {}).get("id", "")[:8] + output += f"- `{name}` [commit: {commit_sha}]\n" + output += "\n" + + if feature: + output += "## ๐Ÿ“ฆ Feature Branches\n\n" + for branch in sorted(feature, key=lambda x: x["name"]): + name = branch.get("name", "") + commit_sha = branch.get("commit", {}).get("id", "")[:8] + output += f"- `{name}` [commit: {commit_sha}]\n" + output += "\n" + + if other: + output += "## ๐Ÿ“„ Other Branches\n\n" + for branch in sorted(other, key=lambda x: x["name"])[:20]: + name = branch.get("name", "") + commit_sha = branch.get("commit", {}).get("id", "")[:8] + output += f"- `{name}` [commit: {commit_sha}]\n" + if len(other) > 20: + output += f"\n... 
and {len(other) - 20} more branches\n" + output += "\n" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + return output.strip() + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, "branch listing") + return f"Error: Failed to list branches. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure: {type(e).__name__}: {e}" + + async def apply_diff( + self, + path: str, + diff_content: str, + message: Optional[str] = None, + repo: Optional[str] = None, + auto_message: bool = True, + __user__: dict = None, + __chat_id__: str = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Apply a unified diff patch to a file. + + CRITICAL: Branch is AUTOMATICALLY DETECTED from chat_id session. + LLM should NOT pass branch parameter - system handles this. + + This is the PREFERRED method for making changes as it: + 1. Is precise about what changes + 2. Prevents accidental file replacements + 3. Is what LLMs understand best (trained on GitHub PRs) + + Args: + path: File path to update + diff_content: Unified diff in standard format + message: Commit message (auto-generated if not provided) + repo: Repository in 'owner/repo' format + auto_message: Generate commit message if not provided + __chat_id__: Session ID for branch detection (CRITICAL) + __user__: User context + + Returns: + Commit details and diff summary + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + # CRITICAL: Get working branch from session cache (system-managed) + effective_branch = self._get_working_branch(__chat_id__, repo, __user__) + + if not effective_branch: + return "Error: No working branch. Call `create_feature_branch(issue_number, title)` first." 
+ + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Applying diff to {path} on {effective_branch}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + # Get current file content + get_response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + params={"ref": effective_branch}, + ) + + # Check if file exists + if get_response.status_code == 404: + return f"Error: File not found: `{path}`. Use `create_file()` to create a new file." + + get_response.raise_for_status() + file_info = get_response.json() + current_sha = file_info.get("sha") + + # Decode current content + current_content_b64 = file_info.get("content", "") + try: + current_content = base64.b64decode(current_content_b64).decode( + "utf-8" + ) + except Exception: + return "Error: Could not decode current file content." + + # Parse and apply the diff + new_content = self._apply_unified_diff(current_content, diff_content) + + if new_content is None: + return "Error: Failed to parse or apply diff. Check the diff format." + + # Generate commit message if needed + if not message: + if auto_message: + message = self._generate_diff_commit_message(path, diff_content) + else: + return "Error: Commit message is required when auto_message=False." 
+ + # Commit the changes + new_content_b64 = base64.b64encode(new_content.encode("utf-8")).decode( + "ascii" + ) + + response = await client.put( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + json={ + "content": new_content_b64, + "message": message, + "branch": effective_branch, + "sha": current_sha, + }, + ) + response.raise_for_status() + result = response.json() + + commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] + + # Parse diff stats + added_lines = diff_content.count("+") - diff_content.count("+++") + removed_lines = diff_content.count("-") - diff_content.count("---") + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + output = f"โœ… **Diff Applied Successfully**\n\n" + output += f"**File:** `{path}`\n" + output += f"**Branch:** `{effective_branch}` (SYSTEM-MANAGED)\n" + output += f"**Commit:** `{commit_sha}`\n" + output += f"**Changes:** +{added_lines} lines, -{removed_lines} lines\n\n" + output += f"**Message:** {message}\n" + + return output + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"diff application to '{path}'") + if e.response.status_code == 409: + return f"Error: Update conflict for `{path}`. Fetch the latest version and try again." + return f"Error: Failed to apply diff. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during diff application: {type(e).__name__}: {e}" + + def _apply_unified_diff( + self, current_content: str, diff_content: str + ) -> Optional[str]: + """ + Apply a unified diff to content. 
+ + Args: + current_content: Current file content + diff_content: Unified diff patch + + Returns: + New content after applying diff, or None if failed + """ + import difflib + + try: + # Parse the diff + diff_lines = diff_content.splitlines(keepends=True) + + # Simple unified diff parser for basic cases + # Handles: --- old +++ new @@ -old +new @@ + hunks = [] + current_hunk = None + in_hunk = False + + for line in diff_lines: + if line.startswith("---"): + continue # Skip old filename + elif line.startswith("+++"): + continue # Skip new filename + elif line.startswith("@@"): + # New hunk starts + if current_hunk: + hunks.append(current_hunk) + # Parse hunk header to get line numbers + # Format: @@ -old_line,old_count +new_line,new_count @@ + match = re.search( + r"@@\s+-(\d+)(?:,(\d+))?\s+(\d+)(?:,(\d+))?\s+@@", line + ) + if match: + old_start = int(match.group(1)) + new_start = int(match.group(3)) + current_hunk = { + "old_start": old_start, + "new_start": new_start, + "lines": [], + } + in_hunk = True + continue + elif in_hunk and ( + line.startswith("+") or line.startswith("-") or line.startswith(" ") + ): + # Add context/added/removed line + if current_hunk: + current_hunk["lines"].append(line) + elif in_hunk and not line.startswith("+") and not line.startswith( + "-" + ) and not line.startswith(" "): + # End of hunk + if current_hunk: + hunks.append(current_hunk) + current_hunk = None + in_hunk = False + + if current_hunk: + hunks.append(current_hunk) + + # Apply hunks to content + if not hunks: + # No hunks, return unchanged + return current_content + + # Split content into lines + old_lines = current_content.splitlines(keepends=True) + + # Apply diff using difflib + old_lines_for_patch = [line.rstrip("\n") for line in old_lines] + + # Create a modified version of the lines + # Sort hunks by position and apply in reverse order to maintain indices + hunks.sort(key=lambda h: h["old_start"], reverse=True) + + for hunk in hunks: + old_start = hunk["old_start"] 
- 1 # Convert to 0-indexed + new_lines = [] + skip_lines = 0 + + for line in hunk["lines"]: + if line.startswith("+"): + new_lines.append(line[1:].rstrip("\n") + "\n") + elif line.startswith("-"): + skip_lines += 1 + else: + # Context line + if skip_lines > 0: + # Skip the deleted lines + old_start += 1 + skip_lines = 0 + new_lines.append(line.rstrip("\n") + "\n") + + # Apply the hunk + # Insert new lines at the correct position + new_content_lines = ( + old_lines[:old_start] + + new_lines + + old_lines[old_start + skip_lines :] + ) + + # Reconstruct content + current_content = "".join(new_content_lines) + + return current_content + + except Exception as e: + # Log the error but don't fail + print(f"Diff application warning: {e}") + return None + + def _generate_diff_commit_message(self, path: str, diff_content: str) -> str: + """ + Generate a commit message from diff content. + + Args: + path: File path + diff_content: Unified diff + + Returns: + Generated commit message + """ + # Extract file name from path + file_name = path.split("/")[-1] + + # Detect change type from diff + change_type = "chore" + if any( + line.startswith("+def ") or line.startswith("+class ") + for line in diff_content.splitlines() + ): + change_type = "feat" + elif any( + line.startswith("+ return ") or line.startswith("+ return ") + for line in diff_content.splitlines() + ): + change_type = "fix" + elif "test" in path.lower() or "spec" in path.lower(): + change_type = "test" + elif ".md" in path.lower() or "readme" in path.lower(): + change_type = "docs" + elif any(line.startswith("-") for line in diff_content.splitlines()): + change_type = "refactor" + + # Generate message + message = f"{change_type}({file_name}): " + + # Extract a short description from added lines + added_lines = [ + line[1:].strip() + for line in diff_content.splitlines() + if line.startswith("+") and not line.startswith("+++") + ] + + if added_lines: + # Use first meaningful added line + description = "" + for line in 
added_lines: + if line and not line.startswith("import ") and not line.startswith( + "from " + ): + # Get function/class definition or first statement + match = re.match( + r"(def|class|const|var|let|interface|type)\s+(\w+)", line + ) + if match: + kind = match.group(1) + name = match.group(2) + if kind == "def": + description = f"add {name}() function" + break + elif kind == "class": + description = f"add {name} class" + break + elif line.startswith(" ") or line.startswith("\t"): + # Indented line, skip + continue + else: + # Use as description + description = line[:50].rstrip(":") + if len(line) > 50: + description += "..." + break + + if not description: + # Fallback to line count + added_count = len(added_lines) + description = f"update ({added_count} lines added)" + + message += description + + return message + + async def commit_changes( + self, + path: str, + content: str, + message: Optional[str] = None, + repo: Optional[str] = None, + max_delta_percent: Optional[float] = None, + __user__: dict = None, + __chat_id__: str = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Commit file changes with automatic content detection and size delta gating. + + CRITICAL: Branch is AUTOMATICALLY DETECTED from chat_id session. + LLM should NOT pass branch parameter - system handles this. + + This method: + 1. Detects whether to create or replace a file + 2. Validates file size changes against threshold (quality gate) + 3. Auto-generates commit message if not provided + 4. 
Commits to the appropriate branch (system-managed) + + Args: + path: File path to create or update + content: New file content + message: Commit message (auto-generated if not provided) + repo: Repository in 'owner/repo' format + max_delta_percent: Override for size delta threshold (quality gate) + __chat_id__: Session ID for branch detection (CRITICAL) + __user__: User context + __event_emitter__: Event emitter for progress + + Returns: + Commit details or error with guidance + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + # CRITICAL: Get working branch from session cache (system-managed) + effective_branch = self._get_working_branch(__chat_id__, repo, __user__) + + if not effective_branch: + return "Error: No working branch. Call `create_feature_branch(issue_number, title)` first." + + # Use provided threshold or default from valves + delta_threshold = max_delta_percent or self.valves.MAX_SIZE_DELTA_PERCENT + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Processing {path} on {effective_branch}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + # Check if file exists + get_response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + params={"ref": effective_branch}, + ) + + file_exists = get_response.status_code == 200 + current_sha = None + current_size = 0 + + if file_exists: + get_response.raise_for_status() + file_info = get_response.json() + current_sha = file_info.get("sha") + current_size = file_info.get("size", 0) + + # SIZE DELTA GATE - Quality check + new_size = len(content.encode("utf-8")) + if current_size > 0 and new_size > 0: + delta_percent = 
( + abs(new_size - current_size) / current_size * 100 + ) + + if delta_percent > delta_threshold: + # Calculate actual bytes changed + size_diff = new_size - current_size + direction = "larger" if size_diff > 0 else "smaller" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Size gate triggered", + "done": True, + "hidden": True, + }, + } + ) + + return f"""โš ๏ธ **Quality Gate: Large File Change Detected** + +**File:** `{path}` +**Current Size:** {current_size} bytes +**New Size:** {new_size} bytes +**Change:** {size_diff:+d} bytes ({delta_percent:.1f}% {direction}) +**Threshold:** {delta_threshold}% + +This change exceeds the size delta threshold, which may indicate: +- Accidental full file replacement +- Unintended data loss +- LLM confusion about existing content + +**Recommended Actions:** + +1. **Use diff-based updates** (preferred): + ```python + apply_diff( + path="{path}", + diff=\"\"\"--- a/{path} ++++ b/{path} +@@ -1,3 +1,5 @@ + existing line ++new line to add + -line to remove +\"\"\", + message="feat(scope): description of changes" + ) + ``` + +2. **Fetch and review current content**: + ```python + current = get_file("{path}") + # Compare with what you want to change + ``` + +3. **Commit with override** (not recommended): + Increase the threshold if this is intentional: + ```python + commit_changes(..., max_delta_percent=100) + ``` + +**Why this gate exists:** +Large file replacements by LLMs often indicate the model didn't properly understand the existing file structure. Using diffs ensures precise, targeted changes. 
+""" + + # Generate commit message if not provided + if not message: + message = self._generate_commit_message( + change_type="chore", + scope=path.split("/")[-1] if "/" in path else path, + description=f"update {path}", + ) + + # Prepare content + content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") + + if file_exists: + # Replace existing file + if not current_sha: + return f"Error: Could not retrieve SHA for existing file: {path}" + + response = await client.put( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + json={ + "content": content_b64, + "message": message, + "branch": effective_branch, + "sha": current_sha, + }, + ) + else: + # Create new file + response = await client.post( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + json={ + "content": content_b64, + "message": message, + "branch": effective_branch, + }, + ) + + response.raise_for_status() + result = response.json() + + commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] + action = "Updated" if file_exists else "Created" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + # Calculate and show size change + new_size = len(content.encode("utf-8")) + size_info = "" + if file_exists and current_size > 0: + delta = new_size - current_size + delta_percent = delta / current_size * 100 + size_info = f"**Size Change:** {delta:+d} bytes ({delta_percent:.1f}%)\n" + + output = f"""โœ… **{action} File Successfully** + +**File:** `{path}` +**Branch:** `{effective_branch}` (SYSTEM-MANAGED) +**Commit:** `{commit_sha}` +**Message:** {message} + +{size_info}**Action:** {action} +""" + + return output + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"file commit for '{path}'") + if e.response.status_code == 409: + return f"Error: Update conflict for 
    async def create_pull_request(
        self,
        title: str,
        body: Optional[str] = "",
        repo: Optional[str] = None,
        __user__: dict = None,
        __chat_id__: str = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Create a pull request from the current branch.

        CRITICAL: Branch is AUTOMATICALLY DETECTED from chat_id session.

        Args:
            title: PR title
            body: PR description (auto-populates from issue if linked)
            repo: Repository in 'owner/repo' format
            __chat_id__: Session ID for branch detection (CRITICAL)
            __user__: User context
            __event_emitter__: Event emitter for progress

        Returns:
            PR creation confirmation with details, or an "Error: ..." string
        """
        token = self._get_token(__user__)
        if not token:
            return "Error: GITEA_TOKEN not configured."

        try:
            owner, repo_name = self._resolve_repo(repo, __user__)
        except ValueError as e:
            return f"Error: {e}"

        # CRITICAL: Get current branch from session cache (system-managed)
        head_branch = None
        if __chat_id__:
            session_data = self._get_cached_session(__chat_id__)
            if session_data and "working_branch" in session_data:
                head_branch = session_data["working_branch"]

        if not head_branch:
            return "Error: No working branch. Call `create_feature_branch(issue_number, title)` first."

        # NOTE(review): _get_branch(None, ...) appears to resolve the
        # configured default branch, used here as the PR base — confirm
        # against the helper's definition.
        base_branch = self._get_branch(None, __user__)

        # Validate that head branch is not a protected branch
        is_valid, error_msg = self._validate_branch_name(head_branch)
        if not is_valid:
            return f"❌ **Cannot Create PR**\n\n{error_msg}\n\nCreate a feature branch first using `create_feature_branch()`."

        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Creating PR...", "done": False},
                }
            )

        try:
            # Auto-populate body with issue reference if not provided
            if not body:
                # Try to extract issue number from session cache
                issue_number = None
                if __chat_id__:
                    session_data = self._get_cached_session(__chat_id__)
                    if session_data and "issue_number" in session_data:
                        issue_number = session_data["issue_number"]

                if issue_number:
                    # "Closes #N" lets Gitea auto-close the issue on merge.
                    body = f"Closes #{issue_number}\n\nThis PR implements the changes from issue #{issue_number}."
                else:
                    body = "Automated PR from gitea_coder workflow."

            async with httpx.AsyncClient(
                timeout=30.0, verify=self.valves.VERIFY_SSL
            ) as client:
                response = await client.post(
                    self._api_url(f"/repos/{owner}/{repo_name}/pulls"),
                    headers=self._headers(__user__),
                    json={
                        "title": title,
                        "head": head_branch,
                        "base": base_branch,
                        "body": body,
                    },
                )

                # Handle PR already exists (409 is checked before
                # raise_for_status so it produces guidance, not an exception)
                if response.status_code == 409:
                    return f"⚠️ **PR Already Exists**\n\nA pull request for branch `{head_branch}` → `{base_branch}` already exists.\n\nCheck existing PRs and update it instead."

                response.raise_for_status()
                pr = response.json()

                pr_number = pr.get("number")
                pr_url = pr.get("html_url", "")

                if __event_emitter__:
                    await __event_emitter__(
                        {
                            "type": "status",
                            "data": {"description": "Done", "done": True, "hidden": True},
                        }
                    )

                return f"""✅ **Pull Request Created Successfully**

**PR #{pr_number}:** {title}
**Branch:** `{head_branch}` → `{base_branch}` (SYSTEM-MANAGED)
**URL:** {pr_url}

**Description:**
{body}

**Next Steps:**
1. Add reviewers if needed
2. Address any merge conflicts
3. Await review feedback

**To check PR status:**
```python
get_pull_request({pr_number})
```
"""

        except httpx.HTTPStatusError as e:
            error_msg = self._format_error(e, "PR creation")
            if e.response.status_code == 422:
                return "Error: Could not create PR. The branch may not exist or there may be merge conflicts."
            return f"Error: Failed to create PR. {error_msg}"
        except Exception as e:
            return f"Error: Unexpected failure during PR creation: {type(e).__name__}: {e}"
    async def replace_file(
        self,
        path: str,
        content: str,
        message: str,
        repo: Optional[str] = None,
        __user__: dict = None,
        __chat_id__: str = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Update an existing file in the repository (creates commit).

        CRITICAL: Branch is AUTOMATICALLY DETECTED from chat_id session.
        WARNING: This replaces the entire file content. For incremental changes,
        use `apply_diff()` instead to prevent accidental data loss.

        Args:
            path: File path to update
            content: New file content as string
            message: Commit message
            repo: Repository in 'owner/repo' format
            __chat_id__: Session ID for branch detection (CRITICAL)
            __user__: User context
            __event_emitter__: Event emitter for progress

        Returns:
            Commit details and success confirmation, or an "Error: ..." string
        """
        token = self._get_token(__user__)
        if not token:
            return "Error: GITEA_TOKEN not configured in UserValves settings."

        try:
            owner, repo_name = self._resolve_repo(repo, __user__)
        except ValueError as e:
            return f"Error: {e}"

        # CRITICAL: Get working branch from session cache
        effective_branch = self._get_working_branch(__chat_id__, repo, __user__)

        if not effective_branch:
            return "Error: No working branch. Call `create_feature_branch(issue_number, title)` first."

        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": f"Updating {path} on {effective_branch}...",
                        "done": False,
                    },
                }
            )

        try:
            async with httpx.AsyncClient(
                timeout=30.0, verify=self.valves.VERIFY_SSL
            ) as client:
                # Get current file SHA (the contents API requires the current
                # blob SHA on updates; a stale SHA produces a 409 conflict)
                get_response = await client.get(
                    self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
                    headers=self._headers(__user__),
                    params={"ref": effective_branch},
                )

                if get_response.status_code == 404:
                    return f"Error: File not found: `{path}`. Use `create_file()` to create a new file, or `apply_diff()` to add content to a new file."

                get_response.raise_for_status()
                file_info = get_response.json()
                sha = file_info.get("sha")

                if not sha:
                    return "Error: Could not retrieve file SHA for update."

                # Prepare updated content (contents API expects base64)
                content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii")

                # Update file
                response = await client.put(
                    self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
                    headers=self._headers(__user__),
                    json={
                        "content": content_b64,
                        "message": message,
                        "branch": effective_branch,
                        "sha": sha,
                    },
                )
                response.raise_for_status()
                result = response.json()

                # Short SHA for display only
                commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]

                if __event_emitter__:
                    await __event_emitter__(
                        {
                            "type": "status",
                            "data": {"description": "Done", "done": True, "hidden": True},
                        }
                    )

                output = f"**File Updated Successfully**\n\n"
                output += f"**File:** `{owner}/{repo_name}/{path}`\n"
                output += f"**Branch:** `{effective_branch}` (SYSTEM-MANAGED)\n"
                output += f"**Commit:** `{commit_sha}`\n"
                output += f"**Message:** {message}\n\n"
                output += "_Use `apply_diff()` for incremental changes to prevent data loss._\n"

                return output

        except httpx.HTTPStatusError as e:
            error_msg = self._format_error(e, f"file update for '{path}'")
            if e.response.status_code == 409:
                return f"Error: Update conflict for `{path}`. Fetch the latest version and try again."
            return f"Error: Failed to update file. {error_msg}"
        except Exception as e:
            return f"Error: Unexpected failure during file update: {type(e).__name__}: {e}"
conflict for `{path}`. Fetch the latest version and try again." + return f"Error: Failed to update file. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during file update: {type(e).__name__}: {e}" + + async def create_file( + self, + path: str, + content: str, + message: str, + repo: Optional[str] = None, + __user__: dict = None, + __chat_id__: str = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Create a new file in the repository. + + CRITICAL: Branch is AUTOMATICALLY DETECTED from chat_id session. + For adding content to existing files, use `apply_diff()` instead. + + Args: + path: File path to create (e.g., 'docs/README.md') + content: Initial file content as string + message: Commit message + repo: Repository in 'owner/repo' format + __chat_id__: Session ID for branch detection (CRITICAL) + __user__: User context + __event_emitter__: Event emitter for progress + + Returns: + Commit details and success confirmation + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + # CRITICAL: Get working branch from session cache + effective_branch = self._get_working_branch(__chat_id__, repo, __user__) + + if not effective_branch: + return "Error: No working branch. Call `create_feature_branch(issue_number, title)` first." 
+ + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": f"Creating {path}...", "done": False}, + } + ) + + try: + content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") + + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.post( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + json={ + "content": content_b64, + "message": message, + "branch": effective_branch, + }, + ) + response.raise_for_status() + result = response.json() + + commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + output = f"**File Created Successfully**\n\n" + output += f"**File:** `{owner}/{repo_name}/{path}`\n" + output += f"**Branch:** `{effective_branch}` (SYSTEM-MANAGED)\n" + output += f"**Commit:** `{commit_sha}`\n" + output += f"**Message:** {message}\n" + + return output + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"file creation for '{path}'") + if e.response.status_code == 422: + return f"Error: File already exists: `{path}`. Use `replace_file()` or `apply_diff()` to modify it." + return f"Error: Failed to create file. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during file creation: {type(e).__name__}: {e}" + + async def delete_file( + self, + path: str, + message: str, + repo: Optional[str] = None, + __user__: dict = None, + __chat_id__: str = None, + __event_emitter__: Callable[[dict], Any] = None, + __event_call__: Callable[[dict], Any] = None, + ) -> str: + """ + Delete a file from the repository (creates commit). + + CRITICAL: Branch is AUTOMATICALLY DETECTED from chat_id session. + Requires confirmation before deletion. 
+ + Args: + path: File path to delete + message: Commit message for the deletion + repo: Repository in 'owner/repo' format + __chat_id__: Session ID for branch detection (CRITICAL) + __user__: User context + __event_emitter__: Event emitter for progress + __event_call__: Confirmation dialog callback + + Returns: + Confirmation with commit details or error + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + # CRITICAL: Get working branch from session cache + effective_branch = self._get_working_branch(__chat_id__, repo, __user__) + + if not effective_branch: + return "Error: No working branch. Call `create_feature_branch(issue_number, title)` first." + + # Confirmation dialog + if __event_call__: + result = await __event_call__( + { + "type": "confirmation", + "data": { + "title": "Confirm File Deletion", + "message": f"Delete `{path}` from `{owner}/{repo_name}` ({effective_branch})?", + }, + } + ) + if result is None or result is False: + return "โš ๏ธ File deletion cancelled by user." + if isinstance(result, dict) and not result.get("confirmed"): + return "โš ๏ธ File deletion cancelled by user." 
+ + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Deleting {path} from {effective_branch}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + # Get file SHA + get_response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + params={"ref": effective_branch}, + ) + + if get_response.status_code == 404: + return f"Error: File not found: `{path}`" + + get_response.raise_for_status() + file_info = get_response.json() + sha = file_info.get("sha") + + if not sha: + return "Error: Could not retrieve file SHA for deletion." + + # Delete file + response = await client.delete( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + json={ + "message": message, + "branch": effective_branch, + "sha": sha, + }, + ) + response.raise_for_status() + result = response.json() + + commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + output = f"**File Deleted Successfully**\n\n" + output += f"**File:** `{owner}/{repo_name}/{path}`\n" + output += f"**Branch:** `{effective_branch}` (SYSTEM-MANAGED)\n" + output += f"**Commit:** `{commit_sha}`\n" + output += f"**Message:** {message}\n" + + return output + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"file deletion for '{path}'") + return f"Error: Failed to delete file. 
{error_msg}" + except Exception as e: + return f"Error: Unexpected failure during file deletion: {type(e).__name__}: {e}" + + async def rename_file( + self, + from_path: str, + to_path: str, + message: str, + repo: Optional[str] = None, + __user__: dict = None, + __chat_id__: str = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Rename a file in the repository (creates commit). + + CRITICAL: Branch is AUTOMATICALLY DETECTED from chat_id session. + + This is implemented as a delete + create operation to preserve history. + + Args: + from_path: Current file path + to_path: New file path + message: Commit message for the rename + repo: Repository in 'owner/repo' format + __chat_id__: Session ID for branch detection (CRITICAL) + __user__: User context + __event_emitter__: Event emitter for progress + + Returns: + Confirmation with commit details or error + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + # CRITICAL: Get working branch from session cache + effective_branch = self._get_working_branch(__chat_id__, repo, __user__) + + if not effective_branch: + return "Error: No working branch. Call `create_feature_branch(issue_number, title)` first." 
+ + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Renaming {from_path} to {to_path}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + # Get current file content + get_response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{from_path}"), + headers=self._headers(__user__), + params={"ref": effective_branch}, + ) + + if get_response.status_code == 404: + return f"Error: File not found: `{from_path}`" + + get_response.raise_for_status() + file_info = get_response.json() + current_sha = file_info.get("sha") + current_content_b64 = file_info.get("content", "") + + # Decode content + try: + current_content = base64.b64decode(current_content_b64).decode( + "utf-8" + ) + except Exception: + return "Error: Could not decode file content." + + # Delete old file + delete_response = await client.delete( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{from_path}"), + headers=self._headers(__user__), + json={ + "message": f"{message} (deleting old file)", + "branch": effective_branch, + "sha": current_sha, + }, + ) + delete_response.raise_for_status() + + # Create new file at new path + new_content_b64 = base64.b64encode(current_content.encode("utf-8")).decode( + "ascii" + ) + + create_response = await client.post( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{to_path}"), + headers=self._headers(__user__), + json={ + "content": new_content_b64, + "message": message, + "branch": effective_branch, + }, + ) + create_response.raise_for_status() + result = create_response.json() + + commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + output = f"**File Renamed Successfully**\n\n" + output += f"**From:** `{from_path}`\n" + output += 
f"**To:** `{to_path}`\n" + output += f"**Branch:** `{effective_branch}` (SYSTEM-MANAGED)\n" + output += f"**Commit:** `{commit_sha}`\n" + output += f"**Message:** {message}\n" + + return output + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"file rename from '{from_path}' to '{to_path}'") + return f"Error: Failed to rename file. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during file rename: {type(e).__name__}: {e}" + + async def get_file( + self, + path: str, + repo: Optional[str] = None, + __user__: dict = None, + __chat_id__: str = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Get the contents of a file from the repository. + + CRITICAL: Branch is AUTOMATICALLY DETECTED from chat_id session. + + Args: + path: Full path to the file (e.g., 'src/main.py') + repo: Repository in 'owner/repo' format + __chat_id__: Session ID for branch detection (CRITICAL) + __user__: User context + __event_emitter__: Event emitter for progress + + Returns: + File content with metadata (SHA, size, branch) + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + # CRITICAL: Get working branch from session cache + effective_branch = self._get_working_branch(__chat_id__, repo, __user__) + + if not effective_branch: + return "Error: No working branch. Call `create_feature_branch(issue_number, title)` first." 
+ + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Reading file {path}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + params={"ref": effective_branch}, + ) + response.raise_for_status() + file_info = response.json() + + if isinstance(file_info, list): + return f"**Note:** '{path}' is a directory. Use `list_files()` to browse its contents." + + if file_info.get("type") != "file": + return f"Error: '{path}' is not a file (type: {file_info.get('type')})" + + content_b64 = file_info.get("content", "") + try: + content = base64.b64decode(content_b64).decode("utf-8") + except Exception: + return "Error: Could not decode file content. The file may be binary or corrupted." + + size = file_info.get("size", 0) + sha_short = file_info.get("sha", "unknown")[:8] + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + output = f"**File:** `{owner}/{repo_name}/{path}`\n" + output += f"**Branch:** `{effective_branch}` (SYSTEM-MANAGED) | **SHA:** `{sha_short}` | **Size:** {size} bytes\n" + output += f"**URL:** {file_info.get('html_url', 'N/A')}\n\n" + output += f"```{content}\n```\n" + + return output + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"file fetch for '{path}'") + if e.response.status_code == 404: + return f"Error: File not found: `{path}`. Verify the file path and branch." + return f"Error: Failed to fetch file. 
{error_msg}" + except Exception as e: + return f"Error: Unexpected failure during file fetch: {type(e).__name__}: {e}" + + async def list_files( + self, + path: str = "", + repo: Optional[str] = None, + __user__: dict = None, + __chat_id__: str = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + List files and directories in a repository path. + + CRITICAL: Branch is AUTOMATICALLY DETECTED from chat_id session. + + Args: + path: Directory path to list (default: root) + repo: Repository in 'owner/repo' format + __chat_id__: Session ID for branch detection (CRITICAL) + __user__: User context + __event_emitter__: Event emitter for progress + + Returns: + Formatted directory listing with file sizes and types + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + # CRITICAL: Get working branch from session cache + effective_branch = self._get_working_branch(__chat_id__, repo, __user__) + + if not effective_branch: + return "Error: No working branch. Call `create_feature_branch(issue_number, title)` first." + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Listing {path or 'root'}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + params={"ref": effective_branch}, + ) + response.raise_for_status() + contents = response.json() + + if isinstance(contents, dict): + return f"**Note:** '{path}' is a file. Use `get_file()` to read its contents." 
+ + output = f"**Contents of {owner}/{repo_name}/{path or '.'}** (`{effective_branch}` branch)\n\n" + + dirs = [item for item in contents if item.get("type") == "dir"] + files = [item for item in contents if item.get("type") == "file"] + + if dirs: + output += "**๐Ÿ“ Directories:**\n" + for item in sorted(dirs, key=lambda x: x.get("name", "").lower()): + output += f"- `๐Ÿ“ {item.get('name', '')}/`\n" + output += "\n" + + if files: + output += "**๐Ÿ“„ Files:**\n" + for item in sorted(files, key=lambda x: x.get("name", "").lower()): + size = item.get("size", 0) + if size < 1024: + size_str = f"{size}B" + elif size < 1024 * 1024: + size_str = f"{size//1024}KB" + else: + size_str = f"{size//(1024*1024)}MB" + sha_short = item.get("sha", "unknown")[:8] + output += f"- `{item.get('name', '')}` ({size_str}, sha: {sha_short})\n" + + output += f"\n**Total:** {len(dirs)} directories, {len(files)} files" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + return output + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"directory listing for '{path}'") + if e.response.status_code == 404: + return f"Error: Path not found: `{path}`. Verify the path exists in the repository." + return f"Error: Failed to list directory contents. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during directory listing: {type(e).__name__}: {e}"