"""
title: Venice.ai Image Tools
author: Jeff Smith + Claude
version: 1.6.0
license: MIT
required_open_webui_version: 0.6.0
requirements: httpx, pydantic
description: |
  Venice.ai image generation, upscaling, and editing tools.
  - generate_image: Create images from text prompts
  - upscale_image: Increase resolution up to 4x with optional enhancement
  - edit_image: Modify existing images with text instructions

  All operations upload results to Open WebUI Files API for persistence
  and attach images to chat via event emitter for inline display.

  Re-entrant safe: Multiple concurrent calls accumulate images correctly.

  v1.6.0: Added UserValves for SAFE_MODE and HIDE_WATERMARK with proper
  admin/user override logic.
"""

import asyncio
import base64
import io
import json
import threading
import time
from typing import Any, Callable, Dict, List, Optional
from urllib.parse import urlsplit

import httpx
from pydantic import BaseModel, Field

# Root of the Venice.ai REST API used by all three tools.
VENICE_API_BASE = "https://api.venice.ai/api/v1"


class Tools:
    """
    Venice.ai image tools: generate, upscale, and edit.
    All operations upload results to Open WebUI Files API and attach
    images to chat messages for inline display.

    Re-entrant: Multiple calls within the same message context accumulate
    all images rather than overwriting each other.
    """

    class Valves(BaseModel):
        """
        Admin-controlled settings. These affect all users.
        SAFE_MODE and HIDE_WATERMARK can be overridden per-user via UserValves.
        """

        VENICE_API_KEY: str = Field(
            default="",
            description="Venice.ai API key (admin fallback if user has no key)",
        )
        DEFAULT_MODEL: str = Field(
            default="z-image-turbo", description="Default image model"
        )
        SAFE_MODE: bool = Field(
            default=True,
            description="Admin: Require SFW content (user can also enable in UserValves)",
        )
        HIDE_WATERMARK: bool = Field(
            default=False,
            description="Admin: Hide watermark (user can also enable in UserValves)",
        )
        COOLDOWN_SECONDS: int = Field(
            default=0,
            description="Minimum seconds between generations per user (0 to disable)",
        )
        GENERATION_TIMEOUT: int = Field(
            default=180, description="Timeout for image generation in seconds"
        )
        ACCUMULATOR_TTL: int = Field(
            default=300, description="Seconds to keep accumulated files in memory"
        )

    class UserValves(BaseModel):
        """
        User-specific settings. These can override or augment admin defaults.

        For SAFE_MODE/HIDE_WATERMARK: Either admin OR user can enable.
        - If admin sets SAFE_MODE=True, all users get SFW regardless of their setting
        - If admin sets SAFE_MODE=False, users can still opt-in to SFW themselves

        For API keys: User key takes priority, admin key is fallback.
        """

        VENICE_API_KEY: str = Field(
            default="", description="Your Venice.ai API key (overrides admin fallback)"
        )
        SAFE_MODE: bool = Field(
            default=False,
            description="Enable SFW content filtering (admin can also force SFW)",
        )
        HIDE_WATERMARK: bool = Field(
            default=False, description="Hide Venice.ai watermark from generated images"
        )
        DEFAULT_MODEL: str = Field(
            default="",
            description="Your preferred image model (blank = use admin default)",
        )
        DEFAULT_NEGATIVE_PROMPT: str = Field(
            default="",
            description="Default negative prompt to exclude from all generations",
        )

    def __init__(self):
        self.valves = self.Valves()
        # Class defaults only; used as a fallback when a call carries no
        # per-user valves (see _resolve_user_valves).
        self.user_valves = self.UserValves()
        self.citation = False
        # user_id -> time.time() of that user's last accepted generation
        self._cooldowns: Dict[str, float] = {}
        # Re-entrancy support: accumulate files per message context.
        # message key -> {"files": [...], "timestamp": float}
        self._message_files: Dict[str, Dict[str, Any]] = {}
        # Single lock for all accumulator operations (avoids per-key race)
        self._accumulator_lock: Optional[asyncio.Lock] = None
        self._lock_init = threading.Lock()  # Protects lazy init of asyncio lock
        self._last_cleanup: float = 0.0

    # ==================== Configuration Helpers ====================

    def _resolve_user_valves(self, __user__: Optional[dict] = None) -> Any:
        """
        Return the per-user valves that apply to the current call.

        Open WebUI passes each user's saved UserValves in ``__user__["valves"]``;
        ``self.user_valves`` is only ever assigned the class defaults in
        ``__init__``. When the call carries no valves (or they cannot be
        validated) the instance defaults are returned instead.
        """
        valves = __user__.get("valves") if isinstance(__user__, dict) else None
        if valves is None:
            return self.user_valves
        if isinstance(valves, dict):
            try:
                return self.UserValves(**valves)
            except (TypeError, ValueError):
                # pydantic validation errors derive from ValueError
                return self.user_valves
        return valves

    def _is_safe_mode_enabled(self, __user__: Optional[dict] = None) -> bool:
        """
        Safe mode is ON if Admin OR User has it enabled.

        | Admin | User | Result |
        |-------|------|--------|
        | SFW   | SFW  | SFW    | Both want safe
        | SFW   | NSFW | SFW    | Admin wins (restricts)
        | NSFW  | NSFW | NSFW   | Both allow NSFW
        | NSFW  | SFW  | SFW    | User opts in to safety
        """
        user_valves = self._resolve_user_valves(__user__)
        return bool(self.valves.SAFE_MODE or getattr(user_valves, "SAFE_MODE", False))

    def _is_watermark_hidden(self, __user__: Optional[dict] = None) -> bool:
        """
        Watermark is hidden if Admin OR User has it enabled.
        Either party can opt to hide the watermark.
        """
        user_valves = self._resolve_user_valves(__user__)
        return bool(
            self.valves.HIDE_WATERMARK or getattr(user_valves, "HIDE_WATERMARK", False)
        )

    def _get_venice_key(self, __user__: Optional[dict] = None) -> str:
        """
        Get Venice API key with UserValves priority.
        User key first, then admin fallback, then empty (error handled by caller).
        """
        user_valves = self._resolve_user_valves(__user__)
        return (
            getattr(user_valves, "VENICE_API_KEY", "")
            or self.valves.VENICE_API_KEY
            or ""
        )

    def _get_default_model(self, __user__: Optional[dict] = None) -> str:
        """Get default model: user preference first, then admin default."""
        user_valves = self._resolve_user_valves(__user__)
        return getattr(user_valves, "DEFAULT_MODEL", "") or self.valves.DEFAULT_MODEL

    def _get_default_negative_prompt(
        self, __user__: Optional[dict] = None
    ) -> Optional[str]:
        """Get user's default negative prompt if set, else None."""
        user_valves = self._resolve_user_valves(__user__)
        return getattr(user_valves, "DEFAULT_NEGATIVE_PROMPT", "") or None

    def _get_owui_config(self, __request__=None) -> tuple[str, dict]:
        """
        Get Open WebUI API config from request context.

        Returns (base_url without trailing slash, headers). The headers dict
        contains only the caller's Authorization header, when one is present.
        """
        base_url = ""
        headers = {}
        if __request__:
            base_url = str(getattr(__request__, "base_url", "") or "")
            req_headers = getattr(__request__, "headers", {})
            if req_headers:
                auth = dict(req_headers).get("authorization", "")
                if auth:
                    headers["Authorization"] = auth
        return base_url.rstrip("/"), headers

    def _get_message_key(self, __metadata__: Optional[dict] = None) -> str:
        """Get unique key for message context (for file accumulation)."""
        if __metadata__:
            chat_id = __metadata__.get("chat_id", "")
            message_id = __metadata__.get("message_id", "")
            if message_id:
                return f"{chat_id}:{message_id}"
        # Fallback for missing context: calls within the same second share a key
        return f"unknown:{int(time.time())}"

    def _get_lock(self) -> asyncio.Lock:
        """Get accumulator lock (lazy init for compatibility)."""
        if self._accumulator_lock is None:
            # The threading lock only guards creation; check again once held
            with self._lock_init:
                if self._accumulator_lock is None:
                    self._accumulator_lock = asyncio.Lock()
        return self._accumulator_lock

    def _venice_headers(self, venice_key: str) -> dict:
        """Build the request headers for a JSON call to the Venice API."""
        return {
            "Authorization": f"Bearer {venice_key}",
            "Content-Type": "application/json",
        }

    async def _emit_status(
        self,
        __event_emitter__: Callable[[dict], Any] = None,
        description: Optional[str] = None,
        done: bool = False,
    ) -> None:
        """
        Emit a "status" event; no-op when no emitter was supplied.

        With description=None the event data is just {"done": done}.
        """
        if not __event_emitter__:
            return
        data: Dict[str, Any] = {"done": done}
        if description is not None:
            data = {"description": description, "done": done}
        await __event_emitter__({"type": "status", "data": data})

    def _parse_venice_image_response(
        self, response
    ) -> tuple[Optional[bytes], Optional[str]]:
        """
        Parse Venice API image response - handles both binary and JSON formats.

        Returns (image_bytes, error_message); exactly one of the two is None.
        """
        content_type = response.headers.get("content-type", "")

        # Binary response (PNG, WebP, etc.)
        if content_type.startswith("image/"):
            return response.content, None

        # JSON response with base64
        if "application/json" in content_type:
            try:
                data = response.json()
                if not isinstance(data, dict):
                    return (
                        None,
                        f"Unexpected JSON response type: {type(data).__name__}",
                    )
                # Check various possible field names
                for field in ("image", "images", "data", "result"):
                    if field in data:
                        value = data[field]
                        # Single image as base64 string
                        if isinstance(value, str):
                            return base64.b64decode(value), None
                        # Array of images: only the first one is used
                        if isinstance(value, list) and value:
                            return base64.b64decode(value[0]), None
                # If we got JSON but no image field, return the error
                if "error" in data:
                    return None, f"API error: {data.get('error')}"
                return None, f"Unexpected JSON response keys: {list(data.keys())}"
            except (ValueError, TypeError) as e:
                # JSON decode errors and base64 errors are ValueError subclasses;
                # b64decode raises TypeError for non-str/bytes values.
                return None, f"Failed to parse JSON response: {e}"

        # Fallback: try as binary if it looks like image data
        if len(response.content) > 1000:
            return response.content, None

        return (
            None,
            f"Unknown response format: {content_type}, length: {len(response.content)}",
        )

    async def _accumulate_files(
        self,
        key: str,
        new_files: List[dict],
        __event_emitter__: Callable[[dict], Any] = None,
    ):
        """
        Add files to accumulator and emit ALL accumulated files.

        All accumulator mutations happen under one asyncio lock, so concurrent
        tool calls (coroutines) for the same message do not lose each other's
        files. Expired entries are purged at most once every 60 seconds.
        """
        all_files = []

        async with self._get_lock():
            # Initialize if needed
            if key not in self._message_files:
                self._message_files[key] = {"files": [], "timestamp": time.time()}

            # Add new files (copy dicts to avoid reference issues)
            for f in new_files:
                self._message_files[key]["files"].append(dict(f))
            self._message_files[key]["timestamp"] = time.time()

            # Copy list for emission (prevents mutation during async emit)
            all_files = list(self._message_files[key]["files"])

            # Periodic cleanup (every 60s, while holding lock)
            now = time.time()
            if now - self._last_cleanup > 60:
                self._last_cleanup = now
                ttl = self.valves.ACCUMULATOR_TTL
                expired = [
                    k
                    for k, v in self._message_files.items()
                    if now - v.get("timestamp", 0) > ttl
                ]
                for k in expired:
                    del self._message_files[k]

        # Emit outside lock (event_emitter could be slow)
        if all_files and __event_emitter__:
            await __event_emitter__({"type": "files", "data": {"files": all_files}})

    def _get_accumulated_count(self, key: str) -> int:
        """Get current count of accumulated files for a key (non-blocking peek)."""
        entry = self._message_files.get(key)
        if entry:
            return len(entry.get("files", []))
        return 0

    # ==================== Files API ====================

    async def _upload_image(
        self,
        image_data: bytes | str,
        filename: str,
        metadata: dict,
        content_type: str = "image/webp",
        __request__=None,
    ) -> tuple[Optional[str], Optional[str]]:
        """
        Upload image to Open WebUI Files API.

        Args:
            image_data: Raw bytes or base64-encoded string
            filename: Name for the uploaded file
            metadata: File metadata dict (sent JSON-encoded as a form field)
            content_type: MIME type (image/webp, image/png, etc.)
            __request__: Request context for auth

        Returns (file_id, error_message).
        """
        base_url, headers = self._get_owui_config(__request__)

        if not base_url:
            return None, "Could not determine Open WebUI URL"

        # Handle both bytes and base64 input
        if isinstance(image_data, str):
            try:
                image_bytes = base64.b64decode(image_data)
            except Exception as e:
                return None, f"Failed to decode image: {e}"
        else:
            image_bytes = image_data

        # Prepare multipart upload
        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                files = {"file": (filename, io.BytesIO(image_bytes), content_type)}

                # Include metadata as form field
                data = {"metadata": json.dumps(metadata)}

                response = await client.post(
                    f"{base_url}/api/v1/files/",
                    headers=headers,
                    files=files,
                    data=data,
                    params={"process": "false", "process_in_background": "false"},
                )

            if response.status_code == 200:
                result = response.json()
                return result.get("id"), None
            return (
                None,
                f"Upload failed: {response.status_code} - {response.text[:200]}",
            )

        except Exception as e:
            return None, f"Upload error: {type(e).__name__}: {e}"

    def _attach_context(
        self, file_metadata: dict, __metadata__: Optional[dict] = None
    ) -> dict:
        """
        Add chat_id/message_id to the top level of file_metadata (in place).

        Only keys with a truthy value are added. Returns the same dict.
        """
        if __metadata__:
            for key in ("chat_id", "message_id"):
                value = __metadata__.get(key)
                if value:
                    file_metadata[key] = value
        return file_metadata

    # ==================== Image Fetching ====================

    def _parse_image_index(self, image_source: str) -> Optional[int]:
        """
        Interpret an index-style reference.

        Returns -1 for "last"/"latest"/"previous"/"recent", a 0-based index
        for "N" or "[N]", and None when the text is not an index reference.
        Note that "0" yields -1 and is therefore handled like "last".
        """
        if image_source.lower() in ("last", "latest", "previous", "recent"):
            return -1
        candidate = image_source
        if candidate.startswith("[") and candidate.endswith("]"):
            candidate = candidate[1:-1].strip()
        # isdecimal() (unlike isdigit()) only accepts characters int() can parse
        if candidate.isdecimal():
            return int(candidate) - 1
        return None

    def _collect_message_ids(self, __metadata__: Optional[dict] = None) -> set:
        """Collect the current message id plus any parent ids found in metadata."""
        message_ids = set()
        if not __metadata__:
            return message_ids
        if __metadata__.get("message_id"):
            message_ids.add(__metadata__["message_id"])
        # Walk up parent chain (nested "parent_message" may not exist)
        parent = __metadata__.get("parent_message")
        while isinstance(parent, dict):
            if parent.get("id"):
                message_ids.add(parent["id"])
            parent = parent.get("parent_message")
        # Also check parent_message_id directly
        if __metadata__.get("parent_message_id"):
            message_ids.add(__metadata__["parent_message_id"])
        return message_ids

    def _file_message_id(self, meta: dict) -> Any:
        """
        Read the message id stored with an uploaded file.

        Checks meta.data.message_id first, then the older nested
        meta.data.data.message_id location.
        """
        inner_data = meta.get("data", {})
        if not isinstance(inner_data, dict):
            return None
        file_msg_id = inner_data.get("message_id")
        if not file_msg_id:
            nested_data = inner_data.get("data", {})
            if isinstance(nested_data, dict):
                file_msg_id = nested_data.get("message_id")
        return file_msg_id

    async def _resolve_indexed_image(
        self,
        index: int,
        base_url: str,
        headers: dict,
        __metadata__: Optional[dict] = None,
    ) -> tuple[Optional[str], Optional[str]]:
        """
        Turn an index (-1 = newest, else 0-based, newest first) into a file URL.

        Lists files from the Open WebUI Files API, keeps image files, and
        prefers those tagged with the current message chain; if none are
        tagged, all listed images are considered.

        Returns (relative_content_url, error_message).
        """
        message_ids = self._collect_message_ids(__metadata__)

        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.get(
                    f"{base_url}/api/v1/files/",
                    headers=headers,
                    params={"content": "false"},
                )
            if response.status_code != 200:
                return None, f"Failed to list files: {response.status_code}"

            context_images = []
            all_images = []
            for f in response.json():
                if not isinstance(f, dict):
                    continue
                meta = f.get("meta") or {}
                if not str(meta.get("content_type") or "").startswith("image/"):
                    continue
                all_images.append(f)
                file_msg_id = self._file_message_id(meta)
                if file_msg_id and file_msg_id in message_ids:
                    context_images.append(f)

            # Use context images if available, otherwise all images
            user_images = context_images if context_images else all_images
            if not user_images:
                return None, "No images found"

            # Sort by created_at (newest first)
            user_images.sort(key=lambda f: f.get("created_at") or 0, reverse=True)

            target_idx = 0 if index == -1 else index
            if 0 <= target_idx < len(user_images):
                file_id = user_images[target_idx].get("id")
                return f"/api/v1/files/{file_id}/content", None
            return (
                None,
                f"Image [{index+1}] not found (found {len(user_images)} image{'s' if len(user_images) != 1 else ''})",
            )

        except Exception as e:
            return None, f"Failed to resolve image index: {type(e).__name__}: {e}"

    async def _get_image_bytes(
        self,
        image_source: str,
        __request__=None,
        __files__: list = None,
        __metadata__: dict = None,
    ) -> tuple[Optional[bytes], Optional[str]]:
        """
        Fetch image bytes from various sources.

        Handles:
        - Index references: "1", "2", "[1]", "last", "previous" (from message context)
        - Open WebUI file URLs (/api/v1/files/{id}/content)
        - Full URLs (https://...)
        - File IDs from __files__ context

        The caller's Open WebUI Authorization header is only attached when the
        target host is the Open WebUI host itself, so it is never sent to
        third-party servers.

        Returns (image_bytes, error_message).
        """
        base_url, headers = self._get_owui_config(__request__)
        image_source = (image_source or "").strip()
        if not image_source:
            return None, "No image reference provided"

        # Resolve index references to an actual file URL
        index = self._parse_image_index(image_source)
        if index is not None:
            if not base_url:
                return None, "Could not determine Open WebUI URL"
            resolved, error = await self._resolve_indexed_image(
                index, base_url, headers, __metadata__
            )
            if error:
                return None, error
            image_source = resolved

        # Check if it's a reference to an uploaded file in current message
        for f in __files__ or []:
            if not isinstance(f, dict):
                continue
            file_id = str(f.get("id") or "")
            file_url = str(f.get("url") or "")
            # Match by ID, URL, or partial match. Empty ids/urls must not
            # match: "" is a substring of every string.
            id_match = bool(file_id) and file_id in image_source
            url_match = bool(file_url) and image_source in file_url
            if id_match or url_match:
                image_source = file_url or f"/api/v1/files/{file_id}/content"
                break

        # Normalize URL
        if image_source.startswith("/api/"):
            if not base_url:
                return None, "Could not determine Open WebUI URL for relative path"
            fetch_url = f"{base_url}{image_source}"
        elif image_source.startswith(("http://", "https://")):
            fetch_url = image_source
        else:
            # Assume it's a file ID
            if not base_url:
                return None, "Could not determine Open WebUI URL"
            fetch_url = f"{base_url}/api/v1/files/{image_source}/content"

        # Compare hosts rather than full prefixes so a scheme difference
        # (e.g. behind a TLS-terminating proxy) still counts as the same server.
        same_host = bool(base_url) and (
            urlsplit(fetch_url).netloc.lower() == urlsplit(base_url).netloc.lower()
        )
        request_headers = headers if same_host else {}

        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                response = await client.get(fetch_url, headers=request_headers)
            if response.status_code == 200:
                return response.content, None
            return (
                None,
                f"Failed to fetch image: {response.status_code} from {fetch_url}",
            )

        except Exception as e:
            return None, f"Fetch error: {type(e).__name__}: {e}"

    # ==================== Shared Venice helpers ====================

    async def _request_venice_image(
        self, endpoint: str, venice_key: str, payload: dict
    ) -> tuple[Optional[bytes], int, Optional[str]]:
        """
        POST payload to a Venice image endpoint and decode the returned image.

        Args:
            endpoint: Path below /image/, e.g. "upscale" or "edit"
            venice_key: Venice API key
            payload: JSON body

        Returns (image_bytes, status, error_message). On success error_message
        is None and status is 200; on failure image_bytes is None and status is
        the HTTP status code, 408 for a timeout, or 0 for anything else.
        """
        timeout = float(self.valves.GENERATION_TIMEOUT)
        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                response = await client.post(
                    f"{VENICE_API_BASE}/image/{endpoint}",
                    headers=self._venice_headers(venice_key),
                    json=payload,
                )
            response.raise_for_status()

            # Parse response (handles both binary and JSON)
            result_bytes, parse_error = self._parse_venice_image_response(response)
            if parse_error:
                return None, 0, parse_error
            return result_bytes, 200, None

        except httpx.HTTPStatusError as e:
            return None, e.response.status_code, e.response.text[:200]
        except httpx.TimeoutException:
            return (
                None,
                408,
                f"Request timed out after {self.valves.GENERATION_TIMEOUT}s",
            )
        except Exception as e:
            return None, 0, f"{type(e).__name__}: {e}"

    async def _store_result_image(
        self,
        result_bytes: bytes,
        filename: str,
        file_metadata: dict,
        msg_key: str,
        __request__=None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> tuple[Optional[dict], Optional[str]]:
        """
        Upload a PNG result, add it to the message's files and finish the status.

        Returns (file_entry, upload_error). file_entry is the
        {"type": "image", "url": ...} dict that was attached to the message.
        """
        await self._emit_status(__event_emitter__, "Uploading...")

        file_id, upload_error = await self._upload_image(
            result_bytes, filename, file_metadata, "image/png", __request__
        )
        if not file_id:
            await self._emit_status(__event_emitter__, done=True)
            return None, upload_error

        # Accumulate and emit all files
        new_file = {"type": "image", "url": f"/api/v1/files/{file_id}/content"}
        await self._accumulate_files(msg_key, [new_file], __event_emitter__)

        final_count = self._get_accumulated_count(msg_key)
        await self._emit_status(
            __event_emitter__,
            f"Done ({final_count} image{'s' if final_count != 1 else ''} total)",
            done=True,
        )
        return new_file, None

    # ==================== Generate ====================

    async def generate_image(
        self,
        prompt: str,
        model: Optional[str] = None,
        width: int = 1024,
        height: int = 1024,
        negative_prompt: Optional[str] = None,
        style_preset: Optional[str] = None,
        variants: int = 1,
        __request__=None,
        __user__: dict = None,
        __metadata__: dict = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Generate image(s) using Venice.ai and upload to Open WebUI.

        IMPORTANT: For multiple images with the SAME prompt, use the 'variants'
        parameter (1-4) instead of calling this function multiple times.
        Multiple calls with DIFFERENT prompts are fine and will accumulate correctly.

        :param prompt: Text description of the image to generate
        :param model: Image model (default: user pref or z-image-turbo). Use venice_info list_models("image") to see options
        :param width: Image width 512-1280 pixels (default 1024)
        :param height: Image height 512-1280 pixels (default 1024)
        :param negative_prompt: What to avoid in the image (or uses user's default if set)
        :param style_preset: Style preset (use venice_info list_styles to see options). Note: Some models like z-image-turbo don't support styles
        :param variants: Number of variations 1-4 with same prompt (use this instead of multiple calls)
        :return: Generated image details
        """
        # Check API key
        venice_key = self._get_venice_key(__user__)
        if not venice_key:
            return "Generate Image\nStatus: 0\nError: Venice.ai API key not configured. Set in UserValves or ask admin."

        # Validate prompt
        if not prompt or not prompt.strip():
            return "Generate Image\nStatus: 0\nError: Prompt is required"

        # Get message context for accumulation
        msg_key = self._get_message_key(__metadata__)

        # Check cooldown (skip if disabled or if this is a re-entrant call)
        user_id = __user__.get("id", "default") if __user__ else "default"
        cooldown = self.valves.COOLDOWN_SECONDS

        if cooldown > 0:
            now = time.time()
            last_gen = self._cooldowns.get(user_id, 0)
            # A message that already has images is mid-batch: don't rate limit it
            is_reentrant = self._get_accumulated_count(msg_key) > 0

            if not is_reentrant and now - last_gen < cooldown:
                remaining = cooldown - (now - last_gen)
                return f"Generate Image\nStatus: 429\nError: Rate limited. Wait {remaining:.1f}s. TIP: Use 'variants' parameter (1-4) for multiple images with the same prompt."

            self._cooldowns[user_id] = now

        # Get effective settings using helper methods
        model = model or self._get_default_model(__user__)
        safe_mode = self._is_safe_mode_enabled(__user__)
        hide_watermark = self._is_watermark_hidden(__user__)

        # Handle negative prompt: explicit > user default > none
        effective_negative_prompt = negative_prompt
        if effective_negative_prompt is None:
            effective_negative_prompt = self._get_default_negative_prompt(__user__)

        # Clamp values
        variants = max(1, min(4, variants))
        width = max(512, min(1280, width))
        height = max(512, min(1280, height))

        # Count existing images (non-blocking peek)
        existing_count = self._get_accumulated_count(msg_key)

        # Status update
        if __event_emitter__:
            status_msg = (
                f"Generating {variants} image{'s' if variants > 1 else ''} with {model}"
            )
            if existing_count > 0:
                status_msg += f" (adding to {existing_count} existing)"
            if safe_mode:
                status_msg += " [SFW]"
            await self._emit_status(__event_emitter__, f"{status_msg}...")

        # Build Venice request
        payload = {
            "model": model,
            "prompt": prompt,
            "width": width,
            "height": height,
            "safe_mode": safe_mode,
            "hide_watermark": hide_watermark,
            "return_binary": False,
            "variants": variants,
        }

        if effective_negative_prompt:
            payload["negative_prompt"] = effective_negative_prompt
        if style_preset:
            payload["style_preset"] = style_preset

        # Generate images - with a single retry for unsupported params
        dropped_params: List[str] = []
        generate_url = f"{VENICE_API_BASE}/image/generate"
        request_headers = self._venice_headers(venice_key)

        try:
            async with httpx.AsyncClient(
                timeout=float(self.valves.GENERATION_TIMEOUT)
            ) as client:
                response = await client.post(
                    generate_url, headers=request_headers, json=payload
                )

                # Check for parameter errors and retry without optional params
                if response.status_code in (400, 422):
                    error_text = response.text.lower()
                    # Check if it's a style/parameter support issue
                    if any(
                        kw in error_text
                        for kw in ("style", "unsupported", "invalid", "parameter")
                    ):
                        # Retry without style_preset and negative_prompt
                        for optional in ("style_preset", "negative_prompt"):
                            if optional in payload:
                                del payload[optional]
                                dropped_params.append(optional)

                        if dropped_params:
                            response = await client.post(
                                generate_url, headers=request_headers, json=payload
                            )

            response.raise_for_status()
            result = response.json()

        except httpx.HTTPStatusError as e:
            await self._emit_status(__event_emitter__, done=True)
            return f"Generate Image\nStatus: {e.response.status_code}\nError: Venice API error: {e.response.text[:200]}"
        except httpx.TimeoutException:
            await self._emit_status(__event_emitter__, done=True)
            return f"Generate Image\nStatus: 408\nError: Generation timed out after {self.valves.GENERATION_TIMEOUT}s"
        except Exception as e:
            await self._emit_status(__event_emitter__, done=True)
            return f"Generate Image\nStatus: 0\nError: {type(e).__name__}: {e}"

        images = result.get("images", []) if isinstance(result, dict) else []
        if not images:
            await self._emit_status(__event_emitter__, done=True)
            return "Generate Image\nStatus: 0\nError: No images returned from Venice.ai"

        # Update status
        await self._emit_status(
            __event_emitter__,
            f"Uploading {len(images)} image{'s' if len(images) > 1 else ''}...",
        )

        # Upload each image to Files API
        uploaded_files = []
        errors = []

        for i, image_b64 in enumerate(images):
            timestamp = int(time.time() * 1000)  # ms precision for uniqueness
            filename = f"venice_{model}_{timestamp}_{i+1}.webp"

            # Build metadata - generation details live under "data"
            file_metadata = {
                "name": filename,
                "content_type": "image/webp",
                "data": {
                    "model": model,
                    "prompt": prompt,
                    "negative_prompt": effective_negative_prompt,
                    "style_preset": style_preset,
                    "width": width,
                    "height": height,
                    "variant": i + 1,
                    "total_variants": len(images),
                    "safe_mode": safe_mode,
                    "hide_watermark": hide_watermark,
                },
            }

            # Associations at top level (becomes meta.data.chat_id, not meta.data.data.chat_id)
            self._attach_context(file_metadata, __metadata__)

            file_id, error = await self._upload_image(
                image_b64, filename, file_metadata, "image/webp", __request__
            )

            if file_id:
                uploaded_files.append(
                    {
                        "type": "image",
                        "url": f"/api/v1/files/{file_id}/content",
                    }
                )
            else:
                errors.append(f"Variant {i+1}: {error}")

        # Nothing could be stored: report a failure instead of "Status: 200"
        if not uploaded_files:
            await self._emit_status(__event_emitter__, done=True)
            failure = ["Generate Image", "Status: 0", "Error: All uploads failed"]
            failure.extend(f"  - {e}" for e in errors)
            return "\n".join(failure)

        # Accumulate files and emit ALL accumulated files for this message
        await self._accumulate_files(msg_key, uploaded_files, __event_emitter__)

        # Get final count after accumulation
        final_count = self._get_accumulated_count(msg_key)

        # Clear status
        await self._emit_status(
            __event_emitter__,
            f"Done ({final_count} image{'s' if final_count != 1 else ''} total)",
            done=True,
        )

        # Build response with file references for upscale/edit
        parts = [
            "Generate Image",
            "Status: 200",
            "",
            f"Generated {len(uploaded_files)} image(s) for: {prompt[:100]}{'...' if len(prompt) > 100 else ''}",
            f"Model: {model} | Size: {width}x{height}",
        ]

        # Show effective settings
        settings_parts = []
        if safe_mode:
            settings_parts.append("SFW")
        if hide_watermark:
            settings_parts.append("No watermark")
        if settings_parts:
            parts.append(f"Settings: {', '.join(settings_parts)}")

        # Include file URLs so subsequent upscale/edit calls know what to reference
        parts.append("")
        parts.append("Files (use these URLs for upscale_image or edit_image):")
        for i, f in enumerate(uploaded_files):
            parts.append(f"  [{i+1}] {f['url']}")

        if dropped_params:
            parts.append(
                f"Note: {model} doesn't support: {', '.join(dropped_params)} (ignored)"
            )

        if final_count > len(uploaded_files):
            parts.append(f"Total images in message: {final_count}")

        if errors:
            parts.append("")
            parts.append("Warnings:")
            for e in errors:
                parts.append(f"  - {e}")

        return "\n".join(parts)

    # ==================== Upscale ====================

    async def upscale_image(
        self,
        image: str,
        scale: int = 2,
        enhance: bool = False,
        enhance_creativity: float = 0.5,
        enhance_prompt: Optional[str] = None,
        __request__=None,
        __user__: dict = None,
        __metadata__: dict = None,
        __files__: list = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Upscale an image to higher resolution, optionally with AI enhancement.

        :param image: Image reference - simplest options:
                      - "1" or "last" - Your most recently generated/uploaded image
                      - "2" - Second most recent image
                      - Or full URL from generate_image output
        :param scale: Scale factor 1-4 (default 2). Use 1 with enhance=True for enhancement only
        :param enhance: Apply AI enhancement during upscaling (adds detail/clarity)
        :param enhance_creativity: How much AI can modify the image 0.0-1.0 (default 0.5). Higher = more changes
        :param enhance_prompt: Style hint for enhancement like "sharp", "cinematic", "vibrant" (optional)
        :return: Upscaled image result
        """
        # Check API key
        venice_key = self._get_venice_key(__user__)
        if not venice_key:
            return "Upscale Image\nStatus: 0\nError: Venice.ai API key not configured."

        # Validate parameters
        scale = max(1, min(4, scale))
        if scale == 1 and not enhance:
            return "Upscale Image\nStatus: 0\nError: scale=1 requires enhance=True (enhancement only mode)"

        enhance_creativity = max(0.0, min(1.0, enhance_creativity))

        # Get message context for accumulation
        msg_key = self._get_message_key(__metadata__)

        # Status update
        mode = "Enhancing" if scale == 1 else f"Upscaling {scale}x"
        if enhance and scale > 1:
            mode += " + enhancing"
        await self._emit_status(__event_emitter__, f"{mode}...")

        # Fetch source image
        image_bytes, error = await self._get_image_bytes(
            image, __request__, __files__, __metadata__
        )
        if error:
            await self._emit_status(__event_emitter__, done=True)
            return (
                f"Upscale Image\nStatus: 0\nError: Failed to get source image: {error}"
            )

        # Build request
        payload = {
            "image": base64.b64encode(image_bytes).decode("utf-8"),
            "scale": scale,
        }

        if enhance:
            payload["enhance"] = True
            payload["enhanceCreativity"] = enhance_creativity
            if enhance_prompt:
                payload["enhancePrompt"] = enhance_prompt[:1500]

        # Call Venice API
        result_bytes, status, call_error = await self._request_venice_image(
            "upscale", venice_key, payload
        )
        if call_error:
            await self._emit_status(__event_emitter__, done=True)
            return f"Upscale Image\nStatus: {status}\nError: {call_error}"

        # Upload result
        timestamp = int(time.time() * 1000)
        filename = f"venice_upscale_{scale}x_{timestamp}.png"

        file_metadata = {
            "name": filename,
            "content_type": "image/png",
            "data": {
                "operation": "upscale",
                "scale": scale,
                "enhance": enhance,
                "enhance_creativity": enhance_creativity if enhance else None,
                "enhance_prompt": enhance_prompt if enhance else None,
                "source_image": image[:100],
            },
        }

        # Associations at top level (becomes meta.data.chat_id)
        self._attach_context(file_metadata, __metadata__)

        new_file, upload_error = await self._store_result_image(
            result_bytes,
            filename,
            file_metadata,
            msg_key,
            __request__,
            __event_emitter__,
        )
        if not new_file:
            return f"Upscale Image\nStatus: 0\nError: Upload failed - {upload_error}"

        # Build response
        parts = [
            "Upscale Image",
            "Status: 200",
            "",
            f"Upscaled {scale}x" + (" with enhancement" if enhance else ""),
            "",
            f"File (use for further upscale/edit): {new_file['url']}",
        ]

        return "\n".join(parts)

    # ==================== Edit ====================

    async def edit_image(
        self,
        image: str,
        prompt: str,
        __request__=None,
        __user__: dict = None,
        __metadata__: dict = None,
        __files__: list = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Edit an existing image using text instructions.

        Venice uses the Qwen-Image model for editing. Works best with
        short, clear instructions like:
        - "Remove the background"
        - "Change the sky to sunset"
        - "Make it black and white"
        - "Add snow to the scene"

        :param image: Image reference - simplest options:
                      - "1" or "last" - Your most recently generated/uploaded image
                      - "2" - Second most recent image
                      - Or full URL from generate_image output
        :param prompt: Text instructions describing the edit to make (max 1500 chars)
        :return: Edited image result
        """
        # Check API key
        venice_key = self._get_venice_key(__user__)
        if not venice_key:
            return "Edit Image\nStatus: 0\nError: Venice.ai API key not configured."

        # Validate prompt
        if not prompt or not prompt.strip():
            return "Edit Image\nStatus: 0\nError: Edit prompt is required"

        prompt = prompt.strip()[:1500]

        # Get message context for accumulation
        msg_key = self._get_message_key(__metadata__)

        # Status update
        await self._emit_status(__event_emitter__, "Editing image...")

        # Fetch source image
        image_bytes, error = await self._get_image_bytes(
            image, __request__, __files__, __metadata__
        )
        if error:
            await self._emit_status(__event_emitter__, done=True)
            return f"Edit Image\nStatus: 0\nError: Failed to get source image: {error}"

        # Build request
        payload = {
            "image": base64.b64encode(image_bytes).decode("utf-8"),
            "prompt": prompt,
        }

        # Call Venice API
        result_bytes, status, call_error = await self._request_venice_image(
            "edit", venice_key, payload
        )
        if call_error:
            await self._emit_status(__event_emitter__, done=True)
            return f"Edit Image\nStatus: {status}\nError: {call_error}"

        # Upload result
        timestamp = int(time.time() * 1000)
        filename = f"venice_edit_{timestamp}.png"

        file_metadata = {
            "name": filename,
            "content_type": "image/png",
            "data": {
                "operation": "edit",
                "prompt": prompt,
                "source_image": image[:100],
            },
        }

        # Associations at top level (becomes meta.data.chat_id)
        self._attach_context(file_metadata, __metadata__)

        new_file, upload_error = await self._store_result_image(
            result_bytes,
            filename,
            file_metadata,
            msg_key,
            __request__,
            __event_emitter__,
        )
        if not new_file:
            return f"Edit Image\nStatus: 0\nError: Upload failed - {upload_error}"

        # Build response
        parts = [
            "Edit Image",
            "Status: 200",
            "",
            f"Applied edit: {prompt[:80]}{'...' if len(prompt) > 80 else ''}",
            "",
            f"File (use for further upscale/edit): {new_file['url']}",
        ]

        return "\n".join(parts)