Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions datafusion/common/src/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,16 @@ macro_rules! hash_float_value {
($(($t:ty, $i:ty)),+) => {
$(impl HashValue for $t {
fn hash_one(&self, state: &RandomState) -> u64 {
state.hash_one(<$i>::from_ne_bytes(self.to_ne_bytes()))
// +0.0 and -0.0 differ only in the sign bit but compare equal
// under IEEE 754; normalize -0.0 → +0.0 so Hash agrees with Eq.
let bits = <$i>::from_ne_bytes(self.to_ne_bytes());
let bits = if bits << 1 == 0 { 0 } else { bits };
state.hash_one(bits)
}
fn hash_write(&self, hasher: &mut impl Hasher) {
hasher.write(&self.to_ne_bytes())
let bits = <$i>::from_ne_bytes(self.to_ne_bytes());
let bits: $i = if bits << 1 == 0 { 0 } else { bits };
hasher.write(&bits.to_ne_bytes())
}
})+
};
Expand Down
58 changes: 58 additions & 0 deletions datafusion/common/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1386,6 +1386,64 @@ fn fsl_values_row_number(list_size: i32, array_len: usize) -> Result<Int32Array>
Ok(PrimitiveArray::new(rows_number.into(), None))
}

/// Canonicalizes signed zeros in a floating-point array: every `-0.0` in a
/// `Float16`, `Float32`, or `Float64` array is rewritten to `+0.0`.
///
/// Every other value, including NaNs with any payload, is passed through
/// as-is. Arrays of any other data type are returned as an `Arc` clone of
/// the input.
///
/// Arrow's comparison kernels (`arrow::compute::kernels::cmp::eq` etc.) and
/// row-encoding (`arrow::row::RowConverter`) use IEEE 754 totalOrder
/// semantics, which treats `-0.0` and `+0.0` as distinct. SQL semantics
/// (PostgreSQL / IEEE 754 equality) require them to compare equal, so
/// callers normalize before invoking those kernels.
pub fn normalize_float_zero(array: &ArrayRef) -> ArrayRef {
    use arrow::datatypes::{Float16Type, Float32Type, Float64Type};

    // Shifting the sign bit out of the raw bit pattern leaves zero only for
    // `+0.0` and `-0.0`; every other value (NaNs included) keeps at least
    // one exponent or mantissa bit set and is returned untouched.
    match array.data_type() {
        DataType::Float16 => {
            let canonical = array
                .as_primitive::<Float16Type>()
                .unary::<_, Float16Type>(|value| match value.to_bits() << 1 {
                    0 => half::f16::from_bits(0),
                    _ => value,
                });
            Arc::new(canonical)
        }
        DataType::Float32 => {
            let canonical = array
                .as_primitive::<Float32Type>()
                .unary::<_, Float32Type>(|value| match value.to_bits() << 1 {
                    0 => 0.0_f32,
                    _ => value,
                });
            Arc::new(canonical)
        }
        DataType::Float64 => {
            let canonical = array
                .as_primitive::<Float64Type>()
                .unary::<_, Float64Type>(|value| match value.to_bits() << 1 {
                    0 => 0.0_f64,
                    _ => value,
                });
            Arc::new(canonical)
        }
        // Not a float column: nothing to normalize.
        _ => Arc::clone(array),
    }
}

/// Scalar counterpart of [`normalize_float_zero`]: a non-null `Float16`,
/// `Float32`, or `Float64` scalar holding `-0.0` (or `+0.0`) comes back as
/// `+0.0` of the same variant.
///
/// Any other float value is rebuilt unchanged, and every other variant
/// (including null floats) is handed back exactly as received.
pub fn normalize_float_zero_scalar(scalar: ScalarValue) -> ScalarValue {
    match scalar {
        ScalarValue::Float16(Some(value)) => {
            // Dropping the sign bit leaves zero only for `+0.0` / `-0.0`.
            let canonical = if value.to_bits() << 1 == 0 {
                half::f16::from_bits(0)
            } else {
                value
            };
            ScalarValue::Float16(Some(canonical))
        }
        ScalarValue::Float32(Some(value)) => {
            let canonical = if value.to_bits() << 1 == 0 {
                0.0
            } else {
                value
            };
            ScalarValue::Float32(Some(canonical))
        }
        ScalarValue::Float64(Some(value)) => {
            let canonical = if value.to_bits() << 1 == 0 {
                0.0
            } else {
                value
            };
            ScalarValue::Float64(Some(canonical))
        }
        untouched => untouched,
    }
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
15 changes: 10 additions & 5 deletions datafusion/functions-nested/src/except.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow::compute::take;
use arrow::datatypes::{DataType, FieldRef};
use arrow::row::{RowConverter, SortField};
use datafusion_common::utils::{ListCoercion, take_function_args};
use datafusion_common::utils::{ListCoercion, normalize_float_zero, take_function_args};
use datafusion_common::{HashSet, Result, internal_err};
use datafusion_expr::{
ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
Expand Down Expand Up @@ -169,16 +169,21 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
) -> Result<GenericListArray<OffsetSize>> {
let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;

// Normalize -0.0 → +0.0 so RowConverter (IEEE 754 totalOrder) groups
// ±0 together for both the rhs lookup set and the lhs probe.
let l_values_norm = normalize_float_zero(l.values());
let r_values_norm = normalize_float_zero(r.values());

// Only convert the visible portion of the values array. For sliced
// ListArrays, values() returns the full underlying array but only
// elements between the first and last offset are referenced.
let l_first = l.offsets()[0].as_usize();
let l_len = l.offsets()[l.len()].as_usize() - l_first;
let l_values = converter.convert_columns(&[l.values().slice(l_first, l_len)])?;
let l_values = converter.convert_columns(&[l_values_norm.slice(l_first, l_len)])?;

let r_first = r.offsets()[0].as_usize();
let r_len = r.offsets()[r.len()].as_usize() - r_first;
let r_values = converter.convert_columns(&[r.values().slice(r_first, r_len)])?;
let r_values = converter.convert_columns(&[r_values_norm.slice(r_first, r_len)])?;

let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
offsets.push(OffsetSize::usize_as(0));
Expand Down Expand Up @@ -223,11 +228,11 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
} else if OffsetSize::IS_LARGE {
let indices =
UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::<Vec<_>>());
take(l.values().as_ref(), &indices, None)?
take(l_values_norm.as_ref(), &indices, None)?
} else {
let indices =
UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::<Vec<_>>());
take(l.values().as_ref(), &indices, None)?
take(l_values_norm.as_ref(), &indices, None)?
};

Ok(GenericListArray::<OffsetSize>::new(
Expand Down
36 changes: 24 additions & 12 deletions datafusion/functions-nested/src/set_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use arrow::datatypes::DataType::{LargeList, List, Null};
use arrow::datatypes::{DataType, Field, FieldRef};
use arrow::row::{RowConverter, SortField};
use datafusion_common::cast::{as_large_list_array, as_list_array};
use datafusion_common::utils::ListCoercion;
use datafusion_common::utils::{ListCoercion, normalize_float_zero};
use datafusion_common::{
Result, assert_eq_or_internal_err, exec_err, internal_err, utils::take_function_args,
};
Expand Down Expand Up @@ -351,21 +351,28 @@ fn generic_set_lists<OffsetSize: OffsetSizeTrait>(

let converter = RowConverter::new(vec![SortField::new(l.value_type())])?;

// Normalize -0.0 → +0.0 so RowConverter (which uses IEEE 754 totalOrder
// and treats ±0 as distinct) groups them together. Use the normalized
// arrays for both row conversion and the final output values.
let l_values_norm = normalize_float_zero(l.values());
let r_values_norm = normalize_float_zero(r.values());

// Only convert the visible portion of the values array. For sliced
// ListArrays, values() returns the full underlying array but only
// elements between the first and last offset are referenced.
let l_first = l.offsets()[0].as_usize();
let l_len = l.offsets()[l.len()].as_usize() - l_first;
let rows_l = converter.convert_columns(&[l.values().slice(l_first, l_len)])?;
let l_values = l_values_norm.slice(l_first, l_len);
let rows_l = converter.convert_columns(&[Arc::clone(&l_values)])?;

let r_first = r.offsets()[0].as_usize();
let r_len = r.offsets()[r.len()].as_usize() - r_first;
let rows_r = converter.convert_columns(&[r.values().slice(r_first, r_len)])?;
let r_values = r_values_norm.slice(r_first, r_len);
let rows_r = converter.convert_columns(&[Arc::clone(&r_values)])?;

// Combine the *sliced* value arrays so 0-based indices from the row
// converter map directly into the concatenated array.
let l_values = l.values().slice(l_first, l_len);
let r_values = r.values().slice(r_first, r_len);
// Indices from the row converter are 0-based in the per-side slice;
// concatenating those same slices lets indices map directly into the
// combined values array.
let combined_values = concat(&[l_values.as_ref(), r_values.as_ref()])?;
let r_offset = l_len;

Expand Down Expand Up @@ -558,13 +565,18 @@ fn general_array_distinct<OffsetSize: OffsetSizeTrait>(

let converter = RowConverter::new(vec![SortField::new(dt.clone())])?;

// Normalize -0.0 → +0.0 so RowConverter (which uses IEEE 754 totalOrder
// and treats ±0 as distinct) groups them together, and so the output
// carries the canonical sign.
let values_norm = normalize_float_zero(array.values());

// Only convert the visible portion of the values array. For sliced
// ListArrays, values() returns the full underlying array but only
// elements between the first and last offset are referenced.
let first_offset = value_offsets[0].as_usize();
let visible_len = value_offsets[array.len()].as_usize() - first_offset;
let rows =
converter.convert_columns(&[array.values().slice(first_offset, visible_len)])?;
converter.convert_columns(&[values_norm.slice(first_offset, visible_len)])?;

let mut indices: Vec<usize> = Vec::with_capacity(rows.num_rows());
let mut seen = HashSet::new();
Expand Down Expand Up @@ -593,19 +605,19 @@ fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
}

// Gather distinct values in a single pass, using the computed `indices`.
// Indices are absolute positions in array.values() (first_offset was added
// back when collecting them), so we can take directly from the full values.
// Indices are absolute positions in the (normalized) values array, so we
// can take directly from the full values.
// Use UInt64Array for LargeList to support values arrays exceeding u32::MAX.
let final_values = if indices.is_empty() {
new_empty_array(&dt)
} else if OffsetSize::IS_LARGE {
let indices =
UInt64Array::from(indices.into_iter().map(|i| i as u64).collect::<Vec<_>>());
take(array.values().as_ref(), &indices, None)?
take(values_norm.as_ref(), &indices, None)?
} else {
let indices =
UInt32Array::from(indices.into_iter().map(|i| i as u32).collect::<Vec<_>>());
take(array.values().as_ref(), &indices, None)?
take(values_norm.as_ref(), &indices, None)?
};

Ok(Arc::new(GenericListArray::<OffsetSize>::try_new(
Expand Down
18 changes: 17 additions & 1 deletion datafusion/physical-expr-common/src/datum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use arrow::compute::kernels::cmp::{
};
use arrow::compute::{SortOptions, ilike, like, nilike, nlike};
use arrow::error::ArrowError;
use datafusion_common::utils::{normalize_float_zero, normalize_float_zero_scalar};
use datafusion_common::{Result, ScalarValue};
use datafusion_common::{arrow_datafusion_err, assert_or_internal_err, internal_err};
use datafusion_expr_common::columnar_value::ColumnarValue;
Expand Down Expand Up @@ -84,7 +85,22 @@ pub fn apply_cmp(
}
};

apply(lhs, rhs, |l, r| Ok(Arc::new(f(l, r)?)))
// Arrow's comparison kernels use IEEE 754 totalOrder semantics for
// floats, which treats `-0.0` and `+0.0` as distinct. Normalize float
// operands so SQL semantics (`+0.0 == -0.0`) hold. No-op for
// non-float types.
let lhs = normalize_cmp_input(lhs);
let rhs = normalize_cmp_input(rhs);
apply(&lhs, &rhs, |l, r| Ok(Arc::new(f(l, r)?)))
}
}

/// Builds an owned copy of a comparison operand with float signed zeros
/// canonicalized to `+0.0`.
///
/// Array operands go through `normalize_float_zero`; scalar operands are
/// cloned and passed through `normalize_float_zero_scalar`.
fn normalize_cmp_input(cv: &ColumnarValue) -> ColumnarValue {
    match cv {
        ColumnarValue::Scalar(literal) => {
            // The scalar helper takes ownership, so clone the borrowed value.
            let owned = literal.clone();
            ColumnarValue::Scalar(normalize_float_zero_scalar(owned))
        }
        ColumnarValue::Array(values) => {
            let canonical = normalize_float_zero(values);
            ColumnarValue::Array(canonical)
        }
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::aggregates::group_values::HashValue;
use crate::aggregates::group_values::multi_group_by::{
GroupColumn, Nulls, nulls_equal_to,
};
Expand Down Expand Up @@ -51,6 +52,7 @@ pub struct PrimitiveGroupValueBuilder<T: ArrowPrimitiveType, const NULLABLE: boo
impl<T, const NULLABLE: bool> PrimitiveGroupValueBuilder<T, NULLABLE>
where
T: ArrowPrimitiveType,
T::Native: HashValue,
{
/// Create a new `PrimitiveGroupValueBuilder`
pub fn new(data_type: DataType) -> Self {
Expand Down Expand Up @@ -91,7 +93,9 @@ where
} else {
unsafe { *array_values.get_unchecked(rhs_row) }
};
if left.is_eq(right) {
// `left` was already canonicalized on append; canonicalize the
// input so ±0 (and any future equivalence class) compares equal.
if left.is_eq(right.canonicalize()) {
cmp_buf[i / 8] |= 1 << (i % 8);
}
}
Expand Down Expand Up @@ -133,7 +137,7 @@ where
continue;
}

if !self.group_values[lhs_row].is_eq(array.value(rhs_row)) {
if !self.group_values[lhs_row].is_eq(array.value(rhs_row).canonicalize()) {
equal_to_results.set_bit(idx, false);
}
}
Expand All @@ -142,6 +146,8 @@ where

impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
for PrimitiveGroupValueBuilder<T, NULLABLE>
where
T::Native: HashValue,
{
fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
// Perf: skip null check (by short circuit) if input is not nullable
Expand All @@ -154,7 +160,8 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
// Otherwise, we need to check their values
}

self.group_values[lhs_row].is_eq(array.as_primitive::<T>().value(rhs_row))
self.group_values[lhs_row]
.is_eq(array.as_primitive::<T>().value(rhs_row).canonicalize())
}

fn append_val(&mut self, array: &ArrayRef, row: usize) -> Result<()> {
Expand All @@ -165,10 +172,12 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
self.group_values.push(T::default_value());
} else {
self.nulls.append(false);
self.group_values.push(array.as_primitive::<T>().value(row));
self.group_values
.push(array.as_primitive::<T>().value(row).canonicalize());
}
} else {
self.group_values.push(array.as_primitive::<T>().value(row));
self.group_values
.push(array.as_primitive::<T>().value(row).canonicalize());
}

Ok(())
Expand Down Expand Up @@ -214,15 +223,15 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
self.group_values.push(T::default_value());
} else {
self.nulls.append(false);
self.group_values.push(arr.value(row));
self.group_values.push(arr.value(row).canonicalize());
}
}
}

(true, Nulls::None) => {
self.nulls.append_n(rows.len(), false);
for &row in rows {
self.group_values.push(arr.value(row));
self.group_values.push(arr.value(row).canonicalize());
}
}

Expand All @@ -234,7 +243,7 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn

(false, _) => {
for &row in rows {
self.group_values.push(arr.value(row));
self.group_values.push(arr.value(row).canonicalize());
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-plan/src/aggregates/group_values/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use arrow::row::{RowConverter, Rows, SortField};
use datafusion_common::Result;
use datafusion_common::hash_utils::RandomState;
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::utils::normalize_float_zero;
use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt};
use datafusion_expr::EmitTo;
use hashbrown::hash_table::HashTable;
Expand Down Expand Up @@ -116,6 +117,13 @@ impl GroupValuesRows {

impl GroupValues for GroupValuesRows {
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
// Normalize -0.0 → +0.0 so RowConverter (IEEE 754 totalOrder) and
// primitive hashing both group ±0 together. No-op for non-float
// columns.
let normalized_cols: Vec<ArrayRef> =
cols.iter().map(normalize_float_zero).collect();
let cols = normalized_cols.as_slice();

// Convert the group keys into the row format
let group_rows = &mut self.rows_buffer;
group_rows.clear();
Expand Down
Loading
Loading