From 22384af54eaacb82615ac9981c713fd79777e4f0 Mon Sep 17 00:00:00 2001 From: Brijesh-Thakkar Date: Sat, 30 May 2026 12:46:54 +0530 Subject: [PATCH 1/4] fix: recompute_schema() now checks types and names for LogicalPlan::Union Previously the Union arm only validated field count (width), causing stale cached schemas when inputs were rewritten with different types or column names (e.g. after type-coercion). Fix uses qualified_field() to deep-compare qualifier + data_type + name. Fixes #22447 --- datafusion/expr/src/logical_plan/plan.rs | 97 +++++++++++++++++++++++- 1 file changed, 95 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index cef20dcd5a4e1..95f35ab7d4eea 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -709,8 +709,20 @@ impl LogicalPlan { } LogicalPlan::Union(Union { inputs, schema }) => { let first_input_schema = inputs[0].schema(); - if schema.fields().len() == first_input_schema.fields().len() { - // If inputs are not pruned do not change schema + // Check field count AND field types AND field names/qualifiers. + // A width-only check misses cases where inputs were rewritten with + // different types or aliases (e.g. after type-coercion rewrites). + let schemas_match = schema.fields().len() == first_input_schema.fields().len() + && (0..schema.fields().len()).all(|i| { + let (q1, f1) = schema.qualified_field(i); + let (q2, f2) = first_input_schema.qualified_field(i); + q1 == q2 + && f1.data_type() == f2.data_type() + && f1.name() == f2.name() + }); + if schemas_match { + // Inputs are structurally identical to the cached schema; + // no recomputation needed. Ok(LogicalPlan::Union(Union { inputs, schema })) } else { // A note on `Union`s constructed via `try_new_by_name`: @@ -6128,4 +6140,85 @@ mod tests { Ok(()) } + + #[test] + fn test_recompute_schema_union_type_mismatch() -> Result<()> { + use arrow::datatypes::{DataType, Field, Schema}; + + let schema_i32 = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let schema_i64 = Schema::new(vec![Field::new("a", DataType::Int64, false)]); + + // Build a Union whose schema starts out as Int32 (matching its inputs). + let original = Union::try_new(vec![ + Arc::new(table_scan(Some("t1"), &schema_i32, None)?.build()?), + Arc::new(table_scan(Some("t2"), &schema_i32, None)?.build()?), + ])?; + assert_eq!( + original.schema.field(0).data_type(), + &DataType::Int32, + "sanity: starting schema is Int32" + ); + + // Simulate a rewrite pass (e.g. type-coercion) that replaced the inputs + // with Int64-typed versions while leaving the Union's cached schema stale. + // Same width, different types — this is exactly the bug scenario. + let stale = LogicalPlan::Union(Union { + inputs: vec![ + Arc::new(table_scan(Some("t1"), &schema_i64, None)?.build()?), + Arc::new(table_scan(Some("t2"), &schema_i64, None)?.build()?), + ], + schema: Arc::clone(&original.schema), + }); + + let recomputed = stale.recompute_schema()?; + + assert_eq!( + recomputed.schema().field(0).data_type(), + &DataType::Int64, + "Union schema should track the new Int64 input types after \ + recompute_schema(), but the width-only check left it stale" + ); + + Ok(()) + } + + #[test] + fn test_recompute_schema_union_name_mismatch() -> Result<()> { + use arrow::datatypes::{DataType, Field, Schema}; + + let schema_a = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let schema_b = Schema::new(vec![Field::new("b", DataType::Int32, false)]); + + // Build a Union whose schema starts out with column "a". + let original = Union::try_new(vec![ + Arc::new(table_scan(Some("t1"), &schema_a, None)?.build()?), + Arc::new(table_scan(Some("t2"), &schema_a, None)?.build()?), + ])?; + assert_eq!( + original.schema.field(0).name(), + "a", + "sanity: starting schema has column name 'a'" + ); + + // Simulate a rewrite pass that renamed the columns but left + // the cached schema stale. Same width and type, different name. + let stale = LogicalPlan::Union(Union { + inputs: vec![ + Arc::new(table_scan(Some("t1"), &schema_b, None)?.build()?), + Arc::new(table_scan(Some("t2"), &schema_b, None)?.build()?), + ], + schema: Arc::clone(&original.schema), + }); + + let recomputed = stale.recompute_schema()?; + + assert_eq!( + recomputed.schema().field(0).name(), + "b", + "Union schema should reflect the renamed column after \ + recompute_schema(), but the width-only check left it stale" + ); + + Ok(()) + } } From fc65ce7828fe36da0c0a7c4b7398c1b43bcd13e5 Mon Sep 17 00:00:00 2001 From: Brijesh-Thakkar Date: Sat, 30 May 2026 12:57:00 +0530 Subject: [PATCH 2/4] fix(expr): recompute_schema() correctly detects stale Union schema Previously the Union arm only checked field count (width), causing stale cached schemas when inputs were rewritten with different types, names, qualifiers, or nullability but the same column count. Fix: call Union::try_new(inputs) to get the authoritative recomputed schema, then compare against the cached schema via DFSchema PartialEq. This handles all field properties in one place and is future-proof. Added three unit tests covering type, name, and nullability mismatches. Fixes #22447 --- datafusion/expr/src/logical_plan/plan.rs | 85 +++++++++++++++++------- 1 file changed, 61 insertions(+), 24 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 95f35ab7d4eea..9865a284c0901 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -708,31 +708,28 @@ impl LogicalPlan { })) } LogicalPlan::Union(Union { inputs, schema }) => { - let first_input_schema = inputs[0].schema(); - // Check field count AND field types AND field names/qualifiers. - // A width-only check misses cases where inputs were rewritten with - // different types or aliases (e.g. after type-coercion rewrites). - let schemas_match = schema.fields().len() == first_input_schema.fields().len() - && (0..schema.fields().len()).all(|i| { - let (q1, f1) = schema.qualified_field(i); - let (q2, f2) = first_input_schema.qualified_field(i); - q1 == q2 - && f1.data_type() == f2.data_type() - && f1.name() == f2.name() - }); - if schemas_match { - // Inputs are structurally identical to the cached schema; - // no recomputation needed. - Ok(LogicalPlan::Union(Union { inputs, schema })) + // Recompute what the schema should be from the current inputs. + // Comparing the full recomputed schema (not just inputs[0]) correctly + // handles: field-count changes, type changes, name/alias changes, + // nullability changes, and metadata changes — all in one place. + // + // A note on `Union`s constructed via `try_new_by_name`: + // + // At this point, the schema for each input should have + // the same width. Thus, we do not need to save whether a + // `Union` was created `BY NAME`, and can safely rely on the + // `try_new` initializer to derive the new schema based on + // column positions. + let recomputed = Union::try_new(inputs)?; + if recomputed.schema == schema { + // Schema is still valid; preserve the cached schema + // (which may carry metadata the recomputed one lacks). + Ok(LogicalPlan::Union(Union { + inputs: recomputed.inputs, + schema, + })) } else { - // A note on `Union`s constructed via `try_new_by_name`: - // - // At this point, the schema for each input should have - // the same width. Thus, we do not need to save whether a - // `Union` was created `BY NAME`, and can safely rely on the - // `try_new` initializer to derive the new schema based on - // column positions. - Ok(LogicalPlan::Union(Union::try_new(inputs)?)) + Ok(LogicalPlan::Union(recomputed)) } } LogicalPlan::Distinct(distinct) => { @@ -6221,4 +6218,44 @@ mod tests { Ok(()) } + + #[test] + fn test_recompute_schema_union_nullability_mismatch() -> Result<()> { + use arrow::datatypes::{DataType, Field, Schema}; + + // nullable: false + let schema_not_null = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + // nullable: true + let schema_nullable = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + + // Build Union starting with NOT NULL inputs. + let original = Union::try_new(vec![ + Arc::new(table_scan(Some("t1"), &schema_not_null, None)?.build()?), + Arc::new(table_scan(Some("t2"), &schema_not_null, None)?.build()?), + ])?; + assert!( + !original.schema.field(0).is_nullable(), + "sanity: starting schema field is NOT NULL" + ); + + // Simulate a rewrite that made the inputs nullable while leaving + // the Union's cached schema stale. + let stale = LogicalPlan::Union(Union { + inputs: vec![ + Arc::new(table_scan(Some("t1"), &schema_nullable, None)?.build()?), + Arc::new(table_scan(Some("t2"), &schema_nullable, None)?.build()?), + ], + schema: Arc::clone(&original.schema), + }); + + let recomputed = stale.recompute_schema()?; + + assert!( + recomputed.schema().field(0).is_nullable(), + "Union schema should reflect the new nullable inputs after \ + recompute_schema(), but the stale NOT NULL schema was kept" + ); + + Ok(()) + } } From a5216a74b6e13df9bc53edf6038bb1217d25b5fb Mon Sep 17 00:00:00 2001 From: Brijesh-Thakkar Date: Sun, 31 May 2026 11:44:55 +0530 Subject: [PATCH 3/4] fix(expr): recompute_schema() correctly detects stale Union schema Previously the Union arm only checked field count (width), causing stale cached schemas when inputs were rewritten with different types, names, qualifiers, or nullability but the same column count. Fix: - Fast path: structural comparison across ALL inputs (field count, type, name, qualifier, nullability) with zero allocation on the common no-change case, as suggested by the reviewer. - Slow path: Union::try_new() for structure, then HashMap::extend() semantics for schema-level and field-level metadata preservation, matching the behavior of coerce_union_schema_with_schema in type_coercion.rs. Added four unit tests covering type mismatch, name mismatch, nullability mismatch, and metadata preservation. Fixes #22447 --- datafusion/expr/src/logical_plan/plan.rs | 133 ++++++++++++++++++++--- 1 file changed, 117 insertions(+), 16 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 9865a284c0901..b4636e158571e 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -708,29 +708,76 @@ impl LogicalPlan { })) } LogicalPlan::Union(Union { inputs, schema }) => { - // Recompute what the schema should be from the current inputs. - // Comparing the full recomputed schema (not just inputs[0]) correctly - // handles: field-count changes, type changes, name/alias changes, - // nullability changes, and metadata changes — all in one place. - // - // A note on `Union`s constructed via `try_new_by_name`: + // Fast path: if all inputs structurally match the cached schema + // (field count, types, names, qualifiers, nullability) then no + // recomputation is needed and we avoid any allocation. + let schemas_match = inputs.iter().all(|input| { + let input_schema = input.schema(); + schema.fields().len() == input_schema.fields().len() + && schema.iter().zip(input_schema.iter()).all( + |((q1, f1), (q2, f2))| { + q1 == q2 + && f1.name() == f2.name() + && f1.data_type() == f2.data_type() + && f1.is_nullable() == f2.is_nullable() + }, + ) + }); + if schemas_match { + // Inputs are structurally identical to the cached schema. + return Ok(LogicalPlan::Union(Union { inputs, schema })); + } + + // Slow path: inputs changed — recompute the schema. // + // NOTE: A note on `Union`s constructed via `try_new_by_name`: // At this point, the schema for each input should have // the same width. Thus, we do not need to save whether a // `Union` was created `BY NAME`, and can safely rely on the // `try_new` initializer to derive the new schema based on // column positions. - let recomputed = Union::try_new(inputs)?; - if recomputed.schema == schema { - // Schema is still valid; preserve the cached schema - // (which may carry metadata the recomputed one lacks). - Ok(LogicalPlan::Union(Union { - inputs: recomputed.inputs, - schema, - })) - } else { - Ok(LogicalPlan::Union(recomputed)) + let mut recomputed = Union::try_new(inputs)?; + + // Metadata preservation: Union::try_new uses intersection logic + // for metadata, but we want "later takes precedence" (extend semantics) + // to match coerce_union_schema_with_schema in type_coercion.rs. + let mut merged_metadata = + recomputed.inputs[0].schema().metadata().clone(); + for input in recomputed.inputs.iter().skip(1) { + merged_metadata.extend(input.schema().metadata().clone()); } + + let mut merged_field_metadata = recomputed.inputs[0] + .schema() + .fields() + .iter() + .map(|f| f.metadata().clone()) + .collect::>(); + + for input in recomputed.inputs.iter().skip(1) { + for (field_meta, input_field) in merged_field_metadata + .iter_mut() + .zip(input.schema().fields()) + { + field_meta.extend(input_field.metadata().clone()); + } + } + + let new_fields = recomputed + .schema + .iter() + .zip(merged_field_metadata) + .map(|((qualifier, field), meta)| { + let mut field = field.as_ref().clone(); + field.set_metadata(meta); + (qualifier.cloned(), Arc::new(field)) + }) + .collect::>(); + + recomputed.schema = + Arc::new(DFSchema::new_with_metadata(new_fields, merged_metadata)?); + + Ok(LogicalPlan::Union(recomputed)) } LogicalPlan::Distinct(distinct) => { let distinct = match distinct { @@ -6258,4 +6305,58 @@ mod tests { Ok(()) } + + #[test] + fn test_recompute_schema_union_metadata_preservation() -> Result<()> { + use arrow::datatypes::{DataType, Field, Schema}; + use std::collections::HashMap; + + let mut meta1 = HashMap::new(); + meta1.insert("k1".to_string(), "v1".to_string()); + let mut meta2 = HashMap::new(); + meta2.insert("k1".to_string(), "v2".to_string()); // duplicate key, different value + meta2.insert("k2".to_string(), "v2".to_string()); + + let schema1 = Schema::new_with_metadata( + vec![Field::new("a", DataType::Int32, false)], + meta1.clone(), + ); + let schema2 = Schema::new_with_metadata( + vec![Field::new("a", DataType::Int32, false)], + meta2.clone(), + ); + + // Build a Union. Its initial schema will have intersected metadata. + let original = Union::try_new(vec![ + Arc::new(table_scan(Some("t1"), &schema1, None)?.build()?), + Arc::new(table_scan(Some("t2"), &schema2, None)?.build()?), + ])?; + + // Union::try_new uses intersection, so k1 should be missing (v1 != v2) + // and k2 should be missing (not in meta1). + assert!(original.schema.metadata().is_empty()); + + // Now simulate recompute_schema() where we want EXTEND semantics (later takes precedence). + // Our implementation of recompute_schema for Union now does this. + let stale = LogicalPlan::Union(Union { + inputs: vec![ + Arc::new(table_scan(Some("t1"), &schema1, None)?.build()?), + Arc::new(table_scan(Some("t2"), &schema2, None)?.build()?), + ], + // Use a dummy schema that forces recomputation (e.g. different name) + schema: Arc::new(DFSchema::try_from(Schema::new(vec![Field::new( + "wrong_name", + DataType::Int32, + false, + )]))?), + }); + + let recomputed = stale.recompute_schema()?; + + // Metadata should now be {k1: v2, k2: v2} because meta2 was the last input. + assert_eq!(recomputed.schema().metadata().get("k1").unwrap(), "v2"); + assert_eq!(recomputed.schema().metadata().get("k2").unwrap(), "v2"); + + Ok(()) + } } From 7dad09491026bfce64578611e41d957bc9251cc7 Mon Sep 17 00:00:00 2001 From: Brijesh-Thakkar Date: Tue, 9 Jun 2026 11:37:26 +0530 Subject: [PATCH 4/4] fix(expr): recompute_schema() for Union checks types across all inputs Address reviewer feedback: - Add Union::try_new_with_metadata() centralizing extend-semantics metadata merging so it can be reused by coerce_union_schema_with_schema - Fast path now checks names/qualifiers only against inputs[0] (since position-based Union derives names from first input only) and data_type + nullability against all inputs - Add end-to-end integration test using TreeNode transform to simulate a real optimizer rewrite pass Fixes #22447 --- datafusion/expr/src/logical_plan/plan.rs | 183 +++++++++++++++-------- 1 file changed, 118 insertions(+), 65 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 536911f83be8c..341e2d0eaf396 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -705,76 +705,36 @@ impl LogicalPlan { })) } LogicalPlan::Union(Union { inputs, schema }) => { - // Fast path: if all inputs structurally match the cached schema - // (field count, types, names, qualifiers, nullability) then no - // recomputation is needed and we avoid any allocation. - let schemas_match = inputs.iter().all(|input| { - let input_schema = input.schema(); - schema.fields().len() == input_schema.fields().len() - && schema.iter().zip(input_schema.iter()).all( - |((q1, f1), (q2, f2))| { - q1 == q2 - && f1.name() == f2.name() - && f1.data_type() == f2.data_type() - && f1.is_nullable() == f2.is_nullable() - }, - ) - }); - if schemas_match { - // Inputs are structurally identical to the cached schema. - return Ok(LogicalPlan::Union(Union { inputs, schema })); - } - - // Slow path: inputs changed — recompute the schema. + // Fast path: check structural compatibility against all inputs. // - // NOTE: A note on `Union`s constructed via `try_new_by_name`: - // At this point, the schema for each input should have - // the same width. Thus, we do not need to save whether a - // `Union` was created `BY NAME`, and can safely rely on the - // `try_new` initializer to derive the new schema based on - // column positions. - let mut recomputed = Union::try_new(inputs)?; - - // Metadata preservation: Union::try_new uses intersection logic - // for metadata, but we want "later takes precedence" (extend semantics) - // to match coerce_union_schema_with_schema in type_coercion.rs. - let mut merged_metadata = - recomputed.inputs[0].schema().metadata().clone(); - for input in recomputed.inputs.iter().skip(1) { - merged_metadata.extend(input.schema().metadata().clone()); - } + // For position-based Union (try_new), schema names come exclusively + // from inputs[0], so we check names/qualifiers only against the first + // input. Data types and nullability must match across every input. + let first_input_schema = inputs[0].schema(); + let names_match = + schema.fields().len() == first_input_schema.fields().len() + && schema.iter().zip(first_input_schema.iter()).all( + |((q1, f1), (q2, f2))| q1 == q2 && f1.name() == f2.name(), + ); - let mut merged_field_metadata = recomputed.inputs[0] - .schema() - .fields() - .iter() - .map(|f| f.metadata().clone()) - .collect::>(); + let types_match = names_match + && inputs.iter().all(|input| { + let input_schema = input.schema(); + schema.fields().len() == input_schema.fields().len() + && schema.iter().zip(input_schema.iter()).all( + |((_, f1), (_, f2))| { + f1.data_type() == f2.data_type() + && f1.is_nullable() == f2.is_nullable() + }, + ) + }); - for input in recomputed.inputs.iter().skip(1) { - for (field_meta, input_field) in merged_field_metadata - .iter_mut() - .zip(input.schema().fields()) - { - field_meta.extend(input_field.metadata().clone()); - } + if types_match { + return Ok(LogicalPlan::Union(Union { inputs, schema })); } - let new_fields = recomputed - .schema - .iter() - .zip(merged_field_metadata) - .map(|((qualifier, field), meta)| { - let mut field = field.as_ref().clone(); - field.set_metadata(meta); - (qualifier.cloned(), Arc::new(field)) - }) - .collect::>(); - - recomputed.schema = - Arc::new(DFSchema::new_with_metadata(new_fields, merged_metadata)?); - - Ok(LogicalPlan::Union(recomputed)) + // Slow path: recompute schema with metadata preservation. + Ok(LogicalPlan::Union(Union::try_new_with_metadata(inputs)?)) } LogicalPlan::Distinct(distinct) => { let distinct = match distinct { @@ -3212,6 +3172,52 @@ impl Union { Ok(Union { inputs, schema }) } + /// Constructs a new Union from inputs, deriving the schema by position + /// (like `try_new`) but preserving schema-level and field-level metadata + /// using "later takes precedence" (extend) semantics — matching the + /// behavior of `coerce_union_schema_with_schema`. + pub fn try_new_with_metadata(inputs: Vec>) -> Result { + let mut union = Self::try_new(inputs)?; + + // Merge schema-level metadata: later inputs take precedence. + let mut merged_schema_meta = union.inputs[0].schema().metadata().clone(); + for input in union.inputs.iter().skip(1) { + merged_schema_meta.extend(input.schema().metadata().clone()); + } + + // Merge field-level metadata: later inputs take precedence per field. + let mut merged_field_meta: Vec<_> = union.inputs[0] + .schema() + .fields() + .iter() + .map(|f| f.metadata().clone()) + .collect(); + for input in union.inputs.iter().skip(1) { + for (field_meta, input_field) in + merged_field_meta.iter_mut().zip(input.schema().fields()) + { + field_meta.extend(input_field.metadata().clone()); + } + } + + // Rebuild schema with merged metadata applied to each field. + let new_fields = union + .schema + .iter() + .zip(merged_field_meta) + .map(|((qualifier, field), meta)| { + let mut field = field.as_ref().clone(); + field.set_metadata(meta); + (qualifier.cloned(), Arc::new(field)) + }) + .collect::>(); + + union.schema = + Arc::new(DFSchema::new_with_metadata(new_fields, merged_schema_meta)?); + + Ok(union) + } + /// When constructing a `UNION BY NAME`, we need to wrap inputs /// in an additional `Projection` to account for absence of columns /// in input schemas or differing projection orders. @@ -6516,4 +6522,51 @@ mod tests { Ok(()) } + + #[test] + fn test_recompute_schema_union_after_input_rewrite() -> Result<()> { + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::tree_node::{Transformed, TreeNode}; + + // Build a Union over two Int32 table scans. + let schema_i32 = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let schema_i64 = Schema::new(vec![Field::new("a", DataType::Int64, false)]); + + let union_plan = LogicalPlan::Union(Union::try_new(vec![ + Arc::new(table_scan(Some("t1"), &schema_i32, None)?.build()?), + Arc::new(table_scan(Some("t2"), &schema_i32, None)?.build()?), + ])?); + + // Sanity check: the Union schema starts as Int32. + assert_eq!(union_plan.schema().field(0).data_type(), &DataType::Int32); + + // Simulate what an optimizer pass does: rewrite all leaf nodes + // (TableScan) to use Int64, then call recompute_schema() on the way up. + // This is the pattern used by type_coercion and optimize_projections. + let rewritten = union_plan + .transform(|plan| match plan { + LogicalPlan::TableScan(ref scan) + if scan.source.schema().field(0).data_type() == &DataType::Int32 => + { + let new_scan = + table_scan(Some(scan.table_name.table()), &schema_i64, None)? + .build()?; + Ok(Transformed::yes(new_scan)) + } + other => Ok(Transformed::no(other)), + })? + .data; + + // After tree transformation, call recompute_schema() on the Union. + // Before this fix, the width-only check would leave the schema as Int32. + let fixed = rewritten.recompute_schema()?; + + assert_eq!( + fixed.schema().field(0).data_type(), + &DataType::Int64, + "recompute_schema() must update Union schema after input types change" + ); + + Ok(()) + } }