From f9d5762ad9bb725c45fdb55eb476006ed3f5d6c3 Mon Sep 17 00:00:00 2001 From: Jeffrey Smith Date: Sun, 18 Jan 2026 19:30:15 -0500 Subject: [PATCH] slowly working on the revised version of the chat, my rethink the whole idea --- owui/chat.py | 842 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 842 insertions(+) create mode 100644 owui/chat.py diff --git a/owui/chat.py b/owui/chat.py new file mode 100644 index 0000000..9da69ce --- /dev/null +++ b/owui/chat.py @@ -0,0 +1,842 @@ +""" +title: Multi-Model Chat Orchestrator +author: Jeff Smith + Kimi K2 +version: 1.0.0 +license: MIT +required_open_webui_version: 0.7.0 +requirements: httpx, pydantic +description: | + Genuine model-to-model conversations with full persistence. Enables an orchestrator model to + coordinate real chats between different models. Features relationship-based model discovery, + programmatic tagging, and robust multi-turn conversation handling. + +🚀 QUICK START: + 1. list_models() - See available models (no limits) + 2. get_models_by_tag("cap:artist") - Find models by capability tag + 3. discover_model_relationships("rel:works-with") - Visualize model networks + 4. chat_with_model("new", "luna", "Hello!") - Start new orchestrated chat + 5. chat_with_model("{chat_id}", None, "Continue...") - Multi-turn conversation + +🔑 ORCHESTRATOR PATTERN: + - YOU are the orchestrator - you coordinate, you don't write content + - Target models receive messages with injection: "(You may respond in chat {chat_id})" + - All completions are automatically persisted with proper parent-child linkage + - Use cap:artist, rel:works-with-luna tags for intelligent model selection +""" + +import json +import uuid +import time +import asyncio +from typing import Callable, Any, Optional, Dict, List +from pydantic import BaseModel, Field +import httpx + +class OWUIChat: + """ + """ + + def __init(self, tools_instance): + self.tools = tools_instance + + @property + def valves(self): + """Access valves from parent Tools instance""" + return self.tools.valves + + def get_api_config(self, __request__=None) -> tuple[str, dict]: + """Extract base URL and auth from request context [1].""" + base_url = "" + headers = {"Content-Type": "application/json"} + + if __request__: + base_url = str(getattr(__request__, "base_url", "") or "") + req_headers = getattr(__request__, "headers", {}) + if req_headers: + auth = dict(req_headers).get("authorization", "") + if auth: + headers["Authorization"] = auth + + return base_url.rstrip("/"), headers + + async def api_request( + self, + method: str, + endpoint: str, + __request__=None, + json_data: Dict = None, + params: Dict = None, + ) -> tuple[int, Any]: + """Make authenticated API request with proper error handling.""" + base_url, headers = self.get_api_config(__request__) + if not base_url: + return 0, {"error": "Could not determine API URL"} + + url = f"{base_url}{endpoint}" + try: + async with httpx.AsyncClient( + headers=headers, timeout=float(self.valves().TIMEOUT) + ) as client: + response = await client.request( + method, url, json=json_data, params=params + ) + content_type = response.headers.get("content-type", "") + if "application/json" in content_type: + return response.status_code, response.json() + return response.status_code, response.text + except Exception as e: + return 0, {"error": f"Request failed: {str(e)}"} + + +class Tools: + """Multi-Model Chat Orchestrator - Real model-to-model communication with persistence.""" + + class Valves(BaseModel): + """Configuration for the orchestrator.""" + + DEFAULT_MODEL_TAGS: str = Field( + default="assistant,helper,agent", + description="Comma-separated tags for automatic model discovery", + ) + TIMEOUT: int = Field(default=180, description="API request timeout in seconds") + DEBUG_MODE: bool = Field( + default=False, description="Enable verbose error output" + ) + MAX_RESULTS: int = Field( + default=50, description="Maximum models to display in list operations" + ) + TAG_PREFIX_CAPABILITY: str = Field( + default="cap:", description="Prefix for capability tags" + ) + TAG_PREFIX_RELATIONSHIP: str = Field( + default="rel:", description="Prefix for relationship tags" + ) + + class UserValves(BaseModel): + pass + + def __init__(self): + self.valves = self.Valves() + self.owui_chat = OWUIChat(self) + + async def find_chat_by_title(self, title: str, __request__=None) -> dict: + """Find chat ID by title (partial match). Returns exact ID if one match found.""" + status, data = await self.owui_chat.api_request("GET", "/api/v1/chats/", __request__) + retVal = {"status": "failure", "message": ""} + + if status != 200: + retVal["message"] = f"❌ find_chat_by_title FAILED\nHTTP {status}: {data}" + return retVal + + chat_list = ( + data + if isinstance(data, list) + else data.get("items", []) if isinstance(data, dict) else [] + ) + + matches = [] + for chat in chat_list: + if not isinstance(chat, dict): + continue + chat_title = chat.get("title", "") + if title.lower() in chat_title.lower(): + matches.append( + { + "id": chat.get("id"), + "title": chat_title, + "models": ( + chat.get("chat", {}).get("models", []) + if isinstance(chat.get("chat"), dict) + else [] + ), + } + ) + + if not matches: + retVal["messages"] = f"❌ No chats found matching '{title}'" + return retVal + if len(matches) == 1: + m = matches[0] + retVal["status"] = "success" + retVal["message"] = f"✅ Single match found:\nChat ID: {m['id']}\nTitle: {m['title']}" + return retVal + + result = f"⚠️ Multiple matches ({len(matches)}):\n\n" + for i, m in enumerate(matches[:5], 1): + result += f"{i}. {m['title']}\n ID: {m['id']}\n" + retVal["status"] = "success" + retVal["message"] = result + return retVal + + async def owui_list_models(self, __request__=None) -> dict: + """List all available models with IDs, tags, and capabilities.""" + status, data = await self.owui_chat.api_request("GET", "/api/v1/models/", __request__) + retVal = {"status": "failure", "message": "", "models": []} + if status != 200: + retVal["message"] = f"❌ list_models FAILED\nHTTP {status}: {data}" + return retVal + + models = ( + data + if isinstance(data, list) + else data.get("items", []) if isinstance(data, dict) else [] + ) + if not models: + retVal = "❌ No models found" + return retVal + + retVal["message"] = f"📋 Available Models ({len(models)} total, showing {self.valves.MAX_RESULTS}):\n\n" + for model in models[: self.valves.MAX_RESULTS]: + if not isinstance(model, dict): + continue + + if not "name" in model or model.get("name") == "": + continue + + retVal["models"].append(model) +# name = model.get("name", model.get("id", "Unknown")) +# model_id = model.get("id") +# meta = model.get("meta", {}) +# tags = meta.get("tags", []) +# tag_str = ( +# ", ".join( +# t.get("name") if isinstance(t, dict) else t for t in tags if t +# ) +# if tags +# else "none" +# ) +# +# capabilities = meta.get("capabilities", {}) +# cap_str = ( +# ", ".join(f"{k}={v}" for k, v in capabilities.items()) +# if capabilities +# else "none" +# ) +# +# result += f"• {name}\n ID: {model_id}\n Tags: {tag_str}\n Capabilities: {cap_str}\n\n" + + if len(models) > self.valves.MAX_RESULTS: + retVal["message"] += f"... and {len(models) - self.valves.MAX_RESULTS} more\n" + return retVal + + async def get_models_by_tag(self, tag: str, __request__=None) -> str: + """Get models by tag name with full tag profiles.""" + status, data = await self._api_request( + "GET", f"/api/v1/models/list?tag={tag}&page=1", __request__ + ) + if status != 200: + return f"❌ get_models_by_tag FAILED\nHTTP {status}: {data}" + + items = ( + data.get("items", []) + if isinstance(data, dict) + else data if isinstance(data, list) else [] + ) + total = data.get("total", len(items)) if isinstance(data, dict) else len(items) + + if not items: + return f"❌ No models found with tag '{tag}'" + + result = f"✅ Models with tag '{tag}' ({total} total):\n\n" + for model in items[: self.valves.MAX_RESULTS]: + if not isinstance(model, dict): + continue + name = model.get("name", model.get("id")) + model_id = model.get("id") + meta = model.get("meta", {}) + all_tags = meta.get("tags", []) + tag_list = ( + ", ".join( + t.get("name", t) + for t in all_tags + if isinstance(t, dict) or isinstance(t, str) + ) + if all_tags + else "none" + ) + + result += f"• {name} (`{model_id}`)\n All tags: {tag_list}\n\n" + + if len(items) > self.valves.MAX_RESULTS: + result += f"... and {len(items) - self.valves.MAX_RESULTS} more\n" + return result + + async def get_model_by_name(self, name: str, __request__=None) -> str: + """Find model by partial name match.""" + status, data = await self._api_request("GET", "/api/v1/models/", __request__) + if status != 200: + return f"❌ get_model_by_name FAILED\nHTTP {status}: {data}" + + models = ( + data + if isinstance(data, list) + else data.get("items", []) if isinstance(data, dict) else [] + ) + if not models: + return "❌ get_model_by_name FAILED\nNo models returned" + + matches = [] + for model in models: + if not isinstance(model, dict): + continue + model_name = model.get("name", model.get("id", "")) + if name.lower() in model_name.lower(): + matches.append(model) + + if not matches: + return f"❌ No models matching '{name}'" + + if len(matches) == 1: + m = matches[0] + name = m.get("name", m.get("id")) + model_id = m.get("id") + meta = m.get("meta", {}) + tags = meta.get("tags", []) + tag_str = ( + ", ".join( + t.get("name", t) + for t in tags + if isinstance(t, dict) or isinstance(t, str) + ) + if tags + else "none" + ) + return f"✅ Found: {name}\nID: {model_id}\nTags: {tag_str}" + + result = f"⚠️ Multiple matches ({len(matches)}):\n\n" + for i, m in enumerate(matches[:5], 1): + result += f"{i}. {m.get('name')}\n ID: {m.get('id')}\n\n" + return result + + async def discover_model_relationships( + self, + relationship_prefix: str = "rel:works-with", + __request__=None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """Discover models with relationship tags and dump their full tag profiles.""" + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": "Scanning for relationships...", + "done": False, + }, + } + ) + + status, data = await self._api_request("GET", "/api/v1/models/", __request__) + if status != 200: + return f"❌ Failed: HTTP {status}" + + models = ( + data + if isinstance(data, list) + else data.get("items", []) if isinstance(data, dict) else [] + ) + + relationship_map = {} + + for model in models: + if not isinstance(model, dict): + continue + + model_id = model.get("id") + model_name = model.get("name", model_id) + meta = model.get("meta", {}) + tags = meta.get("tags", []) + + # Extract all relationship tags + for tag in tags: + tag_name = tag.get("name") if isinstance(tag, dict) else tag + if tag_name and tag_name.lower().startswith( + relationship_prefix.lower() + ): + target = tag_name[len(relationship_prefix) :].strip() + relationship_map.setdefault(target, []).append( + { + "model_id": model_id, + "model_name": model_name, + "all_tags": [t.get("name", t) for t in tags if t], + "capabilities": meta.get("capabilities", {}), + } + ) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Found {len(relationship_map)} relationship(s)", + "done": True, + }, + } + ) + + if not relationship_map: + return f"❌ No '{relationship_prefix}' relationships found" + + result = f"🔍 Relationship Dump: '{relationship_prefix}'\n{'='*60}\n\n" + for target, sources in sorted(relationship_map.items()): + result += f"📌 Target: **{target}** ({len(sources)} source(s))\n" + for src in sources: + result += f" • {src['model_name']} (`{src['model_id']}`)\n" + result += f" Tags: {', '.join(src['all_tags']) or 'none'}\n" + result += f" Caps: {', '.join(f'{k}:{v}' for k,v in src['capabilities'].items()) or 'none'}\n" + result += "\n" + + return result + + async def add_capability_tag( + self, + model_id: str, + capability: str, + __request__=None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> dict: + """Add a capability tag (cap:) to a model.""" + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Adding capability tag to {model_id}...", + "done": False, + }, + } + ) + + # Get current model data + status, model_data = await self._api_request( + "GET", f"/api/v1/models/model?id={model_id}", __request__ + ) + if status != 200: + return f"❌ Could not get model: {model_data}" + + meta = model_data.get("meta", {}) + tags = meta.get("tags", []) + + # Add new capability tag + new_tag = f"{self.valves.TAG_PREFIX_CAPABILITY}{capability}" + if new_tag not in [t.get("name", t) for t in tags]: + tags.append({"name": new_tag}) + + # Update model + update_payload = {"id": model_id, "meta": {**meta, "tags": tags}} + + status, response = await self._api_request( + "POST", "/api/v1/models/model/update", __request__, json_data=update_payload + ) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Tag added successfully", "done": True}, + } + ) + + if status == 200: + return f"✅ Added capability tag '{new_tag}' to {model_id}" + return f"❌ Failed to update model: {response}" + + async def add_relationship_tag( + self, + model_id: str, + relationship: str, + target_model: str, + __request__=None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """Add a relationship tag (rel:) to a model.""" + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Adding relationship tag to {model_id}...", + "done": False, + }, + } + ) + + # Get current model data + status, model_data = await self._api_request( + "GET", f"/api/v1/models/model?id={model_id}", __request__ + ) + if status != 200: + return f"❌ Could not get model: {model_data}" + + meta = model_data.get("meta", {}) + tags = meta.get("tags", []) + + # Add new relationship tag + new_tag = f"{self.valves.TAG_PREFIX_RELATIONSHIP}{relationship}:{target_model}" + if new_tag not in [t.get("name", t) for t in tags]: + tags.append({"name": new_tag}) + + # Update model + update_payload = {"id": model_id, "meta": {**meta, "tags": tags}} + + status, response = await self._api_request( + "POST", "/api/v1/models/model/update", __request__, json_data=update_payload + ) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Relationship tag added", "done": True}, + } + ) + + if status == 200: + return f"✅ Added relationship tag '{new_tag}' to {model_id}" + return f"❌ Failed to update model: {response}" + + async def dump_tag_taxonomy(self, __request__=None) -> str: + """Dump complete tag taxonomy showing cap: and rel: distributions.""" + status, data = await self._api_request("GET", "/api/v1/models/", __request__) + if status != 200: + return f"❌ Failed: HTTP {status}" + + models = ( + data + if isinstance(data, list) + else data.get("items", []) if isinstance(data, dict) else [] + ) + + capability_tags = {} + relationship_tags = {} + all_capabilities = {} + + for model in models: + if not isinstance(model, dict): + continue + + model_id = model.get("id") + model_name = model.get("name", model_id) + meta = model.get("meta", {}) + tags = meta.get("tags", []) + capabilities = meta.get("capabilities", {}) + + # Aggregate capabilities + for cap, val in capabilities.items(): + all_capabilities.setdefault(cap, []).append(f"{model_name}={val}") + + # Categorize tags + for tag in tags: + tag_name = tag.get("name") if isinstance(tag, dict) else tag + if not tag_name: + continue + + if tag_name.startswith(self.valves.TAG_PREFIX_CAPABILITY): + capability_tags.setdefault(tag_name, []).append(model_name) + elif tag_name.startswith(self.valves.TAG_PREFIX_RELATIONSHIP): + relationship_tags.setdefault(tag_name, []).append(model_name) + + result = "📊 Tag Taxonomy Dump\n" + "=" * 60 + "\n\n" + + if capability_tags: + result += "🎯 CAPABILITY TAGS:\n" + for cap, models in sorted(capability_tags.items()): + result += f" {cap}: {len(models)} model(s)\n" + for m in models[:3]: + result += f" • {m}\n" + if len(models) > 3: + result += f" ... and {len(models)-3} more\n" + result += "\n" + + if relationship_tags: + result += "🔗 RELATIONSHIP TAGS:\n" + for rel, models in sorted(relationship_tags.items()): + result += f" {rel}: {len(models)} model(s)\n" + for m in models[:3]: + result += f" • {m}\n" + if len(models) > 3: + result += f" ... and {len(models)-3} more\n" + result += "\n" + + if all_capabilities: + result += "🧠 CAPABILITIES:\n" + for cap, instances in sorted(all_capabilities.items()): + result += f" {cap}: {len(instances)} value(s)\n" + for inst in instances[:3]: + result += f" • {inst}\n" + if len(instances) > 3: + result += f" ... and {len(instances)-3} more\n" + + return result or "❌ No tags found" + + async def chat_with_model( + self, + chat_id: str, + model_identifier: Optional[str], + message: str, + __request__=None, + __event_emitter__: Callable[[dict], Any] = None, + ) -> str: + """🚀 PRIMARY TOOL: Robust multi-turn chat with proper message threading.""" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Resolving model...", "done": False}, + } + ) + + # NEW CHAT: model_identifier is REQUIRED + if chat_id == "new": + if not model_identifier: + return "❌ chat_with_model FAILED: model_identifier required when creating new chat" + + # Resolve model_identifier to actual model_id + target_model = None + model_name = model_identifier + + # Try tag first (if no slash in identifier) + if "/" not in model_identifier: + status, tag_data = await self._api_request( + "GET", + f"/api/v1/models/list?tag={model_identifier}&page=1", + __request__, + ) + if status == 200: + items = ( + tag_data.get("items", []) + if isinstance(tag_data, dict) + else tag_data + ) + if items and isinstance(items, list) and len(items) > 0: + target_model = items[0].get("id") + model_name = items[0].get("name", target_model) + + # Fallback to direct model ID + if not target_model: + target_model = model_identifier + + # Get friendly name + status, model_info = await self._api_request( + "GET", f"/api/v1/models/{model_identifier}", __request__ + ) + if status == 200 and isinstance(model_info, dict): + model_name = model_info.get("name", model_identifier) + else: + model_name = model_identifier.replace("-", " ").title() + + if not target_model: + return "❌ chat_with_model FAILED: Could not resolve target model" + + # Create new chat + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "Creating chat...", "done": False}, + } + ) + + payload = { + "chat": { + "title": f"Orchestrated: {model_name}", + "models": [target_model], + "params": {}, + "history": {"messages": {}, "currentId": None}, + "messages": [], + "tags": ["orchestrated"], + "files": [], + } + } + + status, chat_data = await self._api_request( + "POST", "/api/v1/chats/new", __request__, json_data=payload + ) + if status != 200: + return f"❌ chat_with_model FAILED\nCould not create chat: {chat_data}" + + chat_id = chat_data.get("id") + if not chat_id: + return "❌ chat_with_model FAILED: No chat ID returned" + + # EXISTING CHAT: model_identifier is OPTIONAL + else: + # Verify chat exists + status, chat_data = await self._api_request( + "GET", f"/api/v1/chats/{chat_id}", __request__ + ) + if status != 200: + return f"❌ chat_with_model FAILED\nCould not get chat: {chat_data}" + + chat_obj = chat_data.get("chat", {}) + existing_models = chat_obj.get("models", []) + + if model_identifier: + # Use provided model + target_model = model_identifier + model_name = model_identifier.replace("-", " ").title() + elif existing_models and len(existing_models) > 0: + # Auto-detect from chat + target_model = existing_models[0] + status, model_info = await self._api_request( + "GET", f"/api/v1/models/{target_model}", __request__ + ) + if status == 200 and isinstance(model_info, dict): + model_name = model_info.get("name", target_model) + else: + model_name = target_model.replace("-", " ").title() + else: + return "❌ chat_with_model FAILED: No model found and no model_identifier provided" + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "description": f"Chatting with {model_name}...", + "done": False, + }, + } + ) + + # Build conversation history WITH PROPER THREADING + status, chat_data = await self._api_request( + "GET", f"/api/v1/chats/{chat_id}", __request__ + ) + if status != 200: + return f"❌ Could not retrieve chat history: {chat_data}" + + chat_obj = chat_data.get("chat", {}) + existing_messages = chat_obj.get("messages", []) + + completion_messages = [] + for msg in existing_messages: + if msg.get("role") in ["user", "assistant"]: + completion_messages.append( + {"role": msg["role"], "content": msg["content"]} + ) + + # 🔑 KEY ENHANCEMENT: Enhanced chat_id injection for multi-turn awareness + enhanced_message = f"{message}\n\n[System: This is part of chat {chat_id}. You may respond here.]" + completion_messages.append({"role": "user", "content": enhanced_message}) + + # Send to target model + status, completion_data = await self._api_request( + "POST", + "/api/v1/chat/completions", + __request__, + json_data={ + "model": target_model, + "stream": False, + "messages": completion_messages, + "chat_id": chat_id, + }, + ) + + if status != 200: + return f"❌ chat_with_model FAILED\nCompletion error: {completion_data}" + + try: + response_text = completion_data["choices"][0]["message"]["content"] + except (KeyError, IndexError, TypeError) as e: + return f"❌ Parse error: {e}" + + # Persist with proper message threading + updated_chat = self._build_message_structure( + existing_messages, enhanced_message, response_text, target_model, model_name + ) + updated_chat["title"] = chat_obj.get("title", f"Chat with {model_name}") + + status, _ = await self._api_request( + "POST", + f"/api/v1/chats/{chat_id}", + __request__, + json_data={"chat": updated_chat}, + ) + + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": {"description": "✅ Response persisted", "done": True}, + } + ) + + return f"""✅ chat_with_model SUCCESS +═══════════════════════════════════════════════════════════ +Model: {model_name} ({target_model}) +Chat ID: {chat_id} +═══════════════════════════════════════════════════════════ +Your message: {message} +═══════════════════════════════════════════════════════════ +Response (persisted): {response_text} +═══════════════════════════════════════════════════════════ +✅ This multi-turn response is now in chat history +""" + + def _build_message_structure( + self, + existing_messages: List[Dict], + user_content: str, + assistant_content: str, + model: str, + model_name: str = None, + ) -> Dict: + """Build chat structure with proper message linking for multi-turn.""" + if not model_name: + model_name = model.replace("-", " ").title() + + timestamp = int(time.time()) + user_uuid = str(uuid.uuid4()) + asst_uuid = str(uuid.uuid4()) + + # Find last message ID for proper threading + last_msg_id = existing_messages[-1].get("id") if existing_messages else None + + # Build user message + user_msg = { + "id": user_uuid, + "parentId": last_msg_id, + "childrenIds": [asst_uuid], + "role": "user", + "content": user_content, + "timestamp": timestamp, + "models": [model], + } + + # Build assistant message + asst_msg = { + "id": asst_uuid, + "parentId": user_uuid, + "childrenIds": [], + "role": "assistant", + "content": assistant_content, + "model": model, + "modelName": model_name, + "modelIdx": 0, + "timestamp": timestamp, + "done": True, + } + + # Update parent's children if exists + if last_msg_id: + for msg in existing_messages: + if msg.get("id") == last_msg_id: + if "childrenIds" not in msg: + msg["childrenIds"] = [] + msg["childrenIds"].append(user_uuid) + break + + all_messages = existing_messages + [user_msg, asst_msg] + history_messages = {msg["id"]: msg for msg in all_messages} + + return { + "models": [model], + "params": {}, + "history": {"messages": history_messages, "currentId": asst_uuid}, + "messages": all_messages, + "tags": ["orchestrated"], + "files": [], + } +