64 changes: 64 additions & 0 deletions docs/src/format/index/scalar/fmindex.md
@@ -0,0 +1,64 @@
# FM-Index (Full-text / Substring / Regex Search)

The FM-Index (Ferragina-Manzini Index) is a compressed substring index based on the Burrows-Wheeler Transform (BWT). Unlike traditional inverted indexes (Full-Text Search), which index distinct words, the FM-Index supports efficient **arbitrary substring search**, **prefix match**, and **suffix/regular-expression search** directly on raw bytes.

In Lance, the FM-Index is partitioned using Lance's **Segmented Index** architecture. This lets it scale to datasets with millions of documents while supporting incremental appends, disjoint fragment tracking, and segment merging.

## High-Level Architecture

The FM-Index indexes raw text by treating columns of strings or binary payloads as raw byte arrays.

```
+----------------------------------------+
|             Lance Dataset              |
|  (Disjoint groups of Fragments 0..N)   |
+----------------------------------------+
                    |
    Divide fragments into num_segments
                    |
                    v
+----------------------------------------+
|            Segmented Index             |
|  +-----------+ +-----------+ +-------+ |
|  | Segment 1 | | Segment 2 | |  ...  | |
|  | (FM-Idx)  | | (FM-Idx)  | |       | |
|  +-----------+ +-----------+ +-------+ |
+----------------------------------------+
```

Each segment holds a self-contained physical FM-Index that maps byte subsequences to Lance global row IDs.
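
One way to picture a segment's input is sketched below. It assumes, purely for illustration, that documents are concatenated with a `\xFF` separator and a final `\x00` terminator, and that a sorted list of document start offsets is kept beside the text. The names `SegmentText`, `build_segment_text`, and `row_id_for_position` are hypothetical and are not taken from the Lance code.

```rust
const SEPARATOR: u8 = 0xFF; // document/row separator
const TERMINATOR: u8 = 0x00; // BWT terminator

/// Hypothetical layout of one segment's text: doc0 SEP doc1 SEP ... TERMINATOR.
struct SegmentText {
    text: Vec<u8>,
    doc_starts: Vec<usize>, // doc_starts[i] = offset of document i in `text`
    row_ids: Vec<u64>,      // row_ids[i] = row ID of document i
}

/// Concatenate (row_id, bytes) pairs and record where each document starts.
fn build_segment_text(docs: &[(u64, &[u8])]) -> SegmentText {
    let mut text = Vec::new();
    let mut doc_starts = Vec::with_capacity(docs.len());
    let mut row_ids = Vec::with_capacity(docs.len());
    for (row_id, bytes) in docs {
        doc_starts.push(text.len());
        row_ids.push(*row_id);
        text.extend_from_slice(bytes);
        text.push(SEPARATOR);
    }
    text.push(TERMINATOR);
    SegmentText { text, doc_starts, row_ids }
}

/// Map a text position to the row ID of the enclosing document.
/// Assumes the segment holds at least one document.
fn row_id_for_position(seg: &SegmentText, pos: usize) -> u64 {
    // Number of documents that start at or before `pos`.
    let idx = seg.doc_starts.partition_point(|&start| start <= pos);
    seg.row_ids[idx - 1]
}
```

With two documents `(10, "hello")` and `(20, "rust")`, position 3 falls inside the first document and resolves to row ID 10, while position 6 is the first byte of the second document and resolves to row ID 20.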

## Data Normalization & Sanitization

The FM-Index is **normalization-independent by design** because it operates entirely on raw bytes.

### Byte Sanitization vs. Text Normalization

1. **Byte Sanitization (Core Index Layer)**:
   The physical FM-Index uses specific sentinel bytes internally to mark boundaries:
   - `\x00` is reserved as the global Burrows-Wheeler Transform (BWT) terminator character.
   - `\xFF` is reserved as the document/row separator character.

   To avoid breaking the indexing structures, any incoming occurrences of `\x00` or `\xFF` are sanitized by remapping them to space (`\x20`) characters at index-build time. No other bytes are changed in this layer.

2. **Text Normalization (User/Application Layer)**:
   Because the index faithfully maps raw bytes, any semantic normalization (such as case folding `Hello` -> `hello`, Unicode NFKC normalization, stemming, or whitespace collapsing) is fully decoupled from the core index engine:
   - To build a case-insensitive search index, users apply a lowercase transform to the column *prior* to indexing.
   - When querying, the user's query text must undergo the exact same normalization pipeline.
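
The two layers can be sketched as follows. `sanitize` mirrors the byte remapping described above. `normalize` stands in for an application-chosen pipeline and is assumed here to be plain ASCII lowercasing. All three function names are hypothetical.

```rust
/// Core index layer: remap the two reserved sentinel bytes to a space.
/// Every other byte passes through unchanged.
fn sanitize(bytes: &[u8]) -> Vec<u8> {
    bytes
        .iter()
        .map(|&b| if b == 0x00 || b == 0xFF { 0x20 } else { b })
        .collect()
}

/// Application layer (assumption): ASCII case folding only.
/// A real pipeline might add NFKC normalization, stemming, and so on.
fn normalize(text: &str) -> String {
    text.to_ascii_lowercase()
}

/// Apply the same pipeline to indexed values and to query strings.
fn prepare(text: &str) -> Vec<u8> {
    sanitize(normalize(text).as_bytes())
}
```

Because `prepare` is applied both when the column is written and when a query is issued, `prepare("Hello")` and `prepare("hELLO")` produce identical bytes, which is what makes the search case-insensitive.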

## Configurable Segment Partitioning

BWT-based indexes cannot be merged or appended to by simple concatenation: the suffix array and the BWT derived from it depend on the whole text, so both must be rebuilt by re-reading that text. To balance build cost against search performance, Lance allows configuring how fragments map to index segments.

- **`num_segments` parameter**: Configured at index-creation time. If `num_segments` is specified (e.g. `num_segments = 4`), Lance splits the target dataset fragments into disjoint subsets and builds an independent FM-Index segment over each subset.
- **Unindexed Appends**: When new fragments are appended to the dataset, a subsequent `create_index` call that covers only the unindexed fragments builds a new, separate segment for those fragments and leaves the existing segments intact.
- **Segment Merging**: Multiple existing index segments can be merged into a single segment under Lance's `merge_segments` protocol. Lance unions the fragment coverage bitmaps of the selected segments, re-reads the raw text from those covered fragments, and constructs a fresh unified FM-Index.
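
The partitioning and merge bookkeeping might look roughly like the sketch below. The chunking policy (contiguous groups sized by ceiling division) is an assumption, and a `BTreeSet` stands in for the fragment coverage bitmap. Neither `partition_fragments` nor `merged_coverage` is a Lance function.

```rust
use std::collections::BTreeSet;

/// Split fragment IDs into at most `num_segments` disjoint, contiguous groups.
/// The grouping policy is an assumption made for this sketch.
fn partition_fragments(fragment_ids: &[u32], num_segments: usize) -> Vec<Vec<u32>> {
    if fragment_ids.is_empty() || num_segments == 0 {
        return Vec::new();
    }
    // Ceiling division so that no more than `num_segments` groups are produced.
    let chunk_size = (fragment_ids.len() + num_segments - 1) / num_segments;
    fragment_ids.chunks(chunk_size).map(|c| c.to_vec()).collect()
}

/// Union the fragment coverage of the segments selected for merging.
/// The merged index would then be rebuilt from the text of these fragments.
fn merged_coverage(segments: &[Vec<u32>]) -> BTreeSet<u32> {
    segments.iter().flatten().copied().collect()
}
```

For seven fragments and `num_segments = 4`, this yields the groups `[0, 1]`, `[2, 3]`, `[4, 5]`, and `[6]`. Merging the first two groups gives the coverage `{0, 1, 2, 3}`.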

## Query Evaluation

When a substring query is submitted (e.g., `CONTAINS(column, "query_string")`):
1. The search string is sanitized (remapping any `\x00` or `\xFF` to spaces). If the indexed column was normalized before indexing, the caller applies the same normalization to the query.
2. The query is dispatched across all active segments in the logical index in parallel.
3. Each segment performs a BWT backward-search to locate occurrences of the pattern.
4. Matching offsets are mapped back to absolute dataset Row IDs.
5. Results from all segments are unioned to produce the final selection.
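
Step 3 can be illustrated with a toy, uncompressed FM-index. This is a teaching sketch under simplifying assumptions: the suffix array is built by naive sorting and kept in full, and occurrence counts are computed by linear scan. The `FMIndex` in this change appears to use a Huffman-coded wavelet structure and a `locate` step instead. `ToyFmIndex` and its methods are hypothetical names.

```rust
/// Toy FM-index: full suffix array, uncompressed BWT, cumulative byte counts.
struct ToyFmIndex {
    sa: Vec<usize>,  // sa[i] = start of the i-th smallest suffix
    bwt: Vec<u8>,    // bwt[i] = byte preceding suffix sa[i] (cyclically)
    c: [usize; 256], // c[b] = number of bytes in the text smaller than b
}

impl ToyFmIndex {
    /// `text` is expected to end with a unique 0x00 terminator.
    fn new(text: &[u8]) -> Self {
        let n = text.len();
        let mut sa: Vec<usize> = (0..n).collect();
        sa.sort_by(|&a, &b| text[a..].cmp(&text[b..]));
        let bwt = sa.iter().map(|&p| text[(p + n - 1) % n]).collect();
        let mut counts = [0usize; 256];
        for &b in text {
            counts[b as usize] += 1;
        }
        let mut c = [0usize; 256];
        let mut sum = 0;
        for b in 0..256 {
            c[b] = sum;
            sum += counts[b];
        }
        Self { sa, bwt, c }
    }

    /// Occurrences of `byte` in bwt[..i] (linear scan, for clarity only).
    fn occ(&self, byte: u8, i: usize) -> usize {
        self.bwt[..i].iter().filter(|&&b| b == byte).count()
    }

    /// Half-open suffix-array range [lo, hi) of suffixes that start with `pattern`.
    fn backward_search(&self, pattern: &[u8]) -> (usize, usize) {
        let (mut lo, mut hi) = (0, self.bwt.len());
        // Extend the match one byte at a time, from the last byte to the first.
        for &b in pattern.iter().rev() {
            lo = self.c[b as usize] + self.occ(b, lo);
            hi = self.c[b as usize] + self.occ(b, hi);
            if lo >= hi {
                break;
            }
        }
        (lo, hi)
    }

    /// Sorted text positions at which `pattern` occurs.
    fn locate_all(&self, pattern: &[u8]) -> Vec<usize> {
        let (lo, hi) = self.backward_search(pattern);
        if lo >= hi {
            return Vec::new();
        }
        let mut positions = self.sa[lo..hi].to_vec();
        positions.sort_unstable();
        positions
    }
}
```

On the text `abracadabra\x00`, searching for `abra` yields positions 0 and 7. In steps 4 and 5, each position would then be mapped to the row of its enclosing document and the per-segment results unioned.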
22 changes: 22 additions & 0 deletions python/src/dataset.rs
@@ -2236,6 +2236,7 @@ impl Dataset {
             "LABEL_LIST" => IndexType::LabelList,
             "RTREE" => IndexType::RTree,
             "INVERTED" | "FTS" => IndexType::Inverted,
+            "FMINDEX" => IndexType::FMIndex,
             "IVF_FLAT" | "IVF_PQ" | "IVF_SQ" | "IVF_RQ" | "IVF_HNSW_FLAT" | "IVF_HNSW_PQ"
             | "IVF_HNSW_SQ" => IndexType::Vector,
             _ => {
@@ -2275,6 +2276,27 @@ impl Dataset {
                 index_type: "rtree".to_string(),
                 params: None,
             }),
+            "FMINDEX" => {
+                let mut params_json = serde_json::Map::new();
+                if let Some(kwargs) = kwargs
+                    && let Some(num_segments) = kwargs.get_item("num_segments")?
+                {
+                    let n: u32 = num_segments.extract()?;
+                    params_json.insert(
+                        "num_segments".to_string(),
+                        serde_json::Value::Number(n.into()),
+                    );
+                }
+                let params = if params_json.is_empty() {
+                    None
+                } else {
+                    Some(serde_json::Value::Object(params_json).to_string())
+                };
+                Box::new(ScalarIndexParams {
+                    index_type: "FMIndex".to_string(),
+                    params,
+                })
+            }
             "SCALAR" => {
                 let Some(kwargs) = kwargs else {
                     return Err(PyValueError::new_err(
88 changes: 67 additions & 21 deletions rust/lance-index/src/scalar/fmindex.rs
@@ -31,7 +31,7 @@ use datafusion::execution::SendableRecordBatchStream;
 use deepsize::DeepSizeOf;
 use futures::StreamExt;
 use lance_core::cache::LanceCache;
-use lance_core::{Error, Result};
+use lance_core::{Error, ROW_ADDR, Result};
 use roaring::RoaringBitmap;

 use crate::frag_reuse::FragReuseIndex;
@@ -40,6 +40,7 @@ use crate::pb;
 use crate::scalar::expression::{ScalarQueryParser, TextQueryParser};
 use crate::scalar::registry::{
     DefaultTrainingRequest, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest,
+    VALUE_COLUMN_NAME,
 };
 use crate::scalar::{
     AnyQuery, BuiltinIndexType, CreatedIndex, IndexStore, OldIndexDataFilter, ScalarIndex,
@@ -761,6 +762,7 @@ impl FMIndex {
         (lo, hi)
     }

+    #[cfg(test)]
     fn search(&self, pattern: &[u8]) -> RoaringBitmap {
         let (lo, hi) = self.backward_search(pattern);
         if lo >= hi {
@@ -775,6 +777,25 @@ impl FMIndex {
         result
     }

+    /// Search returning full u64 row addresses (preserving fragment ID in upper bits).
+    fn search_row_addrs(&self, pattern: &[u8]) -> Vec<u64> {
+        let (lo, hi) = self.backward_search(pattern);
+        if lo >= hi {
+            return Vec::new();
+        }
+        let mut seen = std::collections::HashSet::new();
+        let mut result = Vec::new();
+        for i in lo..hi {
+            let text_pos = self.locate(i);
+            let doc_idx = self.doc_for_position(text_pos);
+            let row_addr = self.row_ids[doc_idx];
+            if seen.insert(row_addr) {
+                result.push(row_addr);
+            }
+        }
+        result
+    }
+
     fn serialize_huffman_codes(&self) -> Vec<u8> {
         let mut buf = Vec::new();
         for code in &self.wavelet.codes {
@@ -987,6 +1008,7 @@ impl LazyFMIndex {
         }
     }

+    #[cfg(test)]
     fn search(&self, pattern: &[u8]) -> RoaringBitmap {
         let (lo, hi) = self.backward_search(pattern);
         if lo >= hi {
@@ -1001,6 +1023,25 @@ impl LazyFMIndex {
         result
     }

+    /// Search returning full u64 row addresses (preserving fragment ID in upper bits).
+    fn search_row_addrs(&self, pattern: &[u8]) -> Vec<u64> {
+        let (lo, hi) = self.backward_search(pattern);
+        if lo >= hi {
+            return Vec::new();
+        }
+        let mut seen = std::collections::HashSet::new();
+        let mut result = Vec::new();
+        for i in lo..hi {
+            let text_pos = self.locate(i);
+            let doc_idx = self.doc_for_position(text_pos);
+            let row_addr = self.row_ids[doc_idx];
+            if seen.insert(row_addr) {
+                result.push(row_addr);
+            }
+        }
+        result
+    }
+
     #[allow(clippy::too_many_arguments)]
     async fn from_reader(
         reader: Arc<dyn crate::scalar::IndexReader>,
@@ -1302,8 +1343,8 @@ impl ScalarIndex for FMIndexScalarIndex {
         let mut tree = RowAddrTreeMap::new();
         for p in &self.partitions {
             p.fm.prewarm().await?;
-            for rid in p.fm.search(pb).iter() {
-                tree.insert(rid as u64);
+            for row_addr in p.fm.search_row_addrs(pb) {
+                tree.insert(row_addr);
             }
         }
         Ok(SearchResult::Exact(lance_select::NullableRowAddrSet::new(
@@ -1327,7 +1368,7 @@ impl ScalarIndex for FMIndexScalarIndex {
         &self,
         new_data: SendableRecordBatchStream,
         dest: &dyn IndexStore,
-        _: Option<OldIndexDataFilter>,
+        _old_data_filter: Option<OldIndexDataFilter>,
     ) -> Result<CreatedIndex> {
         let texts = collect_texts(new_data).await?;
         write_partitioned_fmindex(&texts, dest).await?;
@@ -1338,7 +1379,9 @@ impl ScalarIndex for FMIndexScalarIndex {
         })
     }
     fn update_criteria(&self) -> UpdateCriteria {
-        UpdateCriteria::only_new_data(TrainingCriteria::new(TrainingOrdering::None))
+        UpdateCriteria::requires_old_data(
+            TrainingCriteria::new(TrainingOrdering::None).with_row_addr(),
+        )
     }
     fn derive_index_params(&self) -> Result<ScalarIndexParams> {
         Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::FMIndex))
@@ -1349,20 +1392,23 @@ impl ScalarIndex for FMIndexScalarIndex {

 async fn collect_texts(mut stream: SendableRecordBatchStream) -> Result<Vec<(u64, Vec<u8>)>> {
     let mut texts = Vec::new();
-    let mut next_id = 0u64;
     while let Some(batch) = stream.next().await {
         let batch = batch?;
-        let row_ids: Option<&arrow_array::UInt64Array> = batch
-            .column_by_name("_rowid")
-            .or_else(|| batch.column_by_name("_rowaddr"))
-            .and_then(|c| c.as_any().downcast_ref());
-        let value_col = batch.column(0);
+        // Prefer _rowaddr (global row address) over _rowid to ensure stable,
+        // globally unique identifiers across segments.
+        let row_addrs: &arrow_array::UInt64Array = batch

Contributor

[P1] This new hard requirement for _rowaddr also needs to be reflected in FMIndex update/optimize paths. FMIndexScalarIndex::update_criteria() still returns TrainingCriteria::new(TrainingOrdering::None), so optimize_indices() / append maintenance can build a training stream with only the value column and fail here. Also, FMIndexScalarIndex::update() currently writes a fresh index from new_data only and ignores the existing index plus old_data_filter; if we only add .with_row_addr(), the generic single-segment scalar update path can replace the old FMIndex with one containing only appended rows. Can we either make FMIndex update merge existing rows correctly, or force FMIndex maintenance to rebuild from the full target fragment bitmap instead of taking the single-segment update() shortcut?

Contributor Author

Good catch. Fixed both issues:

  1. update_criteria() now returns requires_old_data with .with_row_addr(), so the training stream includes all existing + new rows with global row addresses.
  2. update() now applies the old_data_filter to exclude deleted/compacted rows before rebuilding the BWT, so the single-segment update path produces a complete index covering both old and new data instead of silently dropping existing rows.

+            .column_by_name(ROW_ADDR)
+            .or_else(|| batch.column_by_name("_rowid"))
+            .and_then(|c| c.as_any().downcast_ref())
+            .ok_or_else(|| {
+                Error::invalid_input("FMIndex training data must include _rowaddr or _rowid column")
+            })?;
+        // Use the named value column; fall back to column(0) for legacy streams
+        let value_col = batch
+            .column_by_name(VALUE_COLUMN_NAME)
+            .unwrap_or_else(|| batch.column(0));
         for i in 0..batch.num_rows() {
-            let rid = row_ids.map(|ids| ids.value(i)).unwrap_or_else(|| {
-                let id = next_id;
-                next_id += 1;
-                id
-            });
+            let rid = row_addrs.value(i);
             if let Some(bytes) = extract_text_bytes(value_col.as_ref(), i)? {
                 let sanitized: Vec<u8> = bytes
                     .iter()
@@ -1562,7 +1608,7 @@ impl ScalarIndexPlugin for FMIndexPlugin {
             }
         }
         Ok(Box::new(DefaultTrainingRequest::new(
-            TrainingCriteria::new(TrainingOrdering::None),
+            TrainingCriteria::new(TrainingOrdering::None).with_row_addr(),
         )))
     }
     async fn train_index(
@@ -2014,23 +2060,23 @@ mod tests {
         use arrow_array::{StringArray, UInt64Array};
         use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
         use futures::stream;
-        use lance_core::ROW_ID;
+        use lance_core::ROW_ADDR;

         let docs = vec!["hello world", "hello rust", "goodbye world"];
-        let row_ids: Vec<u64> = vec![0, 1, 2];
+        let row_addrs: Vec<u64> = vec![0, 1, 2];
         let schema = Arc::new(arrow_schema::Schema::new(vec![
             arrow_schema::Field::new(
                 crate::scalar::registry::VALUE_COLUMN_NAME,
                 DataType::Utf8,
                 false,
             ),
-            arrow_schema::Field::new(ROW_ID, DataType::UInt64, false),
+            arrow_schema::Field::new(ROW_ADDR, DataType::UInt64, false),
         ]));
         let batch = RecordBatch::try_new(
             schema.clone(),
             vec![
                 Arc::new(StringArray::from(docs)),
-                Arc::new(UInt64Array::from(row_ids)),
+                Arc::new(UInt64Array::from(row_addrs)),
             ],
         )
         .unwrap();
12 changes: 11 additions & 1 deletion rust/lance/src/index.rs
@@ -271,6 +271,13 @@ fn segment_has_btree_details(segment: &IndexMetadata) -> bool {
     )
 }

+fn segment_has_fmindex_details(segment: &IndexMetadata) -> bool {
+    segment
+        .index_details
+        .as_ref()
+        .is_some_and(|details| details.type_url.ends_with("FMIndexIndexDetails"))
+}
+
 // Cache keys for different index types
 #[derive(Debug, Clone)]
 pub(crate) struct LegacyVectorIndexCacheKey<'a> {
@@ -1127,7 +1134,8 @@ impl DatasetIndexExt for Dataset {
         let all_inverted = source_segments.iter().all(segment_has_inverted_details);
         let all_bitmap = source_segments.iter().all(segment_has_bitmap_details);
         let all_btree = source_segments.iter().all(segment_has_btree_details);
-        if !all_vector && !all_inverted && !all_bitmap && !all_btree {
+        let all_fmindex = source_segments.iter().all(segment_has_fmindex_details);
+        if !all_vector && !all_inverted && !all_bitmap && !all_btree && !all_fmindex {
             return Err(Error::invalid_input(
                 "merge_existing_index_segments requires all segments to have the same supported index type"
                     .to_string(),
@@ -1143,6 +1151,8 @@ impl DatasetIndexExt for Dataset {
                 .await?
         } else if all_inverted {
             crate::index::scalar::inverted::merge_segments(self, source_segments).await?
+        } else if all_fmindex {
+            crate::index::scalar::fmindex::merge_segments(self, source_segments).await?
         } else if all_bitmap {
             crate::index::scalar::bitmap::merge_segments(self, source_segments).await?
         } else {