Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions frontend/src/client/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20371,6 +20371,7 @@ export const $Role = {
"tracecat-cli",
"tracecat-executor",
"tracecat-agent-executor",
"tracecat-case-duration-sync",
"tracecat-case-triggers",
"tracecat-llm-gateway",
"tracecat-mcp",
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/client/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10805,7 +10805,7 @@ export const caseDurationsDeleteCaseDurationDefinition = (

/**
* List Case Durations
* Sync and list case durations for the provided case.
* List materialized case durations for the provided case.
* @param data The data for the request.
* @param data.caseId
* @param data.workspaceId
Expand Down
2 changes: 2 additions & 0 deletions frontend/src/client/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6158,6 +6158,7 @@ export type Role = {
| "tracecat-cli"
| "tracecat-executor"
| "tracecat-agent-executor"
| "tracecat-case-duration-sync"
| "tracecat-case-triggers"
| "tracecat-llm-gateway"
| "tracecat-mcp"
Expand All @@ -6178,6 +6179,7 @@ export type service_id =
| "tracecat-cli"
| "tracecat-executor"
| "tracecat-agent-executor"
| "tracecat-case-duration-sync"
| "tracecat-case-triggers"
| "tracecat-llm-gateway"
| "tracecat-mcp"
Expand Down
241 changes: 241 additions & 0 deletions tests/integration/test_case_duration_benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,18 @@
TRACECAT_CASE_DURATION_BENCHMARK_UPDATES_PER_CASE
TRACECAT_CASE_DURATION_BENCHMARK_HEALTH_INTERVAL_MS
TRACECAT_CASE_DURATION_BENCHMARK_HEALTH_TIMEOUT_MS
TRACECAT_CASE_DURATION_BENCHMARK_HOT_CASE_MUTATORS
TRACECAT_CASE_DURATION_BENCHMARK_HOT_CASE_MUTATIONS
TRACECAT_CASE_DURATION_BENCHMARK_HOT_CASE_LOADS
TRACECAT_CASE_DURATION_BENCHMARK_HOT_CASE_BASELINE_LOADS
TRACECAT_CASE_DURATION_BENCHMARK_HOT_CASE_LOAD_INTERVAL_MS
TRACECAT_CASE_DURATION_BENCHMARK_OUTPUT
"""

from __future__ import annotations

import asyncio
import contextlib
import json
import math
import os
Expand All @@ -39,6 +45,8 @@
from tracecat.api.app import app
from tracecat.auth.types import Role
from tracecat.authz.scopes import ADMIN_SCOPES
from tracecat.cases.durations import consumer as duration_sync_consumer
from tracecat.cases.durations.consumer import CaseDurationSyncConsumer
from tracecat.cases.durations.schemas import (
CaseDurationAnchorSelection,
CaseDurationDefinitionCreate,
Expand All @@ -53,6 +61,7 @@
from tracecat.cases.schemas import CaseCreate, CaseUpdate
from tracecat.cases.service import CasesService
from tracecat.db.models import CaseEvent, Organization, Workspace
from tracecat.redis.client import get_redis_client

RUN_BENCHMARKS = os.environ.get("TRACECAT_RUN_CASE_DURATION_BENCHMARKS") == "1"
BENCHMARK_OUTPUT_PATH = os.environ.get("TRACECAT_CASE_DURATION_BENCHMARK_OUTPUT")
Expand Down Expand Up @@ -94,6 +103,25 @@ class CaseDurationBurstBenchmarkConfig:
)
/ 1000
)
hot_case_mutators: int = int(
os.environ.get("TRACECAT_CASE_DURATION_BENCHMARK_HOT_CASE_MUTATORS") or 4
)
hot_case_mutations: int = int(
os.environ.get("TRACECAT_CASE_DURATION_BENCHMARK_HOT_CASE_MUTATIONS") or 8
)
hot_case_loads: int = int(
os.environ.get("TRACECAT_CASE_DURATION_BENCHMARK_HOT_CASE_LOADS") or 12
)
hot_case_baseline_loads: int = int(
os.environ.get("TRACECAT_CASE_DURATION_BENCHMARK_HOT_CASE_BASELINE_LOADS") or 3
)
hot_case_load_interval_s: float = (
int(
os.environ.get("TRACECAT_CASE_DURATION_BENCHMARK_HOT_CASE_LOAD_INTERVAL_MS")
or 10
)
/ 1000
)


def _percentile(values: list[float], percentile: float) -> float | None:
Expand Down Expand Up @@ -297,6 +325,77 @@ async def update_one_case(
)


async def _sync_initial_case_durations(*, async_engine, role: Role, case_id: uuid.UUID):
async with AsyncSession(async_engine, expire_on_commit=False) as session:
await CaseDurationService(session=session, role=role).sync_case_durations(
case_id
)
await session.commit()


async def _load_case_page_once(
*,
async_engine,
role: Role,
case_id: uuid.UUID,
) -> float:
async def load_case_detail() -> None:
async with AsyncSession(async_engine, expire_on_commit=False) as session:
case = await CasesService(session=session, role=role).get_case(
case_id,
track_view=True,
)
if case is None:
raise AssertionError(f"Case {case_id} not found during benchmark")

async def load_case_durations() -> None:
async with AsyncSession(async_engine, expire_on_commit=False) as session:
await CaseDurationService(session=session, role=role).list_durations(
case_id
)

started = time.perf_counter()
await asyncio.gather(load_case_detail(), load_case_durations())
return time.perf_counter() - started


async def _load_case_page_repeatedly(
*,
async_engine,
role: Role,
case_id: uuid.UUID,
load_count: int,
interval_s: float,
) -> list[float]:
latencies: list[float] = []
for _ in range(load_count):
latencies.append(
await _load_case_page_once(
async_engine=async_engine,
role=role,
case_id=case_id,
)
)
await asyncio.sleep(interval_s)
return latencies


async def _run_hot_case_update_burst(
*,
async_engine,
role: Role,
case_id: uuid.UUID,
mutators: int,
mutations_per_mutator: int,
) -> tuple[list[float], int]:
return await _run_case_update_burst(
async_engine=async_engine,
role=role,
case_ids=[case_id for _ in range(mutators)],
updates_per_case=mutations_per_mutator,
)


@pytest.mark.anyio
async def test_case_duration_update_burst_health_latency(
monkeypatch: pytest.MonkeyPatch,
Expand Down Expand Up @@ -429,3 +528,145 @@ async def probe_health() -> None:
assert health_latencies["burst"]
finally:
await async_engine.dispose()


@pytest.mark.anyio
async def test_hot_case_load_latency_during_async_duration_mutation_burst(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Measure case-load latency while same-case mutations enqueue async sync."""

cfg = CaseDurationBurstBenchmarkConfig(case_count=1)
stream_suffix = uuid.uuid4().hex[:8]
monkeypatch.setattr(config, "TRACECAT__CASE_TRIGGERS_ENABLED", False)
monkeypatch.setattr(config, "TRACECAT__CASE_DURATION_SYNC_ENABLED", False)
monkeypatch.setattr(
config,
"TRACECAT__CASE_DURATION_SYNC_STREAM_KEY",
f"case-duration-sync-benchmark-{stream_suffix}",
)
monkeypatch.setattr(
config,
"TRACECAT__CASE_DURATION_SYNC_GROUP",
f"case-duration-sync-benchmark-{stream_suffix}",
)

async_engine = create_async_engine(
TEST_DB_CONFIG.test_url,
poolclass=NullPool,
)
consumer_task: asyncio.Task[None] | None = None

@contextlib.asynccontextmanager
async def benchmark_bypass_session():
async with AsyncSession(async_engine, expire_on_commit=False) as session:
yield session

try:
with (
patch.object(
CaseDurationDefinitionService,
"has_entitlement",
new=AsyncMock(return_value=True),
),
patch.object(
CaseDurationService,
"has_entitlement",
new=AsyncMock(return_value=True),
),
patch.object(
CasesService,
"has_entitlement",
new=AsyncMock(return_value=False),
),
patch.object(
duration_sync_consumer,
"get_async_session_bypass_rls_context_manager",
benchmark_bypass_session,
),
):
role = await _seed_benchmark_role(async_engine)
case_ids = await _seed_cases_definitions_and_history(
async_engine=async_engine,
role=role,
cfg=cfg,
)
case_id = case_ids[0]
await _sync_initial_case_durations(
async_engine=async_engine,
role=role,
case_id=case_id,
)
monkeypatch.setattr(config, "TRACECAT__CASE_DURATION_SYNC_ENABLED", True)
consumer = CaseDurationSyncConsumer(
await get_redis_client(),
consumer_name=f"duration-benchmark-{uuid.uuid4().hex[:8]}",
)
consumer_task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.1)

baseline_loads = await _load_case_page_repeatedly(
async_engine=async_engine,
role=role,
case_id=case_id,
load_count=cfg.hot_case_baseline_loads,
interval_s=cfg.hot_case_load_interval_s,
)

load_task = asyncio.create_task(
_load_case_page_repeatedly(
async_engine=async_engine,
role=role,
case_id=case_id,
load_count=cfg.hot_case_loads,
interval_s=cfg.hot_case_load_interval_s,
)
)
mutation_task = asyncio.create_task(
_run_hot_case_update_burst(
async_engine=async_engine,
role=role,
case_id=case_id,
mutators=cfg.hot_case_mutators,
mutations_per_mutator=cfg.hot_case_mutations,
)
)
burst_loads, (mutation_latencies, mutation_errors) = await asyncio.gather(
load_task,
mutation_task,
)
await asyncio.sleep(0.5)

summary: dict[str, object] = {
"config": {
"cases": cfg.case_count,
"definitions": cfg.definition_count,
"history_events_per_case": cfg.history_events_per_case,
"hot_case_mutators": cfg.hot_case_mutators,
"hot_case_mutations": cfg.hot_case_mutations,
"hot_case_loads": cfg.hot_case_loads,
"hot_case_baseline_loads": cfg.hot_case_baseline_loads,
"hot_case_load_interval_ms": round(cfg.hot_case_load_interval_s * 1000),
},
"case_load_baseline": _latency_stats(baseline_loads),
"case_load_burst": _latency_stats(burst_loads),
"mutation_latencies": _latency_stats(mutation_latencies),
"mutation_errors": mutation_errors,
}
_write_summary_to_file(summary)

print("\nHot case async duration sync benchmark:")
print(summary)

assert baseline_loads
assert burst_loads
assert mutation_latencies
assert mutation_errors == 0
finally:
if consumer_task is not None:
consumer_task.cancel()
try:
await consumer_task
except asyncio.CancelledError:
pass
await async_engine.dispose()
37 changes: 37 additions & 0 deletions tests/unit/test_case_duration_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import uuid
from unittest.mock import AsyncMock

import pytest
from sqlalchemy.ext.asyncio import AsyncSession

from tracecat.cases.durations.router import list_case_durations
from tracecat.cases.durations.service import CaseDurationService

pytestmark = pytest.mark.usefixtures("db")


@pytest.mark.anyio
async def test_list_case_durations_is_read_only(
session: AsyncSession,
svc_role,
monkeypatch: pytest.MonkeyPatch,
) -> None:
sync_mock = AsyncMock()
list_mock = AsyncMock(return_value=[])
commit_mock = AsyncMock()

monkeypatch.setattr(CaseDurationService, "sync_case_durations", sync_mock)
monkeypatch.setattr(CaseDurationService, "list_durations", list_mock)
monkeypatch.setattr(session, "commit", commit_mock)

case_id = uuid.uuid4()
result = await list_case_durations(
role=svc_role,
session=session,
case_id=case_id,
)

assert result == []
sync_mock.assert_not_awaited()
commit_mock.assert_not_awaited()
list_mock.assert_awaited_once_with(case_id)
Loading
Loading