diff --git a/venice/chat.py b/venice/chat.py new file mode 100644 index 0000000..583d104 --- /dev/null +++ b/venice/chat.py @@ -0,0 +1,779 @@ +""" +title: Venice.ai Chat +author: Jeff Smith +version: 1.2.0 +license: MIT +required_open_webui_version: 0.6.0 +requirements: httpx, pydantic +description: | + Chat completions using Venice.ai API. + + Enables LLM-to-LLM conversations, web search queries, + and reasoning model access. Each user can configure their own API key. + + Model selection: + - Empty/omit: Auto-selects via Venice traits API + - "self": Uses the calling model's base (if it's a Venice model) + - Explicit ID: Validates model exists before calling + + Use venice_info/list_models("text") to discover available models. +""" + +from typing import Optional, Callable, Any +from pydantic import BaseModel, Field +import httpx +import json +import time + + +class Tools: + """ + Venice.ai chat completions tool. + + Query Venice.ai text models for responses, including reasoning models + and web search enabled queries. + """ + + class Valves(BaseModel): + """Admin configuration.""" + + VENICE_API_KEY: str = Field( + default="", description="Venice.ai API key (admin default)" + ) + DEFAULT_MODEL: str = Field( + default="", + description="Default chat model (empty = auto-select via traits)", + ) + DEFAULT_REASONING_MODEL: str = Field( + default="", + description="Default reasoning model (empty = auto-select via traits)", + ) + ENABLE_WEB_SEARCH: bool = Field( + default=False, description="Enable web search by default" + ) + INCLUDE_VENICE_SYSTEM_PROMPT: bool = Field( + default=False, description="Include Venice system prompt" + ) + CHAT_TIMEOUT: int = Field( + default=120, description="Timeout for chat requests in seconds" + ) + REASONING_TIMEOUT: int = Field( + default=300, + description="Timeout for reasoning requests (longer for complex thinking)", + ) + MAX_RESPONSE_SIZE: int = Field( + default=16384, description="Maximum response size in characters" + ) + MODEL_CACHE_TTL: int = Field( + default=300, description="How long to cache model list (seconds)" + ) + + class UserValves(BaseModel): + """Per-user configuration.""" + + VENICE_API_KEY: str = Field( + default="", description="Your Venice.ai API key (overrides admin default)" + ) + + def __init__(self): + self.valves = self.Valves() + self.user_valves = self.UserValves() + self.citation = False + # Simple in-memory cache + self._cache: dict = {} + self._cache_times: dict = {} + + def _get_api_key(self) -> str: + """Get Venice API key with UserValves priority.""" + return self.user_valves.VENICE_API_KEY or self.valves.VENICE_API_KEY + + def _truncate(self, text: str) -> str: + """Truncate response to max size.""" + max_size = self.valves.MAX_RESPONSE_SIZE + if max_size and len(text) > max_size: + return ( + text[:max_size] + + f"\n\n[...truncated, {len(text) - max_size} chars omitted]" + ) + return text + + def _is_cache_valid(self, key: str) -> bool: + """Check if cached data is still valid.""" + if key not in self._cache_times: + return False + return (time.time() - self._cache_times[key]) < self.valves.MODEL_CACHE_TTL + + async def _get_traits(self) -> dict: + """Fetch model traits from Venice (cached).""" + cache_key = "traits" + if self._is_cache_valid(cache_key): + return self._cache.get(cache_key, {}) + + api_key = self._get_api_key() + if not api_key: + return {} + + try: + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.get( + "https://api.venice.ai/api/v1/models/traits", + 
headers={"Authorization": f"Bearer {api_key}"}, + ) + if response.status_code == 200: + traits = response.json().get("data", {}) + self._cache[cache_key] = traits + self._cache_times[cache_key] = time.time() + return traits + except Exception: + pass + return {} + + async def _get_available_models(self, model_type: str = "text") -> list[dict]: + """Fetch available models (cached).""" + cache_key = f"models_{model_type}" + if self._is_cache_valid(cache_key): + return self._cache.get(cache_key, []) + + api_key = self._get_api_key() + if not api_key: + return [] + + try: + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.get( + f"https://api.venice.ai/api/v1/models?type={model_type}", + headers={"Authorization": f"Bearer {api_key}"}, + ) + if response.status_code == 200: + models = response.json().get("data", []) + self._cache[cache_key] = models + self._cache_times[cache_key] = time.time() + return models + except Exception: + pass + return [] + + async def _resolve_model( + self, + model: Optional[str], + model_type: str = "text", + require_reasoning: bool = False, + __model__: dict = None, + ) -> tuple[str, Optional[str]]: + """ + Resolve model specification to actual model ID with validation. + + Handles: + - Empty/None: Auto-select via traits + - "self": Use calling model's base_model_id + - Explicit ID: Validate exists and is online + + Returns (model_id, error_message). + """ + original_input = model + + # Handle "self" - use the calling model's base + if model and model.lower() == "self": + if __model__: + base_model = __model__.get("info", {}).get("base_model_id", "") + if base_model: + model = base_model + else: + model = None # Fall through to auto-select + + # Handle explicit Valve defaults + if not model: + if require_reasoning and self.valves.DEFAULT_REASONING_MODEL: + model = self.valves.DEFAULT_REASONING_MODEL + elif not require_reasoning and self.valves.DEFAULT_MODEL: + model = self.valves.DEFAULT_MODEL + + # If still no model, try traits API + if not model: + traits = await self._get_traits() + + if require_reasoning: + # Try reasoning-specific traits + for trait_name in ["default_reasoning", "reasoning", "thinking"]: + if trait_name in traits: + model = traits[trait_name] + break + + if not model: + # General default + for trait_name in ["default", "default_text", "fastest"]: + if trait_name in traits: + model = traits[trait_name] + break + + # If still no model, pick first available with required capability + if not model: + models = await self._get_available_models(model_type) + for m in models: + spec = m.get("model_spec", {}) + if spec.get("offline"): + continue + if require_reasoning: + caps = spec.get("capabilities", {}) + if not caps.get("supportsReasoning"): + continue + model = m.get("id") + break + + if not model: + if require_reasoning: + return ( + "", + "No reasoning model available. Use venice_info/list_models('text') to find models with [reasoning] capability.", + ) + return ( + "", + "No model specified and could not auto-select. 
Use venice_info/list_models('text') to discover available models.", + ) + + # Validate model exists and is online + models = await self._get_available_models(model_type) + model_map = {m.get("id"): m for m in models} + + if model not in model_map: + # Not found - suggest similar models + suggestions = [ + mid for mid in model_map.keys() if model.lower() in mid.lower() + ][:3] + suggestion_text = ( + f" Similar: {', '.join(suggestions)}" if suggestions else "" + ) + return ( + "", + f"Model '{model}' not found.{suggestion_text} Use venice_info/list_models('text') for available models.", + ) + + # Check if offline + model_data = model_map[model] + if model_data.get("model_spec", {}).get("offline"): + # Find alternative + alternatives = [] + for m in models: + spec = m.get("model_spec", {}) + if spec.get("offline"): + continue + if require_reasoning and not spec.get("capabilities", {}).get( + "supportsReasoning" + ): + continue + alternatives.append(m.get("id")) + if len(alternatives) >= 3: + break + + alt_text = f" Try: {', '.join(alternatives)}" if alternatives else "" + return "", f"Model '{model}' is currently offline.{alt_text}" + + # Check reasoning capability if required + if require_reasoning: + caps = model_data.get("model_spec", {}).get("capabilities", {}) + if not caps.get("supportsReasoning"): + # Find models with reasoning + reasoning_models = [] + for m in models: + spec = m.get("model_spec", {}) + if spec.get("offline"): + continue + if spec.get("capabilities", {}).get("supportsReasoning"): + reasoning_models.append(m.get("id")) + if len(reasoning_models) >= 3: + break + + alt_text = ( + f" Models with reasoning: {', '.join(reasoning_models)}" + if reasoning_models + else "" + ) + return "", f"Model '{model}' does not support reasoning.{alt_text}" + + return model, None + + # ==================== Chat Methods ==================== + + async def chat( + self, + message: str, + model: Optional[str] = None, + system_prompt: Optional[str] = None, + temperature: float = 0.7, + max_tokens: int = 2048, + web_search: Optional[bool] = None, + __user__: dict = None, + __model__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Send a message to a Venice.ai chat model. + + :param message: The message to send + :param model: Model ID, "self" for calling model, or empty for auto-select. Use venice_info/list_models("text") to discover. + :param system_prompt: Optional system prompt for model behavior + :param temperature: Sampling temperature 0-2 (default 0.7) + :param max_tokens: Maximum response tokens (default 2048) + :param web_search: Enable web search for current information + :return: Model response + """ + api_key = self._get_api_key() + if not api_key: + return "Venice Chat\nStatus: 0\nError: API key not configured. Set in UserValves or ask admin." 
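+
+        # Model resolution order (see _resolve_model): an explicit ID is
+        # validated against the /models list, "self" maps to the calling
+        # model's base_model_id, an empty value falls back to the valve
+        # default, then the Venice traits API, then the first online model
+        # with the required capability.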
+ + if not message or not message.strip(): + return "Venice Chat\nStatus: 0\nError: Message required" + + # Resolve and validate model + resolved_model, error = await self._resolve_model( + model, "text", False, __model__ + ) + if error: + return f"Venice Chat\nStatus: 0\nError: {error}" + + enable_web_search = ( + web_search if web_search is not None else self.valves.ENABLE_WEB_SEARCH + ) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Querying {resolved_model}...", + "done": False, + }, + } + ) + + # Build messages array + messages = [] + if system_prompt: + messages.append({"role": "system", "content": system_prompt}) + messages.append({"role": "user", "content": message}) + + payload = { + "model": resolved_model, + "messages": messages, + "temperature": temperature, + "max_tokens": max_tokens, + "stream": False, + "venice_parameters": { + "enable_web_search": "on" if enable_web_search else "off", + "include_venice_system_prompt": self.valves.INCLUDE_VENICE_SYSTEM_PROMPT, + }, + } + + try: + async with httpx.AsyncClient( + timeout=float(self.valves.CHAT_TIMEOUT) + ) as client: + response = await client.post( + "https://api.venice.ai/api/v1/chat/completions", + headers={ + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + }, + json=payload, + ) + response.raise_for_status() + result = response.json() + + if __event_emitter__: + await __event_emitter__({"type": "status", "data": {"done": True}}) + + choices = result.get("choices", []) + if not choices: + return f"Venice Chat ({resolved_model})\nStatus: 200\nError: No response from model" + + assistant_message = choices[0].get("message", {}) + content = assistant_message.get("content", "") + reasoning = assistant_message.get("reasoning_content") + + # Build response + lines = [f"Venice Chat ({resolved_model})", "Status: 200", ""] + + if reasoning: + lines.append(f"Reasoning:\n{reasoning}") + lines.append("") + + lines.append(f"Response:\n{content}") + + # Include web citations if present + venice_params = result.get("venice_parameters", {}) + citations = venice_params.get("web_search_citations", []) + if citations: + lines.append("") + lines.append("Sources:") + for cite in citations[:5]: + title = cite.get("title", "Link") + url = cite.get("url", "") + lines.append(f" - {title}: {url}") + + # Usage stats + usage = result.get("usage", {}) + if usage: + lines.append("") + lines.append( + f"Tokens: {usage.get('prompt_tokens', 0)} in / {usage.get('completion_tokens', 0)} out" + ) + + return self._truncate("\n".join(lines)) + + except httpx.HTTPStatusError as e: + if __event_emitter__: + await __event_emitter__({"type": "status", "data": {"done": True}}) + return f"Venice Chat ({resolved_model})\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}" + except httpx.TimeoutException: + if __event_emitter__: + await __event_emitter__({"type": "status", "data": {"done": True}}) + return ( + f"Venice Chat ({resolved_model})\nStatus: 408\nError: Request timed out" + ) + except Exception as e: + if __event_emitter__: + await __event_emitter__({"type": "status", "data": {"done": True}}) + return f"Venice Chat ({resolved_model})\nStatus: 0\nError: {type(e).__name__}: {e}" + + async def chat_conversation( + self, + messages_json: str, + model: Optional[str] = None, + system_prompt: Optional[str] = None, + temperature: float = 0.7, + max_tokens: int = 2048, + __user__: dict = None, + __model__: dict = None, + __event_emitter__: Callable[[dict], Any] = 
None, + ) -> str: + """ + Send a multi-turn conversation to Venice.ai. + + :param messages_json: JSON array: [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}, ...] + :param model: Model ID, "self" for calling model, or empty for auto-select. Use venice_info/list_models("text") to discover. + :param system_prompt: Optional system prompt + :param temperature: Sampling temperature 0-2 (default 0.7) + :param max_tokens: Maximum response tokens (default 2048) + :return: Model response + """ + api_key = self._get_api_key() + if not api_key: + return "Venice Chat Conversation\nStatus: 0\nError: API key not configured." + + if not messages_json: + return "Venice Chat Conversation\nStatus: 0\nError: messages_json required" + + try: + conversation = json.loads(messages_json) + if not isinstance(conversation, list): + return "Venice Chat Conversation\nStatus: 0\nError: messages_json must be a JSON array" + except json.JSONDecodeError as e: + return f"Venice Chat Conversation\nStatus: 0\nError: Invalid JSON - {e}" + + # Resolve and validate model + resolved_model, error = await self._resolve_model( + model, "text", False, __model__ + ) + if error: + return f"Venice Chat Conversation\nStatus: 0\nError: {error}" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Querying {resolved_model}...", + "done": False, + }, + } + ) + + # Build messages array + messages = [] + if system_prompt: + messages.append({"role": "system", "content": system_prompt}) + messages.extend(conversation) + + payload = { + "model": resolved_model, + "messages": messages, + "temperature": temperature, + "max_tokens": max_tokens, + "stream": False, + "venice_parameters": { + "enable_web_search": "off", + "include_venice_system_prompt": self.valves.INCLUDE_VENICE_SYSTEM_PROMPT, + }, + } + + try: + async with httpx.AsyncClient( + timeout=float(self.valves.CHAT_TIMEOUT) + ) as client: + response = await client.post( + "https://api.venice.ai/api/v1/chat/completions", + headers={ + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + }, + json=payload, + ) + response.raise_for_status() + result = response.json() + + if __event_emitter__: + await __event_emitter__({"type": "status", "data": {"done": True}}) + + choices = result.get("choices", []) + if not choices: + return f"Venice Chat Conversation ({resolved_model})\nStatus: 200\nError: No response" + + assistant_message = choices[0].get("message", {}) + content = assistant_message.get("content", "") + reasoning = assistant_message.get("reasoning_content") + + lines = [f"Venice Chat Conversation ({resolved_model})", "Status: 200", ""] + + if reasoning: + lines.append(f"Reasoning:\n{reasoning}") + lines.append("") + + lines.append(f"Response:\n{content}") + + return self._truncate("\n".join(lines)) + + except httpx.HTTPStatusError as e: + if __event_emitter__: + await __event_emitter__({"type": "status", "data": {"done": True}}) + return f"Venice Chat Conversation ({resolved_model})\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}" + except Exception as e: + if __event_emitter__: + await __event_emitter__({"type": "status", "data": {"done": True}}) + return f"Venice Chat Conversation ({resolved_model})\nStatus: 0\nError: {type(e).__name__}: {e}" + + async def ask_reasoning_model( + self, + question: str, + reasoning_effort: str = "medium", + model: Optional[str] = None, + __user__: dict = None, + __model__: dict = None, + __event_emitter__: Callable[[dict], Any] 
= None, + ) -> str: + """ + Ask a reasoning/thinking model for complex problem solving. + + :param question: The question or problem to solve + :param reasoning_effort: Effort level: low, medium, high (default: medium) + :param model: Model with reasoning capability, "self", or empty for auto-select. Use venice_info/list_models("text") to find models with [reasoning]. + :return: Response with reasoning process + """ + api_key = self._get_api_key() + if not api_key: + return "Venice Reasoning\nStatus: 0\nError: API key not configured." + + if not question or not question.strip(): + return "Venice Reasoning\nStatus: 0\nError: Question required" + + if reasoning_effort not in ["low", "medium", "high"]: + return "Venice Reasoning\nStatus: 0\nError: reasoning_effort must be low, medium, or high" + + # Resolve and validate model (require reasoning capability) + resolved_model, error = await self._resolve_model( + model, "text", require_reasoning=True, __model__=__model__ + ) + if error: + return f"Venice Reasoning\nStatus: 0\nError: {error}" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Thinking with {resolved_model} ({reasoning_effort})...", + "done": False, + }, + } + ) + + payload = { + "model": resolved_model, + "messages": [{"role": "user", "content": question}], + "reasoning_effort": reasoning_effort, + "stream": False, + "venice_parameters": { + "enable_web_search": "off", + "include_venice_system_prompt": False, + }, + } + + try: + async with httpx.AsyncClient( + timeout=float(self.valves.REASONING_TIMEOUT) + ) as client: + response = await client.post( + "https://api.venice.ai/api/v1/chat/completions", + headers={ + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + }, + json=payload, + ) + response.raise_for_status() + result = response.json() + + if __event_emitter__: + await __event_emitter__({"type": "status", "data": {"done": True}}) + + choices = result.get("choices", []) + if not choices: + return f"Venice Reasoning ({resolved_model})\nStatus: 200\nError: No response" + + assistant_message = choices[0].get("message", {}) + content = assistant_message.get("content", "") + reasoning = assistant_message.get("reasoning_content", "") + + lines = [ + f"Venice Reasoning ({resolved_model})", + "Status: 200", + f"Effort: {reasoning_effort}", + "", + ] + + if reasoning: + lines.append(f"Reasoning Process:\n{reasoning}") + lines.append("") + + lines.append(f"Answer:\n{content}") + + # Usage stats + usage = result.get("usage", {}) + if usage: + lines.append("") + total = usage.get("total_tokens", 0) + reasoning_tokens = usage.get("reasoning_tokens", 0) + lines.append( + f"Tokens: {total:,} total ({reasoning_tokens:,} reasoning)" + ) + + return self._truncate("\n".join(lines)) + + except httpx.HTTPStatusError as e: + if __event_emitter__: + await __event_emitter__({"type": "status", "data": {"done": True}}) + return f"Venice Reasoning ({resolved_model})\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}" + except httpx.TimeoutException: + if __event_emitter__: + await __event_emitter__({"type": "status", "data": {"done": True}}) + return f"Venice Reasoning ({resolved_model})\nStatus: 408\nError: Request timed out (reasoning can take a while)" + except Exception as e: + if __event_emitter__: + await __event_emitter__({"type": "status", "data": {"done": True}}) + return f"Venice Reasoning ({resolved_model})\nStatus: 0\nError: {type(e).__name__}: {e}" + + async def web_search_query( + self, + 
query: str, + model: Optional[str] = None, + __user__: dict = None, + __model__: dict = None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """ + Search the web and get an AI-synthesized response. + + :param query: Search query or question about current events + :param model: Model to use, "self", or empty for auto-select + :return: Response with web sources + """ + api_key = self._get_api_key() + if not api_key: + return "Venice Web Search\nStatus: 0\nError: API key not configured." + + if not query or not query.strip(): + return "Venice Web Search\nStatus: 0\nError: Query required" + + # Resolve model - prefer models with web search capability + resolved_model, error = await self._resolve_model( + model, "text", False, __model__ + ) + if error: + return f"Venice Web Search\nStatus: 0\nError: {error}" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Searching with {resolved_model}...", + "done": False, + }, + } + ) + + payload = { + "model": resolved_model, + "messages": [{"role": "user", "content": query}], + "temperature": 0.3, # Lower for factual responses + "max_tokens": 2048, + "stream": False, + "venice_parameters": { + "enable_web_search": "on", + "include_venice_system_prompt": False, + }, + } + + try: + async with httpx.AsyncClient( + timeout=float(self.valves.CHAT_TIMEOUT) + ) as client: + response = await client.post( + "https://api.venice.ai/api/v1/chat/completions", + headers={ + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + }, + json=payload, + ) + response.raise_for_status() + result = response.json() + + if __event_emitter__: + await __event_emitter__({"type": "status", "data": {"done": True}}) + + choices = result.get("choices", []) + if not choices: + return f"Venice Web Search ({resolved_model})\nStatus: 200\nError: No response" + + assistant_message = choices[0].get("message", {}) + content = assistant_message.get("content", "") + + lines = [ + f"Venice Web Search ({resolved_model})", + "Status: 200", + "", + f"Response:\n{content}", + ] + + # Include citations + venice_params = result.get("venice_parameters", {}) + citations = venice_params.get("web_search_citations", []) + if citations: + lines.append("") + lines.append(f"Sources ({len(citations)}):") + for cite in citations[:10]: + title = cite.get("title", "Link") + url = cite.get("url", "") + lines.append(f" - {title}") + lines.append(f" {url}") + + return self._truncate("\n".join(lines)) + + except httpx.HTTPStatusError as e: + if __event_emitter__: + await __event_emitter__({"type": "status", "data": {"done": True}}) + return f"Venice Web Search ({resolved_model})\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}" + except Exception as e: + if __event_emitter__: + await __event_emitter__({"type": "status", "data": {"done": True}}) + return f"Venice Web Search ({resolved_model})\nStatus: 0\nError: {type(e).__name__}: {e}"
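
A minimal sketch of driving the tool outside Open WebUI, for example from a test
script. Assumptions: the file is importable as `venice.chat`, a real Venice.ai
API key is available, and the placeholder key below is illustrative only.

    import asyncio

    from venice.chat import Tools


    async def main() -> None:
        tool = Tools()
        # Open WebUI normally supplies this via the admin or user valves UI;
        # set it directly for a standalone run (placeholder value).
        tool.valves.VENICE_API_KEY = "YOUR_VENICE_API_KEY"

        # Empty model -> auto-selection via the Venice traits API.
        print(await tool.chat("Summarize the Venice.ai API in one sentence."))

        # Reasoning call: _resolve_model requires supportsReasoning on the
        # chosen model, and reasoning_effort must be "low", "medium", or "high".
        print(
            await tool.ask_reasoning_model(
                "Is 2**61 - 1 prime?", reasoning_effort="high"
            )
        )


    asyncio.run(main())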
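
Continuing the same sketch (inside the async function above), `chat_conversation`
takes its history as a JSON string rather than a Python list; the dialogue
content here is purely illustrative of the LLM-to-LLM use case.

    import json

    conversation = [
        {"role": "user", "content": "Open the negotiation for the blue team."},
        {"role": "assistant", "content": "We propose splitting resources 60/40."},
        {"role": "user", "content": "Red team counters with 50/50. Respond."},
    ]

    # chat_conversation json.loads() this string and rejects anything that is
    # not a JSON array of {"role", "content"} messages.
    reply = await tool.chat_conversation(
        messages_json=json.dumps(conversation),
        system_prompt="Keep responses under 100 words.",
    )
    print(reply)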