From 2c8cc59a80a7d765a16288162273f64343caae08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Fri, 19 Jun 2026 12:41:34 +0200 Subject: [PATCH 1/4] fix(tests): reset config cache after vchord vector-extension tests to stop cross-test contamination MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The ANN tests in test_link_utils.py monkeypatch HINDSIGHT_API_VECTOR_EXTENSION (e.g. to "vchord"). That env var is read through the process-global config cache (get_config()), and monkeypatch reverts only the env var on teardown — not the cache. Once get_config() caches "vchord", it persists for the rest of the xdist worker. Every subsequent bank-creating test on that worker then builds per-bank vector indexes with `USING vchordrq` against the pgvector-only test DB and fails with: asyncpg.exceptions.UndefinedObjectError: access method "vchordrq" does not exist cascading across dozens of unrelated tests in the test-api shard (test_list_documents, test_maintenance_routines, test_mental_models, test_observations, ...). Because the leak depends on which worker first populates the cache, the failure looked like a flaky, shard-specific infra problem. Fix: add an autouse fixture to the class that clears the config cache before and after each test, so the cache is rebuilt from the current env per test and "vchord" can't leak out. --- hindsight-api-slim/tests/test_link_utils.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/hindsight-api-slim/tests/test_link_utils.py b/hindsight-api-slim/tests/test_link_utils.py index 3668087af..0b43b1d3e 100644 --- a/hindsight-api-slim/tests/test_link_utils.py +++ b/hindsight-api-slim/tests/test_link_utils.py @@ -5,6 +5,7 @@ from datetime import datetime, timezone, timedelta from unittest.mock import AsyncMock, MagicMock +from hindsight_api.config import clear_config_cache from hindsight_api.engine.retain.link_utils import ( _normalize_datetime, _cap_links_per_unit, @@ -408,6 +409,19 @@ class TestComputeSemanticLinksAnnPgBouncerSafety: following the CREATE TEMP TABLE. """ + @pytest.fixture(autouse=True) + def _reset_config_cache(self): + # Tests below monkeypatch HINDSIGHT_API_VECTOR_EXTENSION. The ANN code + # path reads it through the process-global config cache, and monkeypatch + # reverts only the env var — not the cache. Left uncleared, a leaked + # "vchord" makes every later bank-creating test on the same xdist worker + # emit `USING vchordrq` against the pgvector-only test DB and fail with + # `access method "vchordrq" does not exist`. Clear before and after so + # the cache is rebuilt from the current env for each test. + clear_config_cache() + yield + clear_config_cache() + @pytest.fixture def mock_conn(self): """An asyncpg-like connection mock with an async `transaction()` From 717288084009823a1593aeb7a94d096a5c5f9479 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Fri, 19 Jun 2026 13:01:56 +0200 Subject: [PATCH 2/4] fix(tests): create multi-tenant maintenance schemas atomically MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit test_maintenance_multitenant provisions 100 tenant schemas by running CREATE SCHEMA + 5×CREATE TABLE per schema. Each statement autocommitted, so there was a window where a schema existed with only some of its tables. The global maintenance routines (public.schemas_with_expired_rows / banks_needing_consolidation) discover schemas by table presence and are exercised concurrently by test_maintenance_routines on another xdist worker against the shared test DB. They would query a not-yet-created table in a half-built schema and fail with: asyncpg.exceptions.UndefinedTableError: relation "mt_NNN.memory_units" does not exist Wrap the whole provisioning in a single transaction so the schemas become visible to other connections only once fully built. --- .../tests/test_maintenance_multitenant.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/hindsight-api-slim/tests/test_maintenance_multitenant.py b/hindsight-api-slim/tests/test_maintenance_multitenant.py index 84d3ff76a..78931e4a0 100644 --- a/hindsight-api-slim/tests/test_maintenance_multitenant.py +++ b/hindsight-api-slim/tests/test_maintenance_multitenant.py @@ -39,11 +39,20 @@ async def hundred_tenant_schemas(memory: MemoryEngine): """Create N_TENANTS isolated schemas cloning the loop's tables; drop them after.""" prefix = f"mt{uuid.uuid4().hex[:8]}" schemas = [f"{prefix}_{i:03d}" for i in range(N_TENANTS)] + # Create all schemas + their tables in ONE transaction so the schemas become + # visible to other connections only once fully built. Without this, each DDL + # autocommits, leaving a window where a schema exists with only some of its + # tables. The global maintenance routines (schemas_with_expired_rows / + # banks_needing_consolidation) discover schemas by table presence and are run + # concurrently by test_maintenance_routines on another xdist worker against + # the shared test DB; they would query a not-yet-created table in a half-built + # schema and fail with `relation "." does not exist`. async with memory._pool.acquire() as conn: - for s in schemas: - await conn.execute(f'CREATE SCHEMA "{s}"') - for table in _CLONED_TABLES: - await conn.execute(f'CREATE TABLE "{s}".{table} (LIKE public.{table} INCLUDING DEFAULTS)') + async with conn.transaction(): + for s in schemas: + await conn.execute(f'CREATE SCHEMA "{s}"') + for table in _CLONED_TABLES: + await conn.execute(f'CREATE TABLE "{s}".{table} (LIKE public.{table} INCLUDING DEFAULTS)') try: yield prefix, schemas finally: From 03ae905aa9ada926c88ceaa3e0c1e2003f83e3ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Fri, 19 Jun 2026 13:21:45 +0200 Subject: [PATCH 3/4] fix(maintenance): skip schemas that vanish mid-scan in maintenance routines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit public.banks_needing_consolidation() and public.schemas_with_expired_rows() snapshot the schemas owning a target table from pg_class, then run a dynamic query against each schema in turn. That is a TOCTOU race: a schema (or its tables) can be dropped between the snapshot and the per-schema query — a tenant being deleted, a tenant migration recreating tables, or (in the test suite) the multi-tenant maintenance test creating/dropping ~100 schemas concurrently with test_maintenance_routines on the shared DB. The query then aborts the whole routine with: relation ".memory_units" does not exist relation ".audit_log" does not exist Forward migration c7e9f1a3b5d2 redefines both routines (CREATE OR REPLACE, public/base-run gated, PG-only) so each per-schema query runs in its own subtransaction that skips the schema on undefined_table / invalid_schema_name / undefined_column instead of failing the scan. Adds a deterministic regression test (schema with memory_units but no banks table) for the skip path. --- ...ntenance_routines_skip_vanished_schemas.py | 158 ++++++++++++++++++ .../tests/test_maintenance_routines.py | 27 +++ 2 files changed, 185 insertions(+) create mode 100644 hindsight-api-slim/hindsight_api/alembic/versions/c7e9f1a3b5d2_maintenance_routines_skip_vanished_schemas.py diff --git a/hindsight-api-slim/hindsight_api/alembic/versions/c7e9f1a3b5d2_maintenance_routines_skip_vanished_schemas.py b/hindsight-api-slim/hindsight_api/alembic/versions/c7e9f1a3b5d2_maintenance_routines_skip_vanished_schemas.py new file mode 100644 index 000000000..c53e28844 --- /dev/null +++ b/hindsight-api-slim/hindsight_api/alembic/versions/c7e9f1a3b5d2_maintenance_routines_skip_vanished_schemas.py @@ -0,0 +1,158 @@ +"""Make maintenance routines resilient to schemas that vanish mid-scan. + +``public.banks_needing_consolidation()`` and +``public.schemas_with_expired_rows(...)`` snapshot the set of schemas owning a +target table from ``pg_class`` and then run a dynamic query against each schema +in turn. That is a time-of-check/time-of-use race: a schema (or its tables) can +be dropped — a tenant being deleted, or a tenant migration that recreates +tables — between the snapshot and the per-schema query, which then aborts the +whole routine with:: + + relation ".memory_units" does not exist + relation ".audit_log" does not exist + +In the test suite this surfaces as cross-worker contamination: the multi-tenant +maintenance test creates and drops ~100 ``mt_NNN`` schemas while +``test_maintenance_routines`` (on another xdist worker, same DB) calls the +routines. In production the background maintenance loop hits the same race when +a tenant is removed or mid-migration. + +Wrap each per-schema query in its own ``BEGIN ... EXCEPTION`` block so a schema +that disappears (``undefined_table`` / ``invalid_schema_name`` / +``undefined_column``) is skipped instead of aborting the scan. The routines stay +``CREATE OR REPLACE`` and PostgreSQL-only, and are (re)installed only on the run +that targets the shared ``public`` schema — same gating as the original +install (``e5f6a7b8c9d0``) and its repair (``b2d4f6a8c1e3``). + +Revision ID: c7e9f1a3b5d2 +Revises: e1f2a3b4c5d6 +Create Date: 2026-06-19 +""" + +from collections.abc import Sequence + +from alembic import context, op + +from hindsight_api.alembic._dialect import run_for_dialect + +revision: str = "c7e9f1a3b5d2" +down_revision: str | Sequence[str] | None = "e1f2a3b4c5d6" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def _should_install_public_routines(target_schema: str | None) -> bool: + """True for the run that must (re)create the shared ``public.*`` routines. + + The routines physically live in ``public``, so they are installed exactly + once — on the base run (no ``target_schema``) or the run that explicitly + targets ``public``. Mirrors ``b2d4f6a8c1e3``. + """ + return not target_schema or target_schema == "public" + + +def _pg_upgrade() -> None: + if not _should_install_public_routines(context.config.get_main_option("target_schema")): + return + + # Same body as b2d4f6a8c1e3, but each per-schema query runs in its own + # subtransaction so a schema dropped mid-scan is skipped, not fatal. + op.execute( + """ + CREATE OR REPLACE FUNCTION public.banks_needing_consolidation() + RETURNS TABLE(schema_name text, bank_id text) + LANGUAGE plpgsql STABLE + AS $fn$ + DECLARE + sch text; + BEGIN + FOR sch IN + SELECT n.nspname + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE c.relname = 'memory_units' AND c.relkind = 'r' + LOOP + BEGIN + RETURN QUERY EXECUTE format($q$ + SELECT %1$L::text, m.bank_id + FROM %1$I.memory_units m + JOIN %1$I.banks b ON b.bank_id = m.bank_id + WHERE m.consolidated_at IS NULL + AND m.consolidation_failed_at IS NULL + AND m.fact_type IN ('experience', 'world') + AND COALESCE(b.config -> 'enable_auto_consolidation', 'true'::jsonb) <> 'false'::jsonb + AND NOT EXISTS ( + SELECT 1 FROM %1$I.async_operations o + WHERE o.bank_id = m.bank_id + AND o.operation_type = 'consolidation' + AND o.status IN ('pending', 'processing') + ) + GROUP BY m.bank_id + $q$, sch); + EXCEPTION + -- Schema or its tables vanished between the pg_class + -- snapshot and this query (tenant dropped or migrating). + WHEN undefined_table OR invalid_schema_name OR undefined_column THEN + CONTINUE; + END; + END LOOP; + END; + $fn$; + """ + ) + + op.execute( + """ + CREATE OR REPLACE FUNCTION public.schemas_with_expired_rows( + p_table text, p_ts_col text, p_days int + ) + RETURNS SETOF text + LANGUAGE plpgsql STABLE + AS $fn$ + DECLARE + sch text; + has_expired boolean; + BEGIN + IF p_days IS NULL OR p_days <= 0 THEN + RETURN; + END IF; + FOR sch IN + SELECT n.nspname + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE c.relname = p_table AND c.relkind = 'r' + LOOP + BEGIN + EXECUTE format( + 'SELECT EXISTS (SELECT 1 FROM %I.%I WHERE %I < NOW() - make_interval(days => $1))', + sch, p_table, p_ts_col + ) INTO has_expired USING p_days; + EXCEPTION + -- Schema or its table vanished mid-scan; skip it. + WHEN undefined_table OR invalid_schema_name OR undefined_column THEN + CONTINUE; + END; + IF has_expired THEN + RETURN NEXT sch; + END IF; + END LOOP; + END; + $fn$; + """ + ) + + +def _pg_downgrade() -> None: + # No-op: e5f6a7b8c9d0 owns these functions' lifecycle and drops them on its + # own downgrade. This migration only re-installs them (the resilient body is + # a strict superset of the previous behaviour), so there is nothing to undo + # without racing that migration's DROP. + pass + + +def upgrade() -> None: + run_for_dialect(pg=_pg_upgrade) + + +def downgrade() -> None: + run_for_dialect(pg=_pg_downgrade) diff --git a/hindsight-api-slim/tests/test_maintenance_routines.py b/hindsight-api-slim/tests/test_maintenance_routines.py index dbd0bc997..920a73c82 100644 --- a/hindsight-api-slim/tests/test_maintenance_routines.py +++ b/hindsight-api-slim/tests/test_maintenance_routines.py @@ -134,6 +134,33 @@ async def test_banks_needing_consolidation_includes_in_flight_after_completion(m assert bank in {r["bank_id"] for r in rows} +@pytest.mark.asyncio +async def test_banks_needing_consolidation_skips_schema_with_vanished_table(memory: MemoryEngine): + """A schema discovered via its ``memory_units`` table but missing the + ``banks`` table the routine joins must be skipped, not abort the scan. + + This reproduces the time-of-check/time-of-use race deterministically: the + routine snapshots schemas owning ``memory_units`` from ``pg_class`` and then + joins each schema's ``banks`` table. A tenant being dropped or migrated (and, + in the test suite, the concurrent multi-tenant maintenance test) can leave a + schema whose ``banks`` table is gone. Before the fix the dynamic query raised + ``undefined_table`` and aborted the whole routine (migration c7e9f1a3b5d2).""" + schema = f"mtvanish{uuid.uuid4().hex[:8]}" + try: + async with memory._pool.acquire() as conn: + await conn.execute(f'CREATE SCHEMA "{schema}"') + # Discovered by the FOR loop (has memory_units) but the JOIN target + # `banks` is absent — exactly a half-built / vanishing schema. + await conn.execute(f'CREATE TABLE "{schema}".memory_units (LIKE public.memory_units INCLUDING DEFAULTS)') + + # Must not raise; the bad schema is simply skipped. + rows = await conn.fetch("SELECT schema_name, bank_id FROM public.banks_needing_consolidation()") + assert schema not in {r["schema_name"] for r in rows} + finally: + async with memory._pool.acquire() as conn: + await conn.execute(f'DROP SCHEMA IF EXISTS "{schema}" CASCADE') + + @pytest.mark.asyncio async def test_schemas_with_expired_rows(memory: MemoryEngine): """Returns schemas holding a row older than p_days; respects the p_days<=0 guard.""" From 6c3b9992df44b16f1dacc8260128c6b1e57c9920 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Fri, 19 Jun 2026 15:19:41 +0200 Subject: [PATCH 4/4] fix(tests): clear config cache after none-provider engine build to stop chunks-mode leak MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit test_memory_defense._make_minimal_engine() builds a MemoryEngine inside a patch.dict that sets HINDSIGHT_API_LLM_PROVIDER=none. Constructing the engine calls get_config(), repopulating the process-global config cache from the patched env — and provider="none" forces retain_extraction_mode="chunks". When patch.dict restores the env, the cache still holds the "none"/chunks config. It then leaks to every later test on the same xdist worker: their retains run in chunks mode (raw text, NO entity extraction), so unrelated assertions fail — notably the test_observations entity tests ("John/Alice/Nexora entity should exist"), which presented as a flaky, shard-specific failure (whichever entity test landed on the poisoned worker). Drop the config cache after the patched env is restored so the next get_config() rebuilds from the real env. Reproduced deterministically: pytest test_memory_defense.py::test_engine_memory_defense_shares_ext_ctx \ test_observations.py::test_entity_extraction_on_retain # before: entity test FAILED (Insert unit_entities: 0 pairs) # after: passed --- hindsight-api-slim/tests/test_memory_defense.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/hindsight-api-slim/tests/test_memory_defense.py b/hindsight-api-slim/tests/test_memory_defense.py index f5e8b4cc4..7d639845b 100644 --- a/hindsight-api-slim/tests/test_memory_defense.py +++ b/hindsight-api-slim/tests/test_memory_defense.py @@ -334,6 +334,9 @@ def _make_minimal_engine(): mock_embeddings = MagicMock() mock_embeddings.dimension = 384 + from hindsight_api.config import clear_config_cache + from hindsight_api.engine.memory_engine import MemoryEngine + with patch.dict( os.environ, { @@ -343,11 +346,17 @@ def _make_minimal_engine(): }, clear=False, ): - from hindsight_api.config import clear_config_cache - from hindsight_api.engine.memory_engine import MemoryEngine - clear_config_cache() - return MemoryEngine(db_url="postgresql://localhost/hindsight_test", embeddings=mock_embeddings) + engine = MemoryEngine(db_url="postgresql://localhost/hindsight_test", embeddings=mock_embeddings) + + # Constructing the engine above repopulated the process-global config cache + # from the patched env (provider="none" forces retain_extraction_mode="chunks"). + # Now that the patched env is gone, drop that cache so the leaked "none"/chunks + # config does not bleed into other tests on this xdist worker — their retains + # would silently skip entity extraction (0 unit_entities) and fail unrelated + # assertions. The next get_config() rebuilds from the real env. + clear_config_cache() + return engine def test_engine_memory_defense_shares_ext_ctx() -> None: