Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java/lance-jni/src/mem_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1075,6 +1075,7 @@ fn shard_snapshot_from_manifest(manifest: ShardManifest) -> ShardSnapshot {
path: generation.path,
})
.collect(),
shard_field_values: Default::default(),
}
}

Expand Down
1 change: 1 addition & 0 deletions python/src/mem_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,7 @@ fn shard_snapshot_from_manifest(manifest: lance_index::mem_wal::ShardManifest) -
path: generation.path,
})
.collect(),
shard_field_values: Default::default(),
}
}

Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/dataset/mem_wal/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ mod fts_search;
mod planner;
mod point_lookup;
mod projection;
pub(crate) mod shard_pruning;
mod vector_search;

pub use builder::LsmScanner;
Expand Down
28 changes: 28 additions & 0 deletions rust/lance/src/dataset/mem_wal/scanner/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use datafusion::prelude::{Expr, SessionContext};
use futures::TryStreamExt;
use lance_core::{Error, Result, is_system_column};
use lance_index::mem_wal::ShardingSpec;
use uuid::Uuid;

use super::collector::{InMemoryMemTableRef, InMemoryMemTables, LsmDataSourceCollector};
Expand Down Expand Up @@ -125,6 +126,10 @@ pub struct LsmScanner {
/// Cache of opened flushed-generation datasets. When set, repeated
/// queries against the same generation skip the manifest read entirely.
flushed_cache: Option<Arc<FlushedMemTableCache>>,
/// Optional sharding spec for read-path shard pruning.
sharding_spec: Option<ShardingSpec>,
/// Mapping from source field id to column name, for sharding evaluation.
source_id_to_column: HashMap<i32, String>,
}

impl LsmScanner {
Expand Down Expand Up @@ -160,6 +165,8 @@ impl LsmScanner {
pk_columns,
session,
flushed_cache: None,
sharding_spec: None,
source_id_to_column: HashMap::new(),
}
}

Expand Down Expand Up @@ -198,6 +205,8 @@ impl LsmScanner {
pk_columns,
session: None,
flushed_cache: None,
sharding_spec: None,
source_id_to_column: HashMap::new(),
}
}

Expand Down Expand Up @@ -253,6 +262,19 @@ impl LsmScanner {
self
}

/// Set the sharding spec and source-column mapping for read-path shard
/// pruning. When set, the scan planner can skip shards whose field values
/// do not match the query filter.
pub fn with_sharding_spec(
mut self,
spec: ShardingSpec,
source_id_to_column: HashMap<i32, String>,
) -> Self {
self.sharding_spec = Some(spec);
self.source_id_to_column = source_id_to_column;
self
}

/// Project specific columns.
///
/// If not called, all columns from the base schema are included.
Expand Down Expand Up @@ -490,6 +512,12 @@ impl LsmScanner {
collector = collector.with_in_memory_memtables(*shard_id, mems.clone());
}

if let Some(spec) = &self.sharding_spec {
collector = collector
.with_sharding_spec(spec.clone(), self.source_id_to_column.clone())
.with_base_schema(self.schema.clone());
}

collector
}
}
Expand Down
62 changes: 62 additions & 0 deletions rust/lance/src/dataset/mem_wal/scanner/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion::prelude::Expr;
use lance_core::Result;
use lance_index::mem_wal::ShardingSpec;
use uuid::Uuid;

use super::data_source::{LsmDataSource, LsmGeneration, ShardSnapshot};
Expand Down Expand Up @@ -66,6 +68,12 @@ pub struct LsmDataSourceCollector {
shard_snapshots: Vec<ShardSnapshot>,
/// In-memory memtables by shard (active + frozen-awaiting-flush).
in_memory_memtables: HashMap<Uuid, InMemoryMemTables>,
/// Optional sharding spec for read-path shard pruning.
sharding_spec: Option<ShardingSpec>,
/// Mapping from source field id to column name, for sharding evaluation.
source_id_to_column: HashMap<i32, String>,
/// Base schema for type coercion during shard pruning.
base_schema: Option<SchemaRef>,
}

impl LsmDataSourceCollector {
Expand All @@ -84,6 +92,9 @@ impl LsmDataSourceCollector {
base_path,
shard_snapshots,
in_memory_memtables: HashMap::new(),
sharding_spec: None,
source_id_to_column: HashMap::new(),
base_schema: None,
}
}

Expand All @@ -101,6 +112,9 @@ impl LsmDataSourceCollector {
base_path: base_path.into().trim_end_matches('/').to_string(),
shard_snapshots,
in_memory_memtables: HashMap::new(),
sharding_spec: None,
source_id_to_column: HashMap::new(),
base_schema: None,
}
}

Expand Down Expand Up @@ -132,6 +146,25 @@ impl LsmDataSourceCollector {
self
}

/// Set the sharding spec and source-column mapping for read-path shard
/// pruning. When set, [`Self::collect_pruned`] can skip shards whose
/// field values do not match the query filter.
pub fn with_sharding_spec(
mut self,
spec: ShardingSpec,
source_id_to_column: HashMap<i32, String>,
) -> Self {
self.sharding_spec = Some(spec);
self.source_id_to_column = source_id_to_column;
self
}

/// Set the base schema used for type coercion during shard pruning.
pub fn with_base_schema(mut self, schema: SchemaRef) -> Self {
self.base_schema = Some(schema);
self
}

/// Get the base table, if any.
pub fn base_table(&self) -> Option<&Arc<Dataset>> {
self.base_table.as_ref()
Expand Down Expand Up @@ -304,6 +337,33 @@ impl LsmDataSourceCollector {
Ok(sources)
}

/// Collect data sources, pruning shards when the filter references the
/// sharding column and a [`ShardingSpec`] has been configured via
/// [`Self::with_sharding_spec`].
///
/// Falls back to [`Self::collect`] when pruning is not possible (no spec,
/// no filter, or the filter does not match the sharding column).
pub fn collect_pruned(&self, filter: Option<&Expr>) -> Result<Vec<LsmDataSource>> {
if let Some(spec) = &self.sharding_spec
&& let Some(filter) = filter
&& let Some(shard_ids) = super::shard_pruning::prune_shards(
filter,
spec,
&self.shard_snapshots,
&self.source_id_to_column,
self.base_schema.as_ref(),
)
{
tracing::debug!(
pruned_to = shard_ids.len(),
total = self.shard_snapshots.len(),
"shard pruning applied"
);
return self.collect_for_shards(&shard_ids);
}
self.collect()
}

/// Get the total number of data sources.
pub fn num_sources(&self) -> usize {
let flushed_count: usize = self
Expand Down Expand Up @@ -353,6 +413,7 @@ mod tests {
path: "def_gen_2".to_string(),
},
],
shard_field_values: HashMap::new(),
},
ShardSnapshot {
shard_id: shard_b,
Expand All @@ -362,6 +423,7 @@ mod tests {
generation: 1,
path: "xyz_gen_1".to_string(),
}],
shard_field_values: HashMap::new(),
},
]
}
Expand Down
15 changes: 15 additions & 0 deletions rust/lance/src/dataset/mem_wal/scanner/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

//! Data source types for LSM scanner.

use std::collections::HashMap;
use std::sync::Arc;

use arrow_schema::SchemaRef;
Expand Down Expand Up @@ -93,6 +94,9 @@ pub struct ShardSnapshot {
pub current_generation: u64,
/// List of flushed generations and their paths.
pub flushed_generations: Vec<FlushedGeneration>,
/// Computed shard field values, keyed by field id (e.g. bucket id).
/// Used by shard pruning to skip shards that cannot contain matching rows.
pub shard_field_values: HashMap<String, Vec<u8>>,
}

impl ShardSnapshot {
Expand All @@ -103,6 +107,7 @@ impl ShardSnapshot {
spec_id: 0,
current_generation: 1,
flushed_generations: Vec::new(),
shard_field_values: HashMap::new(),
}
}

Expand All @@ -124,6 +129,16 @@ impl ShardSnapshot {
.push(FlushedGeneration { generation, path });
self
}

/// Set the shard field values for this snapshot.
///
/// These are the computed sharding-field values for the shard, keyed by
/// field id (e.g. the bucket number). Used by the read-path shard pruning
/// to skip shards whose field values do not match the query filter.
pub fn with_shard_field_values(mut self, values: HashMap<String, Vec<u8>>) -> Self {
self.shard_field_values = values;
self
}
}

/// A data source in the LSM tree that can be scanned.
Expand Down
Loading
Loading