534 lines
30 KiB
Python
534 lines
30 KiB
Python
"""
title: Venice.ai Image Tools
author: Jeff Smith + Claude
version: 1.7.0
license: MIT
required_open_webui_version: 0.6.0
requirements: httpx, pydantic
description: |
  Venice.ai image generation, upscaling, and editing tools.
  - generate_image: Create images from text prompts
  - upscale_image: Increase resolution up to 4x with optional enhancement
  - edit_image: Modify existing images with text instructions

  All operations upload results to Open WebUI Files API for persistence
  and attach images to chat via event emitter for inline display.

  Re-entrant safe: Multiple concurrent calls accumulate images correctly.

  v1.7.0: Added VeniceImage namespace class for helper functions to avoid
  method collisions with Open WebUI framework introspection.
  v1.6.0: Added UserValves for SAFE_MODE and HIDE_WATERMARK with proper
  admin/user override logic.
changelog:
  1.7.0:
    - Added VeniceImage namespace class for helper functions
    - Moved get_api_key, parse_venice_image_response to VeniceImage namespace
    - Prevents Open WebUI framework introspection method name collisions
  1.6.0:
    - Added UserValves for SAFE_MODE and HIDE_WATERMARK with proper override logic
"""
|
|
|
|
import asyncio
import base64
import io
import json
import threading
import time
from typing import Any, Callable, Dict, List, Optional

import httpx
from pydantic import BaseModel, Field
|
|
|
|
|
|
class VeniceImage:
    """Namespaced helpers for Venice image operations.

    Using a separate class prevents Open WebUI framework introspection
    from colliding with tool methods that have generic names like _get_api_key.
    """

    @staticmethod
    def get_api_key(valves, user_valves, __user__: dict = None) -> str:
        """Return the Venice.ai API key, giving the calling user's key priority.

        Lookup order:
          1. ``__user__["valves"]`` - accepted either as a plain dict or as an
             object exposing a ``VENICE_API_KEY`` attribute (e.g. a valves model).
          2. ``user_valves.VENICE_API_KEY``
          3. ``valves.VENICE_API_KEY`` (admin fallback)

        :param valves: Object exposing the admin ``VENICE_API_KEY`` attribute.
        :param user_valves: Object exposing the user ``VENICE_API_KEY`` attribute.
        :param __user__: Optional per-request user dict; may carry a "valves" entry.
        :return: The first non-empty key found, or the (possibly empty) admin key.
        """
        request_valves = __user__.get("valves") if __user__ else None
        if isinstance(request_valves, dict):
            request_key = request_valves.get("VENICE_API_KEY")
        else:
            # getattr on None (no "valves" entry) simply yields the default.
            request_key = getattr(request_valves, "VENICE_API_KEY", None)
        if request_key:
            return request_key
        return user_valves.VENICE_API_KEY or valves.VENICE_API_KEY

    @staticmethod
    def parse_venice_image_response(response) -> tuple[Optional[bytes], Optional[str]]:
        """Extract image bytes from a Venice API response.

        Handles both binary (``image/*``) and JSON bodies. For JSON, the first of
        the keys "image", "images", "data", "result" that holds a base64 string
        (or a non-empty list whose first element is one) is decoded.

        :param response: Object exposing ``headers.get``, ``content`` and ``json()``.
        :return: ``(image_bytes, None)`` on success, ``(None, error_message)`` otherwise.
        """
        raw_type = response.headers.get("content-type", "") or ""
        # Media types are case-insensitive, so compare on a lowered copy.
        media_type = raw_type.lower()

        if media_type.startswith("image/"):
            return response.content, None

        if "application/json" in media_type:
            try:
                data = response.json()
            except Exception as e:
                return None, f"Failed to parse JSON response: {e}"
            if not isinstance(data, dict):
                return None, f"Unexpected JSON response type: {type(data).__name__}"

            for field in ("image", "images", "data", "result"):
                value = data.get(field)
                if isinstance(value, list) and value:
                    value = value[0]
                if isinstance(value, str):
                    try:
                        return base64.b64decode(value), None
                    except ValueError as e:  # binascii.Error subclasses ValueError
                        return None, f"Failed to parse JSON response: {e}"

            if "error" in data:
                return None, f"API error: {data.get('error')}"
            return None, f"Unexpected JSON response keys: {list(data.keys())}"

        # Heuristic kept from the original: a sizeable body with an unrecognised
        # content type is treated as raw image data.
        if len(response.content) > 1000:
            return response.content, None
        return None, f"Unknown response format: {raw_type}, length: {len(response.content)}"
|
|
|
|
|
|
class Tools:
    """Venice.ai image tools: generate, upscale, and edit.

    Results are uploaded to the Open WebUI Files API and attached to the chat
    through the event emitter. Files produced for the same chat message are
    accumulated so that several tool calls within one message all stay visible.
    """

    class Valves(BaseModel):
        # Admin-level settings.
        VENICE_API_KEY: str = Field(default="", description="Venice.ai API key (admin fallback)")
        DEFAULT_MODEL: str = Field(default="z-image-turbo", description="Default image model")
        SAFE_MODE: bool = Field(default=True, description="Admin: Require SFW content")
        HIDE_WATERMARK: bool = Field(default=False, description="Admin: Hide watermark")
        COOLDOWN_SECONDS: int = Field(default=0, description="Min seconds between generations")
        GENERATION_TIMEOUT: int = Field(default=180, description="Timeout for image generation in seconds")
        ACCUMULATOR_TTL: int = Field(default=300, description="Seconds to keep accumulated files in memory")

    class UserValves(BaseModel):
        # Per-user settings.
        VENICE_API_KEY: str = Field(default="", description="Your Venice.ai API key (overrides admin)")
        SAFE_MODE: bool = Field(default=False, description="Enable SFW content filtering")
        HIDE_WATERMARK: bool = Field(default=False, description="Hide Venice.ai watermark")
        DEFAULT_MODEL: str = Field(default="", description="Your preferred image model")
        DEFAULT_NEGATIVE_PROMPT: str = Field(default="", description="Default negative prompt")

    def __init__(self):
        self.valves = self.Valves()
        # Defaults only; request-specific user settings are read from
        # __user__["valves"] when a call supplies them (see _user_valve).
        self.user_valves = self.UserValves()
        self.citation = False
        # user id -> time.time() of that user's last accepted generation
        self._cooldowns: Dict[str, float] = {}
        # message key -> {"files": [...], "timestamp": float}
        self._message_files: Dict[str, Dict[str, Any]] = {}
        self._accumulator_lock: Optional[asyncio.Lock] = None
        self._lock_init = threading.Lock()
        self._last_cleanup: float = 0.0

    # ------------------------------------------------------------------
    # Settings resolution
    # ------------------------------------------------------------------

    def _user_valve(self, name: str, __user__: dict = None) -> Any:
        """Return user setting *name*, preferring the per-request user valves.

        ``__user__["valves"]`` may be a dict or an object with attributes; when it
        is absent or lacks *name*, the instance-level ``self.user_valves`` is used.
        """
        request_valves = __user__.get("valves") if isinstance(__user__, dict) else None
        if isinstance(request_valves, dict):
            if name in request_valves:
                return request_valves[name]
        elif request_valves is not None and hasattr(request_valves, name):
            return getattr(request_valves, name)
        return getattr(self.user_valves, name)

    def _is_safe_mode_enabled(self, __user__: dict = None) -> bool:
        """Safe mode is on when either the admin or the user enables it."""
        return bool(self.valves.SAFE_MODE or self._user_valve("SAFE_MODE", __user__))

    def _is_watermark_hidden(self, __user__: dict = None) -> bool:
        """The watermark is hidden when either the admin or the user asks for it."""
        return bool(self.valves.HIDE_WATERMARK or self._user_valve("HIDE_WATERMARK", __user__))

    def _get_default_model(self, __user__: dict = None) -> str:
        """Return the user's preferred model, falling back to the admin default."""
        return self._user_valve("DEFAULT_MODEL", __user__) or self.valves.DEFAULT_MODEL

    def _get_default_negative_prompt(self, __user__: dict = None) -> Optional[str]:
        """Return the user's default negative prompt, or None when it is empty."""
        return self._user_valve("DEFAULT_NEGATIVE_PROMPT", __user__) or None

    # ------------------------------------------------------------------
    # Open WebUI plumbing
    # ------------------------------------------------------------------

    def _get_owui_config(self, __request__=None) -> tuple[str, dict]:
        """Derive the Open WebUI base URL and auth headers from the request.

        :return: ``(base_url_without_trailing_slash, headers)``; the base URL is
            empty and the headers are ``{}`` when nothing can be derived.
        """
        base_url = ""
        headers = {}
        if __request__:
            base_url = str(getattr(__request__, "base_url", "") or "")
            req_headers = getattr(__request__, "headers", {})
            if req_headers:
                auth = dict(req_headers).get("authorization", "")
                if auth:
                    headers["Authorization"] = auth
        return base_url.rstrip("/"), headers

    def _get_message_key(self, __metadata__: dict = None) -> str:
        """Build the accumulator key for the current chat message.

        Falls back to a second-resolution timestamp key when no message id is
        available.
        """
        if __metadata__:
            chat_id = __metadata__.get("chat_id", "")
            message_id = __metadata__.get("message_id", "")
            if message_id:
                return f"{chat_id}:{message_id}"
        return f"unknown:{int(time.time())}"

    def _get_lock(self) -> asyncio.Lock:
        """Return the accumulator lock, creating it on first use."""
        if self._accumulator_lock is None:
            with self._lock_init:
                # Re-check under the init lock so only one asyncio.Lock is created.
                if self._accumulator_lock is None:
                    self._accumulator_lock = asyncio.Lock()
        return self._accumulator_lock

    async def _emit_status(self, __event_emitter__, description: Optional[str] = None, done: bool = True) -> None:
        """Send a "status" event if an emitter is available.

        With no *description* the event only carries the ``done`` flag, which is
        what the error paths use to clear the progress indicator.
        """
        if not __event_emitter__:
            return
        data: Dict[str, Any] = {"done": done}
        if description is not None:
            data["description"] = description
        await __event_emitter__({"type": "status", "data": data})

    async def _accumulate_files(self, key: str, new_files: List[dict], __event_emitter__: Callable[[dict], Any] = None):
        """Add *new_files* to the message's file list and emit the full list.

        Entries older than ``ACCUMULATOR_TTL`` are purged at most once a minute.
        """
        all_files = []
        async with self._get_lock():
            entry = self._message_files.setdefault(key, {"files": [], "timestamp": time.time()})
            entry["files"].extend(dict(f) for f in new_files)
            entry["timestamp"] = time.time()
            all_files = list(entry["files"])

            now = time.time()
            if now - self._last_cleanup > 60:
                self._last_cleanup = now
                ttl = self.valves.ACCUMULATOR_TTL
                expired = [k for k, v in self._message_files.items() if now - v.get("timestamp", 0) > ttl]
                for k in expired:
                    del self._message_files[k]

        if all_files and __event_emitter__:
            await __event_emitter__({"type": "files", "data": {"files": all_files}})

    def _get_accumulated_count(self, key: str) -> int:
        """Return how many files are currently accumulated under *key*."""
        entry = self._message_files.get(key)
        return len(entry.get("files", [])) if entry else 0

    async def _upload_image(self, image_data: bytes | str, filename: str, metadata: dict, content_type: str = "image/webp", __request__=None) -> tuple[Optional[str], Optional[str]]:
        """Upload an image to the Open WebUI Files API.

        :param image_data: Raw bytes, or a base64 string that is decoded first.
        :param filename: Name to store the file under.
        :param metadata: JSON-serialisable metadata sent with the upload.
        :param content_type: MIME type reported for the file.
        :return: ``(file_id, None)`` on success, ``(None, error_message)`` otherwise.
        """
        base_url, headers = self._get_owui_config(__request__)
        if not base_url:
            return None, "Could not determine Open WebUI URL"

        if isinstance(image_data, str):
            try:
                image_bytes = base64.b64decode(image_data)
            except Exception as e:
                return None, f"Failed to decode image: {e}"
        else:
            image_bytes = image_data

        files = {"file": (filename, io.BytesIO(image_bytes), content_type)}
        data = {"metadata": json.dumps(metadata)}
        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                response = await client.post(
                    f"{base_url}/api/v1/files/",
                    headers=headers,
                    files=files,
                    data=data,
                    params={"process": "false", "process_in_background": "false"},
                )
            if response.status_code != 200:
                return None, f"Upload failed: {response.status_code} - {response.text[:200]}"
            file_id = response.json().get("id")
        except Exception as e:
            return None, f"Upload error: {type(e).__name__}: {e}"

        if not file_id:
            # Without this, callers would receive (None, None) and print "None".
            return None, "Upload response did not include a file id"
        return file_id, None

    async def _get_image_bytes(self, image_source: str, __request__=None, __files__: list = None, __metadata__: dict = None) -> tuple[Optional[bytes], Optional[str]]:
        """Resolve *image_source* to raw image bytes.

        Accepted forms of *image_source*:
          * "last" / "latest" / "previous" / "recent" - the newest image
          * "N" or "[N]" - the N-th newest image (1 = newest)
          * an id or URL matching an entry of ``__files__``
          * a path starting with "/api/" (relative to Open WebUI)
          * an absolute http(s) URL
          * anything else is treated as an Open WebUI file id

        For positional lookups, images whose stored metadata references the
        current message or one of its parents are preferred; if none match, all
        listed images are considered.

        :return: ``(image_bytes, None)`` on success, ``(None, error_message)`` otherwise.
        """
        base_url, headers = self._get_owui_config(__request__)
        image_source = image_source.strip()

        # 1-based rank among images sorted newest first; None = not positional.
        position = None
        if image_source.lower() in ("last", "latest", "previous", "recent"):
            position = 1
        else:
            digits = image_source
            if digits.startswith("[") and digits.endswith("]"):
                digits = digits[1:-1].strip()
            # isdecimal() only accepts characters that int() can parse.
            if digits.isdecimal():
                position = int(digits)

        if position is not None:
            if not base_url:
                return None, "Could not determine Open WebUI URL"

            message_ids = set()
            if __metadata__:
                if __metadata__.get("message_id"):
                    message_ids.add(__metadata__["message_id"])
                parent = __metadata__.get("parent_message")
                while isinstance(parent, dict) and parent:
                    if parent.get("id"):
                        message_ids.add(parent["id"])
                    parent = parent.get("parent_message")
                if __metadata__.get("parent_message_id"):
                    message_ids.add(__metadata__["parent_message_id"])

            try:
                async with httpx.AsyncClient(timeout=30.0) as client:
                    response = await client.get(f"{base_url}/api/v1/files/", headers=headers, params={"content": "false"})
                if response.status_code != 200:
                    return None, f"Failed to list files: {response.status_code}"

                context_images = []
                all_images = []
                for f in response.json():
                    meta = f.get("meta") or {}
                    if not (meta.get("content_type") or "").startswith("image/"):
                        continue
                    all_images.append(f)

                    # The message id is looked up at meta.data.message_id and,
                    # failing that, at meta.data.data.message_id.
                    inner_data = meta.get("data") or {}
                    file_msg_id = None
                    if isinstance(inner_data, dict):
                        file_msg_id = inner_data.get("message_id")
                        if not file_msg_id:
                            nested_data = inner_data.get("data") or {}
                            if isinstance(nested_data, dict):
                                file_msg_id = nested_data.get("message_id")
                    if file_msg_id and file_msg_id in message_ids:
                        context_images.append(f)

                candidates = context_images or all_images
                if not candidates:
                    return None, "No images found"
                candidates.sort(key=lambda f: f.get("created_at") or 0, reverse=True)

                # Position 0 (or anything past the end) is reported instead of
                # silently mapping to the newest image.
                if not 1 <= position <= len(candidates):
                    return None, f"Image [{position}] not found (found {len(candidates)} images)"
                file_id = candidates[position - 1].get("id")
                image_source = f"/api/v1/files/{file_id}/content"
            except Exception as e:
                return None, f"Failed to resolve image index: {type(e).__name__}: {e}"

        for f in __files__ or []:
            file_id = f.get("id") or ""
            file_url = f.get("url") or ""
            if not (file_id or file_url):
                continue
            # An empty id must not match: "" is a substring of every string.
            if image_source in file_url or (file_id and file_id in image_source):
                image_source = file_url or f"/api/v1/files/{file_id}/content"
                break

        if image_source.startswith("/api/"):
            if not base_url:
                return None, "Could not determine Open WebUI URL for relative path"
            fetch_url = f"{base_url}{image_source}"
        elif image_source.startswith(("http://", "https://")):
            fetch_url = image_source
        else:
            if not base_url:
                return None, "Could not determine Open WebUI URL"
            fetch_url = f"{base_url}/api/v1/files/{image_source}/content"

        # Only send the Open WebUI Authorization header back to Open WebUI
        # itself; forwarding it to an arbitrary external host would leak the
        # caller's credentials.
        is_owui_url = bool(base_url) and fetch_url.startswith(f"{base_url}/")
        fetch_headers = headers if is_owui_url else {}

        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                response = await client.get(fetch_url, headers=fetch_headers)
            if response.status_code == 200:
                return response.content, None
            return None, f"Failed to fetch image: {response.status_code} from {fetch_url}"
        except Exception as e:
            return None, f"Fetch error: {type(e).__name__}: {e}"

    # ------------------------------------------------------------------
    # Tools
    # ------------------------------------------------------------------

    async def generate_image(
        self,
        prompt: str,
        model: Optional[str] = None,
        width: int = 1024,
        height: int = 1024,
        negative_prompt: Optional[str] = None,
        style_preset: Optional[str] = None,
        variants: int = 1,
        __request__=None,
        __user__: dict = None,
        __metadata__: dict = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Generate one or more images from a text prompt with Venice.ai.

        :param prompt: Text description of the image to create (required).
        :param model: Image model to use; defaults to the configured default model.
        :param width: Image width in pixels, clamped to 512-1280.
        :param height: Image height in pixels, clamped to 512-1280.
        :param negative_prompt: Things to avoid; defaults to the user's default negative prompt.
        :param style_preset: Optional style preset name passed to the API.
        :param variants: Number of images to generate, clamped to 1-4.
        :return: Plain-text report with a status line, settings, file URLs and warnings.
        """
        venice_key = VeniceImage.get_api_key(self.valves, self.user_valves, __user__)
        if not venice_key:
            return "Generate Image\nStatus: 0\nError: Venice.ai API key not configured."
        if not prompt or not prompt.strip():
            return "Generate Image\nStatus: 0\nError: Prompt is required"

        msg_key = self._get_message_key(__metadata__)
        user_id = __user__.get("id", "default") if __user__ else "default"

        cooldown = self.valves.COOLDOWN_SECONDS
        if cooldown > 0:
            now = time.time()
            last_gen = self._cooldowns.get(user_id, 0)
            # Further calls for a message that already has images are not throttled.
            is_reentrant = self._get_accumulated_count(msg_key) > 0
            if not is_reentrant and now - last_gen < cooldown:
                remaining = cooldown - (now - last_gen)
                return f"Generate Image\nStatus: 429\nError: Rate limited. Wait {remaining:.1f}s."
            self._cooldowns[user_id] = now

        model = model or self._get_default_model(__user__)
        safe_mode = self._is_safe_mode_enabled(__user__)
        hide_watermark = self._is_watermark_hidden(__user__)
        effective_negative_prompt = negative_prompt or self._get_default_negative_prompt(__user__)
        variants = max(1, min(4, variants))
        width = max(512, min(1280, width))
        height = max(512, min(1280, height))

        existing_count = self._get_accumulated_count(msg_key)
        status_msg = f"Generating {variants} image{'s' if variants > 1 else ''} with {model}"
        if existing_count > 0:
            status_msg += f" (adding to {existing_count} existing)"
        if safe_mode:
            status_msg += " [SFW]"
        await self._emit_status(__event_emitter__, f"{status_msg}...", done=False)

        payload = {
            "model": model,
            "prompt": prompt,
            "width": width,
            "height": height,
            "safe_mode": safe_mode,
            "hide_watermark": hide_watermark,
            "return_binary": False,
            "variants": variants,
        }
        if effective_negative_prompt:
            payload["negative_prompt"] = effective_negative_prompt
        if style_preset:
            payload["style_preset"] = style_preset

        endpoint = "https://api.venice.ai/api/v1/image/generate"
        request_headers = {"Authorization": f"Bearer {venice_key}", "Content-Type": "application/json"}
        dropped_params = []
        try:
            async with httpx.AsyncClient(timeout=float(self.valves.GENERATION_TIMEOUT)) as client:
                response = await client.post(endpoint, headers=request_headers, json=payload)
                if response.status_code in (400, 422):
                    error_text = response.text.lower()
                    if any(kw in error_text for kw in ("style", "unsupported", "invalid", "parameter")):
                        # Retry once without the optional parameters that were sent.
                        for optional_param in ("style_preset", "negative_prompt"):
                            if optional_param in payload:
                                del payload[optional_param]
                                dropped_params.append(optional_param)
                        if dropped_params:
                            response = await client.post(endpoint, headers=request_headers, json=payload)
                response.raise_for_status()
                result = response.json()
        except httpx.HTTPStatusError as e:
            await self._emit_status(__event_emitter__)
            return f"Generate Image\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}"
        except httpx.TimeoutException:
            await self._emit_status(__event_emitter__)
            return f"Generate Image\nStatus: 408\nError: Timed out after {self.valves.GENERATION_TIMEOUT}s"
        except Exception as e:
            await self._emit_status(__event_emitter__)
            return f"Generate Image\nStatus: 0\nError: {type(e).__name__}: {e}"

        images = result.get("images", []) if isinstance(result, dict) else []
        if not images:
            await self._emit_status(__event_emitter__)
            return "Generate Image\nStatus: 0\nError: No images returned"

        await self._emit_status(__event_emitter__, f"Uploading {len(images)} images...", done=False)

        chat_id = __metadata__.get("chat_id") if __metadata__ else None
        message_id = __metadata__.get("message_id") if __metadata__ else None

        uploaded_files = []
        errors = []
        for i, image_b64 in enumerate(images):
            timestamp = int(time.time() * 1000)
            filename = f"venice_{model}_{timestamp}_{i+1}.webp"
            file_metadata = {
                "name": filename,
                "content_type": "image/webp",
                "data": {
                    "model": model,
                    "prompt": prompt,
                    "negative_prompt": effective_negative_prompt,
                    "style_preset": style_preset,
                    "width": width,
                    "height": height,
                    "variant": i + 1,
                    "total_variants": len(images),
                    "safe_mode": safe_mode,
                    "hide_watermark": hide_watermark,
                },
            }
            if chat_id:
                file_metadata["chat_id"] = chat_id
            if message_id:
                file_metadata["message_id"] = message_id

            file_id, error = await self._upload_image(image_b64, filename, file_metadata, "image/webp", __request__)
            if file_id:
                uploaded_files.append({"type": "image", "url": f"/api/v1/files/{file_id}/content"})
            else:
                errors.append(f"Variant {i+1}: {error}")

        if not uploaded_files:
            # Nothing reached the chat, so report a failure rather than Status 200.
            await self._emit_status(__event_emitter__)
            return f"Generate Image\nStatus: 0\nError: Upload failed - {'; '.join(errors)}"

        await self._accumulate_files(msg_key, uploaded_files, __event_emitter__)
        final_count = self._get_accumulated_count(msg_key)
        await self._emit_status(__event_emitter__, f"Done ({final_count} images total)", done=True)

        parts = [
            "Generate Image",
            "Status: 200",
            "",
            f"Generated {len(uploaded_files)} image(s) for: {prompt[:100]}{'...' if len(prompt) > 100 else ''}",
            f"Model: {model} | Size: {width}x{height}",
        ]
        settings_parts = []
        if safe_mode:
            settings_parts.append("SFW")
        if hide_watermark:
            settings_parts.append("No watermark")
        if settings_parts:
            parts.append(f"Settings: {', '.join(settings_parts)}")

        # list.append takes a single item; extend adds the blank line and header.
        parts.extend(["", "Files:"])
        parts.extend(f" [{i+1}] {f['url']}" for i, f in enumerate(uploaded_files))

        if dropped_params:
            parts.append(f"Note: {model} doesn't support: {', '.join(dropped_params)} (ignored)")
        if final_count > len(uploaded_files):
            parts.append(f"Total images in message: {final_count}")
        if errors:
            parts.extend(["", "Warnings:"])
            parts.extend(f" - {e}" for e in errors)
        return "\n".join(parts)

    async def upscale_image(
        self,
        image: str,
        scale: int = 2,
        enhance: bool = False,
        enhance_creativity: float = 0.5,
        enhance_prompt: Optional[str] = None,
        __request__=None,
        __user__: dict = None,
        __metadata__: dict = None,
        __files__: list = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Upscale and/or enhance an existing image with Venice.ai.

        :param image: Image to process: "last", a position such as "2" or "[2]" (1 = newest), a file id, or a URL.
        :param scale: Upscale factor, clamped to 1-4; a scale of 1 requires enhance=True.
        :param enhance: Whether to apply enhancement.
        :param enhance_creativity: Enhancement creativity, clamped to 0.0-1.0.
        :param enhance_prompt: Optional enhancement prompt, truncated to 1500 characters.
        :return: Plain-text report with a status line and the resulting file URL.
        """
        venice_key = VeniceImage.get_api_key(self.valves, self.user_valves, __user__)
        if not venice_key:
            return "Upscale Image\nStatus: 0\nError: Venice.ai API key not configured."

        scale = max(1, min(4, scale))
        if scale == 1 and not enhance:
            return "Upscale Image\nStatus: 0\nError: scale=1 requires enhance=True"
        enhance_creativity = max(0.0, min(1.0, enhance_creativity))

        msg_key = self._get_message_key(__metadata__)

        mode = "Enhancing" if scale == 1 else f"Upscaling {scale}x"
        if enhance and scale > 1:
            mode += " + enhancing"
        await self._emit_status(__event_emitter__, f"{mode}...", done=False)

        image_bytes, error = await self._get_image_bytes(image, __request__, __files__, __metadata__)
        if error:
            await self._emit_status(__event_emitter__)
            return f"Upscale Image\nStatus: 0\nError: {error}"

        payload = {"image": base64.b64encode(image_bytes).decode("utf-8"), "scale": scale}
        if enhance:
            payload["enhance"] = True
            payload["enhanceCreativity"] = enhance_creativity
            if enhance_prompt:
                payload["enhancePrompt"] = enhance_prompt[:1500]

        try:
            async with httpx.AsyncClient(timeout=float(self.valves.GENERATION_TIMEOUT)) as client:
                response = await client.post(
                    "https://api.venice.ai/api/v1/image/upscale",
                    headers={"Authorization": f"Bearer {venice_key}", "Content-Type": "application/json"},
                    json=payload,
                )
                response.raise_for_status()
        except httpx.HTTPStatusError as e:
            await self._emit_status(__event_emitter__)
            return f"Upscale Image\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}"
        except httpx.TimeoutException:
            await self._emit_status(__event_emitter__)
            return f"Upscale Image\nStatus: 408\nError: Timed out after {self.valves.GENERATION_TIMEOUT}s"
        except Exception as e:
            await self._emit_status(__event_emitter__)
            return f"Upscale Image\nStatus: 0\nError: {type(e).__name__}: {e}"

        result_bytes, parse_error = VeniceImage.parse_venice_image_response(response)
        if parse_error:
            await self._emit_status(__event_emitter__)
            return f"Upscale Image\nStatus: 0\nError: {parse_error}"

        await self._emit_status(__event_emitter__, "Uploading...", done=False)

        chat_id = __metadata__.get("chat_id") if __metadata__ else None
        message_id = __metadata__.get("message_id") if __metadata__ else None
        timestamp = int(time.time() * 1000)
        filename = f"venice_upscale_{scale}x_{timestamp}.png"
        file_metadata = {
            "name": filename,
            "content_type": "image/png",
            "data": {
                "operation": "upscale",
                "scale": scale,
                "enhance": enhance,
                "enhance_creativity": enhance_creativity if enhance else None,
                "enhance_prompt": enhance_prompt if enhance else None,
            },
        }
        if chat_id:
            file_metadata["chat_id"] = chat_id
        if message_id:
            file_metadata["message_id"] = message_id

        file_id, upload_error = await self._upload_image(result_bytes, filename, file_metadata, "image/png", __request__)
        if not file_id:
            await self._emit_status(__event_emitter__)
            return f"Upscale Image\nStatus: 0\nError: Upload failed - {upload_error}"

        new_file = {"type": "image", "url": f"/api/v1/files/{file_id}/content"}
        await self._accumulate_files(msg_key, [new_file], __event_emitter__)
        final_count = self._get_accumulated_count(msg_key)
        await self._emit_status(__event_emitter__, f"Done ({final_count} images total)", done=True)

        return f"Upscale Image\nStatus: 200\n\nUpscaled {scale}x{' with enhancement' if enhance else ''}\n\nFile: {new_file['url']}"

    async def edit_image(
        self,
        image: str,
        prompt: str,
        __request__=None,
        __user__: dict = None,
        __metadata__: dict = None,
        __files__: list = None,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Modify an existing image according to a text instruction with Venice.ai.

        :param image: Image to edit: "last", a position such as "2" or "[2]" (1 = newest), a file id, or a URL.
        :param prompt: Edit instruction (required), truncated to 1500 characters.
        :return: Plain-text report with a status line and the resulting file URL.
        """
        venice_key = VeniceImage.get_api_key(self.valves, self.user_valves, __user__)
        if not venice_key:
            return "Edit Image\nStatus: 0\nError: Venice.ai API key not configured."
        if not prompt or not prompt.strip():
            return "Edit Image\nStatus: 0\nError: Edit prompt is required"
        prompt = prompt.strip()[:1500]

        msg_key = self._get_message_key(__metadata__)
        await self._emit_status(__event_emitter__, "Editing image...", done=False)

        image_bytes, error = await self._get_image_bytes(image, __request__, __files__, __metadata__)
        if error:
            await self._emit_status(__event_emitter__)
            return f"Edit Image\nStatus: 0\nError: {error}"

        payload = {"image": base64.b64encode(image_bytes).decode("utf-8"), "prompt": prompt}
        try:
            async with httpx.AsyncClient(timeout=float(self.valves.GENERATION_TIMEOUT)) as client:
                response = await client.post(
                    "https://api.venice.ai/api/v1/image/edit",
                    headers={"Authorization": f"Bearer {venice_key}", "Content-Type": "application/json"},
                    json=payload,
                )
                response.raise_for_status()
        except httpx.HTTPStatusError as e:
            await self._emit_status(__event_emitter__)
            return f"Edit Image\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}"
        except httpx.TimeoutException:
            await self._emit_status(__event_emitter__)
            return f"Edit Image\nStatus: 408\nError: Timed out after {self.valves.GENERATION_TIMEOUT}s"
        except Exception as e:
            await self._emit_status(__event_emitter__)
            return f"Edit Image\nStatus: 0\nError: {type(e).__name__}: {e}"

        result_bytes, parse_error = VeniceImage.parse_venice_image_response(response)
        if parse_error:
            await self._emit_status(__event_emitter__)
            return f"Edit Image\nStatus: 0\nError: {parse_error}"

        await self._emit_status(__event_emitter__, "Uploading...", done=False)

        chat_id = __metadata__.get("chat_id") if __metadata__ else None
        message_id = __metadata__.get("message_id") if __metadata__ else None
        timestamp = int(time.time() * 1000)
        filename = f"venice_edit_{timestamp}.png"
        file_metadata = {"name": filename, "content_type": "image/png", "data": {"operation": "edit", "prompt": prompt}}
        if chat_id:
            file_metadata["chat_id"] = chat_id
        if message_id:
            file_metadata["message_id"] = message_id

        file_id, upload_error = await self._upload_image(result_bytes, filename, file_metadata, "image/png", __request__)
        if not file_id:
            await self._emit_status(__event_emitter__)
            return f"Edit Image\nStatus: 0\nError: Upload failed - {upload_error}"

        new_file = {"type": "image", "url": f"/api/v1/files/{file_id}/content"}
        await self._accumulate_files(msg_key, [new_file], __event_emitter__)
        final_count = self._get_accumulated_count(msg_key)
        await self._emit_status(__event_emitter__, f"Done ({final_count} images total)", done=True)

        return f"Edit Image\nStatus: 200\n\nApplied edit: {prompt[:80]}{'...' if len(prompt) > 80 else ''}\n\nFile: {new_file['url']}"
|