Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions rust/lance-index/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,27 @@ pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
metrics: &dyn MetricsCollector,
) -> Result<SearchResult>;

/// Search the index with a best-effort cap on the number of matches.
///
/// Behaves like [`Self::search`], except that when `limit` is `Some(n)` an index may stop
/// looking once it has found `n` matches. The limit is a hint, not an upper bound: the
/// result may still contain more than `n` rows. Passing `None` requests an unlimited search.
///
/// Because an index that honors the hint may stop before it has seen every match, only
/// push a limit for a single positive lookup.
///
/// The default implementation ignores `limit` and calls [`Self::search`].
///
/// ```
/// # use lance_core::Result;
/// # use lance_index::{metrics::NoOpMetricsCollector, scalar::{AnyQuery, ScalarIndex}};
/// # async fn example(index: &dyn ScalarIndex, query: &dyn AnyQuery) -> Result<()> {
/// let _result = index.search_limited(query, &NoOpMetricsCollector, Some(10)).await?;
/// # Ok(())
/// # }
/// ```
async fn search_limited(
&self,
query: &dyn AnyQuery,
metrics: &dyn MetricsCollector,
// Unused by the default implementation, which always runs the full search.
_limit: Option<usize>,
) -> Result<SearchResult> {
self.search(query, metrics).await
}

/// Returns true if the remap operation is supported
fn can_remap(&self) -> bool;

Expand Down
144 changes: 131 additions & 13 deletions rust/lance-index/src/scalar/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1729,14 +1729,19 @@ impl Index for BTreeIndex {
}
}

#[async_trait]
impl ScalarIndex for BTreeIndex {
async fn search(
impl BTreeIndex {
/// Shared implementation for [`ScalarIndex::search`] and
/// [`ScalarIndex::search_limited`].
///
/// When `limit` is `Some(n)` the pages are searched in order and the search stops once
/// it has at least `n` matching row ids. The result may hold more than `n` rows but
/// never fewer unless the query matches fewer.
async fn do_search(
&self,
query: &dyn AnyQuery,
query: &SargableQuery,
metrics: &dyn MetricsCollector,
limit: Option<usize>,
) -> Result<SearchResult> {
let query = query.as_any().downcast_ref::<SargableQuery>().unwrap();
let mut pages = match query {
SargableQuery::Equals(val) => self
.page_lookup
Expand Down Expand Up @@ -1797,7 +1802,10 @@ impl ScalarIndex for BTreeIndex {
// We add them as Matches::Some (not Matches::All) so that
// FlatIndex::search() evaluates the predicate and correctly marks
// the rows as NULL rather than TRUE.
if !matches!(query, SargableQuery::IsNull()) {
//
// When a `limit` is set the query is a single positive lookup, so null tracking
// is not needed and skipping null pages helps us stop early.
if limit.is_none() && !matches!(query, SargableQuery::IsNull()) {
let existing: HashSet<u32> = pages.iter().map(|m| m.page_id()).collect();
for &page_id in self
.page_lookup
Expand All @@ -1822,19 +1830,60 @@ impl ScalarIndex for BTreeIndex {
.collect::<Vec<_>>();
debug!("Searching {} btree pages", page_tasks.len());

// Collect both matching row IDs and null row IDs from all pages
let results: Vec<NullableRowAddrSet> = stream::iter(page_tasks)
// I/O and compute mixed here but important case is index in cache so
// use compute intensive thread count
.buffered(get_num_compute_intensive_cpus())
.try_collect()
.await?;
// Collect row IDs from the pages. `buffered` keeps page order. When a `limit` is
// set we read one page at a time and stop once we have enough matches, so we do
// not issue I/O for pages we never need. Without a limit we fan out across CPUs
// (I/O and compute are mixed, but the important case is the index being cached).
let parallelism = if limit.is_some() {
1
} else {
get_num_compute_intensive_cpus()
};
let mut page_stream = stream::iter(page_tasks).buffered(parallelism);

let mut results: Vec<NullableRowAddrSet> = Vec::new();
let mut matches_found: u64 = 0;
while let Some(page_result) = page_stream.try_next().await? {
if let Some(limit) = limit {
// Count only TRUE matches. NULL rows never match, so they must not count
// toward the limit. `len()` already excludes nulls.
matches_found += page_result.len().unwrap_or(0);
results.push(page_result);
if matches_found >= limit as u64 {
break;
}
} else {
results.push(page_result);
}
}

// Merge matching row IDs
let selection = NullableRowAddrSet::union_all(&results);

Ok(SearchResult::Exact(selection))
}
}

#[async_trait]
impl ScalarIndex for BTreeIndex {
/// Runs an unlimited search of the btree index.
///
/// Downcasts `query` to a `SargableQuery` and forwards to `do_search` with no limit.
///
/// # Panics
///
/// Panics if `query` is not a `SargableQuery`.
async fn search(
    &self,
    query: &dyn AnyQuery,
    metrics: &dyn MetricsCollector,
) -> Result<SearchResult> {
    // A mismatch here means the caller routed the wrong query type to this index, so say
    // so in the panic message instead of surfacing a bare `unwrap` failure.
    let query = query
        .as_any()
        .downcast_ref::<SargableQuery>()
        .expect("BTreeIndex::search requires a SargableQuery");
    self.do_search(query, metrics, None).await
}

/// Runs a search of the btree index with a best-effort `limit` hint.
///
/// Downcasts `query` to a `SargableQuery` and forwards to `do_search`, passing `limit`
/// through unchanged. `None` behaves like `search`.
///
/// # Panics
///
/// Panics if `query` is not a `SargableQuery`.
async fn search_limited(
    &self,
    query: &dyn AnyQuery,
    metrics: &dyn MetricsCollector,
    limit: Option<usize>,
) -> Result<SearchResult> {
    // A mismatch here means the caller routed the wrong query type to this index, so say
    // so in the panic message instead of surfacing a bare `unwrap` failure.
    let query = query
        .as_any()
        .downcast_ref::<SargableQuery>()
        .expect("BTreeIndex::search_limited requires a SargableQuery");
    self.do_search(query, metrics, limit).await
}

fn can_remap(&self) -> bool {
true
Expand Down Expand Up @@ -4963,6 +5012,75 @@ mod tests {
}
}

/// Checks that a `limit` lets the btree search stop before it has visited every page.
///
/// The index is trained on several pages of non-null values and queried with an unbounded
/// range. The limited search must return at least `limit` rows, yet strictly fewer rows
/// than the same query without a limit.
#[tokio::test]
async fn test_search_limited_short_circuits() {
    use arrow_array::{ArrayRef, Int32Array, UInt64Array};

    let scratch_dir = TempObjDir::default();
    let store = Arc::new(LanceIndexStore::new(
        Arc::new(ObjectStore::local()),
        scratch_dir.clone(),
        Arc::new(LanceCache::no_cache()),
    ));

    // Five pages' worth of rows: `train_btree_index` is asked for pages of
    // `DEFAULT_BTREE_BATCH_SIZE` rows. There are no nulls, so an unbounded range matches
    // every row.
    let num_rows = 5 * DEFAULT_BTREE_BATCH_SIZE;
    let value_column: ArrayRef =
        Arc::new((0..num_rows).map(|i| Some(i as i32)).collect::<Int32Array>());
    let row_id_column: ArrayRef = Arc::new(UInt64Array::from_iter_values(0..num_rows));
    let batch = arrow_array::RecordBatch::try_from_iter(vec![
        ("value", value_column),
        ("_rowid", row_id_column),
    ])
    .unwrap();
    let batch_schema = batch.schema();
    let training_data: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
        batch_schema,
        stream::iter(vec![Ok(batch)]),
    ));

    train_btree_index(
        training_data,
        store.as_ref(),
        DEFAULT_BTREE_BATCH_SIZE,
        None,
        None,
    )
    .await
    .unwrap();
    let index = BTreeIndex::load(store.clone(), None, &LanceCache::no_cache())
        .await
        .unwrap();

    let metrics = NoOpMetricsCollector;
    let match_all =
        SargableQuery::Range(std::ops::Bound::Unbounded, std::ops::Bound::Unbounded);

    // Reference point: with no limit, every row comes back.
    let full_len = index
        .search(&match_all, &metrics)
        .await
        .unwrap()
        .row_addrs()
        .len()
        .unwrap();
    assert_eq!(full_len, num_rows);

    // This limit cannot be met by the first page alone, but it is far below the total, so
    // the search has to go past one page and still stop before the last one.
    let limit = (DEFAULT_BTREE_BATCH_SIZE + 100) as usize;
    let limited_len = index
        .search_limited(&match_all, &metrics, Some(limit))
        .await
        .unwrap()
        .row_addrs()
        .len()
        .unwrap();
    assert!(
        limited_len >= limit as u64,
        "expected at least {limit} matches, got {limited_len}"
    );
    assert!(
        limited_len < full_len,
        "expected the search to short-circuit, but it returned all {full_len} rows"
    );
}

fn sample_lookup_batch() -> RecordBatch {
record_batch!(
("min", Int32, [Some(0), Some(10), Some(20)]),
Expand Down
34 changes: 27 additions & 7 deletions rust/lance-index/src/scalar/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1324,29 +1324,34 @@ impl ScalarIndexExpr {
&self,
index_loader: &dyn ScalarIndexLoader,
metrics: &dyn MetricsCollector,
limit: Option<usize>,
) -> Result<NullableIndexExprResult> {
match self {
// A limit only applies to a single positive lookup. NOT, AND, and OR need the
// full result of each side, so the limit is dropped when recursing into them.
Self::Not(inner) => {
let result = inner.evaluate_nullable(index_loader, metrics).await?;
let result = inner.evaluate_nullable(index_loader, metrics, None).await?;
Ok(!result)
}
Self::And(lhs, rhs) => {
let lhs_result = lhs.evaluate_nullable(index_loader, metrics);
let rhs_result = rhs.evaluate_nullable(index_loader, metrics);
let lhs_result = lhs.evaluate_nullable(index_loader, metrics, None);
let rhs_result = rhs.evaluate_nullable(index_loader, metrics, None);
let (lhs_result, rhs_result) = try_join!(lhs_result, rhs_result)?;
Ok(lhs_result & rhs_result)
}
Self::Or(lhs, rhs) => {
let lhs_result = lhs.evaluate_nullable(index_loader, metrics);
let rhs_result = rhs.evaluate_nullable(index_loader, metrics);
let lhs_result = lhs.evaluate_nullable(index_loader, metrics, None);
let rhs_result = rhs.evaluate_nullable(index_loader, metrics, None);
let (lhs_result, rhs_result) = try_join!(lhs_result, rhs_result)?;
Ok(lhs_result | rhs_result)
}
Self::Query(search) => {
let index = index_loader
.load_index(&search.column, &search.index_name, metrics)
.await?;
let search_result = index.search(search.query.as_ref(), metrics).await?;
let search_result = index
.search_limited(search.query.as_ref(), metrics, limit)
.await?;
Ok(search_result.into())
}
}
Expand All @@ -1357,9 +1362,24 @@ impl ScalarIndexExpr {
&self,
index_loader: &dyn ScalarIndexLoader,
metrics: &dyn MetricsCollector,
) -> Result<IndexExprResult> {
self.evaluate_limited(index_loader, metrics, None).await
}

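/// Like [`Self::evaluate`] but passes a best-effort `limit` hint down to the index search,
/// so the index may stop once it has found at least `limit` matches.
///
/// The hint only reaches the index when the expression is a single query. It is dropped
/// when recursing into NOT, AND, and OR, because those need the full result of each side.
/// Passing `None` is equivalent to calling [`Self::evaluate`].
///
/// See [`crate::scalar::ScalarIndex::search_limited`] for the rules on when a limit
/// may be pushed down.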
#[instrument(level = "debug", skip_all)]
pub async fn evaluate_limited(
&self,
index_loader: &dyn ScalarIndexLoader,
metrics: &dyn MetricsCollector,
limit: Option<usize>,
) -> Result<IndexExprResult> {
Ok(self
.evaluate_nullable(index_loader, metrics)
.evaluate_nullable(index_loader, metrics, limit)
.await?
.drop_nulls())
}
Expand Down
Loading
Loading