Add http/client.py

This commit is contained in:
2026-01-15 13:35:29 +00:00
parent 585710bb38
commit 8903614e84

559
http/client.py Normal file
View File

@@ -0,0 +1,559 @@
"""
title: HTTP Client Tool
author: Jeff Smith
version: 0.4.1
license: MIT
required_open_webui_version: 0.6.0
requirements: httpx, pydantic
description: |
A curl-like HTTP client for Open WebUI.
- Supports full URLs or relative endpoints
- Auto-discovers URL and auth for self-API calls (current platform)
- UserValves for per-user external API configuration
- Supports GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS
All methods return strings for reliable LLM context injection.
"""
from typing import Callable, Any, Optional
import httpx
import json
import asyncio
import random
from pydantic import BaseModel, Field
class Tools:
"""
HTTP client tool for API testing.
Configuration priority:
1. Full URL in endpoint - Used directly (e.g., https://api.example.com/v1/resource)
2. UserValves.EXTERNAL_BASE_URL - Per-user external API config
3. Auto-discover from __request__ - Current platform (self-API)
All methods return strings for reliable LLM context injection.
"""
class Valves(BaseModel):
"""
Admin configuration.
Shared defaults for all users.
"""
AUTO_DISCOVER: bool = Field(
default=True,
description="Auto-discover URL and auth from request context for self-API calls",
)
TIMEOUT: int = Field(default=30, description="Request timeout in seconds")
RETRIES: int = Field(
default=3, description="Number of retry attempts for failed requests"
)
MAX_RESPONSE_SIZE: int = Field(
default=8192,
description="Maximum response size in characters (0 for unlimited)",
)
INCLUDE_HEADERS: bool = Field(
default=False, description="Include response headers in output"
)
class UserValves(BaseModel):
"""
Per-user configuration for external API access.
Each user can configure their own external endpoints.
"""
EXTERNAL_BASE_URL: str = Field(
default="",
description="Your external API base URL (leave empty to use current platform)",
)
EXTERNAL_API_KEY: str = Field(
default="", description="Your API key for external endpoints"
)
def __init__(self):
self.valves = self.Valves()
self.user_valves = self.UserValves()
self.citation = False
# ==================== Configuration ====================
def _get_config(self, __request__=None) -> tuple[str, dict]:
"""
Get API configuration with priority:
1. UserValves (per-user external API config)
2. Auto-discover from request (self-API)
Returns (base_url, headers) tuple.
"""
base_url = ""
headers = {}
# Priority 1: UserValves for external API (per-user config)
if self.user_valves.EXTERNAL_BASE_URL:
base_url = self.user_valves.EXTERNAL_BASE_URL
if self.user_valves.EXTERNAL_API_KEY:
headers["Authorization"] = f"Bearer {self.user_valves.EXTERNAL_API_KEY}"
return base_url.rstrip("/"), headers
# Priority 2: Auto-discover from request context (self-API)
if self.valves.AUTO_DISCOVER and __request__:
base_url = str(getattr(__request__, "base_url", "") or "")
req_headers = getattr(__request__, "headers", {})
if req_headers:
auth = dict(req_headers).get("authorization", "")
if auth:
headers["Authorization"] = auth
return base_url.rstrip("/"), headers
# ==================== Internal Helpers ====================
def _truncate(self, text: str) -> str:
"""Truncate response to configured max size."""
max_size = self.valves.MAX_RESPONSE_SIZE
if max_size and len(text) > max_size:
return (
text[:max_size]
+ f"\n\n[...truncated, {len(text) - max_size} chars omitted]"
)
return text
def _format_response(
self,
method: str,
url: str,
status_code: int,
response_body: Any = None,
headers: Optional[dict] = None,
error: Optional[str] = None,
) -> str:
"""Format response as string for LLM context injection."""
parts = [f"{method} {url}", f"Status: {status_code}"]
if self.valves.INCLUDE_HEADERS and headers:
header_str = "\n".join(f" {k}: {v}" for k, v in headers.items())
parts.append(f"Headers:\n{header_str}")
if error:
parts.append(f"Error: {error}")
elif response_body is not None:
if isinstance(response_body, (dict, list)):
body_str = json.dumps(response_body, indent=2)
else:
body_str = str(response_body)
parts.append(f"Body:\n{body_str}")
return self._truncate("\n".join(parts))
async def _make_request(
self,
method: str,
endpoint: str,
headers: Optional[dict] = None,
body: Optional[dict] = None,
__request__=None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Make HTTP request with retries and auto-discovered config.
Returns formatted string for LLM context.
"""
# Check if endpoint is already a full URL
if endpoint.startswith(("http://", "https://")):
url = endpoint
# Still get headers from open_webui.config for auth if needed
_, default_headers = self._get_config(__request__)
else:
base_url, default_headers = self._get_config(__request__)
if not base_url:
return self._format_response(
method,
endpoint,
0,
error="Could not determine API URL. Set EXTERNAL_BASE_URL in your user settings or enable AUTO_DISCOVER for self-API.",
)
# Ensure proper path joining
if endpoint and not endpoint.startswith("/"):
endpoint = "/" + endpoint
url = f"{base_url}{endpoint}"
# Merge headers
if body is not None:
default_headers["Content-Type"] = "application/json"
default_headers.update(headers or {})
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": f"{method} {endpoint}", "done": False},
}
)
last_error = None
retries = max(1, self.valves.RETRIES)
for attempt in range(1, retries + 1):
try:
async with httpx.AsyncClient(
headers=default_headers,
timeout=float(self.valves.TIMEOUT),
) as client:
response = await client.request(method, url, json=body)
# Parse response body
content_type = response.headers.get("content-type", "")
if "application/json" in content_type:
try:
response_body = response.json()
except json.JSONDecodeError:
response_body = response.text
else:
response_body = response.text
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"{method} {endpoint}",
"done": True,
},
}
)
return self._format_response(
method=method,
url=url,
status_code=response.status_code,
response_body=response_body,
headers=(
dict(response.headers)
if self.valves.INCLUDE_HEADERS
else None
),
)
except httpx.TimeoutException:
last_error = "Request timeout"
except httpx.RequestError as e:
last_error = f"Request failed: {str(e)}"
# Retry with exponential backoff
if attempt < retries:
wait = (0.5 * (2 ** (attempt - 1))) + random.uniform(0, 0.25)
await asyncio.sleep(wait)
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"{method} {endpoint} failed",
"done": True,
},
}
)
return self._format_response(
method=method,
url=url,
status_code=0,
error=last_error,
)
async def _confirm_action(
self,
action: str,
endpoint: str,
body: Optional[dict] = None,
__event_call__: Callable[[dict], Any] = None,
) -> Optional[str]:
"""Request user confirmation. Returns None if confirmed, error string if cancelled."""
if not __event_call__:
return None
payload_preview = ""
if body:
preview = json.dumps(body, indent=2)
if len(preview) > 200:
preview = preview[:200] + "..."
payload_preview = f"\n\nPayload:\n{preview}"
try:
confirmation = await __event_call__(
{
"type": "confirmation",
"data": {
"title": f"Confirm {action}",
"message": f"{action} {endpoint}{payload_preview}\n\nProceed?",
},
}
)
if confirmation is None:
return f"{action} cancelled - no response"
if isinstance(confirmation, dict) and not confirmation.get(
"confirmed", False
):
return f"{action} cancelled by user"
if confirmation is False:
return f"{action} cancelled by user"
return None
except Exception as e:
return f"Confirmation failed: {str(e)}"
# ==================== HTTP Methods ====================
async def get(
self,
endpoint: str,
headers: Optional[dict] = None,
__request__=None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Perform a GET request.
:param endpoint: API endpoint path (e.g., "/api/v1/users") or full URL
:param headers: Optional additional headers
:return: Formatted response string
"""
return await self._make_request(
"GET",
endpoint,
headers=headers,
__request__=__request__,
__event_emitter__=__event_emitter__,
)
async def post(
self,
endpoint: str,
body: dict,
headers: Optional[dict] = None,
confirm: bool = False,
__request__=None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
__event_call__: Callable[[dict], Any] = None,
) -> str:
"""
Perform a POST request with JSON body.
:param endpoint: API endpoint path or full URL
:param body: JSON body to send
:param headers: Optional additional headers
:param confirm: Require user confirmation (default False)
:return: Formatted response string
"""
if confirm:
error = await self._confirm_action("POST", endpoint, body, __event_call__)
if error:
return self._format_response("POST", endpoint, 0, error=error)
return await self._make_request(
"POST",
endpoint,
headers=headers,
body=body,
__request__=__request__,
__event_emitter__=__event_emitter__,
)
async def put(
self,
endpoint: str,
body: dict,
headers: Optional[dict] = None,
confirm: bool = True,
__request__=None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
__event_call__: Callable[[dict], Any] = None,
) -> str:
"""
Perform a PUT request to replace a resource.
:param endpoint: API endpoint path or full URL
:param body: JSON body representing the full resource
:param headers: Optional additional headers
:param confirm: Require user confirmation (default True)
:return: Formatted response string
"""
if confirm:
error = await self._confirm_action("PUT", endpoint, body, __event_call__)
if error:
return self._format_response("PUT", endpoint, 0, error=error)
return await self._make_request(
"PUT",
endpoint,
headers=headers,
body=body,
__request__=__request__,
__event_emitter__=__event_emitter__,
)
async def patch(
self,
endpoint: str,
body: dict,
headers: Optional[dict] = None,
confirm: bool = False,
__request__=None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
__event_call__: Callable[[dict], Any] = None,
) -> str:
"""
Perform a PATCH request for partial updates.
:param endpoint: API endpoint path or full URL
:param body: JSON body with fields to update
:param headers: Optional additional headers
:param confirm: Require user confirmation (default False)
:return: Formatted response string
"""
if confirm:
error = await self._confirm_action("PATCH", endpoint, body, __event_call__)
if error:
return self._format_response("PATCH", endpoint, 0, error=error)
return await self._make_request(
"PATCH",
endpoint,
headers=headers,
body=body,
__request__=__request__,
__event_emitter__=__event_emitter__,
)
async def delete(
self,
endpoint: str,
headers: Optional[dict] = None,
confirm: bool = True,
__request__=None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
__event_call__: Callable[[dict], Any] = None,
) -> str:
"""
Perform a DELETE request.
:param endpoint: API endpoint path or full URL
:param headers: Optional additional headers
:param confirm: Require user confirmation (default True)
:return: Formatted response string
"""
if confirm:
error = await self._confirm_action("DELETE", endpoint, None, __event_call__)
if error:
return self._format_response("DELETE", endpoint, 0, error=error)
return await self._make_request(
"DELETE",
endpoint,
headers=headers,
__request__=__request__,
__event_emitter__=__event_emitter__,
)
async def head(
self,
endpoint: str,
headers: Optional[dict] = None,
__request__=None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Perform a HEAD request to check resource existence.
:param endpoint: API endpoint path or full URL
:param headers: Optional additional headers
:return: Formatted response with status and headers
"""
original_setting = self.valves.INCLUDE_HEADERS
self.valves.INCLUDE_HEADERS = True
result = await self._make_request(
"HEAD",
endpoint,
headers=headers,
__request__=__request__,
__event_emitter__=__event_emitter__,
)
self.valves.INCLUDE_HEADERS = original_setting
return result
async def options(
self,
endpoint: str,
headers: Optional[dict] = None,
__request__=None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Perform an OPTIONS request to discover allowed methods.
:param endpoint: API endpoint path or full URL
:param headers: Optional additional headers
:return: Formatted response with allowed methods
"""
original_setting = self.valves.INCLUDE_HEADERS
self.valves.INCLUDE_HEADERS = True
result = await self._make_request(
"OPTIONS",
endpoint,
headers=headers,
__request__=__request__,
__event_emitter__=__event_emitter__,
)
self.valves.INCLUDE_HEADERS = original_setting
return result
async def request(
self,
method: str,
endpoint: str,
body: Optional[dict] = None,
headers: Optional[dict] = None,
confirm: bool = False,
__request__=None,
__user__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
__event_call__: Callable[[dict], Any] = None,
) -> str:
"""
Generic HTTP request for any method.
:param method: HTTP method (GET, POST, PUT, PATCH, DELETE, etc.)
:param endpoint: API endpoint path or full URL
:param body: Optional JSON body
:param headers: Optional additional headers
:param confirm: Require user confirmation for mutating operations
:return: Formatted response string
"""
method = method.upper()
if confirm and method in ("POST", "PUT", "PATCH", "DELETE"):
error = await self._confirm_action(method, endpoint, body, __event_call__)
if error:
return self._format_response(method, endpoint, 0, error=error)
return await self._make_request(
method,
endpoint,
headers=headers,
body=body,
__request__=__request__,
__event_emitter__=__event_emitter__,
)