Files
tools/gitea/coder.py
2026-01-18 20:33:28 +00:00

2890 lines
104 KiB
Python

"""
title: Gitea Coder - Workflow Role with Automatic Branch Management
author: Jeff Smith + minimax + Claude
version: 1.0.0
license: MIT
description: High-level workflow role for LLM-based code generation with automatic branch management and quality gates
requirements: pydantic, httpx
changelog:
1.0.0:
- Initial implementation of gitea_coder role
- Branch name = chat_id (KISS principle)
- Stateless design - no caching complexity
- Branch creation with scope gating (prevents main pushes)
- Generates detailed commit messages with ticket references
- Creates PRs from branches
- Reads ticket requirements from issues (by number or URL)
- Updates tickets with status
- Reads PR feedback
- Unified file operations workflow
- Diff-based updates with apply_diff()
- Size delta gating in commit_changes() for quality control
- Complete CRUD operations: create_file, replace_file, delete_file, rename_file
- FIX: apply_unified_diff now properly fails when no valid hunks are parsed
- FIX: apply_diff detects no-op changes and returns failure with guidance
- FEATURE: read_ticket auto-creates branch for immediate file operations
- FEATURE: apply_diff returns patched file content for LLM review
- FEATURE: Persistent session state (repo/issue/branch/pr) survives restarts
- FEATURE: State enables cheaper models (kimi-k2, qwen3) that lose context
- FEATURE: get_session_state() and clear_session_state() for debugging
- FEATURE: list_sessions() finds orphaned work from crashed chats
- FEATURE: claim_session() continues work from dead sessions (disaster recovery)
- ENHANCEMENT: Simplified 4-step workflow (read ticket → modify → update ticket → PR)
- ENHANCEMENT: Ticket updates emphasized as "statement of work" / audit trail
- ENHANCEMENT: Review checklist for common diff errors (accidental removals, misplacements)
- ENHANCEMENT: State resolution priority: explicit arg > state file > env var > UserValves > Valves
- ENHANCEMENT: Branch resolution uses state for session continuity after claim_session
"""
from typing import Optional, Callable, Any, Dict, List, Tuple
from pydantic import BaseModel, Field
from datetime import datetime
from pathlib import Path
import re
import base64
import httpx
import json
import os
class GiteaHelpers:
"""
Helper methods for Gitea API interactions.
Designed to be mixed into Tools class via composition.
"""
def __init__(self, tools_instance):
"""Initialize with reference to parent Tools instance for valve access"""
self.tools = tools_instance
@property
def valves(self):
"""Access valves from parent Tools instance"""
return self.tools.valves
# ============= STATE PERSISTENCE =============
# Persists repo/issue/branch across restarts and helps models
# that struggle with context tracking (kimi-k2, qwen3-code)
def _state_dir(self) -> Path:
"""Get base state directory"""
base = os.environ.get(
"GITEA_CODER_STATE_DIR", os.path.expanduser("~/.gitea_coder")
)
return Path(base)
def _state_path(self, chat_id: str) -> Path:
"""Get state file path for a chat session"""
return self._state_dir() / "chats" / chat_id / "state.json"
def load_state(self, chat_id: str) -> dict:
"""Load persisted state for chat session"""
if not chat_id:
return {}
path = self._state_path(chat_id)
if path.exists():
try:
return json.loads(path.read_text())
except Exception:
return {}
return {}
def save_state(self, chat_id: str, **updates):
"""Update and persist state for chat session"""
if not chat_id:
return
try:
state = self.load_state(chat_id)
state.update(updates)
state["updated_at"] = datetime.utcnow().isoformat()
path = self._state_path(chat_id)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(state, indent=2))
except Exception:
pass # Best effort - don't fail operations on state save errors
def get_state_summary(self, chat_id: str) -> str:
"""Get human-readable state summary for debugging"""
state = self.load_state(chat_id)
if not state:
return "No persisted state"
parts = []
if state.get("repo"):
parts.append(f"repo=`{state['repo']}`")
if state.get("issue"):
parts.append(f"issue=#{state['issue']}")
if state.get("branch"):
parts.append(f"branch=`{state['branch'][:8]}...`")
return ", ".join(parts) if parts else "Empty state"
# ============= API HELPERS =============
def api_url(self, endpoint: str) -> str:
"""Construct full API URL for Gitea endpoint"""
base = self.get_url()
return f"{base}/api/v1{endpoint}"
def get_url(self) -> str:
"""Get effective Gitea URL with trailing slash handling"""
return self.valves.GITEA_URL.rstrip("/")
def get_token(self, __user__: dict = None) -> str:
"""Extract Gitea token from user context"""
if __user__ and "valves" in __user__:
user_valves = __user__.get("valves")
if user_valves:
token = getattr(user_valves, "GITEA_TOKEN", "")
if token:
return token
return ""
def headers(self, __user__: dict = None) -> dict:
"""Generate authentication headers with token"""
token = self.get_token(__user__)
headers = {"Content-Type": "application/json"}
if token:
headers["Authorization"] = f"token {token}"
return headers
def format_error(self, e: httpx.HTTPStatusError, context: str = "") -> str:
"""Format HTTP error with detailed context"""
try:
error_json = e.response.json()
error_msg = error_json.get("message", e.response.text[:200])
except Exception:
error_msg = e.response.text[:200]
context_str = f" ({context})" if context else ""
return f"HTTP Error {e.response.status_code}{context_str}: {error_msg}"
def get_repo(
self, repo: Optional[str], __user__: dict = None, __metadata__: dict = None
) -> str:
"""
Get effective repository with priority resolution:
1. Explicit arg (always wins)
2. Persistent state (learned from previous operations)
3. Environment variable GITEA_DEFAULT_REPO (k8s/cron)
4. UserValves override (interactive)
5. Admin Valves default
"""
# 1. Explicit arg always wins
if repo:
return repo
# 2. Check persistent state (helps models that lose track)
if __metadata__:
chat_id = __metadata__.get("chat_id")
if chat_id:
state = self.load_state(chat_id)
if state.get("repo"):
return state["repo"]
# 3. Environment variable (k8s/cron native)
env_repo = os.environ.get("GITEA_DEFAULT_REPO")
if env_repo:
return env_repo
# 4. UserValves override
if __user__ and "valves" in __user__:
user_valves = __user__.get("valves")
if user_valves and self.valves.ALLOW_USER_OVERRIDES:
user_repo = getattr(user_valves, "USER_DEFAULT_REPO", "")
if user_repo:
return user_repo
# 5. Admin Valves default
return self.valves.DEFAULT_REPO
def get_branch(self, __user__: dict = None, __metadata__: dict = None) -> str:
"""
Get effective branch name with priority resolution:
1. Claimed session branch (from state file - for disaster recovery)
2. chat_id from metadata (default - branch = chat)
3. UserValves override
4. Fallback to DEFAULT_BRANCH
"""
chat_id = None
if __metadata__:
chat_id = __metadata__.get("chat_id")
# Check if we've claimed another session's branch (disaster recovery)
if chat_id:
state = self.load_state(chat_id)
if state.get("branch"):
# Use branch from state - could be our own or claimed
return state["branch"]
else:
# No state yet, use chat_id as branch
return chat_id
# Then check user valves override
if __user__ and "valves" in __user__:
user_valves = __user__.get("valves")
if user_valves and self.valves.ALLOW_USER_OVERRIDES:
user_branch = getattr(user_valves, "USER_DEFAULT_BRANCH", "")
if user_branch:
return user_branch
# Finally fall back to default
return self.valves.DEFAULT_BRANCH
def resolve_repo(
self, repo: Optional[str], __user__: dict = None, __metadata__: dict = None
) -> Tuple[str, str]:
"""Resolve repository string into owner and repo name"""
effective_repo = self.get_repo(repo, __user__, __metadata__)
if not effective_repo:
raise ValueError(
"No repository specified. Set DEFAULT_REPO in Valves, USER_DEFAULT_REPO in UserValves, "
"or pass repo='owner/name' explicitly."
)
if "/" not in effective_repo:
raise ValueError(
f"Repository must be in 'owner/repo' format, got: {effective_repo}"
)
return tuple(effective_repo.split("/", 1))
def parse_issue_url(
self, url: str
) -> Tuple[Optional[str], Optional[str], Optional[int]]:
"""
Parse a Gitea issue URL into components.
Format: https://<domain>/<owner>/<repo>/issues/<number>
Returns:
Tuple of (owner, repo, issue_number) or (None, None, None) if invalid
"""
# Match: https://domain/owner/repo/issues/123
match = re.match(r"https?://[^/]+/([^/]+)/([^/]+)/issues/(\d+)", url)
if match:
owner = match.group(1)
repo = match.group(2)
issue_number = int(match.group(3))
return (owner, repo, issue_number)
return (None, None, None)
def parse_issue_refs(self, text: str) -> List[str]:
"""Extract issue references from text (e.g., #42, issue #42)"""
refs = re.findall(r"#(\d+)", text)
issue_refs = [f"#{ref}" for ref in refs]
# Also check for "issue N" pattern
issue_n_refs = re.findall(r"issue\s*#?(\d+)", text, re.IGNORECASE)
for ref in issue_n_refs:
issue_ref = f"#{ref}"
if issue_ref not in issue_refs:
issue_refs.append(issue_ref)
return issue_refs
def apply_unified_diff(
self, current_content: str, diff_content: str
) -> Tuple[Optional[str], str]:
"""
Apply a unified diff to content.
Args:
current_content: Current file content
diff_content: Unified diff patch
Returns:
Tuple of (new_content, error_message)
- On success: (new_content, "")
- On failure: (None, error_description)
"""
try:
diff_lines = diff_content.splitlines(keepends=True)
# Validate diff format - check for hunk headers
has_hunk_header = any(line.startswith("@@") for line in diff_lines)
has_add_or_remove = any(
line.startswith("+") or line.startswith("-")
for line in diff_lines
if not line.startswith("+++") and not line.startswith("---")
)
if not has_hunk_header:
return (
None,
"No hunk headers (@@) found. Unified diff format required.",
)
if not has_add_or_remove:
return (None, "No additions (+) or deletions (-) found in diff.")
# Parse hunks from unified diff
hunks = []
current_hunk = None
in_hunk = False
for line in diff_lines:
if line.startswith("---") or line.startswith("+++"):
continue # Skip file headers
elif line.startswith("@@"):
if current_hunk:
hunks.append(current_hunk)
# Parse hunk header: @@ -old_line,old_count +new_line,new_count @@
match = re.search(
r"@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@", line
)
if match:
old_start = int(match.group(1))
new_start = int(match.group(3))
current_hunk = {
"old_start": old_start,
"new_start": new_start,
"lines": [],
}
in_hunk = True
else:
return (None, f"Invalid hunk header format: {line.strip()}")
continue
elif in_hunk and len(line) > 0 and line[0:1] in ("+", "-", " "):
if current_hunk:
current_hunk["lines"].append(line)
elif in_hunk and line.strip() == "":
# Empty line in diff - treat as context
if current_hunk:
current_hunk["lines"].append(" \n")
elif in_hunk:
if current_hunk:
hunks.append(current_hunk)
current_hunk = None
in_hunk = False
if current_hunk:
hunks.append(current_hunk)
if not hunks:
return (None, "No valid hunks could be parsed from the diff content.")
# Split content into lines
old_lines = current_content.splitlines(keepends=False)
new_lines = list(old_lines)
# Apply each hunk in reverse order to maintain correct indices
for hunk in sorted(hunks, key=lambda h: h["old_start"], reverse=True):
old_start = hunk["old_start"] - 1 # Convert to 0-indexed
# Collect lines to remove and add
lines_to_remove = []
lines_to_add = []
for line in hunk["lines"]:
if line.startswith("+"):
lines_to_add.append(line[1:].rstrip("\n"))
elif line.startswith("-"):
lines_to_remove.append(line[1:].rstrip("\n"))
# Remove old lines if they match
if lines_to_remove:
end_idx = old_start + len(lines_to_remove)
if end_idx <= len(new_lines):
actual_lines = new_lines[old_start:end_idx]
if actual_lines == lines_to_remove:
del new_lines[old_start:end_idx]
else:
# Context mismatch - warn but continue
del new_lines[old_start:end_idx]
# Insert new lines at old_start
for line in reversed(lines_to_add):
new_lines.insert(old_start, line)
# Reconstruct content with line endings
new_content = "\n".join(new_lines)
if new_content and not new_content.endswith("\n"):
new_content += "\n"
return (new_content, "")
except Exception as e:
return (None, f"Diff parsing error: {type(e).__name__}: {e}")
def generate_commit_message(
self,
change_type: str,
scope: str,
description: str,
issue_refs: Optional[List[str]] = None,
body: Optional[str] = None,
) -> str:
"""
Generate a conventional commit message.
Format: <type>(<scope>): <description>
"""
# Validate and normalize change type
valid_types = [
"feat",
"fix",
"docs",
"style",
"refactor",
"test",
"chore",
"perf",
"ci",
"build",
"revert",
]
if change_type.lower() not in valid_types:
change_type = "chore"
# Build the subject line
scope_str = f"({scope})" if scope else ""
message = f"{change_type.lower()}{scope_str}: {description}"
# Add issue references to footer
if issue_refs:
refs_str = ", ".join(issue_refs)
footer = f"Refs: {refs_str}"
if body:
body = f"{body}\n\n{footer}"
else:
message = f"{message}\n\n{footer}"
if body:
message = f"{message}\n\n{body}"
return message
def generate_diff_commit_message(self, path: str, diff_content: str) -> str:
"""Generate a commit message from diff content"""
file_name = path.split("/")[-1]
# Detect change type from diff
change_type = "chore"
diff_lines = diff_content.splitlines()
if any(
line.startswith("+def ") or line.startswith("+class ")
for line in diff_lines
):
change_type = "feat"
elif any(line.startswith("+ return ") for line in diff_lines):
change_type = "fix"
elif "test" in path.lower() or "spec" in path.lower():
change_type = "test"
elif ".md" in path.lower() or "readme" in path.lower():
change_type = "docs"
elif any(line.startswith("-") for line in diff_lines):
change_type = "refactor"
# Extract description from added lines
added_lines = [
line[1:].strip()
for line in diff_lines
if line.startswith("+") and not line.startswith("+++")
]
description = "update"
if added_lines:
for line in added_lines:
if line and not line.startswith(("import ", "from ")):
match = re.match(
r"(def|class|const|var|let|interface|type)\s+(\w+)", line
)
if match:
kind, name = match.groups()
if kind == "def":
description = f"add {name}() function"
break
elif kind == "class":
description = f"add {name} class"
break
elif not line.startswith((" ", "\t")):
description = line[:50].rstrip(":")
if len(line) > 50:
description += "..."
break
return f"{change_type}({file_name}): {description}"
class Tools:
"""
Gitea Coder Role - High-level workflow automation for code generation tasks.
ARCHITECTURE:
- Stateless design - no caching complexity
- Branch name = chat_id (PERIOD. Nothing else.)
- All operations are self-contained
- LLM focuses on code, not infrastructure
Workflow:
reads ticket → creates branch (chat_id) → commits → updates ticket → creates PR
Key Features:
- Branch name IS chat_id (KISS principle)
- Auto-generates conventional commit messages
- Quality gates for file changes (size delta validation)
- Diff-based updates to prevent accidental file replacements
- Complete CRUD operations (create, replace, delete, rename)
- Ticket updates for status tracking
- PR reading for feedback
"""
class Valves(BaseModel):
"""System-wide configuration for Gitea Coder integration"""
GITEA_URL: str = Field(
default="https://gitea.example.com",
description="Gitea server URL",
)
DEFAULT_REPO: str = Field(
default="",
description="Default repository in owner/repo format",
)
DEFAULT_BRANCH: str = Field(
default="main",
description="Default branch name for base operations",
)
ALLOW_USER_OVERRIDES: bool = Field(
default=True,
description="Allow users to override defaults via UserValves",
)
VERIFY_SSL: bool = Field(
default=True,
description="Verify SSL certificates",
)
MAX_SIZE_DELTA_PERCENT: float = Field(
default=50.0,
description="Maximum allowed file size change percentage (quality gate)",
ge=1.0,
le=500.0,
)
PROTECTED_BRANCHES: List[str] = Field(
default=["main", "master", "develop", "dev", "release", "hotfix"],
description="Branches that cannot be committed to directly",
)
class UserValves(BaseModel):
"""Per-user configuration"""
GITEA_TOKEN: str = Field(
default="",
description="Your Gitea API token",
)
USER_DEFAULT_REPO: str = Field(
default="",
description="Override default repository",
)
USER_DEFAULT_BRANCH: str = Field(
default="",
description="Override default branch",
)
def __init__(self):
"""Initialize tools"""
self.valves = self.Valves()
self.user_valves = self.UserValves()
self.citation = True
# Initialize helper functions
self._gitea = GiteaHelpers(self)
async def workflow_summary(
self,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> dict:
"""Get a summary of available coder workflows and commands"""
branch = self._gitea.get_branch(__user__, __metadata__)
message = f"""# 🚀 Gitea Coder Workflow Guide
## Workflow (4 Steps)
1. **Read the ticket:** `read_ticket(issue_number)` or `read_ticket_by_url(url)`
- Branch `{branch}` is auto-created when you read the ticket
- Files are immediately accessible
2. **Read & modify files:**
- `get_file(path)` - Read current content
- `list_files(path)` - Browse repository
- `apply_diff(path, diff, message)` - Apply changes (preferred)
- `commit_changes(path, content, message)` - Full file replacement
3. **Update the ticket:** `update_ticket(issue_number, comment)` ← **CRITICAL**
- This is your **statement of work**
- Document what you did, what you found, decisions made
- Update throughout the process, not just at the end
4. **Create PR:** `create_pull_request(title, description)`
- Use `read_pull_request(pr_number)` to check feedback
## Available Commands
### 📋 Ticket Operations (Statement of Work)
- `read_ticket(issue_number)` - Get issue details, auto-creates branch
- `read_ticket_by_url(url)` - Get issue details by URL, auto-creates branch
- `update_ticket(issue_number, comment)` - **Document your work on the ticket**
### 🌿 Branch Management
- `create_branch()` - Manually create branch (usually not needed)
- `get_branch_status()` - See current working branch
- `list_branches()` - List all branches in repository
### 🧠 Session State (Auto-managed)
- `get_session_state()` - View persisted repo/issue/branch context
- `clear_session_state()` - Reset session (start fresh)
- `list_sessions()` - Find orphaned sessions from crashed chats
- `claim_session(chat_id)` - Continue work from a dead session
### 📝 File Operations
- `get_file(path)` - Read file content
- `list_files(path)` - List directory contents
- `apply_diff(path, diff, message)` - Apply unified diff patch (preferred)
- `commit_changes(path, content, message)` - Commit with size delta gate
- `replace_file(path, content, message)` - Replace entire file
- `create_file(path, content, message)` - Create new file
- `delete_file(path, message)` - Delete a file
- `rename_file(old_path, new_path, message)` - Rename a file
### 📦 Pull Request Operations
- `create_pull_request(title, description)` - Create PR from current branch
- `read_pull_request(pr_number)` - Get PR details and review feedback
## Current Session
**Working Branch:** `{branch}` (same as chat_id)
## Diff Format Guide
When using `apply_diff()`, provide a **unified diff** format:
```diff
--- a/path/to/file.js
+++ b/path/to/file.js
@@ -10,3 +10,4 @@
existing line (context)
+new line to add
another existing line
-line to remove
```
**Key elements:**
- `@@` hunk headers are REQUIRED
- Lines starting with `+` are additions
- Lines starting with `-` are deletions
- Lines starting with ` ` (space) are context
## Best Practices
1. **Always update the ticket** - It's your audit trail and statement of work
2. **Use `apply_diff()` for changes** - More precise, prevents accidents
3. **Read files before modifying** - Understand current state first
4. **Commit messages matter** - They're auto-generated but can be customized
"""
return {"status": "success", "message": message}
async def read_ticket(
self,
issue_number: int,
repo: Optional[str] = None,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> dict:
"""
Read and parse a ticket/issue by number.
Automatically creates the working branch (chat_id) if it doesn't exist.
This enables immediate file operations after reading the ticket.
"""
retVal = {"status": "failure", "message": ""}
token = self._gitea.get_token(__user__)
if not token:
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
return retVal
try:
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
except ValueError as e:
retVal["message"] = str(e)
return retVal
# Get branch name (chat_id)
branch_name = self._gitea.get_branch(__user__, __metadata__)
branch_status = "unknown"
# Auto-create branch for immediate file operations
if branch_name not in self.valves.PROTECTED_BRANCHES:
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Setting up branch {branch_name[:8]}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.post(
self._gitea.api_url(f"/repos/{owner}/{repo_name}/branches"),
headers=self._gitea.headers(__user__),
json={
"new_branch_name": branch_name,
"old_branch_name": self.valves.DEFAULT_BRANCH,
},
)
if response.status_code == 409:
branch_status = "exists"
elif response.status_code in (200, 201):
branch_status = "created"
else:
branch_status = "failed"
except Exception:
branch_status = "failed"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Fetching issue #{issue_number}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._gitea.api_url(
f"/repos/{owner}/{repo_name}/issues/{issue_number}"
),
headers=self._gitea.headers(__user__),
)
response.raise_for_status()
issue = response.json()
title = issue.get("title", "No title")
body = issue.get("body", "")
state = issue.get("state", "unknown")
user = issue.get("user", {}).get("login", "unknown")
labels = [label.get("name", "") for label in issue.get("labels", [])]
created_at = issue.get("created_at", "")[:10]
html_url = issue.get("html_url", "")
# Parse body for structured info
testing_criteria = []
technical_notes = []
is_testing_required = "test" in body.lower() or "testing" in body.lower()
is_docs_required = "documentation" in body.lower() or "docs" in body.lower()
if is_testing_required:
testing_section = re.search(
r"(?:testing|criteria|test).*?:(.*?)(?:\n\n|$)",
body,
re.IGNORECASE | re.DOTALL,
)
if testing_section:
testing_criteria = [
line.strip().lstrip("-*•")
for line in testing_section.group(1).split("\n")
if line.strip()
]
tech_section = re.search(
r"(?:technical|tech).*?:(.*?)(?:\n\n|$)",
body,
re.IGNORECASE | re.DOTALL,
)
if tech_section:
technical_notes = [
line.strip().lstrip("-*•")
for line in tech_section.group(1).split("\n")
if line.strip()
]
issue_refs = self._gitea.parse_issue_refs(body)
if not any(ref == f"#{issue_number}" for ref in issue_refs):
issue_refs.insert(0, f"#{issue_number}")
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
# Build message
message = f"# 📋 Ticket #{issue_number}: {title}\n\n"
message += f"**State:** {state.upper()} | **Author:** @{user} | **Created:** {created_at}\n"
message += f"**Labels:** {', '.join(labels) if labels else 'None'}\n"
message += f"**URL:** {html_url}\n\n"
# Show branch status
if branch_status == "created":
message += f"🌿 **Branch Created:** `{branch_name}` (from `{self.valves.DEFAULT_BRANCH}`)\n\n"
elif branch_status == "exists":
message += f"🌿 **Branch Ready:** `{branch_name}` (already exists)\n\n"
elif branch_status == "failed":
message += f"⚠️ **Branch:** Could not create `{branch_name}` - you may need to call `create_branch()`\n\n"
if body:
message += "## 📝 Description\n\n"
if len(body) > 1000:
message += f"{body[:1000]}...\n\n"
else:
message += f"{body}\n\n"
message += "## 🧪 Testing Requirements\n\n"
if is_testing_required:
if testing_criteria:
message += "**Testing Criteria:**\n"
for criterion in testing_criteria:
message += f"- [ ] {criterion}\n"
message += "\n"
else:
message += "Testing required, but no specific criteria listed.\n\n"
else:
message += "No explicit testing requirements detected.\n\n"
if technical_notes:
message += "## 🔧 Technical Notes\n\n"
for note in technical_notes:
message += f"- {note}\n"
message += "\n"
if is_docs_required:
message += "## 📚 Documentation Required\n\n"
message += "This ticket mentions documentation needs.\n\n"
if issue_refs:
message += "## 🔗 Related Issues\n\n"
for ref in issue_refs:
message += f"- {ref}\n"
message += "\n"
message += "## 🚀 Workflow\n\n"
message += f"1. **Read files:** `get_file(path)` or `list_files(path)` - branch `{branch_name}` is ready\n"
message += "2. **Make changes:** `apply_diff()` or `commit_changes()`\n"
message += f"3. **Update ticket:** `update_ticket({issue_number}, comment)` ← **Document your work!**\n"
message += "4. **Create PR:** `create_pull_request(title)`\n\n"
message += "---\n"
message += f"💡 **Important:** Always update the ticket with your progress and findings.\n"
message += f"The ticket is your **statement of work** - it documents what was done and why.\n"
# Persist state for context recovery (helps models that lose track)
if __metadata__ and __metadata__.get("chat_id"):
self._gitea.save_state(
__metadata__["chat_id"],
repo=f"{owner}/{repo_name}",
issue=issue_number,
branch=branch_name,
)
retVal["status"] = "success"
retVal["message"] = message
return retVal
except httpx.HTTPStatusError as e:
error_msg = self._gitea.format_error(e, f"issue #{issue_number}")
if e.response.status_code == 404:
retVal["message"] = (
f"Issue #{issue_number} not found in {owner}/{repo_name}."
)
else:
retVal["message"] = f"Failed to fetch issue. {error_msg}"
return retVal
except Exception as e:
retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}"
return retVal
async def read_ticket_by_url(
self,
url: str,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> dict:
"""
Read and parse a ticket/issue by URL.
Format: https://<domain>/<owner>/<repo>/issues/<number>
Automatically creates the working branch (chat_id) if it doesn't exist.
This enables immediate file operations after reading the ticket.
"""
retVal = {"status": "failure", "message": ""}
# Parse URL
owner, repo, issue_number = self._gitea.parse_issue_url(url)
if not owner or not repo or not issue_number:
retVal["message"] = (
f"Invalid issue URL format. Expected: https://domain/owner/repo/issues/number\nGot: {url}"
)
return retVal
# Use the standard read_ticket with parsed components
return await self.read_ticket(
issue_number=issue_number,
repo=f"{owner}/{repo}",
__user__=__user__,
__metadata__=__metadata__,
__event_emitter__=__event_emitter__,
)
async def update_ticket(
self,
issue_number: int,
comment: str,
repo: Optional[str] = None,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> dict:
"""
Post a status update comment to a ticket.
THIS IS YOUR STATEMENT OF WORK.
Use this to document:
- What you analyzed and found
- What changes you made and why
- Decisions and trade-offs
- Testing performed
- Any blockers or questions
Update the ticket throughout your work, not just at the end.
The ticket comment history is the audit trail of the work performed.
"""
retVal = {"status": "failure", "message": ""}
token = self._gitea.get_token(__user__)
if not token:
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
return retVal
try:
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
except ValueError as e:
retVal["message"] = str(e)
return retVal
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Updating issue #{issue_number}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.post(
self._gitea.api_url(
f"/repos/{owner}/{repo_name}/issues/{issue_number}/comments"
),
headers=self._gitea.headers(__user__),
json={"body": comment},
)
response.raise_for_status()
result = response.json()
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
retVal["status"] = "success"
retVal["message"] = f"✅ Updated issue #{issue_number} with status comment"
return retVal
except httpx.HTTPStatusError as e:
error_msg = self._gitea.format_error(e, f"issue #{issue_number} update")
retVal["message"] = f"Failed to update issue. {error_msg}"
return retVal
except Exception as e:
retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}"
return retVal
async def create_branch(
self,
repo: Optional[str] = None,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> dict:
"""
Create a new branch with name = chat_id.
KISS: Branch name IS chat_id. Nothing else.
"""
retVal = {"status": "failure", "message": ""}
token = self._gitea.get_token(__user__)
if not token:
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
return retVal
try:
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
except ValueError as e:
retVal["message"] = str(e)
return retVal
# Branch name IS chat_id
branch_name = self._gitea.get_branch(__user__, __metadata__)
# Check if protected branch
if branch_name in self.valves.PROTECTED_BRANCHES:
retVal["message"] = (
f"❌ Cannot create branch with protected name '{branch_name}'"
)
return retVal
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Creating branch {branch_name}...",
"done": False,
},
}
)
# Get base branch
base_branch = self.valves.DEFAULT_BRANCH
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.post(
self._gitea.api_url(f"/repos/{owner}/{repo_name}/branches"),
headers=self._gitea.headers(__user__),
json={
"new_branch_name": branch_name,
"old_branch_name": base_branch,
},
)
# Handle branch already exists
if response.status_code == 409:
retVal["status"] = "success"
retVal["message"] = f"⚠️ Branch `{branch_name}` already exists."
return retVal
response.raise_for_status()
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
# Persist state for context recovery
if __metadata__ and __metadata__.get("chat_id"):
self._gitea.save_state(
__metadata__["chat_id"],
repo=f"{owner}/{repo_name}",
branch=branch_name,
)
retVal["status"] = "success"
retVal[
"message"
] = f"""✅ **Branch Created Successfully**
**Branch:** `{branch_name}` (chat_id)
**Base Branch:** `{base_branch}`
**Repository:** `{owner}/{repo_name}`
**Next Steps:**
1. Make changes to files using `apply_diff()` or `commit_changes()`
2. Update ticket with status: `update_ticket(issue_number, comment)`
3. Create PR: `create_pull_request(title)`
"""
return retVal
except httpx.HTTPStatusError as e:
error_msg = self._gitea.format_error(e, "branch creation")
retVal["message"] = f"Failed to create branch. {error_msg}"
return retVal
except Exception as e:
retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}"
return retVal
async def get_branch_status(
self,
repo: Optional[str] = None,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> dict:
"""Get the current working branch status"""
retVal = {"status": "success", "message": ""}
try:
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
except ValueError as e:
retVal["status"] = "failure"
retVal["message"] = str(e)
return retVal
# Get branch from metadata (chat_id)
working_branch = self._gitea.get_branch(__user__, __metadata__)
message = f"# 🌿 Branch Status: {owner}/{repo_name}\n\n"
message += f"**Current Branch:** `{working_branch}` (chat_id)\n\n"
if __metadata__ and __metadata__.get("chat_id"):
message += f"**Chat ID:** `{__metadata__.get('chat_id')}`\n\n"
message += "All file operations will use this branch.\n"
retVal["message"] = message
return retVal
async def get_session_state(
self,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> dict:
"""
View the persisted session state.
State is automatically saved when you read tickets, create branches,
or make commits. This helps maintain context across operations.
"""
retVal = {"status": "success", "message": ""}
chat_id = __metadata__.get("chat_id") if __metadata__ else None
if not chat_id:
retVal["message"] = (
"No chat_id available - session state requires a chat context."
)
return retVal
state = self._gitea.load_state(chat_id)
if not state:
retVal[
"message"
] = f"""# 🧠 Session State
**Chat ID:** `{chat_id}`
**State:** Empty (no operations performed yet)
Session state is automatically populated when you:
- Read a ticket (`read_ticket` / `read_ticket_by_url`)
- Create a branch (`create_branch`)
- Make commits
This state helps maintain context across operations.
"""
return retVal
message = f"# 🧠 Session State\n\n"
message += f"**Chat ID:** `{chat_id}`\n\n"
if state.get("repo"):
message += f"**Repository:** `{state['repo']}`\n"
if state.get("issue"):
message += f"**Issue:** #{state['issue']}\n"
if state.get("branch"):
message += f"**Branch:** `{state['branch']}`\n"
if state.get("updated_at"):
message += f"**Last Updated:** {state['updated_at']}\n"
message += "\nThis state is automatically used when repo/issue is not explicitly provided."
retVal["message"] = message
return retVal
async def clear_session_state(
self,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> dict:
"""
Clear the persisted session state.
Use this to start fresh or switch to a different repository/issue.
"""
retVal = {"status": "success", "message": ""}
chat_id = __metadata__.get("chat_id") if __metadata__ else None
if not chat_id:
retVal["message"] = "No chat_id available - nothing to clear."
return retVal
state_path = self._gitea._state_path(chat_id)
if state_path.exists():
try:
state_path.unlink()
retVal["message"] = (
f"✅ Session state cleared for chat `{chat_id[:8]}...`\n\nYou can now start fresh with a new repository or issue."
)
except Exception as e:
retVal["status"] = "failure"
retVal["message"] = f"Failed to clear state: {e}"
else:
retVal["message"] = "Session state was already empty."
return retVal
async def list_sessions(
self,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> dict:
"""
List all persisted sessions with their metadata.
Use this to find orphaned work from crashed/dead sessions.
Sessions can be claimed with `claim_session(chat_id)` to continue work.
"""
retVal = {"status": "success", "message": ""}
current_chat_id = __metadata__.get("chat_id") if __metadata__ else None
state_dir = self._gitea._state_dir() / "chats"
if not state_dir.exists():
retVal["message"] = "No sessions found. State directory does not exist."
return retVal
sessions = []
for chat_dir in state_dir.iterdir():
if chat_dir.is_dir():
state_file = chat_dir / "state.json"
if state_file.exists():
try:
state = json.loads(state_file.read_text())
state["chat_id"] = chat_dir.name
state["is_current"] = chat_dir.name == current_chat_id
sessions.append(state)
except Exception:
pass
if not sessions:
retVal["message"] = "No sessions found."
return retVal
# Sort by updated_at descending (most recent first)
sessions.sort(key=lambda x: x.get("updated_at", ""), reverse=True)
message = "# 📋 Saved Sessions\n\n"
message += f"**Current Chat:** `{current_chat_id[:8] if current_chat_id else 'unknown'}...`\n\n"
message += "---\n\n"
for s in sessions:
chat_id = s.get("chat_id", "unknown")
is_current = s.get("is_current", False)
marker = " ← **CURRENT**" if is_current else ""
message += f"### `{chat_id[:8]}...`{marker}\n\n"
if s.get("repo"):
message += f"- **Repo:** `{s['repo']}`\n"
if s.get("issue"):
message += f"- **Issue:** #{s['issue']}\n"
if s.get("branch"):
message += f"- **Branch:** `{s['branch'][:12]}...`\n"
if s.get("pr_number"):
message += f"- **PR:** #{s['pr_number']}\n"
if s.get("updated_at"):
message += f"- **Last Activity:** {s['updated_at']}\n"
if not is_current:
message += f'\n→ `claim_session("{chat_id}")` to continue this work\n'
message += "\n---\n\n"
message += f"**Total Sessions:** {len(sessions)}\n"
retVal["message"] = message
return retVal
async def claim_session(
self,
session_id: str,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> dict:
"""
Claim an orphaned session to continue its work.
Use `list_sessions()` to find available sessions.
This copies the session's state (repo, issue, branch) to the current chat,
allowing you to continue work on an existing branch after a session death.
IMPORTANT: The branch name in Gitea will still be the old chat_id.
This function updates the tool's working context, not the Gitea branch name.
"""
retVal = {"status": "failure", "message": ""}
current_chat_id = __metadata__.get("chat_id") if __metadata__ else None
if not current_chat_id:
retVal["message"] = "No current chat_id available."
return retVal
if session_id == current_chat_id:
retVal["message"] = "Cannot claim your own session - already active."
retVal["status"] = "success"
return retVal
# Load the orphaned session's state
orphan_state = self._gitea.load_state(session_id)
if not orphan_state:
retVal["message"] = (
f"Session `{session_id[:8]}...` not found or has no state."
)
return retVal
# Copy state to current session, preserving the original branch name
self._gitea.save_state(
current_chat_id,
repo=orphan_state.get("repo"),
issue=orphan_state.get("issue"),
branch=orphan_state.get("branch"), # Keep original branch name!
pr_number=orphan_state.get("pr_number"),
claimed_from=session_id,
)
message = f"✅ **Session Claimed**\n\n"
message += f"**Claimed From:** `{session_id[:8]}...`\n"
message += f"**Current Chat:** `{current_chat_id[:8]}...`\n\n"
message += "**Inherited State:**\n"
if orphan_state.get("repo"):
message += f"- **Repo:** `{orphan_state['repo']}`\n"
if orphan_state.get("issue"):
message += f"- **Issue:** #{orphan_state['issue']}\n"
if orphan_state.get("branch"):
message += f"- **Branch:** `{orphan_state['branch']}`\n"
if orphan_state.get("pr_number"):
message += f"- **PR:** #{orphan_state['pr_number']}\n"
message += "\n⚠️ **Note:** File operations will use branch `{branch}`, not your current chat_id.\n".format(
branch=orphan_state.get("branch", "unknown")[:12] + "..."
)
message += "This is correct - you're continuing work on the existing branch.\n"
retVal["status"] = "success"
retVal["message"] = message
return retVal
async def list_branches(
self,
repo: Optional[str] = None,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> dict:
"""List all branches in the repository"""
retVal = {"status": "failure", "message": ""}
token = self._gitea.get_token(__user__)
if not token:
retVal["message"] = "GITEA_TOKEN not configured."
return retVal
try:
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
except ValueError as e:
retVal["message"] = str(e)
return retVal
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Fetching branches...", "done": False},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._gitea.api_url(f"/repos/{owner}/{repo_name}/branches"),
headers=self._gitea.headers(__user__),
params={"limit": 50},
)
response.raise_for_status()
branches = response.json()
message = f"# 🌿 Branches in {owner}/{repo_name}\n\n"
# Separate protected and other branches
protected = [b for b in branches if b.get("protected")]
other = [b for b in branches if not b.get("protected")]
if protected:
message += "## 🛡️ Protected Branches\n\n"
for branch in sorted(protected, key=lambda x: x["name"]):
name = branch.get("name", "")
commit_sha = branch.get("commit", {}).get("id", "")[:8]
message += f"- `{name}` [commit: {commit_sha}]\n"
message += "\n"
if other:
message += "## 📦 Other Branches\n\n"
for branch in sorted(other, key=lambda x: x["name"])[:20]:
name = branch.get("name", "")
commit_sha = branch.get("commit", {}).get("id", "")[:8]
message += f"- `{name}` [commit: {commit_sha}]\n"
if len(other) > 20:
message += f"\n... and {len(other) - 20} more branches\n"
message += "\n"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
retVal["status"] = "success"
retVal["message"] = message.strip()
return retVal
except httpx.HTTPStatusError as e:
error_msg = self._gitea.format_error(e, "branch listing")
retVal["message"] = f"Failed to list branches. {error_msg}"
return retVal
except Exception as e:
retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}"
return retVal
async def apply_diff(
self,
path: str,
diff_content: str,
message: Optional[str] = None,
repo: Optional[str] = None,
auto_message: bool = True,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> dict:
"""
Apply a unified diff patch to a file.
This is the PREFERRED method for making changes as it:
1. Is precise about what changes
2. Prevents accidental file replacements
3. Is what LLMs understand best (trained on GitHub PRs)
REQUIRED FORMAT - Unified diff with hunk headers:
```diff
--- a/path/to/file
+++ b/path/to/file
@@ -line,count +line,count @@
context line
+added line
-removed line
```
"""
retVal = {"status": "failure", "message": ""}
# Get working branch from metadata
effective_branch = self._gitea.get_branch(__user__, __metadata__)
token = self._gitea.get_token(__user__)
if not token:
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
return retVal
try:
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
except ValueError as e:
retVal["message"] = str(e)
return retVal
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Applying diff to {path}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
# Get current file content
get_response = await client.get(
self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._gitea.headers(__user__),
params={"ref": effective_branch},
)
# Check if file exists
if get_response.status_code == 404:
retVal["message"] = (
f"File not found: `{path}`. Use `create_file()` to create a new file."
)
return retVal
get_response.raise_for_status()
file_info = get_response.json()
current_sha = file_info.get("sha")
# Decode current content
current_content_b64 = file_info.get("content", "")
try:
current_content = base64.b64decode(current_content_b64).decode(
"utf-8"
)
except Exception:
retVal["message"] = "Could not decode current file content."
return retVal
# Parse and apply the diff
new_content, error_msg = self._gitea.apply_unified_diff(
current_content, diff_content
)
if new_content is None:
retVal[
"message"
] = f"""❌ **Failed to apply diff**
**Reason:** {error_msg}
**Required format:** Unified diff with hunk headers
```diff
--- a/{path}
+++ b/{path}
@@ -10,3 +10,4 @@
existing line (context)
+new line to add
-line to remove
```
**Key requirements:**
- `@@` hunk headers are REQUIRED (e.g., `@@ -10,3 +10,4 @@`)
- Lines starting with `+` are additions
- Lines starting with `-` are deletions
- Lines starting with ` ` (space) are context
Use `get_file(path)` to see current content and construct a proper diff.
"""
return retVal
# Check if diff actually changed anything
if new_content == current_content:
retVal[
"message"
] = f"""⚠️ **Diff produced no changes**
The diff was parsed but resulted in no modifications to the file.
**Possible causes:**
1. The context lines in your diff don't match the actual file content
2. The hunk line numbers are incorrect
3. The changes were already applied
Use `get_file("{path}")` to see the current content and verify your diff targets the correct lines.
"""
return retVal
# Generate commit message if needed
if not message:
if auto_message:
message = self._gitea.generate_diff_commit_message(
path, diff_content
)
else:
retVal["message"] = (
"Commit message is required when auto_message=False."
)
return retVal
# Commit the changes
new_content_b64 = base64.b64encode(new_content.encode("utf-8")).decode(
"ascii"
)
response = await client.put(
self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._gitea.headers(__user__),
json={
"content": new_content_b64,
"message": message,
"branch": effective_branch,
"sha": current_sha,
},
)
response.raise_for_status()
result = response.json()
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
# Parse diff stats
added_lines = len(
[
l
for l in diff_content.splitlines()
if l.startswith("+") and not l.startswith("+++")
]
)
removed_lines = len(
[
l
for l in diff_content.splitlines()
if l.startswith("-") and not l.startswith("---")
]
)
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
# Prepare patched content for review
# For large files, show a truncated version
patched_lines = new_content.splitlines()
total_lines = len(patched_lines)
if total_lines <= 100:
# Small file - show entire content
patched_preview = new_content
else:
# Large file - show first 30 and last 20 lines with indicator
head = "\n".join(patched_lines[:30])
tail = "\n".join(patched_lines[-20:])
patched_preview = (
f"{head}\n\n... [{total_lines - 50} lines omitted] ...\n\n{tail}"
)
message_text = f"✅ **Diff Applied - REVIEW REQUIRED**\n\n"
message_text += f"**File:** `{path}`\n"
message_text += f"**Branch:** `{effective_branch}`\n"
message_text += f"**Commit:** `{commit_sha}`\n"
message_text += (
f"**Changes:** +{added_lines} lines, -{removed_lines} lines\n"
)
message_text += f"**Message:** {message}\n\n"
message_text += "---\n\n"
message_text += "## 📋 Review the Patched File\n\n"
message_text += "**Check for:**\n"
message_text += "- ❌ Accidental line removals\n"
message_text += "- ❌ Code placed in wrong location\n"
message_text += "- ❌ Duplicate code blocks\n"
message_text += "- ❌ Missing imports or dependencies\n"
message_text += "- ❌ Broken syntax or indentation\n\n"
message_text += f"**Patched Content ({total_lines} lines):**\n\n"
message_text += f"```\n{patched_preview}\n```\n\n"
message_text += "---\n"
message_text += (
"⚠️ **If the patch is incorrect**, use `apply_diff()` again to fix, "
)
message_text += "or `get_file()` to see current state and `replace_file()` to correct.\n"
retVal["status"] = "success"
retVal["message"] = message_text
return retVal
except httpx.HTTPStatusError as e:
error_msg = self._gitea.format_error(e, f"diff application to '{path}'")
if e.response.status_code == 409:
retVal["message"] = (
f"Update conflict for `{path}`. Fetch the latest version and try again."
)
else:
retVal["message"] = f"Failed to apply diff. {error_msg}"
return retVal
except Exception as e:
retVal["message"] = (
f"Unexpected failure during diff application: {type(e).__name__}: {e}"
)
return retVal
async def commit_changes(
self,
path: str,
content: str,
message: Optional[str] = None,
repo: Optional[str] = None,
max_delta_percent: Optional[float] = None,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> dict:
"""
Commit file changes with automatic content detection and size delta gating.
This method:
1. Detects whether to create or replace a file
2. Validates file size changes against threshold (quality gate)
3. Auto-generates commit message if not provided
4. Commits to the working branch
"""
retVal = {"status": "failure", "message": ""}
# Get working branch from metadata
effective_branch = self._gitea.get_branch(__user__, __metadata__)
token = self._gitea.get_token(__user__)
if not token:
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
return retVal
try:
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
except ValueError as e:
retVal["message"] = str(e)
return retVal
# Use provided threshold or default from valves
delta_threshold = max_delta_percent or self.valves.MAX_SIZE_DELTA_PERCENT
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Processing {path}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
# Check if file exists
get_response = await client.get(
self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._gitea.headers(__user__),
params={"ref": effective_branch},
)
file_exists = get_response.status_code == 200
current_sha = None
current_size = 0
if file_exists:
get_response.raise_for_status()
file_info = get_response.json()
current_sha = file_info.get("sha")
current_size = file_info.get("size", 0)
# SIZE DELTA GATE - Quality check
new_size = len(content.encode("utf-8"))
if current_size > 0 and new_size > 0:
delta_percent = (
abs(new_size - current_size) / current_size * 100
)
if delta_percent > delta_threshold:
# Calculate actual bytes changed
size_diff = new_size - current_size
direction = "larger" if size_diff > 0 else "smaller"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": "Size gate triggered",
"done": True,
"hidden": True,
},
}
)
retVal[
"message"
] = f"""⚠️ **Quality Gate: Large File Change Detected**
**File:** `{path}`
**Current Size:** {current_size} bytes
**New Size:** {new_size} bytes
**Change:** {size_diff:+d} bytes ({delta_percent:.1f}% {direction})
**Threshold:** {delta_threshold}%
**Recommended: Use diff-based updates instead**
```python
apply_diff(path="{path}", diff_content="...")
```
Or override with:
```python
commit_changes(..., max_delta_percent=100)
```
"""
return retVal
# Generate commit message if not provided
if not message:
message = self._gitea.generate_commit_message(
change_type="chore",
scope=path.split("/")[-1] if "/" in path else path,
description=f"update {path}",
)
# Prepare content
content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii")
if file_exists:
# Replace existing file
if not current_sha:
retVal["message"] = (
f"Could not retrieve SHA for existing file: {path}"
)
return retVal
response = await client.put(
self._gitea.api_url(
f"/repos/{owner}/{repo_name}/contents/{path}"
),
headers=self._gitea.headers(__user__),
json={
"content": content_b64,
"message": message,
"branch": effective_branch,
"sha": current_sha,
},
)
else:
# Create new file
response = await client.post(
self._gitea.api_url(
f"/repos/{owner}/{repo_name}/contents/{path}"
),
headers=self._gitea.headers(__user__),
json={
"content": content_b64,
"message": message,
"branch": effective_branch,
},
)
response.raise_for_status()
result = response.json()
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
action = "Updated" if file_exists else "Created"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
# Calculate and show size change
new_size = len(content.encode("utf-8"))
size_info = ""
if file_exists and current_size > 0:
delta = new_size - current_size
delta_percent = delta / current_size * 100
size_info = (
f"**Size Change:** {delta:+d} bytes ({delta_percent:.1f}%)\n"
)
message_text = f"""✅ **{action} File Successfully**
**File:** `{path}`
**Branch:** `{effective_branch}`
**Commit:** `{commit_sha}`
**Message:** {message}
{size_info}**Action:** {action}
"""
retVal["status"] = "success"
retVal["message"] = message_text
return retVal
except httpx.HTTPStatusError as e:
error_msg = self._gitea.format_error(e, f"file commit for '{path}'")
if e.response.status_code == 409:
retVal["message"] = (
f"Update conflict for `{path}`. Fetch the latest version and try again."
)
else:
retVal["message"] = f"Failed to commit file. {error_msg}"
return retVal
except Exception as e:
retVal["message"] = (
f"Unexpected failure during commit: {type(e).__name__}: {e}"
)
return retVal
async def create_pull_request(
self,
title: str,
body: Optional[str] = "",
repo: Optional[str] = None,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> dict:
"""Create a pull request from the current branch"""
retVal = {"status": "failure", "message": ""}
# Get working branch from metadata
head_branch = self._gitea.get_branch(__user__, __metadata__)
token = self._gitea.get_token(__user__)
if not token:
retVal["message"] = "GITEA_TOKEN not configured."
return retVal
try:
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
except ValueError as e:
retVal["message"] = str(e)
return retVal
base_branch = self.valves.DEFAULT_BRANCH
# Check if protected branch
if head_branch in self.valves.PROTECTED_BRANCHES:
retVal["message"] = (
f"❌ Cannot create PR from protected branch '{head_branch}'"
)
return retVal
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Creating PR...", "done": False},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.post(
self._gitea.api_url(f"/repos/{owner}/{repo_name}/pulls"),
headers=self._gitea.headers(__user__),
json={
"title": title,
"head": head_branch,
"base": base_branch,
"body": body,
},
)
# Handle PR already exists
if response.status_code == 409:
retVal["message"] = (
f"⚠️ PR already exists for branch `{head_branch}`"
)
return retVal
response.raise_for_status()
pr = response.json()
pr_number = pr.get("number")
pr_url = pr.get("html_url", "")
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
# Persist PR number for session recovery
if __metadata__ and __metadata__.get("chat_id"):
self._gitea.save_state(__metadata__["chat_id"], pr_number=pr_number)
retVal["status"] = "success"
retVal[
"message"
] = f"""✅ **Pull Request Created Successfully**
**PR #{pr_number}:** {title}
**Branch:** `{head_branch}` → `{base_branch}`
**URL:** {pr_url}
**Next Steps:**
1. Read PR feedback: `read_pull_request({pr_number})`
2. Address reviewer comments
3. Update ticket with PR link
"""
return retVal
except httpx.HTTPStatusError as e:
error_msg = self._gitea.format_error(e, "PR creation")
if e.response.status_code == 422:
retVal["message"] = (
"Could not create PR. The branch may not exist or there may be merge conflicts."
)
else:
retVal["message"] = f"Failed to create PR. {error_msg}"
return retVal
except Exception as e:
retVal["message"] = (
f"Unexpected failure during PR creation: {type(e).__name__}: {e}"
)
return retVal
async def read_pull_request(
self,
pr_number: int,
repo: Optional[str] = None,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> dict:
"""
Read pull request details including review feedback.
This is how you get feedback to iterate on changes.
"""
retVal = {"status": "failure", "message": ""}
token = self._gitea.get_token(__user__)
if not token:
retVal["message"] = "GITEA_TOKEN not configured."
return retVal
try:
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
except ValueError as e:
retVal["message"] = str(e)
return retVal
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Fetching PR #{pr_number}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
# Get PR details
pr_response = await client.get(
self._gitea.api_url(
f"/repos/{owner}/{repo_name}/pulls/{pr_number}"
),
headers=self._gitea.headers(__user__),
)
pr_response.raise_for_status()
pr = pr_response.json()
# Get PR comments
comments_response = await client.get(
self._gitea.api_url(
f"/repos/{owner}/{repo_name}/pulls/{pr_number}/comments"
),
headers=self._gitea.headers(__user__),
)
comments_response.raise_for_status()
comments = comments_response.json()
title = pr.get("title", "")
body = pr.get("body", "")
state = pr.get("state", "")
user = pr.get("user", {}).get("login", "")
head_branch = pr.get("head", {}).get("ref", "")
base_branch = pr.get("base", {}).get("ref", "")
mergeable = pr.get("mergeable", False)
merged = pr.get("merged", False)
html_url = pr.get("html_url", "")
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
# Build message
message = f"# 🔀 Pull Request #{pr_number}: {title}\n\n"
message += f"**State:** {state.upper()} | **Author:** @{user}\n"
message += f"**Branch:** `{head_branch}` → `{base_branch}`\n"
message += f"**Mergeable:** {'✅ Yes' if mergeable else '❌ No'}\n"
message += f"**Merged:** {'✅ Yes' if merged else '❌ No'}\n"
message += f"**URL:** {html_url}\n\n"
if body:
message += "## 📝 Description\n\n"
message += f"{body}\n\n"
if comments:
message += f"## 💬 Review Comments ({len(comments)})\n\n"
for comment in comments[:10]: # Limit to 10 most recent
comment_user = comment.get("user", {}).get("login", "unknown")
comment_body = comment.get("body", "")
comment_path = comment.get("path", "")
comment_line = comment.get("line", "")
message += f"**@{comment_user}**"
if comment_path:
message += f" on `{comment_path}`"
if comment_line:
message += f" (line {comment_line})"
message += f":\n{comment_body}\n\n"
if len(comments) > 10:
message += f"... and {len(comments) - 10} more comments\n\n"
else:
message += "## 💬 No review comments yet\n\n"
message += "## 🚀 Next Steps\n\n"
if comments:
message += "1. Address review comments\n"
message += (
"2. Make changes using `apply_diff()` or `commit_changes()`\n"
)
message += "3. Update ticket with progress\n"
else:
message += "Waiting for review feedback...\n"
retVal["status"] = "success"
retVal["message"] = message
return retVal
except httpx.HTTPStatusError as e:
error_msg = self._gitea.format_error(e, f"PR #{pr_number}")
if e.response.status_code == 404:
retVal["message"] = f"PR #{pr_number} not found in {owner}/{repo_name}."
else:
retVal["message"] = f"Failed to fetch PR. {error_msg}"
return retVal
except Exception as e:
retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}"
return retVal
async def replace_file(
self,
path: str,
content: str,
message: str,
repo: Optional[str] = None,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> dict:
"""
Update an existing file in the repository.
WARNING: This replaces the entire file content.
Use `apply_diff()` for incremental changes.
"""
retVal = {"status": "failure", "message": ""}
effective_branch = self._gitea.get_branch(__user__, __metadata__)
token = self._gitea.get_token(__user__)
if not token:
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
return retVal
try:
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
except ValueError as e:
retVal["message"] = str(e)
return retVal
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": f"Updating {path}...", "done": False},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
get_response = await client.get(
self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._gitea.headers(__user__),
params={"ref": effective_branch},
)
if get_response.status_code == 404:
retVal["message"] = (
f"File not found: `{path}`. Use `create_file()` to create a new file."
)
return retVal
get_response.raise_for_status()
file_info = get_response.json()
sha = file_info.get("sha")
if not sha:
retVal["message"] = "Could not retrieve file SHA for update."
return retVal
content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii")
response = await client.put(
self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._gitea.headers(__user__),
json={
"content": content_b64,
"message": message,
"branch": effective_branch,
"sha": sha,
},
)
response.raise_for_status()
result = response.json()
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
message_text = f"✅ **File Updated Successfully**\n\n"
message_text += f"**File:** `{path}`\n"
message_text += f"**Branch:** `{effective_branch}`\n"
message_text += f"**Commit:** `{commit_sha}`\n"
message_text += f"**Message:** {message}\n"
retVal["status"] = "success"
retVal["message"] = message_text
return retVal
except httpx.HTTPStatusError as e:
error_msg = self._gitea.format_error(e, f"file update for '{path}'")
if e.response.status_code == 409:
retVal["message"] = (
f"Update conflict for `{path}`. Fetch the latest version and try again."
)
else:
retVal["message"] = f"Failed to update file. {error_msg}"
return retVal
except Exception as e:
retVal["message"] = (
f"Unexpected failure during file update: {type(e).__name__}: {e}"
)
return retVal
async def create_file(
self,
path: str,
content: str,
message: str,
repo: Optional[str] = None,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> dict:
"""Create a new file in the repository"""
retVal = {"status": "failure", "message": ""}
effective_branch = self._gitea.get_branch(__user__, __metadata__)
token = self._gitea.get_token(__user__)
if not token:
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
return retVal
try:
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
except ValueError as e:
retVal["message"] = str(e)
return retVal
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": f"Creating {path}...", "done": False},
}
)
try:
content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii")
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.post(
self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._gitea.headers(__user__),
json={
"content": content_b64,
"message": message,
"branch": effective_branch,
},
)
response.raise_for_status()
result = response.json()
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
message_text = f"✅ **File Created Successfully**\n\n"
message_text += f"**File:** `{path}`\n"
message_text += f"**Branch:** `{effective_branch}`\n"
message_text += f"**Commit:** `{commit_sha}`\n"
message_text += f"**Message:** {message}\n"
retVal["status"] = "success"
retVal["message"] = message_text
return retVal
except httpx.HTTPStatusError as e:
error_msg = self._gitea.format_error(e, f"file creation for '{path}'")
if e.response.status_code == 422:
retVal["message"] = (
f"File already exists: `{path}`. Use `replace_file()` or `apply_diff()` to modify it."
)
else:
retVal["message"] = f"Failed to create file. {error_msg}"
return retVal
except Exception as e:
retVal["message"] = (
f"Unexpected failure during file creation: {type(e).__name__}: {e}"
)
return retVal
async def delete_file(
self,
path: str,
message: str,
repo: Optional[str] = None,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
__event_call__: Callable[[dict], Any] = None,
) -> dict:
"""Delete a file from the repository"""
retVal = {"status": "failure", "message": ""}
effective_branch = self._gitea.get_branch(__user__, __metadata__)
token = self._gitea.get_token(__user__)
if not token:
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
return retVal
try:
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
except ValueError as e:
retVal["message"] = str(e)
return retVal
# Confirmation dialog
if __event_call__:
result = await __event_call__(
{
"type": "confirmation",
"data": {
"title": "Confirm File Deletion",
"message": f"Delete `{path}` from `{owner}/{repo_name}` ({effective_branch})?",
},
}
)
if result is None or result is False:
retVal["message"] = "⚠️ File deletion cancelled by user."
return retVal
if isinstance(result, dict) and not result.get("confirmed"):
retVal["message"] = "⚠️ File deletion cancelled by user."
return retVal
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": f"Deleting {path}...", "done": False},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
get_response = await client.get(
self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._gitea.headers(__user__),
params={"ref": effective_branch},
)
if get_response.status_code == 404:
retVal["message"] = f"File not found: `{path}`"
return retVal
get_response.raise_for_status()
file_info = get_response.json()
sha = file_info.get("sha")
if not sha:
retVal["message"] = "Could not retrieve file SHA for deletion."
return retVal
response = await client.delete(
self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._gitea.headers(__user__),
json={
"message": message,
"branch": effective_branch,
"sha": sha,
},
)
response.raise_for_status()
result = response.json()
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
message_text = f"✅ **File Deleted Successfully**\n\n"
message_text += f"**File:** `{path}`\n"
message_text += f"**Branch:** `{effective_branch}`\n"
message_text += f"**Commit:** `{commit_sha}`\n"
message_text += f"**Message:** {message}\n"
retVal["status"] = "success"
retVal["message"] = message_text
return retVal
except httpx.HTTPStatusError as e:
error_msg = self._gitea.format_error(e, f"file deletion for '{path}'")
if e.response.status_code == 404:
retVal["message"] = f"File not found: `{path}`"
else:
retVal["message"] = f"Failed to delete file. {error_msg}"
return retVal
except Exception as e:
retVal["message"] = (
f"Unexpected failure during file deletion: {type(e).__name__}: {e}"
)
return retVal
async def rename_file(
self,
old_path: str,
new_path: str,
message: Optional[str] = None,
repo: Optional[str] = None,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> dict:
"""Rename a file in the repository"""
retVal = {"status": "failure", "message": ""}
effective_branch = self._gitea.get_branch(__user__, __metadata__)
token = self._gitea.get_token(__user__)
if not token:
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
return retVal
try:
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
except ValueError as e:
retVal["message"] = str(e)
return retVal
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Renaming {old_path} to {new_path}...",
"done": False,
},
}
)
try:
if not message:
message = self._gitea.generate_commit_message(
change_type="chore",
scope="rename",
description=f"rename {old_path}{new_path}",
)
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
get_response = await client.get(
self._gitea.api_url(
f"/repos/{owner}/{repo_name}/contents/{old_path}"
),
headers=self._gitea.headers(__user__),
params={"ref": effective_branch},
)
if get_response.status_code == 404:
retVal["message"] = f"File not found: `{old_path}`"
return retVal
get_response.raise_for_status()
old_file = get_response.json()
old_sha = old_file.get("sha")
content_b64 = old_file.get("content", "")
try:
content = base64.b64decode(content_b64).decode("utf-8")
except Exception:
retVal["message"] = "Could not decode file content."
return retVal
new_content_b64 = base64.b64encode(content.encode("utf-8")).decode(
"ascii"
)
create_response = await client.post(
self._gitea.api_url(
f"/repos/{owner}/{repo_name}/contents/{new_path}"
),
headers=self._gitea.headers(__user__),
json={
"content": new_content_b64,
"message": message,
"branch": effective_branch,
},
)
if create_response.status_code == 422:
retVal["message"] = f"File already exists at new path: `{new_path}`"
return retVal
create_response.raise_for_status()
delete_response = await client.delete(
self._gitea.api_url(
f"/repos/{owner}/{repo_name}/contents/{old_path}"
),
headers=self._gitea.headers(__user__),
json={
"message": message,
"branch": effective_branch,
"sha": old_sha,
},
)
delete_response.raise_for_status()
commit_sha = (
create_response.json().get("commit", {}).get("sha", "unknown")[:8]
)
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
message_text = f"✅ **File Renamed Successfully**\n\n"
message_text += f"**Old Path:** `{old_path}`\n"
message_text += f"**New Path:** `{new_path}`\n"
message_text += f"**Branch:** `{effective_branch}`\n"
message_text += f"**Commit:** `{commit_sha}`\n"
message_text += f"**Message:** {message}\n"
retVal["status"] = "success"
retVal["message"] = message_text
return retVal
except httpx.HTTPStatusError as e:
error_msg = self._gitea.format_error(e, "file rename")
retVal["message"] = f"Failed to rename file. {error_msg}"
return retVal
except Exception as e:
retVal["message"] = (
f"Unexpected failure during file rename: {type(e).__name__}: {e}"
)
return retVal
async def get_file(
self,
path: str,
repo: Optional[str] = None,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> dict:
"""Get the contents of a file from the repository"""
retVal = {"status": "failure", "message": ""}
effective_branch = self._gitea.get_branch(__user__, __metadata__)
token = self._gitea.get_token(__user__)
if not token:
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
return retVal
try:
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
except ValueError as e:
retVal["message"] = str(e)
return retVal
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Reading file {path}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._gitea.headers(__user__),
params={"ref": effective_branch},
)
response.raise_for_status()
file_info = response.json()
if isinstance(file_info, list):
retVal["message"] = (
f"'{path}' is a directory. Use `list_files()` to browse its contents."
)
return retVal
if file_info.get("type") != "file":
retVal["message"] = (
f"'{path}' is not a file (type: {file_info.get('type')})"
)
return retVal
content_b64 = file_info.get("content", "")
try:
content = base64.b64decode(content_b64).decode("utf-8")
except Exception:
retVal["message"] = (
"Could not decode file content. The file may be binary."
)
return retVal
size = file_info.get("size", 0)
sha_short = file_info.get("sha", "unknown")[:8]
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
message_text = f"**File:** `{path}`\n"
message_text += f"**Branch:** `{effective_branch}`\n"
message_text += f"**SHA:** `{sha_short}` | **Size:** {size} bytes\n"
message_text += f"**URL:** {file_info.get('html_url', 'N/A')}\n\n"
message_text += f"```\n{content}\n```\n"
retVal["status"] = "success"
retVal["message"] = message_text
return retVal
except httpx.HTTPStatusError as e:
error_msg = self._gitea.format_error(e, f"file fetch for '{path}'")
if e.response.status_code == 404:
retVal["message"] = f"File not found: `{path}`"
else:
retVal["message"] = f"Failed to fetch file. {error_msg}"
return retVal
except Exception as e:
retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}"
return retVal
async def list_files(
self,
path: str = "",
repo: Optional[str] = None,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> dict:
"""List files and directories in a repository path"""
retVal = {"status": "failure", "message": ""}
effective_branch = self._gitea.get_branch(__user__, __metadata__)
token = self._gitea.get_token(__user__)
if not token:
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
return retVal
try:
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
except ValueError as e:
retVal["message"] = str(e)
return retVal
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Listing {path or 'root'}...",
"done": False,
},
}
)
try:
async with httpx.AsyncClient(
timeout=30.0, verify=self.valves.VERIFY_SSL
) as client:
response = await client.get(
self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
headers=self._gitea.headers(__user__),
params={"ref": effective_branch},
)
response.raise_for_status()
contents = response.json()
if isinstance(contents, dict):
retVal["message"] = (
f"'{path}' is a file. Use `get_file()` to read its contents."
)
return retVal
message = f"**Contents of {owner}/{repo_name}/{path or '.'}** (`{effective_branch}`)\n\n"
dirs = [item for item in contents if item.get("type") == "dir"]
files = [item for item in contents if item.get("type") == "file"]
if dirs:
message += "**📁 Directories:**\n"
for item in sorted(dirs, key=lambda x: x.get("name", "").lower()):
message += f"- `📁 {item.get('name', '')}/`\n"
message += "\n"
if files:
message += "**📄 Files:**\n"
for item in sorted(files, key=lambda x: x.get("name", "").lower()):
size = item.get("size", 0)
if size < 1024:
size_str = f"{size}B"
elif size < 1024 * 1024:
size_str = f"{size//1024}KB"
else:
size_str = f"{size//(1024*1024)}MB"
sha_short = item.get("sha", "unknown")[:8]
message += (
f"- `{item.get('name', '')}` ({size_str}, sha: {sha_short})\n"
)
message += f"\n**Total:** {len(dirs)} directories, {len(files)} files"
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Done", "done": True, "hidden": True},
}
)
retVal["status"] = "success"
retVal["message"] = message
return retVal
except httpx.HTTPStatusError as e:
error_msg = self._gitea.format_error(e, f"directory listing for '{path}'")
if e.response.status_code == 404:
retVal["message"] = f"Path not found: `{path}`"
else:
retVal["message"] = f"Failed to list directory contents. {error_msg}"
return retVal
except Exception as e:
retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}"
return retVal