Skip to content
Open
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 143 additions & 20 deletions app-prefixable/src/context/sync.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { createContext, useContext, onCleanup, batch, type ParentProps } from "solid-js"
import { createStore, reconcile, produce } from "solid-js/store"
import type { Session, Message, Part, Provider } from "../sdk/client"
import type { Session, Message, Part, Provider, TextPart } from "../sdk/client"
import { useSDK } from "./sdk"
import { useServer } from "./server"
import { createSSEParser } from "../utils/sse"
Expand Down Expand Up @@ -58,6 +58,95 @@ function sortParts(parts: Part[]): Part[] {
return [...withId, ...withoutId]
}

function toolEnd(part: Extract<Part, { type: "tool" }>): number {
const state = part.state
if (state.status === "completed") return state.time.end
if (state.status === "error") return state.time.end
return 0
}

function toolStart(part: Extract<Part, { type: "tool" }>): number {
const state = part.state
if (state.status === "running") return state.time.start
if (state.status === "completed") return state.time.start
if (state.status === "error") return state.time.start
return 0
}

function toolRank(part: Extract<Part, { type: "tool" }>): number {
const state = part.state
if (state.status === "pending") return 1
if (state.status === "running") return 2
return 3
}

function mergePart(existing: Part, synced: Part): Part {
if (existing.type !== "tool") {
if (synced.type !== "tool") {
const getTimeValue = (part: Part): number => {
if (part.type === "text" || part.type === "reasoning") {
const t = (part as TextPart).time
return t?.end ?? t?.start ?? 0
}
if (part.type === "retry") {
return (part as { time?: { created?: number } }).time?.created ?? 0
}
return 0
}
const existingEnd = getTimeValue(existing)
const syncedEnd = getTimeValue(synced)
if (existingEnd > syncedEnd) return existing
if (syncedEnd > existingEnd) return synced
return existing
}
Comment on lines +96 to +101
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mergePart() defaults to returning synced when the compared freshness signals are equal (e.g., existingEnd === syncedEnd), which can overwrite newer in-memory/SSE state when freshness can't be established. Consider preferring existing on ties (or adding a tie-breaker) to avoid regressing local state during sync merges.

Copilot uses AI. Check for mistakes.
return synced
}
if (synced.type !== "tool") return synced
Comment on lines +83 to +104
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mergePart() always returns synced for non-tool parts. This can overwrite newer local SSE updates if a message.part.updated event for the same part.id arrives while syncSession() is in-flight (the sync payload can be an older snapshot than the current store). Consider adding a generic freshness check for non-tool parts (e.g. compare part.time.start/end when present, or default to keeping existing when both are same id and you can't prove synced is newer), while keeping the tool-specific state merge logic.

Suggested change
function mergePart(existing: Part, synced: Part): Part {
if (existing.type !== "tool") return synced
if (synced.type !== "tool") return synced
function partTimeValue(part: Part, key: "start" | "end"): number | undefined {
const value = (part as { time?: { start?: unknown; end?: unknown } }).time?.[key]
return typeof value === "number" ? value : undefined
}
function mergeNonToolPart(existing: Part, synced: Part): Part {
const existingEnd = partTimeValue(existing, "end")
const syncedEnd = partTimeValue(synced, "end")
if (existingEnd !== undefined && syncedEnd !== undefined) {
if (existingEnd > syncedEnd) return existing
if (syncedEnd > existingEnd) return synced
}
const existingStart = partTimeValue(existing, "start")
const syncedStart = partTimeValue(synced, "start")
if (existingStart !== undefined && syncedStart !== undefined) {
if (existingStart > syncedStart) return existing
if (syncedStart > existingStart) return synced
}
if (existing.id && synced.id && existing.id === synced.id) return existing
return synced
}
function mergePart(existing: Part, synced: Part): Part {
if (existing.type !== "tool" || synced.type !== "tool") {
return mergeNonToolPart(existing, synced)
}

Copilot uses AI. Check for mistakes.

Comment on lines +83 to +105
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mergePart() currently returns synced for all non-tool parts, which means a full-session sync can overwrite newer local/SSE part updates for the same part.id (contrary to the issue acceptance criterion about not regressing newer SSE state). Consider extending the freshness comparison to non-tool parts using their time fields (e.g., end/start/created), or defaulting to existing unless synced is provably newer.

Copilot uses AI. Check for mistakes.
const existingEnd = toolEnd(existing)
const syncedEnd = toolEnd(synced)
if (existingEnd > syncedEnd) return existing
if (syncedEnd > existingEnd) return synced

const existingStart = toolStart(existing)
const syncedStart = toolStart(synced)
if (existingStart > syncedStart) return existing
if (syncedStart > existingStart) return synced

const existingRank = toolRank(existing)
const syncedRank = toolRank(synced)
if (existingRank > syncedRank) return existing
if (syncedRank > existingRank) return synced

return existing
}

function mergeMessage(existing: MessageWithParts, synced: MessageWithParts): MessageWithParts {
const existingWithId = existing.parts.filter((p) => !!p.id)
const existingWithoutId = existing.parts.filter((p) => !p.id)
const map = new Map(existingWithId.map((part) => [part.id, part]))
const merged = synced.parts.map((part) => {
const current = map.get(part.id)
if (!current) return part
return mergePart(current, part)
})
const ids = new Set(merged.filter((p) => !!p.id).map((part) => part.id))

for (const part of existingWithId) {
if (ids.has(part.id)) continue
merged.push(part)
}
Comment on lines +124 to +138
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mergeMessage() only tracks existing parts that have an id (existingWithId). Any locally-added parts without an id (which this file explicitly handles in message.part.updated) will be dropped during a full-session sync merge if they aren’t present in the synced payload. If id-less parts are expected, they should be preserved similarly to how you preserve id-bearing parts (e.g., append existing parts with no id after merging, or incorporate them into the merge output explicitly).

Copilot uses AI. Check for mistakes.

for (const part of existingWithoutId) {
merged.push(part)
}

return {
info: synced.info,
parts: sortParts(merged),
}
Comment on lines +144 to +147
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mergeMessage() always uses info: synced.info, which can regress message metadata if newer message.updated SSE events arrived after the sync request started (same race as parts). Consider preferring the info object with the newer time.completed (or other monotonic fields) when both sides refer to the same message, instead of always taking synced.info.

Copilot uses AI. Check for mistakes.
Comment on lines +144 to +147
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mergeMessage() always takes synced.info. Elsewhere (message.updated) you already guard against out-of-order updates by preferring the Message with newer time.completed. To truly “preserve newer SSE updates” during syncSession(), consider merging info with a similar freshness check (at least for assistant messages) so a stale sync payload can’t overwrite newer local info fields.

Copilot uses AI. Check for mistakes.
Comment on lines +144 to +147
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mergeMessage() always sets info: synced.info, which can overwrite newer message info already received via SSE (e.g., a newer time.completed, token/cost updates, or errors) and violates the intent to preserve newer local state during full-session sync. Consider merging existing.info vs synced.info using a freshness signal (like time.completed/time.created) similar to the logic used in the message.updated handler, and keep the newer one.

Copilot uses AI. Check for mistakes.
Comment on lines +144 to +147
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mergeMessage() always returns info: synced.info, which can overwrite newer message info already applied via SSE (e.g. time.completed/error) if the syncSession() request started before the SSE update and returns a stale snapshot. Consider merging existing.info vs synced.info with the same “prefer newer time.completed” logic used in the message.updated handler, so full sync can’t regress message completion/error state.

Copilot uses AI. Check for mistakes.
}

function errorText(err: unknown) {
if (err instanceof Error && err.message.trim()) return err.message
return "Failed to bootstrap app state from API."
Expand Down Expand Up @@ -246,17 +335,37 @@ export function SyncProvider(props: ParentProps) {
setStore("message", session.id, reconcile([]))
}

function isNewer(a: Part, b: Part): boolean {
if (a.type !== "tool" || b.type !== "tool") return false
const aState = a.state as Extract<Part, { type: "tool" }>["state"]
const bState = b.state as Extract<Part, { type: "tool" }>["state"]
const aEnd = aState.time?.end ?? aState.time?.start
const bEnd = bState.time?.end ?? bState.time?.start
if (!aEnd) return false
if (!bEnd) return true
return aEnd > bEnd
Comment on lines +338 to +347
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isNewer() reads part.time.completed, but Part.time objects in the SDK don’t have a completed field (messages do; parts use start/end/created depending on type). This should be updated to compare the correct timestamps (e.g., time.end ?? time.start for text/reasoning, time.created for retry, and/or tool state.time.*), otherwise this won’t typecheck and won’t correctly protect against out-of-order updates.

Suggested change
function isNewer(a: Part, b: Part): boolean {
const aEnd = a.time?.completed ?? a.time?.start
const bEnd = b.time?.completed ?? b.time?.start
if (!aEnd) return false
if (!bEnd) return true
return aEnd > bEnd
function toTimestamp(value: unknown): number | undefined {
if (typeof value === "number") return Number.isNaN(value) ? undefined : value
if (typeof value === "string") {
const parsed = Date.parse(value)
return Number.isNaN(parsed) ? undefined : parsed
}
if (value instanceof Date) {
const parsed = value.getTime()
return Number.isNaN(parsed) ? undefined : parsed
}
return undefined
}
function getPartTimestamp(part: Part): number | undefined {
const time = part.time as { end?: unknown; start?: unknown; created?: unknown } | undefined
const directTimestamp =
toTimestamp(time?.end) ?? toTimestamp(time?.start) ?? toTimestamp(time?.created)
if (directTimestamp !== undefined) return directTimestamp
const stateTime = (
part as Part & {
state?: {
time?: {
completed?: unknown
end?: unknown
start?: unknown
created?: unknown
}
}
}
).state?.time
return (
toTimestamp(stateTime?.completed) ??
toTimestamp(stateTime?.end) ??
toTimestamp(stateTime?.start) ??
toTimestamp(stateTime?.created)
)
}
function isNewer(a: Part, b: Part): boolean {
const aTime = getPartTimestamp(a)
const bTime = getPartTimestamp(b)
if (aTime === undefined) return false
if (bTime === undefined) return true
return aTime > bTime

Copilot uses AI. Check for mistakes.
}
Comment on lines +338 to +348
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isNewer() references ToolPart["state"], but ToolPart is not imported/defined in this module, so this won’t typecheck. Import ToolPart from ../sdk/client (like other SDK types) or rewrite the narrowing to use Extract<Part, { type: "tool" }>/the existing helper functions (toolStart/toolEnd) without needing ToolPart.

Copilot uses AI. Check for mistakes.
Comment on lines +338 to +348
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isNewer() reads aState.time/bState.time directly, but ToolStatePending has no time field in the generated SDK types. With strict: true this should fail typechecking and can also lead to incorrect comparisons. Narrow on state.status !== "pending" (or use a type guard like "time" in state) before reading time, and compute a comparable timestamp from end ?? start only when the field exists.

Copilot uses AI. Check for mistakes.

// Message part events - the main real-time update mechanism
if (event.type === "message.part.updated") {
const part = props.part as Part
if (!part?.sessionID || !part?.messageID) return

// Update or insert the part
const hasId = !!part.id
setStore("part", part.messageID, (existing: Part[] | undefined) => {
if (!existing) return sortParts([part])
const idx = existing.findIndex((p) => p.id === part.id)
if (idx === -1) return sortParts([...existing, part])
return existing.map((p, i) => (i === idx ? part : p))

if (hasId) {
const idx = existing.findIndex((p) => p.id === part.id)
if (idx === -1) return sortParts([...existing, part])
const existingPart = existing[idx]
if (!existingPart.id || isNewer(existingPart, part)) return existing
return existing.map((p, i) => (i === idx ? part : p))
}

const noIdParts = existing.filter((p) => !p.id)
const withIdParts = existing.filter((p) => !!p.id)
return sortParts([...withIdParts, ...noIdParts, part])
})

// Update parts in existing messages only - don't synthesize messages from parts
Expand All @@ -269,9 +378,16 @@ export function SyncProvider(props: ParentProps) {
// Update existing message parts
return msgs.map((m, i) => {
if (i !== msgIdx) return m
const partIdx = m.parts.findIndex((p) => p.id === part.id)
const newParts = partIdx === -1 ? [...m.parts, part] : m.parts.map((p, pi) => (pi === partIdx ? part : p))
return { ...m, parts: newParts }
if (hasId) {
const partIdx = m.parts.findIndex((p) => p.id === part.id)
if (partIdx !== -1) {
const existingPart = m.parts[partIdx]
if (!existingPart.id || isNewer(existingPart, part)) return m
}
const newParts = partIdx === -1 ? [...m.parts, part] : m.parts.map((p, pi) => (pi === partIdx ? part : p))
return { ...m, parts: newParts }
Comment on lines +388 to +389
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the message.part.updated handler, the part store update uses isNewer(existingPart, part) to ignore stale tool-part updates, but the corresponding update of store.message[...] does not. Since most UI code reads msg.parts from sync.messages(), an out-of-order stale tool update (e.g. reverting completed back to pending) can still regress the visible state even if the part store ignored it. Apply the same freshness guard when replacing parts inside msgs.map(...) to keep the two stores consistent and prevent stale tool state from reappearing.

Suggested change
const newParts = partIdx === -1 ? [...m.parts, part] : m.parts.map((p, pi) => (pi === partIdx ? part : p))
return { ...m, parts: newParts }
if (partIdx === -1) return { ...m, parts: [...m.parts, part] }
const existingPart = m.parts[partIdx]
if (!existingPart.id || isNewer(existingPart, part)) return m
return { ...m, parts: m.parts.map((p, pi) => (pi === partIdx ? part : p)) }

Copilot uses AI. Check for mistakes.
}
return { ...m, parts: [...m.parts, part] }
})
})
}
Expand Down Expand Up @@ -306,9 +422,17 @@ export function SyncProvider(props: ParentProps) {
if (!existing || existing.length === 0) return existing
return existing.map((m) => {
if (m.info.id !== info.id) return m
// Merge info and optionally update parts if provided
// Prefer info with newer time.completed
const mergedInfo =
m.info.time?.completed && info.time?.completed
? m.info.time.completed > info.time.completed
? m.info
: info
: info.time?.completed
? info
: m.info
const updatedParts = parts ? sortParts(parts) : m.parts
return { info, parts: updatedParts }
return { info: mergedInfo, parts: updatedParts }
})
})

Expand Down Expand Up @@ -405,19 +529,18 @@ export function SyncProvider(props: ParentProps) {
setStore("message", sessionID, (existing: MessageWithParts[]) => {
if (!existing || existing.length === 0) return synced

// Merge: use existing message if it has more recent parts
const merged = synced.map((s) => {
const e = existing.find((m) => m.info.id === s.info.id)
if (!e) return s
// Keep existing if it has more parts (SSE updates arrived)
return e.parts.length >= s.parts.length ? e : s
const map = new Map(existing.map((msg) => [msg.info.id, msg]))
const merged = synced.map((msg) => {
const current = map.get(msg.info.id)
if (!current) return msg
return mergeMessage(current, msg)
})
const ids = new Set(merged.map((msg) => msg.info.id))

// Add any messages from existing that aren't in synced (new SSE messages)
for (const e of existing) {
if (!merged.find((m) => m.info.id === e.info.id)) {
merged.push(e)
}
for (const msg of existing) {
if (ids.has(msg.info.id)) continue
merged.push(msg)
}

return merged.sort((a, b) => cmp(a.info.id, b.info.id))
Expand Down