feat(langgraph-ts): AG-UI StreamTransformer (v3 protocol path) for all demos#1651
Open
ranst91 wants to merge 17 commits into
Conversation
Adds an opt-in v3-protocol path to `@ag-ui/langgraph` that wires an
AG-UI StreamTransformer at graph compile time and exposes events on
`custom:agui`. `agent.ts` subscribes via ThreadStream and forwards them
into its run Observable. Legacy translation stays in place; opt-in is
per-agent via `LangGraphAgent({ useTransformer: true })`.
Currently scoped to the agentic_chat TS demo (single-turn text +
frontend tool calls + state/messages snapshots). Other event families
(interrupts, reasoning, steps, custom passthrough) land in follow-up
phases.
Notable transport detail: caches one ThreadStream + one persistent
custom:agui SubscriptionHandle per threadId across clone()s. Pause/
resume bracket each run instead of close/reopen, which avoids the
LangGraph API server replaying its per-thread queuedEvents (including
prior runs' lifecycle terminals) to a fresh sink — that replay would
otherwise trip the SDK's terminal-pause path and drop all events on
the second run.
Adds Phase 3.5: interrupt handling on the v3 transformer path, plus the human_in_the_loop demo joins the transformer-enabled set in dojo.

Server-side (transformer):
- Forward `input.requested` events as AG-UI CUSTOM `OnInterrupt` (legacy contract; matches dojo's `useLangGraphInterrupt`).
- v3 protocol surfaces `interrupt(...)` calls as `tasks` events with an `interrupts: [...]` array on the task result and a root lifecycle terminal of `completed`, NOT as `input.requested`. Scan `tasks` events for these and emit `OnInterrupt` keyed by id.
- Flush snapshots on `lifecycle.interrupted` in addition to `completed` so interrupt boundaries get current state.
- Switch `cacheState` from replace to shallow merge — subsequent root `values` events on the same run can carry only the keys that just changed; replacing wholesale was shipping an empty MESSAGES_SNAPSHOT and resetting the dojo UI.

Client-side (agent.ts):
- Skip the regenerate branch when `command.resume` is set: HITL resume is not a fork-from-checkpoint.
- Resume routes through `streamingThread.respondInput(...)` instead of `submitRun(...)`. Interrupt id/namespace come from the live `streamingThread.interrupts` array first, with an `agentState.tasks` fallback for cold-start ThreadStream cache misses.

Demo wiring:
- `examples/src/agents/human_in_the_loop/agent.ts` registers `aguiTransformer` at compile time.
- `apps/dojo/src/agents.ts` adds `human_in_the_loop` to `transformerEnabled`.
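The replace→merge change to `cacheState` can be sketched as a tiny helper (hypothetical shape; the real implementation lives inside the transformer and tracks more state):

```typescript
// Shallow-merge state cache: a later root `values` event may carry only
// the keys that just changed, so replacing wholesale would drop e.g. the
// messages key and ship an empty MESSAGES_SNAPSHOT.
type AgentState = Record<string, unknown>;

let cached: AgentState = {};

function cacheState(update: AgentState): AgentState {
  cached = { ...cached, ...update }; // merge, don't replace
  return cached;
}
```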
…ing demos on v3 transformer

Transformer (server-side):
- Forward content-block-start `type: "reasoning"` (standardized v3) and `type: "thinking"` (legacy Anthropic alias) to AG-UI REASONING_START + REASONING_MESSAGE_START + initial content; tracked per content-block index.
- Forward `reasoning-delta` / `thinking-delta` to REASONING_MESSAGE_CONTENT.
- Dispatch content-block-finish by the FINISHING block's `type` rather than by tracker presence, so reasoning + text that share a protocol index don't bleed into each other's END events.
- Anthropic `redacted_thinking` and reasoning-block `signature` surface as REASONING_ENCRYPTED_VALUE.
- Implicit text-block open: the server emits text deltas at an index already occupied by a reasoning block, without a preceding content-block-start of type=text. Treat the first such text delta as an implicit open and close it on message-finish (the server also omits the matching content-block-finish).
- Forward `input.requested` events as CUSTOM `OnInterrupt`, plus scan `tasks` events for any `interrupts: [...]` array on the task result and emit `OnInterrupt` per id (the v3 dev server surfaces HITL interrupts through `tasks` rather than `input.requested`, with the root lifecycle terminal still reported as `completed`).
- Snapshot flush also fires on lifecycle `interrupted` so HITL boundaries get current state.
- cacheState shallow-merges instead of replacing — later root `values` events sometimes carry only the changed keys; replacing wholesale was shipping empty messages snapshots and resetting the UI.

Agent (client-side):
- Skip the regenerate branch when `command.resume` is set — HITL resume is not a fork-from-checkpoint.
- Resume routes through `streamingThread.respondInput(...)` instead of `submitRun(...)`. Interrupt id/namespace come from the live `streamingThread.interrupts` array, with an `agentState.tasks` fallback for cold-start ThreadStream cache misses.
- Sanitizer for prior assistant messages now strips `tool_call` content blocks AND drops `response_metadata.output_version: "v1"`. The v1 flag activated langchain-core's `contentBlocks` path, which then had langchain-openai's Responses serializer mistype prior assistant text blocks as `input_text` (OpenAI 400). Dropping the flag falls back to the legacy content-array path that the Responses API accepts.

Demo wiring:
- examples/src/agents/human_in_the_loop/agent.ts and examples/src/agents/agentic_chat_reasoning/agent.ts register `aguiTransformer` at compile time.
- apps/dojo/src/agents.ts adds both to the transformer-enabled set.
- examples bumped to `@langchain/openai@^1.4.5` — 1.2.0 emitted no reasoning content blocks on v3; 1.4.5 does.
- @ag-ui/langgraph bumped to `@langchain/langgraph-sdk@^1.9.2` for client-side block-index reconciliation between reasoning and text.

Known gap (confirmed upstream): on OpenAI Responses with reasoning enabled, langchain-openai 1.4.5's server-side AIMessage assembler drops text deltas that land on a content-block index already occupied by a reasoning block. Text streams correctly on the wire (rendered live by the UI) but the assembled AIMessage persisted to state ends up reasoning-only, so MESSAGES_SNAPSHOT replaces the streamed text with nothing once the run finishes. Verified by calling threads.getState directly — the missing text is upstream, not in our cache.
Non-root lifecycle events bracket individual Pregel nodes. Translate them into AG-UI STEP_STARTED / STEP_FINISHED so consumers can show progress on multi-node graphs (e.g. the human-in-the-loop graph's chat_node → process_steps_node transitions). The namespace head is `nodeName:taskUuid`; strip the uuid for a readable step name.

Active steps are keyed by the full namespace path so parallel tasks for the same node don't unbalance the STEP_STARTED/STEP_FINISHED pairs that AG-UI's verify enforces. Any steps still open at run end are closed in `finalize()`.
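The uuid-stripping step can be sketched as a pure function (hypothetical name; the namespace is modeled as an array of `nodeName:taskUuid` segments per the description above):

```typescript
// Derive a readable step name from the namespace head, which has the
// form `nodeName:taskUuid` — strip everything from the first colon on.
function stepNameFromNamespace(namespace: string[]): string {
  const head = namespace[0] ?? "";
  const sep = head.indexOf(":");
  return sep === -1 ? head : head.slice(0, sep);
}
```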
… all TS demos
Transformer:
- Handle the v3 `custom` channel. Branch on data.name:
- ManuallyEmitMessage → TEXT_MESSAGE_START/CONTENT/END
- ManuallyEmitToolCall → TOOL_CALL_START/ARGS/END
- ManuallyEmitState → merge payload into cached state, ship an
immediate STATE_SNAPSHOT, fall through to generic CUSTOM
passthrough so listeners that key off the name still receive it
- everything else → generic CUSTOM forward (preserves the legacy
`value: event.data` contract)
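The branching above can be illustrated with a toy dispatcher that returns only the AG-UI event-type sequence per name — a sketch of the control flow, not the real transformer (which also carries payloads and state):

```typescript
// Illustrative dispatch for the v3 `custom` channel, branching on data.name.
type AguiEventType =
  | "TEXT_MESSAGE_START" | "TEXT_MESSAGE_CONTENT" | "TEXT_MESSAGE_END"
  | "TOOL_CALL_START" | "TOOL_CALL_ARGS" | "TOOL_CALL_END"
  | "STATE_SNAPSHOT" | "CUSTOM";

function dispatchCustom(name: string): AguiEventType[] {
  switch (name) {
    case "ManuallyEmitMessage":
      return ["TEXT_MESSAGE_START", "TEXT_MESSAGE_CONTENT", "TEXT_MESSAGE_END"];
    case "ManuallyEmitToolCall":
      return ["TOOL_CALL_START", "TOOL_CALL_ARGS", "TOOL_CALL_END"];
    case "ManuallyEmitState":
      // Immediate snapshot, then fall through to the generic CUSTOM
      // forward so listeners keyed off the name still receive it.
      return ["STATE_SNAPSHOT", "CUSTOM"];
    default:
      return ["CUSTOM"]; // generic forward preserves the legacy contract
  }
}
```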
Demos: register `aguiTransformer` at compile time for every TS demo
(agentic_chat_multimodal, agentic_generative_ui, backend_tool_rendering,
multimodal_messages, predictive_state_updates, shared_state,
subgraphs, tool_based_generative_ui). agentic_chat, human_in_the_loop,
agentic_chat_reasoning were already wired earlier.
dojo: route all langgraph-typescript demos through the transformer
path (`useTransformer: true`). The legacy translation in agent.ts
remains in place but is no longer reachable from langgraph-typescript;
non-langgraph deployments continue to use it via `useTransformer:
false`.
Python Preview Packages

Install with uv — add the TestPyPI index to your pyproject.toml:

```toml
[[tool.uv.index]]
name = "testpypi"
url = "https://test.pypi.org/simple/"
explicit = true
```

Then install the packages you need:

```shell
# Core SDK
uv add 'ag-ui-protocol==0.0.0.dev1778539273' --index testpypi

# Integrations (each already depends on the matching ag-ui-protocol preview)
uv add 'ag-ui-langgraph==0.0.0.dev1778539273' --index testpypi
uv add 'ag-ui-crewai==0.0.0.dev1778539273' --index testpypi

# NOTE: ag-ui-agent-spec depends on pyagentspec (git-only, not on PyPI).
# You will need to install pyagentspec separately from its git repo.
uv add 'ag-ui-agent-spec==0.0.0.dev1778539273' --index testpypi
uv add 'ag_ui_adk==0.0.0.dev1778539273' --index testpypi
uv add 'ag_ui_strands==0.0.0.dev1778539273' --index testpypi
```

Install with pip:

```shell
pip install \
  --index-url https://test.pypi.org/simple/ \
  --extra-index-url https://pypi.org/simple/ \
  ag-ui-protocol==0.0.0.dev1778539273
```
Commit: ba07817
@ag-ui/a2a-middleware
@ag-ui/a2ui-middleware
@ag-ui/event-throttle-middleware
@ag-ui/mcp-apps-middleware
@ag-ui/middleware-starter
@ag-ui/a2a
@ag-ui/adk
@ag-ui/ag2
@ag-ui/agno
@ag-ui/aws-strands
@ag-ui/claude-agent-sdk
@ag-ui/crewai
@ag-ui/langchain
@ag-ui/langgraph
@ag-ui/langroid
@ag-ui/llamaindex
@ag-ui/mastra
@ag-ui/pydantic-ai
@ag-ui/server-starter
@ag-ui/server-starter-all-features
@ag-ui/vercel-ai-sdk
create-ag-ui-app
@ag-ui/client
@ag-ui/core
@ag-ui/encoder
@ag-ui/proto
…subgraph lifecycles don't unbalance pairs

AG-UI's verify enforces at most one active step per stepName. Previously we deduped by full namespace key, so a subgraph node and its inner graph node (both rooted under `experiences_agent:...`) each opened a STEP_STARTED with the same stepName, and verify rejected the second:

Error: Step "experiences_agent" is already active for 'STEP_STARTED'

Track active step names alongside the namespace map. The first namespace to open a stepName wins; deeper nested lifecycles whose stripped head collides are ignored until that step closes. The matching STEP_FINISHED still fires when the originating namespace's lifecycle terminates, so outer-vs-inner ordering stays balanced.
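The first-namespace-wins rule can be sketched as a small tracker (class and method names are hypothetical; the real code keeps this state inside the transformer):

```typescript
// Active-step tracking: namespace key -> stepName, plus a set of active
// stepNames so colliding nested lifecycles are ignored until close.
class StepTracker {
  private byNamespace = new Map<string, string>();
  private activeNames = new Set<string>();

  // Returns true when a STEP_STARTED should be emitted.
  open(nsKey: string, stepName: string): boolean {
    if (this.activeNames.has(stepName)) return false; // collision: ignore
    this.byNamespace.set(nsKey, stepName);
    this.activeNames.add(stepName);
    return true;
  }

  // Returns the stepName to STEP_FINISH, or undefined if this namespace
  // never opened a step (e.g. it was a collision we ignored).
  close(nsKey: string): string | undefined {
    const stepName = this.byNamespace.get(nsKey);
    if (stepName === undefined) return undefined;
    this.byNamespace.delete(nsKey);
    this.activeNames.delete(stepName);
    return stepName;
  }
}
```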
…, flush snapshots at node boundaries

Constructor `useTransformer` now defaults to `true` but honors an explicit `false` from callers — every demo opted in by passing the flag, but the unit tests in `subgraph-streaming.test.ts`, `predict-state-e2e.test.ts`, and `messages-tuple.test.ts` synthesize legacy events-mode chunks and need `handleStreamEvents` to run, so they now construct their agents with `useTransformer: false`.

Transformer: also flush `STATE_SNAPSHOT` + `MESSAGES_SNAPSHOT` when a non-root lifecycle event terminates with `completed`. A subgraph (or any node) can produce many intermediate `values` updates; locking in a single hash-deduped snapshot at the node/subgraph boundary gives consumers a coherent view of state as soon as that node's contribution is committed to the parent checkpoint, rather than waiting until the root run terminates.
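The hash-dedupe gate can be sketched as follows — a minimal version where JSON-string equality stands in for whatever hash the real implementation uses (unspecified in this PR):

```typescript
// Emit a snapshot only when the serialized state changed since the last
// flush, so repeated node-boundary flushes of identical state are no-ops.
let lastSnapshotHash = "";

function shouldFlushSnapshot(state: Record<string, unknown>): boolean {
  const hash = JSON.stringify(state);
  if (hash === lastSnapshotHash) return false; // deduped: nothing new
  lastSnapshotHash = hash;
  return true; // caller emits STATE_SNAPSHOT + MESSAGES_SNAPSHOT
}
```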
…-stream routes

1.1.13 lacked POST /threads/:tid/commands and /threads/:tid/stream/events, so the AG-UI transformer path returned 404 in the langgraph-typescript e2e job. 1.2.1 ships the v3 protocol surface that ThreadStream uses.
…fresh header doc

Removed dead state (`runStartedEmitted`, `runFinishedEmitted`, `runErrorEmitted`, `ensureRunStarted`) — leftover from when the transformer owned RUN_STARTED/RUN_FINISHED before that responsibility moved to `agent.ts`. Refreshed the file header to describe the event-family coverage that's actually shipped, dropping the phase-numbered TODO list and the stale "loose dictionary" comment.
…s is gone from @langchain/core
…actor targets
Two new test files capture the target shape of the upcoming refactor
work — they're committed red so the implementation pass can drive them
green.
prepare-stream.test.ts:
- sanitizeAssistantMessages (named export from ./agent) is a pure
helper: tool_call content blocks stripped from AI messages,
response_metadata.output_version 'v1' dropped, sibling keys and
non-AI messages preserved, no-throw on missing fields.
- transformerThreads cache shared across clone()s — second
prepareStream call on the same threadId reuses the cached
ThreadStream and its persistent custom:agui subscription instead of
re-subscribing.
- Resume routing — when forwardedProps.command.resume is set and a
pending interrupt is reachable (live streamingThread.interrupts or
agentState.tasks fallback), the agent calls respondInput; otherwise
submitRun. No-interrupt resume falls through to submitRun.
prepare-regenerate-stream.test.ts:
- useTransformer=true: regen runs through the cached ThreadStream's
submitRun({ forkFrom: { checkpointId } }) — no client.runs.stream
call, one custom:agui subscribe.
- useTransformer=false: legacy client.runs.stream path is preserved
(backwards-compatible fallback when transformer wiring isn't
available).
…op-level helper

Moves the inline assistant-message sanitizer out of `prepareStream` and exports it as a top-level pure function:
- Strips `tool_call` content blocks from re-sent AI messages (CopilotKit replays them on the wire; langchain 1.4 + OpenAI reject).
- Drops `response_metadata.output_version: "v1"` so langchain-core's v1 contentBlocks path doesn't route prior text through langchain-openai's Responses serializer (mistypes as `input_text`).

Behavior is identical to the previous inline implementation; the extraction is a precondition for the upcoming unit tests of the sanitizer in isolation and for sharing it with `prepareRegenerateStream`.
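A sketch of what that extracted helper plausibly looks like — the `MessageLike` shape below is an assumption distilled from this description; the real helper operates on `@langchain/core` message objects:

```typescript
// Pure sanitizer for re-sent assistant messages: strip tool_call content
// blocks and drop the output_version: "v1" metadata flag, preserving
// sibling keys and leaving non-AI messages untouched.
interface MessageLike {
  type: string; // "ai", "human", ...
  content: string | Array<{ type: string; [k: string]: unknown }>;
  response_metadata?: Record<string, unknown>;
}

function sanitizeAssistantMessages(messages: MessageLike[]): MessageLike[] {
  return messages.map((msg) => {
    if (msg.type !== "ai") return msg; // pass non-AI messages through
    const content = Array.isArray(msg.content)
      ? msg.content.filter((block) => block.type !== "tool_call")
      : msg.content;
    // Drop only the v1 flag; keep sibling metadata keys intact.
    const meta = { ...(msg.response_metadata ?? {}) };
    if (meta.output_version === "v1") delete meta.output_version;
    return { ...msg, content, response_metadata: meta };
  });
}
```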
… three private helpers

`prepareStream`'s transformer path is now an orchestrator (~30 lines) on top of three private helpers, all behavior-preserving:
- `acquireTransformerThread(threadId)` — get-or-create the cached `(ThreadStream, custom:agui SubscriptionHandle)` pair. The sub is opened once and reused across every run on the thread so server-side replays never land on a fresh sink.
- `findPendingInterrupt(thread, agentState, resume?)` — resolves which interrupt to resume against; live `thread.interrupts` first, then `agentState.tasks` fallback for ThreadStream-cache cold starts.
- `watchForRootTerminal(thread, sub)` — registers the per-run onEvent listener that pauses the persistent sub when the root lifecycle terminates; returns its unsubscribe.

Casts remain `any` for now — type tightening lands in the next commit so this change reads as pure extraction.
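The resolution order of `findPendingInterrupt` can be sketched with the inputs collapsed to plain arrays (the real helper takes the thread and agent state; the `InterruptRef` shape here is an assumption):

```typescript
// Resolve which interrupt to resume against: the live thread interrupts
// win; otherwise fall back to the tasks array cached in agent state
// (covers ThreadStream-cache cold starts). Undefined means no pending
// interrupt, so the caller falls through to submitRun.
interface InterruptRef {
  id: string;
  namespace?: string[];
}

function findPendingInterrupt(
  liveInterrupts: InterruptRef[] | undefined,
  tasks: Array<{ interrupts?: InterruptRef[] }> | undefined
): InterruptRef | undefined {
  if (liveInterrupts && liveInterrupts.length > 0) return liveInterrupts[0];
  for (const task of tasks ?? []) {
    if (task.interrupts && task.interrupts.length > 0) {
      return task.interrupts[0];
    }
  }
  return undefined;
}
```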
…ts, use SDK exports
Imports `ThreadStream` and `SubscriptionHandle` from
`@langchain/langgraph-sdk` and introduces a `TransformerThreadEntry`
interface to type the per-thread cache and the three private helpers
extracted in the previous commit.
Casts removed:
- `Map<string, { thread: any; aguiSub: any }>` →
`Map<string, TransformerThreadEntry>`.
- `(thread as any).subscribe(...)` → typed handle; result narrowed to
`SubscriptionHandle<any, ProcessedEvents>` (the named-custom unwrap
yields ProcessedEvents payloads).
- `(streamingThread as any).onEvent(...)` → typed call; event payload
narrowed inside the handler.
- `(aguiSub as any)?.pause?.()` → `aguiSub.pause()` on the typed
handle.
- `(streamingThread as any).submitRun(...)` /
`.respondInput(...)` → typed call sites.
- `streamResponse: aguiSub as any` → returns the typed handle directly.
- `(stream as any)?.close?.()` in the run handler → narrow cast to
`{ close?: () => void | Promise<void> } | undefined` so the optional
closer is invoked only when present.
No behavior change; this is the type pass on top of the helper split.
…rity
When `useTransformer` is enabled, regenerate now reuses the cached
ThreadStream + custom:agui subscription via
`streamingThread.submitRun({ ..., forkFrom: { checkpointId } })` —
the v3 protocol primitive for forking a new run from an explicit
checkpoint. The sanitizer is applied to the regen input too, matching
`prepareStream`.
When `useTransformer` is disabled (or `streamingThread` can't be
acquired), the existing legacy `client.runs.stream(...)` path is
preserved verbatim — backwards-compatible fallback for callers that
haven't opted into the transformer path.
Closes the last gap where the regen flow bypassed the transformer
and went through legacy translation regardless of agent config.
…west-first getHistory order

`threads.getHistory` returns checkpoints newest-first; `getCheckpointByMessage` reverses to walk oldest-first and finds the first checkpoint containing the target message. The previous fixture ordered ck-old before ck-new, so after reverse the search found ck-new (which had both u1 and a1), saw `messagesAfter` non-empty, and recursed on the parent — but the mock returned the same list every time, so the search never terminated and the worker timed out.

Swap to newest-first so the reversed walk lands on ck-old (only u1, no messagesAfter) and returns immediately.
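The reversed walk can be sketched as below. This is deliberately simplified — it omits the `messagesAfter` check and parent recursion the real `getCheckpointByMessage` performs, and the `CheckpointLike` shape is illustrative:

```typescript
// getHistory returns newest-first; reverse to walk oldest-first and
// return the first checkpoint whose messages contain the target id.
interface CheckpointLike {
  id: string;
  messageIds: string[];
}

function findCheckpointByMessage(
  newestFirst: CheckpointLike[],
  messageId: string
): CheckpointLike | undefined {
  const oldestFirst = [...newestFirst].reverse();
  return oldestFirst.find((ck) => ck.messageIds.includes(messageId));
}
```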
Summary
Adds an opt-in v3-protocol streaming path to `@ag-ui/langgraph`, wired into every LangGraph TypeScript demo and routed through dojo. Each compiled graph registers an `aguiTransformer` that translates LangGraph `ProtocolEvents` into AG-UI events on a `custom:agui` channel; `LangGraphAgent` subscribes via `client.threads.stream(...)` and forwards them into the run Observable. The legacy SSE translation stays in place behind `useTransformer: false` for non-langgraph integrations.

What changed
Transformer (`integrations/langgraph/typescript/src/transformer/agui-transformer.ts`)
- `RUN_STARTED`/`RUN_FINISHED` are owned by `agent.ts`; the transformer forwards root `lifecycle.failed` as `RUN_ERROR` and flushes snapshots on root `completed` and `interrupted`.
- `content-block-start/-delta/-finish` for `text`, `tool_call_chunk`/`tool_call`, `reasoning` (+ Anthropic legacy `thinking`), and `redacted_thinking` (+ reasoning `signature`) → AG-UI `TEXT_MESSAGE_*`, `TOOL_CALL_*`, `REASONING_*`, `REASONING_ENCRYPTED_VALUE`. Tool-call args are diffed against a cumulative buffer so AG-UI sees true deltas. Reasoning + text can share a protocol index — finish events dispatch by block type, and a first text-delta at an occupied index implicitly opens a text block (closed on `message-finish`).
- State is cached from `values` events via shallow merge so later partial values don't drop unchanged keys; exactly one `STATE_SNAPSHOT` + one `MESSAGES_SNAPSHOT` flush at the root terminal.
- `CUSTOM OnInterrupt` is emitted from either `input.requested` events or any task's `interrupts: []` array (the v3 dev server surfaces HITL via `tasks`, not `input.requested`).
- Non-root `lifecycle` events become `STEP_STARTED`/`STEP_FINISHED` keyed by namespace; any active steps are closed in `finalize()`.
- `ManuallyEmitMessage` → text events; `ManuallyEmitToolCall` → tool-call events; `ManuallyEmitState` → cache merge + immediate `STATE_SNAPSHOT`; everything else forwards verbatim as `CUSTOM`.

Agent (`integrations/langgraph/typescript/src/agent.ts`)
- The `useTransformer` flag opts a `LangGraphAgent` into the v3 path.
- One `ThreadStream` + persistent `custom:agui` `SubscriptionHandle` cached across `clone()`s. The persistent sub is attached before the first run so server-side `record.queuedEvents` replay never lands on it. Pause/resume bracket each run (`submitRun`'s `#prepareForNextRun` auto-resumes).
- `submitRun` (narrow lifecycle) + `respondInput` on resume, with `streamingThread.interrupts` and `agentState.tasks` as id/namespace sources.
- The sanitizer strips `tool_call` content blocks from re-sent AI messages and drops `response_metadata.output_version: "v1"` to keep langchain-openai's Responses serializer from mistyping prior assistant text as `input_text` (OpenAI 400).
- The regenerate branch is skipped when `command.resume` is set.

Demos + dojo
- `aguiTransformer` registered at compile time on every TS demo: `agentic_chat`, `agentic_chat_multimodal`, `agentic_chat_reasoning`, `agentic_generative_ui`, `backend_tool_rendering`, `human_in_the_loop`, `multimodal_messages`, `predictive_state_updates`, `shared_state`, `subgraphs`, `tool_based_generative_ui`.
- `apps/dojo/src/agents.ts` routes every `langgraph-typescript` graph through the transformer path.
- `examples` bumped to `@langchain/openai@^1.4.5` (1.2.0 emitted no reasoning content blocks on v3); `@ag-ui/langgraph` bumped to `@langchain/langgraph-sdk@^1.9.2` for client-side reconciliation between reasoning and text blocks.

Known gap (parked, confirmed upstream)
On OpenAI Responses with reasoning enabled, `@langchain/openai` 1.4.5's server-side `AIMessage` assembler drops text deltas that land on a content-block index already occupied by a reasoning block. Text streams on the wire correctly (transformer + UI render it live), but the persisted `AIMessage` ends up reasoning-only, so `MESSAGES_SNAPSHOT` replaces the streamed text with nothing once the run finishes. Verified by calling `threads.getState` directly — the missing text is in the persisted state, not in our cache. Tracking upstream; Anthropic (`type: "thinking"` content blocks) is expected to work end-to-end.

Test plan
- `agentic_chat` end-to-end (text + frontend tool call) on the transformer path.
- `human_in_the_loop` end-to-end including resume via `respondInput` and confirming `OnInterrupt` is sourced from `tasks.interrupts[]` when v3 doesn't fire `input.requested`.
- `agentic_chat_reasoning` — reasoning deltas stream as `REASONING_*` events and the live text answer renders (known gap on snapshot replacement documented above).
- `chat_node` and `process_steps_node` step transitions in a HITL run.
- `apps/dojo` playwright `langgraphTypescriptTests/*` — pending an aimock-routed dojo + langgraph dev run; the transformer-mode dojo + langgraph dev currently hit real OpenAI in this branch's local setup.

Notes for reviewers
- `handleStreamEvents` is left intact, and any deployment passing `useTransformer: false` still goes through the old SSE translator.
- `ThreadStream` is load-bearing — without it, fresh sinks receive a server-side replay of prior-run lifecycle terminals and the SDK's terminal-pause path drops the live run's events.
- `dispatchEvent`/`Subscriber.next` semantics for the transformer path are intentionally identical to legacy so downstream consumers (CopilotKit, dojo widgets) don't need branch-aware logic.