import { serve } from "@hono/node-server";
import { serveStatic } from "@hono/node-server/serve-static";
import {
  CopilotRuntime,
  ExperimentalEmptyAdapter,
  copilotRuntimeNextJSAppRouterEndpoint,
} from "@copilotkit/runtime";
import { HttpAgent, Middleware, EventType } from "@ag-ui/client";
import { Observable } from "rxjs";
import { Hono } from "hono";

// Upstream agent endpoint and FastAPI backend; both overridable via env.
const AGENT_URL = process.env.AGENT_URL ?? "http://backend:8000/api/v1/agent/agui";
const BACKEND_URL = process.env.BACKEND_URL ?? "http://localhost:8001";
const isProd = process.env.NODE_ENV === "production";
// Explicit radix so the port string is always parsed as decimal.
const PORT = parseInt(process.env.PORT ?? (isProd ? "3000" : "3001"), 10);

// ── pairOrphanToolCalls ──────────────────────────────────────────────────────
// CopilotKit's browser-side store sometimes drops the tool-role message
// between turns, producing assistant(toolCalls=[X]) → assistant(text) which
// OpenAI rejects. Inject a synthetic empty tool response for each orphan id.

/**
 * Minimal view of an AG-UI chat message. Both camelCase and snake_case
 * spellings of the tool-call fields are accepted.
 */
interface AGUIMessage {
  id?: string;
  role?: string;
  content?: unknown;
  toolCalls?: Array<{ id?: string }>;
  tool_calls?: Array<{ id?: string }>;
  toolCallId?: string;
  tool_call_id?: string;
}

/**
 * Returns a copy of `messages` in which every assistant tool call is followed
 * by a tool-role message carrying its id.
 *
 * Tool calls that are still unanswered when the next non-tool message (or the
 * end of the list) is reached get a synthetic tool message with empty content.
 * The input array is not mutated; original message objects are reused as-is.
 *
 * @param messages Conversation history in order.
 * @returns The history with synthetic tool responses inserted where needed.
 */
function pairOrphanToolCalls(messages: AGUIMessage[]): AGUIMessage[] {
  const out: AGUIMessage[] = [];
  // Tool-call ids announced by the latest assistant message and not yet answered.
  let pending: string[] = [];

  const flush = (): void => {
    for (const callId of pending) {
      out.push({
        id: `synth-${Math.random().toString(36).slice(2)}`,
        role: "tool",
        toolCallId: callId,
        content: "",
      });
    }
    pending = [];
  };

  for (const msg of messages) {
    const role = String(msg?.role ?? "");

    if (role === "tool") {
      // A real tool response answers its call; no synthetic one is needed.
      const callId = msg.toolCallId ?? msg.tool_call_id;
      if (callId) pending = pending.filter((p) => p !== callId);
      out.push(msg);
      continue;
    }

    // Any non-tool message closes the window in which answers may arrive.
    flush();
    out.push(msg);

    if (role === "assistant") {
      const toolCalls = msg.toolCalls ?? msg.tool_calls ?? [];
      for (const tc of toolCalls) {
        if (tc?.id) pending.push(String(tc.id));
      }
    }
  }

  flush();
  return out;
}

// ── DebugLogMiddleware ──────────────────────────────────────────────────────
// Temporary: logs every event type flowing out to diagnose double-response bug.
//
// NOTE: runNextWithState emits { event, messages, state } wrappers; always
// extract `.event` and forward the raw BaseEvent — never the wrapper.

/**
 * Pass-through middleware that logs one line per event (plus a summary line
 * for MESSAGES_SNAPSHOT) and forwards the raw event unchanged.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
class DebugLogMiddleware extends (Middleware as any) {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  run(input: any, next: any): Observable<any> {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    return new Observable<any>((observer) => {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const sub = (this as any).runNextWithState(input, next).subscribe({
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        next: (eventWithState: any) => {
          const event = eventWithState.event;
          const type: string = event?.type ?? "";

          // Pick the most specific label available for this event.
          let extra = "";
          if (event?.toolCallName) {
            extra = ` [${event.toolCallName}]`;
          } else if (event?.activityType) {
            extra = ` [${event.activityType}]`;
          } else if (event?.messageId) {
            // String() so a non-string id cannot make the logger throw.
            extra = ` [msg:${String(event.messageId).slice(0, 8)}]`;
          }
          console.log(`[DBG] ${type}${extra}`);

          if (type === EventType.MESSAGES_SNAPSHOT) {
            // eslint-disable-next-line @typescript-eslint/no-explicit-any
            const msgs: any[] = Array.isArray(event?.messages) ? event.messages : [];
            const summary = msgs
              // eslint-disable-next-line @typescript-eslint/no-explicit-any
              .map((m: any) => {
                const idPrefix = String(m?.id ?? "").slice(0, 8);
                const size = typeof m?.content === "string" ? m.content.length : "?";
                return `${m?.role}:${idPrefix}:${size}c`;
              })
              .join(" | ");
            console.log(`[DBG] SNAPSHOT msgs: ${summary}`);
          }

          observer.next(event);
        },
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        error: (err: any) => observer.error(err),
        complete: () => observer.complete(),
      });
      return () => sub.unsubscribe();
    });
  }
}

// ── DeduplicateToolCallMiddleware ─────────────────────────────────────────────
// MAF re-emits TOOL_CALL_START for declaration-only calls when they are invoked
// in parallel with other tools, causing verifyEvents to throw an AGUIError.
// This middleware tracks completed tool calls and silently drops any duplicate
// START/ARGS/END events for a call ID that has already been closed.
//
// NOTE: runNextWithState emits { event, messages, state } wrappers; always
// extract `.event` and forward the raw BaseEvent — never the wrapper.

/**
 * Drops repeated tool-call events:
 *  - TOOL_CALL_START is dropped when it has no id, or its id is already open
 *    or already closed;
 *  - TOOL_CALL_ARGS / TOOL_CALL_END are dropped when their id is already closed;
 *  - the first TOOL_CALL_END for an id moves it from open to closed.
 * All other events are forwarded untouched.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
class DeduplicateToolCallMiddleware extends (Middleware as any) {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  run(input: any, next: any): Observable<any> {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    return new Observable<any>((observer) => {
      // State lives inside the subscriber so each subscription starts clean.
      const open = new Set<string>(); // tool call IDs currently in progress
      const closed = new Set<string>(); // tool call IDs that already received END

      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const sub = (this as any).runNextWithState(input, next).subscribe({
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        next: (eventWithState: any) => {
          const event = eventWithState.event; // raw BaseEvent
          const type: string = event?.type ?? "";
          const id: string | undefined = event?.toolCallId;

          if (type === EventType.TOOL_CALL_START) {
            if (!id || closed.has(id) || open.has(id)) return; // duplicate
            open.add(id);
          } else if (type === EventType.TOOL_CALL_ARGS || type === EventType.TOOL_CALL_END) {
            if (id && closed.has(id)) return; // already completed, drop
            if (type === EventType.TOOL_CALL_END && id) {
              open.delete(id);
              closed.add(id);
            }
          }

          observer.next(event); // emit raw BaseEvent
        },
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        error: (err: any) => observer.error(err),
        complete: () => observer.complete(),
      });
      return () => sub.unsubscribe();
    });
  }
}

// ── StripModelArtifactsMiddleware ────────────────────────────────────────────
// Some OpenAI models occasionally leak training-data special tokens such as
// `<|ipynb_marker|>` into completion text. Filter them out of streamed deltas
// before they reach the chat UI.
// Matches special tokens of the form `<|name|>`. The `g` flag is needed so
// `replace` removes every occurrence; do not call `.test()` on this regex,
// because a global regex keeps state in `lastIndex` between calls.
const ARTIFACT_RE = /<\|[^|>]+\|>/g;

/**
 * Removes `<|...|>` markers from TEXT_MESSAGE_CONTENT deltas.
 *
 * - A delta without markers is forwarded untouched (same object).
 * - A delta that becomes empty after stripping is dropped entirely.
 * - Otherwise a shallow copy of the event with the cleaned delta is emitted.
 *
 * Each delta is filtered on its own, so a marker split across two deltas is
 * not detected.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
class StripModelArtifactsMiddleware extends (Middleware as any) {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  run(input: any, next: any): Observable<any> {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    return new Observable<any>((observer) => {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const sub = (this as any).runNextWithState(input, next).subscribe({
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        next: (eventWithState: any) => {
          const event = eventWithState.event;

          if (event?.type === EventType.TEXT_MESSAGE_CONTENT && typeof event?.delta === "string") {
            const original: string = event.delta;
            const cleaned = original.replace(ARTIFACT_RE, "");
            // A changed string means at least one marker was removed.
            if (cleaned !== original) {
              if (cleaned.length === 0) return; // nothing left to show
              observer.next({ ...event, delta: cleaned });
              return;
            }
          }

          observer.next(event);
        },
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        error: (err: any) => observer.error(err),
        complete: () => observer.complete(),
      });
      return () => sub.unsubscribe();
    });
  }
}

// ── ReconcileSnapshotMiddleware ──────────────────────────────────────────────
// MAF's `_build_messages_snapshot` (agent_framework_ag_ui/_agent_run.py:686)
// mints a fresh UUID for the post-tool-call assistant text instead of reusing
// the streamed TEXT_MESSAGE_START id. The resulting MESSAGES_SNAPSHOT then
// contains TWO assistant entries: the streamed id (holding the toolCalls) and
// a brand-new id (holding the duplicated text). ag-ui's snapshot merge replaces
// by id then APPENDS unknown ids, so the browser ends up with two assistant
// bubbles for the same answer. Dropping the snapshot entirely fixes the dupe
// but breaks render_a2ui card persistence (cards rely on the snapshot to keep
// the assistant-with-toolCalls message in state past the run). The right fix
// is to drop just the orphan text-only assistant message that has no streamed
// counterpart.
// Remove this middleware once `_agent_run.py:686` reuses `flow.message_id`.

/**
 * Rewrites MESSAGES_SNAPSHOT events, removing assistant messages that have
 * non-empty string content, no tool calls, and an id that was not seen in a
 * TEXT_MESSAGE_START event earlier in the same subscription. Snapshots that
 * need no change, and all other events, are forwarded untouched.
 *
 * NOTE(review): ids are only collected during the current run, so text-only
 * assistant messages from earlier turns that appear in the snapshot are
 * filtered as well — confirm the client-side merge tolerates their absence.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
class ReconcileSnapshotMiddleware extends (Middleware as any) {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  run(input: any, next: any): Observable<any> {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    return new Observable<any>((observer) => {
      // Ids announced via TEXT_MESSAGE_START; kept per subscription.
      const streamedTextIds = new Set<string>();

      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const isOrphanText = (m: any): boolean => {
        if (m?.role !== "assistant") return false;
        const id = String(m?.id ?? "");
        const hasText = typeof m?.content === "string" && m.content.length > 0;
        const hasToolCalls =
          (Array.isArray(m?.toolCalls) && m.toolCalls.length > 0) ||
          (Array.isArray(m?.tool_calls) && m.tool_calls.length > 0);
        return hasText && !hasToolCalls && !streamedTextIds.has(id);
      };

      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const sub = (this as any).runNextWithState(input, next).subscribe({
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        next: (eventWithState: any) => {
          const event = eventWithState.event;
          const type: string = event?.type ?? "";

          if (type === EventType.TEXT_MESSAGE_START && event?.messageId) {
            streamedTextIds.add(String(event.messageId));
          }

          if (type === EventType.MESSAGES_SNAPSHOT) {
            // eslint-disable-next-line @typescript-eslint/no-explicit-any
            const msgs: any[] = Array.isArray(event?.messages) ? event.messages : [];
            const filtered = msgs.filter((m) => !isOrphanText(m));
            if (filtered.length !== msgs.length) {
              observer.next({ ...event, messages: filtered });
              return;
            }
          }

          observer.next(event);
        },
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        error: (err: any) => observer.error(err),
        complete: () => observer.complete(),
      });
      return () => sub.unsubscribe();
    });
  }
}

// ── SuppressRenderToolTextMiddleware ──────────────────────────────────────────
// When the LLM emits text content in the same response as a render tool call
// (render_a2ui or render_spending_summary), the text appears as a duplicate
// below the card. This middleware buffers TEXT_MESSAGE_* events and discards
// them if any render tool call is detected in the same turn.
const RENDER_TOOLS = new Set<string>(["render_a2ui", "render_spending_summary"]);

/**
 * Holds back every TEXT_MESSAGE_START/CONTENT/END event until the run ends.
 *
 * - If a TOOL_CALL_START for a tool in RENDER_TOOLS is seen, buffered text is
 *   discarded and later text events are dropped.
 * - Otherwise the buffered text is released just before RUN_FINISHED or
 *   RUN_ERROR, or when the upstream completes or errors.
 * Non-text events are forwarded immediately, so buffered text reaches the
 * client after any non-text events that arrived in the meantime.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
class SuppressRenderToolTextMiddleware extends (Middleware as any) {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  run(input: any, next: any): Observable<any> {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    return new Observable<any>((observer) => {
      // Per-subscription state.
      let renderToolSeen = false;
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const textBuffer: any[] = [];

      // Emits held-back text (unless a render tool was seen) and empties the
      // buffer, so calling it more than once never emits an event twice.
      const releaseText = (): void => {
        if (!renderToolSeen) {
          for (const e of textBuffer) observer.next(e); // flush raw events
        }
        textBuffer.length = 0;
      };

      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const sub = (this as any).runNextWithState(input, next).subscribe({
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        next: (eventWithState: any) => {
          const event = eventWithState.event; // raw BaseEvent
          const type: string = event?.type ?? "";

          if (type === EventType.TOOL_CALL_START) {
            const toolName: string = event?.toolCallName ?? "";
            if (RENDER_TOOLS.has(toolName)) {
              renderToolSeen = true;
              textBuffer.length = 0; // discard text buffered before we saw the tool call
            }
          }

          const isTextEvent =
            type === EventType.TEXT_MESSAGE_START ||
            type === EventType.TEXT_MESSAGE_CONTENT ||
            type === EventType.TEXT_MESSAGE_END;
          if (isTextEvent) {
            if (!renderToolSeen) textBuffer.push(event); // buffer raw event
            return; // always hold — flush at turn end
          }

          if (type === EventType.RUN_FINISHED || type === EventType.RUN_ERROR) {
            releaseText();
          }
          observer.next(event); // emit raw event
        },
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        error: (err: any) => {
          // Match the RUN_ERROR path: surface partial text before failing.
          releaseText();
          observer.error(err);
        },
        complete: () => {
          releaseText();
          observer.complete();
        },
      });
      return () => sub.unsubscribe();
    });
  }
}

// ── Hono app ─────────────────────────────────────────────────────────────────
const app = new Hono();

// CopilotKit runtime endpoint. An agent and runtime are built per request so
// the caller's `ws_token` cookie can be forwarded as a bearer token.
app.all("/api/copilotkit/*", async (c) => {
  const cookieHeader = c.req.header("cookie") ?? "";
  const match = cookieHeader.match(/(?:^|;\s*)ws_token=([^;]+)/);
  const token = match?.[1];
  const agentHeaders: Record<string, string> = token
    ? { Authorization: `Bearer ${token}` }
    : {};

  const agent = new HttpAgent({ url: AGENT_URL, headers: agentHeaders });
  // Registration order is preserved from the original file.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  agent.use(new SuppressRenderToolTextMiddleware() as any);
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  agent.use(new StripModelArtifactsMiddleware() as any);
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  agent.use(new ReconcileSnapshotMiddleware() as any);
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  agent.use(new DeduplicateToolCallMiddleware() as any);
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  agent.use(new DebugLogMiddleware() as any);

  const runtime = new CopilotRuntime({
    agents: { wealthysmart: agent },
    a2ui: { injectA2UITool: true },
    // Repairs unanswered tool calls in JSON POST bodies before the runtime
    // sees them. Returning nothing leaves the request unchanged.
    beforeRequestMiddleware: async ({ request: outbound }) => {
      if (outbound.method !== "POST") return;
      const ct = outbound.headers.get("content-type") ?? "";
      if (!ct.includes("application/json")) return;
      try {
        // Read from a clone so the original body stays consumable.
        const body = (await outbound.clone().json()) as { messages?: AGUIMessage[] };
        if (!Array.isArray(body.messages)) return;
        const paired = pairOrphanToolCalls(body.messages);
        // Pairing only ever adds messages, so equal length means no change.
        if (paired.length === body.messages.length) return;

        // The body grows, so the original content-length no longer applies.
        const headers = new Headers(outbound.headers);
        headers.delete("content-length");

        return new Request(outbound.url, {
          method: outbound.method,
          headers,
          body: JSON.stringify({ ...body, messages: paired }),
        });
      } catch (err: unknown) {
        // Best effort: on any failure forward the request unchanged.
        console.warn("[copilotkit] tool-call pairing skipped; forwarding request unchanged", err);
        return;
      }
    },
  });

  const { handleRequest } = copilotRuntimeNextJSAppRouterEndpoint({
    runtime,
    serviceAdapter: new ExperimentalEmptyAdapter(),
    endpoint: "/api/copilotkit",
  });
  return handleRequest(c.req.raw as Parameters<typeof handleRequest>[0]);
});

app.get("/api/health", (c) => c.json({ ok: true }));

// Proxy backend API calls (FastAPI). In dev these hit Vite's proxy directly,
// but in prod the browser talks to this Hono server, which must forward
// `/api/v1/*` and `/api/auth/*` to the FastAPI container — otherwise the SPA
// fallback below swallows them and returns index.html.
/**
 * Forwards the incoming request to `BACKEND_URL`, keeping path, query string,
 * method, headers (minus `host`) and, for non-GET/HEAD methods, the streamed
 * body. Redirects are not followed (`redirect: "manual"`).
 *
 * The upstream response is rebuilt rather than returned verbatim: Node's
 * `fetch` decodes compressed bodies but leaves `content-encoding` and
 * `content-length` on the response, so those two headers are removed when an
 * encoding was applied.
 */
const proxyToBackend = async (c: import("hono").Context) => {
  const url = new URL(c.req.url);
  const target = `${BACKEND_URL}${url.pathname}${url.search}`;
  const method = c.req.method;

  const headers = new Headers(c.req.raw.headers);
  headers.delete("host");

  // `duplex` is required by undici for streamed request bodies; the
  // intersection type keeps this compiling whether or not RequestInit
  // already declares the field.
  const init: RequestInit & { duplex?: "half" } = {
    method,
    headers,
    redirect: "manual",
  };
  if (method !== "GET" && method !== "HEAD") {
    init.body = c.req.raw.body;
    init.duplex = "half";
  }

  const upstream = await fetch(target, init);

  const responseHeaders = new Headers(upstream.headers);
  if (responseHeaders.has("content-encoding")) {
    // The body below is already decoded; these headers describe the encoded form.
    responseHeaders.delete("content-encoding");
    responseHeaders.delete("content-length");
  }

  // Null-body statuses must not be given a body when constructing a Response.
  const hasNullBody =
    upstream.status === 204 || upstream.status === 205 || upstream.status === 304;

  return new Response(hasNullBody ? null : upstream.body, {
    status: upstream.status,
    statusText: upstream.statusText,
    headers: responseHeaders,
  });
};

app.all("/api/v1/*", proxyToBackend);
app.all("/api/auth/*", proxyToBackend);

// In production, serve the Vite build output.
if (isProd) {
  app.use("/*", serveStatic({ root: "./dist" }));
  // SPA fallback: any non-asset path returns index.html
  app.get("/*", serveStatic({ path: "./dist/index.html" }));
}

serve({ fetch: app.fetch, port: PORT }, (info) => {
  console.log(`CopilotKit server on port ${info.port} [${isProd ? "prod" : "dev"}]`);
});