From 609a3e6da9c9f3b7c91709b1ff485114daee27e8 Mon Sep 17 00:00:00 2001 From: Beinan Wang Date: Sun, 7 Jun 2026 01:49:01 -0700 Subject: [PATCH 1/2] Add configurable scalar index on id column Support btree and zonemap index types on the id column via IdIndexType enum, configurable through both Rust and Python APIs. BTree indices are maintained by MemWAL during writes; ZoneMap indices are excluded from MemWAL maintenance (unsupported) and rebuilt after compaction. Co-Authored-By: Beinan Wang --- crates/lance-context-core/src/lib.rs | 2 +- crates/lance-context-core/src/store.rs | 201 ++++++++++++++++++++++++- python/python/lance_context/api.py | 6 +- python/src/lib.rs | 18 ++- python/tests/test_id_index.py | 84 +++++++++++ 5 files changed, 305 insertions(+), 6 deletions(-) create mode 100644 python/tests/test_id_index.py diff --git a/crates/lance-context-core/src/lib.rs b/crates/lance-context-core/src/lib.rs index 590ed5a..5d61827 100644 --- a/crates/lance-context-core/src/lib.rs +++ b/crates/lance-context-core/src/lib.rs @@ -7,7 +7,7 @@ mod store; pub use context::{Context, ContextEntry, Snapshot}; pub use record::{ContextRecord, SearchResult, StateMetadata}; -pub use store::{CompactionConfig, CompactionStats, ContextStore, ContextStoreOptions}; +pub use store::{CompactionConfig, CompactionStats, ContextStore, ContextStoreOptions, IdIndexType}; // Re-export CompactionMetrics from lance for Python bindings pub use lance::dataset::optimize::CompactionMetrics; diff --git a/crates/lance-context-core/src/store.rs b/crates/lance-context-core/src/store.rs index 0deb9c2..e16deab 100644 --- a/crates/lance-context-core/src/store.rs +++ b/crates/lance-context-core/src/store.rs @@ -21,6 +21,8 @@ use lance::dataset::{builder::DatasetBuilder, Dataset, WriteMode, WriteParams}; use lance::index::DatasetIndexExt; use lance::io::{ObjectStoreParams, StorageOptionsAccessor}; use lance::{Error as LanceError, Result as LanceResult}; +use lance_index::scalar::ScalarIndexParams; +use lance_index::IndexType; use lance_index::mem_wal::MEM_WAL_INDEX_NAME; use tokio::sync::Mutex; use tokio::task::JoinHandle; @@ -32,6 +34,7 @@ use crate::record::{ContextRecord, SearchResult, StateMetadata}; /// Embedding length used for the semantic index column. const DEFAULT_EMBEDDING_DIM: i32 = 1536; const DEFAULT_SEARCH_LIMIT: usize = 10; +const ID_INDEX_NAME: &str = "id_idx"; /// Configuration for background compaction. #[derive(Debug, Clone)] @@ -72,6 +75,18 @@ impl Default for CompactionConfig { } } +/// Type of scalar index on the `id` column. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum IdIndexType { + /// No index on the id column. + #[default] + None, + /// Zone-map index (min/max per fragment, lightweight). + ZoneMap, + /// B-tree index (point lookups, heavier). + BTree, +} + /// Statistics about compaction status and history. #[derive(Debug, Clone)] pub struct CompactionStats { @@ -106,6 +121,7 @@ pub struct ContextStore { compaction_state: Arc>, pub compaction_config: CompactionConfig, blob_columns: HashSet, + id_index_type: IdIndexType, } /// Additional configuration when opening a [`ContextStore`]. @@ -116,6 +132,8 @@ pub struct ContextStoreOptions { /// Column names that should use Lance V1 blob encoding. /// Valid values: `"text_payload"`, `"binary_payload"`. pub blob_columns: HashSet, + /// Type of scalar index to create on the `id` column. + pub id_index_type: IdIndexType, } impl ContextStoreOptions { @@ -164,8 +182,12 @@ impl ContextStore { })), compaction_config: options.compaction, blob_columns, + id_index_type: options.id_index_type, }; + // Ensure id index if configured + store.ensure_id_index().await?; + // Start background compaction if enabled store.start_background_compaction().await?; @@ -192,8 +214,12 @@ impl ContextStore { let has_mem_wal = indices.iter().any(|i| i.name == MEM_WAL_INDEX_NAME); if !has_mem_wal { - let maintained_indexes: Vec = - indices.iter().map(|i| i.name.clone()).collect(); + // ZoneMap indices are not supported by MemWAL; exclude them + let maintained_indexes: Vec = indices + .iter() + .filter(|i| !(self.id_index_type == IdIndexType::ZoneMap && i.name == ID_INDEX_NAME)) + .map(|i| i.name.clone()) + .collect(); self.dataset .initialize_mem_wal() .unsharded() @@ -348,6 +374,7 @@ impl ContextStore { state.last_compaction = Some(Utc::now()); state.total_compactions += 1; state.last_error = None; + drop(state); // Release lock before ensure_id_index info!( "Compaction completed in {:?}: removed {} fragments ({}files), added {} fragments ({} files)", @@ -361,6 +388,12 @@ impl ContextStore { // Reload dataset to see new version self.dataset = Dataset::open(self.dataset.uri()).await?; + // Ensure id index exists after compaction + // (handles first-time creation on previously empty dataset) + if let Err(e) = self.ensure_id_index().await { + warn!("Failed to ensure id index after compaction: {}", e); + } + Ok(metrics) } Err(e) => { @@ -408,6 +441,44 @@ impl ContextStore { }) } + /// Ensure the configured id index exists on the dataset. + async fn ensure_id_index(&mut self) -> LanceResult<()> { + if self.id_index_type == IdIndexType::None { + return Ok(()); + } + + let indices = self.dataset.load_indices().await?; + if indices.iter().any(|i| i.name == ID_INDEX_NAME) { + return Ok(()); + } + + self.create_id_index().await + } + + /// Create (or replace) the scalar index on the `id` column. + pub async fn create_id_index(&mut self) -> LanceResult<()> { + let index_type = match self.id_index_type { + IdIndexType::ZoneMap => IndexType::ZoneMap, + IdIndexType::BTree => IndexType::BTree, + IdIndexType::None => return Ok(()), + }; + + info!("Creating {:?} index on id column", index_type); + + let params = ScalarIndexParams::default(); + + self.dataset + .create_index_builder(&["id"], index_type, ¶ms) + .name(ID_INDEX_NAME.to_string()) + .replace(true) + .await?; + + // Reload dataset to pick up new index + self.dataset = Dataset::open(self.dataset.uri()).await?; + + Ok(()) + } + /// Start background compaction task if enabled. async fn start_background_compaction(&mut self) -> LanceResult<()> { if !self.compaction_config.enabled { @@ -1379,4 +1450,130 @@ mod tests { assert_eq!(results_binary[0].text_payload, record.text_payload); }); } + + #[test] + fn test_id_index_btree() { + let dir = TempDir::new().unwrap(); + let uri = dir.path().to_string_lossy().to_string(); + let runtime = tokio::runtime::Runtime::new().unwrap(); + + runtime.block_on(async { + let options = ContextStoreOptions { + id_index_type: IdIndexType::BTree, + ..Default::default() + }; + let mut store = ContextStore::open_with_options(&uri, options).await.unwrap(); + + // Index should be created eagerly on open + let indices = store.dataset.load_indices().await.unwrap(); + assert!( + indices.iter().any(|i| i.name == ID_INDEX_NAME), + "btree index should be created on open" + ); + + // Add data and verify it still works with the index + for i in 0..5 { + store + .add(&[text_record(&format!("btree-{i}"), i as f32)]) + .await + .unwrap(); + } + store.compact(None).await.unwrap(); + + // Index should still exist after compaction + let indices = store.dataset.load_indices().await.unwrap(); + assert!( + indices.iter().any(|i| i.name == ID_INDEX_NAME), + "btree index should persist after compaction" + ); + }); + } + + #[test] + fn test_id_index_zonemap() { + let dir = TempDir::new().unwrap(); + let uri = dir.path().to_string_lossy().to_string(); + let runtime = tokio::runtime::Runtime::new().unwrap(); + + runtime.block_on(async { + let options = ContextStoreOptions { + id_index_type: IdIndexType::ZoneMap, + ..Default::default() + }; + let mut store = ContextStore::open_with_options(&uri, options).await.unwrap(); + + // Index should be created eagerly on open + let indices = store.dataset.load_indices().await.unwrap(); + assert!( + indices.iter().any(|i| i.name == ID_INDEX_NAME), + "zonemap index should be created on open" + ); + + for i in 0..5 { + store + .add(&[text_record(&format!("zm-{i}"), i as f32)]) + .await + .unwrap(); + } + store.compact(None).await.unwrap(); + + let indices = store.dataset.load_indices().await.unwrap(); + assert!( + indices.iter().any(|i| i.name == ID_INDEX_NAME), + "zonemap index should persist after compaction" + ); + }); + } + + #[test] + fn test_id_index_none_by_default() { + let dir = TempDir::new().unwrap(); + let uri = dir.path().to_string_lossy().to_string(); + let runtime = tokio::runtime::Runtime::new().unwrap(); + + runtime.block_on(async { + let mut store = ContextStore::open(&uri).await.unwrap(); + + store + .add(&[text_record("no-idx-1", 0.0)]) + .await + .unwrap(); + store.compact(None).await.unwrap(); + + let indices = store.dataset.load_indices().await.unwrap(); + assert!( + !indices.iter().any(|i| i.name == ID_INDEX_NAME), + "no id index should be created when IdIndexType::None" + ); + }); + } + + #[test] + fn test_id_index_idempotent() { + let dir = TempDir::new().unwrap(); + let uri = dir.path().to_string_lossy().to_string(); + let runtime = tokio::runtime::Runtime::new().unwrap(); + + runtime.block_on(async { + let options = ContextStoreOptions { + id_index_type: IdIndexType::BTree, + ..Default::default() + }; + let mut store = ContextStore::open_with_options(&uri, options).await.unwrap(); + + for i in 0..5 { + store + .add(&[text_record(&format!("idem-{i}"), i as f32)]) + .await + .unwrap(); + } + + // Create index twice -- second call should be a no-op + store.create_id_index().await.unwrap(); + let v1 = store.version(); + store.ensure_id_index().await.unwrap(); + let v2 = store.version(); + assert_eq!(v1, v2, "ensure_id_index should not recreate existing index"); + }); + } } diff --git a/python/python/lance_context/api.py b/python/python/lance_context/api.py index 14c2c66..5e7161b 100644 --- a/python/python/lance_context/api.py +++ b/python/python/lance_context/api.py @@ -263,6 +263,7 @@ def __init__( compaction_min_fragments: int = 5, compaction_target_rows: int = 1_000_000, quiet_hours: list[tuple[int, int]] | None = None, + id_index_type: str | None = None, ) -> None: options = _merge_storage_options( storage_options, @@ -282,11 +283,12 @@ def __init__( "quiet_hours": quiet_hours or [], } - if options or compaction_config["enabled"]: + if options or compaction_config["enabled"] or id_index_type: self._inner = _Context.create( uri, storage_options=options or None, compaction_config=compaction_config, + id_index_type=id_index_type, ) else: self._inner = _Context.create(uri) @@ -308,6 +310,7 @@ def create( compaction_min_fragments: int = 5, compaction_target_rows: int = 1_000_000, quiet_hours: list[tuple[int, int]] | None = None, + id_index_type: str | None = None, ) -> Context: return cls( uri, @@ -323,6 +326,7 @@ def create( compaction_min_fragments=compaction_min_fragments, compaction_target_rows=compaction_target_rows, quiet_hours=quiet_hours, + id_index_type=id_index_type, ) def uri(self) -> str: diff --git a/python/src/lib.rs b/python/src/lib.rs index 460abe8..2e2f855 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -11,7 +11,7 @@ use tokio::runtime::Runtime; use lance_context::serde::CONTENT_TYPE_TEXT; use lance_context::{ CompactionConfig, CompactionMetrics, CompactionStats, Context as RustContext, ContextRecord, - ContextStore, ContextStoreOptions, SearchResult, + ContextStore, ContextStoreOptions, IdIndexType, SearchResult, }; const DEFAULT_BINARY_CONTENT_TYPE: &str = "application/octet-stream"; @@ -110,7 +110,7 @@ fn compaction_config_from_dict<'py>( #[pymethods] impl Context { #[classmethod] - #[pyo3(signature = (uri, *, storage_options=None, compaction_config=None, blob_columns=None))] + #[pyo3(signature = (uri, *, storage_options=None, compaction_config=None, blob_columns=None, id_index_type=None))] fn create( _cls: &Bound<'_, PyType>, py: Python<'_>, @@ -118,15 +118,29 @@ impl Context { storage_options: Option<&Bound<'_, PyDict>>, compaction_config: Option<&Bound<'_, PyDict>>, blob_columns: Option>, + id_index_type: Option, ) -> PyResult { let runtime = Arc::new(Runtime::new().map_err(to_py_err)?); let blob_set: HashSet = blob_columns.unwrap_or_default().into_iter().collect(); + let id_idx = match id_index_type.as_deref() { + Some("btree") => IdIndexType::BTree, + Some("zonemap") => IdIndexType::ZoneMap, + Some("none") | None => IdIndexType::None, + Some(other) => { + return Err(PyRuntimeError::new_err(format!( + "invalid id_index_type '{}': valid values are 'btree', 'zonemap'", + other + ))) + } + }; + let options = ContextStoreOptions { storage_options: storage_options_from_dict(storage_options)?, compaction: compaction_config_from_dict(compaction_config)?, blob_columns: blob_set, + id_index_type: id_idx, }; let store_res = diff --git a/python/tests/test_id_index.py b/python/tests/test_id_index.py new file mode 100644 index 0000000..b05f40b --- /dev/null +++ b/python/tests/test_id_index.py @@ -0,0 +1,84 @@ +"""Tests for configurable id column index.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + from pathlib import Path + +from lance_context.api import Context + + +def test_btree_index_creation(tmp_path: Path) -> None: + """Verify btree index is created after adding data and compacting.""" + uri = str(tmp_path / "context.lance") + ctx = Context.create(uri, id_index_type="btree") + + for i in range(10): + ctx.add("user", f"entry-{i}") + + ctx.compact() + + # Data should be fully preserved + results = ctx.list() + assert len(results) == 10 + + +def test_zonemap_index_creation(tmp_path: Path) -> None: + """Verify zonemap index is created after adding data and compacting.""" + uri = str(tmp_path / "context.lance") + ctx = Context.create(uri, id_index_type="zonemap") + + for i in range(10): + ctx.add("user", f"entry-{i}") + + ctx.compact() + + results = ctx.list() + assert len(results) == 10 + + +def test_no_index_by_default(tmp_path: Path) -> None: + """Verify no id index is created by default.""" + uri = str(tmp_path / "context.lance") + ctx = Context.create(uri) + + for i in range(5): + ctx.add("user", f"entry-{i}") + + ctx.compact() + results = ctx.list() + assert len(results) == 5 + + +def test_invalid_index_type_rejected(tmp_path: Path) -> None: + """Verify invalid index type raises an error.""" + uri = str(tmp_path / "context.lance") + with pytest.raises(RuntimeError, match="invalid id_index_type"): + Context.create(uri, id_index_type="invalid_type") + + +def test_index_with_compaction_preserves_data(tmp_path: Path) -> None: + """Verify data integrity across multiple add-compact cycles with an index.""" + uri = str(tmp_path / "context.lance") + ctx = Context.create(uri, id_index_type="btree") + + # First batch + for i in range(10): + ctx.add("user", f"batch1-{i}") + ctx.compact() + + # Second batch + for i in range(10): + ctx.add("user", f"batch2-{i}") + ctx.compact() + + results = ctx.list() + assert len(results) == 20 + texts = {r["text"] for r in results} + for i in range(10): + assert f"batch1-{i}" in texts + assert f"batch2-{i}" in texts From db62341e3de92b2bfc3ca8ae314ae44ac7291ed1 Mon Sep 17 00:00:00 2001 From: Beinan Wang Date: Sun, 7 Jun 2026 01:53:25 -0700 Subject: [PATCH 2/2] style: apply cargo fmt Co-Authored-By: Beinan Wang --- crates/lance-context-core/src/lib.rs | 4 +++- crates/lance-context-core/src/store.rs | 23 ++++++++++++++--------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/crates/lance-context-core/src/lib.rs b/crates/lance-context-core/src/lib.rs index 5d61827..08b1edf 100644 --- a/crates/lance-context-core/src/lib.rs +++ b/crates/lance-context-core/src/lib.rs @@ -7,7 +7,9 @@ mod store; pub use context::{Context, ContextEntry, Snapshot}; pub use record::{ContextRecord, SearchResult, StateMetadata}; -pub use store::{CompactionConfig, CompactionStats, ContextStore, ContextStoreOptions, IdIndexType}; +pub use store::{ + CompactionConfig, CompactionStats, ContextStore, ContextStoreOptions, IdIndexType, +}; // Re-export CompactionMetrics from lance for Python bindings pub use lance::dataset::optimize::CompactionMetrics; diff --git a/crates/lance-context-core/src/store.rs b/crates/lance-context-core/src/store.rs index e16deab..38b60da 100644 --- a/crates/lance-context-core/src/store.rs +++ b/crates/lance-context-core/src/store.rs @@ -21,9 +21,9 @@ use lance::dataset::{builder::DatasetBuilder, Dataset, WriteMode, WriteParams}; use lance::index::DatasetIndexExt; use lance::io::{ObjectStoreParams, StorageOptionsAccessor}; use lance::{Error as LanceError, Result as LanceResult}; +use lance_index::mem_wal::MEM_WAL_INDEX_NAME; use lance_index::scalar::ScalarIndexParams; use lance_index::IndexType; -use lance_index::mem_wal::MEM_WAL_INDEX_NAME; use tokio::sync::Mutex; use tokio::task::JoinHandle; use tracing::{error, info, warn}; @@ -217,7 +217,9 @@ impl ContextStore { // ZoneMap indices are not supported by MemWAL; exclude them let maintained_indexes: Vec = indices .iter() - .filter(|i| !(self.id_index_type == IdIndexType::ZoneMap && i.name == ID_INDEX_NAME)) + .filter(|i| { + !(self.id_index_type == IdIndexType::ZoneMap && i.name == ID_INDEX_NAME) + }) .map(|i| i.name.clone()) .collect(); self.dataset @@ -1462,7 +1464,9 @@ mod tests { id_index_type: IdIndexType::BTree, ..Default::default() }; - let mut store = ContextStore::open_with_options(&uri, options).await.unwrap(); + let mut store = ContextStore::open_with_options(&uri, options) + .await + .unwrap(); // Index should be created eagerly on open let indices = store.dataset.load_indices().await.unwrap(); @@ -1500,7 +1504,9 @@ mod tests { id_index_type: IdIndexType::ZoneMap, ..Default::default() }; - let mut store = ContextStore::open_with_options(&uri, options).await.unwrap(); + let mut store = ContextStore::open_with_options(&uri, options) + .await + .unwrap(); // Index should be created eagerly on open let indices = store.dataset.load_indices().await.unwrap(); @@ -1534,10 +1540,7 @@ mod tests { runtime.block_on(async { let mut store = ContextStore::open(&uri).await.unwrap(); - store - .add(&[text_record("no-idx-1", 0.0)]) - .await - .unwrap(); + store.add(&[text_record("no-idx-1", 0.0)]).await.unwrap(); store.compact(None).await.unwrap(); let indices = store.dataset.load_indices().await.unwrap(); @@ -1559,7 +1562,9 @@ mod tests { id_index_type: IdIndexType::BTree, ..Default::default() }; - let mut store = ContextStore::open_with_options(&uri, options).await.unwrap(); + let mut store = ContextStore::open_with_options(&uri, options) + .await + .unwrap(); for i in 0..5 { store