Fix V2 streaming row metadata reads #6762
johanl-db left a comment
High-level comment:
This change doesn't fit with how metadata columns are supposed to work in Spark.
There shouldn't be a need to manually propagate the metadata column. The flow is:
- The DSv2 connector implements the trait `SupportsMetadataColumns` and surfaces the metadata columns it exposes via `metadataColumns`.
- When resolving a relation, Spark appends the metadata columns that the connector exposes to its output schema.
- The plan is analyzed; unused metadata columns or individual metadata fields are pruned away.
- Spark builds a scan. The read schema contains the metadata columns/fields (e.g. the user selected `_metadata.row_id`) that are effectively used. The connector advertised that it could surface that metadata, so it now fills the value in the batches it returns to Spark.
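
The flow above can be sketched as a small toy model. This is only an illustration of the four steps: `ToyConnector`, `resolve_relation`, and `prune` are hypothetical names, not Spark or Delta APIs, and the real engine does considerably more.

```python
from dataclasses import dataclass

# Toy model only: these classes are hypothetical stand-ins, not Spark's API.


@dataclass(frozen=True)
class Column:
    name: str
    is_metadata: bool = False


class ToyConnector:
    """Stands in for a DSv2 table that exposes metadata columns."""

    def data_columns(self):
        return [Column("id"), Column("value")]

    def metadata_columns(self):
        # Step 1: the connector advertises the metadata it can surface.
        return [Column("_metadata", is_metadata=True)]

    def read(self, read_schema):
        # Step 4: the connector fills every column in the read schema,
        # including metadata columns that survived pruning.
        source = {"id": 1, "value": "a", "_metadata": {"row_id": 0}}
        return [source[c.name] for c in read_schema]


def resolve_relation(connector):
    # Step 2: the engine appends the advertised metadata columns to the output.
    return connector.data_columns() + connector.metadata_columns()


def prune(output, referenced):
    # Step 3: columns the query never references are dropped.
    return [c for c in output if c.name in referenced]


connector = ToyConnector()
output = resolve_relation(connector)
read_schema = prune(output, referenced={"id", "_metadata"})
row = connector.read(read_schema)
print([c.name for c in read_schema], row)
```

In this model the connector never has to be told separately that `_metadata` was requested: it is already part of the read schema it receives.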
The metadata fields should be part of the read schema, without the need to manually add them here. It's Spark's responsibility to do this in a generic manner.
Somehow for streaming, the metadata column got lost along the way. This is what needs to be fixed, and not manually adding it back after the fact.
Also, using options as a way to pass information is not robust. There are very few cases where options are the right way (even though we tend to overuse them). Better to be explicit and use a dedicated variable with a proper type.
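
As a rough illustration of the robustness concern, the sketch below contrasts a string-keyed option with an explicit typed field. The option key `METADATA_OPTION`, the function `builder_from_options`, and the class `ScanRequest` are all hypothetical and do not reflect the connector's actual code.

```python
from dataclasses import dataclass, field

# Hypothetical names throughout; this is not the connector's real code.
METADATA_OPTION = "internal.requestedMetadataSchema"


def builder_from_options(options):
    # Option-based handoff: the value is an untyped string, and a misspelled
    # or missing key silently falls back to "no metadata requested".
    raw = options.get(METADATA_OPTION, "")
    return [name for name in raw.split(",") if name]


@dataclass
class ScanRequest:
    # Explicit handoff: the requested metadata fields are a named, typed field.
    data_columns: list
    metadata_fields: list = field(default_factory=list)


# A typo in the option key goes unnoticed and the metadata request is lost.
lost = builder_from_options({"internal.requestedMetadataSchem": "row_id"})

# Misspelling the keyword argument here would raise a TypeError immediately.
explicit = ScanRequest(data_columns=["id"], metadata_fields=["row_id"])
print(lost, explicit.metadata_fields)
```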
+1, can we dig more into why the `_metadata` column is not resolved, rather than depending on a read option?
Which Delta project/connector is this regarding?
Description
Managed tables in the UC test setup have row tracking enabled, so `_metadata.row_id` is a valid metadata column. Batch reads already work:
The same projection in V2 streaming analyzed successfully, but failed at execution with `ArrayIndexOutOfBoundsException`. The streaming scan was planned with `_metadata` in the output, but the reader factory received a read data schema without `_metadata`, so it skipped the existing row-tracking metadata path and produced one fewer column than Spark expected.
This kind of query works after this PR:
This PR carries the requested `_metadata` schema from the analyzed V2 streaming relation into `SparkScanBuilder` using an internal option. `SparkScanBuilder` adds `_metadata` back to the read data schema and strips the option before constructing the scan. That lets streaming use the same row-tracking metadata reader path as batch.
How was this patch tested?
Added tests for the streaming metadata schema handoff and for a managed-table streaming read of `_metadata.row_id`.
Does this PR introduce any user-facing changes?
Yes. V2 streaming reads that select `_metadata.row_id` now complete instead of crashing during execution.
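
The width mismatch described in the Description can be reproduced in miniature. The sketch below is a toy model with hypothetical names (`planned_output`, `reader_schema`, `read_row`, `project`); it only mirrors the reported symptom, where the plan expects one more column than the reader produces.

```python
# Toy reproduction; the names are hypothetical and only mirror the description.
planned_output = ["id", "_metadata"]  # what the analyzed plan expects
reader_schema = ["id"]                # what the reader factory received

source_row = {"id": 1, "_metadata": {"row_id": 0}}


def read_row(schema):
    # The reader only produces the columns named in its read schema.
    return [source_row[name] for name in schema]


def project(row, expected_columns):
    # The engine indexes the row by the planned column positions.
    return [row[i] for i in range(len(expected_columns))]


try:
    project(read_row(reader_schema), planned_output)
    failed = False
except IndexError:
    # Python's counterpart to the ArrayIndexOutOfBoundsException in the report.
    failed = True

# With `_metadata` present in the read schema the widths line up again.
fixed = project(read_row(planned_output), planned_output)
print(failed, fixed)
```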