"""
title: Gitea Coder - Workflow Role with Scope Enforcement
author: Jeff Smith + Claude + minimax
version: 1.0.0
license: MIT
description: High-level workflow role for LLM-based code generation with scope gating and quality gates
requirements: pydantic, httpx
changelog:
    1.0.0:
        - Initial implementation of gitea_coder role
        - Branch creation with scope gating (prevents main pushes)
        - Enforces branch naming conventions (feature/, fix/, refactor/, etc.)
        - Generates detailed commit messages with ticket references
        - Creates PRs from branches
        - Reads ticket requirements from issues
        - Unified file operations workflow
        - Added diff-based updates with apply_diff()
        - Added size delta gating in commit_changes() for quality control
"""

from typing import Optional, Callable, Any, Dict, List, Tuple
from pydantic import BaseModel, Field
import re
import time
import base64
import difflib
import httpx


class Tools:
    """
    Gitea Coder Role - high-level workflow automation for code-generation tasks.

    Workflow: read ticket -> understand issue -> create/modify branch ->
    commit with detailed messages.

    Features:
      - Branch scope gating (blocks direct pushes to main/master/...)
      - Enforced branch naming conventions (scope/issue-description)
      - Conventional-commit message generation
      - Quality gates on file changes (size-delta validation)
      - Diff-based updates to avoid accidental whole-file replacement
      - Per-chat session caching (chat_id -> default branch, working branch)
    """

    class Valves(BaseModel):
        # System-wide configuration for the Gitea Coder integration.

        GITEA_URL: str = Field(
            default="https://gitea.example.com",
            description="Gitea server URL (ingress or internal service)",
        )
        DEFAULT_REPO: str = Field(
            default="",
            description="Default repository in owner/repo format",
        )
        DEFAULT_BRANCH: str = Field(
            default="main",
            description="Default branch name for operations",
        )
        DEFAULT_ORG: str = Field(
            default="",
            description="Default organization for org-scoped operations",
        )
        ALLOW_USER_OVERRIDES: bool = Field(
            default=True,
            description="Allow users to override defaults via UserValves",
        )
        VERIFY_SSL: bool = Field(
            default=True,
            description="Verify SSL certificates (disable for self-signed certs)",
        )
        DEFAULT_PAGE_SIZE: int = Field(
            default=50,
            description="Default page size for list operations (max 50)",
            ge=1,
            le=50,
        )
        # Coder-specific quality-gate settings.
        MAX_SIZE_DELTA_PERCENT: float = Field(
            default=50.0,
            description="Maximum allowed file size change percentage (quality gate)",
            ge=1.0,
            le=500.0,
        )
        PROTECTED_BRANCHES: List[str] = Field(
            default=["main", "master", "develop", "dev", "release", "hotfix"],
            description="Branches that cannot be committed to directly",
        )
        ALLOWED_SCOPES: List[str] = Field(
            default=["feature", "fix", "refactor", "docs", "test", "chore", "wip"],
            description="Allowed branch scope prefixes",
        )

    class UserValves(BaseModel):
        # Per-user credentials and per-user overrides of the system defaults.

        GITEA_TOKEN: str = Field(
            default="",
            description="Your Gitea API token",
        )
        USER_DEFAULT_REPO: str = Field(
            default="",
            description="Override default repository for this user",
        )
        USER_DEFAULT_BRANCH: str = Field(
            default="",
            description="Override default branch for this user",
        )
        USER_DEFAULT_ORG: str = Field(
            default="",
            description="Override default organization for this user",
        )
def __init__(self):
    """Initialize valves, user valves, and the per-chat session cache."""
    self.valves = self.Valves()
    # Surface tool invocations in the chat UI for debugging.
    self.citation = True
    self.user_valves = self.UserValves()
    # Session cache entries: "chat_id:key" -> (value, stored_at_timestamp).
    self._session_cache: Dict[str, Tuple[str, float]] = {}
    self._cache_ttl_seconds = 3600  # 1 hour
    # Placeholder for an underlying dev-operations backend, wired in later.
    self._dev = None


def _user_valves(self, __user__: dict = None):
    """Return the UserValves object from the request context, or None.

    NOTE(review): assumes the framework stores a UserValves instance under
    __user__["valves"]; a plain dict here would break the attribute access
    performed by callers - confirm against the hosting framework.
    """
    if __user__ and "valves" in __user__:
        return __user__.get("valves")
    return None


def _api_url(self, endpoint: str) -> str:
    """Construct a full Gitea API v1 URL for the given endpoint path."""
    base = self._get_url()
    return f"{base}/api/v1{endpoint}"


def _get_url(self) -> str:
    """Effective Gitea base URL with any trailing slash removed."""
    return self.valves.GITEA_URL.rstrip("/")


def _get_token(self, __user__: dict = None) -> str:
    """Extract the user's Gitea token from context; "" when unset."""
    user_valves = self._user_valves(__user__)
    if user_valves:
        return user_valves.GITEA_TOKEN
    return ""


def _headers(self, __user__: dict = None) -> dict:
    """HTTP headers for Gitea requests; adds auth only when a token exists."""
    token = self._get_token(__user__)
    if not token:
        return {"Content-Type": "application/json"}
    return {
        "Authorization": f"token {token}",
        "Content-Type": "application/json",
    }


def _format_error(self, e, context: str = "") -> str:
    """Format an httpx.HTTPStatusError with context for LLM consumption."""
    try:
        error_json = e.response.json()
        error_msg = error_json.get("message", e.response.text[:200])
    except Exception:
        # Response body was not JSON; fall back to raw text (truncated).
        error_msg = e.response.text[:200]

    context_str = f" ({context})" if context else ""
    return f"HTTP Error {e.response.status_code}{context_str}: {error_msg}"


def _get_repo(self, repo: Optional[str], __user__: dict = None) -> str:
    """Effective repository: explicit arg > user override > system default."""
    if repo:
        return repo
    user_valves = self._user_valves(__user__)
    if (
        user_valves
        and self.valves.ALLOW_USER_OVERRIDES
        and user_valves.USER_DEFAULT_REPO
    ):
        return user_valves.USER_DEFAULT_REPO
    return self.valves.DEFAULT_REPO


def _get_branch(self, branch: Optional[str], __user__: dict = None) -> str:
    """Effective branch: explicit arg > user override > system default."""
    if branch:
        return branch
    user_valves = self._user_valves(__user__)
    if (
        user_valves
        and self.valves.ALLOW_USER_OVERRIDES
        and user_valves.USER_DEFAULT_BRANCH
    ):
        return user_valves.USER_DEFAULT_BRANCH
    return self.valves.DEFAULT_BRANCH


def _get_org(self, org: Optional[str], __user__: dict = None) -> str:
    """Effective organization: explicit arg > user override > system default."""
    if org:
        return org
    user_valves = self._user_valves(__user__)
    if (
        user_valves
        and self.valves.ALLOW_USER_OVERRIDES
        and user_valves.USER_DEFAULT_ORG
    ):
        return user_valves.USER_DEFAULT_ORG
    return self.valves.DEFAULT_ORG


def _resolve_repo(
    self, repo: Optional[str], __user__: dict = None
) -> tuple[str, str]:
    """Resolve a repository string into (owner, repo_name).

    Raises:
        ValueError: when no repository is configured anywhere, or the value
            is not in 'owner/repo' format.
    """
    effective_repo = self._get_repo(repo, __user__)

    if not effective_repo:
        raise ValueError(
            "No repository specified. Set DEFAULT_REPO in Valves or USER_DEFAULT_REPO in UserValves."
        )

    if "/" not in effective_repo:
        raise ValueError(
            f"Repository must be in 'owner/repo' format, got: {effective_repo}"
        )

    # FIX: split() returns a list; return a real tuple as annotated.
    owner, name = effective_repo.split("/", 1)
    return owner, name
def _get_page_size(self, limit: Optional[int] = None) -> int:
    """Effective page size, hard-capped at Gitea's API maximum of 50."""
    if limit is not None:
        return min(limit, 50)
    return min(self.valves.DEFAULT_PAGE_SIZE, 50)


def _get_cached_data(self, chat_id: str, key: str) -> Optional[Any]:
    """Return a cached value for this chat session, honoring the TTL.

    Expired entries are evicted on access; returns None on miss or expiry.
    """
    cache_key = f"{chat_id}:{key}"
    if cache_key in self._session_cache:
        data, timestamp = self._session_cache[cache_key]
        if time.time() - timestamp < self._cache_ttl_seconds:
            return data
        # Expired - drop the stale entry.
        del self._session_cache[cache_key]
    return None


def _set_cached_data(self, chat_id: str, key: str, data: Any) -> None:
    """Store a value for this chat session, stamped for TTL expiry."""
    cache_key = f"{chat_id}:{key}"
    self._session_cache[cache_key] = (data, time.time())


def _validate_branch_name(self, branch_name: str) -> tuple[bool, str]:
    """Validate a branch name against protected branches and allowed scopes.

    Returns:
        (is_valid, error_message) - error_message is "" when valid.
    """
    # Direct commits to protected branches are never allowed.
    if branch_name in self.valves.PROTECTED_BRANCHES:
        return False, (
            f"Branch '{branch_name}' is protected. "
            f"Direct commits to protected branches are not allowed. "
            f"Create a feature branch instead."
        )

    # The name must start with one of the allowed scope prefixes.
    # re.escape guards against scopes containing regex metacharacters.
    scope_pattern = (
        r"^("
        + "|".join(re.escape(s) for s in self.valves.ALLOWED_SCOPES)
        + r")/"
    )
    if not re.match(scope_pattern, branch_name):
        allowed = ", ".join(f"{s}/" for s in self.valves.ALLOWED_SCOPES)
        # FIX: the placeholder text here was garbled ("{allowed}-.");
        # restored to the intended naming template.
        return False, (
            f"Branch '{branch_name}' does not follow naming convention. "
            f"Use format: <scope>/<issue-number>-<short-description> "
            f"(allowed scopes: {allowed}). "
            f"Example: feature/42-add-user-auth"
        )

    return True, ""


def _parse_issue_refs(self, text: str) -> List[str]:
    """Extract issue references from free text (e.g. '#42', 'issue 42')."""
    refs = re.findall(r"#(\d+)", text)
    issue_refs = [f"#{ref}" for ref in refs]

    # Also catch the "issue N" / "issue #N" spelling, de-duplicated.
    issue_n_refs = re.findall(r"issue\s*#?(\d+)", text, re.IGNORECASE)
    for ref in issue_n_refs:
        issue_ref = f"#{ref}"
        if issue_ref not in issue_refs:
            issue_refs.append(issue_ref)

    return issue_refs


def _generate_commit_message(
    self,
    change_type: str,
    scope: str,
    description: str,
    issue_refs: Optional[List[str]] = None,
    body: Optional[str] = None,
) -> str:
    """
    Generate a conventional commit message.

    Format: type(scope): description, followed by an optional body
    paragraph and a "Refs: ..." footer.

    Args:
        change_type: Type of change (feat, fix, docs, ...); unknown types
            fall back to "chore".
        scope: Area of change (file, module, or component); may be "".
        description: Brief description of the change.
        issue_refs: Issue references, e.g. ["#42"].
        body: Optional longer description paragraph.

    Returns:
        The formatted commit message.
    """
    valid_types = [
        "feat", "fix", "docs", "style", "refactor", "test",
        "chore", "perf", "ci", "build", "revert",
    ]
    if change_type.lower() not in valid_types:
        change_type = "chore"  # default for unknown types

    scope_str = f"({scope})" if scope else ""
    subject = f"{change_type.lower()}{scope_str}: {description}"

    # FIX: the original built "body + footer" into a local that was never
    # used, so a provided body (and its footer) was silently dropped.
    parts = [subject]
    if body:
        parts.append(body)
    if issue_refs:
        parts.append(f"Refs: {', '.join(issue_refs)}")

    return "\n\n".join(parts)
async def workflow_summary(
    self,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Get a summary of available coder workflows and commands.

    Returns:
        Markdown-formatted workflow guide.

    NOTE: fixes applied to the guide text - the `apply_diff` parameter is
    `diff_content` (not `diff`), and the branch-format placeholder had been
    stripped to "/-"; both are restored below.
    """
    output = """# 🚀 Gitea Coder Workflow Guide

## Quick Start

1. **Read the ticket:** `read_ticket(issue_number)`
2. **Create feature branch:** `create_feature_branch(issue_number)`
3. **Make changes:** `apply_diff()` or `commit_changes()`
4. **Create PR:** `create_pull_request()`

## Available Commands

### 📋 Reading Tickets
- `read_ticket(issue_number)` - Get full issue details

### 🌿 Branch Management
- `create_feature_branch(issue_number, title)` - Create scoped branch
- `get_branch_status()` - See current working branch
- `list_my_branches()` - List your branches

### 📝 File Operations
- `apply_diff(path, diff_content, message)` - Apply unified diff patch
- `commit_changes(path, content, message)` - Commit with size delta gate
- `replace_file(path, content, message)` - Replace entire file
- `create_file(path, content, message)` - Create new file

### 🔍 Quality Gates
- Size delta checks (default: 50% max change)
- Branch scope validation
- Protected branch enforcement

### 📦 Pull Requests
- `create_pull_request(title, description)` - Create PR from current branch

## Branch Naming Convention

```
<scope>/<issue-number>-<short-description>

Examples:
- feature/42-add-user-login
- fix/37-fix-memory-leak
- refactor/15-cleanup-api
- docs/20-update-readme
```

Allowed scopes: `feature/`, `fix/`, `refactor/`, `docs/`, `test/`, `chore/`, `wip/`

## Quality Gates

### Size Delta Gate (commit_changes)
- Files > 50% size change require diff-based updates
- Prevents accidental file replacements
- Configurable threshold in Valves

### Branch Protection
- Cannot commit directly to: main, master, develop, dev, release, hotfix
- Create feature branches instead

## Example Workflow

```python
# Read the ticket
ticket = read_ticket(42)

# Create branch (auto-extracts from ticket)
create_feature_branch(42, ticket["title"])

# Make changes using diff
apply_diff(
    path="src/auth.py",
    diff_content=\"\"\"--- a/src/auth.py
+++ b/src/auth.py
@@ -10,3 +10,5 @@ class Auth:
+    def login(self, user: str) -> bool:
+        return True
\"\"\",
    message="feat(auth): add login method to Auth class"
)

# Create PR
create_pull_request(
    title="feat(auth): add login method",
    body="Implements login functionality as specified in #42"
)
```

## Tips

- Use `suggest_branch_name(issue_number)` to get branch name suggestions
- Use `list_my_branches()` to track your active work
- Always reference issues in commits: `Refs: #42`
- Use diff-based updates for incremental changes
- Large changes should be split into multiple commits
"""
    return output
async def read_ticket(
    self,
    issue_number: int,
    repo: Optional[str] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Read and parse a ticket/issue to understand requirements.

    Args:
        issue_number: The issue/ticket number to read.
        repo: Repository in 'owner/repo' format.

    Returns:
        Formatted ticket summary: description, detected testing criteria,
        technical notes, related issues, and a suggested branch name.
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured in UserValves settings."

    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"

    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": f"Fetching issue #{issue_number}...",
                    "done": False,
                },
            }
        )

    try:
        async with httpx.AsyncClient(
            timeout=30.0, verify=self.valves.VERIFY_SSL
        ) as client:
            response = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"),
                headers=self._headers(__user__),
            )
            response.raise_for_status()
            issue = response.json()

            title = issue.get("title", "No title")
            # FIX: the API may return an explicit null body; coerce to ""
            # so the .lower()/len() calls below cannot crash.
            body = issue.get("body") or ""
            state = issue.get("state", "unknown")
            user = issue.get("user", {}).get("login", "unknown")
            labels = [label.get("name", "") for label in issue.get("labels", [])]
            # FIX: same null-guard for created_at before slicing the date.
            created_at = (issue.get("created_at") or "")[:10]
            html_url = issue.get("html_url", "")

            # Heuristic parsing of the body for structured requirements.
            testing_criteria = []
            technical_notes = []
            is_testing_required = False
            is_docs_required = False

            body_lower = body.lower()
            if "test" in body_lower or "testing" in body_lower:
                is_testing_required = True
                # Try to extract a "Testing criteria:" style section.
                testing_section = re.search(
                    r"(?:testing|criteria|test).*?:(.*?)(?:\n\n|$)",
                    body,
                    re.IGNORECASE | re.DOTALL,
                )
                if testing_section:
                    testing_criteria = [
                        # FIX: trailing .strip() removes the space left
                        # after stripping the bullet character.
                        line.strip().lstrip("-*•").strip()
                        for line in testing_section.group(1).split("\n")
                        if line.strip()
                    ]

            if "documentation" in body_lower or "docs" in body_lower:
                is_docs_required = True

            # "Technical notes:" style section, if present.
            tech_section = re.search(
                r"(?:technical|tech).*?:(.*?)(?:\n\n|$)",
                body,
                re.IGNORECASE | re.DOTALL,
            )
            if tech_section:
                technical_notes = [
                    line.strip().lstrip("-*•").strip()
                    for line in tech_section.group(1).split("\n")
                    if line.strip()
                ]

            # Collect issue references; always include this ticket itself.
            issue_refs = self._parse_issue_refs(body)
            if not any(ref == f"#{issue_number}" for ref in issue_refs):
                issue_refs.insert(0, f"#{issue_number}")

            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {"description": "Done", "done": True, "hidden": True},
                    }
                )

            output = f"# 📋 Ticket #{issue_number}: {title}\n\n"
            output += f"**State:** {state.upper()} | **Author:** @{user} | **Created:** {created_at}\n"
            output += f"**Labels:** {', '.join(labels) if labels else 'None'}\n"
            output += f"**URL:** {html_url}\n\n"

            if body:
                output += "## 📝 Description\n\n"
                # Truncate very long descriptions to keep the summary readable.
                if len(body) > 1000:
                    output += f"{body[:1000]}...\n\n"
                    output += "_Description truncated. Use `get_issue()` for full content._\n\n"
                else:
                    output += f"{body}\n\n"
            else:
                output += "_No description provided._\n\n"

            output += "## 🧪 Testing Requirements\n\n"
            if is_testing_required:
                if testing_criteria:
                    output += "**Testing Criteria:**\n"
                    for criterion in testing_criteria:
                        output += f"- [ ] {criterion}\n"
                    output += "\n"
                else:
                    output += "Testing required, but no specific criteria listed.\n\n"
            else:
                output += "No explicit testing requirements detected.\n\n"

            if technical_notes:
                output += "## 🔧 Technical Notes\n\n"
                for note in technical_notes:
                    output += f"- {note}\n"
                output += "\n"

            if is_docs_required:
                output += "## 📚 Documentation Required\n\n"
                output += "This ticket mentions documentation needs.\n\n"

            if issue_refs:
                output += "## 🔗 Related Issues\n\n"
                for ref in issue_refs:
                    output += f"- {ref}\n"
                output += "\n"

            suggested = self._suggest_branch_name(issue_number, title)
            output += "## 🌿 Suggested Branch Name\n\n"
            output += f"```\n{suggested}\n```\n\n"

            output += "## 🚀 Next Steps\n\n"
            output += f"1. Create branch: `create_feature_branch({issue_number}, \"{title[:50]}...\")`\n"
            output += "2. Make changes using `apply_diff()` or `commit_changes()`\n"
            output += "3. Create PR: `create_pull_request()`\n"

            return output.strip()

    except httpx.HTTPStatusError as e:
        error_msg = self._format_error(e, f"issue #{issue_number}")
        if e.response.status_code == 404:
            return f"Error: Issue #{issue_number} not found in {owner}/{repo_name}."
        return f"Error: Failed to fetch issue. {error_msg}"
    except Exception as e:
        return f"Error: Unexpected failure during issue fetch: {type(e).__name__}: {e}"
def _suggest_branch_name(
    self, issue_number: int, title: str, scope: str = "feature"
) -> str:
    """
    Suggest a branch name from an issue number and title.

    Args:
        issue_number: The issue number.
        title: The issue title (slugged: lowercase, hyphen-separated).
        scope: Branch scope prefix (default: feature).

    Returns:
        "scope/issue-slug", or "scope/issue" when the title yields no slug.
    """
    # Lowercase, drop anything but letters/digits/spaces/hyphens,
    # then collapse whitespace runs into single hyphens.
    slug = re.sub(r"[^a-z0-9\s-]", "", title.lower())
    slug = re.sub(r"[\s-]+", "-", slug)
    slug = slug.strip("-")

    # Keep the slug short; re-strip in case truncation ends on a hyphen.
    if len(slug) > 30:
        slug = slug[:30].strip("-")

    # FIX: an empty/symbol-only title used to produce "scope/42-" with a
    # dangling hyphen; omit the hyphen when there is no slug.
    if not slug:
        return f"{scope}/{issue_number}"
    return f"{scope}/{issue_number}-{slug}"


async def suggest_branch_name(
    self,
    issue_number: int,
    repo: Optional[str] = None,
    scope: str = "feature",
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Get a branch name suggestion for an issue, noting name collisions.

    Args:
        issue_number: The issue number.
        repo: Repository in 'owner/repo' format.
        scope: Branch scope prefix.

    Returns:
        Suggested branch name, suffixed with " (already exists)" when a
        branch of that name is already present.
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured."

    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"

    # FIX: reuse one client for both requests instead of opening two.
    title = ""
    suggested = ""
    try:
        async with httpx.AsyncClient(
            timeout=30.0, verify=self.valves.VERIFY_SSL
        ) as client:
            # Best-effort title fetch; an empty title still yields a name.
            try:
                response = await client.get(
                    self._api_url(
                        f"/repos/{owner}/{repo_name}/issues/{issue_number}"
                    ),
                    headers=self._headers(__user__),
                )
                response.raise_for_status()
                title = response.json().get("title", "")
            except Exception:
                title = ""

            suggested = self._suggest_branch_name(issue_number, title, scope)

            # Best-effort existence check; network errors are ignored.
            try:
                branch_response = await client.get(
                    self._api_url(
                        f"/repos/{owner}/{repo_name}/branches/{suggested}"
                    ),
                    headers=self._headers(__user__),
                )
                if branch_response.status_code == 200:
                    suggested += " (already exists)"
            except Exception:
                pass
    except Exception:
        # Client construction failed; fall back to an offline suggestion.
        if not suggested:
            suggested = self._suggest_branch_name(issue_number, title, scope)

    return suggested
async def create_feature_branch(
    self,
    issue_number: int,
    title: Optional[str] = None,
    repo: Optional[str] = None,
    scope: str = "feature",
    __user__: dict = None,
    __chat_id__: str = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Create a new feature branch for a ticket.

    Main entry point of the coder workflow:
      1. Validates the branch name against allowed scopes.
      2. Refuses protected branches.
      3. Creates the branch from the default/base branch.
      4. Caches the branch name for the chat session.

    Args:
        issue_number: The ticket/issue number.
        title: Optional title (fetched from the issue when omitted).
        repo: Repository in 'owner/repo' format.
        scope: Branch scope (feature, fix, refactor, ...).
        __chat_id__: Session ID used for branch caching.

    Returns:
        Branch creation confirmation or an error message.
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured in UserValves settings."

    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"

    # Fetch the title from the issue when not provided by the caller.
    if not title:
        try:
            async with httpx.AsyncClient(
                timeout=30.0, verify=self.valves.VERIFY_SSL
            ) as client:
                issue_response = await client.get(
                    self._api_url(
                        f"/repos/{owner}/{repo_name}/issues/{issue_number}"
                    ),
                    headers=self._headers(__user__),
                )
                issue_response.raise_for_status()
                issue = issue_response.json()
                title = issue.get("title", "")
        except Exception as e:
            return f"Error: Could not fetch issue #{issue_number}: {e}"

    # Generate and validate the scoped branch name.
    branch_name = self._suggest_branch_name(issue_number, title, scope)

    is_valid, error_msg = self._validate_branch_name(branch_name)
    if not is_valid:
        return f"❌ **Branch Validation Failed**\n\n{error_msg}"

    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": f"Creating branch {branch_name}...",
                    "done": False,
                },
            }
        )

    # Resolve the base branch, caching it per chat session.
    base_branch = self._get_branch(None, __user__)
    if __chat_id__:
        cached_base = self._get_cached_data(__chat_id__, "default_branch")
        if cached_base:
            base_branch = cached_base
        else:
            self._set_cached_data(__chat_id__, "default_branch", base_branch)

    try:
        async with httpx.AsyncClient(
            timeout=30.0, verify=self.valves.VERIFY_SSL
        ) as client:
            response = await client.post(
                self._api_url(f"/repos/{owner}/{repo_name}/branches"),
                headers=self._headers(__user__),
                json={
                    "new_branch_name": branch_name,
                    "old_branch_name": base_branch,
                },
            )

            # Friendly message when the branch already exists (409).
            if response.status_code == 409:
                return f"⚠️ **Branch Already Exists**\n\nBranch `{branch_name}` already exists in `{owner}/{repo_name}`.\n\nUse it or create a new one."

            response.raise_for_status()

            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {"description": "Done", "done": True, "hidden": True},
                    }
                )

            # Remember the working branch for subsequent commits this session.
            if __chat_id__:
                self._set_cached_data(__chat_id__, "working_branch", branch_name)

            # NOTE: the "- Format:" line below was garbled ("`/-`") in the
            # original; restored to the intended placeholder template.
            return f"""✅ **Feature Branch Created Successfully**

**Branch:** `{branch_name}`
**Base Branch:** `{base_branch}`
**Repository:** `{owner}/{repo_name}`
**Issue:** #{issue_number}

**Next Steps:**
1. Make changes to files
2. Use `apply_diff()` for incremental changes or `commit_changes()` for full replacements
3. Commit with descriptive messages
4. Create PR when ready: `create_pull_request()`

**Branch Naming Convention:**
- Format: `<scope>/<issue-number>-<short-description>`
- Scopes: feature, fix, refactor, docs, test, chore, wip
- Examples:
  - `feature/42-add-user-authentication`
  - `fix/37-fix-memory-leak`
  - `refactor/15-cleanup-api-code`
"""

    except httpx.HTTPStatusError as e:
        error_msg = self._format_error(e, "branch creation")
        if e.response.status_code == 409:
            return f"Error: Branch `{branch_name}` already exists."
        return f"Error: Failed to create branch. {error_msg}"
    except Exception as e:
        return f"Error: Unexpected failure during branch creation: {type(e).__name__}: {e}"
async def get_branch_status(
    self,
    repo: Optional[str] = None,
    __user__: dict = None,
    __chat_id__: str = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Get the current working branch status for the chat session.

    Args:
        repo: Repository in 'owner/repo' format.
        __chat_id__: Session ID for cache lookup.

    Returns:
        Default branch plus the cached working branch, if any.
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured."

    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"

    # Pull session-cached branches, falling back to the configured default.
    working_branch = None
    default_branch = None
    if __chat_id__:
        working_branch = self._get_cached_data(__chat_id__, "working_branch")
        default_branch = self._get_cached_data(__chat_id__, "default_branch")

    if not default_branch:
        default_branch = self._get_branch(None, __user__)

    output = f"# 🌿 Branch Status: {owner}/{repo_name}\n\n"
    output += f"**Default Branch:** `{default_branch}`\n"

    if working_branch:
        output += f"**Working Branch:** `{working_branch}`\n\n"
        output += "**Session cached - use this branch for commits.**\n"
    else:
        output += "\n**No working branch set for this session.**\n"
        output += "Create one: `create_feature_branch(issue_number, title)`\n"

    return output


async def list_my_branches(
    self,
    repo: Optional[str] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    List branches in the repository, grouped by protected / feature / other.

    Args:
        repo: Repository in 'owner/repo' format.

    Returns:
        Markdown-formatted branch listing.
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured."

    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"

    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {"description": "Fetching branches...", "done": False},
            }
        )

    try:
        async with httpx.AsyncClient(
            timeout=30.0, verify=self.valves.VERIFY_SSL
        ) as client:
            response = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/branches"),
                headers=self._headers(__user__),
                params={"limit": 50},
            )
            response.raise_for_status()
            branches = response.json()

            output = f"# 🌿 Branches in {owner}/{repo_name}\n\n"

            # FIX: classify in one pass and use .get("name") - the original
            # scanned three times and risked KeyError on b["name"].
            scope_prefixes = tuple(
                f"{s}/" for s in self.valves.ALLOWED_SCOPES
            )
            protected, feature, other = [], [], []
            for b in branches:
                if b.get("protected"):
                    protected.append(b)
                elif b.get("name", "").startswith(scope_prefixes):
                    feature.append(b)
                else:
                    other.append(b)

            if protected:
                output += "## 🛡️ Protected Branches\n\n"
                for branch in sorted(protected, key=lambda x: x["name"]):
                    name = branch.get("name", "")
                    commit_sha = branch.get("commit", {}).get("id", "")[:8]
                    output += f"- `{name}` [commit: {commit_sha}]\n"
                output += "\n"

            if feature:
                output += "## 📦 Feature Branches\n\n"
                for branch in sorted(feature, key=lambda x: x["name"]):
                    name = branch.get("name", "")
                    commit_sha = branch.get("commit", {}).get("id", "")[:8]
                    output += f"- `{name}` [commit: {commit_sha}]\n"
                output += "\n"

            if other:
                output += "## 📄 Other Branches\n\n"
                # Cap the long tail of unscoped branches at 20 entries.
                for branch in sorted(other, key=lambda x: x["name"])[:20]:
                    name = branch.get("name", "")
                    commit_sha = branch.get("commit", {}).get("id", "")[:8]
                    output += f"- `{name}` [commit: {commit_sha}]\n"
                if len(other) > 20:
                    output += f"\n... and {len(other) - 20} more branches\n"
                output += "\n"

            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {"description": "Done", "done": True, "hidden": True},
                    }
                )

            return output.strip()

    except httpx.HTTPStatusError as e:
        error_msg = self._format_error(e, "branch listing")
        return f"Error: Failed to list branches. {error_msg}"
    except Exception as e:
        return f"Error: Unexpected failure: {type(e).__name__}: {e}"
async def apply_diff(
    self,
    path: str,
    diff_content: str,
    message: Optional[str] = None,
    repo: Optional[str] = None,
    branch: Optional[str] = None,
    auto_message: bool = True,
    __user__: dict = None,
    __chat_id__: str = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Apply a unified diff patch to a file.

    PREFERRED change mechanism: precise about what changes, prevents
    accidental whole-file replacement, and matches the diff format LLMs
    know from GitHub PRs.

    Args:
        path: File path to update.
        diff_content: Unified diff in standard format.
        message: Commit message (auto-generated when omitted).
        repo: Repository in 'owner/repo' format.
        branch: Branch name (defaults to the session working branch,
            then the configured default branch).
        auto_message: Generate a commit message when none is provided.
        __chat_id__: Session ID for working-branch lookup.

    Returns:
        Commit details and a diff summary, or an error message.
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured in UserValves settings."

    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"

    # Branch resolution: explicit arg > session working branch > default.
    effective_branch = branch
    if not effective_branch and __chat_id__:
        effective_branch = self._get_cached_data(__chat_id__, "working_branch")
    if not effective_branch:
        effective_branch = self._get_branch(None, __user__)

    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": f"Applying diff to {path}...",
                    "done": False,
                },
            }
        )

    try:
        async with httpx.AsyncClient(
            timeout=30.0, verify=self.valves.VERIFY_SSL
        ) as client:
            # Fetch the current file so we have its content and SHA.
            get_response = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
                headers=self._headers(__user__),
                params={"ref": effective_branch},
            )

            if get_response.status_code == 404:
                return f"Error: File not found: `{path}`. Use `create_file()` to create a new file."

            get_response.raise_for_status()
            file_info = get_response.json()
            current_sha = file_info.get("sha")

            current_content_b64 = file_info.get("content", "")
            try:
                current_content = base64.b64decode(current_content_b64).decode(
                    "utf-8"
                )
            except Exception:
                return "Error: Could not decode current file content."

            # Apply the patch locally before pushing anything.
            new_content = self._apply_unified_diff(current_content, diff_content)

            if new_content is None:
                return "Error: Failed to parse or apply diff. Check the diff format."

            if not message:
                if auto_message:
                    message = self._generate_diff_commit_message(path, diff_content)
                else:
                    return "Error: Commit message is required when auto_message=False."

            # Commit the patched content (update requires the blob SHA).
            new_content_b64 = base64.b64encode(new_content.encode("utf-8")).decode(
                "ascii"
            )

            response = await client.put(
                self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
                headers=self._headers(__user__),
                json={
                    "content": new_content_b64,
                    "message": message,
                    "branch": effective_branch,
                    "sha": current_sha,
                },
            )
            response.raise_for_status()
            result = response.json()

            commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]

            # FIX: the original counted every "+"/"-" *character* in the
            # whole diff (including ones inside code), wildly inflating the
            # stats. Count changed lines by prefix, excluding file headers.
            diff_lines = diff_content.splitlines()
            added_lines = sum(
                1
                for ln in diff_lines
                if ln.startswith("+") and not ln.startswith("+++")
            )
            removed_lines = sum(
                1
                for ln in diff_lines
                if ln.startswith("-") and not ln.startswith("---")
            )

            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {"description": "Done", "done": True, "hidden": True},
                    }
                )

            output = "✅ **Diff Applied Successfully**\n\n"
            output += f"**File:** `{path}`\n"
            output += f"**Branch:** `{effective_branch}`\n"
            output += f"**Commit:** `{commit_sha}`\n"
            output += f"**Changes:** +{added_lines} lines, -{removed_lines} lines\n\n"
            output += f"**Message:** {message}\n"

            return output

    except httpx.HTTPStatusError as e:
        error_msg = self._format_error(e, f"diff application to '{path}'")
        if e.response.status_code == 409:
            return f"Error: Update conflict for `{path}`. Fetch the latest version and try again."
        return f"Error: Failed to apply diff. {error_msg}"
    except Exception as e:
        return f"Error: Unexpected failure during diff application: {type(e).__name__}: {e}"
{error_msg}" + except Exception as e: + return f"Error: Unexpected failure during diff application: {type(e).__name__}: {e}" + def _apply_unified_diff( self, current_content: str, diff_content: str ) -> Optional[str]: @@ -15,8 +1166,7 @@ # Parse the diff diff_lines = diff_content.splitlines(keepends=True) - # Simple unified diff parser for basic cases - # Handles: --- old +++ new @@ -old +new @@ + # Parse hunks from unified diff hunks = [] current_hunk = None in_hunk = False @@ -32,7 +1182,9 @@ hunks.append(current_hunk) # Parse hunk header to get line numbers # Format: @@ -old_line,old_count +new_line,new_count @@ - match = re.search(r"@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@", line) + match = re.search( + r"@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@", line + ) if match: old_start = int(match.group(1)) new_start = int(match.group(3)) @@ -43,11 +1195,15 @@ } in_hunk = True continue - elif in_hunk and (line.startswith("+") or line.startswith("-") or line.startswith(" ")): + elif in_hunk and ( + line.startswith("+") or line.startswith("-") or line.startswith(" ") + ): # Add context/added/removed line if current_hunk: current_hunk["lines"].append(line) - elif in_hunk and not line.startswith("+") and not line.startswith("-") and not line.startswith(" "): + elif in_hunk and not line.startswith( + "+" + ) and not line.startswith("-") and not line.startswith(" "): # End of hunk if current_hunk: hunks.append(current_hunk) @@ -57,63 +1213,903 @@ if current_hunk: hunks.append(current_hunk) - # Apply hunks to content + # If no hunks, return unchanged if not hunks: - # No hunks, return unchanged return current_content # Split content into lines old_lines = current_content.splitlines(keepends=True) - # Apply diff using difflib - old_lines_for_patch = [line.rstrip("\n") for line in old_lines] + # Use difflib to apply the patch + old_lines_stripped = [line.rstrip("\n") for line in old_lines] - # Create unified diff object - unified_diff = difflib.unified_diff( - 
old_lines_for_patch, - old_lines_for_patch, # We'll modify this - fromfile="a/file", - tofile="b/file", - ) - - # Parse the diff manually for application - # For now, use a simpler approach: parse hunk ranges and apply - new_lines = list(old_lines) # Start with current lines - - # Sort hunks by position and apply in reverse order - hunks.sort(key=lambda h: h["old_start"], reverse=True) - - for hunk in hunks: + # Create a list for the new content + new_lines_stripped = list(old_lines_stripped) + + # Apply each hunk in reverse order (to maintain correct indices) + for hunk in sorted(hunks, key=lambda h: h["old_start"], reverse=True): old_start = hunk["old_start"] - 1 # Convert to 0-indexed + + # Collect lines to remove and add + lines_to_remove = [] lines_to_add = [] - lines_to_skip = 0 for line in hunk["lines"]: if line.startswith("+"): - lines_to_add.append(line[1:].rstrip("\n") + "\n") + lines_to_add.append(line[1:].rstrip("\n")) elif line.startswith("-"): - lines_to_skip += 1 - else: - # Context line - if lines_to_skip > 0: - # Skip the deleted lines - old_start += 1 # Move past the context line - lines_to_skip = 0 + lines_to_remove.append(line[1:].rstrip("\n")) - # Apply the hunk - # This is a simplified implementation - # A more robust solution would use a proper diff library + # Remove old lines and add new ones + # First, find and remove the exact lines specified + if lines_to_remove: + # Look for the first removed line at old_start + found_start = False + for i in range(old_start, min(old_start + len(lines_to_remove), len(new_lines_stripped))): + if i < len(new_lines_stripped) and new_lines_stripped[i] == lines_to_remove[0]: + # Found the start, remove the lines + del new_lines_stripped[i : i + len(lines_to_remove)] + found_start = True + break + + if not found_start: + # Fallback: just insert at old_start + pass + + # Insert new lines at old_start + for line in reversed(lines_to_add): + new_lines_stripped.insert(old_start, line) - # For a complete 
implementation, consider using: - # - GitPython for actual git operations - # - difflib with proper patch application - # - Or a dedicated diff/patch library + # Reconstruct content with line endings + new_content = "".join( + line + ("\n" if not line.endswith("\n") and i < len(new_lines_stripped) - 1 else "") + for i, line in enumerate(new_lines_stripped) + ) + + # Ensure proper line endings + if new_content and not new_content.endswith("\n"): + new_content += "\n" - # Return current content for now (placeholder) - # A full implementation would properly apply the diff - return current_content + return new_content except Exception as e: # Log the error but don't fail print(f"Diff application warning: {e}") - return None \ No newline at end of file + return None + + def _generate_diff_commit_message(self, path: str, diff_content: str) -> str: + """ + Generate a commit message from diff content. + + Args: + path: File path + diff_content: Unified diff + + Returns: + Generated commit message + """ + # Extract file name from path + file_name = path.split("/")[-1] + + # Detect change type from diff + change_type = "chore" + if any( + line.startswith("+def ") or line.startswith("+class ") + for line in diff_content.splitlines() + ): + change_type = "feat" + elif any( + line.startswith("+ return ") or line.startswith("+ return ") + for line in diff_content.splitlines() + ): + change_type = "fix" + elif "test" in path.lower() or "spec" in path.lower(): + change_type = "test" + elif ".md" in path.lower() or "readme" in path.lower(): + change_type = "docs" + elif any(line.startswith("-") for line in diff_content.splitlines()): + change_type = "refactor" + + # Generate message + message = f"{change_type}({file_name}): " + + # Extract a short description from added lines + added_lines = [ + line[1:].strip() + for line in diff_content.splitlines() + if line.startswith("+") and not line.startswith("+++") + ] + + if added_lines: + # Use first meaningful added line + description = 
"" + for line in added_lines: + if line and not line.startswith("import ") and not line.startswith("from "): + # Get function/class definition or first statement + match = re.match( + r"(def|class|const|var|let|interface|type)\s+(\w+)", line + ) + if match: + kind = match.group(1) + name = match.group(2) + if kind == "def": + description = f"add {name}() function" + break + elif kind == "class": + description = f"add {name} class" + break + elif line.startswith(" ") or line.startswith("\t"): + # Indented line, skip + continue + else: + # Use as description + description = line[:50].rstrip(":") + if len(line) > 50: + description += "..." + break + + if not description: + # Fallback to line count + added_count = len(added_lines) + description = f"update ({added_count} lines added)" + + message += description + + return message + + async def commit_changes( + self, + path: str, + content: str, + message: Optional[str] = None, + repo: Optional[str] = None, + branch: Optional[str] = None, + max_delta_percent: Optional[float] = None, + __user__: dict = None, + __chat_id__: str = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Commit file changes with automatic content detection and size delta gating. + + This method: + 1. Detects whether to create or replace a file + 2. Validates file size changes against threshold (quality gate) + 3. Auto-generates commit message if not provided + 4. 
Commits to the appropriate branch + + Args: + path: File path to create or update + content: New file content + message: Commit message (auto-generated if not provided) + repo: Repository in 'owner/repo' format + branch: Branch name (defaults to working branch or default) + max_delta_percent: Override for size delta threshold (quality gate) + __user__: User context + __chat_id__: Session ID for branch caching + __event_emitter__: Event emitter for progress + + Returns: + Commit details or error with guidance + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + effective_branch = branch + if not effective_branch and __chat_id__: + effective_branch = self._get_cached_data(__chat_id__, "working_branch") + if not effective_branch: + effective_branch = self._get_branch(None, __user__) + + # Use provided threshold or default from valves + delta_threshold = max_delta_percent or self.valves.MAX_SIZE_DELTA_PERCENT + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Processing {path}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + # Check if file exists + get_response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + params={"ref": effective_branch}, + ) + + file_exists = get_response.status_code == 200 + current_sha = None + current_size = 0 + + if file_exists: + get_response.raise_for_status() + file_info = get_response.json() + current_sha = file_info.get("sha") + current_size = file_info.get("size", 0) + + # SIZE DELTA GATE - Quality check + new_size = len(content.encode("utf-8")) + if current_size > 0 and new_size > 0: + delta_percent = abs(new_size - current_size) / 
current_size * 100 + + if delta_percent > delta_threshold: + # Calculate actual bytes changed + size_diff = new_size - current_size + direction = "larger" if size_diff > 0 else "smaller" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Size gate triggered", + "done": True, + "hidden": True, + }, + } + ) + + return f"""⚠️ **Quality Gate: Large File Change Detected** + +**File:** `{path}` +**Current Size:** {current_size} bytes +**New Size:** {new_size} bytes +**Change:** {size_diff:+d} bytes ({delta_percent:.1f}% {direction}) +**Threshold:** {delta_threshold}% + +This change exceeds the size delta threshold, which may indicate: +- Accidental full file replacement +- Unintended data loss +- LLM confusion about existing content + +**Recommended Actions:** + +1. **Use diff-based updates** (preferred): + ```python + apply_diff( + path="{path}", + diff=\"\"\"--- a/{path} ++++ b/{path} +@@ -1,3 +1,5 @@ + existing line ++new line to add + -line to remove +\"\"\", + message="feat(scope): description of changes" + ) + ``` + +2. **Fetch and review current content**: + ```python + current = get_file("{path}") + # Compare with what you want to change + ``` + +3. **Commit with override** (not recommended): + Increase the threshold if this is intentional: + ```python + commit_changes(..., max_delta_percent=100) + ``` + +**Why this gate exists:** +Large file replacements by LLMs often indicate the model didn't properly understand the existing file structure. Using diffs ensures precise, targeted changes. 
+""" + + # Generate commit message if not provided + if not message: + message = self._generate_commit_message( + change_type="chore", + scope=path.split("/")[-1] if "/" in path else path, + description=f"update {path}", + ) + + # Prepare content + content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") + + if file_exists: + # Replace existing file + if not current_sha: + return f"Error: Could not retrieve SHA for existing file: {path}" + + response = await client.put( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + json={ + "content": content_b64, + "message": message, + "branch": effective_branch, + "sha": current_sha, + }, + ) + else: + # Create new file + response = await client.post( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + json={ + "content": content_b64, + "message": message, + "branch": effective_branch, + }, + ) + + response.raise_for_status() + result = response.json() + + commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] + action = "Updated" if file_exists else "Created" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + # Calculate and show size change + new_size = len(content.encode("utf-8")) + size_info = "" + if file_exists and current_size > 0: + delta = new_size - current_size + delta_percent = delta / current_size * 100 + size_info = f"**Size Change:** {delta:+d} bytes ({delta_percent:.1f}%)\n" + + output = f"""✅ **{action} File Successfully** + +**File:** `{path}` +**Branch:** `{effective_branch}` +**Commit:** `{commit_sha}` +**Message:** {message} + +{size_info}**Action:** {action} +""" + + return output + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"file commit for '{path}'") + if e.response.status_code == 409: + return f"Error: Update conflict for `{path}`. 
    async def create_pull_request(
        self,
        title: str,
        body: Optional[str] = "",
        repo: Optional[str] = None,
        __user__: dict = None,
        __chat_id__: str = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Create a pull request from the current branch.

        The head branch is taken from the session cache (working branch set
        by earlier workflow steps); the base branch is the configured default.
        Head branches that fail branch-name validation (e.g. protected
        branches) are rejected before any API call is made.

        Args:
            title: PR title
            body: PR description (auto-populates from issue if linked)
            repo: Repository in 'owner/repo' format
            __user__: User context
            __chat_id__: Session ID for branch caching
            __event_emitter__: Event emitter for progress

        Returns:
            PR creation confirmation with details, or an error string.
        """
        token = self._get_token(__user__)
        if not token:
            return "Error: GITEA_TOKEN not configured."

        try:
            owner, repo_name = self._resolve_repo(repo, __user__)
        except ValueError as e:
            return f"Error: {e}"

        # Get current branch from the session cache when available.
        head_branch = None
        if __chat_id__:
            head_branch = self._get_cached_data(__chat_id__, "working_branch")

        if not head_branch:
            # Try to guess from recent commits or use default.
            # NOTE(review): this falls back to the default branch, making
            # head == base; presumably _validate_branch_name rejects that
            # case below since defaults are protected - confirm.
            head_branch = self._get_branch(None, __user__)

        base_branch = self._get_branch(None, __user__)

        # Validate that head branch is not a protected branch.
        is_valid, error_msg = self._validate_branch_name(head_branch)
        if not is_valid:
            return f"❌ **Cannot Create PR**\n\n{error_msg}\n\nCreate a feature branch first using `create_feature_branch()`."

        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Creating PR...", "done": False},
                }
            )

        try:
            # Auto-populate body with issue reference if not provided.
            if not body:
                # Try to extract issue number from branch name, which is
                # expected to look like "<scope>/<issue-number>-<slug>".
                match = re.search(r"/(\d+)-", head_branch)
                if match:
                    issue_number = match.group(1)
                    body = f"Closes #{issue_number}\n\nThis PR implements the changes from issue #{issue_number}."
                else:
                    body = "Automated PR from gitea_coder workflow."

            async with httpx.AsyncClient(
                timeout=30.0, verify=self.valves.VERIFY_SSL
            ) as client:
                response = await client.post(
                    self._api_url(f"/repos/{owner}/{repo_name}/pulls"),
                    headers=self._headers(__user__),
                    json={
                        "title": title,
                        "head": head_branch,
                        "base": base_branch,
                        "body": body,
                    },
                )

                # Handle PR already exists (409) before raise_for_status so
                # it is reported as guidance rather than a raw HTTP error.
                if response.status_code == 409:
                    return f"⚠️ **PR Already Exists**\n\nA pull request for branch `{head_branch}` → `{base_branch}` already exists.\n\nCheck existing PRs and update it instead."

                response.raise_for_status()
                pr = response.json()

                pr_number = pr.get("number")
                pr_url = pr.get("html_url", "")

                if __event_emitter__:
                    await __event_emitter__(
                        {
                            "type": "status",
                            "data": {"description": "Done", "done": True, "hidden": True},
                        }
                    )

                return f"""✅ **Pull Request Created Successfully**

**PR #{pr_number}:** {title}
**Branch:** `{head_branch}` → `{base_branch}`
**URL:** {pr_url}

**Description:**
{body}

**Next Steps:**
1. Add reviewers if needed
2. Address any merge conflicts
3. Await review feedback

**To check PR status:**
```python
get_pull_request({pr_number})
```
"""

        except httpx.HTTPStatusError as e:
            error_msg = self._format_error(e, "PR creation")
            # 422 typically means a missing branch or an unmergeable pair.
            if e.response.status_code == 422:
                return "Error: Could not create PR. The branch may not exist or there may be merge conflicts."
            return f"Error: Failed to create PR. {error_msg}"
        except Exception as e:
            return f"Error: Unexpected failure during PR creation: {type(e).__name__}: {e}"
{error_msg}" + except Exception as e: + return f"Error: Unexpected failure during PR creation: {type(e).__name__}: {e}" + + async def replace_file( + self, + path: str, + content: str, + message: str, + repo: Optional[str] = None, + branch: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Update an existing file in the repository (creates commit). + + WARNING: This replaces the entire file content. For incremental changes, + use `apply_diff()` instead to prevent accidental data loss. + + Args: + path: File path to update + content: New file content as string + message: Commit message + repo: Repository in 'owner/repo' format + branch: Branch name (defaults to repository default) + __user__: User context + __event_emitter__: Event emitter for progress + + Returns: + Commit details and success confirmation + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + effective_branch = self._get_branch(branch, __user__) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": f"Updating {path}...", "done": False}, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + # Get current file SHA + get_response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + params={"ref": effective_branch}, + ) + + if get_response.status_code == 404: + return f"Error: File not found: `{path}`. Use `create_file()` to create a new file, or `apply_diff()` to add content to a new file." + + get_response.raise_for_status() + file_info = get_response.json() + sha = file_info.get("sha") + + if not sha: + return "Error: Could not retrieve file SHA for update." 
+ + # Prepare updated content + content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") + + # Update file + response = await client.put( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + json={ + "content": content_b64, + "message": message, + "branch": effective_branch, + "sha": sha, + }, + ) + response.raise_for_status() + result = response.json() + + commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + output = f"**File Updated Successfully**\n\n" + output += f"**File:** `{owner}/{repo_name}/{path}`\n" + output += f"**Branch:** `{effective_branch}`\n" + output += f"**Commit:** `{commit_sha}`\n" + output += f"**Message:** {message}\n\n" + output += "_Use `apply_diff()` for incremental changes to prevent data loss._\n" + + return output + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"file update for '{path}'") + if e.response.status_code == 409: + return f"Error: Update conflict for `{path}`. Fetch the latest version and try again." + return f"Error: Failed to update file. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during file update: {type(e).__name__}: {e}" + + async def create_file( + self, + path: str, + content: str, + message: str, + repo: Optional[str] = None, + branch: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Create a new file in the repository. + + For adding content to existing files, use `apply_diff()` instead. 
+ + Args: + path: File path to create (e.g., 'docs/README.md') + content: Initial file content as string + message: Commit message + repo: Repository in 'owner/repo' format + branch: Branch name (defaults to repository default) + __user__: User context + __event_emitter__: Event emitter for progress + + Returns: + Commit details and success confirmation + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + effective_branch = self._get_branch(branch, __user__) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": f"Creating {path}...", "done": False}, + } + ) + + try: + content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") + + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.post( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + json={ + "content": content_b64, + "message": message, + "branch": effective_branch, + }, + ) + response.raise_for_status() + result = response.json() + + commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + output = f"**File Created Successfully**\n\n" + output += f"**File:** `{owner}/{repo_name}/{path}`\n" + output += f"**Branch:** `{effective_branch}`\n" + output += f"**Commit:** `{commit_sha}`\n" + output += f"**Message:** {message}\n" + + return output + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"file creation for '{path}'") + if e.response.status_code == 422: + return f"Error: File already exists: `{path}`. Use `replace_file()` or `apply_diff()` to modify it." 
+ return f"Error: Failed to create file. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during file creation: {type(e).__name__}: {e}" + + async def get_file( + self, + path: str, + repo: Optional[str] = None, + branch: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Get the contents of a file from the repository. + + Args: + path: Full path to the file (e.g., 'src/main.py') + repo: Repository in 'owner/repo' format + branch: Branch name (defaults to repository default) + __user__: User context + __event_emitter__: Event emitter for progress + + Returns: + File content with metadata (SHA, size, branch) + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + effective_branch = self._get_branch(branch, __user__) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Reading file {path}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + params={"ref": effective_branch}, + ) + response.raise_for_status() + file_info = response.json() + + if isinstance(file_info, list): + return f"**Note:** '{path}' is a directory. Use `list_files()` to browse its contents." + + if file_info.get("type") != "file": + return f"Error: '{path}' is not a file (type: {file_info.get('type')})" + + content_b64 = file_info.get("content", "") + try: + content = base64.b64decode(content_b64).decode("utf-8") + except Exception: + return "Error: Could not decode file content. The file may be binary or corrupted." 
+ + size = file_info.get("size", 0) + sha_short = file_info.get("sha", "unknown")[:8] + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + output = f"**File:** `{owner}/{repo_name}/{path}`\n" + output += f"**Branch:** `{effective_branch}` | **SHA:** `{sha_short}` | **Size:** {size} bytes\n" + output += f"**URL:** {file_info.get('html_url', 'N/A')}\n\n" + output += f"```\n{content}\n```\n" + + return output + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"file fetch for '{path}'") + if e.response.status_code == 404: + return f"Error: File not found: `{path}`. Verify the file path and branch." + return f"Error: Failed to fetch file. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during file fetch: {type(e).__name__}: {e}" + + async def list_files( + self, + path: str = "", + repo: Optional[str] = None, + branch: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + List files and directories in a repository path. + + Args: + path: Directory path to list (default: root) + repo: Repository in 'owner/repo' format + branch: Branch name (defaults to repository default) + __user__: User context + __event_emitter__: Event emitter for progress + + Returns: + Formatted directory listing with file sizes and types + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." 
+ + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + effective_branch = self._get_branch(branch, __user__) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Listing {path or 'root'}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), + headers=self._headers(__user__), + params={"ref": effective_branch}, + ) + response.raise_for_status() + contents = response.json() + + if isinstance(contents, dict): + return f"**Note:** '{path}' is a file. Use `get_file()` to read its contents." + + output = f"**Contents of {owner}/{repo_name}/{path or '.'}** (`{effective_branch}` branch)\n\n" + + dirs = [item for item in contents if item.get("type") == "dir"] + files = [item for item in contents if item.get("type") == "file"] + + if dirs: + output += "**📁 Directories:**\n" + for item in sorted(dirs, key=lambda x: x.get("name", "").lower()): + output += f"- `📁 {item.get('name', '')}/`\n" + output += "\n" + + if files: + output += "**📄 Files:**\n" + for item in sorted(files, key=lambda x: x.get("name", "").lower()): + size = item.get("size", 0) + if size < 1024: + size_str = f"{size}B" + elif size < 1024 * 1024: + size_str = f"{size//1024}KB" + else: + size_str = f"{size//(1024*1024)}MB" + sha_short = item.get("sha", "unknown")[:8] + output += f"- `{item.get('name', '')}` ({size_str}, sha: {sha_short})\n" + + output += f"\n**Total:** {len(dirs)} directories, {len(files)} files" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + return output + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"directory listing for '{path}'") + if e.response.status_code == 404: 
+ return f"Error: Path not found: `{path}`. Verify the path exists in the repository." + return f"Error: Failed to list directory contents. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during directory listing: {type(e).__name__}: {e}" \ No newline at end of file