diff --git a/hindsight-api-slim/hindsight_api/alembic/versions/c7e9f1a3b5d2_maintenance_routines_skip_vanished_schemas.py b/hindsight-api-slim/hindsight_api/alembic/versions/c7e9f1a3b5d2_maintenance_routines_skip_vanished_schemas.py new file mode 100644 index 000000000..c53e28844 --- /dev/null +++ b/hindsight-api-slim/hindsight_api/alembic/versions/c7e9f1a3b5d2_maintenance_routines_skip_vanished_schemas.py @@ -0,0 +1,158 @@ +"""Make maintenance routines resilient to schemas that vanish mid-scan. + +``public.banks_needing_consolidation()`` and +``public.schemas_with_expired_rows(...)`` snapshot the set of schemas owning a +target table from ``pg_class`` and then run a dynamic query against each schema +in turn. That is a time-of-check/time-of-use race: a schema (or its tables) can +be dropped — a tenant being deleted, or a tenant migration that recreates +tables — between the snapshot and the per-schema query, which then aborts the +whole routine with:: + + relation ".memory_units" does not exist + relation ".audit_log" does not exist + +In the test suite this surfaces as cross-worker contamination: the multi-tenant +maintenance test creates and drops ~100 ``mt_NNN`` schemas while +``test_maintenance_routines`` (on another xdist worker, same DB) calls the +routines. In production the background maintenance loop hits the same race when +a tenant is removed or mid-migration. + +Wrap each per-schema query in its own ``BEGIN ... EXCEPTION`` block so a schema +that disappears (``undefined_table`` / ``invalid_schema_name`` / +``undefined_column``) is skipped instead of aborting the scan. The routines stay +``CREATE OR REPLACE`` and PostgreSQL-only, and are (re)installed only on the run +that targets the shared ``public`` schema — same gating as the original +install (``e5f6a7b8c9d0``) and its repair (``b2d4f6a8c1e3``). + +Revision ID: c7e9f1a3b5d2 +Revises: e1f2a3b4c5d6 +Create Date: 2026-06-19 +""" + +from collections.abc import Sequence + +from alembic import context, op + +from hindsight_api.alembic._dialect import run_for_dialect + +revision: str = "c7e9f1a3b5d2" +down_revision: str | Sequence[str] | None = "e1f2a3b4c5d6" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def _should_install_public_routines(target_schema: str | None) -> bool: + """True for the run that must (re)create the shared ``public.*`` routines. + + The routines physically live in ``public``, so they are installed exactly + once — on the base run (no ``target_schema``) or the run that explicitly + targets ``public``. Mirrors ``b2d4f6a8c1e3``. + """ + return not target_schema or target_schema == "public" + + +def _pg_upgrade() -> None: + if not _should_install_public_routines(context.config.get_main_option("target_schema")): + return + + # Same body as b2d4f6a8c1e3, but each per-schema query runs in its own + # subtransaction so a schema dropped mid-scan is skipped, not fatal. + op.execute( + """ + CREATE OR REPLACE FUNCTION public.banks_needing_consolidation() + RETURNS TABLE(schema_name text, bank_id text) + LANGUAGE plpgsql STABLE + AS $fn$ + DECLARE + sch text; + BEGIN + FOR sch IN + SELECT n.nspname + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE c.relname = 'memory_units' AND c.relkind = 'r' + LOOP + BEGIN + RETURN QUERY EXECUTE format($q$ + SELECT %1$L::text, m.bank_id + FROM %1$I.memory_units m + JOIN %1$I.banks b ON b.bank_id = m.bank_id + WHERE m.consolidated_at IS NULL + AND m.consolidation_failed_at IS NULL + AND m.fact_type IN ('experience', 'world') + AND COALESCE(b.config -> 'enable_auto_consolidation', 'true'::jsonb) <> 'false'::jsonb + AND NOT EXISTS ( + SELECT 1 FROM %1$I.async_operations o + WHERE o.bank_id = m.bank_id + AND o.operation_type = 'consolidation' + AND o.status IN ('pending', 'processing') + ) + GROUP BY m.bank_id + $q$, sch); + EXCEPTION + -- Schema or its tables vanished between the pg_class + -- snapshot and this query (tenant dropped or migrating). + WHEN undefined_table OR invalid_schema_name OR undefined_column THEN + CONTINUE; + END; + END LOOP; + END; + $fn$; + """ + ) + + op.execute( + """ + CREATE OR REPLACE FUNCTION public.schemas_with_expired_rows( + p_table text, p_ts_col text, p_days int + ) + RETURNS SETOF text + LANGUAGE plpgsql STABLE + AS $fn$ + DECLARE + sch text; + has_expired boolean; + BEGIN + IF p_days IS NULL OR p_days <= 0 THEN + RETURN; + END IF; + FOR sch IN + SELECT n.nspname + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE c.relname = p_table AND c.relkind = 'r' + LOOP + BEGIN + EXECUTE format( + 'SELECT EXISTS (SELECT 1 FROM %I.%I WHERE %I < NOW() - make_interval(days => $1))', + sch, p_table, p_ts_col + ) INTO has_expired USING p_days; + EXCEPTION + -- Schema or its table vanished mid-scan; skip it. + WHEN undefined_table OR invalid_schema_name OR undefined_column THEN + CONTINUE; + END; + IF has_expired THEN + RETURN NEXT sch; + END IF; + END LOOP; + END; + $fn$; + """ + ) + + +def _pg_downgrade() -> None: + # No-op: e5f6a7b8c9d0 owns these functions' lifecycle and drops them on its + # own downgrade. This migration only re-installs them (the resilient body is + # a strict superset of the previous behaviour), so there is nothing to undo + # without racing that migration's DROP. + pass + + +def upgrade() -> None: + run_for_dialect(pg=_pg_upgrade) + + +def downgrade() -> None: + run_for_dialect(pg=_pg_downgrade) diff --git a/hindsight-api-slim/tests/test_link_utils.py b/hindsight-api-slim/tests/test_link_utils.py index 3668087af..0b43b1d3e 100644 --- a/hindsight-api-slim/tests/test_link_utils.py +++ b/hindsight-api-slim/tests/test_link_utils.py @@ -5,6 +5,7 @@ from datetime import datetime, timezone, timedelta from unittest.mock import AsyncMock, MagicMock +from hindsight_api.config import clear_config_cache from hindsight_api.engine.retain.link_utils import ( _normalize_datetime, _cap_links_per_unit, @@ -408,6 +409,19 @@ class TestComputeSemanticLinksAnnPgBouncerSafety: following the CREATE TEMP TABLE. """ + @pytest.fixture(autouse=True) + def _reset_config_cache(self): + # Tests below monkeypatch HINDSIGHT_API_VECTOR_EXTENSION. The ANN code + # path reads it through the process-global config cache, and monkeypatch + # reverts only the env var — not the cache. Left uncleared, a leaked + # "vchord" makes every later bank-creating test on the same xdist worker + # emit `USING vchordrq` against the pgvector-only test DB and fail with + # `access method "vchordrq" does not exist`. Clear before and after so + # the cache is rebuilt from the current env for each test. + clear_config_cache() + yield + clear_config_cache() + @pytest.fixture def mock_conn(self): """An asyncpg-like connection mock with an async `transaction()` diff --git a/hindsight-api-slim/tests/test_maintenance_multitenant.py b/hindsight-api-slim/tests/test_maintenance_multitenant.py index 84d3ff76a..78931e4a0 100644 --- a/hindsight-api-slim/tests/test_maintenance_multitenant.py +++ b/hindsight-api-slim/tests/test_maintenance_multitenant.py @@ -39,11 +39,20 @@ async def hundred_tenant_schemas(memory: MemoryEngine): """Create N_TENANTS isolated schemas cloning the loop's tables; drop them after.""" prefix = f"mt{uuid.uuid4().hex[:8]}" schemas = [f"{prefix}_{i:03d}" for i in range(N_TENANTS)] + # Create all schemas + their tables in ONE transaction so the schemas become + # visible to other connections only once fully built. Without this, each DDL + # autocommits, leaving a window where a schema exists with only some of its + # tables. The global maintenance routines (schemas_with_expired_rows / + # banks_needing_consolidation) discover schemas by table presence and are run + # concurrently by test_maintenance_routines on another xdist worker against + # the shared test DB; they would query a not-yet-created table in a half-built + # schema and fail with `relation "." does not exist`. async with memory._pool.acquire() as conn: - for s in schemas: - await conn.execute(f'CREATE SCHEMA "{s}"') - for table in _CLONED_TABLES: - await conn.execute(f'CREATE TABLE "{s}".{table} (LIKE public.{table} INCLUDING DEFAULTS)') + async with conn.transaction(): + for s in schemas: + await conn.execute(f'CREATE SCHEMA "{s}"') + for table in _CLONED_TABLES: + await conn.execute(f'CREATE TABLE "{s}".{table} (LIKE public.{table} INCLUDING DEFAULTS)') try: yield prefix, schemas finally: diff --git a/hindsight-api-slim/tests/test_maintenance_routines.py b/hindsight-api-slim/tests/test_maintenance_routines.py index dbd0bc997..920a73c82 100644 --- a/hindsight-api-slim/tests/test_maintenance_routines.py +++ b/hindsight-api-slim/tests/test_maintenance_routines.py @@ -134,6 +134,33 @@ async def test_banks_needing_consolidation_includes_in_flight_after_completion(m assert bank in {r["bank_id"] for r in rows} +@pytest.mark.asyncio +async def test_banks_needing_consolidation_skips_schema_with_vanished_table(memory: MemoryEngine): + """A schema discovered via its ``memory_units`` table but missing the + ``banks`` table the routine joins must be skipped, not abort the scan. + + This reproduces the time-of-check/time-of-use race deterministically: the + routine snapshots schemas owning ``memory_units`` from ``pg_class`` and then + joins each schema's ``banks`` table. A tenant being dropped or migrated (and, + in the test suite, the concurrent multi-tenant maintenance test) can leave a + schema whose ``banks`` table is gone. Before the fix the dynamic query raised + ``undefined_table`` and aborted the whole routine (migration c7e9f1a3b5d2).""" + schema = f"mtvanish{uuid.uuid4().hex[:8]}" + try: + async with memory._pool.acquire() as conn: + await conn.execute(f'CREATE SCHEMA "{schema}"') + # Discovered by the FOR loop (has memory_units) but the JOIN target + # `banks` is absent — exactly a half-built / vanishing schema. + await conn.execute(f'CREATE TABLE "{schema}".memory_units (LIKE public.memory_units INCLUDING DEFAULTS)') + + # Must not raise; the bad schema is simply skipped. + rows = await conn.fetch("SELECT schema_name, bank_id FROM public.banks_needing_consolidation()") + assert schema not in {r["schema_name"] for r in rows} + finally: + async with memory._pool.acquire() as conn: + await conn.execute(f'DROP SCHEMA IF EXISTS "{schema}" CASCADE') + + @pytest.mark.asyncio async def test_schemas_with_expired_rows(memory: MemoryEngine): """Returns schemas holding a row older than p_days; respects the p_days<=0 guard.""" diff --git a/hindsight-api-slim/tests/test_memory_defense.py b/hindsight-api-slim/tests/test_memory_defense.py index f5e8b4cc4..7d639845b 100644 --- a/hindsight-api-slim/tests/test_memory_defense.py +++ b/hindsight-api-slim/tests/test_memory_defense.py @@ -334,6 +334,9 @@ def _make_minimal_engine(): mock_embeddings = MagicMock() mock_embeddings.dimension = 384 + from hindsight_api.config import clear_config_cache + from hindsight_api.engine.memory_engine import MemoryEngine + with patch.dict( os.environ, { @@ -343,11 +346,17 @@ def _make_minimal_engine(): }, clear=False, ): - from hindsight_api.config import clear_config_cache - from hindsight_api.engine.memory_engine import MemoryEngine - clear_config_cache() - return MemoryEngine(db_url="postgresql://localhost/hindsight_test", embeddings=mock_embeddings) + engine = MemoryEngine(db_url="postgresql://localhost/hindsight_test", embeddings=mock_embeddings) + + # Constructing the engine above repopulated the process-global config cache + # from the patched env (provider="none" forces retain_extraction_mode="chunks"). + # Now that the patched env is gone, drop that cache so the leaked "none"/chunks + # config does not bleed into other tests on this xdist worker — their retains + # would silently skip entity extraction (0 unit_entities) and fail unrelated + # assertions. The next get_config() rebuilds from the real env. + clear_config_cache() + return engine def test_engine_memory_defense_shares_ext_ctx() -> None: