Skip to content

[kernel-spark] Support CDC + schema tracking log in v2#6801

Open
PorridgeSwim wants to merge 3 commits into
delta-io:masterfrom
PorridgeSwim:stack/SchemaTrackingWithCDC
Open

[kernel-spark] Support CDC + schema tracking log in v2#6801
PorridgeSwim wants to merge 3 commits into
delta-io:masterfrom
PorridgeSwim:stack/SchemaTrackingWithCDC

Conversation

@PorridgeSwim
Copy link
Copy Markdown
Collaborator

@PorridgeSwim PorridgeSwim commented May 15, 2026

🥞 Stacked PR

Use this link to review incremental changes.


Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

Follow-up to the non-additive schema evolution stack: extend V2 streaming schema tracking to CDC reads so a CDC stream stops at metadata- or protocol-change commits with a barrier sentinel instead of either silently reading across the change or failing the read-compat check.

  • SparkMicroBatchStream.collectAndBuildCDCIndexedFiles: capture Protocol alongside Metadata while scanning a commit's actions, then call MetadataEvolutionHandler.getMetadataOrProtocolChangeIndexedFileIterator once the scan is done; when the commit diverges from source-init, return a singleton barrier (METADATA_CHANGE_INDEX) in place of BASE + files + END. Skip the on-commit verifyMetadataAction read-compat check when schema tracking is active — the barrier covers divergence. V1 splits this between DeltaSourceCDCSupport.filterAndIndexDeltaLogs (barrier injection) and IndexedChangeFileSeq.filterFiles (short-circuit); V2 collapses both into this single method.
  • SparkMicroBatchStream.applyPerCommitCDCAdmission: pass barrier sentinels through admission unchanged (they can only appear as element 0 of the per-commit list).
  • SparkMicroBatchStream.getFileChangesForCDC: apply metadataEvolutionHandler.stopIndexedFileIteratorAtSchemaChangeBarrier after end-boundary filtering so post-barrier commits in the same batch are truncated. V1's wrap lives in the shared DeltaSource.getFileChangesWithRateLimit; V2 places it inside the CDC-specific method because both planInputPartitions and the outer getFileChangesWithRateLimit reach the CDC iterator through here.
  • MetadataEvolutionHandler.getMergedConsecutiveMetadataChanges: include AddCDCFile in the action set the merger walks and treat any non-null CDC column as a file action that stops the merger walk. Drops the mergeActionSet parameter (always CDC_ACTION_SET now) so non-CDC and CDC analysis share the same stop semantics. Resolves the TODO([Feature Request] Implement kernel-based dsv2 delta streaming source (M2: support advanced read options) #5319) placeholder left by PR 7/7.
  • SparkMicroBatchStream.CDC_ACTION_SET: promote to public so MetadataEvolutionHandler can reuse it.

How was this patch tested?

SparkMicroBatchStreamCDCTest adds barrier-emission cases:

  • testProcessCommit_emitsBarrierAtSchemaChange: a metadata-only commit on a CDC stream with seeded tracking emits [barrier, END] from processCommitToIndexedFilesForCDC.
  • testGetFileChangesForCDC_emitsBarrierAtSchemaChange: end-to-end check that the barrier fires and the iterator truncates across commits — exercises metadata/protocol capture, barrier emission, admission passthrough, and cross-commit truncation in one path.

MetadataEvolutionHandlerTest extends the merger walk tests to cover CDC file actions stopping the walk. The unified DeltaV2SourceSchemaEvolutionCDCSuiteBase moves all evolution scenarios out of shouldFailTests into shouldPassTests so the CDC variants of the streaming schema-evolution suite now run alongside non-CDC.

Does this PR introduce any user-facing changes?

No.

@PorridgeSwim PorridgeSwim changed the title CDC + schema tracking log [kernel-spark] Support CDC + schema tracking log in v2 May 15, 2026
@PorridgeSwim PorridgeSwim marked this pull request as ready for review May 15, 2026 21:51
@PorridgeSwim PorridgeSwim force-pushed the stack/SchemaTrackingWithCDC branch 2 times, most recently from a78a4ac to f74ee6f Compare May 15, 2026 22:49
murali-db pushed a commit that referenced this pull request May 16, 2026
## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6570/files) to
review incremental changes.
-
[stack/SparkMetadataAdapter](#6546)
[[Files changed](https://github.com/delta-io/delta/pull/6546/files)]
[MERGED]
-
[stack/RefactorMetadataTrackingLog](#6550)
[[Files changed](https://github.com/delta-io/delta/pull/6550/files)]
[MERGED]
-
[stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562)
[[Files changed](https://github.com/delta-io/delta/pull/6562/files)]
[MERGED]
-
[stack/MetadataEvolutionHandler2](#6563)
[[Files changed](https://github.com/delta-io/delta/pull/6563/files)]
[MERGED]
-
[**stack/NonAdditiveSchemaEvolution2**](#6570)
[[Files changed](https://github.com/delta-io/delta/pull/6570/files)]
-
[stack/NonAdditiveSchemaEvolution3](#6697)
[[Files
changed](https://github.com/delta-io/delta/pull/6697/files/b7f6c8ebfc0882e7e2cc580f09f376be23a8d43d..dbb6246c14be1ab7f017ad9fc26455ae599ee676)]
-
[stack/consecutiveSchemaChangesMerger](#6698)
[[Files
changed](https://github.com/delta-io/delta/pull/6698/files/dbb6246c14be1ab7f017ad9fc26455ae599ee676..4bf2fa3fa828bcab0b56c4c26ca51ee9cc40b482)]
-
[stack/SchemaTrackingWithCDC](#6801)
[[Files
changed](https://github.com/delta-io/delta/pull/6801/files/4bf2fa3fa828bcab0b56c4c26ca51ee9cc40b482..a78a4ac2bc9a52605278a36b98804230258c12a2)]
- [stack/V1V2MixTest](#6759)
[[Files
changed](https://github.com/delta-io/delta/pull/6759/files/7f9b7f2724b2245ab7380908616303cf7ea95fca..e146cdc9ebb0572e8b0a928cc6dd3bfdc198d984)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 5/7 in the non-additive schema evolution for V2 streaming connector
stack.

Wire schema tracking into V2's analysis path so the analyzed plan
reflects the persisted (evolved) schema instead of the live snapshot
schema.

- `DeltaAnalysis.verifyDeltaSourceSchemaLocation`: extend the
duplicate-schema-location check to also visit `StreamingRelationV2`,
keyed on the V2 `Table.name`.
- `SparkTable`: open `DeltaSourceMetadataTrackingLog` once during
construction (gated on `mergeConsecutiveSchemaChanges`) and seed
`SchemaProvider` from the persisted metadata, so analysis-time
`schema()` matches what the stream will read at runtime.
- `ApplyV2ReadOptions` (renamed from `ApplyV2Streaming`): generalize the
CDC-only rebuild to also fire when `schemaTrackingLocation` arrives via
`extraOptions` on the catalog `readStream.table()` path; rebuild
`SparkTable` with merged options so the schema-log lookup actually
fires.
- `MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream`:
V2 port of V1's helper, reused by `SparkTable` (analysis) and
`SparkScan` (execution).

## How was this patch tested?

`SparkTableTest`, `MetadataEvolutionHandlerTest`,
`ApplyV2ReadOptionsSuite`. Unified `DeltaV2SourceSchemaEvolutionSuite`
updated.

## Does this PR introduce _any_ user-facing changes?

No.
@PorridgeSwim PorridgeSwim mentioned this pull request May 16, 2026
@PorridgeSwim PorridgeSwim force-pushed the stack/SchemaTrackingWithCDC branch from f74ee6f to 89923c7 Compare May 16, 2026 00:18
# Conflicts:
#	spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java
@PorridgeSwim PorridgeSwim force-pushed the stack/SchemaTrackingWithCDC branch from 89923c7 to 4aeacfb Compare May 16, 2026 06:54
@PorridgeSwim PorridgeSwim self-assigned this May 16, 2026
murali-db pushed a commit that referenced this pull request May 16, 2026
…6697)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6697/files) to
review incremental changes.
-
[stack/SparkMetadataAdapter](#6546)
[[Files changed](https://github.com/delta-io/delta/pull/6546/files)]
[MERGED]
-
[stack/RefactorMetadataTrackingLog](#6550)
[[Files changed](https://github.com/delta-io/delta/pull/6550/files)]
[MERGED]
-
[stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562)
[[Files changed](https://github.com/delta-io/delta/pull/6562/files)]
[MERGED]
-
[stack/MetadataEvolutionHandler2](#6563)
[[Files changed](https://github.com/delta-io/delta/pull/6563/files)]
[MERGED]
-
[stack/NonAdditiveSchemaEvolution2](#6570)
[[Files changed](https://github.com/delta-io/delta/pull/6570/files)]
[MERGED]
-
[**stack/NonAdditiveSchemaEvolution3**](#6697)
[[Files changed](https://github.com/delta-io/delta/pull/6697/files)]
-
[stack/consecutiveSchemaChangesMerger](#6698)
[[Files
changed](https://github.com/delta-io/delta/pull/6698/files/f96643aa3cc01e7f70cc13a18b82dc27f277f11d..f612628ad931ec35c237801109f01b6fbd1379f7)]
-
[stack/SchemaTrackingWithCDC](#6801)
[[Files
changed](https://github.com/delta-io/delta/pull/6801/files/f612628ad931ec35c237801109f01b6fbd1379f7..4aeacfb120b33e9cdfe124352290b72f53f7cf89)]
- [stack/V1V2MixTest](#6759)
[[Files
changed](https://github.com/delta-io/delta/pull/6759/files/f612628ad931ec35c237801109f01b6fbd1379f7..0c818ee431ab417a4f2ffbcc609930be09d25031)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 6/7 in the non-additive schema evolution for V2 streaming connector
stack.

Wire `MetadataEvolutionHandler` into `SparkMicroBatchStream` and
`SparkScan` so V2 streaming reads honor non-additive schema evolution
(column rename/drop, type widening).

- `SparkMicroBatchStream`: take `metadataTrackingLog` + `metadataPath`
as constructor inputs; when a persisted entry exists, layer it onto the
freshly loaded `snapshotAtSourceInit` to derive
`readSnapshotAtSourceInit` (mirrors V1's `readSnapshotDescriptor`).
Integrate the schema-evolution barrier protocol into `latestOffset` /
`commit` / `planInputPartitions`. Skip the on-restart schema-validation
check when schema tracking is active — the schema-log evolution
exception covers it.
- `SparkScan.toMicroBatchStream`: reload latest snapshot (the
analysis-time `initialSnapshot` can be stale by stream start), open the
tracking log via
`MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream`
with `mergeConsecutiveSchemaChanges=false` (the merger only runs at
analysis), and pass it through with the checkpoint location.
- `SparkScan` option allow-list: move `allowSourceColumnDrop` / `Rename`
/ `TypeChange` out of the unsupported list now that they are honored.

## How was this patch tested?

`SparkMicroBatchStreamTest`, `MetadataEvolutionHandlerTest`. Unified
suites (`DeltaV2SourceSchemaEvolutionSuite`,
`TypeWideningStreamingV2SourceSuite`,
`RemoveColumnMappingStreamingReadV2Suite`) move non-merger evolution
scenarios from `shouldFailTests` to `shouldPassTests`; merger-dependent
tests remain pending until PR 7/7.

## Does this PR introduce _any_ user-facing changes?

No.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant