diff --git a/spark-unified/src/main/java/org/apache/spark/sql/delta/catalog/DeltaCatalog.java b/spark-unified/src/main/java/org/apache/spark/sql/delta/catalog/DeltaCatalog.java
index ae4c2626d85..7b53c4ab9d9 100644
--- a/spark-unified/src/main/java/org/apache/spark/sql/delta/catalog/DeltaCatalog.java
+++ b/spark-unified/src/main/java/org/apache/spark/sql/delta/catalog/DeltaCatalog.java
@@ -62,7 +62,7 @@
*
*
See {@link DeltaV2Mode} for V1 vs V2 connector definitions and enable mode configuration.
*/
-public class DeltaCatalog extends AbstractDeltaCatalog {
+public class DeltaCatalog extends AbstractDeltaCatalog implements ChangelogSupport {
/**
* Loads a Delta table that is registered in the catalog.
diff --git a/spark-unified/src/main/scala-shims/spark-4.0/org/apache/spark/sql/delta/catalog/ChangelogSupport.scala b/spark-unified/src/main/scala-shims/spark-4.0/org/apache/spark/sql/delta/catalog/ChangelogSupport.scala
new file mode 100644
index 00000000000..e3427825753
--- /dev/null
+++ b/spark-unified/src/main/scala-shims/spark-4.0/org/apache/spark/sql/delta/catalog/ChangelogSupport.scala
@@ -0,0 +1,33 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.catalog
+
+import org.apache.spark.sql.connector.catalog.TableCatalog
+
+/**
+ * No-op shim of `ChangelogSupport` for Spark 4.0.
+ *
+ * The catalog-driven `TableCatalog.loadChangelog` entrypoint and its supporting types
+ * (`Changelog`, `ChangelogInfo`, `ChangelogRange`) were introduced in Spark 4.2 via
+ * SPARK-56685. They do not exist in Spark 4.0/4.1, so the Auto-CDF wiring is compiled in only
+ * when building against Spark 4.2 (see `scala-shims/spark-4.2/...ChangelogSupport.scala`).
+ *
+ * In 4.0/4.1 builds, mixing this empty trait into `DeltaCatalog` is a no-op: there is no
+ * `loadChangelog` to override, and downstream Auto-CDF classes (`DeltaChangelog`, etc.) live in
+ * version-specific `java-shims/spark-4.2/` dirs and are not present here either.
+ */
+trait ChangelogSupport extends TableCatalog
diff --git a/spark-unified/src/main/scala-shims/spark-4.1/org/apache/spark/sql/delta/catalog/ChangelogSupport.scala b/spark-unified/src/main/scala-shims/spark-4.1/org/apache/spark/sql/delta/catalog/ChangelogSupport.scala
new file mode 100644
index 00000000000..16b2a5cd4dc
--- /dev/null
+++ b/spark-unified/src/main/scala-shims/spark-4.1/org/apache/spark/sql/delta/catalog/ChangelogSupport.scala
@@ -0,0 +1,33 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.catalog
+
+import org.apache.spark.sql.connector.catalog.TableCatalog
+
+/**
+ * No-op shim of `ChangelogSupport` for Spark 4.1.
+ *
+ * The catalog-driven `TableCatalog.loadChangelog` entrypoint and its supporting types
+ * (`Changelog`, `ChangelogInfo`, `ChangelogRange`) were introduced in Spark 4.2 via
+ * SPARK-56685. They do not exist in Spark 4.0/4.1, so the Auto-CDF wiring is compiled in only
+ * when building against Spark 4.2 (see `scala-shims/spark-4.2/...ChangelogSupport.scala`).
+ *
+ * In 4.0/4.1 builds, mixing this empty trait into `DeltaCatalog` is a no-op: there is no
+ * `loadChangelog` to override, and downstream Auto-CDF classes (`DeltaChangelog`, etc.) live in
+ * version-specific `java-shims/spark-4.2/` dirs and are not present here either.
+ */
+trait ChangelogSupport extends TableCatalog
diff --git a/spark-unified/src/main/scala-shims/spark-4.2/org/apache/spark/sql/delta/catalog/ChangelogSupport.scala b/spark-unified/src/main/scala-shims/spark-4.2/org/apache/spark/sql/delta/catalog/ChangelogSupport.scala
new file mode 100644
index 00000000000..15d49ff9dab
--- /dev/null
+++ b/spark-unified/src/main/scala-shims/spark-4.2/org/apache/spark/sql/delta/catalog/ChangelogSupport.scala
@@ -0,0 +1,132 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.catalog
+
+import io.delta.spark.internal.v2.catalog.SparkTable
+import io.delta.spark.internal.v2.read.changelog.DeltaChangelog
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo, Identifier, TableCatalog}
+import org.apache.spark.sql.connector.catalog.ChangelogRange.{TimestampRange, UnboundedRange, VersionRange}
+import org.apache.spark.sql.delta.DeltaErrors
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+
+/**
+ * Mixed into a [[TableCatalog]] implementation to add Auto-CDF support. Provides the
+ * catalog-driven `TableCatalog.loadChangelog` entrypoint introduced by SPARK-56685.
+ *
+ * This trait extends [[TableCatalog]] as a dependency marker: every concrete catalog that
+ * mixes this trait in must already be a `TableCatalog`. The trait itself does not provide a
+ * `TableCatalog` implementation.
+ *
+ * The trait is intentionally thin. `loadChangelog` resolves the table via the catalog's own
+ * `loadTable`, validates that the result is a V2 [[SparkTable]] (read-time CDF only flows
+ * through the V2 connector; V1 tables go through the legacy Delta CDF path), resolves the
+ * requested [[ChangelogRange]] against the table's snapshot manager, and wraps everything into
+ * a [[DeltaChangelog]]. All connector-level work (loading snapshots, validating row tracking,
+ * inspecting metadata actions) is deferred to the read path inside [[DeltaChangelog]].
+ *
+ * The whole entry point is gated by [[DeltaSQLConf.DELTA_CHANGELOG_V2_ENABLED]] (default
+ * `false`). When the flag is off the trait delegates to the parent `loadChangelog` default,
+ * which surfaces `UNSUPPORTED_FEATURE.CHANGE_DATA_CAPTURE`.
+ */
+trait ChangelogSupport extends TableCatalog {
+
+  /**
+   * Catalog entry point for Auto-CDF batch reads.
+   *
+   * When [[DeltaSQLConf.DELTA_CHANGELOG_V2_ENABLED]] is off in the active session, the call is
+   * forwarded unchanged to the inherited `loadChangelog`. Otherwise the table is loaded through
+   * `loadTable`, must be a [[SparkTable]], and the requested range is resolved to a concrete
+   * version pair that is handed to a new [[DeltaChangelog]].
+   *
+   * @param ident identifier of the table whose changes are requested
+   * @param changelogInfo request descriptor; only its `range()` is consulted here
+   * @return the inherited result when the feature is gated off, otherwise a [[DeltaChangelog]]
+   */
+  override def loadChangelog(ident: Identifier, changelogInfo: ChangelogInfo): Changelog = {
+    val changelogEnabled =
+      SparkSession.active.sessionState.conf.getConf(DeltaSQLConf.DELTA_CHANGELOG_V2_ENABLED)
+    if (!changelogEnabled) {
+      // Gated off: defer to the inherited default, which is expected to surface
+      // UNSUPPORTED_FEATURE.CHANGE_DATA_CAPTURE to the user.
+      super.loadChangelog(ident, changelogInfo)
+    } else {
+      val v2Table = loadTable(ident) match {
+        case table: SparkTable => table
+        case unsupported =>
+          // Only the V2 connector is served here. Anything else (e.g. a V1 Delta table) is
+          // rejected with the class that was actually resolved.
+          DeltaErrors.throwChangelogRequiresV2Table(ident.toString, unsupported.getClass.getName)
+      }
+      val (firstVersion, lastVersion) = resolveRange(v2Table, changelogInfo.range())
+      new DeltaChangelog(ident.name(), v2Table, firstVersion, lastVersion)
+    }
+  }
+
+  /**
+   * Resolves a [[ChangelogRange]] against the snapshot manager owned by the resolved table.
+   *
+   * Returned bounds have inclusivity already applied (exclusive start adds 1, exclusive end
+   * subtracts 1) and are validated by `adjustBounds`. `UnboundedRange` is rejected on batch
+   * reads.
+   *
+   * @param sparkTable V2 table whose snapshot manager is used to look up versions
+   * @param range requested version, timestamp or unbounded range
+   * @return inclusive `(startVersion, endVersion)` pair
+   */
+  private def resolveRange(
+      sparkTable: SparkTable,
+      range: org.apache.spark.sql.connector.catalog.ChangelogRange): (Long, Long) = {
+    // TimestampRange carries Catalyst micros; the kernel API takes millis. floorDiv rounds toward
+    // negative infinity, so a pre-epoch (negative) timestamp is not pushed into the following
+    // millisecond the way truncating `/` would.
+    def microsToMillis(micros: Long): Long = Math.floorDiv(micros, 1000L)
+
+    val snapshotManager = sparkTable.getSnapshotManager
+    val latestVersion = snapshotManager.loadLatestSnapshot().getVersion
+    range match {
+      case vr: VersionRange =>
+        val rawStart = vr.startingVersion().toLong
+        // A missing end bound means "up to the latest commit".
+        val rawEnd: Long =
+          if (vr.endingVersion().isPresent) vr.endingVersion().get.toLong else latestVersion
+        adjustBounds(
+          rawStart, rawEnd, vr.startingBoundInclusive(), vr.endingBoundInclusive(), latestVersion)
+      case tr: TimestampRange =>
+        // NOTE(review): getActiveCommitAtTime presumably returns the commit at or before the
+        // given time. If so, a start timestamp that falls between two commits resolves to the
+        // earlier one; confirm that is the intended start bound.
+        val rawStart = snapshotManager
+          .getActiveCommitAtTime(
+            microsToMillis(tr.startingTimestamp),
+            /* canReturnLastCommit */ false,
+            /* mustBeRecreatable */ true,
+            /* canReturnEarliestCommit */ false)
+          .getVersion
+        val rawEnd: Long = if (tr.endingTimestamp.isPresent) {
+          snapshotManager
+            .getActiveCommitAtTime(
+              microsToMillis(tr.endingTimestamp.get),
+              /* canReturnLastCommit */ true,
+              /* mustBeRecreatable */ true,
+              /* canReturnEarliestCommit */ false)
+            .getVersion
+        } else {
+          latestVersion
+        }
+        adjustBounds(
+          rawStart, rawEnd, tr.startingBoundInclusive(), tr.endingBoundInclusive(), latestVersion)
+      case _: UnboundedRange =>
+        DeltaErrors.throwChangelogUnboundedRange()
+    }
+  }
+
+  /**
+   * Applies per-bound inclusivity (`+1` for an exclusive start, `-1` for an exclusive end) and
+   * validates the resulting inclusive range.
+   *
+   * The start is checked against the latest version before the start/end ordering check. When
+   * the request has no end bound, the caller passes `latest` as `end`, so a start beyond the
+   * table's history would otherwise be reported as "end before start" although no end was given.
+   *
+   * NOTE(review): an `end` greater than `latest` is neither rejected nor clamped here; confirm
+   * the read path handles it.
+   *
+   * @param start raw start version, before inclusivity is applied
+   * @param end raw end version, before inclusivity is applied
+   * @param startInclusive whether `start` itself belongs to the range
+   * @param endInclusive whether `end` itself belongs to the range
+   * @param latest latest committed version of the table
+   * @return inclusive `(start, end)` versions
+   */
+  private def adjustBounds(
+      start: Long,
+      end: Long,
+      startInclusive: Boolean,
+      endInclusive: Boolean,
+      latest: Long): (Long, Long) = {
+    val adjustedStart = if (startInclusive) start else start + 1
+    val adjustedEnd = if (endInclusive) end else end - 1
+    if (adjustedStart > latest) {
+      throw DeltaErrors.startVersionAfterLatestVersion(adjustedStart, latest)
+    }
+    if (adjustedStart > adjustedEnd) {
+      throw DeltaErrors.endBeforeStartVersionInCDC(adjustedStart, adjustedEnd)
+    }
+    (adjustedStart, adjustedEnd)
+  }
+}
diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json
index 9d844b9a2f3..ae5264bf33e 100644
--- a/spark/src/main/resources/error/delta-error-classes.json
+++ b/spark/src/main/resources/error/delta-error-classes.json
@@ -384,6 +384,40 @@
],
"sqlState" : "22003"
},
+ "DELTA_CHANGELOG_REQUIRES_ROW_TRACKING" : {
+ "message" : [
+ "Change data capture via CHANGES on `<tableName>` requires row tracking.",
+ "Enable it by setting TBLPROPERTIES ('delta.enableRowTracking' = 'true')."
+ ],
+ "sqlState" : "0AKDE"
+ },
+ "DELTA_CHANGELOG_REQUIRES_V2_TABLE" : {
+ "message" : [
+ "Auto-CDF reads on `<tableName>` require the V2 Delta connector, but the catalog resolved the table to `<actualClassName>`.",
+ "Set the Delta V2 mode SQL conf to `STRICT` (or `AUTO`, if AUTO routes CHANGES queries) to read this table through the V2 connector."
+ ],
+ "sqlState" : "0AKDE"
+ },
+ "DELTA_CHANGELOG_ROW_TRACKING_DISABLED_IN_RANGE" : {
+ "message" : [
+ "Delta CDC requires row tracking to remain enabled across the requested range.",
+ "Commit at version <version> disabled row tracking. Pick a starting/ending version that brackets a row-tracking-enabled state."
+ ],
+ "sqlState" : "0AKDE"
+ },
+ "DELTA_CHANGELOG_SCHEMA_CHANGE_IN_RANGE" : {
+ "message" : [
+ "Delta CDC does not support reading a changelog range that includes a schema change.",
+ "Commit at version <version> changes the table schema. Pick a starting/ending version that brackets a single schema."
+ ],
+ "sqlState" : "0AKDE"
+ },
+ "DELTA_CHANGELOG_UNBOUNDED_RANGE" : {
+ "message" : [
+ "Delta CDC does not support unbounded ranges in batch reads. Specify a starting and ending version or timestamp."
+ ],
+ "sqlState" : "0AKDE"
+ },
"DELTA_CHANGE_DATA_FEED_INCOMPATIBLE_DATA_SCHEMA" : {
"message" : [
"Retrieving table changes between version and failed because of an incompatible data schema.",
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
index 5bca996f065..d296d6273af 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -586,6 +586,68 @@ trait DeltaErrorsBase
messageParameters = Array(start.toString, latest.toString))
}
+  /**
+   * Raises `DELTA_CHANGELOG_REQUIRES_ROW_TRACKING` for an Auto-CDF batch read against a table
+   * that does not have row tracking enabled.
+   *
+   * The result type is `Nothing`, which lets Scala callers place the call in expression position
+   * (for example as a `match` arm); Java callers simply invoke it as a statement.
+   *
+   * @param tableName table name substituted into the error message
+   */
+  def throwChangelogRequiresRowTracking(tableName: String): Nothing =
+    throw new DeltaAnalysisException(
+      errorClass = "DELTA_CHANGELOG_REQUIRES_ROW_TRACKING",
+      messageParameters = Array(tableName))
+
+  /**
+   * Raises `DELTA_CHANGELOG_UNBOUNDED_RANGE` when a batch CHANGES query asks for an unbounded
+   * changelog range; batch reads need explicit start and end bounds.
+   *
+   * The result type is `Nothing`, which lets Scala callers place the call in expression position
+   * (for example as a `match` arm); Java callers simply invoke it as a statement.
+   */
+  def throwChangelogUnboundedRange(): Nothing = {
+    // This error class takes no message parameters.
+    val noParams = Array.empty[String]
+    throw new DeltaAnalysisException(
+      errorClass = "DELTA_CHANGELOG_UNBOUNDED_RANGE",
+      messageParameters = noParams)
+  }
+
+  /**
+   * Raises `DELTA_CHANGELOG_REQUIRES_V2_TABLE` when the table resolved by the catalog is not a
+   * V2 [[io.delta.spark.internal.v2.catalog.SparkTable]]. Only the V2 connector implements the
+   * catalog-driven CHANGES surface; V1 Delta tables (`DeltaTableV2`) stay on the legacy CDF
+   * path, which does not go through `TableCatalog.loadChangelog`.
+   *
+   * The result type is `Nothing`, which lets Scala callers place the call in expression position
+   * (for example as a `match` arm); Java callers simply invoke it as a statement.
+   *
+   * @param tableName table name substituted into the error message
+   * @param actualClassName class name of the table object the catalog actually returned
+   */
+  def throwChangelogRequiresV2Table(tableName: String, actualClassName: String): Nothing = {
+    val params = Array(tableName, actualClassName)
+    throw new DeltaAnalysisException(
+      errorClass = "DELTA_CHANGELOG_REQUIRES_V2_TABLE",
+      messageParameters = params)
+  }
+
+  /**
+   * Raises `DELTA_CHANGELOG_SCHEMA_CHANGE_IN_RANGE` when the table schema differs at some
+   * commit inside the requested range. The connector needs one stable schema across the whole
+   * read so that downstream batch CDC post-processing sees a single schema.
+   *
+   * @param version commit version reported in the error message
+   */
+  def throwChangelogSchemaChangeInRange(version: Long): Nothing =
+    throw new DeltaAnalysisException(
+      errorClass = "DELTA_CHANGELOG_SCHEMA_CHANGE_IN_RANGE",
+      messageParameters = Array(version.toString))
+
+  /**
+   * Raises `DELTA_CHANGELOG_ROW_TRACKING_DISABLED_IN_RANGE` when a commit inside the requested
+   * range turned row tracking off (the `delta.enableRowTracking` table property was set to
+   * `false`).
+   *
+   * @param version commit version reported in the error message
+   */
+  def throwChangelogRowTrackingDisabledInRange(version: Long): Nothing = {
+    val params = Array(version.toString)
+    throw new DeltaAnalysisException(
+      errorClass = "DELTA_CHANGELOG_ROW_TRACKING_DISABLED_IN_RANGE",
+      messageParameters = params)
+  }
+
def setTransactionVersionConflict(appId: String, version1: Long, version2: Long): Throwable = {
new IllegalArgumentException(
s"Two SetTransaction actions within the same transaction have the same appId ${appId} but " +
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 5a72ecc105a..383c9bb8bcc 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -2375,6 +2375,17 @@ trait DeltaSQLConfBase extends DeltaSQLConfUtils {
.booleanConf
.createWithDefault(false)
+  // Internal boolean gate for the catalog-driven Auto-CDF batch read path; off by default.
+  val DELTA_CHANGELOG_V2_ENABLED = {
+    buildConf("changelogV2.enabled")
+      .internal()
+      .doc(
+        """When enabled, the V2 connector's hybrid DeltaCatalog answers
+          |CHANGES FROM ... batch queries (TableCatalog.loadChangelog) using the
+          |kernel-based Auto-CDF reader stack. When disabled, the catalog falls back to the
+          |default behavior (UNSUPPORTED_FEATURE.CHANGE_DATA_CAPTURE).""".stripMargin)
+      .booleanConf
+      .createWithDefault(false)
+  }
+
val DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP = {
buildConf("changeDataFeed.timestampOutOfRange.enabled")
.doc(
diff --git a/spark/v2/src/main/java-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelog.java b/spark/v2/src/main/java-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelog.java
new file mode 100644
index 00000000000..0742241cd80
--- /dev/null
+++ b/spark/v2/src/main/java-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelog.java
@@ -0,0 +1,109 @@
+package io.delta.spark.internal.v2.read.changelog;
+
+import io.delta.kernel.Snapshot;
+import io.delta.spark.internal.v2.catalog.SparkTable;
+import io.delta.spark.internal.v2.utils.SchemaUtils;
+import org.apache.spark.sql.connector.catalog.CatalogV2Util;
+import org.apache.spark.sql.connector.catalog.Changelog;
+import org.apache.spark.sql.connector.catalog.Column;
+import org.apache.spark.sql.connector.expressions.FieldReference;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * V2 Changelog implementation for Delta tables.
+ *
+ * <p>Wraps the {@link SparkTable} resolved by {@code TableCatalog.loadTable(ident)}. The
+ * connector-level work (snapshot loads, row tracking validation, metadata-action inspection
+ * across the range) is deferred to the read path inside {@link DeltaChangelogBatch}. The schema
+ * exposed by {@link #columns()} is the end-version schema. It matches the {@code dataSchema} the
+ * scan builds against, so analysis-time column resolution agrees with the per-commit Metadata
+ * validation performed at scan planning.
+ *
+ * <p>Row tracking is required at the table protocol. Without it the SPIP analyzer rule cannot
+ * partition by {@code rowId / rowVersion}. Validation is performed by the read path, not here.
+ */
+public class DeltaChangelog implements Changelog {
+
+  /** Name of the struct column that carries the row-tracking fields. */
+  public static final String METADATA_COLUMN = "_metadata";
+
+  /** Field of {@link #METADATA_STRUCT} holding the row id. */
+  public static final String ROW_ID_FIELD = "row_id";
+
+  /** Field of {@link #METADATA_STRUCT} holding the row commit version. */
+  public static final String ROW_COMMIT_VERSION_FIELD = "row_commit_version";
+
+  /** Type of {@link #METADATA_COLUMN}: two non-nullable long fields. */
+  public static final StructType METADATA_STRUCT =
+      new StructType()
+          .add(ROW_ID_FIELD, DataTypes.LongType, false)
+          .add(ROW_COMMIT_VERSION_FIELD, DataTypes.LongType, false);
+
+  private final String tableName;
+  private final SparkTable sparkTable;
+  private final long startVersion;
+  private final long endVersion;
+
+  /**
+   * @param tableName name used as the prefix of {@link #name()}
+   * @param sparkTable V2 table whose snapshot manager and scan builder back this changelog
+   * @param startVersion first version passed to the scan builder
+   * @param endVersion last version passed to the scan builder; also the version whose schema
+   *     {@link #columns()} exposes
+   */
+  public DeltaChangelog(
+      String tableName, SparkTable sparkTable, long startVersion, long endVersion) {
+    this.tableName = tableName;
+    this.sparkTable = sparkTable;
+    this.startVersion = startVersion;
+    this.endVersion = endVersion;
+  }
+
+  @Override
+  public String name() {
+    return tableName + " (changes)";
+  }
+
+  /**
+   * Returns the end-version table schema followed by the {@code _metadata} struct and the three
+   * CDC columns ({@code _change_type}, {@code _commit_version}, {@code _commit_timestamp}).
+   */
+  @Override
+  public Column[] columns() {
+    // Resolve lazily so catalog construction stays side-effect free. The scan path validates
+    // each per-commit Metadata against this same end-version schema.
+    Snapshot endSnapshot = sparkTable.getSnapshotManager().loadSnapshotAt(endVersion);
+    StructType endSchema = SchemaUtils.convertKernelSchemaToSparkSchema(endSnapshot.getSchema());
+    StructType cdcSchema =
+        endSchema
+            .add(METADATA_COLUMN, METADATA_STRUCT, false)
+            .add("_change_type", DataTypes.StringType, false)
+            .add("_commit_version", DataTypes.LongType, false)
+            .add("_commit_timestamp", DataTypes.TimestampType, false);
+
+    return CatalogV2Util.structTypeToV2Columns(cdcSchema);
+  }
+
+  // TODO: optimise to false when deletion vectors are guaranteed enabled across the entire
+  // [startVersion, endVersion] range. DVs enabled over range produces no carry-overs.
+  @Override
+  public boolean containsCarryoverRows() {
+    return true;
+  }
+
+  // TODO: optimise to false when the range is a single commit with no UPDATE/MERGE
+  // operations. Requires inspecting the commit's operation type.
+  @Override
+  public boolean containsIntermediateChanges() {
+    return true;
+  }
+
+  // This V2 path only consumes AddFile/RemoveFile actions, so an UPDATE always
+  // surfaces as a DELETE+INSERT pair sharing the same rowId. Spark derives the
+  // pre/post-images via update detection.
+  @Override
+  public boolean representsUpdateAsDeleteAndInsert() {
+    return true;
+  }
+
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    return new DeltaChangelogScanBuilder(sparkTable, startVersion, endVersion, options);
+  }
+
+  /** References {@code _metadata.row_id}, built from the same constants as the schema. */
+  @Override
+  public NamedReference[] rowId() {
+    return new NamedReference[] {FieldReference.apply(METADATA_COLUMN + "." + ROW_ID_FIELD)};
+  }
+
+  /** References {@code _metadata.row_commit_version}, built from the schema constants. */
+  @Override
+  public NamedReference rowVersion() {
+    return FieldReference.apply(METADATA_COLUMN + "." + ROW_COMMIT_VERSION_FIELD);
+  }
+}
diff --git a/spark/v2/src/main/java-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelogBatch.java b/spark/v2/src/main/java-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelogBatch.java
new file mode 100644
index 00000000000..b410bfa6822
--- /dev/null
+++ b/spark/v2/src/main/java-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelogBatch.java
@@ -0,0 +1,377 @@
+package io.delta.spark.internal.v2.read.changelog;
+
+import io.delta.kernel.CommitActions;
+import io.delta.kernel.CommitRange;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.DeltaLogActionUtils;
+import io.delta.kernel.internal.actions.AddFile;
+import io.delta.kernel.internal.actions.Metadata;
+import io.delta.kernel.internal.actions.RemoveFile;
+import io.delta.kernel.internal.commitrange.CommitRangeImpl;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.spark.internal.v2.utils.PartitionUtils;
+import io.delta.spark.internal.v2.utils.SchemaUtils;
+import io.delta.spark.internal.v2.utils.StreamingHelper;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.paths.SparkPath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.JoinedRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.delta.DefaultRowCommitVersion$;
+import org.apache.spark.sql.delta.DeltaErrors;
+import org.apache.spark.sql.delta.RowId$;
+import org.apache.spark.sql.execution.datasources.FilePartition;
+import org.apache.spark.sql.execution.datasources.PartitionedFile;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import scala.Tuple2;
+
+public class DeltaChangelogBatch implements Batch {
+  /** Delta log actions requested from the commit range; everything else is skipped. */
+  private static final Set<DeltaLogActionUtils.DeltaAction> CHANGELOG_ACTION_SET =
+      Set.of(
+          DeltaLogActionUtils.DeltaAction.ADD,
+          DeltaLogActionUtils.DeltaAction.REMOVE,
+          DeltaLogActionUtils.DeltaAction.METADATA);
+
+  private static final String INSERT_CHANGE_TYPE = "insert";
+  private static final String DELETE_CHANGE_TYPE = "delete";
+
+  private final CommitRange commitRange;
+  private final Engine engine;
+  private final StructType dataSchema;
+  private final Snapshot snapshot;
+  private final Configuration hadoopConf;
+
+  /**
+   * @param commitRange commits to read; planning casts it to {@code CommitRangeImpl}
+   * @param engine kernel engine used to read the commit actions
+   * @param dataSchema schema every commit in the range must agree with
+   * @param snapshot snapshot that supplies the table path and the schema used for the
+   *     start-of-range pre-check
+   * @param hadoopConf Hadoop configuration handed to the Parquet reader factory
+   */
+  public DeltaChangelogBatch(
+      CommitRange commitRange,
+      Engine engine,
+      StructType dataSchema,
+      Snapshot snapshot,
+      Configuration hadoopConf) {
+    this.commitRange = commitRange;
+    this.engine = engine;
+    this.dataSchema = dataSchema;
+    this.snapshot = snapshot;
+    this.hadoopConf = hadoopConf;
+  }
+
+ @Override
+ public InputPartition[] planInputPartitions() {
+ List partitions = new ArrayList<>();
+
+ // Pre-check catches schema drift between start and end. The per-commit loop below catches
+ // in-range Metadata commits.
+ StructType startSchema = SchemaUtils.convertKernelSchemaToSparkSchema(snapshot.getSchema());
+ if (!startSchema.equals(dataSchema)) {
+ DeltaErrors.throwChangelogSchemaChangeInRange(
+ ((CommitRangeImpl) commitRange).getStartVersion());
+ }
+
+ // TODO: Remove StreamingHelper usage. The helper is generic, only the class name is
+ // streaming-flavored.
+ //
+ // try-with-resources forces a catch (Exception) below because CommitActions.close()
+ // declares it. Unchecked exceptions pass through unchanged.
+ try (CloseableIterator commitsIter =
+ StreamingHelper.getCommitActionsFromRangeUnsafe(
+ engine, (CommitRangeImpl) commitRange, snapshot.getPath(), CHANGELOG_ACTION_SET)) {
+ while (commitsIter.hasNext()) {
+ // Emit RemoveFiles before AddFiles per commit. The Spark analyzer re-sorts anyway, but
+ // direct-batch tests iterate in emission order and rely on the preimage-then-postimage
+ // shape.
+ List commitRemoves = new ArrayList<>();
+ List commitAdds = new ArrayList<>();
+ try (CommitActions commit = commitsIter.next();
+ CloseableIterator actionsIter = commit.getActions()) {
+ while (actionsIter.hasNext()) {
+ ColumnarBatch batch = actionsIter.next();
+ for (int rowId = 0; rowId < batch.getSize(); rowId++) {
+ Optional addOpt = StreamingHelper.getAddFileWithDataChange(batch, rowId);
+ if (addOpt.isPresent()) {
+ AddFile add = addOpt.get();
+ commitAdds.add(
+ buildPartition(
+ add.getPath(),
+ add.getSize(),
+ INSERT_CHANGE_TYPE,
+ commit.getVersion(),
+ commit.getTimestamp(),
+ add::getBaseRowId,
+ add::getDefaultRowCommitVersion,
+ "AddFile"));
+ }
+ Optional removeOpt = StreamingHelper.getDataChangeRemove(batch, rowId);
+ if (removeOpt.isPresent()) {
+ RemoveFile remove = removeOpt.get();
+ commitRemoves.add(
+ buildPartition(
+ remove.getPath(),
+ remove.getSize().orElse(0L),
+ DELETE_CHANGE_TYPE,
+ commit.getVersion(),
+ commit.getTimestamp(),
+ remove::getBaseRowId,
+ remove::getDefaultRowCommitVersion,
+ "RemoveFile"));
+ }
+ // Validate Metadata actions: schema and row-tracking config must match the
+ // end-version baseline established by DeltaChangelogScanBuilder. Mid-range
+ // schema evolution or row-tracking-toggle would silently corrupt downstream
+ // CDC post-processing (row identity / column mapping drift).
+ Optional metadataOpt = StreamingHelper.getMetadata(batch, rowId);
+ if (metadataOpt.isPresent()) {
+ Metadata md = metadataOpt.get();
+ StructType commitSchema =
+ SchemaUtils.convertKernelSchemaToSparkSchema(md.getSchema());
+ if (!commitSchema.equals(dataSchema)) {
+ DeltaErrors.throwChangelogSchemaChangeInRange(commit.getVersion());
+ }
+ String rtValue = md.getConfiguration().get("delta.enableRowTracking");
+ // Absent key means the prior value persists (no change at this commit).
+ boolean rowTrackingEnabled = rtValue == null || "true".equalsIgnoreCase(rtValue);
+ if (!rowTrackingEnabled) {
+ DeltaErrors.throwChangelogRowTrackingDisabledInRange(commit.getVersion());
+ }
+ }
+ }
+ }
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to process CDC commit actions", e);
+ }
+ partitions.addAll(commitRemoves);
+ partitions.addAll(commitAdds);
+ }
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to plan CDC input partitions", e);
+ }
+ return partitions.toArray(new InputPartition[0]);
+ }
+
+  /**
+   * Builds a {@link CDCInputPartition} for a single AddFile or RemoveFile action.
+   *
+   * <p>Both accessors must yield a value. Row tracking is expected to have been validated before
+   * planning reaches this point (NOTE(review): presumably by the scan builder; confirm), so a
+   * missing value is treated as an invariant violation rather than a user-facing error.
+   *
+   * @param path file path recorded on the action
+   * @param size file size in bytes
+   * @param changeType change type stamped on every row of the file
+   * @param commitVersion version of the commit that contains the action
+   * @param commitTimestampMillis timestamp of that commit, in milliseconds
+   * @param baseRowIdAccessor supplies the action's optional {@code baseRowId}
+   * @param defaultRowCommitVersionAccessor supplies the action's optional {@code
+   *     defaultRowCommitVersion}
+   * @param actionDescription action kind used as the prefix of the failure message
+   * @return the partition describing this file change
+   * @throws IllegalStateException if either accessor yields an empty value
+   */
+  private static CDCInputPartition buildPartition(
+      String path,
+      long size,
+      String changeType,
+      long commitVersion,
+      long commitTimestampMillis,
+      Supplier<Optional<Long>> baseRowIdAccessor,
+      Supplier<Optional<Long>> defaultRowCommitVersionAccessor,
+      String actionDescription) {
+    long baseRowId =
+        baseRowIdAccessor
+            .get()
+            .orElseThrow(
+                () ->
+                    new IllegalStateException(
+                        actionDescription + " " + path + " missing baseRowId"));
+    long defaultRcv =
+        defaultRowCommitVersionAccessor
+            .get()
+            .orElseThrow(
+                () ->
+                    new IllegalStateException(
+                        actionDescription + " " + path + " missing defaultRowCommitVersion"));
+    return new CDCInputPartition(
+        path, size, commitVersion, commitTimestampMillis, changeType, baseRowId, defaultRcv);
+  }
+
+  /**
+   * Creates the reader factory for the planned partitions: a Delta Parquet reader factory that
+   * reads {@code dataSchema} plus the {@code _metadata} struct, wrapped in a {@link
+   * CDCPartitionReaderFactory} whose output schema appends {@code _change_type}, {@code
+   * _commit_version} and {@code _commit_timestamp}.
+   */
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    StructType schemaWithMetadata =
+        dataSchema.add(DeltaChangelog.METADATA_COLUMN, DeltaChangelog.METADATA_STRUCT, false);
+
+    // Read-time Auto-CDF: tail columns are added by CDCPartitionReaderFactory below, so
+    // isWriteTimeCDCRead stays false (write-time streaming is the only true caller).
+    PartitionReaderFactory parquetFactory =
+        PartitionUtils.createDeltaParquetReaderFactory(
+            snapshot,
+            dataSchema,
+            /* partitionSchema */ new StructType(),
+            /* readDataSchema */ schemaWithMetadata,
+            /* ddlOrderedReadOutputSchema */ schemaWithMetadata,
+            /* dataFilters */ new Filter[0],
+            /* scalaOptions */ scala.collection.immutable.Map$.MODULE$.empty(),
+            hadoopConf,
+            SQLConf.get(),
+            /* isWriteTimeCDCRead */ false);
+
+    StructType cdcOutputSchema =
+        schemaWithMetadata
+            .add("_change_type", DataTypes.StringType, false)
+            .add("_commit_version", DataTypes.LongType, false)
+            .add("_commit_timestamp", DataTypes.TimestampType, false);
+    return new CDCPartitionReaderFactory(parquetFactory, snapshot.getPath(), cdcOutputSchema);
+  }
+
+  /**
+   * One CDC file change unit: a single data file together with the commit it belongs to, the
+   * change type of its rows and its row-tracking base values. Serialized to executors.
+   */
+  static class CDCInputPartition implements InputPartition, Serializable {
+    // File being read.
+    private final String filePath;
+    private final long fileSize;
+    // Commit that added or removed the file.
+    private final long commitVersion;
+    private final long commitTimestampMillis;
+    private final String changeType;
+    // Row-tracking values taken from the file action.
+    private final long baseRowId;
+    private final long defaultRowCommitVersion;
+
+    CDCInputPartition(
+        String path,
+        long sizeInBytes,
+        long version,
+        long timestampMillis,
+        String type,
+        long rowIdBase,
+        long rowCommitVersionDefault) {
+      filePath = path;
+      fileSize = sizeInBytes;
+      commitVersion = version;
+      commitTimestampMillis = timestampMillis;
+      changeType = type;
+      baseRowId = rowIdBase;
+      defaultRowCommitVersion = rowCommitVersionDefault;
+    }
+
+    /** Path of the data file as recorded on the file action. */
+    public String getFilePath() {
+      return this.filePath;
+    }
+
+    /** Size of the data file in bytes. */
+    public long getFileSize() {
+      return this.fileSize;
+    }
+
+    /** Version of the commit this file change belongs to. */
+    public long getCommitVersion() {
+      return this.commitVersion;
+    }
+
+    /** Timestamp of that commit, in milliseconds. */
+    public long getCommitTimestampMillis() {
+      return this.commitTimestampMillis;
+    }
+
+    /** Change type stamped on every row read from this file. */
+    public String getChangeType() {
+      return this.changeType;
+    }
+
+    /** Base row id of the file. */
+    public long getBaseRowId() {
+      return this.baseRowId;
+    }
+
+    /** Default row commit version of the file. */
+    public long getDefaultRowCommitVersion() {
+      return this.defaultRowCommitVersion;
+    }
+  }
+
+  /**
+   * Executor-side factory for CDC partition readers. Turns each {@link CDCInputPartition} into a
+   * single-file {@link FilePartition}, reads it through the delegate factory and wraps the
+   * result in a {@link CDCPartitionReader}.
+   */
+  static class CDCPartitionReaderFactory implements PartitionReaderFactory, Serializable {
+    private final PartitionReaderFactory delegate;
+    private final String tablePath;
+    private final StructType outputSchema;
+
+    CDCPartitionReaderFactory(
+        PartitionReaderFactory delegate, String tablePath, StructType outputSchema) {
+      this.delegate = delegate;
+      this.tablePath = tablePath;
+      this.outputSchema = outputSchema;
+    }
+
+    /**
+     * @param partition must be a {@link CDCInputPartition}
+     * @return a reader over the partition's file that appends the CDC tail columns
+     */
+    @Override
+    public PartitionReader<InternalRow> createReader(InputPartition partition) {
+      CDCInputPartition cdcPartition = (CDCInputPartition) partition;
+      // The delegate is created with an empty partition schema, so no partition values exist.
+      InternalRow partitionValues = new GenericInternalRow(0);
+      SparkPath sparkPath =
+          SparkPath.fromUrlString(new Path(tablePath, cdcPartition.getFilePath()).toString());
+      // Row-tracking base values travel as per-file constant metadata.
+      scala.collection.immutable.Map<String, Object> constantMetadata =
+          (scala.collection.immutable.Map<String, Object>)
+              (scala.collection.immutable.Map<?, ?>)
+                  scala.collection.immutable.Map$.MODULE$.empty();
+      constantMetadata =
+          constantMetadata.$plus(
+              new Tuple2<>(RowId$.MODULE$.BASE_ROW_ID(), cdcPartition.getBaseRowId()));
+      constantMetadata =
+          constantMetadata.$plus(
+              new Tuple2<>(
+                  DefaultRowCommitVersion$.MODULE$.METADATA_STRUCT_FIELD_NAME(),
+                  cdcPartition.getDefaultRowCommitVersion()));
+
+      PartitionedFile file =
+          new PartitionedFile(
+              partitionValues,
+              sparkPath,
+              /* start */ 0L,
+              /* length */ cdcPartition.getFileSize(),
+              /* locations */ new String[0],
+              /* modificationTime */ cdcPartition.getCommitTimestampMillis(),
+              /* fileSize */ cdcPartition.getFileSize(),
+              constantMetadata);
+      FilePartition filePartition = new FilePartition(0, new PartitionedFile[] {file});
+      PartitionReader<InternalRow> baseReader = delegate.createReader(filePartition);
+      return new CDCPartitionReader(baseReader, cdcPartition, outputSchema);
+    }
+  }
+
+ /** Executor-side reader stub for per-file CDC row materialization. */
+ static class CDCPartitionReader implements PartitionReader {
+ private final PartitionReader baseReader;
+ private final UnsafeProjection projection;
+ private final GenericInternalRow cdcTail = new GenericInternalRow(3);
+ private final JoinedRow joined = new JoinedRow();
+
+ CDCPartitionReader(
+ PartitionReader baseReader,
+ CDCInputPartition cdcPartition,
+ StructType outputSchema) {
+ this.baseReader = baseReader;
+ this.projection = UnsafeProjection.create(outputSchema);
+ // Tail values are partition-constants. Set them once at construction so get() does not
+ // redo the same writes on every row.
+ this.cdcTail.update(0, UTF8String.fromString(cdcPartition.getChangeType()));
+ this.cdcTail.setLong(1, cdcPartition.getCommitVersion());
+ // millis to micros: Catalyst stores TimestampType as microseconds since epoch.
+ this.cdcTail.setLong(2, cdcPartition.getCommitTimestampMillis() * 1000L);
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ return baseReader.next();
+ }
+
+ @Override
+ public InternalRow get() {
+ return projection.apply(joined.apply(baseReader.get(), cdcTail));
+ }
+
+ @Override
+ public void close() throws IOException {
+ baseReader.close();
+ }
+ }
+}
diff --git a/spark/v2/src/main/java-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelogScan.java b/spark/v2/src/main/java-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelogScan.java
new file mode 100644
index 00000000000..b94e11bbd0e
--- /dev/null
+++ b/spark/v2/src/main/java-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelogScan.java
@@ -0,0 +1,61 @@
+package io.delta.spark.internal.v2.read.changelog;
+
+import io.delta.kernel.CommitRange;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.engine.Engine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * {@link Scan} for a changelog read. All fields are final; the scan only stores the inputs it is
+ * constructed with and forwards them to {@link DeltaChangelogBatch} from {@link #toBatch()}.
+ */
+public class DeltaChangelogScan implements Scan {
+  private final StructType readSchema;
+  private final CommitRange commitRange;
+  private final Engine engine;
+  private final StructType dataSchema;
+  private final Snapshot snapshot;
+  private final long startVersion;
+  private final long endVersion;
+  private final Configuration hadoopConf;
+
+  /**
+   * @param readSchema schema reported by {@link #readSchema()}
+   * @param commitRange commit range forwarded to the batch
+   * @param engine kernel engine forwarded to the batch
+   * @param dataSchema data schema forwarded to the batch
+   * @param snapshot snapshot forwarded to the batch
+   * @param startVersion start version; used only by {@link #description()}
+   * @param endVersion end version; used only by {@link #description()}
+   * @param hadoopConf Hadoop configuration forwarded to the batch
+   */
+  public DeltaChangelogScan(
+      StructType readSchema,
+      CommitRange commitRange,
+      Engine engine,
+      StructType dataSchema,
+      Snapshot snapshot,
+      long startVersion,
+      long endVersion,
+      Configuration hadoopConf) {
+    this.readSchema = readSchema;
+    this.dataSchema = dataSchema;
+    this.commitRange = commitRange;
+    this.snapshot = snapshot;
+    this.engine = engine;
+    this.hadoopConf = hadoopConf;
+    this.startVersion = startVersion;
+    this.endVersion = endVersion;
+  }
+
+  @Override
+  public StructType readSchema() {
+    return this.readSchema;
+  }
+
+  /** Returns a short label containing the start and end versions of this scan. */
+  @Override
+  public String description() {
+    String template = "DeltaChangelogScan [startVersion=%d, endVersion=%d]";
+    return String.format(template, startVersion, endVersion);
+  }
+
+  /** Creates the batch for this scan from the stored inputs. */
+  @Override
+  public Batch toBatch() {
+    return new DeltaChangelogBatch(commitRange, engine, dataSchema, snapshot, hadoopConf);
+  }
+
+  // TODO: implement toMicroBatchStream() so spark.readStream...loadChangelog(...) can drive a
+  // streaming CDC read through this Changelog. The existing streaming-CDC entrypoint
+  // (SparkMicroBatchStream + readChangeFeed=true) provides the wire format today, but it bypasses
+  // the catalog-driven Changelog API that SPARK-56687 (PR apache/spark#55637) builds on for
+  // streaming netChanges post-processing.
+}
diff --git a/spark/v2/src/main/java-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelogScanBuilder.java b/spark/v2/src/main/java-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelogScanBuilder.java
new file mode 100644
index 00000000000..906839c6a42
--- /dev/null
+++ b/spark/v2/src/main/java-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelogScanBuilder.java
@@ -0,0 +1,79 @@
+package io.delta.spark.internal.v2.read.changelog;
+
+import io.delta.kernel.CommitRange;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.SnapshotImpl;
+import io.delta.kernel.internal.rowtracking.RowTracking;
+import io.delta.spark.internal.v2.catalog.SparkTable;
+import io.delta.spark.internal.v2.snapshot.DeltaSnapshotManager;
+import io.delta.spark.internal.v2.utils.SchemaUtils;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.delta.DeltaErrors;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * {@link ScanBuilder} for a changelog read between {@code startVersion} and {@code endVersion} of
+ * a {@link SparkTable}. {@link #build()} checks that row tracking is enabled at both boundary
+ * versions and returns a {@link DeltaChangelogScan} whose read schema is the end-version table
+ * schema followed by the changelog metadata struct and the three CDC columns.
+ */
+public class DeltaChangelogScanBuilder implements ScanBuilder {
+
+  private final SparkTable sparkTable;
+  private final long startVersion;
+  private final long endVersion;
+  // NOTE(review): stored by the constructor but not consulted anywhere in this class.
+  private final CaseInsensitiveStringMap options;
+
+  public DeltaChangelogScanBuilder(
+      SparkTable sparkTable, long startVersion, long endVersion, CaseInsensitiveStringMap options) {
+    this.sparkTable = sparkTable;
+    this.startVersion = startVersion;
+    this.endVersion = endVersion;
+    this.options = options;
+  }
+
+  /**
+   * Resolves the commit range and both boundary snapshots, validates row tracking on each
+   * boundary, and assembles the scan.
+   *
+   * <p>NOTE(review): the scan receives the start-version snapshot together with the end-version
+   * schema; confirm this pairing is what {@code DeltaChangelogBatch} expects.
+   */
+  @Override
+  public Scan build() {
+    Configuration hadoopConf = SparkSession.active().sparkContext().hadoopConfiguration();
+    Objects.requireNonNull(hadoopConf, "hadoopConf is null");
+    Engine engine = DefaultEngine.create(hadoopConf);
+    DeltaSnapshotManager snapshotManager = sparkTable.getSnapshotManager();
+    CommitRange commitRange =
+        snapshotManager.getTableChanges(engine, startVersion, Optional.of(endVersion));
+
+    // Both endpoints must already carry the schema + row-tracking state that DeltaChangelogBatch
+    // validates each in-range Metadata action against. Checking here turns a row-tracking-disabled
+    // boundary (with no in-range toggle commit) into a dedicated error instead of a raw
+    // IllegalStateException "missing baseRowId" downstream.
+    Snapshot startSnapshot = snapshotManager.loadSnapshotAt(startVersion);
+    if (!rowTrackingEnabled((SnapshotImpl) startSnapshot)) {
+      DeltaErrors.throwChangelogRowTrackingDisabledInRange(startVersion);
+    }
+    Snapshot endSnapshot = snapshotManager.loadSnapshotAt(endVersion);
+    SnapshotImpl endSnapshotImpl = (SnapshotImpl) endSnapshot;
+    StructType endSchema = SchemaUtils.convertKernelSchemaToSparkSchema(endSnapshot.getSchema());
+    if (!rowTrackingEnabled(endSnapshotImpl)) {
+      DeltaErrors.throwChangelogRequiresRowTracking(sparkTable.name());
+    }
+
+    StructType withMetadata =
+        endSchema.add(DeltaChangelog.METADATA_COLUMN, DeltaChangelog.METADATA_STRUCT, false);
+    StructType cdcSchema =
+        withMetadata
+            .add("_change_type", DataTypes.StringType, false)
+            .add("_commit_version", DataTypes.LongType, false)
+            .add("_commit_timestamp", DataTypes.TimestampType, false);
+
+    return new DeltaChangelogScan(
+        cdcSchema,
+        commitRange,
+        engine,
+        endSchema,
+        startSnapshot,
+        startVersion,
+        endVersion,
+        hadoopConf);
+  }
+
+  /** Returns whether row tracking is enabled for the protocol and metadata of {@code snapshot}. */
+  private static boolean rowTrackingEnabled(SnapshotImpl snapshot) {
+    return RowTracking.isEnabled(snapshot.getProtocol(), snapshot.getMetadata());
+  }
+}
diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/SparkTable.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/SparkTable.java
index a44cbe44a92..644fa5d8d00 100644
--- a/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/SparkTable.java
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/SparkTable.java
@@ -259,6 +259,15 @@ public Map getOptions() {
return options;
}
+  /**
+   * Exposes the snapshot manager backing this table so that catalog-driven features such as
+   * Auto-CDF (TableCatalog.loadChangelog) can resolve versions, timestamps, and snapshots through
+   * it instead of constructing a snapshot manager of their own.
+   *
+   * @return the snapshot manager held by this table
+   */
+  public DeltaSnapshotManager getSnapshotManager() {
+    return this.snapshotManager;
+  }
+
/**
* Returns the table name in a format compatible with DeltaTableV2.
*
diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkBatch.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkBatch.java
index 20eb074e959..64a85d8eb7a 100644
--- a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkBatch.java
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkBatch.java
@@ -92,7 +92,8 @@ public InputPartition[] planInputPartitions() {
@Override
public PartitionReaderFactory createReaderFactory() {
- // TODO: support write-time CDF on batch reads.
+ // Non-CDC plain table scan. Write-time CDF streaming reads route through
+ // SparkMicroBatchStream; read-time Auto-CDF batch reads route through DeltaChangelogBatch.
return PartitionUtils.createDeltaParquetReaderFactory(
snapshot,
dataSchema,
@@ -102,8 +103,7 @@ public PartitionReaderFactory createReaderFactory() {
dataFilters,
scalaOptions,
hadoopConf,
- sqlConf,
- /* isCDCRead */ false);
+ sqlConf);
}
@Override
diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java
index 54ee9a3d280..9338d9d2e12 100644
--- a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java
@@ -526,7 +526,7 @@ public PartitionReaderFactory createReaderFactory() {
scalaOptions,
hadoopConf,
sqlConf,
- /* isCDCRead */ options.readChangeFeed());
+ /* isWriteTimeCDCRead */ options.readChangeFeed());
}
/**
diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/utils/PartitionUtils.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/utils/PartitionUtils.java
index 7b0f6eaa61c..ca1741c79cd 100644
--- a/spark/v2/src/main/java/io/delta/spark/internal/v2/utils/PartitionUtils.java
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/utils/PartitionUtils.java
@@ -308,9 +308,36 @@ private static Optional getCDCFileDvDescriptor(
*
*
* @param snapshot The Delta table snapshot containing protocol, metadata, and table path
- * @param isCDCRead If true, augments the read schema with CDC columns and wraps the reader with
- * {@link CDCReadFunction} to null-coalesce CDC metadata from per-file constants.
+ * @param isWriteTimeCDCRead If {@code true}, this is a write-time CDF read (streaming reads of
+ * the legacy {@code .option("readChangeFeed")} format): the read schema is augmented with CDC
+ * tail columns and the reader is wrapped with {@link CDCReadFunction}. If {@code false}, this
+ * is a plain table scan or a read-time Auto-CDF read; CDC handling is left to the caller in
+ * that case (Auto-CDF's outer {@code CDCPartitionReaderFactory} injects the tail columns as
+ * per-partition constants instead).
*/
+  public static PartitionReaderFactory createDeltaParquetReaderFactory(
+      Snapshot snapshot,
+      StructType dataSchema,
+      StructType partitionSchema,
+      StructType readDataSchema,
+      StructType ddlOrderedReadOutputSchema,
+      Filter[] dataFilters,
+      scala.collection.immutable.Map scalaOptions,
+      Configuration hadoopConf,
+      SQLConf sqlConf) {
+    // Convenience overload: forwards every argument unchanged to the ten-argument variant and
+    // fixes its trailing flag to false.
+    // NOTE(review): the Javadoc directly above this overload documents isWriteTimeCDCRead, a
+    // parameter only the ten-argument variant declares; consider moving that Javadoc.
+    final boolean isWriteTimeCDCRead = false;
+    return createDeltaParquetReaderFactory(
+        snapshot,
+        dataSchema,
+        partitionSchema,
+        readDataSchema,
+        ddlOrderedReadOutputSchema,
+        dataFilters,
+        scalaOptions,
+        hadoopConf,
+        sqlConf,
+        isWriteTimeCDCRead);
+  }
+
public static PartitionReaderFactory createDeltaParquetReaderFactory(
Snapshot snapshot,
StructType dataSchema,
@@ -321,7 +348,7 @@ public static PartitionReaderFactory createDeltaParquetReaderFactory(
scala.collection.immutable.Map scalaOptions,
Configuration hadoopConf,
SQLConf sqlConf,
- boolean isCDCRead) {
+ boolean isWriteTimeCDCRead) {
SnapshotImpl snapshotImpl = (SnapshotImpl) snapshot;
// Use Path.toString() instead of toUri().toString() to avoid URL encoding issues.
// toUri().toString() encodes special characters (e.g., space -> %20), which causes
@@ -332,10 +359,13 @@ public static PartitionReaderFactory createDeltaParquetReaderFactory(
// column-reorder wrapper below.
final StructType originalReadDataSchema = readDataSchema;
- // For CDC reads, build the schema context and augment readDataSchema with CDC columns
- // before DV wrapping so that DV column indices account for them.
+ // For write-time CDF reads (streaming with readChangeFeed=true), build the schema context
+ // and augment readDataSchema with CDC tail columns before DV wrapping so that DV column
+ // indices account for them. Read-time CDF (Auto-CDF, via DeltaChangelogBatch) does not go
+ // through this path: DeltaChangelogBatch's outer CDCPartitionReaderFactory injects the
+ // tail columns as per-partition constants instead.
Optional cdcSchemaContext =
- isCDCRead
+ isWriteTimeCDCRead
? Optional.of(new CDCSchemaContext(readDataSchema, partitionSchema))
: Optional.empty();
if (cdcSchemaContext.isPresent()) {
@@ -381,7 +411,7 @@ public static PartitionReaderFactory createDeltaParquetReaderFactory(
dvSchemaContext.isPresent() ? Option.apply(Boolean.FALSE) : Option.empty();
DeltaParquetFileFormatV2 deltaFormat =
createDeltaParquetFileFormat(
- snapshot, tablePath, optimizationsEnabled, useMetadataRowIndex, isCDCRead);
+ snapshot, tablePath, optimizationsEnabled, useMetadataRowIndex, isWriteTimeCDCRead);
Function1> readFunc =
deltaFormat.buildReaderWithPartitionValues(
@@ -408,13 +438,9 @@ public static PartitionReaderFactory createDeltaParquetReaderFactory(
readFunc = RowTrackingReadFunction.wrap(readFunc, rowTrackingSchemaContext.get());
}
- // TODO(#5319): add e2e test for CDC reads (full schema + column pruning) when CDC reads
- // become user-reachable.
+ // TODO(#5319): add e2e test for CDC reads (full schema + column pruning) when streaming CDC
+ // reads become user-reachable end-to-end.
if (cdcSchemaContext.isPresent()) {
- if (rowTrackingSchemaContext.isPresent()) {
- throw new UnsupportedOperationException(
- "CDC reads combined with row tracking are not supported");
- }
readFunc = CDCReadFunction.wrap(readFunc, cdcSchemaContext.get(), enableVectorizedReader);
}
@@ -441,6 +467,15 @@ public static PartitionReaderFactory createDeltaParquetReaderFactory(
* @param optimizationsEnabled whether to enable file splitting and predicate pushdown
* @param useMetadataRowIndex explicit control over _metadata.row_index for DV filtering
*/
+  public static DeltaParquetFileFormatV2 createDeltaParquetFileFormat(
+      Snapshot snapshot,
+      String tablePath,
+      boolean optimizationsEnabled,
+      Option useMetadataRowIndex) {
+    // Overload for non-CDC callers: delegates to the five-argument variant with the CDC flag off.
+    final boolean isCDCRead = false;
+    return createDeltaParquetFileFormat(
+        snapshot, tablePath, optimizationsEnabled, useMetadataRowIndex, isCDCRead);
+  }
+
public static DeltaParquetFileFormatV2 createDeltaParquetFileFormat(
Snapshot snapshot,
String tablePath,
diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/DeltaV2TestBase.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/DeltaV2TestBase.java
index af16a8be879..b00222ee3f2 100644
--- a/spark/v2/src/test/java/io/delta/spark/internal/v2/DeltaV2TestBase.java
+++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/DeltaV2TestBase.java
@@ -130,6 +130,19 @@ protected void withTable(String[] tableNames, ThrowingRunnable action) throws Ex
}
}
+  /**
+   * Executes {@code action} and afterwards deletes the directory at {@code tablePath}, whether or
+   * not the action threw. An {@link java.io.IOException} raised while deleting is swallowed.
+   */
+  protected void withTable(String tablePath, ThrowingRunnable action) throws Exception {
+    try {
+      action.run();
+    } finally {
+      java.io.File tableDir = new java.io.File(tablePath);
+      try {
+        org.apache.commons.io.FileUtils.deleteDirectory(tableDir);
+      } catch (java.io.IOException ignored) {
+        // Cleanup is best-effort: a leftover directory must not change the test outcome.
+      }
+    }
+  }
+
protected static void createPartitionedTable(String tableName, String path) {
spark.sql(
String.format(
diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/utils/PartitionUtilsTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/utils/PartitionUtilsTest.java
index b931eb9c8bc..70f916cdd89 100644
--- a/spark/v2/src/test/java/io/delta/spark/internal/v2/utils/PartitionUtilsTest.java
+++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/utils/PartitionUtilsTest.java
@@ -152,7 +152,7 @@ public void testCreateDeltaParquetReaderFactory_Basic() {
options,
hadoopConf,
sqlConf,
- /* isCDCRead= */ false);
+ /* isWriteTimeCDCRead */ false);
assertNotNull(factory, "PartitionReaderFactory should not be null");
}
@@ -194,11 +194,61 @@ public void testCreateDeltaParquetReaderFactory_isCDCRead() {
options,
hadoopConf,
sqlConf,
- /* isCDCRead= */ true);
+ /* isWriteTimeCDCRead */ true);
assertNotNull(factory, "CDC PartitionReaderFactory should not be null");
}
+  /**
+   * Read-time Auto-CDF and plain table scans use the {@code createDeltaParquetReaderFactory}
+   * overload that has no {@code isWriteTimeCDCRead} parameter; that overload delegates with the
+   * flag set to {@code false}. Per the PartitionUtils Javadoc, in that mode the read schema is not
+   * augmented with CDC tail columns and the reader is not wrapped with {@code CDCReadFunction};
+   * the tail columns are added by {@code DeltaChangelogBatch.CDCPartitionReaderFactory} as
+   * per-partition constants instead.
+   *
+   * <p>This test calls the overload directly (the explicit {@code false} call is already covered
+   * by {@code testCreateDeltaParquetReaderFactory_Basic}) and only checks that a factory is
+   * produced.
+   */
+  @Test
+  public void testCreateDeltaParquetReaderFactory_NotWriteTimeCDCRead() {
+    String tablePath =
+        createTestTable("test_delta_reader_factory_batch_changelog_" + System.nanoTime(), true);
+
+    Table table = Table.forPath(defaultEngine, tablePath);
+    Snapshot snapshot = table.getLatestSnapshot(defaultEngine);
+
+    StructType dataSchema =
+        new StructType(
+            new StructField[] {
+              DataTypes.createStructField("id", DataTypes.LongType, true),
+            });
+    StructType partitionSchema =
+        new StructType(
+            new StructField[] {DataTypes.createStructField("part", DataTypes.StringType, true)});
+    StructType readDataSchema = dataSchema;
+    StructType ddlOrderedReadOutputSchema =
+        SchemaUtils.ddlOrderedOutputSchema(
+            SchemaUtils.convertKernelSchemaToSparkSchema(snapshot.getSchema()),
+            readDataSchema,
+            partitionSchema);
+    Filter[] filters = new Filter[0];
+    scala.collection.immutable.Map options = Map$.MODULE$.empty();
+    Configuration hadoopConf = new Configuration();
+    SQLConf sqlConf = SQLConf.get();
+
+    // Nine-argument overload: no isWriteTimeCDCRead argument.
+    PartitionReaderFactory factory =
+        PartitionUtils.createDeltaParquetReaderFactory(
+            snapshot,
+            dataSchema,
+            partitionSchema,
+            readDataSchema,
+            ddlOrderedReadOutputSchema,
+            filters,
+            options,
+            hadoopConf,
+            sqlConf);
+
+    assertNotNull(factory, "isWriteTimeCDCRead=false PartitionReaderFactory should not be null");
+  }
+
@Test
public void testCalculateMaxSplitBytes_Basic() {
SQLConf sqlConf = SQLConf.get();
diff --git a/spark/v2/src/test/scala-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelogCatalogIntegrationTest.java b/spark/v2/src/test/scala-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelogCatalogIntegrationTest.java
new file mode 100644
index 00000000000..7ff7d9144d1
--- /dev/null
+++ b/spark/v2/src/test/scala-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelogCatalogIntegrationTest.java
@@ -0,0 +1,740 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.spark.internal.v2.read.changelog;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.delta.spark.internal.v2.snapshot.DeltaSnapshotManager;
+import io.delta.spark.internal.v2.snapshot.SnapshotManagerFactory;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for catalog-routed CDC entrypoint (TableCatalog.loadChangelog).
+ *
+ * These tests intentionally exercise SQL/DataFrame paths (not direct DeltaChangelog
+ * construction) so they validate analyzer -> catalog -> changelog wiring.
+ */
+public class DeltaChangelogCatalogIntegrationTest extends DeltaChangelogTestBase {
+
+ // ===========================================================================================
+ // Fixtures and helpers
+ // ===========================================================================================
+
+  /**
+   * Creates a row-tracking-enabled Delta table with 5 INSERT commits on top of CREATE, runs the
+   * given body with the table name and table path, and drops the table + path on completion.
+   *
+   * <p>Resulting commit history (used by all timestamp-range tests below):
+   *
+   * <pre>
+   *   v0 = CREATE TABLE
+   *   v1 = INSERT (1, 'Alice')
+   *   v2 = INSERT (2, 'Bob')
+   *   v3 = INSERT (3, 'Charlie')
+   *   v4 = INSERT (4, 'Dave')
+   *   v5 = INSERT (5, 'Eve')
+   * </pre>
+   *
+   * @param suffix fragment embedded in the generated table name to tell fixtures apart
+   * @param body test body; it runs with the V2 enable mode set to STRICT
+   */
+  private void withHistoryTable(String suffix, ThrowingConsumer body) throws Exception {
+    String tableName = "dsv2_cdc_catalog_ts_" + suffix + "_" + System.nanoTime();
+    String tablePath = System.getProperty("java.io.tmpdir") + "/" + tableName;
+    withTable(
+        tablePath,
+        () ->
+            withTable(
+                new String[] {tableName},
+                () -> {
+                  spark.sql(
+                      String.format(
+                          "CREATE TABLE %s (id BIGINT, name STRING) USING delta LOCATION '%s' TBLPROPERTIES "
+                              + "('delta.enableDeletionVectors'='false', 'delta.enableRowTracking'='true')",
+                          tableName, tablePath));
+                  spark.sql(String.format("INSERT INTO %s VALUES (1, 'Alice')", tableName));
+                  spark.sql(String.format("INSERT INTO %s VALUES (2, 'Bob')", tableName));
+                  spark.sql(String.format("INSERT INTO %s VALUES (3, 'Charlie')", tableName));
+                  spark.sql(String.format("INSERT INTO %s VALUES (4, 'Dave')", tableName));
+                  spark.sql(String.format("INSERT INTO %s VALUES (5, 'Eve')", tableName));
+                  // Auto-CDF requires the V2 connector at read time. Writes above run in the
+                  // session-default mode (AUTO → V1 connector for INSERT). The CHANGES read in
+                  // the test body needs STRICT to ensure loadTable returns a V2 SparkTable.
+                  withSQLConf(
+                      "spark.databricks.delta.v2.enableMode",
+                      "STRICT",
+                      () -> body.accept(tableName, tablePath));
+                }));
+  }
+
+  /** Test body that receives the fixture's table name and path and may throw. */
+  @FunctionalInterface
+  private interface ThrowingConsumer {
+    void accept(String tableName, String tablePath) throws Exception;
+  }
+
+  /**
+   * Looks up the commit timestamp of {@code version} for the table at {@code tablePath}. The
+   * snapshot is resolved through a freshly created kernel snapshot manager rather than through the
+   * catalog, so the lookup does not depend on the catalog mode the test body runs under (STRICT,
+   * for the Auto-CDF read path).
+   */
+  private java.sql.Timestamp commitTimestamp(String tablePath, long version) {
+    long commitMillis =
+        SnapshotManagerFactory.create(tablePath, defaultEngine, Optional.empty())
+            .loadSnapshotAt(version)
+            .getTimestamp(defaultEngine);
+    return new java.sql.Timestamp(commitMillis);
+  }
+
+  /**
+   * Returns a wall-clock string at the midpoint of two commit timestamps. Used to exercise {@code
+   * getActiveCommitAtTime} with inputs that are meant not to coincide with any commit's exact ts,
+   * so bounds inclusivity does not change the resolved version.
+   *
+   * <p>The midpoint uses integer division, so it equals the earlier commit's timestamp when the
+   * two commits are less than 2 ms apart.
+   */
+  private String betweenCommits(String tablePath, long earlier, long later) {
+    long lowerMs = commitTimestamp(tablePath, earlier).getTime();
+    long gapMs = commitTimestamp(tablePath, later).getTime() - lowerMs;
+    java.sql.Timestamp midpoint = new java.sql.Timestamp(lowerMs + gapMs / 2);
+    return midpoint.toString();
+  }
+
+ // ===========================================================================================
+ // SQL / DataFrame parity
+ // ===========================================================================================
+
+  /**
+   * Reads versions 1..2 through both the SQL {@code CHANGES} clause and {@code
+   * DataFrameReader.changes} and checks that the two return the same, non-empty rows.
+   */
+  @Test
+  public void testSqlAndDataFrameChangesMatchForVersionRange() throws Exception {
+    String tableName = "dsv2_cdc_catalog_" + System.nanoTime();
+    String tablePath = System.getProperty("java.io.tmpdir") + "/" + tableName;
+
+    withTable(
+        tablePath,
+        () ->
+            withTable(
+                new String[] {tableName},
+                () -> {
+                  spark.sql(
+                      String.format(
+                          "CREATE TABLE %s (id BIGINT, name STRING) USING delta LOCATION '%s' TBLPROPERTIES "
+                              + "('delta.enableDeletionVectors'='false', 'delta.enableRowTracking'='true')",
+                          tableName, tablePath));
+                  spark.sql(
+                      String.format("INSERT INTO %s VALUES (1, 'Alice'), (2, 'Bob')", tableName));
+                  spark.sql(String.format("DELETE FROM %s WHERE id = 1", tableName));
+
+                  // CHANGES read needs the V2 connector; see withHistoryTable for the rationale.
+                  withSQLConf(
+                      "spark.databricks.delta.v2.enableMode",
+                      "STRICT",
+                      () -> {
+                        Dataset<Row> sqlDf =
+                            spark
+                                .sql(
+                                    String.format(
+                                        "SELECT id, name, _change_type, _commit_version "
+                                            + "FROM %s CHANGES FROM VERSION 1 TO VERSION 2",
+                                        tableName))
+                                .orderBy("_commit_version", "id", "_change_type", "name");
+
+                        Dataset<Row> apiDf =
+                            spark
+                                .read()
+                                .option("startingVersion", "1")
+                                .option("endingVersion", "2")
+                                .changes(tableName)
+                                .select("id", "name", "_change_type", "_commit_version")
+                                .orderBy("_commit_version", "id", "_change_type", "name");
+
+                        List<Row> sqlRows = sqlDf.collectAsList();
+                        List<Row> apiRows = apiDf.collectAsList();
+                        assertFalse(
+                            sqlRows.isEmpty(),
+                            "Expected non-empty CDC output for VERSION 1..2 range");
+                        assertEquals(
+                            sqlRows,
+                            apiRows,
+                            "SQL CHANGES and DataFrameReader.changes should match");
+
+                        List<String> fieldNames = Arrays.asList(sqlDf.schema().fieldNames());
+                        assertTrue(fieldNames.contains("id"));
+                        assertTrue(fieldNames.contains("name"));
+                        assertTrue(fieldNames.contains("_change_type"));
+                        assertTrue(fieldNames.contains("_commit_version"));
+                      });
+                }));
+  }
+
+ // ===========================================================================================
+ // Bounds inclusivity/exclusivity testing
+ // ===========================================================================================
+
+ // -------------------- default INCL/INCL --------------------
+
+  /** Bounds at the v0 and v5 commit timestamps (default inclusivity) return all five inserts. */
+  @Test
+  public void testTimestampRangeReadsAllChanges() throws Exception {
+    withHistoryTable(
+        "all",
+        (tableName, tablePath) -> {
+          String startTs = commitTimestamp(tablePath, 0).toString();
+          String endTs = commitTimestamp(tablePath, 5).toString();
+
+          Dataset<Row> changes =
+              spark
+                  .sql(
+                      String.format(
+                          "SELECT id, name, _change_type "
+                              + "FROM %s CHANGES FROM TIMESTAMP '%s' TO TIMESTAMP '%s'",
+                          tableName, startTs, endTs))
+                  .orderBy("_commit_version", "id");
+
+          List<Row> rows = changes.collectAsList();
+          assertEquals(5, rows.size(), "Expected all five inserts in the v0..v5 timestamp range");
+          for (int i = 0; i < 5; i++) {
+            assertEquals((long) (i + 1), ((Number) rows.get(i).getAs("id")).longValue());
+            assertEquals("insert", rows.get(i).getAs("_change_type"));
+          }
+        });
+  }
+
+  /** Using the v3 commit timestamp for both bounds returns only the v3 insert. */
+  @Test
+  public void testTimestampRangePartialMiddleCommit() throws Exception {
+    withHistoryTable(
+        "partial",
+        (tableName, tablePath) -> {
+          // Both bounds resolve to v3; range = [v3, v3] inclusive.
+          String tsV3 = commitTimestamp(tablePath, 3).toString();
+          Dataset<Row> changes =
+              spark.sql(
+                  String.format(
+                      "SELECT id, _change_type FROM %s "
+                          + "CHANGES FROM TIMESTAMP '%s' TO TIMESTAMP '%s'",
+                      tableName, tsV3, tsV3));
+
+          List<Row> rows = changes.collectAsList();
+          assertEquals(1, rows.size(), "Expected only the v3 insert in [v3, v3]");
+          assertEquals(3L, ((Number) rows.get(0).getAs("id")).longValue());
+          assertEquals("insert", rows.get(0).getAs("_change_type"));
+        });
+  }
+
+  /** Bounds that fall between commit timestamps resolve to the preceding commit on each side. */
+  @Test
+  public void testTimestampRangeBetweenCommitTimestamps() throws Exception {
+    withHistoryTable(
+        "between",
+        (tableName, tablePath) -> {
+          // Start strictly between v1 and v2: getActiveCommitAtTime returns the latest commit
+          // with ts <= start, so start resolves to v1.
+          // End strictly between v2 and v3: same rule, end resolves to v2.
+          // Range = [v1, v2] = Alice + Bob.
+          String startTs = betweenCommits(tablePath, 1, 2);
+          String endTs = betweenCommits(tablePath, 2, 3);
+
+          Dataset<Row> changes =
+              spark
+                  .sql(
+                      String.format(
+                          "SELECT id, _change_type FROM %s "
+                              + "CHANGES FROM TIMESTAMP '%s' TO TIMESTAMP '%s'",
+                          tableName, startTs, endTs))
+                  .orderBy("_commit_version", "id");
+
+          List<Row> rows = changes.collectAsList();
+          assertEquals(2, rows.size(), "Expected v1 and v2 inserts in between-commit range");
+          assertEquals(1L, ((Number) rows.get(0).getAs("id")).longValue());
+          assertEquals(2L, ((Number) rows.get(1).getAs("id")).longValue());
+        });
+  }
+
+ // -------------------- exclusive-bound variants --------------------
+
+  /** EXCLUSIVE on both bounds drops the commits sitting exactly on the boundary timestamps. */
+  @Test
+  public void testTimestampRangeExclusiveBoundsSkipBoundaryCommits() throws Exception {
+    withHistoryTable(
+        "excl",
+        (tableName, tablePath) -> {
+          String tsV1 = commitTimestamp(tablePath, 1).toString();
+          String tsV3 = commitTimestamp(tablePath, 3).toString();
+
+          // FROM tsV1 EXCLUSIVE bumps start to v2; TO tsV3 EXCLUSIVE drops end to v2.
+          // Range = [v2, v2] = only the (2, 'Bob') insert.
+          Dataset<Row> changes =
+              spark.sql(
+                  String.format(
+                      "SELECT id, _change_type FROM %s "
+                          + "CHANGES FROM TIMESTAMP '%s' EXCLUSIVE TO TIMESTAMP '%s' EXCLUSIVE",
+                      tableName, tsV1, tsV3));
+
+          List<Row> rows = changes.collectAsList();
+          assertEquals(1, rows.size(), "Expected only v2 inside exclusive bounds");
+          assertEquals(2L, ((Number) rows.get(0).getAs("id")).longValue());
+          assertEquals("insert", rows.get(0).getAs("_change_type"));
+        });
+  }
+
+  /** EXCLUSIVE start with a default (inclusive) end skips the start commit only. */
+  @Test
+  public void testTimestampRangeMixedBoundsStartExclusiveEndInclusive() throws Exception {
+    withHistoryTable(
+        "mixed_se_ei",
+        (tableName, tablePath) -> {
+          String tsV1 = commitTimestamp(tablePath, 1).toString();
+          String tsV3 = commitTimestamp(tablePath, 3).toString();
+
+          // FROM tsV1 EXCLUSIVE bumps start to v2; TO tsV3 (default INCLUSIVE) keeps end at v3.
+          // Range = [v2, v3] = Bob + Charlie.
+          Dataset<Row> changes =
+              spark
+                  .sql(
+                      String.format(
+                          "SELECT id, _change_type FROM %s "
+                              + "CHANGES FROM TIMESTAMP '%s' EXCLUSIVE TO TIMESTAMP '%s'",
+                          tableName, tsV1, tsV3))
+                  .orderBy("_commit_version", "id");
+
+          List<Row> rows = changes.collectAsList();
+          assertEquals(2, rows.size(), "Expected v2 and v3 inserts");
+          assertEquals(2L, ((Number) rows.get(0).getAs("id")).longValue());
+          assertEquals(3L, ((Number) rows.get(1).getAs("id")).longValue());
+        });
+  }
+
+  /** Default (inclusive) start with an EXCLUSIVE end skips the end commit only. */
+  @Test
+  public void testTimestampRangeMixedBoundsStartInclusiveEndExclusive() throws Exception {
+    withHistoryTable(
+        "mixed_si_ee",
+        (tableName, tablePath) -> {
+          String tsV1 = commitTimestamp(tablePath, 1).toString();
+          String tsV3 = commitTimestamp(tablePath, 3).toString();
+
+          // FROM tsV1 (default INCLUSIVE) keeps start at v1; TO tsV3 EXCLUSIVE drops end to v2.
+          // Range = [v1, v2] = Alice + Bob.
+          Dataset<Row> changes =
+              spark
+                  .sql(
+                      String.format(
+                          "SELECT id, _change_type FROM %s "
+                              + "CHANGES FROM TIMESTAMP '%s' TO TIMESTAMP '%s' EXCLUSIVE",
+                          tableName, tsV1, tsV3))
+                  .orderBy("_commit_version", "id");
+
+          List<Row> rows = changes.collectAsList();
+          assertEquals(2, rows.size(), "Expected v1 and v2 inserts");
+          assertEquals(1L, ((Number) rows.get(0).getAs("id")).longValue());
+          assertEquals(2L, ((Number) rows.get(1).getAs("id")).longValue());
+        });
+  }
+
+ // -------------------- open-ended end --------------------
+
+  /** Without a TO clause the read runs from the start bound to the latest commit. */
+  @Test
+  public void testTimestampRangeOpenEndedReadsToLatest() throws Exception {
+    withHistoryTable(
+        "open_incl",
+        (tableName, tablePath) -> {
+          // FROM tsV1 (default INCLUSIVE) keeps start at v1; no TO clause = read to latest (v5).
+          // Range = [v1, v5] = all five inserts.
+          String tsV1 = commitTimestamp(tablePath, 1).toString();
+
+          Dataset<Row> changes =
+              spark
+                  .sql(
+                      String.format(
+                          "SELECT id, _change_type FROM %s CHANGES FROM TIMESTAMP '%s'",
+                          tableName, tsV1))
+                  .orderBy("_commit_version", "id");
+
+          List<Row> rows = changes.collectAsList();
+          assertEquals(5, rows.size(), "Expected v1..v5 inclusive (all five inserts)");
+          for (int i = 0; i < 5; i++) {
+            assertEquals((long) (i + 1), ((Number) rows.get(i).getAs("id")).longValue());
+          }
+        });
+  }
+
+  /** EXCLUSIVE start without a TO clause reads from the commit after the start to the latest. */
+  @Test
+  public void testTimestampRangeOpenEndedExclusiveStart() throws Exception {
+    withHistoryTable(
+        "open_excl",
+        (tableName, tablePath) -> {
+          // FROM tsV1 EXCLUSIVE bumps start to v2; no TO clause = read to latest (v5).
+          // Range = [v2, v5] = Bob + Charlie + Dave + Eve.
+          String tsV1 = commitTimestamp(tablePath, 1).toString();
+
+          Dataset<Row> changes =
+              spark
+                  .sql(
+                      String.format(
+                          "SELECT id, _change_type FROM %s CHANGES FROM TIMESTAMP '%s' EXCLUSIVE",
+                          tableName, tsV1))
+                  .orderBy("_commit_version", "id");
+
+          List<Row> rows = changes.collectAsList();
+          assertEquals(4, rows.size(), "Expected v2..v5 (four inserts) after EXCLUSIVE start");
+          for (int i = 0; i < 4; i++) {
+            assertEquals((long) (i + 2), ((Number) rows.get(i).getAs("id")).longValue());
+          }
+        });
+  }
+
+ // -------------------- error paths --------------------
+
+  /** EXCLUSIVE on both sides of a single commit timestamp yields an empty range and must fail. */
+  @Test
+  public void testTimestampRangeRejectsEmptyExclusiveRange() throws Exception {
+    withHistoryTable(
+        "empty_excl",
+        (tableName, tablePath) -> {
+          // Both bounds at tsV3 with EXCL on both sides:
+          // start adjusts to v4, end adjusts to v2 -> start > end -> DELTA_INVALID_CDC_RANGE.
+          String tsV3 = commitTimestamp(tablePath, 3).toString();
+
+          Exception ex =
+              assertThrows(
+                  Exception.class,
+                  () ->
+                      spark
+                          .sql(
+                              String.format(
+                                  "SELECT * FROM %s "
+                                      + "CHANGES FROM TIMESTAMP '%s' EXCLUSIVE "
+                                      + "TO TIMESTAMP '%s' EXCLUSIVE",
+                                  tableName, tsV3, tsV3))
+                          .collectAsList());
+          // getMessage() may be null; String.valueOf keeps that case an assertion failure with
+          // a readable message rather than a NullPointerException.
+          String message = String.valueOf(ex.getMessage());
+          assertTrue(
+              message.contains("DELTA_INVALID_CDC_RANGE") || message.contains("end before start"),
+              "Expected empty-range CDC error, got: " + ex.getMessage());
+        });
+  }
+
+ @Test
+ public void testTimestampRangeBeforeEarliestCommitFails() throws Exception {
+ withHistoryTable(
+ "past_ts",
+ (tableName, tablePath) -> {
+ Exception ex =
+ assertThrows(
+ Exception.class,
+ () ->
+ spark
+ .sql(
+ String.format(
+ "SELECT * FROM %s "
+ + "CHANGES FROM TIMESTAMP '1900-01-01 00:00:00' "
+ + "TO TIMESTAMP '1900-01-02 00:00:00'",
+ tableName))
+ .collectAsList());
+ assertTrue(
+ ex.getMessage().contains("DELTA_TIMESTAMP_EARLIER_THAN_COMMIT_RETENTION")
+ || ex.getMessage().contains("earlier than")
+ || ex.getMessage().contains("before the earliest available version"),
+ "Expected timestamp-before-earliest error, got: " + ex.getMessage());
+ });
+ }
+
+ @Test
+ public void testTimestampRangeAfterLatestCommitFails() throws Exception {
+ String tableName = "dsv2_cdc_catalog_ts_future_" + System.nanoTime();
+ String tablePath = System.getProperty("java.io.tmpdir") + "/" + tableName;
+
+ withTable(
+ tablePath,
+ () ->
+ withTable(
+ new String[] {tableName},
+ () -> {
+ spark.sql(
+ String.format(
+ "CREATE TABLE %s (id BIGINT, name STRING) USING delta LOCATION '%s' TBLPROPERTIES "
+ + "('delta.enableDeletionVectors'='false', 'delta.enableRowTracking'='true')",
+ tableName, tablePath));
+ spark.sql(String.format("INSERT INTO %s VALUES (1, 'Alice')", tableName));
+
+ withSQLConf(
+ "spark.databricks.delta.v2.enableMode",
+ "STRICT",
+ () -> {
+ Exception ex =
+ assertThrows(
+ Exception.class,
+ () ->
+ spark
+ .sql(
+ String.format(
+ "SELECT * FROM %s CHANGES FROM TIMESTAMP "
+ + "'9999-01-01 00:00:00' TO TIMESTAMP "
+ + "'9999-01-02 00:00:00'",
+ tableName))
+ .collectAsList());
+ assertTrue(
+ ex.getMessage().contains("DELTA_TIMESTAMP_GREATER_THAN_COMMIT")
+ || ex.getMessage().contains("after the latest version")
+ || ex.getMessage().contains("after the latest available version"),
+ "Expected timestamp-after-latest error, got: " + ex.getMessage());
+ });
+ }));
+ }
+
+ @Test
+ public void testUnboundedBatchChangesIsRejectedForNow() throws Exception {
+   String tableName = "dsv2_cdc_catalog_unbounded_" + System.nanoTime();
+   String tablePath = System.getProperty("java.io.tmpdir") + "/" + tableName;
+
+   String createTable =
+       String.format(
+           "CREATE TABLE %s (id BIGINT, name STRING) USING delta LOCATION '%s'",
+           tableName, tablePath);
+   String insertAlice = String.format("INSERT INTO %s VALUES (1, 'Alice')", tableName);
+
+   withTable(
+       tablePath,
+       () ->
+           withTable(
+               new String[] {tableName},
+               () -> {
+                 spark.sql(createTable);
+                 spark.sql(insertAlice);
+
+                 withSQLConf(
+                     "spark.databricks.delta.v2.enableMode",
+                     "STRICT",
+                     () -> {
+                       // A DataFrameReader.changes(...) call with no range options is expected
+                       // to be rejected as an unbounded batch range.
+                       AnalysisException ex =
+                           assertThrows(
+                               AnalysisException.class,
+                               () -> spark.read().changes(tableName).collectAsList());
+
+                       String message = ex.getMessage();
+                       assertTrue(
+                           message.contains("DELTA_CHANGELOG_UNBOUNDED_RANGE"),
+                           "Expected loadChangelog rejection for unbounded batch range, got: "
+                               + message);
+                     });
+               }));
+ }
+
+ // ===========================================================================================
+ // Mid-range validation: schema drift and row-tracking toggle
+ // ===========================================================================================
+
+ /**
+  * Verifies that a CHANGES read fails with {@code DELTA_CHANGELOG_REQUIRES_ROW_TRACKING} when the
+  * table was created without row tracking. The rejection is expected from the eager check that
+  * {@code DeltaChangelogScanBuilder.build} runs against the end-version snapshot.
+  */
+ @Test
+ public void testChangelogRejectsTableWithoutRowTracking() throws Exception {
+   String tableName = "dsv2_cdc_catalog_no_rt_" + System.nanoTime();
+   String tablePath = System.getProperty("java.io.tmpdir") + "/" + tableName;
+
+   // Note: the table properties deliberately omit delta.enableRowTracking.
+   String createTable =
+       String.format(
+           "CREATE TABLE %s (id BIGINT, name STRING) USING delta LOCATION '%s' "
+               + "TBLPROPERTIES ('delta.enableDeletionVectors'='false')",
+           tableName, tablePath);
+   String insertAlice = String.format("INSERT INTO %s VALUES (1, 'Alice')", tableName);
+   String changesQuery =
+       String.format("SELECT * FROM %s CHANGES FROM VERSION 0 TO " + "VERSION 1", tableName);
+
+   withTable(
+       tablePath,
+       () ->
+           withTable(
+               new String[] {tableName},
+               () -> {
+                 spark.sql(createTable);
+                 spark.sql(insertAlice);
+
+                 withSQLConf(
+                     "spark.databricks.delta.v2.enableMode",
+                     "STRICT",
+                     () -> {
+                       Exception ex =
+                           assertThrows(
+                               Exception.class, () -> spark.sql(changesQuery).collectAsList());
+
+                       String message = ex.getMessage();
+                       assertTrue(
+                           message.contains("DELTA_CHANGELOG_REQUIRES_ROW_TRACKING"),
+                           "Expected row-tracking required error, got: " + message);
+                     });
+               }));
+ }
+
+ /**
+  * Verifies the case where the requested range ends on a version at which row tracking is
+  * disabled. The eager end-snapshot check in {@code DeltaChangelogScanBuilder.build} is expected
+  * to fail with {@code DELTA_CHANGELOG_REQUIRES_ROW_TRACKING} before the per-commit loop runs.
+  */
+ @Test
+ public void testChangelogRejectsRowTrackingDisabledAtEnd() throws Exception {
+   String tableName = "dsv2_cdc_catalog_rt_disabled_end_" + System.nanoTime();
+   String tablePath = System.getProperty("java.io.tmpdir") + "/" + tableName;
+
+   // v0: CREATE with row tracking enabled.
+   String createTable =
+       String.format(
+           "CREATE TABLE %s (id BIGINT, name STRING) USING delta LOCATION '%s' "
+               + "TBLPROPERTIES "
+               + "('delta.enableDeletionVectors'='false', "
+               + "'delta.enableRowTracking'='true')",
+           tableName, tablePath);
+   // v1: INSERT while row tracking is still on.
+   String insertAlice = String.format("INSERT INTO %s VALUES (1, 'Alice')", tableName);
+   // v2: row tracking switched off through ALTER TABLE ... SET TBLPROPERTIES.
+   String disableRowTracking =
+       String.format(
+           "ALTER TABLE %s SET TBLPROPERTIES ('delta.enableRowTracking'='false')", tableName);
+   String changesQuery =
+       String.format("SELECT * FROM %s CHANGES FROM VERSION 0 TO " + "VERSION 2", tableName);
+
+   withTable(
+       tablePath,
+       () ->
+           withTable(
+               new String[] {tableName},
+               () -> {
+                 spark.sql(createTable);
+                 spark.sql(insertAlice);
+                 spark.sql(disableRowTracking);
+
+                 withSQLConf(
+                     "spark.databricks.delta.v2.enableMode",
+                     "STRICT",
+                     () -> {
+                       Exception ex =
+                           assertThrows(
+                               Exception.class, () -> spark.sql(changesQuery).collectAsList());
+
+                       String message = ex.getMessage();
+                       assertTrue(
+                           message.contains("DELTA_CHANGELOG_REQUIRES_ROW_TRACKING"),
+                           "Expected eager end-snapshot RT error, got: " + message);
+                     });
+               }));
+ }
+
+ /**
+  * Verifies the case where row tracking is enabled at both ends of the range but a commit inside
+  * the range carries a Metadata action that disables it. Because the eager boundary checks only
+  * see RT-enabled endpoints, the rejection must come from the per-commit Metadata loop in {@code
+  * DeltaChangelogBatch.planInputPartitions}, with {@code
+  * DELTA_CHANGELOG_ROW_TRACKING_DISABLED_IN_RANGE}.
+  */
+ @Test
+ public void testChangelogRejectsRowTrackingDisabledMidRange() throws Exception {
+   String tableName = "dsv2_cdc_catalog_rt_disabled_mid_" + System.nanoTime();
+   String tablePath = System.getProperty("java.io.tmpdir") + "/" + tableName;
+
+   // v0: CREATE with row tracking enabled.
+   String createTable =
+       String.format(
+           "CREATE TABLE %s (id BIGINT, name STRING) USING delta LOCATION '%s' "
+               + "TBLPROPERTIES "
+               + "('delta.enableDeletionVectors'='false', "
+               + "'delta.enableRowTracking'='true')",
+           tableName, tablePath);
+   // v1: INSERT while row tracking is still on.
+   String insertAlice = String.format("INSERT INTO %s VALUES (1, 'Alice')", tableName);
+   // v2: row tracking switched off (the Metadata commit that lies inside the range).
+   String disableRowTracking =
+       String.format(
+           "ALTER TABLE %s SET TBLPROPERTIES ('delta.enableRowTracking'='false')", tableName);
+   // v3: row tracking switched back on, so the end-snapshot check passes and the failure has
+   // to originate from the per-commit loop at v2.
+   String enableRowTracking =
+       String.format(
+           "ALTER TABLE %s SET TBLPROPERTIES ('delta.enableRowTracking'='true')", tableName);
+   String changesQuery =
+       String.format("SELECT * FROM %s CHANGES FROM VERSION 0 TO " + "VERSION 3", tableName);
+
+   withTable(
+       tablePath,
+       () ->
+           withTable(
+               new String[] {tableName},
+               () -> {
+                 spark.sql(createTable);
+                 spark.sql(insertAlice);
+                 spark.sql(disableRowTracking);
+                 spark.sql(enableRowTracking);
+
+                 withSQLConf(
+                     "spark.databricks.delta.v2.enableMode",
+                     "STRICT",
+                     () -> {
+                       Exception ex =
+                           assertThrows(
+                               Exception.class, () -> spark.sql(changesQuery).collectAsList());
+
+                       String message = ex.getMessage();
+                       assertTrue(
+                           message.contains("DELTA_CHANGELOG_ROW_TRACKING_DISABLED_IN_RANGE"),
+                           "Expected per-commit mid-range RT error, got: " + message);
+                     });
+               }));
+ }
+
+ /**
+  * Verifies that a CHANGES read is rejected with {@code DELTA_CHANGELOG_SCHEMA_CHANGE_IN_RANGE}
+  * when the table schema evolves on a commit inside the requested range.
+  */
+ @Test
+ public void testChangelogRejectsSchemaChangeMidRange() throws Exception {
+   String tableName = "dsv2_cdc_catalog_schema_change_" + System.nanoTime();
+   String tablePath = System.getProperty("java.io.tmpdir") + "/" + tableName;
+
+   String createTable =
+       String.format(
+           "CREATE TABLE %s (id BIGINT, name STRING) USING delta LOCATION '%s' "
+               + "TBLPROPERTIES "
+               + "('delta.enableDeletionVectors'='false', "
+               + "'delta.enableRowTracking'='true')",
+           tableName, tablePath);
+   String insertAlice = String.format("INSERT INTO %s VALUES (1, 'Alice')", tableName);
+   // The schema change that falls inside the range: a new column is added.
+   String addColumn = String.format("ALTER TABLE %s ADD COLUMN extra STRING", tableName);
+   String insertBob = String.format("INSERT INTO %s VALUES (2, 'Bob', 'x')", tableName);
+   String changesQuery =
+       String.format("SELECT * FROM %s CHANGES FROM VERSION 1 TO " + "VERSION 3", tableName);
+
+   withTable(
+       tablePath,
+       () ->
+           withTable(
+               new String[] {tableName},
+               () -> {
+                 spark.sql(createTable);
+                 spark.sql(insertAlice);
+                 spark.sql(addColumn);
+                 spark.sql(insertBob);
+
+                 withSQLConf(
+                     "spark.databricks.delta.v2.enableMode",
+                     "STRICT",
+                     () -> {
+                       Exception ex =
+                           assertThrows(
+                               Exception.class, () -> spark.sql(changesQuery).collectAsList());
+
+                       String message = ex.getMessage();
+                       assertTrue(
+                           message.contains("DELTA_CHANGELOG_SCHEMA_CHANGE_IN_RANGE"),
+                           "Expected schema-change error, got: " + message);
+                     });
+               }));
+ }
+}
diff --git a/spark/v2/src/test/scala-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelogDirectBatchExecutionTest.java b/spark/v2/src/test/scala-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelogDirectBatchExecutionTest.java
new file mode 100644
index 00000000000..ab39512167d
--- /dev/null
+++ b/spark/v2/src/test/scala-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelogDirectBatchExecutionTest.java
@@ -0,0 +1,408 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.spark.internal.v2.read.changelog;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.delta.spark.internal.v2.catalog.SparkTable;
+import io.delta.spark.internal.v2.snapshot.DeltaSnapshotManager;
+import io.delta.spark.internal.v2.snapshot.SnapshotManagerFactory;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Direct (non-analyzer) batch execution contract test for Delta changelog classes.
+ *
+ * <p>This is intentionally strict with explicit expected rows to validate end-to-end behavior
+ * through ScanBuilder -> Scan -> Batch -> PartitionReader.
+ */
+public class DeltaChangelogDirectBatchExecutionTest extends DeltaChangelogTestBase {
+
+ /**
+  * Drives a full-history changelog read ([v0, latest]) directly through {@code ScanBuilder ->
+  * Scan -> Batch -> PartitionReader} and compares the rows read against an explicit expected list
+  * for one two-row INSERT followed by a DELETE of one of those rows.
+  */
+ @Test
+ public void testDirectBatchExecutionWithExplicitExpectedRows() throws Exception {
+   String tableName = "dsv2_changelog_direct_" + System.nanoTime();
+   String tablePath = System.getProperty("java.io.tmpdir") + "/" + tableName;
+
+   withTable(
+       new String[] {tableName},
+       () -> {
+         spark.sql(
+             String.format(
+                 "CREATE TABLE %s (id BIGINT, name STRING) USING delta "
+                     + "LOCATION '%s' TBLPROPERTIES "
+                     + "('delta.enableDeletionVectors'='false', 'delta.enableRowTracking'='true')",
+                 tableName, tablePath));
+         spark.sql(String.format("INSERT INTO %s VALUES (1, 'Alice'), (2, 'Bob')", tableName));
+         spark.sql(String.format("DELETE FROM %s WHERE id = 1", tableName));
+
+         // The snapshot manager is only needed to learn the end version of the range.
+         DeltaSnapshotManager snapshotManager =
+             SnapshotManagerFactory.create(tablePath, defaultEngine, Optional.empty());
+         long latestVersion = snapshotManager.loadLatestSnapshot().getVersion();
+         Map commitTimestampsMicros = loadCommitTimestampsMicros(tableName);
+
+         DeltaChangelog changelog =
+             new DeltaChangelog(
+                 tableName,
+                 new SparkTable(Identifier.of(new String[0], tableName), tablePath),
+                 0L,
+                 latestVersion);
+         ScanBuilder scanBuilder =
+             changelog.newScanBuilder(new CaseInsensitiveStringMap(Collections.emptyMap()));
+         Scan scan = scanBuilder.build();
+         StructType schema = scan.readSchema();
+         Batch batch = scan.toBatch();
+
+         List actualRows = collectRows(batch);
+         List expectedRows =
+             expectedRows(commitTimestampsMicros.get(1L), commitTimestampsMicros.get(2L));
+
+         // Schema-level contract checks: the CDC metadata columns and both data columns must be
+         // part of the read schema. The message names the missing column on failure.
+         List fieldNames = Arrays.asList(schema.fieldNames());
+         for (String column :
+             new String[] {"_change_type", "_commit_version", "_commit_timestamp", "id", "name"}) {
+           assertTrue(fieldNames.contains(column), "Missing column in read schema: " + column);
+         }
+
+         int idIndex = fieldNames.indexOf("id");
+         int nameIndex = fieldNames.indexOf("name");
+         int changeTypeIndex = fieldNames.indexOf("_change_type");
+         int commitVersionIndex = fieldNames.indexOf("_commit_version");
+         int commitTimestampIndex = fieldNames.indexOf("_commit_timestamp");
+
+         // Explicit row-value checks.
+         assertEquals(expectedRows.size(), actualRows.size());
+         assertRowsEqual(
+             actualRows,
+             expectedRows,
+             idIndex,
+             nameIndex,
+             changeTypeIndex,
+             commitVersionIndex,
+             commitTimestampIndex);
+       });
+ }
+
+ /**
+  * Asserts that {@link DeltaChangelog#rowId()} and {@link DeltaChangelog#rowVersion()} expose the
+  * per-row tracking metadata fields (not the per-commit columns). Spark's batch CDC post-processor
+  * reads these references to perform carry-over removal and update detection. If {@code
+  * rowVersion()} ever pointed at {@code _commit_version} instead of {@code
+  * _metadata.row_commit_version}, real updates whose DELETE/INSERT halves share the same commit
+  * version would be silently dropped as carry-overs.
+  */
+ @Test
+ public void testRowTrackingMetadataReferences() throws Exception {
+   String tableName = "dsv2_changelog_direct_meta_" + System.nanoTime();
+   String tablePath = System.getProperty("java.io.tmpdir") + "/" + tableName;
+
+   withTable(
+       new String[] {tableName},
+       () -> {
+         spark.sql(
+             String.format(
+                 "CREATE TABLE %s (id BIGINT) USING delta LOCATION '%s' TBLPROPERTIES "
+                     + "('delta.enableDeletionVectors'='false', 'delta.enableRowTracking'='true')",
+                 tableName, tablePath));
+         spark.sql(String.format("INSERT INTO %s VALUES (1)", tableName));
+
+         // The snapshot manager is only needed to learn the end version of the range.
+         DeltaSnapshotManager snapshotManager =
+             SnapshotManagerFactory.create(tablePath, defaultEngine, Optional.empty());
+         long latestVersion = snapshotManager.loadLatestSnapshot().getVersion();
+
+         DeltaChangelog changelog =
+             new DeltaChangelog(
+                 tableName,
+                 new SparkTable(Identifier.of(new String[0], tableName), tablePath),
+                 0L,
+                 latestVersion);
+
+         NamedReference[] rowIdRefs = changelog.rowId();
+         assertEquals(1, rowIdRefs.length, "Expected a single rowId field reference");
+         assertArrayEquals(
+             new String[] {"_metadata", "row_id"},
+             rowIdRefs[0].fieldNames(),
+             "rowId() must point at _metadata.row_id");
+         assertArrayEquals(
+             new String[] {"_metadata", "row_commit_version"},
+             changelog.rowVersion().fieldNames(),
+             "rowVersion() must point at _metadata.row_commit_version (per-row), "
+                 + "not _commit_version (per-commit)");
+       });
+ }
+
+ /**
+  * UPDATE on a row-tracking table is CoW: the file containing the updated row is rewritten, which
+  * produces both a RemoveFile (old contents) and an AddFile (new contents) at the same commit. The
+  * connector must surface both halves of that pair as raw DELETE + INSERT rows so Spark's
+  * post-processor can derive the update.
+  */
+ @Test
+ public void testUpdateProducesPairedDeleteAndInsert() throws Exception {
+   String tableName = "dsv2_changelog_direct_update_" + System.nanoTime();
+   String tablePath = System.getProperty("java.io.tmpdir") + "/" + tableName;
+
+   withTable(
+       new String[] {tableName},
+       () -> {
+         spark.sql(
+             String.format(
+                 "CREATE TABLE %s (id BIGINT, name STRING) USING delta LOCATION '%s' "
+                     + "TBLPROPERTIES "
+                     + "('delta.enableDeletionVectors'='false', 'delta.enableRowTracking'='true')",
+                 tableName, tablePath));
+         spark.sql(String.format("INSERT INTO %s VALUES (1, 'Alice')", tableName));
+         spark.sql(String.format("UPDATE %s SET name = 'AliceX' WHERE id = 1", tableName));
+
+         // The snapshot manager is only needed to learn the end version of the range.
+         DeltaSnapshotManager snapshotManager =
+             SnapshotManagerFactory.create(tablePath, defaultEngine, Optional.empty());
+         long latestVersion = snapshotManager.loadLatestSnapshot().getVersion();
+         Map commitTimestampsMicros = loadCommitTimestampsMicros(tableName);
+
+         DeltaChangelog changelog =
+             new DeltaChangelog(
+                 tableName,
+                 new SparkTable(Identifier.of(new String[0], tableName), tablePath),
+                 0L,
+                 latestVersion);
+         Scan scan =
+             changelog
+                 .newScanBuilder(new CaseInsensitiveStringMap(Collections.emptyMap()))
+                 .build();
+         Batch batch = scan.toBatch();
+         StructType schema = scan.readSchema();
+
+         List actualRows = collectRows(batch);
+
+         List fieldNames = Arrays.asList(schema.fieldNames());
+         int idIndex = fieldNames.indexOf("id");
+         int nameIndex = fieldNames.indexOf("name");
+         int changeTypeIndex = fieldNames.indexOf("_change_type");
+         int commitVersionIndex = fieldNames.indexOf("_commit_version");
+         int commitTimestampIndex = fieldNames.indexOf("_commit_timestamp");
+
+         List expectedRows = new ArrayList<>();
+         // v1: initial INSERT
+         expectedRows.add(row(1L, "Alice", "insert", 1L, commitTimestampsMicros.get(1L)));
+         // v2: UPDATE -> DELETE old + INSERT new at the same commit
+         expectedRows.add(row(1L, "Alice", "delete", 2L, commitTimestampsMicros.get(2L)));
+         expectedRows.add(row(1L, "AliceX", "insert", 2L, commitTimestampsMicros.get(2L)));
+
+         assertEquals(
+             expectedRows.size(),
+             actualRows.size(),
+             "Update should produce a paired DELETE + INSERT at v2 alongside the v1 insert");
+         assertRowsEqual(
+             actualRows,
+             expectedRows,
+             idIndex,
+             nameIndex,
+             changeTypeIndex,
+             commitVersionIndex,
+             commitTimestampIndex);
+       });
+ }
+
+ /**
+  * Construction with a non-zero start version must restrict the scan to commits within [start,
+  * end]. Specifically: actions from earlier commits must not appear in the output. This exercises
+  * {@code planInputPartitions} commit-file slicing rather than the full-history happy path.
+  */
+ @Test
+ public void testRangeSlicingNonZeroStart() throws Exception {
+   String tableName = "dsv2_changelog_direct_slice_" + System.nanoTime();
+   String tablePath = System.getProperty("java.io.tmpdir") + "/" + tableName;
+
+   withTable(
+       new String[] {tableName},
+       () -> {
+         spark.sql(
+             String.format(
+                 "CREATE TABLE %s (id BIGINT, name STRING) USING delta LOCATION '%s' "
+                     + "TBLPROPERTIES "
+                     + "('delta.enableDeletionVectors'='false', 'delta.enableRowTracking'='true')",
+                 tableName, tablePath));
+         spark.sql(String.format("INSERT INTO %s VALUES (1, 'Alice')", tableName));
+         spark.sql(String.format("INSERT INTO %s VALUES (2, 'Bob')", tableName));
+         spark.sql(String.format("INSERT INTO %s VALUES (3, 'Charlie')", tableName));
+
+         // Both range bounds are hard-coded below, so no snapshot lookup is needed here.
+         Map commitTimestampsMicros = loadCommitTimestampsMicros(tableName);
+
+         // Range = [v2, v3]. v0 (CREATE) and v1 (Alice) must be excluded from the output.
+         DeltaChangelog changelog =
+             new DeltaChangelog(
+                 tableName,
+                 new SparkTable(Identifier.of(new String[0], tableName), tablePath),
+                 2L,
+                 3L);
+         Scan scan =
+             changelog
+                 .newScanBuilder(new CaseInsensitiveStringMap(Collections.emptyMap()))
+                 .build();
+         Batch batch = scan.toBatch();
+         StructType schema = scan.readSchema();
+
+         List actualRows = collectRows(batch);
+
+         List fieldNames = Arrays.asList(schema.fieldNames());
+         int idIndex = fieldNames.indexOf("id");
+         int nameIndex = fieldNames.indexOf("name");
+         int changeTypeIndex = fieldNames.indexOf("_change_type");
+         int commitVersionIndex = fieldNames.indexOf("_commit_version");
+         int commitTimestampIndex = fieldNames.indexOf("_commit_timestamp");
+
+         List expectedRows = new ArrayList<>();
+         expectedRows.add(row(2L, "Bob", "insert", 2L, commitTimestampsMicros.get(2L)));
+         expectedRows.add(row(3L, "Charlie", "insert", 3L, commitTimestampsMicros.get(3L)));
+
+         assertEquals(
+             expectedRows.size(),
+             actualRows.size(),
+             "Sliced range [v2, v3] must exclude v0/v1 actions");
+         assertRowsEqual(
+             actualRows,
+             expectedRows,
+             idIndex,
+             nameIndex,
+             changeTypeIndex,
+             commitVersionIndex,
+             commitTimestampIndex);
+       });
+ }
+
+ /**
+  * Reads every input partition planned by {@code batch} and returns a copy of each row, in the
+  * order the partitions are planned and the readers emit them.
+  *
+  * @param batch the batch whose partitions are drained
+  * @return copies of all rows produced by the batch
+  * @throws Exception if planning, reading, or closing a reader fails
+  */
+ private static List collectRows(Batch batch) throws Exception {
+   List out = new ArrayList<>();
+
+   for (InputPartition partition : batch.planInputPartitions()) {
+     // Use a fresh factory per partition to avoid reusing stateful prefetch readers in
+     // this direct in-process harness.
+     PartitionReaderFactory readerFactory = batch.createReaderFactory();
+     // try-with-resources closes the reader on every path. Unlike a close() in a finally
+     // block, a failure while closing is attached as a suppressed exception and does not
+     // replace the read failure that caused it.
+     try (PartitionReader reader = readerFactory.createReader(partition)) {
+       while (reader.next()) {
+         // Reader rows can be reused/mutable.
+         out.add(reader.get().copy());
+       }
+     }
+   }
+   return out;
+ }
+
+ /**
+  * Builds the expected change rows for a two-row INSERT at v1 followed by a DELETE of id 1 at v2.
+  *
+  * <p>Column order of each expected row: id, name, _change_type, _commit_version,
+  * _commit_timestamp.
+  */
+ private static List expectedRows(
+     long insertCommitTimestampMicros, long deleteCommitTimestampMicros) {
+   // Spark's INSERT VALUES splits the two rows into separate Delta data files (one row per
+   // file, due to default shuffle partitioning). DELETE WHERE id=1 then affects only Alice's
+   // file: it emits a RemoveFile for that file (preimage = Alice) and writes no AddFile
+   // because the surviving row count is 0. Bob's file is left untouched, so Bob does not
+   // appear in the v2 change set.
+   return new ArrayList<>(
+       Arrays.asList(
+           row(1L, "Alice", "insert", 1L, insertCommitTimestampMicros),
+           row(2L, "Bob", "insert", 1L, insertCommitTimestampMicros),
+           row(1L, "Alice", "delete", 2L, deleteCommitTimestampMicros)));
+ }
+
+ /**
+  * Runs {@code DESCRIBE HISTORY} on the table and maps each commit version to its commit
+  * timestamp, expressed in microseconds (the history timestamp's epoch milliseconds times 1000).
+  */
+ private Map loadCommitTimestampsMicros(String tableName) {
+   Map versionToTimestampMicros = new HashMap<>();
+   String historyQuery = String.format("DESCRIBE HISTORY %s", tableName);
+   for (Row entry : spark.sql(historyQuery).collectAsList()) {
+     Number version = entry.getAs("version");
+     Timestamp committedAt = entry.getAs("timestamp");
+     versionToTimestampMicros.put(version.longValue(), committedAt.getTime() * 1000L);
+   }
+   return versionToTimestampMicros;
+ }
+
+ /** Shorthand factory for an {@link ExpectedRow}; arguments are passed through unchanged. */
+ private static ExpectedRow row(
+     long id, String name, String changeType, long commitVersion, long commitTimestampMicros) {
+   ExpectedRow expected =
+       new ExpectedRow(id, name, changeType, commitVersion, commitTimestampMicros);
+   return expected;
+ }
+
+ /**
+  * Asserts that {@code actual} matches {@code expected} row by row, in list order.
+  *
+  * <p>For each position the id, name, change type, commit version and commit timestamp are read
+  * from the actual row at the given column ordinals and compared with the expected values.
+  *
+  * @param actual rows read from the changelog batch
+  * @param expected expected values, in the order the rows are expected to be read
+  * @param idIndex ordinal of the {@code id} column in the actual rows
+  * @param nameIndex ordinal of the {@code name} column
+  * @param changeTypeIndex ordinal of the {@code _change_type} column
+  * @param commitVersionIndex ordinal of the {@code _commit_version} column
+  * @param commitTimestampIndex ordinal of the {@code _commit_timestamp} column
+  */
+ private static void assertRowsEqual(
+     List actual,
+     List expected,
+     int idIndex,
+     int nameIndex,
+     int changeTypeIndex,
+     int commitVersionIndex,
+     int commitTimestampIndex) {
+   // Compare sizes first: the loop below only walks the expected rows, so without this check
+   // surplus actual rows would pass unnoticed and missing ones would surface as an
+   // IndexOutOfBoundsException instead of an assertion failure.
+   assertEquals(expected.size(), actual.size(), "row count mismatch");
+   for (int i = 0; i < expected.size(); i++) {
+     InternalRow a = actual.get(i);
+     ExpectedRow e = expected.get(i);
+     assertEquals(e.id, a.getLong(idIndex), "id mismatch at row " + i);
+     assertEquals(e.name, a.getUTF8String(nameIndex).toString(), "name mismatch at row " + i);
+     assertEquals(
+         e.changeType,
+         a.getUTF8String(changeTypeIndex).toString(),
+         "change_type mismatch at row " + i);
+     assertEquals(
+         e.commitVersion, a.getLong(commitVersionIndex), "commit_version mismatch at row " + i);
+     assertEquals(
+         e.commitTimestampMicros,
+         a.getLong(commitTimestampIndex),
+         "commit_timestamp mismatch at row " + i);
+   }
+ }
+
+ /** Immutable holder for the values one changelog row is expected to carry. */
+ private static class ExpectedRow {
+   /** Expected value of the {@code id} data column. */
+   private final long id;
+
+   /** Expected value of the {@code name} data column. */
+   private final String name;
+
+   /** Expected value of {@code _change_type}. */
+   private final String changeType;
+
+   /** Expected value of {@code _commit_version}. */
+   private final long commitVersion;
+
+   /** Expected value of {@code _commit_timestamp}, in microseconds. */
+   private final long commitTimestampMicros;
+
+   private ExpectedRow(
+       long rowId, String rowName, String rowChangeType, long version, long timestampMicros) {
+     id = rowId;
+     name = rowName;
+     changeType = rowChangeType;
+     commitVersion = version;
+     commitTimestampMicros = timestampMicros;
+   }
+ }
+}
diff --git a/spark/v2/src/test/scala-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelogTestBase.java b/spark/v2/src/test/scala-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelogTestBase.java
new file mode 100644
index 00000000000..64e201bceb1
--- /dev/null
+++ b/spark/v2/src/test/scala-shims/spark-4.2/io/delta/spark/internal/v2/read/changelog/DeltaChangelogTestBase.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.spark.internal.v2.read.changelog;
+
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.spark.internal.v2.DeltaV2TestBase;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.BeforeAll;
+
+/**
+ * Test base for the V2 changelog tests.
+ *
+ * Swaps the default {@link DeltaV2TestBase} Spark session for one configured with:
+ *
+ * <ul>
+ *   <li>The hybrid {@code DeltaCatalog} (spark-unified) as {@code spark_catalog}, so {@code
+ *       TableCatalog.loadChangelog} routes into the {@code ChangelogSupport} trait.
+ *   <li>{@code spark.databricks.delta.changelogV2.enabled = true}, the SQLConf gate behind which
+ *       Auto-CDF is hidden in production.
+ * </ul>
+ *
+ * Keeping these two settings out of {@code DeltaV2TestBase} means the rest of the V2 test suites
+ * continue to exercise the V1-only catalog and the gated-off catalog default that production users
+ * hit.
+ */
+public abstract class DeltaChangelogTestBase extends DeltaV2TestBase {
+
+ @BeforeAll
+ public static void setUpChangelogSparkAndEngine() {
+ if (spark != null) {
+ spark.stop();
+ }
+ spark =
+ SparkSession.builder()
+ .master("local[*]")
+ .appName("SparkKernelDsv2ChangelogTests")
+ .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtensionV1")
+ .config(
+ "spark.sql.catalog.spark_catalog",
+ "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+ .config("spark.sql.catalog.dsv2", "io.delta.spark.internal.v2.catalog.TestCatalog")
+ .config("spark.sql.catalog.dsv2.base_path", System.getProperty("java.io.tmpdir"))
+ .config("spark.databricks.delta.changelogV2.enabled", "true")
+ .getOrCreate();
+ defaultEngine = DefaultEngine.create(spark.sessionState().newHadoopConf());
+ }
+}