""" title: Venice.ai Chat author: Jeff Smith version: 1.2.0 license: MIT required_open_webui_version: 0.6.0 requirements: httpx, pydantic description: | Chat completions using Venice.ai API. Enables LLM-to-LLM conversations, web search queries, and reasoning model access. Each user can configure their own API key. Model selection: - Empty/omit: Auto-selects via Venice traits API - "self": Uses the calling model's base (if it's a Venice model) - Explicit ID: Validates model exists before calling Use venice_info/list_models("text") to discover available models. """ from typing import Optional, Callable, Any from pydantic import BaseModel, Field import httpx import json import time class Tools: """ Venice.ai chat completions tool. Query Venice.ai text models for responses, including reasoning models and web search enabled queries. """ class Valves(BaseModel): """Admin configuration.""" VENICE_API_KEY: str = Field( default="", description="Venice.ai API key (admin default)" ) DEFAULT_MODEL: str = Field( default="", description="Default chat model (empty = auto-select via traits)", ) DEFAULT_REASONING_MODEL: str = Field( default="", description="Default reasoning model (empty = auto-select via traits)", ) ENABLE_WEB_SEARCH: bool = Field( default=False, description="Enable web search by default" ) INCLUDE_VENICE_SYSTEM_PROMPT: bool = Field( default=False, description="Include Venice system prompt" ) CHAT_TIMEOUT: int = Field( default=120, description="Timeout for chat requests in seconds" ) REASONING_TIMEOUT: int = Field( default=300, description="Timeout for reasoning requests (longer for complex thinking)", ) MAX_RESPONSE_SIZE: int = Field( default=16384, description="Maximum response size in characters" ) MODEL_CACHE_TTL: int = Field( default=300, description="How long to cache model list (seconds)" ) class UserValves(BaseModel): """Per-user configuration.""" VENICE_API_KEY: str = Field( default="", description="Your Venice.ai API key (overrides admin default)" ) def __init__(self): self.valves = self.Valves() self.user_valves = self.UserValves() self.citation = False # Simple in-memory cache self._cache: dict = {} self._cache_times: dict = {} def _get_api_key(self) -> str: """Get Venice API key with UserValves priority.""" return self.user_valves.VENICE_API_KEY or self.valves.VENICE_API_KEY def _truncate(self, text: str) -> str: """Truncate response to max size.""" max_size = self.valves.MAX_RESPONSE_SIZE if max_size and len(text) > max_size: return ( text[:max_size] + f"\n\n[...truncated, {len(text) - max_size} chars omitted]" ) return text def _is_cache_valid(self, key: str) -> bool: """Check if cached data is still valid.""" if key not in self._cache_times: return False return (time.time() - self._cache_times[key]) < self.valves.MODEL_CACHE_TTL async def _get_traits(self) -> dict: """Fetch model traits from Venice (cached).""" cache_key = "traits" if self._is_cache_valid(cache_key): return self._cache.get(cache_key, {}) api_key = self._get_api_key() if not api_key: return {} try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get( "https://api.venice.ai/api/v1/models/traits", headers={"Authorization": f"Bearer {api_key}"}, ) if response.status_code == 200: traits = response.json().get("data", {}) self._cache[cache_key] = traits self._cache_times[cache_key] = time.time() return traits except Exception: pass return {} async def _get_available_models(self, model_type: str = "text") -> list[dict]: """Fetch available models (cached).""" cache_key = 
f"models_{model_type}" if self._is_cache_valid(cache_key): return self._cache.get(cache_key, []) api_key = self._get_api_key() if not api_key: return [] try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get( f"https://api.venice.ai/api/v1/models?type={model_type}", headers={"Authorization": f"Bearer {api_key}"}, ) if response.status_code == 200: models = response.json().get("data", []) self._cache[cache_key] = models self._cache_times[cache_key] = time.time() return models except Exception: pass return [] async def _resolve_model( self, model: Optional[str], model_type: str = "text", require_reasoning: bool = False, __model__: dict = None, ) -> tuple[str, Optional[str]]: """ Resolve model specification to actual model ID with validation. Handles: - Empty/None: Auto-select via traits - "self": Use calling model's base_model_id - Explicit ID: Validate exists and is online Returns (model_id, error_message). """ original_input = model # Handle "self" - use the calling model's base if model and model.lower() == "self": if __model__: base_model = __model__.get("info", {}).get("base_model_id", "") if base_model: model = base_model else: model = None # Fall through to auto-select # Handle explicit Valve defaults if not model: if require_reasoning and self.valves.DEFAULT_REASONING_MODEL: model = self.valves.DEFAULT_REASONING_MODEL elif not require_reasoning and self.valves.DEFAULT_MODEL: model = self.valves.DEFAULT_MODEL # If still no model, try traits API if not model: traits = await self._get_traits() if require_reasoning: # Try reasoning-specific traits for trait_name in ["default_reasoning", "reasoning", "thinking"]: if trait_name in traits: model = traits[trait_name] break if not model: # General default for trait_name in ["default", "default_text", "fastest"]: if trait_name in traits: model = traits[trait_name] break # If still no model, pick first available with required capability if not model: models = await self._get_available_models(model_type) for m in models: spec = m.get("model_spec", {}) if spec.get("offline"): continue if require_reasoning: caps = spec.get("capabilities", {}) if not caps.get("supportsReasoning"): continue model = m.get("id") break if not model: if require_reasoning: return ( "", "No reasoning model available. Use venice_info/list_models('text') to find models with [reasoning] capability.", ) return ( "", "No model specified and could not auto-select. 
    async def _resolve_model(
        self,
        model: Optional[str],
        model_type: str = "text",
        require_reasoning: bool = False,
        __model__: dict = None,
    ) -> tuple[str, Optional[str]]:
        """
        Resolve model specification to actual model ID with validation.

        Handles:
        - Empty/None: Auto-select via traits
        - "self": Use calling model's base_model_id
        - Explicit ID: Validate exists and is online

        Returns (model_id, error_message).
        """
        original_input = model

        # Handle "self" - use the calling model's base
        if model and model.lower() == "self":
            if __model__:
                base_model = __model__.get("info", {}).get("base_model_id", "")
                if base_model:
                    model = base_model
                else:
                    model = None  # Fall through to auto-select

        # Handle explicit Valve defaults
        if not model:
            if require_reasoning and self.valves.DEFAULT_REASONING_MODEL:
                model = self.valves.DEFAULT_REASONING_MODEL
            elif not require_reasoning and self.valves.DEFAULT_MODEL:
                model = self.valves.DEFAULT_MODEL

        # If still no model, try traits API
        if not model:
            traits = await self._get_traits()
            if require_reasoning:
                # Try reasoning-specific traits
                for trait_name in ["default_reasoning", "reasoning", "thinking"]:
                    if trait_name in traits:
                        model = traits[trait_name]
                        break
            if not model:
                # General default
                for trait_name in ["default", "default_text", "fastest"]:
                    if trait_name in traits:
                        model = traits[trait_name]
                        break

        # If still no model, pick first available with required capability
        if not model:
            models = await self._get_available_models(model_type)
            for m in models:
                spec = m.get("model_spec", {})
                if spec.get("offline"):
                    continue
                if require_reasoning:
                    caps = spec.get("capabilities", {})
                    if not caps.get("supportsReasoning"):
                        continue
                model = m.get("id")
                break

        if not model:
            if require_reasoning:
                return (
                    "",
                    "No reasoning model available. Use venice_info/list_models('text') to find models with [reasoning] capability.",
                )
            return (
                "",
                "No model specified and could not auto-select. Use venice_info/list_models('text') to discover available models.",
            )

        # Validate model exists and is online
        models = await self._get_available_models(model_type)
        model_map = {m.get("id"): m for m in models}

        if model not in model_map:
            # Not found - suggest similar models
            suggestions = [
                mid for mid in model_map.keys() if model.lower() in mid.lower()
            ][:3]
            suggestion_text = (
                f" Similar: {', '.join(suggestions)}" if suggestions else ""
            )
            return (
                "",
                f"Model '{model}' not found.{suggestion_text} Use venice_info/list_models('text') for available models.",
            )

        # Check if offline
        model_data = model_map[model]
        if model_data.get("model_spec", {}).get("offline"):
            # Find alternative
            alternatives = []
            for m in models:
                spec = m.get("model_spec", {})
                if spec.get("offline"):
                    continue
                if require_reasoning and not spec.get("capabilities", {}).get(
                    "supportsReasoning"
                ):
                    continue
                alternatives.append(m.get("id"))
                if len(alternatives) >= 3:
                    break
            alt_text = f" Try: {', '.join(alternatives)}" if alternatives else ""
            return "", f"Model '{model}' is currently offline.{alt_text}"

        # Check reasoning capability if required
        if require_reasoning:
            caps = model_data.get("model_spec", {}).get("capabilities", {})
            if not caps.get("supportsReasoning"):
                # Find models with reasoning
                reasoning_models = []
                for m in models:
                    spec = m.get("model_spec", {})
                    if spec.get("offline"):
                        continue
                    if spec.get("capabilities", {}).get("supportsReasoning"):
                        reasoning_models.append(m.get("id"))
                        if len(reasoning_models) >= 3:
                            break
                alt_text = (
                    f" Models with reasoning: {', '.join(reasoning_models)}"
                    if reasoning_models
                    else ""
                )
                return "", f"Model '{model}' does not support reasoning.{alt_text}"

        return model, None

    # ==================== Chat Methods ====================
    async def chat(
        self,
        message: str,
        model: Optional[str] = None,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 2048,
        web_search: Optional[bool] = None,
        __user__: dict = None,
        __model__: dict = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Send a message to a Venice.ai chat model.

        :param message: The message to send
        :param model: Model ID, "self" for calling model, or empty for auto-select. Use venice_info/list_models("text") to discover.
        :param system_prompt: Optional system prompt for model behavior
        :param temperature: Sampling temperature 0-2 (default 0.7)
        :param max_tokens: Maximum response tokens (default 2048)
        :param web_search: Enable web search for current information
        :return: Model response
        """
        api_key = self._get_api_key()
        if not api_key:
            return "Venice Chat\nStatus: 0\nError: API key not configured. Set in UserValves or ask admin."

        if not message or not message.strip():
            return "Venice Chat\nStatus: 0\nError: Message required"

        # Resolve and validate model
        resolved_model, error = await self._resolve_model(
            model, "text", False, __model__
        )
        if error:
            return f"Venice Chat\nStatus: 0\nError: {error}"

        enable_web_search = (
            web_search if web_search is not None else self.valves.ENABLE_WEB_SEARCH
        )

        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": f"Querying {resolved_model}...",
                        "done": False,
                    },
                }
            )

        # Build messages array
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": message})

        payload = {
            "model": resolved_model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
            "stream": False,
            "venice_parameters": {
                "enable_web_search": "on" if enable_web_search else "off",
                "include_venice_system_prompt": self.valves.INCLUDE_VENICE_SYSTEM_PROMPT,
            },
        }

        try:
            async with httpx.AsyncClient(
                timeout=float(self.valves.CHAT_TIMEOUT)
            ) as client:
                response = await client.post(
                    "https://api.venice.ai/api/v1/chat/completions",
                    headers={
                        "Authorization": f"Bearer {api_key}",
                        "Content-Type": "application/json",
                    },
                    json=payload,
                )
                response.raise_for_status()
                result = response.json()

            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})

            choices = result.get("choices", [])
            if not choices:
                return f"Venice Chat ({resolved_model})\nStatus: 200\nError: No response from model"

            assistant_message = choices[0].get("message", {})
            content = assistant_message.get("content", "")
            reasoning = assistant_message.get("reasoning_content")

            # Build response
            lines = [f"Venice Chat ({resolved_model})", "Status: 200", ""]

            if reasoning:
                lines.append(f"Reasoning:\n{reasoning}")
                lines.append("")

            lines.append(f"Response:\n{content}")

            # Include web citations if present
            venice_params = result.get("venice_parameters", {})
            citations = venice_params.get("web_search_citations", [])
            if citations:
                lines.append("")
                lines.append("Sources:")
                for cite in citations[:5]:
                    title = cite.get("title", "Link")
                    url = cite.get("url", "")
                    lines.append(f" - {title}: {url}")

            # Usage stats
            usage = result.get("usage", {})
            if usage:
                lines.append("")
                lines.append(
                    f"Tokens: {usage.get('prompt_tokens', 0)} in / {usage.get('completion_tokens', 0)} out"
                )

            return self._truncate("\n".join(lines))

        except httpx.HTTPStatusError as e:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Venice Chat ({resolved_model})\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}"
        except httpx.TimeoutException:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return (
                f"Venice Chat ({resolved_model})\nStatus: 408\nError: Request timed out"
            )
        except Exception as e:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Venice Chat ({resolved_model})\nStatus: 0\nError: {type(e).__name__}: {e}"
    async def chat_conversation(
        self,
        messages_json: str,
        model: Optional[str] = None,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 2048,
        __user__: dict = None,
        __model__: dict = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Send a multi-turn conversation to Venice.ai.

        :param messages_json: JSON array: [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}, ...]
        :param model: Model ID, "self" for calling model, or empty for auto-select. Use venice_info/list_models("text") to discover.
        :param system_prompt: Optional system prompt
        :param temperature: Sampling temperature 0-2 (default 0.7)
        :param max_tokens: Maximum response tokens (default 2048)
        :return: Model response
        """
        api_key = self._get_api_key()
        if not api_key:
            return "Venice Chat Conversation\nStatus: 0\nError: API key not configured."

        if not messages_json:
            return "Venice Chat Conversation\nStatus: 0\nError: messages_json required"

        try:
            conversation = json.loads(messages_json)
            if not isinstance(conversation, list):
                return "Venice Chat Conversation\nStatus: 0\nError: messages_json must be a JSON array"
        except json.JSONDecodeError as e:
            return f"Venice Chat Conversation\nStatus: 0\nError: Invalid JSON - {e}"

        # Resolve and validate model
        resolved_model, error = await self._resolve_model(
            model, "text", False, __model__
        )
        if error:
            return f"Venice Chat Conversation\nStatus: 0\nError: {error}"

        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": f"Querying {resolved_model}...",
                        "done": False,
                    },
                }
            )

        # Build messages array
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.extend(conversation)

        payload = {
            "model": resolved_model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
            "stream": False,
            "venice_parameters": {
                "enable_web_search": "off",
                "include_venice_system_prompt": self.valves.INCLUDE_VENICE_SYSTEM_PROMPT,
            },
        }

        try:
            async with httpx.AsyncClient(
                timeout=float(self.valves.CHAT_TIMEOUT)
            ) as client:
                response = await client.post(
                    "https://api.venice.ai/api/v1/chat/completions",
                    headers={
                        "Authorization": f"Bearer {api_key}",
                        "Content-Type": "application/json",
                    },
                    json=payload,
                )
                response.raise_for_status()
                result = response.json()

            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})

            choices = result.get("choices", [])
            if not choices:
                return f"Venice Chat Conversation ({resolved_model})\nStatus: 200\nError: No response"

            assistant_message = choices[0].get("message", {})
            content = assistant_message.get("content", "")
            reasoning = assistant_message.get("reasoning_content")

            lines = [f"Venice Chat Conversation ({resolved_model})", "Status: 200", ""]

            if reasoning:
                lines.append(f"Reasoning:\n{reasoning}")
                lines.append("")

            lines.append(f"Response:\n{content}")

            return self._truncate("\n".join(lines))

        except httpx.HTTPStatusError as e:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Venice Chat Conversation ({resolved_model})\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}"
        except Exception as e:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Venice Chat Conversation ({resolved_model})\nStatus: 0\nError: {type(e).__name__}: {e}"
    async def ask_reasoning_model(
        self,
        question: str,
        reasoning_effort: str = "medium",
        model: Optional[str] = None,
        __user__: dict = None,
        __model__: dict = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Ask a reasoning/thinking model for complex problem solving.

        :param question: The question or problem to solve
        :param reasoning_effort: Effort level: low, medium, high (default: medium)
        :param model: Model with reasoning capability, "self", or empty for auto-select. Use venice_info/list_models("text") to find models with [reasoning].
        :return: Response with reasoning process
        """
        api_key = self._get_api_key()
        if not api_key:
            return "Venice Reasoning\nStatus: 0\nError: API key not configured."

        if not question or not question.strip():
            return "Venice Reasoning\nStatus: 0\nError: Question required"

        if reasoning_effort not in ["low", "medium", "high"]:
            return "Venice Reasoning\nStatus: 0\nError: reasoning_effort must be low, medium, or high"

        # Resolve and validate model (require reasoning capability)
        resolved_model, error = await self._resolve_model(
            model, "text", require_reasoning=True, __model__=__model__
        )
        if error:
            return f"Venice Reasoning\nStatus: 0\nError: {error}"

        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": f"Thinking with {resolved_model} ({reasoning_effort})...",
                        "done": False,
                    },
                }
            )

        payload = {
            "model": resolved_model,
            "messages": [{"role": "user", "content": question}],
            "reasoning_effort": reasoning_effort,
            "stream": False,
            "venice_parameters": {
                "enable_web_search": "off",
                "include_venice_system_prompt": False,
            },
        }

        try:
            async with httpx.AsyncClient(
                timeout=float(self.valves.REASONING_TIMEOUT)
            ) as client:
                response = await client.post(
                    "https://api.venice.ai/api/v1/chat/completions",
                    headers={
                        "Authorization": f"Bearer {api_key}",
                        "Content-Type": "application/json",
                    },
                    json=payload,
                )
                response.raise_for_status()
                result = response.json()

            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})

            choices = result.get("choices", [])
            if not choices:
                return f"Venice Reasoning ({resolved_model})\nStatus: 200\nError: No response"

            assistant_message = choices[0].get("message", {})
            content = assistant_message.get("content", "")
            reasoning = assistant_message.get("reasoning_content", "")

            lines = [
                f"Venice Reasoning ({resolved_model})",
                "Status: 200",
                f"Effort: {reasoning_effort}",
                "",
            ]

            if reasoning:
                lines.append(f"Reasoning Process:\n{reasoning}")
                lines.append("")

            lines.append(f"Answer:\n{content}")

            # Usage stats
            usage = result.get("usage", {})
            if usage:
                lines.append("")
                total = usage.get("total_tokens", 0)
                reasoning_tokens = usage.get("reasoning_tokens", 0)
                lines.append(
                    f"Tokens: {total:,} total ({reasoning_tokens:,} reasoning)"
                )

            return self._truncate("\n".join(lines))

        except httpx.HTTPStatusError as e:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Venice Reasoning ({resolved_model})\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}"
        except httpx.TimeoutException:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Venice Reasoning ({resolved_model})\nStatus: 408\nError: Request timed out (reasoning can take a while)"
        except Exception as e:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Venice Reasoning ({resolved_model})\nStatus: 0\nError: {type(e).__name__}: {e}"
    async def web_search_query(
        self,
        query: str,
        model: Optional[str] = None,
        __user__: dict = None,
        __model__: dict = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Search the web and get an AI-synthesized response.

        :param query: Search query or question about current events
        :param model: Model to use, "self", or empty for auto-select
        :return: Response with web sources
        """
        api_key = self._get_api_key()
        if not api_key:
            return "Venice Web Search\nStatus: 0\nError: API key not configured."

        if not query or not query.strip():
            return "Venice Web Search\nStatus: 0\nError: Query required"

        # Resolve model - prefer models with web search capability
        resolved_model, error = await self._resolve_model(
            model, "text", False, __model__
        )
        if error:
            return f"Venice Web Search\nStatus: 0\nError: {error}"

        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": f"Searching with {resolved_model}...",
                        "done": False,
                    },
                }
            )

        payload = {
            "model": resolved_model,
            "messages": [{"role": "user", "content": query}],
            "temperature": 0.3,  # Lower for factual responses
            "max_tokens": 2048,
            "stream": False,
            "venice_parameters": {
                "enable_web_search": "on",
                "include_venice_system_prompt": False,
            },
        }

        try:
            async with httpx.AsyncClient(
                timeout=float(self.valves.CHAT_TIMEOUT)
            ) as client:
                response = await client.post(
                    "https://api.venice.ai/api/v1/chat/completions",
                    headers={
                        "Authorization": f"Bearer {api_key}",
                        "Content-Type": "application/json",
                    },
                    json=payload,
                )
                response.raise_for_status()
                result = response.json()

            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})

            choices = result.get("choices", [])
            if not choices:
                return f"Venice Web Search ({resolved_model})\nStatus: 200\nError: No response"

            assistant_message = choices[0].get("message", {})
            content = assistant_message.get("content", "")

            lines = [
                f"Venice Web Search ({resolved_model})",
                "Status: 200",
                "",
                f"Response:\n{content}",
            ]

            # Include citations
            venice_params = result.get("venice_parameters", {})
            citations = venice_params.get("web_search_citations", [])
            if citations:
                lines.append("")
                lines.append(f"Sources ({len(citations)}):")
                for cite in citations[:10]:
                    title = cite.get("title", "Link")
                    url = cite.get("url", "")
                    lines.append(f" - {title}")
                    lines.append(f"   {url}")

            return self._truncate("\n".join(lines))

        except httpx.HTTPStatusError as e:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Venice Web Search ({resolved_model})\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}"
        except Exception as e:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Venice Web Search ({resolved_model})\nStatus: 0\nError: {type(e).__name__}: {e}"
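
# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the tool). Open WebUI normally
# instantiates Tools and injects __user__/__model__/__event_emitter__ itself;
# this block only runs when the file is executed directly. It assumes a valid
# key is available in the VENICE_API_KEY environment variable (a name chosen
# for this example) and exercises the auto-select path from the header
# docstring: leaving `model` empty lets _resolve_model fall back to the
# Venice traits API, while an explicit ID would be validated first.
if __name__ == "__main__":
    import asyncio
    import os

    async def _demo() -> None:
        tool = Tools()
        # UserValves take priority over the admin default in _get_api_key().
        tool.user_valves.VENICE_API_KEY = os.environ.get("VENICE_API_KEY", "")
        # Empty model -> auto-select via traits; no event emitter is needed here.
        print(await tool.chat("Say hello in one sentence."))

    asyncio.run(_demo())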