Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion hindsight-api-slim/hindsight_api/admin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ async def _run_migration(
schema: str | None = None,
base_schema: str = DEFAULT_DATABASE_SCHEMA,
embedding_dimension: int | None = None,
ensure_extensions: bool = True,
) -> list[str]:
"""Resolve database URL and run migrations for one schema or all discovered schemas."""
from ..migrations import run_migrations_for_schemas
Expand Down Expand Up @@ -292,7 +293,7 @@ async def _run_migration(
vector_extension=config.vector_extension,
text_search_extension=config.text_search_extension,
pg_search_tokenizer=config.text_search_extension_pg_search_tokenizer,
ensure_extensions=True,
ensure_extensions=ensure_extensions,
)

return schemas
Expand All @@ -311,6 +312,18 @@ def run_db_migration(
"--embedding-dimension",
help="Expected embedding dimension to enforce after migrations. Omit to skip dimension sync.",
),
skip_extension_reconcile: bool = typer.Option(
False,
"--skip-extension-reconcile",
help=(
"Skip the post-migration vector / text-search index reconcile. This step only does "
"work when the configured backend (HINDSIGHT_API_VECTOR_EXTENSION / "
"HINDSIGHT_API_TEXT_SEARCH_EXTENSION) differs from a schema's existing indexes — a "
"rare, operator-driven change. Skipping it makes a no-change re-migration over many "
"tenant schemas much faster. Only use when you have NOT changed the backend; a "
"backend change still needs a normal run to reshape the indexes."
),
),
):
"""Run database migrations to the latest version."""
config = HindsightConfig.from_env()
Expand All @@ -324,13 +337,16 @@ def run_db_migration(
typer.echo(f"Running database migrations for schema: {schema}...")
else:
typer.echo("Running database migrations for base schema and all discovered tenant schemas...")
if skip_extension_reconcile:
typer.echo("Skipping post-migration extension reconcile (--skip-extension-reconcile).")

schemas = asyncio.run(
_run_migration(
config.database_url,
schema=schema,
base_schema=config.database_schema,
embedding_dimension=embedding_dimension,
ensure_extensions=not skip_extension_reconcile,
)
)

Expand Down
32 changes: 14 additions & 18 deletions hindsight-api-slim/hindsight_api/migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,24 +686,20 @@ def ensure_vector_extension(

if not current_index_info:
if table_name == "memory_units" and uses_per_bank_vector_indexes(target_ext):
# Check whether per-bank partial vector indexes already cover this table
# (created by the bank_utils lifecycle — no global index needed in that case)
per_bank_index_count = conn.execute(
text("""
SELECT COUNT(*)
FROM pg_indexes
WHERE schemaname = :schema
AND tablename = :table_name
AND indexname LIKE 'idx_mu_emb_%'
"""),
{"schema": schema_name, "table_name": table_name},
).scalar()
if per_bank_index_count and per_bank_index_count > 0:
logger.debug(
f"No global embedding index on {table_name}, but {per_bank_index_count} "
f"per-bank partial vector indexes exist — skipping global index creation"
)
continue
# Per-bank backends never use a GLOBAL memory_units vector index.
# Every vector search is bank + fact_type scoped and served by the
# per-(bank, fact_type) partial indexes created at bank-creation time
# (bank_utils.create_bank_vector_indexes); the planner never picks a
# global index when bank_id is in the WHERE clause, which is exactly
# why migration d5e6f7a8b9c0 drops it for these backends. So don't
# create one here either — not even on an empty schema with no per-bank
# indexes yet (those are built when the first bank is created). Verified
# via EXPLAIN: the query uses idx_mu_emb_* whether or not the global
# index exists, so creating it is dead weight.
logger.debug(
f"Per-bank vector backend ({target_ext}); skipping global {index_name} creation on {table_name}"
)
continue
logger.warning(f"No embedding index found for {table_name}, will create it if safe")
mismatched_tables.append((table_name, index_name, None, row_count))
continue
Expand Down
33 changes: 33 additions & 0 deletions hindsight-api-slim/tests/test_admin_backup_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,3 +547,36 @@ def fake_ensure_text_search_extension(
assert calls["run_migrations"] == [("resolved::postgresql://test", "tenant_demo")]
assert calls["ensure_vector_extension"] == [("resolved::postgresql://test", "pgvector", "tenant_demo")]
assert calls["ensure_text_search_extension"] == [("resolved::postgresql://test", "native", "", "tenant_demo")]


@pytest.mark.parametrize(
("ensure_extensions", "expected"),
[(True, True), (False, False)],
)
@pytest.mark.asyncio
async def test_run_migration_threads_ensure_extensions_flag(monkeypatch, ensure_extensions, expected):
"""The --skip-extension-reconcile flag (ensure_extensions=False) must reach run_migrations_for_schemas.

The post-migration vector/text-search reconcile only does work on a backend change, so operators
can skip it on a no-change re-migration over many tenant schemas. Verify the flag is threaded through
rather than silently dropped.
"""
monkeypatch.setenv("HINDSIGHT_API_DATABASE_URL", "postgresql://test")
captured: dict = {}

async def fake_resolve_database_url(db_url: str) -> str:
return f"resolved::{db_url}"

def fake_run_migrations_for_schemas(database_url, schemas, **kwargs):
captured["ensure_extensions"] = kwargs.get("ensure_extensions")

monkeypatch.setattr(admin_cli, "load_extension", lambda *args, **kwargs: None)
monkeypatch.setattr(admin_cli, "resolve_database_url", fake_resolve_database_url)

from hindsight_api import migrations as migrations_module

monkeypatch.setattr(migrations_module, "run_migrations_for_schemas", fake_run_migrations_for_schemas)

await admin_cli._run_migration("postgresql://test", schema="tenant_demo", ensure_extensions=ensure_extensions)

assert captured["ensure_extensions"] is expected
70 changes: 70 additions & 0 deletions hindsight-api-slim/tests/test_ensure_vector_no_global_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""ensure_vector_extension must not create the (unused) global memory_units index.

For per-bank backends (pgvector / pgvectorscale / vchord) every vector search is
bank + fact_type scoped and served by the per-(bank, fact_type) partial indexes
created at bank-creation time. The global `idx_memory_units_embedding` is never
chosen by the planner (migration d5e6f7a8b9c0 drops it for exactly this reason),
so the post-migration reconcile must not recreate it on a fresh schema.
"""

import asyncio

import pytest
from sqlalchemy import create_engine, text

from hindsight_api._vector_index import uses_per_bank_vector_indexes
from hindsight_api.config import HindsightConfig
from hindsight_api.migrations import ensure_vector_extension, run_migrations


@pytest.fixture(scope="module")
def vec_db_url():
"""A dedicated pg0 instance so the test owns its schema/index state."""
from hindsight_api.pg0 import EmbeddedPostgres

pg0 = EmbeddedPostgres(name="hindsight-vecidx-test", port=5570)
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(pg0.ensure_running())
finally:
loop.close()


def test_per_bank_backend_does_not_create_global_memory_units_index(vec_db_url):
config = HindsightConfig.from_env()
vec = config.vector_extension
if not uses_per_bank_vector_indexes(vec):
pytest.skip(f"backend {vec!r} uses a global vector index by design (no per-bank indexes)")

schema = "vecidx_fresh"

engine = create_engine(vec_db_url)
try:
with engine.connect() as conn:
conn.execute(text(f'DROP SCHEMA IF EXISTS "{schema}" CASCADE'))
conn.commit()
finally:
engine.dispose()

run_migrations(vec_db_url, schema=schema)
# Fresh, empty schema (no banks yet) → the reconcile must be a no-op for the
# global index, not recreate it.
ensure_vector_extension(vec_db_url, vector_extension=vec, schema=schema)

engine = create_engine(vec_db_url)
try:
with engine.connect() as conn:
global_index_count = conn.execute(
text(
"SELECT COUNT(*) FROM pg_indexes "
"WHERE schemaname = :schema AND tablename = 'memory_units' "
"AND indexname = 'idx_memory_units_embedding'"
),
{"schema": schema},
).scalar()
conn.execute(text(f'DROP SCHEMA IF EXISTS "{schema}" CASCADE'))
conn.commit()
finally:
engine.dispose()

assert global_index_count == 0