-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[Delta] Add Changelog support #6794
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
8a0ea55
INIT
SanJSp 6172e58
Functional fixes to get port running
SanJSp 7c74bc4
Mostly linting/formatting
SanJSp 9e5176b
Add feature flag (unclean separation)
SanJSp 707bf80
Add working Batch Changelog alongside Streaming
SanJSp 2c8d056
PR self review
SanJSp b0f5080
PR feedback round 1
SanJSp 322b019
Self review 2
SanJSp d115a87
Simplyfy comments
SanJSp dae7108
Add cross-Spark shims for Auto-CDF
SanJSp 7b4498b
Fix test
SanJSp 3a70117
fix test
SanJSp 3a7bc8e
Merge branch 'master' into changelog-pr
SanJSp 28a26a5
fix test
SanJSp File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
33 changes: 33 additions & 0 deletions
33
.../src/main/scala-shims/spark-4.0/org/apache/spark/sql/delta/catalog/ChangelogSupport.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| /* | ||
| * Copyright (2025) The Delta Lake Project Authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.delta.catalog | ||
|
|
||
| import org.apache.spark.sql.connector.catalog.TableCatalog | ||
|
|
||
| /** | ||
| * No-op shim of `ChangelogSupport` for Spark 4.0. | ||
| * | ||
| * <p>The catalog-driven `TableCatalog.loadChangelog` entrypoint and its supporting types | ||
| * (`Changelog`, `ChangelogInfo`, `ChangelogRange`) were introduced in Spark 4.2 via | ||
| * SPARK-56685. They do not exist in Spark 4.0/4.1, so the Auto-CDF wiring is compiled in only | ||
| * when building against Spark 4.2 (see `scala-shims/spark-4.2/...ChangelogSupport.scala`). | ||
| * | ||
| * <p>In 4.0/4.1 builds, mixing this empty trait into `DeltaCatalog` is a no-op: there is no | ||
| * `loadChangelog` to override, and downstream Auto-CDF classes (`DeltaChangelog`, etc.) live in | ||
| * version-specific `java-shims/spark-4.2/` dirs and are not present here either. | ||
| */ | ||
| trait ChangelogSupport extends TableCatalog | ||
33 changes: 33 additions & 0 deletions
33
.../src/main/scala-shims/spark-4.1/org/apache/spark/sql/delta/catalog/ChangelogSupport.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| /* | ||
| * Copyright (2025) The Delta Lake Project Authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.delta.catalog | ||
|
|
||
| import org.apache.spark.sql.connector.catalog.TableCatalog | ||
|
|
||
| /** | ||
| * No-op shim of `ChangelogSupport` for Spark 4.1. | ||
| * | ||
| * <p>The catalog-driven `TableCatalog.loadChangelog` entrypoint and its supporting types | ||
| * (`Changelog`, `ChangelogInfo`, `ChangelogRange`) were introduced in Spark 4.2 via | ||
| * SPARK-56685. They do not exist in Spark 4.0/4.1, so the Auto-CDF wiring is compiled in only | ||
| * when building against Spark 4.2 (see `scala-shims/spark-4.2/...ChangelogSupport.scala`). | ||
| * | ||
| * <p>In 4.0/4.1 builds, mixing this empty trait into `DeltaCatalog` is a no-op: there is no | ||
| * `loadChangelog` to override, and downstream Auto-CDF classes (`DeltaChangelog`, etc.) live in | ||
| * version-specific `java-shims/spark-4.2/` dirs and are not present here either. | ||
| */ | ||
| trait ChangelogSupport extends TableCatalog |
132 changes: 132 additions & 0 deletions
132
.../src/main/scala-shims/spark-4.2/org/apache/spark/sql/delta/catalog/ChangelogSupport.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,132 @@ | ||
| /* | ||
| * Copyright (2025) The Delta Lake Project Authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.delta.catalog | ||
|
|
||
| import io.delta.spark.internal.v2.catalog.SparkTable | ||
| import io.delta.spark.internal.v2.read.changelog.DeltaChangelog | ||
|
|
||
| import org.apache.spark.sql.SparkSession | ||
| import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo, Identifier, TableCatalog} | ||
| import org.apache.spark.sql.connector.catalog.ChangelogRange.{TimestampRange, UnboundedRange, VersionRange} | ||
| import org.apache.spark.sql.delta.DeltaErrors | ||
| import org.apache.spark.sql.delta.sources.DeltaSQLConf | ||
|
|
||
| /** | ||
| * Mixed into a [[TableCatalog]] implementation to add Auto-CDF support. Provides the | ||
| * catalog-driven `TableCatalog.loadChangelog` entrypoint introduced by SPARK-56685. | ||
| * | ||
| * <p>This trait extends [[TableCatalog]] as a dependency marker: every concrete catalog that | ||
| * mixes this trait in must already be a `TableCatalog`. The trait itself does not provide a | ||
| * `TableCatalog` implementation. | ||
| * | ||
| * <p>The trait is intentionally thin. `loadChangelog` resolves the table via the catalog's own | ||
| * `loadTable`, validates that the result is a V2 [[SparkTable]] (read-time CDF only flows | ||
| * through the V2 connector. V1 tables go through the legacy Delta CDF path), resolves the | ||
| * requested [[ChangelogRange]] against the table's snapshot manager, and wraps everything into | ||
| * a [[DeltaChangelog]]. All connector-level work (loading snapshots, validating row tracking, | ||
| * inspecting metadata actions) is deferred to the read path inside [[DeltaChangelog]]. | ||
| * | ||
| * <p>The whole entry point is gated by [[DeltaSQLConf.DELTA_CHANGELOG_V2_ENABLED]] (default | ||
| * `false`). When the flag is off the trait delegates to the parent `loadChangelog` default, | ||
| * which surfaces `UNSUPPORTED_FEATURE.CHANGE_DATA_CAPTURE`. | ||
| */ | ||
| trait ChangelogSupport extends TableCatalog { | ||
|
|
||
| override def loadChangelog(ident: Identifier, changelogInfo: ChangelogInfo): Changelog = { | ||
| val spark = SparkSession.active | ||
| if (!spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CHANGELOG_V2_ENABLED)) { | ||
| // Feature gated off: fall back to the parent's default, which surfaces | ||
| // UNSUPPORTED_FEATURE.CHANGE_DATA_CAPTURE to the user. | ||
| return super.loadChangelog(ident, changelogInfo) | ||
| } | ||
| val sparkTable = loadTable(ident) match { | ||
| case st: SparkTable => st | ||
| case other => | ||
| // Auto-CDF only supports the V2 connector. V1 Delta tables (DeltaTableV2 under the | ||
| // hood) keep going through the legacy CDF path that DeltaCatalog already exposes. | ||
| DeltaErrors.throwChangelogRequiresV2Table(ident.toString, other.getClass.getName) | ||
| } | ||
| val (startVersion, endVersion) = resolveRange(sparkTable, changelogInfo.range()) | ||
| new DeltaChangelog(ident.name(), sparkTable, startVersion, endVersion) | ||
| } | ||
|
|
||
| /** | ||
| * Resolves a [[ChangelogRange]] against the snapshot manager owned by the resolved table. | ||
| * | ||
| * <p>Returned bounds have inclusivity already applied (exclusive start adds 1, exclusive end | ||
| * subtracts 1) and are validated. `UnboundedRange` is rejected on batch reads. | ||
| */ | ||
| private def resolveRange( | ||
| sparkTable: SparkTable, | ||
| range: org.apache.spark.sql.connector.catalog.ChangelogRange): (Long, Long) = { | ||
| val snapshotManager = sparkTable.getSnapshotManager | ||
| val latestVersion = snapshotManager.loadLatestSnapshot().getVersion | ||
| range match { | ||
| case vr: VersionRange => | ||
| val rawStart = vr.startingVersion().toLong | ||
| val rawEnd: Long = | ||
| if (vr.endingVersion().isPresent) vr.endingVersion().get.toLong else latestVersion | ||
| adjustBounds( | ||
| rawStart, rawEnd, vr.startingBoundInclusive(), vr.endingBoundInclusive(), latestVersion) | ||
| case tr: TimestampRange => | ||
| // TimestampRange carries Catalyst micros. The kernel API takes millis. | ||
| val rawStart = snapshotManager | ||
| .getActiveCommitAtTime( | ||
| tr.startingTimestamp / 1000, | ||
| /* canReturnLastCommit */ false, | ||
| /* mustBeRecreatable */ true, | ||
| /* canReturnEarliestCommit */ false) | ||
| .getVersion | ||
| val rawEnd: Long = if (tr.endingTimestamp.isPresent) { | ||
| snapshotManager | ||
| .getActiveCommitAtTime( | ||
| tr.endingTimestamp.get / 1000, | ||
| /* canReturnLastCommit */ true, | ||
| /* mustBeRecreatable */ true, | ||
| /* canReturnEarliestCommit */ false) | ||
| .getVersion | ||
| } else { | ||
| latestVersion | ||
| } | ||
| adjustBounds( | ||
| rawStart, rawEnd, tr.startingBoundInclusive(), tr.endingBoundInclusive(), latestVersion) | ||
| case _: UnboundedRange => | ||
| DeltaErrors.throwChangelogUnboundedRange() | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Apply per-bound inclusivity (`+1` / `-1`) and verify the resulting range is non-empty and | ||
| * within the table's commit history. | ||
| */ | ||
| private def adjustBounds( | ||
| start: Long, | ||
| end: Long, | ||
| startInclusive: Boolean, | ||
| endInclusive: Boolean, | ||
| latest: Long): (Long, Long) = { | ||
| val adjustedStart = if (startInclusive) start else start + 1 | ||
| val adjustedEnd = if (endInclusive) end else end - 1 | ||
| if (adjustedStart > adjustedEnd) { | ||
| throw DeltaErrors.endBeforeStartVersionInCDC(adjustedStart, adjustedEnd) | ||
| } | ||
| if (adjustedStart > latest) { | ||
| throw DeltaErrors.startVersionAfterLatestVersion(adjustedStart, latest) | ||
| } | ||
| (adjustedStart, adjustedEnd) | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: read-time CDF instead of Auto-CDF