Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/lance-context-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ mod store;

pub use context::{Context, ContextEntry, Snapshot};
pub use record::{ContextRecord, SearchResult, StateMetadata};
pub use store::{CompactionConfig, CompactionStats, ContextStore, ContextStoreOptions};
pub use store::{
CompactionConfig, CompactionStats, ContextStore, ContextStoreOptions, IdIndexType,
};

// Re-export CompactionMetrics from lance for Python bindings
pub use lance::dataset::optimize::CompactionMetrics;
206 changes: 204 additions & 2 deletions crates/lance-context-core/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use lance::index::DatasetIndexExt;
use lance::io::{ObjectStoreParams, StorageOptionsAccessor};
use lance::{Error as LanceError, Result as LanceResult};
use lance_index::mem_wal::MEM_WAL_INDEX_NAME;
use lance_index::scalar::ScalarIndexParams;
use lance_index::IndexType;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::{error, info, warn};
Expand All @@ -32,6 +34,7 @@ use crate::record::{ContextRecord, SearchResult, StateMetadata};
/// Embedding length used for the semantic index column.
const DEFAULT_EMBEDDING_DIM: i32 = 1536;
const DEFAULT_SEARCH_LIMIT: usize = 10;
/// Name under which the scalar index on the `id` column is registered.
const ID_INDEX_NAME: &str = "id_idx";

/// Configuration for background compaction.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -72,6 +75,18 @@ impl Default for CompactionConfig {
}
}

/// Selects which scalar index, if any, is kept on the `id` column.
///
/// The default is [`IdIndexType::None`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum IdIndexType {
    /// Leave the id column unindexed.
    #[default]
    None,
    /// Zone-map index: lightweight, min/max per fragment.
    ZoneMap,
    /// B-tree index: heavier, intended for point lookups.
    BTree,
}

/// Statistics about compaction status and history.
#[derive(Debug, Clone)]
pub struct CompactionStats {
Expand Down Expand Up @@ -106,6 +121,7 @@ pub struct ContextStore {
compaction_state: Arc<Mutex<CompactionState>>,
pub compaction_config: CompactionConfig,
blob_columns: HashSet<String>,
id_index_type: IdIndexType,
}

/// Additional configuration when opening a [`ContextStore`].
Expand All @@ -116,6 +132,8 @@ pub struct ContextStoreOptions {
/// Column names that should use Lance V1 blob encoding.
/// Valid values: `"text_payload"`, `"binary_payload"`.
pub blob_columns: HashSet<String>,
/// Type of scalar index to create on the `id` column.
pub id_index_type: IdIndexType,
}

impl ContextStoreOptions {
Expand Down Expand Up @@ -164,8 +182,12 @@ impl ContextStore {
})),
compaction_config: options.compaction,
blob_columns,
id_index_type: options.id_index_type,
};

// Ensure id index if configured
store.ensure_id_index().await?;

// Start background compaction if enabled
store.start_background_compaction().await?;

Expand All @@ -192,8 +214,14 @@ impl ContextStore {
let has_mem_wal = indices.iter().any(|i| i.name == MEM_WAL_INDEX_NAME);

if !has_mem_wal {
let maintained_indexes: Vec<String> =
indices.iter().map(|i| i.name.clone()).collect();
// ZoneMap indices are not supported by MemWAL; exclude them
let maintained_indexes: Vec<String> = indices
.iter()
.filter(|i| {
!(self.id_index_type == IdIndexType::ZoneMap && i.name == ID_INDEX_NAME)
})
.map(|i| i.name.clone())
.collect();
self.dataset
.initialize_mem_wal()
.unsharded()
Expand Down Expand Up @@ -348,6 +376,7 @@ impl ContextStore {
state.last_compaction = Some(Utc::now());
state.total_compactions += 1;
state.last_error = None;
drop(state); // Release lock before ensure_id_index

info!(
"Compaction completed in {:?}: removed {} fragments ({}files), added {} fragments ({} files)",
Expand All @@ -361,6 +390,12 @@ impl ContextStore {
// Reload dataset to see new version
self.dataset = Dataset::open(self.dataset.uri()).await?;

// Ensure id index exists after compaction
// (handles first-time creation on previously empty dataset)
if let Err(e) = self.ensure_id_index().await {
warn!("Failed to ensure id index after compaction: {}", e);
}

Ok(metrics)
}
Err(e) => {
Expand Down Expand Up @@ -408,6 +443,44 @@ impl ContextStore {
})
}

/// Make sure the dataset carries the configured `id` index.
///
/// Returns immediately when the store is configured with
/// `IdIndexType::None`, or when an index named `ID_INDEX_NAME` is already
/// present on the dataset. The presence check is by index name only.
/// Otherwise the index is built via `create_id_index`.
async fn ensure_id_index(&mut self) -> LanceResult<()> {
    if matches!(self.id_index_type, IdIndexType::None) {
        return Ok(());
    }

    let already_indexed = self
        .dataset
        .load_indices()
        .await?
        .iter()
        .any(|index| index.name == ID_INDEX_NAME);

    if already_indexed {
        Ok(())
    } else {
        self.create_id_index().await
    }
}

/// Build the scalar index on the `id` column, replacing any existing one.
///
/// The index kind follows the store's configured `id_index_type`. With
/// `IdIndexType::None` this returns `Ok(())` without touching the dataset.
/// After the index is built, the dataset handle is reopened from its URI so
/// the new index is picked up.
///
/// # Errors
///
/// Propagates any error from building the index or from reopening the
/// dataset.
pub async fn create_id_index(&mut self) -> LanceResult<()> {
    let index_type = match self.id_index_type {
        IdIndexType::None => return Ok(()),
        IdIndexType::BTree => IndexType::BTree,
        IdIndexType::ZoneMap => IndexType::ZoneMap,
    };
    info!("Creating {:?} index on id column", index_type);

    let index_params = ScalarIndexParams::default();
    self.dataset
        .create_index_builder(&["id"], index_type, &index_params)
        .name(ID_INDEX_NAME.to_string())
        .replace(true)
        .await?;

    // Swap in a freshly opened handle so the new index is visible.
    let reopened = Dataset::open(self.dataset.uri()).await?;
    self.dataset = reopened;

    Ok(())
}

/// Start background compaction task if enabled.
async fn start_background_compaction(&mut self) -> LanceResult<()> {
if !self.compaction_config.enabled {
Expand Down Expand Up @@ -1379,4 +1452,133 @@ mod tests {
assert_eq!(results_binary[0].text_payload, record.text_payload);
});
}

/// A store opened with a B-tree id index has the index right after open and
/// still has it after adding records and compacting.
#[test]
fn test_id_index_btree() {
    let tmp = TempDir::new().unwrap();
    let store_uri = tmp.path().to_string_lossy().to_string();
    let rt = tokio::runtime::Runtime::new().unwrap();

    rt.block_on(async {
        let mut store = ContextStore::open_with_options(
            &store_uri,
            ContextStoreOptions {
                id_index_type: IdIndexType::BTree,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // The index is expected as soon as the store has been opened.
        let present_on_open = store
            .dataset
            .load_indices()
            .await
            .unwrap()
            .iter()
            .any(|idx| idx.name == ID_INDEX_NAME);
        assert!(present_on_open, "btree index should be created on open");

        // Writes and a compaction must keep working with the index in place.
        for i in 0..5 {
            store
                .add(&[text_record(&format!("btree-{i}"), i as f32)])
                .await
                .unwrap();
        }
        store.compact(None).await.unwrap();

        // The index must survive the compaction.
        let present_after_compaction = store
            .dataset
            .load_indices()
            .await
            .unwrap()
            .iter()
            .any(|idx| idx.name == ID_INDEX_NAME);
        assert!(
            present_after_compaction,
            "btree index should persist after compaction"
        );
    });
}

/// A store opened with a zone-map id index has the index right after open and
/// still has it after adding records and compacting.
#[test]
fn test_id_index_zonemap() {
    let tmp = TempDir::new().unwrap();
    let store_uri = tmp.path().to_string_lossy().to_string();
    let rt = tokio::runtime::Runtime::new().unwrap();

    rt.block_on(async {
        let mut store = ContextStore::open_with_options(
            &store_uri,
            ContextStoreOptions {
                id_index_type: IdIndexType::ZoneMap,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        // The index is expected as soon as the store has been opened.
        let present_on_open = store
            .dataset
            .load_indices()
            .await
            .unwrap()
            .iter()
            .any(|idx| idx.name == ID_INDEX_NAME);
        assert!(present_on_open, "zonemap index should be created on open");

        for i in 0..5 {
            store
                .add(&[text_record(&format!("zm-{i}"), i as f32)])
                .await
                .unwrap();
        }
        store.compact(None).await.unwrap();

        // The index must survive the compaction.
        let present_after_compaction = store
            .dataset
            .load_indices()
            .await
            .unwrap()
            .iter()
            .any(|idx| idx.name == ID_INDEX_NAME);
        assert!(
            present_after_compaction,
            "zonemap index should persist after compaction"
        );
    });
}

/// A store opened with default options has no id index, even after a record
/// is added and the store is compacted.
#[test]
fn test_id_index_none_by_default() {
    let tmp = TempDir::new().unwrap();
    let store_uri = tmp.path().to_string_lossy().to_string();
    let rt = tokio::runtime::Runtime::new().unwrap();

    rt.block_on(async {
        let mut store = ContextStore::open(&store_uri).await.unwrap();

        store.add(&[text_record("no-idx-1", 0.0)]).await.unwrap();
        store.compact(None).await.unwrap();

        let has_id_index = store
            .dataset
            .load_indices()
            .await
            .unwrap()
            .iter()
            .any(|idx| idx.name == ID_INDEX_NAME);
        assert!(
            !has_id_index,
            "no id index should be created when IdIndexType::None"
        );
    });
}

/// `ensure_id_index` must not change the store version when the id index
/// already exists.
#[test]
fn test_id_index_idempotent() {
    let tmp = TempDir::new().unwrap();
    let store_uri = tmp.path().to_string_lossy().to_string();
    let rt = tokio::runtime::Runtime::new().unwrap();

    rt.block_on(async {
        let mut store = ContextStore::open_with_options(
            &store_uri,
            ContextStoreOptions {
                id_index_type: IdIndexType::BTree,
                ..Default::default()
            },
        )
        .await
        .unwrap();

        for i in 0..5 {
            store
                .add(&[text_record(&format!("idem-{i}"), i as f32)])
                .await
                .unwrap();
        }

        // Build the index explicitly, then verify that ensuring it afterwards
        // is a no-op (the version stays the same).
        store.create_id_index().await.unwrap();
        let version_before = store.version();
        store.ensure_id_index().await.unwrap();
        let version_after = store.version();
        assert_eq!(
            version_before, version_after,
            "ensure_id_index should not recreate existing index"
        );
    });
}
}
6 changes: 5 additions & 1 deletion python/python/lance_context/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ def __init__(
compaction_min_fragments: int = 5,
compaction_target_rows: int = 1_000_000,
quiet_hours: list[tuple[int, int]] | None = None,
id_index_type: str | None = None,
) -> None:
options = _merge_storage_options(
storage_options,
Expand All @@ -282,11 +283,12 @@ def __init__(
"quiet_hours": quiet_hours or [],
}

if options or compaction_config["enabled"]:
if options or compaction_config["enabled"] or id_index_type:
self._inner = _Context.create(
uri,
storage_options=options or None,
compaction_config=compaction_config,
id_index_type=id_index_type,
)
else:
self._inner = _Context.create(uri)
Expand All @@ -308,6 +310,7 @@ def create(
compaction_min_fragments: int = 5,
compaction_target_rows: int = 1_000_000,
quiet_hours: list[tuple[int, int]] | None = None,
id_index_type: str | None = None,
) -> Context:
return cls(
uri,
Expand All @@ -323,6 +326,7 @@ def create(
compaction_min_fragments=compaction_min_fragments,
compaction_target_rows=compaction_target_rows,
quiet_hours=quiet_hours,
id_index_type=id_index_type,
)

def uri(self) -> str:
Expand Down
18 changes: 16 additions & 2 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::runtime::Runtime;
use lance_context::serde::CONTENT_TYPE_TEXT;
use lance_context::{
CompactionConfig, CompactionMetrics, CompactionStats, Context as RustContext, ContextRecord,
ContextStore, ContextStoreOptions, SearchResult,
ContextStore, ContextStoreOptions, IdIndexType, SearchResult,
};

const DEFAULT_BINARY_CONTENT_TYPE: &str = "application/octet-stream";
Expand Down Expand Up @@ -110,23 +110,37 @@ fn compaction_config_from_dict<'py>(
#[pymethods]
impl Context {
#[classmethod]
#[pyo3(signature = (uri, *, storage_options=None, compaction_config=None, blob_columns=None))]
#[pyo3(signature = (uri, *, storage_options=None, compaction_config=None, blob_columns=None, id_index_type=None))]
fn create(
_cls: &Bound<'_, PyType>,
py: Python<'_>,
uri: &str,
storage_options: Option<&Bound<'_, PyDict>>,
compaction_config: Option<&Bound<'_, PyDict>>,
blob_columns: Option<Vec<String>>,
id_index_type: Option<String>,
) -> PyResult<Self> {
let runtime = Arc::new(Runtime::new().map_err(to_py_err)?);

let blob_set: HashSet<String> = blob_columns.unwrap_or_default().into_iter().collect();

let id_idx = match id_index_type.as_deref() {
Some("btree") => IdIndexType::BTree,
Some("zonemap") => IdIndexType::ZoneMap,
Some("none") | None => IdIndexType::None,
Some(other) => {
return Err(PyRuntimeError::new_err(format!(
"invalid id_index_type '{}': valid values are 'btree', 'zonemap'",
other
)))
}
};

let options = ContextStoreOptions {
storage_options: storage_options_from_dict(storage_options)?,
compaction: compaction_config_from_dict(compaction_config)?,
blob_columns: blob_set,
id_index_type: id_idx,
};

let store_res =
Expand Down
Loading
Loading