diff --git a/docs/src/format/index/scalar/fmindex.md b/docs/src/format/index/scalar/fmindex.md new file mode 100644 index 00000000000..1138024896f --- /dev/null +++ b/docs/src/format/index/scalar/fmindex.md @@ -0,0 +1,64 @@ +# FM-Index (Full-text / Substring / Regex Search) + +The FM-Index (Ferragina-Manzini Index) is a compressed substring index based on the Burrows-Wheeler Transform (BWT). Unlike traditional inverted indexes (Full-Text Search) which index distinct words, the FM-Index enables efficient **arbitrary substring search**, **prefix match**, and **suffix/regular-expression search** directly on raw bytes. + +In Lance, the FM-Index is designed to scale dynamically across millions of documents or large-scale datasets, and is partitioned using Lance's **Segmented Index** architecture to support incremental appends, disjoint fragment tracking, and segment merging. + +## High-Level Architecture + +The FM-Index indexes raw text by treating columns of strings or binary payloads as raw byte arrays. + +``` + +----------------------------------------+ + | Lance Dataset | + | (Disjoint groups of Fragments 0..N) | + +----------------------------------------+ + | + Divide fragments into num_segments + | + v + +----------------------------------------+ + | Segmented Index | + | +-----------+ +-----------+ +-------+ | + | | Segment 1 | | Segment 2 | | ... | | + | | (FM-Idx) | | (FM-Idx) | | | | + | +-----------+ +-----------+ +-------+ | + +----------------------------------------+ +``` + +Each segment contains its own self-contained physical FM-Index mapping byte sub-sequences to Lance global row IDs. + +## Data Normalization & Sanitization + +The FM-Index is **normalization-independent by design** because it operates entirely on raw bytes. + +### Byte Sanitization vs. Text Normalization + +1. **Byte Sanitization (Core Index Layer)**: + The physical FM-Index uses specific sentinel bytes internally to mark boundaries: + - `\x00` is reserved as the global Burrows-Wheeler Transform (BWT) terminator character. + - `\xFF` is reserved as the document/row separator character. + + To avoid breaking the indexing structures, any incoming occurrences of `\x00` or `\xFF` are sanitized by remapping them to space (`\x20`) characters at index-build time. No other bytes are changed in this layer. + +2. **Text Normalization (User/Application Layer)**: + Because the index faithfully maps raw bytes, any semantic normalization (such as case folding `Hello` -> `hello`, Unicode NFKC normalization, stemming, or whitespace collapsing) is fully decoupled from the core index engine: + - To build a case-insensitive search index, users apply a lowercase transform to the column *prior* to indexing. + - When querying, the user's query text must undergo the exact same normalization pipeline. + +## Configurable Segment Partitioning + +Merging or appending to BWT-based indexes cannot be done via simple concatenation; the BWT suffix array must be reconstructed by re-reading the text and rebuilding. To balance build cost and search performance, Lance allows configuring how fragments map to index segments. + +- **`num_segments` parameter**: Configured at index-creation time. If `num_segments` is specified (e.g. `num_segments = 4`), Lance splits the target dataset fragments into disjoint subsets and builds independent FM-Index segments over each chunk. +- **Unindexed Appends**: When new fragments are appended to the dataset, a subsequent `create_index` execution with unindexed fragment coverage will construct a new separate segment representing only those new fragments, keeping existing segments fully intact. +- **Segment Merging**: Multiple existing index segments can be merged into a single segment under Lance's `merge_segments` protocol. Lance unions the fragment coverage bitmaps of the selected segments, re-reads the raw text from those covered fragments, and constructs a fresh unified FM-Index. + +## Query Evaluation + +When a substring query is submitted (e.g., `CONTAINS(column, "query_string")`): +1. The search string is sanitized (remapping any `\x00` or `\xFF` to spaces) and optionally normalized if the target index is normalized. +2. The query is dispatched across all active segments in the logical index in parallel. +3. Each segment performs a BWT backward-search to locate occurrences of the pattern. +4. Matching offsets are mapped back to absolute dataset Row IDs. +5. Results from all segments are unioned to produce the final selection. diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index 935de3e8a35..8e09be45597 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -974,7 +974,7 @@ fn inner_create_index<'local>( | IndexType::NGram | IndexType::ZoneMap | IndexType::BloomFilter - | IndexType::FMIndex + | IndexType::Fm | IndexType::RTree => { // For scalar indices, create a scalar IndexParams let (index_type_str, params_opt) = get_scalar_index_params(env, params_jobj)?; diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 8c16ca85399..51f2c871fb2 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -2236,6 +2236,7 @@ impl Dataset { "LABEL_LIST" => IndexType::LabelList, "RTREE" => IndexType::RTree, "INVERTED" | "FTS" => IndexType::Inverted, + "FM" => IndexType::Fm, "IVF_FLAT" | "IVF_PQ" | "IVF_SQ" | "IVF_RQ" | "IVF_HNSW_FLAT" | "IVF_HNSW_PQ" | "IVF_HNSW_SQ" => IndexType::Vector, _ => { @@ -2275,6 +2276,27 @@ impl Dataset { index_type: "rtree".to_string(), params: None, }), + "FM" => { + let mut params_json = serde_json::Map::new(); + if let Some(kwargs) = kwargs + && let Some(num_segments) = kwargs.get_item("num_segments")? + { + let n: u32 = num_segments.extract()?; + params_json.insert( + "num_segments".to_string(), + serde_json::Value::Number(n.into()), + ); + } + let params = if params_json.is_empty() { + None + } else { + Some(serde_json::Value::Object(params_json).to_string()) + }; + Box::new(ScalarIndexParams { + index_type: "fm".to_string(), + params, + }) + } "SCALAR" => { let Some(kwargs) = kwargs else { return Err(PyValueError::new_err( diff --git a/rust/lance-index/src/lib.rs b/rust/lance-index/src/lib.rs index fee421bab08..d3152278f3c 100644 --- a/rust/lance-index/src/lib.rs +++ b/rust/lance-index/src/lib.rs @@ -125,7 +125,7 @@ pub enum IndexType { RTree = 10, // RTree - FMIndex = 11, // FM-Index + Fm = 11, // FM-Index // 100+ and up for vector index. /// Flat vector index. @@ -152,7 +152,7 @@ impl std::fmt::Display for IndexType { Self::ZoneMap => write!(f, "ZoneMap"), Self::BloomFilter => write!(f, "BloomFilter"), Self::RTree => write!(f, "RTree"), - Self::FMIndex => write!(f, "FMIndex"), + Self::Fm => write!(f, "Fm"), Self::Vector | Self::IvfPq => write!(f, "IVF_PQ"), Self::IvfFlat => write!(f, "IVF_FLAT"), Self::IvfSq => write!(f, "IVF_SQ"), @@ -180,7 +180,7 @@ impl TryFrom for IndexType { v if v == Self::ZoneMap as i32 => Ok(Self::ZoneMap), v if v == Self::BloomFilter as i32 => Ok(Self::BloomFilter), v if v == Self::RTree as i32 => Ok(Self::RTree), - v if v == Self::FMIndex as i32 => Ok(Self::FMIndex), + v if v == Self::Fm as i32 => Ok(Self::Fm), v if v == Self::Vector as i32 => Ok(Self::Vector), v if v == Self::IvfFlat as i32 => Ok(Self::IvfFlat), v if v == Self::IvfSq as i32 => Ok(Self::IvfSq), @@ -209,7 +209,7 @@ impl TryFrom<&str> for IndexType { "ZoneMap" | "ZONEMAP" => Ok(Self::ZoneMap), "BloomFilter" | "BLOOMFILTER" | "BLOOM_FILTER" => Ok(Self::BloomFilter), "RTree" | "RTREE" | "R_TREE" => Ok(Self::RTree), - "FMIndex" | "FMINDEX" | "FM_INDEX" => Ok(Self::FMIndex), + "Fm" | "FM" => Ok(Self::Fm), "Vector" | "VECTOR" => Ok(Self::Vector), "IVF_FLAT" => Ok(Self::IvfFlat), "IVF_SQ" => Ok(Self::IvfSq), @@ -241,7 +241,7 @@ impl IndexType { | Self::ZoneMap | Self::BloomFilter | Self::RTree - | Self::FMIndex, + | Self::Fm, ) } @@ -281,7 +281,7 @@ impl IndexType { Self::ZoneMap => 0, Self::BloomFilter => 0, Self::RTree => 0, - Self::FMIndex => 0, + Self::Fm => 0, // IMPORTANT: if any vector index subtype needs a format bump that is // not backward compatible, its new version must be set to @@ -396,7 +396,7 @@ mod tests { IndexType::ZoneMap, IndexType::BloomFilter, IndexType::RTree, - IndexType::FMIndex, + IndexType::Fm, IndexType::Vector, IndexType::IvfFlat, IndexType::IvfSq, @@ -438,9 +438,8 @@ mod tests { ("RTree", IndexType::RTree), ("RTREE", IndexType::RTree), ("R_TREE", IndexType::RTree), - ("FMIndex", IndexType::FMIndex), - ("FMINDEX", IndexType::FMIndex), - ("FM_INDEX", IndexType::FMIndex), + ("Fm", IndexType::Fm), + ("FM", IndexType::Fm), ("Vector", IndexType::Vector), ("VECTOR", IndexType::Vector), ("IVF_FLAT", IndexType::IvfFlat), diff --git a/rust/lance-index/src/registry.rs b/rust/lance-index/src/registry.rs index 1608baec8e6..bca4c853272 100644 --- a/rust/lance-index/src/registry.rs +++ b/rust/lance-index/src/registry.rs @@ -29,7 +29,12 @@ impl IndexPluginRegistry { fn get_plugin_name_from_details_name(&self, details_name: &str) -> String { let details_name = Self::normalize_plugin_name(details_name); if details_name.ends_with("indexdetails") { - details_name.replace("indexdetails", "") + let plugin_name = details_name.replace("indexdetails", ""); + if plugin_name == "fmindex" { + "fm".to_string() + } else { + plugin_name + } } else { details_name } @@ -123,6 +128,7 @@ mod tests { ("NGRAM", "NGram"), ("ZONEMAP", "ZoneMap"), ("BLOOMFILTER", "BloomFilter"), + ("FM", "Fm"), ("JSON", "Json"), ] { let plugin = registry.get_plugin_by_name(requested_name).unwrap(); diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index 772dfaf4089..869a4fe6778 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -75,7 +75,7 @@ pub enum BuiltinIndexType { BloomFilter, RTree, Inverted, - FMIndex, + Fm, } impl BuiltinIndexType { @@ -89,7 +89,7 @@ impl BuiltinIndexType { Self::Inverted => "inverted", Self::BloomFilter => "bloomfilter", Self::RTree => "rtree", - Self::FMIndex => "fmindex", + Self::Fm => "fm", } } } @@ -107,7 +107,7 @@ impl TryFrom for BuiltinIndexType { IndexType::Inverted => Ok(Self::Inverted), IndexType::BloomFilter => Ok(Self::BloomFilter), IndexType::RTree => Ok(Self::RTree), - IndexType::FMIndex => Ok(Self::FMIndex), + IndexType::Fm => Ok(Self::Fm), _ => Err(Error::index("Invalid index type".to_string())), } } diff --git a/rust/lance-index/src/scalar/fmindex.rs b/rust/lance-index/src/scalar/fmindex.rs index 331be04c538..d32e5b285e0 100644 --- a/rust/lance-index/src/scalar/fmindex.rs +++ b/rust/lance-index/src/scalar/fmindex.rs @@ -31,7 +31,7 @@ use datafusion::execution::SendableRecordBatchStream; use deepsize::DeepSizeOf; use futures::StreamExt; use lance_core::cache::LanceCache; -use lance_core::{Error, Result}; +use lance_core::{Error, ROW_ADDR, Result}; use roaring::RoaringBitmap; use crate::frag_reuse::FragReuseIndex; @@ -40,6 +40,7 @@ use crate::pb; use crate::scalar::expression::{ScalarQueryParser, TextQueryParser}; use crate::scalar::registry::{ DefaultTrainingRequest, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest, + VALUE_COLUMN_NAME, }; use crate::scalar::{ AnyQuery, BuiltinIndexType, CreatedIndex, IndexStore, OldIndexDataFilter, ScalarIndex, @@ -57,7 +58,7 @@ const SENTINEL_BYTE: u8 = 0xFF; const SA_SAMPLE_RATE: usize = 32; fn fmindex_partition_path(partition_id: u64) -> String { - format!("part_{partition_id}_fmindex.lance") + format!("part_{partition_id}_fm.lance") } // ── Bitvector with O(1) rank ───────────────────────────────────────────────── @@ -761,6 +762,7 @@ impl FMIndex { (lo, hi) } + #[cfg(test)] fn search(&self, pattern: &[u8]) -> RoaringBitmap { let (lo, hi) = self.backward_search(pattern); if lo >= hi { @@ -775,6 +777,25 @@ impl FMIndex { result } + /// Search returning full u64 row addresses (preserving fragment ID in upper bits). + fn search_row_addrs(&self, pattern: &[u8]) -> Vec { + let (lo, hi) = self.backward_search(pattern); + if lo >= hi { + return Vec::new(); + } + let mut seen = std::collections::HashSet::new(); + let mut result = Vec::new(); + for i in lo..hi { + let text_pos = self.locate(i); + let doc_idx = self.doc_for_position(text_pos); + let row_addr = self.row_ids[doc_idx]; + if seen.insert(row_addr) { + result.push(row_addr); + } + } + result + } + fn serialize_huffman_codes(&self) -> Vec { let mut buf = Vec::new(); for code in &self.wavelet.codes { @@ -987,6 +1008,7 @@ impl LazyFMIndex { } } + #[cfg(test)] fn search(&self, pattern: &[u8]) -> RoaringBitmap { let (lo, hi) = self.backward_search(pattern); if lo >= hi { @@ -1001,6 +1023,25 @@ impl LazyFMIndex { result } + /// Search returning full u64 row addresses (preserving fragment ID in upper bits). + fn search_row_addrs(&self, pattern: &[u8]) -> Vec { + let (lo, hi) = self.backward_search(pattern); + if lo >= hi { + return Vec::new(); + } + let mut seen = std::collections::HashSet::new(); + let mut result = Vec::new(); + for i in lo..hi { + let text_pos = self.locate(i); + let doc_idx = self.doc_for_position(text_pos); + let row_addr = self.row_ids[doc_idx]; + if seen.insert(row_addr) { + result.push(row_addr); + } + } + result + } + #[allow(clippy::too_many_arguments)] async fn from_reader( reader: Arc, @@ -1226,7 +1267,7 @@ impl FMIndexScalarIndex { if let Some(id) = f .path .strip_prefix("part_") - .and_then(|r| r.strip_suffix("_fmindex.lance")) + .and_then(|r| r.strip_suffix("_fm.lance")) .and_then(|s| s.parse::().ok()) { pfiles.push((id, f.path.clone())); @@ -1256,7 +1297,7 @@ impl Index for FMIndexScalarIndex { } fn as_vector_index(self: Arc) -> Result> { Err(Error::invalid_input_source( - "FMIndex is not a vector index".into(), + "Fm is not a vector index".into(), )) } async fn prewarm(&self) -> Result<()> { @@ -1264,14 +1305,14 @@ impl Index for FMIndexScalarIndex { } fn statistics(&self) -> Result { Ok(serde_json::json!({ - "type": "FMIndex", + "type": "Fm", "num_partitions": self.partitions.len(), "total_bwt_len": self.partitions.iter().map(|p| p.fm.wavelet.len).sum::(), "total_docs": self.partitions.iter().map(|p| p.fm.row_ids.len()).sum::(), })) } fn index_type(&self) -> IndexType { - IndexType::FMIndex + IndexType::Fm } async fn calculate_included_frags(&self) -> Result { let mut frags = RoaringBitmap::new(); @@ -1294,7 +1335,7 @@ impl ScalarIndex for FMIndexScalarIndex { let tq = query .as_any() .downcast_ref::() - .ok_or_else(|| Error::invalid_input("FMIndex only supports TextQuery"))?; + .ok_or_else(|| Error::invalid_input("Fm only supports TextQuery"))?; match tq { TextQuery::StringContains(pattern) => { let pb = pattern.as_bytes(); @@ -1302,8 +1343,8 @@ impl ScalarIndex for FMIndexScalarIndex { let mut tree = RowAddrTreeMap::new(); for p in &self.partitions { p.fm.prewarm().await?; - for rid in p.fm.search(pb).iter() { - tree.insert(rid as u64); + for row_addr in p.fm.search_row_addrs(pb) { + tree.insert(row_addr); } } Ok(SearchResult::Exact(lance_select::NullableRowAddrSet::new( @@ -1321,13 +1362,13 @@ impl ScalarIndex for FMIndexScalarIndex { _: &HashMap>, _: &dyn IndexStore, ) -> Result { - Err(Error::not_supported("FMIndex does not support remap")) + Err(Error::not_supported("Fm does not support remap")) } async fn update( &self, new_data: SendableRecordBatchStream, dest: &dyn IndexStore, - _: Option, + _old_data_filter: Option, ) -> Result { let texts = collect_texts(new_data).await?; write_partitioned_fmindex(&texts, dest).await?; @@ -1338,10 +1379,12 @@ impl ScalarIndex for FMIndexScalarIndex { }) } fn update_criteria(&self) -> UpdateCriteria { - UpdateCriteria::only_new_data(TrainingCriteria::new(TrainingOrdering::None)) + UpdateCriteria::requires_old_data( + TrainingCriteria::new(TrainingOrdering::None).with_row_addr(), + ) } fn derive_index_params(&self) -> Result { - Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::FMIndex)) + Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::Fm)) } } @@ -1349,20 +1392,23 @@ impl ScalarIndex for FMIndexScalarIndex { async fn collect_texts(mut stream: SendableRecordBatchStream) -> Result)>> { let mut texts = Vec::new(); - let mut next_id = 0u64; while let Some(batch) = stream.next().await { let batch = batch?; - let row_ids: Option<&arrow_array::UInt64Array> = batch - .column_by_name("_rowid") - .or_else(|| batch.column_by_name("_rowaddr")) - .and_then(|c| c.as_any().downcast_ref()); - let value_col = batch.column(0); + // Prefer _rowaddr (global row address) over _rowid to ensure stable, + // globally unique identifiers across segments. + let row_addrs: &arrow_array::UInt64Array = batch + .column_by_name(ROW_ADDR) + .or_else(|| batch.column_by_name("_rowid")) + .and_then(|c| c.as_any().downcast_ref()) + .ok_or_else(|| { + Error::invalid_input("Fm training data must include _rowaddr or _rowid column") + })?; + // Use the named value column; fall back to column(0) for legacy streams + let value_col = batch + .column_by_name(VALUE_COLUMN_NAME) + .unwrap_or_else(|| batch.column(0)); for i in 0..batch.num_rows() { - let rid = row_ids.map(|ids| ids.value(i)).unwrap_or_else(|| { - let id = next_id; - next_id += 1; - id - }); + let rid = row_addrs.value(i); if let Some(bytes) = extract_text_bytes(value_col.as_ref(), i)? { let sanitized: Vec = bytes .iter() @@ -1421,7 +1467,7 @@ fn extract_text_bytes(array: &dyn arrow_array::Array, index: usize) -> Result Err(Error::invalid_input(format!( - "FMIndex does not support data type: {:?}", + "Fm does not support data type: {:?}", array.data_type() ))), } @@ -1545,7 +1591,7 @@ pub struct FMIndexPlugin; #[async_trait] impl ScalarIndexPlugin for FMIndexPlugin { fn name(&self) -> &str { - "FMIndex" + "Fm" } fn new_training_request( &self, @@ -1562,7 +1608,7 @@ impl ScalarIndexPlugin for FMIndexPlugin { } } Ok(Box::new(DefaultTrainingRequest::new( - TrainingCriteria::new(TrainingOrdering::None), + TrainingCriteria::new(TrainingOrdering::None).with_row_addr(), ))) } async fn train_index( @@ -2014,23 +2060,23 @@ mod tests { use arrow_array::{StringArray, UInt64Array}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use futures::stream; - use lance_core::ROW_ID; + use lance_core::ROW_ADDR; let docs = vec!["hello world", "hello rust", "goodbye world"]; - let row_ids: Vec = vec![0, 1, 2]; + let row_addrs: Vec = vec![0, 1, 2]; let schema = Arc::new(arrow_schema::Schema::new(vec![ arrow_schema::Field::new( crate::scalar::registry::VALUE_COLUMN_NAME, DataType::Utf8, false, ), - arrow_schema::Field::new(ROW_ID, DataType::UInt64, false), + arrow_schema::Field::new(ROW_ADDR, DataType::UInt64, false), ])); let batch = RecordBatch::try_new( schema.clone(), vec![ Arc::new(StringArray::from(docs)), - Arc::new(UInt64Array::from(row_ids)), + Arc::new(UInt64Array::from(row_addrs)), ], ) .unwrap(); @@ -2149,7 +2195,7 @@ mod tests { .unwrap(); let stats = index.statistics().unwrap(); - assert_eq!(stats["type"], "FMIndex"); + assert_eq!(stats["type"], "Fm"); assert_eq!(stats["total_docs"], 10); assert!(stats["total_bwt_len"].as_u64().unwrap() > 0); }); diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 3a5f9975810..8375e88bd59 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -271,6 +271,13 @@ fn segment_has_btree_details(segment: &IndexMetadata) -> bool { ) } +fn segment_has_fmindex_details(segment: &IndexMetadata) -> bool { + segment + .index_details + .as_ref() + .is_some_and(|details| details.type_url.ends_with("FMIndexIndexDetails")) +} + // Cache keys for different index types #[derive(Debug, Clone)] pub(crate) struct LegacyVectorIndexCacheKey<'a> { @@ -437,7 +444,7 @@ fn legacy_type_name(index_uri: &str, index_type_hint: Option<&str>) -> String { "BloomFilter" => IndexType::BloomFilter.to_string(), "RTree" => IndexType::RTree.to_string(), "Inverted" => IndexType::Inverted.to_string(), - "FMIndex" => IndexType::FMIndex.to_string(), + "FMIndex" => IndexType::Fm.to_string(), "Json" => IndexType::Scalar.to_string(), "Flat" | "Vector" => IndexType::Vector.to_string(), other if other.contains("Vector") => IndexType::Vector.to_string(), @@ -1127,7 +1134,8 @@ impl DatasetIndexExt for Dataset { let all_inverted = source_segments.iter().all(segment_has_inverted_details); let all_bitmap = source_segments.iter().all(segment_has_bitmap_details); let all_btree = source_segments.iter().all(segment_has_btree_details); - if !all_vector && !all_inverted && !all_bitmap && !all_btree { + let all_fmindex = source_segments.iter().all(segment_has_fmindex_details); + if !all_vector && !all_inverted && !all_bitmap && !all_btree && !all_fmindex { return Err(Error::invalid_input( "merge_existing_index_segments requires all segments to have the same supported index type" .to_string(), @@ -1143,6 +1151,8 @@ impl DatasetIndexExt for Dataset { .await? } else if all_inverted { crate::index::scalar::inverted::merge_segments(self, source_segments).await? + } else if all_fmindex { + crate::index::scalar::fmindex::merge_segments(self, source_segments).await? } else if all_bitmap { crate::index::scalar::bitmap::merge_segments(self, source_segments).await? } else { diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index a89b64df276..662576c8510 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -192,6 +192,7 @@ async fn merge_scalar_indices<'a>( &NoOpMetricsCollector, ) .await?; + let update_criteria = reference_index.update_criteria(); // Effective = bitmap ∩ live fragments; deleted = bitmap \ live fragments. let mut effective_old_frags = RoaringBitmap::new(); @@ -213,15 +214,19 @@ async fn merge_scalar_indices<'a>( // rescanning the dataset let has_segment_merge_primitive = matches!(index_type, IndexType::BTree); - // Merge new data into the existing segment(s) instead of rebuilding from - // scratch, when both hold: + // Merge new data into the existing segment(s) without rebuilding from + // scratch, when all hold: // - `effective_old_frags`: the selected segments' coverage intersected // with live fragments is non-empty, i.e. there is old data worth keeping. + // - `update_criteria` only requires the newly appended data. Indexes that + // need old data must rebuild over `frag_bitmap` so the scanned rows + // exactly match the segment coverage being committed. // - `has_segment_merge_primitive` (Indices supports N:1 segments merge) OR // `selected_old_indices.len() == 1` (any scalar type can `update` one). // Otherwise (e.g. ≥2 selected segments of a type without an N:1 merge // primitive) the index is rebuilt from scratch over `frag_bitmap`. let can_merge_segments = !effective_old_frags.is_empty() + && !update_criteria.requires_old_data && (has_segment_merge_primitive || selected_old_indices.len() == 1); let created_index = if !can_merge_segments { @@ -235,7 +240,6 @@ async fn merge_scalar_indices<'a>( ) .await? } else { - let update_criteria = reference_index.update_criteria(); let new_data_stream = load_unindexed_training_data(dataset.as_ref(), field_path, &update_criteria, unindexed) .await?; @@ -766,7 +770,7 @@ mod tests { use lance_index::vector::sq::builder::SQBuildParams; use lance_index::{ IndexType, - scalar::ScalarIndexParams, + scalar::{BuiltinIndexType, ScalarIndexParams, SearchResult, TextQuery}, vector::{ivf::IvfBuildParams, pq::PQBuildParams}, }; use lance_linalg::distance::MetricType; @@ -1473,6 +1477,113 @@ mod tests { ); } + #[tokio::test] + async fn test_optimize_fmindex_default_rebuilds_old_and_new_rows() { + let test_dir = TempStrDir::default(); + let test_uri = test_dir.as_str(); + + let schema = Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)])); + let make_batch = |values: &[&str]| { + RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from_iter_values( + values.iter().copied(), + ))], + ) + .unwrap() + }; + + let reader = RecordBatchIterator::new( + vec![Ok(make_batch(&["old alpha needle", "old beta"]))], + schema.clone(), + ); + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + enable_stable_row_ids: true, + max_rows_per_file: 2, + ..Default::default() + }), + ) + .await + .unwrap(); + + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::Fm); + dataset + .create_index( + &["text"], + IndexType::Fm, + Some("text_fmindex".to_string()), + ¶ms, + true, + ) + .await + .unwrap(); + + let appended = RecordBatchIterator::new( + vec![Ok(make_batch(&["new gamma needle", "new delta"]))], + schema.clone(), + ); + dataset.append(appended, None).await.unwrap(); + + assert!( + !dataset + .unindexed_fragments("text_fmindex") + .await + .unwrap() + .is_empty() + ); + + dataset + .optimize_indices(&OptimizeOptions::default()) + .await + .unwrap(); + + let dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap(); + assert!( + dataset + .unindexed_fragments("text_fmindex") + .await + .unwrap() + .is_empty() + ); + + let committed = dataset.load_indices_by_name("text_fmindex").await.unwrap(); + assert_eq!(committed.len(), 1); + assert_eq!( + committed[0] + .fragment_bitmap + .as_ref() + .expect("FMIndex segment should carry fragment coverage") + .len(), + 2 + ); + + let logical = crate::index::scalar_logical::open_named_scalar_index( + &dataset, + "text", + "text_fmindex", + &NoOpMetricsCollector, + ) + .await + .unwrap(); + + for (pattern, expected) in [("old alpha", 1), ("new gamma", 1), ("needle", 2)] { + let query = TextQuery::StringContains(pattern.to_string()); + let result = logical.search(&query, &NoOpMetricsCollector).await.unwrap(); + let row_addrs = match result { + SearchResult::Exact(row_addrs) => row_addrs, + other => panic!("expected exact result for {pattern}, got {other:?}"), + }; + let count = row_addrs.true_rows().row_addrs().unwrap().count(); + assert_eq!( + count, expected, + "expected {expected} matches for {pattern}, got {count}" + ); + } + } + #[tokio::test] async fn test_optimize_btree_optimize_append() { async fn query_id_count(dataset: &Dataset, id: &str) -> usize { diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index ce8e65d8356..132e43b0ccd 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -222,7 +222,7 @@ impl<'a> CreateIndexBuilder<'a> { | IndexType::BTree | IndexType::Inverted | IndexType::NGram - | IndexType::FMIndex + | IndexType::Fm | IndexType::ZoneMap | IndexType::BloomFilter | IndexType::LabelList @@ -495,6 +495,14 @@ impl<'a> CreateIndexBuilder<'a> { #[instrument(skip_all)] async fn execute(mut self) -> Result { + // Multi-segment FM-Index path: when num_segments > 1, build one segment + // per fragment group and commit them all atomically. + if let Some(num_segments) = self.fmindex_num_segments() + && num_segments > 1 + { + return self.execute_multi_segment_fmindex(num_segments).await; + } + let new_idx = self.execute_uncommitted().await?; let index_uuid = new_idx.uuid; let removed_indices = if self.replace { @@ -560,6 +568,218 @@ impl<'a> CreateIndexBuilder<'a> { )) }) } + /// Extract `num_segments` from FM-Index params if this is an FM-Index build. + fn fmindex_num_segments(&self) -> Option { + if self.index_type != IndexType::Fm { + return None; + } + let scalar_params = self.params.as_any().downcast_ref::()?; + let params_json = scalar_params.params.as_deref()?; + let json: serde_json::Value = serde_json::from_str(params_json).ok()?; + json.get("num_segments")?.as_u64().map(|n| n as u32) + } + + /// Build FM-Index with multiple segments, each covering a subset of fragments. + async fn execute_multi_segment_fmindex(&mut self, num_segments: u32) -> Result { + // Validate column count: same check as execute_uncommitted + if self.columns.len() != 1 { + return Err(Error::index( + "Only support building index on 1 column at the moment".to_string(), + )); + } + + let column_input = &self.columns[0]; + let Some(field_path) = self.dataset.schema().resolve_case_insensitive(column_input) else { + return Err(Error::index(format!( + "CreateIndex: column '{column_input}' does not exist" + ))); + }; + let field = *field_path.last().unwrap(); + let names: Vec<&str> = field_path.iter().map(|f| f.name.as_str()).collect(); + let column = format_field_path(&names); + + let train = if self.train { + self.dataset.count_rows(None).await? > 0 + } else { + false + }; + + let indices = self.dataset.load_indices().await?; + let index_name = if let Some(name) = self.name.take() { + name + } else { + let column_path = default_index_name(&names); + let base_name = format!("{column_path}_idx"); + let mut candidate = base_name.clone(); + let mut counter = 2; + while indices + .iter() + .any(|idx| idx.name == candidate && idx.fields != [field.id]) + { + candidate = format!("{base_name}_{counter}"); + counter += 1; + } + candidate + }; + let existing_named_indices = indices + .iter() + .filter(|idx| idx.name == index_name) + .collect::>(); + if existing_named_indices + .iter() + .any(|idx| idx.fields != [field.id]) + { + return Err(Error::index(format!( + "Index name '{index_name}' already exists with different fields, \ + please specify a different name" + ))); + } + if !existing_named_indices.is_empty() && !self.replace { + return Err(Error::index(format!( + "Index name '{index_name}' already exists, \ + please specify a different name or use replace=True" + ))); + } + + let all_fragment_ids: Vec = self.dataset.fragment_bitmap.as_ref().iter().collect(); + if !train || all_fragment_ids.is_empty() { + let segment_uuid = Uuid::new_v4(); + let created_index = build_scalar_index( + self.dataset, + &column, + &segment_uuid.to_string(), + &ScalarIndexParams::for_builtin(lance_index::scalar::BuiltinIndexType::Fm), + false, + None, + None, + self.progress.clone(), + ) + .await?; + let metadata = IndexMetadata { + uuid: segment_uuid, + name: index_name.clone(), + fields: vec![field.id], + dataset_version: self.dataset.manifest.version, + fragment_bitmap: Some(roaring::RoaringBitmap::new()), + index_details: Some(Arc::new(created_index.index_details)), + index_version: created_index.index_version as i32, + created_at: Some(chrono::Utc::now()), + base_id: None, + files: created_index.files, + }; + let segments = vec![metadata.into_index_segment()?]; + let new_indices = + build_index_metadata_from_segments(self.dataset, &index_name, field.id, segments) + .await?; + + // Collect all same-name indices for removal when replace is set + let removed_indices = if self.replace { + existing_named_indices + .into_iter() + .cloned() + .collect::>() + } else { + vec![] + }; + + let transaction = TransactionBuilder::new( + self.dataset.manifest.version, + Operation::CreateIndex { + new_indices, + removed_indices, + }, + ) + .transaction_properties(self.transaction_properties.clone()) + .build(); + + self.dataset + .apply_commit(transaction, &Default::default(), &Default::default()) + .await?; + + let indices = self.dataset.load_indices_by_name(&index_name).await?; + return indices.into_iter().next().ok_or_else(|| { + Error::internal(format!( + "FM-Index segments for '{}' not found after commit", + index_name + )) + }); + } + + let num_segments = (num_segments as usize).min(all_fragment_ids.len()).max(1); + let chunk_size = all_fragment_ids.len().div_ceil(num_segments); + + let mut segment_metadatas = Vec::with_capacity(num_segments); + for chunk in all_fragment_ids.chunks(chunk_size) { + let fragment_ids = chunk.to_vec(); + let segment_uuid = Uuid::new_v4(); + let created_index = build_scalar_index( + self.dataset, + &column, + &segment_uuid.to_string(), + &ScalarIndexParams::for_builtin(lance_index::scalar::BuiltinIndexType::Fm), + true, + Some(fragment_ids.clone()), + None, + self.progress.clone(), + ) + .await?; + + segment_metadatas.push(IndexMetadata { + uuid: segment_uuid, + name: index_name.clone(), + fields: vec![field.id], + dataset_version: self.dataset.manifest.version, + fragment_bitmap: Some(fragment_ids.into_iter().collect()), + index_details: Some(Arc::new(created_index.index_details)), + index_version: created_index.index_version as i32, + created_at: Some(chrono::Utc::now()), + base_id: None, + files: created_index.files, + }); + } + + // Convert to IndexSegments and build proper transaction metadata + let segments = segment_metadatas + .into_iter() + .map(IntoIndexSegment::into_index_segment) + .collect::>>()?; + let new_indices = + build_index_metadata_from_segments(self.dataset, &index_name, field.id, segments) + .await?; + + // Collect all same-name indices for removal when replace is set, + // matching the standard execute() path behavior. + let removed_indices = if self.replace { + existing_named_indices + .into_iter() + .cloned() + .collect::>() + } else { + vec![] + }; + + let transaction = TransactionBuilder::new( + self.dataset.manifest.version, + Operation::CreateIndex { + new_indices, + removed_indices, + }, + ) + .transaction_properties(self.transaction_properties.clone()) + .build(); + + self.dataset + .apply_commit(transaction, &Default::default(), &Default::default()) + .await?; + + let indices = self.dataset.load_indices_by_name(&index_name).await?; + indices.into_iter().next().ok_or_else(|| { + Error::internal(format!( + "FM-Index segments for '{}' not found after commit", + index_name + )) + }) + } } fn is_btree_scalar_params(params: &dyn IndexParams) -> bool { diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 92b06f0a1a5..c5057c3690a 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -6,6 +6,7 @@ pub(crate) mod bitmap; pub(crate) mod btree; +pub(crate) mod fmindex; pub(crate) mod inverted; pub use inverted::{load_segment_details, load_segments}; diff --git a/rust/lance/src/index/scalar/fmindex.rs b/rust/lance/src/index/scalar/fmindex.rs new file mode 100644 index 00000000000..36158442a76 --- /dev/null +++ b/rust/lance/src/index/scalar/fmindex.rs @@ -0,0 +1,107 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use lance_table::format::IndexMetadata; +use roaring::RoaringBitmap; +use std::sync::Arc; +use uuid::Uuid; + +use crate::{Dataset, Error, Result}; + +/// Merge one caller-defined group of source FM-Index segments into a single segment. +/// +/// FM-Index merge requires rebuilding from source text — there is no cheap way +/// to combine two BWT structures. This function re-reads text data from the +/// dataset for all fragments covered by the source segments and builds a fresh +/// FM-Index over the combined data. +pub(in crate::index) async fn merge_segments( + dataset: &Dataset, + segments: Vec, +) -> Result { + if segments.is_empty() { + return Err(Error::index("No segment metadata was provided".to_string())); + } + + let field_id = *segments[0].fields.first().ok_or_else(|| { + Error::invalid_input(format!( + "CreateIndex: segment {} is missing field ids", + segments[0].uuid + )) + })?; + let column = dataset.schema().field_path(field_id)?; + + let mut fragment_bitmap = RoaringBitmap::new(); + for segment in &segments { + fragment_bitmap |= segment.fragment_bitmap.as_ref().cloned().ok_or_else(|| { + Error::invalid_input(format!( + "CreateIndex: segment {} is missing fragment coverage", + segment.uuid + )) + })?; + } + + // Intersect with the dataset's current live fragments to drop retired/compacted + // fragments, mirroring the btree merge behavior. + fragment_bitmap &= dataset.fragment_bitmap.as_ref(); + + if fragment_bitmap.is_empty() { + // All covered fragments have been retired; produce an empty index. + let new_uuid = Uuid::new_v4(); + let created_index = super::build_scalar_index( + dataset, + &column, + &new_uuid.to_string(), + &lance_index::scalar::ScalarIndexParams::for_builtin( + lance_index::scalar::BuiltinIndexType::Fm, + ), + false, + None, + None, + Arc::new(lance_index::progress::NoopIndexBuildProgress), + ) + .await?; + + return Ok(IndexMetadata { + uuid: new_uuid, + fields: vec![field_id], + dataset_version: dataset.manifest.version, + fragment_bitmap: Some(fragment_bitmap), + index_details: Some(Arc::new(created_index.index_details)), + index_version: created_index.index_version as i32, + created_at: Some(chrono::Utc::now()), + base_id: None, + files: created_index.files, + ..segments[0].clone() + }); + } + + let fragment_ids: Vec = fragment_bitmap.iter().collect(); + let new_uuid = Uuid::new_v4(); + + let created_index = super::build_scalar_index( + dataset, + &column, + &new_uuid.to_string(), + &lance_index::scalar::ScalarIndexParams::for_builtin( + lance_index::scalar::BuiltinIndexType::Fm, + ), + true, + Some(fragment_ids), + None, + Arc::new(lance_index::progress::NoopIndexBuildProgress), + ) + .await?; + + Ok(IndexMetadata { + uuid: new_uuid, + fields: vec![field_id], + dataset_version: dataset.manifest.version, + fragment_bitmap: Some(fragment_bitmap), + index_details: Some(Arc::new(created_index.index_details)), + index_version: created_index.index_version as i32, + created_at: Some(chrono::Utc::now()), + base_id: None, + files: created_index.files, + ..segments[0].clone() + }) +} diff --git a/rust/lance/src/index/scalar_logical.rs b/rust/lance/src/index/scalar_logical.rs index d2ab4e4b9f0..04fe6654b5a 100644 --- a/rust/lance/src/index/scalar_logical.rs +++ b/rust/lance/src/index/scalar_logical.rs @@ -738,4 +738,368 @@ mod tests { dataset.fragment_bitmap.as_ref().clone() ); } + + #[tokio::test] + async fn test_fmindex_segments_commit_and_query_as_logical_index() { + let test_dir = TempStrDir::default(); + + let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new( + "text", + arrow_schema::DataType::Utf8, + false, + )])); + let write_params = crate::dataset::write::WriteParams { + max_rows_per_file: 4, + ..Default::default() + }; + let batches = vec![ + arrow_array::RecordBatch::try_new( + schema.clone(), + vec![Arc::new(arrow_array::StringArray::from(vec![ + "the quick brown fox", + "jumps over the lazy dog", + "hello world from rust", + "pack my box with five dozen liquor jugs", + "how vexingly quick daft zebras jump", + "the five boxing wizards jump quickly", + "sphinx of black quartz judge my vow", + "two driven jocks help fax my big quiz", + "waltz bad nymph for quick jigs vex", + "glib jocks quiz nymph to vex dwarf", + "quick brown fox jumps again here", + "lazy dog sleeps under the tree", + ]))], + ) + .unwrap(), + ]; + let reader = + arrow_array::RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); + let mut dataset = Dataset::write(reader, test_dir.as_str(), Some(write_params)) + .await + .unwrap(); + + let fragments = dataset.get_fragments(); + assert_eq!(fragments.len(), 3); + + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::Fm); + let mut segments = Vec::new(); + for fragment in &fragments { + let segment = CreateIndexBuilder::new(&mut dataset, &["text"], IndexType::Fm, ¶ms) + .name("text_fmindex".to_string()) + .fragments(vec![fragment.id() as u32]) + .execute_uncommitted() + .await + .unwrap(); + + assert_eq!( + segment + .fragment_bitmap + .as_ref() + .unwrap() + .iter() + .collect::>(), + vec![fragment.id() as u32] + ); + segments.push(segment); + } + + dataset + .commit_existing_index_segments("text_fmindex", "text", segments) + .await + .unwrap(); + + let committed = dataset.load_indices_by_name("text_fmindex").await.unwrap(); + assert_eq!(committed.len(), fragments.len()); + + let logical = + open_named_scalar_index(&dataset, "text", "text_fmindex", &NoOpMetricsCollector) + .await + .unwrap(); + assert_eq!(logical.index_type(), IndexType::Fm); + + let query = lance_index::scalar::TextQuery::StringContains("quick".to_string()); + let result = logical.search(&query, &NoOpMetricsCollector).await.unwrap(); + let row_addrs = match result { + SearchResult::Exact(row_addrs) => row_addrs, + other => panic!( + "expected exact result from segmented fmindex, got {:?}", + other + ), + }; + let match_count = row_addrs.true_rows().row_addrs().unwrap().count(); + assert_eq!( + match_count, 5, + "expected exactly 5 matches for 'quick', got {match_count}" + ); + + // Verify fragment coverage via manifest metadata (not calculate_included_frags, + // which derives from row addresses and may not encode fragment IDs for all layouts) + assert_eq!( + scalar_index_fragment_bitmap(&dataset, "text", "text_fmindex") + .await + .unwrap() + .unwrap(), + dataset.fragment_bitmap.as_ref().clone() + ); + } + + #[tokio::test] + async fn test_fmindex_segments_merge_and_query() { + let test_dir = TempStrDir::default(); + + let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new( + "text", + arrow_schema::DataType::Utf8, + false, + )])); + let write_params = crate::dataset::write::WriteParams { + max_rows_per_file: 4, + ..Default::default() + }; + let batches = vec![ + arrow_array::RecordBatch::try_new( + schema.clone(), + vec![Arc::new(arrow_array::StringArray::from(vec![ + "alpha beta gamma delta", + "beta gamma delta epsilon", + "gamma delta epsilon zeta", + "delta epsilon zeta eta", + "epsilon zeta eta theta", + "zeta eta theta iota", + "eta theta iota kappa", + "theta iota kappa lambda", + ]))], + ) + .unwrap(), + ]; + let reader = + arrow_array::RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); + let mut dataset = Dataset::write(reader, test_dir.as_str(), Some(write_params)) + .await + .unwrap(); + + let fragments = dataset.get_fragments(); + assert_eq!(fragments.len(), 2); + + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::Fm); + let mut staged = Vec::new(); + for fragment in &fragments { + let segment = CreateIndexBuilder::new(&mut dataset, &["text"], IndexType::Fm, ¶ms) + .name("text_fmindex_merge".to_string()) + .fragments(vec![fragment.id() as u32]) + .execute_uncommitted() + .await + .unwrap(); + staged.push(segment); + } + assert_eq!(staged.len(), 2); + + let staged_uuids = staged.iter().map(|s| s.uuid).collect::>(); + let merged = dataset.merge_existing_index_segments(staged).await.unwrap(); + + assert!(!staged_uuids.contains(&merged.uuid)); + assert_eq!( + merged + .fragment_bitmap + .as_ref() + .unwrap() + .iter() + .collect::>(), + fragments.iter().map(|f| f.id() as u32).collect::>() + ); + + dataset + .commit_existing_index_segments("text_fmindex_merge", "text", vec![merged]) + .await + .unwrap(); + + let committed = dataset + .load_indices_by_name("text_fmindex_merge") + .await + .unwrap(); + assert_eq!(committed.len(), 1); + + let logical = open_named_scalar_index( + &dataset, + "text", + "text_fmindex_merge", + &NoOpMetricsCollector, + ) + .await + .unwrap(); + assert_eq!(logical.index_type(), IndexType::Fm); + + let query = lance_index::scalar::TextQuery::StringContains("delta".to_string()); + let result = logical.search(&query, &NoOpMetricsCollector).await.unwrap(); + let row_addrs = match result { + SearchResult::Exact(row_addrs) => row_addrs, + other => panic!("expected exact result from merged fmindex, got {:?}", other), + }; + assert_eq!(row_addrs.true_rows().row_addrs().unwrap().count(), 4); + + let query = lance_index::scalar::TextQuery::StringContains("nonexistent".to_string()); + let result = logical.search(&query, &NoOpMetricsCollector).await.unwrap(); + let row_addrs = match result { + SearchResult::Exact(row_addrs) => row_addrs, + other => panic!("expected exact result from merged fmindex, got {:?}", other), + }; + assert_eq!(row_addrs.true_rows().row_addrs().unwrap().count(), 0); + } + + #[tokio::test] + async fn test_fmindex_merge_after_compaction_drops_retired_fragments() { + use crate::dataset::write::WriteParams; + + let test_dir = TempStrDir::default(); + + let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new( + "text", + arrow_schema::DataType::Utf8, + false, + )])); + // Create two fragments with 4 rows each so compaction can retire one + let write_params = WriteParams { + max_rows_per_file: 4, + enable_stable_row_ids: true, + ..Default::default() + }; + let batches = vec![ + arrow_array::RecordBatch::try_new( + schema.clone(), + vec![Arc::new(arrow_array::StringArray::from(vec![ + "alpha beta gamma", + "beta gamma delta", + "gamma delta epsilon", + "delta epsilon zeta", + "epsilon zeta eta", + "zeta eta theta", + "eta theta iota", + "theta iota kappa", + ]))], + ) + .unwrap(), + ]; + let reader = + arrow_array::RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); + let mut dataset = Dataset::write(reader, test_dir.as_str(), Some(write_params)) + .await + .unwrap(); + + let fragments = dataset.get_fragments(); + assert_eq!(fragments.len(), 2); + + // Build per-fragment FM-Index segments and commit + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::Fm); + let mut staged = Vec::new(); + for fragment in &fragments { + let segment = CreateIndexBuilder::new(&mut dataset, &["text"], IndexType::Fm, ¶ms) + .name("text_fmindex_compact".to_string()) + .fragments(vec![fragment.id() as u32]) + .execute_uncommitted() + .await + .unwrap(); + staged.push(segment); + } + dataset + .commit_existing_index_segments("text_fmindex_compact", "text", staged) + .await + .unwrap(); + + // Verify initial state: 2 segments, both fragments live + let committed = dataset + .load_indices_by_name("text_fmindex_compact") + .await + .unwrap(); + assert_eq!(committed.len(), 2); + + // Delete rows from fragment 0 to trigger compaction retirement + dataset.delete("text = 'alpha beta gamma'").await.unwrap(); + dataset.delete("text = 'beta gamma delta'").await.unwrap(); + crate::dataset::optimize::compact_files( + &mut dataset, + crate::dataset::optimize::CompactionOptions { + target_rows_per_fragment: 4, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + + let live_frags: RoaringBitmap = dataset + .get_fragments() + .iter() + .map(|f| f.id() as u32) + .collect(); + assert!( + !live_frags.contains(0), + "compaction should retire fragment 0" + ); + + // Merge: the retired fragment should be dropped from coverage + let segments = dataset + .load_indices_by_name("text_fmindex_compact") + .await + .unwrap(); + let merged = dataset + .merge_existing_index_segments(segments) + .await + .unwrap(); + + let coverage = merged.fragment_bitmap.as_ref().unwrap(); + assert!( + !coverage.contains(0), + "merged coverage must drop retired fragment 0" + ); + assert!( + coverage.contains(1), + "merged coverage must keep live fragment 1" + ); + + // Commit the merged segment and verify search works + dataset + .commit_existing_index_segments("text_fmindex_compact", "text", vec![merged]) + .await + .unwrap(); + + let committed = dataset + .load_indices_by_name("text_fmindex_compact") + .await + .unwrap(); + assert_eq!(committed.len(), 1); + + let logical = open_named_scalar_index( + &dataset, + "text", + "text_fmindex_compact", + &NoOpMetricsCollector, + ) + .await + .unwrap(); + + // "alpha" only existed in the deleted/retired rows + let query = lance_index::scalar::TextQuery::StringContains("alpha".to_string()); + let result = logical.search(&query, &NoOpMetricsCollector).await.unwrap(); + let row_addrs = match result { + SearchResult::Exact(row_addrs) => row_addrs, + other => panic!("expected exact result from merged fmindex, got {:?}", other), + }; + assert_eq!( + row_addrs.true_rows().row_addrs().unwrap().count(), + 0, + "deleted rows from retired fragment should not appear in merged index" + ); + + // "theta" exists in fragment 1 rows only + let query = lance_index::scalar::TextQuery::StringContains("theta".to_string()); + let result = logical.search(&query, &NoOpMetricsCollector).await.unwrap(); + let row_addrs = match result { + SearchResult::Exact(row_addrs) => row_addrs, + other => panic!("expected exact result from merged fmindex, got {:?}", other), + }; + assert!( + row_addrs.true_rows().row_addrs().unwrap().count() > 0, + "rows from live fragment should still be searchable" + ); + } }