Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
*
* <p>See {@link DeltaV2Mode} for V1 vs V2 connector definitions and enable mode configuration.</p>
*/
public class DeltaCatalog extends AbstractDeltaCatalog {
public class DeltaCatalog extends AbstractDeltaCatalog implements ChangelogSupport {

/**
* Loads a Delta table that is registered in the catalog.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.catalog

import org.apache.spark.sql.connector.catalog.TableCatalog

/**
 * Spark 4.0 stand-in for `ChangelogSupport`. It declares no members.
 *
 * <p>`TableCatalog.loadChangelog` and the types it relies on (`Changelog`, `ChangelogInfo`,
 * `ChangelogRange`) first appear in Spark 4.2 (SPARK-56685) and do not exist in Spark 4.0/4.1.
 * The real Auto-CDF wiring is therefore compiled in only when building against Spark 4.2 (see
 * `scala-shims/spark-4.2/...ChangelogSupport.scala`).
 *
 * <p>On 4.0/4.1 builds `DeltaCatalog` can mix this trait in with no effect: there is no
 * `loadChangelog` to override, and the downstream Auto-CDF classes (`DeltaChangelog`, etc.)
 * live in version-specific `java-shims/spark-4.2/` directories, so they are not present in
 * this build either.
 */
trait ChangelogSupport extends TableCatalog {
  // Intentionally empty: exists only so `DeltaCatalog` compiles against every Spark version.
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.catalog

import org.apache.spark.sql.connector.catalog.TableCatalog

/**
 * Spark 4.1 stand-in for `ChangelogSupport`. It declares no members.
 *
 * <p>`TableCatalog.loadChangelog` and the types it relies on (`Changelog`, `ChangelogInfo`,
 * `ChangelogRange`) first appear in Spark 4.2 (SPARK-56685) and do not exist in Spark 4.0/4.1.
 * The real Auto-CDF wiring is therefore compiled in only when building against Spark 4.2 (see
 * `scala-shims/spark-4.2/...ChangelogSupport.scala`).
 *
 * <p>On 4.0/4.1 builds `DeltaCatalog` can mix this trait in with no effect: there is no
 * `loadChangelog` to override, and the downstream Auto-CDF classes (`DeltaChangelog`, etc.)
 * live in version-specific `java-shims/spark-4.2/` directories, so they are not present in
 * this build either.
 */
trait ChangelogSupport extends TableCatalog {
  // Intentionally empty: exists only so `DeltaCatalog` compiles against every Spark version.
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.catalog

import io.delta.spark.internal.v2.catalog.SparkTable
import io.delta.spark.internal.v2.read.changelog.DeltaChangelog

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo, Identifier, TableCatalog}
import org.apache.spark.sql.connector.catalog.ChangelogRange.{TimestampRange, UnboundedRange, VersionRange}
import org.apache.spark.sql.delta.DeltaErrors
import org.apache.spark.sql.delta.sources.DeltaSQLConf

/**
 * Mixed into a [[TableCatalog]] implementation to add Auto-CDF support. Provides the
 * catalog-driven `TableCatalog.loadChangelog` entrypoint introduced by SPARK-56685.
 *
 * <p>This trait extends [[TableCatalog]] as a dependency marker: every concrete catalog that
 * mixes this trait in must already be a `TableCatalog`. The trait itself does not provide a
 * `TableCatalog` implementation.
 *
 * <p>The trait is intentionally thin. `loadChangelog` resolves the table via the catalog's own
 * `loadTable`, validates that the result is a V2 [[SparkTable]] (read-time CDF only flows
 * through the V2 connector; V1 tables go through the legacy Delta CDF path), resolves the
 * requested `ChangelogRange` against the table's snapshot manager, and wraps everything into
 * a [[DeltaChangelog]]. All connector-level work (loading snapshots, validating row tracking,
 * inspecting metadata actions) is deferred to the read path inside [[DeltaChangelog]].
 *
 * <p>The whole entry point is gated by [[DeltaSQLConf.DELTA_CHANGELOG_V2_ENABLED]] (default
 * `false`). When the flag is off the trait delegates to the parent `loadChangelog` default,
 * which surfaces `UNSUPPORTED_FEATURE.CHANGE_DATA_CAPTURE`.
 */
trait ChangelogSupport extends TableCatalog {

  /**
   * Catalog entry point for changelog reads on `ident`.
   *
   * @param ident identifier of the table whose changes are requested
   * @param changelogInfo request descriptor; only its `range()` is consulted here
   * @return a [[DeltaChangelog]] over the resolved, inclusive `[startVersion, endVersion]`
   *         range when the feature flag is on; otherwise whatever the parent
   *         `loadChangelog` default produces
   */
  override def loadChangelog(ident: Identifier, changelogInfo: ChangelogInfo): Changelog = {
    val spark = SparkSession.active
    if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CHANGELOG_V2_ENABLED)) {
      val sparkTable = loadTable(ident) match {
        case st: SparkTable => st
        case other =>
          // Auto-CDF only supports the V2 connector. V1 Delta tables (DeltaTableV2 under the
          // hood) keep going through the legacy CDF path that DeltaCatalog already exposes.
          DeltaErrors.throwChangelogRequiresV2Table(ident.toString, other.getClass.getName)
      }
      val (startVersion, endVersion) = resolveRange(sparkTable, changelogInfo.range())
      new DeltaChangelog(ident.name(), sparkTable, startVersion, endVersion)
    } else {
      // Feature gated off: fall back to the parent's default, which surfaces
      // UNSUPPORTED_FEATURE.CHANGE_DATA_CAPTURE to the user.
      super.loadChangelog(ident, changelogInfo)
    }
  }

  /**
   * Resolves a `ChangelogRange` against the snapshot manager owned by the resolved table.
   *
   * <p>Returned bounds have inclusivity already applied (exclusive start adds 1, exclusive end
   * subtracts 1) and are validated by `adjustBounds`. A missing end bound defaults to the
   * latest snapshot version. `UnboundedRange` is rejected on batch reads.
   *
   * @param sparkTable the V2 table whose snapshot manager is used for resolution
   * @param range the requested version-, timestamp-based or unbounded range
   * @return the inclusive `(startVersion, endVersion)` pair
   */
  private def resolveRange(
      sparkTable: SparkTable,
      range: org.apache.spark.sql.connector.catalog.ChangelogRange): (Long, Long) = {
    val snapshotManager = sparkTable.getSnapshotManager
    val latestVersion = snapshotManager.loadLatestSnapshot().getVersion
    range match {
      case vr: VersionRange =>
        val rawStart = vr.startingVersion().toLong
        val rawEnd: Long =
          if (vr.endingVersion().isPresent) vr.endingVersion().get.toLong else latestVersion
        adjustBounds(
          rawStart, rawEnd, vr.startingBoundInclusive(), vr.endingBoundInclusive(), latestVersion)
      case tr: TimestampRange =>
        // TimestampRange carries Catalyst micros. The kernel API takes millis.
        val rawStart = snapshotManager
          .getActiveCommitAtTime(
            tr.startingTimestamp / 1000,
            /* canReturnLastCommit */ false,
            /* mustBeRecreatable */ true,
            /* canReturnEarliestCommit */ false)
          .getVersion
        val rawEnd: Long = if (tr.endingTimestamp.isPresent) {
          snapshotManager
            .getActiveCommitAtTime(
              tr.endingTimestamp.get / 1000,
              /* canReturnLastCommit */ true,
              /* mustBeRecreatable */ true,
              /* canReturnEarliestCommit */ false)
            .getVersion
        } else {
          latestVersion
        }
        adjustBounds(
          rawStart, rawEnd, tr.startingBoundInclusive(), tr.endingBoundInclusive(), latestVersion)
      case _: UnboundedRange =>
        DeltaErrors.throwChangelogUnboundedRange()
    }
  }

  /**
   * Applies per-bound inclusivity (`+1` for an exclusive start, `-1` for an exclusive end) and
   * validates the result.
   *
   * <p>Two checks are performed, in this order:
   * <ol>
   *   <li>the adjusted start must not be past `latest`;</li>
   *   <li>the adjusted range must be non-empty (start not after end).</li>
   * </ol>
   * The start-vs-latest check runs first because an omitted end bound is filled in with
   * `latest` by the caller: checking emptiness first would report "end before start" for a
   * start beyond the table's history even though the user never supplied an end.
   *
   * <p>Only the start is compared against `latest`; the adjusted end is not capped or
   * validated against it here.
   *
   * @param start raw starting version
   * @param end raw ending version
   * @param startInclusive whether `start` itself is part of the range
   * @param endInclusive whether `end` itself is part of the range
   * @param latest latest version of the table at resolution time
   * @return the inclusive `(start, end)` pair
   */
  private def adjustBounds(
      start: Long,
      end: Long,
      startInclusive: Boolean,
      endInclusive: Boolean,
      latest: Long): (Long, Long) = {
    val adjustedStart = if (startInclusive) start else start + 1
    val adjustedEnd = if (endInclusive) end else end - 1
    if (adjustedStart > latest) {
      throw DeltaErrors.startVersionAfterLatestVersion(adjustedStart, latest)
    }
    if (adjustedStart > adjustedEnd) {
      throw DeltaErrors.endBeforeStartVersionInCDC(adjustedStart, adjustedEnd)
    }
    (adjustedStart, adjustedEnd)
  }
}
34 changes: 34 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,40 @@
],
"sqlState" : "22003"
},
"DELTA_CHANGELOG_REQUIRES_ROW_TRACKING" : {
"message" : [
"Change data capture via CHANGES on `<tableName>` requires row tracking.",
"Enable it by setting TBLPROPERTIES ('delta.enableRowTracking' = 'true')."
],
"sqlState" : "0AKDE"
},
"DELTA_CHANGELOG_REQUIRES_V2_TABLE" : {
"message" : [
"Auto-CDF reads on `<tableName>` require the V2 Delta connector, but the catalog resolved the table to `<actualClassName>`.",
"Set the Delta V2 mode SQL conf to `STRICT` (or `AUTO`, if AUTO routes CHANGES queries) to read this table through the V2 connector."
],
"sqlState" : "0AKDE"
},
"DELTA_CHANGELOG_ROW_TRACKING_DISABLED_IN_RANGE" : {
"message" : [
"Delta CDC requires row tracking to remain enabled across the requested range.",
"Commit at version <version> disabled row tracking. Pick a starting/ending version that brackets a row-tracking-enabled state."
],
"sqlState" : "0AKDE"
},
"DELTA_CHANGELOG_SCHEMA_CHANGE_IN_RANGE" : {
"message" : [
"Delta CDC does not support reading a changelog range that includes a schema change.",
"Commit at version <version> changes the table schema. Pick a starting/ending version that brackets a single schema."
],
"sqlState" : "0AKDE"
},
"DELTA_CHANGELOG_UNBOUNDED_RANGE" : {
"message" : [
"Delta CDC does not support unbounded ranges in batch reads. Specify a starting and ending version or timestamp."
],
"sqlState" : "0AKDE"
},
"DELTA_CHANGE_DATA_FEED_INCOMPATIBLE_DATA_SCHEMA" : {
"message" : [
"Retrieving table changes between version <start> and <end> failed because of an incompatible data schema.",
Expand Down
62 changes: 62 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,68 @@ trait DeltaErrorsBase
messageParameters = Array(start.toString, latest.toString))
}

/**
 * Fails an Auto-CDF batch read whose source table does not have row tracking enabled.
 * The V2 changelog reader needs row tracking to identify rows across commits.
 *
 * Declared as returning `Nothing` so Scala callers can place the call in expression position
 * (for example as a `match` arm) without writing `throw` themselves; Java callers simply
 * invoke it as a statement.
 *
 * @param tableName table name substituted into the error message
 */
def throwChangelogRequiresRowTracking(tableName: String): Nothing = {
  val params = Array(tableName)
  throw new DeltaAnalysisException(
    errorClass = "DELTA_CHANGELOG_REQUIRES_ROW_TRACKING",
    messageParameters = params)
}

/**
 * Fails an Auto-CDF batch read that asked for an unbounded changelog range. Batch CHANGES
 * queries must carry explicit start and end bounds.
 *
 * Declared as returning `Nothing` so Scala callers can place the call in expression position
 * (for example as a `match` arm) without writing `throw` themselves; Java callers simply
 * invoke it as a statement.
 */
def throwChangelogUnboundedRange(): Nothing = {
  // This error class takes no message parameters.
  val params = Array.empty[String]
  throw new DeltaAnalysisException(
    errorClass = "DELTA_CHANGELOG_UNBOUNDED_RANGE",
    messageParameters = params)
}

/**
 * Fails an Auto-CDF batch read when the catalog resolved the table to something other than a
 * V2 [[io.delta.spark.internal.v2.catalog.SparkTable]]. Only the V2 connector implements the
 * catalog-driven CHANGES surface; V1 Delta tables (`DeltaTableV2`) keep using the legacy CDF
 * path, which does not go through `TableCatalog.loadChangelog`.
 *
 * Declared as returning `Nothing` so Scala callers can place the call in expression position
 * (for example as a `match` arm) without writing `throw` themselves; Java callers simply
 * invoke it as a statement.
 *
 * @param tableName table name substituted into the error message
 * @param actualClassName class name of the table object the catalog actually returned
 */
def throwChangelogRequiresV2Table(tableName: String, actualClassName: String): Nothing = {
  val params = Array(tableName, actualClassName)
  throw new DeltaAnalysisException(
    errorClass = "DELTA_CHANGELOG_REQUIRES_V2_TABLE",
    messageParameters = params)
}

/**
 * Fails an Auto-CDF batch read because the table schema differs at some commit inside the
 * requested range. The connector needs a stable schema across the read range so that
 * downstream batch CDC post-processing sees a single schema.
 *
 * Always throws; the `Nothing` return type lets Scala callers use it in expression position.
 *
 * @param version commit version reported in the error message as changing the schema
 */
def throwChangelogSchemaChangeInRange(version: Long): Nothing = {
  val params = Array(version.toString)
  throw new DeltaAnalysisException(
    errorClass = "DELTA_CHANGELOG_SCHEMA_CHANGE_IN_RANGE",
    messageParameters = params)
}

/**
 * Fails an Auto-CDF batch read because row tracking was turned off at some commit inside the
 * requested range (the `delta.enableRowTracking` table property was set to `false`).
 *
 * Always throws; the `Nothing` return type lets Scala callers use it in expression position.
 *
 * @param version commit version reported in the error message as disabling row tracking
 */
def throwChangelogRowTrackingDisabledInRange(version: Long): Nothing = {
  val params = Array(version.toString)
  throw new DeltaAnalysisException(
    errorClass = "DELTA_CHANGELOG_ROW_TRACKING_DISABLED_IN_RANGE",
    messageParameters = params)
}

def setTransactionVersionConflict(appId: String, version1: Long, version2: Long): Throwable = {
new IllegalArgumentException(
s"Two SetTransaction actions within the same transaction have the same appId ${appId} but " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2375,6 +2375,17 @@ trait DeltaSQLConfBase extends DeltaSQLConfUtils {
.booleanConf
.createWithDefault(false)

// Internal boolean flag, off by default, that gates the catalog-driven
// `TableCatalog.loadChangelog` entry point for CHANGES batch queries.
val DELTA_CHANGELOG_V2_ENABLED = {
  buildConf("changelogV2.enabled")
    .internal()
    .doc(
      """When enabled, the V2 connector's hybrid DeltaCatalog answers
        |CHANGES FROM ... batch queries (TableCatalog.loadChangelog) using the
        |kernel-based Auto-CDF reader stack. When disabled, the catalog falls back to the
        |default behavior (UNSUPPORTED_FEATURE.CHANGE_DATA_CAPTURE).""".stripMargin)
    .booleanConf
    .createWithDefault(false)
}

val DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP = {
buildConf("changeDataFeed.timestampOutOfRange.enabled")
.doc(
Expand Down
Loading
Loading