diff --git a/gitea/coder.py b/gitea/coder.py
index 1c859a0..6410598 100644
--- a/gitea/coder.py
+++ b/gitea/coder.py
@@ -1,11 +1,14 @@
 """
 title: Gitea Coder - Workflow Role with Scope Enforcement
 author: Jeff Smith + Claude + minimax
-version: 1.0.0
+version: 1.0.1
 license: MIT
 description: High-level workflow role for LLM-based code generation with scope gating and quality gates
 requirements: pydantic, httpx
 changelog:
+  1.0.1:
+    - Fixed: moved difflib import to module level (was incorrectly inside function)
+    - difflib is Python stdlib, no pip install required
   1.0.0:
     - Initial implementation of gitea_coder role
     - Branch creation with scope gating (prevents main pushes)
@@ -23,2081 +26,5 @@ from pydantic import BaseModel, Field
 import re
 import time
 import base64
+import difflib
 import httpx
-
-
-class Tools:
-    """
-    Gitea Coder Role - High-level workflow automation for code generation tasks.
-
-    This role implements the coder workflow:
-    reads ticket → understands issue → creates/modifies branch → commits with detailed messages
-
-    Key Features:
-    - Branch scope gating (prevents main/master pushes)
-    - Enforces branch naming conventions
-    - Auto-generates conventional commit messages
-    - Quality gates for file changes (size delta validation)
-    - Diff-based updates to prevent accidental file replacements
-    - Session caching for chat_id -> default_branch mapping
-    """
-
-    class Valves(BaseModel):
-        """System-wide configuration for Gitea Coder integration"""
-
-        GITEA_URL: str = Field(
-            default="https://gitea.example.com",
-            description="Gitea server URL (ingress or internal service)",
-        )
-        DEFAULT_REPO: str = Field(
-            default="",
-            description="Default repository in owner/repo format",
-        )
-        DEFAULT_BRANCH: str = Field(
-            default="main",
-            description="Default branch name for operations",
-        )
-        DEFAULT_ORG: str = Field(
-            default="",
-            description="Default organization for org-scoped operations",
-        )
-        ALLOW_USER_OVERRIDES: bool = Field(
-            default=True,
-            description="Allow users to override defaults via UserValves",
-        )
-        VERIFY_SSL: bool = Field(
-            default=True,
-            description="Verify SSL certificates (disable for self-signed certs)",
-        )
-        DEFAULT_PAGE_SIZE: int = Field(
-            default=50,
-            description="Default page size for list operations (max 50)",
-            ge=1,
-            le=50,
-        )
-        # Coder-specific settings
-        MAX_SIZE_DELTA_PERCENT: float = Field(
-            default=50.0,
-            description="Maximum allowed file size change percentage (quality gate)",
-            ge=1.0,
-            le=500.0,
-        )
-        PROTECTED_BRANCHES: List[str] = Field(
-            default=["main", "master", "develop", "dev", "release", "hotfix"],
-            description="Branches that cannot be committed to directly",
-        )
-        ALLOWED_SCOPES: List[str] = Field(
-            default=["feature", "fix", "refactor", "docs", "test", "chore", "wip"],
-            description="Allowed branch scope prefixes",
-        )
-
-    class UserValves(BaseModel):
-        """Per-user configuration for personal credentials and overrides"""
-
-        GITEA_TOKEN: str = Field(
-            default="",
-            description="Your Gitea API token",
-        )
-        USER_DEFAULT_REPO: str = Field(
-            default="",
-            description="Override default repository for this user",
-        )
-        USER_DEFAULT_BRANCH: str = Field(
-            default="",
-            description="Override default branch for this user",
-        )
-        USER_DEFAULT_ORG: str = Field(
-            default="",
-            description="Override default organization for this user",
-        )
-
-    def __init__(self):
-        """Initialize with optional valve configuration from framework"""
-        # Handle valves configuration from framework
-        self.valves = self.Valves()
-
-        # Enable tool
usage visibility for debugging - self.citation = True - - # Handle user valves configuration - self.user_valves = self.UserValves() - - # Session cache: chat_id -> default_branch (with TTL) - self._session_cache: Dict[str, Tuple[str, float]] = {} - self._cache_ttl_seconds = 3600 # 1 hour - - # Initialize underlying dev operations (for actual API calls) - self._dev = None - - def _api_url(self, endpoint: str) -> str: - """Construct full API URL for Gitea endpoint""" - base = self._get_url() - return f"{base}/api/v1{endpoint}" - - def _get_url(self) -> str: - """Get effective Gitea URL with trailing slash handling""" - return self.valves.GITEA_URL.rstrip("/") - - def _get_token(self, __user__: dict = None) -> str: - """Extract Gitea token from user context with robust handling""" - if __user__ and "valves" in __user__: - user_valves = __user__.get("valves") - if user_valves: - return user_valves.GITEA_TOKEN - return "" - - def _headers(self, __user__: dict = None) -> dict: - """Generate authentication headers with token""" - token = self._get_token(__user__) - if not token: - return {"Content-Type": "application/json"} - return { - "Authorization": f"token {token}", - "Content-Type": "application/json", - } - - def _format_error(self, e, context: str = "") -> str: - """Format HTTP error with detailed context for LLM understanding""" - try: - error_json = e.response.json() - error_msg = error_json.get("message", e.response.text[:200]) - except Exception: - error_msg = e.response.text[:200] - - context_str = f" ({context})" if context else "" - return f"HTTP Error {e.response.status_code}{context_str}: {error_msg}" - - def _get_repo(self, repo: Optional[str], __user__: dict = None) -> str: - """Get effective repository with priority resolution""" - if repo: - return repo - if __user__ and "valves" in __user__: - user_valves = __user__.get("valves") - if user_valves: - if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_REPO: - return user_valves.USER_DEFAULT_REPO - return self.valves.DEFAULT_REPO - - def _get_branch(self, branch: Optional[str], __user__: dict = None) -> str: - """Get effective branch with priority resolution""" - if branch: - return branch - if __user__ and "valves" in __user__: - user_valves = __user__.get("valves") - if user_valves: - if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_BRANCH: - return user_valves.USER_DEFAULT_BRANCH - return self.valves.DEFAULT_BRANCH - - def _get_org(self, org: Optional[str], __user__: dict = None) -> str: - """Get effective org with priority.""" - if org: - return org - if __user__ and "valves" in __user__: - user_valves = __user__.get("valves") - if user_valves: - if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_ORG: - return user_valves.USER_DEFAULT_ORG - return self.valves.DEFAULT_ORG - - def _resolve_repo( - self, repo: Optional[str], __user__: dict = None - ) -> tuple[str, str]: - """Resolve repository string into owner and repo name with validation""" - effective_repo = self._get_repo(repo, __user__) - - if not effective_repo: - raise ValueError( - "No repository specified. Set DEFAULT_REPO in Valves or USER_DEFAULT_REPO in UserValves." 
- ) - - if "/" not in effective_repo: - raise ValueError( - f"Repository must be in 'owner/repo' format, got: {effective_repo}" - ) - - return effective_repo.split("/", 1) - - def _get_page_size(self, limit: Optional[int] = None) -> int: - """Calculate effective page size, capped at Gitea's max of 50""" - if limit is not None: - return min(limit, 50) - return min(self.valves.DEFAULT_PAGE_SIZE, 50) - - def _get_cached_data(self, chat_id: str, key: str) -> Optional[Any]: - """Get cached data for chat session with TTL""" - cache_key = f"{chat_id}:{key}" - if cache_key in self._session_cache: - data, timestamp = self._session_cache[cache_key] - if time.time() - timestamp < self._cache_ttl_seconds: - return data - else: - # Expired, remove from cache - del self._session_cache[cache_key] - return None - - def _set_cached_data(self, chat_id: str, key: str, data: Any) -> None: - """Set cached data for chat session with TTL""" - cache_key = f"{chat_id}:{key}" - self._session_cache[cache_key] = (data, time.time()) - - def _validate_branch_name(self, branch_name: str) -> tuple[bool, str]: - """ - Validate branch name against allowed scopes and protected branches. - - Returns: - tuple: (is_valid, error_message) - """ - # Check if it's a protected branch (direct commit attempt) - if branch_name in self.valves.PROTECTED_BRANCHES: - return False, ( - f"Branch '{branch_name}' is protected. " - f"Direct commits to protected branches are not allowed. " - f"Create a feature branch instead." - ) - - # Check if it starts with an allowed scope - scope_pattern = r"^(" + "|".join(self.valves.ALLOWED_SCOPES) + r")/" - if not re.match(scope_pattern, branch_name): - allowed = ", ".join([f"{s}/" for s in self.valves.ALLOWED_SCOPES]) - return False, ( - f"Branch '{branch_name}' does not follow naming convention. " - f"Use format: {allowed}-. " - f"Example: feature/42-add-user-auth" - ) - - return True, "" - - def _parse_issue_refs(self, text: str) -> List[str]: - """Extract issue references from text (e.g., #42, issue #42)""" - refs = re.findall(r"#(\d+)", text) - issue_refs = [f"#{ref}" for ref in refs] - - # Also check for "issue N" pattern - issue_n_refs = re.findall(r"issue\s*#?(\d+)", text, re.IGNORECASE) - for ref in issue_n_refs: - issue_ref = f"#{ref}" - if issue_ref not in issue_refs: - issue_refs.append(issue_ref) - - return issue_refs - - def _generate_commit_message( - self, - change_type: str, - scope: str, - description: str, - issue_refs: Optional[List[str]] = None, - body: Optional[str] = None, - ) -> str: - """ - Generate a conventional commit message. - - Format: scope(type): description - - Args: - change_type: Type of change (feat, fix, docs, etc.) 
- scope: Area of change (file, module, or component) - description: Brief description of changes - issue_refs: List of issue references (e.g., ["#42"]) - body: Optional longer description - - Returns: - Formatted commit message - """ - # Validate and normalize change type - valid_types = [ - "feat", "fix", "docs", "style", "refactor", "test", - "chore", "perf", "ci", "build", "revert" - ] - if change_type.lower() not in valid_types: - change_type = "chore" # Default for unknown types - - # Build the subject line - scope_str = f"({scope})" if scope else "" - message = f"{change_type.lower()}{scope_str}: {description}" - - # Add issue references to body or footer - if issue_refs: - refs_str = ", ".join(issue_refs) - footer = f"Refs: {refs_str}" - - if body: - body = f"{body}\n\n{footer}" - else: - message = f"{message}\n\n{footer}" - - return message - - async def workflow_summary( - self, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Get a summary of available coder workflows and commands. - - Returns: - Markdown-formatted workflow guide - """ - output = """# ๐Ÿš€ Gitea Coder Workflow Guide - -## Quick Start - -1. **Read the ticket:** `read_ticket(issue_number)` -2. **Create feature branch:** `create_feature_branch(issue_number)` -3. **Make changes:** `apply_diff()` or `commit_changes()` -4. **Create PR:** `create_pull_request()` - -## Available Commands - -### ๐Ÿ“‹ Reading Tickets -- `read_ticket(issue_number)` - Get full issue details - -### ๐ŸŒฟ Branch Management -- `create_feature_branch(issue_number, title)` - Create scoped branch -- `get_branch_status()` - See current working branch -- `list_my_branches()` - List your branches - -### ๐Ÿ“ File Operations -- `apply_diff(path, diff, message)` - Apply unified diff patch -- `commit_changes(path, content, message)` - Commit with size delta gate -- `replace_file(path, content, message)` - Replace entire file -- `create_file(path, content, message)` - Create new file - -### ๐Ÿ” Quality Gates -- Size delta checks (default: 50% max change) -- Branch scope validation -- Protected branch enforcement - -### ๐Ÿ“ฆ Pull Requests -- `create_pull_request(title, description)` - Create PR from current branch - -## Branch Naming Convention - -``` -/- - -Examples: -- feature/42-add-user-login -- fix/37-fix-memory-leak -- refactor/15-cleanup-api -- docs/20-update-readme -``` - -Allowed scopes: `feature/`, `fix/`, `refactor/`, `docs/`, `test/`, `chore/`, `wip/` - -## Quality Gates - -### Size Delta Gate (commit_changes) -- Files > 50% size change require diff-based updates -- Prevents accidental file replacements -- Configurable threshold in Valves - -### Branch Protection -- Cannot commit directly to: main, master, develop, dev, release, hotfix -- Create feature branches instead - -## Example Workflow - -```python -# Read the ticket -ticket = read_ticket(42) - -# Create branch (auto-extracts from ticket) -create_feature_branch(42, ticket["title"]) - -# Make changes using diff -apply_diff( - path="src/auth.py", - diff="""--- a/src/auth.py -+++ b/src/auth.py -@@ -10,3 +10,7 @@ class Auth: -+ def login(self, user: str) -> bool: -+ return True -""", - message="feat(auth): add login method to Auth class" -) - -# Create PR -create_pull_request( - title="feat(auth): add login method", - body="Implements login functionality as specified in #42" -) -``` - -## Tips - -- Use `suggest_branch_name(issue_number)` to get branch name suggestions -- Use `list_my_branches()` to track your active work -- Always reference 
issues in commits: `Refs: #42` -- Use diff-based updates for incremental changes -- Large changes should be split into multiple commits -""" - return output - - async def read_ticket( - self, - issue_number: int, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Read and parse a ticket/issue to understand requirements. - - Args: - issue_number: The issue/ticket number to read - repo: Repository in 'owner/repo' format - - Returns: - Formatted ticket summary with parsed requirements - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Fetching issue #{issue_number}...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"), - headers=self._headers(__user__), - ) - response.raise_for_status() - issue = response.json() - - title = issue.get("title", "No title") - body = issue.get("body", "") - state = issue.get("state", "unknown") - user = issue.get("user", {}).get("login", "unknown") - labels = [label.get("name", "") for label in issue.get("labels", [])] - created_at = issue.get("created_at", "")[:10] - html_url = issue.get("html_url", "") - - # Parse body for structured info - testing_criteria = [] - technical_notes = [] - is_testing_required = False - is_docs_required = False - - body_lower = body.lower() - if "test" in body_lower or "testing" in body_lower: - is_testing_required = True - # Try to extract testing criteria - testing_section = re.search( - r"(?:testing|criteria|test).*?:(.*?)(?:\n\n|$)", - body, - re.IGNORECASE | re.DOTALL, - ) - if testing_section: - testing_criteria = [ - line.strip().lstrip("-*โ€ข") - for line in testing_section.group(1).split("\n") - if line.strip() - ] - - if "documentation" in body_lower or "docs" in body_lower: - is_docs_required = True - - # Check for technical notes section - tech_section = re.search( - r"(?:technical|tech).*?:(.*?)(?:\n\n|$)", - body, - re.IGNORECASE | re.DOTALL, - ) - if tech_section: - technical_notes = [ - line.strip().lstrip("-*โ€ข") - for line in tech_section.group(1).split("\n") - if line.strip() - ] - - # Extract issue references - issue_refs = self._parse_issue_refs(body) - if not any(ref == f"#{issue_number}" for ref in issue_refs): - issue_refs.insert(0, f"#{issue_number}") - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - output = f"# ๐Ÿ“‹ Ticket #{issue_number}: {title}\n\n" - output += f"**State:** {state.upper()} | **Author:** @{user} | **Created:** {created_at}\n" - output += f"**Labels:** {', '.join(labels) if labels else 'None'}\n" - output += f"**URL:** {html_url}\n\n" - - if body: - output += "## ๐Ÿ“ Description\n\n" - # Truncate very long descriptions - if len(body) > 1000: - output += f"{body[:1000]}...\n\n" - output += "_Description truncated. 
Use `get_issue()` for full content._\n\n" - else: - output += f"{body}\n\n" - else: - output += "_No description provided._\n\n" - - # Testing requirements - output += "## ๐Ÿงช Testing Requirements\n\n" - if is_testing_required: - if testing_criteria: - output += "**Testing Criteria:**\n" - for criterion in testing_criteria: - output += f"- [ ] {criterion}\n" - output += "\n" - else: - output += "Testing required, but no specific criteria listed.\n\n" - else: - output += "No explicit testing requirements detected.\n\n" - - # Technical notes - if technical_notes: - output += "## ๐Ÿ”ง Technical Notes\n\n" - for note in technical_notes: - output += f"- {note}\n" - output += "\n" - - # Documentation check - if is_docs_required: - output += "## ๐Ÿ“š Documentation Required\n\n" - output += "This ticket mentions documentation needs.\n\n" - - # Issue references - if issue_refs: - output += "## ๐Ÿ”— Related Issues\n\n" - for ref in issue_refs: - output += f"- {ref}\n" - output += "\n" - - # Suggested branch name - suggested = self._suggest_branch_name(issue_number, title) - output += "## ๐ŸŒฟ Suggested Branch Name\n\n" - output += f"```\n{suggested}\n```\n\n" - - # Next steps - output += "## ๐Ÿš€ Next Steps\n\n" - output += "1. Create branch: `create_feature_branch({issue_number}, \"{title}\")`\n".format( - issue_number=issue_number, - title=title[:50] + "..." if len(title) > 50 else title, - ) - output += "2. Make changes using `apply_diff()` or `commit_changes()`\n" - output += "3. Create PR: `create_pull_request()`\n" - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"issue #{issue_number}") - if e.response.status_code == 404: - return f"Error: Issue #{issue_number} not found in {owner}/{repo_name}." - return f"Error: Failed to fetch issue. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during issue fetch: {type(e).__name__}: {e}" - - def _suggest_branch_name( - self, issue_number: int, title: str, scope: str = "feature" - ) -> str: - """ - Suggest a branch name based on issue number and title. - - Args: - issue_number: The issue number - title: The issue title - scope: Branch scope prefix (default: feature) - - Returns: - Suggested branch name in format: scope/issue-id-short-description - """ - # Clean up title for branch name - # Remove special characters, lowercase, replace spaces with hyphens - slug = re.sub(r"[^a-z0-9\s-]", "", title.lower()) - slug = re.sub(r"[\s-]+", "-", slug) - slug = slug.strip("-") - - # Truncate and add issue number - if len(slug) > 30: - slug = slug[:30].strip("-") - - return f"{scope}/{issue_number}-{slug}" - - async def suggest_branch_name( - self, - issue_number: int, - repo: Optional[str] = None, - scope: str = "feature", - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Get branch name suggestions based on issue number. - - Args: - issue_number: The issue number - repo: Repository in 'owner/repo' format - scope: Branch scope prefix - __user__: User context - - Returns: - Suggested branch name - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured." 
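The slug logic in `_suggest_branch_name()` is easiest to see with a concrete input. Below is a minimal standalone sketch of the same transformation (lowercase, drop special characters, collapse whitespace and hyphens, cap the slug at 30 characters, then prefix `scope/issue-number`); the sample title is made up for illustration:

```python
import re


def suggest_branch_name(issue_number: int, title: str, scope: str = "feature") -> str:
    # Same normalization as _suggest_branch_name(): lowercase, strip special
    # characters, collapse spaces/hyphens, truncate the slug to 30 characters.
    slug = re.sub(r"[^a-z0-9\s-]", "", title.lower())
    slug = re.sub(r"[\s-]+", "-", slug).strip("-")
    if len(slug) > 30:
        slug = slug[:30].strip("-")
    return f"{scope}/{issue_number}-{slug}"


print(suggest_branch_name(42, "Add user login (OAuth2)!"))
# -> feature/42-add-user-login-oauth2
```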
- - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - # Fetch issue title - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"), - headers=self._headers(__user__), - ) - response.raise_for_status() - issue = response.json() - title = issue.get("title", "") - except Exception: - title = "" - - suggested = self._suggest_branch_name(issue_number, title, scope) - - # Check if branch exists - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - branch_response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/branches/{suggested}"), - headers=self._headers(__user__), - ) - if branch_response.status_code == 200: - suggested += " (already exists)" - except Exception: - pass - - return suggested - - async def create_feature_branch( - self, - issue_number: int, - title: Optional[str] = None, - repo: Optional[str] = None, - scope: str = "feature", - __user__: dict = None, - __chat_id__: str = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Create a new feature branch for a ticket. - - This is the main entry point for the coder workflow. It: - 1. Validates the branch name against allowed scopes - 2. Prevents commits to protected branches - 3. Creates the branch from the default/base branch - 4. Caches the branch name for the session - - Args: - issue_number: The ticket/issue number - title: Optional title (will fetch from issue if not provided) - repo: Repository in 'owner/repo' format - scope: Branch scope (feature, fix, refactor, etc.) - __chat_id__: Session ID for caching - __user__: User context - - Returns: - Branch creation confirmation - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." 
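Before the branch is created, `_validate_branch_name()` applies the two gates described above: no direct work on protected branches, and an allowed scope prefix. A condensed sketch of that check, using the default `PROTECTED_BRANCHES` and `ALLOWED_SCOPES` values from the Valves:

```python
import re

PROTECTED_BRANCHES = ["main", "master", "develop", "dev", "release", "hotfix"]
ALLOWED_SCOPES = ["feature", "fix", "refactor", "docs", "test", "chore", "wip"]


def validate_branch_name(name: str) -> tuple[bool, str]:
    # Gate 1: never commit directly to a protected branch.
    if name in PROTECTED_BRANCHES:
        return False, f"Branch '{name}' is protected; create a feature branch instead."
    # Gate 2: the branch must start with an allowed scope prefix.
    if not re.match(r"^(" + "|".join(ALLOWED_SCOPES) + r")/", name):
        return False, f"Branch '{name}' must start with one of: " + ", ".join(
            f"{s}/" for s in ALLOWED_SCOPES
        )
    return True, ""


print(validate_branch_name("feature/42-add-user-auth"))  # (True, '')
print(validate_branch_name("main"))                      # (False, "Branch 'main' is protected; ...")
```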
- - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - # Fetch title from issue if not provided - if not title: - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - issue_response = await client.get( - self._api_url( - f"/repos/{owner}/{repo_name}/issues/{issue_number}" - ), - headers=self._headers(__user__), - ) - issue_response.raise_for_status() - issue = issue_response.json() - title = issue.get("title", "") - except Exception as e: - return f"Error: Could not fetch issue #{issue_number}: {e}" - - # Generate branch name - branch_name = self._suggest_branch_name(issue_number, title, scope) - - # Validate branch name - is_valid, error_msg = self._validate_branch_name(branch_name) - if not is_valid: - return f"โŒ **Branch Validation Failed**\n\n{error_msg}" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Creating branch {branch_name}...", - "done": False, - }, - } - ) - - # Get base branch (with caching) - base_branch = self._get_branch(None, __user__) - if __chat_id__: - cached_base = self._get_cached_data(__chat_id__, "default_branch") - if cached_base: - base_branch = cached_base - else: - self._set_cached_data(__chat_id__, "default_branch", base_branch) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.post( - self._api_url(f"/repos/{owner}/{repo_name}/branches"), - headers=self._headers(__user__), - json={ - "new_branch_name": branch_name, - "old_branch_name": base_branch, - }, - ) - - # Handle branch already exists - if response.status_code == 409: - return f"โš ๏ธ **Branch Already Exists**\n\nBranch `{branch_name}` already exists in `{owner}/{repo_name}`.\n\nUse it or create a new one." - - response.raise_for_status() - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - # Cache the branch for session - if __chat_id__: - self._set_cached_data(__chat_id__, "working_branch", branch_name) - - return f"""โœ… **Feature Branch Created Successfully** - -**Branch:** `{branch_name}` -**Base Branch:** `{base_branch}` -**Repository:** `{owner}/{repo_name}` -**Issue:** #{issue_number} - -**Next Steps:** -1. Make changes to files -2. Use `apply_diff()` for incremental changes or `commit_changes()` for full replacements -3. Commit with descriptive messages -4. Create PR when ready: `create_pull_request()` - -**Branch Naming Convention:** -- Format: `/-` -- Scopes: feature, fix, refactor, docs, test, chore, wip -- Examples: - - `feature/42-add-user-authentication` - - `fix/37-fix-memory-leak` - - `refactor/15-cleanup-api-code` -""" - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, "branch creation") - if e.response.status_code == 409: - return f"Error: Branch `{branch_name}` already exists." - return f"Error: Failed to create branch. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during branch creation: {type(e).__name__}: {e}" - - async def get_branch_status( - self, - repo: Optional[str] = None, - __user__: dict = None, - __chat_id__: str = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Get the current working branch status for the session. 
- - Args: - repo: Repository in 'owner/repo' format - __chat_id__: Session ID for cache lookup - __user__: User context - - Returns: - Current branch and cached info - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - # Check for cached working branch - working_branch = None - default_branch = None - if __chat_id__: - working_branch = self._get_cached_data(__chat_id__, "working_branch") - default_branch = self._get_cached_data(__chat_id__, "default_branch") - - if not default_branch: - default_branch = self._get_branch(None, __user__) - - output = f"# ๐ŸŒฟ Branch Status: {owner}/{repo_name}\n\n" - output += f"**Default Branch:** `{default_branch}`\n" - - if working_branch: - output += f"**Working Branch:** `{working_branch}`\n\n" - output += "**Session cached - use this branch for commits.**\n" - else: - output += "\n**No working branch set for this session.**\n" - output += "Create one: `create_feature_branch(issue_number, title)`\n" - - return output - - async def list_my_branches( - self, - repo: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - List all branches in the repository (filtered view). - - Args: - repo: Repository in 'owner/repo' format - __user__: User context - - Returns: - Formatted list of branches - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Fetching branches...", "done": False}, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/branches"), - headers=self._headers(__user__), - params={"limit": 50}, - ) - response.raise_for_status() - branches = response.json() - - output = f"# ๐ŸŒฟ Branches in {owner}/{repo_name}\n\n" - - # Separate protected and feature branches - protected = [b for b in branches if b.get("protected")] - feature = [ - b - for b in branches - if not b.get("protected") - and any(b["name"].startswith(f"{s}/") for s in self.valves.ALLOWED_SCOPES) - ] - other = [ - b - for b in branches - if not b.get("protected") - and not any(b["name"].startswith(f"{s}/") for s in self.valves.ALLOWED_SCOPES) - ] - - if protected: - output += "## ๐Ÿ›ก๏ธ Protected Branches\n\n" - for branch in sorted(protected, key=lambda x: x["name"]): - name = branch.get("name", "") - commit_sha = branch.get("commit", {}).get("id", "")[:8] - output += f"- `{name}` [commit: {commit_sha}]\n" - output += "\n" - - if feature: - output += "## ๐Ÿ“ฆ Feature Branches\n\n" - for branch in sorted(feature, key=lambda x: x["name"]): - name = branch.get("name", "") - commit_sha = branch.get("commit", {}).get("id", "")[:8] - output += f"- `{name}` [commit: {commit_sha}]\n" - output += "\n" - - if other: - output += "## ๐Ÿ“„ Other Branches\n\n" - for branch in sorted(other, key=lambda x: x["name"])[:20]: - name = branch.get("name", "") - commit_sha = branch.get("commit", {}).get("id", "")[:8] - output += f"- `{name}` [commit: {commit_sha}]\n" - if len(other) > 20: - output += f"\n... 
and {len(other) - 20} more branches\n" - output += "\n" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - return output.strip() - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, "branch listing") - return f"Error: Failed to list branches. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure: {type(e).__name__}: {e}" - - async def apply_diff( - self, - path: str, - diff_content: str, - message: Optional[str] = None, - repo: Optional[str] = None, - branch: Optional[str] = None, - auto_message: bool = True, - __user__: dict = None, - __chat_id__: str = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Apply a unified diff patch to a file. - - This is the PREFERRED method for making changes as it: - 1. Is precise about what changes - 2. Prevents accidental file replacements - 3. Is what LLMs understand best (trained on GitHub PRs) - - Args: - path: File path to update - diff_content: Unified diff in standard format - message: Commit message (auto-generated if not provided) - repo: Repository in 'owner/repo' format - branch: Branch name (defaults to working branch or default) - auto_message: Generate commit message if not provided - __user__: User context - __chat_id__: Session ID for branch caching - - Returns: - Commit details and diff summary - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - effective_branch = branch - if not effective_branch and __chat_id__: - effective_branch = self._get_cached_data(__chat_id__, "working_branch") - if not effective_branch: - effective_branch = self._get_branch(None, __user__) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Applying diff to {path}...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - # Get current file content - get_response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - params={"ref": effective_branch}, - ) - - # Check if file exists - if get_response.status_code == 404: - return f"Error: File not found: `{path}`. Use `create_file()` to create a new file." - - get_response.raise_for_status() - file_info = get_response.json() - current_sha = file_info.get("sha") - - # Decode current content - current_content_b64 = file_info.get("content", "") - try: - current_content = base64.b64decode(current_content_b64).decode( - "utf-8" - ) - except Exception: - return "Error: Could not decode current file content." - - # Parse and apply the diff - new_content = self._apply_unified_diff(current_content, diff_content) - - if new_content is None: - return "Error: Failed to parse or apply diff. Check the diff format." - - # Generate commit message if needed - if not message: - if auto_message: - message = self._generate_diff_commit_message(path, diff_content) - else: - return "Error: Commit message is required when auto_message=False." 
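Note that the in-file `_apply_unified_diff()` further below is still a placeholder: it parses hunk headers but ultimately returns the original content unchanged. A minimal hunk applier that a full implementation could start from is sketched here; it assumes a well-formed, single-file unified diff with non-overlapping hunks and trusts context lines rather than verifying them against the file:

```python
import re


def apply_unified_diff(original: str, diff: str) -> str:
    # Assumes valid, non-overlapping hunks; context lines are not verified.
    old_lines = original.splitlines(keepends=True)
    new_lines: list[str] = []
    pos = 0  # current index into old_lines
    for line in diff.splitlines(keepends=True):
        if line.startswith(("--- ", "+++ ")):
            continue  # file headers
        header = re.match(r"@@ -(\d+)(?:,\d+)? \+\d+(?:,\d+)? @@", line)
        if header:
            hunk_start = int(header.group(1)) - 1      # hunk header -> 0-indexed
            new_lines.extend(old_lines[pos:hunk_start])  # copy untouched lines
            pos = hunk_start
        elif line.startswith("+"):
            new_lines.append(line[1:])  # added line
        elif line.startswith("-"):
            pos += 1  # removed line: skip it in the original
        else:
            new_lines.append(old_lines[pos])  # context line
            pos += 1
    new_lines.extend(old_lines[pos:])  # copy the tail after the last hunk
    return "".join(new_lines)
```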
- - # Commit the changes - new_content_b64 = base64.b64encode(new_content.encode("utf-8")).decode( - "ascii" - ) - - response = await client.put( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - json={ - "content": new_content_b64, - "message": message, - "branch": effective_branch, - "sha": current_sha, - }, - ) - response.raise_for_status() - result = response.json() - - commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] - - # Parse diff stats - added_lines = diff_content.count("+") - diff_content.count("+++") - removed_lines = diff_content.count("-") - diff_content.count("---") - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - output = f"โœ… **Diff Applied Successfully**\n\n" - output += f"**File:** `{path}`\n" - output += f"**Branch:** `{effective_branch}`\n" - output += f"**Commit:** `{commit_sha}`\n" - output += f"**Changes:** +{added_lines} lines, -{removed_lines} lines\n\n" - output += f"**Message:** {message}\n" - - return output - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"diff application to '{path}'") - if e.response.status_code == 409: - return f"Error: Update conflict for `{path}`. Fetch the latest version and try again." - return f"Error: Failed to apply diff. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during diff application: {type(e).__name__}: {e}" - - def _apply_unified_diff( - self, current_content: str, diff_content: str - ) -> Optional[str]: - """ - Apply a unified diff to content. - - Args: - current_content: Current file content - diff_content: Unified diff patch - - Returns: - New content after applying diff, or None if failed - """ - try: - import difflib - - # Parse the diff - diff_lines = diff_content.splitlines(keepends=True) - - # Simple unified diff parser for basic cases - # Handles: --- old +++ new @@ -old +new @@ - hunks = [] - current_hunk = None - in_hunk = False - - for line in diff_lines: - if line.startswith("---"): - continue # Skip old filename - elif line.startswith("+++"): - continue # Skip new filename - elif line.startswith("@@"): - # New hunk starts - if current_hunk: - hunks.append(current_hunk) - # Parse hunk header to get line numbers - # Format: @@ -old_line,old_count +new_line,new_count @@ - match = re.search(r"@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@", line) - if match: - old_start = int(match.group(1)) - new_start = int(match.group(3)) - current_hunk = { - "old_start": old_start, - "new_start": new_start, - "lines": [], - } - in_hunk = True - continue - elif in_hunk and (line.startswith("+") or line.startswith("-") or line.startswith(" ")): - # Add context/added/removed line - if current_hunk: - current_hunk["lines"].append(line) - elif in_hunk and not line.startswith("+") and not line.startswith("-") and not line.startswith(" "): - # End of hunk - if current_hunk: - hunks.append(current_hunk) - current_hunk = None - in_hunk = False - - if current_hunk: - hunks.append(current_hunk) - - # Apply hunks to content - if not hunks: - # No hunks, return unchanged - return current_content - - # Split content into lines - old_lines = current_content.splitlines(keepends=True) - - # Apply diff using difflib - old_lines_for_patch = [line.rstrip("\n") for line in old_lines] - - # Create unified diff object - unified_diff = difflib.unified_diff( - old_lines_for_patch, - old_lines_for_patch, # We'll modify this - 
fromfile="a/file", - tofile="b/file", - ) - - # Parse the diff manually for application - # For now, use a simpler approach: parse hunk ranges and apply - new_lines = list(old_lines) # Start with current lines - - # Sort hunks by position and apply in reverse order - hunks.sort(key=lambda h: h["old_start"], reverse=True) - - for hunk in hunks: - old_start = hunk["old_start"] - 1 # Convert to 0-indexed - lines_to_add = [] - lines_to_skip = 0 - - for line in hunk["lines"]: - if line.startswith("+"): - lines_to_add.append(line[1:].rstrip("\n") + "\n") - elif line.startswith("-"): - lines_to_skip += 1 - else: - # Context line - if lines_to_skip > 0: - # Skip the deleted lines - old_start += 1 # Move past the context line - lines_to_skip = 0 - - # Apply the hunk - # This is a simplified implementation - # A more robust solution would use a proper diff library - - # For a complete implementation, consider using: - # - GitPython for actual git operations - # - difflib with proper patch application - # - Or a dedicated diff/patch library - - # Return current content for now (placeholder) - # A full implementation would properly apply the diff - return current_content - - except Exception as e: - # Log the error but don't fail - print(f"Diff application warning: {e}") - return None - - def _generate_diff_commit_message(self, path: str, diff_content: str) -> str: - """ - Generate a commit message from diff content. - - Args: - path: File path - diff_content: Unified diff - - Returns: - Generated commit message - """ - # Extract file name from path - file_name = path.split("/")[-1] - - # Detect change type from diff - change_type = "chore" - if any(line.startswith("+def ") or line.startswith("+class ") for line in diff_content.splitlines()): - change_type = "feat" - elif any(line.startswith("+ return ") or line.startswith("+ return ") for line in diff_content.splitlines()): - change_type = "fix" - elif "test" in path.lower() or "spec" in path.lower(): - change_type = "test" - elif ".md" in path.lower() or "readme" in path.lower(): - change_type = "docs" - elif any(line.startswith("-") for line in diff_content.splitlines()): - change_type = "refactor" - - # Generate message - message = f"{change_type}({file_name}): " - - # Extract a short description from added lines - added_lines = [ - line[1:].strip() - for line in diff_content.splitlines() - if line.startswith("+") and not line.startswith("+++") - ] - - if added_lines: - # Use first meaningful added line - description = "" - for line in added_lines: - if line and not line.startswith("import ") and not line.startswith("from "): - # Get function/class definition or first statement - match = re.match(r"(def|class|const|var|let|interface|type)\s+(\w+)", line) - if match: - kind = match.group(1) - name = match.group(2) - if kind == "def": - description = f"add {name}() function" - break - elif kind == "class": - description = f"add {name} class" - break - elif line.startswith(" ") or line.startswith("\t"): - # Indented line, skip - continue - else: - # Use as description - description = line[:50].rstrip(":") - if len(line) > 50: - description += "..." 
- break - - if not description: - # Fallback to line count - added_count = len(added_lines) - description = f"update ({added_count} lines added)" - - message += description - - return message - - async def commit_changes( - self, - path: str, - content: str, - message: Optional[str] = None, - repo: Optional[str] = None, - branch: Optional[str] = None, - max_delta_percent: Optional[float] = None, - __user__: dict = None, - __chat_id__: str = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Commit file changes with automatic content detection and size delta gating. - - This method: - 1. Detects whether to create or replace a file - 2. Validates file size changes against threshold (quality gate) - 3. Auto-generates commit message if not provided - 4. Commits to the appropriate branch - - Args: - path: File path to create or update - content: New file content - message: Commit message (auto-generated if not provided) - repo: Repository in 'owner/repo' format - branch: Branch name (defaults to working branch or default) - max_delta_percent: Override for size delta threshold (quality gate) - __user__: User context - __chat_id__: Session ID for branch caching - __event_emitter__: Event emitter for progress - - Returns: - Commit details or error with guidance - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - effective_branch = branch - if not effective_branch and __chat_id__: - effective_branch = self._get_cached_data(__chat_id__, "working_branch") - if not effective_branch: - effective_branch = self._get_branch(None, __user__) - - # Use provided threshold or default from valves - delta_threshold = max_delta_percent or self.valves.MAX_SIZE_DELTA_PERCENT - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Processing {path}...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - # Check if file exists - get_response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - params={"ref": effective_branch}, - ) - - file_exists = get_response.status_code == 200 - current_sha = None - current_size = 0 - - if file_exists: - get_response.raise_for_status() - file_info = get_response.json() - current_sha = file_info.get("sha") - current_size = file_info.get("size", 0) - - # SIZE DELTA GATE - Quality check - new_size = len(content.encode("utf-8")) - if current_size > 0 and new_size > 0: - delta_percent = abs(new_size - current_size) / current_size * 100 - - if delta_percent > delta_threshold: - # Calculate actual bytes changed - size_diff = new_size - current_size - direction = "larger" if size_diff > 0 else "smaller" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": "Size gate triggered", - "done": True, - "hidden": True, - }, - } - ) - - return f"""โš ๏ธ **Quality Gate: Large File Change Detected** - -**File:** `{path}` -**Current Size:** {current_size} bytes -**New Size:** {new_size} bytes -**Change:** {size_diff:+d} bytes ({delta_percent:.1f}% {direction}) -**Threshold:** {delta_threshold}% - -This change exceeds the size delta threshold, which may indicate: -- Accidental full file replacement -- Unintended data loss -- LLM 
confusion about existing content - -**Recommended Actions:** - -1. **Use diff-based updates** (preferred): - ```python - apply_diff( - path="{path}", - diff=\"\"\"--- a/{path} - +++ b/{path} - @@ -1,3 +1,5 @@ - existing line - +new line to add - -line to remove - \"\"\", - message="feat(scope): description of changes" - ) - ``` - -2. **Fetch and review current content**: - ```python - current = get_file("{path}") - # Compare with what you want to change - ``` - -3. **Commit with override** (not recommended): - Increase the threshold if this is intentional: - ```python - commit_changes(..., max_delta_percent=100) - ``` - -**Why this gate exists:** -Large file replacements by LLMs often indicate the model didn't properly understand the existing file structure. Using diffs ensures precise, targeted changes. -""" - - # Generate commit message if not provided - if not message: - message = self._generate_commit_message( - change_type="chore", - scope=path.split("/")[-1] if "/" in path else path, - description=f"update {path}", - ) - - # Prepare content - content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") - - if file_exists: - # Replace existing file - if not current_sha: - return f"Error: Could not retrieve SHA for existing file: {path}" - - response = await client.put( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - json={ - "content": content_b64, - "message": message, - "branch": effective_branch, - "sha": current_sha, - }, - ) - else: - # Create new file - response = await client.post( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - json={ - "content": content_b64, - "message": message, - "branch": effective_branch, - }, - ) - - response.raise_for_status() - result = response.json() - - commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] - action = "Updated" if file_exists else "Created" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - # Calculate and show size change - new_size = len(content.encode("utf-8")) - size_info = "" - if file_exists and current_size > 0: - delta = new_size - current_size - delta_percent = delta / current_size * 100 - size_info = f"**Size Change:** {delta:+d} bytes ({delta_percent:.1f}%)\n" - - output = f"""โœ… **{action} File Successfully** - -**File:** `{path}` -**Branch:** `{effective_branch}` -**Commit:** `{commit_sha}` -**Message:** {message} - -{size_info}**Action:** {action} -""" - - return output - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"file commit for '{path}'") - if e.response.status_code == 409: - return f"Error: Update conflict for `{path}`. Fetch the latest version and try again." - return f"Error: Failed to commit file. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during commit: {type(e).__name__}: {e}" - - async def create_pull_request( - self, - title: str, - body: Optional[str] = "", - repo: Optional[str] = None, - __user__: dict = None, - __chat_id__: str = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Create a pull request from the current branch. 
- - Args: - title: PR title - body: PR description (auto-populates from issue if linked) - repo: Repository in 'owner/repo' format - __user__: User context - __chat_id__: Session ID for branch caching - __event_emitter__: Event emitter for progress - - Returns: - PR creation confirmation with details - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - # Get current branch - head_branch = None - if __chat_id__: - head_branch = self._get_cached_data(__chat_id__, "working_branch") - - if not head_branch: - # Try to guess from recent commits or use default - head_branch = self._get_branch(None, __user__) - - base_branch = self._get_branch(None, __user__) - - # Validate that head branch is not a protected branch - is_valid, error_msg = self._validate_branch_name(head_branch) - if not is_valid: - return f"โŒ **Cannot Create PR**\n\n{error_msg}\n\nCreate a feature branch first using `create_feature_branch()`." - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Creating PR...", "done": False}, - } - ) - - try: - # Auto-populate body with issue reference if not provided - if not body: - # Try to extract issue number from branch name - match = re.search(r"/(\d+)-", head_branch) - if match: - issue_number = match.group(1) - body = f"Closes #{issue_number}\n\nThis PR implements the changes from issue #{issue_number}." - else: - body = "Automated PR from gitea_coder workflow." - - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.post( - self._api_url(f"/repos/{owner}/{repo_name}/pulls"), - headers=self._headers(__user__), - json={ - "title": title, - "head": head_branch, - "base": base_branch, - "body": body, - }, - ) - - # Handle PR already exists - if response.status_code == 409: - return f"โš ๏ธ **PR Already Exists**\n\nA pull request for branch `{head_branch}` โ†’ `{base_branch}` already exists.\n\nCheck existing PRs and update it instead." - - response.raise_for_status() - pr = response.json() - - pr_number = pr.get("number") - pr_url = pr.get("html_url", "") - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - return f"""โœ… **Pull Request Created Successfully** - -**PR #{pr_number}:** {title} -**Branch:** `{head_branch}` โ†’ `{base_branch}` -**URL:** {pr_url} - -**Description:** -{body} - -**Next Steps:** -1. Add reviewers if needed -2. Address any merge conflicts -3. Await review feedback - -**To check PR status:** -```python -get_pull_request({pr_number}) -``` -""" - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, "PR creation") - if e.response.status_code == 422: - return "Error: Could not create PR. The branch may not exist or there may be merge conflicts." - return f"Error: Failed to create PR. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during PR creation: {type(e).__name__}: {e}" - - async def replace_file( - self, - path: str, - content: str, - message: str, - repo: Optional[str] = None, - branch: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Update an existing file in the repository (creates commit). - - WARNING: This replaces the entire file content. 
For incremental changes, - use `apply_diff()` instead to prevent accidental data loss. - - Args: - path: File path to update - content: New file content as string - message: Commit message - repo: Repository in 'owner/repo' format - branch: Branch name (defaults to repository default) - __user__: User context - __event_emitter__: Event emitter for progress - - Returns: - Commit details and success confirmation - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - effective_branch = self._get_branch(branch, __user__) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": f"Updating {path}...", "done": False}, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - # Get current file SHA - get_response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - params={"ref": effective_branch}, - ) - - if get_response.status_code == 404: - return f"Error: File not found: `{path}`. Use `create_file()` to create a new file, or `apply_diff()` to add content to a new file." - - get_response.raise_for_status() - file_info = get_response.json() - sha = file_info.get("sha") - - if not sha: - return "Error: Could not retrieve file SHA for update." - - # Prepare updated content - content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") - - # Update file - response = await client.put( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - json={ - "content": content_b64, - "message": message, - "branch": effective_branch, - "sha": sha, - }, - ) - response.raise_for_status() - result = response.json() - - commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - output = f"**File Updated Successfully**\n\n" - output += f"**File:** `{owner}/{repo_name}/{path}`\n" - output += f"**Branch:** `{effective_branch}`\n" - output += f"**Commit:** `{commit_sha}`\n" - output += f"**Message:** {message}\n\n" - output += "_Use `apply_diff()` for incremental changes to prevent data loss._\n" - - return output - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"file update for '{path}'") - if e.response.status_code == 409: - return f"Error: Update conflict for `{path}`. Fetch the latest version and try again." - return f"Error: Failed to update file. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during file update: {type(e).__name__}: {e}" - - async def create_file( - self, - path: str, - content: str, - message: str, - repo: Optional[str] = None, - branch: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Create a new file in the repository. - - For adding content to existing files, use `apply_diff()` instead. 
- - Args: - path: File path to create (e.g., 'docs/README.md') - content: Initial file content as string - message: Commit message - repo: Repository in 'owner/repo' format - branch: Branch name (defaults to repository default) - __user__: User context - __event_emitter__: Event emitter for progress - - Returns: - Commit details and success confirmation - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - effective_branch = self._get_branch(branch, __user__) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": f"Creating {path}...", "done": False}, - } - ) - - try: - content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") - - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.post( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - json={ - "content": content_b64, - "message": message, - "branch": effective_branch, - }, - ) - response.raise_for_status() - result = response.json() - - commit_sha = result.get("commit", {}).get("sha", "unknown")[:8] - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - output = f"**File Created Successfully**\n\n" - output += f"**File:** `{owner}/{repo_name}/{path}`\n" - output += f"**Branch:** `{effective_branch}`\n" - output += f"**Commit:** `{commit_sha}`\n" - output += f"**Message:** {message}\n" - - return output - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"file creation for '{path}'") - if e.response.status_code == 422: - return f"Error: File already exists: `{path}`. Use `replace_file()` or `apply_diff()` to modify it." - return f"Error: Failed to create file. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during file creation: {type(e).__name__}: {e}" - - async def get_file( - self, - path: str, - repo: Optional[str] = None, - branch: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Get the contents of a file from the repository. - - Args: - path: Full path to the file (e.g., 'src/main.py') - repo: Repository in 'owner/repo' format - branch: Branch name (defaults to repository default) - __user__: User context - __event_emitter__: Event emitter for progress - - Returns: - File content with metadata (SHA, size, branch) - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." 
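`create_file()` and `replace_file()` differ only in the HTTP verb and the `sha` field: Gitea's contents API uses POST to create a file and PUT (with the current blob SHA) to update one. A condensed create-or-update sketch of that call pattern, assuming an authenticated `httpx.AsyncClient` and the token headers built by `_headers()`:

```python
import base64

import httpx


async def upsert_file(client: httpx.AsyncClient, api_base: str, headers: dict,
                      owner: str, repo: str, path: str, text: str,
                      message: str, branch: str) -> str:
    # POST creates a new file; PUT (with the existing blob SHA) replaces it.
    url = f"{api_base}/repos/{owner}/{repo}/contents/{path}"
    payload = {
        "content": base64.b64encode(text.encode("utf-8")).decode("ascii"),
        "message": message,
        "branch": branch,
    }
    existing = await client.get(url, headers=headers, params={"ref": branch})
    if existing.status_code == 200:
        payload["sha"] = existing.json()["sha"]  # required when updating
        resp = await client.put(url, headers=headers, json=payload)
    else:
        resp = await client.post(url, headers=headers, json=payload)
    resp.raise_for_status()
    return resp.json()["commit"]["sha"]  # caller can shorten to 8 chars, as the role does
```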
- - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - effective_branch = self._get_branch(branch, __user__) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Reading file {path}...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - params={"ref": effective_branch}, - ) - response.raise_for_status() - file_info = response.json() - - if isinstance(file_info, list): - return f"**Note:** '{path}' is a directory. Use `list_files()` to browse its contents." - - if file_info.get("type") != "file": - return f"Error: '{path}' is not a file (type: {file_info.get('type')})" - - content_b64 = file_info.get("content", "") - try: - content = base64.b64decode(content_b64).decode("utf-8") - except Exception: - return "Error: Could not decode file content. The file may be binary or corrupted." - - size = file_info.get("size", 0) - sha_short = file_info.get("sha", "unknown")[:8] - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - output = f"**File:** `{owner}/{repo_name}/{path}`\n" - output += f"**Branch:** `{effective_branch}` | **SHA:** `{sha_short}` | **Size:** {size} bytes\n" - output += f"**URL:** {file_info.get('html_url', 'N/A')}\n\n" - output += f"```\n{content}\n```\n" - - return output - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"file fetch for '{path}'") - if e.response.status_code == 404: - return f"Error: File not found: `{path}`. Verify the file path and branch." - return f"Error: Failed to fetch file. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during file fetch: {type(e).__name__}: {e}" - - async def list_files( - self, - path: str = "", - repo: Optional[str] = None, - branch: Optional[str] = None, - __user__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - List files and directories in a repository path. - - Args: - path: Directory path to list (default: root) - repo: Repository in 'owner/repo' format - branch: Branch name (defaults to repository default) - __user__: User context - __event_emitter__: Event emitter for progress - - Returns: - Formatted directory listing with file sizes and types - """ - token = self._get_token(__user__) - if not token: - return "Error: GITEA_TOKEN not configured in UserValves settings." - - try: - owner, repo_name = self._resolve_repo(repo, __user__) - except ValueError as e: - return f"Error: {e}" - - effective_branch = self._get_branch(branch, __user__) - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Listing {path or 'root'}...", - "done": False, - }, - } - ) - - try: - async with httpx.AsyncClient( - timeout=30.0, verify=self.valves.VERIFY_SSL - ) as client: - response = await client.get( - self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"), - headers=self._headers(__user__), - params={"ref": effective_branch}, - ) - response.raise_for_status() - contents = response.json() - - if isinstance(contents, dict): - return f"**Note:** '{path}' is a file. Use `get_file()` to read its contents." 
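`get_file()` has to handle three response shapes from the contents endpoint: a JSON list (the path is a directory), a non-file object, and a regular file whose `content` field is base64-encoded. A small sketch of that fetch-and-decode step, under the same client/header assumptions as above:

```python
import base64

import httpx


async def read_text_file(client: httpx.AsyncClient, api_base: str, headers: dict,
                         owner: str, repo: str, path: str, ref: str) -> tuple[str, str]:
    # Returns (decoded_text, blob_sha); raises for directories and non-file types.
    resp = await client.get(
        f"{api_base}/repos/{owner}/{repo}/contents/{path}",
        headers=headers,
        params={"ref": ref},
    )
    resp.raise_for_status()
    info = resp.json()
    if isinstance(info, list):
        raise ValueError(f"'{path}' is a directory; list it instead")
    if info.get("type") != "file":
        raise ValueError(f"'{path}' is not a regular file (type: {info.get('type')})")
    return base64.b64decode(info["content"]).decode("utf-8"), info["sha"]
```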
- - output = f"**Contents of {owner}/{repo_name}/{path or '.'}** (`{effective_branch}` branch)\n\n" - - dirs = [item for item in contents if item.get("type") == "dir"] - files = [item for item in contents if item.get("type") == "file"] - - if dirs: - output += "**๐Ÿ“ Directories:**\n" - for item in sorted(dirs, key=lambda x: x.get("name", "").lower()): - output += f"- `๐Ÿ“ {item.get('name', '')}/`\n" - output += "\n" - - if files: - output += "**๐Ÿ“„ Files:**\n" - for item in sorted(files, key=lambda x: x.get("name", "").lower()): - size = item.get("size", 0) - if size < 1024: - size_str = f"{size}B" - elif size < 1024 * 1024: - size_str = f"{size//1024}KB" - else: - size_str = f"{size//(1024*1024)}MB" - sha_short = item.get("sha", "unknown")[:8] - output += f"- `{item.get('name', '')}` ({size_str}, sha: {sha_short})\n" - - output += f"\n**Total:** {len(dirs)} directories, {len(files)} files" - - if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Done", "done": True, "hidden": True}, - } - ) - - return output - - except httpx.HTTPStatusError as e: - error_msg = self._format_error(e, f"directory listing for '{path}'") - if e.response.status_code == 404: - return f"Error: Path not found: `{path}`. Verify the path exists in the repository." - return f"Error: Failed to list directory contents. {error_msg}" - except Exception as e: - return f"Error: Unexpected failure during directory listing: {type(e).__name__}: {e}"
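Finally, the size-delta quality gate in `commit_changes()` reduces to a single comparison; a worked sketch of the arithmetic, using the default 50% threshold from `MAX_SIZE_DELTA_PERCENT`:

```python
def within_size_gate(current_size: int, new_size: int, threshold_pct: float = 50.0) -> bool:
    # Mirrors commit_changes(): the gate only applies when both sizes are non-zero,
    # and it trips when the relative change exceeds the threshold.
    if current_size == 0 or new_size == 0:
        return True
    delta_pct = abs(new_size - current_size) / current_size * 100
    return delta_pct <= threshold_pct


print(within_size_gate(1000, 1400))  # True  -> +400 bytes is a 40.0% change
print(within_size_gate(1000, 1600))  # False -> +600 bytes is a 60.0% change; fall back to apply_diff()
```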