"""
title: Venice.ai Image Tools
author: Jeff Smith + Claude
version: 1.6.1
license: MIT
required_open_webui_version: 0.6.0
requirements: httpx, pydantic
description: |
  Venice.ai image generation, upscaling, and editing tools.
  - generate_image: Create images from text prompts
  - upscale_image: Increase resolution up to 4x with optional enhancement
  - edit_image: Modify existing images with text instructions
  All operations upload results to Open WebUI Files API for persistence
  and attach images to chat via event emitter for inline display.
  Re-entrant safe: Multiple concurrent calls accumulate images correctly.
  v1.6.0 added UserValves for SAFE_MODE and HIDE_WATERMARK with proper
  admin/user override logic.
  v1.6.1 reads UserValves from __user__["valves"] (where Open WebUI passes
  them), only sends Open WebUI credentials to Open WebUI URLs, and reports
  an error when every generated image fails to upload.
"""

import asyncio
import base64
import io
import json
import threading
import time
from typing import Any, Callable, Dict, List, Optional

import httpx
from pydantic import BaseModel, Field


class Tools:
    """
    Venice.ai image tools: generate, upscale, and edit.

    All operations upload results to Open WebUI Files API and attach
    images to chat messages for inline display.

    Re-entrant: Multiple calls within the same message context accumulate
    all images rather than overwriting each other.
    """

    class Valves(BaseModel):
        """
        Admin-controlled settings. These affect all users.
        SAFE_MODE and HIDE_WATERMARK can be overridden per-user via UserValves.
        """

        VENICE_API_KEY: str = Field(
            default="",
            description="Venice.ai API key (admin fallback if user has no key)",
        )
        DEFAULT_MODEL: str = Field(
            default="z-image-turbo", description="Default image model"
        )
        SAFE_MODE: bool = Field(
            default=True,
            description="Admin: Require SFW content (user can also enable in UserValves)",
        )
        HIDE_WATERMARK: bool = Field(
            default=False,
            description="Admin: Hide watermark (user can also enable in UserValves)",
        )
        COOLDOWN_SECONDS: int = Field(
            default=0,
            description="Minimum seconds between generations per user (0 to disable)",
        )
        GENERATION_TIMEOUT: int = Field(
            default=180, description="Timeout for image generation in seconds"
        )
        ACCUMULATOR_TTL: int = Field(
            default=300, description="Seconds to keep accumulated files in memory"
        )

    class UserValves(BaseModel):
        """
        User-specific settings. These can override or augment admin defaults.

        For SAFE_MODE/HIDE_WATERMARK: Either admin OR user can enable.
        - If admin sets SAFE_MODE=True, all users get SFW regardless of their setting
        - If admin sets SAFE_MODE=False, users can still opt in to SFW themselves

        For API keys: User key takes priority, admin key is fallback.
        """

        VENICE_API_KEY: str = Field(
            default="", description="Your Venice.ai API key (overrides admin fallback)"
        )
        SAFE_MODE: bool = Field(
            default=False,
            description="Enable SFW content filtering (admin can also force SFW)",
        )
        HIDE_WATERMARK: bool = Field(
            default=False, description="Hide Venice.ai watermark from generated images"
        )
        DEFAULT_MODEL: str = Field(
            default="",
            description="Your preferred image model (blank = use admin default)",
        )
        DEFAULT_NEGATIVE_PROMPT: str = Field(
            default="",
            description="Default negative prompt to exclude from all generations",
        )

    def __init__(self):
        self.valves = self.Valves()
        # Defaults only. The per-call user settings come from __user__["valves"];
        # see _resolve_user_valves.
        self.user_valves = self.UserValves()
        self.citation = False
        self._cooldowns: Dict[str, float] = {}
        # Re-entrancy support: accumulate files per message context
        self._message_files: Dict[str, Dict[str, Any]] = {}
        # Single lock for all accumulator operations (avoids per-key race)
        self._accumulator_lock: Optional[asyncio.Lock] = None
        self._lock_init = threading.Lock()  # Protects lazy init of asyncio lock
        self._last_cleanup: float = 0.0

    # ==================== Configuration Helpers ====================

    def _resolve_user_valves(self, user: Optional[dict] = None) -> BaseModel:
        """
        Return the calling user's UserValves.

        Open WebUI passes each caller's UserValves in ``__user__["valves"]``
        rather than assigning them to the tool instance. They are therefore
        read per call here instead of from ``self.user_valves``. This also
        keeps concurrent calls from different users independent.

        Accepts a UserValves instance, a plain dict, or another pydantic model.
        Falls back to ``self.user_valves`` (all defaults) when nothing usable
        is supplied.
        """
        raw = user.get("valves") if isinstance(user, dict) else None
        if raw is None:
            return self.user_valves
        if isinstance(raw, self.UserValves):
            return raw
        if isinstance(raw, BaseModel):
            dump = getattr(raw, "model_dump", None) or raw.dict
            raw = dump()
        if isinstance(raw, dict):
            try:
                return self.UserValves(**raw)
            except Exception:
                # Malformed stored valves: use defaults rather than fail the tool.
                pass
        return self.user_valves

    def _effective_user_valves(self, user_valves: Optional[BaseModel]) -> BaseModel:
        """Return ``user_valves`` if given, else the instance defaults."""
        return self.user_valves if user_valves is None else user_valves

    def _is_safe_mode_enabled(self, user_valves: Optional[BaseModel] = None) -> bool:
        """
        Safe mode is ON if Admin OR User has it enabled.

        | Admin | User | Result |
        |-------|------|--------|
        | SFW   | SFW  | SFW    | Both want safe
        | SFW   | NSFW | SFW    | Admin wins (restricts)
        | NSFW  | NSFW | NSFW   | Both allow NSFW
        | NSFW  | SFW  | SFW    | User opts in to safety
        """
        uv = self._effective_user_valves(user_valves)
        return self.valves.SAFE_MODE or uv.SAFE_MODE

    def _is_watermark_hidden(self, user_valves: Optional[BaseModel] = None) -> bool:
        """
        Watermark is hidden if Admin OR User has it enabled.
        Either party can opt to hide the watermark.
        """
        uv = self._effective_user_valves(user_valves)
        return self.valves.HIDE_WATERMARK or uv.HIDE_WATERMARK

    def _get_venice_key(self, user_valves: Optional[BaseModel] = None) -> str:
        """
        Get Venice API key with UserValves priority.
        User key first, then admin fallback, then empty (error handled by caller).
        """
        uv = self._effective_user_valves(user_valves)
        return uv.VENICE_API_KEY or self.valves.VENICE_API_KEY or ""

    def _get_default_model(self, user_valves: Optional[BaseModel] = None) -> str:
        """Get default model: user preference first, then admin default."""
        uv = self._effective_user_valves(user_valves)
        return uv.DEFAULT_MODEL or self.valves.DEFAULT_MODEL

    def _get_default_negative_prompt(
        self, user_valves: Optional[BaseModel] = None
    ) -> Optional[str]:
        """Get user's default negative prompt if set."""
        uv = self._effective_user_valves(user_valves)
        return uv.DEFAULT_NEGATIVE_PROMPT or None

    def _get_owui_config(self, __request__=None) -> tuple[str, dict]:
        """
        Get Open WebUI API config from request context.

        Returns (base_url without trailing slash, headers). The headers carry the
        caller's Authorization header when the request has one.
        """
        base_url = ""
        headers: Dict[str, str] = {}
        if __request__ is not None:
            base_url = str(getattr(__request__, "base_url", "") or "")
            req_headers = getattr(__request__, "headers", None)
            if req_headers:
                # Mapping lookup; the second key covers plain dicts with a
                # capitalised header name.
                auth = req_headers.get("authorization") or req_headers.get(
                    "Authorization"
                )
                if auth:
                    headers["Authorization"] = auth
        return base_url.rstrip("/"), headers

    def _get_message_key(self, __metadata__: dict = None) -> str:
        """Get unique key for message context (for file accumulation)."""
        if __metadata__:
            chat_id = __metadata__.get("chat_id", "")
            message_id = __metadata__.get("message_id", "")
            if message_id:
                return f"{chat_id}:{message_id}"
        # Fallback for missing context
        return f"unknown:{int(time.time())}"

    def _get_lock(self) -> asyncio.Lock:
        """Get accumulator lock (lazy init for compatibility)."""
        if self._accumulator_lock is None:
            with self._lock_init:
                if self._accumulator_lock is None:
                    self._accumulator_lock = asyncio.Lock()
        return self._accumulator_lock

    @staticmethod
    def _decode_base64_image(value: str) -> bytes:
        """Decode base64 image text, tolerating a ``data:<mime>;base64,`` prefix."""
        if value.startswith("data:") and "," in value:
            value = value.split(",", 1)[1]
        return base64.b64decode(value)

    def _parse_venice_image_response(
        self, response
    ) -> tuple[Optional[bytes], Optional[str]]:
        """
        Parse Venice API image response - handles both binary and JSON formats.

        Returns (image_bytes, error_message).
        """
        content_type = response.headers.get("content-type", "")

        # Binary response (PNG, WebP, etc.)
        if content_type.startswith("image/"):
            return response.content, None

        # JSON response with base64
        if "application/json" in content_type:
            try:
                data = response.json()
                if not isinstance(data, dict):
                    return (
                        None,
                        f"Unexpected JSON response type: {type(data).__name__}",
                    )
                # Check various possible field names
                for field in ("image", "images", "data", "result"):
                    if field not in data:
                        continue
                    value = data[field]
                    # Single image as base64 string
                    if isinstance(value, str):
                        return self._decode_base64_image(value), None
                    # Array of images (first one wins)
                    if isinstance(value, list) and value and isinstance(value[0], str):
                        return self._decode_base64_image(value[0]), None
                # If we got JSON but no image field, return the error
                if "error" in data:
                    return None, f"API error: {data.get('error')}"
                return None, f"Unexpected JSON response keys: {list(data.keys())}"
            except Exception as e:
                return None, f"Failed to parse JSON response: {e}"

        # Fallback: try as binary if it looks like image data
        if len(response.content) > 1000:
            return response.content, None

        return (
            None,
            f"Unknown response format: {content_type}, length: {len(response.content)}",
        )

    async def _accumulate_files(
        self,
        key: str,
        new_files: List[dict],
        __event_emitter__: Callable[[dict], Any] = None,
    ):
        """
        Add files to accumulator and emit ALL accumulated files.

        All accumulator reads/writes happen under one asyncio lock; the emit
        happens after the lock is released.
        """
        all_files = []

        async with self._get_lock():
            # Initialize if needed
            if key not in self._message_files:
                self._message_files[key] = {"files": [], "timestamp": time.time()}

            # Add new files (copy dicts to avoid reference issues)
            for f in new_files:
                self._message_files[key]["files"].append(dict(f))
            self._message_files[key]["timestamp"] = time.time()

            # Copy list for emission (prevents mutation during async emit)
            all_files = list(self._message_files[key]["files"])

            # Periodic cleanup (every 60s, while holding lock)
            now = time.time()
            if now - self._last_cleanup > 60:
                self._last_cleanup = now
                ttl = self.valves.ACCUMULATOR_TTL
                expired = [
                    k
                    for k, v in self._message_files.items()
                    if now - v.get("timestamp", 0) > ttl
                ]
                for k in expired:
                    del self._message_files[k]

        # Emit outside lock (event_emitter could be slow)
        if all_files and __event_emitter__:
            await __event_emitter__({"type": "files", "data": {"files": all_files}})

    def _get_accumulated_count(self, key: str) -> int:
        """Get current count of accumulated files for a key (non-blocking peek)."""
        entry = self._message_files.get(key)
        if entry:
            return len(entry.get("files", []))
        return 0

    # ==================== Files API ====================

    async def _upload_image(
        self,
        image_data: bytes | str,
        filename: str,
        metadata: dict,
        content_type: str = "image/webp",
        __request__=None,
    ) -> tuple[Optional[str], Optional[str]]:
        """
        Upload image to Open WebUI Files API.

        Args:
            image_data: Raw bytes or base64-encoded string (a data: URI prefix is allowed)
            filename: Name for the uploaded file
            metadata: File metadata dict (sent JSON-encoded as the "metadata" form field)
            content_type: MIME type (image/webp, image/png, etc.)
            __request__: Request context for auth

        Returns (file_id, error_message).
        """
        base_url, headers = self._get_owui_config(__request__)
        if not base_url:
            return None, "Could not determine Open WebUI URL"

        # Handle both bytes and base64 input
        if isinstance(image_data, str):
            try:
                image_bytes = self._decode_base64_image(image_data)
            except Exception as e:
                return None, f"Failed to decode image: {e}"
        else:
            image_bytes = image_data

        # Prepare multipart upload
        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                files = {"file": (filename, io.BytesIO(image_bytes), content_type)}
                data = {"metadata": json.dumps(metadata)}

                response = await client.post(
                    f"{base_url}/api/v1/files/",
                    headers=headers,
                    files=files,
                    data=data,
                    params={"process": "false", "process_in_background": "false"},
                )

            if response.status_code == 200:
                result = response.json()
                return result.get("id"), None
            return (
                None,
                f"Upload failed: {response.status_code} - {response.text[:200]}",
            )
        except Exception as e:
            return None, f"Upload error: {type(e).__name__}: {e}"

    # ==================== Image Fetching ====================

    @staticmethod
    def _parse_image_index(image_source: str) -> Optional[int]:
        """
        Parse an index-style image reference.

        Returns -1 for "last"/"latest"/"previous"/"recent", N-1 for "N" or "[N]",
        and None when the reference is not an index.
        """
        if image_source.lower() in ("last", "latest", "previous", "recent"):
            return -1  # Last image
        # isdecimal (not isdigit): isdigit accepts e.g. superscripts that int() rejects
        if image_source.isdecimal():
            return int(image_source) - 1  # 1-based to 0-based
        if image_source.startswith("[") and image_source.endswith("]"):
            inner = image_source[1:-1].strip()
            if inner.isdecimal():
                return int(inner) - 1
        return None

    @staticmethod
    def _context_message_ids(metadata: Optional[dict]) -> set:
        """Collect the current message id and any parent message ids from metadata."""
        message_ids: set = set()
        if not metadata:
            return message_ids
        if metadata.get("message_id"):
            message_ids.add(metadata["message_id"])
        # Walk up parent chain (may not exist); guard against non-dicts and cycles
        parent = metadata.get("parent_message")
        seen: set = set()
        while isinstance(parent, dict) and id(parent) not in seen:
            seen.add(id(parent))
            if parent.get("id"):
                message_ids.add(parent["id"])
            parent = parent.get("parent_message")
        # Also check parent_message_id directly
        if metadata.get("parent_message_id"):
            message_ids.add(metadata["parent_message_id"])
        return message_ids

    @staticmethod
    def _file_message_id(meta: dict) -> Optional[str]:
        """
        Return the message_id stored in a file's metadata, if any.

        Checks meta.data.message_id (current layout) first, then
        meta.data.data.message_id (older nested layout).
        """
        inner_data = meta.get("data")
        if not isinstance(inner_data, dict):
            return None
        file_msg_id = inner_data.get("message_id")
        if not file_msg_id:
            nested_data = inner_data.get("data")
            if isinstance(nested_data, dict):
                file_msg_id = nested_data.get("message_id")
        return file_msg_id or None

    async def _resolve_image_index(
        self,
        index: int,
        base_url: str,
        headers: dict,
        __metadata__: Optional[dict] = None,
    ) -> tuple[Optional[str], Optional[str]]:
        """
        Resolve a 0-based image index (-1 = newest) to a Files API content path.

        Lists the user's files and keeps only images. Images tagged with a
        message_id from the current message chain are preferred; otherwise all
        of the user's images are used. Candidates are ordered newest first by
        ``created_at``.

        Returns (relative_url, error_message).
        """
        message_ids = self._context_message_ids(__metadata__)
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.get(
                    f"{base_url}/api/v1/files/",
                    headers=headers,
                    params={"content": "false"},
                )
            if response.status_code != 200:
                return None, f"Failed to list files: {response.status_code}"

            files = response.json()
            if not isinstance(files, list):
                return None, "Failed to list files: unexpected response format"

            context_images: List[dict] = []
            all_images: List[dict] = []
            for f in files:
                if not isinstance(f, dict):
                    continue
                meta = f.get("meta") or {}
                if not isinstance(meta, dict):
                    continue
                if not str(meta.get("content_type") or "").startswith("image/"):
                    continue
                all_images.append(f)
                file_msg_id = self._file_message_id(meta)
                if file_msg_id and file_msg_id in message_ids:
                    context_images.append(f)

            # Use context images if available, otherwise all images
            candidates = context_images or all_images
            candidates.sort(key=lambda f: f.get("created_at") or 0, reverse=True)
            if not candidates:
                return None, "No images found"

            # -1 = last (newest); otherwise 0 = most recent, 1 = second most recent, ...
            target_idx = 0 if index == -1 else index
            count = len(candidates)
            if not 0 <= target_idx < count:
                return (
                    None,
                    f"Image [{index+1}] not found (found {count} image{'s' if count != 1 else ''})",
                )
            file_id = candidates[target_idx].get("id")
            if not file_id:
                return None, f"Image [{index+1}] has no file id"
            return f"/api/v1/files/{file_id}/content", None
        except Exception as e:
            return None, f"Failed to resolve image index: {type(e).__name__}: {e}"

    @staticmethod
    def _match_attached_file(image_source: str, files: list) -> Optional[str]:
        """
        Find an attached file (from __files__) matching image_source.

        Matches by exact/partial id or URL. Empty ids and URLs never match; an
        empty string would otherwise be a substring of every reference.
        Returns the file's URL (or a Files API content path built from its id),
        or None.
        """
        for f in files:
            if not isinstance(f, dict):
                continue
            file_id = str(f.get("id") or "")
            file_url = str(f.get("url") or "")
            id_match = bool(file_id) and (
                image_source == file_id or file_id in image_source
            )
            url_match = bool(file_url) and (
                image_source == file_url or image_source in file_url
            )
            if id_match or url_match:
                return file_url or f"/api/v1/files/{file_id}/content"
        return None

    async def _get_image_bytes(
        self,
        image_source: str,
        __request__=None,
        __files__: list = None,
        __metadata__: dict = None,
    ) -> tuple[Optional[bytes], Optional[str]]:
        """
        Fetch image bytes from various sources.

        Handles:
        - Index references: "1", "2", "[1]", "last", "previous" (from message context)
        - Open WebUI file URLs (/api/v1/files/{id}/content)
        - Full URLs (https://...)
        - File IDs from __files__ context

        The caller's Open WebUI Authorization header is only sent to URLs under
        the Open WebUI base URL, never to external hosts.

        Returns (image_bytes, error_message).
        """
        base_url, headers = self._get_owui_config(__request__)
        image_source = (image_source or "").strip()
        if not image_source:
            return None, "No image reference provided"

        index = self._parse_image_index(image_source)
        if index is not None:
            # Resolve index to actual file URL
            if not base_url:
                return None, "Could not determine Open WebUI URL"
            resolved, error = await self._resolve_image_index(
                index, base_url, headers, __metadata__
            )
            if error:
                return None, error
            image_source = resolved
        elif __files__:
            # Reference to an uploaded file in the current message?
            matched = self._match_attached_file(image_source, __files__)
            if matched:
                image_source = matched

        # Normalize URL
        if image_source.startswith("/api/"):
            if not base_url:
                return None, "Could not determine Open WebUI URL for relative path"
            fetch_url = f"{base_url}{image_source}"
        elif image_source.startswith(("http://", "https://")):
            fetch_url = image_source
        else:
            # Assume it's a file ID
            if not base_url:
                return None, "Could not determine Open WebUI URL"
            fetch_url = f"{base_url}/api/v1/files/{image_source}/content"

        # Forward the user's Open WebUI credentials only to Open WebUI itself;
        # the model can supply arbitrary external URLs.
        # NOTE(review): external URLs are still fetched server-side (SSRF surface);
        # restrict to an allowlist if that matters for this deployment.
        is_internal = bool(base_url) and (
            fetch_url == base_url or fetch_url.startswith(base_url + "/")
        )
        fetch_headers = headers if is_internal else {}

        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                response = await client.get(fetch_url, headers=fetch_headers)
            if response.status_code == 200:
                return response.content, None
            return (
                None,
                f"Failed to fetch image: {response.status_code} from {fetch_url}",
            )
        except Exception as e:
            return None, f"Fetch error: {type(e).__name__}: {e}"

    # ==================== Generate ====================

    async def generate_image(
        self,
        prompt: str,
        model: Optional[str] = None,
        width: int = 1024,
        height: int = 1024,
        negative_prompt: Optional[str] = None,
        style_preset: Optional[str] = None,
        variants: int = 1,
        __request__=None,
        __user__: dict = None,
        __metadata__: dict = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Generate image(s) using Venice.ai and upload to Open WebUI.

        IMPORTANT: For multiple images with the SAME prompt, use the 'variants' parameter (1-4)
        instead of calling this function multiple times. Multiple calls with DIFFERENT prompts
        are fine and will accumulate correctly.

        :param prompt: Text description of the image to generate
        :param model: Image model (default: user pref or z-image-turbo). Use venice_info list_models("image") to see options
        :param width: Image width 512-1280 pixels (default 1024)
        :param height: Image height 512-1280 pixels (default 1024)
        :param negative_prompt: What to avoid in the image (or uses user's default if set)
        :param style_preset: Style preset (use venice_info list_styles to see options). Note: Some models like z-image-turbo don't support styles
        :param variants: Number of variations 1-4 with same prompt (use this instead of multiple calls)
        :return: Generated image details
        """
        # Per-call user settings (Open WebUI supplies them in __user__["valves"])
        user_valves = self._resolve_user_valves(__user__)

        # Check API key
        venice_key = self._get_venice_key(user_valves)
        if not venice_key:
            return "Generate Image\nStatus: 0\nError: Venice.ai API key not configured. Set in UserValves or ask admin."

        # Validate prompt
        if not prompt or not prompt.strip():
            return "Generate Image\nStatus: 0\nError: Prompt is required"

        # Get message context for accumulation
        msg_key = self._get_message_key(__metadata__)

        # Check cooldown (skip if disabled or if this is a re-entrant call)
        user_id = __user__.get("id", "default") if __user__ else "default"
        cooldown = self.valves.COOLDOWN_SECONDS
        if cooldown > 0:
            now = time.time()
            last_gen = self._cooldowns.get(user_id, 0)
            # A message that already has accumulated images is a follow-up call
            is_reentrant = self._get_accumulated_count(msg_key) > 0
            if not is_reentrant and now - last_gen < cooldown:
                remaining = cooldown - (now - last_gen)
                return f"Generate Image\nStatus: 429\nError: Rate limited. Wait {remaining:.1f}s. TIP: Use 'variants' parameter (1-4) for multiple images with the same prompt."
            self._cooldowns[user_id] = now

        # Effective settings (admin OR user for safety/watermark)
        model = model or self._get_default_model(user_valves)
        safe_mode = self._is_safe_mode_enabled(user_valves)
        hide_watermark = self._is_watermark_hidden(user_valves)

        # Negative prompt: explicit > user default > none
        effective_negative_prompt = (
            negative_prompt
            if negative_prompt is not None
            else self._get_default_negative_prompt(user_valves)
        )

        # Clamp values
        variants = max(1, min(4, variants))
        width = max(512, min(1280, width))
        height = max(512, min(1280, height))

        # Count existing images (non-blocking peek)
        existing_count = self._get_accumulated_count(msg_key)

        # Status update
        if __event_emitter__:
            status_msg = (
                f"Generating {variants} image{'s' if variants > 1 else ''} with {model}"
            )
            if existing_count > 0:
                status_msg += f" (adding to {existing_count} existing)"
            if safe_mode:
                status_msg += " [SFW]"
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": f"{status_msg}...", "done": False},
                }
            )

        # Build Venice request
        payload = {
            "model": model,
            "prompt": prompt,
            "width": width,
            "height": height,
            "safe_mode": safe_mode,
            "hide_watermark": hide_watermark,
            "return_binary": False,
            "variants": variants,
        }
        if effective_negative_prompt:
            payload["negative_prompt"] = effective_negative_prompt
        if style_preset:
            payload["style_preset"] = style_preset

        api_url = "https://api.venice.ai/api/v1/image/generate"
        api_headers = {
            "Authorization": f"Bearer {venice_key}",
            "Content-Type": "application/json",
        }

        # Generate images - with one retry without optional params some models reject
        dropped_params: List[str] = []
        try:
            async with httpx.AsyncClient(
                timeout=float(self.valves.GENERATION_TIMEOUT)
            ) as client:
                response = await client.post(api_url, headers=api_headers, json=payload)

                if response.status_code in (400, 422):
                    error_text = response.text.lower()
                    # Looks like a style/parameter support issue?
                    if any(
                        kw in error_text
                        for kw in ("style", "unsupported", "invalid", "parameter")
                    ):
                        for optional in ("style_preset", "negative_prompt"):
                            if optional in payload:
                                del payload[optional]
                                dropped_params.append(optional)
                        if dropped_params:
                            response = await client.post(
                                api_url, headers=api_headers, json=payload
                            )

                response.raise_for_status()
                result = response.json()
        except httpx.HTTPStatusError as e:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Generate Image\nStatus: {e.response.status_code}\nError: Venice API error: {e.response.text[:200]}"
        except httpx.TimeoutException:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Generate Image\nStatus: 408\nError: Generation timed out after {self.valves.GENERATION_TIMEOUT}s"
        except Exception as e:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Generate Image\nStatus: 0\nError: {type(e).__name__}: {e}"

        images = (result.get("images") if isinstance(result, dict) else None) or []
        if not images:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return "Generate Image\nStatus: 0\nError: No images returned from Venice.ai"

        # Update status
        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": f"Uploading {len(images)} image{'s' if len(images) > 1 else ''}...",
                        "done": False,
                    },
                }
            )

        # Context for metadata
        chat_id = __metadata__.get("chat_id") if __metadata__ else None
        message_id = __metadata__.get("message_id") if __metadata__ else None

        # Model ids are used in filenames; keep only filename-safe characters
        safe_model = "".join(c if c.isalnum() or c in "-_." else "_" for c in model)

        # Upload each image to Files API
        uploaded_files: List[dict] = []
        errors: List[str] = []

        for i, image_b64 in enumerate(images):
            timestamp = int(time.time() * 1000)  # ms precision for uniqueness
            filename = f"venice_{safe_model}_{timestamp}_{i+1}.webp"

            file_metadata = {
                "name": filename,
                "content_type": "image/webp",
                "data": {
                    "model": model,
                    "prompt": prompt,
                    "negative_prompt": effective_negative_prompt,
                    "style_preset": style_preset,
                    "width": width,
                    "height": height,
                    "variant": i + 1,
                    "total_variants": len(images),
                    "safe_mode": safe_mode,
                    "hide_watermark": hide_watermark,
                },
            }
            # Associations at top level so _file_message_id can find them later
            if chat_id:
                file_metadata["chat_id"] = chat_id
            if message_id:
                file_metadata["message_id"] = message_id

            file_id, error = await self._upload_image(
                image_b64, filename, file_metadata, "image/webp", __request__
            )

            if file_id:
                uploaded_files.append(
                    {
                        "type": "image",
                        "url": f"/api/v1/files/{file_id}/content",
                    }
                )
            else:
                errors.append(f"Variant {i+1}: {error}")

        # Every upload failed: report an error instead of a misleading success
        if not uploaded_files:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            details = "\n".join(f"  - {e}" for e in errors)
            return f"Generate Image\nStatus: 0\nError: Generated images could not be uploaded:\n{details}"

        # Accumulate files and emit ALL accumulated files for this message
        await self._accumulate_files(msg_key, uploaded_files, __event_emitter__)

        # Get final count after accumulation
        final_count = self._get_accumulated_count(msg_key)

        # Clear status
        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": f"Done ({final_count} image{'s' if final_count != 1 else ''} total)",
                        "done": True,
                    },
                }
            )

        # Build response with file references for upscale/edit
        parts = [
            "Generate Image",
            "Status: 200",
            "",
            f"Generated {len(uploaded_files)} image(s) for: {prompt[:100]}{'...' if len(prompt) > 100 else ''}",
            f"Model: {model} | Size: {width}x{height}",
        ]

        # Show effective settings
        settings_parts = []
        if safe_mode:
            settings_parts.append("SFW")
        if hide_watermark:
            settings_parts.append("No watermark")
        if settings_parts:
            parts.append(f"Settings: {', '.join(settings_parts)}")

        # Include file URLs so subsequent upscale/edit calls know what to reference
        parts.append("")
        parts.append("Files (use these URLs for upscale_image or edit_image):")
        for i, f in enumerate(uploaded_files):
            parts.append(f"  [{i+1}] {f['url']}")

        if dropped_params:
            parts.append(
                f"Note: {model} doesn't support: {', '.join(dropped_params)} (ignored)"
            )

        if final_count > len(uploaded_files):
            parts.append(f"Total images in message: {final_count}")

        if errors:
            parts.append("")
            parts.append("Warnings:")
            for e in errors:
                parts.append(f"  - {e}")

        return "\n".join(parts)

    # ==================== Upscale ====================

    async def upscale_image(
        self,
        image: str,
        scale: int = 2,
        enhance: bool = False,
        enhance_creativity: float = 0.5,
        enhance_prompt: Optional[str] = None,
        __request__=None,
        __user__: dict = None,
        __metadata__: dict = None,
        __files__: list = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Upscale an image to higher resolution, optionally with AI enhancement.

        :param image: Image reference - simplest options:
            - "1" or "last" - Your most recently generated/uploaded image
            - "2" - Second most recent image
            - Or full URL from generate_image output
        :param scale: Scale factor 1-4 (default 2). Use 1 with enhance=True for enhancement only
        :param enhance: Apply AI enhancement during upscaling (adds detail/clarity)
        :param enhance_creativity: How much AI can modify the image 0.0-1.0 (default 0.5). Higher = more changes
        :param enhance_prompt: Style hint for enhancement like "sharp", "cinematic", "vibrant" (optional)
        :return: Upscaled image result
        """
        # Per-call user settings (Open WebUI supplies them in __user__["valves"])
        user_valves = self._resolve_user_valves(__user__)

        # Check API key
        venice_key = self._get_venice_key(user_valves)
        if not venice_key:
            return "Upscale Image\nStatus: 0\nError: Venice.ai API key not configured."

        # Validate parameters
        scale = max(1, min(4, scale))
        if scale == 1 and not enhance:
            return "Upscale Image\nStatus: 0\nError: scale=1 requires enhance=True (enhancement only mode)"

        enhance_creativity = max(0.0, min(1.0, enhance_creativity))

        # Get message context for accumulation
        msg_key = self._get_message_key(__metadata__)

        # Status update
        if __event_emitter__:
            mode = "Enhancing" if scale == 1 else f"Upscaling {scale}x"
            if enhance and scale > 1:
                mode += " + enhancing"
            await __event_emitter__(
                {"type": "status", "data": {"description": f"{mode}...", "done": False}}
            )

        # Fetch source image
        image_bytes, error = await self._get_image_bytes(
            image, __request__, __files__, __metadata__
        )
        if error:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return (
                f"Upscale Image\nStatus: 0\nError: Failed to get source image: {error}"
            )

        # Build request
        payload = {
            "image": base64.b64encode(image_bytes).decode("utf-8"),
            "scale": scale,
        }
        if enhance:
            payload["enhance"] = True
            payload["enhanceCreativity"] = enhance_creativity
            if enhance_prompt:
                payload["enhancePrompt"] = enhance_prompt[:1500]

        # Call Venice API
        try:
            async with httpx.AsyncClient(
                timeout=float(self.valves.GENERATION_TIMEOUT)
            ) as client:
                response = await client.post(
                    "https://api.venice.ai/api/v1/image/upscale",
                    headers={
                        "Authorization": f"Bearer {venice_key}",
                        "Content-Type": "application/json",
                    },
                    json=payload,
                )
                response.raise_for_status()

            # Parse response (handles both binary and JSON)
            result_bytes, parse_error = self._parse_venice_image_response(response)
            if parse_error:
                if __event_emitter__:
                    await __event_emitter__(
                        {"type": "status", "data": {"done": True}}
                    )
                return f"Upscale Image\nStatus: 0\nError: {parse_error}"
        except httpx.HTTPStatusError as e:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Upscale Image\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}"
        except httpx.TimeoutException:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Upscale Image\nStatus: 408\nError: Upscale timed out after {self.valves.GENERATION_TIMEOUT}s"
        except Exception as e:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Upscale Image\nStatus: 0\nError: {type(e).__name__}: {e}"

        # Upload result
        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Uploading...", "done": False},
                }
            )

        chat_id = __metadata__.get("chat_id") if __metadata__ else None
        message_id = __metadata__.get("message_id") if __metadata__ else None
        timestamp = int(time.time() * 1000)
        filename = f"venice_upscale_{scale}x_{timestamp}.png"

        file_metadata = {
            "name": filename,
            "content_type": "image/png",
            "data": {
                "operation": "upscale",
                "scale": scale,
                "enhance": enhance,
                "enhance_creativity": enhance_creativity if enhance else None,
                "enhance_prompt": enhance_prompt if enhance else None,
                "source_image": image[:100],
            },
        }
        # Associations at top level so _file_message_id can find them later
        if chat_id:
            file_metadata["chat_id"] = chat_id
        if message_id:
            file_metadata["message_id"] = message_id

        file_id, upload_error = await self._upload_image(
            result_bytes, filename, file_metadata, "image/png", __request__
        )

        if not file_id:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Upscale Image\nStatus: 0\nError: Upload failed - {upload_error}"

        # Accumulate and emit all files
        new_file = {"type": "image", "url": f"/api/v1/files/{file_id}/content"}
        await self._accumulate_files(msg_key, [new_file], __event_emitter__)
        final_count = self._get_accumulated_count(msg_key)

        # Clear status
        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": f"Done ({final_count} image{'s' if final_count != 1 else ''} total)",
                        "done": True,
                    },
                }
            )

        # Build response
        parts = [
            "Upscale Image",
            "Status: 200",
            "",
            f"Upscaled {scale}x" + (" with enhancement" if enhance else ""),
            "",
            f"File (use for further upscale/edit): {new_file['url']}",
        ]
        return "\n".join(parts)

    # ==================== Edit ====================

    async def edit_image(
        self,
        image: str,
        prompt: str,
        __request__=None,
        __user__: dict = None,
        __metadata__: dict = None,
        __files__: list = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Edit an existing image using text instructions.

        Venice uses the Qwen-Image model for editing. Works best with short, clear instructions like:
        - "Remove the background"
        - "Change the sky to sunset"
        - "Make it black and white"
        - "Add snow to the scene"

        :param image: Image reference - simplest options:
            - "1" or "last" - Your most recently generated/uploaded image
            - "2" - Second most recent image
            - Or full URL from generate_image output
        :param prompt: Text instructions describing the edit to make (max 1500 chars)
        :return: Edited image result
        """
        # Per-call user settings (Open WebUI supplies them in __user__["valves"])
        user_valves = self._resolve_user_valves(__user__)

        # Check API key
        venice_key = self._get_venice_key(user_valves)
        if not venice_key:
            return "Edit Image\nStatus: 0\nError: Venice.ai API key not configured."

        # Validate prompt
        if not prompt or not prompt.strip():
            return "Edit Image\nStatus: 0\nError: Edit prompt is required"

        prompt = prompt.strip()[:1500]

        # Get message context for accumulation
        msg_key = self._get_message_key(__metadata__)

        # Status update
        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Editing image...", "done": False},
                }
            )

        # Fetch source image
        image_bytes, error = await self._get_image_bytes(
            image, __request__, __files__, __metadata__
        )
        if error:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Edit Image\nStatus: 0\nError: Failed to get source image: {error}"

        # Build request
        payload = {
            "image": base64.b64encode(image_bytes).decode("utf-8"),
            "prompt": prompt,
        }

        # Call Venice API
        try:
            async with httpx.AsyncClient(
                timeout=float(self.valves.GENERATION_TIMEOUT)
            ) as client:
                response = await client.post(
                    "https://api.venice.ai/api/v1/image/edit",
                    headers={
                        "Authorization": f"Bearer {venice_key}",
                        "Content-Type": "application/json",
                    },
                    json=payload,
                )
                response.raise_for_status()

            # Parse response (handles both binary and JSON)
            result_bytes, parse_error = self._parse_venice_image_response(response)
            if parse_error:
                if __event_emitter__:
                    await __event_emitter__(
                        {"type": "status", "data": {"done": True}}
                    )
                return f"Edit Image\nStatus: 0\nError: {parse_error}"
        except httpx.HTTPStatusError as e:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Edit Image\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}"
        except httpx.TimeoutException:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Edit Image\nStatus: 408\nError: Edit timed out after {self.valves.GENERATION_TIMEOUT}s"
        except Exception as e:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Edit Image\nStatus: 0\nError: {type(e).__name__}: {e}"

        # Upload result
        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Uploading...", "done": False},
                }
            )

        chat_id = __metadata__.get("chat_id") if __metadata__ else None
        message_id = __metadata__.get("message_id") if __metadata__ else None
        timestamp = int(time.time() * 1000)
        filename = f"venice_edit_{timestamp}.png"

        file_metadata = {
            "name": filename,
            "content_type": "image/png",
            "data": {
                "operation": "edit",
                "prompt": prompt,
                "source_image": image[:100],
            },
        }
        # Associations at top level so _file_message_id can find them later
        if chat_id:
            file_metadata["chat_id"] = chat_id
        if message_id:
            file_metadata["message_id"] = message_id

        file_id, upload_error = await self._upload_image(
            result_bytes, filename, file_metadata, "image/png", __request__
        )

        if not file_id:
            if __event_emitter__:
                await __event_emitter__({"type": "status", "data": {"done": True}})
            return f"Edit Image\nStatus: 0\nError: Upload failed - {upload_error}"

        # Accumulate and emit all files
        new_file = {"type": "image", "url": f"/api/v1/files/{file_id}/content"}
        await self._accumulate_files(msg_key, [new_file], __event_emitter__)
        final_count = self._get_accumulated_count(msg_key)

        # Clear status
        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "description": f"Done ({final_count} image{'s' if final_count != 1 else ''} total)",
                        "done": True,
                    },
                }
            )

        # Build response
        parts = [
            "Edit Image",
            "Status: 200",
            "",
            f"Applied edit: {prompt[:80]}{'...' if len(prompt) > 80 else ''}",
            "",
            f"File (use for further upscale/edit): {new_file['url']}",
        ]
        return "\n".join(parts)