Skip to content
17 changes: 12 additions & 5 deletions docs/src/guide/distributed_indexing.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ write:
3. Lance plans and builds index artifacts from the worker outputs supplied by the caller
4. the built artifacts are committed into the dataset manifest

For vector indices, the worker outputs are segments stored directly
under `indices/<segment_uuid>/`. Lance can turn these outputs into one or more
physical segments and then commit them as one logical index.
For vector indices and segment-native scalar indices, the worker outputs are
segments stored directly under `indices/<segment_uuid>/`. Lance can turn these
outputs into one or more physical segments and then commit them as one logical
index.

![Distributed Vector Segment Build](../images/distributed_vector_segment_build.svg)

Expand Down Expand Up @@ -81,7 +82,7 @@ launching workers and driving the overall workflow.

## Current Model

The current model for distributed vector indexing has two layers of parallelism.
The current model for distributed indexing has two layers of parallelism.

### Worker Build

Expand All @@ -105,6 +106,12 @@ or merged into larger segments:

Within a single commit, built segments must have disjoint fragment coverage.

`merge_existing_index_segments(...)` currently supports vector, inverted,
bitmap, BTree, and zone map segments. Other scalar index families can still
commit multiple compatible segments directly when their build path supports
fragment-scoped segments, but cannot be merged into a larger physical segment
until they add a merge implementation.

### Vector Model Scope

Distributed vector builds support two model scopes.
Expand Down Expand Up @@ -138,7 +145,7 @@ trained segments as separate physical segments.

## Internal Finalize Model

Internally, Lance models distributed vector segment build as:
Internally, Lance models distributed segment build as:

1. **build** one uncommitted segment per worker
2. **optionally merge** caller-defined groups of existing segments
Expand Down
51 changes: 51 additions & 0 deletions rust/lance-index/src/scalar/zonemap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,57 @@ impl ScalarIndex for ZoneMapIndex {
}
}

/// Merge caller-selected ZoneMap segments into one self-contained segment.
pub async fn merge_zonemap_indices(
source_indices: &[&ZoneMapIndex],
dest_store: &dyn IndexStore,
fragment_filter: &RoaringBitmap,
) -> Result<CreatedIndex> {
let first = source_indices.first().ok_or_else(|| {
Error::invalid_input("merge_zonemap_indices requires at least one source index")
})?;
let rows_per_zone = first.rows_per_zone;
let data_type = first.data_type.clone();

let mut zones = Vec::new();
for source in source_indices {
if source.rows_per_zone != rows_per_zone {
return Err(Error::invalid_input(format!(
"cannot merge ZoneMap segments with different rows_per_zone values: {} and {}",
rows_per_zone, source.rows_per_zone
)));
}
if source.data_type != data_type {
return Err(Error::invalid_input(format!(
"cannot merge ZoneMap segments with different value types: {:?} and {:?}",
data_type, source.data_type
)));
}
zones.extend(
source
.zones
.iter()
.filter(|zone| {
u32::try_from(zone.bound.fragment_id)
.is_ok_and(|fragment_id| fragment_filter.contains(fragment_id))
})
.cloned(),
);
}
zones.sort_by_key(|zone| (zone.bound.fragment_id, zone.bound.start));

let mut builder =
ZoneMapIndexBuilder::try_new(ZoneMapIndexBuilderParams::new(rows_per_zone), data_type)?;
builder.maps = zones;
builder.write_index(dest_store).await?;

Ok(CreatedIndex {
index_details: prost_types::Any::from_msg(&pbold::ZoneMapIndexDetails::default()).unwrap(),
index_version: ZONEMAP_INDEX_VERSION,
files: Some(dest_store.list_files_with_sizes().await?),
})
}

fn default_rows_per_zone() -> u64 {
*DEFAULT_ROWS_PER_ZONE
}
Expand Down
28 changes: 25 additions & 3 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,13 @@ fn segment_has_btree_details(segment: &IndexMetadata) -> bool {
)
}

fn segment_has_zonemap_details(segment: &IndexMetadata) -> bool {
segment
.index_details
.as_ref()
.is_some_and(|details| details.type_url.ends_with("ZoneMapIndexDetails"))
}

// Cache keys for different index types
#[derive(Debug, Clone)]
pub(crate) struct LegacyVectorIndexCacheKey<'a> {
Expand Down Expand Up @@ -1130,7 +1137,8 @@ impl DatasetIndexExt for Dataset {
let all_inverted = source_segments.iter().all(segment_has_inverted_details);
let all_bitmap = source_segments.iter().all(segment_has_bitmap_details);
let all_btree = source_segments.iter().all(segment_has_btree_details);
if !all_vector && !all_inverted && !all_bitmap && !all_btree {
let all_zonemap = source_segments.iter().all(segment_has_zonemap_details);
if !all_vector && !all_inverted && !all_bitmap && !all_btree && !all_zonemap {
return Err(Error::invalid_input(
"merge_existing_index_segments requires all segments to have the same supported index type"
.to_string(),
Expand All @@ -1148,6 +1156,8 @@ impl DatasetIndexExt for Dataset {
crate::index::scalar::inverted::merge_segments(self, source_segments).await?
} else if all_bitmap {
crate::index::scalar::bitmap::merge_segments(self, source_segments).await?
} else if all_zonemap {
crate::index::scalar::zonemap::merge_segments(self, source_segments).await?
} else {
crate::index::scalar::btree::merge_segments(self, source_segments).await?
};
Expand Down Expand Up @@ -1214,7 +1224,8 @@ impl DatasetIndexExt for Dataset {
.is_none_or(|(details, expected)| details.type_url == expected)
})
.map(|idx| -> Result<Option<IndexMetadata>> {
let Some(existing_fragments) = idx.fragment_bitmap.as_ref() else {
let Some(existing_fragments) = idx.effective_fragment_bitmap(&dataset_fragments)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential issue: fully retired existing segments can reach this disjoint branch and be kept instead of removed. That leaves old zonemap segments referenced in the manifest after a merge, so the merge may not actually replace multiple physical segments with one.

else {
if incoming_fragments != dataset_fragments {
return Err(Error::invalid_input(format!(
"CreateIndex: cannot replace legacy index segment {} for '{}' with partial fragment coverage; rebuild all fragments in one commit",
Expand Down Expand Up @@ -6764,7 +6775,18 @@ mod tests {
)
.into_reader_rows(RowCount::from(20), BatchCount::from(2));

let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap();
let mut dataset = Dataset::write(
reader,
test_uri,
Some(WriteParams {
max_rows_per_file: 20,
max_rows_per_group: 20,
..Default::default()
}),
)
.await
.unwrap();
assert_eq!(dataset.get_fragments().len(), 2);

let field_id = dataset.schema().field("vector").unwrap().id;
let original = write_vector_segment_metadata(
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/index/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
pub(crate) mod bitmap;
pub(crate) mod btree;
pub(crate) mod inverted;
pub(crate) mod zonemap;

pub use inverted::{load_segment_details, load_segments};

Expand Down
86 changes: 86 additions & 0 deletions rust/lance/src/index/scalar/zonemap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::sync::Arc;

use lance_index::metrics::NoOpMetricsCollector;
use lance_index::scalar::lance_format::LanceIndexStore;
use lance_index::scalar::zonemap::ZoneMapIndex;
use lance_table::format::IndexMetadata;
use roaring::RoaringBitmap;
use uuid::Uuid;

use crate::{Dataset, Error, Result, dataset::index::LanceIndexStoreExt};

/// Merge one caller-defined group of source ZoneMap segments into a single segment.
pub(in crate::index) async fn merge_segments(
dataset: &Dataset,
segments: Vec<IndexMetadata>,
) -> Result<IndexMetadata> {
if segments.is_empty() {
return Err(Error::index("No segment metadata was provided".to_string()));
}

let field_id = *segments[0].fields.first().ok_or_else(|| {
Error::invalid_input(format!(
"CreateIndex: segment {} is missing field ids",
segments[0].uuid
))
})?;
let field_path = dataset.schema().field_path(field_id)?;

let mut scalar_indices = Vec::with_capacity(segments.len());
let mut fragment_bitmap = RoaringBitmap::new();
let dataset_fragments = dataset.fragment_bitmap.as_ref();
for segment in &segments {
let effective = segment
.effective_fragment_bitmap(dataset_fragments)
.ok_or_else(|| {
Error::invalid_input(format!(
"CreateIndex: segment {} is missing fragment coverage",
segment.uuid
))
})?;
fragment_bitmap |= effective;
let scalar_index =
super::open_scalar_index(dataset, &field_path, segment, &NoOpMetricsCollector).await?;
scalar_indices.push((segment.uuid, scalar_index));
}

let mut source_indices = Vec::with_capacity(scalar_indices.len());
for (segment_uuid, scalar_index) in &scalar_indices {
let zonemap_index = scalar_index
.as_any()
.downcast_ref::<ZoneMapIndex>()
.ok_or_else(|| {
Error::index(format!(
"merge_existing_index_segments: expected zonemap segment {}, got {:?}",
segment_uuid,
scalar_index.index_type()
))
})?;
source_indices.push(zonemap_index);
}

let new_uuid = Uuid::new_v4();
let new_store = LanceIndexStore::from_dataset_for_new(dataset, &new_uuid.to_string())?;
let created_index = lance_index::scalar::zonemap::merge_zonemap_indices(
&source_indices,
&new_store,
&fragment_bitmap,
)
.await?;

Ok(IndexMetadata {
uuid: new_uuid,
fields: vec![field_id],
dataset_version: dataset.manifest.version,
fragment_bitmap: Some(fragment_bitmap),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a merge-then-commit regression test here? Since the merged segment keeps only live fragment coverage, committing it under the same index name can compare against the old segment's original coverage and report retired fragments as orphaned.

index_details: Some(Arc::new(created_index.index_details)),
index_version: created_index.index_version as i32,
created_at: Some(chrono::Utc::now()),
base_id: None,
files: created_index.files,
..segments[0].clone()
})
}
Loading
Loading