fix: recompute Union schema when field names or types change#22640
fix: recompute Union schema when field names or types change#22640Brijesh-Thakkar wants to merge 6 commits into
Conversation
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Updates LogicalPlan::recompute_schema for Union plans to detect stale cached schemas beyond just column count, and adds regression tests for type/name mismatches.
Changes:
- Strengthen
Unionschema equivalence check to include qualifiers, field names, and data types. - Add tests that reproduce stale
Unionschema behavior when input types or names change after rewrites.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| LogicalPlan::Union(Union { inputs, schema }) => { | ||
| let first_input_schema = inputs[0].schema(); | ||
| if schema.fields().len() == first_input_schema.fields().len() { | ||
| // If inputs are not pruned do not change schema | ||
| // Check field count AND field types AND field names/qualifiers. | ||
| // A width-only check misses cases where inputs were rewritten with | ||
| // different types or aliases (e.g. after type-coercion rewrites). | ||
| let schemas_match = schema.fields().len() == first_input_schema.fields().len() | ||
| && (0..schema.fields().len()).all(|i| { | ||
| let (q1, f1) = schema.qualified_field(i); | ||
| let (q2, f2) = first_input_schema.qualified_field(i); | ||
| q1 == q2 | ||
| && f1.data_type() == f2.data_type() | ||
| && f1.name() == f2.name() | ||
| }); |
| q1 == q2 | ||
| && f1.data_type() == f2.data_type() | ||
| && f1.name() == f2.name() |
| let schemas_match = schema.fields().len() == first_input_schema.fields().len() | ||
| && (0..schema.fields().len()).all(|i| { |
|
@comphead Heyy could you please review this PR, the issue is addressed properly and all ci tests are also passing |
| // `Union` was created `BY NAME`, and can safely rely on the | ||
| // `try_new` initializer to derive the new schema based on | ||
| // column positions. | ||
| let recomputed = Union::try_new(inputs)?; |
There was a problem hiding this comment.
Here we always pay the allocation cost it. I don't think we need to do that. Instead we can do structural comparison first which can short circuit sooner. Like:
let schemas_match = inputs.iter().all(|input| {
let input_schema = input.schema();
schema.fields().len() == input_schema.fields().len()
&& schema
.iter()
.zip(input_schema.iter())
.all(|((q1, f1), (q2, f2))| {
q1 == q2
&& f1.name() == f2.name()
&& f1.data_type() == f2.data_type()
&& f1.is_nullable() == f2.is_nullable()
})
});This way we avoid allocation if no type-coercion.
| // `Union` was created `BY NAME`, and can safely rely on the | ||
| // `try_new` initializer to derive the new schema based on | ||
| // column positions. | ||
| let recomputed = Union::try_new(inputs)?; |
There was a problem hiding this comment.
Also, after looking at Union::try_new more closely, I see that it always loses metadata.
Metadata may be useful since I see that it is preserved in coerce_union_schema_with_schema (https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/analyzer/type_coercion.rs)
/// **Schema-level metadata merging**: Later schemas take precedence for duplicate keys.
///
/// **Field-level metadata merging**: Later fields take precedence for duplicate metadata keys.
So we might need an API alternative to try_new or a change to it so that we can preserve schema-level and field-level metadata in the case of type coercion
|
@nathanb9 can u please review it and check if its perfect for merge |
…nion Previously the Union arm only validated field count (width), causing stale cached schemas when inputs were rewritten with different types or column names (e.g. after type-coercion). Fix uses qualified_field() to deep-compare qualifier + data_type + name. Fixes apache#22447
Previously the Union arm only checked field count (width), causing stale cached schemas when inputs were rewritten with different types, names, qualifiers, or nullability but the same column count. Fix: call Union::try_new(inputs) to get the authoritative recomputed schema, then compare against the cached schema via DFSchema PartialEq. This handles all field properties in one place and is future-proof. Added three unit tests covering type, name, and nullability mismatches. Fixes apache#22447
Previously the Union arm only checked field count (width), causing stale cached schemas when inputs were rewritten with different types, names, qualifiers, or nullability but the same column count. Fix: - Fast path: structural comparison across ALL inputs (field count, type, name, qualifier, nullability) with zero allocation on the common no-change case, as suggested by the reviewer. - Slow path: Union::try_new() for structure, then HashMap::extend() semantics for schema-level and field-level metadata preservation, matching the behavior of coerce_union_schema_with_schema in type_coercion.rs. Added four unit tests covering type mismatch, name mismatch, nullability mismatch, and metadata preservation. Fixes apache#22447
b819ec5 to
a5216a7
Compare
If we are checking all of these, can’t we just compare the schemas themselves? What are we saving? And if we need to compare not just the first item in the union but all… is there even anything to cache? |
|
@adriangb There are two reasons why comparing the structure of something is better than checking if two schemasre exactly the same.
Since it checks every part of the schema including information about the schema itself and the fields it will say they are different if anything is different even if it is not important. If we just checked if the schema was the same as the input schema and the input had any differences in the information we would incorrectly go down a slow path. This slow path would do a lot of work like trying to create a new Union and merging all the extra information for every part of the optimization process. The structural comparison is done on purpose so that if the structure is the same but the extra information is different it stays on the path.
The function to calculate the schema again is called for every part of the plan tree like when we change the type of something or remove parts. Most of the time the inputs to the Union nodes will not have changed. The structural comparison is simple. Does not use any extra memory. However trying to create an Union always uses extra memory even if nothing has changed. The extra work of using memory for every part of the plan tree is what the cache is trying to prevent. So to answer your question directly: what we are saving is the work of using memory for every time we calculate the schema again for any Union node that has not changed which is the common case, for every optimization pass that does not touch that specific node. |
|
@nathanb9 Please review it and if everything seems good , we can merge it |
|
@Brijesh-Thakkar please be less aggressive about pinging reviewers. Time split between writing code (especially with LLMs) and reviewing is massive. As a rule of thumb I'd say please do not ping more than once per day or so, it won't help your PR get merged faster. |
Okk , sorry for this behavior |
|
Hello Maintainers, could you please review this PR |
Jefffrey
left a comment
There was a problem hiding this comment.
could we perhaps mock an end to end example that highlights the issue this is fixing? one that fails on main but would succeed with this fix
| q1 == q2 | ||
| && f1.name() == f2.name() |
There was a problem hiding this comment.
im not sure if checking names & qualifiers here is strictly necessary; do unions require all their inputs to have matching names?
There was a problem hiding this comment.
For position-based Union (try_new), schema names come exclusively from inputs[0], so checking names/qualifiers against non-first inputs was overly strict.
Fixed: names and qualifiers are now only checked against inputs[0]; data types and nullability are checked across all inputs.
| // Slow path: inputs changed — recompute the schema. | ||
| // | ||
| // NOTE: A note on `Union`s constructed via `try_new_by_name`: | ||
| // At this point, the schema for each input should have | ||
| // the same width. Thus, we do not need to save whether a | ||
| // `Union` was created `BY NAME`, and can safely rely on the | ||
| // `try_new` initializer to derive the new schema based on | ||
| // column positions. | ||
| let mut recomputed = Union::try_new(inputs)?; | ||
|
|
||
| // Metadata preservation: Union::try_new uses intersection logic | ||
| // for metadata, but we want "later takes precedence" (extend semantics) | ||
| // to match coerce_union_schema_with_schema in type_coercion.rs. | ||
| let mut merged_metadata = | ||
| recomputed.inputs[0].schema().metadata().clone(); | ||
| for input in recomputed.inputs.iter().skip(1) { | ||
| merged_metadata.extend(input.schema().metadata().clone()); | ||
| } | ||
|
|
||
| let mut merged_field_metadata = recomputed.inputs[0] | ||
| .schema() | ||
| .fields() | ||
| .iter() | ||
| .map(|f| f.metadata().clone()) |
There was a problem hiding this comment.
it seems odd to place this logic here; maybe we could centralize it within Union and try reuse it in coerce_union_schema_with_schema?
There was a problem hiding this comment.
Moved the metadata merging logic into a new Union::try_new_with_metadata() method on the Union struct itself. The slow path in recompute_schema() now delegates to it with a single call. This method can also be reused by coerce_union_schema_with_schema in a follow-up if desired.
Address reviewer feedback: - Add Union::try_new_with_metadata() centralizing extend-semantics metadata merging so it can be reused by coerce_union_schema_with_schema - Fast path now checks names/qualifiers only against inputs[0] (since position-based Union derives names from first input only) and data_type + nullability against all inputs - Add end-to-end integration test using TreeNode transform to simulate a real optimizer rewrite pass Fixes apache#22447
Added test_recompute_schema_union_after_input_rewrite which uses TreeNode::transform to simulate what an optimizer pass does , it rewrites all TableScan leaves to use Int64 types while leaving the Union's cached schema as Int32, then calls recompute_schema() and asserts the schema updated correctly. This mirrors the exact pattern used by type_coercion and optimize_projections. |
Which issue does this PR close?
recompute_schema()is broken withLogicalPlan::Union#22447Rationale for this change
LogicalPlan::recompute_schema()contains special handling forLogicalPlan::Unionto avoid unnecessarily rebuilding the schema when inputs have not changed.However, the current implementation only checks whether the cached schema and the first input schema have the same number of fields. This can leave the cached schema stale when optimizer rewrites modify field types, names, or qualifiers without changing the schema width.
For example, after a type coercion rewrite, the input schemas may change from
Int32toInt64while preserving the same column count. In this case,recompute_schema()incorrectly considers the schema unchanged and returns the stale cached schema.What changes are included in this PR?
This PR updates the
LogicalPlan::Unionbranch inrecompute_schema()to compare schema structure rather than only schema width.The comparison now verifies:
If any of these differ from the current input schema, the
Unionschema is recomputed usingUnion::try_new().Additionally, two regression tests were added:
test_recompute_schema_union_type_mismatchtest_recompute_schema_union_name_mismatchThese tests verify that schema recomputation occurs when input field types or names change while the schema width remains unchanged.
Are these changes tested?
Yes.
Added regression tests covering:
Int32→Int64) with identical schema width.The new tests fail with the previous width-only validation logic and pass with this change.
The following test suites were also executed successfully:
cargo test -p datafusion-exprcargo test -p datafusion-optimizerAre there any user-facing changes?
No user-facing API changes.
This change fixes internal schema propagation for
LogicalPlan::Unionafter optimizer rewrites and ensures cached schemas remain consistent with rewritten inputs.