From fe29e792ab52adace5fc2036f485327b6603e3ed Mon Sep 17 00:00:00 2001 From: xcaliber Date: Thu, 15 Jan 2026 12:17:53 +0000 Subject: [PATCH] v1.7.0: Add VeniceImage namespace for helper functions --- venice/image.py | 918 ++++++++---------------------------------------- 1 file changed, 141 insertions(+), 777 deletions(-) diff --git a/venice/image.py b/venice/image.py index 5f1ebc7..2f48a21 100644 --- a/venice/image.py +++ b/venice/image.py @@ -1,7 +1,7 @@ """ title: Venice.ai Image Tools author: Jeff Smith + Claude -version: 1.6.0 +version: 1.7.0 license: MIT required_open_webui_version: 0.6.0 requirements: httpx, pydantic @@ -16,8 +16,17 @@ description: | Re-entrant safe: Multiple concurrent calls accumulate images correctly. + v1.7.0: Added VeniceImage namespace class for helper functions to avoid + method collisions with Open WebUI framework introspection. v1.6.0: Added UserValves for SAFE_MODE and HIDE_WATERMARK with proper admin/user override logic. +changelog: + 1.7.0: + - Added VeniceImage namespace class for helper functions + - Moved get_api_key, parse_venice_image_response to VeniceImage namespace + - Prevents Open WebUI framework introspection method name collisions + 1.6.0: + - Added UserValves for SAFE_MODE and HIDE_WATERMARK with proper override logic """ import asyncio @@ -30,137 +39,91 @@ from pydantic import BaseModel, Field import httpx -class Tools: +class VeniceImage: + """ + Namespaced helpers for Venice image operations. + + Using a separate class prevents Open WebUI framework introspection + from colliding with tool methods that have generic names like _get_api_key. """ - Venice.ai image tools: generate, upscale, and edit. - All operations upload results to Open WebUI Files API and attach - images to chat messages for inline display. - Re-entrant: Multiple calls within the same message context accumulate - all images rather than overwriting each other. - """ + @staticmethod + def get_api_key(valves, user_valves, __user__: dict = None) -> str: + """Get API key with UserValves priority.""" + if __user__ and "valves" in __user__: + user_valves_dict = __user__.get("valves") + if isinstance(user_valves_dict, dict) and user_valves_dict.get("VENICE_API_KEY"): + return user_valves_dict["VENICE_API_KEY"] + return user_valves.VENICE_API_KEY or valves.VENICE_API_KEY + + @staticmethod + def parse_venice_image_response(response) -> tuple[Optional[bytes], Optional[str]]: + """Parse Venice API image response - handles both binary and JSON formats.""" + content_type = response.headers.get("content-type", "") + if content_type.startswith("image/"): + return response.content, None + if "application/json" in content_type: + try: + data = response.json() + for field in ("image", "images", "data", "result"): + if field in data: + value = data[field] + if isinstance(value, str): + return base64.b64decode(value), None + if isinstance(value, list) and value: + return base64.b64decode(value[0]), None + if "error" in data: + return None, f"API error: {data.get('error')}" + return None, f"Unexpected JSON response keys: {list(data.keys())}" + except Exception as e: + return None, f"Failed to parse JSON response: {e}" + if len(response.content) > 1000: + return response.content, None + return None, f"Unknown response format: {content_type}, length: {len(response.content)}" + + +class Tools: + """Venice.ai image tools: generate, upscale, and edit.""" class Valves(BaseModel): - """ - Admin-controlled settings. These affect all users. 
- SAFE_MODE and HIDE_WATERMARK can be overridden per-user via UserValves. - """ - - VENICE_API_KEY: str = Field( - default="", - description="Venice.ai API key (admin fallback if user has no key)", - ) - DEFAULT_MODEL: str = Field( - default="z-image-turbo", description="Default image model" - ) - SAFE_MODE: bool = Field( - default=True, - description="Admin: Require SFW content (user can also enable in UserValves)", - ) - HIDE_WATERMARK: bool = Field( - default=False, - description="Admin: Hide watermark (user can also enable in UserValves)", - ) - COOLDOWN_SECONDS: int = Field( - default=0, - description="Minimum seconds between generations per user (0 to disable)", - ) - GENERATION_TIMEOUT: int = Field( - default=180, description="Timeout for image generation in seconds" - ) - ACCUMULATOR_TTL: int = Field( - default=300, description="Seconds to keep accumulated files in memory" - ) + VENICE_API_KEY: str = Field(default="", description="Venice.ai API key (admin fallback)") + DEFAULT_MODEL: str = Field(default="z-image-turbo", description="Default image model") + SAFE_MODE: bool = Field(default=True, description="Admin: Require SFW content") + HIDE_WATERMARK: bool = Field(default=False, description="Admin: Hide watermark") + COOLDOWN_SECONDS: int = Field(default=0, description="Min seconds between generations") + GENERATION_TIMEOUT: int = Field(default=180, description="Timeout for image generation in seconds") + ACCUMULATOR_TTL: int = Field(default=300, description="Seconds to keep accumulated files in memory") class UserValves(BaseModel): - """ - User-specific settings. These can override or augment admin defaults. - - For SAFE_MODE/HIDE_WATERMARK: Either admin OR user can enable. - - If admin sets SAFE_MODE=True, all users get SFW regardless of their setting - - If admin sets SAFE_MODE=False, users can still opt-in to SFW themselves - - For API keys: User key takes priority, admin key is fallback. 
- """ - - VENICE_API_KEY: str = Field( - default="", description="Your Venice.ai API key (overrides admin fallback)" - ) - SAFE_MODE: bool = Field( - default=False, - description="Enable SFW content filtering (admin can also force SFW)", - ) - HIDE_WATERMARK: bool = Field( - default=False, description="Hide Venice.ai watermark from generated images" - ) - DEFAULT_MODEL: str = Field( - default="", - description="Your preferred image model (blank = use admin default)", - ) - DEFAULT_NEGATIVE_PROMPT: str = Field( - default="", - description="Default negative prompt to exclude from all generations", - ) + VENICE_API_KEY: str = Field(default="", description="Your Venice.ai API key (overrides admin)") + SAFE_MODE: bool = Field(default=False, description="Enable SFW content filtering") + HIDE_WATERMARK: bool = Field(default=False, description="Hide Venice.ai watermark") + DEFAULT_MODEL: str = Field(default="", description="Your preferred image model") + DEFAULT_NEGATIVE_PROMPT: str = Field(default="", description="Default negative prompt") def __init__(self): self.valves = self.Valves() self.user_valves = self.UserValves() self.citation = False self._cooldowns: Dict[str, float] = {} - # Re-entrancy support: accumulate files per message context self._message_files: Dict[str, Dict[str, Any]] = {} - # Single lock for all accumulator operations (avoids per-key race) self._accumulator_lock: Optional[asyncio.Lock] = None - self._lock_init = threading.Lock() # Protects lazy init of asyncio lock + self._lock_init = threading.Lock() self._last_cleanup: float = 0.0 - # ==================== Configuration Helpers ==================== - def _is_safe_mode_enabled(self) -> bool: - """ - Safe mode is ON if Admin OR User has it enabled. - - | Admin | User | Result | - |-------|------|--------| - | SFW | SFW | SFW | Both want safe - | SFW | NSFW | SFW | Admin wins (restricts) - | NSFW | NSFW | NSFW | Both allow NSFW - | NSFW | SFW | SFW | User opts in to safety - """ return self.valves.SAFE_MODE or self.user_valves.SAFE_MODE def _is_watermark_hidden(self) -> bool: - """ - Watermark is hidden if Admin OR User has it enabled. - Either party can opt to hide the watermark. - """ return self.valves.HIDE_WATERMARK or self.user_valves.HIDE_WATERMARK - def _get_venice_key(self) -> str: - """ - Get Venice API key with UserValves priority. - User key first, then admin fallback, then empty (error handled by caller). 
- """ - if self.user_valves.VENICE_API_KEY: - return self.user_valves.VENICE_API_KEY - if self.valves.VENICE_API_KEY: - return self.valves.VENICE_API_KEY - return "" - def _get_default_model(self) -> str: - """Get default model: user preference first, then admin default.""" - if self.user_valves.DEFAULT_MODEL: - return self.user_valves.DEFAULT_MODEL - return self.valves.DEFAULT_MODEL + return self.user_valves.DEFAULT_MODEL or self.valves.DEFAULT_MODEL def _get_default_negative_prompt(self) -> Optional[str]: - """Get user's default negative prompt if set.""" - if self.user_valves.DEFAULT_NEGATIVE_PROMPT: - return self.user_valves.DEFAULT_NEGATIVE_PROMPT - return None + return self.user_valves.DEFAULT_NEGATIVE_PROMPT or None def _get_owui_config(self, __request__=None) -> tuple[str, dict]: - """Get Open WebUI API config from request context.""" base_url = "" headers = {} if __request__: @@ -173,144 +136,47 @@ class Tools: return base_url.rstrip("/"), headers def _get_message_key(self, __metadata__: dict = None) -> str: - """Get unique key for message context (for file accumulation).""" if __metadata__: chat_id = __metadata__.get("chat_id", "") message_id = __metadata__.get("message_id", "") if message_id: return f"{chat_id}:{message_id}" - # Fallback for missing context return f"unknown:{int(time.time())}" def _get_lock(self) -> asyncio.Lock: - """Get accumulator lock (lazy init for compatibility).""" if self._accumulator_lock is None: with self._lock_init: if self._accumulator_lock is None: self._accumulator_lock = asyncio.Lock() return self._accumulator_lock - def _parse_venice_image_response( - self, response - ) -> tuple[Optional[bytes], Optional[str]]: - """ - Parse Venice API image response - handles both binary and JSON formats. - - Returns (image_bytes, error_message). - """ - content_type = response.headers.get("content-type", "") - - # Binary response (PNG, WebP, etc.) - if content_type.startswith("image/"): - return response.content, None - - # JSON response with base64 - if "application/json" in content_type: - try: - data = response.json() - # Check various possible field names - for field in ("image", "images", "data", "result"): - if field in data: - value = data[field] - # Single image as base64 string - if isinstance(value, str): - return base64.b64decode(value), None - # Array of images - if isinstance(value, list) and value: - return base64.b64decode(value[0]), None - # If we got JSON but no image field, return the error - if "error" in data: - return None, f"API error: {data.get('error')}" - return None, f"Unexpected JSON response keys: {list(data.keys())}" - except Exception as e: - return None, f"Failed to parse JSON response: {e}" - - # Fallback: try as binary if it looks like image data - if len(response.content) > 1000: - return response.content, None - - return ( - None, - f"Unknown response format: {content_type}, length: {len(response.content)}", - ) - - async def _accumulate_files( - self, - key: str, - new_files: List[dict], - __event_emitter__: Callable[[dict], Any] = None, - ): - """ - Add files to accumulator and emit ALL accumulated files. - Thread-safe: uses single lock for all operations. 
- """ + async def _accumulate_files(self, key: str, new_files: List[dict], __event_emitter__: Callable[[dict], Any] = None): all_files = [] - async with self._get_lock(): - # Initialize if needed if key not in self._message_files: self._message_files[key] = {"files": [], "timestamp": time.time()} - - # Add new files (copy dicts to avoid reference issues) for f in new_files: self._message_files[key]["files"].append(dict(f)) self._message_files[key]["timestamp"] = time.time() - - # Copy list for emission (prevents mutation during async emit) all_files = list(self._message_files[key]["files"]) - - # Periodic cleanup (every 60s, while holding lock) now = time.time() if now - self._last_cleanup > 60: self._last_cleanup = now ttl = self.valves.ACCUMULATOR_TTL - expired = [ - k - for k, v in self._message_files.items() - if now - v.get("timestamp", 0) > ttl - ] + expired = [k for k, v in self._message_files.items() if now - v.get("timestamp", 0) > ttl] for k in expired: del self._message_files[k] - - # Emit outside lock (event_emitter could be slow) if all_files and __event_emitter__: await __event_emitter__({"type": "files", "data": {"files": all_files}}) def _get_accumulated_count(self, key: str) -> int: - """Get current count of accumulated files for a key (non-blocking peek).""" entry = self._message_files.get(key) - if entry: - return len(entry.get("files", [])) - return 0 + return len(entry.get("files", [])) if entry else 0 - # ==================== Files API ==================== - - async def _upload_image( - self, - image_data: bytes | str, - filename: str, - metadata: dict, - content_type: str = "image/webp", - __request__=None, - ) -> tuple[Optional[str], Optional[str]]: - """ - Upload image to Open WebUI Files API. - - Args: - image_data: Raw bytes or base64-encoded string - filename: Name for the uploaded file - metadata: File metadata dict - content_type: MIME type (image/webp, image/png, etc.) - __request__: Request context for auth - - Returns (file_id, error_message). 
- """ + async def _upload_image(self, image_data: bytes | str, filename: str, metadata: dict, content_type: str = "image/webp", __request__=None) -> tuple[Optional[str], Optional[str]]: base_url, headers = self._get_owui_config(__request__) - if not base_url: return None, "Could not determine Open WebUI URL" - - # Handle both bytes and base64 input if isinstance(image_data, str): try: image_bytes = base64.b64decode(image_data) @@ -318,179 +184,92 @@ class Tools: return None, f"Failed to decode image: {e}" else: image_bytes = image_data - - # Prepare multipart upload try: async with httpx.AsyncClient(timeout=60.0) as client: files = {"file": (filename, io.BytesIO(image_bytes), content_type)} - - # Include metadata as form field import json - data = {"metadata": json.dumps(metadata)} - - response = await client.post( - f"{base_url}/api/v1/files/", - headers=headers, - files=files, - data=data, - params={"process": "false", "process_in_background": "false"}, - ) - + response = await client.post(f"{base_url}/api/v1/files/", headers=headers, files=files, data=data, params={"process": "false", "process_in_background": "false"}) if response.status_code == 200: result = response.json() return result.get("id"), None else: - return ( - None, - f"Upload failed: {response.status_code} - {response.text[:200]}", - ) - + return None, f"Upload failed: {response.status_code} - {response.text[:200]}" except Exception as e: return None, f"Upload error: {type(e).__name__}: {e}" - # ==================== Image Fetching ==================== - - async def _get_image_bytes( - self, - image_source: str, - __request__=None, - __files__: list = None, - __metadata__: dict = None, - ) -> tuple[Optional[bytes], Optional[str]]: - """ - Fetch image bytes from various sources. - - Handles: - - Index references: "1", "2", "[1]", "last", "previous" (from message context) - - Open WebUI file URLs (/api/v1/files/{id}/content) - - Full URLs (https://...) - - File IDs from __files__ context - - Returns (image_bytes, error_message). 
- """ + async def _get_image_bytes(self, image_source: str, __request__=None, __files__: list = None, __metadata__: dict = None) -> tuple[Optional[bytes], Optional[str]]: base_url, headers = self._get_owui_config(__request__) image_source = image_source.strip() - - # Parse index references: "1", "[1]", "last", "previous", "latest" index = None if image_source.lower() in ("last", "latest", "previous", "recent"): - index = -1 # Last image + index = -1 elif image_source.isdigit(): - index = int(image_source) - 1 # 1-based to 0-based + index = int(image_source) - 1 elif image_source.startswith("[") and image_source.endswith("]"): inner = image_source[1:-1].strip() if inner.isdigit(): index = int(inner) - 1 - - # Resolve index to actual file URL if index is not None: if not base_url: return None, "Could not determine Open WebUI URL" - - # Build message chain: current message + parents message_ids = set() if __metadata__: if __metadata__.get("message_id"): message_ids.add(__metadata__["message_id"]) - # Walk up parent chain parent = __metadata__.get("parent_message") while parent: if parent.get("id"): message_ids.add(parent["id"]) - parent = parent.get("parent_message") # May not exist - # Also check parent_message_id directly + parent = parent.get("parent_message") if __metadata__.get("parent_message_id"): message_ids.add(__metadata__["parent_message_id"]) - try: async with httpx.AsyncClient(timeout=30.0) as client: - response = await client.get( - f"{base_url}/api/v1/files/", - headers=headers, - params={"content": "false"}, - ) + response = await client.get(f"{base_url}/api/v1/files/", headers=headers, params={"content": "false"}) if response.status_code != 200: return None, f"Failed to list files: {response.status_code}" - files = response.json() - - # Get images - prefer message context, fallback to all recent context_images = [] all_images = [] - for f in files: meta = f.get("meta", {}) content_type = meta.get("content_type", "") - if not content_type.startswith("image/"): continue - all_images.append(f) - - # Check for message_id at both levels: - # - meta.data.message_id (new, simpler) - # - meta.data.data.message_id (old, nested) inner_data = meta.get("data", {}) file_msg_id = None if isinstance(inner_data, dict): - file_msg_id = inner_data.get( - "message_id" - ) # Check simpler path first + file_msg_id = inner_data.get("message_id") if not file_msg_id: nested_data = inner_data.get("data", {}) if isinstance(nested_data, dict): - file_msg_id = nested_data.get( - "message_id" - ) # Fallback to nested - + file_msg_id = nested_data.get("message_id") if file_msg_id and file_msg_id in message_ids: context_images.append(f) - - # Use context images if available, otherwise all images user_images = context_images if context_images else all_images - - # Sort by created_at (newest first) user_images.sort(key=lambda f: f.get("created_at", 0), reverse=True) - if not user_images: return None, "No images found" - - # Handle index: -1 = last (newest), "1" = most recent, "2" = second most recent if index == -1: - target_idx = 0 # Newest + target_idx = 0 else: target_idx = index - if 0 <= target_idx < len(user_images): file_id = user_images[target_idx].get("id") image_source = f"/api/v1/files/{file_id}/content" else: - return ( - None, - f"Image [{index+1}] not found (found {len(user_images)} image{'s' if len(user_images) != 1 else ''})", - ) - + return None, f"Image [{index+1}] not found (found {len(user_images)} images)" except Exception as e: return None, f"Failed to resolve image index: 
{type(e).__name__}: {e}" - - # Check if it's a reference to an uploaded file in current message if __files__: for f in __files__: file_id = f.get("id", "") file_url = f.get("url", "") - - # Match by ID, URL, or partial match - if ( - image_source == file_id - or image_source == file_url - or image_source in file_url - or file_id in image_source - ): + if image_source == file_id or image_source == file_url or image_source in file_url or file_id in image_source: image_source = file_url or f"/api/v1/files/{file_id}/content" break - - # Normalize URL if image_source.startswith("/api/"): if not base_url: return None, "Could not determine Open WebUI URL for relative path" @@ -498,292 +277,117 @@ class Tools: elif image_source.startswith("http://") or image_source.startswith("https://"): fetch_url = image_source else: - # Assume it's a file ID if not base_url: return None, "Could not determine Open WebUI URL" fetch_url = f"{base_url}/api/v1/files/{image_source}/content" - try: async with httpx.AsyncClient(timeout=60.0) as client: response = await client.get(fetch_url, headers=headers) if response.status_code == 200: return response.content, None else: - return ( - None, - f"Failed to fetch image: {response.status_code} from {fetch_url}", - ) - + return None, f"Failed to fetch image: {response.status_code} from {fetch_url}" except Exception as e: return None, f"Fetch error: {type(e).__name__}: {e}" - # ==================== Generate ==================== - - async def generate_image( - self, - prompt: str, - model: Optional[str] = None, - width: int = 1024, - height: int = 1024, - negative_prompt: Optional[str] = None, - style_preset: Optional[str] = None, - variants: int = 1, - __request__=None, - __user__: dict = None, - __metadata__: dict = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Generate image(s) using Venice.ai and upload to Open WebUI. - - IMPORTANT: For multiple images with the SAME prompt, use the 'variants' - parameter (1-4) instead of calling this function multiple times. - Multiple calls with DIFFERENT prompts are fine and will accumulate correctly. - - :param prompt: Text description of the image to generate - :param model: Image model (default: user pref or z-image-turbo). Use venice_info list_models("image") to see options - :param width: Image width 512-1280 pixels (default 1024) - :param height: Image height 512-1280 pixels (default 1024) - :param negative_prompt: What to avoid in the image (or uses user's default if set) - :param style_preset: Style preset (use venice_info list_styles to see options). Note: Some models like z-image-turbo don't support styles - :param variants: Number of variations 1-4 with same prompt (use this instead of multiple calls) - :return: Generated image details - """ - # Check API key - venice_key = self._get_venice_key() + async def generate_image(self, prompt: str, model: Optional[str] = None, width: int = 1024, height: int = 1024, negative_prompt: Optional[str] = None, style_preset: Optional[str] = None, variants: int = 1, __request__=None, __user__: dict = None, __metadata__: dict = None, __event_emitter__: Callable[[dict], Any] = None) -> str: + venice_key = VeniceImage.get_api_key(self.valves, self.user_valves, __user__) if not venice_key: - return "Generate Image\nStatus: 0\nError: Venice.ai API key not configured. Set in UserValves or ask admin." - - # Validate prompt + return "Generate Image\nStatus: 0\nError: Venice.ai API key not configured." 
if not prompt or not prompt.strip(): return "Generate Image\nStatus: 0\nError: Prompt is required" - - # Get message context for accumulation msg_key = self._get_message_key(__metadata__) - - # Check cooldown (skip if disabled or if this is a re-entrant call) user_id = __user__.get("id", "default") if __user__ else "default" cooldown = self.valves.COOLDOWN_SECONDS - if cooldown > 0: now = time.time() last_gen = self._cooldowns.get(user_id, 0) is_reentrant = self._get_accumulated_count(msg_key) > 0 - if not is_reentrant and now - last_gen < cooldown: remaining = cooldown - (now - last_gen) - return f"Generate Image\nStatus: 429\nError: Rate limited. Wait {remaining:.1f}s. TIP: Use 'variants' parameter (1-4) for multiple images with the same prompt." - + return f"Generate Image\nStatus: 429\nError: Rate limited. Wait {remaining:.1f}s." self._cooldowns[user_id] = now - - # Get effective settings using helper methods model = model or self._get_default_model() safe_mode = self._is_safe_mode_enabled() hide_watermark = self._is_watermark_hidden() - - # Handle negative prompt: explicit > user default > none - effective_negative_prompt = negative_prompt - if effective_negative_prompt is None: - effective_negative_prompt = self._get_default_negative_prompt() - - # Clamp values + effective_negative_prompt = negative_prompt or self._get_default_negative_prompt() variants = max(1, min(4, variants)) width = max(512, min(1280, width)) height = max(512, min(1280, height)) - - # Count existing images (non-blocking peek) existing_count = self._get_accumulated_count(msg_key) - - # Status update if __event_emitter__: - status_msg = ( - f"Generating {variants} image{'s' if variants > 1 else ''} with {model}" - ) + status_msg = f"Generating {variants} image{'s' if variants > 1 else ''} with {model}" if existing_count > 0: status_msg += f" (adding to {existing_count} existing)" if safe_mode: status_msg += " [SFW]" - await __event_emitter__( - { - "type": "status", - "data": {"description": f"{status_msg}...", "done": False}, - } - ) - - # Build Venice request - payload = { - "model": model, - "prompt": prompt, - "width": width, - "height": height, - "safe_mode": safe_mode, - "hide_watermark": hide_watermark, - "return_binary": False, - "variants": variants, - } - + await __event_emitter__({"type": "status", "data": {"description": f"{status_msg}...", "done": False}}) + payload = {"model": model, "prompt": prompt, "width": width, "height": height, "safe_mode": safe_mode, "hide_watermark": hide_watermark, "return_binary": False, "variants": variants} if effective_negative_prompt: payload["negative_prompt"] = effective_negative_prompt if style_preset: payload["style_preset"] = style_preset - - # Generate images - with retry for unsupported params retried = False dropped_params = [] - try: - async with httpx.AsyncClient( - timeout=float(self.valves.GENERATION_TIMEOUT) - ) as client: - response = await client.post( - "https://api.venice.ai/api/v1/image/generate", - headers={ - "Authorization": f"Bearer {venice_key}", - "Content-Type": "application/json", - }, - json=payload, - ) - - # Check for parameter errors and retry without optional params + async with httpx.AsyncClient(timeout=float(self.valves.GENERATION_TIMEOUT)) as client: + response = await client.post("https://api.venice.ai/api/v1/image/generate", headers={"Authorization": f"Bearer {venice_key}", "Content-Type": "application/json"}, json=payload) if response.status_code in (400, 422) and not retried: error_text = response.text.lower() - # Check if it's a 
style/parameter support issue - if any( - kw in error_text - for kw in ("style", "unsupported", "invalid", "parameter") - ): - # Retry without style_preset and negative_prompt + if any(kw in error_text for kw in ("style", "unsupported", "invalid", "parameter")): if style_preset: del payload["style_preset"] dropped_params.append("style_preset") if effective_negative_prompt: del payload["negative_prompt"] dropped_params.append("negative_prompt") - if dropped_params: retried = True - response = await client.post( - "https://api.venice.ai/api/v1/image/generate", - headers={ - "Authorization": f"Bearer {venice_key}", - "Content-Type": "application/json", - }, - json=payload, - ) - + response = await client.post("https://api.venice.ai/api/v1/image/generate", headers={"Authorization": f"Bearer {venice_key}", "Content-Type": "application/json"}, json=payload) response.raise_for_status() result = response.json() - except httpx.HTTPStatusError as e: if __event_emitter__: await __event_emitter__({"type": "status", "data": {"done": True}}) - return f"Generate Image\nStatus: {e.response.status_code}\nError: Venice API error: {e.response.text[:200]}" + return f"Generate Image\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}" except httpx.TimeoutException: if __event_emitter__: await __event_emitter__({"type": "status", "data": {"done": True}}) - return f"Generate Image\nStatus: 408\nError: Generation timed out after {self.valves.GENERATION_TIMEOUT}s" + return f"Generate Image\nStatus: 408\nError: Timed out after {self.valves.GENERATION_TIMEOUT}s" except Exception as e: if __event_emitter__: await __event_emitter__({"type": "status", "data": {"done": True}}) return f"Generate Image\nStatus: 0\nError: {type(e).__name__}: {e}" - images = result.get("images", []) if not images: if __event_emitter__: await __event_emitter__({"type": "status", "data": {"done": True}}) - return "Generate Image\nStatus: 0\nError: No images returned from Venice.ai" - - # Update status + return "Generate Image\nStatus: 0\nError: No images returned" if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Uploading {len(images)} image{'s' if len(images) > 1 else ''}...", - "done": False, - }, - } - ) - - # Get context for metadata + await __event_emitter__({"type": "status", "data": {"description": f"Uploading {len(images)} images...", "done": False}}) chat_id = __metadata__.get("chat_id") if __metadata__ else None message_id = __metadata__.get("message_id") if __metadata__ else None - - # Upload each image to Files API uploaded_files = [] errors = [] - for i, image_b64 in enumerate(images): - timestamp = int(time.time() * 1000) # ms precision for uniqueness + timestamp = int(time.time() * 1000) filename = f"venice_{model}_{timestamp}_{i+1}.webp" - - # Build metadata - chat_id/message_id at top level for consistency - file_metadata = { - "name": filename, - "content_type": "image/webp", - "data": { - "model": model, - "prompt": prompt, - "negative_prompt": effective_negative_prompt, - "style_preset": style_preset, - "width": width, - "height": height, - "variant": i + 1, - "total_variants": len(images), - "safe_mode": safe_mode, - "hide_watermark": hide_watermark, - }, - } - - # Associations at top level (becomes meta.data.chat_id, not meta.data.data.chat_id) + file_metadata = {"name": filename, "content_type": "image/webp", "data": {"model": model, "prompt": prompt, "negative_prompt": effective_negative_prompt, "style_preset": style_preset, "width": width, "height": 
height, "variant": i+1, "total_variants": len(images), "safe_mode": safe_mode, "hide_watermark": hide_watermark}} if chat_id: file_metadata["chat_id"] = chat_id if message_id: file_metadata["message_id"] = message_id - - file_id, error = await self._upload_image( - image_b64, filename, file_metadata, "image/webp", __request__ - ) - + file_id, error = await self._upload_image(image_b64, filename, file_metadata, "image/webp", __request__) if file_id: - uploaded_files.append( - { - "type": "image", - "url": f"/api/v1/files/{file_id}/content", - } - ) + uploaded_files.append({"type": "image", "url": f"/api/v1/files/{file_id}/content"}) else: errors.append(f"Variant {i+1}: {error}") - - # Accumulate files and emit ALL accumulated files for this message if uploaded_files: await self._accumulate_files(msg_key, uploaded_files, __event_emitter__) - - # Get final count after accumulation final_count = self._get_accumulated_count(msg_key) - - # Clear status if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Done ({final_count} image{'s' if final_count != 1 else ''} total)", - "done": True, - }, - } - ) - - # Build response with file references for upscale/edit - parts = [ - "Generate Image", - "Status: 200", - "", - f"Generated {len(uploaded_files)} image(s) for: {prompt[:100]}{'...' if len(prompt) > 100 else ''}", - f"Model: {model} | Size: {width}x{height}", - ] - - # Show effective settings + await __event_emitter__({"type": "status", "data": {"description": f"Done ({final_count} images total)", "done": True}}) + parts = ["Generate Image", "Status: 200", "", f"Generated {len(uploaded_files)} image(s) for: {prompt[:100]}{'...' if len(prompt) > 100 else ''}", f"Model: {model} | Size: {width}x{height}"] settings_parts = [] if safe_mode: settings_parts.append("SFW") @@ -791,130 +395,55 @@ class Tools: settings_parts.append("No watermark") if settings_parts: parts.append(f"Settings: {', '.join(settings_parts)}") - - # Include file URLs so subsequent upscale/edit calls know what to reference if uploaded_files: - parts.append("") - parts.append("Files (use these URLs for upscale_image or edit_image):") + parts.append("", "Files:") for i, f in enumerate(uploaded_files): parts.append(f" [{i+1}] {f['url']}") - if dropped_params: - parts.append( - f"Note: {model} doesn't support: {', '.join(dropped_params)} (ignored)" - ) - + parts.append(f"Note: {model} doesn't support: {', '.join(dropped_params)} (ignored)") if final_count > len(uploaded_files): parts.append(f"Total images in message: {final_count}") - if errors: - parts.append("") - parts.append("Warnings:") + parts.append("", "Warnings:") for e in errors: parts.append(f" - {e}") - return "\n".join(parts) - # ==================== Upscale ==================== - - async def upscale_image( - self, - image: str, - scale: int = 2, - enhance: bool = False, - enhance_creativity: float = 0.5, - enhance_prompt: Optional[str] = None, - __request__=None, - __user__: dict = None, - __metadata__: dict = None, - __files__: list = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Upscale an image to higher resolution, optionally with AI enhancement. - - :param image: Image reference - simplest options: - - "1" or "last" - Your most recently generated/uploaded image - - "2" - Second most recent image - - Or full URL from generate_image output - :param scale: Scale factor 1-4 (default 2). 
Use 1 with enhance=True for enhancement only - :param enhance: Apply AI enhancement during upscaling (adds detail/clarity) - :param enhance_creativity: How much AI can modify the image 0.0-1.0 (default 0.5). Higher = more changes - :param enhance_prompt: Style hint for enhancement like "sharp", "cinematic", "vibrant" (optional) - :return: Upscaled image result - """ - # Check API key - venice_key = self._get_venice_key() + async def upscale_image(self, image: str, scale: int = 2, enhance: bool = False, enhance_creativity: float = 0.5, enhance_prompt: Optional[str] = None, __request__=None, __user__: dict = None, __metadata__: dict = None, __files__: list = None, __event_emitter__: Callable[[dict], Any] = None) -> str: + venice_key = VeniceImage.get_api_key(self.valves, self.user_valves, __user__) if not venice_key: return "Upscale Image\nStatus: 0\nError: Venice.ai API key not configured." - - # Validate parameters scale = max(1, min(4, scale)) if scale == 1 and not enhance: - return "Upscale Image\nStatus: 0\nError: scale=1 requires enhance=True (enhancement only mode)" - + return "Upscale Image\nStatus: 0\nError: scale=1 requires enhance=True" enhance_creativity = max(0.0, min(1.0, enhance_creativity)) - - # Get message context for accumulation msg_key = self._get_message_key(__metadata__) existing_count = self._get_accumulated_count(msg_key) - - # Status update if __event_emitter__: mode = "Enhancing" if scale == 1 else f"Upscaling {scale}x" if enhance and scale > 1: mode += " + enhancing" - await __event_emitter__( - {"type": "status", "data": {"description": f"{mode}...", "done": False}} - ) - - # Fetch source image - image_bytes, error = await self._get_image_bytes( - image, __request__, __files__, __metadata__ - ) + await __event_emitter__({"type": "status", "data": {"description": f"{mode}...", "done": False}}) + image_bytes, error = await self._get_image_bytes(image, __request__, __files__, __metadata__) if error: if __event_emitter__: await __event_emitter__({"type": "status", "data": {"done": True}}) - return ( - f"Upscale Image\nStatus: 0\nError: Failed to get source image: {error}" - ) - - # Build request - payload = { - "image": base64.b64encode(image_bytes).decode("utf-8"), - "scale": scale, - } - + return f"Upscale Image\nStatus: 0\nError: {error}" + payload = {"image": base64.b64encode(image_bytes).decode("utf-8"), "scale": scale} if enhance: payload["enhance"] = True payload["enhanceCreativity"] = enhance_creativity if enhance_prompt: payload["enhancePrompt"] = enhance_prompt[:1500] - - # Call Venice API try: - async with httpx.AsyncClient( - timeout=float(self.valves.GENERATION_TIMEOUT) - ) as client: - response = await client.post( - "https://api.venice.ai/api/v1/image/upscale", - headers={ - "Authorization": f"Bearer {venice_key}", - "Content-Type": "application/json", - }, - json=payload, - ) + async with httpx.AsyncClient(timeout=float(self.valves.GENERATION_TIMEOUT)) as client: + response = await client.post("https://api.venice.ai/api/v1/image/upscale", headers={"Authorization": f"Bearer {venice_key}", "Content-Type": "application/json"}, json=payload) response.raise_for_status() - - # Parse response (handles both binary and JSON) - result_bytes, parse_error = self._parse_venice_image_response(response) + result_bytes, parse_error = VeniceImage.parse_venice_image_response(response) if parse_error: if __event_emitter__: - await __event_emitter__( - {"type": "status", "data": {"done": True}} - ) + await __event_emitter__({"type": "status", "data": {"done": True}}) 
return f"Upscale Image\nStatus: 0\nError: {parse_error}" - except httpx.HTTPStatusError as e: if __event_emitter__: await __event_emitter__({"type": "status", "data": {"done": True}}) @@ -923,172 +452,55 @@ class Tools: if __event_emitter__: await __event_emitter__({"type": "status", "data": {"done": True}}) return f"Upscale Image\nStatus: 0\nError: {type(e).__name__}: {e}" - - # Upload result if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Uploading...", "done": False}, - } - ) - + await __event_emitter__({"type": "status", "data": {"description": "Uploading...", "done": False}}) chat_id = __metadata__.get("chat_id") if __metadata__ else None message_id = __metadata__.get("message_id") if __metadata__ else None - timestamp = int(time.time() * 1000) filename = f"venice_upscale_{scale}x_{timestamp}.png" - - file_metadata = { - "name": filename, - "content_type": "image/png", - "data": { - "operation": "upscale", - "scale": scale, - "enhance": enhance, - "enhance_creativity": enhance_creativity if enhance else None, - "enhance_prompt": enhance_prompt if enhance else None, - "source_image": image[:100], - }, - } - - # Associations at top level (becomes meta.data.chat_id) + file_metadata = {"name": filename, "content_type": "image/png", "data": {"operation": "upscale", "scale": scale, "enhance": enhance, "enhance_creativity": enhance_creativity if enhance else None, "enhance_prompt": enhance_prompt if enhance else None}} if chat_id: file_metadata["chat_id"] = chat_id if message_id: file_metadata["message_id"] = message_id - - file_id, upload_error = await self._upload_image( - result_bytes, filename, file_metadata, "image/png", __request__ - ) - + file_id, upload_error = await self._upload_image(result_bytes, filename, file_metadata, "image/png", __request__) if not file_id: if __event_emitter__: await __event_emitter__({"type": "status", "data": {"done": True}}) return f"Upscale Image\nStatus: 0\nError: Upload failed - {upload_error}" - - # Accumulate and emit all files new_file = {"type": "image", "url": f"/api/v1/files/{file_id}/content"} await self._accumulate_files(msg_key, [new_file], __event_emitter__) - final_count = self._get_accumulated_count(msg_key) - - # Clear status if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Done ({final_count} image{'s' if final_count != 1 else ''} total)", - "done": True, - }, - } - ) + await __event_emitter__({"type": "status", "data": {"description": f"Done ({final_count} images total)", "done": True}}) + return f"Upscale Image\nStatus: 200\n\nUpscaled {scale}x{' with enhancement' if enhance else ''}\n\nFile: {new_file['url']}" - # Build response - parts = [ - "Upscale Image", - "Status: 200", - "", - f"Upscaled {scale}x" + (" with enhancement" if enhance else ""), - "", - f"File (use for further upscale/edit): {new_file['url']}", - ] - - return "\n".join(parts) - - # ==================== Edit ==================== - - async def edit_image( - self, - image: str, - prompt: str, - __request__=None, - __user__: dict = None, - __metadata__: dict = None, - __files__: list = None, - __event_emitter__: Callable[[dict], Any] = None, - ) -> str: - """ - Edit an existing image using text instructions. - - Venice uses the Qwen-Image model for editing. 
Works best with - short, clear instructions like: - - "Remove the background" - - "Change the sky to sunset" - - "Make it black and white" - - "Add snow to the scene" - - :param image: Image reference - simplest options: - - "1" or "last" - Your most recently generated/uploaded image - - "2" - Second most recent image - - Or full URL from generate_image output - :param prompt: Text instructions describing the edit to make (max 1500 chars) - :return: Edited image result - """ - # Check API key - venice_key = self._get_venice_key() + async def edit_image(self, image: str, prompt: str, __request__=None, __user__: dict = None, __metadata__: dict = None, __files__: list = None, __event_emitter__: Callable[[dict], Any] = None) -> str: + venice_key = VeniceImage.get_api_key(self.valves, self.user_valves, __user__) if not venice_key: return "Edit Image\nStatus: 0\nError: Venice.ai API key not configured." - - # Validate prompt if not prompt or not prompt.strip(): return "Edit Image\nStatus: 0\nError: Edit prompt is required" - prompt = prompt.strip()[:1500] - - # Get message context for accumulation msg_key = self._get_message_key(__metadata__) existing_count = self._get_accumulated_count(msg_key) - - # Status update if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": {"description": "Editing image...", "done": False}, - } - ) - - # Fetch source image - image_bytes, error = await self._get_image_bytes( - image, __request__, __files__, __metadata__ - ) + await __event_emitter__({"type": "status", "data": {"description": "Editing image...", "done": False}}) + image_bytes, error = await self._get_image_bytes(image, __request__, __files__, __metadata__) if error: if __event_emitter__: await __event_emitter__({"type": "status", "data": {"done": True}}) - return f"Edit Image\nStatus: 0\nError: Failed to get source image: {error}" - - # Build request - payload = { - "image": base64.b64encode(image_bytes).decode("utf-8"), - "prompt": prompt, - } - - # Call Venice API + return f"Edit Image\nStatus: 0\nError: {error}" + payload = {"image": base64.b64encode(image_bytes).decode("utf-8"), "prompt": prompt} try: - async with httpx.AsyncClient( - timeout=float(self.valves.GENERATION_TIMEOUT) - ) as client: - response = await client.post( - "https://api.venice.ai/api/v1/image/edit", - headers={ - "Authorization": f"Bearer {venice_key}", - "Content-Type": "application/json", - }, - json=payload, - ) + async with httpx.AsyncClient(timeout=float(self.valves.GENERATION_TIMEOUT)) as client: + response = await client.post("https://api.venice.ai/api/v1/image/edit", headers={"Authorization": f"Bearer {venice_key}", "Content-Type": "application/json"}, json=payload) response.raise_for_status() - - # Parse response (handles both binary and JSON) - result_bytes, parse_error = self._parse_venice_image_response(response) + result_bytes, parse_error = VeniceImage.parse_venice_image_response(response) if parse_error: if __event_emitter__: - await __event_emitter__( - {"type": "status", "data": {"done": True}} - ) + await __event_emitter__({"type": "status", "data": {"done": True}}) return f"Edit Image\nStatus: 0\nError: {parse_error}" - except httpx.HTTPStatusError as e: if __event_emitter__: await __event_emitter__({"type": "status", "data": {"done": True}}) @@ -1097,73 +509,25 @@ class Tools: if __event_emitter__: await __event_emitter__({"type": "status", "data": {"done": True}}) return f"Edit Image\nStatus: 0\nError: {type(e).__name__}: {e}" - - # Upload result if __event_emitter__: - await 
__event_emitter__( - { - "type": "status", - "data": {"description": "Uploading...", "done": False}, - } - ) - + await __event_emitter__({"type": "status", "data": {"description": "Uploading...", "done": False}}) chat_id = __metadata__.get("chat_id") if __metadata__ else None message_id = __metadata__.get("message_id") if __metadata__ else None - timestamp = int(time.time() * 1000) filename = f"venice_edit_{timestamp}.png" - - file_metadata = { - "name": filename, - "content_type": "image/png", - "data": { - "operation": "edit", - "prompt": prompt, - "source_image": image[:100], - }, - } - - # Associations at top level (becomes meta.data.chat_id) + file_metadata = {"name": filename, "content_type": "image/png", "data": {"operation": "edit", "prompt": prompt}} if chat_id: file_metadata["chat_id"] = chat_id if message_id: file_metadata["message_id"] = message_id - - file_id, upload_error = await self._upload_image( - result_bytes, filename, file_metadata, "image/png", __request__ - ) - + file_id, upload_error = await self._upload_image(result_bytes, filename, file_metadata, "image/png", __request__) if not file_id: if __event_emitter__: await __event_emitter__({"type": "status", "data": {"done": True}}) return f"Edit Image\nStatus: 0\nError: Upload failed - {upload_error}" - - # Accumulate and emit all files new_file = {"type": "image", "url": f"/api/v1/files/{file_id}/content"} await self._accumulate_files(msg_key, [new_file], __event_emitter__) - final_count = self._get_accumulated_count(msg_key) - - # Clear status if __event_emitter__: - await __event_emitter__( - { - "type": "status", - "data": { - "description": f"Done ({final_count} image{'s' if final_count != 1 else ''} total)", - "done": True, - }, - } - ) - - # Build response - parts = [ - "Edit Image", - "Status: 200", - "", - f"Applied edit: {prompt[:80]}{'...' if len(prompt) > 80 else ''}", - "", - f"File (use for further upscale/edit): {new_file['url']}", - ] - - return "\n".join(parts) + await __event_emitter__({"type": "status", "data": {"description": f"Done ({final_count} images total)", "done": True}}) + return f"Edit Image\nStatus: 200\n\nApplied edit: {prompt[:80]}{'...' if len(prompt) > 80 else ''}\n\nFile: {new_file['url']}"
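
Usage sketch (illustrative, not part of the diff): roughly how the Tools methods resolve the API key through the new VeniceImage namespace after this patch. The `from venice.image import VeniceImage` import path and the SimpleNamespace stand-ins for the pydantic Valves/UserValves models are assumptions made to keep the snippet self-contained.

    from types import SimpleNamespace
    from venice.image import VeniceImage  # assumed import path for the patched module

    # Stand-ins for the admin Valves and per-user UserValves objects
    valves = SimpleNamespace(VENICE_API_KEY="admin-fallback-key")
    user_valves = SimpleNamespace(VENICE_API_KEY="")

    # A key stored in __user__["valves"] wins over UserValves and the admin fallback
    key = VeniceImage.get_api_key(valves, user_valves, {"valves": {"VENICE_API_KEY": "per-user-key"}})
    assert key == "per-user-key"

    # Without a per-user override, the admin key is used as the fallback
    assert VeniceImage.get_api_key(valves, user_valves, None) == "admin-fallback-key"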