diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index 772dfaf4089..f667f3ac407 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -1014,6 +1014,27 @@ pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf { metrics: &dyn MetricsCollector, ) -> Result; + /// Like [`Self::search`] but with a best-effort `limit` hint: when `limit` is `Some(n)` + /// an index may stop after finding `n` matches (it may still return more). Only push a + /// limit for a single positive lookup. The default ignores it and calls [`Self::search`]. + /// + /// ``` + /// # use lance_core::Result; + /// # use lance_index::{metrics::NoOpMetricsCollector, scalar::{AnyQuery, ScalarIndex}}; + /// # async fn example(index: &dyn ScalarIndex, query: &dyn AnyQuery) -> Result<()> { + /// let _result = index.search_limited(query, &NoOpMetricsCollector, Some(10)).await?; + /// # Ok(()) + /// # } + /// ``` + async fn search_limited( + &self, + query: &dyn AnyQuery, + metrics: &dyn MetricsCollector, + _limit: Option, + ) -> Result { + self.search(query, metrics).await + } + /// Returns true if the remap operation is supported fn can_remap(&self) -> bool; diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index a650e8db244..45e691da3fb 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -1729,14 +1729,19 @@ impl Index for BTreeIndex { } } -#[async_trait] -impl ScalarIndex for BTreeIndex { - async fn search( +impl BTreeIndex { + /// Shared implementation for [`ScalarIndex::search`] and + /// [`ScalarIndex::search_limited`]. + /// + /// When `limit` is `Some(n)` the pages are searched in order and the search stops once + /// it has at least `n` matching row ids. The result may hold more than `n` rows but + /// never fewer unless the query matches fewer. + async fn do_search( &self, - query: &dyn AnyQuery, + query: &SargableQuery, metrics: &dyn MetricsCollector, + limit: Option, ) -> Result { - let query = query.as_any().downcast_ref::().unwrap(); let mut pages = match query { SargableQuery::Equals(val) => self .page_lookup @@ -1797,7 +1802,10 @@ impl ScalarIndex for BTreeIndex { // We add them as Matches::Some (not Matches::All) so that // FlatIndex::search() evaluates the predicate and correctly marks // the rows as NULL rather than TRUE. - if !matches!(query, SargableQuery::IsNull()) { + // + // When a `limit` is set the query is a single positive lookup, so null tracking + // is not needed and skipping null pages helps us stop early. + if limit.is_none() && !matches!(query, SargableQuery::IsNull()) { let existing: HashSet = pages.iter().map(|m| m.page_id()).collect(); for &page_id in self .page_lookup @@ -1822,19 +1830,60 @@ impl ScalarIndex for BTreeIndex { .collect::>(); debug!("Searching {} btree pages", page_tasks.len()); - // Collect both matching row IDs and null row IDs from all pages - let results: Vec = stream::iter(page_tasks) - // I/O and compute mixed here but important case is index in cache so - // use compute intensive thread count - .buffered(get_num_compute_intensive_cpus()) - .try_collect() - .await?; + // Collect row IDs from the pages. `buffered` keeps page order. When a `limit` is + // set we read one page at a time and stop once we have enough matches, so we do + // not issue I/O for pages we never need. Without a limit we fan out across CPUs + // (I/O and compute are mixed, but the important case is the index being cached). + let parallelism = if limit.is_some() { + 1 + } else { + get_num_compute_intensive_cpus() + }; + let mut page_stream = stream::iter(page_tasks).buffered(parallelism); + + let mut results: Vec = Vec::new(); + let mut matches_found: u64 = 0; + while let Some(page_result) = page_stream.try_next().await? { + if let Some(limit) = limit { + // Count only TRUE matches. NULL rows never match, so they must not count + // toward the limit. `len()` already excludes nulls. + matches_found += page_result.len().unwrap_or(0); + results.push(page_result); + if matches_found >= limit as u64 { + break; + } + } else { + results.push(page_result); + } + } // Merge matching row IDs let selection = NullableRowAddrSet::union_all(&results); Ok(SearchResult::Exact(selection)) } +} + +#[async_trait] +impl ScalarIndex for BTreeIndex { + async fn search( + &self, + query: &dyn AnyQuery, + metrics: &dyn MetricsCollector, + ) -> Result { + let query = query.as_any().downcast_ref::().unwrap(); + self.do_search(query, metrics, None).await + } + + async fn search_limited( + &self, + query: &dyn AnyQuery, + metrics: &dyn MetricsCollector, + limit: Option, + ) -> Result { + let query = query.as_any().downcast_ref::().unwrap(); + self.do_search(query, metrics, limit).await + } fn can_remap(&self) -> bool { true @@ -4963,6 +5012,75 @@ mod tests { } } + /// `search_limited` returns at least `limit` matches but stops reading pages early, + /// so for a multi-page range it returns fewer rows than an unlimited search. + #[tokio::test] + async fn test_search_limited_short_circuits() { + use arrow_array::{Int32Array, UInt64Array}; + + let tmpdir = TempObjDir::default(); + let test_store = Arc::new(LanceIndexStore::new( + Arc::new(ObjectStore::local()), + tmpdir.clone(), + Arc::new(LanceCache::no_cache()), + )); + + // Enough rows to span several btree pages, with no nulls so every row matches an + // unbounded range. `train_btree_index` makes pages of `DEFAULT_BTREE_BATCH_SIZE` + // rows, so this gives five pages. + let num_rows = 5 * DEFAULT_BTREE_BATCH_SIZE; + let values: Int32Array = (0..num_rows).map(|i| Some(i as i32)).collect(); + let row_ids = UInt64Array::from_iter_values(0..num_rows); + let data = arrow_array::RecordBatch::try_from_iter(vec![ + ("value", Arc::new(values) as arrow_array::ArrayRef), + ("_rowid", Arc::new(row_ids) as arrow_array::ArrayRef), + ]) + .unwrap(); + let schema = data.schema(); + let stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new( + schema, + stream::iter(vec![Ok(data)]), + )); + train_btree_index( + stream, + test_store.as_ref(), + DEFAULT_BTREE_BATCH_SIZE, + None, + None, + ) + .await + .unwrap(); + + let index = BTreeIndex::load(test_store.clone(), None, &LanceCache::no_cache()) + .await + .unwrap(); + let metrics = NoOpMetricsCollector; + let everything = + SargableQuery::Range(std::ops::Bound::Unbounded, std::ops::Bound::Unbounded); + + // Baseline: an unlimited search returns every row. + let full = index.search(&everything, &metrics).await.unwrap(); + let full_len = full.row_addrs().len().unwrap(); + assert_eq!(full_len, num_rows); + + // A limit that reaches into the second page. The search must satisfy it but stop + // well before reading all five pages. + let limit = (DEFAULT_BTREE_BATCH_SIZE + 100) as usize; + let limited = index + .search_limited(&everything, &metrics, Some(limit)) + .await + .unwrap(); + let limited_len = limited.row_addrs().len().unwrap(); + assert!( + limited_len >= limit as u64, + "expected at least {limit} matches, got {limited_len}" + ); + assert!( + limited_len < full_len, + "expected the search to short-circuit, but it returned all {full_len} rows" + ); + } + fn sample_lookup_batch() -> RecordBatch { record_batch!( ("min", Int32, [Some(0), Some(10), Some(20)]), diff --git a/rust/lance-index/src/scalar/expression.rs b/rust/lance-index/src/scalar/expression.rs index 38a29e9c43c..d5844c948bd 100644 --- a/rust/lance-index/src/scalar/expression.rs +++ b/rust/lance-index/src/scalar/expression.rs @@ -1324,21 +1324,24 @@ impl ScalarIndexExpr { &self, index_loader: &dyn ScalarIndexLoader, metrics: &dyn MetricsCollector, + limit: Option, ) -> Result { match self { + // A limit only applies to a single positive lookup. NOT, AND, and OR need the + // full result of each side, so the limit is dropped when recursing into them. Self::Not(inner) => { - let result = inner.evaluate_nullable(index_loader, metrics).await?; + let result = inner.evaluate_nullable(index_loader, metrics, None).await?; Ok(!result) } Self::And(lhs, rhs) => { - let lhs_result = lhs.evaluate_nullable(index_loader, metrics); - let rhs_result = rhs.evaluate_nullable(index_loader, metrics); + let lhs_result = lhs.evaluate_nullable(index_loader, metrics, None); + let rhs_result = rhs.evaluate_nullable(index_loader, metrics, None); let (lhs_result, rhs_result) = try_join!(lhs_result, rhs_result)?; Ok(lhs_result & rhs_result) } Self::Or(lhs, rhs) => { - let lhs_result = lhs.evaluate_nullable(index_loader, metrics); - let rhs_result = rhs.evaluate_nullable(index_loader, metrics); + let lhs_result = lhs.evaluate_nullable(index_loader, metrics, None); + let rhs_result = rhs.evaluate_nullable(index_loader, metrics, None); let (lhs_result, rhs_result) = try_join!(lhs_result, rhs_result)?; Ok(lhs_result | rhs_result) } @@ -1346,7 +1349,9 @@ impl ScalarIndexExpr { let index = index_loader .load_index(&search.column, &search.index_name, metrics) .await?; - let search_result = index.search(search.query.as_ref(), metrics).await?; + let search_result = index + .search_limited(search.query.as_ref(), metrics, limit) + .await?; Ok(search_result.into()) } } @@ -1357,9 +1362,24 @@ impl ScalarIndexExpr { &self, index_loader: &dyn ScalarIndexLoader, metrics: &dyn MetricsCollector, + ) -> Result { + self.evaluate_limited(index_loader, metrics, None).await + } + + /// Like [`Self::evaluate`] but pushes a `limit` hint into the index search so it can + /// stop once it has found at least `limit` matches. + /// + /// See [`crate::scalar::ScalarIndex::search_limited`] for the rules on when a limit + /// may be pushed down. + #[instrument(level = "debug", skip_all)] + pub async fn evaluate_limited( + &self, + index_loader: &dyn ScalarIndexLoader, + metrics: &dyn MetricsCollector, + limit: Option, ) -> Result { Ok(self - .evaluate_nullable(index_loader, metrics) + .evaluate_nullable(index_loader, metrics, limit) .await? .drop_nulls()) } diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 38328894d2e..c8c6832ae38 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -2847,6 +2847,16 @@ impl Scanner { fragments: Option>>, scan_range: Option>, ) -> Result> { + // Decide whether a limit can be pushed into the index search. The fragments the + // read covers (used for the deletion check) are the requested subset, or the whole + // dataset when none was given. + let all_fragments = self.dataset.fragments(); + let scanned_fragments: &[Fragment] = fragments + .as_ref() + .map(|frags| frags.as_slice()) + .unwrap_or_else(|| all_fragments.as_slice()); + let pushdown_limit = self.index_search_limit(filter_plan, scanned_fragments); + let mut read_options = FilteredReadOptions::basic_full_read(&self.dataset) .with_filter_plan(filter_plan.clone()) .with_projection(projection); @@ -2885,11 +2895,10 @@ impl Scanner { let result_format = self.index_expr_result_format(); let index_input = filter_plan.index_query.clone().map(|index_query| { - Arc::new(ScalarIndexExec::new( - self.dataset.clone(), - index_query, - result_format, - )) as Arc + Arc::new( + ScalarIndexExec::new(self.dataset.clone(), index_query, result_format) + .with_limit(pushdown_limit), + ) as Arc }); Ok(Arc::new(FilteredReadExec::try_new( @@ -4068,6 +4077,55 @@ impl Scanner { Ok((relevant_frags, missing_frags)) } + /// Compute the limit hint that can be safely pushed into a scalar index search. + /// + /// Pushing a limit is only an optimization. A `GlobalLimitExec` still applies the + /// exact limit and offset, so the index only needs to return at least `limit + offset` + /// rows. The first N matches are as good as any N matches only when all of these hold. + /// + /// - There is a positive row limit. + /// - The rows are not reordered before the limit (no `ORDER BY`, vector or FTS search). + /// - There is no aggregate (the limit applies after aggregation). + /// - The index result is used as is, with no refine filter and no recheck. Either of + /// those re-filters rows later and could drop matches. + /// - The relevant fragments have no deletions. Deleted rows are pruned after the index + /// search, so stopping early could leave fewer than `limit` live rows. + /// + /// Returns `None` when no limit can be pushed. + fn index_search_limit( + &self, + filter_plan: &ExprFilterPlan, + relevant_fragments: &[Fragment], + ) -> Option { + let limit = self.limit?; + if limit <= 0 { + return None; + } + if self.ordering.is_some() + || self.nearest.is_some() + || self.full_text_query.is_some() + || self.aggregate.is_some() + || filter_plan.has_refine() + { + return None; + } + if filter_plan + .index_query + .as_ref() + .is_some_and(|query| query.needs_recheck()) + { + return None; + } + if relevant_fragments + .iter() + .any(|fragment| fragment.deletion_file.is_some()) + { + return None; + } + let offset = self.offset.unwrap_or(0).max(0) as usize; + Some((limit as usize).saturating_add(offset)) + } + // First perform a lookup in a scalar index for ids and then perform a take on the // target fragments with those ids async fn scalar_indexed_scan( @@ -4092,11 +4150,18 @@ impl Scanner { .partition_frags_by_coverage(index_expr, fragments) .await?; - let mut plan: Arc = Arc::new(MaterializeIndexExec::new( - self.dataset.clone(), - index_expr.clone(), - Arc::new(relevant_frags), - )); + // A limit can be pushed into the index search, but only when its rows are used as + // is and the relevant fragments have no deletions. + let pushdown_limit = self.index_search_limit(filter_plan, &relevant_frags); + + let mut plan: Arc = Arc::new( + MaterializeIndexExec::new( + self.dataset.clone(), + index_expr.clone(), + Arc::new(relevant_frags), + ) + .with_limit(pushdown_limit), + ); let refine_expr = filter_plan.refine_expr.as_ref(); @@ -5815,6 +5880,75 @@ mod test { assert_eq!(ids, &(10..20).collect::>()); } + #[tokio::test] + async fn test_limit_pushed_into_scalar_index() { + // When a scan filter is fully served by a scalar index (no refine, no recheck, no + // ordering) the limit can be pushed into the index search. The result must still + // be exactly `limit` rows that all match the filter. Early stop must not drop or + // duplicate matches. + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "id", + DataType::Int32, + false, + )])); + // Span several btree pages so a small limit short-circuits before the end. + let num_rows = 20_000; + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..num_rows))], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap(); + dataset + .create_index( + &["id"], + IndexType::BTree, + None, + &ScalarIndexParams::default(), + true, + ) + .await + .unwrap(); + + let limit = 100; + let scan_ids = |dataset: Arc| async move { + let batch = dataset + .scan() + .filter("id >= 5") + .unwrap() + .limit(Some(limit), None) + .unwrap() + .try_into_batch() + .await + .unwrap(); + batch + .column_by_name("id") + .unwrap() + .as_primitive::() + .values() + .to_vec() + }; + + let ids = scan_ids(Arc::new(dataset.clone())).await; + assert_eq!(ids.len(), limit as usize); + assert!( + ids.iter().all(|&id| id >= 5), + "every returned row must satisfy the filter" + ); + + // With deletions present the limit must not be pushed, since deleted rows are + // pruned after the index search. The scan must still return exactly `limit` live + // matches. + dataset.delete("id >= 5 AND id < 10000").await.unwrap(); + let ids = scan_ids(Arc::new(dataset)).await; + assert_eq!(ids.len(), limit as usize); + assert!( + ids.iter().all(|&id| id >= 10000), + "deleted rows must not be returned" + ); + } + #[test_log::test(tokio::test)] async fn test_limit_cancel() { // If there is a filter and a limit and we can't use the index to satisfy diff --git a/rust/lance/src/index/scalar_logical.rs b/rust/lance/src/index/scalar_logical.rs index d2ab4e4b9f0..9d63e6b4368 100644 --- a/rust/lance/src/index/scalar_logical.rs +++ b/rust/lance/src/index/scalar_logical.rs @@ -132,6 +132,24 @@ impl ScalarIndex for LogicalScalarIndex { combine_search_results(results) } + async fn search_limited( + &self, + query: &dyn AnyQuery, + metrics: &dyn MetricsCollector, + limit: Option, + ) -> Result { + // Forwarding the limit to every segment is safe. Each segment returns at least + // `limit` matches when it has them, so the combined result still has at least + // `limit` matches overall. + let results = try_join_all( + self.segments + .iter() + .map(|segment| segment.search_limited(query, metrics, limit)), + ) + .await?; + combine_search_results(results) + } + fn can_remap(&self) -> bool { false } diff --git a/rust/lance/src/io/exec/scalar_index.rs b/rust/lance/src/io/exec/scalar_index.rs index cc2abb6cc30..bb411bc65df 100644 --- a/rust/lance/src/io/exec/scalar_index.rs +++ b/rust/lance/src/io/exec/scalar_index.rs @@ -76,6 +76,12 @@ pub struct ScalarIndexExec { properties: Arc, metrics: ExecutionPlanMetricsSet, result_format: IndexExprResultWireFormat, + /// Hint passed to the index search so it can stop once it has found this many + /// matches. `None` means search all matches. + /// + /// This is only an optimization. A downstream `GlobalLimitExec` still applies the + /// exact limit, so the index only needs to return at least this many rows. + limit: Option, } impl DisplayAs for ScalarIndexExec { @@ -109,9 +115,20 @@ impl ScalarIndexExec { properties, metrics: ExecutionPlanMetricsSet::new(), result_format, + limit: None, } } + /// Push a `limit` hint into the index search so it can stop early. + /// + /// Only set this when returning any `limit` matching rows is safe, such as an + /// unordered scan whose results are not filtered further. Correctness still relies on + /// a downstream limit operator. + pub fn with_limit(mut self, limit: Option) -> Self { + self.limit = limit; + self + } + pub fn dataset(&self) -> &Arc { &self.dataset } @@ -162,12 +179,14 @@ impl ScalarIndexExec { dataset: Arc, plan_metrics: ExecutionPlanMetricsSet, result_format: IndexExprResultWireFormat, + limit: Option, ) -> Result { let metrics = IndexMetrics::new(&plan_metrics, 0); let query_result = { let search_time = plan_metrics.new_time(SCALAR_INDEX_SEARCH_TIME_METRIC, 0); let _timer = search_time.timer(); - expr.evaluate(dataset.as_ref(), &metrics).await? + expr.evaluate_limited(dataset.as_ref(), &metrics, limit) + .await? }; let fragments_covered_by_result = Self::fragments_covered_by_index_query(&expr, dataset.as_ref()).await?; @@ -219,6 +238,7 @@ impl ExecutionPlan for ScalarIndexExec { self.dataset.clone(), self.metrics.clone(), self.result_format, + self.limit, ); let stream = futures::stream::iter(vec![batch_fut]) .then(|batch_fut| batch_fut.map_err(|err| err.into())) @@ -487,6 +507,12 @@ pub struct MaterializeIndexExec { fragments: Arc>, properties: Arc, metrics: ExecutionPlanMetricsSet, + /// Hint passed to the index search so it can stop once it has found this many + /// matches. `None` means materialize all matches. + /// + /// This is only an optimization. A downstream `GlobalLimitExec` still applies the + /// exact limit, so the index only needs to return at least this many rows. + limit: Option, } impl DisplayAs for MaterializeIndexExec { @@ -559,17 +585,29 @@ impl MaterializeIndexExec { fragments, properties, metrics: ExecutionPlanMetricsSet::new(), + limit: None, } } + /// Push a `limit` hint into the index search so it can stop early. + /// + /// Only set this when returning any `limit` matching rows is safe, such as an + /// unordered scan whose results are not filtered further. Correctness still relies on + /// a downstream limit operator. + pub fn with_limit(mut self, limit: Option) -> Self { + self.limit = limit; + self + } + #[instrument(name = "materialize_scalar_index", skip_all, level = "debug")] async fn do_execute( expr: ScalarIndexExpr, dataset: Arc, fragments: Arc>, metrics: Arc, + limit: Option, ) -> Result { - let expr_result = expr.evaluate(dataset.as_ref(), metrics.as_ref()); + let expr_result = expr.evaluate_limited(dataset.as_ref(), metrics.as_ref(), limit); let span = debug_span!("create_prefilter"); let prefilter = span.in_scope(|| { let fragment_bitmap = @@ -736,6 +774,7 @@ impl ExecutionPlan for MaterializeIndexExec { self.dataset.clone(), self.fragments.clone(), metrics, + self.limit, ); let stream = futures::stream::iter(vec![batch_fut]) .then(|batch_fut| batch_fut.map_err(|err| err.into()))