Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
"""Make maintenance routines resilient to schemas that vanish mid-scan.

``public.banks_needing_consolidation()`` and
``public.schemas_with_expired_rows(...)`` snapshot the set of schemas owning a
target table from ``pg_class`` and then run a dynamic query against each schema
in turn. That is a time-of-check/time-of-use race: a schema (or its tables) can
be dropped — a tenant being deleted, or a tenant migration that recreates
tables — between the snapshot and the per-schema query, which then aborts the
whole routine with::

relation "<schema>.memory_units" does not exist
relation "<schema>.audit_log" does not exist

In the test suite this surfaces as cross-worker contamination: the multi-tenant
maintenance test creates and drops ~100 ``mt<hash>_NNN`` schemas while
``test_maintenance_routines`` (on another xdist worker, same DB) calls the
routines. In production the background maintenance loop hits the same race when
a tenant is removed or mid-migration.

Wrap each per-schema query in its own ``BEGIN ... EXCEPTION`` block so a schema
that disappears (``undefined_table`` / ``invalid_schema_name`` /
``undefined_column``) is skipped instead of aborting the scan. The routines stay
``CREATE OR REPLACE`` and PostgreSQL-only, and are (re)installed only on the run
that targets the shared ``public`` schema — same gating as the original
install (``e5f6a7b8c9d0``) and its repair (``b2d4f6a8c1e3``).

Revision ID: c7e9f1a3b5d2
Revises: e1f2a3b4c5d6
Create Date: 2026-06-19
"""

from collections.abc import Sequence

from alembic import context, op

from hindsight_api.alembic._dialect import run_for_dialect

# Alembic revision identifiers: this migration's own ID and the revision it
# follows (they match the "Revision ID" / "Revises" lines in the module
# docstring). No branch labels or extra dependencies are declared.
revision: str = "c7e9f1a3b5d2"
down_revision: str | Sequence[str] | None = "e1f2a3b4c5d6"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def _should_install_public_routines(target_schema: str | None) -> bool:
"""True for the run that must (re)create the shared ``public.*`` routines.

The routines physically live in ``public``, so they are installed exactly
once — on the base run (no ``target_schema``) or the run that explicitly
targets ``public``. Mirrors ``b2d4f6a8c1e3``.
"""
return not target_schema or target_schema == "public"


def _pg_upgrade() -> None:
    """(Re)install the two shared maintenance routines with per-schema error trapping.

    Reads the ``target_schema`` main option from the Alembic config and returns
    without executing anything unless ``_should_install_public_routines``
    accepts it (option unset/empty, or equal to ``"public"``).

    Otherwise issues two ``CREATE OR REPLACE FUNCTION`` statements:

    * ``public.banks_needing_consolidation()`` loops over every schema that
      owns an ordinary table named ``memory_units`` and returns
      ``(schema_name, bank_id)`` for banks that have ``memory_units`` rows with
      ``consolidated_at`` and ``consolidation_failed_at`` both NULL and a
      ``fact_type`` of ``'experience'`` or ``'world'``, whose bank config does
      not set ``enable_auto_consolidation`` to ``false``, and that have no
      ``'consolidation'`` async operation in ``'pending'`` or ``'processing'``
      status.
    * ``public.schemas_with_expired_rows(p_table, p_ts_col, p_days)`` returns
      nothing when ``p_days`` is NULL or not positive; otherwise it returns the
      name of each schema whose ``p_table`` holds at least one row with
      ``p_ts_col`` older than ``p_days`` days.

    In both routines the dynamic per-schema query sits in its own nested
    ``BEGIN ... EXCEPTION`` block; ``undefined_table``, ``invalid_schema_name``
    and ``undefined_column`` are trapped and the loop continues with the next
    schema.
    """
    if not _should_install_public_routines(context.config.get_main_option("target_schema")):
        # Some other run is responsible for the routines in ``public``.
        return

    # Same body as b2d4f6a8c1e3, but each per-schema query runs in its own
    # subtransaction so a schema dropped mid-scan is skipped, not fatal.
    op.execute(
        """
CREATE OR REPLACE FUNCTION public.banks_needing_consolidation()
RETURNS TABLE(schema_name text, bank_id text)
LANGUAGE plpgsql STABLE
AS $fn$
DECLARE
sch text;
BEGIN
FOR sch IN
SELECT n.nspname
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relname = 'memory_units' AND c.relkind = 'r'
LOOP
BEGIN
RETURN QUERY EXECUTE format($q$
SELECT %1$L::text, m.bank_id
FROM %1$I.memory_units m
JOIN %1$I.banks b ON b.bank_id = m.bank_id
WHERE m.consolidated_at IS NULL
AND m.consolidation_failed_at IS NULL
AND m.fact_type IN ('experience', 'world')
AND COALESCE(b.config -> 'enable_auto_consolidation', 'true'::jsonb) <> 'false'::jsonb
AND NOT EXISTS (
SELECT 1 FROM %1$I.async_operations o
WHERE o.bank_id = m.bank_id
AND o.operation_type = 'consolidation'
AND o.status IN ('pending', 'processing')
)
GROUP BY m.bank_id
$q$, sch);
EXCEPTION
-- Schema or its tables vanished between the pg_class
-- snapshot and this query (tenant dropped or migrating).
WHEN undefined_table OR invalid_schema_name OR undefined_column THEN
CONTINUE;
END;
END LOOP;
END;
$fn$;
"""
    )

    # Second routine: the schema, table and column names are interpolated as
    # quoted identifiers (%I); the day count is passed as a bound parameter
    # ($1 ... USING p_days) rather than spliced into the SQL text.
    op.execute(
        """
CREATE OR REPLACE FUNCTION public.schemas_with_expired_rows(
p_table text, p_ts_col text, p_days int
)
RETURNS SETOF text
LANGUAGE plpgsql STABLE
AS $fn$
DECLARE
sch text;
has_expired boolean;
BEGIN
IF p_days IS NULL OR p_days <= 0 THEN
RETURN;
END IF;
FOR sch IN
SELECT n.nspname
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relname = p_table AND c.relkind = 'r'
LOOP
BEGIN
EXECUTE format(
'SELECT EXISTS (SELECT 1 FROM %I.%I WHERE %I < NOW() - make_interval(days => $1))',
sch, p_table, p_ts_col
) INTO has_expired USING p_days;
EXCEPTION
-- Schema or its table vanished mid-scan; skip it.
WHEN undefined_table OR invalid_schema_name OR undefined_column THEN
CONTINUE;
END;
IF has_expired THEN
RETURN NEXT sch;
END IF;
END LOOP;
END;
$fn$;
"""
    )


def _pg_downgrade() -> None:
# No-op: e5f6a7b8c9d0 owns these functions' lifecycle and drops them on its
# own downgrade. This migration only re-installs them (the resilient body is
# a strict superset of the previous behaviour), so there is nothing to undo
# without racing that migration's DROP.
pass


def upgrade() -> None:
    """Alembic upgrade entry point.

    Registers ``_pg_upgrade`` as the ``pg`` handler with ``run_for_dialect``;
    no handler is supplied for any other dialect.
    """
    run_for_dialect(pg=_pg_upgrade)


def downgrade() -> None:
    """Alembic downgrade entry point.

    Registers ``_pg_downgrade`` (a deliberate no-op) as the ``pg`` handler with
    ``run_for_dialect``; no handler is supplied for any other dialect.
    """
    run_for_dialect(pg=_pg_downgrade)
14 changes: 14 additions & 0 deletions hindsight-api-slim/tests/test_link_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from datetime import datetime, timezone, timedelta
from unittest.mock import AsyncMock, MagicMock

from hindsight_api.config import clear_config_cache
from hindsight_api.engine.retain.link_utils import (
_normalize_datetime,
_cap_links_per_unit,
Expand Down Expand Up @@ -408,6 +409,19 @@ class TestComputeSemanticLinksAnnPgBouncerSafety:
following the CREATE TEMP TABLE.
"""

@pytest.fixture(autouse=True)
def _reset_config_cache(self):
    """Clear the process-global config cache before and after each test.

    Tests in this class monkeypatch HINDSIGHT_API_VECTOR_EXTENSION. The ANN
    code path reads it through the process-global config cache, and
    monkeypatch reverts only the env var, not the cache. Left uncleared, a
    leaked "vchord" makes every later bank-creating test on the same xdist
    worker emit `USING vchordrq` against the pgvector-only test DB and fail
    with `access method "vchordrq" does not exist`.
    """
    # Setup: make the test rebuild the cache from its own (patched) env.
    clear_config_cache()
    yield
    # Teardown: drop whatever the test cached so it cannot leak onward.
    clear_config_cache()

@pytest.fixture
def mock_conn(self):
"""An asyncpg-like connection mock with an async `transaction()`
Expand Down
17 changes: 13 additions & 4 deletions hindsight-api-slim/tests/test_maintenance_multitenant.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,20 @@ async def hundred_tenant_schemas(memory: MemoryEngine):
"""Create N_TENANTS isolated schemas cloning the loop's tables; drop them after."""
prefix = f"mt{uuid.uuid4().hex[:8]}"
schemas = [f"{prefix}_{i:03d}" for i in range(N_TENANTS)]
# Create all schemas + their tables in ONE transaction so the schemas become
# visible to other connections only once fully built. Without this, each DDL
# autocommits, leaving a window where a schema exists with only some of its
# tables. The global maintenance routines (schemas_with_expired_rows /
# banks_needing_consolidation) discover schemas by table presence and are run
# concurrently by test_maintenance_routines on another xdist worker against
# the shared test DB; they would query a not-yet-created table in a half-built
# schema and fail with `relation "<schema>.<table>" does not exist`.
async with memory._pool.acquire() as conn:
for s in schemas:
await conn.execute(f'CREATE SCHEMA "{s}"')
for table in _CLONED_TABLES:
await conn.execute(f'CREATE TABLE "{s}".{table} (LIKE public.{table} INCLUDING DEFAULTS)')
async with conn.transaction():
for s in schemas:
await conn.execute(f'CREATE SCHEMA "{s}"')
for table in _CLONED_TABLES:
await conn.execute(f'CREATE TABLE "{s}".{table} (LIKE public.{table} INCLUDING DEFAULTS)')
try:
yield prefix, schemas
finally:
Expand Down
27 changes: 27 additions & 0 deletions hindsight-api-slim/tests/test_maintenance_routines.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,33 @@ async def test_banks_needing_consolidation_includes_in_flight_after_completion(m
assert bank in {r["bank_id"] for r in rows}


@pytest.mark.asyncio
async def test_banks_needing_consolidation_skips_schema_with_vanished_table(memory: MemoryEngine):
    """A schema that owns ``memory_units`` but lacks ``banks`` is skipped.

    Deterministic reproduction of the time-of-check/time-of-use race: the
    routine snapshots schemas owning ``memory_units`` from ``pg_class`` and then
    joins each schema's ``banks`` table. A tenant being dropped or migrated
    (and, in the test suite, the concurrent multi-tenant maintenance test) can
    leave a schema whose ``banks`` table is gone. Before migration
    c7e9f1a3b5d2 the dynamic query raised ``undefined_table`` and aborted the
    whole routine; now the call must succeed and omit that schema.
    """
    schema = f"mtvanish{uuid.uuid4().hex[:8]}"
    try:
        async with memory._pool.acquire() as conn:
            # Build a schema the FOR loop will discover (it has memory_units)
            # while leaving out `banks`, the table the routine joins against.
            await conn.execute(f'CREATE SCHEMA "{schema}"')
            await conn.execute(f'CREATE TABLE "{schema}".memory_units (LIKE public.memory_units INCLUDING DEFAULTS)')

            # The call itself must not raise; the broken schema is just absent
            # from the result set.
            result_rows = await conn.fetch("SELECT schema_name, bank_id FROM public.banks_needing_consolidation()")
            reported_schemas = {row["schema_name"] for row in result_rows}
            assert schema not in reported_schemas
    finally:
        # Always remove the scratch schema, even when the assertion fails.
        async with memory._pool.acquire() as conn:
            await conn.execute(f'DROP SCHEMA IF EXISTS "{schema}" CASCADE')


@pytest.mark.asyncio
async def test_schemas_with_expired_rows(memory: MemoryEngine):
"""Returns schemas holding a row older than p_days; respects the p_days<=0 guard."""
Expand Down
17 changes: 13 additions & 4 deletions hindsight-api-slim/tests/test_memory_defense.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,9 @@ def _make_minimal_engine():
mock_embeddings = MagicMock()
mock_embeddings.dimension = 384

from hindsight_api.config import clear_config_cache
from hindsight_api.engine.memory_engine import MemoryEngine

with patch.dict(
os.environ,
{
Expand All @@ -343,11 +346,17 @@ def _make_minimal_engine():
},
clear=False,
):
from hindsight_api.config import clear_config_cache
from hindsight_api.engine.memory_engine import MemoryEngine

clear_config_cache()
return MemoryEngine(db_url="postgresql://localhost/hindsight_test", embeddings=mock_embeddings)
engine = MemoryEngine(db_url="postgresql://localhost/hindsight_test", embeddings=mock_embeddings)

# Constructing the engine above repopulated the process-global config cache
# from the patched env (provider="none" forces retain_extraction_mode="chunks").
# Now that the patched env is gone, drop that cache so the leaked "none"/chunks
# config does not bleed into other tests on this xdist worker — their retains
# would silently skip entity extraction (0 unit_entities) and fail unrelated
# assertions. The next get_config() rebuilds from the real env.
clear_config_cache()
return engine


def test_engine_memory_defense_shares_ext_ctx() -> None:
Expand Down