Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 39 additions & 13 deletions parquet-variant-compute/src/shred_variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1189,19 +1189,14 @@ mod tests {

let variant_array = shred_variant(&input, &DataType::FixedSizeBinary(16)).unwrap();

// // inspect the typed_value Field and make sure it contains the canonical Uuid extension type
// let typed_value_field = variant_array
// .inner()
// .fields()
// .into_iter()
// .find(|f| f.name() == "typed_value")
// .unwrap();

// assert!(
// typed_value_field
// .try_extension_type::<extension::Uuid>()
// .is_ok()
// );
let typed_value_field = variant_array
.inner()
.fields()
.into_iter()
.find(|f| f.name() == "typed_value")
.unwrap();

assert!(typed_value_field.has_valid_extension_type::<arrow_schema::extension::Uuid>());

// probe the downcasted typed_value array to make sure uuids are shredded correctly
let uuids = variant_array
Expand All @@ -1227,6 +1222,37 @@ mod tests {
assert_eq!(got_uuid_2, mock_uuid_2.as_bytes());
}

#[test]
fn test_uuid_nested_shredding() {
let mock_uuid = Uuid::new_v4();
let input = build_variant_array(vec![VariantRow::Object(vec![(
"id",
VariantValue::from(mock_uuid),
)])]);
let target = ShreddedSchemaBuilder::default()
.with_path("id", DataType::FixedSizeBinary(16))
.unwrap()
.build();

let result = shred_variant(&input, &target).unwrap();

let typed_value = result.typed_value_field().unwrap();
let typed_struct = typed_value.as_any().downcast_ref::<StructArray>().unwrap();
let id =
ShreddedVariantFieldArray::try_new(typed_struct.column_by_name("id").unwrap()).unwrap();

// The extension type lives on the field, not the array, so assert it on the inner struct.
let leaf = id
.inner()
.fields()
.iter()
.find(|f| f.name() == "typed_value")
.unwrap();

assert_eq!(leaf.data_type(), &DataType::FixedSizeBinary(16));
assert!(leaf.has_valid_extension_type::<arrow_schema::extension::Uuid>());
}

#[test]
fn test_primitive_shredding_comprehensive() {
// Test mixed scenarios in a single array
Expand Down
93 changes: 86 additions & 7 deletions parquet-variant-compute/src/variant_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use arrow::datatypes::{
TimestampMicrosecondType, TimestampNanosecondType,
};
use arrow::error::Result;
use arrow_schema::extension::ExtensionType;
use arrow_schema::extension::{ExtensionType, Uuid as UuidExtension};
use arrow_schema::{ArrowError, DataType, Field, FieldRef, Fields, TimeUnit};
use chrono::{DateTime, NaiveTime};
use parquet_variant::{
Expand Down Expand Up @@ -327,7 +327,7 @@ impl VariantArray {
builder = builder.with_field("value", value, true);
}
if let Some(typed_value) = typed_value.clone() {
builder = builder.with_field("typed_value", typed_value, true);
builder = builder.with_field_ref(typed_value_field(&typed_value), typed_value);
}
if let Some(nulls) = nulls {
builder = builder.with_nulls(nulls);
Expand Down Expand Up @@ -713,7 +713,7 @@ impl ShreddedVariantFieldArray {
builder = builder.with_field("value", value, true);
}
if let Some(typed_value) = typed_value.clone() {
builder = builder.with_field("typed_value", typed_value, true);
builder = builder.with_field_ref(typed_value_field(&typed_value), typed_value);
}
if let Some(nulls) = nulls {
builder = builder.with_nulls(nulls);
Expand Down Expand Up @@ -868,6 +868,19 @@ impl TryFrom<&StructArray> for ShreddingState {
}
}

/// Build the `typed_value` [`FieldRef`] for a shredded column.
///
/// The Variant spec maps `FixedSizeBinary(16)` exclusively to UUID, so any

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a reference to this part of the spec?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://parquet.apache.org/docs/file-format/types/variantshredding/ says

Shredded values must use the following Parquet types:

Variant Type Parquet Physical Type Parquet Logical Type
... ... ...
uuid FIXED_LEN_BYTE_ARRAY[len=16] UUID

But I didn't see it say the other way around that any FIXED_LEN_BYTE_ARRAY was always UUID 🤔

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variant Type Parquet Physical Type Parquet Logical Type
...
decimal16 BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY DECIMAL(P, S)
...
uuid FIXED_LEN_BYTE_ARRAY[len=16] UUID
...

Only these two logical Variant types can be Physically stored as FLBA in Parquet.

On the arrow side we have Decimal types, so there's no need for FixedSizedBinary in-memory representation.

For UUID we used FSB(16) with the extension type that parquet writer picks up. Nothing other than UUID can produce a Shredded FSB(16).

Given that only these two types can be physically stored as FLBA it makes little sense to allow other types, like Binary to be shredded into FSB in memory. Since their physical representation is limited by the spec and we'd have to cast back to Binary before writing.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with that assessment. AFAIK, the arrow-parquet reader never produces FLBA when reading decimal columns, regardless of how they were encoded in the actual parquet? In particular, Decimal128Array is a primitive array backed by Decimal128Type, which is physically backed by i128.

So when reading shredded variant data from parquet, any physical FLBA(16) column we encounter must have either DECIMAL or UUID logical type (anything else is an error). And when writing shredded variant to parquet, we would either see a FixedSizeBinaryArray(16) with UUID extension type (any other length or [lack of] extension type is an error), or Decimal128Array.

/// shredded column of that type must carry the canonical [`UuidExtension`]
/// extension metadata on its field.
fn typed_value_field(array: &ArrayRef) -> FieldRef {
let mut field = Field::new("typed_value", array.data_type().clone(), true);
if matches!(array.data_type(), DataType::FixedSizeBinary(16)) {
field = field.with_extension_type(UuidExtension);
}
Arc::new(field)
}

/// Builds struct arrays from component fields
///
/// TODO: move to arrow crate
Expand All @@ -891,6 +904,16 @@ impl StructArrayBuilder {
self
}

/// Add an array to this struct array using a caller-supplied [`FieldRef`].
///
/// Use this when the field carries metadata (e.g. an extension type) that
/// would be lost if the field were synthesized from the array's data type alone.
pub fn with_field_ref(mut self, field: FieldRef, array: ArrayRef) -> Self {
self.fields.push(field);
self.arrays.push(array);
self
}

/// Set the null buffer for this struct array.
pub fn with_nulls(mut self, nulls: NullBuffer) -> Self {
self.nulls = Some(nulls);
Expand Down Expand Up @@ -1202,7 +1225,19 @@ fn canonicalize_and_verify_data_type(data_type: &DataType) -> Result<Cow<'_, Dat
}

fn canonicalize_and_verify_field(field: &Arc<Field>) -> Result<Cow<'_, Arc<Field>>> {
let Cow::Owned(new_data_type) = canonicalize_and_verify_data_type(field.data_type())? else {
let new_data_type = canonicalize_and_verify_data_type(field.data_type())?;

// A shredded FixedSizeBinary(16) column is always a UUID. Tag it with the UUID extension type
// on read, as a safety net against writers that emit the column without the extension metadata.
// Canonicalization never rewrites FixedSizeBinary(16), so the type is already correct here.
if matches!(new_data_type.as_ref(), DataType::FixedSizeBinary(16))
&& !field.has_valid_extension_type::<UuidExtension>()
{
let new_field = field.as_ref().clone().with_extension_type(UuidExtension);
return Ok(Cow::Owned(Arc::new(new_field)));
}

let Cow::Owned(new_data_type) = new_data_type else {
return Ok(Cow::Borrowed(field));
};
let new_field = field.as_ref().clone().with_data_type(new_data_type);
Expand All @@ -1216,9 +1251,9 @@ mod test {

use super::*;
use arrow::array::{
BinaryArray, BinaryViewArray, Decimal32Array, Decimal64Array, Decimal128Array, Int32Array,
Int64Array, LargeBinaryArray, LargeListArray, LargeListViewArray, ListArray, ListViewArray,
Time64MicrosecondArray,
BinaryArray, BinaryViewArray, Decimal32Array, Decimal64Array, Decimal128Array,
FixedSizeBinaryArray, Int32Array, Int64Array, LargeBinaryArray, LargeListArray,
LargeListViewArray, ListArray, ListViewArray, Time64MicrosecondArray,
};
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow_schema::{Field, Fields};
Expand Down Expand Up @@ -1329,6 +1364,50 @@ mod test {
.build()
}

#[test]
fn try_new_tags_untagged_uuid_on_read() {
// Simulate a foreign writer that shredded a UUID column as bare FixedSizeBinary(16),
// omitting the UUID extension type.
let typed_value = FixedSizeBinaryArray::try_from_iter(std::iter::repeat_n([0u8; 16], 2));
let input = make_variant_struct_with_typed_value(Arc::new(typed_value.unwrap()));

// try_new canonicalizes on the read path and attaches the extension.
let variant_array = VariantArray::try_new(&input).unwrap();
let typed_value = variant_array
.inner()
.fields()
.iter()
.find(|f| f.name() == "typed_value")
.unwrap();
assert_eq!(typed_value.data_type(), &DataType::FixedSizeBinary(16));
assert!(typed_value.has_valid_extension_type::<UuidExtension>());
}

#[test]
fn try_new_tags_untagged_nested_uuid_on_read() {
// A shredded object { id: { typed_value: FixedSizeBinary(16) } } whose inner UUID leaf
// carries no extension type; canonicalization must reach it recursively.
let leaf = FixedSizeBinaryArray::try_from_iter(std::iter::repeat_n([0u8; 16], 1)).unwrap();
let inner = StructArrayBuilder::new()
.with_field("typed_value", Arc::new(leaf), true)
.build();
let object = StructArrayBuilder::new()
.with_field("id", Arc::new(inner), false)
.build();
let input = make_variant_struct_with_typed_value(Arc::new(object));

// typed_value (struct) -> id (struct) -> typed_value (the FixedSizeBinary(16) UUID leaf).
let variant_array = VariantArray::try_new(&input).unwrap();
let object = variant_array.typed_value_field().unwrap().as_struct();
let id = object.column_by_name("id").unwrap().as_struct();
let uuid_leaf = id
.fields()
.iter()
.find(|f| f.name() == "typed_value")
.unwrap();
assert!(uuid_leaf.has_valid_extension_type::<UuidExtension>());
}

#[test]
fn all_null_shredding_state() {
// Verify the shredding state is AllNull
Expand Down
Loading