Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/transcript-close-timeout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/agents': patch
---

fix(voice): add timeout fallback while closing transcript segments
29 changes: 26 additions & 3 deletions agents/src/voice/transcription/synchronizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import {
} from '../io.js';

const STANDARD_SPEECH_RATE = 3.83; // hyphens (syllables) per second
// max time close() waits for transcript forwarding to drain before moving on
const SEGMENT_CLOSE_TIMEOUT = 5000;

interface TextSyncOptions {
speed: number;
Expand Down Expand Up @@ -551,7 +553,19 @@ class SegmentSynchronizerImpl {
this.outputStreamWriter.close();
}
this.textData.wordStream.close();
await this.captureTask;

const timedOut = Symbol('timedOut');
let timeout: ReturnType<typeof setTimeout> | undefined;
const result = await Promise.race([
this.captureTask,
new Promise<typeof timedOut>((resolve) => {
timeout = setTimeout(() => resolve(timedOut), SEGMENT_CLOSE_TIMEOUT);
}),
]).finally(() => clearTimeout(timeout));

if (result === timedOut) {
this.logger.warn('SegmentSynchronizerImpl.close timed out draining capture task');
}
}
}

Expand Down Expand Up @@ -683,14 +697,23 @@ export class TranscriptionSynchronizer {

private async rotateSegmentTaskImpl(abort: AbortSignal, oldTask?: Task<void>) {
if (oldTask) {
await oldTask.result;
try {
await oldTask.result;
} catch {
// Continue rotating so the synchronizer is not left pointing at a closed impl.
}
}

if (abort.aborted) {
return;
}

await this._impl.close();
const oldImpl = this._impl;
try {
await oldImpl.close();
} catch (error) {
this.logger.error({ error }, 'failed to close segment synchronizer impl during rotation');
}
this._impl = new SegmentSynchronizerImpl(this.options, this.textOutput.nextInChain, true);

if (this._paused) {
Expand Down