diff --git a/app-prefixable/src/context/sync.tsx b/app-prefixable/src/context/sync.tsx index 0b303b00..4255ea40 100644 --- a/app-prefixable/src/context/sync.tsx +++ b/app-prefixable/src/context/sync.tsx @@ -1,6 +1,6 @@ import { createContext, useContext, onCleanup, batch, type ParentProps } from "solid-js" import { createStore, reconcile, produce } from "solid-js/store" -import type { Session, Message, Part, Provider } from "../sdk/client" +import type { Session, Message, Part, Provider, TextPart } from "../sdk/client" import { useSDK } from "./sdk" import { useServer } from "./server" import { createSSEParser } from "../utils/sse" @@ -58,6 +58,95 @@ function sortParts(parts: Part[]): Part[] { return [...withId, ...withoutId] } +function toolEnd(part: Extract): number { + const state = part.state + if (state.status === "completed") return state.time.end + if (state.status === "error") return state.time.end + return 0 +} + +function toolStart(part: Extract): number { + const state = part.state + if (state.status === "running") return state.time.start + if (state.status === "completed") return state.time.start + if (state.status === "error") return state.time.start + return 0 +} + +function toolRank(part: Extract): number { + const state = part.state + if (state.status === "pending") return 1 + if (state.status === "running") return 2 + return 3 +} + +function mergePart(existing: Part, synced: Part): Part { + if (existing.type !== "tool") { + if (synced.type !== "tool") { + const getTimeValue = (part: Part): number => { + if (part.type === "text" || part.type === "reasoning") { + const t = (part as TextPart).time + return t?.end ?? t?.start ?? 0 + } + if (part.type === "retry") { + return (part as { time?: { created?: number } }).time?.created ?? 0 + } + return 0 + } + const existingEnd = getTimeValue(existing) + const syncedEnd = getTimeValue(synced) + if (existingEnd > syncedEnd) return existing + if (syncedEnd > existingEnd) return synced + return existing + } + return synced + } + if (synced.type !== "tool") return synced + + const existingEnd = toolEnd(existing) + const syncedEnd = toolEnd(synced) + if (existingEnd > syncedEnd) return existing + if (syncedEnd > existingEnd) return synced + + const existingStart = toolStart(existing) + const syncedStart = toolStart(synced) + if (existingStart > syncedStart) return existing + if (syncedStart > existingStart) return synced + + const existingRank = toolRank(existing) + const syncedRank = toolRank(synced) + if (existingRank > syncedRank) return existing + if (syncedRank > existingRank) return synced + + return existing +} + +function mergeMessage(existing: MessageWithParts, synced: MessageWithParts): MessageWithParts { + const existingWithId = existing.parts.filter((p) => !!p.id) + const existingWithoutId = existing.parts.filter((p) => !p.id) + const map = new Map(existingWithId.map((part) => [part.id, part])) + const merged = synced.parts.map((part) => { + const current = map.get(part.id) + if (!current) return part + return mergePart(current, part) + }) + const ids = new Set(merged.filter((p) => !!p.id).map((part) => part.id)) + + for (const part of existingWithId) { + if (ids.has(part.id)) continue + merged.push(part) + } + + for (const part of existingWithoutId) { + merged.push(part) + } + + return { + info: synced.info, + parts: sortParts(merged), + } +} + function errorText(err: unknown) { if (err instanceof Error && err.message.trim()) return err.message return "Failed to bootstrap app state from API." @@ -246,17 +335,38 @@ export function SyncProvider(props: ParentProps) { setStore("message", session.id, reconcile([])) } + function isNewer(a: Part, b: Part): boolean { + if (a.type !== "tool" || b.type !== "tool") return false + const aState = a.state as Extract["state"] + const bState = b.state as Extract["state"] + if (aState.status === "pending" || bState.status === "pending") return false + const aEnd = aState.time?.end ?? aState.time?.start + const bEnd = bState.time?.end ?? bState.time?.start + if (!aEnd) return false + if (!bEnd) return true + return aEnd > bEnd + } + // Message part events - the main real-time update mechanism if (event.type === "message.part.updated") { const part = props.part as Part if (!part?.sessionID || !part?.messageID) return - // Update or insert the part + const hasId = !!part.id setStore("part", part.messageID, (existing: Part[] | undefined) => { if (!existing) return sortParts([part]) - const idx = existing.findIndex((p) => p.id === part.id) - if (idx === -1) return sortParts([...existing, part]) - return existing.map((p, i) => (i === idx ? part : p)) + + if (hasId) { + const idx = existing.findIndex((p) => p.id === part.id) + if (idx === -1) return sortParts([...existing, part]) + const existingPart = existing[idx] + if (!existingPart.id || isNewer(existingPart, part)) return existing + return existing.map((p, i) => (i === idx ? part : p)) + } + + const noIdParts = existing.filter((p) => !p.id) + const withIdParts = existing.filter((p) => !!p.id) + return sortParts([...withIdParts, ...noIdParts, part]) }) // Update parts in existing messages only - don't synthesize messages from parts @@ -269,9 +379,16 @@ export function SyncProvider(props: ParentProps) { // Update existing message parts return msgs.map((m, i) => { if (i !== msgIdx) return m - const partIdx = m.parts.findIndex((p) => p.id === part.id) - const newParts = partIdx === -1 ? [...m.parts, part] : m.parts.map((p, pi) => (pi === partIdx ? part : p)) - return { ...m, parts: newParts } + if (hasId) { + const partIdx = m.parts.findIndex((p) => p.id === part.id) + if (partIdx !== -1) { + const existingPart = m.parts[partIdx] + if (!existingPart.id || isNewer(existingPart, part)) return m + } + const newParts = partIdx === -1 ? [...m.parts, part] : m.parts.map((p, pi) => (pi === partIdx ? part : p)) + return { ...m, parts: newParts } + } + return { ...m, parts: [...m.parts, part] } }) }) } @@ -306,9 +423,17 @@ export function SyncProvider(props: ParentProps) { if (!existing || existing.length === 0) return existing return existing.map((m) => { if (m.info.id !== info.id) return m - // Merge info and optionally update parts if provided + // Prefer info with newer time.completed + const mergedInfo = + m.info.time?.completed && info.time?.completed + ? m.info.time.completed > info.time.completed + ? m.info + : info + : info.time?.completed + ? info + : m.info const updatedParts = parts ? sortParts(parts) : m.parts - return { info, parts: updatedParts } + return { info: mergedInfo, parts: updatedParts } }) }) @@ -405,19 +530,18 @@ export function SyncProvider(props: ParentProps) { setStore("message", sessionID, (existing: MessageWithParts[]) => { if (!existing || existing.length === 0) return synced - // Merge: use existing message if it has more recent parts - const merged = synced.map((s) => { - const e = existing.find((m) => m.info.id === s.info.id) - if (!e) return s - // Keep existing if it has more parts (SSE updates arrived) - return e.parts.length >= s.parts.length ? e : s + const map = new Map(existing.map((msg) => [msg.info.id, msg])) + const merged = synced.map((msg) => { + const current = map.get(msg.info.id) + if (!current) return msg + return mergeMessage(current, msg) }) + const ids = new Set(merged.map((msg) => msg.info.id)) // Add any messages from existing that aren't in synced (new SSE messages) - for (const e of existing) { - if (!merged.find((m) => m.info.id === e.info.id)) { - merged.push(e) - } + for (const msg of existing) { + if (ids.has(msg.info.id)) continue + merged.push(msg) } return merged.sort((a, b) => cmp(a.info.id, b.info.id))