Files
tools/gitea/coder.py
xcaliber 20c86cd595 fix(gitea): restore full gitea_coder role implementation with all methods
The file was corrupted to only 4KB during previous edits. This restores the complete implementation with all methods including:
- workflow_summary()
- read_ticket()
- suggest_branch_name()
- create_feature_branch()
- get_branch_status()
- list_my_branches()
- apply_diff()
- _apply_unified_diff()
- _generate_diff_commit_message()
- commit_changes()
- create_pull_request()
- replace_file()
- create_file()
- get_file()
- list_files()

All with proper difflib import at module level.

Refs: #11
2026-01-17 15:02:13 +00:00

2115 lines
77 KiB
Python

"""
title: Gitea Coder - Workflow Role with Scope Enforcement
author: Jeff Smith + Claude + minimax
version: 1.0.0
license: MIT
description: High-level workflow role for LLM-based code generation with scope gating and quality gates
requirements: pydantic, httpx
changelog:
1.0.0:
- Initial implementation of gitea_coder role
- Branch creation with scope gating (prevents main pushes)
- Enforces branch naming conventions (feature/, fix/, refactor/, etc.)
- Generates detailed commit messages with ticket references
- Creates PRs from branches
- Reads ticket requirements from issues
- Unified file operations workflow
- Added diff-based updates with apply_diff()
- Added size delta gating in commit_changes() for quality control
"""
from typing import Optional, Callable, Any, Dict, List, Tuple
from pydantic import BaseModel, Field
import re
import time
import base64
import difflib
import httpx
class Tools:
    """
    Gitea Coder Role - High-level workflow automation for code generation tasks.

    This role implements the coder workflow:
    reads ticket → understands issue → creates/modifies branch → commits with detailed messages

    Key Features:
    - Branch scope gating (prevents main/master pushes)
    - Enforces branch naming conventions
    - Auto-generates conventional commit messages
    - Quality gates for file changes (size delta validation)
    - Diff-based updates to prevent accidental file replacements
    - Session caching for chat_id → default_branch mapping
    """
class Valves(BaseModel):
    """System-wide configuration for Gitea Coder integration"""

    # --- Connection and repository defaults ---
    GITEA_URL: str = Field(
        default="https://gitea.example.com",
        description="Gitea server URL (ingress or internal service)",
    )
    DEFAULT_REPO: str = Field(
        default="",
        description="Default repository in owner/repo format",
    )
    DEFAULT_BRANCH: str = Field(
        default="main",
        description="Default branch name for operations",
    )
    DEFAULT_ORG: str = Field(
        default="",
        description="Default organization for org-scoped operations",
    )
    ALLOW_USER_OVERRIDES: bool = Field(
        default=True,
        description="Allow users to override defaults via UserValves",
    )
    VERIFY_SSL: bool = Field(
        default=True,
        description="Verify SSL certificates (disable for self-signed certs)",
    )
    # Gitea's API caps page size at 50, hence le=50.
    DEFAULT_PAGE_SIZE: int = Field(
        default=50,
        description="Default page size for list operations (max 50)",
        ge=1,
        le=50,
    )
    # Coder-specific settings
    MAX_SIZE_DELTA_PERCENT: float = Field(
        default=50.0,
        description="Maximum allowed file size change percentage (quality gate)",
        ge=1.0,
        le=500.0,
    )
    # Branch names matched exactly against these are rejected for direct commits.
    PROTECTED_BRANCHES: List[str] = Field(
        default=["main", "master", "develop", "dev", "release", "hotfix"],
        description="Branches that cannot be committed to directly",
    )
    # New branches must start with one of these prefixes followed by "/".
    ALLOWED_SCOPES: List[str] = Field(
        default=["feature", "fix", "refactor", "docs", "test", "chore", "wip"],
        description="Allowed branch scope prefixes",
    )
class UserValves(BaseModel):
    """Per-user configuration for personal credentials and overrides"""

    # Personal API credential; required for every API-backed operation.
    GITEA_TOKEN: str = Field(
        default="",
        description="Your Gitea API token",
    )
    # The USER_* overrides only take effect when Valves.ALLOW_USER_OVERRIDES is True.
    USER_DEFAULT_REPO: str = Field(
        default="",
        description="Override default repository for this user",
    )
    USER_DEFAULT_BRANCH: str = Field(
        default="",
        description="Override default branch for this user",
    )
    USER_DEFAULT_ORG: str = Field(
        default="",
        description="Override default organization for this user",
    )
def __init__(self):
    """Initialize with optional valve configuration from framework"""
    # System-level and per-user settings objects.
    self.valves = self.Valves()
    self.user_valves = self.UserValves()
    # Surface tool invocations in the chat UI for debugging.
    self.citation = True
    # chat_id-scoped cache mapping "chat:key" -> (value, stored_at).
    self._session_cache: Dict[str, Tuple[str, float]] = {}
    self._cache_ttl_seconds = 60 * 60  # entries expire after one hour
    # Lazily-initialized low-level dev-operations backend.
    self._dev = None
def _api_url(self, endpoint: str) -> str:
"""Construct full API URL for Gitea endpoint"""
base = self._get_url()
return f"{base}/api/v1{endpoint}"
def _get_url(self) -> str:
"""Get effective Gitea URL with trailing slash handling"""
return self.valves.GITEA_URL.rstrip("/")
def _get_token(self, __user__: dict = None) -> str:
"""Extract Gitea token from user context with robust handling"""
if __user__ and "valves" in __user__:
user_valves = __user__.get("valves")
if user_valves:
return user_valves.GITEA_TOKEN
return ""
def _headers(self, __user__: dict = None) -> dict:
"""Generate authentication headers with token"""
token = self._get_token(__user__)
if not token:
return {"Content-Type": "application/json"}
return {
"Authorization": f"token {token}",
"Content-Type": "application/json",
}
def _format_error(self, e, context: str = "") -> str:
"""Format HTTP error with detailed context for LLM understanding"""
try:
error_json = e.response.json()
error_msg = error_json.get("message", e.response.text[:200])
except Exception:
error_msg = e.response.text[:200]
context_str = f" ({context})" if context else ""
return f"HTTP Error {e.response.status_code}{context_str}: {error_msg}"
def _get_repo(self, repo: Optional[str], __user__: dict = None) -> str:
"""Get effective repository with priority resolution"""
if repo:
return repo
if __user__ and "valves" in __user__:
user_valves = __user__.get("valves")
if user_valves:
if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_REPO:
return user_valves.USER_DEFAULT_REPO
return self.valves.DEFAULT_REPO
def _get_branch(self, branch: Optional[str], __user__: dict = None) -> str:
"""Get effective branch with priority resolution"""
if branch:
return branch
if __user__ and "valves" in __user__:
user_valves = __user__.get("valves")
if user_valves:
if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_BRANCH:
return user_valves.USER_DEFAULT_BRANCH
return self.valves.DEFAULT_BRANCH
def _get_org(self, org: Optional[str], __user__: dict = None) -> str:
"""Get effective org with priority."""
if org:
return org
if __user__ and "valves" in __user__:
user_valves = __user__.get("valves")
if user_valves:
if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_ORG:
return user_valves.USER_DEFAULT_ORG
return self.valves.DEFAULT_ORG
def _resolve_repo(
self, repo: Optional[str], __user__: dict = None
) -> tuple[str, str]:
"""Resolve repository string into owner and repo name with validation"""
effective_repo = self._get_repo(repo, __user__)
if not effective_repo:
raise ValueError(
"No repository specified. Set DEFAULT_REPO in Valves or USER_DEFAULT_REPO in UserValves."
)
if "/" not in effective_repo:
raise ValueError(
f"Repository must be in 'owner/repo' format, got: {effective_repo}"
)
return effective_repo.split("/", 1)
def _get_page_size(self, limit: Optional[int] = None) -> int:
"""Calculate effective page size, capped at Gitea's max of 50"""
if limit is not None:
return min(limit, 50)
return min(self.valves.DEFAULT_PAGE_SIZE, 50)
def _get_cached_data(self, chat_id: str, key: str) -> Optional[Any]:
"""Get cached data for chat session with TTL"""
cache_key = f"{chat_id}:{key}"
if cache_key in self._session_cache:
data, timestamp = self._session_cache[cache_key]
if time.time() - timestamp < self._cache_ttl_seconds:
return data
else:
# Expired, remove from cache
del self._session_cache[cache_key]
return None
def _set_cached_data(self, chat_id: str, key: str, data: Any) -> None:
"""Set cached data for chat session with TTL"""
cache_key = f"{chat_id}:{key}"
self._session_cache[cache_key] = (data, time.time())
def _validate_branch_name(self, branch_name: str) -> tuple[bool, str]:
"""
Validate branch name against allowed scopes and protected branches.
Returns:
tuple: (is_valid, error_message)
"""
# Check if it's a protected branch (direct commit attempt)
if branch_name in self.valves.PROTECTED_BRANCHES:
return False, (
f"Branch '{branch_name}' is protected. "
f"Direct commits to protected branches are not allowed. "
f"Create a feature branch instead."
)
# Check if it starts with an allowed scope
scope_pattern = r"^(" + "|".join(self.valves.ALLOWED_SCOPES) + r")/"
if not re.match(scope_pattern, branch_name):
allowed = ", ".join([f"{s}/" for s in self.valves.ALLOWED_SCOPES])
return False, (
f"Branch '{branch_name}' does not follow naming convention. "
f"Use format: {allowed}<issue-id>-<description>. "
f"Example: feature/42-add-user-auth"
)
return True, ""
def _parse_issue_refs(self, text: str) -> List[str]:
"""Extract issue references from text (e.g., #42, issue #42)"""
refs = re.findall(r"#(\d+)", text)
issue_refs = [f"#{ref}" for ref in refs]
# Also check for "issue N" pattern
issue_n_refs = re.findall(r"issue\s*#?(\d+)", text, re.IGNORECASE)
for ref in issue_n_refs:
issue_ref = f"#{ref}"
if issue_ref not in issue_refs:
issue_refs.append(issue_ref)
return issue_refs
def _generate_commit_message(
self,
change_type: str,
scope: str,
description: str,
issue_refs: Optional[List[str]] = None,
body: Optional[str] = None,
) -> str:
"""
Generate a conventional commit message.
Format: scope(type): description
Args:
change_type: Type of change (feat, fix, docs, etc.)
scope: Area of change (file, module, or component)
description: Brief description of changes
issue_refs: List of issue references (e.g., ["#42"])
body: Optional longer description
Returns:
Formatted commit message
"""
# Validate and normalize change type
valid_types = [
"feat", "fix", "docs", "style", "refactor", "test",
"chore", "perf", "ci", "build", "revert"
]
if change_type.lower() not in valid_types:
change_type = "chore" # Default for unknown types
# Build the subject line
scope_str = f"({scope})" if scope else ""
message = f"{change_type.lower()}{scope_str}: {description}"
# Add issue references to body or footer
if issue_refs:
refs_str = ", ".join(issue_refs)
footer = f"Refs: {refs_str}"
if body:
body = f"{body}\n\n{footer}"
else:
message = f"{message}\n\n{footer}"
return message
async def workflow_summary(
    self,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Get a summary of available coder workflows and commands.

    Static help text; no API calls or credentials are needed here.

    Returns:
        Markdown-formatted workflow guide
    """
    # One large Markdown literal returned verbatim to the chat.
    output = """# 🚀 Gitea Coder Workflow Guide
## Quick Start
1. **Read the ticket:** `read_ticket(issue_number)`
2. **Create feature branch:** `create_feature_branch(issue_number)`
3. **Make changes:** `apply_diff()` or `commit_changes()`
4. **Create PR:** `create_pull_request()`
## Available Commands
### 📋 Reading Tickets
- `read_ticket(issue_number)` - Get full issue details
### 🌿 Branch Management
- `create_feature_branch(issue_number, title)` - Create scoped branch
- `get_branch_status()` - See current working branch
- `list_my_branches()` - List your branches
### 📝 File Operations
- `apply_diff(path, diff, message)` - Apply unified diff patch
- `commit_changes(path, content, message)` - Commit with size delta gate
- `replace_file(path, content, message)` - Replace entire file
- `create_file(path, content, message)` - Create new file
### 🔍 Quality Gates
- Size delta checks (default: 50% max change)
- Branch scope validation
- Protected branch enforcement
### 📦 Pull Requests
- `create_pull_request(title, description)` - Create PR from current branch
## Branch Naming Convention
```
<scope>/<issue-id>-<short-description>
Examples:
- feature/42-add-user-login
- fix/37-fix-memory-leak
- refactor/15-cleanup-api
- docs/20-update-readme
```
Allowed scopes: `feature/`, `fix/`, `refactor/`, `docs/`, `test/`, `chore/`, `wip/`
## Quality Gates
### Size Delta Gate (commit_changes)
- Files > 50% size change require diff-based updates
- Prevents accidental file replacements
- Configurable threshold in Valves
### Branch Protection
- Cannot commit directly to: main, master, develop, dev, release, hotfix
- Create feature branches instead
## Example Workflow
```python
# Read the ticket
ticket = read_ticket(42)
# Create branch (auto-extracts from ticket)
create_feature_branch(42, ticket["title"])
# Make changes using diff
apply_diff(
path="src/auth.py",
diff=\"\"\"--- a/src/auth.py
+++ b/src/auth.py
@@ -10,3 +10,7 @@ class Auth:
+ def login(self, user: str) -> bool:
+ return True
\"\"\",
message="feat(auth): add login method to Auth class"
)
# Create PR
create_pull_request(
title="feat(auth): add login method",
body="Implements login functionality as specified in #42"
)
```
## Tips
- Use `suggest_branch_name(issue_number)` to get branch name suggestions
- Use `list_my_branches()` to track your active work
- Always reference issues in commits: `Refs: #42`
- Use diff-based updates for incremental changes
- Large changes should be split into multiple commits
"""
    return output
async def read_ticket(
    self,
    issue_number: int,
    repo: Optional[str] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Read and parse a ticket/issue to understand requirements.

    Fetches the issue from the Gitea API, heuristically extracts testing
    criteria, technical notes and related issue references from the body,
    and renders a Markdown briefing including a suggested branch name.

    Args:
        issue_number: The issue/ticket number to read
        repo: Repository in 'owner/repo' format
    Returns:
        Formatted ticket summary with parsed requirements
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured in UserValves settings."
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"
    # Surface progress in the chat UI while the API call is in flight.
    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": f"Fetching issue #{issue_number}...",
                    "done": False,
                },
            }
        )
    try:
        async with httpx.AsyncClient(
            timeout=30.0, verify=self.valves.VERIFY_SSL
        ) as client:
            response = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"),
                headers=self._headers(__user__),
            )
            response.raise_for_status()
            issue = response.json()
            title = issue.get("title", "No title")
            body = issue.get("body", "")
            state = issue.get("state", "unknown")
            user = issue.get("user", {}).get("login", "unknown")
            labels = [label.get("name", "") for label in issue.get("labels", [])]
            created_at = issue.get("created_at", "")[:10]  # keep the date part only
            html_url = issue.get("html_url", "")
            # Parse body for structured info
            testing_criteria = []
            technical_notes = []
            is_testing_required = False
            is_docs_required = False
            body_lower = body.lower()
            # Heuristic: any mention of "test"/"testing" flags a testing requirement.
            if "test" in body_lower or "testing" in body_lower:
                is_testing_required = True
                # Try to extract testing criteria
                testing_section = re.search(
                    r"(?:testing|criteria|test).*?:(.*?)(?:\n\n|$)",
                    body,
                    re.IGNORECASE | re.DOTALL,
                )
                if testing_section:
                    # Strip bullet markers from each non-empty line of the section.
                    testing_criteria = [
                        line.strip().lstrip("-*•")
                        for line in testing_section.group(1).split("\n")
                        if line.strip()
                    ]
            if "documentation" in body_lower or "docs" in body_lower:
                is_docs_required = True
            # Check for technical notes section
            tech_section = re.search(
                r"(?:technical|tech).*?:(.*?)(?:\n\n|$)",
                body,
                re.IGNORECASE | re.DOTALL,
            )
            if tech_section:
                technical_notes = [
                    line.strip().lstrip("-*•")
                    for line in tech_section.group(1).split("\n")
                    if line.strip()
                ]
            # Extract issue references; always list the ticket itself first.
            issue_refs = self._parse_issue_refs(body)
            if not any(ref == f"#{issue_number}" for ref in issue_refs):
                issue_refs.insert(0, f"#{issue_number}")
            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {"description": "Done", "done": True, "hidden": True},
                    }
                )
            # Assemble the Markdown briefing.
            output = f"# 📋 Ticket #{issue_number}: {title}\n\n"
            output += f"**State:** {state.upper()} | **Author:** @{user} | **Created:** {created_at}\n"
            output += f"**Labels:** {', '.join(labels) if labels else 'None'}\n"
            output += f"**URL:** {html_url}\n\n"
            if body:
                output += "## 📝 Description\n\n"
                # Truncate very long descriptions
                if len(body) > 1000:
                    output += f"{body[:1000]}...\n\n"
                    output += "_Description truncated. Use `get_issue()` for full content._\n\n"
                else:
                    output += f"{body}\n\n"
            else:
                output += "_No description provided._\n\n"
            # Testing requirements
            output += "## 🧪 Testing Requirements\n\n"
            if is_testing_required:
                if testing_criteria:
                    output += "**Testing Criteria:**\n"
                    for criterion in testing_criteria:
                        output += f"- [ ] {criterion}\n"
                    output += "\n"
                else:
                    output += "Testing required, but no specific criteria listed.\n\n"
            else:
                output += "No explicit testing requirements detected.\n\n"
            # Technical notes
            if technical_notes:
                output += "## 🔧 Technical Notes\n\n"
                for note in technical_notes:
                    output += f"- {note}\n"
                output += "\n"
            # Documentation check
            if is_docs_required:
                output += "## 📚 Documentation Required\n\n"
                output += "This ticket mentions documentation needs.\n\n"
            # Issue references
            if issue_refs:
                output += "## 🔗 Related Issues\n\n"
                for ref in issue_refs:
                    output += f"- {ref}\n"
                output += "\n"
            # Suggested branch name
            suggested = self._suggest_branch_name(issue_number, title)
            output += "## 🌿 Suggested Branch Name\n\n"
            output += f"```\n{suggested}\n```\n\n"
            # Next steps
            output += "## 🚀 Next Steps\n\n"
            output += f"1. Create branch: `create_feature_branch({issue_number}, \"{title[:50]}...\")`\n"
            output += "2. Make changes using `apply_diff()` or `commit_changes()`\n"
            output += "3. Create PR: `create_pull_request()`\n"
            return output.strip()
    except httpx.HTTPStatusError as e:
        error_msg = self._format_error(e, f"issue #{issue_number}")
        if e.response.status_code == 404:
            return f"Error: Issue #{issue_number} not found in {owner}/{repo_name}."
        return f"Error: Failed to fetch issue. {error_msg}"
    except Exception as e:
        return f"Error: Unexpected failure during issue fetch: {type(e).__name__}: {e}"
def _suggest_branch_name(
self, issue_number: int, title: str, scope: str = "feature"
) -> str:
"""
Suggest a branch name based on issue number and title.
Args:
issue_number: The issue number
title: The issue title
scope: Branch scope prefix (default: feature)
Returns:
Suggested branch name in format: scope/issue-id-short-description
"""
# Clean up title for branch name
# Remove special characters, lowercase, replace spaces with hyphens
slug = re.sub(r"[^a-z0-9\s-]", "", title.lower())
slug = re.sub(r"[\s-]+", "-", slug)
slug = slug.strip("-")
# Truncate and add issue number
if len(slug) > 30:
slug = slug[:30].strip("-")
return f"{scope}/{issue_number}-{slug}"
async def suggest_branch_name(
    self,
    issue_number: int,
    repo: Optional[str] = None,
    scope: str = "feature",
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Get branch name suggestions based on issue number.

    Looks up the issue title (best-effort) to build the slug, then checks
    whether a branch with the suggested name already exists.

    Args:
        issue_number: The issue number
        repo: Repository in 'owner/repo' format
        scope: Branch scope prefix
        __user__: User context
    Returns:
        Suggested branch name, annotated with "(already exists)" when taken
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured."
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"
    # Fetch issue title
    try:
        async with httpx.AsyncClient(
            timeout=30.0, verify=self.valves.VERIFY_SSL
        ) as client:
            response = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"),
                headers=self._headers(__user__),
            )
            response.raise_for_status()
            issue = response.json()
            title = issue.get("title", "")
    except Exception:
        # Best-effort: fall back to an empty title if the lookup fails.
        title = ""
    suggested = self._suggest_branch_name(issue_number, title, scope)
    # Check if branch exists
    try:
        async with httpx.AsyncClient(
            timeout=30.0, verify=self.valves.VERIFY_SSL
        ) as client:
            branch_response = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/branches/{suggested}"),
                headers=self._headers(__user__),
            )
            if branch_response.status_code == 200:
                # Annotate rather than fail, so the caller can pick another name.
                suggested += " (already exists)"
    except Exception:
        # Existence check is advisory only; ignore network errors here.
        pass
    return suggested
async def create_feature_branch(
    self,
    issue_number: int,
    title: Optional[str] = None,
    repo: Optional[str] = None,
    scope: str = "feature",
    __user__: dict = None,
    __chat_id__: str = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Create a new feature branch for a ticket.

    This is the main entry point for the coder workflow. It:
    1. Validates the branch name against allowed scopes
    2. Prevents commits to protected branches
    3. Creates the branch from the default/base branch
    4. Caches the branch name for the session

    Args:
        issue_number: The ticket/issue number
        title: Optional title (will fetch from issue if not provided)
        repo: Repository in 'owner/repo' format
        scope: Branch scope (feature, fix, refactor, etc.)
        __chat_id__: Session ID for caching
        __user__: User context
    Returns:
        Branch creation confirmation
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured in UserValves settings."
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"
    # Fetch title from issue if not provided
    if not title:
        try:
            async with httpx.AsyncClient(
                timeout=30.0, verify=self.valves.VERIFY_SSL
            ) as client:
                issue_response = await client.get(
                    self._api_url(
                        f"/repos/{owner}/{repo_name}/issues/{issue_number}"
                    ),
                    headers=self._headers(__user__),
                )
                issue_response.raise_for_status()
                issue = issue_response.json()
                title = issue.get("title", "")
        except Exception as e:
            return f"Error: Could not fetch issue #{issue_number}: {e}"
    # Generate branch name
    branch_name = self._suggest_branch_name(issue_number, title, scope)
    # Validate branch name (scope prefix + protected-branch check)
    is_valid, error_msg = self._validate_branch_name(branch_name)
    if not is_valid:
        return f"❌ **Branch Validation Failed**\n\n{error_msg}"
    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": f"Creating branch {branch_name}...",
                    "done": False,
                },
            }
        )
    # Get base branch (with caching): prefer the session-cached value, and
    # seed the cache when this chat has no entry yet.
    base_branch = self._get_branch(None, __user__)
    if __chat_id__:
        cached_base = self._get_cached_data(__chat_id__, "default_branch")
        if cached_base:
            base_branch = cached_base
        else:
            self._set_cached_data(__chat_id__, "default_branch", base_branch)
    try:
        async with httpx.AsyncClient(
            timeout=30.0, verify=self.valves.VERIFY_SSL
        ) as client:
            response = await client.post(
                self._api_url(f"/repos/{owner}/{repo_name}/branches"),
                headers=self._headers(__user__),
                json={
                    "new_branch_name": branch_name,
                    "old_branch_name": base_branch,
                },
            )
            # Handle branch already exists (409 Conflict from Gitea)
            if response.status_code == 409:
                return f"⚠️ **Branch Already Exists**\n\nBranch `{branch_name}` already exists in `{owner}/{repo_name}`.\n\nUse it or create a new one."
            response.raise_for_status()
            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {"description": "Done", "done": True, "hidden": True},
                    }
                )
            # Cache the branch for session so later commits target it.
            if __chat_id__:
                self._set_cached_data(__chat_id__, "working_branch", branch_name)
            return f"""✅ **Feature Branch Created Successfully**
**Branch:** `{branch_name}`
**Base Branch:** `{base_branch}`
**Repository:** `{owner}/{repo_name}`
**Issue:** #{issue_number}
**Next Steps:**
1. Make changes to files
2. Use `apply_diff()` for incremental changes or `commit_changes()` for full replacements
3. Commit with descriptive messages
4. Create PR when ready: `create_pull_request()`
**Branch Naming Convention:**
- Format: `<scope>/<issue-id>-<description>`
- Scopes: feature, fix, refactor, docs, test, chore, wip
- Examples:
- `feature/42-add-user-authentication`
- `fix/37-fix-memory-leak`
- `refactor/15-cleanup-api-code`
"""
    except httpx.HTTPStatusError as e:
        error_msg = self._format_error(e, "branch creation")
        if e.response.status_code == 409:
            return f"Error: Branch `{branch_name}` already exists."
        return f"Error: Failed to create branch. {error_msg}"
    except Exception as e:
        return f"Error: Unexpected failure during branch creation: {type(e).__name__}: {e}"
async def get_branch_status(
    self,
    repo: Optional[str] = None,
    __user__: dict = None,
    __chat_id__: str = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Get the current working branch status for the session.

    Args:
        repo: Repository in 'owner/repo' format
        __chat_id__: Session ID for cache lookup
        __user__: User context
    Returns:
        Current branch and cached info
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured."
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"
    # Consult the session cache for branches set earlier in this chat.
    working_branch = None
    default_branch = None
    if __chat_id__:
        working_branch = self._get_cached_data(__chat_id__, "working_branch")
        default_branch = self._get_cached_data(__chat_id__, "default_branch")
    if not default_branch:
        default_branch = self._get_branch(None, __user__)
    # Assemble the report from parts, then join once.
    parts = [
        f"# 🌿 Branch Status: {owner}/{repo_name}\n\n",
        f"**Default Branch:** `{default_branch}`\n",
    ]
    if working_branch:
        parts.append(f"**Working Branch:** `{working_branch}`\n\n")
        parts.append("**Session cached - use this branch for commits.**\n")
    else:
        parts.append("\n**No working branch set for this session.**\n")
        parts.append("Create one: `create_feature_branch(issue_number, title)`\n")
    return "".join(parts)
async def list_my_branches(
    self,
    repo: Optional[str] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    List all branches in the repository (filtered view).

    Branches are grouped into protected branches, scope-prefixed branches
    (per Valves.ALLOWED_SCOPES), and everything else.

    Args:
        repo: Repository in 'owner/repo' format
        __user__: User context
    Returns:
        Formatted list of branches
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured."
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"
    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {"description": "Fetching branches...", "done": False},
            }
        )
    try:
        async with httpx.AsyncClient(
            timeout=30.0, verify=self.valves.VERIFY_SSL
        ) as client:
            response = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/branches"),
                # limit=50 is Gitea's maximum page size
                headers=self._headers(__user__),
                params={"limit": 50},
            )
            response.raise_for_status()
            branches = response.json()
            output = f"# 🌿 Branches in {owner}/{repo_name}\n\n"
            # Separate protected and feature branches
            protected = [b for b in branches if b.get("protected")]
            feature = [
                b
                for b in branches
                if not b.get("protected")
                and any(b["name"].startswith(f"{s}/") for s in self.valves.ALLOWED_SCOPES)
            ]
            other = [
                b
                for b in branches
                if not b.get("protected")
                and not any(b["name"].startswith(f"{s}/") for s in self.valves.ALLOWED_SCOPES)
            ]
            if protected:
                output += "## 🛡️ Protected Branches\n\n"
                for branch in sorted(protected, key=lambda x: x["name"]):
                    name = branch.get("name", "")
                    commit_sha = branch.get("commit", {}).get("id", "")[:8]
                    output += f"- `{name}` [commit: {commit_sha}]\n"
                output += "\n"
            if feature:
                output += "## 📦 Feature Branches\n\n"
                for branch in sorted(feature, key=lambda x: x["name"]):
                    name = branch.get("name", "")
                    commit_sha = branch.get("commit", {}).get("id", "")[:8]
                    output += f"- `{name}` [commit: {commit_sha}]\n"
                output += "\n"
            if other:
                output += "## 📄 Other Branches\n\n"
                # Cap the "other" section at 20 entries to keep output readable.
                for branch in sorted(other, key=lambda x: x["name"])[:20]:
                    name = branch.get("name", "")
                    commit_sha = branch.get("commit", {}).get("id", "")[:8]
                    output += f"- `{name}` [commit: {commit_sha}]\n"
                if len(other) > 20:
                    output += f"\n... and {len(other) - 20} more branches\n"
                output += "\n"
            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {"description": "Done", "done": True, "hidden": True},
                    }
                )
            return output.strip()
    except httpx.HTTPStatusError as e:
        error_msg = self._format_error(e, "branch listing")
        return f"Error: Failed to list branches. {error_msg}"
    except Exception as e:
        return f"Error: Unexpected failure: {type(e).__name__}: {e}"
async def apply_diff(
    self,
    path: str,
    diff_content: str,
    message: Optional[str] = None,
    repo: Optional[str] = None,
    branch: Optional[str] = None,
    auto_message: bool = True,
    __user__: dict = None,
    __chat_id__: str = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Apply a unified diff patch to a file.

    This is the PREFERRED method for making changes as it:
    1. Is precise about what changes
    2. Prevents accidental file replacements
    3. Is what LLMs understand best (trained on GitHub PRs)

    Args:
        path: File path to update
        diff_content: Unified diff in standard format
        message: Commit message (auto-generated if not provided)
        repo: Repository in 'owner/repo' format
        branch: Branch name (defaults to working branch or default)
        auto_message: Generate commit message if not provided
        __user__: User context
        __chat_id__: Session ID for branch caching
    Returns:
        Commit details and diff summary
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured in UserValves settings."
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"
    # Branch priority: explicit arg > session-cached working branch > default.
    effective_branch = branch
    if not effective_branch and __chat_id__:
        effective_branch = self._get_cached_data(__chat_id__, "working_branch")
    if not effective_branch:
        effective_branch = self._get_branch(None, __user__)
    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": f"Applying diff to {path}...",
                    "done": False,
                },
            }
        )
    try:
        async with httpx.AsyncClient(
            timeout=30.0, verify=self.valves.VERIFY_SSL
        ) as client:
            # Get current file content
            get_response = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
                headers=self._headers(__user__),
                params={"ref": effective_branch},
            )
            # Check if file exists
            if get_response.status_code == 404:
                return f"Error: File not found: `{path}`. Use `create_file()` to create a new file."
            get_response.raise_for_status()
            file_info = get_response.json()
            # Current blob SHA — required by Gitea's update API (optimistic locking).
            current_sha = file_info.get("sha")
            # Decode current content (Gitea returns file bodies base64-encoded)
            current_content_b64 = file_info.get("content", "")
            try:
                current_content = base64.b64decode(current_content_b64).decode(
                    "utf-8"
                )
            except Exception:
                return "Error: Could not decode current file content."
            # Parse and apply the diff
            new_content = self._apply_unified_diff(current_content, diff_content)
            if new_content is None:
                return "Error: Failed to parse or apply diff. Check the diff format."
            # Generate commit message if needed
            if not message:
                if auto_message:
                    message = self._generate_diff_commit_message(path, diff_content)
                else:
                    return "Error: Commit message is required when auto_message=False."
            # Commit the changes (PUT replaces the file content on the branch)
            new_content_b64 = base64.b64encode(new_content.encode("utf-8")).decode(
                "ascii"
            )
            response = await client.put(
                self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
                headers=self._headers(__user__),
                json={
                    "content": new_content_b64,
                    "message": message,
                    "branch": effective_branch,
                    "sha": current_sha,
                },
            )
            response.raise_for_status()
            result = response.json()
            commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
            # Parse diff stats
            # NOTE(review): str.count counts every '+'/'-' CHARACTER in the
            # diff, not lines starting with them, so these totals can
            # overcount — confirm whether line-based stats were intended.
            added_lines = diff_content.count("+") - diff_content.count("+++")
            removed_lines = diff_content.count("-") - diff_content.count("---")
            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {"description": "Done", "done": True, "hidden": True},
                    }
                )
            output = f"✅ **Diff Applied Successfully**\n\n"
            output += f"**File:** `{path}`\n"
            output += f"**Branch:** `{effective_branch}`\n"
            output += f"**Commit:** `{commit_sha}`\n"
            output += f"**Changes:** +{added_lines} lines, -{removed_lines} lines\n\n"
            output += f"**Message:** {message}\n"
            return output
    except httpx.HTTPStatusError as e:
        error_msg = self._format_error(e, f"diff application to '{path}'")
        if e.response.status_code == 409:
            return f"Error: Update conflict for `{path}`. Fetch the latest version and try again."
        return f"Error: Failed to apply diff. {error_msg}"
    except Exception as e:
        return f"Error: Unexpected failure during diff application: {type(e).__name__}: {e}"
def _apply_unified_diff(
self, current_content: str, diff_content: str
) -> Optional[str]:
"""
Apply a unified diff to content.
Args:
current_content: Current file content
diff_content: Unified diff patch
Returns:
New content after applying diff, or None if failed
"""
try:
# Parse the diff
diff_lines = diff_content.splitlines(keepends=True)
# Parse hunks from unified diff
hunks = []
current_hunk = None
in_hunk = False
for line in diff_lines:
if line.startswith("---"):
continue # Skip old filename
elif line.startswith("+++"):
continue # Skip new filename
elif line.startswith("@@"):
# New hunk starts
if current_hunk:
hunks.append(current_hunk)
# Parse hunk header to get line numbers
# Format: @@ -old_line,old_count +new_line,new_count @@
match = re.search(
r"@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@", line
)
if match:
old_start = int(match.group(1))
new_start = int(match.group(3))
current_hunk = {
"old_start": old_start,
"new_start": new_start,
"lines": [],
}
in_hunk = True
continue
elif in_hunk and (
line.startswith("+") or line.startswith("-") or line.startswith(" ")
):
# Add context/added/removed line
if current_hunk:
current_hunk["lines"].append(line)
elif in_hunk and not line.startswith(
"+"
) and not line.startswith("-") and not line.startswith(" "):
# End of hunk
if current_hunk:
hunks.append(current_hunk)
current_hunk = None
in_hunk = False
if current_hunk:
hunks.append(current_hunk)
# If no hunks, return unchanged
if not hunks:
return current_content
# Split content into lines
old_lines = current_content.splitlines(keepends=True)
# Use difflib to apply the patch
old_lines_stripped = [line.rstrip("\n") for line in old_lines]
# Create a list for the new content
new_lines_stripped = list(old_lines_stripped)
# Apply each hunk in reverse order (to maintain correct indices)
for hunk in sorted(hunks, key=lambda h: h["old_start"], reverse=True):
old_start = hunk["old_start"] - 1 # Convert to 0-indexed
# Collect lines to remove and add
lines_to_remove = []
lines_to_add = []
for line in hunk["lines"]:
if line.startswith("+"):
lines_to_add.append(line[1:].rstrip("\n"))
elif line.startswith("-"):
lines_to_remove.append(line[1:].rstrip("\n"))
# Remove old lines and add new ones
# First, find and remove the exact lines specified
if lines_to_remove:
# Look for the first removed line at old_start
found_start = False
for i in range(old_start, min(old_start + len(lines_to_remove), len(new_lines_stripped))):
if i < len(new_lines_stripped) and new_lines_stripped[i] == lines_to_remove[0]:
# Found the start, remove the lines
del new_lines_stripped[i : i + len(lines_to_remove)]
found_start = True
break
if not found_start:
# Fallback: just insert at old_start
pass
# Insert new lines at old_start
for line in reversed(lines_to_add):
new_lines_stripped.insert(old_start, line)
# Reconstruct content with line endings
new_content = "".join(
line + ("\n" if not line.endswith("\n") and i < len(new_lines_stripped) - 1 else "")
for i, line in enumerate(new_lines_stripped)
)
# Ensure proper line endings
if new_content and not new_content.endswith("\n"):
new_content += "\n"
return new_content
except Exception as e:
# Log the error but don't fail
print(f"Diff application warning: {e}")
return None
def _generate_diff_commit_message(self, path: str, diff_content: str) -> str:
"""
Generate a commit message from diff content.
Args:
path: File path
diff_content: Unified diff
Returns:
Generated commit message
"""
# Extract file name from path
file_name = path.split("/")[-1]
# Detect change type from diff
change_type = "chore"
if any(
line.startswith("+def ") or line.startswith("+class ")
for line in diff_content.splitlines()
):
change_type = "feat"
elif any(
line.startswith("+ return ") or line.startswith("+ return ")
for line in diff_content.splitlines()
):
change_type = "fix"
elif "test" in path.lower() or "spec" in path.lower():
change_type = "test"
elif ".md" in path.lower() or "readme" in path.lower():
change_type = "docs"
elif any(line.startswith("-") for line in diff_content.splitlines()):
change_type = "refactor"
# Generate message
message = f"{change_type}({file_name}): "
# Extract a short description from added lines
added_lines = [
line[1:].strip()
for line in diff_content.splitlines()
if line.startswith("+") and not line.startswith("+++")
]
if added_lines:
# Use first meaningful added line
description = ""
for line in added_lines:
if line and not line.startswith("import ") and not line.startswith("from "):
# Get function/class definition or first statement
match = re.match(
r"(def|class|const|var|let|interface|type)\s+(\w+)", line
)
if match:
kind = match.group(1)
name = match.group(2)
if kind == "def":
description = f"add {name}() function"
break
elif kind == "class":
description = f"add {name} class"
break
elif line.startswith(" ") or line.startswith("\t"):
# Indented line, skip
continue
else:
# Use as description
description = line[:50].rstrip(":")
if len(line) > 50:
description += "..."
break
if not description:
# Fallback to line count
added_count = len(added_lines)
description = f"update ({added_count} lines added)"
message += description
return message
async def commit_changes(
self,
path: str,
content: str,
message: Optional[str] = None,
repo: Optional[str] = None,
branch: Optional[str] = None,
max_delta_percent: Optional[float] = None,
__user__: dict = None,
__chat_id__: str = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Commit file changes with automatic content detection and size delta gating.
This method:
1. Detects whether to create or replace a file
2. Validates file size changes against threshold (quality gate)
3. Auto-generates commit message if not provided
4. Commits to the appropriate branch
Args:
path: File path to create or update
content: New file content
message: Commit message (auto-generated if not provided)
repo: Repository in 'owner/repo' format
branch: Branch name (defaults to working branch or default)
max_delta_percent: Override for size delta threshold (quality gate)
__user__: User context
__chat_id__: Session ID for branch caching
__event_emitter__: Event emitter for progress
Returns:
Commit details or error with guidance
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured in UserValves settings."
try:
owner, repo_name = self._resolve_repo(repo, __user__)
except ValueError as e:
return f"Error: {e}"
effective_branch = branch
if not effective_branch and __chat_id__:
effective_branch = self._get_cached_data(__chat_id__, "working_branch")
if not effective_branch:
effective_branch = self._get_branch(None, __user__)
# Use provided threshold or default from valves
delta_threshold = max_delta_percent or self.valves.MAX_SIZE_DELTA_PERCENT
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Processing {path}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
# Check if file exists
get_response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
params={"ref": effective_branch},
)
file_exists = get_response.status_code == 200
current_sha = None
current_size = 0
if file_exists:
get_response.raise_for_status()
file_info = get_response.json()
current_sha = file_info.get("sha")
current_size = file_info.get("size", 0)
# SIZE DELTA GATE - Quality check
new_size = len(content.encode("utf-8"))
if current_size > 0 and new_size > 0:
delta_percent = abs(new_size - current_size) / current_size * 100
if delta_percent > delta_threshold:
# Calculate actual bytes changed
size_diff = new_size - current_size
direction = "larger" if size_diff > 0 else "smaller"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": "Size gate triggered",
"done": True,
"hidden": True,
},
}
)
return f"""⚠️ **Quality Gate: Large File Change Detected**
**File:** `{path}`
**Current Size:** {current_size} bytes
**New Size:** {new_size} bytes
**Change:** {size_diff:+d} bytes ({delta_percent:.1f}% {direction})
**Threshold:** {delta_threshold}%
This change exceeds the size delta threshold, which may indicate:
- Accidental full file replacement
- Unintended data loss
- LLM confusion about existing content
**Recommended Actions:**
1. **Use diff-based updates** (preferred):
```python
apply_diff(
path="{path}",
diff=\"\"\"--- a/{path}
+++ b/{path}
@@ -1,3 +1,5 @@
existing line
+new line to add
-line to remove
\"\"\",
message="feat(scope): description of changes"
)
```
2. **Fetch and review current content**:
```python
current = get_file("{path}")
# Compare with what you want to change
```
3. **Commit with override** (not recommended):
Increase the threshold if this is intentional:
```python
commit_changes(..., max_delta_percent=100)
```
**Why this gate exists:**
Large file replacements by LLMs often indicate the model didn't properly understand the existing file structure. Using diffs ensures precise, targeted changes.
"""
# Generate commit message if not provided
if not message:
message = self._generate_commit_message(
change_type="chore",
scope=path.split("/")[-1] if "/" in path else path,
description=f"update {path}",
)
# Prepare content
content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii")
if file_exists:
# Replace existing file
if not current_sha:
return f"Error: Could not retrieve SHA for existing file: {path}"
response = await client.put(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
json={
"content": content_b64,
"message": message,
"branch": effective_branch,
"sha": current_sha,
},
)
else:
# Create new file
response = await client.post(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
json={
"content": content_b64,
"message": message,
"branch": effective_branch,
},
)
response.raise_for_status()
result = response.json()
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
action = "Updated" if file_exists else "Created"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
# Calculate and show size change
new_size = len(content.encode("utf-8"))
size_info = ""
if file_exists and current_size > 0:
delta = new_size - current_size
delta_percent = delta / current_size * 100
size_info = f"**Size Change:** {delta:+d} bytes ({delta_percent:.1f}%)\n"
output = f"""✅ **{action} File Successfully**
**File:** `{path}`
**Branch:** `{effective_branch}`
**Commit:** `{commit_sha}`
**Message:** {message}
{size_info}**Action:** {action}
"""
return output
except httpx.HTTPStatusError as e:
error_msg = self._format_error(e, f"file commit for '{path}'")
if e.response.status_code == 409:
return f"Error: Update conflict for `{path}`. Fetch the latest version and try again."
return f"Error: Failed to commit file. {error_msg}"
except Exception as e:
return f"Error: Unexpected failure during commit: {type(e).__name__}: {e}"
async def create_pull_request(
self,
title: str,
body: Optional[str] = "",
repo: Optional[str] = None,
__user__: dict = None,
__chat_id__: str = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Create a pull request from the current branch.
Args:
title: PR title
body: PR description (auto-populates from issue if linked)
repo: Repository in 'owner/repo' format
__user__: User context
__chat_id__: Session ID for branch caching
__event_emitter__: Event emitter for progress
Returns:
PR creation confirmation with details
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured."
try:
owner, repo_name = self._resolve_repo(repo, __user__)
except ValueError as e:
return f"Error: {e}"
# Get current branch
head_branch = None
if __chat_id__:
head_branch = self._get_cached_data(__chat_id__, "working_branch")
if not head_branch:
# Try to guess from recent commits or use default
head_branch = self._get_branch(None, __user__)
base_branch = self._get_branch(None, __user__)
# Validate that head branch is not a protected branch
is_valid, error_msg = self._validate_branch_name(head_branch)
if not is_valid:
return f"❌ **Cannot Create PR**\n\n{error_msg}\n\nCreate a feature branch first using `create_feature_branch()`."
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Creating PR...", "done": False},
}
)
try:
# Auto-populate body with issue reference if not provided
if not body:
# Try to extract issue number from branch name
match = re.search(r"/(\d+)-", head_branch)
if match:
issue_number = match.group(1)
body = f"Closes #{issue_number}\n\nThis PR implements the changes from issue #{issue_number}."
else:
body = "Automated PR from gitea_coder workflow."
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.post(
self._api_url(f"/repos/{owner}/{repo_name}/pulls"),
headers=self._headers(__user__),
json={
"title": title,
"head": head_branch,
"base": base_branch,
"body": body,
},
)
# Handle PR already exists
if response.status_code == 409:
return f"⚠️ **PR Already Exists**\n\nA pull request for branch `{head_branch}` → `{base_branch}` already exists.\n\nCheck existing PRs and update it instead."
response.raise_for_status()
pr = response.json()
pr_number = pr.get("number")
pr_url = pr.get("html_url", "")
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return f"""✅ **Pull Request Created Successfully**
**PR #{pr_number}:** {title}
**Branch:** `{head_branch}` → `{base_branch}`
**URL:** {pr_url}
**Description:**
{body}
**Next Steps:**
1. Add reviewers if needed
2. Address any merge conflicts
3. Await review feedback
**To check PR status:**
```python
get_pull_request({pr_number})
```
"""
except httpx.HTTPStatusError as e:
error_msg = self._format_error(e, "PR creation")
if e.response.status_code == 422:
return "Error: Could not create PR. The branch may not exist or there may be merge conflicts."
return f"Error: Failed to create PR. {error_msg}"
except Exception as e:
return f"Error: Unexpected failure during PR creation: {type(e).__name__}: {e}"
async def replace_file(
self,
path: str,
content: str,
message: str,
repo: Optional[str] = None,
branch: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Update an existing file in the repository (creates commit).
WARNING: This replaces the entire file content. For incremental changes,
use `apply_diff()` instead to prevent accidental data loss.
Args:
path: File path to update
content: New file content as string
message: Commit message
repo: Repository in 'owner/repo' format
branch: Branch name (defaults to repository default)
__user__: User context
__event_emitter__: Event emitter for progress
Returns:
Commit details and success confirmation
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured in UserValves settings."
try:
owner, repo_name = self._resolve_repo(repo, __user__)
except ValueError as e:
return f"Error: {e}"
effective_branch = self._get_branch(branch, __user__)
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": f"Updating {path}...", "done": False},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
# Get current file SHA
get_response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
params={"ref": effective_branch},
)
if get_response.status_code == 404:
return f"Error: File not found: `{path}`. Use `create_file()` to create a new file, or `apply_diff()` to add content to a new file."
get_response.raise_for_status()
file_info = get_response.json()
sha = file_info.get("sha")
if not sha:
return "Error: Could not retrieve file SHA for update."
# Prepare updated content
content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii")
# Update file
response = await client.put(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
json={
"content": content_b64,
"message": message,
"branch": effective_branch,
"sha": sha,
},
)
response.raise_for_status()
result = response.json()
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
output = f"**File Updated Successfully**\n\n"
output += f"**File:** `{owner}/{repo_name}/{path}`\n"
output += f"**Branch:** `{effective_branch}`\n"
output += f"**Commit:** `{commit_sha}`\n"
output += f"**Message:** {message}\n\n"
output += "_Use `apply_diff()` for incremental changes to prevent data loss._\n"
return output
except httpx.HTTPStatusError as e:
error_msg = self._format_error(e, f"file update for '{path}'")
if e.response.status_code == 409:
return f"Error: Update conflict for `{path}`. Fetch the latest version and try again."
return f"Error: Failed to update file. {error_msg}"
except Exception as e:
return f"Error: Unexpected failure during file update: {type(e).__name__}: {e}"
async def create_file(
self,
path: str,
content: str,
message: str,
repo: Optional[str] = None,
branch: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Create a new file in the repository.
For adding content to existing files, use `apply_diff()` instead.
Args:
path: File path to create (e.g., 'docs/README.md')
content: Initial file content as string
message: Commit message
repo: Repository in 'owner/repo' format
branch: Branch name (defaults to repository default)
__user__: User context
__event_emitter__: Event emitter for progress
Returns:
Commit details and success confirmation
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured in UserValves settings."
try:
owner, repo_name = self._resolve_repo(repo, __user__)
except ValueError as e:
return f"Error: {e}"
effective_branch = self._get_branch(branch, __user__)
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": f"Creating {path}...", "done": False},
}
)
try:
content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii")
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.post(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
json={
"content": content_b64,
"message": message,
"branch": effective_branch,
},
)
response.raise_for_status()
result = response.json()
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
output = f"**File Created Successfully**\n\n"
output += f"**File:** `{owner}/{repo_name}/{path}`\n"
output += f"**Branch:** `{effective_branch}`\n"
output += f"**Commit:** `{commit_sha}`\n"
output += f"**Message:** {message}\n"
return output
except httpx.HTTPStatusError as e:
error_msg = self._format_error(e, f"file creation for '{path}'")
if e.response.status_code == 422:
return f"Error: File already exists: `{path}`. Use `replace_file()` or `apply_diff()` to modify it."
return f"Error: Failed to create file. {error_msg}"
except Exception as e:
return f"Error: Unexpected failure during file creation: {type(e).__name__}: {e}"
async def get_file(
self,
path: str,
repo: Optional[str] = None,
branch: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Get the contents of a file from the repository.
Args:
path: Full path to the file (e.g., 'src/main.py')
repo: Repository in 'owner/repo' format
branch: Branch name (defaults to repository default)
__user__: User context
__event_emitter__: Event emitter for progress
Returns:
File content with metadata (SHA, size, branch)
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured in UserValves settings."
try:
owner, repo_name = self._resolve_repo(repo, __user__)
except ValueError as e:
return f"Error: {e}"
effective_branch = self._get_branch(branch, __user__)
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Reading file {path}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
params={"ref": effective_branch},
)
response.raise_for_status()
file_info = response.json()
if isinstance(file_info, list):
return f"**Note:** '{path}' is a directory. Use `list_files()` to browse its contents."
if file_info.get("type") != "file":
return f"Error: '{path}' is not a file (type: {file_info.get('type')})"
content_b64 = file_info.get("content", "")
try:
content = base64.b64decode(content_b64).decode("utf-8")
except Exception:
return "Error: Could not decode file content. The file may be binary or corrupted."
size = file_info.get("size", 0)
sha_short = file_info.get("sha", "unknown")[:8]
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
output = f"**File:** `{owner}/{repo_name}/{path}`\n"
output += f"**Branch:** `{effective_branch}` | **SHA:** `{sha_short}` | **Size:** {size} bytes\n"
output += f"**URL:** {file_info.get('html_url', 'N/A')}\n\n"
output += f"```\n{content}\n```\n"
return output
except httpx.HTTPStatusError as e:
error_msg = self._format_error(e, f"file fetch for '{path}'")
if e.response.status_code == 404:
return f"Error: File not found: `{path}`. Verify the file path and branch."
return f"Error: Failed to fetch file. {error_msg}"
except Exception as e:
return f"Error: Unexpected failure during file fetch: {type(e).__name__}: {e}"
async def list_files(
self,
path: str = "",
repo: Optional[str] = None,
branch: Optional[str] = None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
List files and directories in a repository path.
Args:
path: Directory path to list (default: root)
repo: Repository in 'owner/repo' format
branch: Branch name (defaults to repository default)
__user__: User context
__event_emitter__: Event emitter for progress
Returns:
Formatted directory listing with file sizes and types
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured in UserValves settings."
try:
owner, repo_name = self._resolve_repo(repo, __user__)
except ValueError as e:
return f"Error: {e}"
effective_branch = self._get_branch(branch, __user__)
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Listing {path or 'root'}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
params={"ref": effective_branch},
)
response.raise_for_status()
contents = response.json()
if isinstance(contents, dict):
return f"**Note:** '{path}' is a file. Use `get_file()` to read its contents."
output = f"**Contents of {owner}/{repo_name}/{path or '.'}** (`{effective_branch}` branch)\n\n"
dirs = [item for item in contents if item.get("type") == "dir"]
files = [item for item in contents if item.get("type") == "file"]
if dirs:
output += "**📁 Directories:**\n"
for item in sorted(dirs, key=lambda x: x.get("name", "").lower()):
output += f"- `📁 {item.get('name', '')}/`\n"
output += "\n"
if files:
output += "**📄 Files:**\n"
for item in sorted(files, key=lambda x: x.get("name", "").lower()):
size = item.get("size", 0)
if size < 1024:
size_str = f"{size}B"
elif size < 1024 * 1024:
size_str = f"{size//1024}KB"
else:
size_str = f"{size//(1024*1024)}MB"
sha_short = item.get("sha", "unknown")[:8]
output += f"- `{item.get('name', '')}` ({size_str}, sha: {sha_short})\n"
output += f"\n**Total:** {len(dirs)} directories, {len(files)} files"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return output
except httpx.HTTPStatusError as e:
error_msg = self._format_error(e, f"directory listing for '{path}'")
if e.response.status_code == 404:
return f"Error: Path not found: `{path}`. Verify the path exists in the repository."
return f"Error: Failed to list directory contents. {error_msg}"
except Exception as e:
return f"Error: Unexpected failure during directory listing: {type(e).__name__}: {e}"