402 lines
22 KiB
Python
402 lines
22 KiB
Python
"""
|
|
title: Venice.ai Info
|
|
author: Jeff Smith
|
|
version: 1.2.0
|
|
license: MIT
|
|
required_open_webui_version: 0.6.0
|
|
requirements: httpx, pydantic
|
|
description: |
|
|
Venice.ai reference utility - account status and model discovery.
|
|
- Check DIEM balance and rate limits
|
|
- List available models by type
|
|
- Look up model capabilities and pricing
|
|
- List image style presets
|
|
- List model traits (semantic mappings)
|
|
- List compatibility mappings (external model aliases)
|
|
|
|
This is a read-only info tool. Use venice_image or venice_chat for actions.
|
|
|
|
v1.2.0: Added VeniceInfo namespace class for helper functions to avoid
|
|
method collisions with Open WebUI framework introspection.
|
|
changelog:
|
|
1.2.0:
|
|
- Added VeniceInfo namespace class for helper functions
|
|
- Moved get_api_key to VeniceInfo namespace
|
|
- Prevents Open WebUI framework introspection method name collisions
|
|
"""
|
|
|
|
from typing import Callable, Any
|
|
from pydantic import BaseModel, Field
|
|
import httpx
|
|
|
|
|
|
class VeniceInfo:
    """
    Namespaced helpers for Venice info operations.

    Using a separate class prevents Open WebUI framework introspection
    from colliding with tool methods that have generic names like _get_api_key.
    """

    @staticmethod
    def get_api_key(valves, user_valves, __user__: dict = None) -> str:
        """
        Resolve the Venice.ai API key, preferring per-user configuration.

        Lookup order (first non-empty value wins):
          1. ``__user__["valves"]`` for the current request. Both a plain
             mapping and an object exposing a ``VENICE_API_KEY`` attribute
             are accepted.
          2. ``user_valves.VENICE_API_KEY``
          3. ``valves.VENICE_API_KEY`` (admin default)

        :param valves: Admin configuration object with a VENICE_API_KEY attribute.
        :param user_valves: Per-user configuration object with a VENICE_API_KEY attribute.
        :param __user__: Optional user context; may carry a "valves" entry.
        :return: The resolved key, or whatever falsy value the admin valve holds.
        """
        request_valves = __user__.get("valves") if __user__ else None

        if isinstance(request_valves, dict):
            request_key = request_valves.get("VENICE_API_KEY")
        else:
            # Attribute access covers settings objects; getattr on None
            # simply yields None, so a missing "valves" entry is harmless.
            request_key = getattr(request_valves, "VENICE_API_KEY", None)

        if request_key:
            return request_key
        return user_valves.VENICE_API_KEY or valves.VENICE_API_KEY
|
|
|
|
|
|
class Tools:
    """
    Venice.ai information and reference tool.

    Check account status, discover models, and look up capabilities.
    All methods are read-only and don't consume DIEM.

    Every public method returns a plain-text report whose first line names
    the operation and whose second line is "Status: <code>", where the code
    is 200 on success, the HTTP status on an HTTP error, or 0 for local
    failures (missing key, invalid argument, network/parsing error).

    NOTE: helper functions are deliberately kept out of this class (see
    VeniceInfo), so only the tool entry points are defined here.
    """

    class Valves(BaseModel):
        """Admin configuration."""

        VENICE_API_KEY: str = Field(default="", description="Venice.ai API key (admin default)")
        DIEM_WARNING_THRESHOLD: float = Field(default=1.0, description="DIEM balance below this triggers warning")
        DAILY_DIEM_ALLOCATION: float = Field(default=8.10, description="Daily DIEM allocation for usage calc")
        TIMEOUT: int = Field(default=30, description="API request timeout in seconds")

    class UserValves(BaseModel):
        """Per-user configuration."""

        VENICE_API_KEY: str = Field(default="", description="Your Venice.ai API key")

    def __init__(self):
        self.valves = self.Valves()
        self.user_valves = self.UserValves()
        self.citation = False

    async def check_balance(
        self,
        show_rate_limits: bool = False,
        __user__: dict = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Report the DIEM balance, estimated usage against the daily allocation, and API tier.

        "Used today" is derived as DAILY_DIEM_ALLOCATION minus the current
        balance (floored at zero); it is an estimate, not a value returned by
        the API.

        :param show_rate_limits: When true, append per-model RPM/TPM rate limits.
        :param __user__: User context used to resolve the API key.
        :param __event_emitter__: Optional async callback receiving status events.
        :return: Plain-text report beginning with "Check Balance" and a "Status:" line.
        """
        api_key = VeniceInfo.get_api_key(self.valves, self.user_valves, __user__)
        if not api_key:
            return "Check Balance\nStatus: 0\nError: API key not configured."
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": {"description": "Checking balance...", "done": False}})
        try:
            async with httpx.AsyncClient(timeout=float(self.valves.TIMEOUT)) as client:
                response = await client.get(
                    "https://api.venice.ai/api/v1/api_keys/rate_limits",
                    headers={"Authorization": f"Bearer {api_key}"},
                )
                response.raise_for_status()
                result = response.json()
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})

            # "or" fallbacks keep the arithmetic below working when a field is
            # present but null instead of missing.
            data = result.get("data") or {}
            balances = data.get("balances") or {}
            tier = (data.get("apiTier") or {}).get("id", "unknown")
            next_epoch = data.get("nextEpochBegins", "unknown")
            diem = float(balances.get("DIEM") or 0)
            usd = float(balances.get("USD") or 0)

            daily = self.valves.DAILY_DIEM_ALLOCATION
            used = max(0, daily - diem)
            usage_pct = (used / daily * 100) if daily > 0 else 0

            threshold = self.valves.DIEM_WARNING_THRESHOLD
            if diem < threshold:
                status = f"⚠️ LOW (below {threshold} DIEM)"
            elif diem < daily * 0.25:
                status = "⚡ Getting low"
            else:
                status = "✓ OK"

            lines = [
                "Check Balance",
                "Status: 200",
                "",
                f"Tier: {tier}",
                f"Balance: {diem:.2f} DIEM (≈ ${diem:.2f} USD) {status}",
                f"Used today: {used:.2f} DIEM of {daily:.2f} DIEM ({usage_pct:.0f}%)",
                f"Resets: {next_epoch}",
                "",
                "Note: 1 DIEM = $1 USD.",
            ]
            # A negative USD balance is reported as overage.
            if usd < 0:
                lines.append(f"USD Overage: ${usd:.4f}")

            if show_rate_limits:
                with_limits = []
                for limit in data.get("rateLimits", []):
                    model_id = limit.get("apiModelId", "")
                    limits = limit.get("rateLimits", [])
                    if not limits:
                        continue
                    parts = []
                    for rl in limits:
                        t, a = rl.get("type", ""), rl.get("amount", 0)
                        if t == "RPM":
                            parts.append(f"{a} RPM")
                        elif t == "TPM":
                            parts.append(f"{a//1000}K TPM")
                    if parts:
                        with_limits.append(f" {model_id}: {', '.join(parts)}")
                if with_limits:
                    lines.append("")
                    lines.append(f"Rate Limits ({len(with_limits)} models):")
                    lines.extend(sorted(with_limits))
            return "\n".join(lines)
        except httpx.HTTPStatusError as e:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Check Balance\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}"
        except Exception as e:
            # Tool boundary: report the failure as text instead of raising.
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Check Balance\nStatus: 0\nError: {type(e).__name__}: {e}"

    async def list_models(
        self,
        model_type: str = "image",
        __user__: dict = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        List the Venice.ai models of one type with pricing and capability tags.

        :param model_type: One of "image", "text", "video", "embedding", "tts".
        :param __user__: User context used to resolve the API key.
        :param __event_emitter__: Optional async callback receiving status events.
        :return: Plain-text report beginning with "List Models" and a "Status:" line.
        """
        valid_types = ["image", "text", "video", "embedding", "tts"]
        if model_type not in valid_types:
            return f"List Models\nStatus: 0\nError: Invalid type '{model_type}'. Valid: {', '.join(valid_types)}"
        api_key = VeniceInfo.get_api_key(self.valves, self.user_valves, __user__)
        if not api_key:
            return "List Models\nStatus: 0\nError: API key not configured."
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": {"description": f"Fetching {model_type} models...", "done": False}})
        try:
            async with httpx.AsyncClient(timeout=float(self.valves.TIMEOUT)) as client:
                response = await client.get(
                    "https://api.venice.ai/api/v1/models",
                    params={"type": model_type},
                    headers={"Authorization": f"Bearer {api_key}"},
                )
                response.raise_for_status()
                result = response.json()
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})

            models = result.get("data", [])
            if not models:
                return f"List Models ({model_type})\nStatus: 200\nResult: No models available"

            lines = [f"List Models ({model_type})", "Status: 200", "", "Prices in DIEM (1 DIEM = $1 USD)", ""]
            for model in models:
                mid = model.get("id", "unknown")
                spec = model.get("model_spec", {})
                name = spec.get("name", mid)
                offline = spec.get("offline", False)
                beta = spec.get("betaModel", False)
                pricing = spec.get("pricing", {})

                # Price column: the format depends on the model type.
                if model_type == "image":
                    if "generation" in pricing:
                        p = pricing.get("generation", {}).get("usd", 0)
                        price = f"{p:.3f} DIEM/img"
                    elif "resolutions" in pricing:
                        res_pricing = pricing.get("resolutions", {})
                        res_parts = [f"{res}:{p.get('usd', 0):.2f}" for res, p in res_pricing.items()]
                        price = f"{' | '.join(res_parts)} DIEM"
                    else:
                        price = ""
                elif model_type == "text":
                    i = pricing.get("input", {}).get("usd", 0)
                    o = pricing.get("output", {}).get("usd", 0)
                    price = f"{i:.4f}/{o:.4f} DIEM/1M"
                elif model_type == "video":
                    p = pricing.get("generation", {}).get("usd", 0)
                    price = f"{p:.2f} DIEM/vid"
                else:
                    price = ""

                caps = spec.get("capabilities", {})
                cap_list = []
                if caps.get("supportsVision"):
                    cap_list.append("vision")
                if caps.get("supportsFunctionCalling"):
                    cap_list.append("tools")
                if caps.get("supportsReasoning"):
                    cap_list.append("reasoning")
                if caps.get("supportsWebSearch"):
                    cap_list.append("web")
                # Image models are also checked for the flag directly on the
                # spec; guard so "web" is never listed twice.
                if model_type == "image" and spec.get("supportsWebSearch") and "web" not in cap_list:
                    cap_list.append("web")

                parts = [f" {mid}"]
                if name != mid:
                    parts.append(f"({name})")
                if price:
                    parts.append(price)
                if cap_list:
                    parts.append(f"[{', '.join(cap_list)}]")
                if beta:
                    parts.append("BETA")
                if offline:
                    parts.append("OFFLINE")
                lines.append(" ".join(parts))

            lines.append("")
            lines.append(f"Total: {len(models)} {model_type} models")
            return "\n".join(lines)
        except httpx.HTTPStatusError as e:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"List Models\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}"
        except Exception as e:
            # Tool boundary: report the failure as text instead of raising.
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"List Models\nStatus: 0\nError: {type(e).__name__}: {e}"

    async def list_styles(
        self,
        __user__: dict = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        List the image style presets, sorted alphabetically.

        :param __user__: User context used to resolve the API key.
        :param __event_emitter__: Optional async callback receiving status events.
        :return: Plain-text report beginning with "List Styles" and a "Status:" line.
        """
        api_key = VeniceInfo.get_api_key(self.valves, self.user_valves, __user__)
        if not api_key:
            return "List Styles\nStatus: 0\nError: API key not configured."
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": {"description": "Fetching styles...", "done": False}})
        try:
            async with httpx.AsyncClient(timeout=float(self.valves.TIMEOUT)) as client:
                response = await client.get(
                    "https://api.venice.ai/api/v1/image/styles",
                    headers={"Authorization": f"Bearer {api_key}"},
                )
                response.raise_for_status()
                result = response.json()
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})

            styles = result.get("data", [])
            lines = ["List Styles", "Status: 200", "", "Image Style Presets:"]
            lines.extend(f" - {style}" for style in sorted(styles))
            lines.append("")
            lines.append(f"Total: {len(styles)} styles")
            lines.append("Usage: Pass as 'style_preset' to image generation")
            return "\n".join(lines)
        except httpx.HTTPStatusError as e:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"List Styles\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}"
        except Exception as e:
            # Tool boundary: report the failure as text instead of raising.
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"List Styles\nStatus: 0\nError: {type(e).__name__}: {e}"

    async def list_traits(
        self,
        __user__: dict = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        List model traits (semantic names) and the model each one maps to.

        :param __user__: User context used to resolve the API key.
        :param __event_emitter__: Optional async callback receiving status events.
        :return: Plain-text report beginning with "List Traits" and a "Status:" line.
        """
        api_key = VeniceInfo.get_api_key(self.valves, self.user_valves, __user__)
        if not api_key:
            return "List Traits\nStatus: 0\nError: API key not configured."
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": {"description": "Fetching traits...", "done": False}})
        try:
            async with httpx.AsyncClient(timeout=float(self.valves.TIMEOUT)) as client:
                response = await client.get(
                    "https://api.venice.ai/api/v1/models/traits",
                    headers={"Authorization": f"Bearer {api_key}"},
                )
                response.raise_for_status()
                result = response.json()
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})

            traits = result.get("data", {})
            lines = ["List Traits", "Status: 200", "", "Model Traits (semantic mappings):", ""]
            for trait_name in sorted(traits.keys()):
                model_id = traits[trait_name]
                lines.append(f" {trait_name}: {model_id}")
            lines.append("")
            lines.append(f"Total: {len(traits)} traits")
            lines.append("")
            lines.append("Usage: Request models by trait for automatic selection.")
            return "\n".join(lines)
        except httpx.HTTPStatusError as e:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"List Traits\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}"
        except Exception as e:
            # Tool boundary: report the failure as text instead of raising.
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"List Traits\nStatus: 0\nError: {type(e).__name__}: {e}"

    async def list_compatibility(
        self,
        __user__: dict = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        List external model names grouped by the Venice model they map to.

        :param __user__: User context used to resolve the API key.
        :param __event_emitter__: Optional async callback receiving status events.
        :return: Plain-text report beginning with "List Compatibility" and a "Status:" line.
        """
        api_key = VeniceInfo.get_api_key(self.valves, self.user_valves, __user__)
        if not api_key:
            return "List Compatibility\nStatus: 0\nError: API key not configured."
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": {"description": "Fetching compatibility mappings...", "done": False}})
        try:
            async with httpx.AsyncClient(timeout=float(self.valves.TIMEOUT)) as client:
                response = await client.get(
                    "https://api.venice.ai/api/v1/models/compatibility_mapping",
                    headers={"Authorization": f"Bearer {api_key}"},
                )
                response.raise_for_status()
                result = response.json()
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})

            mappings = result.get("data", {})
            lines = ["List Compatibility Mappings", "Status: 200", "", "External model names mapped to Venice equivalents:", ""]

            # Invert external -> venice into venice -> [external, ...].
            by_target: dict[str, list[str]] = {}
            for external_name, venice_model in mappings.items():
                by_target.setdefault(venice_model, []).append(external_name)

            for venice_model in sorted(by_target.keys()):
                lines.append(f" {venice_model}:")
                for ext_name in sorted(by_target[venice_model]):
                    lines.append(f" <- {ext_name}")
            lines.append("")
            lines.append(f"Total: {len(mappings)} mappings to {len(by_target)} Venice models")
            lines.append("")
            lines.append("Usage: Use external names (gpt-4o, etc.) for compatibility.")
            return "\n".join(lines)
        except httpx.HTTPStatusError as e:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"List Compatibility\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}"
        except Exception as e:
            # Tool boundary: report the failure as text instead of raising.
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"List Compatibility\nStatus: 0\nError: {type(e).__name__}: {e}"

    async def get_model_info(
        self,
        model_id: str,
        __user__: dict = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Look up one model by id and report its details, constraints, pricing and capabilities.

        The model list is fetched per type (text, image, video, embedding,
        tts) over a single HTTP client until the id is found. A failed lookup
        for one type does not abort the search, but failures are remembered:
        if every lookup failed the underlying error is returned instead of a
        misleading "not found", and a partial failure is noted on the 404.

        :param model_id: Exact Venice model id to look up.
        :param __user__: User context used to resolve the API key.
        :param __event_emitter__: Optional async callback receiving status events.
        :return: Plain-text report beginning with "Get Model Info" and a "Status:" line.
        """
        api_key = VeniceInfo.get_api_key(self.valves, self.user_valves, __user__)
        if not api_key:
            return "Get Model Info\nStatus: 0\nError: API key not configured."
        if not model_id:
            return "Get Model Info\nStatus: 0\nError: model_id required"
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": {"description": f"Looking up {model_id}...", "done": False}})

        model_types = ["text", "image", "video", "embedding", "tts"]
        try:
            model = None
            model_type = ""
            failed_types = []
            last_error = None

            async with httpx.AsyncClient(timeout=float(self.valves.TIMEOUT)) as client:
                for candidate_type in model_types:
                    try:
                        response = await client.get(
                            "https://api.venice.ai/api/v1/models",
                            params={"type": candidate_type},
                            headers={"Authorization": f"Bearer {api_key}"},
                        )
                        response.raise_for_status()
                        catalog = response.json().get("data", [])
                    except Exception as e:
                        # Best effort per type: remember the failure, keep searching.
                        failed_types.append(candidate_type)
                        last_error = e
                        continue
                    model = next((m for m in catalog if m.get("id") == model_id), None)
                    if model is not None:
                        model_type = candidate_type
                        break

            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})

            if model is None:
                if len(failed_types) == len(model_types):
                    # Nothing could be checked, so "not found" would be wrong.
                    if isinstance(last_error, httpx.HTTPStatusError):
                        return (
                            f"Get Model Info ({model_id})\nStatus: {last_error.response.status_code}"
                            f"\nError: {last_error.response.text[:200]}"
                        )
                    return f"Get Model Info ({model_id})\nStatus: 0\nError: {type(last_error).__name__}: {last_error}"
                not_found = f"Get Model Info ({model_id})\nStatus: 404\nError: Model not found"
                if failed_types:
                    not_found += f"\nNote: could not query model types: {', '.join(failed_types)}"
                return not_found

            spec = model.get("model_spec", {})
            lines = [
                f"Get Model Info ({model_id})",
                "Status: 200",
                "",
                f"Name: {spec.get('name', model_id)}",
                f"Type: {model_type}",
                f"Privacy: {spec.get('privacy', 'unknown')}",
                f"Offline: {spec.get('offline', False)}",
                f"Beta: {spec.get('betaModel', False)}",
            ]
            desc = spec.get("description")
            if desc:
                lines.append(f"Description: {desc}")
            ctx = spec.get("availableContextTokens")
            if ctx:
                lines.append(f"Context: {ctx:,} tokens")
            traits = spec.get("traits", [])
            if traits:
                lines.append(f"Traits: {', '.join(traits)}")

            constraints = spec.get("constraints", {})
            if constraints:
                lines.append("")
                lines.append("Constraints:")
                if "promptCharacterLimit" in constraints:
                    lines.append(f" Prompt limit: {constraints['promptCharacterLimit']:,} chars")
                if "steps" in constraints:
                    steps = constraints["steps"]
                    lines.append(f" Steps: default={steps.get('default')}, max={steps.get('max')}")
                if "resolutions" in constraints:
                    lines.append(f" Resolutions: {', '.join(constraints['resolutions'])}")

            pricing = spec.get("pricing", {})
            if pricing:
                lines.append("")
                lines.append("Pricing (1 DIEM = $1 USD):")
                if "input" in pricing:
                    p = pricing["input"].get("usd", 0)
                    lines.append(f" Input: {p:.4f} DIEM/1M tokens")
                if "output" in pricing:
                    p = pricing["output"].get("usd", 0)
                    lines.append(f" Output: {p:.4f} DIEM/1M tokens")
                if "generation" in pricing:
                    p = pricing["generation"].get("usd", 0)
                    lines.append(f" Generation: {p:.4f} DIEM")
                if "resolutions" in pricing:
                    lines.append(" Resolution-based:")
                    for res, price in pricing["resolutions"].items():
                        lines.append(f" {res}: {price.get('usd', 0):.2f} DIEM")
                if "upscale" in pricing:
                    lines.append(" Upscale:")
                    for scale, price in pricing["upscale"].items():
                        lines.append(f" {scale}: {price.get('usd', 0):.2f} DIEM")

            caps = spec.get("capabilities", {})
            active_caps = [k for k, v in caps.items() if v]
            # Image models are also checked for the flag directly on the spec;
            # guard so it is never listed twice.
            if model_type == "image" and spec.get("supportsWebSearch") and "supportsWebSearch" not in active_caps:
                active_caps.append("supportsWebSearch")
            if active_caps:
                lines.append("")
                lines.append("Capabilities:")
                for cap in active_caps:
                    lines.append(f" - {cap}")

            source = spec.get("modelSource")
            if source:
                lines.append("")
                lines.append(f"Source: {source}")
            return "\n".join(lines)
        except Exception as e:
            # Tool boundary: unexpected payload shapes are reported, not raised.
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Get Model Info ({model_id})\nStatus: 0\nError: {type(e).__name__}: {e}"
|