Files
tools/gitea/coder.py
xcaliber 9ec4bb3491 feat(gitea): critical refactor - automatic branch management (v1.0.0)
CRITICAL ARCHITECTURAL CHANGE:
- LLM no longer manages branch names - system derives from chat_id
- All operations automatically use current working branch from chat_id session
- Branch parameter REMOVED from all operation functions
- Added _get_working_branch() helper for automatic branch detection
- Added _generate_branch_name() for system-managed branch naming
- All functions now use __chat_id__ parameter for branch detection

Complete file operations (NO branch parameter needed!):
- apply_diff() - diff-based updates
- commit_changes() - commit with size delta gating  
- create_file() - create new files
- replace_file() - replace entire file
- delete_file() - delete files  NEW
- rename_file() - rename files  NEW
- get_file() - read files
- list_files() - list directory contents

Updated workflow:
- create_feature_branch() generates branch name from issue_number + scope
- System caches: chat_id → working_branch
- All file operations use cached working branch from chat_id
- LLM focuses on code/content, not infrastructure

All functions now:
- Use __chat_id__ parameter for branch detection (REQUIRED)
- Get working branch from session cache (system-managed)
- Never accept branch names from LLM input

Documentation updated:
- workflow_summary() shows critical change notice
- All function docs updated to reflect automatic branch management
- Examples updated to show __chat_id__ usage

Version remains 1.0.0 (no release yet)

Refs: #11
2026-01-17 16:39:02 +00:00

2557 lines
94 KiB
Python

"""
title: Gitea Coder - Workflow Role with Automatic Branch Management
author: Jeff Smith + minimax
version: 1.0.0
license: MIT
description: High-level workflow role for LLM-based code generation with automatic branch management and quality gates
requirements: pydantic, httpx
changelog:
1.0.0:
- Initial implementation of gitea_coder role
- Automatic branch management (system derives from chat_id)
- LLM no longer manages branch names
- Branch creation with scope gating (prevents main pushes)
- Enforces branch naming conventions (feature/, fix/, refactor/, etc.)
- Generates detailed commit messages with ticket references
- Creates PRs from branches
- Reads ticket requirements from issues
- Unified file operations workflow
- Diff-based updates with apply_diff()
- Size delta gating in commit_changes() for quality control
- Complete CRUD operations: create_file, replace_file, delete_file, rename_file
"""
from typing import Optional, Callable, Any, Dict, List, Tuple
from pydantic import BaseModel, Field
import re
import time
import base64
import difflib
import httpx
class Tools:
"""
Gitea Coder Role - High-level workflow automation for code generation tasks.
CRITICAL ARCHITECTURAL CHANGE (v1.0.0):
- LLM no longer manages branch names
- System automatically derives branch from chat_id session
- All operations use __chat_id__ parameter for branch detection
- create_feature_branch() generates branch name, then caches for session
- LLM focuses on code/content, not infrastructure
Workflow:
reads ticket → creates/modifies branch (system-managed) → commits with detailed messages
Key Features:
- Automatic branch management (chat_id → working_branch mapping)
- Branch scope gating (prevents main/master pushes)
- Enforces branch naming conventions
- Auto-generates conventional commit messages
- Quality gates for file changes (size delta validation)
- Diff-based updates to prevent accidental file replacements
- Complete CRUD operations (create, replace, delete, rename)
"""
class Valves(BaseModel):
    """System-wide configuration for Gitea Coder integration.

    Admin-level defaults shared by all users. Per-user overrides live in
    UserValves and are honored only when ALLOW_USER_OVERRIDES is True.
    """

    GITEA_URL: str = Field(
        default="https://gitea.example.com",
        description="Gitea server URL (ingress or internal service)",
    )
    DEFAULT_REPO: str = Field(
        default="",
        description="Default repository in owner/repo format",
    )
    DEFAULT_BRANCH: str = Field(
        default="main",
        description="Default branch name for operations",
    )
    DEFAULT_ORG: str = Field(
        default="",
        description="Default organization for org-scoped operations",
    )
    ALLOW_USER_OVERRIDES: bool = Field(
        default=True,
        description="Allow users to override defaults via UserValves",
    )
    VERIFY_SSL: bool = Field(
        default=True,
        description="Verify SSL certificates (disable for self-signed certs)",
    )
    DEFAULT_PAGE_SIZE: int = Field(
        default=50,
        description="Default page size for list operations (max 50)",
        ge=1,
        le=50,
    )
    # Coder-specific settings: quality gates and branch policy
    MAX_SIZE_DELTA_PERCENT: float = Field(
        default=50.0,
        description="Maximum allowed file size change percentage (quality gate)",
        ge=1.0,
        le=500.0,
    )
    PROTECTED_BRANCHES: List[str] = Field(
        default=["main", "master", "develop", "dev", "release", "hotfix"],
        description="Branches that cannot be committed to directly",
    )
    ALLOWED_SCOPES: List[str] = Field(
        default=["feature", "fix", "refactor", "docs", "test", "chore", "wip"],
        description="Allowed branch scope prefixes",
    )
class UserValves(BaseModel):
    """Per-user configuration for personal credentials and overrides.

    The USER_DEFAULT_* overrides take effect only when the system valve
    ALLOW_USER_OVERRIDES is True (see _get_repo/_get_branch/_get_org).
    """

    GITEA_TOKEN: str = Field(
        default="",
        description="Your Gitea API token",
    )
    USER_DEFAULT_REPO: str = Field(
        default="",
        description="Override default repository for this user",
    )
    USER_DEFAULT_BRANCH: str = Field(
        default="",
        description="Override default branch for this user",
    )
    USER_DEFAULT_ORG: str = Field(
        default="",
        description="Override default organization for this user",
    )
def __init__(self):
    """Initialize valves, the per-session cache, and tool metadata."""
    # Framework-provided configuration containers
    self.valves = self.Valves()
    self.user_valves = self.UserValves()
    # Surface tool invocations in the UI for debugging
    self.citation = True
    # chat_id → (session data dict, last-write timestamp); entries expire after the TTL
    self._session_cache: Dict[str, Tuple[Dict[str, Any], float]] = {}
    self._cache_ttl_seconds = 3600  # one hour
    # Underlying dev-operations backend for actual API calls; not created here
    self._dev = None
def _api_url(self, endpoint: str) -> str:
"""Construct full API URL for Gitea endpoint"""
base = self._get_url()
return f"{base}/api/v1{endpoint}"
def _get_url(self) -> str:
"""Get effective Gitea URL with trailing slash handling"""
return self.valves.GITEA_URL.rstrip("/")
def _get_token(self, __user__: dict = None) -> str:
"""Extract Gitea token from user context with robust handling"""
if __user__ and "valves" in __user__:
user_valves = __user__.get("valves")
if user_valves:
return user_valves.GITEA_TOKEN
return ""
def _headers(self, __user__: dict = None) -> dict:
"""Generate authentication headers with token"""
token = self._get_token(__user__)
if not token:
return {"Content-Type": "application/json"}
return {
"Authorization": f"token {token}",
"Content-Type": "application/json",
}
def _format_error(self, e, context: str = "") -> str:
"""Format HTTP error with detailed context for LLM understanding"""
try:
error_json = e.response.json()
error_msg = error_json.get("message", e.response.text[:200])
except Exception:
error_msg = e.response.text[:200]
context_str = f" ({context})" if context else ""
return f"HTTP Error {e.response.status_code}{context_str}: {error_msg}"
def _get_repo(self, repo: Optional[str], __user__: dict = None) -> str:
"""Get effective repository with priority resolution"""
if repo:
return repo
if __user__ and "valves" in __user__:
user_valves = __user__.get("valves")
if user_valves:
if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_REPO:
return user_valves.USER_DEFAULT_REPO
return self.valves.DEFAULT_REPO
def _get_branch(self, branch: Optional[str], __user__: dict = None) -> str:
"""Get effective branch with priority resolution"""
if branch:
return branch
if __user__ and "valves" in __user__:
user_valves = __user__.get("valves")
if user_valves:
if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_BRANCH:
return user_valves.USER_DEFAULT_BRANCH
return self.valves.DEFAULT_BRANCH
def _get_org(self, org: Optional[str], __user__: dict = None) -> str:
"""Get effective org with priority."""
if org:
return org
if __user__ and "valves" in __user__:
user_valves = __user__.get("valves")
if user_valves:
if self.valves.ALLOW_USER_OVERRIDES and user_valves.USER_DEFAULT_ORG:
return user_valves.USER_DEFAULT_ORG
return self.valves.DEFAULT_ORG
def _resolve_repo(
self, repo: Optional[str], __user__: dict = None
) -> tuple[str, str]:
"""Resolve repository string into owner and repo name with validation"""
effective_repo = self._get_repo(repo, __user__)
if not effective_repo:
raise ValueError(
"No repository specified. Set DEFAULT_REPO in Valves or USER_DEFAULT_REPO in UserValves."
)
if "/" not in effective_repo:
raise ValueError(
f"Repository must be in 'owner/repo' format, got: {effective_repo}"
)
return effective_repo.split("/", 1)
def _get_page_size(self, limit: Optional[int] = None) -> int:
"""Calculate effective page size, capped at Gitea's max of 50"""
if limit is not None:
return min(limit, 50)
return min(self.valves.DEFAULT_PAGE_SIZE, 50)
def _get_cached_data(self, chat_id: str, key: str) -> Optional[Any]:
"""Get cached data for chat session with TTL"""
if not chat_id:
return None
cache_key = f"{chat_id}"
if cache_key in self._session_cache:
data, timestamp = self._session_cache[cache_key]
if time.time() - timestamp < self._cache_ttl_seconds:
return data.get(key)
else:
# Expired, remove from cache
del self._session_cache[cache_key]
return None
def _set_cached_data(self, chat_id: str, key: str, data: Any) -> None:
"""Set cached data for chat session with TTL"""
if not chat_id:
return
cache_key = f"{chat_id}"
if cache_key not in self._session_cache:
self._session_cache[cache_key] = ({}, time.time())
self._session_cache[cache_key][0][key] = data
self._session_cache[cache_key] = (self._session_cache[cache_key][0], time.time())
def _get_working_branch(
self, __chat_id__: str = None, __user__: dict = None
) -> str:
"""
Get the working branch for the current session.
CRITICAL: This is the primary method for branch detection.
- LLM should NOT pass branch names to functions
- System automatically derives branch from chat_id
- Falls back to default branch if no session cached
Args:
__chat_id__: Session ID for cache lookup
__user__: User context
Returns:
Working branch name (system-managed)
"""
# Try to get from session cache first
if __chat_id__:
working_branch = self._get_cached_data(__chat_id__, "working_branch")
if working_branch:
return working_branch
# Fall back to default branch
return self._get_branch(None, __user__)
def _generate_branch_name(
self, issue_number: int, title: str, scope: str = "feature"
) -> str:
"""
System-managed branch name generation.
LLM should NOT call this directly - called by create_feature_branch()
Args:
issue_number: The issue/ticket number
title: Issue title (for description)
scope: Branch scope prefix
Returns:
Branch name in format: scope/issue-id-short-description
"""
# Clean up title for branch name
slug = re.sub(r"[^a-z0-9\s-]", "", title.lower())
slug = re.sub(r"[\s-]+", "-", slug)
slug = slug.strip("-")
# Truncate and add issue number
if len(slug) > 30:
slug = slug[:30].strip("-")
return f"{scope}/{issue_number}-{slug}"
def _validate_branch_name(self, branch_name: str) -> tuple[bool, str]:
"""
Validate branch name against allowed scopes and protected branches.
Returns:
tuple: (is_valid, error_message)
"""
# Check if it's a protected branch (direct commit attempt)
if branch_name in self.valves.PROTECTED_BRANCHES:
return False, (
f"Branch '{branch_name}' is protected. "
f"Direct commits to protected branches are not allowed. "
f"Create a feature branch first using create_feature_branch()."
)
# Check if it starts with an allowed scope
scope_pattern = r"^(" + "|".join(self.valves.ALLOWED_SCOPES) + r")/"
if not re.match(scope_pattern, branch_name):
allowed = ", ".join([f"{s}/" for s in self.valves.ALLOWED_SCOPES])
return False, (
f"Branch '{branch_name}' does not follow naming convention. "
f"Use format: {allowed}<issue-id>-<description>. "
f"Example: feature/42-add-user-auth"
)
return True, ""
def _parse_issue_refs(self, text: str) -> List[str]:
"""Extract issue references from text (e.g., #42, issue #42)"""
refs = re.findall(r"#(\d+)", text)
issue_refs = [f"#{ref}" for ref in refs]
# Also check for "issue N" pattern
issue_n_refs = re.findall(r"issue\s*#?(\d+)", text, re.IGNORECASE)
for ref in issue_n_refs:
issue_ref = f"#{ref}"
if issue_ref not in issue_refs:
issue_refs.append(issue_ref)
return issue_refs
def _generate_commit_message(
self,
change_type: str,
scope: str,
description: str,
issue_refs: Optional[List[str]] = None,
body: Optional[str] = None,
) -> str:
"""
Generate a conventional commit message.
Format: scope(type): description
Args:
change_type: Type of change (feat, fix, docs, etc.)
scope: Area of change (file, module, or component)
description: Brief description of changes
issue_refs: List of issue references (e.g., ["#42"])
body: Optional longer description
Returns:
Formatted commit message
"""
# Validate and normalize change type
valid_types = [
"feat", "fix", "docs", "style", "refactor", "test",
"chore", "perf", "ci", "build", "revert"
]
if change_type.lower() not in valid_types:
change_type = "chore" # Default for unknown types
# Build the subject line
scope_str = f"({scope})" if scope else ""
message = f"{change_type.lower()}{scope_str}: {description}"
# Add issue references to body or footer
if issue_refs:
refs_str = ", ".join(issue_refs)
footer = f"Refs: {refs_str}"
if body:
body = f"{body}\n\n{footer}"
else:
message = f"{message}\n\n{footer}"
return message
async def workflow_summary(
    self,
    __user__: dict = None,
    __chat_id__: str = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Get a summary of available coder workflows and commands.

    CRITICAL: LLM does NOT manage branch names. System handles it automatically.

    Args:
        __user__: User context (used for default-branch fallback)
        __chat_id__: Session ID used to look up the cached working branch
        __event_emitter__: Optional status-event callback (not used here)

    Returns:
        Markdown-formatted workflow guide, including current session status
    """
    # Get current working branch (session cache, falling back to default)
    working_branch = self._get_working_branch(__chat_id__, __user__)
    # NOTE: the markdown below is returned verbatim to the caller.
    output = f"""# 🚀 Gitea Coder Workflow Guide (v1.0.0)
## ⚠️ CRITICAL ARCHITECTURAL CHANGE
**LLM no longer manages branch names!**
- System automatically derives branch from chat_id session
- LLM should NOT pass branch names to functions
- Use __chat_id__ parameter for automatic branch detection
## Quick Start
1. **Read the ticket:** `read_ticket(issue_number)`
2. **Create feature branch:** `create_feature_branch(issue_number, title)`
- System generates and caches branch name
3. **Make changes:** `apply_diff()` or `commit_changes()`
- Branch automatically detected from chat_id
4. **Create PR:** `create_pull_request(title)`
- Uses cached working branch
## Available Commands
### 📋 Reading Tickets
- `read_ticket(issue_number)` - Get full issue details
### 🌿 Branch Management
- `create_feature_branch(issue_number, title)` - System creates & caches branch
- `get_branch_status()` - See current working branch (from cache)
- `list_my_branches()` - List all branches in repository
### 📝 File Operations (NO branch parameter needed!)
- `apply_diff(path, diff, message)` - Apply unified diff patch
- `commit_changes(path, content, message)` - Commit with size delta gate
- `replace_file(path, content, message)` - Replace entire file
- `create_file(path, content, message)` - Create new file
- `delete_file(path, message)` - Delete a file ⭐ NEW
- `rename_file(old_path, new_path, message)` - Rename a file ⭐ NEW
- `get_file(path)` - Read file content
- `list_files(path)` - List directory contents
### 🔍 Quality Gates
- Size delta checks (default: 50% max change)
- Branch scope validation
- Protected branch enforcement
### 📦 Pull Requests
- `create_pull_request(title, description)` - Create PR from current branch
## How Branch Management Works
```
User creates chat with chat_id = "abc123"
1. create_feature_branch(42, "Add login")
System generates: "feature/42-add-login"
System creates branch
System caches: chat_id "abc123""feature/42-add-login"
2. apply_diff(path="auth.py", diff="...")
System checks cache: "abc123""feature/42-add-login"
System automatically uses that branch
3. commit_changes(path="config.py", content="...")
System checks cache: "abc123""feature/42-add-login"
System automatically uses that branch
```
**LLM never needs to know or manage branch names!**
## Branch Naming Convention (System-Generated)
```
<scope>/<issue-id>-<short-description>
Examples:
- feature/42-add-user-login
- fix/37-fix-memory-leak
- refactor/15-cleanup-api
- docs/20-update-readme
```
Allowed scopes: `feature/`, `fix/`, `refactor/`, `docs/`, `test/`, `chore/`, `wip/`
## Quality Gates
### Size Delta Gate (commit_changes)
- Files > 50% size change require diff-based updates
- Prevents accidental file replacements
- Configurable threshold in Valves
### Branch Protection
- Cannot commit directly to: main, master, develop, dev, release, hotfix
- Create feature branches instead
## Example Workflow
```python
# Read the ticket
ticket = read_ticket(42)
# Create branch (system generates & caches name)
create_feature_branch(42, ticket["title"])
# System creates: feature/42-add-login
# System caches: chat_id → feature/42-add-login
# Make changes (NO branch parameter - auto-detected from chat_id)
apply_diff(
path="src/auth.py",
diff=\"\"\"--- a/src/auth.py
+++ b/src/auth.py
@@ -10,3 +10,7 @@ class Auth:
+ def login(self, user: str) -> bool:
+ return True
\"\"\",
message="feat(auth): add login method to Auth class"
)
# System uses cached branch: feature/42-add-login
# Create PR (auto-detects current branch)
create_pull_request(
title="feat(auth): add login method",
body="Implements login functionality as specified in #42"
)
# System uses cached branch: feature/42-add-login
```
## Current Session Status
**Working Branch:** `{working_branch}`
**Session Cached:** {f"Yes - chat_id: {__chat_id__}" if __chat_id__ and self._get_cached_data(__chat_id__, "working_branch") else "No - create a feature branch first"}
## Tips
- Use `get_branch_status()` to verify current working branch
- Always use `__chat_id__` parameter in function calls
- LLM should focus on code/content, not infrastructure
- Use diff-based updates for incremental changes
- Large changes should be split into multiple commits
"""
    return output
async def read_ticket(
    self,
    issue_number: int,
    repo: Optional[str] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Read and parse a ticket/issue to understand requirements.

    Fetches the issue from the Gitea API, then heuristically scans the body
    for testing criteria, technical-notes sections, documentation mentions,
    and related issue references. All parsing is keyword-based, not a strict
    format contract.

    Args:
        issue_number: The issue/ticket number to read
        repo: Repository in 'owner/repo' format
        __user__: User context (supplies the API token)
        __event_emitter__: Optional status-event callback

    Returns:
        Formatted ticket summary with parsed requirements, or an "Error: ..."
        string on failure (no exceptions propagate to the caller)
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured in UserValves settings."
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"
    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": f"Fetching issue #{issue_number}...",
                    "done": False,
                },
            }
        )
    try:
        async with httpx.AsyncClient(
            timeout=30.0, verify=self.valves.VERIFY_SSL
        ) as client:
            response = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"),
                headers=self._headers(__user__),
            )
            response.raise_for_status()
            issue = response.json()
        title = issue.get("title", "No title")
        body = issue.get("body", "")
        state = issue.get("state", "unknown")
        user = issue.get("user", {}).get("login", "unknown")
        labels = [label.get("name", "") for label in issue.get("labels", [])]
        created_at = issue.get("created_at", "")[:10]  # keep the date part only
        html_url = issue.get("html_url", "")
        # Parse body for structured info (keyword heuristics)
        testing_criteria = []
        technical_notes = []
        is_testing_required = False
        is_docs_required = False
        body_lower = body.lower()
        if "test" in body_lower or "testing" in body_lower:
            is_testing_required = True
            # Try to extract testing criteria (first "testing/criteria/test: ..." section)
            testing_section = re.search(
                r"(?:testing|criteria|test).*?:(.*?)(?:\n\n|$)",
                body,
                re.IGNORECASE | re.DOTALL,
            )
            if testing_section:
                testing_criteria = [
                    line.strip().lstrip("-*•")
                    for line in testing_section.group(1).split("\n")
                    if line.strip()
                ]
        if "documentation" in body_lower or "docs" in body_lower:
            is_docs_required = True
        # Check for technical notes section
        tech_section = re.search(
            r"(?:technical|tech).*?:(.*?)(?:\n\n|$)",
            body,
            re.IGNORECASE | re.DOTALL,
        )
        if tech_section:
            technical_notes = [
                line.strip().lstrip("-*•")
                for line in tech_section.group(1).split("\n")
                if line.strip()
            ]
        # Extract issue references; ensure this issue itself is listed first
        issue_refs = self._parse_issue_refs(body)
        if not any(ref == f"#{issue_number}" for ref in issue_refs):
            issue_refs.insert(0, f"#{issue_number}")
        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Done", "done": True, "hidden": True},
                }
            )
        output = f"# 📋 Ticket #{issue_number}: {title}\n\n"
        output += f"**State:** {state.upper()} | **Author:** @{user} | **Created:** {created_at}\n"
        output += f"**Labels:** {', '.join(labels) if labels else 'None'}\n"
        output += f"**URL:** {html_url}\n\n"
        if body:
            output += "## 📝 Description\n\n"
            # Truncate very long descriptions
            if len(body) > 1000:
                output += f"{body[:1000]}...\n\n"
                output += "_Description truncated. Use `get_issue()` for full content._\n\n"
            else:
                output += f"{body}\n\n"
        else:
            output += "_No description provided._\n\n"
        # Testing requirements
        output += "## 🧪 Testing Requirements\n\n"
        if is_testing_required:
            if testing_criteria:
                output += "**Testing Criteria:**\n"
                for criterion in testing_criteria:
                    output += f"- [ ] {criterion}\n"
                output += "\n"
            else:
                output += "Testing required, but no specific criteria listed.\n\n"
        else:
            output += "No explicit testing requirements detected.\n\n"
        # Technical notes
        if technical_notes:
            output += "## 🔧 Technical Notes\n\n"
            for note in technical_notes:
                output += f"- {note}\n"
            output += "\n"
        # Documentation check
        if is_docs_required:
            output += "## 📚 Documentation Required\n\n"
            output += "This ticket mentions documentation needs.\n\n"
        # Issue references
        if issue_refs:
            output += "## 🔗 Related Issues\n\n"
            for ref in issue_refs:
                output += f"- {ref}\n"
            output += "\n"
        # Suggested branch name (create_feature_branch generates the real one)
        suggested = self._generate_branch_name(issue_number, title)
        output += "## 🌿 Suggested Branch Name\n\n"
        output += f"```\n{suggested}\n```\n\n"
        # Next steps
        output += "## 🚀 Next Steps\n\n"
        output += f"1. Create branch: `create_feature_branch({issue_number}, \"{title[:50]}...\")`\n"
        output += " - System will generate & cache branch name\n"
        output += "2. Make changes using `apply_diff()` or `commit_changes()`\n"
        output += " - Branch automatically detected from chat_id\n"
        output += "3. Create PR: `create_pull_request()`\n"
        return output.strip()
    except httpx.HTTPStatusError as e:
        error_msg = self._format_error(e, f"issue #{issue_number}")
        if e.response.status_code == 404:
            return f"Error: Issue #{issue_number} not found in {owner}/{repo_name}."
        return f"Error: Failed to fetch issue. {error_msg}"
    except Exception as e:
        return f"Error: Unexpected failure during issue fetch: {type(e).__name__}: {e}"
async def create_feature_branch(
    self,
    issue_number: int,
    title: Optional[str] = None,
    repo: Optional[str] = None,
    scope: str = "feature",
    __user__: dict = None,
    __chat_id__: str = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Create a new feature branch for a ticket.

    CRITICAL: This is where branch names are generated and cached.
    After this call, ALL operations use the cached branch automatically.

    This method:
    1. Generates branch name from issue_number + title (system-managed)
    2. Validates against allowed scopes
    3. Prevents commits to protected branches
    4. Creates the branch from the default/base branch
    5. Caches the branch name for the session (chat_id → working_branch)

    A 409 from the Gitea API (branch already exists) is treated as success:
    the existing branch is cached for the session.

    Args:
        issue_number: The ticket/issue number
        title: Optional title (will fetch from issue if not provided)
        repo: Repository in 'owner/repo' format
        scope: Branch scope (feature, fix, refactor, etc.)
        __user__: User context (supplies the API token)
        __chat_id__: Session ID for caching (REQUIRED for branch operations)
        __event_emitter__: Optional status-event callback

    Returns:
        Branch creation confirmation with cached branch info, or an error string
    """
    # Validate that chat_id is provided — without it the branch cannot be cached
    if not __chat_id__:
        return """❌ **Chat ID Required**
To create a feature branch, you must provide the __chat_id__ parameter.
This is required for the system to:
1. Cache the branch name for the session
2. Automatically detect the branch for all future operations
Example: create_feature_branch(42, title, __chat_id__="session123")
"""
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured in UserValves settings."
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"
    # Fetch title from issue if not provided
    if not title:
        try:
            async with httpx.AsyncClient(
                timeout=30.0, verify=self.valves.VERIFY_SSL
            ) as client:
                issue_response = await client.get(
                    self._api_url(
                        f"/repos/{owner}/{repo_name}/issues/{issue_number}"
                    ),
                    headers=self._headers(__user__),
                )
                issue_response.raise_for_status()
                issue = issue_response.json()
                title = issue.get("title", "")
        except Exception as e:
            return f"Error: Could not fetch issue #{issue_number}: {e}"
    # Generate branch name (SYSTEM-MANAGED)
    branch_name = self._generate_branch_name(issue_number, title, scope)
    # Validate branch name against scopes / protected-branch policy
    is_valid, error_msg = self._validate_branch_name(branch_name)
    if not is_valid:
        return f"❌ **Branch Validation Failed**\n\n{error_msg}"
    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": f"Creating branch {branch_name}...",
                    "done": False,
                },
            }
        )
    # Get base branch (with per-session caching of the default branch)
    base_branch = self._get_branch(None, __user__)
    cached_base = self._get_cached_data(__chat_id__, "default_branch")
    if cached_base:
        base_branch = cached_base
    else:
        self._set_cached_data(__chat_id__, "default_branch", base_branch)
    try:
        async with httpx.AsyncClient(
            timeout=30.0, verify=self.valves.VERIFY_SSL
        ) as client:
            response = await client.post(
                self._api_url(f"/repos/{owner}/{repo_name}/branches"),
                headers=self._headers(__user__),
                json={
                    "new_branch_name": branch_name,
                    "old_branch_name": base_branch,
                },
            )
            # Handle branch already exists (Gitea returns 409)
            if response.status_code == 409:
                # Still cache it even if it exists
                self._set_cached_data(__chat_id__, "working_branch", branch_name)
                return f"⚠️ **Branch Already Exists (Cached)**\n\nBranch `{branch_name}` already exists in `{owner}/{repo_name}`.\n\n**Branch has been cached for this session.**\nAll future operations will automatically use this branch.\n\nNext steps:\n1. Make changes using `apply_diff()` or `commit_changes()`\n2. Commit with descriptive messages\n3. Create PR when ready: `create_pull_request()`\n"
            response.raise_for_status()
        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Done", "done": True, "hidden": True},
                }
            )
        # Cache the branch for session (CRITICAL!)
        self._set_cached_data(__chat_id__, "working_branch", branch_name)
        return f"""✅ **Feature Branch Created & Cached Successfully**
**Branch:** `{branch_name}`
**Base Branch:** `{base_branch}`
**Repository:** `{owner}/{repo_name}`
**Issue:** #{issue_number}
**Session Cached:** ✅
- chat_id: `{__chat_id__}`
- working_branch: `{branch_name}`
**What This Means:**
✅ All future operations will automatically use this branch
✅ LLM does NOT need to manage branch names
✅ Just make changes - system handles the rest
**Next Steps:**
1. Make changes to files using `apply_diff()` or `commit_changes()`
2. Branch automatically detected from chat_id: `{__chat_id__}`
3. Create PR when ready: `create_pull_request()`
**Branch Naming Convention:**
- Format: `<scope>/<issue-id>-<description>`
- Scopes: feature, fix, refactor, docs, test, chore, wip
- Examples:
- `feature/42-add-user-authentication`
- `fix/37-fix-memory-leak`
- `refactor/15-cleanup-api-code`
"""
    except httpx.HTTPStatusError as e:
        error_msg = self._format_error(e, "branch creation")
        if e.response.status_code == 409:
            # Cache it even if exists (same treatment as the in-band 409 above)
            self._set_cached_data(__chat_id__, "working_branch", branch_name)
            return f"⚠️ **Branch Already Exists (Cached)**\n\nBranch `{branch_name}` already exists.\n\n**Cached for session.** All operations will use this branch."
        return f"Error: Failed to create branch. {error_msg}"
    except Exception as e:
        return f"Error: Unexpected failure during branch creation: {type(e).__name__}: {e}"
async def get_branch_status(
    self,
    repo: Optional[str] = None,
    __user__: dict = None,
    __chat_id__: str = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Get the current working branch status for the session.

    CRITICAL: Shows the system-cached working branch. When a branch is
    cached, it is also verified against the Gitea API (best effort).

    Args:
        repo: Repository in 'owner/repo' format
        __user__: User context (supplies the API token)
        __chat_id__: Session ID for cache lookup (REQUIRED)
        __event_emitter__: Optional status-event callback (not used here)

    Returns:
        Current branch and cached info as markdown, or an error string
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured."
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"
    # Get cached branches for this session (if any)
    working_branch = None
    default_branch = None
    if __chat_id__:
        working_branch = self._get_cached_data(__chat_id__, "working_branch")
        default_branch = self._get_cached_data(__chat_id__, "default_branch")
    if not default_branch:
        default_branch = self._get_branch(None, __user__)
    output = f"# 🌿 Branch Status: {owner}/{repo_name}\n\n"
    output += f"**Default Branch:** `{default_branch}`\n"
    if __chat_id__:
        output += f"**Session ID:** `{__chat_id__}`\n\n"
    else:
        output += "\n"
    if working_branch:
        output += f"## ✅ Working Branch Cached\n\n"
        output += f"**Branch:** `{working_branch}`\n\n"
        output += "**All file operations will automatically use this branch.**\n"
        output += f"LLM should NOT pass branch names to functions.\n\n"
        # Try to verify branch exists (best effort; failures are reported, not raised)
        try:
            async with httpx.AsyncClient(
                timeout=30.0, verify=self.valves.VERIFY_SSL
            ) as client:
                branch_response = await client.get(
                    self._api_url(f"/repos/{owner}/{repo_name}/branches/{working_branch}"),
                    headers=self._headers(__user__),
                )
            if branch_response.status_code == 200:
                output += "**Status:** ✅ Branch exists and is ready for commits\n"
            else:
                output += "**Status:** ⚠️ Branch may have been deleted\n"
        except Exception:
            output += "**Status:** Unable to verify (check repository)\n"
    else:
        output += "## ❌ No Working Branch\n\n"
        output += "**No branch cached for this session.**\n\n"
        output += "To create a branch:\n"
        output += f"```python\ncreate_feature_branch(\n issue_number=42,\n title=\"Add login feature\",\n __chat_id__=\"{__chat_id__ or 'your_chat_id'}\"\n)\n```\n\n"
        output += "**This will:**\n"
        output += "1. Generate a branch name (system-managed)\n"
        output += "2. Create the branch\n"
        output += "3. Cache it for all future operations\n"
    return output
async def list_my_branches(
    self,
    repo: Optional[str] = None,
    __user__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    List all branches in the repository (filtered view).

    Branches are grouped into: protected branches, scope-prefixed branches
    (matching ALLOWED_SCOPES), and everything else (only the first 20 of
    "other" are shown).

    Args:
        repo: Repository in 'owner/repo' format
        __user__: User context (supplies the API token)
        __event_emitter__: Optional status-event callback

    Returns:
        Formatted list of branches, or an error string
    """
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured."
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"
    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {"description": "Fetching branches...", "done": False},
            }
        )
    try:
        async with httpx.AsyncClient(
            timeout=30.0, verify=self.valves.VERIFY_SSL
        ) as client:
            response = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/branches"),
                headers=self._headers(__user__),
                params={"limit": 50},
            )
            response.raise_for_status()
            branches = response.json()
        output = f"# 🌿 Branches in {owner}/{repo_name}\n\n"
        # Separate protected, scope-prefixed ("feature") and other branches
        protected = [b for b in branches if b.get("protected")]
        feature = [
            b
            for b in branches
            if not b.get("protected")
            and any(b["name"].startswith(f"{s}/") for s in self.valves.ALLOWED_SCOPES)
        ]
        other = [
            b
            for b in branches
            if not b.get("protected")
            and not any(b["name"].startswith(f"{s}/") for s in self.valves.ALLOWED_SCOPES)
        ]
        if protected:
            output += "## 🛡️ Protected Branches\n\n"
            for branch in sorted(protected, key=lambda x: x["name"]):
                name = branch.get("name", "")
                commit_sha = branch.get("commit", {}).get("id", "")[:8]
                output += f"- `{name}` [commit: {commit_sha}]\n"
            output += "\n"
        if feature:
            output += "## 📦 Feature Branches\n\n"
            for branch in sorted(feature, key=lambda x: x["name"]):
                name = branch.get("name", "")
                commit_sha = branch.get("commit", {}).get("id", "")[:8]
                output += f"- `{name}` [commit: {commit_sha}]\n"
            output += "\n"
        if other:
            output += "## 📄 Other Branches\n\n"
            # Cap the "other" section at 20 entries to keep output readable
            for branch in sorted(other, key=lambda x: x["name"])[:20]:
                name = branch.get("name", "")
                commit_sha = branch.get("commit", {}).get("id", "")[:8]
                output += f"- `{name}` [commit: {commit_sha}]\n"
            if len(other) > 20:
                output += f"\n... and {len(other) - 20} more branches\n"
            output += "\n"
        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Done", "done": True, "hidden": True},
                }
            )
        return output.strip()
    except httpx.HTTPStatusError as e:
        error_msg = self._format_error(e, "branch listing")
        return f"Error: Failed to list branches. {error_msg}"
    except Exception as e:
        return f"Error: Unexpected failure: {type(e).__name__}: {e}"
async def apply_diff(
self,
path: str,
diff_content: str,
message: Optional[str] = None,
repo: Optional[str] = None,
auto_message: bool = True,
__user__: dict = None,
__chat_id__: str = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Apply a unified diff patch to a file.
CRITICAL: Branch automatically detected from chat_id. LLM should NOT pass branch.
This is the PREFERRED method for making changes as it:
1. Is precise about what changes
2. Prevents accidental file replacements
3. Is what LLMs understand best (trained on GitHub PRs)
Args:
path: File path to update
diff_content: Unified diff in standard format
message: Commit message (auto-generated if not provided)
repo: Repository in 'owner/repo' format
auto_message: Generate commit message if not provided
__user__: User context
__chat_id__: Session ID for branch detection (REQUIRED)
__event_emitter__: Event emitter callback
Returns:
Commit details and diff summary
"""
# Get working branch from session cache (SYSTEM-MANAGED)
effective_branch = self._get_working_branch(__chat_id__, __user__)
if not effective_branch:
return """❌ **No Working Branch**
No branch cached for this session. Create a feature branch first:
```python
create_feature_branch(
issue_number=42,
title="Feature title",
__chat_id__="your_chat_id"
)
```
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured in UserValves settings."
try:
owner, repo_name = self._resolve_repo(repo, __user__)
except ValueError as e:
return f"Error: {e}"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Applying diff to {path}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
# Get current file content
get_response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
params={"ref": effective_branch},
)
# Check if file exists
if get_response.status_code == 404:
return f"Error: File not found: `{path}`. Use `create_file()` to create a new file."
get_response.raise_for_status()
file_info = get_response.json()
current_sha = file_info.get("sha")
# Decode current content
current_content_b64 = file_info.get("content", "")
try:
current_content = base64.b64decode(current_content_b64).decode(
"utf-8"
)
except Exception:
return "Error: Could not decode current file content."
# Parse and apply the diff
new_content = self._apply_unified_diff(current_content, diff_content)
if new_content is None:
return "Error: Failed to parse or apply diff. Check the diff format."
# Generate commit message if needed
if not message:
if auto_message:
message = self._generate_diff_commit_message(path, diff_content)
else:
return "Error: Commit message is required when auto_message=False."
# Commit the changes
new_content_b64 = base64.b64encode(new_content.encode("utf-8")).decode(
"ascii"
)
response = await client.put(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
json={
"content": new_content_b64,
"message": message,
"branch": effective_branch,
"sha": current_sha,
},
)
response.raise_for_status()
result = response.json()
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
# Parse diff stats
added_lines = diff_content.count("+") - diff_content.count("+++")
removed_lines = diff_content.count("-") - diff_content.count("---")
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
output = f"✅ **Diff Applied Successfully**\n\n"
output += f"**File:** `{path}`\n"
output += f"**Branch:** `{effective_branch}` (auto-detected from chat_id)\n"
output += f"**Commit:** `{commit_sha}`\n"
output += f"**Changes:** +{added_lines} lines, -{removed_lines} lines\n\n"
output += f"**Message:** {message}\n"
return output
except httpx.HTTPStatusError as e:
error_msg = self._format_error(e, f"diff application to '{path}'")
if e.response.status_code == 409:
return f"Error: Update conflict for `{path}`. Fetch the latest version and try again."
return f"Error: Failed to apply diff. {error_msg}"
except Exception as e:
return f"Error: Unexpected failure during diff application: {type(e).__name__}: {e}"
def _apply_unified_diff(
self, current_content: str, diff_content: str
) -> Optional[str]:
"""
Apply a unified diff to content.
Args:
current_content: Current file content
diff_content: Unified diff patch
Returns:
New content after applying diff, or None if failed
"""
try:
# Parse the diff
diff_lines = diff_content.splitlines(keepends=True)
# Parse hunks from unified diff
hunks = []
current_hunk = None
in_hunk = False
for line in diff_lines:
if line.startswith("---"):
continue # Skip old filename
elif line.startswith("+++"):
continue # Skip new filename
elif line.startswith("@@"):
# New hunk starts
if current_hunk:
hunks.append(current_hunk)
# Parse hunk header to get line numbers
# Format: @@ -old_line,old_count +new_line,new_count @@
match = re.search(
r"@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@", line
)
if match:
old_start = int(match.group(1))
new_start = int(match.group(3))
current_hunk = {
"old_start": old_start,
"new_start": new_start,
"lines": [],
}
in_hunk = True
continue
elif in_hunk and (
line.startswith("+") or line.startswith("-") or line.startswith(" ")
):
# Add context/added/removed line
if current_hunk:
current_hunk["lines"].append(line)
elif in_hunk and not line.startswith(
"+"
) and not line.startswith("-") and not line.startswith(" "):
# End of hunk
if current_hunk:
hunks.append(current_hunk)
current_hunk = None
in_hunk = False
if current_hunk:
hunks.append(current_hunk)
# If no hunks, return unchanged
if not hunks:
return current_content
# Split content into lines
old_lines = current_content.splitlines(keepends=True)
# Use difflib to apply the patch
old_lines_stripped = [line.rstrip("\n") for line in old_lines]
# Create a list for the new content
new_lines_stripped = list(old_lines_stripped)
# Apply each hunk in reverse order (to maintain correct indices)
for hunk in sorted(hunks, key=lambda h: h["old_start"], reverse=True):
old_start = hunk["old_start"] - 1 # Convert to 0-indexed
# Collect lines to remove and add
lines_to_remove = []
lines_to_add = []
for line in hunk["lines"]:
if line.startswith("+"):
lines_to_add.append(line[1:].rstrip("\n"))
elif line.startswith("-"):
lines_to_remove.append(line[1:].rstrip("\n"))
# Remove old lines and add new ones
# First, find and remove the exact lines specified
if lines_to_remove:
# Look for the first removed line at old_start
found_start = False
for i in range(old_start, min(old_start + len(lines_to_remove), len(new_lines_stripped))):
if i < len(new_lines_stripped) and new_lines_stripped[i] == lines_to_remove[0]:
# Found the start, remove the lines
del new_lines_stripped[i : i + len(lines_to_remove)]
found_start = True
break
if not found_start:
# Fallback: just insert at old_start
pass
# Insert new lines at old_start
for line in reversed(lines_to_add):
new_lines_stripped.insert(old_start, line)
# Reconstruct content with line endings
new_content = "".join(
line + ("\n" if not line.endswith("\n") and i < len(new_lines_stripped) - 1 else "")
for i, line in enumerate(new_lines_stripped)
)
# Ensure proper line endings
if new_content and not new_content.endswith("\n"):
new_content += "\n"
return new_content
except Exception as e:
# Log the error but don't fail
print(f"Diff application warning: {e}")
return None
def _generate_diff_commit_message(self, path: str, diff_content: str) -> str:
"""
Generate a commit message from diff content.
Args:
path: File path
diff_content: Unified diff
Returns:
Generated commit message
"""
# Extract file name from path
file_name = path.split("/")[-1]
# Detect change type from diff
change_type = "chore"
if any(
line.startswith("+def ") or line.startswith("+class ")
for line in diff_content.splitlines()
):
change_type = "feat"
elif any(
line.startswith("+ return ") or line.startswith("+ return ")
for line in diff_content.splitlines()
):
change_type = "fix"
elif "test" in path.lower() or "spec" in path.lower():
change_type = "test"
elif ".md" in path.lower() or "readme" in path.lower():
change_type = "docs"
elif any(line.startswith("-") for line in diff_content.splitlines()):
change_type = "refactor"
# Generate message
message = f"{change_type}({file_name}): "
# Extract a short description from added lines
added_lines = [
line[1:].strip()
for line in diff_content.splitlines()
if line.startswith("+") and not line.startswith("+++")
]
if added_lines:
# Use first meaningful added line
description = ""
for line in added_lines:
if line and not line.startswith("import ") and not line.startswith("from "):
# Get function/class definition or first statement
match = re.match(
r"(def|class|const|var|let|interface|type)\s+(\w+)", line
)
if match:
kind = match.group(1)
name = match.group(2)
if kind == "def":
description = f"add {name}() function"
break
elif kind == "class":
description = f"add {name} class"
break
elif line.startswith(" ") or line.startswith("\t"):
# Indented line, skip
continue
else:
# Use as description
description = line[:50].rstrip(":")
if len(line) > 50:
description += "..."
break
if not description:
# Fallback to line count
added_count = len(added_lines)
description = f"update ({added_count} lines added)"
message += description
return message
async def commit_changes(
self,
path: str,
content: str,
message: Optional[str] = None,
repo: Optional[str] = None,
max_delta_percent: Optional[float] = None,
__user__: dict = None,
__chat_id__: str = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Commit file changes with automatic content detection and size delta gating.
CRITICAL: Branch automatically detected from chat_id. LLM should NOT pass branch.
This method:
1. Detects whether to create or replace a file
2. Validates file size changes against threshold (quality gate)
3. Auto-generates commit message if not provided
4. Commits to the system-cached working branch
Args:
path: File path to create or update
content: New file content
message: Commit message (auto-generated if not provided)
repo: Repository in 'owner/repo' format
max_delta_percent: Override for size delta threshold (quality gate)
__user__: User context
__chat_id__: Session ID for branch detection (REQUIRED)
__event_emitter__: Event emitter for progress
Returns:
Commit details or error with guidance
"""
# Get working branch from session cache (SYSTEM-MANAGED)
effective_branch = self._get_working_branch(__chat_id__, __user__)
if not effective_branch:
return """❌ **No Working Branch**
No branch cached for this session. Create a feature branch first:
```python
create_feature_branch(
issue_number=42,
title="Feature title",
__chat_id__="your_chat_id"
)
```
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured in UserValves settings."
try:
owner, repo_name = self._resolve_repo(repo, __user__)
except ValueError as e:
return f"Error: {e}"
# Use provided threshold or default from valves
delta_threshold = max_delta_percent or self.valves.MAX_SIZE_DELTA_PERCENT
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Processing {path}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
# Check if file exists
get_response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
params={"ref": effective_branch},
)
file_exists = get_response.status_code == 200
current_sha = None
current_size = 0
if file_exists:
get_response.raise_for_status()
file_info = get_response.json()
current_sha = file_info.get("sha")
current_size = file_info.get("size", 0)
# SIZE DELTA GATE - Quality check
new_size = len(content.encode("utf-8"))
if current_size > 0 and new_size > 0:
delta_percent = abs(new_size - current_size) / current_size * 100
if delta_percent > delta_threshold:
# Calculate actual bytes changed
size_diff = new_size - current_size
direction = "larger" if size_diff > 0 else "smaller"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": "Size gate triggered",
"done": True,
"hidden": True,
},
}
)
return f"""⚠️ **Quality Gate: Large File Change Detected**
**File:** `{path}`
**Current Size:** {current_size} bytes
**New Size:** {new_size} bytes
**Change:** {size_diff:+d} bytes ({delta_percent:.1f}% {direction})
**Threshold:** {delta_threshold}%
This change exceeds the size delta threshold, which may indicate:
- Accidental full file replacement
- Unintended data loss
- LLM confusion about existing content
**Recommended Actions:**
1. **Use diff-based updates** (preferred):
```python
apply_diff(
path="{path}",
diff=\"\"\"--- a/{path}
+++ b/{path}
@@ -1,3 +1,5 @@
existing line
+new line to add
-line to remove
\"\"\",
message="feat(scope): description of changes",
__chat_id__="your_chat_id"
)
```
2. **Fetch and review current content**:
```python
current = get_file("{path}", __chat_id__="your_chat_id")
# Compare with what you want to change
```
3. **Commit with override** (not recommended):
Increase the threshold if this is intentional:
```python
commit_changes(..., max_delta_percent=100, __chat_id__="your_chat_id")
```
**Why this gate exists:**
Large file replacements by LLMs often indicate the model didn't properly understand the existing file structure. Using diffs ensures precise, targeted changes.
"""
# Generate commit message if not provided
if not message:
message = self._generate_commit_message(
change_type="chore",
scope=path.split("/")[-1] if "/" in path else path,
description=f"update {path}",
)
# Prepare content
content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii")
if file_exists:
# Replace existing file
if not current_sha:
return f"Error: Could not retrieve SHA for existing file: {path}"
response = await client.put(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
json={
"content": content_b64,
"message": message,
"branch": effective_branch,
"sha": current_sha,
},
)
else:
# Create new file
response = await client.post(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
json={
"content": content_b64,
"message": message,
"branch": effective_branch,
},
)
response.raise_for_status()
result = response.json()
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
action = "Updated" if file_exists else "Created"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
# Calculate and show size change
new_size = len(content.encode("utf-8"))
size_info = ""
if file_exists and current_size > 0:
delta = new_size - current_size
delta_percent = delta / current_size * 100
size_info = f"**Size Change:** {delta:+d} bytes ({delta_percent:.1f}%)\n"
output = f"""✅ **{action} File Successfully**
**File:** `{path}`
**Branch:** `{effective_branch}` (auto-detected from chat_id)
**Commit:** `{commit_sha}`
**Message:** {message}
{size_info}**Action:** {action}
"""
return output
except httpx.HTTPStatusError as e:
error_msg = self._format_error(e, f"file commit for '{path}'")
if e.response.status_code == 409:
return f"Error: Update conflict for `{path}`. Fetch the latest version and try again."
return f"Error: Failed to commit file. {error_msg}"
except Exception as e:
return f"Error: Unexpected failure during commit: {type(e).__name__}: {e}"
async def create_pull_request(
self,
title: str,
body: Optional[str] = "",
repo: Optional[str] = None,
__user__: dict = None,
__chat_id__: str = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Create a pull request from the current branch.
CRITICAL: Branch automatically detected from chat_id. LLM should NOT pass branch.
Args:
title: PR title
body: PR description (auto-populates from issue if linked)
repo: Repository in 'owner/repo' format
__user__: User context
__chat_id__: Session ID for branch detection (REQUIRED)
__event_emitter__: Event emitter for progress
Returns:
PR creation confirmation with details
"""
# Get working branch from session cache (SYSTEM-MANAGED)
head_branch = self._get_working_branch(__chat_id__, __user__)
if not head_branch:
return """❌ **No Working Branch**
No branch cached for this session. Create a feature branch first:
```python
create_feature_branch(
issue_number=42,
title="Feature title",
__chat_id__="your_chat_id"
)
```
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured."
try:
owner, repo_name = self._resolve_repo(repo, __user__)
except ValueError as e:
return f"Error: {e}"
base_branch = self._get_branch(None, __user__)
# Validate that head branch is not a protected branch
is_valid, error_msg = self._validate_branch_name(head_branch)
if not is_valid:
return f"❌ **Cannot Create PR**\n\n{error_msg}\n\nCreate a feature branch first using `create_feature_branch()`."
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Creating PR...", "done": False},
}
)
try:
# Auto-populate body with issue reference if not provided
if not body:
# Try to extract issue number from branch name
match = re.search(r"/(\d+)-", head_branch)
if match:
issue_number = match.group(1)
body = f"Closes #{issue_number}\n\nThis PR implements the changes from issue #{issue_number}."
else:
body = "Automated PR from gitea_coder workflow."
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.post(
self._api_url(f"/repos/{owner}/{repo_name}/pulls"),
headers=self._headers(__user__),
json={
"title": title,
"head": head_branch,
"base": base_branch,
"body": body,
},
)
# Handle PR already exists
if response.status_code == 409:
return f"⚠️ **PR Already Exists**\n\nA pull request for branch `{head_branch}` → `{base_branch}` already exists.\n\nCheck existing PRs and update it instead."
response.raise_for_status()
pr = response.json()
pr_number = pr.get("number")
pr_url = pr.get("html_url", "")
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
return f"""✅ **Pull Request Created Successfully**
**PR #{pr_number}:** {title}
**Branch:** `{head_branch}` → `{base_branch}` (auto-detected from chat_id)
**URL:** {pr_url}
**Description:**
{body}
**Next Steps:**
1. Add reviewers if needed
2. Address any merge conflicts
3. Await review feedback
**To check PR status:**
```python
get_pull_request({pr_number})
```
"""
except httpx.HTTPStatusError as e:
error_msg = self._format_error(e, "PR creation")
if e.response.status_code == 422:
return "Error: Could not create PR. The branch may not exist or there may be merge conflicts."
return f"Error: Failed to create PR. {error_msg}"
except Exception as e:
return f"Error: Unexpected failure during PR creation: {type(e).__name__}: {e}"
async def replace_file(
self,
path: str,
content: str,
message: str,
repo: Optional[str] = None,
__user__: dict = None,
__chat_id__: str = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Update an existing file in the repository (creates commit).
CRITICAL: Branch automatically detected from chat_id. LLM should NOT pass branch.
WARNING: This replaces the entire file content. For incremental changes,
use `apply_diff()` instead to prevent accidental data loss.
Args:
path: File path to update
content: New file content as string
message: Commit message
repo: Repository in 'owner/repo' format
__user__: User context
__chat_id__: Session ID for branch detection (REQUIRED)
__event_emitter__: Event emitter for progress
Returns:
Commit details and success confirmation
"""
# Get working branch from session cache (SYSTEM-MANAGED)
effective_branch = self._get_working_branch(__chat_id__, __user__)
if not effective_branch:
return """❌ **No Working Branch**
No branch cached for this session. Create a feature branch first.
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured in UserValves settings."
try:
owner, repo_name = self._resolve_repo(repo, __user__)
except ValueError as e:
return f"Error: {e}"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": f"Updating {path}...", "done": False},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
# Get current file SHA
get_response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
params={"ref": effective_branch},
)
if get_response.status_code == 404:
return f"Error: File not found: `{path}`. Use `create_file()` to create a new file, or `apply_diff()` to add content to a new file."
get_response.raise_for_status()
file_info = get_response.json()
sha = file_info.get("sha")
if not sha:
return "Error: Could not retrieve file SHA for update."
# Prepare updated content
content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii")
# Update file
response = await client.put(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
json={
"content": content_b64,
"message": message,
"branch": effective_branch,
"sha": sha,
},
)
response.raise_for_status()
result = response.json()
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
output = f"**File Updated Successfully**\n\n"
output += f"**File:** `{owner}/{repo_name}/{path}`\n"
output += f"**Branch:** `{effective_branch}` (auto-detected from chat_id)\n"
output += f"**Commit:** `{commit_sha}`\n"
output += f"**Message:** {message}\n\n"
output += "_Use `apply_diff()` for incremental changes to prevent data loss._\n"
return output
except httpx.HTTPStatusError as e:
error_msg = self._format_error(e, f"file update for '{path}'")
if e.response.status_code == 409:
return f"Error: Update conflict for `{path}`. Fetch the latest version and try again."
return f"Error: Failed to update file. {error_msg}"
except Exception as e:
return f"Error: Unexpected failure during file update: {type(e).__name__}: {e}"
async def create_file(
self,
path: str,
content: str,
message: str,
repo: Optional[str] = None,
__user__: dict = None,
__chat_id__: str = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Create a new file in the repository.
CRITICAL: Branch automatically detected from chat_id. LLM should NOT pass branch.
For adding content to existing files, use `apply_diff()` instead.
Args:
path: File path to create (e.g., 'docs/README.md')
content: Initial file content as string
message: Commit message
repo: Repository in 'owner/repo' format
__user__: User context
__chat_id__: Session ID for branch detection (REQUIRED)
__event_emitter__: Event emitter for progress
Returns:
Commit details and success confirmation
"""
# Get working branch from session cache (SYSTEM-MANAGED)
effective_branch = self._get_working_branch(__chat_id__, __user__)
if not effective_branch:
return """❌ **No Working Branch**
No branch cached for this session. Create a feature branch first.
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured in UserValves settings."
try:
owner, repo_name = self._resolve_repo(repo, __user__)
except ValueError as e:
return f"Error: {e}"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": f"Creating {path}...", "done": False},
}
)
try:
content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii")
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.post(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
json={
"content": content_b64,
"message": message,
"branch": effective_branch,
},
)
response.raise_for_status()
result = response.json()
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
output = f"**File Created Successfully**\n\n"
output += f"**File:** `{owner}/{repo_name}/{path}`\n"
output += f"**Branch:** `{effective_branch}` (auto-detected from chat_id)\n"
output += f"**Commit:** `{commit_sha}`\n"
output += f"**Message:** {message}\n"
return output
except httpx.HTTPStatusError as e:
error_msg = self._format_error(e, f"file creation for '{path}'")
if e.response.status_code == 422:
return f"Error: File already exists: `{path}`. Use `replace_file()` or `apply_diff()` to modify it."
return f"Error: Failed to create file. {error_msg}"
except Exception as e:
return f"Error: Unexpected failure during file creation: {type(e).__name__}: {e}"
async def delete_file(
self,
path: str,
message: str,
repo: Optional[str] = None,
__user__: dict = None,
__chat_id__: str = None,
__event_emitter__: Callable[[dict], Any] = None,
__event_call__: Callable[[dict], Any] = None,
) -> str:
"""
Delete a file from the repository.
CRITICAL: Branch automatically detected from chat_id. LLM should NOT pass branch.
Args:
path: File path to delete
message: Commit message for the deletion
repo: Repository in 'owner/repo' format
__user__: User context
__chat_id__: Session ID for branch detection (REQUIRED)
__event_emitter__: Event emitter for progress
__event_call__: Confirmation callback (for user confirmation)
Returns:
Confirmation with commit details
"""
# Get working branch from session cache (SYSTEM-MANAGED)
effective_branch = self._get_working_branch(__chat_id__, __user__)
if not effective_branch:
return """❌ **No Working Branch**
No branch cached for this session. Create a feature branch first.
"""
token = self._get_token(__user__)
if not token:
return "Error: GITEA_TOKEN not configured in UserValves settings."
try:
owner, repo_name = self._resolve_repo(repo, __user__)
except ValueError as e:
return f"Error: {e}"
# Confirmation dialog
if __event_call__:
result = await __event_call__(
{
"type": "confirmation",
"data": {
"title": "Confirm File Deletion",
"message": f"Delete `{path}` from `{owner}/{repo_name}` ({effective_branch})?",
},
}
)
if result is None or result is False:
return "⚠️ File deletion cancelled by user."
if isinstance(result, dict) and not result.get("confirmed"):
return "⚠️ File deletion cancelled by user."
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": f"Deleting {path}...", "done": False},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
# Get file SHA
get_response = await client.get(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
params={"ref": effective_branch},
)
if get_response.status_code == 404:
return f"Error: File not found: `{path}`"
get_response.raise_for_status()
file_info = get_response.json()
sha = file_info.get("sha")
if not sha:
return "Error: Could not retrieve file SHA for deletion."
# Delete file
response = await client.delete(
self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._headers(__user__),
json={
"message": message,
"branch": effective_branch,
"sha": sha,
},
)
response.raise_for_status()
result = response.json()
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
output = f"**File Deleted Successfully**\n\n"
output += f"**File:** `{owner}/{repo_name}/{path}`\n"
output += f"**Branch:** `{effective_branch}` (auto-detected from chat_id)\n"
output += f"**Commit:** `{commit_sha}`\n"
output += f"**Message:** {message}\n"
return output
except httpx.HTTPStatusError as e:
error_msg = self._format_error(e, f"file deletion for '{path}'")
if e.response.status_code == 404:
return f"Error: File not found: `{path}`"
return f"Error: Failed to delete file. {error_msg}"
except Exception as e:
return f"Error: Unexpected failure during file deletion: {type(e).__name__}: {e}"
async def rename_file(
    self,
    old_path: str,
    new_path: str,
    message: Optional[str] = None,
    repo: Optional[str] = None,
    __user__: dict = None,
    __chat_id__: str = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Rename a file in the repository.
    CRITICAL: Branch automatically detected from chat_id. LLM should NOT pass branch.
    Implemented as copy-then-delete via the Gitea contents API:
    1. Fetch the old file's base64 content and SHA
    2. Create a new file at new_path with the same content
    3. Delete the old file
    Content is round-tripped as raw bytes (no UTF-8 decode), so binary
    files rename correctly. The two commits are not atomic: if the delete
    fails after the create succeeded, the file exists at both paths and
    the returned error says so explicitly.
    Args:
        old_path: Current file path
        new_path: New file path
        message: Commit message (auto-generated if not provided)
        repo: Repository in 'owner/repo' format
        __user__: User context
        __chat_id__: Session ID for branch detection (REQUIRED)
        __event_emitter__: Event emitter for progress
    Returns:
        Confirmation with commit details
    """
    # Get working branch from session cache (SYSTEM-MANAGED)
    effective_branch = self._get_working_branch(__chat_id__, __user__)
    if not effective_branch:
        return """❌ **No Working Branch**
No branch cached for this session. Create a feature branch first.
"""
    token = self._get_token(__user__)
    if not token:
        return "Error: GITEA_TOKEN not configured in UserValves settings."
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as e:
        return f"Error: {e}"
    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": f"Renaming {old_path} to {new_path}...",
                    "done": False,
                },
            }
        )
    try:
        # Generate commit message if not provided.
        # Fixed: paths are separated ("a -> b"); previously they were
        # concatenated with no separator ("ab").
        if not message:
            message = self._generate_commit_message(
                change_type="chore",
                scope="rename",
                description=f"rename {old_path} -> {new_path}",
            )
        async with httpx.AsyncClient(
            timeout=30.0, verify=self.valves.VERIFY_SSL
        ) as client:
            # Get old file content (base64) and SHA
            get_response = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/contents/{old_path}"),
                headers=self._headers(__user__),
                params={"ref": effective_branch},
            )
            if get_response.status_code == 404:
                return f"Error: File not found: `{old_path}`"
            get_response.raise_for_status()
            old_file = get_response.json()
            # The contents API returns a list payload for directories.
            if isinstance(old_file, list):
                return f"Error: `{old_path}` is a directory, not a file."
            old_sha = old_file.get("sha")
            if not old_sha:
                return f"Error: Could not determine SHA for `{old_path}`."
            # Normalize the base64 (Gitea may line-wrap it) while keeping
            # raw bytes, so binary files survive the rename unchanged.
            try:
                raw = base64.b64decode(old_file.get("content", ""))
            except Exception:
                return "Error: Could not decode file content."
            new_content_b64 = base64.b64encode(raw).decode("ascii")
            # Create new file with same content
            create_response = await client.post(
                self._api_url(f"/repos/{owner}/{repo_name}/contents/{new_path}"),
                headers=self._headers(__user__),
                json={
                    "content": new_content_b64,
                    "message": message,
                    "branch": effective_branch,
                },
            )
            if create_response.status_code == 422:
                return f"Error: File already exists at new path: `{new_path}`"
            create_response.raise_for_status()
            # Delete old file. httpx's AsyncClient.delete() accepts no
            # request-body kwargs (json= raises TypeError), so the DELETE
            # with a JSON body must go through client.request().
            delete_response = await client.request(
                "DELETE",
                self._api_url(f"/repos/{owner}/{repo_name}/contents/{old_path}"),
                headers=self._headers(__user__),
                json={
                    "message": message,
                    "branch": effective_branch,
                    "sha": old_sha,
                },
            )
            try:
                delete_response.raise_for_status()
            except httpx.HTTPStatusError as e:
                # Create succeeded but delete failed: report the partial
                # state instead of a generic rename failure.
                error_msg = self._format_error(e, "old file deletion during rename")
                return (
                    f"Error: Created `{new_path}` but failed to delete `{old_path}`; "
                    f"the file now exists at both paths. {error_msg}"
                )
            commit_sha = create_response.json().get("commit", {}).get("sha", "unknown")[:8]
            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {"description": "Done", "done": True, "hidden": True},
                    }
                )
            output = "**File Renamed Successfully**\n\n"
            output += f"**Old Path:** `{old_path}`\n"
            output += f"**New Path:** `{new_path}`\n"
            output += f"**Branch:** `{effective_branch}` (auto-detected from chat_id)\n"
            output += f"**Commit:** `{commit_sha}`\n"
            output += f"**Message:** {message}\n"
            return output
    except httpx.HTTPStatusError as e:
        error_msg = self._format_error(e, "file rename")
        return f"Error: Failed to rename file. {error_msg}"
    except Exception as e:
        return f"Error: Unexpected failure during file rename: {type(e).__name__}: {e}"
async def get_file(
    self,
    path: str,
    repo: Optional[str] = None,
    __user__: dict = None,
    __chat_id__: str = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Read one file from the repository and return its decoded text with metadata.
    CRITICAL: Branch automatically detected from chat_id. LLM should NOT pass branch.
    When the session has no cached working branch, the repository default
    branch is used so reads still work before a feature branch exists.
    Args:
        path: Full path to the file (e.g., 'src/main.py')
        repo: Repository in 'owner/repo' format
        __user__: User context
        __chat_id__: Session ID for branch detection (REQUIRED)
        __event_emitter__: Event emitter for progress
    Returns:
        File content with metadata (SHA, size, branch)
    """
    # Session-cached branch wins; otherwise read from the default branch.
    branch = self._get_working_branch(__chat_id__, __user__)
    if not branch:
        branch = self._get_branch(None, __user__)
    if not self._get_token(__user__):
        return "Error: GITEA_TOKEN not configured in UserValves settings."
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as exc:
        return f"Error: {exc}"
    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {"description": f"Reading file {path}...", "done": False},
            }
        )
    try:
        async with httpx.AsyncClient(
            timeout=30.0, verify=self.valves.VERIFY_SSL
        ) as client:
            resp = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
                headers=self._headers(__user__),
                params={"ref": branch},
            )
            resp.raise_for_status()
            info = resp.json()
            # A list payload means the path resolved to a directory.
            if isinstance(info, list):
                return f"**Note:** '{path}' is a directory. Use `list_files()` to browse its contents."
            if info.get("type") != "file":
                return f"Error: '{path}' is not a file (type: {info.get('type')})"
            try:
                text = base64.b64decode(info.get("content", "")).decode("utf-8")
            except Exception:
                return "Error: Could not decode file content. The file may be binary or corrupted."
            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {"description": "Done", "done": True, "hidden": True},
                    }
                )
            parts = [
                f"**File:** `{owner}/{repo_name}/{path}`\n",
                f"**Branch:** `{branch}`\n",
                f"**SHA:** `{info.get('sha', 'unknown')[:8]}` | **Size:** {info.get('size', 0)} bytes\n",
                f"**URL:** {info.get('html_url', 'N/A')}\n\n",
                f"```\n{text}\n```\n",
            ]
            return "".join(parts)
    except httpx.HTTPStatusError as e:
        detail = self._format_error(e, f"file fetch for '{path}'")
        if e.response.status_code == 404:
            return f"Error: File not found: `{path}`. Verify the file path and branch."
        return f"Error: Failed to fetch file. {detail}"
    except Exception as e:
        return f"Error: Unexpected failure during file fetch: {type(e).__name__}: {e}"
async def list_files(
    self,
    path: str = "",
    repo: Optional[str] = None,
    __user__: dict = None,
    __chat_id__: str = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    List files and directories at a repository path.
    CRITICAL: Branch automatically detected from chat_id. LLM should NOT pass branch.
    When the session has no cached working branch, the repository default
    branch is used so listings still work before a feature branch exists.
    Args:
        path: Directory path to list (default: root)
        repo: Repository in 'owner/repo' format
        __user__: User context
        __chat_id__: Session ID for branch detection (REQUIRED)
        __event_emitter__: Event emitter for progress
    Returns:
        Formatted directory listing with file sizes and types
    """
    # Session-cached branch wins; otherwise read from the default branch.
    branch = self._get_working_branch(__chat_id__, __user__)
    if not branch:
        branch = self._get_branch(None, __user__)
    if not self._get_token(__user__):
        return "Error: GITEA_TOKEN not configured in UserValves settings."
    try:
        owner, repo_name = self._resolve_repo(repo, __user__)
    except ValueError as exc:
        return f"Error: {exc}"
    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {"description": f"Listing {path or 'root'}...", "done": False},
            }
        )

    def human_size(n: int) -> str:
        # Floor-divided B / KB / MB, matching the original thresholds.
        if n < 1024:
            return f"{n}B"
        if n < 1024 * 1024:
            return f"{n//1024}KB"
        return f"{n//(1024*1024)}MB"

    def name_key(entry: dict) -> str:
        # Case-insensitive sort key on the entry name.
        return entry.get("name", "").lower()

    try:
        async with httpx.AsyncClient(
            timeout=30.0, verify=self.valves.VERIFY_SSL
        ) as client:
            resp = await client.get(
                self._api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
                headers=self._headers(__user__),
                params={"ref": branch},
            )
            resp.raise_for_status()
            entries = resp.json()
            # A dict payload means the path resolved to a single file.
            if isinstance(entries, dict):
                return f"**Note:** '{path}' is a file. Use `get_file()` to read its contents."
            dirs = [e for e in entries if e.get("type") == "dir"]
            files = [e for e in entries if e.get("type") == "file"]
            lines = [
                f"**Contents of {owner}/{repo_name}/{path or '.'}** (`{branch}`)\n\n"
            ]
            if dirs:
                lines.append("**📁 Directories:**\n")
                lines.extend(
                    f"- `📁 {e.get('name', '')}/`\n" for e in sorted(dirs, key=name_key)
                )
                lines.append("\n")
            if files:
                lines.append("**📄 Files:**\n")
                for e in sorted(files, key=name_key):
                    lines.append(
                        f"- `{e.get('name', '')}` "
                        f"({human_size(e.get('size', 0))}, sha: {e.get('sha', 'unknown')[:8]})\n"
                    )
            lines.append(f"\n**Total:** {len(dirs)} directories, {len(files)} files")
            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {"description": "Done", "done": True, "hidden": True},
                    }
                )
            return "".join(lines)
    except httpx.HTTPStatusError as e:
        detail = self._format_error(e, f"directory listing for '{path}'")
        if e.response.status_code == 404:
            return f"Error: Path not found: `{path}`. Verify the path exists in the repository."
        return f"Error: Failed to list directory contents. {detail}"
    except Exception as e:
        return f"Error: Unexpected failure during directory listing: {type(e).__name__}: {e}"