diff --git a/docs/src/guide/distributed_indexing.md b/docs/src/guide/distributed_indexing.md index ae4531331d0..389e5a1bc09 100644 --- a/docs/src/guide/distributed_indexing.md +++ b/docs/src/guide/distributed_indexing.md @@ -18,9 +18,10 @@ write: 3. Lance plans and builds index artifacts from the worker outputs supplied by the caller 4. the built artifacts are committed into the dataset manifest -For vector indices, the worker outputs are segments stored directly -under `indices//`. Lance can turn these outputs into one or more -physical segments and then commit them as one logical index. +For vector indices and segment-native scalar indices, the worker outputs are +segments stored directly under `indices//`. Lance can turn these +outputs into one or more physical segments and then commit them as one logical +index. ![Distributed Vector Segment Build](../images/distributed_vector_segment_build.svg) @@ -81,7 +82,7 @@ launching workers and driving the overall workflow. ## Current Model -The current model for distributed vector indexing has two layers of parallelism. +The current model for distributed indexing has two layers of parallelism. ### Worker Build @@ -105,6 +106,12 @@ or merged into larger segments: Within a single commit, built segments must have disjoint fragment coverage. +`merge_existing_index_segments(...)` currently supports vector, inverted, +bitmap, BTree, and zone map segments. Other scalar index families can still +commit multiple compatible segments directly when their build path supports +fragment-scoped segments, but cannot be merged into a larger physical segment +until they add a merge implementation. + ### Vector Model Scope Distributed vector builds support two model scopes. @@ -138,7 +145,7 @@ trained segments as separate physical segments. ## Internal Finalize Model -Internally, Lance models distributed vector segment build as: +Internally, Lance models distributed segment build as: 1. **build** one uncommitted segment per worker 2. **optionally merge** caller-defined groups of existing segments diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 64601a64f96..da504cc6ba0 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -661,6 +661,57 @@ impl ScalarIndex for ZoneMapIndex { } } +/// Merge caller-selected ZoneMap segments into one self-contained segment. +pub async fn merge_zonemap_indices( + source_indices: &[&ZoneMapIndex], + dest_store: &dyn IndexStore, + fragment_filter: &RoaringBitmap, +) -> Result { + let first = source_indices.first().ok_or_else(|| { + Error::invalid_input("merge_zonemap_indices requires at least one source index") + })?; + let rows_per_zone = first.rows_per_zone; + let data_type = first.data_type.clone(); + + let mut zones = Vec::new(); + for source in source_indices { + if source.rows_per_zone != rows_per_zone { + return Err(Error::invalid_input(format!( + "cannot merge ZoneMap segments with different rows_per_zone values: {} and {}", + rows_per_zone, source.rows_per_zone + ))); + } + if source.data_type != data_type { + return Err(Error::invalid_input(format!( + "cannot merge ZoneMap segments with different value types: {:?} and {:?}", + data_type, source.data_type + ))); + } + zones.extend( + source + .zones + .iter() + .filter(|zone| { + u32::try_from(zone.bound.fragment_id) + .is_ok_and(|fragment_id| fragment_filter.contains(fragment_id)) + }) + .cloned(), + ); + } + zones.sort_by_key(|zone| (zone.bound.fragment_id, zone.bound.start)); + + let mut builder = + ZoneMapIndexBuilder::try_new(ZoneMapIndexBuilderParams::new(rows_per_zone), data_type)?; + builder.maps = zones; + builder.write_index(dest_store).await?; + + Ok(CreatedIndex { + index_details: prost_types::Any::from_msg(&pbold::ZoneMapIndexDetails::default()).unwrap(), + index_version: ZONEMAP_INDEX_VERSION, + files: Some(dest_store.list_files_with_sizes().await?), + }) +} + fn default_rows_per_zone() -> u64 { *DEFAULT_ROWS_PER_ZONE } diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 96eb7f88d32..69312cbc2f2 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -272,6 +272,13 @@ fn segment_has_btree_details(segment: &IndexMetadata) -> bool { ) } +fn segment_has_zonemap_details(segment: &IndexMetadata) -> bool { + segment + .index_details + .as_ref() + .is_some_and(|details| details.type_url.ends_with("ZoneMapIndexDetails")) +} + // Cache keys for different index types #[derive(Debug, Clone)] pub(crate) struct LegacyVectorIndexCacheKey<'a> { @@ -1130,7 +1137,8 @@ impl DatasetIndexExt for Dataset { let all_inverted = source_segments.iter().all(segment_has_inverted_details); let all_bitmap = source_segments.iter().all(segment_has_bitmap_details); let all_btree = source_segments.iter().all(segment_has_btree_details); - if !all_vector && !all_inverted && !all_bitmap && !all_btree { + let all_zonemap = source_segments.iter().all(segment_has_zonemap_details); + if !all_vector && !all_inverted && !all_bitmap && !all_btree && !all_zonemap { return Err(Error::invalid_input( "merge_existing_index_segments requires all segments to have the same supported index type" .to_string(), @@ -1148,6 +1156,8 @@ impl DatasetIndexExt for Dataset { crate::index::scalar::inverted::merge_segments(self, source_segments).await? } else if all_bitmap { crate::index::scalar::bitmap::merge_segments(self, source_segments).await? + } else if all_zonemap { + crate::index::scalar::zonemap::merge_segments(self, source_segments).await? } else { crate::index::scalar::btree::merge_segments(self, source_segments).await? }; @@ -1214,7 +1224,8 @@ impl DatasetIndexExt for Dataset { .is_none_or(|(details, expected)| details.type_url == expected) }) .map(|idx| -> Result> { - let Some(existing_fragments) = idx.fragment_bitmap.as_ref() else { + let Some(existing_fragments) = idx.effective_fragment_bitmap(&dataset_fragments) + else { if incoming_fragments != dataset_fragments { return Err(Error::invalid_input(format!( "CreateIndex: cannot replace legacy index segment {} for '{}' with partial fragment coverage; rebuild all fragments in one commit", @@ -1224,6 +1235,10 @@ impl DatasetIndexExt for Dataset { return Ok(Some(idx)); }; + if existing_fragments.is_empty() { + return Ok(Some(idx)); + } + if existing_fragments.is_disjoint(&incoming_fragments) { return Ok(None); } @@ -6764,7 +6779,18 @@ mod tests { ) .into_reader_rows(RowCount::from(20), BatchCount::from(2)); - let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + max_rows_per_file: 20, + max_rows_per_group: 20, + ..Default::default() + }), + ) + .await + .unwrap(); + assert_eq!(dataset.get_fragments().len(), 2); let field_id = dataset.schema().field("vector").unwrap().id; let original = write_vector_segment_metadata( diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 92b06f0a1a5..11308279193 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -7,6 +7,7 @@ pub(crate) mod bitmap; pub(crate) mod btree; pub(crate) mod inverted; +pub(crate) mod zonemap; pub use inverted::{load_segment_details, load_segments}; diff --git a/rust/lance/src/index/scalar/zonemap.rs b/rust/lance/src/index/scalar/zonemap.rs new file mode 100644 index 00000000000..308fe1e693f --- /dev/null +++ b/rust/lance/src/index/scalar/zonemap.rs @@ -0,0 +1,86 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::sync::Arc; + +use lance_index::metrics::NoOpMetricsCollector; +use lance_index::scalar::lance_format::LanceIndexStore; +use lance_index::scalar::zonemap::ZoneMapIndex; +use lance_table::format::IndexMetadata; +use roaring::RoaringBitmap; +use uuid::Uuid; + +use crate::{Dataset, Error, Result, dataset::index::LanceIndexStoreExt}; + +/// Merge one caller-defined group of source ZoneMap segments into a single segment. +pub(in crate::index) async fn merge_segments( + dataset: &Dataset, + segments: Vec, +) -> Result { + if segments.is_empty() { + return Err(Error::index("No segment metadata was provided".to_string())); + } + + let field_id = *segments[0].fields.first().ok_or_else(|| { + Error::invalid_input(format!( + "CreateIndex: segment {} is missing field ids", + segments[0].uuid + )) + })?; + let field_path = dataset.schema().field_path(field_id)?; + + let mut scalar_indices = Vec::with_capacity(segments.len()); + let mut fragment_bitmap = RoaringBitmap::new(); + let dataset_fragments = dataset.fragment_bitmap.as_ref(); + for segment in &segments { + let effective = segment + .effective_fragment_bitmap(dataset_fragments) + .ok_or_else(|| { + Error::invalid_input(format!( + "CreateIndex: segment {} is missing fragment coverage", + segment.uuid + )) + })?; + fragment_bitmap |= effective; + let scalar_index = + super::open_scalar_index(dataset, &field_path, segment, &NoOpMetricsCollector).await?; + scalar_indices.push((segment.uuid, scalar_index)); + } + + let mut source_indices = Vec::with_capacity(scalar_indices.len()); + for (segment_uuid, scalar_index) in &scalar_indices { + let zonemap_index = scalar_index + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::index(format!( + "merge_existing_index_segments: expected zonemap segment {}, got {:?}", + segment_uuid, + scalar_index.index_type() + )) + })?; + source_indices.push(zonemap_index); + } + + let new_uuid = Uuid::new_v4(); + let new_store = LanceIndexStore::from_dataset_for_new(dataset, &new_uuid.to_string())?; + let created_index = lance_index::scalar::zonemap::merge_zonemap_indices( + &source_indices, + &new_store, + &fragment_bitmap, + ) + .await?; + + Ok(IndexMetadata { + uuid: new_uuid, + fields: vec![field_id], + dataset_version: dataset.manifest.version, + fragment_bitmap: Some(fragment_bitmap), + index_details: Some(Arc::new(created_index.index_details)), + index_version: created_index.index_version as i32, + created_at: Some(chrono::Utc::now()), + base_id: None, + files: created_index.files, + ..segments[0].clone() + }) +} diff --git a/rust/lance/src/index/scalar_logical.rs b/rust/lance/src/index/scalar_logical.rs index d2ab4e4b9f0..1ca2e1bd9a8 100644 --- a/rust/lance/src/index/scalar_logical.rs +++ b/rust/lance/src/index/scalar_logical.rs @@ -326,6 +326,10 @@ mod tests { use lance_index::scalar::bitmap::BITMAP_LOOKUP_NAME; use lance_index::scalar::{BuiltinIndexType, SargableQuery, ScalarIndexParams}; + use crate::Dataset; + use crate::dataset::WriteParams; + use crate::dataset::optimize::{CompactionOptions, compact_files}; + use crate::dataset::write::WriteMode; use crate::index::create::CreateIndexBuilder; use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount}; @@ -668,6 +672,348 @@ mod tests { ); } + #[tokio::test] + async fn test_merge_existing_index_segments_supports_zonemap_segments() { + let dataset = lance_datagen::gen_batch() + .col("value", array::step::()) + .into_ram_dataset(FragmentCount::from(4), FragmentRowCount::from(16)) + .await + .unwrap(); + let mut dataset = dataset; + let fragments = dataset.get_fragments(); + let zonemap_params = lance_index::scalar::zonemap::ZoneMapIndexBuilderParams::new(8); + let params_json = serde_json::to_value(&zonemap_params).unwrap(); + let params = + ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap).with_params(¶ms_json); + let mut staged = Vec::new(); + + for fragment in &fragments { + let segment = + CreateIndexBuilder::new(&mut dataset, &["value"], IndexType::ZoneMap, ¶ms) + .name("value_zonemap_merged".to_string()) + .fragments(vec![fragment.id() as u32]) + .execute_uncommitted() + .await + .unwrap(); + staged.push(segment); + } + + let staged_uuids = staged + .iter() + .map(|segment| segment.uuid) + .collect::>(); + let merged = dataset.merge_existing_index_segments(staged).await.unwrap(); + assert!(!staged_uuids.contains(&merged.uuid)); + assert_eq!( + merged + .fragment_bitmap + .as_ref() + .unwrap() + .iter() + .collect::>(), + fragments + .iter() + .map(|fragment| fragment.id() as u32) + .collect::>() + ); + assert!( + merged + .files + .as_ref() + .unwrap() + .iter() + .any(|file| file.path == "zonemap.lance") + ); + + dataset + .commit_existing_index_segments("value_zonemap_merged", "value", vec![merged]) + .await + .unwrap(); + + let committed = dataset + .load_indices_by_name("value_zonemap_merged") + .await + .unwrap(); + assert_eq!(committed.len(), 1); + + let logical = open_named_scalar_index( + &dataset, + "value", + "value_zonemap_merged", + &NoOpMetricsCollector, + ) + .await + .unwrap(); + assert_eq!(logical.index_type(), IndexType::ZoneMap); + assert_eq!( + logical.statistics().unwrap()["rows_per_zone"], + serde_json::json!(8) + ); + assert_eq!( + logical.calculate_included_frags().await.unwrap(), + dataset.fragment_bitmap.as_ref().clone() + ); + + let query = SargableQuery::Range( + Bound::Included(ScalarValue::Int32(Some(0))), + Bound::Included(ScalarValue::Int32(Some(10_000))), + ); + let result = logical.search(&query, &NoOpMetricsCollector).await.unwrap(); + let searched_fragments = result + .row_addrs() + .true_rows() + .row_addrs() + .unwrap() + .map(|row_addr| RowAddress::from(u64::from(row_addr)).fragment_id()) + .collect::>(); + assert_eq!( + searched_fragments, + fragments + .iter() + .map(|fragment| fragment.id() as u32) + .collect::>() + ); + + let selective_query = SargableQuery::Range( + Bound::Included(ScalarValue::Int32(Some(20))), + Bound::Included(ScalarValue::Int32(Some(43))), + ); + let selective_result = logical + .search(&selective_query, &NoOpMetricsCollector) + .await + .unwrap(); + let selective_fragments = selective_result + .row_addrs() + .true_rows() + .row_addrs() + .unwrap() + .map(|row_addr| RowAddress::from(u64::from(row_addr)).fragment_id()) + .collect::>(); + assert_eq!( + selective_fragments, + fragments[1..=2] + .iter() + .map(|fragment| fragment.id() as u32) + .collect::>() + ); + } + + #[tokio::test] + async fn test_merge_existing_zonemap_segments_drops_retired_fragments() { + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + let reader = lance_datagen::gen_batch() + .col("value", array::step::()) + .into_reader_rows( + lance_datagen::RowCount::from(64), + lance_datagen::BatchCount::from(2), + ); + let mut dataset = Dataset::write( + reader, + &dataset_uri, + Some(WriteParams { + max_rows_per_file: 64, + mode: WriteMode::Overwrite, + enable_stable_row_ids: true, + ..Default::default() + }), + ) + .await + .unwrap(); + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap); + let mut staged = Vec::new(); + for fragment in dataset.get_fragments() { + staged.push( + CreateIndexBuilder::new(&mut dataset, &["value"], IndexType::ZoneMap, ¶ms) + .name("value_zonemap_retired".to_string()) + .fragments(vec![fragment.id() as u32]) + .execute_uncommitted() + .await + .unwrap(), + ); + } + dataset + .commit_existing_index_segments("value_zonemap_retired", "value", staged) + .await + .unwrap(); + + dataset.delete("value < 16").await.unwrap(); + compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 64, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + let live_frags = dataset.fragment_bitmap.as_ref().clone(); + assert!(!live_frags.contains(0), "compaction should retire frag 0"); + + let merged = dataset + .merge_existing_index_segments( + dataset + .load_indices_by_name("value_zonemap_retired") + .await + .unwrap(), + ) + .await + .unwrap(); + let coverage = merged.fragment_bitmap.as_ref().unwrap(); + assert!(!coverage.contains(0), "must drop retired frag 0"); + assert!(coverage.contains(1), "must keep live indexed frag 1"); + + let field_path = dataset.schema().field_path(merged.fields[0]).unwrap(); + let index = crate::index::scalar::open_scalar_index( + &dataset, + &field_path, + &merged, + &NoOpMetricsCollector, + ) + .await + .unwrap(); + let query = SargableQuery::Range( + Bound::Included(ScalarValue::Int32(Some(0))), + Bound::Excluded(ScalarValue::Int32(Some(16))), + ); + let searched_fragments = index + .search(&query, &NoOpMetricsCollector) + .await + .unwrap() + .row_addrs() + .true_rows() + .row_addrs() + .unwrap() + .map(|row_addr| RowAddress::from(u64::from(row_addr)).fragment_id()) + .collect::>(); + assert!( + searched_fragments.is_empty(), + "must filter retired-fragment zones" + ); + } + + #[tokio::test] + async fn test_merge_then_commit_zonemap_segment_ignores_retired_fragment_coverage() { + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + let reader = lance_datagen::gen_batch() + .col("value", array::step::()) + .into_reader_rows( + lance_datagen::RowCount::from(64), + lance_datagen::BatchCount::from(2), + ); + let mut dataset = Dataset::write( + reader, + &dataset_uri, + Some(WriteParams { + max_rows_per_file: 64, + mode: WriteMode::Overwrite, + enable_stable_row_ids: true, + ..Default::default() + }), + ) + .await + .unwrap(); + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap); + let segment = + CreateIndexBuilder::new(&mut dataset, &["value"], IndexType::ZoneMap, ¶ms) + .name("value_zonemap_replace_retired".to_string()) + .execute_uncommitted() + .await + .unwrap(); + let original_coverage = segment.fragment_bitmap.as_ref().unwrap().clone(); + assert!(original_coverage.contains(0)); + assert!(original_coverage.contains(1)); + + dataset + .commit_existing_index_segments("value_zonemap_replace_retired", "value", vec![segment]) + .await + .unwrap(); + + dataset.delete("value < 16").await.unwrap(); + compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 64, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + let live_frags = dataset.fragment_bitmap.as_ref().clone(); + assert!(!live_frags.contains(0), "compaction should retire frag 0"); + + let merged = dataset + .merge_existing_index_segments( + dataset + .load_indices_by_name("value_zonemap_replace_retired") + .await + .unwrap(), + ) + .await + .unwrap(); + let merged_coverage = merged.fragment_bitmap.as_ref().unwrap().clone(); + let merged_uuid = merged.uuid; + + dataset + .commit_existing_index_segments("value_zonemap_replace_retired", "value", vec![merged]) + .await + .unwrap(); + + let committed = dataset + .load_indices_by_name("value_zonemap_replace_retired") + .await + .unwrap(); + assert_eq!(committed.len(), 1); + assert_eq!(committed[0].uuid, merged_uuid); + + let combined_bitmap = + scalar_index_fragment_bitmap(&dataset, "value", "value_zonemap_replace_retired") + .await + .unwrap() + .unwrap(); + assert_eq!(combined_bitmap, merged_coverage); + } + + #[tokio::test] + async fn test_merge_existing_index_segments_rejects_mismatched_zonemap_params() { + let dataset = lance_datagen::gen_batch() + .col("value", array::step::()) + .into_ram_dataset(FragmentCount::from(2), FragmentRowCount::from(16)) + .await + .unwrap(); + let mut dataset = dataset; + let fragments = dataset.get_fragments(); + let mut staged = Vec::new(); + + for (fragment, rows_per_zone) in fragments.iter().zip([8, 16]) { + let zonemap_params = + lance_index::scalar::zonemap::ZoneMapIndexBuilderParams::new(rows_per_zone); + let params_json = serde_json::to_value(&zonemap_params).unwrap(); + let params = + ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap).with_params(¶ms_json); + let segment = + CreateIndexBuilder::new(&mut dataset, &["value"], IndexType::ZoneMap, ¶ms) + .name("value_zonemap_mismatched".to_string()) + .fragments(vec![fragment.id() as u32]) + .execute_uncommitted() + .await + .unwrap(); + staged.push(segment); + } + + let err = dataset + .merge_existing_index_segments(staged) + .await + .unwrap_err(); + assert!( + err.to_string().contains("different rows_per_zone values"), + "unexpected error: {err}" + ); + } + #[tokio::test] async fn test_commit_existing_zonemap_segments_replaces_overlapping_segments() { let dataset = lance_datagen::gen_batch()