Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions livekit-agents/livekit/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@
FunctionToolsExecutedEvent,
MetricsCollectedEvent,
ModelSettings,
RecordingExporter,
RecordingExportResult,
RecordingOptions,
RunContext,
SessionUsageUpdatedEvent,
Expand Down Expand Up @@ -201,6 +203,8 @@ def __getattr__(name: str) -> typing.Any:
"SimulationRun",
"SimulationVerdict",
"AgentSession",
"RecordingExporter",
"RecordingExportResult",
"RecordingOptions",
"text_transforms",
"AgentEvent",
Expand Down
109 changes: 69 additions & 40 deletions livekit-agents/livekit/agents/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,30 @@ def _observability_url(livekit_url: str) -> str | None:
return None


_RECORDING_EXPORT_TIMEOUT = 30.0


async def _export_session_recording(session: AgentSession, report: SessionReport) -> None:
    """Run the session's recording exporter, if one is configured, on ``report``.

    The export is bounded by ``_RECORDING_EXPORT_TIMEOUT``. A timeout, or any
    other ``Exception`` raised while exporting, is logged and swallowed so that
    a misbehaving exporter does not propagate out of this coroutine. A
    non-``None`` result is handed to ``session._set_recording_export_result``.

    Args:
        session: Session whose ``_recording_exporter`` is invoked.
        report: Session report passed to the exporter's ``export`` method.
    """
    if (exporter := session._recording_exporter) is None:
        return

    try:
        # the export() call itself stays inside the try so that a synchronous
        # failure is logged the same way as a failure while awaiting
        export_coro = exporter.export(report)
        result = await asyncio.wait_for(export_coro, timeout=_RECORDING_EXPORT_TIMEOUT)
    except asyncio.TimeoutError:
        logger.exception("recording exporter timed out")
    except Exception:
        logger.exception("recording exporter failed")
    else:
        # exporters may return None when they have nothing to attach
        if result is not None:
            session._set_recording_export_result(result)


if TYPE_CHECKING:
from .ipc.inference_executor import InferenceExecutor
from .simulation import SimulationContext
Expand Down Expand Up @@ -266,47 +290,52 @@ async def _on_session_end(self) -> None:
if not (session := self._primary_agent_session):
return

otel_metrics.flush_turn_metrics(session.history)

c = AgentsConsole.get_instance()

# in case AgentSession.aclose() was cancelled due to timeout
if (recorder_io := session._recorder_io) and recorder_io.recording:
logger.warning("recorder_io is still recording at session end, closing it")
await recorder_io.aclose()

report = self.make_session_report(session)

# console recording, dump data to a local file
if c.enabled and c.record:
try:
report_json = json.dumps(report.to_dict(), indent=2)

import aiofiles
import aiofiles.os

await aiofiles.os.makedirs(self._session_directory, exist_ok=True)
async with aiofiles.open(
self._session_directory / "session_report.json", mode="w"
) as f:
await f.write(report_json)

except Exception:
logger.exception("failed to save session report")
try:
otel_metrics.flush_turn_metrics(session.history)

c = AgentsConsole.get_instance()

# in case AgentSession.aclose() was cancelled due to timeout
if (recorder_io := session._recorder_io) and recorder_io.recording:
logger.warning("recorder_io is still recording at session end, closing it")
await recorder_io.aclose()

report = self.make_session_report(session)

# console recording, dump data to a local file
if c.enabled and c.record:
try:
report_json = json.dumps(report.to_dict(), indent=2)

import aiofiles
import aiofiles.os

await aiofiles.os.makedirs(self._session_directory, exist_ok=True)
async with aiofiles.open(
self._session_directory / "session_report.json", mode="w"
) as f:
await f.write(report_json)

except Exception:
logger.exception("failed to save session report")

has_evals = bool(self._tagger.evaluations or self._tagger.outcome)
obs_url = _observability_url(self._info.url)
if (any(report.recording_options.values()) or has_evals) and obs_url:
try:
await _upload_session_report(
agent_name=self._info.job.agent_name,
observability_url=obs_url,
report=report,
tagger=self._tagger,
http_session=http_context.http_session(),
)
except Exception:
logger.exception("failed to upload the session report to LiveKit Cloud")

has_evals = bool(self._tagger.evaluations or self._tagger.outcome)
obs_url = _observability_url(self._info.url)
if (any(report.recording_options.values()) or has_evals) and obs_url:
try:
await _upload_session_report(
agent_name=self._info.job.agent_name,
observability_url=obs_url,
report=report,
tagger=self._tagger,
http_session=http_context.http_session(),
)
except Exception:
logger.exception("failed to upload the session report to LiveKit Cloud")
await _export_session_recording(session, report)
finally:
session._end_session_span()

def _on_cleanup(self) -> None:
# if session.start() was never reached and server wanted recording,
Expand Down
3 changes: 3 additions & 0 deletions livekit-agents/livekit/agents/telemetry/trace_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
ATTR_AGENT_NAME = "lk.agent_name"
ATTR_ROOM_NAME = "lk.room_name"
ATTR_SESSION_OPTIONS = "lk.session_options"
ATTR_RECORDING_URL = "lk.recording_url"
ATTR_RECORDING_ID = "lk.recording_id"
ATTR_RECORDING_PATH = "lk.recording_path"

# agent turn
ATTR_AGENT_TURN_ID = "lk.generation_id"
Expand Down
3 changes: 3 additions & 0 deletions livekit-agents/livekit/agents/voice/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
UserStateChangedEvent,
UserTurnExceededEvent,
)
from .recording_exporter import RecordingExporter, RecordingExportResult
from .remote_session import RemoteSession
from .room_io import (
_ParticipantAudioOutput,
Expand All @@ -30,6 +31,8 @@
__all__ = [
"AgentSession",
"RecordingOptions",
"RecordingExporter",
"RecordingExportResult",
"VoiceActivityVideoSampler",
"Agent",
"ModelSettings",
Expand Down
59 changes: 53 additions & 6 deletions livekit-agents/livekit/agents/voice/agent_session.py
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from contextlib import AbstractContextManager, asynccontextmanager, nullcontext
from contextvars import Token
from dataclasses import dataclass
from pathlib import Path
from types import TracebackType
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -62,6 +63,7 @@
)
from .ivr import IVRActivity
from .recorder_io import RecorderIO
from .recording_exporter import RecordingExporter, RecordingExportResult
from .remote_session import RoomSessionTransport, SessionHost, SessionTransport
from .run_result import RunResult
from .speech_handle import InputDetails, SpeechHandle
Expand Down Expand Up @@ -452,6 +454,7 @@ def __init__(
# used to keep a reference to the room io
self._room_io: room_io.RoomIO | None = None
self._recorder_io: RecorderIO | None = None
self._recording_exporter: RecordingExporter | None = None
self._session_transport: SessionTransport | None = None
self._session_transport_audio_input: TcpAudioInput | None = None
self._session_transport_audio_output: TcpAudioOutput | None = None
Expand Down Expand Up @@ -586,6 +589,15 @@ def usage(self) -> AgentSessionUsage:
"""Returns usage summaries for this session, one per model/provider combination."""
return AgentSessionUsage(model_usage=self._usage_collector.flatten())

@property
def recording_path(self) -> Path | None:
    """Location of the session audio recording.

    Returns ``None`` when audio recording is disabled (no recorder is set).

    The path can already be returned while recording is still in progress;
    the file itself is only guaranteed to be complete once the session has
    been closed.
    """
    recorder = self._recorder_io
    if not recorder:
        return None
    return recorder.output_path

def run(
self,
*,
Expand All @@ -610,6 +622,7 @@ async def start(
room: NotGivenOr[rtc.Room] = NOT_GIVEN,
room_options: NotGivenOr[room_io.RoomOptions] = NOT_GIVEN,
record: bool | RecordingOptions = True,
recording_exporter: RecordingExporter | None = None,
# deprecated
room_input_options: NotGivenOr[room_io.RoomInputOptions] = NOT_GIVEN,
room_output_options: NotGivenOr[room_io.RoomOutputOptions] = NOT_GIVEN,
Expand All @@ -624,6 +637,7 @@ async def start(
room: NotGivenOr[rtc.Room] = NOT_GIVEN,
room_options: NotGivenOr[room_io.RoomOptions] = NOT_GIVEN,
record: bool | RecordingOptions = True,
recording_exporter: RecordingExporter | None = None,
# deprecated
room_input_options: NotGivenOr[room_io.RoomInputOptions] = NOT_GIVEN,
room_output_options: NotGivenOr[room_io.RoomOutputOptions] = NOT_GIVEN,
Expand All @@ -637,6 +651,7 @@ async def start(
room: NotGivenOr[rtc.Room] = NOT_GIVEN,
room_options: NotGivenOr[room_io.RoomOptions] = NOT_GIVEN,
record: NotGivenOr[bool | RecordingOptions] = NOT_GIVEN,
recording_exporter: RecordingExporter | None = None,
# deprecated
room_input_options: NotGivenOr[room_io.RoomInputOptions] = NOT_GIVEN,
room_output_options: NotGivenOr[room_io.RoomOutputOptions] = NOT_GIVEN,
Expand All @@ -652,6 +667,7 @@ async def start(
room_input_options: Options for the room input
room_output_options: Options for the room output
record: Whether to record the audio, transcripts, traces, or logs
recording_exporter: Optional exporter called with the completed session report
"""
async with self._lock:
if self._started:
Expand All @@ -662,6 +678,9 @@ async def start(
# configure observability first
record_is_given = is_given(record)
job_ctx = get_job_context(required=False)
if recording_exporter is not None and job_ctx is None:
raise RuntimeError("recording_exporter requires an active JobContext")

if not is_given(record):
# defer to server-side setting for recording
record = job_ctx.job.enable_recording if job_ctx else False
Expand All @@ -675,6 +694,10 @@ async def start(
job_ctx._primary_agent_session = self
else:
is_primary = False
if recording_exporter is not None:
raise RuntimeError(
"recording_exporter can only be used with the primary AgentSession"
)
if any(self._recording_options.values()):
if record_is_given:
raise RuntimeError(
Expand All @@ -688,19 +711,20 @@ async def start(

job_ctx.init_recording(self._recording_options)

self._session_span = current_span = tracer.start_span("agent_session")
# we detach here to avoid context issues since tokens need to be detached
# in the same context as it was created
if self._session_ctx_token is not None:
otel_context.detach(self._session_ctx_token)
self._session_ctx_token = None
self._end_session_span()

self._session_span = current_span = tracer.start_span("agent_session")
ctx = trace.set_span_in_context(current_span)
self._session_ctx_token = otel_context.attach(ctx)

self._recorded_events = []
self._usage_collector = ModelUsageCollector()
self._room_io = None
self._recorder_io = None
self._recording_exporter = recording_exporter
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
self._session_host = None

self._closing = False
Expand Down Expand Up @@ -1030,9 +1054,8 @@ async def _aclose_impl(
return_exceptions=True,
)

if self._session_span:
self._session_span.end()
self._session_span = None
if not self._defer_session_span_end():
self._end_session_span()

self._started = False

Comment on lines 1060 to 1061

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 OTel context token not detached on session close (pre-existing)

In _aclose_impl, self._session_ctx_token is never detached — it's only detached on restart in start() at agent_session.py:714-716. This means after a primary session closes, the OTel context token remains attached until the process exits or the session restarts. This is pre-existing behavior (the old code also didn't detach in _aclose_impl), so it's not introduced by this PR, but it's worth noting as the deferred span pattern makes this more visible.

(Refers to lines 1060-1069)

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Expand Down Expand Up @@ -1061,6 +1084,30 @@ async def _aclose_impl(

logger.debug("session closed", extra={"reason": reason.value, "error": error})

def _defer_session_span_end(self) -> bool:
    """Tell whether ending the session span should be left to the job context.

    Returns:
        True only when a job context is active and this session is registered
        as its primary agent session; False otherwise.
    """
    ctx = get_job_context(required=False)
    if ctx is None:
        return False
    return ctx._primary_agent_session is self

def _end_session_span(self) -> None:
if self._session_span:
self._session_span.end()
self._session_span = None

def _set_recording_export_result(self, result: RecordingExportResult) -> None:
if self._session_span is None:
return

attributes: dict[str, Any] = dict(result.trace_attributes)
if result.recording_url:
attributes[trace_types.ATTR_RECORDING_URL] = result.recording_url
if result.recording_id:
attributes[trace_types.ATTR_RECORDING_ID] = result.recording_id
if result.recording_path:
attributes[trace_types.ATTR_RECORDING_PATH] = str(result.recording_path)

if attributes:
self._session_span.set_attributes(attributes)

async def aclose(self) -> None:
await self._aclose_impl(reason=CloseReason.USER_INITIATED)

Expand Down
42 changes: 42 additions & 0 deletions livekit-agents/livekit/agents/voice/recording_exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Mapping
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING

from opentelemetry.util.types import AttributeValue

if TYPE_CHECKING:
from .report import SessionReport


@dataclass(frozen=True)
class RecordingExportResult:
    """Immutable set of references returned by a recording exporter.

    The values may end up as attributes on the ``agent_session`` span. For
    sensitive recordings, prefer governed artifact ids or signed, expiring
    URLs over raw storage paths.
    """

    # Externally resolvable recording URL, ideally signed or access-controlled.
    recording_url: str | None = None

    # Opaque recording artifact id suitable for joining traces to storage.
    recording_id: str | None = None

    # Optional local/storage path. Only set this when it is safe to expose in traces.
    recording_path: str | Path | None = None

    # Additional OpenTelemetry attributes to attach to the session span.
    trace_attributes: Mapping[str, AttributeValue] = field(default_factory=dict)


class RecordingExporter(ABC):
    """Interface for shipping a completed session recording to an external backend."""

    @abstractmethod
    async def export(self, report: SessionReport) -> RecordingExportResult | None:
        """Export the completed recording described by ``report``.

        Args:
            report: The completed session report.

        Returns:
            Trace-linking metadata for the exported recording, or ``None``
            when there is nothing to attach.
        """
Loading
Loading