1170 lines
44 KiB
Python
1170 lines
44 KiB
Python
"""
|
|
title: Venice.ai Image Tools
|
|
author: Jeff Smith + Claude
|
|
version: 1.6.0
|
|
license: MIT
|
|
required_open_webui_version: 0.6.0
|
|
requirements: httpx, pydantic
|
|
description: |
|
|
Venice.ai image generation, upscaling, and editing tools.
|
|
- generate_image: Create images from text prompts
|
|
- upscale_image: Increase resolution up to 4x with optional enhancement
|
|
- edit_image: Modify existing images with text instructions
|
|
|
|
All operations upload results to Open WebUI Files API for persistence
|
|
and attach images to chat via event emitter for inline display.
|
|
|
|
Re-entrant safe: Multiple concurrent calls accumulate images correctly.
|
|
|
|
v1.6.0: Added UserValves for SAFE_MODE and HIDE_WATERMARK with proper
|
|
admin/user override logic.
|
|
"""
|
|
|
|
import asyncio
|
|
import threading
|
|
import time
|
|
import base64
|
|
import io
|
|
from typing import Optional, Callable, Any, Dict, List
|
|
from pydantic import BaseModel, Field
|
|
import httpx
|
|
|
|
|
|
class Tools:
|
|
"""
|
|
Venice.ai image tools: generate, upscale, and edit.
|
|
All operations upload results to Open WebUI Files API and attach
|
|
images to chat messages for inline display.
|
|
|
|
Re-entrant: Multiple calls within the same message context accumulate
|
|
all images rather than overwriting each other.
|
|
"""
|
|
|
|
class Valves(BaseModel):
    """
    Admin-controlled settings. These affect all users.
    SAFE_MODE and HIDE_WATERMARK can be overridden per-user via UserValves.
    """

    # Fallback key used when a user has not set one in UserValves
    # (see _get_venice_key: user key wins, this is second).
    VENICE_API_KEY: str = Field(
        default="",
        description="Venice.ai API key (admin fallback if user has no key)",
    )
    # Model used when neither the call nor the user specifies one
    # (see _get_default_model).
    DEFAULT_MODEL: str = Field(
        default="z-image-turbo", description="Default image model"
    )
    # OR-combined with UserValves.SAFE_MODE: if either is True, SFW is enforced.
    SAFE_MODE: bool = Field(
        default=True,
        description="Admin: Require SFW content (user can also enable in UserValves)",
    )
    # OR-combined with UserValves.HIDE_WATERMARK.
    HIDE_WATERMARK: bool = Field(
        default=False,
        description="Admin: Hide watermark (user can also enable in UserValves)",
    )
    # Per-user minimum gap between generations; 0 disables the check.
    COOLDOWN_SECONDS: int = Field(
        default=0,
        description="Minimum seconds between generations per user (0 to disable)",
    )
    # httpx client timeout (seconds) for generate/upscale/edit calls to Venice.
    GENERATION_TIMEOUT: int = Field(
        default=180, description="Timeout for image generation in seconds"
    )
    # TTL for the per-message file accumulator used by re-entrant calls
    # (see _accumulate_files periodic cleanup).
    ACCUMULATOR_TTL: int = Field(
        default=300, description="Seconds to keep accumulated files in memory"
    )
|
|
|
|
class UserValves(BaseModel):
    """
    User-specific settings. These can override or augment admin defaults.

    For SAFE_MODE/HIDE_WATERMARK: Either admin OR user can enable.
    - If admin sets SAFE_MODE=True, all users get SFW regardless of their setting
    - If admin sets SAFE_MODE=False, users can still opt-in to SFW themselves

    For API keys: User key takes priority, admin key is fallback.
    """

    # Personal API key; when set it takes priority over the admin fallback.
    VENICE_API_KEY: str = Field(
        default="", description="Your Venice.ai API key (overrides admin fallback)"
    )
    # OR-combined with the admin valve: enabling here forces SFW for this user.
    SAFE_MODE: bool = Field(
        default=False,
        description="Enable SFW content filtering (admin can also force SFW)",
    )
    # OR-combined with the admin valve.
    HIDE_WATERMARK: bool = Field(
        default=False, description="Hide Venice.ai watermark from generated images"
    )
    # Empty string means "fall back to the admin DEFAULT_MODEL".
    DEFAULT_MODEL: str = Field(
        default="",
        description="Your preferred image model (blank = use admin default)",
    )
    # Applied whenever a call does not pass an explicit negative_prompt.
    DEFAULT_NEGATIVE_PROMPT: str = Field(
        default="",
        description="Default negative prompt to exclude from all generations",
    )
|
|
|
|
def __init__(self):
    """Initialize valve instances plus cooldown and file-accumulator state."""
    self.valves = self.Valves()
    self.user_valves = self.UserValves()
    # Tool output is not a citation; prevents Open WebUI rendering it as one.
    self.citation = False
    # Last-generation timestamp per user id (enforces COOLDOWN_SECONDS).
    self._cooldowns: Dict[str, float] = {}
    # Re-entrancy support: accumulate files per message context
    # (keyed "chat_id:message_id", values {"files": [...], "timestamp": ...}).
    self._message_files: Dict[str, Dict[str, Any]] = {}
    # Single lock for all accumulator operations (avoids per-key race);
    # created lazily by _get_lock().
    self._accumulator_lock: Optional[asyncio.Lock] = None
    self._lock_init = threading.Lock()  # Protects lazy init of asyncio lock
    # Timestamp of the last accumulator TTL sweep in _accumulate_files.
    self._last_cleanup: float = 0.0
|
|
|
|
# ==================== Configuration Helpers ====================
|
|
|
|
def _is_safe_mode_enabled(self) -> bool:
    """
    Report whether SFW filtering is in effect for this request.

    The admin valve and the user valve are OR-ed together: an admin can
    force SFW for everyone, and when the admin allows NSFW an individual
    user may still opt in to SFW on their own.

    | Admin | User | Result |
    |-------|------|--------|
    | SFW   | SFW  | SFW    | Both want safe
    | SFW   | NSFW | SFW    | Admin wins (restricts)
    | NSFW  | NSFW | NSFW   | Both allow NSFW
    | NSFW  | SFW  | SFW    | User opts in to safety
    """
    if self.valves.SAFE_MODE:
        return True
    return self.user_valves.SAFE_MODE
|
|
|
|
def _is_watermark_hidden(self) -> bool:
    """
    Report whether the Venice watermark should be suppressed.

    OR of the admin and user valves — either party may opt to hide it.
    """
    if self.valves.HIDE_WATERMARK:
        return True
    return self.user_valves.HIDE_WATERMARK
|
|
|
|
def _get_venice_key(self) -> str:
    """
    Resolve the Venice API key to use for this call.

    The user's own key wins; the admin key is the fallback; an empty
    string signals "not configured" and is handled by the caller.
    """
    return (
        self.user_valves.VENICE_API_KEY
        or self.valves.VENICE_API_KEY
        or ""
    )
|
|
|
|
def _get_default_model(self) -> str:
    """Resolve the default model: the user's preference beats the admin default."""
    return self.user_valves.DEFAULT_MODEL or self.valves.DEFAULT_MODEL
|
|
|
|
def _get_default_negative_prompt(self) -> Optional[str]:
    """Return the user's default negative prompt, or None when unset/blank."""
    return self.user_valves.DEFAULT_NEGATIVE_PROMPT or None
|
|
|
|
def _get_owui_config(self, __request__=None) -> tuple[str, dict]:
    """
    Extract the Open WebUI base URL and a forwardable auth header from
    the incoming request context.

    Returns (base_url_without_trailing_slash, headers); both are empty
    when no request context is available.
    """
    if not __request__:
        return "", {}

    base_url = str(getattr(__request__, "base_url", "") or "").rstrip("/")

    headers: dict = {}
    req_headers = getattr(__request__, "headers", {})
    if req_headers:
        # Forward the caller's bearer token so Files API calls are
        # made as the same user.
        auth = dict(req_headers).get("authorization", "")
        if auth:
            headers["Authorization"] = auth

    return base_url, headers
|
|
|
|
def _get_message_key(self, __metadata__: dict = None) -> str:
    """
    Build the accumulator key for the current message context.

    Uses "chat_id:message_id" when a message id is available; otherwise
    falls back to a per-second "unknown:<epoch>" key so unrelated calls
    don't pile into one bucket for long.
    """
    meta = __metadata__ or {}
    message_id = meta.get("message_id", "")
    if message_id:
        chat_id = meta.get("chat_id", "")
        return f"{chat_id}:{message_id}"
    # Fallback for missing context
    return f"unknown:{int(time.time())}"
|
|
|
|
def _get_lock(self) -> asyncio.Lock:
    """
    Return the shared accumulator lock, creating it on first use.

    Double-checked locking: the cheap unlocked read handles the common
    case, and the re-check inside the threading lock makes the one-time
    creation race-free across threads.
    """
    if self._accumulator_lock is not None:
        return self._accumulator_lock
    with self._lock_init:
        if self._accumulator_lock is None:
            self._accumulator_lock = asyncio.Lock()
    return self._accumulator_lock
|
|
|
|
def _parse_venice_image_response(
    self, response
) -> tuple[Optional[bytes], Optional[str]]:
    """
    Extract image bytes from a Venice API response.

    Venice endpoints may reply either with raw binary (content-type
    image/*) or with JSON carrying base64 data under one of several
    field names.

    Returns (image_bytes, error_message) — exactly one side is set.
    """
    content_type = response.headers.get("content-type", "")

    # Raw binary payload (PNG, WebP, ...): hand it back untouched.
    if content_type.startswith("image/"):
        return response.content, None

    # JSON payload: probe the field names Venice is known to use.
    if "application/json" in content_type:
        try:
            data = response.json()
            for field in ("image", "images", "data", "result"):
                if field not in data:
                    continue
                value = data[field]
                if isinstance(value, str):
                    # Single base64-encoded image.
                    return base64.b64decode(value), None
                if isinstance(value, list) and value:
                    # Array of base64 images — take the first.
                    return base64.b64decode(value[0]), None
            # JSON arrived but carried no image: surface the API error
            # if one is present, otherwise report what keys we saw.
            if "error" in data:
                return None, f"API error: {data.get('error')}"
            return None, f"Unexpected JSON response keys: {list(data.keys())}"
        except Exception as e:
            return None, f"Failed to parse JSON response: {e}"

    # Last resort: a sufficiently large body is assumed to be image data
    # delivered with a missing or unusual content-type header.
    if len(response.content) > 1000:
        return response.content, None

    return (
        None,
        f"Unknown response format: {content_type}, length: {len(response.content)}",
    )
|
|
|
|
async def _accumulate_files(
    self,
    key: str,
    new_files: List[dict],
    __event_emitter__: Callable[[dict], Any] = None,
):
    """
    Add files to accumulator and emit ALL accumulated files.
    Thread-safe: uses single lock for all operations.

    :param key: Message-context key from _get_message_key().
    :param new_files: File dicts (e.g. {"type": "image", "url": ...}) to append.
    :param __event_emitter__: Open WebUI event emitter; when provided, one
        "files" event containing every file accumulated for this key is
        emitted, so the chat shows all images rather than only the newest
        batch (re-entrancy support).
    """
    all_files = []

    async with self._get_lock():
        # Initialize if needed
        if key not in self._message_files:
            self._message_files[key] = {"files": [], "timestamp": time.time()}

        # Add new files (copy dicts to avoid reference issues)
        for f in new_files:
            self._message_files[key]["files"].append(dict(f))
        # Refresh the timestamp so active keys survive the TTL sweep below.
        self._message_files[key]["timestamp"] = time.time()

        # Copy list for emission (prevents mutation during async emit)
        all_files = list(self._message_files[key]["files"])

        # Periodic cleanup (every 60s, while holding lock). The current
        # key was just re-timestamped, so it cannot be evicted here.
        now = time.time()
        if now - self._last_cleanup > 60:
            self._last_cleanup = now
            ttl = self.valves.ACCUMULATOR_TTL
            expired = [
                k
                for k, v in self._message_files.items()
                if now - v.get("timestamp", 0) > ttl
            ]
            for k in expired:
                del self._message_files[k]

    # Emit outside lock (event_emitter could be slow)
    if all_files and __event_emitter__:
        await __event_emitter__({"type": "files", "data": {"files": all_files}})
|
|
|
|
def _get_accumulated_count(self, key: str) -> int:
    """
    Peek at how many files are accumulated for *key* without locking.

    Used for status text and re-entrancy detection, where a slightly
    stale value is acceptable.
    """
    entry = self._message_files.get(key)
    return len(entry.get("files", [])) if entry else 0
|
|
|
|
# ==================== Files API ====================
|
|
|
|
async def _upload_image(
    self,
    image_data: bytes | str,
    filename: str,
    metadata: dict,
    content_type: str = "image/webp",
    __request__=None,
) -> tuple[Optional[str], Optional[str]]:
    """
    Upload image to Open WebUI Files API.

    Args:
        image_data: Raw bytes or base64-encoded string
        filename: Name for the uploaded file
        metadata: File metadata dict (sent as a JSON form field)
        content_type: MIME type (image/webp, image/png, etc.)
        __request__: Request context for base URL and auth header

    Returns (file_id, error_message) — exactly one side is set.
    """
    import json  # stdlib; kept function-local to match the original's style

    base_url, headers = self._get_owui_config(__request__)

    if not base_url:
        return None, "Could not determine Open WebUI URL"

    # Handle both bytes and base64 input (Venice returns base64 strings).
    if isinstance(image_data, str):
        try:
            image_bytes = base64.b64decode(image_data)
        except Exception as e:
            return None, f"Failed to decode image: {e}"
    else:
        image_bytes = image_data

    # Prepare multipart upload
    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            files = {"file": (filename, io.BytesIO(image_bytes), content_type)}

            # Include metadata as form field
            data = {"metadata": json.dumps(metadata)}

            response = await client.post(
                f"{base_url}/api/v1/files/",
                headers=headers,
                files=files,
                data=data,
                # Skip RAG processing: these are plain image attachments.
                params={"process": "false", "process_in_background": "false"},
            )

            # FIX: accept any 2xx status, not just 200 — some deployments
            # answer 201 Created, which the old `== 200` check treated
            # as a failure.
            if response.is_success:
                result = response.json()
                return result.get("id"), None
            return (
                None,
                f"Upload failed: {response.status_code} - {response.text[:200]}",
            )

    except Exception as e:
        return None, f"Upload error: {type(e).__name__}: {e}"
|
|
|
|
# ==================== Image Fetching ====================
|
|
|
|
async def _get_image_bytes(
    self,
    image_source: str,
    __request__=None,
    __files__: list = None,
    __metadata__: dict = None,
) -> tuple[Optional[bytes], Optional[str]]:
    """
    Fetch image bytes from various sources.

    Handles:
    - Index references: "1", "2", "[1]", "last", "previous" (from message context)
    - Open WebUI file URLs (/api/v1/files/{id}/content)
    - Full URLs (https://...)
    - File IDs from __files__ context

    Resolution order: index reference -> __files__ match -> URL
    normalization -> HTTP fetch. Each stage may rewrite image_source
    for the next one.

    Returns (image_bytes, error_message).
    """
    base_url, headers = self._get_owui_config(__request__)
    image_source = image_source.strip()

    # Parse index references: "1", "[1]", "last", "previous", "latest"
    index = None
    if image_source.lower() in ("last", "latest", "previous", "recent"):
        index = -1  # Last image
    elif image_source.isdigit():
        # NOTE(review): "0" also passes isdigit() and yields index -1,
        # i.e. it silently resolves to the newest image — confirm intended.
        index = int(image_source) - 1  # 1-based to 0-based
    elif image_source.startswith("[") and image_source.endswith("]"):
        inner = image_source[1:-1].strip()
        if inner.isdigit():
            index = int(inner) - 1

    # Resolve index to actual file URL
    if index is not None:
        if not base_url:
            return None, "Could not determine Open WebUI URL"

        # Build message chain: current message + parents, so files attached
        # to earlier turns of this conversation are matchable below.
        message_ids = set()
        if __metadata__:
            if __metadata__.get("message_id"):
                message_ids.add(__metadata__["message_id"])
            # Walk up parent chain
            parent = __metadata__.get("parent_message")
            while parent:
                if parent.get("id"):
                    message_ids.add(parent["id"])
                parent = parent.get("parent_message")  # May not exist
            # Also check parent_message_id directly
            if __metadata__.get("parent_message_id"):
                message_ids.add(__metadata__["parent_message_id"])

        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                # List the user's files (metadata only, no content).
                response = await client.get(
                    f"{base_url}/api/v1/files/",
                    headers=headers,
                    params={"content": "false"},
                )
                if response.status_code != 200:
                    return None, f"Failed to list files: {response.status_code}"

                files = response.json()

                # Get images - prefer message context, fallback to all recent
                context_images = []
                all_images = []

                for f in files:
                    meta = f.get("meta", {})
                    content_type = meta.get("content_type", "")

                    # Only images are candidates for index resolution.
                    if not content_type.startswith("image/"):
                        continue

                    all_images.append(f)

                    # Check for message_id at both levels:
                    # - meta.data.message_id (new, simpler)
                    # - meta.data.data.message_id (old, nested)
                    inner_data = meta.get("data", {})
                    file_msg_id = None
                    if isinstance(inner_data, dict):
                        file_msg_id = inner_data.get(
                            "message_id"
                        )  # Check simpler path first
                        if not file_msg_id:
                            nested_data = inner_data.get("data", {})
                            if isinstance(nested_data, dict):
                                file_msg_id = nested_data.get(
                                    "message_id"
                                )  # Fallback to nested

                    if file_msg_id and file_msg_id in message_ids:
                        context_images.append(f)

                # Use context images if available, otherwise all images
                user_images = context_images if context_images else all_images

                # Sort by created_at (newest first)
                user_images.sort(key=lambda f: f.get("created_at", 0), reverse=True)

                if not user_images:
                    return None, "No images found"

                # Handle index: -1 = last (newest), "1" = most recent, "2" = second most recent
                if index == -1:
                    target_idx = 0  # Newest
                else:
                    target_idx = index

                if 0 <= target_idx < len(user_images):
                    file_id = user_images[target_idx].get("id")
                    # Rewrite image_source; the normalization stage below
                    # turns this into an absolute fetch URL.
                    image_source = f"/api/v1/files/{file_id}/content"
                else:
                    return (
                        None,
                        f"Image [{index+1}] not found (found {len(user_images)} image{'s' if len(user_images) != 1 else ''})",
                    )

        except Exception as e:
            return None, f"Failed to resolve image index: {type(e).__name__}: {e}"

    # Check if it's a reference to an uploaded file in current message.
    # NOTE(review): this also runs after index resolution above; a stray
    # substring match against __files__ could then re-target the source —
    # unlikely in practice, but worth confirming.
    if __files__:
        for f in __files__:
            file_id = f.get("id", "")
            file_url = f.get("url", "")

            # Match by ID, URL, or partial match
            if (
                image_source == file_id
                or image_source == file_url
                or image_source in file_url
                or file_id in image_source
            ):
                image_source = file_url or f"/api/v1/files/{file_id}/content"
                break

    # Normalize URL
    if image_source.startswith("/api/"):
        if not base_url:
            return None, "Could not determine Open WebUI URL for relative path"
        fetch_url = f"{base_url}{image_source}"
    elif image_source.startswith("http://") or image_source.startswith("https://"):
        fetch_url = image_source
    else:
        # Assume it's a file ID
        if not base_url:
            return None, "Could not determine Open WebUI URL"
        fetch_url = f"{base_url}/api/v1/files/{image_source}/content"

    # Final fetch of the resolved URL.
    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.get(fetch_url, headers=headers)
            if response.status_code == 200:
                return response.content, None
            else:
                return (
                    None,
                    f"Failed to fetch image: {response.status_code} from {fetch_url}",
                )

    except Exception as e:
        return None, f"Fetch error: {type(e).__name__}: {e}"
|
|
|
|
# ==================== Generate ====================
|
|
|
|
async def generate_image(
    self,
    prompt: str,
    model: Optional[str] = None,
    width: int = 1024,
    height: int = 1024,
    negative_prompt: Optional[str] = None,
    style_preset: Optional[str] = None,
    variants: int = 1,
    __request__=None,
    __user__: dict = None,
    __metadata__: dict = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Generate image(s) using Venice.ai and upload to Open WebUI.

    IMPORTANT: For multiple images with the SAME prompt, use the 'variants'
    parameter (1-4) instead of calling this function multiple times.
    Multiple calls with DIFFERENT prompts are fine and will accumulate correctly.

    :param prompt: Text description of the image to generate
    :param model: Image model (default: user pref or z-image-turbo). Use venice_info list_models("image") to see options
    :param width: Image width 512-1280 pixels (default 1024)
    :param height: Image height 512-1280 pixels (default 1024)
    :param negative_prompt: What to avoid in the image (or uses user's default if set)
    :param style_preset: Style preset (use venice_info list_styles to see options). Note: Some models like z-image-turbo don't support styles
    :param variants: Number of variations 1-4 with same prompt (use this instead of multiple calls)
    :return: Generated image details
    """
    # Check API key
    venice_key = self._get_venice_key()
    if not venice_key:
        return "Generate Image\nStatus: 0\nError: Venice.ai API key not configured. Set in UserValves or ask admin."

    # Validate prompt
    if not prompt or not prompt.strip():
        return "Generate Image\nStatus: 0\nError: Prompt is required"

    # Get message context for accumulation
    msg_key = self._get_message_key(__metadata__)

    # Check cooldown (skip if disabled or if this is a re-entrant call)
    user_id = __user__.get("id", "default") if __user__ else "default"
    cooldown = self.valves.COOLDOWN_SECONDS

    if cooldown > 0:
        now = time.time()
        last_gen = self._cooldowns.get(user_id, 0)
        # A message that already has accumulated files is a re-entrant
        # call (same user turn) and is exempt from the cooldown.
        is_reentrant = self._get_accumulated_count(msg_key) > 0

        if not is_reentrant and now - last_gen < cooldown:
            remaining = cooldown - (now - last_gen)
            return f"Generate Image\nStatus: 429\nError: Rate limited. Wait {remaining:.1f}s. TIP: Use 'variants' parameter (1-4) for multiple images with the same prompt."

        # NOTE(review): the timestamp is recorded before the API call,
        # so a failed generation still consumes the cooldown window.
        self._cooldowns[user_id] = now

    # Get effective settings using helper methods
    model = model or self._get_default_model()
    safe_mode = self._is_safe_mode_enabled()
    hide_watermark = self._is_watermark_hidden()

    # Handle negative prompt: explicit > user default > none
    effective_negative_prompt = negative_prompt
    if effective_negative_prompt is None:
        effective_negative_prompt = self._get_default_negative_prompt()

    # Clamp values to the API-accepted ranges
    variants = max(1, min(4, variants))
    width = max(512, min(1280, width))
    height = max(512, min(1280, height))

    # Count existing images (non-blocking peek)
    existing_count = self._get_accumulated_count(msg_key)

    # Status update
    if __event_emitter__:
        status_msg = (
            f"Generating {variants} image{'s' if variants > 1 else ''} with {model}"
        )
        if existing_count > 0:
            status_msg += f" (adding to {existing_count} existing)"
        if safe_mode:
            status_msg += " [SFW]"
        await __event_emitter__(
            {
                "type": "status",
                "data": {"description": f"{status_msg}...", "done": False},
            }
        )

    # Build Venice request
    payload = {
        "model": model,
        "prompt": prompt,
        "width": width,
        "height": height,
        "safe_mode": safe_mode,
        "hide_watermark": hide_watermark,
        "return_binary": False,
        "variants": variants,
    }

    if effective_negative_prompt:
        payload["negative_prompt"] = effective_negative_prompt
    if style_preset:
        payload["style_preset"] = style_preset

    # Generate images - with retry for unsupported params
    retried = False
    dropped_params = []

    try:
        async with httpx.AsyncClient(
            timeout=float(self.valves.GENERATION_TIMEOUT)
        ) as client:
            response = await client.post(
                "https://api.venice.ai/api/v1/image/generate",
                headers={
                    "Authorization": f"Bearer {venice_key}",
                    "Content-Type": "application/json",
                },
                json=payload,
            )

            # Check for parameter errors and retry without optional params
            if response.status_code in (400, 422) and not retried:
                error_text = response.text.lower()
                # Check if it's a style/parameter support issue
                if any(
                    kw in error_text
                    for kw in ("style", "unsupported", "invalid", "parameter")
                ):
                    # Retry without style_preset and negative_prompt
                    if style_preset:
                        del payload["style_preset"]
                        dropped_params.append("style_preset")
                    if effective_negative_prompt:
                        del payload["negative_prompt"]
                        dropped_params.append("negative_prompt")

                    if dropped_params:
                        retried = True
                        response = await client.post(
                            "https://api.venice.ai/api/v1/image/generate",
                            headers={
                                "Authorization": f"Bearer {venice_key}",
                                "Content-Type": "application/json",
                            },
                            json=payload,
                        )

            response.raise_for_status()
            result = response.json()

    except httpx.HTTPStatusError as e:
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": {"done": True}})
        return f"Generate Image\nStatus: {e.response.status_code}\nError: Venice API error: {e.response.text[:200]}"
    except httpx.TimeoutException:
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": {"done": True}})
        return f"Generate Image\nStatus: 408\nError: Generation timed out after {self.valves.GENERATION_TIMEOUT}s"
    except Exception as e:
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": {"done": True}})
        return f"Generate Image\nStatus: 0\nError: {type(e).__name__}: {e}"

    images = result.get("images", [])
    if not images:
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": {"done": True}})
        return "Generate Image\nStatus: 0\nError: No images returned from Venice.ai"

    # Update status
    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": f"Uploading {len(images)} image{'s' if len(images) > 1 else ''}...",
                    "done": False,
                },
            }
        )

    # Get context for metadata
    chat_id = __metadata__.get("chat_id") if __metadata__ else None
    message_id = __metadata__.get("message_id") if __metadata__ else None

    # Upload each image to Files API
    uploaded_files = []
    errors = []

    for i, image_b64 in enumerate(images):
        timestamp = int(time.time() * 1000)  # ms precision for uniqueness
        filename = f"venice_{model}_{timestamp}_{i+1}.webp"

        # Build metadata - chat_id/message_id at top level for consistency
        file_metadata = {
            "name": filename,
            "content_type": "image/webp",
            "data": {
                "model": model,
                "prompt": prompt,
                "negative_prompt": effective_negative_prompt,
                "style_preset": style_preset,
                "width": width,
                "height": height,
                "variant": i + 1,
                "total_variants": len(images),
                "safe_mode": safe_mode,
                "hide_watermark": hide_watermark,
            },
        }

        # Associations at top level (becomes meta.data.chat_id, not meta.data.data.chat_id)
        if chat_id:
            file_metadata["chat_id"] = chat_id
        if message_id:
            file_metadata["message_id"] = message_id

        file_id, error = await self._upload_image(
            image_b64, filename, file_metadata, "image/webp", __request__
        )

        if file_id:
            uploaded_files.append(
                {
                    "type": "image",
                    "url": f"/api/v1/files/{file_id}/content",
                }
            )
        else:
            # Keep going: remaining variants may still upload fine.
            errors.append(f"Variant {i+1}: {error}")

    # Accumulate files and emit ALL accumulated files for this message
    if uploaded_files:
        await self._accumulate_files(msg_key, uploaded_files, __event_emitter__)

    # Get final count after accumulation
    final_count = self._get_accumulated_count(msg_key)

    # Clear status
    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": f"Done ({final_count} image{'s' if final_count != 1 else ''} total)",
                    "done": True,
                },
            }
        )

    # Build response with file references for upscale/edit
    parts = [
        "Generate Image",
        "Status: 200",
        "",
        f"Generated {len(uploaded_files)} image(s) for: {prompt[:100]}{'...' if len(prompt) > 100 else ''}",
        f"Model: {model} | Size: {width}x{height}",
    ]

    # Show effective settings
    settings_parts = []
    if safe_mode:
        settings_parts.append("SFW")
    if hide_watermark:
        settings_parts.append("No watermark")
    if settings_parts:
        parts.append(f"Settings: {', '.join(settings_parts)}")

    # Include file URLs so subsequent upscale/edit calls know what to reference
    if uploaded_files:
        parts.append("")
        parts.append("Files (use these URLs for upscale_image or edit_image):")
        for i, f in enumerate(uploaded_files):
            parts.append(f" [{i+1}] {f['url']}")

    if dropped_params:
        parts.append(
            f"Note: {model} doesn't support: {', '.join(dropped_params)} (ignored)"
        )

    if final_count > len(uploaded_files):
        parts.append(f"Total images in message: {final_count}")

    if errors:
        parts.append("")
        parts.append("Warnings:")
        for e in errors:
            parts.append(f" - {e}")

    return "\n".join(parts)
|
|
|
|
# ==================== Upscale ====================
|
|
|
|
async def upscale_image(
    self,
    image: str,
    scale: int = 2,
    enhance: bool = False,
    enhance_creativity: float = 0.5,
    enhance_prompt: Optional[str] = None,
    __request__=None,
    __user__: dict = None,
    __metadata__: dict = None,
    __files__: list = None,
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    Upscale an image to higher resolution, optionally with AI enhancement.

    :param image: Image reference - simplest options:
        - "1" or "last" - Your most recently generated/uploaded image
        - "2" - Second most recent image
        - Or full URL from generate_image output
    :param scale: Scale factor 1-4 (default 2). Use 1 with enhance=True for enhancement only
    :param enhance: Apply AI enhancement during upscaling (adds detail/clarity)
    :param enhance_creativity: How much AI can modify the image 0.0-1.0 (default 0.5). Higher = more changes
    :param enhance_prompt: Style hint for enhancement like "sharp", "cinematic", "vibrant" (optional)
    :return: Upscaled image result
    """
    # Check API key
    venice_key = self._get_venice_key()
    if not venice_key:
        return "Upscale Image\nStatus: 0\nError: Venice.ai API key not configured."

    # Validate parameters
    scale = max(1, min(4, scale))
    if scale == 1 and not enhance:
        return "Upscale Image\nStatus: 0\nError: scale=1 requires enhance=True (enhancement only mode)"

    enhance_creativity = max(0.0, min(1.0, enhance_creativity))

    # Get message context for accumulation
    msg_key = self._get_message_key(__metadata__)
    # NOTE(review): existing_count is never read below (generate_image uses
    # the analogous value for status text); kept as-is for byte-parity.
    existing_count = self._get_accumulated_count(msg_key)

    # Status update
    if __event_emitter__:
        mode = "Enhancing" if scale == 1 else f"Upscaling {scale}x"
        if enhance and scale > 1:
            mode += " + enhancing"
        await __event_emitter__(
            {"type": "status", "data": {"description": f"{mode}...", "done": False}}
        )

    # Fetch source image (index references, file IDs, and URLs all accepted)
    image_bytes, error = await self._get_image_bytes(
        image, __request__, __files__, __metadata__
    )
    if error:
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": {"done": True}})
        return (
            f"Upscale Image\nStatus: 0\nError: Failed to get source image: {error}"
        )

    # Build request (Venice expects the source image as base64)
    payload = {
        "image": base64.b64encode(image_bytes).decode("utf-8"),
        "scale": scale,
    }

    if enhance:
        payload["enhance"] = True
        payload["enhanceCreativity"] = enhance_creativity
        if enhance_prompt:
            # API limit on prompt length — truncate rather than fail.
            payload["enhancePrompt"] = enhance_prompt[:1500]

    # Call Venice API
    try:
        async with httpx.AsyncClient(
            timeout=float(self.valves.GENERATION_TIMEOUT)
        ) as client:
            response = await client.post(
                "https://api.venice.ai/api/v1/image/upscale",
                headers={
                    "Authorization": f"Bearer {venice_key}",
                    "Content-Type": "application/json",
                },
                json=payload,
            )
            response.raise_for_status()

            # Parse response (handles both binary and JSON)
            result_bytes, parse_error = self._parse_venice_image_response(response)
            if parse_error:
                if __event_emitter__:
                    await __event_emitter__(
                        {"type": "status", "data": {"done": True}}
                    )
                return f"Upscale Image\nStatus: 0\nError: {parse_error}"

    except httpx.HTTPStatusError as e:
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": {"done": True}})
        return f"Upscale Image\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}"
    except Exception as e:
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": {"done": True}})
        return f"Upscale Image\nStatus: 0\nError: {type(e).__name__}: {e}"

    # Upload result
    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {"description": "Uploading...", "done": False},
            }
        )

    chat_id = __metadata__.get("chat_id") if __metadata__ else None
    message_id = __metadata__.get("message_id") if __metadata__ else None

    timestamp = int(time.time() * 1000)
    filename = f"venice_upscale_{scale}x_{timestamp}.png"

    file_metadata = {
        "name": filename,
        "content_type": "image/png",
        "data": {
            "operation": "upscale",
            "scale": scale,
            "enhance": enhance,
            "enhance_creativity": enhance_creativity if enhance else None,
            "enhance_prompt": enhance_prompt if enhance else None,
            # Truncated source reference for traceability only.
            "source_image": image[:100],
        },
    }

    # Associations at top level (becomes meta.data.chat_id)
    if chat_id:
        file_metadata["chat_id"] = chat_id
    if message_id:
        file_metadata["message_id"] = message_id

    file_id, upload_error = await self._upload_image(
        result_bytes, filename, file_metadata, "image/png", __request__
    )

    if not file_id:
        if __event_emitter__:
            await __event_emitter__({"type": "status", "data": {"done": True}})
        return f"Upscale Image\nStatus: 0\nError: Upload failed - {upload_error}"

    # Accumulate and emit all files
    new_file = {"type": "image", "url": f"/api/v1/files/{file_id}/content"}
    await self._accumulate_files(msg_key, [new_file], __event_emitter__)

    final_count = self._get_accumulated_count(msg_key)

    # Clear status
    if __event_emitter__:
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": f"Done ({final_count} image{'s' if final_count != 1 else ''} total)",
                    "done": True,
                },
            }
        )

    # Build response
    parts = [
        "Upscale Image",
        "Status: 200",
        "",
        f"Upscaled {scale}x" + (" with enhancement" if enhance else ""),
        "",
        f"File (use for further upscale/edit): {new_file['url']}",
    ]

    return "\n".join(parts)
|
|
|
|
# ==================== Edit ====================
|
|
|
|
async def edit_image(
|
|
self,
|
|
image: str,
|
|
prompt: str,
|
|
__request__=None,
|
|
__user__: dict = None,
|
|
__metadata__: dict = None,
|
|
__files__: list = None,
|
|
__event_emitter__: Callable[[dict], Any] = None,
|
|
) -> str:
|
|
"""
|
|
Edit an existing image using text instructions.
|
|
|
|
Venice uses the Qwen-Image model for editing. Works best with
|
|
short, clear instructions like:
|
|
- "Remove the background"
|
|
- "Change the sky to sunset"
|
|
- "Make it black and white"
|
|
- "Add snow to the scene"
|
|
|
|
:param image: Image reference - simplest options:
|
|
- "1" or "last" - Your most recently generated/uploaded image
|
|
- "2" - Second most recent image
|
|
- Or full URL from generate_image output
|
|
:param prompt: Text instructions describing the edit to make (max 1500 chars)
|
|
:return: Edited image result
|
|
"""
|
|
# Check API key
|
|
venice_key = self._get_venice_key()
|
|
if not venice_key:
|
|
return "Edit Image\nStatus: 0\nError: Venice.ai API key not configured."
|
|
|
|
# Validate prompt
|
|
if not prompt or not prompt.strip():
|
|
return "Edit Image\nStatus: 0\nError: Edit prompt is required"
|
|
|
|
prompt = prompt.strip()[:1500]
|
|
|
|
# Get message context for accumulation
|
|
msg_key = self._get_message_key(__metadata__)
|
|
existing_count = self._get_accumulated_count(msg_key)
|
|
|
|
# Status update
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Editing image...", "done": False},
|
|
}
|
|
)
|
|
|
|
# Fetch source image
|
|
image_bytes, error = await self._get_image_bytes(
|
|
image, __request__, __files__, __metadata__
|
|
)
|
|
if error:
|
|
if __event_emitter__:
|
|
await __event_emitter__({"type": "status", "data": {"done": True}})
|
|
return f"Edit Image\nStatus: 0\nError: Failed to get source image: {error}"
|
|
|
|
# Build request
|
|
payload = {
|
|
"image": base64.b64encode(image_bytes).decode("utf-8"),
|
|
"prompt": prompt,
|
|
}
|
|
|
|
# Call Venice API
|
|
try:
|
|
async with httpx.AsyncClient(
|
|
timeout=float(self.valves.GENERATION_TIMEOUT)
|
|
) as client:
|
|
response = await client.post(
|
|
"https://api.venice.ai/api/v1/image/edit",
|
|
headers={
|
|
"Authorization": f"Bearer {venice_key}",
|
|
"Content-Type": "application/json",
|
|
},
|
|
json=payload,
|
|
)
|
|
response.raise_for_status()
|
|
|
|
# Parse response (handles both binary and JSON)
|
|
result_bytes, parse_error = self._parse_venice_image_response(response)
|
|
if parse_error:
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{"type": "status", "data": {"done": True}}
|
|
)
|
|
return f"Edit Image\nStatus: 0\nError: {parse_error}"
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
if __event_emitter__:
|
|
await __event_emitter__({"type": "status", "data": {"done": True}})
|
|
return f"Edit Image\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}"
|
|
except Exception as e:
|
|
if __event_emitter__:
|
|
await __event_emitter__({"type": "status", "data": {"done": True}})
|
|
return f"Edit Image\nStatus: 0\nError: {type(e).__name__}: {e}"
|
|
|
|
# Upload result
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {"description": "Uploading...", "done": False},
|
|
}
|
|
)
|
|
|
|
chat_id = __metadata__.get("chat_id") if __metadata__ else None
|
|
message_id = __metadata__.get("message_id") if __metadata__ else None
|
|
|
|
timestamp = int(time.time() * 1000)
|
|
filename = f"venice_edit_{timestamp}.png"
|
|
|
|
file_metadata = {
|
|
"name": filename,
|
|
"content_type": "image/png",
|
|
"data": {
|
|
"operation": "edit",
|
|
"prompt": prompt,
|
|
"source_image": image[:100],
|
|
},
|
|
}
|
|
|
|
# Associations at top level (becomes meta.data.chat_id)
|
|
if chat_id:
|
|
file_metadata["chat_id"] = chat_id
|
|
if message_id:
|
|
file_metadata["message_id"] = message_id
|
|
|
|
file_id, upload_error = await self._upload_image(
|
|
result_bytes, filename, file_metadata, "image/png", __request__
|
|
)
|
|
|
|
if not file_id:
|
|
if __event_emitter__:
|
|
await __event_emitter__({"type": "status", "data": {"done": True}})
|
|
return f"Edit Image\nStatus: 0\nError: Upload failed - {upload_error}"
|
|
|
|
# Accumulate and emit all files
|
|
new_file = {"type": "image", "url": f"/api/v1/files/{file_id}/content"}
|
|
await self._accumulate_files(msg_key, [new_file], __event_emitter__)
|
|
|
|
final_count = self._get_accumulated_count(msg_key)
|
|
|
|
# Clear status
|
|
if __event_emitter__:
|
|
await __event_emitter__(
|
|
{
|
|
"type": "status",
|
|
"data": {
|
|
"description": f"Done ({final_count} image{'s' if final_count != 1 else ''} total)",
|
|
"done": True,
|
|
},
|
|
}
|
|
)
|
|
|
|
# Build response
|
|
parts = [
|
|
"Edit Image",
|
|
"Status: 200",
|
|
"",
|
|
f"Applied edit: {prompt[:80]}{'...' if len(prompt) > 80 else ''}",
|
|
"",
|
|
f"File (use for further upscale/edit): {new_file['url']}",
|
|
]
|
|
|
|
return "\n".join(parts)
|