From 18434ee86777400702b40ddc9e87241845edbcac Mon Sep 17 00:00:00 2001 From: cl Date: Wed, 3 Jun 2026 00:58:06 +0800 Subject: [PATCH 1/2] arrow-select: fuse inline Utf8View/BinaryView filter coalescing Teach BatchCoalescer to reuse a FilterPredicate when coalescing filtered batches whose non-primitive columns are inline Utf8View/BinaryView values. This avoids materializing an intermediate filtered RecordBatch for sparse filters and copies inline views and nulls directly into the in-progress arrays. Keep materialized filtering for dense filters, batches that do not fit the coalescer buffer, and byte-view arrays with external buffers. Use a looser dense threshold for multi-column batches, where sharing the row selection across columns pays for itself. Add shared FilterSelection iterators so primitive and byte-view coalescers can consume materialized or lazy row selections without matching per row. Signed-off-by: cl --- arrow-select/src/coalesce.rs | 302 +++++++++++++++++++++++-- arrow-select/src/coalesce/byte_view.rs | 126 ++++++++++- arrow-select/src/coalesce/generic.rs | 11 + arrow-select/src/coalesce/primitive.rs | 108 ++++++++- arrow-select/src/filter.rs | 127 ++++++++++- 5 files changed, 644 insertions(+), 30 deletions(-) diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs index 8fe88fb8c377..e1dd8a654c78 100644 --- a/arrow-select/src/coalesce.rs +++ b/arrow-select/src/coalesce.rs @@ -20,7 +20,7 @@ //! //! [`filter`]: crate::filter::filter //! [`take`]: crate::take::take -use crate::filter::filter_record_batch; +use crate::filter::{FilterBuilder, FilterPredicate, FilterSelection}; use crate::take::take_record_batch; use arrow_array::types::{BinaryViewType, StringViewType}; use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, downcast_primitive}; @@ -212,7 +212,7 @@ impl BatchCoalescer { /// Push a batch into the Coalescer after applying a filter /// /// This is semantically equivalent of calling [`Self::push_batch`] - /// with the results from [`filter_record_batch`] + /// with the results from [`crate::filter::filter_record_batch`] /// /// # Example /// ``` @@ -238,10 +238,7 @@ impl BatchCoalescer { batch: RecordBatch, filter: &BooleanArray, ) -> Result<(), ArrowError> { - // TODO: optimize this to avoid materializing (copying the results - // of filter to a new batch) - let filtered_batch = filter_record_batch(&batch, filter)?; - self.push_batch(filtered_batch) + self.push_batch_with_filtered_columns(batch, filter) } /// Push a batch into the Coalescer after applying a set of indices @@ -566,6 +563,79 @@ impl BatchCoalescer { } } +impl BatchCoalescer { + fn push_batch_with_filtered_columns( + &mut self, + batch: RecordBatch, + filter: &BooleanArray, + ) -> Result<(), ArrowError> { + if filter.len() > batch.num_rows() { + return Err(ArrowError::InvalidArgumentError(format!( + "Filter predicate of length {} is larger than target array of length {}", + filter.len(), + batch.num_rows() + ))); + } + + let mut filter_builder = FilterBuilder::new(filter); + if batch.num_columns() > 1 + || (batch.num_columns() > 0 + && FilterBuilder::is_optimize_beneficial(batch.schema_ref().field(0).data_type())) + { + filter_builder = filter_builder.optimize(); + } + let predicate = filter_builder.build(); + let selected_count = predicate.count(); + + if selected_count == 0 { + return Ok(()); + } + + if selected_count == batch.num_rows() && filter.len() == batch.num_rows() { + return self.push_batch(batch); + } + + let exceeds_coalesce_limit = self + .biggest_coalesce_batch_size + .is_some_and(|limit| selected_count > limit); + // Multi-column batches benefit from sharing the selection across + // columns; single-column batches need a sparser filter to win. + let is_dense_filter = if batch.num_columns() > 1 { + selected_count.saturating_mul(4) > filter.len() + } else { + selected_count.saturating_mul(16) > filter.len() + }; + let does_not_fit_buffer = selected_count > self.target_batch_size - self.buffered_rows; + + if exceeds_coalesce_limit || is_dense_filter || does_not_fit_buffer { + // Use materialized filtering when sparse per-column copying won't help. + let filtered_batch = predicate.filter_record_batch(&batch)?; + return self.push_batch(filtered_batch); + } + + let (_schema, arrays, _num_rows) = batch.into_parts(); + + if arrays.len() != self.in_progress_arrays.len() { + return Err(ArrowError::InvalidArgumentError(format!( + "Batch has {} columns but BatchCoalescer expects {}", + arrays.len(), + self.in_progress_arrays.len() + ))); + } + + for (in_progress, array) in self.in_progress_arrays.iter_mut().zip(arrays) { + in_progress.copy_rows_by_filter_from(array, &predicate)?; + } + + self.buffered_rows += selected_count; + if self.buffered_rows >= self.target_batch_size { + self.finish_buffered_batch()?; + } + + Ok(()) + } +} + /// Return a new `InProgressArray` for the given data type fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box { macro_rules! instantiate_primitive { @@ -611,6 +681,35 @@ trait InProgressArray: std::fmt::Debug + Send + Sync { /// Return an error if the source array is not set fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>; + /// Copy rows selected by `filter` from the current source array. + fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> { + self.copy_rows_by_selection(filter.selection()) + } + + /// Copy rows selected by `filter` from `source`. + fn copy_rows_by_filter_from( + &mut self, + source: ArrayRef, + filter: &FilterPredicate, + ) -> Result<(), ArrowError> { + self.set_source(Some(source)); + let result = self.copy_rows_by_filter(filter); + self.set_source(None); + result + } + + /// Copy rows described by a [`FilterSelection`] from the current source array. + fn copy_rows_by_selection(&mut self, selection: FilterSelection<'_>) -> Result<(), ArrowError> { + match selection { + FilterSelection::None => Ok(()), + FilterSelection::All { len } => self.copy_rows(0, len), + FilterSelection::Slices(slices) => { + slices.try_for_each(|(start, end)| self.copy_rows(start, end - start)) + } + FilterSelection::Indices(indices) => indices.try_for_each(|idx| self.copy_rows(idx, 1)), + } + } + /// Finish the currently in-progress array and return it as an `ArrayRef` fn finish(&mut self) -> Result; } @@ -619,6 +718,7 @@ trait InProgressArray: std::fmt::Debug + Send + Sync { mod tests { use super::*; use crate::concat::concat_batches; + use crate::filter::filter_record_batch; use arrow_array::builder::StringViewBuilder; use arrow_array::cast::AsArray; use arrow_array::types::Int32Type; @@ -1197,6 +1297,172 @@ mod tests { .run(); } + #[test] + fn test_binary_view_filtered() { + let values: Vec> = vec![ + Some(b"foo"), + None, + Some(b"A longer string that is more than 12 bytes"), + ]; + + let binary_view = + BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000)); + let batch = + RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap(); + let filter = sparse_filter(1000); + + Test::new("coalesce_binary_view_filtered") + .with_batch(batch.clone()) + .with_filter(filter.clone()) + .with_batch(batch) + .with_filter(filter) + .with_batch_size(256) + .with_expected_output_sizes(vec![250]) + .run(); + } + + #[test] + fn test_binary_view_filtered_inline() { + let values: Vec> = vec![Some(b"foo"), None, Some(b"barbaz")]; + + let binary_view = + BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000)); + let batch = + RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap(); + let filter = sparse_filter(1000); + + Test::new("coalesce_binary_view_filtered_inline") + .with_batch(batch.clone()) + .with_filter(filter.clone()) + .with_batch(batch) + .with_filter(filter) + .with_batch_size(300) + .with_expected_output_sizes(vec![250]) + .run(); + } + + #[test] + fn test_string_view_filtered_inline() { + let values: Vec> = vec![Some("foo"), None, Some("barbaz")]; + + let string_view = + StringViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000)); + let batch = + RecordBatch::try_from_iter(vec![("c0", Arc::new(string_view) as ArrayRef)]).unwrap(); + let filter = sparse_filter(1000); + + Test::new("coalesce_string_view_filtered_inline") + .with_batch(batch.clone()) + .with_filter(filter.clone()) + .with_batch(batch) + .with_filter(filter) + .with_batch_size(300) + .with_expected_output_sizes(vec![250]) + .run(); + } + + #[test] + fn test_mixed_inline_binary_view_filtered() { + let int_values = + Int32Array::from_iter((0..1000).map(|v| if v % 5 == 0 { None } else { Some(v) })); + let float_values = arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64))); + let binary_values: Vec> = vec![Some(b"foo"), None, Some(b"barbaz")]; + let binary_view = BinaryViewArray::from_iter( + std::iter::repeat(binary_values.iter()).flatten().take(1000), + ); + + let batch = RecordBatch::try_from_iter(vec![ + ("i", Arc::new(int_values) as ArrayRef), + ("f", Arc::new(float_values) as ArrayRef), + ("b", Arc::new(binary_view) as ArrayRef), + ]) + .unwrap(); + + let filter = sparse_filter(1000); + + Test::new("coalesce_mixed_inline_binary_view_filtered") + .with_batch(batch.clone()) + .with_filter(filter.clone()) + .with_batch(batch) + .with_filter(filter) + .with_batch_size(300) + .with_expected_output_sizes(vec![250]) + .run(); + } + + #[test] + fn test_mixed_inline_string_view_filtered() { + let int_values = + Int32Array::from_iter((0..1000).map(|v| if v % 5 == 0 { None } else { Some(v) })); + let float_values = arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64))); + let string_values: Vec> = vec![Some("foo"), None, Some("barbaz")]; + let string_view = StringViewArray::from_iter( + std::iter::repeat(string_values.iter()).flatten().take(1000), + ); + + let batch = RecordBatch::try_from_iter(vec![ + ("i", Arc::new(int_values) as ArrayRef), + ("f", Arc::new(float_values) as ArrayRef), + ("s", Arc::new(string_view) as ArrayRef), + ]) + .unwrap(); + + let filter = sparse_filter(1000); + + Test::new("coalesce_mixed_inline_string_view_filtered") + .with_batch(batch.clone()) + .with_filter(filter.clone()) + .with_batch(batch) + .with_filter(filter) + .with_batch_size(300) + .with_expected_output_sizes(vec![250]) + .run(); + } + + #[test] + fn test_mixed_boolean_inline_string_view_filtered() { + let bool_values = BooleanArray::from_iter((0..1000).map(|v| Some(v % 3 == 0))); + let string_values: Vec> = vec![Some("foo"), None, Some("barbaz")]; + let string_view = StringViewArray::from_iter( + std::iter::repeat(string_values.iter()).flatten().take(1000), + ); + + let batch = RecordBatch::try_from_iter(vec![ + ("b", Arc::new(bool_values) as ArrayRef), + ("s", Arc::new(string_view) as ArrayRef), + ]) + .unwrap(); + + let filter = sparse_filter(1000); + + Test::new("coalesce_mixed_boolean_inline_string_view_filtered") + .with_batch(batch.clone()) + .with_filter(filter.clone()) + .with_batch(batch) + .with_filter(filter) + .with_batch_size(300) + .with_expected_output_sizes(vec![250]) + .run(); + } + + #[test] + fn test_inline_filter_rejects_filter_longer_than_batch() { + let values: Vec> = vec![Some(b"foo"), Some(b"bar")]; + let binary_view = BinaryViewArray::from_iter(values); + let batch = + RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap(); + let filter = BooleanArray::from(vec![true, false, true]); + + let mut coalescer = BatchCoalescer::new(batch.schema(), 100); + let result = coalescer.push_batch_with_filter(batch, &filter); + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert!( + err.contains("Filter predicate of length 3 is larger than target array of length 2"), + "unexpected error: {err}" + ); + } + #[derive(Debug, Clone, PartialEq)] struct ExpectedLayout { len: usize, @@ -1685,6 +1951,10 @@ mod tests { } } + fn sparse_filter(len: usize) -> BooleanArray { + BooleanArray::from_iter((0..len).map(|idx| Some(idx % 8 == 0))) + } + /// Returns the named column as a StringViewArray fn col_as_string_view<'b>(name: &str, batch: &'b RecordBatch) -> &'b StringViewArray { batch @@ -1701,18 +1971,20 @@ mod tests { let (schema, mut columns, row_count) = batch.into_parts(); for column in columns.iter_mut() { - let Some(string_view) = column.as_string_view_opt() else { + if let Some(string_view) = column.as_string_view_opt() { + // Re-create the StringViewArray to ensure memory layout is + // consistent + let mut builder = StringViewBuilder::new(); + for s in string_view.iter() { + builder.append_option(s); + } + *column = Arc::new(builder.finish()); continue; - }; + } - // Re-create the StringViewArray to ensure memory layout is - // consistent - let mut builder = StringViewBuilder::new(); - for s in string_view.iter() { - builder.append_option(s); + if let Some(binary_view) = column.as_binary_view_opt() { + *column = Arc::new(BinaryViewArray::from_iter(binary_view.iter())); } - // Update the column with the new StringViewArray - *column = Arc::new(builder.finish()); } let options = RecordBatchOptions::new().with_row_count(Some(row_count)); diff --git a/arrow-select/src/coalesce/byte_view.rs b/arrow-select/src/coalesce/byte_view.rs index 6062cd5e77aa..1a38b05698fd 100644 --- a/arrow-select/src/coalesce/byte_view.rs +++ b/arrow-select/src/coalesce/byte_view.rs @@ -16,10 +16,11 @@ // under the License. use crate::coalesce::InProgressArray; +use crate::filter::{FilterPredicate, FilterSelection, filter_null_mask}; use arrow_array::cast::AsArray; use arrow_array::types::ByteViewType; use arrow_array::{Array, ArrayRef, GenericByteViewArray}; -use arrow_buffer::{Buffer, NullBufferBuilder}; +use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, NullBufferBuilder}; use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN}; use arrow_schema::ArrowError; use std::marker::PhantomData; @@ -111,6 +112,61 @@ impl InProgressByteViewArray { self.completed.push(next_buffer.into()); } + fn append_views_by_filter(&mut self, views: &[u128], filter: &FilterPredicate) { + let selected_count = filter.count(); + let current_len = self.views.len(); + self.views.reserve(selected_count); + + let mut written = 0; + + unsafe { + let mut out = self.views.spare_capacity_mut().as_mut_ptr().cast::(); + + match filter.selection() { + FilterSelection::None => {} + FilterSelection::All { .. } => { + std::ptr::copy_nonoverlapping(views.as_ptr(), out, selected_count); + written = selected_count; + } + FilterSelection::Slices(slices) => { + slices.for_each(|(start, end)| { + let len = end - start; + std::ptr::copy_nonoverlapping(views.as_ptr().add(start), out, len); + out = out.add(len); + written += len; + }); + } + FilterSelection::Indices(indices) => { + indices.for_each(|idx| { + out.write(*views.get_unchecked(idx)); + out = out.add(1); + written += 1; + }); + } + } + + self.views.set_len(current_len + written); + } + + debug_assert_eq!(written, selected_count); + } + + fn append_nulls_by_filter( + &mut self, + filter: &FilterPredicate, + source_nulls: Option<&NullBuffer>, + ) { + let Some((null_count, nulls)) = filter_null_mask(source_nulls, filter) else { + self.nulls.append_n_non_nulls(filter.count()); + return; + }; + + let nulls = unsafe { + NullBuffer::new_unchecked(BooleanBuffer::new(nulls, 0, filter.count()), null_count) + }; + self.nulls.append_buffer(&nulls); + } + /// Append views to self.views, updating the buffer index if necessary #[inline(never)] fn append_views_and_update_buffer_index(&mut self, views: &[u128], buffers: &[Buffer]) { @@ -346,6 +402,52 @@ impl InProgressArray for InProgressByteViewArray { Ok(()) } + fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> { + self.ensure_capacity(); + let source = self.source.take().ok_or_else(|| { + ArrowError::InvalidArgumentError( + "Internal Error: InProgressByteViewArray: source not set".to_string(), + ) + })?; + + let s = source.array.as_byte_view::(); + + if !s.data_buffers().is_empty() { + // Restore the source taken above before returning the guard error. + self.source = Some(source); + return Err(ArrowError::InvalidArgumentError( + "Internal Error: InProgressByteViewArray::copy_rows_by_filter requires inline views" + .to_string(), + )); + } + + self.append_nulls_by_filter(filter, s.nulls()); + self.append_views_by_filter(s.views(), filter); + + self.source = Some(source); + Ok(()) + } + + fn copy_rows_by_filter_from( + &mut self, + source: ArrayRef, + filter: &FilterPredicate, + ) -> Result<(), ArrowError> { + let s = source.as_byte_view::(); + if s.data_buffers().is_empty() { + self.ensure_capacity(); + self.append_nulls_by_filter(filter, s.nulls()); + self.append_views_by_filter(s.views(), filter); + return Ok(()); + } + + let filtered = filter.filter(source.as_ref())?; + self.set_source(Some(filtered)); + let result = self.copy_rows(0, filter.count()); + self.set_source(None); + result + } + fn finish(&mut self) -> Result { self.finish_current(); assert!(self.current.is_none()); @@ -405,6 +507,9 @@ impl BufferSource { #[cfg(test)] mod tests { use super::*; + use crate::filter::FilterBuilder; + use arrow_array::types::BinaryViewType; + use arrow_array::{BinaryViewArray, BooleanArray}; #[test] fn test_buffer_source() { @@ -444,4 +549,23 @@ mod tests { // Can override with larger size request assert_eq!(source.next_buffer(2_000_000).capacity(), 2_000_000); } + + #[test] + fn test_copy_rows_by_filter_rejects_non_inline_views() { + let values: Vec> = vec![Some(b"This value is longer than 12 bytes")]; + let array = BinaryViewArray::from_iter(values); + assert!(!array.data_buffers().is_empty()); + + let mut in_progress = InProgressByteViewArray::::new(1); + in_progress.set_source(Some(Arc::new(array))); + + let filter = BooleanArray::from(vec![true]); + let predicate = FilterBuilder::new(&filter).build(); + let err = in_progress.copy_rows_by_filter(&predicate).unwrap_err(); + + assert!( + err.to_string().contains("requires inline views"), + "unexpected error: {err}" + ); + } } diff --git a/arrow-select/src/coalesce/generic.rs b/arrow-select/src/coalesce/generic.rs index 1ea57dff929c..4fa64273ec5c 100644 --- a/arrow-select/src/coalesce/generic.rs +++ b/arrow-select/src/coalesce/generic.rs @@ -17,6 +17,7 @@ use super::InProgressArray; use crate::concat::concat; +use crate::filter::FilterPredicate; use arrow_array::ArrayRef; use arrow_schema::ArrowError; @@ -60,6 +61,16 @@ impl InProgressArray for GenericInProgressArray { Ok(()) } + fn copy_rows_by_filter_from( + &mut self, + source: ArrayRef, + filter: &FilterPredicate, + ) -> Result<(), ArrowError> { + let array = filter.filter(source.as_ref())?; + self.buffered_arrays.push(array); + Ok(()) + } + fn finish(&mut self) -> Result { // Concatenate all buffered arrays into a single array, which uses 2x // peak memory diff --git a/arrow-select/src/coalesce/primitive.rs b/arrow-select/src/coalesce/primitive.rs index a7f2fb32ce49..1de8b52eddf1 100644 --- a/arrow-select/src/coalesce/primitive.rs +++ b/arrow-select/src/coalesce/primitive.rs @@ -16,9 +16,10 @@ // under the License. use crate::coalesce::InProgressArray; +use crate::filter::{FilterIndices, FilterPredicate, FilterSelection, filter_null_mask}; use arrow_array::cast::AsArray; use arrow_array::{Array, ArrayRef, ArrowPrimitiveType, PrimitiveArray}; -use arrow_buffer::{NullBufferBuilder, ScalarBuffer}; +use arrow_buffer::{BooleanBuffer, NullBuffer, NullBufferBuilder, ScalarBuffer}; use arrow_schema::{ArrowError, DataType}; use std::fmt::Debug; use std::sync::Arc; @@ -59,6 +60,47 @@ impl InProgressPrimitiveArray { self.current.reserve(self.batch_size); } } + + fn append_values_by_indices(&mut self, indices: FilterIndices<'_>) -> Result<(), ArrowError> { + self.ensure_capacity(); + + let s = primitive_source::(&self.source)?; + let values = s.values(); + indices.for_each(|idx| self.current.push(values[idx])); + + Ok(()) + } +} + +fn primitive_source( + source: &Option, +) -> Result<&PrimitiveArray, ArrowError> { + Ok(source + .as_ref() + .ok_or_else(|| { + ArrowError::InvalidArgumentError( + "Internal Error: InProgressPrimitiveArray: source not set".to_string(), + ) + })? + .as_primitive::()) +} + +fn append_filtered_nulls( + nulls: &mut NullBufferBuilder, + source_nulls: Option<&NullBuffer>, + filter: &FilterPredicate, +) { + if let Some((null_count, filtered_nulls)) = filter_null_mask(source_nulls, filter) { + let filtered_nulls = unsafe { + NullBuffer::new_unchecked( + BooleanBuffer::new(filtered_nulls, 0, filter.count()), + null_count, + ) + }; + nulls.append_buffer(&filtered_nulls); + } else { + nulls.append_n_non_nulls(filter.count()); + } } impl InProgressArray for InProgressPrimitiveArray { @@ -69,15 +111,7 @@ impl InProgressArray for InProgressPrimitiveArray fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> { self.ensure_capacity(); - let s = self - .source - .as_ref() - .ok_or_else(|| { - ArrowError::InvalidArgumentError( - "Internal Error: InProgressPrimitiveArray: source not set".to_string(), - ) - })? - .as_primitive::(); + let s = primitive_source::(&self.source)?; // add nulls if necessary if let Some(nulls) = s.nulls().as_ref() { @@ -94,6 +128,19 @@ impl InProgressArray for InProgressPrimitiveArray Ok(()) } + fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> { + match filter.selection() { + FilterSelection::Indices(indices) => { + let s = primitive_source::(&self.source)?; + + append_filtered_nulls(&mut self.nulls, s.nulls(), filter); + self.append_values_by_indices(indices) + } + // Other selection shapes reuse the generic copy_rows path. + selection => self.copy_rows_by_selection(selection), + } + } + fn finish(&mut self) -> Result { // take and reset the current values and nulls let values = std::mem::take(&mut self.current); @@ -106,3 +153,44 @@ impl InProgressArray for InProgressPrimitiveArray Ok(Arc::new(array)) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::filter::FilterBuilder; + use arrow_array::types::Int32Type; + use arrow_array::{BooleanArray, Int32Array}; + + #[test] + fn test_copy_rows_by_filter_index_iterator() { + let source = + Int32Array::from_iter((0..21).map(|idx| if idx % 5 == 0 { None } else { Some(idx) })); + let filter = BooleanArray::from_iter( + (0..21).map(|idx| Some(matches!(idx, 0 | 1 | 2 | 3 | 5 | 8 | 13))), + ); + let predicate = FilterBuilder::new(&filter).build(); + let FilterSelection::Indices(indices) = predicate.selection() else { + panic!("expected index iterator selection"); + }; + let mut selected_indices = Vec::new(); + indices.for_each(|idx| selected_indices.push(idx)); + assert_eq!(selected_indices, vec![0, 1, 2, 3, 5, 8, 13]); + + let mut in_progress = InProgressPrimitiveArray::::new(7, DataType::Int32); + in_progress.set_source(Some(Arc::new(source))); + in_progress.copy_rows_by_filter(&predicate).unwrap(); + + let result = in_progress.finish().unwrap(); + let result = result.as_primitive::(); + let expected = Int32Array::from(vec![ + None, + Some(1), + Some(2), + Some(3), + None, + Some(8), + Some(13), + ]); + assert_eq!(result, &expected); + } +} diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index fcbce82d5d9d..8a73b9f3fec6 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -81,13 +81,13 @@ impl Iterator for SlicesIterator<'_> { /// /// This provides the best performance on most predicates, apart from those which keep /// large runs and therefore favour [`SlicesIterator`] -struct IndexIterator<'a> { +pub(crate) struct IndexIterator<'a> { remaining: usize, iter: BitIndexIterator<'a>, } impl<'a> IndexIterator<'a> { - fn new(filter: &'a BooleanArray, remaining: usize) -> Self { + pub(crate) fn new(filter: &'a BooleanArray, remaining: usize) -> Self { assert_eq!(filter.null_count(), 0); let iter = filter.values().set_indices(); Self { remaining, iter } @@ -366,6 +366,66 @@ impl IterationStrategy { } } +/// Borrowed description of which rows a [`FilterPredicate`] selects. +pub(crate) enum FilterSelection<'a> { + None, + All { len: usize }, + Slices(FilterSlices<'a>), + Indices(FilterIndices<'a>), +} + +pub(crate) type FilterSlices<'a> = + FilterIterator>, SlicesIterator<'a>>; + +pub(crate) type FilterIndices<'a> = + FilterIterator>, IndexIterator<'a>>; + +/// Holds either materialized rows or a lazy iterator. +/// +/// This does not implement [`Iterator`] on purpose. Callers use +/// [`Self::for_each`] or [`Self::try_for_each`] so the enum is matched once +/// before the loop, not once per row in `next`. +pub(crate) enum FilterIterator { + Materialized(M), + Lazy(I), +} + +impl FilterIterator +where + M: Iterator, + I: Iterator, +{ + pub(crate) fn for_each(self, f: F) + where + F: FnMut(M::Item), + { + match self { + Self::Materialized(iter) => iter.for_each(f), + Self::Lazy(iter) => iter.for_each(f), + } + } + + pub(crate) fn try_for_each(self, mut f: F) -> Result<(), E> + where + F: FnMut(M::Item) -> Result<(), E>, + { + match self { + Self::Materialized(iter) => { + for item in iter { + f(item)?; + } + } + Self::Lazy(iter) => { + for item in iter { + f(item)?; + } + } + } + + Ok(()) + } +} + /// A filtering predicate that can be applied to an [`Array`] #[derive(Debug)] pub struct FilterPredicate { @@ -410,6 +470,25 @@ impl FilterPredicate { self.count } + pub(crate) fn selection(&self) -> FilterSelection<'_> { + match &self.strategy { + IterationStrategy::None => FilterSelection::None, + IterationStrategy::All => FilterSelection::All { len: self.count }, + IterationStrategy::Slices(slices) => { + FilterSelection::Slices(FilterIterator::Materialized(slices.iter().copied())) + } + IterationStrategy::SlicesIterator => { + FilterSelection::Slices(FilterIterator::Lazy(SlicesIterator::new(&self.filter))) + } + IterationStrategy::Indices(indices) => { + FilterSelection::Indices(FilterIterator::Materialized(indices.iter().copied())) + } + IterationStrategy::IndexIterator => FilterSelection::Indices(FilterIterator::Lazy( + IndexIterator::new(&self.filter, self.count), + )), + } + } + /// Filters the given `nulls` buffer using this predicate. /// /// Returns `None` when there is nothing to track in the output, either @@ -575,7 +654,7 @@ where /// `Some((null_count, null_buffer))` where `null_count` is the number of nulls /// in the filtered output, and `null_buffer` is the filtered null buffer /// -fn filter_null_mask( +pub(crate) fn filter_null_mask( nulls: Option<&NullBuffer>, predicate: &FilterPredicate, ) -> Option<(usize, Buffer)> { @@ -649,7 +728,10 @@ fn filter_boolean(array: &BooleanArray, predicate: &FilterPredicate) -> BooleanA } #[inline(never)] -fn filter_native(values: &[T], predicate: &FilterPredicate) -> Buffer { +pub(crate) fn filter_native( + values: &[T], + predicate: &FilterPredicate, +) -> Buffer { assert!(values.len() >= predicate.filter.len()); match &predicate.strategy { @@ -1606,6 +1688,43 @@ mod tests { assert_eq!(filter_count, 61 + 61 + 5); } + #[test] + fn test_filter_selection_iterators() { + let slices = [(0, 2), (4, 5)]; + let mut ranges = Vec::new(); + let selection: FilterSlices<'_> = FilterIterator::Materialized(slices.iter().copied()); + selection.for_each(|range| ranges.push(range)); + assert_eq!(ranges, slices); + + let filter = BooleanArray::from(vec![true, true, false, false, true]); + let mut ranges = Vec::new(); + let selection: FilterSlices<'_> = FilterIterator::Lazy(SlicesIterator::new(&filter)); + selection + .try_for_each(|range| { + ranges.push(range); + Ok::<(), ArrowError>(()) + }) + .unwrap(); + assert_eq!(ranges, vec![(0, 2), (4, 5)]); + + let indices = [1, 3, 5]; + let mut selected = Vec::new(); + let selection: FilterIndices<'_> = FilterIterator::Materialized(indices.iter().copied()); + selection.for_each(|idx| selected.push(idx)); + assert_eq!(selected, indices); + + let filter = BooleanArray::from(vec![false, true, false, true]); + let mut selected = Vec::new(); + let selection: FilterIndices<'_> = FilterIterator::Lazy(IndexIterator::new(&filter, 2)); + selection + .try_for_each(|idx| { + selected.push(idx); + Ok::<(), ArrowError>(()) + }) + .unwrap(); + assert_eq!(selected, vec![1, 3]); + } + #[test] fn test_null_mask() { let a = Int64Array::from(vec![Some(1), Some(2), None]); From 1edbc730574177b602720a33553891ce94ad638a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 3 Jun 2026 09:50:38 -0400 Subject: [PATCH 2/2] docs(arrow-select): document the InProgressArray copy methods Add rationale and clarify the relationships between the `InProgressArray` trait's copy methods (`copy_rows`, `copy_rows_by_filter`, `copy_rows_by_filter_from`, `copy_rows_by_selection`), including which read the source set via `set_source` versus taking it directly. Co-Authored-By: Claude Opus 4.8 (1M context) --- arrow-select/src/coalesce.rs | 20 ++++++++++++++++---- arrow-select/src/coalesce/byte_view.rs | 5 +++++ 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs index e1dd8a654c78..89b54d047b12 100644 --- a/arrow-select/src/coalesce.rs +++ b/arrow-select/src/coalesce.rs @@ -661,9 +661,9 @@ fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box Result<(), ArrowError>; /// Copy rows selected by `filter` from the current source array. + /// + /// The default implementation calls [`Self::copy_rows_by_selection`] fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> { self.copy_rows_by_selection(filter.selection()) } - /// Copy rows selected by `filter` from `source`. + /// Copy rows selected by a [`FilterPredicate`] from `source`. + /// + /// Unlike the other copy methods, the source array is passed in directly + /// rather than read from the array set by [`Self::set_source`]. + /// + /// The default implementation sets `source` via [`Self::set_source`] and + /// then calls [`Self::copy_rows_by_filter`]. fn copy_rows_by_filter_from( &mut self, source: ArrayRef, @@ -699,6 +707,10 @@ trait InProgressArray: std::fmt::Debug + Send + Sync { } /// Copy rows described by a [`FilterSelection`] from the current source array. + /// + /// You typically get a [`FilterSelection`] from [`FilterPredicate::selection`]. + /// + /// Note: The source array is set by [`Self::set_source`]. fn copy_rows_by_selection(&mut self, selection: FilterSelection<'_>) -> Result<(), ArrowError> { match selection { FilterSelection::None => Ok(()), diff --git a/arrow-select/src/coalesce/byte_view.rs b/arrow-select/src/coalesce/byte_view.rs index 1a38b05698fd..50c2c5345b32 100644 --- a/arrow-select/src/coalesce/byte_view.rs +++ b/arrow-select/src/coalesce/byte_view.rs @@ -434,6 +434,8 @@ impl InProgressArray for InProgressByteViewArray { filter: &FilterPredicate, ) -> Result<(), ArrowError> { let s = source.as_byte_view::(); + // The source views reference no external buffers, so they must all be + // inline and we can copy just the nulls and views. if s.data_buffers().is_empty() { self.ensure_capacity(); self.append_nulls_by_filter(filter, s.nulls()); @@ -441,6 +443,9 @@ impl InProgressArray for InProgressByteViewArray { return Ok(()); } + // The views reference external buffers, so materialize the + // filtered array and append it through the regular `copy_rows` path + // (which copies the referenced string data into our own buffers). let filtered = filter.filter(source.as_ref())?; self.set_source(Some(filtered)); let result = self.copy_rows(0, filter.count());