""" title: Venice.ai Chat author: Jeff Smith version: 1.4.0 license: MIT required_open_webui_version: 0.6.0 requirements: httpx, pydantic description: | Chat completions using Venice.ai API. Enables LLM-to-LLM conversations, web search queries, and reasoning model access. Each user can configure their own API key. Model selection: - Empty/omit: Auto-selects via Venice traits API - "self": Uses the calling model's base (if it's a Venice model) - Explicit ID: Validates model exists before calling Use venice_info/list_models("text") to discover available models. changelog: 1.4.0: - Added VeniceChat namespace class for helper functions to avoid method collisions - Moved _get_api_key, _truncate, _format_error to VeniceChat namespace - Prevents Open WebUI framework introspection method name collisions 1.3.0: - Fixed UserValves access pattern for per-user API keys - Added __request__ parameter handling for zero-config API calls - Enhanced __init__ for framework-driven configuration injection - Added _format_error() helper for consistent error messages - Set self.citation = True for tool usage visibility - Improved response formatting consistency """ from typing import Optional, Callable, Any, Dict from pydantic import BaseModel, Field import httpx import json import time class VeniceChat: """ Namespaced helpers for Venice chat operations. Using a separate class prevents Open WebUI framework introspection from colliding with tool methods that have generic names like _get_api_key. """ @staticmethod def get_api_key(valves, user_valves, __user__: dict = None) -> str: """Get API key with UserValves priority.""" # Check __user__ parameter first (for direct method calls) if __user__ and "valves" in __user__: user_valves_dict = __user__.get("valves") if isinstance(user_valves_dict, dict) and user_valves_dict.get("VENICE_API_KEY"): return user_valves_dict["VENICE_API_KEY"] # Fall back to UserValves instance return user_valves.VENICE_API_KEY or valves.VENICE_API_KEY @staticmethod def truncate(text: str, max_size: int) -> str: """Truncate response to max size.""" if max_size and len(text) > max_size: return text[:max_size] + f"\n\n[...{len(text) - max_size} chars omitted]" return text @staticmethod def format_error(e, context: str = "") -> str: """Format HTTP error with detailed context for LLM understanding.""" try: if hasattr(e, "response") and e.response is not None: error_msg = e.response.text[:200] try: error_json = e.response.json() error_msg = error_json.get("message", error_msg) except Exception: pass else: error_msg = str(e)[:200] except Exception: error_msg = str(e)[:200] context_str = f" ({context})" if context else "" return f"Error{context_str}: {error_msg}" class Tools: """ Venice.ai chat completions tool. Query Venice.ai text models for responses, including reasoning models and web search enabled queries. 
""" class Valves(BaseModel): """Admin configuration.""" VENICE_API_KEY: str = Field( default="", description="Venice.ai API key (admin default)" ) DEFAULT_MODEL: str = Field( default="", description="Default chat model (empty = auto-select via traits)", ) DEFAULT_REASONING_MODEL: str = Field( default="", description="Default reasoning model (empty = auto-select via traits)", ) ENABLE_WEB_SEARCH: bool = Field( default=False, description="Enable web search by default" ) INCLUDE_VENICE_SYSTEM_PROMPT: bool = Field( default=False, description="Include Venice system prompt" ) CHAT_TIMEOUT: int = Field( default=120, description="Timeout for chat requests in seconds" ) REASONING_TIMEOUT: int = Field( default=300, description="Timeout for reasoning requests (longer for complex thinking)", ) MAX_RESPONSE_SIZE: int = Field( default=16384, description="Maximum response size in characters" ) MODEL_CACHE_TTL: int = Field( default=300, description="How long to cache model list (seconds)" ) class UserValves(BaseModel): """Per-user configuration.""" VENICE_API_KEY: str = Field( default="", description="Your Venice.ai API key (overrides admin default)" ) def __init__(self): """Initialize with optional valve configuration from framework""" # Handle valves configuration from framework self.valves = self.Valves() # Enable tool usage visibility for debugging self.citation = True # Handle user valves configuration self.user_valves = self.UserValves() # Simple in-memory cache self._cache: dict = {} self._cache_times: dict = {} def _is_cache_valid(self, key: str) -> bool: """Check if cached data is still valid.""" if key not in self._cache_times: return False return (time.time() - self._cache_times[key]) < self.valves.MODEL_CACHE_TTL async def _get_traits(self, __user__: dict = None) -> dict: """Fetch model traits from Venice (cached).""" cache_key = "traits" if self._is_cache_valid(cache_key): return self._cache.get(cache_key, {}) api_key = VeniceChat.get_api_key(self.valves, self.user_valves, __user__) if not api_key: return {} try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get( "https://api.venice.ai/api/v1/models/traits", headers={"Authorization": f"Bearer {api_key}"}, ) if response.status_code == 200: traits = response.json().get("data", {}) self._cache[cache_key] = traits self._cache_times[cache_key] = time.time() return traits except Exception: pass return {} async def _get_available_models( self, model_type: str = "text", __user__: dict = None ) -> list[dict]: """Fetch available models (cached).""" cache_key = f"models_{model_type}" if self._is_cache_valid(cache_key): return self._cache.get(cache_key, []) api_key = VeniceChat.get_api_key(self.valves, self.user_valves, __user__) if not api_key: return [] try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get( f"https://api.venice.ai/api/v1/models?type={model_type}", headers={"Authorization": f"Bearer {api_key}"}, ) if response.status_code == 200: models = response.json().get("data", []) self._cache[cache_key] = models self._cache_times[cache_key] = time.time() return models except Exception: pass return [] async def _resolve_model( self, model: Optional[str], model_type: str = "text", require_reasoning: bool = False, __model__: dict = None, __user__: dict = None, ) -> tuple[str, Optional[str]]: """ Resolve model specification to actual model ID with validation. 
        """
        Resolve model specification to actual model ID with validation.

        Handles:
        - Empty/None: Auto-select via traits
        - "self": Use calling model's base_model_id
        - Explicit ID: Validate exists and is online

        Returns (model_id, error_message).
        """
        original_input = model

        # Handle "self" - use the calling model's base
        if model and model.lower() == "self":
            if __model__:
                base_model = __model__.get("info", {}).get("base_model_id", "")
                if base_model:
                    model = base_model
                else:
                    model = None  # Fall through to auto-select

        # Handle explicit Valve defaults
        if not model:
            if require_reasoning and self.valves.DEFAULT_REASONING_MODEL:
                model = self.valves.DEFAULT_REASONING_MODEL
            elif not require_reasoning and self.valves.DEFAULT_MODEL:
                model = self.valves.DEFAULT_MODEL

        # If still no model, try traits API
        if not model:
            traits = await self._get_traits(__user__)
            if require_reasoning:
                # Try reasoning-specific traits
                for trait_name in ["default_reasoning", "reasoning", "thinking"]:
                    if trait_name in traits:
                        model = traits[trait_name]
                        break
            if not model:
                # General default
                for trait_name in ["default", "default_text", "fastest"]:
                    if trait_name in traits:
                        model = traits[trait_name]
                        break

        # If still no model, pick first available with required capability
        if not model:
            models = await self._get_available_models(model_type, __user__)
            for m in models:
                spec = m.get("model_spec", {})
                if spec.get("offline"):
                    continue
                if require_reasoning:
                    caps = spec.get("capabilities", {})
                    if not caps.get("supportsReasoning"):
                        continue
                model = m.get("id")
                break

        if not model:
            if require_reasoning:
                return (
                    "",
                    "No reasoning model available. Use venice_info/list_models('text') to find models with [reasoning] capability.",
                )
            return (
                "",
                "No model specified and could not auto-select. Use venice_info/list_models('text') to discover available models.",
            )

        # Validate model exists and is online
        models = await self._get_available_models(model_type, __user__)
        model_map = {m.get("id"): m for m in models}

        if model not in model_map:
            # Not found - suggest similar models
            suggestions = [
                mid for mid in model_map.keys() if model.lower() in mid.lower()
            ][:3]
            suggestion_text = (
                f" Similar: {', '.join(suggestions)}" if suggestions else ""
            )
            return (
                "",
                f"Model '{model}' not found.{suggestion_text} Use venice_info/list_models('text') for available models.",
            )

        # Check if offline
        model_data = model_map[model]
        if model_data.get("model_spec", {}).get("offline"):
            # Find alternative
            alternatives = []
            for m in models:
                spec = m.get("model_spec", {})
                if spec.get("offline"):
                    continue
                if require_reasoning and not spec.get("capabilities", {}).get(
                    "supportsReasoning"
                ):
                    continue
                alternatives.append(m.get("id"))
                if len(alternatives) >= 3:
                    break
            alt_text = f" Try: {', '.join(alternatives)}" if alternatives else ""
            return "", f"Model '{model}' is currently offline.{alt_text}"

        # Check reasoning capability if required
        if require_reasoning:
            caps = model_data.get("model_spec", {}).get("capabilities", {})
            if not caps.get("supportsReasoning"):
                # Find models with reasoning
                reasoning_models = []
                for m in models:
                    spec = m.get("model_spec", {})
                    if spec.get("offline"):
                        continue
                    if spec.get("capabilities", {}).get("supportsReasoning"):
                        reasoning_models.append(m.get("id"))
                    if len(reasoning_models) >= 3:
                        break
                alt_text = (
                    f" Models with reasoning: {', '.join(reasoning_models)}"
                    if reasoning_models
                    else ""
                )
                return "", f"Model '{model}' does not support reasoning.{alt_text}"

        return model, None
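    # _resolve_model outcomes, sketched with hypothetical inputs (the model IDs
    # below are assumptions; actual results depend on the Venice account and on
    # which models are currently online):
    #
    #   await self._resolve_model(None)
    #       -> picks a default via valves, the traits API, or the first online model
    #   await self._resolve_model("self", __model__={"info": {"base_model_id": "llama-3.3-70b"}})
    #       -> ("llama-3.3-70b", None) if that model exists and is online
    #   await self._resolve_model("no-such-model")
    #       -> ("", "Model 'no-such-model' not found. ...")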
    # ==================== Chat Methods ====================

    async def chat(
        self,
        message: str,
        model: Optional[str] = None,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 2048,
        web_search: Optional[bool] = None,
        __user__: dict = None,
        __model__: dict = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Send a message to a Venice.ai chat model.

        :param message: The message to send
        :param model: Model ID, "self" for calling model, or empty for auto-select. Use venice_info/list_models("text") to discover.
        :param system_prompt: Optional system prompt for model behavior
        :param temperature: Sampling temperature 0-2 (default 0.7)
        :param max_tokens: Maximum response tokens (default 2048)
        :param web_search: Enable web search for current information
        :return: Model response
        """
        api_key = VeniceChat.get_api_key(self.valves, self.user_valves, __user__)
        if not api_key:
            return "Error: Venice API key not configured. Set VENICE_API_KEY in UserValves or ask admin."

        if not message or not message.strip():
            return "Error: Message is required"

        # Resolve and validate model
        resolved_model, error = await self._resolve_model(
            model, "text", False, __model__, __user__
        )
        if error:
            return f"Error: {error}"

        enable_web_search = (
            web_search if web_search is not None else self.valves.ENABLE_WEB_SEARCH
        )

        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": f"Querying {resolved_model}...",
                        "done": False,
                    },
                }
            )

        # Build messages array
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": message})

        payload = {
            "model": resolved_model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
            "stream": False,
            "venice_parameters": {
                "enable_web_search": "on" if enable_web_search else "off",
                "include_venice_system_prompt": self.valves.INCLUDE_VENICE_SYSTEM_PROMPT,
            },
        }

        try:
            async with httpx.AsyncClient(
                timeout=float(self.valves.CHAT_TIMEOUT)
            ) as client:
                response = await client.post(
                    "https://api.venice.ai/api/v1/chat/completions",
                    headers={
                        "Authorization": f"Bearer {api_key}",
                        "Content-Type": "application/json",
                    },
                    json=payload,
                )
                response.raise_for_status()
                result = response.json()

                if __event_emitter__:
                    await __event_emitter__({"type": "status", "data": {"done": True}})

                choices = result.get("choices", [])
                if not choices:
                    return f"Error: No response from model {resolved_model}"

                assistant_message = choices[0].get("message", {})
                content = assistant_message.get("content", "")
                reasoning = assistant_message.get("reasoning_content")

                # Build response
                lines = [f"**Venice Chat** ({resolved_model})", ""]

                if reasoning:
                    lines.append(f"**Reasoning:**\n{reasoning}")
                    lines.append("")

                lines.append(f"**Response:**\n{content}")

                # Include web citations if present
                venice_params = result.get("venice_parameters", {})
                citations = venice_params.get("web_search_citations", [])
                if citations:
                    lines.append("")
                    lines.append("**Sources:**")
                    for cite in citations[:5]:
                        title = cite.get("title", "Link")
                        url = cite.get("url", "")
                        lines.append(f"- {title}: {url}")

                # Usage stats
                usage = result.get("usage", {})
                if usage:
                    lines.append("")
                    lines.append(
                        f"_Tokens: {usage.get('prompt_tokens', 0)} in / {usage.get('completion_tokens', 0)} out_"
                    )

                return VeniceChat.truncate(
                    "\n".join(lines), self.valves.MAX_RESPONSE_SIZE
                )

        except httpx.HTTPStatusError as e:
            error_msg = VeniceChat.format_error(e, f"chat request to {resolved_model}")
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Error: {error_msg}"
        except httpx.TimeoutException:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Error: Request timed out for {resolved_model}"
        except Exception as e:
            error_msg = VeniceChat.format_error(e, "chat request")
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Error: {error_msg}"
"status", "data": {"done": True}}) return f"Error: Request timed out for {resolved_model}" except Exception as e: error_msg = VeniceChat.format_error(e, "chat request") if __event_emitter__: await __event_emitter__({"type": "status", "data": {"done": True}}) return f"Error: {error_msg}" async def chat_conversation( self, messages_json: str, model: Optional[str] = None, system_prompt: Optional[str] = None, temperature: float = 0.7, max_tokens: int = 2048, __user__: dict = None, __model__: dict = None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ Send a multi-turn conversation to Venice.ai. :param messages_json: JSON array: [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}, ...] :param model: Model ID, "self" for calling model, or empty for auto-select. Use venice_info/list_models("text") to discover. :param system_prompt: Optional system prompt :param temperature: Sampling temperature 0-2 (default 0.7) :param max_tokens: Maximum response tokens (default 2048) :return: Model response """ api_key = VeniceChat.get_api_key(self.valves, self.user_valves, __user__) if not api_key: return "Error: Venice API key not configured." if not messages_json: return "Error: messages_json is required" try: conversation = json.loads(messages_json) if not isinstance(conversation, list): return "Error: messages_json must be a JSON array" except json.JSONDecodeError as e: return f"Error: Invalid JSON - {e}" # Resolve and validate model resolved_model, error = await self._resolve_model( model, "text", False, __model__, __user__ ) if error: return f"Error: {error}" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Querying {resolved_model}...", "done": False, }, } ) # Build messages array messages = [] if system_prompt: messages.append({"role": "system", "content": system_prompt}) messages.extend(conversation) payload = { "model": resolved_model, "messages": messages, "temperature": temperature, "max_tokens": max_tokens, "stream": False, "venice_parameters": { "enable_web_search": "off", "include_venice_system_prompt": self.valves.INCLUDE_VENICE_SYSTEM_PROMPT, }, } try: async with httpx.AsyncClient( timeout=float(self.valves.CHAT_TIMEOUT) ) as client: response = await client.post( "https://api.venice.ai/api/v1/chat/completions", headers={ "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", }, json=payload, ) response.raise_for_status() result = response.json() if __event_emitter__: await __event_emitter__({"type": "status", "data": {"done": True}}) choices = result.get("choices", []) if not choices: return f"Error: No response from model {resolved_model}" assistant_message = choices[0].get("message", {}) content = assistant_message.get("content", "") reasoning = assistant_message.get("reasoning_content") lines = [f"**Venice Chat Conversation** ({resolved_model})", ""] if reasoning: lines.append(f"**Reasoning:**\n{reasoning}") lines.append("") lines.append(f"**Response:**\n{content}") return VeniceChat.truncate("\n".join(lines), self.valves.MAX_RESPONSE_SIZE) except httpx.HTTPStatusError as e: error_msg = VeniceChat.format_error(e, f"conversation with {resolved_model}") if __event_emitter__: await __event_emitter__({"type": "status", "data": {"done": True}}) return f"Error: {error_msg}" except Exception as e: error_msg = VeniceChat.format_error(e, "conversation request") if __event_emitter__: await __event_emitter__({"type": "status", "data": {"done": True}}) return f"Error: {error_msg}" async def 
    async def ask_reasoning_model(
        self,
        question: str,
        reasoning_effort: str = "medium",
        model: Optional[str] = None,
        __user__: dict = None,
        __model__: dict = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Ask a reasoning/thinking model for complex problem solving.

        :param question: The question or problem to solve
        :param reasoning_effort: Effort level: low, medium, high (default: medium)
        :param model: Model with reasoning capability, "self", or empty for auto-select. Use venice_info/list_models("text") to find models with [reasoning].
        :return: Response with reasoning process
        """
        api_key = VeniceChat.get_api_key(self.valves, self.user_valves, __user__)
        if not api_key:
            return "Error: Venice API key not configured."

        if not question or not question.strip():
            return "Error: Question is required"

        if reasoning_effort not in ["low", "medium", "high"]:
            return "Error: reasoning_effort must be low, medium, or high"

        # Resolve and validate model (require reasoning capability)
        resolved_model, error = await self._resolve_model(
            model,
            "text",
            require_reasoning=True,
            __model__=__model__,
            __user__=__user__,
        )
        if error:
            return f"Error: {error}"

        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": f"Thinking with {resolved_model} ({reasoning_effort})...",
                        "done": False,
                    },
                }
            )

        payload = {
            "model": resolved_model,
            "messages": [{"role": "user", "content": question}],
            "reasoning_effort": reasoning_effort,
            "stream": False,
            "venice_parameters": {
                "enable_web_search": "off",
                "include_venice_system_prompt": False,
            },
        }

        try:
            async with httpx.AsyncClient(
                timeout=float(self.valves.REASONING_TIMEOUT)
            ) as client:
                response = await client.post(
                    "https://api.venice.ai/api/v1/chat/completions",
                    headers={
                        "Authorization": f"Bearer {api_key}",
                        "Content-Type": "application/json",
                    },
                    json=payload,
                )
                response.raise_for_status()
                result = response.json()

                if __event_emitter__:
                    await __event_emitter__({"type": "status", "data": {"done": True}})

                choices = result.get("choices", [])
                if not choices:
                    return f"Error: No response from model {resolved_model}"

                assistant_message = choices[0].get("message", {})
                content = assistant_message.get("content", "")
                reasoning = assistant_message.get("reasoning_content", "")

                lines = [
                    f"**Venice Reasoning** ({resolved_model})",
                    f"**Effort:** {reasoning_effort}",
                    "",
                ]

                if reasoning:
                    lines.append(f"**Reasoning Process:**\n{reasoning}")
                    lines.append("")

                lines.append(f"**Answer:**\n{content}")

                # Usage stats
                usage = result.get("usage", {})
                if usage:
                    lines.append("")
                    total = usage.get("total_tokens", 0)
                    reasoning_tokens = usage.get("reasoning_tokens", 0)
                    lines.append(
                        f"_Tokens: {total:,} total ({reasoning_tokens:,} reasoning)_"
                    )

                return VeniceChat.truncate(
                    "\n".join(lines), self.valves.MAX_RESPONSE_SIZE
                )

        except httpx.HTTPStatusError as e:
            error_msg = VeniceChat.format_error(e, f"reasoning with {resolved_model}")
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Error: {error_msg}"
        except httpx.TimeoutException:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Error: Request timed out for {resolved_model} (reasoning can take a while)"
        except Exception as e:
            error_msg = VeniceChat.format_error(e, "reasoning request")
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Error: {error_msg}"
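    # Example call (illustrative): ask_reasoning_model("Plan a 3-step migration from
    # REST to gRPC", reasoning_effort="high") auto-selects a reasoning-capable model
    # when `model` is omitted and uses the longer REASONING_TIMEOUT valve.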
    async def web_search_query(
        self,
        query: str,
        model: Optional[str] = None,
        __user__: dict = None,
        __model__: dict = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Search the web and get an AI-synthesized response.

        :param query: Search query or question about current events
        :param model: Model to use, "self", or empty for auto-select
        :return: Response with web sources
        """
        api_key = VeniceChat.get_api_key(self.valves, self.user_valves, __user__)
        if not api_key:
            return "Error: Venice API key not configured."

        if not query or not query.strip():
            return "Error: Query is required"

        # Resolve model - prefer models with web search capability
        resolved_model, error = await self._resolve_model(
            model, "text", False, __model__, __user__
        )
        if error:
            return f"Error: {error}"

        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": f"Searching with {resolved_model}...",
                        "done": False,
                    },
                }
            )

        payload = {
            "model": resolved_model,
            "messages": [{"role": "user", "content": query}],
            "temperature": 0.3,  # Lower for factual responses
            "max_tokens": 2048,
            "stream": False,
            "venice_parameters": {
                "enable_web_search": "on",
                "include_venice_system_prompt": False,
            },
        }

        try:
            async with httpx.AsyncClient(
                timeout=float(self.valves.CHAT_TIMEOUT)
            ) as client:
                response = await client.post(
                    "https://api.venice.ai/api/v1/chat/completions",
                    headers={
                        "Authorization": f"Bearer {api_key}",
                        "Content-Type": "application/json",
                    },
                    json=payload,
                )
                response.raise_for_status()
                result = response.json()

                if __event_emitter__:
                    await __event_emitter__({"type": "status", "data": {"done": True}})

                choices = result.get("choices", [])
                if not choices:
                    return f"Error: No response from model {resolved_model}"

                assistant_message = choices[0].get("message", {})
                content = assistant_message.get("content", "")

                lines = [
                    f"**Venice Web Search** ({resolved_model})",
                    "",
                    f"**Response:**\n{content}",
                ]

                # Include citations
                venice_params = result.get("venice_parameters", {})
                citations = venice_params.get("web_search_citations", [])
                if citations:
                    lines.append("")
                    lines.append(f"**Sources** ({len(citations)}):")
                    for cite in citations[:10]:
                        title = cite.get("title", "Link")
                        url = cite.get("url", "")
                        lines.append(f"- {title}")
                        lines.append(f"  {url}")

                return VeniceChat.truncate(
                    "\n".join(lines), self.valves.MAX_RESPONSE_SIZE
                )

        except httpx.HTTPStatusError as e:
            error_msg = VeniceChat.format_error(e, f"web search with {resolved_model}")
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Error: {error_msg}"
        except Exception as e:
            error_msg = VeniceChat.format_error(e, "web search request")
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Error: {error_msg}"
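# ---------------------------------------------------------------------------
# Local smoke test: a minimal sketch for exercising the tool outside Open WebUI.
# It assumes a valid key in the VENICE_API_KEY environment variable and network
# access to api.venice.ai; the framework-injected __user__/__model__/
# __event_emitter__ arguments are simply omitted here.
if __name__ == "__main__":
    import asyncio
    import os

    async def _demo() -> None:
        tools = Tools()
        # Admin-level valve set directly for this sketch; in Open WebUI the key
        # comes from the Valves/UserValves configuration instead.
        tools.valves.VENICE_API_KEY = os.environ.get("VENICE_API_KEY", "")
        print(await tools.chat("Say hello in one short sentence."))

    asyncio.run(_demo())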