""" title: Multi-Model Chat Orchestrator author: Jeff Smith + Kimi K2 version: 1.0.0 license: MIT required_open_webui_version: 0.7.0 requirements: httpx, pydantic description: | Genuine model-to-model conversations with full persistence. Enables an orchestrator model to coordinate real chats between different models. Features relationship-based model discovery, programmatic tagging, and robust multi-turn conversation handling. 🚀 QUICK START: 1. list_models() - See available models (no limits) 2. get_models_by_tag("cap:artist") - Find models by capability tag 3. discover_model_relationships("rel:works-with") - Visualize model networks 4. chat_with_model("new", "luna", "Hello!") - Start new orchestrated chat 5. chat_with_model("{chat_id}", None, "Continue...") - Multi-turn conversation 🔑 ORCHESTRATOR PATTERN: - YOU are the orchestrator - you coordinate, you don't write content - Target models receive messages with injection: "(You may respond in chat {chat_id})" - All completions are automatically persisted with proper parent-child linkage - Use cap:artist, rel:works-with-luna tags for intelligent model selection """ import json import uuid import time import asyncio from typing import Callable, Any, Optional, Dict, List from pydantic import BaseModel, Field import httpx class OWUIChat: """ """ def __init(self, tools_instance): self.tools = tools_instance @property def valves(self): """Access valves from parent Tools instance""" return self.tools.valves def get_api_config(self, __request__=None) -> tuple[str, dict]: """Extract base URL and auth from request context [1].""" base_url = "" headers = {"Content-Type": "application/json"} if __request__: base_url = str(getattr(__request__, "base_url", "") or "") req_headers = getattr(__request__, "headers", {}) if req_headers: auth = dict(req_headers).get("authorization", "") if auth: headers["Authorization"] = auth return base_url.rstrip("/"), headers async def api_request( self, method: str, endpoint: str, __request__=None, json_data: Dict = None, params: Dict = None, ) -> tuple[int, Any]: """Make authenticated API request with proper error handling.""" base_url, headers = self.get_api_config(__request__) if not base_url: return 0, {"error": "Could not determine API URL"} url = f"{base_url}{endpoint}" try: async with httpx.AsyncClient( headers=headers, timeout=float(self.valves().TIMEOUT) ) as client: response = await client.request( method, url, json=json_data, params=params ) content_type = response.headers.get("content-type", "") if "application/json" in content_type: return response.status_code, response.json() return response.status_code, response.text except Exception as e: return 0, {"error": f"Request failed: {str(e)}"} class Tools: """Multi-Model Chat Orchestrator - Real model-to-model communication with persistence.""" class Valves(BaseModel): """Configuration for the orchestrator.""" DEFAULT_MODEL_TAGS: str = Field( default="assistant,helper,agent", description="Comma-separated tags for automatic model discovery", ) TIMEOUT: int = Field(default=180, description="API request timeout in seconds") DEBUG_MODE: bool = Field( default=False, description="Enable verbose error output" ) MAX_RESULTS: int = Field( default=50, description="Maximum models to display in list operations" ) TAG_PREFIX_CAPABILITY: str = Field( default="cap:", description="Prefix for capability tags" ) TAG_PREFIX_RELATIONSHIP: str = Field( default="rel:", description="Prefix for relationship tags" ) class UserValves(BaseModel): pass def __init__(self): self.valves = self.Valves() self.owui_chat = OWUIChat(self) async def find_chat_by_title(self, title: str, __request__=None) -> dict: """Find chat ID by title (partial match). Returns exact ID if one match found.""" status, data = await self.owui_chat.api_request("GET", "/api/v1/chats/", __request__) retVal = {"status": "failure", "message": ""} if status != 200: retVal["message"] = f"❌ find_chat_by_title FAILED\nHTTP {status}: {data}" return retVal chat_list = ( data if isinstance(data, list) else data.get("items", []) if isinstance(data, dict) else [] ) matches = [] for chat in chat_list: if not isinstance(chat, dict): continue chat_title = chat.get("title", "") if title.lower() in chat_title.lower(): matches.append( { "id": chat.get("id"), "title": chat_title, "models": ( chat.get("chat", {}).get("models", []) if isinstance(chat.get("chat"), dict) else [] ), } ) if not matches: retVal["messages"] = f"❌ No chats found matching '{title}'" return retVal if len(matches) == 1: m = matches[0] retVal["status"] = "success" retVal["message"] = f"✅ Single match found:\nChat ID: {m['id']}\nTitle: {m['title']}" return retVal result = f"⚠️ Multiple matches ({len(matches)}):\n\n" for i, m in enumerate(matches[:5], 1): result += f"{i}. {m['title']}\n ID: {m['id']}\n" retVal["status"] = "success" retVal["message"] = result return retVal async def owui_list_models(self, __request__=None) -> dict: """List all available models with IDs, tags, and capabilities.""" status, data = await self.owui_chat.api_request("GET", "/api/v1/models/", __request__) retVal = {"status": "failure", "message": "", "models": []} if status != 200: retVal["message"] = f"❌ list_models FAILED\nHTTP {status}: {data}" return retVal models = ( data if isinstance(data, list) else data.get("items", []) if isinstance(data, dict) else [] ) if not models: retVal = "❌ No models found" return retVal retVal["message"] = f"📋 Available Models ({len(models)} total, showing {self.valves.MAX_RESULTS}):\n\n" for model in models[: self.valves.MAX_RESULTS]: if not isinstance(model, dict): continue if not "name" in model or model.get("name") == "": continue retVal["models"].append(model) # name = model.get("name", model.get("id", "Unknown")) # model_id = model.get("id") # meta = model.get("meta", {}) # tags = meta.get("tags", []) # tag_str = ( # ", ".join( # t.get("name") if isinstance(t, dict) else t for t in tags if t # ) # if tags # else "none" # ) # # capabilities = meta.get("capabilities", {}) # cap_str = ( # ", ".join(f"{k}={v}" for k, v in capabilities.items()) # if capabilities # else "none" # ) # # result += f"• {name}\n ID: {model_id}\n Tags: {tag_str}\n Capabilities: {cap_str}\n\n" if len(models) > self.valves.MAX_RESULTS: retVal["message"] += f"... and {len(models) - self.valves.MAX_RESULTS} more\n" return retVal async def get_models_by_tag(self, tag: str, __request__=None) -> str: """Get models by tag name with full tag profiles.""" status, data = await self._api_request( "GET", f"/api/v1/models/list?tag={tag}&page=1", __request__ ) if status != 200: return f"❌ get_models_by_tag FAILED\nHTTP {status}: {data}" items = ( data.get("items", []) if isinstance(data, dict) else data if isinstance(data, list) else [] ) total = data.get("total", len(items)) if isinstance(data, dict) else len(items) if not items: return f"❌ No models found with tag '{tag}'" result = f"✅ Models with tag '{tag}' ({total} total):\n\n" for model in items[: self.valves.MAX_RESULTS]: if not isinstance(model, dict): continue name = model.get("name", model.get("id")) model_id = model.get("id") meta = model.get("meta", {}) all_tags = meta.get("tags", []) tag_list = ( ", ".join( t.get("name", t) for t in all_tags if isinstance(t, dict) or isinstance(t, str) ) if all_tags else "none" ) result += f"• {name} (`{model_id}`)\n All tags: {tag_list}\n\n" if len(items) > self.valves.MAX_RESULTS: result += f"... and {len(items) - self.valves.MAX_RESULTS} more\n" return result async def get_model_by_name(self, name: str, __request__=None) -> str: """Find model by partial name match.""" status, data = await self._api_request("GET", "/api/v1/models/", __request__) if status != 200: return f"❌ get_model_by_name FAILED\nHTTP {status}: {data}" models = ( data if isinstance(data, list) else data.get("items", []) if isinstance(data, dict) else [] ) if not models: return "❌ get_model_by_name FAILED\nNo models returned" matches = [] for model in models: if not isinstance(model, dict): continue model_name = model.get("name", model.get("id", "")) if name.lower() in model_name.lower(): matches.append(model) if not matches: return f"❌ No models matching '{name}'" if len(matches) == 1: m = matches[0] name = m.get("name", m.get("id")) model_id = m.get("id") meta = m.get("meta", {}) tags = meta.get("tags", []) tag_str = ( ", ".join( t.get("name", t) for t in tags if isinstance(t, dict) or isinstance(t, str) ) if tags else "none" ) return f"✅ Found: {name}\nID: {model_id}\nTags: {tag_str}" result = f"⚠️ Multiple matches ({len(matches)}):\n\n" for i, m in enumerate(matches[:5], 1): result += f"{i}. {m.get('name')}\n ID: {m.get('id')}\n\n" return result async def discover_model_relationships( self, relationship_prefix: str = "rel:works-with", __request__=None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """Discover models with relationship tags and dump their full tag profiles.""" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": "Scanning for relationships...", "done": False, }, } ) status, data = await self._api_request("GET", "/api/v1/models/", __request__) if status != 200: return f"❌ Failed: HTTP {status}" models = ( data if isinstance(data, list) else data.get("items", []) if isinstance(data, dict) else [] ) relationship_map = {} for model in models: if not isinstance(model, dict): continue model_id = model.get("id") model_name = model.get("name", model_id) meta = model.get("meta", {}) tags = meta.get("tags", []) # Extract all relationship tags for tag in tags: tag_name = tag.get("name") if isinstance(tag, dict) else tag if tag_name and tag_name.lower().startswith( relationship_prefix.lower() ): target = tag_name[len(relationship_prefix) :].strip() relationship_map.setdefault(target, []).append( { "model_id": model_id, "model_name": model_name, "all_tags": [t.get("name", t) for t in tags if t], "capabilities": meta.get("capabilities", {}), } ) if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Found {len(relationship_map)} relationship(s)", "done": True, }, } ) if not relationship_map: return f"❌ No '{relationship_prefix}' relationships found" result = f"🔍 Relationship Dump: '{relationship_prefix}'\n{'='*60}\n\n" for target, sources in sorted(relationship_map.items()): result += f"📌 Target: **{target}** ({len(sources)} source(s))\n" for src in sources: result += f" • {src['model_name']} (`{src['model_id']}`)\n" result += f" Tags: {', '.join(src['all_tags']) or 'none'}\n" result += f" Caps: {', '.join(f'{k}:{v}' for k,v in src['capabilities'].items()) or 'none'}\n" result += "\n" return result async def add_capability_tag( self, model_id: str, capability: str, __request__=None, __event_emitter__: Callable[[dict], Any] = None, ) -> dict: """Add a capability tag (cap:) to a model.""" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Adding capability tag to {model_id}...", "done": False, }, } ) # Get current model data status, model_data = await self._api_request( "GET", f"/api/v1/models/model?id={model_id}", __request__ ) if status != 200: return f"❌ Could not get model: {model_data}" meta = model_data.get("meta", {}) tags = meta.get("tags", []) # Add new capability tag new_tag = f"{self.valves.TAG_PREFIX_CAPABILITY}{capability}" if new_tag not in [t.get("name", t) for t in tags]: tags.append({"name": new_tag}) # Update model update_payload = {"id": model_id, "meta": {**meta, "tags": tags}} status, response = await self._api_request( "POST", "/api/v1/models/model/update", __request__, json_data=update_payload ) if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Tag added successfully", "done": True}, } ) if status == 200: return f"✅ Added capability tag '{new_tag}' to {model_id}" return f"❌ Failed to update model: {response}" async def add_relationship_tag( self, model_id: str, relationship: str, target_model: str, __request__=None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """Add a relationship tag (rel:) to a model.""" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Adding relationship tag to {model_id}...", "done": False, }, } ) # Get current model data status, model_data = await self._api_request( "GET", f"/api/v1/models/model?id={model_id}", __request__ ) if status != 200: return f"❌ Could not get model: {model_data}" meta = model_data.get("meta", {}) tags = meta.get("tags", []) # Add new relationship tag new_tag = f"{self.valves.TAG_PREFIX_RELATIONSHIP}{relationship}:{target_model}" if new_tag not in [t.get("name", t) for t in tags]: tags.append({"name": new_tag}) # Update model update_payload = {"id": model_id, "meta": {**meta, "tags": tags}} status, response = await self._api_request( "POST", "/api/v1/models/model/update", __request__, json_data=update_payload ) if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Relationship tag added", "done": True}, } ) if status == 200: return f"✅ Added relationship tag '{new_tag}' to {model_id}" return f"❌ Failed to update model: {response}" async def dump_tag_taxonomy(self, __request__=None) -> str: """Dump complete tag taxonomy showing cap: and rel: distributions.""" status, data = await self._api_request("GET", "/api/v1/models/", __request__) if status != 200: return f"❌ Failed: HTTP {status}" models = ( data if isinstance(data, list) else data.get("items", []) if isinstance(data, dict) else [] ) capability_tags = {} relationship_tags = {} all_capabilities = {} for model in models: if not isinstance(model, dict): continue model_id = model.get("id") model_name = model.get("name", model_id) meta = model.get("meta", {}) tags = meta.get("tags", []) capabilities = meta.get("capabilities", {}) # Aggregate capabilities for cap, val in capabilities.items(): all_capabilities.setdefault(cap, []).append(f"{model_name}={val}") # Categorize tags for tag in tags: tag_name = tag.get("name") if isinstance(tag, dict) else tag if not tag_name: continue if tag_name.startswith(self.valves.TAG_PREFIX_CAPABILITY): capability_tags.setdefault(tag_name, []).append(model_name) elif tag_name.startswith(self.valves.TAG_PREFIX_RELATIONSHIP): relationship_tags.setdefault(tag_name, []).append(model_name) result = "📊 Tag Taxonomy Dump\n" + "=" * 60 + "\n\n" if capability_tags: result += "🎯 CAPABILITY TAGS:\n" for cap, models in sorted(capability_tags.items()): result += f" {cap}: {len(models)} model(s)\n" for m in models[:3]: result += f" • {m}\n" if len(models) > 3: result += f" ... and {len(models)-3} more\n" result += "\n" if relationship_tags: result += "🔗 RELATIONSHIP TAGS:\n" for rel, models in sorted(relationship_tags.items()): result += f" {rel}: {len(models)} model(s)\n" for m in models[:3]: result += f" • {m}\n" if len(models) > 3: result += f" ... and {len(models)-3} more\n" result += "\n" if all_capabilities: result += "🧠 CAPABILITIES:\n" for cap, instances in sorted(all_capabilities.items()): result += f" {cap}: {len(instances)} value(s)\n" for inst in instances[:3]: result += f" • {inst}\n" if len(instances) > 3: result += f" ... and {len(instances)-3} more\n" return result or "❌ No tags found" async def chat_with_model( self, chat_id: str, model_identifier: Optional[str], message: str, __request__=None, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """🚀 PRIMARY TOOL: Robust multi-turn chat with proper message threading.""" if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Resolving model...", "done": False}, } ) # NEW CHAT: model_identifier is REQUIRED if chat_id == "new": if not model_identifier: return "❌ chat_with_model FAILED: model_identifier required when creating new chat" # Resolve model_identifier to actual model_id target_model = None model_name = model_identifier # Try tag first (if no slash in identifier) if "/" not in model_identifier: status, tag_data = await self._api_request( "GET", f"/api/v1/models/list?tag={model_identifier}&page=1", __request__, ) if status == 200: items = ( tag_data.get("items", []) if isinstance(tag_data, dict) else tag_data ) if items and isinstance(items, list) and len(items) > 0: target_model = items[0].get("id") model_name = items[0].get("name", target_model) # Fallback to direct model ID if not target_model: target_model = model_identifier # Get friendly name status, model_info = await self._api_request( "GET", f"/api/v1/models/{model_identifier}", __request__ ) if status == 200 and isinstance(model_info, dict): model_name = model_info.get("name", model_identifier) else: model_name = model_identifier.replace("-", " ").title() if not target_model: return "❌ chat_with_model FAILED: Could not resolve target model" # Create new chat if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "Creating chat...", "done": False}, } ) payload = { "chat": { "title": f"Orchestrated: {model_name}", "models": [target_model], "params": {}, "history": {"messages": {}, "currentId": None}, "messages": [], "tags": ["orchestrated"], "files": [], } } status, chat_data = await self._api_request( "POST", "/api/v1/chats/new", __request__, json_data=payload ) if status != 200: return f"❌ chat_with_model FAILED\nCould not create chat: {chat_data}" chat_id = chat_data.get("id") if not chat_id: return "❌ chat_with_model FAILED: No chat ID returned" # EXISTING CHAT: model_identifier is OPTIONAL else: # Verify chat exists status, chat_data = await self._api_request( "GET", f"/api/v1/chats/{chat_id}", __request__ ) if status != 200: return f"❌ chat_with_model FAILED\nCould not get chat: {chat_data}" chat_obj = chat_data.get("chat", {}) existing_models = chat_obj.get("models", []) if model_identifier: # Use provided model target_model = model_identifier model_name = model_identifier.replace("-", " ").title() elif existing_models and len(existing_models) > 0: # Auto-detect from chat target_model = existing_models[0] status, model_info = await self._api_request( "GET", f"/api/v1/models/{target_model}", __request__ ) if status == 200 and isinstance(model_info, dict): model_name = model_info.get("name", target_model) else: model_name = target_model.replace("-", " ").title() else: return "❌ chat_with_model FAILED: No model found and no model_identifier provided" if __event_emitter__: await __event_emitter__( { "type": "status", "data": { "description": f"Chatting with {model_name}...", "done": False, }, } ) # Build conversation history WITH PROPER THREADING status, chat_data = await self._api_request( "GET", f"/api/v1/chats/{chat_id}", __request__ ) if status != 200: return f"❌ Could not retrieve chat history: {chat_data}" chat_obj = chat_data.get("chat", {}) existing_messages = chat_obj.get("messages", []) completion_messages = [] for msg in existing_messages: if msg.get("role") in ["user", "assistant"]: completion_messages.append( {"role": msg["role"], "content": msg["content"]} ) # 🔑 KEY ENHANCEMENT: Enhanced chat_id injection for multi-turn awareness enhanced_message = f"{message}\n\n[System: This is part of chat {chat_id}. You may respond here.]" completion_messages.append({"role": "user", "content": enhanced_message}) # Send to target model status, completion_data = await self._api_request( "POST", "/api/v1/chat/completions", __request__, json_data={ "model": target_model, "stream": False, "messages": completion_messages, "chat_id": chat_id, }, ) if status != 200: return f"❌ chat_with_model FAILED\nCompletion error: {completion_data}" try: response_text = completion_data["choices"][0]["message"]["content"] except (KeyError, IndexError, TypeError) as e: return f"❌ Parse error: {e}" # Persist with proper message threading updated_chat = self._build_message_structure( existing_messages, enhanced_message, response_text, target_model, model_name ) updated_chat["title"] = chat_obj.get("title", f"Chat with {model_name}") status, _ = await self._api_request( "POST", f"/api/v1/chats/{chat_id}", __request__, json_data={"chat": updated_chat}, ) if __event_emitter__: await __event_emitter__( { "type": "status", "data": {"description": "✅ Response persisted", "done": True}, } ) return f"""✅ chat_with_model SUCCESS ═══════════════════════════════════════════════════════════ Model: {model_name} ({target_model}) Chat ID: {chat_id} ═══════════════════════════════════════════════════════════ Your message: {message} ═══════════════════════════════════════════════════════════ Response (persisted): {response_text} ═══════════════════════════════════════════════════════════ ✅ This multi-turn response is now in chat history """ def _build_message_structure( self, existing_messages: List[Dict], user_content: str, assistant_content: str, model: str, model_name: str = None, ) -> Dict: """Build chat structure with proper message linking for multi-turn.""" if not model_name: model_name = model.replace("-", " ").title() timestamp = int(time.time()) user_uuid = str(uuid.uuid4()) asst_uuid = str(uuid.uuid4()) # Find last message ID for proper threading last_msg_id = existing_messages[-1].get("id") if existing_messages else None # Build user message user_msg = { "id": user_uuid, "parentId": last_msg_id, "childrenIds": [asst_uuid], "role": "user", "content": user_content, "timestamp": timestamp, "models": [model], } # Build assistant message asst_msg = { "id": asst_uuid, "parentId": user_uuid, "childrenIds": [], "role": "assistant", "content": assistant_content, "model": model, "modelName": model_name, "modelIdx": 0, "timestamp": timestamp, "done": True, } # Update parent's children if exists if last_msg_id: for msg in existing_messages: if msg.get("id") == last_msg_id: if "childrenIds" not in msg: msg["childrenIds"] = [] msg["childrenIds"].append(user_uuid) break all_messages = existing_messages + [user_msg, asst_msg] history_messages = {msg["id"]: msg for msg in all_messages} return { "models": [model], "params": {}, "history": {"messages": history_messages, "currentId": asst_uuid}, "messages": all_messages, "tags": ["orchestrated"], "files": [], }