diff --git a/gitea/dev.py b/gitea/dev.py index 8f096a3..999bcd0 100644 --- a/gitea/dev.py +++ b/gitea/dev.py @@ -1,11 +1,14 @@ """ title: Gitea Dev - Native Mode Optimized author: Jeff Smith + Claude + minimax + kimi-k2 -version: 1.3.0 +version: 1.4.0 license: MIT description: Interact with Gitea repositories - native tool calling optimized for high-tier LLMs with robust error handling requirements: pydantic, httpx changelog: + 1.4.0: + - Added CRUD operations for Issues (get, update, close, reopen, delete, comments) + - Added CRUD operations for Pull Requests (get, update, merge, comments) 1.3.0: - Native Mode optimization with explicit tool schemas - Added mapping protocol support to Valves/UserValves (__iter__, keys, __getitem__) @@ -1358,6 +1361,630 @@ class Tools: except Exception as e: return f"Error: Unexpected failure during issue listing: {type(e).__name__}: {e}" + async def get_issue( + self, + issue_number: int, + repo: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Get detailed information about a specific issue. + + :param issue_number: Issue number to retrieve + :param repo: Repository in 'owner/repo' format + :return: Comprehensive issue details including title, body, labels, and comments count + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Fetching issue #{issue_number}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"), + headers=self._headers(__user__), + ) + response.raise_for_status() + issue = response.json() + + # Extract issue data + title = issue.get("title", "No title") + body = issue.get("body", "") + state = issue.get("state", "unknown") + user = issue.get("user", {}).get("login", "unknown") + created_at = issue.get("created_at", "")[:10] + updated_at = issue.get("updated_at", "")[:10] + comments_count = issue.get("comments", 0) + html_url = issue.get("html_url", "") + + # Labels + issue_labels = [ + label.get("name", "") for label in issue.get("labels", []) + ] + labels_str = ", ".join(issue_labels) if issue_labels else "None" + + # Assignee + assignee = issue.get("assignee") + assignee_str = ( + f"@{assignee.get('login', 'unknown')}" if assignee else "Unassigned" + ) + + # Build output + output = f"# Issue #{issue_number}: {title}\n\n" + output += f"**State:** {state.upper()}\n" + output += f"**Author:** @{user}\n" + output += f"**Assignee:** {assignee_str}\n" + output += f"**Labels:** {labels_str}\n" + output += f"**Created:** {created_at}\n" + output += f"**Updated:** {updated_at}\n" + output += f"**Comments:** {comments_count}\n" + output += f"**URL:** {html_url}\n\n" + + if body: + output += f"## Description\n\n{body}\n\n" + else: + output += "_No description provided._\n\n" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + return output.strip() + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"issue #{issue_number}") + if e.response.status_code == 404: + return f"Error: Issue #{issue_number} not found in {owner}/{repo_name}." + return f"Error: Failed to fetch issue. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during issue fetch: {type(e).__name__}: {e}" + + async def update_issue( + self, + issue_number: int, + repo: Optional[str] = None, + title: Optional[str] = None, + body: Optional[str] = None, + state: Optional[str] = None, + assignee: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Update an existing issue with new title, body, state, or assignee. + + :param issue_number: Issue number to update + :param repo: Repository in 'owner/repo' format + :param title: New title (optional) + :param body: New description body (optional) + :param state: New state - "open" or "closed" (optional) + :param assignee: Username to assign (optional, use empty string to unassign) + :return: Confirmation with updated issue details + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + # Validate state if provided + if state and state not in ("open", "closed"): + return "Error: State must be 'open' or 'closed'." + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Updating issue #{issue_number}...", + "done": False, + }, + } + ) + + try: + payload = {} + if title is not None: + payload["title"] = title + if body is not None: + payload["body"] = body + if state is not None: + payload["state"] = state + if assignee is not None: + payload["assignee"] = assignee if assignee else "" + + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.patch( + self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"), + headers=self._headers(__user__), + json=payload, + ) + response.raise_for_status() + issue = response.json() + + updated_title = issue.get("title", "No title") + updated_state = issue.get("state", "unknown") + html_url = issue.get("html_url", "") + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + output = f"**Issue #{issue_number} Updated Successfully**\n\n" + output += f"**Title:** {updated_title}\n" + output += f"**State:** {updated_state.upper()}\n" + output += f"**URL:** {html_url}\n" + output += f"\nChanges applied: {', '.join(payload.keys())}\n" + + return output.strip() + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"issue #{issue_number} update") + return f"Error: Failed to update issue. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during issue update: {type(e).__name__}: {e}" + + async def close_issue( + self, + issue_number: int, + repo: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Close an open issue. + + :param issue_number: Issue number to close + :param repo: Repository in 'owner/repo' format + :return: Confirmation of issue closure + """ + return await self.update_issue(issue_number, repo, state="closed", __user__=__user__, __event_emitter__=__event_emitter__) + + async def reopen_issue( + self, + issue_number: int, + repo: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Reopen a closed issue. + + :param issue_number: Issue number to reopen + :param repo: Repository in 'owner/repo' format + :return: Confirmation of issue reopening + """ + return await self.update_issue(issue_number, repo, state="open", __user__=__user__, __event_emitter__=__event_emitter__) + + async def delete_issue( + self, + issue_number: int, + repo: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + __event_call__: Callable[[dict], Any] = None, + ) -> str: + """ + Delete an issue from the repository. + + :param issue_number: Issue number to delete + :param repo: Repository in 'owner/repo' format + :return: Confirmation of issue deletion + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + # Confirmation dialog + if __event_call__: + result = await __event_call__( + { + "type": "confirmation", + "data": { + "title": "Confirm Issue Deletion", + "message": f"Delete issue #{issue_number} from `{owner}/{repo_name}`?", + }, + } + ) + if result is None or result is False: + return "⚠️ Issue deletion cancelled by user." + if isinstance(result, dict) and not result.get("confirmed"): + return "⚠️ Issue deletion cancelled by user." + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Deleting issue #{issue_number}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.delete( + self._api_url(f"/repos/{owner}/{repo_name}/issues/{issue_number}"), + headers=self._headers(__user__), + ) + response.raise_for_status() + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + return f"**Issue #{issue_number} Deleted Successfully** from `{owner}/{repo_name}`" + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"issue #{issue_number} deletion") + if e.response.status_code == 404: + return f"Error: Issue #{issue_number} not found in {owner}/{repo_name}." + return f"Error: Failed to delete issue. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during issue deletion: {type(e).__name__}: {e}" + + async def list_issue_comments( + self, + issue_number: int, + repo: Optional[str] = None, + page: int = 1, + limit: Optional[int] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + List all comments on an issue. + + :param issue_number: Issue number to get comments for + :param repo: Repository in 'owner/repo' format + :param page: Page number for pagination (default: 1) + :param limit: Results per page (max 50, default: 50) + :return: Formatted comment listing with author and date + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + page_size = self._get_page_size(limit) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Fetching comments...", "done": False}, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.get( + self._api_url( + f"/repos/{owner}/{repo_name}/issues/{issue_number}/comments" + ), + headers=self._headers(__user__), + params={"page": page, "limit": page_size}, + ) + response.raise_for_status() + comments = response.json() + + total_count = response.headers.get("x-total-count", "?") + + output = f"**Comments on Issue #{issue_number}** ({owner}/{repo_name})\n\n" + + if not comments: + output += "_No comments yet._" + else: + for comment in comments: + comment_id = comment.get("id", 0) + user = comment.get("user", {}).get("login", "unknown") + created_at = comment.get("created_at", "")[:10] + body = comment.get("body", "No content") + + output += f"**Comment by @{user}** (ID: {comment_id}, {created_at})\n" + output += f"{body}\n\n" + output += "---\n\n" + + output += f"\n**Showing {len(comments)} comments (Total: {total_count})**" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + return output.strip() + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"issue #{issue_number} comments") + if e.response.status_code == 404: + return f"Error: Issue #{issue_number} not found in {owner}/{repo_name}." + return f"Error: Failed to list issue comments. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during comment listing: {type(e).__name__}: {e}" + + async def create_issue_comment( + self, + issue_number: int, + body: str, + repo: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Add a comment to an issue. + + :param issue_number: Issue number to comment on + :param body: Comment content + :param repo: Repository in 'owner/repo' format + :return: Confirmation with comment details + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + if not body or not body.strip(): + return "Error: Comment body cannot be empty." + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Adding comment...", "done": False}, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.post( + self._api_url( + f"/repos/{owner}/{repo_name}/issues/{issue_number}/comments" + ), + headers=self._headers(__user__), + json={"body": body}, + ) + response.raise_for_status() + comment = response.json() + + comment_id = comment.get("id", 0) + html_url = comment.get("html_url", "") + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + output = f"**Comment Added Successfully**\n\n" + output += f"**Issue:** #{issue_number}\n" + output += f"**Comment ID:** {comment_id}\n" + output += f"**URL:** {html_url}\n\n" + output += f"**Content:**\n{body}\n" + + return output.strip() + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"comment on issue #{issue_number}") + if e.response.status_code == 404: + return f"Error: Issue #{issue_number} not found in {owner}/{repo_name}." + return f"Error: Failed to create comment. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during comment creation: {type(e).__name__}: {e}" + + async def update_issue_comment( + self, + comment_id: int, + body: str, + repo: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Update an existing comment on an issue. + + :param comment_id: Comment ID to update + :param body: New comment content + :param repo: Repository in 'owner/repo' format + :return: Confirmation with updated comment details + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + if not body or not body.strip(): + return "Error: Comment body cannot be empty." + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Updating comment...", "done": False}, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.patch( + self._api_url(f"/repos/{owner}/{repo_name}/issues/comments/{comment_id}"), + headers=self._headers(__user__), + json={"body": body}, + ) + response.raise_for_status() + comment = response.json() + + updated_body = comment.get("body", "No content") + updated_at = comment.get("updated_at", "")[:10] + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + output = f"**Comment {comment_id} Updated Successfully**\n\n" + output += f"**Repository:** {owner}/{repo_name}\n" + output += f"**Updated:** {updated_at}\n\n" + output += f"**New Content:**\n{updated_body}\n" + + return output.strip() + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"comment #{comment_id}") + if e.response.status_code == 404: + return f"Error: Comment #{comment_id} not found in {owner}/{repo_name}." + return f"Error: Failed to update comment. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during comment update: {type(e).__name__}: {e}" + + async def delete_issue_comment( + self, + comment_id: int, + repo: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + __event_call__: Callable[[dict], Any] = None, + ) -> str: + """ + Delete a comment from an issue. + + :param comment_id: Comment ID to delete + :param repo: Repository in 'owner/repo' format + :return: Confirmation of comment deletion + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + # Confirmation dialog + if __event_call__: + result = await __event_call__( + { + "type": "confirmation", + "data": { + "title": "Confirm Comment Deletion", + "message": f"Delete comment #{comment_id} from `{owner}/{repo_name}`?", + }, + } + ) + if result is None or result is False: + return "⚠️ Comment deletion cancelled by user." + if isinstance(result, dict) and not result.get("confirmed"): + return "⚠️ Comment deletion cancelled by user." + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Deleting comment #{comment_id}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.delete( + self._api_url(f"/repos/{owner}/{repo_name}/issues/comments/{comment_id}"), + headers=self._headers(__user__), + ) + response.raise_for_status() + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + return f"**Comment #{comment_id} Deleted Successfully** from `{owner}/{repo_name}`" + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"comment #{comment_id} deletion") + if e.response.status_code == 404: + return f"Error: Comment #{comment_id} not found in {owner}/{repo_name}." + return f"Error: Failed to delete comment. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during comment deletion: {type(e).__name__}: {e}" + async def create_issue( self, title: str, @@ -1541,6 +2168,648 @@ class Tools: f"Error: Unexpected failure during PR listing: {type(e).__name__}: {e}" ) + async def get_pull_request( + self, + pr_number: int, + repo: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Get detailed information about a specific pull request. + + :param pr_number: Pull request number to retrieve + :param repo: Repository in 'owner/repo' format + :return: Comprehensive PR details including title, body, branches, and merge status + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Fetching PR #{pr_number}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.get( + self._api_url(f"/repos/{owner}/{repo_name}/pulls/{pr_number}"), + headers=self._headers(__user__), + ) + response.raise_for_status() + pr = response.json() + + # Extract PR data + title = pr.get("title", "No title") + body = pr.get("body", "") + state = pr.get("state", "unknown") + user = pr.get("user", {}).get("login", "unknown") + created_at = pr.get("created_at", "")[:10] + updated_at = pr.get("updated_at", "")[:10] + head_ref = pr.get("head", {}).get("ref", "") + base_ref = pr.get("base", {}).get("ref", "") + mergeable = pr.get("mergeable", None) + merged = pr.get("merged", False) + html_url = pr.get("html_url", "") + + # Build output + output = f"# PR #{pr_number}: {title}\n\n" + output += f"**State:** {state.upper()}{' (MERGED)' if merged else ''}\n" + output += f"**Author:** @{user}\n" + output += f"**Branches:** `{head_ref}` → `{base_ref}`\n" + output += f"**Created:** {created_at}\n" + output += f"**Updated:** {updated_at}\n" + output += f"**URL:** {html_url}\n" + + # Merge status + if mergeable is True: + output += "**Merge Status:** ✅ Mergeable\n" + elif mergeable is False: + output += "**Merge Status:** ⚠️ Has conflicts\n" + else: + output += "**Merge Status:** Unknown\n" + + output += "\n" + if body: + output += f"## Description\n\n{body}\n\n" + else: + output += "_No description provided._\n\n" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + return output.strip() + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"PR #{pr_number}") + if e.response.status_code == 404: + return f"Error: PR #{pr_number} not found in {owner}/{repo_name}." + return f"Error: Failed to fetch pull request. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during PR fetch: {type(e).__name__}: {e}" + + async def update_pull_request( + self, + pr_number: int, + repo: Optional[str] = None, + title: Optional[str] = None, + body: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Update an existing pull request with new title or body. + + :param pr_number: Pull request number to update + :param repo: Repository in 'owner/repo' format + :param title: New title (optional) + :param body: New description body (optional) + :return: Confirmation with updated PR details + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Updating PR #{pr_number}...", + "done": False, + }, + } + ) + + try: + payload = {} + if title is not None: + payload["title"] = title + if body is not None: + payload["body"] = body + + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.patch( + self._api_url(f"/repos/{owner}/{repo_name}/pulls/{pr_number}"), + headers=self._headers(__user__), + json=payload, + ) + response.raise_for_status() + pr = response.json() + + updated_title = pr.get("title", "No title") + html_url = pr.get("html_url", "") + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + output = f"**PR #{pr_number} Updated Successfully**\n\n" + output += f"**Title:** {updated_title}\n" + output += f"**URL:** {html_url}\n" + if payload: + output += f"\nChanges applied: {', '.join(payload.keys())}\n" + + return output.strip() + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"PR #{pr_number} update") + return f"Error: Failed to update pull request. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during PR update: {type(e).__name__}: {e}" + + async def merge_pull_request( + self, + pr_number: int, + repo: Optional[str] = None, + merge_strategy: str = "merge", + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Merge a pull request using the specified strategy. + + :param pr_number: Pull request number to merge + :param repo: Repository in 'owner/repo' format + :param merge_strategy: Merge strategy - "merge", "rebase", or "squash" (default: "merge") + :return: Confirmation of merge or error details + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + # Validate merge strategy + if merge_strategy not in ("merge", "rebase", "squash"): + return "Error: Merge strategy must be 'merge', 'rebase', or 'squash'." + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Merging PR #{pr_number}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=60.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.post( + self._api_url( + f"/repos/{owner}/{repo_name}/pulls/{pr_number}/merge" + ), + headers=self._headers(__user__), + json={"merge_strategy": merge_strategy}, + ) + + if response.status_code == 405: + return "Error: PR cannot be merged. Check if it's already merged or has conflicts." + + response.raise_for_status() + result = response.json() if response.text else {} + + merged = result.get("merged", True) + commit_sha = result.get("merge_commit", {}).get("sha", "")[:8] + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + if merged: + return f"**PR #{pr_number} Merged Successfully**\n\n" + output += f"**Strategy:** {merge_strategy.upper()}\n" + if commit_sha: + output += f"**Merge Commit:** `{commit_sha}`\n" + output += f"\n✅ The pull request has been merged into {owner}/{repo_name}." + else: + return f"**PR #{pr_number} Merge Result:**\n\n{result}\n" + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"PR #{pr_number} merge") + if e.response.status_code == 405: + return f"Error: PR #{pr_number} cannot be merged. It may already be merged or have merge conflicts." + return f"Error: Failed to merge PR. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during PR merge: {type(e).__name__}: {e}" + + async def list_pull_request_comments( + self, + pr_number: int, + repo: Optional[str] = None, + page: int = 1, + limit: Optional[int] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + List all reviews/comments on a pull request. + + :param pr_number: PR number to get comments/reviews for + :param repo: Repository in 'owner/repo' format + :param page: Page number for pagination (default: 1) + :param limit: Results per page (max 50, default: 50) + :return: Formatted review/comment listing with author and content + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + page_size = self._get_page_size(limit) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Fetching PR reviews...", "done": False}, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + # Try reviews endpoint first (Gitea uses /pulls/{index}/reviews) + response = await client.get( + self._api_url( + f"/repos/{owner}/{repo_name}/pulls/{pr_number}/reviews" + ), + headers=self._headers(__user__), + params={"page": page, "limit": page_size}, + ) + + # Fallback to issue comments if reviews endpoint not available + if response.status_code == 404: + response = await client.get( + self._api_url( + f"/repos/{owner}/{repo_name}/issues/{pr_number}/comments" + ), + headers=self._headers(__user__), + params={"page": page, "limit": page_size}, + ) + + response.raise_for_status() + comments = response.json() + + total_count = response.headers.get("x-total-count", "?") + + output = f"**Reviews/Comments on PR #{pr_number}** ({owner}/{repo_name})\n\n" + + if not comments: + output += "_No reviews or comments yet._" + else: + for comment in comments: + comment_id = comment.get("id", 0) + user = comment.get("user", {}).get("login", "unknown") + created_at = comment.get("created_at", "")[:10] + body = comment.get("body", "No content") + + # Review-specific fields + state = comment.get("state", "") + if state: + output += f"**Review by @{user}** - {state} ({created_at})\n" + else: + output += f"**Comment by @{user}** ({created_at})\n" + + output += f"{body}\n\n" + output += "---\n\n" + + output += f"\n**Showing {len(comments)} items (Total: {total_count})**" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + return output.strip() + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"PR #{pr_number} reviews") + if e.response.status_code == 404: + return f"Error: PR #{pr_number} not found in {owner}/{repo_name}." + return f"Error: Failed to list PR reviews. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during review listing: {type(e).__name__}: {e}" + + async def create_pull_request_comment( + self, + pr_number: int, + body: str, + repo: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Add a review comment to a pull request. + + :param pr_number: PR number to comment on + :param body: Comment content + :param repo: Repository in 'owner/repo' format + :return: Confirmation with comment details + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + if not body or not body.strip(): + return "Error: Comment body cannot be empty." + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Adding review comment...", "done": False}, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.post( + self._api_url( + f"/repos/{owner}/{repo_name}/pulls/{pr_number}/reviews" + ), + headers=self._headers(__user__), + json={ + "body": body, + "event": "COMMENT", # Just add a comment, don't approve/reject + }, + ) + + # Fallback to issue comments if reviews endpoint not available + if response.status_code == 404: + response = await client.post( + self._api_url( + f"/repos/{owner}/{repo_name}/issues/{pr_number}/comments" + ), + headers=self._headers(__user__), + json={"body": body}, + ) + + response.raise_for_status() + comment = response.json() + + comment_id = comment.get("id", 0) + html_url = comment.get("html_url", "") + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + output = f"**Review Comment Added Successfully**\n\n" + output += f"**PR:** #{pr_number}\n" + output += f"**Comment ID:** {comment_id}\n" + output += f"**URL:** {html_url}\n\n" + output += f"**Content:**\n{body}\n" + + return output.strip() + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"comment on PR #{pr_number}") + if e.response.status_code == 404: + return f"Error: PR #{pr_number} not found in {owner}/{repo_name}." + return f"Error: Failed to create review comment. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during comment creation: {type(e).__name__}: {e}" + + async def update_pull_request_comment( + self, + comment_id: int, + body: str, + repo: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Update an existing review comment on a pull request. + + :param comment_id: Comment ID to update + :param body: New comment content + :param repo: Repository in 'owner/repo' format + :return: Confirmation with updated comment details + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + if not body or not body.strip(): + return "Error: Comment body cannot be empty." + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Updating review comment...", "done": False}, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + response = await client.patch( + self._api_url( + f"/repos/{owner}/{repo_name}/pulls/comments/{comment_id}" + ), + headers=self._headers(__user__), + json={"body": body}, + ) + + # Fallback to issue comments if reviews endpoint not available + if response.status_code == 404: + response = await client.patch( + self._api_url( + f"/repos/{owner}/{repo_name}/issues/comments/{comment_id}" + ), + headers=self._headers(__user__), + json={"body": body}, + ) + + response.raise_for_status() + comment = response.json() + + updated_body = comment.get("body", "No content") + updated_at = comment.get("updated_at", "")[:10] + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + output = f"**Review Comment {comment_id} Updated Successfully**\n\n" + output += f"**Repository:** {owner}/{repo_name}\n" + output += f"**Updated:** {updated_at}\n\n" + output += f"**New Content:**\n{updated_body}\n" + + return output.strip() + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"comment #{comment_id}") + if e.response.status_code == 404: + return f"Error: Comment #{comment_id} not found in {owner}/{repo_name}." + return f"Error: Failed to update review comment. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during comment update: {type(e).__name__}: {e}" + + async def delete_pull_request_comment( + self, + comment_id: int, + repo: Optional[str] = None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + __event_call__: Callable[[dict], Any] = None, + ) -> str: + """ + delete a review comment from a pull request. + + :param comment_id: Comment ID to delete + :param repo: Repository in 'owner/repo' format + :return: Confirmation of comment deletion + """ + token = self._get_token(__user__) + if not token: + return "Error: GITEA_TOKEN not configured in UserValves settings." + + try: + owner, repo_name = self._resolve_repo(repo, __user__) + except ValueError as e: + return f"Error: {e}" + + # Confirmation dialog + if __event_call__: + result = await __event_call__( + { + "type": "confirmation", + "data": { + "title": "Confirm Review Comment Deletion", + "message": f"Delete review comment #{comment_id} from `{owner}/{repo_name}`?", + }, + } + ) + if result is None or result is False: + return "⚠️ Comment deletion cancelled by user." + if isinstance(result, dict) and not result.get("confirmed"): + return "⚠️ Comment deletion cancelled by user." + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Deleting review comment #{comment_id}...", + "done": False, + }, + } + ) + + try: + async with httpx.AsyncClient( + timeout=30.0, verify=self.valves.VERIFY_SSL + ) as client: + # Try pull request comments endpoint first + response = await client.delete( + self._api_url( + f"/repos/{owner}/{repo_name}/pulls/comments/{comment_id}" + ), + headers=self._headers(__user__), + ) + + # Fallback to issue comments if reviews endpoint not available + if response.status_code == 404: + response = await client.delete( + self._api_url( + f"/repos/{owner}/{repo_name}/issues/comments/{comment_id}" + ), + headers=self._headers(__user__), + ) + + response.raise_for_status() + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Done", "done": True, "hidden": True}, + } + ) + + return f"**Review Comment #{comment_id} Deleted Successfully** from `{owner}/{repo_name}`" + + except httpx.HTTPStatusError as e: + error_msg = self._format_error(e, f"comment #{comment_id} deletion") + if e.response.status_code == 404: + return f"Error: Comment #{comment_id} not found in {owner}/{repo_name}." + return f"Error: Failed to delete review comment. {error_msg}" + except Exception as e: + return f"Error: Unexpected failure during comment deletion: {type(e).__name__}: {e}" + async def create_pull_request( self, title: str,