-
Notifications
You must be signed in to change notification settings - Fork 2.5k
fix(daemon): isolate parallel subAgent text streams in transcript reducer #4689
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2cb848a
bb8a7ab
079284e
0f53b37
dbdb8f6
ee53f7e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -69,12 +69,16 @@ export class MessageEmitter extends BaseEmitter { | |
| async emitAgentThought( | ||
| text: string, | ||
| timestamp?: string | number, | ||
| subagentMeta?: SubagentMeta, | ||
| ): Promise<void> { | ||
| const epochMs = BaseEmitter.toEpochMs(timestamp); | ||
| const _meta = this.buildChunkMeta( | ||
| BaseEmitter.toEpochMs(timestamp), | ||
| subagentMeta, | ||
| ); | ||
| await this.sendUpdate({ | ||
| sessionUpdate: 'agent_thought_chunk', | ||
| content: { type: 'text', text }, | ||
| ...(epochMs != null && { _meta: { timestamp: epochMs } }), | ||
| ...(_meta ? { _meta } : {}), | ||
| }); | ||
| } | ||
|
|
||
|
|
@@ -87,12 +91,16 @@ export class MessageEmitter extends BaseEmitter { | |
| async emitAgentMessage( | ||
| text: string, | ||
| timestamp?: string | number, | ||
| subagentMeta?: SubagentMeta, | ||
| ): Promise<void> { | ||
| const epochMs = BaseEmitter.toEpochMs(timestamp); | ||
| const _meta = this.buildChunkMeta( | ||
| BaseEmitter.toEpochMs(timestamp), | ||
| subagentMeta, | ||
| ); | ||
| await this.sendUpdate({ | ||
| sessionUpdate: 'agent_message_chunk', | ||
| content: { type: 'text', text }, | ||
| ...(epochMs != null && { _meta: { timestamp: epochMs } }), | ||
| ...(_meta ? { _meta } : {}), | ||
| }); | ||
| } | ||
|
|
||
|
|
@@ -139,12 +147,28 @@ export class MessageEmitter extends BaseEmitter { | |
| role: 'user' | 'assistant', | ||
| isThought: boolean = false, | ||
| timestamp?: string | number, | ||
| subagentMeta?: SubagentMeta, | ||
| ): Promise<void> { | ||
| if (role === 'user') { | ||
| return this.emitUserMessage(text, timestamp); | ||
| } | ||
| return isThought | ||
| ? this.emitAgentThought(text, timestamp) | ||
| : this.emitAgentMessage(text, timestamp); | ||
| ? this.emitAgentThought(text, timestamp, subagentMeta) | ||
| : this.emitAgentMessage(text, timestamp, subagentMeta); | ||
| } | ||
|
|
||
| private buildChunkMeta( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Suggestion] 两种不一致的序列化模式
— qwen3.7-max via Qwen Code /review |
||
| epochMs: number | undefined, | ||
| subagentMeta?: SubagentMeta, | ||
| ): Record<string, unknown> | undefined { | ||
| const meta: Record<string, unknown> = {}; | ||
| if (subagentMeta?.parentToolCallId) { | ||
| meta['parentToolCallId'] = subagentMeta.parentToolCallId; | ||
| } | ||
| if (subagentMeta?.subagentType) { | ||
| meta['subagentType'] = subagentMeta.subagentType; | ||
| } | ||
| if (epochMs != null) meta['timestamp'] = epochMs; | ||
| return Object.keys(meta).length > 0 ? meta : undefined; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -408,11 +408,29 @@ function normalizeSessionUpdate( | |
| } | ||
| case 'agent_message_chunk': { | ||
| const text = getTextContent(update['content']); | ||
| return text ? [{ ...base, type: 'assistant.text.delta', text }] : []; | ||
| if (!text) return []; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Suggestion] 风格不一致:此处使用 early-return guard( — qwen3.7-max via Qwen Code /review |
||
| const parentToolCallId = extractParentToolCallId(update); | ||
| return [ | ||
| { | ||
| ...base, | ||
| type: 'assistant.text.delta' as const, | ||
| text, | ||
| ...(parentToolCallId ? { parentToolCallId } : {}), | ||
| }, | ||
| ]; | ||
| } | ||
| case 'agent_thought_chunk': { | ||
| const text = getTextContent(update['content']); | ||
| return text ? [{ ...base, type: 'thought.text.delta', text }] : []; | ||
| if (!text) return []; | ||
| const parentToolCallId = extractParentToolCallId(update); | ||
| return [ | ||
| { | ||
| ...base, | ||
| type: 'thought.text.delta' as const, | ||
| text, | ||
| ...(parentToolCallId ? { parentToolCallId } : {}), | ||
| }, | ||
| ]; | ||
| } | ||
| case 'tool_call': | ||
| case 'tool_call_update': | ||
|
|
@@ -465,6 +483,13 @@ function normalizeSessionUpdate( | |
| } | ||
| } | ||
|
|
||
| function extractParentToolCallId( | ||
| update: Record<string, unknown>, | ||
| ): string | undefined { | ||
| const meta = isRecord(update['_meta']) ? update['_meta'] : undefined; | ||
| return meta ? getString(meta, 'parentToolCallId') : undefined; | ||
| } | ||
|
|
||
| function normalizeToolUpdate( | ||
| update: Record<string, unknown>, | ||
| base: NormalizedEventBase, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,6 +13,7 @@ import type { | |
| DaemonTranscriptReducerOptions, | ||
| DaemonTranscriptState, | ||
| DaemonUiEvent, | ||
| DaemonUiTextEvent, | ||
| DaemonUserShellTranscriptBlock, | ||
| } from './types.js'; | ||
| import { DAEMON_PLAN_TOOL_CALL_ID } from './types.js'; | ||
|
|
@@ -42,6 +43,8 @@ export function createDaemonTranscriptState( | |
| toolBlockByCallId: {}, | ||
| trimmedToolNotificationByCallId: {}, | ||
| permissionBlockByRequestId: {}, | ||
| activeAssistantBlockByParent: {}, | ||
| activeThoughtBlockByParent: {}, | ||
| // PR-E sidechannel: track current tool / approval mode / progress | ||
| toolProgress: {}, | ||
| awaitingResync: false, | ||
|
|
@@ -98,6 +101,8 @@ export function appendLocalUserTranscriptMessage( | |
| next.activeUserBlockId = block.id; | ||
| next.activeAssistantBlockId = undefined; | ||
| next.activeThoughtBlockId = undefined; | ||
| next.activeAssistantBlockByParent = {}; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Suggestion] 直接 for (const blockId of Object.values(next.activeAssistantBlockByParent)) {
const block = getWritableBlockById(next, blockId);
if (block?.kind === 'assistant') {
block.streaming = false;
block.updatedAt = next.now;
}
}
next.activeAssistantBlockByParent = {};
next.activeThoughtBlockByParent = {};— qwen3.7-max via Qwen Code /review |
||
| next.activeThoughtBlockByParent = {}; | ||
| return trimTranscriptState(next); | ||
| } | ||
|
|
||
|
|
@@ -364,6 +369,10 @@ export function selectPendingPermissionBlocks( | |
| ); | ||
| } | ||
|
|
||
| // Keyed (parentToolCallId) and scalar (activeAssistantBlockId) paths are | ||
| // fully independent. Neither clears nor finalizes the other's blocks. | ||
| // Only finishAssistant() or clearActiveText() with matching parentToolCallId | ||
| // can finalize keyed-path blocks. | ||
| function appendTextDelta( | ||
| state: DaemonTranscriptState, | ||
| kind: 'user' | 'assistant' | 'thought', | ||
|
|
@@ -374,7 +383,24 @@ function appendTextDelta( | |
| text: string, | ||
| event: DaemonUiEvent, | ||
| ): void { | ||
| const existing = getWritableBlockById(state, state[activeKey]); | ||
| const parentId = | ||
|
doudouOUC marked this conversation as resolved.
|
||
| kind !== 'user' && 'parentToolCallId' in event | ||
| ? (event as DaemonUiTextEvent).parentToolCallId | ||
| : undefined; | ||
|
doudouOUC marked this conversation as resolved.
|
||
|
|
||
| const parentMap = | ||
| parentId != null | ||
| ? kind === 'assistant' | ||
| ? state.activeAssistantBlockByParent | ||
| : kind === 'thought' | ||
| ? state.activeThoughtBlockByParent | ||
| : undefined | ||
| : undefined; | ||
|
|
||
| const effectiveId = | ||
| parentMap && parentId != null ? parentMap[parentId] : state[activeKey]; | ||
|
|
||
| const existing = getWritableBlockById(state, effectiveId); | ||
| if (existing && existing.kind === kind) { | ||
| existing.text = appendBoundedText(existing.text, text); | ||
| existing.updatedAt = state.now; | ||
|
|
@@ -392,11 +418,37 @@ function appendTextDelta( | |
| ); | ||
| if (kind === 'assistant') block.streaming = true; | ||
| if (kind === 'thought') block.collapsed = true; | ||
| if (parentId != null) { | ||
| (block as DaemonTextTranscriptBlock).parentToolCallId = parentId; | ||
| } | ||
| appendBlock(state, block); | ||
| state[activeKey] = block.id; | ||
| if (kind !== 'user') state.activeUserBlockId = undefined; | ||
| if (kind !== 'assistant') state.activeAssistantBlockId = undefined; | ||
| if (kind !== 'thought') state.activeThoughtBlockId = undefined; | ||
|
|
||
| if (parentMap && parentId != null) { | ||
| parentMap[parentId] = block.id; | ||
| } else { | ||
| state[activeKey] = block.id; | ||
| } | ||
|
|
||
| if (parentId != null) { | ||
| if (kind === 'assistant') { | ||
| delete state.activeThoughtBlockByParent[parentId]; | ||
| } | ||
| if (kind === 'thought') { | ||
| const evictedAssistId = state.activeAssistantBlockByParent[parentId]; | ||
| if (evictedAssistId) { | ||
| const evicted = getWritableBlockById(state, evictedAssistId); | ||
| if (evicted?.kind === 'assistant') { | ||
| evicted.streaming = false; | ||
| evicted.updatedAt = state.now; | ||
| } | ||
| } | ||
| delete state.activeAssistantBlockByParent[parentId]; | ||
|
doudouOUC marked this conversation as resolved.
|
||
| } | ||
| } else { | ||
| if (kind !== 'user') state.activeUserBlockId = undefined; | ||
| if (kind !== 'assistant') state.activeAssistantBlockId = undefined; | ||
| if (kind !== 'thought') state.activeThoughtBlockId = undefined; | ||
| } | ||
| } | ||
|
|
||
| function finishAssistant(state: DaemonTranscriptState): void { | ||
|
|
@@ -406,6 +458,16 @@ function finishAssistant(state: DaemonTranscriptState): void { | |
| existing.updatedAt = state.now; | ||
| } | ||
| state.activeAssistantBlockId = undefined; | ||
|
|
||
| for (const blockId of Object.values(state.activeAssistantBlockByParent)) { | ||
| const block = getWritableBlockById(state, blockId); | ||
| if (block?.kind === 'assistant') { | ||
| block.streaming = false; | ||
| block.updatedAt = state.now; | ||
| } | ||
| } | ||
| state.activeAssistantBlockByParent = {}; | ||
| state.activeThoughtBlockByParent = {}; | ||
| } | ||
|
|
||
| function upsertToolBlock( | ||
|
|
@@ -542,7 +604,7 @@ function upsertToolBlock( | |
| // never points at it. Effective-status keeps the pointer in sync | ||
| // with what was actually written to the block. | ||
| updateCurrentToolPointer(state, event.toolCallId, event.status ?? 'pending'); | ||
| clearActiveText(state); | ||
| clearActiveText(state, event.parentToolCallId); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -842,6 +904,8 @@ function cloneTranscriptState( | |
| blocks: state.blocks, | ||
| blockIndexById: state.blockIndexById, | ||
| toolBlockByCallId: { ...state.toolBlockByCallId }, | ||
| activeAssistantBlockByParent: { ...state.activeAssistantBlockByParent }, | ||
| activeThoughtBlockByParent: { ...state.activeThoughtBlockByParent }, | ||
| trimmedToolNotificationByCallId: { | ||
| ...state.trimmedToolNotificationByCallId, | ||
| }, | ||
|
|
@@ -926,6 +990,20 @@ function trimTranscriptState( | |
| if (!keptIds.has(state.activeThoughtBlockId ?? '')) { | ||
| state.activeThoughtBlockId = undefined; | ||
| } | ||
| for (const [parentId, blockId] of Object.entries( | ||
| state.activeAssistantBlockByParent, | ||
| )) { | ||
| if (!keptIds.has(blockId)) { | ||
| delete state.activeAssistantBlockByParent[parentId]; | ||
| } | ||
| } | ||
| for (const [parentId, blockId] of Object.entries( | ||
| state.activeThoughtBlockByParent, | ||
| )) { | ||
| if (!keptIds.has(blockId)) { | ||
| delete state.activeThoughtBlockByParent[parentId]; | ||
| } | ||
| } | ||
| return state; | ||
| } | ||
|
|
||
|
|
@@ -1021,10 +1099,26 @@ function allocateBlockId(state: DaemonTranscriptState, prefix: string): string { | |
| return id; | ||
| } | ||
|
|
||
| function clearActiveText(state: DaemonTranscriptState): void { | ||
| finishAssistant(state); | ||
| state.activeUserBlockId = undefined; | ||
| state.activeThoughtBlockId = undefined; | ||
| function clearActiveText( | ||
| state: DaemonTranscriptState, | ||
| parentToolCallId?: string, | ||
| ): void { | ||
| if (parentToolCallId) { | ||
| const assistId = state.activeAssistantBlockByParent[parentToolCallId]; | ||
| if (assistId) { | ||
| const block = getWritableBlockById(state, assistId); | ||
| if (block?.kind === 'assistant') { | ||
| block.streaming = false; | ||
| block.updatedAt = state.now; | ||
| } | ||
| delete state.activeAssistantBlockByParent[parentToolCallId]; | ||
| } | ||
| delete state.activeThoughtBlockByParent[parentToolCallId]; | ||
|
doudouOUC marked this conversation as resolved.
|
||
| } else { | ||
| finishAssistant(state); | ||
| state.activeUserBlockId = undefined; | ||
| state.activeThoughtBlockId = undefined; | ||
| } | ||
| } | ||
|
|
||
| function appendBoundedText(existing: string, text: string): string { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.