## Summary Implements the gitea_coder role as defined in issue #11, providing a complete workflow automation layer for Git operations with scope enforcement. ## Features ### Branch Management with Scope Gating - ✅ Enforces branch naming conventions (feature/, fix/, refactor/, docs/, test/, chore/) - ✅ Prevents direct pushes to protected branches (main, master, develop, dev) - ✅ Auto-appends issue numbers to branch names ### Unified Commit Workflow - ✅ Automatic create vs replace detection - ✅ Conventional commits format with issue references - ✅ Detailed commit message generation ### PR Creation - ✅ Validates source branch is not protected - ✅ Auto-references issues in PR description - ✅ Uses existing gitea/dev.py operations ### Ticket Integration - ✅ Reads and parses issue requirements - ✅ Extracts testing criteria and technical notes - ✅ Suggests branch names from issue content ## Files Added - `gitea/coder.py` - Complete gitea_coder role implementation ## Files Modified - `README.md` - Added gitea_coder documentation ## Testing Criteria ✅ Can create feature branch from ticket ✅ Can modify files according to ticket requirements ✅ Can generate commit messages with issue references ✅ Can create PR for review Refs: #11 Reviewed-on: #20
2890 lines
104 KiB
Python
2890 lines
104 KiB
Python
"""
|
|
title: Gitea Coder - Workflow Role with Automatic Branch Management
|
|
author: Jeff Smith + minimax + Claude
|
|
version: 1.0.0
|
|
license: MIT
|
|
description: High-level workflow role for LLM-based code generation with automatic branch management and quality gates
|
|
requirements: pydantic, httpx
|
|
changelog:
|
|
1.0.0:
|
|
- Initial implementation of gitea_coder role
|
|
- Branch name = chat_id (KISS principle)
|
|
- Stateless design - no caching complexity
|
|
- Branch creation with scope gating (prevents main pushes)
|
|
- Generates detailed commit messages with ticket references
|
|
- Creates PRs from branches
|
|
- Reads ticket requirements from issues (by number or URL)
|
|
- Updates tickets with status
|
|
- Reads PR feedback
|
|
- Unified file operations workflow
|
|
- Diff-based updates with apply_diff()
|
|
- Size delta gating in commit_changes() for quality control
|
|
- Complete CRUD operations: create_file, replace_file, delete_file, rename_file
|
|
- FIX: apply_unified_diff now properly fails when no valid hunks are parsed
|
|
- FIX: apply_diff detects no-op changes and returns failure with guidance
|
|
- FEATURE: read_ticket auto-creates branch for immediate file operations
|
|
- FEATURE: apply_diff returns patched file content for LLM review
|
|
- FEATURE: Persistent session state (repo/issue/branch/pr) survives restarts
|
|
- FEATURE: State enables cheaper models (kimi-k2, qwen3) that lose context
|
|
- FEATURE: get_session_state() and clear_session_state() for debugging
|
|
- FEATURE: list_sessions() finds orphaned work from crashed chats
|
|
- FEATURE: claim_session() continues work from dead sessions (disaster recovery)
|
|
- ENHANCEMENT: Simplified 4-step workflow (read ticket → modify → update ticket → PR)
|
|
- ENHANCEMENT: Ticket updates emphasized as "statement of work" / audit trail
|
|
- ENHANCEMENT: Review checklist for common diff errors (accidental removals, misplacements)
|
|
- ENHANCEMENT: State resolution priority: explicit arg > state file > env var > UserValves > Valves
|
|
- ENHANCEMENT: Branch resolution uses state for session continuity after claim_session
|
|
"""
|
|
|
|
from typing import Optional, Callable, Any, Dict, List, Tuple
|
|
from pydantic import BaseModel, Field
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import re
|
|
import base64
|
|
import httpx
|
|
import json
|
|
import os
|
|
|
|
|
|
class GiteaHelpers:
|
|
"""
|
|
Helper methods for Gitea API interactions.
|
|
Designed to be mixed into Tools class via composition.
|
|
"""
|
|
|
|
def __init__(self, tools_instance):
|
|
"""Initialize with reference to parent Tools instance for valve access"""
|
|
self.tools = tools_instance
|
|
|
|
@property
|
|
def valves(self):
|
|
"""Access valves from parent Tools instance"""
|
|
return self.tools.valves
|
|
|
|
# ============= STATE PERSISTENCE =============
|
|
# Persists repo/issue/branch across restarts and helps models
|
|
# that struggle with context tracking (kimi-k2, qwen3-code)
|
|
|
|
def _state_dir(self) -> Path:
|
|
"""Get base state directory"""
|
|
base = os.environ.get(
|
|
"GITEA_CODER_STATE_DIR", os.path.expanduser("~/.gitea_coder")
|
|
)
|
|
return Path(base)
|
|
|
|
def _state_path(self, chat_id: str) -> Path:
|
|
"""Get state file path for a chat session"""
|
|
return self._state_dir() / "chats" / chat_id / "state.json"
|
|
|
|
def load_state(self, chat_id: str) -> dict:
|
|
"""Load persisted state for chat session"""
|
|
if not chat_id:
|
|
return {}
|
|
path = self._state_path(chat_id)
|
|
if path.exists():
|
|
try:
|
|
return json.loads(path.read_text())
|
|
except Exception:
|
|
return {}
|
|
return {}
|
|
|
|
def save_state(self, chat_id: str, **updates):
|
|
"""Update and persist state for chat session"""
|
|
if not chat_id:
|
|
return
|
|
try:
|
|
state = self.load_state(chat_id)
|
|
state.update(updates)
|
|
state["updated_at"] = datetime.utcnow().isoformat()
|
|
|
|
path = self._state_path(chat_id)
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
path.write_text(json.dumps(state, indent=2))
|
|
except Exception:
|
|
pass # Best effort - don't fail operations on state save errors
|
|
|
|
def get_state_summary(self, chat_id: str) -> str:
|
|
"""Get human-readable state summary for debugging"""
|
|
state = self.load_state(chat_id)
|
|
if not state:
|
|
return "No persisted state"
|
|
parts = []
|
|
if state.get("repo"):
|
|
parts.append(f"repo=`{state['repo']}`")
|
|
if state.get("issue"):
|
|
parts.append(f"issue=#{state['issue']}")
|
|
if state.get("branch"):
|
|
parts.append(f"branch=`{state['branch'][:8]}...`")
|
|
return ", ".join(parts) if parts else "Empty state"
|
|
|
|
# ============= API HELPERS =============
|
|
|
|
def api_url(self, endpoint: str) -> str:
|
|
"""Construct full API URL for Gitea endpoint"""
|
|
base = self.get_url()
|
|
return f"{base}/api/v1{endpoint}"
|
|
|
|
def get_url(self) -> str:
|
|
"""Get effective Gitea URL with trailing slash handling"""
|
|
return self.valves.GITEA_URL.rstrip("/")
|
|
|
|
def get_token(self, __user__: dict = None) -> str:
|
|
"""Extract Gitea token from user context"""
|
|
if __user__ and "valves" in __user__:
|
|
user_valves = __user__.get("valves")
|
|
if user_valves:
|
|
token = getattr(user_valves, "GITEA_TOKEN", "")
|
|
if token:
|
|
return token
|
|
return ""
|
|
|
|
def headers(self, __user__: dict = None) -> dict:
|
|
"""Generate authentication headers with token"""
|
|
token = self.get_token(__user__)
|
|
headers = {"Content-Type": "application/json"}
|
|
if token:
|
|
headers["Authorization"] = f"token {token}"
|
|
return headers
|
|
|
|
def format_error(self, e: httpx.HTTPStatusError, context: str = "") -> str:
|
|
"""Format HTTP error with detailed context"""
|
|
try:
|
|
error_json = e.response.json()
|
|
error_msg = error_json.get("message", e.response.text[:200])
|
|
except Exception:
|
|
error_msg = e.response.text[:200]
|
|
|
|
context_str = f" ({context})" if context else ""
|
|
return f"HTTP Error {e.response.status_code}{context_str}: {error_msg}"
|
|
|
|
def get_repo(
|
|
self, repo: Optional[str], __user__: dict = None, __metadata__: dict = None
|
|
) -> str:
|
|
"""
|
|
Get effective repository with priority resolution:
|
|
1. Explicit arg (always wins)
|
|
2. Persistent state (learned from previous operations)
|
|
3. Environment variable GITEA_DEFAULT_REPO (k8s/cron)
|
|
4. UserValves override (interactive)
|
|
5. Admin Valves default
|
|
"""
|
|
# 1. Explicit arg always wins
|
|
if repo:
|
|
return repo
|
|
|
|
# 2. Check persistent state (helps models that lose track)
|
|
if __metadata__:
|
|
chat_id = __metadata__.get("chat_id")
|
|
if chat_id:
|
|
state = self.load_state(chat_id)
|
|
if state.get("repo"):
|
|
return state["repo"]
|
|
|
|
# 3. Environment variable (k8s/cron native)
|
|
env_repo = os.environ.get("GITEA_DEFAULT_REPO")
|
|
if env_repo:
|
|
return env_repo
|
|
|
|
# 4. UserValves override
|
|
if __user__ and "valves" in __user__:
|
|
user_valves = __user__.get("valves")
|
|
if user_valves and self.valves.ALLOW_USER_OVERRIDES:
|
|
user_repo = getattr(user_valves, "USER_DEFAULT_REPO", "")
|
|
if user_repo:
|
|
return user_repo
|
|
|
|
# 5. Admin Valves default
|
|
return self.valves.DEFAULT_REPO
|
|
|
|
def get_branch(self, __user__: dict = None, __metadata__: dict = None) -> str:
|
|
"""
|
|
Get effective branch name with priority resolution:
|
|
1. Claimed session branch (from state file - for disaster recovery)
|
|
2. chat_id from metadata (default - branch = chat)
|
|
3. UserValves override
|
|
4. Fallback to DEFAULT_BRANCH
|
|
"""
|
|
chat_id = None
|
|
if __metadata__:
|
|
chat_id = __metadata__.get("chat_id")
|
|
|
|
# Check if we've claimed another session's branch (disaster recovery)
|
|
if chat_id:
|
|
state = self.load_state(chat_id)
|
|
if state.get("branch"):
|
|
# Use branch from state - could be our own or claimed
|
|
return state["branch"]
|
|
else:
|
|
# No state yet, use chat_id as branch
|
|
return chat_id
|
|
|
|
# Then check user valves override
|
|
if __user__ and "valves" in __user__:
|
|
user_valves = __user__.get("valves")
|
|
if user_valves and self.valves.ALLOW_USER_OVERRIDES:
|
|
user_branch = getattr(user_valves, "USER_DEFAULT_BRANCH", "")
|
|
if user_branch:
|
|
return user_branch
|
|
|
|
# Finally fall back to default
|
|
return self.valves.DEFAULT_BRANCH
|
|
|
|
def resolve_repo(
|
|
self, repo: Optional[str], __user__: dict = None, __metadata__: dict = None
|
|
) -> Tuple[str, str]:
|
|
"""Resolve repository string into owner and repo name"""
|
|
effective_repo = self.get_repo(repo, __user__, __metadata__)
|
|
|
|
if not effective_repo:
|
|
raise ValueError(
|
|
"No repository specified. Set DEFAULT_REPO in Valves, USER_DEFAULT_REPO in UserValves, "
|
|
"or pass repo='owner/name' explicitly."
|
|
)
|
|
|
|
if "/" not in effective_repo:
|
|
raise ValueError(
|
|
f"Repository must be in 'owner/repo' format, got: {effective_repo}"
|
|
)
|
|
|
|
return tuple(effective_repo.split("/", 1))
|
|
|
|
def parse_issue_url(
|
|
self, url: str
|
|
) -> Tuple[Optional[str], Optional[str], Optional[int]]:
|
|
"""
|
|
Parse a Gitea issue URL into components.
|
|
Format: https://<domain>/<owner>/<repo>/issues/<number>
|
|
|
|
Returns:
|
|
Tuple of (owner, repo, issue_number) or (None, None, None) if invalid
|
|
"""
|
|
# Match: https://domain/owner/repo/issues/123
|
|
match = re.match(r"https?://[^/]+/([^/]+)/([^/]+)/issues/(\d+)", url)
|
|
if match:
|
|
owner = match.group(1)
|
|
repo = match.group(2)
|
|
issue_number = int(match.group(3))
|
|
return (owner, repo, issue_number)
|
|
return (None, None, None)
|
|
|
|
def parse_issue_refs(self, text: str) -> List[str]:
|
|
"""Extract issue references from text (e.g., #42, issue #42)"""
|
|
refs = re.findall(r"#(\d+)", text)
|
|
issue_refs = [f"#{ref}" for ref in refs]
|
|
|
|
# Also check for "issue N" pattern
|
|
issue_n_refs = re.findall(r"issue\s*#?(\d+)", text, re.IGNORECASE)
|
|
for ref in issue_n_refs:
|
|
issue_ref = f"#{ref}"
|
|
if issue_ref not in issue_refs:
|
|
issue_refs.append(issue_ref)
|
|
|
|
return issue_refs
|
|
|
|
def apply_unified_diff(
|
|
self, current_content: str, diff_content: str
|
|
) -> Tuple[Optional[str], str]:
|
|
"""
|
|
Apply a unified diff to content.
|
|
|
|
Args:
|
|
current_content: Current file content
|
|
diff_content: Unified diff patch
|
|
|
|
Returns:
|
|
Tuple of (new_content, error_message)
|
|
- On success: (new_content, "")
|
|
- On failure: (None, error_description)
|
|
"""
|
|
try:
|
|
diff_lines = diff_content.splitlines(keepends=True)
|
|
|
|
# Validate diff format - check for hunk headers
|
|
has_hunk_header = any(line.startswith("@@") for line in diff_lines)
|
|
has_add_or_remove = any(
|
|
line.startswith("+") or line.startswith("-")
|
|
for line in diff_lines
|
|
if not line.startswith("+++") and not line.startswith("---")
|
|
)
|
|
|
|
if not has_hunk_header:
|
|
return (
|
|
None,
|
|
"No hunk headers (@@) found. Unified diff format required.",
|
|
)
|
|
|
|
if not has_add_or_remove:
|
|
return (None, "No additions (+) or deletions (-) found in diff.")
|
|
|
|
# Parse hunks from unified diff
|
|
hunks = []
|
|
current_hunk = None
|
|
in_hunk = False
|
|
|
|
for line in diff_lines:
|
|
if line.startswith("---") or line.startswith("+++"):
|
|
continue # Skip file headers
|
|
elif line.startswith("@@"):
|
|
if current_hunk:
|
|
hunks.append(current_hunk)
|
|
|
|
# Parse hunk header: @@ -old_line,old_count +new_line,new_count @@
|
|
match = re.search(
|
|
r"@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@", line
|
|
)
|
|
if match:
|
|
old_start = int(match.group(1))
|
|
new_start = int(match.group(3))
|
|
current_hunk = {
|
|
"old_start": old_start,
|
|
"new_start": new_start,
|
|
"lines": [],
|
|
}
|
|
in_hunk = True
|
|
else:
|
|
return (None, f"Invalid hunk header format: {line.strip()}")
|
|
continue
|
|
elif in_hunk and len(line) > 0 and line[0:1] in ("+", "-", " "):
|
|
if current_hunk:
|
|
current_hunk["lines"].append(line)
|
|
elif in_hunk and line.strip() == "":
|
|
# Empty line in diff - treat as context
|
|
if current_hunk:
|
|
current_hunk["lines"].append(" \n")
|
|
elif in_hunk:
|
|
if current_hunk:
|
|
hunks.append(current_hunk)
|
|
current_hunk = None
|
|
in_hunk = False
|
|
|
|
if current_hunk:
|
|
hunks.append(current_hunk)
|
|
|
|
if not hunks:
|
|
return (None, "No valid hunks could be parsed from the diff content.")
|
|
|
|
# Split content into lines
|
|
old_lines = current_content.splitlines(keepends=False)
|
|
new_lines = list(old_lines)
|
|
|
|
# Apply each hunk in reverse order to maintain correct indices
|
|
for hunk in sorted(hunks, key=lambda h: h["old_start"], reverse=True):
|
|
old_start = hunk["old_start"] - 1 # Convert to 0-indexed
|
|
|
|
# Collect lines to remove and add
|
|
lines_to_remove = []
|
|
lines_to_add = []
|
|
|
|
for line in hunk["lines"]:
|
|
if line.startswith("+"):
|
|
lines_to_add.append(line[1:].rstrip("\n"))
|
|
elif line.startswith("-"):
|
|
lines_to_remove.append(line[1:].rstrip("\n"))
|
|
|
|
# Remove old lines if they match
|
|
if lines_to_remove:
|
|
end_idx = old_start + len(lines_to_remove)
|
|
if end_idx <= len(new_lines):
|
|
actual_lines = new_lines[old_start:end_idx]
|
|
if actual_lines == lines_to_remove:
|
|
del new_lines[old_start:end_idx]
|
|
else:
|
|
# Context mismatch - warn but continue
|
|
del new_lines[old_start:end_idx]
|
|
|
|
# Insert new lines at old_start
|
|
for line in reversed(lines_to_add):
|
|
new_lines.insert(old_start, line)
|
|
|
|
# Reconstruct content with line endings
|
|
new_content = "\n".join(new_lines)
|
|
if new_content and not new_content.endswith("\n"):
|
|
new_content += "\n"
|
|
|
|
return (new_content, "")
|
|
|
|
except Exception as e:
|
|
return (None, f"Diff parsing error: {type(e).__name__}: {e}")
|
|
|
|
def generate_commit_message(
|
|
self,
|
|
change_type: str,
|
|
scope: str,
|
|
description: str,
|
|
issue_refs: Optional[List[str]] = None,
|
|
body: Optional[str] = None,
|
|
) -> str:
|
|
"""
|
|
Generate a conventional commit message.
|
|
Format: <type>(<scope>): <description>
|
|
"""
|
|
# Validate and normalize change type
|
|
valid_types = [
|
|
"feat",
|
|
"fix",
|
|
"docs",
|
|
"style",
|
|
"refactor",
|
|
"test",
|
|
"chore",
|
|
"perf",
|
|
"ci",
|
|
"build",
|
|
"revert",
|
|
]
|
|
if change_type.lower() not in valid_types:
|
|
change_type = "chore"
|
|
|
|
# Build the subject line
|
|
scope_str = f"({scope})" if scope else ""
|
|
message = f"{change_type.lower()}{scope_str}: {description}"
|
|
|
|
# Add issue references to footer
|
|
if issue_refs:
|
|
refs_str = ", ".join(issue_refs)
|
|
footer = f"Refs: {refs_str}"
|
|
|
|
if body:
|
|
body = f"{body}\n\n{footer}"
|
|
else:
|
|
message = f"{message}\n\n{footer}"
|
|
|
|
if body:
|
|
message = f"{message}\n\n{body}"
|
|
|
|
return message
|
|
|
|
def generate_diff_commit_message(self, path: str, diff_content: str) -> str:
|
|
"""Generate a commit message from diff content"""
|
|
file_name = path.split("/")[-1]
|
|
|
|
# Detect change type from diff
|
|
change_type = "chore"
|
|
diff_lines = diff_content.splitlines()
|
|
|
|
if any(
|
|
line.startswith("+def ") or line.startswith("+class ")
|
|
for line in diff_lines
|
|
):
|
|
change_type = "feat"
|
|
elif any(line.startswith("+ return ") for line in diff_lines):
|
|
change_type = "fix"
|
|
elif "test" in path.lower() or "spec" in path.lower():
|
|
change_type = "test"
|
|
elif ".md" in path.lower() or "readme" in path.lower():
|
|
change_type = "docs"
|
|
elif any(line.startswith("-") for line in diff_lines):
|
|
change_type = "refactor"
|
|
|
|
# Extract description from added lines
|
|
added_lines = [
|
|
line[1:].strip()
|
|
for line in diff_lines
|
|
if line.startswith("+") and not line.startswith("+++")
|
|
]
|
|
|
|
description = "update"
|
|
if added_lines:
|
|
for line in added_lines:
|
|
if line and not line.startswith(("import ", "from ")):
|
|
match = re.match(
|
|
r"(def|class|const|var|let|interface|type)\s+(\w+)", line
|
|
)
|
|
if match:
|
|
kind, name = match.groups()
|
|
if kind == "def":
|
|
description = f"add {name}() function"
|
|
break
|
|
elif kind == "class":
|
|
description = f"add {name} class"
|
|
break
|
|
elif not line.startswith((" ", "\t")):
|
|
description = line[:50].rstrip(":")
|
|
if len(line) > 50:
|
|
description += "..."
|
|
break
|
|
|
|
return f"{change_type}({file_name}): {description}"
|
|
|
|
|
|
class Tools:
|
|
"""
|
|
Gitea Coder Role - High-level workflow automation for code generation tasks.
|
|
|
|
ARCHITECTURE:
|
|
- Stateless design - no caching complexity
|
|
- Branch name = chat_id (PERIOD. Nothing else.)
|
|
- All operations are self-contained
|
|
- LLM focuses on code, not infrastructure
|
|
|
|
Workflow:
|
|
reads ticket → creates branch (chat_id) → commits → updates ticket → creates PR
|
|
|
|
Key Features:
|
|
- Branch name IS chat_id (KISS principle)
|
|
- Auto-generates conventional commit messages
|
|
- Quality gates for file changes (size delta validation)
|
|
- Diff-based updates to prevent accidental file replacements
|
|
- Complete CRUD operations (create, replace, delete, rename)
|
|
- Ticket updates for status tracking
|
|
- PR reading for feedback
|
|
"""
|
|
|
|
class Valves(BaseModel):
|
|
"""System-wide configuration for Gitea Coder integration"""
|
|
|
|
GITEA_URL: str = Field(
|
|
default="https://gitea.example.com",
|
|
description="Gitea server URL",
|
|
)
|
|
DEFAULT_REPO: str = Field(
|
|
default="",
|
|
description="Default repository in owner/repo format",
|
|
)
|
|
DEFAULT_BRANCH: str = Field(
|
|
default="main",
|
|
description="Default branch name for base operations",
|
|
)
|
|
ALLOW_USER_OVERRIDES: bool = Field(
|
|
default=True,
|
|
description="Allow users to override defaults via UserValves",
|
|
)
|
|
VERIFY_SSL: bool = Field(
|
|
default=True,
|
|
description="Verify SSL certificates",
|
|
)
|
|
MAX_SIZE_DELTA_PERCENT: float = Field(
|
|
default=50.0,
|
|
description="Maximum allowed file size change percentage (quality gate)",
|
|
ge=1.0,
|
|
le=500.0,
|
|
)
|
|
PROTECTED_BRANCHES: List[str] = Field(
|
|
default=["main", "master", "develop", "dev", "release", "hotfix"],
|
|
description="Branches that cannot be committed to directly",
|
|
)
|
|
|
|
class UserValves(BaseModel):
|
|
"""Per-user configuration"""
|
|
|
|
GITEA_TOKEN: str = Field(
|
|
default="",
|
|
description="Your Gitea API token",
|
|
)
|
|
USER_DEFAULT_REPO: str = Field(
|
|
default="",
|
|
description="Override default repository",
|
|
)
|
|
USER_DEFAULT_BRANCH: str = Field(
|
|
default="",
|
|
description="Override default branch",
|
|
)
|
|
|
|
def __init__(self):
|
|
"""Initialize tools"""
|
|
self.valves = self.Valves()
|
|
self.user_valves = self.UserValves()
|
|
self.citation = True
|
|
|
|
# Initialize helper functions
|
|
self._gitea = GiteaHelpers(self)
|
|
|
|
async def workflow_summary(
|
|
self,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""Get a summary of available coder workflows and commands"""
|
|
|
|
branch = self._gitea.get_branch(__user__, __metadata__)
|
|
|
|
message = f"""# 🚀 Gitea Coder Workflow Guide
|
|
|
|
## Workflow (4 Steps)
|
|
|
|
1. **Read the ticket:** `read_ticket(issue_number)` or `read_ticket_by_url(url)`
|
|
- Branch `{branch}` is auto-created when you read the ticket
|
|
- Files are immediately accessible
|
|
|
|
2. **Read & modify files:**
|
|
- `get_file(path)` - Read current content
|
|
- `list_files(path)` - Browse repository
|
|
- `apply_diff(path, diff, message)` - Apply changes (preferred)
|
|
- `commit_changes(path, content, message)` - Full file replacement
|
|
|
|
3. **Update the ticket:** `update_ticket(issue_number, comment)` ← **CRITICAL**
|
|
- This is your **statement of work**
|
|
- Document what you did, what you found, decisions made
|
|
- Update throughout the process, not just at the end
|
|
|
|
4. **Create PR:** `create_pull_request(title, description)`
|
|
- Use `read_pull_request(pr_number)` to check feedback
|
|
|
|
## Available Commands
|
|
|
|
### 📋 Ticket Operations (Statement of Work)
|
|
- `read_ticket(issue_number)` - Get issue details, auto-creates branch
|
|
- `read_ticket_by_url(url)` - Get issue details by URL, auto-creates branch
|
|
- `update_ticket(issue_number, comment)` - **Document your work on the ticket**
|
|
|
|
### 🌿 Branch Management
|
|
- `create_branch()` - Manually create branch (usually not needed)
|
|
- `get_branch_status()` - See current working branch
|
|
- `list_branches()` - List all branches in repository
|
|
|
|
### 🧠 Session State (Auto-managed)
|
|
- `get_session_state()` - View persisted repo/issue/branch context
|
|
- `clear_session_state()` - Reset session (start fresh)
|
|
- `list_sessions()` - Find orphaned sessions from crashed chats
|
|
- `claim_session(chat_id)` - Continue work from a dead session
|
|
|
|
### 📝 File Operations
|
|
- `get_file(path)` - Read file content
|
|
- `list_files(path)` - List directory contents
|
|
- `apply_diff(path, diff, message)` - Apply unified diff patch (preferred)
|
|
- `commit_changes(path, content, message)` - Commit with size delta gate
|
|
- `replace_file(path, content, message)` - Replace entire file
|
|
- `create_file(path, content, message)` - Create new file
|
|
- `delete_file(path, message)` - Delete a file
|
|
- `rename_file(old_path, new_path, message)` - Rename a file
|
|
|
|
### 📦 Pull Request Operations
|
|
- `create_pull_request(title, description)` - Create PR from current branch
|
|
- `read_pull_request(pr_number)` - Get PR details and review feedback
|
|
|
|
## Current Session
|
|
|
|
**Working Branch:** `{branch}` (same as chat_id)
|
|
|
|
## Diff Format Guide
|
|
|
|
When using `apply_diff()`, provide a **unified diff** format:
|
|
|
|
```diff
|
|
--- a/path/to/file.js
|
|
+++ b/path/to/file.js
|
|
@@ -10,3 +10,4 @@
|
|
existing line (context)
|
|
+new line to add
|
|
another existing line
|
|
-line to remove
|
|
```
|
|
|
|
**Key elements:**
|
|
- `@@` hunk headers are REQUIRED
|
|
- Lines starting with `+` are additions
|
|
- Lines starting with `-` are deletions
|
|
- Lines starting with ` ` (space) are context
|
|
|
|
## Best Practices
|
|
|
|
1. **Always update the ticket** - It's your audit trail and statement of work
|
|
2. **Use `apply_diff()` for changes** - More precise, prevents accidents
|
|
3. **Read files before modifying** - Understand current state first
|
|
4. **Commit messages matter** - They're auto-generated but can be customized
|
|
"""
|
|
|
|
return {"status": "success", "message": message}
|
|
|
|
async def read_ticket(
|
|
self,
|
|
issue_number: int,
|
|
repo: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""
|
|
Read and parse a ticket/issue by number.
|
|
|
|
Automatically creates the working branch (chat_id) if it doesn't exist.
|
|
This enables immediate file operations after reading the ticket.
|
|
"""
|
|
|
|
retVal = {"status": "failure", "message": ""}
|
|
|
|
token = self._gitea.get_token(__user__)
|
|
if not token:
|
|
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
|
|
return retVal
|
|
|
|
try:
|
|
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
|
|
except ValueError as e:
|
|
retVal["message"] = str(e)
|
|
return retVal
|
|
|
|
# Get branch name (chat_id)
|
|
branch_name = self._gitea.get_branch(__user__, __metadata__)
|
|
branch_status = "unknown"
|
|
|
|
# Auto-create branch for immediate file operations
|
|
if branch_name not in self.valves.PROTECTED_BRANCHES:
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": f"Setting up branch {branch_name[:8]}...",
|
|
"done": False,
|
|
},
|
|
}
|
|
)
|
|
|
|
try:
|
|
async with httpx.AsyncClient(
|
|
timeout=30.0, verify=self.valves.VERIFY_SSL
|
|
) as client:
|
|
response = await client.post(
|
|
self._gitea.api_url(f"/repos/{owner}/{repo_name}/branches"),
|
|
headers=self._gitea.headers(__user__),
|
|
json={
|
|
"new_branch_name": branch_name,
|
|
"old_branch_name": self.valves.DEFAULT_BRANCH,
|
|
},
|
|
)
|
|
|
|
if response.status_code == 409:
|
|
branch_status = "exists"
|
|
elif response.status_code in (200, 201):
|
|
branch_status = "created"
|
|
else:
|
|
branch_status = "failed"
|
|
except Exception:
|
|
branch_status = "failed"
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": f"Fetching issue #{issue_number}...",
|
|
"done": False,
|
|
},
|
|
}
|
|
)
|
|
|
|
try:
|
|
async with httpx.AsyncClient(
|
|
timeout=30.0, verify=self.valves.VERIFY_SSL
|
|
) as client:
|
|
response = await client.get(
|
|
self._gitea.api_url(
|
|
f"/repos/{owner}/{repo_name}/issues/{issue_number}"
|
|
),
|
|
headers=self._gitea.headers(__user__),
|
|
)
|
|
response.raise_for_status()
|
|
issue = response.json()
|
|
|
|
title = issue.get("title", "No title")
|
|
body = issue.get("body", "")
|
|
state = issue.get("state", "unknown")
|
|
user = issue.get("user", {}).get("login", "unknown")
|
|
labels = [label.get("name", "") for label in issue.get("labels", [])]
|
|
created_at = issue.get("created_at", "")[:10]
|
|
html_url = issue.get("html_url", "")
|
|
|
|
# Parse body for structured info
|
|
testing_criteria = []
|
|
technical_notes = []
|
|
is_testing_required = "test" in body.lower() or "testing" in body.lower()
|
|
is_docs_required = "documentation" in body.lower() or "docs" in body.lower()
|
|
|
|
if is_testing_required:
|
|
testing_section = re.search(
|
|
r"(?:testing|criteria|test).*?:(.*?)(?:\n\n|$)",
|
|
body,
|
|
re.IGNORECASE | re.DOTALL,
|
|
)
|
|
if testing_section:
|
|
testing_criteria = [
|
|
line.strip().lstrip("-*•")
|
|
for line in testing_section.group(1).split("\n")
|
|
if line.strip()
|
|
]
|
|
|
|
tech_section = re.search(
|
|
r"(?:technical|tech).*?:(.*?)(?:\n\n|$)",
|
|
body,
|
|
re.IGNORECASE | re.DOTALL,
|
|
)
|
|
if tech_section:
|
|
technical_notes = [
|
|
line.strip().lstrip("-*•")
|
|
for line in tech_section.group(1).split("\n")
|
|
if line.strip()
|
|
]
|
|
|
|
issue_refs = self._gitea.parse_issue_refs(body)
|
|
if not any(ref == f"#{issue_number}" for ref in issue_refs):
|
|
issue_refs.insert(0, f"#{issue_number}")
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Done", "done": True, "hidden": True},
|
|
}
|
|
)
|
|
|
|
# Build message
|
|
message = f"# 📋 Ticket #{issue_number}: {title}\n\n"
|
|
message += f"**State:** {state.upper()} | **Author:** @{user} | **Created:** {created_at}\n"
|
|
message += f"**Labels:** {', '.join(labels) if labels else 'None'}\n"
|
|
message += f"**URL:** {html_url}\n\n"
|
|
|
|
# Show branch status
|
|
if branch_status == "created":
|
|
message += f"🌿 **Branch Created:** `{branch_name}` (from `{self.valves.DEFAULT_BRANCH}`)\n\n"
|
|
elif branch_status == "exists":
|
|
message += f"🌿 **Branch Ready:** `{branch_name}` (already exists)\n\n"
|
|
elif branch_status == "failed":
|
|
message += f"⚠️ **Branch:** Could not create `{branch_name}` - you may need to call `create_branch()`\n\n"
|
|
|
|
if body:
|
|
message += "## 📝 Description\n\n"
|
|
if len(body) > 1000:
|
|
message += f"{body[:1000]}...\n\n"
|
|
else:
|
|
message += f"{body}\n\n"
|
|
|
|
message += "## 🧪 Testing Requirements\n\n"
|
|
if is_testing_required:
|
|
if testing_criteria:
|
|
message += "**Testing Criteria:**\n"
|
|
for criterion in testing_criteria:
|
|
message += f"- [ ] {criterion}\n"
|
|
message += "\n"
|
|
else:
|
|
message += "Testing required, but no specific criteria listed.\n\n"
|
|
else:
|
|
message += "No explicit testing requirements detected.\n\n"
|
|
|
|
if technical_notes:
|
|
message += "## 🔧 Technical Notes\n\n"
|
|
for note in technical_notes:
|
|
message += f"- {note}\n"
|
|
message += "\n"
|
|
|
|
if is_docs_required:
|
|
message += "## 📚 Documentation Required\n\n"
|
|
message += "This ticket mentions documentation needs.\n\n"
|
|
|
|
if issue_refs:
|
|
message += "## 🔗 Related Issues\n\n"
|
|
for ref in issue_refs:
|
|
message += f"- {ref}\n"
|
|
message += "\n"
|
|
|
|
message += "## 🚀 Workflow\n\n"
|
|
message += f"1. **Read files:** `get_file(path)` or `list_files(path)` - branch `{branch_name}` is ready\n"
|
|
message += "2. **Make changes:** `apply_diff()` or `commit_changes()`\n"
|
|
message += f"3. **Update ticket:** `update_ticket({issue_number}, comment)` ← **Document your work!**\n"
|
|
message += "4. **Create PR:** `create_pull_request(title)`\n\n"
|
|
message += "---\n"
|
|
message += f"💡 **Important:** Always update the ticket with your progress and findings.\n"
|
|
message += f"The ticket is your **statement of work** - it documents what was done and why.\n"
|
|
|
|
# Persist state for context recovery (helps models that lose track)
|
|
if __metadata__ and __metadata__.get("chat_id"):
|
|
self._gitea.save_state(
|
|
__metadata__["chat_id"],
|
|
repo=f"{owner}/{repo_name}",
|
|
issue=issue_number,
|
|
branch=branch_name,
|
|
)
|
|
|
|
retVal["status"] = "success"
|
|
retVal["message"] = message
|
|
return retVal
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
error_msg = self._gitea.format_error(e, f"issue #{issue_number}")
|
|
if e.response.status_code == 404:
|
|
retVal["message"] = (
|
|
f"Issue #{issue_number} not found in {owner}/{repo_name}."
|
|
)
|
|
else:
|
|
retVal["message"] = f"Failed to fetch issue. {error_msg}"
|
|
return retVal
|
|
except Exception as e:
|
|
retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}"
|
|
return retVal
|
|
|
|
async def read_ticket_by_url(
|
|
self,
|
|
url: str,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""
|
|
Read and parse a ticket/issue by URL.
|
|
Format: https://<domain>/<owner>/<repo>/issues/<number>
|
|
|
|
Automatically creates the working branch (chat_id) if it doesn't exist.
|
|
This enables immediate file operations after reading the ticket.
|
|
"""
|
|
|
|
retVal = {"status": "failure", "message": ""}
|
|
|
|
# Parse URL
|
|
owner, repo, issue_number = self._gitea.parse_issue_url(url)
|
|
|
|
if not owner or not repo or not issue_number:
|
|
retVal["message"] = (
|
|
f"Invalid issue URL format. Expected: https://domain/owner/repo/issues/number\nGot: {url}"
|
|
)
|
|
return retVal
|
|
|
|
# Use the standard read_ticket with parsed components
|
|
return await self.read_ticket(
|
|
issue_number=issue_number,
|
|
repo=f"{owner}/{repo}",
|
|
__user__=__user__,
|
|
__metadata__=__metadata__,
|
|
__event_emitter__=__event_emitter__,
|
|
)
|
|
|
|
async def update_ticket(
|
|
self,
|
|
issue_number: int,
|
|
comment: str,
|
|
repo: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""
|
|
Post a status update comment to a ticket.
|
|
|
|
THIS IS YOUR STATEMENT OF WORK.
|
|
|
|
Use this to document:
|
|
- What you analyzed and found
|
|
- What changes you made and why
|
|
- Decisions and trade-offs
|
|
- Testing performed
|
|
- Any blockers or questions
|
|
|
|
Update the ticket throughout your work, not just at the end.
|
|
The ticket comment history is the audit trail of the work performed.
|
|
"""
|
|
|
|
retVal = {"status": "failure", "message": ""}
|
|
|
|
token = self._gitea.get_token(__user__)
|
|
if not token:
|
|
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
|
|
return retVal
|
|
|
|
try:
|
|
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
|
|
except ValueError as e:
|
|
retVal["message"] = str(e)
|
|
return retVal
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": f"Updating issue #{issue_number}...",
|
|
"done": False,
|
|
},
|
|
}
|
|
)
|
|
|
|
try:
|
|
async with httpx.AsyncClient(
|
|
timeout=30.0, verify=self.valves.VERIFY_SSL
|
|
) as client:
|
|
response = await client.post(
|
|
self._gitea.api_url(
|
|
f"/repos/{owner}/{repo_name}/issues/{issue_number}/comments"
|
|
),
|
|
headers=self._gitea.headers(__user__),
|
|
json={"body": comment},
|
|
)
|
|
response.raise_for_status()
|
|
result = response.json()
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Done", "done": True, "hidden": True},
|
|
}
|
|
)
|
|
|
|
retVal["status"] = "success"
|
|
retVal["message"] = f"✅ Updated issue #{issue_number} with status comment"
|
|
return retVal
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
error_msg = self._gitea.format_error(e, f"issue #{issue_number} update")
|
|
retVal["message"] = f"Failed to update issue. {error_msg}"
|
|
return retVal
|
|
except Exception as e:
|
|
retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}"
|
|
return retVal
|
|
|
|
async def create_branch(
|
|
self,
|
|
repo: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""
|
|
Create a new branch with name = chat_id.
|
|
KISS: Branch name IS chat_id. Nothing else.
|
|
"""
|
|
|
|
retVal = {"status": "failure", "message": ""}
|
|
|
|
token = self._gitea.get_token(__user__)
|
|
if not token:
|
|
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
|
|
return retVal
|
|
|
|
try:
|
|
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
|
|
except ValueError as e:
|
|
retVal["message"] = str(e)
|
|
return retVal
|
|
|
|
# Branch name IS chat_id
|
|
branch_name = self._gitea.get_branch(__user__, __metadata__)
|
|
|
|
# Check if protected branch
|
|
if branch_name in self.valves.PROTECTED_BRANCHES:
|
|
retVal["message"] = (
|
|
f"❌ Cannot create branch with protected name '{branch_name}'"
|
|
)
|
|
return retVal
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": f"Creating branch {branch_name}...",
|
|
"done": False,
|
|
},
|
|
}
|
|
)
|
|
|
|
# Get base branch
|
|
base_branch = self.valves.DEFAULT_BRANCH
|
|
|
|
try:
|
|
async with httpx.AsyncClient(
|
|
timeout=30.0, verify=self.valves.VERIFY_SSL
|
|
) as client:
|
|
response = await client.post(
|
|
self._gitea.api_url(f"/repos/{owner}/{repo_name}/branches"),
|
|
headers=self._gitea.headers(__user__),
|
|
json={
|
|
"new_branch_name": branch_name,
|
|
"old_branch_name": base_branch,
|
|
},
|
|
)
|
|
|
|
# Handle branch already exists
|
|
if response.status_code == 409:
|
|
retVal["status"] = "success"
|
|
retVal["message"] = f"⚠️ Branch `{branch_name}` already exists."
|
|
return retVal
|
|
|
|
response.raise_for_status()
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Done", "done": True, "hidden": True},
|
|
}
|
|
)
|
|
|
|
# Persist state for context recovery
|
|
if __metadata__ and __metadata__.get("chat_id"):
|
|
self._gitea.save_state(
|
|
__metadata__["chat_id"],
|
|
repo=f"{owner}/{repo_name}",
|
|
branch=branch_name,
|
|
)
|
|
|
|
retVal["status"] = "success"
|
|
retVal[
|
|
"message"
|
|
] = f"""✅ **Branch Created Successfully**
|
|
|
|
**Branch:** `{branch_name}` (chat_id)
|
|
**Base Branch:** `{base_branch}`
|
|
**Repository:** `{owner}/{repo_name}`
|
|
|
|
**Next Steps:**
|
|
1. Make changes to files using `apply_diff()` or `commit_changes()`
|
|
2. Update ticket with status: `update_ticket(issue_number, comment)`
|
|
3. Create PR: `create_pull_request(title)`
|
|
"""
|
|
return retVal
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
error_msg = self._gitea.format_error(e, "branch creation")
|
|
retVal["message"] = f"Failed to create branch. {error_msg}"
|
|
return retVal
|
|
except Exception as e:
|
|
retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}"
|
|
return retVal
|
|
|
|
async def get_branch_status(
|
|
self,
|
|
repo: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""Get the current working branch status"""
|
|
|
|
retVal = {"status": "success", "message": ""}
|
|
|
|
try:
|
|
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
|
|
except ValueError as e:
|
|
retVal["status"] = "failure"
|
|
retVal["message"] = str(e)
|
|
return retVal
|
|
|
|
# Get branch from metadata (chat_id)
|
|
working_branch = self._gitea.get_branch(__user__, __metadata__)
|
|
|
|
message = f"# 🌿 Branch Status: {owner}/{repo_name}\n\n"
|
|
message += f"**Current Branch:** `{working_branch}` (chat_id)\n\n"
|
|
|
|
if __metadata__ and __metadata__.get("chat_id"):
|
|
message += f"**Chat ID:** `{__metadata__.get('chat_id')}`\n\n"
|
|
|
|
message += "All file operations will use this branch.\n"
|
|
|
|
retVal["message"] = message
|
|
return retVal
|
|
|
|
async def get_session_state(
|
|
self,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""
|
|
View the persisted session state.
|
|
|
|
State is automatically saved when you read tickets, create branches,
|
|
or make commits. This helps maintain context across operations.
|
|
"""
|
|
retVal = {"status": "success", "message": ""}
|
|
|
|
chat_id = __metadata__.get("chat_id") if __metadata__ else None
|
|
|
|
if not chat_id:
|
|
retVal["message"] = (
|
|
"No chat_id available - session state requires a chat context."
|
|
)
|
|
return retVal
|
|
|
|
state = self._gitea.load_state(chat_id)
|
|
|
|
if not state:
|
|
retVal[
|
|
"message"
|
|
] = f"""# 🧠 Session State
|
|
|
|
**Chat ID:** `{chat_id}`
|
|
**State:** Empty (no operations performed yet)
|
|
|
|
Session state is automatically populated when you:
|
|
- Read a ticket (`read_ticket` / `read_ticket_by_url`)
|
|
- Create a branch (`create_branch`)
|
|
- Make commits
|
|
|
|
This state helps maintain context across operations.
|
|
"""
|
|
return retVal
|
|
|
|
message = f"# 🧠 Session State\n\n"
|
|
message += f"**Chat ID:** `{chat_id}`\n\n"
|
|
|
|
if state.get("repo"):
|
|
message += f"**Repository:** `{state['repo']}`\n"
|
|
if state.get("issue"):
|
|
message += f"**Issue:** #{state['issue']}\n"
|
|
if state.get("branch"):
|
|
message += f"**Branch:** `{state['branch']}`\n"
|
|
if state.get("updated_at"):
|
|
message += f"**Last Updated:** {state['updated_at']}\n"
|
|
|
|
message += "\nThis state is automatically used when repo/issue is not explicitly provided."
|
|
|
|
retVal["message"] = message
|
|
return retVal
|
|
|
|
async def clear_session_state(
|
|
self,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""
|
|
Clear the persisted session state.
|
|
|
|
Use this to start fresh or switch to a different repository/issue.
|
|
"""
|
|
retVal = {"status": "success", "message": ""}
|
|
|
|
chat_id = __metadata__.get("chat_id") if __metadata__ else None
|
|
|
|
if not chat_id:
|
|
retVal["message"] = "No chat_id available - nothing to clear."
|
|
return retVal
|
|
|
|
state_path = self._gitea._state_path(chat_id)
|
|
|
|
if state_path.exists():
|
|
try:
|
|
state_path.unlink()
|
|
retVal["message"] = (
|
|
f"✅ Session state cleared for chat `{chat_id[:8]}...`\n\nYou can now start fresh with a new repository or issue."
|
|
)
|
|
except Exception as e:
|
|
retVal["status"] = "failure"
|
|
retVal["message"] = f"Failed to clear state: {e}"
|
|
else:
|
|
retVal["message"] = "Session state was already empty."
|
|
|
|
return retVal
|
|
|
|
async def list_sessions(
|
|
self,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""
|
|
List all persisted sessions with their metadata.
|
|
|
|
Use this to find orphaned work from crashed/dead sessions.
|
|
Sessions can be claimed with `claim_session(chat_id)` to continue work.
|
|
"""
|
|
retVal = {"status": "success", "message": ""}
|
|
|
|
current_chat_id = __metadata__.get("chat_id") if __metadata__ else None
|
|
state_dir = self._gitea._state_dir() / "chats"
|
|
|
|
if not state_dir.exists():
|
|
retVal["message"] = "No sessions found. State directory does not exist."
|
|
return retVal
|
|
|
|
sessions = []
|
|
for chat_dir in state_dir.iterdir():
|
|
if chat_dir.is_dir():
|
|
state_file = chat_dir / "state.json"
|
|
if state_file.exists():
|
|
try:
|
|
state = json.loads(state_file.read_text())
|
|
state["chat_id"] = chat_dir.name
|
|
state["is_current"] = chat_dir.name == current_chat_id
|
|
sessions.append(state)
|
|
except Exception:
|
|
pass
|
|
|
|
if not sessions:
|
|
retVal["message"] = "No sessions found."
|
|
return retVal
|
|
|
|
# Sort by updated_at descending (most recent first)
|
|
sessions.sort(key=lambda x: x.get("updated_at", ""), reverse=True)
|
|
|
|
message = "# 📋 Saved Sessions\n\n"
|
|
message += f"**Current Chat:** `{current_chat_id[:8] if current_chat_id else 'unknown'}...`\n\n"
|
|
message += "---\n\n"
|
|
|
|
for s in sessions:
|
|
chat_id = s.get("chat_id", "unknown")
|
|
is_current = s.get("is_current", False)
|
|
|
|
marker = " ← **CURRENT**" if is_current else ""
|
|
message += f"### `{chat_id[:8]}...`{marker}\n\n"
|
|
|
|
if s.get("repo"):
|
|
message += f"- **Repo:** `{s['repo']}`\n"
|
|
if s.get("issue"):
|
|
message += f"- **Issue:** #{s['issue']}\n"
|
|
if s.get("branch"):
|
|
message += f"- **Branch:** `{s['branch'][:12]}...`\n"
|
|
if s.get("pr_number"):
|
|
message += f"- **PR:** #{s['pr_number']}\n"
|
|
if s.get("updated_at"):
|
|
message += f"- **Last Activity:** {s['updated_at']}\n"
|
|
|
|
if not is_current:
|
|
message += f'\n→ `claim_session("{chat_id}")` to continue this work\n'
|
|
|
|
message += "\n---\n\n"
|
|
|
|
message += f"**Total Sessions:** {len(sessions)}\n"
|
|
|
|
retVal["message"] = message
|
|
return retVal
|
|
|
|
async def claim_session(
|
|
self,
|
|
session_id: str,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""
|
|
Claim an orphaned session to continue its work.
|
|
|
|
Use `list_sessions()` to find available sessions.
|
|
This copies the session's state (repo, issue, branch) to the current chat,
|
|
allowing you to continue work on an existing branch after a session death.
|
|
|
|
IMPORTANT: The branch name in Gitea will still be the old chat_id.
|
|
This function updates the tool's working context, not the Gitea branch name.
|
|
"""
|
|
retVal = {"status": "failure", "message": ""}
|
|
|
|
current_chat_id = __metadata__.get("chat_id") if __metadata__ else None
|
|
|
|
if not current_chat_id:
|
|
retVal["message"] = "No current chat_id available."
|
|
return retVal
|
|
|
|
if session_id == current_chat_id:
|
|
retVal["message"] = "Cannot claim your own session - already active."
|
|
retVal["status"] = "success"
|
|
return retVal
|
|
|
|
# Load the orphaned session's state
|
|
orphan_state = self._gitea.load_state(session_id)
|
|
|
|
if not orphan_state:
|
|
retVal["message"] = (
|
|
f"Session `{session_id[:8]}...` not found or has no state."
|
|
)
|
|
return retVal
|
|
|
|
# Copy state to current session, preserving the original branch name
|
|
self._gitea.save_state(
|
|
current_chat_id,
|
|
repo=orphan_state.get("repo"),
|
|
issue=orphan_state.get("issue"),
|
|
branch=orphan_state.get("branch"), # Keep original branch name!
|
|
pr_number=orphan_state.get("pr_number"),
|
|
claimed_from=session_id,
|
|
)
|
|
|
|
message = f"✅ **Session Claimed**\n\n"
|
|
message += f"**Claimed From:** `{session_id[:8]}...`\n"
|
|
message += f"**Current Chat:** `{current_chat_id[:8]}...`\n\n"
|
|
message += "**Inherited State:**\n"
|
|
|
|
if orphan_state.get("repo"):
|
|
message += f"- **Repo:** `{orphan_state['repo']}`\n"
|
|
if orphan_state.get("issue"):
|
|
message += f"- **Issue:** #{orphan_state['issue']}\n"
|
|
if orphan_state.get("branch"):
|
|
message += f"- **Branch:** `{orphan_state['branch']}`\n"
|
|
if orphan_state.get("pr_number"):
|
|
message += f"- **PR:** #{orphan_state['pr_number']}\n"
|
|
|
|
message += "\n⚠️ **Note:** File operations will use branch `{branch}`, not your current chat_id.\n".format(
|
|
branch=orphan_state.get("branch", "unknown")[:12] + "..."
|
|
)
|
|
message += "This is correct - you're continuing work on the existing branch.\n"
|
|
|
|
retVal["status"] = "success"
|
|
retVal["message"] = message
|
|
return retVal
|
|
|
|
async def list_branches(
|
|
self,
|
|
repo: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""List all branches in the repository"""
|
|
|
|
retVal = {"status": "failure", "message": ""}
|
|
|
|
token = self._gitea.get_token(__user__)
|
|
if not token:
|
|
retVal["message"] = "GITEA_TOKEN not configured."
|
|
return retVal
|
|
|
|
try:
|
|
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
|
|
except ValueError as e:
|
|
retVal["message"] = str(e)
|
|
return retVal
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Fetching branches...", "done": False},
|
|
}
|
|
)
|
|
|
|
try:
|
|
async with httpx.AsyncClient(
|
|
timeout=30.0, verify=self.valves.VERIFY_SSL
|
|
) as client:
|
|
response = await client.get(
|
|
self._gitea.api_url(f"/repos/{owner}/{repo_name}/branches"),
|
|
headers=self._gitea.headers(__user__),
|
|
params={"limit": 50},
|
|
)
|
|
response.raise_for_status()
|
|
branches = response.json()
|
|
|
|
message = f"# 🌿 Branches in {owner}/{repo_name}\n\n"
|
|
|
|
# Separate protected and other branches
|
|
protected = [b for b in branches if b.get("protected")]
|
|
other = [b for b in branches if not b.get("protected")]
|
|
|
|
if protected:
|
|
message += "## 🛡️ Protected Branches\n\n"
|
|
for branch in sorted(protected, key=lambda x: x["name"]):
|
|
name = branch.get("name", "")
|
|
commit_sha = branch.get("commit", {}).get("id", "")[:8]
|
|
message += f"- `{name}` [commit: {commit_sha}]\n"
|
|
message += "\n"
|
|
|
|
if other:
|
|
message += "## 📦 Other Branches\n\n"
|
|
for branch in sorted(other, key=lambda x: x["name"])[:20]:
|
|
name = branch.get("name", "")
|
|
commit_sha = branch.get("commit", {}).get("id", "")[:8]
|
|
message += f"- `{name}` [commit: {commit_sha}]\n"
|
|
if len(other) > 20:
|
|
message += f"\n... and {len(other) - 20} more branches\n"
|
|
message += "\n"
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Done", "done": True, "hidden": True},
|
|
}
|
|
)
|
|
|
|
retVal["status"] = "success"
|
|
retVal["message"] = message.strip()
|
|
return retVal
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
error_msg = self._gitea.format_error(e, "branch listing")
|
|
retVal["message"] = f"Failed to list branches. {error_msg}"
|
|
return retVal
|
|
except Exception as e:
|
|
retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}"
|
|
return retVal
|
|
|
|
async def apply_diff(
|
|
self,
|
|
path: str,
|
|
diff_content: str,
|
|
message: Optional[str] = None,
|
|
repo: Optional[str] = None,
|
|
auto_message: bool = True,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""
|
|
Apply a unified diff patch to a file.
|
|
|
|
This is the PREFERRED method for making changes as it:
|
|
1. Is precise about what changes
|
|
2. Prevents accidental file replacements
|
|
3. Is what LLMs understand best (trained on GitHub PRs)
|
|
|
|
REQUIRED FORMAT - Unified diff with hunk headers:
|
|
```diff
|
|
--- a/path/to/file
|
|
+++ b/path/to/file
|
|
@@ -line,count +line,count @@
|
|
context line
|
|
+added line
|
|
-removed line
|
|
```
|
|
"""
|
|
|
|
retVal = {"status": "failure", "message": ""}
|
|
|
|
# Get working branch from metadata
|
|
effective_branch = self._gitea.get_branch(__user__, __metadata__)
|
|
|
|
token = self._gitea.get_token(__user__)
|
|
if not token:
|
|
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
|
|
return retVal
|
|
|
|
try:
|
|
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
|
|
except ValueError as e:
|
|
retVal["message"] = str(e)
|
|
return retVal
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": f"Applying diff to {path}...",
|
|
"done": False,
|
|
},
|
|
}
|
|
)
|
|
|
|
try:
|
|
async with httpx.AsyncClient(
|
|
timeout=30.0, verify=self.valves.VERIFY_SSL
|
|
) as client:
|
|
# Get current file content
|
|
get_response = await client.get(
|
|
self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
|
|
headers=self._gitea.headers(__user__),
|
|
params={"ref": effective_branch},
|
|
)
|
|
|
|
# Check if file exists
|
|
if get_response.status_code == 404:
|
|
retVal["message"] = (
|
|
f"File not found: `{path}`. Use `create_file()` to create a new file."
|
|
)
|
|
return retVal
|
|
|
|
get_response.raise_for_status()
|
|
file_info = get_response.json()
|
|
current_sha = file_info.get("sha")
|
|
|
|
# Decode current content
|
|
current_content_b64 = file_info.get("content", "")
|
|
try:
|
|
current_content = base64.b64decode(current_content_b64).decode(
|
|
"utf-8"
|
|
)
|
|
except Exception:
|
|
retVal["message"] = "Could not decode current file content."
|
|
return retVal
|
|
|
|
# Parse and apply the diff
|
|
new_content, error_msg = self._gitea.apply_unified_diff(
|
|
current_content, diff_content
|
|
)
|
|
|
|
if new_content is None:
|
|
retVal[
|
|
"message"
|
|
] = f"""❌ **Failed to apply diff**
|
|
|
|
**Reason:** {error_msg}
|
|
|
|
**Required format:** Unified diff with hunk headers
|
|
```diff
|
|
--- a/{path}
|
|
+++ b/{path}
|
|
@@ -10,3 +10,4 @@
|
|
existing line (context)
|
|
+new line to add
|
|
-line to remove
|
|
```
|
|
|
|
**Key requirements:**
|
|
- `@@` hunk headers are REQUIRED (e.g., `@@ -10,3 +10,4 @@`)
|
|
- Lines starting with `+` are additions
|
|
- Lines starting with `-` are deletions
|
|
- Lines starting with ` ` (space) are context
|
|
|
|
Use `get_file(path)` to see current content and construct a proper diff.
|
|
"""
|
|
return retVal
|
|
|
|
# Check if diff actually changed anything
|
|
if new_content == current_content:
|
|
retVal[
|
|
"message"
|
|
] = f"""⚠️ **Diff produced no changes**
|
|
|
|
The diff was parsed but resulted in no modifications to the file.
|
|
|
|
**Possible causes:**
|
|
1. The context lines in your diff don't match the actual file content
|
|
2. The hunk line numbers are incorrect
|
|
3. The changes were already applied
|
|
|
|
Use `get_file("{path}")` to see the current content and verify your diff targets the correct lines.
|
|
"""
|
|
return retVal
|
|
|
|
# Generate commit message if needed
|
|
if not message:
|
|
if auto_message:
|
|
message = self._gitea.generate_diff_commit_message(
|
|
path, diff_content
|
|
)
|
|
else:
|
|
retVal["message"] = (
|
|
"Commit message is required when auto_message=False."
|
|
)
|
|
return retVal
|
|
|
|
# Commit the changes
|
|
new_content_b64 = base64.b64encode(new_content.encode("utf-8")).decode(
|
|
"ascii"
|
|
)
|
|
|
|
response = await client.put(
|
|
self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
|
|
headers=self._gitea.headers(__user__),
|
|
json={
|
|
"content": new_content_b64,
|
|
"message": message,
|
|
"branch": effective_branch,
|
|
"sha": current_sha,
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
result = response.json()
|
|
|
|
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
|
|
|
|
# Parse diff stats
|
|
added_lines = len(
|
|
[
|
|
l
|
|
for l in diff_content.splitlines()
|
|
if l.startswith("+") and not l.startswith("+++")
|
|
]
|
|
)
|
|
removed_lines = len(
|
|
[
|
|
l
|
|
for l in diff_content.splitlines()
|
|
if l.startswith("-") and not l.startswith("---")
|
|
]
|
|
)
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Done", "done": True, "hidden": True},
|
|
}
|
|
)
|
|
|
|
# Prepare patched content for review
|
|
# For large files, show a truncated version
|
|
patched_lines = new_content.splitlines()
|
|
total_lines = len(patched_lines)
|
|
|
|
if total_lines <= 100:
|
|
# Small file - show entire content
|
|
patched_preview = new_content
|
|
else:
|
|
# Large file - show first 30 and last 20 lines with indicator
|
|
head = "\n".join(patched_lines[:30])
|
|
tail = "\n".join(patched_lines[-20:])
|
|
patched_preview = (
|
|
f"{head}\n\n... [{total_lines - 50} lines omitted] ...\n\n{tail}"
|
|
)
|
|
|
|
message_text = f"✅ **Diff Applied - REVIEW REQUIRED**\n\n"
|
|
message_text += f"**File:** `{path}`\n"
|
|
message_text += f"**Branch:** `{effective_branch}`\n"
|
|
message_text += f"**Commit:** `{commit_sha}`\n"
|
|
message_text += (
|
|
f"**Changes:** +{added_lines} lines, -{removed_lines} lines\n"
|
|
)
|
|
message_text += f"**Message:** {message}\n\n"
|
|
|
|
message_text += "---\n\n"
|
|
message_text += "## 📋 Review the Patched File\n\n"
|
|
message_text += "**Check for:**\n"
|
|
message_text += "- ❌ Accidental line removals\n"
|
|
message_text += "- ❌ Code placed in wrong location\n"
|
|
message_text += "- ❌ Duplicate code blocks\n"
|
|
message_text += "- ❌ Missing imports or dependencies\n"
|
|
message_text += "- ❌ Broken syntax or indentation\n\n"
|
|
|
|
message_text += f"**Patched Content ({total_lines} lines):**\n\n"
|
|
message_text += f"```\n{patched_preview}\n```\n\n"
|
|
|
|
message_text += "---\n"
|
|
message_text += (
|
|
"⚠️ **If the patch is incorrect**, use `apply_diff()` again to fix, "
|
|
)
|
|
message_text += "or `get_file()` to see current state and `replace_file()` to correct.\n"
|
|
|
|
retVal["status"] = "success"
|
|
retVal["message"] = message_text
|
|
return retVal
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
error_msg = self._gitea.format_error(e, f"diff application to '{path}'")
|
|
if e.response.status_code == 409:
|
|
retVal["message"] = (
|
|
f"Update conflict for `{path}`. Fetch the latest version and try again."
|
|
)
|
|
else:
|
|
retVal["message"] = f"Failed to apply diff. {error_msg}"
|
|
return retVal
|
|
except Exception as e:
|
|
retVal["message"] = (
|
|
f"Unexpected failure during diff application: {type(e).__name__}: {e}"
|
|
)
|
|
return retVal
|
|
|
|
async def commit_changes(
|
|
self,
|
|
path: str,
|
|
content: str,
|
|
message: Optional[str] = None,
|
|
repo: Optional[str] = None,
|
|
max_delta_percent: Optional[float] = None,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""
|
|
Commit file changes with automatic content detection and size delta gating.
|
|
|
|
This method:
|
|
1. Detects whether to create or replace a file
|
|
2. Validates file size changes against threshold (quality gate)
|
|
3. Auto-generates commit message if not provided
|
|
4. Commits to the working branch
|
|
"""
|
|
|
|
retVal = {"status": "failure", "message": ""}
|
|
|
|
# Get working branch from metadata
|
|
effective_branch = self._gitea.get_branch(__user__, __metadata__)
|
|
|
|
token = self._gitea.get_token(__user__)
|
|
if not token:
|
|
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
|
|
return retVal
|
|
|
|
try:
|
|
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
|
|
except ValueError as e:
|
|
retVal["message"] = str(e)
|
|
return retVal
|
|
|
|
# Use provided threshold or default from valves
|
|
delta_threshold = max_delta_percent or self.valves.MAX_SIZE_DELTA_PERCENT
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": f"Processing {path}...",
|
|
"done": False,
|
|
},
|
|
}
|
|
)
|
|
|
|
try:
|
|
async with httpx.AsyncClient(
|
|
timeout=30.0, verify=self.valves.VERIFY_SSL
|
|
) as client:
|
|
# Check if file exists
|
|
get_response = await client.get(
|
|
self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
|
|
headers=self._gitea.headers(__user__),
|
|
params={"ref": effective_branch},
|
|
)
|
|
|
|
file_exists = get_response.status_code == 200
|
|
current_sha = None
|
|
current_size = 0
|
|
|
|
if file_exists:
|
|
get_response.raise_for_status()
|
|
file_info = get_response.json()
|
|
current_sha = file_info.get("sha")
|
|
current_size = file_info.get("size", 0)
|
|
|
|
# SIZE DELTA GATE - Quality check
|
|
new_size = len(content.encode("utf-8"))
|
|
if current_size > 0 and new_size > 0:
|
|
delta_percent = (
|
|
abs(new_size - current_size) / current_size * 100
|
|
)
|
|
|
|
if delta_percent > delta_threshold:
|
|
# Calculate actual bytes changed
|
|
size_diff = new_size - current_size
|
|
direction = "larger" if size_diff > 0 else "smaller"
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": "Size gate triggered",
|
|
"done": True,
|
|
"hidden": True,
|
|
},
|
|
}
|
|
)
|
|
|
|
retVal[
|
|
"message"
|
|
] = f"""⚠️ **Quality Gate: Large File Change Detected**
|
|
|
|
**File:** `{path}`
|
|
**Current Size:** {current_size} bytes
|
|
**New Size:** {new_size} bytes
|
|
**Change:** {size_diff:+d} bytes ({delta_percent:.1f}% {direction})
|
|
**Threshold:** {delta_threshold}%
|
|
|
|
**Recommended: Use diff-based updates instead**
|
|
```python
|
|
apply_diff(path="{path}", diff_content="...")
|
|
```
|
|
|
|
Or override with:
|
|
```python
|
|
commit_changes(..., max_delta_percent=100)
|
|
```
|
|
"""
|
|
return retVal
|
|
|
|
# Generate commit message if not provided
|
|
if not message:
|
|
message = self._gitea.generate_commit_message(
|
|
change_type="chore",
|
|
scope=path.split("/")[-1] if "/" in path else path,
|
|
description=f"update {path}",
|
|
)
|
|
|
|
# Prepare content
|
|
content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii")
|
|
|
|
if file_exists:
|
|
# Replace existing file
|
|
if not current_sha:
|
|
retVal["message"] = (
|
|
f"Could not retrieve SHA for existing file: {path}"
|
|
)
|
|
return retVal
|
|
|
|
response = await client.put(
|
|
self._gitea.api_url(
|
|
f"/repos/{owner}/{repo_name}/contents/{path}"
|
|
),
|
|
headers=self._gitea.headers(__user__),
|
|
json={
|
|
"content": content_b64,
|
|
"message": message,
|
|
"branch": effective_branch,
|
|
"sha": current_sha,
|
|
},
|
|
)
|
|
else:
|
|
# Create new file
|
|
response = await client.post(
|
|
self._gitea.api_url(
|
|
f"/repos/{owner}/{repo_name}/contents/{path}"
|
|
),
|
|
headers=self._gitea.headers(__user__),
|
|
json={
|
|
"content": content_b64,
|
|
"message": message,
|
|
"branch": effective_branch,
|
|
},
|
|
)
|
|
|
|
response.raise_for_status()
|
|
result = response.json()
|
|
|
|
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
|
|
action = "Updated" if file_exists else "Created"
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Done", "done": True, "hidden": True},
|
|
}
|
|
)
|
|
|
|
# Calculate and show size change
|
|
new_size = len(content.encode("utf-8"))
|
|
size_info = ""
|
|
if file_exists and current_size > 0:
|
|
delta = new_size - current_size
|
|
delta_percent = delta / current_size * 100
|
|
size_info = (
|
|
f"**Size Change:** {delta:+d} bytes ({delta_percent:.1f}%)\n"
|
|
)
|
|
|
|
message_text = f"""✅ **{action} File Successfully**
|
|
|
|
**File:** `{path}`
|
|
**Branch:** `{effective_branch}`
|
|
**Commit:** `{commit_sha}`
|
|
**Message:** {message}
|
|
|
|
{size_info}**Action:** {action}
|
|
"""
|
|
|
|
retVal["status"] = "success"
|
|
retVal["message"] = message_text
|
|
return retVal
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
error_msg = self._gitea.format_error(e, f"file commit for '{path}'")
|
|
if e.response.status_code == 409:
|
|
retVal["message"] = (
|
|
f"Update conflict for `{path}`. Fetch the latest version and try again."
|
|
)
|
|
else:
|
|
retVal["message"] = f"Failed to commit file. {error_msg}"
|
|
return retVal
|
|
except Exception as e:
|
|
retVal["message"] = (
|
|
f"Unexpected failure during commit: {type(e).__name__}: {e}"
|
|
)
|
|
return retVal
|
|
|
|
async def create_pull_request(
|
|
self,
|
|
title: str,
|
|
body: Optional[str] = "",
|
|
repo: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""Create a pull request from the current branch"""
|
|
|
|
retVal = {"status": "failure", "message": ""}
|
|
|
|
# Get working branch from metadata
|
|
head_branch = self._gitea.get_branch(__user__, __metadata__)
|
|
|
|
token = self._gitea.get_token(__user__)
|
|
if not token:
|
|
retVal["message"] = "GITEA_TOKEN not configured."
|
|
return retVal
|
|
|
|
try:
|
|
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
|
|
except ValueError as e:
|
|
retVal["message"] = str(e)
|
|
return retVal
|
|
|
|
base_branch = self.valves.DEFAULT_BRANCH
|
|
|
|
# Check if protected branch
|
|
if head_branch in self.valves.PROTECTED_BRANCHES:
|
|
retVal["message"] = (
|
|
f"❌ Cannot create PR from protected branch '{head_branch}'"
|
|
)
|
|
return retVal
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Creating PR...", "done": False},
|
|
}
|
|
)
|
|
|
|
try:
|
|
async with httpx.AsyncClient(
|
|
timeout=30.0, verify=self.valves.VERIFY_SSL
|
|
) as client:
|
|
response = await client.post(
|
|
self._gitea.api_url(f"/repos/{owner}/{repo_name}/pulls"),
|
|
headers=self._gitea.headers(__user__),
|
|
json={
|
|
"title": title,
|
|
"head": head_branch,
|
|
"base": base_branch,
|
|
"body": body,
|
|
},
|
|
)
|
|
|
|
# Handle PR already exists
|
|
if response.status_code == 409:
|
|
retVal["message"] = (
|
|
f"⚠️ PR already exists for branch `{head_branch}`"
|
|
)
|
|
return retVal
|
|
|
|
response.raise_for_status()
|
|
pr = response.json()
|
|
|
|
pr_number = pr.get("number")
|
|
pr_url = pr.get("html_url", "")
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Done", "done": True, "hidden": True},
|
|
}
|
|
)
|
|
|
|
# Persist PR number for session recovery
|
|
if __metadata__ and __metadata__.get("chat_id"):
|
|
self._gitea.save_state(__metadata__["chat_id"], pr_number=pr_number)
|
|
|
|
retVal["status"] = "success"
|
|
retVal[
|
|
"message"
|
|
] = f"""✅ **Pull Request Created Successfully**
|
|
|
|
**PR #{pr_number}:** {title}
|
|
**Branch:** `{head_branch}` → `{base_branch}`
|
|
**URL:** {pr_url}
|
|
|
|
**Next Steps:**
|
|
1. Read PR feedback: `read_pull_request({pr_number})`
|
|
2. Address reviewer comments
|
|
3. Update ticket with PR link
|
|
"""
|
|
return retVal
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
error_msg = self._gitea.format_error(e, "PR creation")
|
|
if e.response.status_code == 422:
|
|
retVal["message"] = (
|
|
"Could not create PR. The branch may not exist or there may be merge conflicts."
|
|
)
|
|
else:
|
|
retVal["message"] = f"Failed to create PR. {error_msg}"
|
|
return retVal
|
|
except Exception as e:
|
|
retVal["message"] = (
|
|
f"Unexpected failure during PR creation: {type(e).__name__}: {e}"
|
|
)
|
|
return retVal
|
|
|
|
async def read_pull_request(
|
|
self,
|
|
pr_number: int,
|
|
repo: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""
|
|
Read pull request details including review feedback.
|
|
This is how you get feedback to iterate on changes.
|
|
"""
|
|
|
|
retVal = {"status": "failure", "message": ""}
|
|
|
|
token = self._gitea.get_token(__user__)
|
|
if not token:
|
|
retVal["message"] = "GITEA_TOKEN not configured."
|
|
return retVal
|
|
|
|
try:
|
|
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
|
|
except ValueError as e:
|
|
retVal["message"] = str(e)
|
|
return retVal
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": f"Fetching PR #{pr_number}...",
|
|
"done": False,
|
|
},
|
|
}
|
|
)
|
|
|
|
try:
|
|
async with httpx.AsyncClient(
|
|
timeout=30.0, verify=self.valves.VERIFY_SSL
|
|
) as client:
|
|
# Get PR details
|
|
pr_response = await client.get(
|
|
self._gitea.api_url(
|
|
f"/repos/{owner}/{repo_name}/pulls/{pr_number}"
|
|
),
|
|
headers=self._gitea.headers(__user__),
|
|
)
|
|
pr_response.raise_for_status()
|
|
pr = pr_response.json()
|
|
|
|
# Get PR comments
|
|
comments_response = await client.get(
|
|
self._gitea.api_url(
|
|
f"/repos/{owner}/{repo_name}/pulls/{pr_number}/comments"
|
|
),
|
|
headers=self._gitea.headers(__user__),
|
|
)
|
|
comments_response.raise_for_status()
|
|
comments = comments_response.json()
|
|
|
|
title = pr.get("title", "")
|
|
body = pr.get("body", "")
|
|
state = pr.get("state", "")
|
|
user = pr.get("user", {}).get("login", "")
|
|
head_branch = pr.get("head", {}).get("ref", "")
|
|
base_branch = pr.get("base", {}).get("ref", "")
|
|
mergeable = pr.get("mergeable", False)
|
|
merged = pr.get("merged", False)
|
|
html_url = pr.get("html_url", "")
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Done", "done": True, "hidden": True},
|
|
}
|
|
)
|
|
|
|
# Build message
|
|
message = f"# 🔀 Pull Request #{pr_number}: {title}\n\n"
|
|
message += f"**State:** {state.upper()} | **Author:** @{user}\n"
|
|
message += f"**Branch:** `{head_branch}` → `{base_branch}`\n"
|
|
message += f"**Mergeable:** {'✅ Yes' if mergeable else '❌ No'}\n"
|
|
message += f"**Merged:** {'✅ Yes' if merged else '❌ No'}\n"
|
|
message += f"**URL:** {html_url}\n\n"
|
|
|
|
if body:
|
|
message += "## 📝 Description\n\n"
|
|
message += f"{body}\n\n"
|
|
|
|
if comments:
|
|
message += f"## 💬 Review Comments ({len(comments)})\n\n"
|
|
for comment in comments[:10]: # Limit to 10 most recent
|
|
comment_user = comment.get("user", {}).get("login", "unknown")
|
|
comment_body = comment.get("body", "")
|
|
comment_path = comment.get("path", "")
|
|
comment_line = comment.get("line", "")
|
|
|
|
message += f"**@{comment_user}**"
|
|
if comment_path:
|
|
message += f" on `{comment_path}`"
|
|
if comment_line:
|
|
message += f" (line {comment_line})"
|
|
message += f":\n{comment_body}\n\n"
|
|
|
|
if len(comments) > 10:
|
|
message += f"... and {len(comments) - 10} more comments\n\n"
|
|
else:
|
|
message += "## 💬 No review comments yet\n\n"
|
|
|
|
message += "## 🚀 Next Steps\n\n"
|
|
if comments:
|
|
message += "1. Address review comments\n"
|
|
message += (
|
|
"2. Make changes using `apply_diff()` or `commit_changes()`\n"
|
|
)
|
|
message += "3. Update ticket with progress\n"
|
|
else:
|
|
message += "Waiting for review feedback...\n"
|
|
|
|
retVal["status"] = "success"
|
|
retVal["message"] = message
|
|
return retVal
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
error_msg = self._gitea.format_error(e, f"PR #{pr_number}")
|
|
if e.response.status_code == 404:
|
|
retVal["message"] = f"PR #{pr_number} not found in {owner}/{repo_name}."
|
|
else:
|
|
retVal["message"] = f"Failed to fetch PR. {error_msg}"
|
|
return retVal
|
|
except Exception as e:
|
|
retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}"
|
|
return retVal
|
|
|
|
async def replace_file(
|
|
self,
|
|
path: str,
|
|
content: str,
|
|
message: str,
|
|
repo: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""
|
|
Update an existing file in the repository.
|
|
WARNING: This replaces the entire file content.
|
|
Use `apply_diff()` for incremental changes.
|
|
"""
|
|
|
|
retVal = {"status": "failure", "message": ""}
|
|
|
|
effective_branch = self._gitea.get_branch(__user__, __metadata__)
|
|
|
|
token = self._gitea.get_token(__user__)
|
|
if not token:
|
|
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
|
|
return retVal
|
|
|
|
try:
|
|
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
|
|
except ValueError as e:
|
|
retVal["message"] = str(e)
|
|
return retVal
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": f"Updating {path}...", "done": False},
|
|
}
|
|
)
|
|
|
|
try:
|
|
async with httpx.AsyncClient(
|
|
timeout=30.0, verify=self.valves.VERIFY_SSL
|
|
) as client:
|
|
get_response = await client.get(
|
|
self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
|
|
headers=self._gitea.headers(__user__),
|
|
params={"ref": effective_branch},
|
|
)
|
|
|
|
if get_response.status_code == 404:
|
|
retVal["message"] = (
|
|
f"File not found: `{path}`. Use `create_file()` to create a new file."
|
|
)
|
|
return retVal
|
|
|
|
get_response.raise_for_status()
|
|
file_info = get_response.json()
|
|
sha = file_info.get("sha")
|
|
|
|
if not sha:
|
|
retVal["message"] = "Could not retrieve file SHA for update."
|
|
return retVal
|
|
|
|
content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii")
|
|
|
|
response = await client.put(
|
|
self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
|
|
headers=self._gitea.headers(__user__),
|
|
json={
|
|
"content": content_b64,
|
|
"message": message,
|
|
"branch": effective_branch,
|
|
"sha": sha,
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
result = response.json()
|
|
|
|
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Done", "done": True, "hidden": True},
|
|
}
|
|
)
|
|
|
|
message_text = f"✅ **File Updated Successfully**\n\n"
|
|
message_text += f"**File:** `{path}`\n"
|
|
message_text += f"**Branch:** `{effective_branch}`\n"
|
|
message_text += f"**Commit:** `{commit_sha}`\n"
|
|
message_text += f"**Message:** {message}\n"
|
|
|
|
retVal["status"] = "success"
|
|
retVal["message"] = message_text
|
|
return retVal
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
error_msg = self._gitea.format_error(e, f"file update for '{path}'")
|
|
if e.response.status_code == 409:
|
|
retVal["message"] = (
|
|
f"Update conflict for `{path}`. Fetch the latest version and try again."
|
|
)
|
|
else:
|
|
retVal["message"] = f"Failed to update file. {error_msg}"
|
|
return retVal
|
|
except Exception as e:
|
|
retVal["message"] = (
|
|
f"Unexpected failure during file update: {type(e).__name__}: {e}"
|
|
)
|
|
return retVal
|
|
|
|
async def create_file(
|
|
self,
|
|
path: str,
|
|
content: str,
|
|
message: str,
|
|
repo: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""Create a new file in the repository"""
|
|
|
|
retVal = {"status": "failure", "message": ""}
|
|
|
|
effective_branch = self._gitea.get_branch(__user__, __metadata__)
|
|
|
|
token = self._gitea.get_token(__user__)
|
|
if not token:
|
|
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
|
|
return retVal
|
|
|
|
try:
|
|
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
|
|
except ValueError as e:
|
|
retVal["message"] = str(e)
|
|
return retVal
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": f"Creating {path}...", "done": False},
|
|
}
|
|
)
|
|
|
|
try:
|
|
content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii")
|
|
|
|
async with httpx.AsyncClient(
|
|
timeout=30.0, verify=self.valves.VERIFY_SSL
|
|
) as client:
|
|
response = await client.post(
|
|
self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
|
|
headers=self._gitea.headers(__user__),
|
|
json={
|
|
"content": content_b64,
|
|
"message": message,
|
|
"branch": effective_branch,
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
result = response.json()
|
|
|
|
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Done", "done": True, "hidden": True},
|
|
}
|
|
)
|
|
|
|
message_text = f"✅ **File Created Successfully**\n\n"
|
|
message_text += f"**File:** `{path}`\n"
|
|
message_text += f"**Branch:** `{effective_branch}`\n"
|
|
message_text += f"**Commit:** `{commit_sha}`\n"
|
|
message_text += f"**Message:** {message}\n"
|
|
|
|
retVal["status"] = "success"
|
|
retVal["message"] = message_text
|
|
return retVal
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
error_msg = self._gitea.format_error(e, f"file creation for '{path}'")
|
|
if e.response.status_code == 422:
|
|
retVal["message"] = (
|
|
f"File already exists: `{path}`. Use `replace_file()` or `apply_diff()` to modify it."
|
|
)
|
|
else:
|
|
retVal["message"] = f"Failed to create file. {error_msg}"
|
|
return retVal
|
|
except Exception as e:
|
|
retVal["message"] = (
|
|
f"Unexpected failure during file creation: {type(e).__name__}: {e}"
|
|
)
|
|
return retVal
|
|
|
|
async def delete_file(
|
|
self,
|
|
path: str,
|
|
message: str,
|
|
repo: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
__event_call__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""Delete a file from the repository"""
|
|
|
|
retVal = {"status": "failure", "message": ""}
|
|
|
|
effective_branch = self._gitea.get_branch(__user__, __metadata__)
|
|
|
|
token = self._gitea.get_token(__user__)
|
|
if not token:
|
|
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
|
|
return retVal
|
|
|
|
try:
|
|
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
|
|
except ValueError as e:
|
|
retVal["message"] = str(e)
|
|
return retVal
|
|
|
|
# Confirmation dialog
|
|
if __event_call__:
|
|
result = await __event_call__(
|
|
{
|
|
"type": "confirmation",
|
|
"data": {
|
|
"title": "Confirm File Deletion",
|
|
"message": f"Delete `{path}` from `{owner}/{repo_name}` ({effective_branch})?",
|
|
},
|
|
}
|
|
)
|
|
if result is None or result is False:
|
|
retVal["message"] = "⚠️ File deletion cancelled by user."
|
|
return retVal
|
|
if isinstance(result, dict) and not result.get("confirmed"):
|
|
retVal["message"] = "⚠️ File deletion cancelled by user."
|
|
return retVal
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": f"Deleting {path}...", "done": False},
|
|
}
|
|
)
|
|
|
|
try:
|
|
async with httpx.AsyncClient(
|
|
timeout=30.0, verify=self.valves.VERIFY_SSL
|
|
) as client:
|
|
get_response = await client.get(
|
|
self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
|
|
headers=self._gitea.headers(__user__),
|
|
params={"ref": effective_branch},
|
|
)
|
|
|
|
if get_response.status_code == 404:
|
|
retVal["message"] = f"File not found: `{path}`"
|
|
return retVal
|
|
|
|
get_response.raise_for_status()
|
|
file_info = get_response.json()
|
|
sha = file_info.get("sha")
|
|
|
|
if not sha:
|
|
retVal["message"] = "Could not retrieve file SHA for deletion."
|
|
return retVal
|
|
|
|
response = await client.delete(
|
|
self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
|
|
headers=self._gitea.headers(__user__),
|
|
json={
|
|
"message": message,
|
|
"branch": effective_branch,
|
|
"sha": sha,
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
result = response.json()
|
|
|
|
commit_sha = result.get("commit", {}).get("sha", "unknown")[:8]
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Done", "done": True, "hidden": True},
|
|
}
|
|
)
|
|
|
|
message_text = f"✅ **File Deleted Successfully**\n\n"
|
|
message_text += f"**File:** `{path}`\n"
|
|
message_text += f"**Branch:** `{effective_branch}`\n"
|
|
message_text += f"**Commit:** `{commit_sha}`\n"
|
|
message_text += f"**Message:** {message}\n"
|
|
|
|
retVal["status"] = "success"
|
|
retVal["message"] = message_text
|
|
return retVal
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
error_msg = self._gitea.format_error(e, f"file deletion for '{path}'")
|
|
if e.response.status_code == 404:
|
|
retVal["message"] = f"File not found: `{path}`"
|
|
else:
|
|
retVal["message"] = f"Failed to delete file. {error_msg}"
|
|
return retVal
|
|
except Exception as e:
|
|
retVal["message"] = (
|
|
f"Unexpected failure during file deletion: {type(e).__name__}: {e}"
|
|
)
|
|
return retVal
|
|
|
|
async def rename_file(
|
|
self,
|
|
old_path: str,
|
|
new_path: str,
|
|
message: Optional[str] = None,
|
|
repo: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""Rename a file in the repository"""
|
|
|
|
retVal = {"status": "failure", "message": ""}
|
|
|
|
effective_branch = self._gitea.get_branch(__user__, __metadata__)
|
|
|
|
token = self._gitea.get_token(__user__)
|
|
if not token:
|
|
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
|
|
return retVal
|
|
|
|
try:
|
|
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
|
|
except ValueError as e:
|
|
retVal["message"] = str(e)
|
|
return retVal
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": f"Renaming {old_path} to {new_path}...",
|
|
"done": False,
|
|
},
|
|
}
|
|
)
|
|
|
|
try:
|
|
if not message:
|
|
message = self._gitea.generate_commit_message(
|
|
change_type="chore",
|
|
scope="rename",
|
|
description=f"rename {old_path} → {new_path}",
|
|
)
|
|
|
|
async with httpx.AsyncClient(
|
|
timeout=30.0, verify=self.valves.VERIFY_SSL
|
|
) as client:
|
|
get_response = await client.get(
|
|
self._gitea.api_url(
|
|
f"/repos/{owner}/{repo_name}/contents/{old_path}"
|
|
),
|
|
headers=self._gitea.headers(__user__),
|
|
params={"ref": effective_branch},
|
|
)
|
|
|
|
if get_response.status_code == 404:
|
|
retVal["message"] = f"File not found: `{old_path}`"
|
|
return retVal
|
|
|
|
get_response.raise_for_status()
|
|
old_file = get_response.json()
|
|
old_sha = old_file.get("sha")
|
|
|
|
content_b64 = old_file.get("content", "")
|
|
try:
|
|
content = base64.b64decode(content_b64).decode("utf-8")
|
|
except Exception:
|
|
retVal["message"] = "Could not decode file content."
|
|
return retVal
|
|
|
|
new_content_b64 = base64.b64encode(content.encode("utf-8")).decode(
|
|
"ascii"
|
|
)
|
|
|
|
create_response = await client.post(
|
|
self._gitea.api_url(
|
|
f"/repos/{owner}/{repo_name}/contents/{new_path}"
|
|
),
|
|
headers=self._gitea.headers(__user__),
|
|
json={
|
|
"content": new_content_b64,
|
|
"message": message,
|
|
"branch": effective_branch,
|
|
},
|
|
)
|
|
|
|
if create_response.status_code == 422:
|
|
retVal["message"] = f"File already exists at new path: `{new_path}`"
|
|
return retVal
|
|
|
|
create_response.raise_for_status()
|
|
|
|
delete_response = await client.delete(
|
|
self._gitea.api_url(
|
|
f"/repos/{owner}/{repo_name}/contents/{old_path}"
|
|
),
|
|
headers=self._gitea.headers(__user__),
|
|
json={
|
|
"message": message,
|
|
"branch": effective_branch,
|
|
"sha": old_sha,
|
|
},
|
|
)
|
|
delete_response.raise_for_status()
|
|
|
|
commit_sha = (
|
|
create_response.json().get("commit", {}).get("sha", "unknown")[:8]
|
|
)
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Done", "done": True, "hidden": True},
|
|
}
|
|
)
|
|
|
|
message_text = f"✅ **File Renamed Successfully**\n\n"
|
|
message_text += f"**Old Path:** `{old_path}`\n"
|
|
message_text += f"**New Path:** `{new_path}`\n"
|
|
message_text += f"**Branch:** `{effective_branch}`\n"
|
|
message_text += f"**Commit:** `{commit_sha}`\n"
|
|
message_text += f"**Message:** {message}\n"
|
|
|
|
retVal["status"] = "success"
|
|
retVal["message"] = message_text
|
|
return retVal
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
error_msg = self._gitea.format_error(e, "file rename")
|
|
retVal["message"] = f"Failed to rename file. {error_msg}"
|
|
return retVal
|
|
except Exception as e:
|
|
retVal["message"] = (
|
|
f"Unexpected failure during file rename: {type(e).__name__}: {e}"
|
|
)
|
|
return retVal
|
|
|
|
async def get_file(
|
|
self,
|
|
path: str,
|
|
repo: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""Get the contents of a file from the repository"""
|
|
|
|
retVal = {"status": "failure", "message": ""}
|
|
|
|
effective_branch = self._gitea.get_branch(__user__, __metadata__)
|
|
|
|
token = self._gitea.get_token(__user__)
|
|
if not token:
|
|
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
|
|
return retVal
|
|
|
|
try:
|
|
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
|
|
except ValueError as e:
|
|
retVal["message"] = str(e)
|
|
return retVal
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": f"Reading file {path}...",
|
|
"done": False,
|
|
},
|
|
}
|
|
)
|
|
|
|
try:
|
|
async with httpx.AsyncClient(
|
|
timeout=30.0, verify=self.valves.VERIFY_SSL
|
|
) as client:
|
|
response = await client.get(
|
|
self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
|
|
headers=self._gitea.headers(__user__),
|
|
params={"ref": effective_branch},
|
|
)
|
|
response.raise_for_status()
|
|
file_info = response.json()
|
|
|
|
if isinstance(file_info, list):
|
|
retVal["message"] = (
|
|
f"'{path}' is a directory. Use `list_files()` to browse its contents."
|
|
)
|
|
return retVal
|
|
|
|
if file_info.get("type") != "file":
|
|
retVal["message"] = (
|
|
f"'{path}' is not a file (type: {file_info.get('type')})"
|
|
)
|
|
return retVal
|
|
|
|
content_b64 = file_info.get("content", "")
|
|
try:
|
|
content = base64.b64decode(content_b64).decode("utf-8")
|
|
except Exception:
|
|
retVal["message"] = (
|
|
"Could not decode file content. The file may be binary."
|
|
)
|
|
return retVal
|
|
|
|
size = file_info.get("size", 0)
|
|
sha_short = file_info.get("sha", "unknown")[:8]
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Done", "done": True, "hidden": True},
|
|
}
|
|
)
|
|
|
|
message_text = f"**File:** `{path}`\n"
|
|
message_text += f"**Branch:** `{effective_branch}`\n"
|
|
message_text += f"**SHA:** `{sha_short}` | **Size:** {size} bytes\n"
|
|
message_text += f"**URL:** {file_info.get('html_url', 'N/A')}\n\n"
|
|
message_text += f"```\n{content}\n```\n"
|
|
|
|
retVal["status"] = "success"
|
|
retVal["message"] = message_text
|
|
return retVal
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
error_msg = self._gitea.format_error(e, f"file fetch for '{path}'")
|
|
if e.response.status_code == 404:
|
|
retVal["message"] = f"File not found: `{path}`"
|
|
else:
|
|
retVal["message"] = f"Failed to fetch file. {error_msg}"
|
|
return retVal
|
|
except Exception as e:
|
|
retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}"
|
|
return retVal
|
|
|
|
async def list_files(
|
|
self,
|
|
path: str = "",
|
|
repo: Optional[str] = None,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> dict:
|
|
"""List files and directories in a repository path"""
|
|
|
|
retVal = {"status": "failure", "message": ""}
|
|
|
|
effective_branch = self._gitea.get_branch(__user__, __metadata__)
|
|
|
|
token = self._gitea.get_token(__user__)
|
|
if not token:
|
|
retVal["message"] = "GITEA_TOKEN not configured in UserValves settings."
|
|
return retVal
|
|
|
|
try:
|
|
owner, repo_name = self._gitea.resolve_repo(repo, __user__, __metadata__)
|
|
except ValueError as e:
|
|
retVal["message"] = str(e)
|
|
return retVal
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": f"Listing {path or 'root'}...",
|
|
"done": False,
|
|
},
|
|
}
|
|
)
|
|
|
|
try:
|
|
async with httpx.AsyncClient(
|
|
timeout=30.0, verify=self.valves.VERIFY_SSL
|
|
) as client:
|
|
response = await client.get(
|
|
self._gitea.api_url(f"/repos/{owner}/{repo_name}/contents/{path}"),
|
|
headers=self._gitea.headers(__user__),
|
|
params={"ref": effective_branch},
|
|
)
|
|
response.raise_for_status()
|
|
contents = response.json()
|
|
|
|
if isinstance(contents, dict):
|
|
retVal["message"] = (
|
|
f"'{path}' is a file. Use `get_file()` to read its contents."
|
|
)
|
|
return retVal
|
|
|
|
message = f"**Contents of {owner}/{repo_name}/{path or '.'}** (`{effective_branch}`)\n\n"
|
|
|
|
dirs = [item for item in contents if item.get("type") == "dir"]
|
|
files = [item for item in contents if item.get("type") == "file"]
|
|
|
|
if dirs:
|
|
message += "**📁 Directories:**\n"
|
|
for item in sorted(dirs, key=lambda x: x.get("name", "").lower()):
|
|
message += f"- `📁 {item.get('name', '')}/`\n"
|
|
message += "\n"
|
|
|
|
if files:
|
|
message += "**📄 Files:**\n"
|
|
for item in sorted(files, key=lambda x: x.get("name", "").lower()):
|
|
size = item.get("size", 0)
|
|
if size < 1024:
|
|
size_str = f"{size}B"
|
|
elif size < 1024 * 1024:
|
|
size_str = f"{size//1024}KB"
|
|
else:
|
|
size_str = f"{size//(1024*1024)}MB"
|
|
sha_short = item.get("sha", "unknown")[:8]
|
|
message += (
|
|
f"- `{item.get('name', '')}` ({size_str}, sha: {sha_short})\n"
|
|
)
|
|
|
|
message += f"\n**Total:** {len(dirs)} directories, {len(files)} files"
|
|
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Done", "done": True, "hidden": True},
|
|
}
|
|
)
|
|
|
|
retVal["status"] = "success"
|
|
retVal["message"] = message
|
|
return retVal
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
error_msg = self._gitea.format_error(e, f"directory listing for '{path}'")
|
|
if e.response.status_code == 404:
|
|
retVal["message"] = f"Path not found: `{path}`"
|
|
else:
|
|
retVal["message"] = f"Failed to list directory contents. {error_msg}"
|
|
return retVal
|
|
except Exception as e:
|
|
retVal["message"] = f"Unexpected failure: {type(e).__name__}: {e}"
|
|
return retVal
|