Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
93aae22
test: add failing repros for active-memtable predicate-crossing stale…
hamersaw Jun 2, 2026
9150c07
fix(mem_wal): dedup predicate-crossing stale reads in active-memtable…
hamersaw Jun 3, 2026
b065eb6
refactor(mem_wal): route point-lookup + block-list through the PK-pos…
hamersaw Jun 3, 2026
64d97e2
refactor(mem_wal): probe the PK-position index in contains_pks; delet…
hamersaw Jun 3, 2026
648cf60
test(mem_wal): cover the snapshot-bounded vanished-row guards
hamersaw Jun 3, 2026
14428ff
refactor(mem_wal): reuse PK BTree indexes for dedup; drop PkPositionI…
hamersaw Jun 3, 2026
d48bebd
refactor(mem_wal): fold PkLookup into IndexStore; tighten comments
hamersaw Jun 3, 2026
bc11566
docs(mem_wal): fix broken intra-doc link to pk_newest_visible
hamersaw Jun 3, 2026
7e83d81
refactor(mem_wal): composite-key PK BTree + standalone on-disk dedup …
hamersaw Jun 4, 2026
19257fb
Merge remote-tracking branch 'upstream/main' into bug/dedup-active-me…
hamersaw Jun 4, 2026
afed6e1
refactor(mem_wal): route flushed PK index through the session index c…
hamersaw Jun 4, 2026
3147ae3
refactor(mem_wal): composite PK index is now a BTreeMemIndex, drop Pk…
hamersaw Jun 4, 2026
edb899e
refactor(mem_wal): collapse GenMembership::contains to a single key
hamersaw Jun 4, 2026
3f420cb
test(mem_wal): fix PK-position test setup and add max_visible_row test
hamersaw Jun 10, 2026
1e9216d
perf(mem_wal): batch the block-list PK existence probe
hamersaw Jun 10, 2026
f6eca16
Merge remote-tracking branch 'origin/main' into bug/dedup-active-memt…
hamersaw Jun 10, 2026
3366c09
docs(lance-index): avoid public→private intra-doc link in contains_keys
hamersaw Jun 10, 2026
2dbb69e
Merge upstream/main into bug/dedup-active-memtable
hamersaw Jun 10, 2026
73ec180
fix(mem_wal): build the PK dedup index whenever a primary key exists
hamersaw Jun 10, 2026
747e98e
test(mem_wal): stage PK sidecar when writing flushed gens in Python t…
hamersaw Jun 10, 2026
30dce61
fix(mem_wal): write the PK sidecar on the plain-flush path
hamersaw Jun 10, 2026
ecb0b86
Merge remote-tracking branch 'upstream/main' into bug/dedup-active-me…
hamersaw Jun 10, 2026
18991a1
build: pin lance-namespace-reqwest-client to 0.8.2
hamersaw Jun 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 187 additions & 5 deletions rust/lance/src/dataset/mem_wal/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ mod fts;
mod hnsw;

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use datafusion::common::ScalarValue;

use super::memtable::batch_store::StoredBatch;
use arrow_array::RecordBatch;
use lance_core::datatypes::Schema as LanceSchema;
Expand Down Expand Up @@ -195,12 +198,18 @@ impl MemIndexConfig {
/// therefore safe for scanners to read. Scanners snapshot this at plan
/// construction time so every plan keys on a stable MVCC cursor.
pub struct IndexStore {
/// BTree indexes keyed by index name.
btree_indexes: HashMap<String, BTreeMemIndex>,
/// BTree indexes keyed by index name. `Arc` so the primary-key BTrees can be
/// shared into [`Self::pk_btrees`] without a second copy or a second insert.
btree_indexes: HashMap<String, Arc<BTreeMemIndex>>,
/// HNSW vector indexes keyed by index name.
hnsw_indexes: HashMap<String, HnswMemIndex>,
/// FTS indexes keyed by index name.
fts_indexes: HashMap<String, FtsMemIndex>,
/// The primary-key BTrees, one per PK column in order. Each aliases a
/// `btree_indexes` entry (reused or auto-created), so the insert loop
/// maintains it. Empty without a primary key; queried via
/// [`Self::pk_newest_visible`] (see [`Self::enable_pk_index`]).
pk_btrees: Vec<Arc<BTreeMemIndex>>,
/// Maximum batch position that is durable in the WAL and therefore
/// visible to scanners. Advanced unconditionally after a WAL append
/// succeeds; not gated on whether any indexes are configured.
Expand All @@ -213,6 +222,7 @@ impl Default for IndexStore {
btree_indexes: HashMap::new(),
hnsw_indexes: HashMap::new(),
fts_indexes: HashMap::new(),
pk_btrees: Vec::new(),
max_visible_batch_position: AtomicUsize::new(0),
}
}
Expand All @@ -230,6 +240,14 @@ impl std::fmt::Debug for IndexStore {
&self.hnsw_indexes.keys().collect::<Vec<_>>(),
)
.field("fts_indexes", &self.fts_indexes.keys().collect::<Vec<_>>())
.field(
"pk_btrees",
&self
.pk_btrees
.iter()
.map(|b| b.column_name())
.collect::<Vec<_>>(),
)
.field(
"max_visible_batch_position",
&self.max_visible_batch_position.load(Ordering::Acquire),
Expand Down Expand Up @@ -264,7 +282,7 @@ impl IndexStore {
for config in configs {
match config {
MemIndexConfig::BTree(c) => {
let index = BTreeMemIndex::new(c.field_id, c.column.clone());
let index = Arc::new(BTreeMemIndex::new(c.field_id, c.column.clone()));
registry.btree_indexes.insert(c.name.clone(), index);
}
MemIndexConfig::Hnsw(c) => {
Expand Down Expand Up @@ -293,7 +311,7 @@ impl IndexStore {
/// the production memtable path goes through [`Self::from_configs`].
pub fn add_btree(&mut self, name: String, field_id: i32, column: String) {
self.btree_indexes
.insert(name, BTreeMemIndex::new(field_id, column));
.insert(name, Arc::new(BTreeMemIndex::new(field_id, column)));
}

/// Add an HNSW vector index with default build parameters.
Expand Down Expand Up @@ -362,6 +380,97 @@ impl IndexStore {
.insert(name, FtsMemIndex::with_params(field_id, column, params));
}

/// Maintain a BTree on each primary-key column so the memtable can answer
/// "newest visible version of this key" (see [`Self::pk_newest_visible`]).
///
/// Reuses an existing BTree on the field, else auto-creates one under a
/// `__pk__*` name so the normal insert loop maintains it. Call once at
/// construction, after [`Self::from_configs`] and before any inserts; a
/// no-op when `pk_columns` is empty. The auto-created BTrees are in-memory
/// only — flush rebuilds from the user's index configs — so a primary key
/// without a user scalar index gains no on-disk index.
pub fn enable_pk_index(&mut self, pk_columns: &[(String, i32)]) {
for (column, field_id) in pk_columns {
let btree = match self
.btree_indexes
.values()
.find(|b| b.field_id() == *field_id)
{
Some(existing) => existing.clone(),
None => {
let btree = Arc::new(BTreeMemIndex::new(*field_id, column.clone()));
self.btree_indexes
.insert(format!("__pk__{column}"), btree.clone());
btree
}
};
self.pk_btrees.push(btree);
}
}

/// Whether the memtable has a primary-key index (a BTree per PK column).
pub fn has_pk_index(&self) -> bool {
!self.pk_btrees.is_empty()
}

/// The newest row position of the primary-key tuple `values` (in PK order)
/// visible at `max_visible_row`, or `None`.
///
/// Single-column is one seek. A composite key walks the leading column's
/// visible positions newest-first and returns the first that every other
/// column also holds at the same physical position — so the whole tuple
/// matches that row. Collision-free, since `position` is the row identity.
pub fn pk_newest_visible(
&self,
values: &[ScalarValue],
max_visible_row: RowPosition,
) -> Option<RowPosition> {
match self.pk_btrees.as_slice() {
[] => None,
[only] => only.get_newest_visible(&values[0], max_visible_row),
[leading, rest @ ..] => {
let mut positions = leading.visible_positions(&values[0], max_visible_row);
positions.reverse();
positions.into_iter().find(|&position| {
rest.iter()
.zip(&values[1..])
.all(|(column, value)| column.contains_position(value, position))
})
}
}
}

/// Whether `position` is the newest visible row of `values` — the recency
/// check the active index-search arms apply to drop predicate-crossing
/// stale hits. Callers gate on [`Self::has_pk_index`] first, since this is
/// `false` (drop) when the memtable has no primary-key index.
pub fn pk_is_newest(
&self,
values: &[ScalarValue],
position: RowPosition,
max_visible_row: RowPosition,
) -> bool {
self.pk_newest_visible(values, max_visible_row) == Some(position)
}

/// Whether `values` has any version visible at `max_visible_row` — the
/// cross-source block-list's existence query, snapshot-bounded so a
/// not-yet-visible write can't shadow an older visible copy.
pub fn pk_contains_visible(
&self,
values: &[ScalarValue],
max_visible_row: RowPosition,
) -> bool {
self.pk_newest_visible(values, max_visible_row).is_some()
}

/// Whether the primary-key index holds no rows (or doesn't exist). All
/// columns are inserted together, so the leading column answers for the
/// tuple.
pub fn pk_is_empty(&self) -> bool {
self.pk_btrees.first().is_none_or(|c| c.is_empty())
}

/// Insert a batch into all indexes.
pub fn insert(&self, batch: &RecordBatch, row_offset: u64) -> Result<()> {
self.insert_with_batch_position(batch, row_offset, None)
Expand All @@ -384,6 +493,7 @@ impl IndexStore {
for index in self.fts_indexes.values() {
index.insert(batch, row_offset)?;
}
// PK BTrees alias `btree_indexes` entries — already maintained above.

// Update global watermark after all indexes have been updated
if let Some(bp) = batch_position {
Expand Down Expand Up @@ -440,6 +550,8 @@ impl IndexStore {
}
}

// PK BTrees alias `btree_indexes` entries — already maintained above.

// Update global watermark to the max batch position
let max_bp = batches.iter().map(|b| b.batch_position).max().unwrap();
self.advance_max_visible_batch_position(max_bp);
Expand Down Expand Up @@ -552,6 +664,9 @@ impl IndexStore {
.map(|(name, _idx_type, duration)| (name.to_string(), duration))
.collect();

// PK BTrees alias `btree_indexes` entries — their threads above
// already maintained them (and joined before the watermark advances).

// Update global watermark to the max batch position
let max_bp = batches.iter().map(|b| b.batch_position).max().unwrap();
self.advance_max_visible_batch_position(max_bp);
Expand All @@ -562,7 +677,7 @@ impl IndexStore {

/// Get a BTree index by name.
pub fn get_btree(&self, name: &str) -> Option<&BTreeMemIndex> {
self.btree_indexes.get(name)
self.btree_indexes.get(name).map(Arc::as_ref)
}

/// Get an HNSW vector index by name.
Expand All @@ -583,6 +698,7 @@ impl IndexStore {
self.btree_indexes
.values()
.find(|idx| idx.field_id() == field_id)
.map(Arc::as_ref)
}

/// Get an HNSW vector index by field ID.
Expand All @@ -607,6 +723,7 @@ impl IndexStore {
self.btree_indexes
.values()
.find(|idx| idx.column_name() == column)
.map(Arc::as_ref)
}

/// Get an HNSW vector index by column name.
Expand Down Expand Up @@ -694,6 +811,71 @@ mod tests {
.unwrap()
}

/// Single-column `id` batch for primary-key lookup tests.
fn id_batch(ids: &[i32]) -> RecordBatch {
RecordBatch::try_new(
Arc::new(ArrowSchema::new(vec![Field::new(
"id",
DataType::Int32,
false,
)])),
vec![Arc::new(Int32Array::from(ids.to_vec()))],
)
.unwrap()
}

#[test]
fn pk_newest_visible_single_column() {
let mut store = IndexStore::new();
store.enable_pk_index(&[("id".to_string(), 0)]);
// id=1 at positions 0 and 2 (an update), id=2 at position 1.
store.insert(&id_batch(&[1, 2]), 0).unwrap();
store.insert(&id_batch(&[1]), 2).unwrap();

let one = [ScalarValue::Int32(Some(1))];
// Watermark above the update sees the newest position; below it, the older.
assert_eq!(store.pk_newest_visible(&one, 5), Some(2));
assert_eq!(store.pk_newest_visible(&one, 1), Some(0));
assert!(store.pk_is_newest(&one, 2, 5));
assert!(!store.pk_is_newest(&one, 0, 5));
// Absent key.
assert!(!store.pk_contains_visible(&[ScalarValue::Int32(Some(9))], 5));
}

#[test]
fn pk_newest_visible_composite_intersects_by_position() {
let mut store = IndexStore::new();
store.enable_pk_index(&[("id".to_string(), 0), ("name".to_string(), 1)]);
// Rows: (1,"a")@0, (1,"b")@1, (1,"a")@2 — an update of (1,"a").
let schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]));
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(Int32Array::from(vec![1, 1, 1])),
Arc::new(StringArray::from(vec!["a", "b", "a"])),
],
)
.unwrap();
store.insert(&batch, 0).unwrap();

let tuple_1a = [ScalarValue::Int32(Some(1)), ScalarValue::from("a")];
let tuple_1b = [ScalarValue::Int32(Some(1)), ScalarValue::from("b")];
// (1,"a")'s newest visible row is its re-write at position 2.
assert_eq!(store.pk_newest_visible(&tuple_1a, 5), Some(2));
assert!(store.pk_is_newest(&tuple_1a, 2, 5));
assert!(!store.pk_is_newest(&tuple_1a, 0, 5));
// (1,"b") only exists at position 1.
assert_eq!(store.pk_newest_visible(&tuple_1b, 5), Some(1));
// Watermark below the re-write: the older (1,"a")@0 is the newest visible.
assert_eq!(store.pk_newest_visible(&tuple_1a, 1), Some(0));
// An absent tuple.
let tuple_2a = [ScalarValue::Int32(Some(2)), ScalarValue::from("a")];
assert!(!store.pk_contains_visible(&tuple_2a, 5));
}

#[test]
fn test_index_registry() {
let schema = create_test_schema();
Expand Down
Loading
Loading