From 8903614e8425199ca497f1e4d5c75c002dd6eb92 Mon Sep 17 00:00:00 2001 From: xcaliber Date: Thu, 15 Jan 2026 13:35:29 +0000 Subject: [PATCH] Add http/client.py --- http/client.py | 559 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 559 insertions(+) create mode 100644 http/client.py diff --git a/http/client.py b/http/client.py new file mode 100644 index 0000000..8a393f1 --- /dev/null +++ b/http/client.py @@ -0,0 +1,559 @@ +""" +title: HTTP Client Tool +author: Jeff Smith +version: 0.4.1 +license: MIT +required_open_webui_version: 0.6.0 +requirements: httpx, pydantic +description: | + A curl-like HTTP client for Open WebUI. + - Supports full URLs or relative endpoints + - Auto-discovers URL and auth for self-API calls (current platform) + - UserValves for per-user external API configuration + - Supports GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS + All methods return strings for reliable LLM context injection. +""" + +from typing import Callable, Any, Optional +import httpx +import json +import asyncio +import random +from pydantic import BaseModel, Field + + +class Tools: + """ + HTTP client tool for API testing. + + Configuration priority: + 1. Full URL in endpoint - Used directly (e.g., https://api.example.com/v1/resource) + 2. UserValves.EXTERNAL_BASE_URL - Per-user external API config + 3. Auto-discover from __request__ - Current platform (self-API) + + All methods return strings for reliable LLM context injection. + """ + + class Valves(BaseModel): + """ + Admin configuration. + Shared defaults for all users. + """ + + AUTO_DISCOVER: bool = Field( + default=True, + description="Auto-discover URL and auth from request context for self-API calls", + ) + TIMEOUT: int = Field(default=30, description="Request timeout in seconds") + RETRIES: int = Field( + default=3, description="Number of retry attempts for failed requests" + ) + MAX_RESPONSE_SIZE: int = Field( + default=8192, + description="Maximum response size in characters (0 for unlimited)", + ) + INCLUDE_HEADERS: bool = Field( + default=False, description="Include response headers in output" + ) + + class UserValves(BaseModel): + """ + Per-user configuration for external API access. + Each user can configure their own external endpoints. + """ + + EXTERNAL_BASE_URL: str = Field( + default="", + description="Your external API base URL (leave empty to use current platform)", + ) + EXTERNAL_API_KEY: str = Field( + default="", description="Your API key for external endpoints" + ) + + def __init__(self): + self.valves = self.Valves() + self.user_valves = self.UserValves() + self.citation = False + + # ==================== Configuration ==================== + + def _get_config(self, __request__=None) -> tuple[str, dict]: + """ + Get API configuration with priority: + 1. UserValves (per-user external API config) + 2. Auto-discover from request (self-API) + + Returns (base_url, headers) tuple. + """ + base_url = "" + headers = {} + + # Priority 1: UserValves for external API (per-user config) + if self.user_valves.EXTERNAL_BASE_URL: + base_url = self.user_valves.EXTERNAL_BASE_URL + if self.user_valves.EXTERNAL_API_KEY: + headers["Authorization"] = f"Bearer {self.user_valves.EXTERNAL_API_KEY}" + return base_url.rstrip("/"), headers + + # Priority 2: Auto-discover from request context (self-API) + if self.valves.AUTO_DISCOVER and __request__: + base_url = str(getattr(__request__, "base_url", "") or "") + req_headers = getattr(__request__, "headers", {}) + if req_headers: + auth = dict(req_headers).get("authorization", "") + if auth: + headers["Authorization"] = auth + + return base_url.rstrip("/"), headers + + # ==================== Internal Helpers ==================== + + def _truncate(self, text: str) -> str: + """Truncate response to configured max size.""" + max_size = self.valves.MAX_RESPONSE_SIZE + if max_size and len(text) > max_size: + return ( + text[:max_size] + + f"\n\n[...truncated, {len(text) - max_size} chars omitted]" + ) + return text + + def _format_response( + self, + method: str, + url: str, + status_code: int, + response_body: Any = None, + headers: Optional[dict] = None, + error: Optional[str] = None, + ) -> str: + """Format response as string for LLM context injection.""" + parts = [f"{method} {url}", f"Status: {status_code}"] + + if self.valves.INCLUDE_HEADERS and headers: + header_str = "\n".join(f" {k}: {v}" for k, v in headers.items()) + parts.append(f"Headers:\n{header_str}") + + if error: + parts.append(f"Error: {error}") + elif response_body is not None: + if isinstance(response_body, (dict, list)): + body_str = json.dumps(response_body, indent=2) + else: + body_str = str(response_body) + parts.append(f"Body:\n{body_str}") + + return self._truncate("\n".join(parts)) + + async def _make_request( + self, + method: str, + endpoint: str, + headers: Optional[dict] = None, + body: Optional[dict] = None, + __request__=None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Make HTTP request with retries and auto-discovered config. + Returns formatted string for LLM context. + """ + # Check if endpoint is already a full URL + if endpoint.startswith(("http://", "https://")): + url = endpoint + # Still get headers from open_webui.config for auth if needed + _, default_headers = self._get_config(__request__) + else: + base_url, default_headers = self._get_config(__request__) + if not base_url: + return self._format_response( + method, + endpoint, + 0, + error="Could not determine API URL. Set EXTERNAL_BASE_URL in your user settings or enable AUTO_DISCOVER for self-API.", + ) + # Ensure proper path joining + if endpoint and not endpoint.startswith("/"): + endpoint = "/" + endpoint + url = f"{base_url}{endpoint}" + + # Merge headers + if body is not None: + default_headers["Content-Type"] = "application/json" + default_headers.update(headers or {}) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": f"{method} {endpoint}", "done": False}, + } + ) + + last_error = None + retries = max(1, self.valves.RETRIES) + + for attempt in range(1, retries + 1): + try: + async with httpx.AsyncClient( + headers=default_headers, + timeout=float(self.valves.TIMEOUT), + ) as client: + response = await client.request(method, url, json=body) + + # Parse response body + content_type = response.headers.get("content-type", "") + if "application/json" in content_type: + try: + response_body = response.json() + except json.JSONDecodeError: + response_body = response.text + else: + response_body = response.text + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"{method} {endpoint}", + "done": True, + }, + } + ) + + return self._format_response( + method=method, + url=url, + status_code=response.status_code, + response_body=response_body, + headers=( + dict(response.headers) + if self.valves.INCLUDE_HEADERS + else None + ), + ) + + except httpx.TimeoutException: + last_error = "Request timeout" + except httpx.RequestError as e: + last_error = f"Request failed: {str(e)}" + + # Retry with exponential backoff + if attempt < retries: + wait = (0.5 * (2 ** (attempt - 1))) + random.uniform(0, 0.25) + await asyncio.sleep(wait) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"{method} {endpoint} failed", + "done": True, + }, + } + ) + + return self._format_response( + method=method, + url=url, + status_code=0, + error=last_error, + ) + + async def _confirm_action( + self, + action: str, + endpoint: str, + body: Optional[dict] = None, + __event_call__: Callable[[dict], Any] = None, + ) -> Optional[str]: + """Request user confirmation. Returns None if confirmed, error string if cancelled.""" + if not __event_call__: + return None + + payload_preview = "" + if body: + preview = json.dumps(body, indent=2) + if len(preview) > 200: + preview = preview[:200] + "..." + payload_preview = f"\n\nPayload:\n{preview}" + + try: + confirmation = await __event_call__( + { + "type": "confirmation", + "data": { + "title": f"Confirm {action}", + "message": f"{action} {endpoint}{payload_preview}\n\nProceed?", + }, + } + ) + + if confirmation is None: + return f"{action} cancelled - no response" + if isinstance(confirmation, dict) and not confirmation.get( + "confirmed", False + ): + return f"{action} cancelled by user" + if confirmation is False: + return f"{action} cancelled by user" + + return None + except Exception as e: + return f"Confirmation failed: {str(e)}" + + # ==================== HTTP Methods ==================== + + async def get( + self, + endpoint: str, + headers: Optional[dict] = None, + __request__=None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Perform a GET request. + + :param endpoint: API endpoint path (e.g., "/api/v1/users") or full URL + :param headers: Optional additional headers + :return: Formatted response string + """ + return await self._make_request( + "GET", + endpoint, + headers=headers, + __request__=__request__, + __event_emitter__=__event_emitter__, + ) + + async def post( + self, + endpoint: str, + body: dict, + headers: Optional[dict] = None, + confirm: bool = False, + __request__=None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + __event_call__: Callable[[dict], Any] = None, + ) -> str: + """ + Perform a POST request with JSON body. + + :param endpoint: API endpoint path or full URL + :param body: JSON body to send + :param headers: Optional additional headers + :param confirm: Require user confirmation (default False) + :return: Formatted response string + """ + if confirm: + error = await self._confirm_action("POST", endpoint, body, __event_call__) + if error: + return self._format_response("POST", endpoint, 0, error=error) + + return await self._make_request( + "POST", + endpoint, + headers=headers, + body=body, + __request__=__request__, + __event_emitter__=__event_emitter__, + ) + + async def put( + self, + endpoint: str, + body: dict, + headers: Optional[dict] = None, + confirm: bool = True, + __request__=None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + __event_call__: Callable[[dict], Any] = None, + ) -> str: + """ + Perform a PUT request to replace a resource. + + :param endpoint: API endpoint path or full URL + :param body: JSON body representing the full resource + :param headers: Optional additional headers + :param confirm: Require user confirmation (default True) + :return: Formatted response string + """ + if confirm: + error = await self._confirm_action("PUT", endpoint, body, __event_call__) + if error: + return self._format_response("PUT", endpoint, 0, error=error) + + return await self._make_request( + "PUT", + endpoint, + headers=headers, + body=body, + __request__=__request__, + __event_emitter__=__event_emitter__, + ) + + async def patch( + self, + endpoint: str, + body: dict, + headers: Optional[dict] = None, + confirm: bool = False, + __request__=None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + __event_call__: Callable[[dict], Any] = None, + ) -> str: + """ + Perform a PATCH request for partial updates. + + :param endpoint: API endpoint path or full URL + :param body: JSON body with fields to update + :param headers: Optional additional headers + :param confirm: Require user confirmation (default False) + :return: Formatted response string + """ + if confirm: + error = await self._confirm_action("PATCH", endpoint, body, __event_call__) + if error: + return self._format_response("PATCH", endpoint, 0, error=error) + + return await self._make_request( + "PATCH", + endpoint, + headers=headers, + body=body, + __request__=__request__, + __event_emitter__=__event_emitter__, + ) + + async def delete( + self, + endpoint: str, + headers: Optional[dict] = None, + confirm: bool = True, + __request__=None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + __event_call__: Callable[[dict], Any] = None, + ) -> str: + """ + Perform a DELETE request. + + :param endpoint: API endpoint path or full URL + :param headers: Optional additional headers + :param confirm: Require user confirmation (default True) + :return: Formatted response string + """ + if confirm: + error = await self._confirm_action("DELETE", endpoint, None, __event_call__) + if error: + return self._format_response("DELETE", endpoint, 0, error=error) + + return await self._make_request( + "DELETE", + endpoint, + headers=headers, + __request__=__request__, + __event_emitter__=__event_emitter__, + ) + + async def head( + self, + endpoint: str, + headers: Optional[dict] = None, + __request__=None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Perform a HEAD request to check resource existence. + + :param endpoint: API endpoint path or full URL + :param headers: Optional additional headers + :return: Formatted response with status and headers + """ + original_setting = self.valves.INCLUDE_HEADERS + self.valves.INCLUDE_HEADERS = True + + result = await self._make_request( + "HEAD", + endpoint, + headers=headers, + __request__=__request__, + __event_emitter__=__event_emitter__, + ) + + self.valves.INCLUDE_HEADERS = original_setting + return result + + async def options( + self, + endpoint: str, + headers: Optional[dict] = None, + __request__=None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Perform an OPTIONS request to discover allowed methods. + + :param endpoint: API endpoint path or full URL + :param headers: Optional additional headers + :return: Formatted response with allowed methods + """ + original_setting = self.valves.INCLUDE_HEADERS + self.valves.INCLUDE_HEADERS = True + + result = await self._make_request( + "OPTIONS", + endpoint, + headers=headers, + __request__=__request__, + __event_emitter__=__event_emitter__, + ) + + self.valves.INCLUDE_HEADERS = original_setting + return result + + async def request( + self, + method: str, + endpoint: str, + body: Optional[dict] = None, + headers: Optional[dict] = None, + confirm: bool = False, + __request__=None, + __user__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + __event_call__: Callable[[dict], Any] = None, + ) -> str: + """ + Generic HTTP request for any method. + + :param method: HTTP method (GET, POST, PUT, PATCH, DELETE, etc.) + :param endpoint: API endpoint path or full URL + :param body: Optional JSON body + :param headers: Optional additional headers + :param confirm: Require user confirmation for mutating operations + :return: Formatted response string + """ + method = method.upper() + + if confirm and method in ("POST", "PUT", "PATCH", "DELETE"): + error = await self._confirm_action(method, endpoint, body, __event_call__) + if error: + return self._format_response(method, endpoint, 0, error=error) + + return await self._make_request( + method, + endpoint, + headers=headers, + body=body, + __request__=__request__, + __event_emitter__=__event_emitter__, + )