"""
title: Venice.ai Image Tools
author: Jeff Smith + Claude
version: 1.7.1
license: MIT
required_open_webui_version: 0.6.0
requirements: httpx, pydantic
description: |
Venice.ai image generation, upscaling, and editing tools.
- generate_image: Create images from text prompts
- upscale_image: Increase resolution up to 4x with optional enhancement
- edit_image: Modify existing images with text instructions
All operations upload results to Open WebUI Files API for persistence
and attach images to chat via event emitter for inline display.
Re-entrant safe: Multiple concurrent calls accumulate images correctly.
changelog:
1.7.1:
        - changed return type from string to dictionary, mirroring the default tools' behavior
        - fixed user valve override handling for Hide Watermark and Safe Mode
        - status message now shows [SFW] or [NSFW] based on the safe-mode flag, not on the generated content
1.7.0:
- Added VeniceImage namespace class for helper functions
- Moved get_api_key, parse_venice_image_response to VeniceImage namespace
- Prevents Open WebUI framework introspection method name collisions
1.6.0:
- Added UserValves for SAFE_MODE and HIDE_WATERMARK with proper override logic
"""
import asyncio
import threading
import time
import base64
import io
import json
from typing import Optional, Callable, Any, Dict, List
from pydantic import BaseModel, Field
import httpx
class VeniceImage:
"""
Namespaced helpers for Venice image operations.
Using a separate class prevents Open WebUI framework introspection
from colliding with tool methods that have generic names like _get_api_key.
"""
@staticmethod
def get_api_key(valves, user_valves, __user__: dict = None) -> str:
"""Get API key with UserValves priority."""
if __user__ and "valves" in __user__:
user_valves_dict = __user__.get("valves")
if isinstance(user_valves_dict, dict) and user_valves_dict.get("VENICE_API_KEY"):
return user_valves_dict["VENICE_API_KEY"]
return user_valves.VENICE_API_KEY or valves.VENICE_API_KEY
@staticmethod
def parse_venice_image_response(response) -> tuple[Optional[bytes], Optional[str]]:
"""Parse Venice API image response - handles both binary and JSON formats."""
content_type = response.headers.get("content-type", "")
if content_type.startswith("image/"):
return response.content, None
if "application/json" in content_type:
try:
data = response.json()
for field in ("image", "images", "data", "result"):
if field in data:
value = data[field]
if isinstance(value, str):
return base64.b64decode(value), None
if isinstance(value, list) and value:
return base64.b64decode(value[0]), None
if "error" in data:
return None, f"API error: {data.get('error')}"
return None, f"Unexpected JSON response keys: {list(data.keys())}"
except Exception as e:
return None, f"Failed to parse JSON response: {e}"
if len(response.content) > 1000:
return response.content, None
return None, f"Unknown response format: {content_type}, length: {len(response.content)}"
class Tools:
"""Venice.ai image tools: generate, upscale, and edit."""
class Valves(BaseModel):
VENICE_API_KEY: str = Field(default="", description="Venice.ai API key (admin fallback)")
DEFAULT_MODEL: str = Field(default="z-image-turbo", description="Default image model")
SAFE_MODE: bool = Field(default=True, description="Admin: Require SFW content")
HIDE_WATERMARK: bool = Field(default=False, description="Admin: Hide watermark")
COOLDOWN_SECONDS: int = Field(default=0, description="Min seconds between generations")
GENERATION_TIMEOUT: int = Field(default=180, description="Timeout for image generation in seconds")
ACCUMULATOR_TTL: int = Field(default=300, description="Seconds to keep accumulated files in memory")
class UserValves(BaseModel):
VENICE_API_KEY: str = Field(default="", description="Your Venice.ai API key (overrides admin)")
SAFE_MODE: bool = Field(default=True, description="Enable SFW content filtering")
HIDE_WATERMARK: bool = Field(default=False, description="Hide Venice.ai watermark")
DEFAULT_MODEL: str = Field(default="", description="Your preferred image model")
DEFAULT_NEGATIVE_PROMPT: str = Field(default="", description="Default negative prompt")
def __init__(self):
self.valves = self.Valves()
self.user_valves = self.UserValves()
self.citation = False
self._cooldowns: Dict[str, float] = {}
self._message_files: Dict[str, Dict[str, Any]] = {}
self._accumulator_lock: Optional[asyncio.Lock] = None
self._lock_init = threading.Lock()
self._last_cleanup: float = 0.0
def _is_safe_mode_enabled(self, __user__: dict = None) -> bool:
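        """SFW is enforced when either the admin valve or the user's valve enables it."""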
user_safe_mode = self.user_valves.SAFE_MODE
if __user__ and "valves" in __user__:
user_safe_mode = __user__["valves"].SAFE_MODE
return self.valves.SAFE_MODE or user_safe_mode
def _is_watermark_hidden(self, __user__: dict = None) -> bool:
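        """The watermark is hidden when either the admin valve or the user's valve requests it."""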
user_hide_watermark = self.user_valves.HIDE_WATERMARK
if __user__ and "valves" in __user__:
user_hide_watermark = __user__["valves"].HIDE_WATERMARK
return self.valves.HIDE_WATERMARK or user_hide_watermark
def _get_default_model(self) -> str:
return self.user_valves.DEFAULT_MODEL or self.valves.DEFAULT_MODEL
def _get_default_negative_prompt(self) -> Optional[str]:
return self.user_valves.DEFAULT_NEGATIVE_PROMPT or None
def _get_owui_config(self, __request__=None) -> tuple[str, dict]:
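        """Extract the Open WebUI base URL and Authorization header from the incoming request."""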
base_url = ""
headers = {}
if __request__:
base_url = str(getattr(__request__, "base_url", "") or "")
req_headers = getattr(__request__, "headers", {})
if req_headers:
auth = dict(req_headers).get("authorization", "")
if auth:
headers["Authorization"] = auth
return base_url.rstrip("/"), headers
def _get_message_key(self, __metadata__: dict = None) -> str:
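        """Build the chat_id:message_id key used to group files accumulated for one assistant message."""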
if __metadata__:
chat_id = __metadata__.get("chat_id", "")
message_id = __metadata__.get("message_id", "")
if message_id:
return f"{chat_id}:{message_id}"
return f"unknown:{int(time.time())}"
def _get_lock(self) -> asyncio.Lock:
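        """Lazily create the accumulator asyncio.Lock, double-checked under a threading.Lock."""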
if self._accumulator_lock is None:
with self._lock_init:
if self._accumulator_lock is None:
self._accumulator_lock = asyncio.Lock()
return self._accumulator_lock
async def _accumulate_files(self, key: str, new_files: List[dict], __event_emitter__: Callable[[dict], Any] = None):
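        """
        Append newly uploaded files to this message's accumulator, expire entries
        older than ACCUMULATOR_TTL, and re-emit the full file list so concurrent
        calls keep every generated image attached to the chat message.
        """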
all_files = []
async with self._get_lock():
if key not in self._message_files:
self._message_files[key] = {"files": [], "timestamp": time.time()}
for f in new_files:
self._message_files[key]["files"].append(dict(f))
self._message_files[key]["timestamp"] = time.time()
all_files = list(self._message_files[key]["files"])
now = time.time()
if now - self._last_cleanup > 60:
self._last_cleanup = now
ttl = self.valves.ACCUMULATOR_TTL
expired = [k for k, v in self._message_files.items() if now - v.get("timestamp", 0) > ttl]
for k in expired:
del self._message_files[k]
if all_files and __event_emitter__:
await __event_emitter__({"type": "files", "data": {"files": all_files}})
def _get_accumulated_count(self, key: str) -> int:
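        """Return how many files have been accumulated for the given message key."""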
entry = self._message_files.get(key)
return len(entry.get("files", [])) if entry else 0
async def _upload_image(self, image_data: bytes | str, filename: str, metadata: dict, content_type: str = "image/webp", __request__=None) -> tuple[Optional[str], Optional[str]]:
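        """Upload image bytes (or a base64 string) to the Open WebUI Files API. Returns (file_id, error)."""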
base_url, headers = self._get_owui_config(__request__)
if not base_url:
return None, "Could not determine Open WebUI URL"
if isinstance(image_data, str):
try:
image_bytes = base64.b64decode(image_data)
except Exception as e:
return None, f"Failed to decode image: {e}"
else:
image_bytes = image_data
try:
async with httpx.AsyncClient(timeout=60.0) as client:
files = {"file": (filename, io.BytesIO(image_bytes), content_type)}
                data = {"metadata": json.dumps(metadata)}
response = await client.post(f"{base_url}/api/v1/files/", headers=headers, files=files, data=data, params={"process": "false", "process_in_background": "false"})
if response.status_code == 200:
result = response.json()
return result.get("id"), None
else:
return None, f"Upload failed: {response.status_code} - {response.text[:200]}"
except Exception as e:
return None, f"Upload error: {type(e).__name__}: {e}"
async def _get_image_bytes(self, image_source: str, __request__=None, __files__: list = None, __metadata__: dict = None) -> tuple[Optional[bytes], Optional[str]]:
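        """
        Resolve an image reference to raw bytes. Accepts "last"/"latest"/"previous"/"recent",
        a 1-based index such as "2" or "[2]", an attached chat file, an Open WebUI file id,
        a relative /api/ path, or an absolute URL. Returns (image_bytes, error).
        """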
base_url, headers = self._get_owui_config(__request__)
image_source = image_source.strip()
index = None
if image_source.lower() in ("last", "latest", "previous", "recent"):
index = -1
elif image_source.isdigit():
index = int(image_source) - 1
elif image_source.startswith("[") and image_source.endswith("]"):
inner = image_source[1:-1].strip()
if inner.isdigit():
index = int(inner) - 1
if index is not None:
if not base_url:
return None, "Could not determine Open WebUI URL"
message_ids = set()
if __metadata__:
if __metadata__.get("message_id"):
message_ids.add(__metadata__["message_id"])
parent = __metadata__.get("parent_message")
while parent:
if parent.get("id"):
message_ids.add(parent["id"])
parent = parent.get("parent_message")
if __metadata__.get("parent_message_id"):
message_ids.add(__metadata__["parent_message_id"])
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(f"{base_url}/api/v1/files/", headers=headers, params={"content": "false"})
if response.status_code != 200:
return None, f"Failed to list files: {response.status_code}"
files = response.json()
context_images = []
all_images = []
for f in files:
meta = f.get("meta", {})
content_type = meta.get("content_type", "")
if not content_type.startswith("image/"):
continue
all_images.append(f)
inner_data = meta.get("data", {})
file_msg_id = None
if isinstance(inner_data, dict):
file_msg_id = inner_data.get("message_id")
if not file_msg_id:
nested_data = inner_data.get("data", {})
if isinstance(nested_data, dict):
file_msg_id = nested_data.get("message_id")
if file_msg_id and file_msg_id in message_ids:
context_images.append(f)
user_images = context_images if context_images else all_images
user_images.sort(key=lambda f: f.get("created_at", 0), reverse=True)
if not user_images:
return None, "No images found"
if index == -1:
target_idx = 0
else:
target_idx = index
if 0 <= target_idx < len(user_images):
file_id = user_images[target_idx].get("id")
image_source = f"/api/v1/files/{file_id}/content"
else:
return None, f"Image [{index+1}] not found (found {len(user_images)} images)"
except Exception as e:
return None, f"Failed to resolve image index: {type(e).__name__}: {e}"
if __files__:
for f in __files__:
file_id = f.get("id", "")
file_url = f.get("url", "")
if image_source == file_id or image_source == file_url or image_source in file_url or file_id in image_source:
image_source = file_url or f"/api/v1/files/{file_id}/content"
break
if image_source.startswith("/api/"):
if not base_url:
return None, "Could not determine Open WebUI URL for relative path"
fetch_url = f"{base_url}{image_source}"
elif image_source.startswith("http://") or image_source.startswith("https://"):
fetch_url = image_source
else:
if not base_url:
return None, "Could not determine Open WebUI URL"
fetch_url = f"{base_url}/api/v1/files/{image_source}/content"
try:
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.get(fetch_url, headers=headers)
if response.status_code == 200:
return response.content, None
else:
return None, f"Failed to fetch image: {response.status_code} from {fetch_url}"
except Exception as e:
return None, f"Fetch error: {type(e).__name__}: {e}"
async def generate_image(
self,
prompt: str,
model: Optional[str] = None,
width: int = 1024,
height: int = 1024,
negative_prompt: Optional[str] = None,
style_preset: Optional[str] = None,
variants: int = 1,
__request__=None,
__user__: dict = None,
__metadata__: dict = None,
__event_emitter__: Callable[[dict], Any] = None
) -> dict:
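        """
        Generate one or more images from a text prompt via the Venice.ai API.
        Results are uploaded to the Open WebUI Files API and attached to the chat
        through the event emitter; the returned dict reports status, the effective
        settings, and the uploaded image URLs.
        :param prompt: Text description of the desired image (required).
        :param model: Venice image model; defaults to the user or admin valve.
        :param width: Image width in pixels (clamped to 512-1280).
        :param height: Image height in pixels (clamped to 512-1280).
        :param negative_prompt: Things to avoid; falls back to the user valve default.
        :param style_preset: Optional Venice style preset name.
        :param variants: Number of images to generate (clamped to 1-4).
        """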
retVal = {
"status": "failed",
"message": "",
"settings": {},
"images": [],
}
venice_key = VeniceImage.get_api_key(self.valves, self.user_valves, __user__)
if not venice_key:
retVal["message"] = "Error: Venice.ai API key not configured",
return retVal
if not prompt or not prompt.strip():
retVal["message"] = "Error: Prompt is required",
return retVal
msg_key = self._get_message_key(__metadata__)
user_id = __user__.get("id", "default") if __user__ else "default"
cooldown = self.valves.COOLDOWN_SECONDS
if cooldown > 0:
now = time.time()
last_gen = self._cooldowns.get(user_id, 0)
is_reentrant = self._get_accumulated_count(msg_key) > 0
if not is_reentrant and now - last_gen < cooldown:
remaining = cooldown - (now - last_gen)
retVal["message"] = "Error: Rate limited. Wait {remaining:.1f}s.",
return retVal
self._cooldowns[user_id] = now
model = model or self._get_default_model()
safe_mode = self._is_safe_mode_enabled(__user__)
hide_watermark = self._is_watermark_hidden(__user__)
effective_negative_prompt = negative_prompt or self._get_default_negative_prompt()
variants = max(1, min(4, variants))
width = max(512, min(1280, width))
height = max(512, min(1280, height))
existing_count = self._get_accumulated_count(msg_key)
if __event_emitter__:
status_msg = f"Generating {variants} image{'s' if variants > 1 else ''} with {model}"
if existing_count > 0:
status_msg += f" (adding to {existing_count} existing)"
if safe_mode:
status_msg += " [SFW]"
else:
status_msg += " [NSFW]"
await __event_emitter__({"type": "status", "data": {"description": f"{status_msg}...", "done": False}})
payload = {"model": model, "prompt": prompt, "width": width, "height": height, "safe_mode": safe_mode, "hide_watermark": hide_watermark, "return_binary": False, "variants": variants}
if effective_negative_prompt:
payload["negative_prompt"] = effective_negative_prompt
if style_preset:
payload["style_preset"] = style_preset
retried = False
dropped_params = []
try:
async with httpx.AsyncClient(timeout=float(self.valves.GENERATION_TIMEOUT)) as client:
response = await client.post("https://api.venice.ai/api/v1/image/generate", headers={"Authorization": f"Bearer {venice_key}", "Content-Type": "application/json"}, json=payload)
if response.status_code in (400, 422) and not retried:
error_text = response.text.lower()
if any(kw in error_text for kw in ("style", "unsupported", "invalid", "parameter")):
if style_preset:
del payload["style_preset"]
dropped_params.append("style_preset")
if effective_negative_prompt:
del payload["negative_prompt"]
dropped_params.append("negative_prompt")
if dropped_params:
retried = True
response = await client.post("https://api.venice.ai/api/v1/image/generate", headers={"Authorization": f"Bearer {venice_key}", "Content-Type": "application/json"}, json=payload)
response.raise_for_status()
result = response.json()
except httpx.HTTPStatusError as e:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
retVal["message"] = f"Status: {e.response.status_code} Error: {e.response.text[:200]}",
return retVal["message"]
except httpx.TimeoutException:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
retVal["message"] = f"Status: 408\nError: Timed out after {self.valves.GENERATION_TIMEOUT}s",
return retVal
except Exception as e:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
retVal["message"] = f"Status: 0\nError: {type(e).__name__}: {e}",
return retVal
images = result.get("images", [])
if not images:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
retVal["message"] = f"Status: 0\nError: No images returned",
return retVal
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"description": f"Uploading {len(images)} images...", "done": False}})
chat_id = __metadata__.get("chat_id") if __metadata__ else None
message_id = __metadata__.get("message_id") if __metadata__ else None
uploaded_files = []
errors = []
for i, image_b64 in enumerate(images):
timestamp = int(time.time() * 1000)
filename = f"venice_{model}_{timestamp}_{i+1}.webp"
file_metadata = {"name": filename, "content_type": "image/webp", "data": {"model": model, "prompt": prompt, "negative_prompt": effective_negative_prompt, "style_preset": style_preset, "width": width, "height": height, "variant": i+1, "total_variants": len(images), "safe_mode": safe_mode, "hide_watermark": hide_watermark}}
if chat_id:
file_metadata["chat_id"] = chat_id
if message_id:
file_metadata["message_id"] = message_id
file_id, error = await self._upload_image(image_b64, filename, file_metadata, "image/webp", __request__)
if file_id:
uploaded_files.append({"type": "image", "url": f"/api/v1/files/{file_id}/content"})
else:
errors.append(f"Variant {i+1}: {error}")
if uploaded_files:
await self._accumulate_files(msg_key, uploaded_files, __event_emitter__)
final_count = self._get_accumulated_count(msg_key)
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"description": f"Done ({final_count} images total)", "done": True}})
retVal["status"] = "success"
retVal["message"] = "The image has been successfully generated and is already visible to the user in the chat. You do not need to display or embed the image again - just acknowledge that it has been created.",
if safe_mode:
retVal["settings"]["safe_mode"]: "SFW"
else:
retVal["settings"]["safe_mode"]: "NSFW"
if hide_watermark:
retVal["settings"]["hide_watermark"]: "hide_watermark"
if uploaded_files:
retVal["images"] = uploaded_files
if dropped_params:
retVal["note"] = f" {model} doesn't support: {', '.join(dropped_params)} (ignored)"
if errors:
retVal["warnings"] = errors
return retVal
    async def upscale_image(
        self,
        image: str,
        scale: int = 2,
        enhance: bool = False,
        enhance_creativity: float = 0.5,
        enhance_prompt: Optional[str] = None,
        __request__=None,
        __user__: dict = None,
        __metadata__: dict = None,
        __files__: list = None,
        __event_emitter__: Callable[[dict], Any] = None
    ) -> str:
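        """
        Upscale an existing image up to 4x via the Venice.ai API, with optional enhancement.
        :param image: Image reference - "last", a 1-based index, an Open WebUI file id, or a URL.
        :param scale: Upscale factor (clamped to 1-4); scale=1 requires enhance=True.
        :param enhance: Apply Venice enhancement in addition to upscaling.
        :param enhance_creativity: Enhancement creativity, clamped to 0.0-1.0.
        :param enhance_prompt: Optional prompt guiding the enhancement (truncated to 1500 characters).
        """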
venice_key = VeniceImage.get_api_key(self.valves, self.user_valves, __user__)
if not venice_key:
return "Upscale Image\nStatus: 0\nError: Venice.ai API key not configured."
scale = max(1, min(4, scale))
if scale == 1 and not enhance:
return "Upscale Image\nStatus: 0\nError: scale=1 requires enhance=True"
enhance_creativity = max(0.0, min(1.0, enhance_creativity))
msg_key = self._get_message_key(__metadata__)
existing_count = self._get_accumulated_count(msg_key)
if __event_emitter__:
mode = "Enhancing" if scale == 1 else f"Upscaling {scale}x"
if enhance and scale > 1:
mode += " + enhancing"
await __event_emitter__({"type": "status", "data": {"description": f"{mode}...", "done": False}})
image_bytes, error = await self._get_image_bytes(image, __request__, __files__, __metadata__)
if error:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return f"Upscale Image\nStatus: 0\nError: {error}"
payload = {"image": base64.b64encode(image_bytes).decode("utf-8"), "scale": scale}
if enhance:
payload["enhance"] = True
payload["enhanceCreativity"] = enhance_creativity
if enhance_prompt:
payload["enhancePrompt"] = enhance_prompt[:1500]
try:
async with httpx.AsyncClient(timeout=float(self.valves.GENERATION_TIMEOUT)) as client:
response = await client.post("https://api.venice.ai/api/v1/image/upscale", headers={"Authorization": f"Bearer {venice_key}", "Content-Type": "application/json"}, json=payload)
response.raise_for_status()
result_bytes, parse_error = VeniceImage.parse_venice_image_response(response)
if parse_error:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return f"Upscale Image\nStatus: 0\nError: {parse_error}"
except httpx.HTTPStatusError as e:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return f"Upscale Image\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}"
except Exception as e:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return f"Upscale Image\nStatus: 0\nError: {type(e).__name__}: {e}"
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"description": "Uploading...", "done": False}})
chat_id = __metadata__.get("chat_id") if __metadata__ else None
message_id = __metadata__.get("message_id") if __metadata__ else None
timestamp = int(time.time() * 1000)
filename = f"venice_upscale_{scale}x_{timestamp}.png"
file_metadata = {"name": filename, "content_type": "image/png", "data": {"operation": "upscale", "scale": scale, "enhance": enhance, "enhance_creativity": enhance_creativity if enhance else None, "enhance_prompt": enhance_prompt if enhance else None}}
if chat_id:
file_metadata["chat_id"] = chat_id
if message_id:
file_metadata["message_id"] = message_id
file_id, upload_error = await self._upload_image(result_bytes, filename, file_metadata, "image/png", __request__)
if not file_id:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return f"Upscale Image\nStatus: 0\nError: Upload failed - {upload_error}"
new_file = {"type": "image", "url": f"/api/v1/files/{file_id}/content"}
await self._accumulate_files(msg_key, [new_file], __event_emitter__)
final_count = self._get_accumulated_count(msg_key)
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"description": f"Done ({final_count} images total)", "done": True}})
return f"Upscale Image\nStatus: 200\n\nUpscaled {scale}x{' with enhancement' if enhance else ''}\n\nFile: {new_file['url']}"
    async def edit_image(
        self,
        image: str,
        prompt: str,
        __request__=None,
        __user__: dict = None,
        __metadata__: dict = None,
        __files__: list = None,
        __event_emitter__: Callable[[dict], Any] = None
    ) -> str:
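        """
        Edit an existing image with a text instruction via the Venice.ai API.
        :param image: Image reference - "last", a 1-based index, an Open WebUI file id, or a URL.
        :param prompt: Edit instruction (required, truncated to 1500 characters).
        """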
venice_key = VeniceImage.get_api_key(self.valves, self.user_valves, __user__)
if not venice_key:
return "Edit Image\nStatus: 0\nError: Venice.ai API key not configured."
if not prompt or not prompt.strip():
return "Edit Image\nStatus: 0\nError: Edit prompt is required"
prompt = prompt.strip()[:1500]
msg_key = self._get_message_key(__metadata__)
existing_count = self._get_accumulated_count(msg_key)
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"description": "Editing image...", "done": False}})
image_bytes, error = await self._get_image_bytes(image, __request__, __files__, __metadata__)
if error:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return f"Edit Image\nStatus: 0\nError: {error}"
payload = {"image": base64.b64encode(image_bytes).decode("utf-8"), "prompt": prompt}
try:
async with httpx.AsyncClient(timeout=float(self.valves.GENERATION_TIMEOUT)) as client:
response = await client.post("https://api.venice.ai/api/v1/image/edit", headers={"Authorization": f"Bearer {venice_key}", "Content-Type": "application/json"}, json=payload)
response.raise_for_status()
result_bytes, parse_error = VeniceImage.parse_venice_image_response(response)
if parse_error:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return f"Edit Image\nStatus: 0\nError: {parse_error}"
except httpx.HTTPStatusError as e:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return f"Edit Image\nStatus: {e.response.status_code}\nError: {e.response.text[:200]}"
except Exception as e:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return f"Edit Image\nStatus: 0\nError: {type(e).__name__}: {e}"
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"description": "Uploading...", "done": False}})
chat_id = __metadata__.get("chat_id") if __metadata__ else None
message_id = __metadata__.get("message_id") if __metadata__ else None
timestamp = int(time.time() * 1000)
filename = f"venice_edit_{timestamp}.png"
file_metadata = {"name": filename, "content_type": "image/png", "data": {"operation": "edit", "prompt": prompt}}
if chat_id:
file_metadata["chat_id"] = chat_id
if message_id:
file_metadata["message_id"] = message_id
file_id, upload_error = await self._upload_image(result_bytes, filename, file_metadata, "image/png", __request__)
if not file_id:
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"done": True}})
return f"Edit Image\nStatus: 0\nError: Upload failed - {upload_error}"
new_file = {"type": "image", "url": f"/api/v1/files/{file_id}/content"}
await self._accumulate_files(msg_key, [new_file], __event_emitter__)
final_count = self._get_accumulated_count(msg_key)
if __event_emitter__:
await __event_emitter__({"type": "status", "data": {"description": f"Done ({final_count} images total)", "done": True}})
return f"Edit Image\nStatus: 200\n\nApplied edit: {prompt[:80]}{'...' if len(prompt) > 80 else ''}\n\nFile: {new_file['url']}"