Files
tools/venice/image.py
2026-01-14 10:26:10 +00:00

1170 lines
44 KiB
Python

"""
title: Venice.ai Image Tools
author: Jeff Smith + Claude
version: 1.6.0
license: MIT
required_open_webui_version: 0.6.0
requirements: httpx, pydantic
description: |
Venice.ai image generation, upscaling, and editing tools.
- generate_image: Create images from text prompts
- upscale_image: Increase resolution up to 4x with optional enhancement
- edit_image: Modify existing images with text instructions
All operations upload results to Open WebUI Files API for persistence
and attach images to chat via event emitter for inline display.
Re-entrant safe: Multiple concurrent calls accumulate images correctly.
v1.6.0: Added UserValves for SAFE_MODE and HIDE_WATERMARK with proper
admin/user override logic.
"""
import asyncio
import threading
import time
import base64
import io
from typing import Optional, Callable, Any, Dict, List
from pydantic import BaseModel, Field
import httpx
class Tools:
"""
Venice.ai image tools: generate, upscale, and edit.
All operations upload results to Open WebUI Files API and attach
images to chat messages for inline display.
Re-entrant: Multiple calls within the same message context accumulate
all images rather than overwriting each other.
"""
    class Valves(BaseModel):
        """
        Admin-controlled settings. These affect all users.

        SAFE_MODE and HIDE_WATERMARK can be overridden per-user via UserValves:
        either side enabling one of them turns it on (OR semantics — see
        _is_safe_mode_enabled and _is_watermark_hidden).
        """

        # Server-wide fallback key, used only when the user supplies none.
        VENICE_API_KEY: str = Field(
            default="",
            description="Venice.ai API key (admin fallback if user has no key)",
        )
        # Model used when neither the call nor the user's valves specify one.
        DEFAULT_MODEL: str = Field(
            default="z-image-turbo", description="Default image model"
        )
        # When True, every generation is forced SFW regardless of user valves.
        SAFE_MODE: bool = Field(
            default=True,
            description="Admin: Require SFW content (user can also enable in UserValves)",
        )
        # When True, the Venice watermark is hidden for everyone.
        HIDE_WATERMARK: bool = Field(
            default=False,
            description="Admin: Hide watermark (user can also enable in UserValves)",
        )
        # Per-user rate limit between generations; 0 disables the check.
        COOLDOWN_SECONDS: int = Field(
            default=0,
            description="Minimum seconds between generations per user (0 to disable)",
        )
        # HTTP timeout (seconds) for Venice generate/upscale/edit requests.
        GENERATION_TIMEOUT: int = Field(
            default=180, description="Timeout for image generation in seconds"
        )
        # How long per-message accumulated file lists are kept before cleanup.
        ACCUMULATOR_TTL: int = Field(
            default=300, description="Seconds to keep accumulated files in memory"
        )
    class UserValves(BaseModel):
        """
        User-specific settings. These can override or augment admin defaults.

        For SAFE_MODE/HIDE_WATERMARK: Either admin OR user can enable.
        - If admin sets SAFE_MODE=True, all users get SFW regardless of their setting
        - If admin sets SAFE_MODE=False, users can still opt-in to SFW themselves
        For API keys: User key takes priority, admin key is fallback.
        """

        # Takes priority over the admin key (see _get_venice_key).
        VENICE_API_KEY: str = Field(
            default="", description="Your Venice.ai API key (overrides admin fallback)"
        )
        # User opt-in to SFW filtering; cannot override an admin-forced SFW.
        SAFE_MODE: bool = Field(
            default=False,
            description="Enable SFW content filtering (admin can also force SFW)",
        )
        # User opt-in to hiding the watermark.
        HIDE_WATERMARK: bool = Field(
            default=False, description="Hide Venice.ai watermark from generated images"
        )
        # Preferred model; blank string falls back to the admin DEFAULT_MODEL.
        DEFAULT_MODEL: str = Field(
            default="",
            description="Your preferred image model (blank = use admin default)",
        )
        # Applied to every generation unless the call passes negative_prompt.
        DEFAULT_NEGATIVE_PROMPT: str = Field(
            default="",
            description="Default negative prompt to exclude from all generations",
        )
def __init__(self):
self.valves = self.Valves()
self.user_valves = self.UserValves()
self.citation = False
self._cooldowns: Dict[str, float] = {}
# Re-entrancy support: accumulate files per message context
self._message_files: Dict[str, Dict[str, Any]] = {}
# Single lock for all accumulator operations (avoids per-key race)
self._accumulator_lock: Optional[asyncio.Lock] = None
self._lock_init = threading.Lock() # Protects lazy init of asyncio lock
self._last_cleanup: float = 0.0
# ==================== Configuration Helpers ====================
def _is_safe_mode_enabled(self) -> bool:
"""
Safe mode is ON if Admin OR User has it enabled.
| Admin | User | Result |
|-------|------|--------|
| SFW | SFW | SFW | Both want safe
| SFW | NSFW | SFW | Admin wins (restricts)
| NSFW | NSFW | NSFW | Both allow NSFW
| NSFW | SFW | SFW | User opts in to safety
"""
return self.valves.SAFE_MODE or self.user_valves.SAFE_MODE
def _is_watermark_hidden(self) -> bool:
"""
Watermark is hidden if Admin OR User has it enabled.
Either party can opt to hide the watermark.
"""
return self.valves.HIDE_WATERMARK or self.user_valves.HIDE_WATERMARK
def _get_venice_key(self) -> str:
"""
Get Venice API key with UserValves priority.
User key first, then admin fallback, then empty (error handled by caller).
"""
if self.user_valves.VENICE_API_KEY:
return self.user_valves.VENICE_API_KEY
if self.valves.VENICE_API_KEY:
return self.valves.VENICE_API_KEY
return ""
def _get_default_model(self) -> str:
"""Get default model: user preference first, then admin default."""
if self.user_valves.DEFAULT_MODEL:
return self.user_valves.DEFAULT_MODEL
return self.valves.DEFAULT_MODEL
def _get_default_negative_prompt(self) -> Optional[str]:
"""Get user's default negative prompt if set."""
if self.user_valves.DEFAULT_NEGATIVE_PROMPT:
return self.user_valves.DEFAULT_NEGATIVE_PROMPT
return None
def _get_owui_config(self, __request__=None) -> tuple[str, dict]:
"""Get Open WebUI API config from request context."""
base_url = ""
headers = {}
if __request__:
base_url = str(getattr(__request__, "base_url", "") or "")
req_headers = getattr(__request__, "headers", {})
if req_headers:
auth = dict(req_headers).get("authorization", "")
if auth:
headers["Authorization"] = auth
return base_url.rstrip("/"), headers
def _get_message_key(self, __metadata__: dict = None) -> str:
"""Get unique key for message context (for file accumulation)."""
if __metadata__:
chat_id = __metadata__.get("chat_id", "")
message_id = __metadata__.get("message_id", "")
if message_id:
return f"{chat_id}:{message_id}"
# Fallback for missing context
return f"unknown:{int(time.time())}"
    def _get_lock(self) -> asyncio.Lock:
        """Get accumulator lock (lazy init for compatibility)."""
        # Double-checked locking: the unlocked read skips the threading lock
        # on the common path, and the re-check inside the lock ensures only
        # one asyncio.Lock is ever created.  Creation is deferred out of
        # __init__ — presumably so the lock is built while the runtime's
        # event loop context is available; an asyncio.Lock is tied to the
        # loop it is first awaited on, so this assumes all callers share
        # one event loop — NOTE(review): confirm against the host runtime.
        if self._accumulator_lock is None:
            with self._lock_init:
                if self._accumulator_lock is None:
                    self._accumulator_lock = asyncio.Lock()
        return self._accumulator_lock
def _parse_venice_image_response(
self, response
) -> tuple[Optional[bytes], Optional[str]]:
"""
Parse Venice API image response - handles both binary and JSON formats.
Returns (image_bytes, error_message).
"""
content_type = response.headers.get("content-type", "")
# Binary response (PNG, WebP, etc.)
if content_type.startswith("image/"):
return response.content, None
# JSON response with base64
if "application/json" in content_type:
try:
data = response.json()
# Check various possible field names
for field in ("image", "images", "data", "result"):
if field in data:
value = data[field]
# Single image as base64 string
if isinstance(value, str):
return base64.b64decode(value), None
# Array of images
if isinstance(value, list) and value:
return base64.b64decode(value[0]), None
# If we got JSON but no image field, return the error
if "error" in data:
return None, f"API error: {data.get('error')}"
return None, f"Unexpected JSON response keys: {list(data.keys())}"
except Exception as e:
return None, f"Failed to parse JSON response: {e}"
# Fallback: try as binary if it looks like image data
if len(response.content) > 1000:
return response.content, None
return (
None,
f"Unknown response format: {content_type}, length: {len(response.content)}",
)
    async def _accumulate_files(
        self,
        key: str,
        new_files: List[dict],
        __event_emitter__: Callable[[dict], Any] = None,
    ):
        """
        Add files to accumulator and emit ALL accumulated files.

        Thread-safe: every mutation of self._message_files happens under the
        single accumulator lock; the (possibly slow) event emission happens
        after the lock is released, using a snapshot copy of the list.
        """
        all_files = []
        async with self._get_lock():
            # Initialize this message's entry on first use.
            if key not in self._message_files:
                self._message_files[key] = {"files": [], "timestamp": time.time()}
            # Copy each dict so later caller-side mutation cannot leak in.
            for f in new_files:
                self._message_files[key]["files"].append(dict(f))
            self._message_files[key]["timestamp"] = time.time()
            # Snapshot for emission (prevents mutation during the async emit).
            all_files = list(self._message_files[key]["files"])
            # Periodic TTL cleanup, at most once per 60s, while holding the
            # lock so no concurrent call sees a half-pruned dict.
            now = time.time()
            if now - self._last_cleanup > 60:
                self._last_cleanup = now
                ttl = self.valves.ACCUMULATOR_TTL
                expired = [
                    k
                    for k, v in self._message_files.items()
                    if now - v.get("timestamp", 0) > ttl
                ]
                for k in expired:
                    del self._message_files[k]
        # Emit outside the lock (event_emitter could be slow).
        if all_files and __event_emitter__:
            await __event_emitter__({"type": "files", "data": {"files": all_files}})
def _get_accumulated_count(self, key: str) -> int:
"""Get current count of accumulated files for a key (non-blocking peek)."""
entry = self._message_files.get(key)
if entry:
return len(entry.get("files", []))
return 0
# ==================== Files API ====================
async def _upload_image(
self,
image_data: bytes | str,
filename: str,
metadata: dict,
content_type: str = "image/webp",
__request__=None,
) -> tuple[Optional[str], Optional[str]]:
"""
Upload image to Open WebUI Files API.
Args:
image_data: Raw bytes or base64-encoded string
filename: Name for the uploaded file
metadata: File metadata dict
content_type: MIME type (image/webp, image/png, etc.)
__request__: Request context for auth
Returns (file_id, error_message).
"""
base_url, headers = self._get_owui_config(__request__)
if not base_url:
return None, "Could not determine Open WebUI URL"
# Handle both bytes and base64 input
if isinstance(image_data, str):
try:
image_bytes = base64.b64decode(image_data)
except Exception as e:
return None, f"Failed to decode image: {e}"
else:
image_bytes = image_data
# Prepare multipart upload
try:
async with httpx.AsyncClient(timeout=60.0) as client:
files = {"file": (filename, io.BytesIO(image_bytes), content_type)}
# Include metadata as form field
import json
data = {"metadata": json.dumps(metadata)}
response = await client.post(
f"{base_url}/api/v1/files/",
headers=headers,
files=files,
data=data,
params={"process": "false", "process_in_background": "false"},
)
if response.status_code == 200:
result = response.json()
return result.get("id"), None
else:
return (
None,
f"Upload failed: {response.status_code} - {response.text[:200]}",
)
except Exception as e:
return None, f"Upload error: {type(e).__name__}: {e}"
# ==================== Image Fetching ====================
    async def _get_image_bytes(
        self,
        image_source: str,
        __request__=None,
        __files__: list = None,
        __metadata__: dict = None,
    ) -> tuple[Optional[bytes], Optional[str]]:
        """
        Fetch image bytes from various sources.

        Handles:
        - Index references: "1", "2", "[1]", "last", "previous" (from message context)
        - Open WebUI file URLs (/api/v1/files/{id}/content)
        - Full URLs (https://...)
        - File IDs from __files__ context
        Returns (image_bytes, error_message).
        """
        base_url, headers = self._get_owui_config(__request__)
        image_source = image_source.strip()
        # Parse index references: "1", "[1]", "last", "previous", "latest".
        index = None
        if image_source.lower() in ("last", "latest", "previous", "recent"):
            index = -1  # Last (newest) image
        elif image_source.isdigit():
            index = int(image_source) - 1  # 1-based to 0-based
        elif image_source.startswith("[") and image_source.endswith("]"):
            inner = image_source[1:-1].strip()
            if inner.isdigit():
                index = int(inner) - 1
        # Resolve an index reference to a concrete file URL by listing the
        # Files API and filtering to images tied to this message chain.
        if index is not None:
            if not base_url:
                return None, "Could not determine Open WebUI URL"
            # Build message chain: current message + parents.
            message_ids = set()
            if __metadata__:
                if __metadata__.get("message_id"):
                    message_ids.add(__metadata__["message_id"])
                # Walk up the parent chain, if the metadata carries one.
                parent = __metadata__.get("parent_message")
                while parent:
                    if parent.get("id"):
                        message_ids.add(parent["id"])
                    parent = parent.get("parent_message")  # May not exist
                # Also check parent_message_id directly.
                if __metadata__.get("parent_message_id"):
                    message_ids.add(__metadata__["parent_message_id"])
            try:
                async with httpx.AsyncClient(timeout=30.0) as client:
                    response = await client.get(
                        f"{base_url}/api/v1/files/",
                        headers=headers,
                        params={"content": "false"},
                    )
                    if response.status_code != 200:
                        return None, f"Failed to list files: {response.status_code}"
                    files = response.json()
                    # Collect images — prefer those attached to this message
                    # chain, fall back to all images when none match.
                    context_images = []
                    all_images = []
                    for f in files:
                        meta = f.get("meta", {})
                        content_type = meta.get("content_type", "")
                        if not content_type.startswith("image/"):
                            continue
                        all_images.append(f)
                        # message_id may live at either of two depths:
                        # - meta.data.message_id (new, simpler)
                        # - meta.data.data.message_id (old, nested)
                        inner_data = meta.get("data", {})
                        file_msg_id = None
                        if isinstance(inner_data, dict):
                            file_msg_id = inner_data.get(
                                "message_id"
                            )  # Check simpler path first
                            if not file_msg_id:
                                nested_data = inner_data.get("data", {})
                                if isinstance(nested_data, dict):
                                    file_msg_id = nested_data.get(
                                        "message_id"
                                    )  # Fallback to nested
                        if file_msg_id and file_msg_id in message_ids:
                            context_images.append(f)
                    # Use context images if available, otherwise all images.
                    user_images = context_images if context_images else all_images
                    # Sort by created_at (newest first).
                    user_images.sort(key=lambda f: f.get("created_at", 0), reverse=True)
                    if not user_images:
                        return None, "No images found"
                    # Index semantics: -1 = newest, 0 = most recent ("1"),
                    # 1 = second most recent ("2"), ...
                    if index == -1:
                        target_idx = 0  # Newest
                    else:
                        target_idx = index
                    if 0 <= target_idx < len(user_images):
                        file_id = user_images[target_idx].get("id")
                        image_source = f"/api/v1/files/{file_id}/content"
                    else:
                        return (
                            None,
                            f"Image [{index+1}] not found (found {len(user_images)} image{'s' if len(user_images) != 1 else ''})",
                        )
            except Exception as e:
                return None, f"Failed to resolve image index: {type(e).__name__}: {e}"
        # Check whether the source refers to a file uploaded with the
        # current message (matched loosely by id or URL substring).
        if __files__:
            for f in __files__:
                file_id = f.get("id", "")
                file_url = f.get("url", "")
                # Match by ID, URL, or partial match.
                if (
                    image_source == file_id
                    or image_source == file_url
                    or image_source in file_url
                    or file_id in image_source
                ):
                    image_source = file_url or f"/api/v1/files/{file_id}/content"
                    break
        # Normalize to an absolute fetch URL.
        if image_source.startswith("/api/"):
            if not base_url:
                return None, "Could not determine Open WebUI URL for relative path"
            fetch_url = f"{base_url}{image_source}"
        elif image_source.startswith("http://") or image_source.startswith("https://"):
            fetch_url = image_source
        else:
            # Assume it's a bare file ID.
            if not base_url:
                return None, "Could not determine Open WebUI URL"
            fetch_url = f"{base_url}/api/v1/files/{image_source}/content"
        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                response = await client.get(fetch_url, headers=headers)
                if response.status_code == 200:
                    return response.content, None
                else:
                    return (
                        None,
                        f"Failed to fetch image: {response.status_code} from {fetch_url}",
                    )
        except Exception as e:
            return None, f"Fetch error: {type(e).__name__}: {e}"
# ==================== Generate ====================
async def generate_image(
self,
prompt: str,
model: Optional[str] = None,
width: int = 1024,
height: int = 1024,
negative_prompt: Optional[str] = None,
style_preset: Optional[str] = None,
variants: int = 1,
__request__=None,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Generate image(s) using Venice.ai and upload to Open WebUI.
IMPORTANT: For multiple images with the SAME prompt, use the 'variants'
parameter (1-4) instead of calling this function multiple times.
Multiple calls with DIFFERENT prompts are fine and will accumulate correctly.
:param prompt: Text description of the image to generate
:param model: Image model (default: user pref or z-image-turbo). Use venice_info list_models("image") to see options
:param width: Image width 512-1280 pixels (default 1024)
:param height: Image height 512-1280 pixels (default 1024)
:param negative_prompt: What to avoid in the image (or uses user's default if set)
:param style_preset: Style preset (use venice_info list_styles to see options). Note: Some models like z-image-turbo don't support styles
:param variants: Number of variations 1-4 with same prompt (use this instead of multiple calls)
:return: Generated image details
"""
# Check API key
venice_key = self._get_venice_key()
if not venice_key:
return "Generate Image\nStatus: 0\nError: Venice.ai API key not configured. Set in UserValves or ask admin."
# Validate prompt
if not prompt or not prompt.strip():
return "Generate Image\nStatus: 0\nError: Prompt is required"
# Get message context for accumulation
msg_key = self._get_message_key(__metadata__)
# Check cooldown (skip if disabled or if this is a re-entrant call)
user_id = __user__.get("id", "default") if __user__ else "default"
cooldown = self.valves.COOLDOWN_SECONDS
if cooldown > 0:
now = time.time()
last_gen = self._cooldowns.get(user_id, 0)
is_reentrant = self._get_accumulated_count(msg_key) > 0
if not is_reentrant and now - last_gen < cooldown:
remaining = cooldown - (now - last_gen)
return f"Generate Image\nStatus: 429\nError: Rate limited. Wait {remaining:.1f}s. TIP: Use 'variants' parameter (1-4) for multiple images with the same prompt."
self._cooldowns[user_id] = now
# Get effective settings using helper methods
model = model or self._get_default_model()
safe_mode = self._is_safe_mode_enabled()
hide_watermark = self._is_watermark_hidden()
# Handle negative prompt: explicit > user default > none
effective_negative_prompt = negative_prompt
if effective_negative_prompt is None:
effective_negative_prompt = self._get_default_negative_prompt()
# Clamp values
variants = max(1, min(4, variants))
width = max(512, min(1280, width))
height = max(512, min(1280, height))
# Count existing images (non-blocking peek)
existing_count = self._get_accumulated_count(msg_key)
# Status update
if __event_emitter__:
status_msg = (
f"Generating {variants} image{'s' if variants > 1 else ''} with {model}"
)
if existing_count > 0:
status_msg += f" (adding to {existing_count} existing)"
if safe_mode:
status_msg += " [SFW]"
await __event_emitter__(
{
"type": "status",
"data": {"description": f"{status_msg}...", "done": False},
}
)
# Build Venice request
payload = {
"model": model,
"prompt": prompt,
"width": width,
"height": height,
"safe_mode": safe_mode,
"hide_watermark": hide_watermark,
"return_binary": False,
"variants": variants,
}
if effective_negative_prompt:
payload["negative_prompt"] = effective_negative_prompt
if style_preset:
payload["style_preset"] = style_preset
# Generate images - with retry for unsupported params
retried = False
dropped_params = []
try:
async with httpx.AsyncClient(
timeout=float(self.valves.GENERATION_TIMEOUT)
) as client:
response = await client.post(
"https://api.venice.ai/api/v1/image/generate",
headers={
"Authorization": f"Bearer {venice_key}",
"Content-Type": "application/json",
},
json=payload,
)
# Check for parameter errors and retry without optional params
if response.status_code in (400, 422) and not retried:
error_text = response.text.lower()
# Check if it's a style/parameter support issue
if any(
kw in error_text
for kw in ("style", "unsupported", "invalid", "parameter")
):
# Retry without style_preset and negative_prompt
if style_preset:
del payload["style_preset"]
dropped_params.append("style_preset")
if effective_negative_prompt:
del payload["negative_prompt"]
dropped_params.append("negative_prompt")
if dropped_params:
retried = True
response = await client.post(
"https://api.venice.ai/api/v1/image/generate",
headers={
"Authorization": f"Bearer {venice_key}",
"Content-Type": "application/json",
},
json=payload,
)
response.raise_for_status()
result = response.json()
except httpx.HTTPStatusError as e:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return f"Generate Image\nStatus: {e.response.status_code}\nError: Venice API error: {e.response.text[:200]}"
except httpx.TimeoutException:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return f"Generate Image\nStatus: 408\nError: Generation timed out after {self.valves.GENERATION_TIMEOUT}s"
except Exception as e:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return f"Generate Image\nStatus: 0\nError: {type(e).__name__}: {e}"
images = result.get("images", [])
if not images:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return "Generate Image\nStatus: 0\nError: No images returned from Venice.ai"
# Update status
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Uploading {len(images)} image{'s' if len(images) > 1 else ''}...",
"done": False,
},
}
)
# Get context for metadata
chat_id = __metadata__.get("chat_id") if __metadata__ else None
message_id = __metadata__.get("message_id") if __metadata__ else None
# Upload each image to Files API
uploaded_files = []
errors = []
for i, image_b64 in enumerate(images):
timestamp = int(time.time() * 1000) # ms precision for uniqueness
filename = f"venice_{model}_{timestamp}_{i+1}.webp"
# Build metadata - chat_id/message_id at top level for consistency
file_metadata = {
"name": filename,
"content_type": "image/webp",
"data": {
"model": model,
"prompt": prompt,
"negative_prompt": effective_negative_prompt,
"style_preset": style_preset,
"width": width,
"height": height,
"variant": i + 1,
"total_variants": len(images),
"safe_mode": safe_mode,
"hide_watermark": hide_watermark,
},
}
# Associations at top level (becomes meta.data.chat_id, not meta.data.data.chat_id)
if chat_id:
file_metadata["chat_id"] = chat_id
if message_id:
file_metadata["message_id"] = message_id
file_id, error = await self._upload_image(
image_b64, filename, file_metadata, "image/webp", __request__
)
if file_id:
uploaded_files.append(
{
"type": "image",
"url": f"/api/v1/files/{file_id}/content",
}
)
else:
errors.append(f"Variant {i+1}: {error}")
# Accumulate files and emit ALL accumulated files for this message
if uploaded_files:
await self._accumulate_files(msg_key, uploaded_files, __event_emitter__)
# Get final count after accumulation
final_count = self._get_accumulated_count(msg_key)
# Clear status
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Done ({final_count} image{'s' if final_count != 1 else ''} total)",
"done": True,
},
}
)
# Build response with file references for upscale/edit
parts = [
"Generate Image",
"Status: 200",
"",
f"Generated {len(uploaded_files)} image(s) for: {prompt[:100]}{'...' if len(prompt) > 100 else ''}",
f"Model: {model} | Size: {width}x{height}",
]
# Show effective settings
settings_parts = []
if safe_mode:
settings_parts.append("SFW")
if hide_watermark:
settings_parts.append("No watermark")
if settings_parts:
parts.append(f"Settings: {', '.join(settings_parts)}")
# Include file URLs so subsequent upscale/edit calls know what to reference
if uploaded_files:
parts.append("")
parts.append("Files (use these URLs for upscale_image or edit_image):")
for i, f in enumerate(uploaded_files):
parts.append(f" [{i+1}] {f['url']}")
if dropped_params:
parts.append(
f"Note: {model} doesn't support: {', '.join(dropped_params)} (ignored)"
)
if final_count > len(uploaded_files):
parts.append(f"Total images in message: {final_count}")
if errors:
parts.append("")
parts.append("Warnings:")
for e in errors:
parts.append(f" - {e}")
return "\n".join(parts)
# ==================== Upscale ====================
async def upscale_image(
self,
image: str,
scale: int = 2,
enhance: bool = False,
enhance_creativity: float = 0.5,
enhance_prompt: Optional[str] = None,
__request__=None,
__user__: dict = None,
__metadata__: dict = None,
__files__: list = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Upscale an image to higher resolution, optionally with AI enhancement.
:param image: Image reference - simplest options:
- "1" or "last" - Your most recently generated/uploaded image
- "2" - Second most recent image
- Or full URL from generate_image output
:param scale: Scale factor 1-4 (default 2). Use 1 with enhance=True for enhancement only
:param enhance: Apply AI enhancement during upscaling (adds detail/clarity)
:param enhance_creativity: How much AI can modify the image 0.0-1.0 (default 0.5). Higher = more changes
:param enhance_prompt: Style hint for enhancement like "sharp", "cinematic", "vibrant" (optional)
:return: Upscaled image result
"""
# Check API key
venice_key = self._get_venice_key()
if not venice_key:
return "Upscale Image\nStatus: 0\nError: Venice.ai API key not configured."
# Validate parameters
scale = max(1, min(4, scale))
if scale == 1 and not enhance:
return "Upscale Image\nStatus: 0\nError: scale=1 requires enhance=True (enhancement only mode)"
enhance_creativity = max(0.0, min(1.0, enhance_creativity))
# Get message context for accumulation
msg_key = self._get_message_key(__metadata__)
existing_count = self._get_accumulated_count(msg_key)
# Status update
if __event_emitter__:
mode = "Enhancing" if scale == 1 else f"Upscaling {scale}x"
if enhance and scale > 1:
mode += " + enhancing"
await __event_emitter__(
{"type": "status", "data": {"description": f"{mode}...", "done": False}}
)
# Fetch source image
image_bytes, error = await self._get_image_bytes(
image, __request__, __files__, __metadata__
)
if error:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return (
f"Upscale Image\nStatus: 0\nError: Failed to get source image: {error}"
)
# Build request
payload = {
"image": base64.b64encode(image_bytes).decode("utf-8"),
"scale": scale,
}
if enhance:
payload["enhance"] = True
payload["enhanceCreativity"] = enhance_creativity
if enhance_prompt:
payload["enhancePrompt"] = enhance_prompt[:1500]
# Call Venice API
try:
async with httpx.AsyncClient(
timeout=float(self.valves.GENERATION_TIMEOUT)
) as client:
response = await client.post(
"https://api.venice.ai/api/v1/image/upscale",
headers={
"Authorization": f"Bearer {venice_key}",
"Content-Type": "application/json",
},
json=payload,
)
response.raise_for_status()
# Parse response (handles both binary and JSON)
result_bytes, parse_error = self._parse_venice_image_response(response)
if parse_error:
if __event_emitter__:
await __event_emitter__(
{"type": "status", "data": {"done": True}}
)
return f"Upscale Image\nStatus: 0\nError: {parse_error}"
except httpx.HTTPStatusError as e:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return f"Upscale Image\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}"
except Exception as e:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return f"Upscale Image\nStatus: 0\nError: {type(e).__name__}: {e}"
# Upload result
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Uploading...", "done": False},
}
)
chat_id = __metadata__.get("chat_id") if __metadata__ else None
message_id = __metadata__.get("message_id") if __metadata__ else None
timestamp = int(time.time() * 1000)
filename = f"venice_upscale_{scale}x_{timestamp}.png"
file_metadata = {
"name": filename,
"content_type": "image/png",
"data": {
"operation": "upscale",
"scale": scale,
"enhance": enhance,
"enhance_creativity": enhance_creativity if enhance else None,
"enhance_prompt": enhance_prompt if enhance else None,
"source_image": image[:100],
},
}
# Associations at top level (becomes meta.data.chat_id)
if chat_id:
file_metadata["chat_id"] = chat_id
if message_id:
file_metadata["message_id"] = message_id
file_id, upload_error = await self._upload_image(
result_bytes, filename, file_metadata, "image/png", __request__
)
if not file_id:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return f"Upscale Image\nStatus: 0\nError: Upload failed - {upload_error}"
# Accumulate and emit all files
new_file = {"type": "image", "url": f"/api/v1/files/{file_id}/content"}
await self._accumulate_files(msg_key, [new_file], __event_emitter__)
final_count = self._get_accumulated_count(msg_key)
# Clear status
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Done ({final_count} image{'s' if final_count != 1 else ''} total)",
"done": True,
},
}
)
# Build response
parts = [
"Upscale Image",
"Status: 200",
"",
f"Upscaled {scale}x" + (" with enhancement" if enhance else ""),
"",
f"File (use for further upscale/edit): {new_file['url']}",
]
return "\n".join(parts)
# ==================== Edit ====================
async def edit_image(
self,
image: str,
prompt: str,
__request__=None,
__user__: dict = None,
__metadata__: dict = None,
__files__: list = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Edit an existing image using text instructions.
Venice uses the Qwen-Image model for editing. Works best with
short, clear instructions like:
- "Remove the background"
- "Change the sky to sunset"
- "Make it black and white"
- "Add snow to the scene"
:param image: Image reference - simplest options:
- "1" or "last" - Your most recently generated/uploaded image
- "2" - Second most recent image
- Or full URL from generate_image output
:param prompt: Text instructions describing the edit to make (max 1500 chars)
:return: Edited image result
"""
# Check API key
venice_key = self._get_venice_key()
if not venice_key:
return "Edit Image\nStatus: 0\nError: Venice.ai API key not configured."
# Validate prompt
if not prompt or not prompt.strip():
return "Edit Image\nStatus: 0\nError: Edit prompt is required"
prompt = prompt.strip()[:1500]
# Get message context for accumulation
msg_key = self._get_message_key(__metadata__)
existing_count = self._get_accumulated_count(msg_key)
# Status update
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Editing image...", "done": False},
}
)
# Fetch source image
image_bytes, error = await self._get_image_bytes(
image, __request__, __files__, __metadata__
)
if error:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return f"Edit Image\nStatus: 0\nError: Failed to get source image: {error}"
# Build request
payload = {
"image": base64.b64encode(image_bytes).decode("utf-8"),
"prompt": prompt,
}
# Call Venice API
try:
async with httpx.AsyncClient(
timeout=float(self.valves.GENERATION_TIMEOUT)
) as client:
response = await client.post(
"https://api.venice.ai/api/v1/image/edit",
headers={
"Authorization": f"Bearer {venice_key}",
"Content-Type": "application/json",
},
json=payload,
)
response.raise_for_status()
# Parse response (handles both binary and JSON)
result_bytes, parse_error = self._parse_venice_image_response(response)
if parse_error:
if __event_emitter__:
await __event_emitter__(
{"type": "status", "data": {"done": True}}
)
return f"Edit Image\nStatus: 0\nError: {parse_error}"
except httpx.HTTPStatusError as e:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return f"Edit Image\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}"
except Exception as e:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return f"Edit Image\nStatus: 0\nError: {type(e).__name__}: {e}"
# Upload result
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Uploading...", "done": False},
}
)
chat_id = __metadata__.get("chat_id") if __metadata__ else None
message_id = __metadata__.get("message_id") if __metadata__ else None
timestamp = int(time.time() * 1000)
filename = f"venice_edit_{timestamp}.png"
file_metadata = {
"name": filename,
"content_type": "image/png",
"data": {
"operation": "edit",
"prompt": prompt,
"source_image": image[:100],
},
}
# Associations at top level (becomes meta.data.chat_id)
if chat_id:
file_metadata["chat_id"] = chat_id
if message_id:
file_metadata["message_id"] = message_id
file_id, upload_error = await self._upload_image(
result_bytes, filename, file_metadata, "image/png", __request__
)
if not file_id:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return f"Edit Image\nStatus: 0\nError: Upload failed - {upload_error}"
# Accumulate and emit all files
new_file = {"type": "image", "url": f"/api/v1/files/{file_id}/content"}
await self._accumulate_files(msg_key, [new_file], __event_emitter__)
final_count = self._get_accumulated_count(msg_key)
# Clear status
if __event_emitter__:
await __event_emitter__(
{
"type": "status",
"data": {
"description": f"Done ({final_count} image{'s' if final_count != 1 else ''} total)",
"done": True,
},
}
)
# Build response
parts = [
"Edit Image",
"Status: 200",
"",
f"Applied edit: {prompt[:80]}{'...' if len(prompt) > 80 else ''}",
"",
f"File (use for further upscale/edit): {new_file['url']}",
]
return "\n".join(parts)