diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java
new file mode 100644
index 00000000000..80efc39564f
--- /dev/null
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java
@@ -0,0 +1,473 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.spark.internal.v2.read;
+
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.SnapshotImpl;
+import io.delta.kernel.internal.actions.Metadata;
+import io.delta.kernel.internal.actions.Protocol;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.CloseableIterator.BreakableFilterResult;
+import io.delta.spark.internal.v2.adapters.KernelMetadataAdapter;
+import io.delta.spark.internal.v2.adapters.KernelProtocolAdapter;
+import io.delta.spark.internal.v2.snapshot.DeltaSnapshotManager;
+import io.delta.spark.internal.v2.utils.StreamingHelper;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.delta.DeltaColumnMapping$;
+import org.apache.spark.sql.delta.DeltaErrors;
+import org.apache.spark.sql.delta.DeltaOptions;
+import org.apache.spark.sql.delta.TypeWideningMode;
+import org.apache.spark.sql.delta.schema.SchemaUtils$;
+import org.apache.spark.sql.delta.sources.DeltaSourceMetadataEvolutionSupport$;
+import org.apache.spark.sql.delta.sources.DeltaSourceMetadataTrackingLog;
+import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
+import org.apache.spark.sql.delta.sources.DeltaStreamUtils;
+import org.apache.spark.sql.delta.sources.PersistedMetadata;
+import org.apache.spark.sql.delta.v2.interop.AbstractMetadata;
+import org.apache.spark.sql.delta.v2.interop.AbstractProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.immutable.Seq;
+import scala.collection.immutable.Seq$;
+
+/**
+ * V2 port of V1's {@code DeltaSourceMetadataEvolutionSupport} trait. Handles metadata evolution
+ * (schema, table configuration, or protocol changes) for the v2 Delta streaming source.
+ *
+ *
To safely evolve schema mid-stream, this class intercepts streaming at several stages to:
+ *
+ *
+ * - Capture metadata changes within a stream.
+ *
- Stop {@code latestOffset} from crossing a metadata change boundary.
+ *
- Ensure the batch prior to the change can still be served correctly.
+ *
- Fail the stream if and only if the prior batch was served successfully.
+ *
- Write the new metadata to the tracking log before the stream fails so the restarted stream
+ * picks up the updated schema.
+ *
+ *
+ * See the trait-level Scaladoc on {@code
+ * org.apache.spark.sql.delta.sources.DeltaSourceMetadataEvolutionSupport} for the full barrier
+ * protocol details. Validation logic shared with v1 lives in the {@code
+ * DeltaSourceMetadataEvolutionSupport} companion object; this class delegates to those static
+ * utilities.
+ */
+public class MetadataEvolutionHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(MetadataEvolutionHandler.class);
+
+ private final SparkSession spark;
+ private final String tableId;
+ private final String tablePath;
+ private final DeltaSnapshotManager snapshotManager;
+ private final Engine engine;
+ private final DeltaOptions options;
+ private final DeltaStreamUtils.SchemaReadOptions schemaReadOptions;
+ private final Option metadataTrackingLog;
+
+ // Read-time state captured at source initialization
+ private final Metadata readMetadataAtSourceInit;
+ private final Protocol readProtocolAtSourceInit;
+ private final String metadataPath;
+
+ /** The persisted metadata at source init, if any. */
+ private final PersistedMetadata persistedMetadataAtSourceInit;
+
+ public MetadataEvolutionHandler(
+ SparkSession spark,
+ String tableId,
+ String tablePath,
+ DeltaSnapshotManager snapshotManager,
+ Engine engine,
+ DeltaOptions options,
+ DeltaStreamUtils.SchemaReadOptions schemaReadOptions,
+ Option metadataTrackingLog,
+ Metadata readMetadataAtSourceInit,
+ Protocol readProtocolAtSourceInit,
+ String metadataPath) {
+ this.spark = Objects.requireNonNull(spark);
+ this.tableId = Objects.requireNonNull(tableId);
+ this.tablePath = Objects.requireNonNull(tablePath);
+ this.snapshotManager = Objects.requireNonNull(snapshotManager);
+ this.engine = Objects.requireNonNull(engine);
+ this.options = Objects.requireNonNull(options);
+ this.schemaReadOptions = Objects.requireNonNull(schemaReadOptions);
+ this.metadataTrackingLog = Objects.requireNonNull(metadataTrackingLog);
+ this.readMetadataAtSourceInit = Objects.requireNonNull(readMetadataAtSourceInit);
+ this.readProtocolAtSourceInit = Objects.requireNonNull(readProtocolAtSourceInit);
+ this.metadataPath = Objects.requireNonNull(metadataPath);
+ this.persistedMetadataAtSourceInit =
+ metadataTrackingLog.isDefined()
+ && metadataTrackingLog.get().getCurrentTrackedMetadata().isDefined()
+ ? metadataTrackingLog.get().getCurrentTrackedMetadata().get()
+ : null;
+ }
+
+ /**
+ * Whether this source uses the metadata tracking log as its read schema. False when the log is
+ * absent/empty or unsafe column-mapping reads are enabled.
+ *
+ * V2 port of V1's {@code DeltaSourceMetadataEvolutionSupport.shouldTrackMetadataChange}.
+ */
+ public boolean shouldTrackMetadataChange() {
+ return DeltaSourceMetadataEvolutionSupport$.MODULE$.shouldTrackMetadataChange(
+ schemaReadOptions, metadataTrackingLog);
+ }
+
+ /**
+ * Whether the tracking log is provided but still empty, so it should be initialized eagerly on
+ * the first batch. Should only be consulted before the first write to the log.
+ *
+ *
V2 port of V1's {@code
+ * DeltaSourceMetadataEvolutionSupport.shouldInitializeMetadataTrackingEagerly}.
+ */
+ public boolean shouldInitializeMetadataTrackingEagerly() {
+ return DeltaSourceMetadataEvolutionSupport$.MODULE$.shouldInitializeMetadataTrackingEagerly(
+ schemaReadOptions, metadataTrackingLog);
+ }
+
+ // ---------------------------------------------------------------------------
+ // Offset barrier protocol
+ // ---------------------------------------------------------------------------
+
+ /**
+ * Truncate the file change iterator at the schema change barrier (inclusive).
+ *
+ *
This ensures a batch never crosses a schema change boundary — it stops at the barrier
+ * IndexedFile so the batch can be committed before the schema evolution takes effect.
+ */
+ public CloseableIterator stopIndexedFileIteratorAtSchemaChangeBarrier(
+ CloseableIterator fileActions) {
+ // Lazily include files up to and including the barrier, then break.
+ boolean[] sawBarrier = {false};
+ return fileActions.breakableFilter(
+ file -> {
+ if (sawBarrier[0]) {
+ return BreakableFilterResult.BREAK;
+ }
+ if (file.getIndex() == DeltaSourceOffset.METADATA_CHANGE_INDEX()) {
+ sawBarrier[0] = true;
+ }
+ return BreakableFilterResult.INCLUDE;
+ });
+ }
+
+ /**
+ * Returns a single barrier {@link IndexedFile} at {@code METADATA_CHANGE_INDEX} when tracking is
+ * on and the given metadata/protocol differ from the init state; empty otherwise.
+ *
+ * V2 port of V1's {@code
+ * DeltaSourceMetadataEvolutionSupport.getMetadataOrProtocolChangeIndexedFileIterator}.
+ */
+ public CloseableIterator getMetadataOrProtocolChangeIndexedFileIterator(
+ Metadata metadata, Protocol protocol, long version) {
+ if (shouldTrackMetadataChange()
+ && hasMetadataOrProtocolChangeComparedToStreamMetadata(metadata, protocol, version)) {
+ return Utils.toCloseableIterator(
+ Collections.singletonList(
+ IndexedFile.sentinel(version, DeltaSourceOffset.METADATA_CHANGE_INDEX()))
+ .iterator());
+ }
+ return Utils.toCloseableIterator(Collections.emptyIterator());
+ }
+
+ /**
+ * Drives the two-barrier protocol when the previous offset sits on a barrier: advances {@code
+ * METADATA_CHANGE_INDEX} to {@code POST_METADATA_CHANGE_INDEX}, blocks at {@code
+ * POST_METADATA_CHANGE_INDEX} if the change is still pending, or returns empty when there is no
+ * pending schema change.
+ *
+ * V2 port of V1's {@code
+ * DeltaSourceMetadataEvolutionSupport.getNextOffsetFromPreviousOffsetIfPendingSchemaChange}.
+ */
+ public Optional getNextOffsetFromPreviousOffsetIfPendingSchemaChange(
+ DeltaSourceOffset previousOffset) {
+ if (previousOffset.index() == DeltaSourceOffset.METADATA_CHANGE_INDEX()) {
+ return Optional.of(
+ previousOffset.copy(
+ previousOffset.reservoirId(),
+ previousOffset.reservoirVersion(),
+ DeltaSourceOffset.POST_METADATA_CHANGE_INDEX(),
+ previousOffset.isInitialSnapshot()));
+ }
+
+ if (previousOffset.index() == DeltaSourceOffset.POST_METADATA_CHANGE_INDEX()) {
+ // Check if schema evolution has actually occurred; if not, block.
+ Metadata metadata = collectMetadataAtVersion(previousOffset.reservoirVersion());
+ Protocol protocol = collectProtocolAtVersion(previousOffset.reservoirVersion());
+ if (hasMetadataOrProtocolChangeComparedToStreamMetadata(
+ metadata, protocol, previousOffset.reservoirVersion())) {
+ return Optional.of(previousOffset);
+ }
+ }
+
+ return Optional.empty();
+ }
+
+ // ---------------------------------------------------------------------------
+ // Commit-time evolution
+ // ---------------------------------------------------------------------------
+
+ /**
+ * Called from {@code commit()}: when the committed offset is a schema-change barrier, writes the
+ * new metadata to the tracking log and fails the stream to trigger restart under the new schema.
+ * No-op for non-barrier offsets or when tracking is disabled.
+ *
+ * V2 port of V1's {@code
+ * DeltaSourceMetadataEvolutionSupport.updateMetadataTrackingLogAndFailTheStreamIfNeeded(Offset)}.
+ */
+ public void updateMetadataTrackingLogAndFailTheStreamIfNeeded(DeltaSourceOffset offset) {
+ if (!shouldTrackMetadataChange()) {
+ return;
+ }
+ if (offset.index() != DeltaSourceOffset.METADATA_CHANGE_INDEX()
+ && offset.index() != DeltaSourceOffset.POST_METADATA_CHANGE_INDEX()) {
+ return;
+ }
+
+ Metadata changedMetadata = collectMetadataAtVersion(offset.reservoirVersion());
+ Protocol changedProtocol = collectProtocolAtVersion(offset.reservoirVersion());
+
+ updateMetadataTrackingLogAndFailTheStreamIfNeeded(
+ changedMetadata, changedProtocol, offset.reservoirVersion(), /* replace= */ false);
+ }
+
+ /**
+ * Writes the changed metadata/protocol to the tracking log at {@code version} and throws to fail
+ * the stream. No-op when the change matches the current init state. With {@code replace=true},
+ * the new entry logically replaces the current latest entry instead of being appended.
+ *
+ *
V2 port of V1's {@code
+ * DeltaSourceMetadataEvolutionSupport.updateMetadataTrackingLogAndFailTheStreamIfNeeded(Option,
+ * Option, Long, Boolean)}.
+ */
+ public void updateMetadataTrackingLogAndFailTheStreamIfNeeded(
+ Metadata changedMetadata, Protocol changedProtocol, long version, boolean replace) {
+ if (!hasMetadataOrProtocolChangeComparedToStreamMetadata(
+ changedMetadata, changedProtocol, version)) {
+ return;
+ }
+
+ Metadata metadataToUse = changedMetadata != null ? changedMetadata : readMetadataAtSourceInit;
+ Protocol protocolToUse = changedProtocol != null ? changedProtocol : readProtocolAtSourceInit;
+
+ PersistedMetadata schemaToPersist =
+ PersistedMetadata.apply(
+ tableId,
+ version,
+ new KernelMetadataAdapter(metadataToUse),
+ new KernelProtocolAdapter(protocolToUse),
+ metadataPath);
+
+ metadataTrackingLog.get().writeNewMetadata(schemaToPersist, /* replaceCurrent= */ replace);
+
+ throw (RuntimeException)
+ DeltaErrors.streamingMetadataEvolutionException(
+ schemaToPersist.dataSchema(),
+ schemaToPersist.tableConfigurations().get(),
+ schemaToPersist.protocol().get());
+ }
+
+ // ---------------------------------------------------------------------------
+ // Initialization
+ // ---------------------------------------------------------------------------
+
+ /**
+ * Initializes an empty tracking log on the first batch with the metadata at {@code
+ * batchStartVersion}, or — when {@code batchEndVersion} is given for an already-constructed batch
+ * — the most-recent compatible metadata in {@code [start, end]}. Throws to fail the stream if the
+ * initialized metadata differs from init or {@code alwaysFailUponLogInitialized} is set.
+ *
+ *
V2 port of V1's {@code
+ * DeltaSourceMetadataEvolutionSupport.initializeMetadataTrackingAndExitStream}.
+ */
+ public void initializeMetadataTrackingAndExitStream(
+ long batchStartVersion,
+ @Nullable Long batchEndVersion,
+ boolean alwaysFailUponLogInitialized) {
+ long version;
+ Metadata metadata;
+ Protocol protocol;
+
+ if (batchEndVersion != null) {
+ // Validate no incompatible changes in the range, use the end version's metadata
+ ValidatedMetadataAndProtocol validated =
+ validateAndResolveMetadataForLogInitialization(batchStartVersion, batchEndVersion);
+ version = batchEndVersion;
+ metadata = validated.metadata;
+ protocol = validated.protocol;
+ } else {
+ SnapshotImpl snapshot = (SnapshotImpl) snapshotManager.loadSnapshotAt(batchStartVersion);
+ version = snapshot.getVersion();
+ metadata = snapshot.getMetadata();
+ protocol = snapshot.getProtocol();
+ }
+
+ PersistedMetadata newMetadata =
+ PersistedMetadata.apply(
+ tableId,
+ version,
+ new KernelMetadataAdapter(metadata),
+ new KernelProtocolAdapter(protocol),
+ metadataPath);
+ metadataTrackingLog.get().writeNewMetadata(newMetadata, false);
+
+ if (hasMetadataOrProtocolChangeComparedToStreamMetadata(metadata, protocol, version)
+ || alwaysFailUponLogInitialized) {
+ throw (RuntimeException)
+ DeltaErrors.streamingMetadataEvolutionException(
+ newMetadata.dataSchema(),
+ newMetadata.tableConfigurations().get(),
+ newMetadata.protocol().get());
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Private helpers
+ // ---------------------------------------------------------------------------
+
+ /** Delegates to the shared static method in {@code DeltaSourceMetadataEvolutionSupport}. */
+ private boolean hasMetadataOrProtocolChangeComparedToStreamMetadata(
+ Metadata newMetadata, Protocol newProtocol, long newSchemaVersion) {
+ Option metadataOpt =
+ newMetadata != null
+ ? Option.apply((AbstractMetadata) new KernelMetadataAdapter(newMetadata))
+ : Option.empty();
+ Option protocolOpt =
+ newProtocol != null
+ ? Option.apply((AbstractProtocol) new KernelProtocolAdapter(newProtocol))
+ : Option.empty();
+ Option persistedOpt =
+ persistedMetadataAtSourceInit != null
+ ? Option.apply(persistedMetadataAtSourceInit)
+ : Option.empty();
+
+ return DeltaSourceMetadataEvolutionSupport$.MODULE$
+ .hasMetadataOrProtocolChangeComparedToStreamMetadata(
+ metadataOpt,
+ protocolOpt,
+ newSchemaVersion,
+ persistedOpt,
+ new KernelProtocolAdapter(readProtocolAtSourceInit),
+ new KernelMetadataAdapter(readMetadataAtSourceInit),
+ spark);
+ }
+
+ /** Collect the metadata action at a specific version. Returns null if none. */
+ private Metadata collectMetadataAtVersion(long version) {
+ return collectMetadataActions(version, version).get(version);
+ }
+
+ /** Collect all metadata actions between start and end version, both inclusive. */
+ private Map collectMetadataActions(long startVersion, long endVersion) {
+ return StreamingHelper.collectMetadataActionsFromRangeUnsafe(
+ startVersion, Optional.of(endVersion), snapshotManager, engine, tablePath);
+ }
+
+ /** Collect the protocol action at a specific version. Returns null if none. */
+ private Protocol collectProtocolAtVersion(long version) {
+ return collectProtocolActions(version, version).get(version);
+ }
+
+ /** Collect all protocol actions between start and end version, both inclusive. */
+ private Map collectProtocolActions(long startVersion, long endVersion) {
+ return StreamingHelper.collectProtocolActionsFromRangeUnsafe(
+ startVersion, Optional.of(endVersion), snapshotManager, engine, tablePath);
+ }
+
+ /**
+ * V2 port of V1's {@code
+ * DeltaSourceMetadataEvolutionSupport.validateAndResolveMetadataForLogInitialization}.
+ */
+ private ValidatedMetadataAndProtocol validateAndResolveMetadataForLogInitialization(
+ long startVersion, long endVersion) {
+ List metadataChanges =
+ new ArrayList<>(collectMetadataActions(startVersion, endVersion).values());
+ SnapshotImpl startSnapshot = (SnapshotImpl) snapshotManager.loadSnapshotAt(startVersion);
+ Metadata startMetadata = startSnapshot.getMetadata();
+
+ // Try to find rename or drop columns in between, or nullability/datatype changes by using
+ // the last schema as the read schema. If so, we cannot find a good read schema.
+ // Otherwise, the most recent metadata change will be the most encompassing schema as well.
+ Metadata mostRecentMetadataChange =
+ metadataChanges.isEmpty() ? null : metadataChanges.get(metadataChanges.size() - 1);
+ if (mostRecentMetadataChange != null) {
+ KernelMetadataAdapter mostRecentAdapter = new KernelMetadataAdapter(mostRecentMetadataChange);
+ // Validate startMetadata + all intermediate changes against the most recent
+ List otherMetadataChanges = new ArrayList<>();
+ otherMetadataChanges.add(startMetadata);
+ otherMetadataChanges.addAll(metadataChanges.subList(0, metadataChanges.size() - 1));
+ for (Metadata potentialSchemaChangeMetadata : otherMetadataChanges) {
+ KernelMetadataAdapter potentialAdapter =
+ new KernelMetadataAdapter(potentialSchemaChangeMetadata);
+ if (!DeltaColumnMapping$.MODULE$.hasNoColumnMappingSchemaChanges(
+ mostRecentAdapter, potentialAdapter, false)
+ || !SchemaUtils$.MODULE$.isReadCompatible(
+ potentialAdapter.schema(),
+ mostRecentAdapter.schema(),
+ /* forbidTightenNullability= */ true,
+ /* allowMissingColumns= */ false,
+ TypeWideningMode.NoTypeWidening$.MODULE$,
+ (Seq) Seq$.MODULE$.empty(),
+ (Seq) Seq$.MODULE$.empty(),
+ /* caseSensitive= */ true,
+ /* allowVoidTypeChange= */ false)) {
+ throw (RuntimeException)
+ DeltaErrors.streamingMetadataLogInitFailedIncompatibleMetadataException(
+ startVersion, endVersion);
+ }
+ }
+ }
+
+ // Check protocol changes and use the most supportive protocol
+ Protocol mostSupportiveProtocol = startSnapshot.getProtocol();
+ List protocolChanges =
+ new ArrayList<>(collectProtocolActions(startVersion, endVersion).values());
+ for (Protocol p : protocolChanges) {
+ if (p.getReaderAndWriterFeatures()
+ .containsAll(mostSupportiveProtocol.getReaderAndWriterFeatures())) {
+ mostSupportiveProtocol = p;
+ } else {
+ // TODO: or use protocol union instead?
+ throw (RuntimeException)
+ DeltaErrors.streamingMetadataLogInitFailedIncompatibleMetadataException(
+ startVersion, endVersion);
+ }
+ }
+
+ Metadata resolvedMetadata =
+ mostRecentMetadataChange != null ? mostRecentMetadataChange : startMetadata;
+ return new ValidatedMetadataAndProtocol(resolvedMetadata, mostSupportiveProtocol);
+ }
+
+ private static class ValidatedMetadataAndProtocol {
+ final Metadata metadata;
+ final Protocol protocol;
+
+ ValidatedMetadataAndProtocol(Metadata metadata, Protocol protocol) {
+ this.metadata = metadata;
+ this.protocol = protocol;
+ }
+ }
+}
diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/utils/StreamingHelper.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/utils/StreamingHelper.java
index 28b4ef2351d..b6968ff056e 100644
--- a/spark/v2/src/main/java/io/delta/spark/internal/v2/utils/StreamingHelper.java
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/utils/StreamingHelper.java
@@ -28,6 +28,7 @@
import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.Metadata;
+import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.actions.RemoveFile;
import io.delta.kernel.internal.commitrange.CommitRangeImpl;
import io.delta.kernel.internal.data.StructRow;
@@ -149,6 +150,15 @@ public static Optional getMetadata(ColumnarBatch columnarBatch, int ro
return Optional.ofNullable(metadata);
}
+ /** Get Protocol action from a batch at the specified row, if present. */
+ public static Optional getProtocol(ColumnarBatch batch, int rowId) {
+ int protocolIdx = getFieldIndex(batch, DeltaLogActionUtils.DeltaAction.PROTOCOL.colName);
+ ColumnVector protocolVector = batch.getColumnVector(protocolIdx);
+ Protocol protocol = Protocol.fromColumnVector(protocolVector, rowId);
+
+ return Optional.ofNullable(protocol);
+ }
+
/** Get CommitInfo action from a batch at the specified row, if present. */
public static Optional getCommitInfo(ColumnarBatch columnarBatch, int rowId) {
int commitInfoIdx =
@@ -186,20 +196,24 @@ public static CloseableIterator getCommitActionsFromRangeUnsafe(
}
/**
- * Collects metadata actions from a commit range, mapping each version to its metadata.
+ * Collects {@link Metadata} actions from commits in {@code [startVersion, endVersionOpt]}
+ * (inclusive on both ends).
*
- * This method is "unsafe" because it uses {@code getActionsFromRangeUnsafe()} which bypasses
- * the standard snapshot requirement for protocol validation.
+ *
Returns: a {@link LinkedHashMap} from commit version to its metadata action,
+ * preserving ascending version order. Versions with no metadata action are omitted.
*
- *
Returns a map preserving version order (via LinkedHashMap) where each version maps to its
- * metadata action. Throws an exception if multiple metadata actions are found in the same commit.
+ *
Throws: {@link IllegalArgumentException} if a single commit contains more than one
+ * metadata action; {@link RuntimeException} on underlying I/O errors.
*
- * @param startVersion the starting version (inclusive) of the commit range
- * @param endVersionOpt optional ending version (exclusive) of the commit range
- * @param snapshotManager the Delta snapshot manager
- * @param engine the Delta engine
- * @param tablePath the path to the Delta table
- * @return a map from version number to metadata action, in version order
+ *
Unsafe: bypasses the snapshot-based protocol validation that {@link
+ * io.delta.kernel.CommitRange#getCommitActions} would normally perform. Callers are responsible
+ * for ensuring protocol compatibility.
+ *
+ * @param startVersion inclusive starting version of the commit range
+ * @param endVersionOpt inclusive ending version, or empty to read through the latest
+ * @param snapshotManager snapshot manager backing the table
+ * @param engine Delta kernel engine
+ * @param tablePath path to the Delta table
*/
public static Map collectMetadataActionsFromRangeUnsafe(
long startVersion,
@@ -207,28 +221,93 @@ public static Map collectMetadataActionsFromRangeUnsafe(
DeltaSnapshotManager snapshotManager,
Engine engine,
String tablePath) {
+ return collectActionsFromRangeUnsafe(
+ startVersion,
+ endVersionOpt,
+ snapshotManager,
+ engine,
+ tablePath,
+ DeltaLogActionUtils.DeltaAction.METADATA,
+ StreamingHelper::getMetadata);
+ }
+
+ /**
+ * Collects {@link Protocol} actions from commits in {@code [startVersion, endVersionOpt]}
+ * (inclusive on both ends).
+ *
+ * Returns: a {@link LinkedHashMap} from commit version to its protocol action,
+ * preserving ascending version order. Versions with no protocol action are omitted.
+ *
+ *
Throws: {@link IllegalArgumentException} if a single commit contains more than one
+ * protocol action; {@link RuntimeException} on underlying I/O errors.
+ *
+ *
Unsafe: bypasses the snapshot-based protocol validation that {@link
+ * io.delta.kernel.CommitRange#getCommitActions} would normally perform. Callers are responsible
+ * for ensuring protocol compatibility.
+ *
+ * @param startVersion inclusive starting version of the commit range
+ * @param endVersionOpt inclusive ending version, or empty to read through the latest
+ * @param snapshotManager snapshot manager backing the table
+ * @param engine Delta kernel engine
+ * @param tablePath path to the Delta table
+ */
+ public static Map collectProtocolActionsFromRangeUnsafe(
+ long startVersion,
+ Optional endVersionOpt,
+ DeltaSnapshotManager snapshotManager,
+ Engine engine,
+ String tablePath) {
+ return collectActionsFromRangeUnsafe(
+ startVersion,
+ endVersionOpt,
+ snapshotManager,
+ engine,
+ tablePath,
+ DeltaLogActionUtils.DeltaAction.PROTOCOL,
+ StreamingHelper::getProtocol);
+ }
+
+ /** Extracts an action of type {@code T} from a single row of a {@link ColumnarBatch}. */
+ @FunctionalInterface
+ private interface RowExtractor {
+ Optional extract(ColumnarBatch batch, int rowId);
+ }
+
+ /**
+ * Shared implementation for {@link #collectMetadataActionsFromRangeUnsafe} and {@link
+ * #collectProtocolActionsFromRangeUnsafe}: walks the commit range filtered to {@code actionType},
+ * applies {@code extractor} per row, and rejects commits with more than one matching action.
+ */
+ private static Map collectActionsFromRangeUnsafe(
+ long startVersion,
+ Optional endVersionOpt,
+ DeltaSnapshotManager snapshotManager,
+ Engine engine,
+ String tablePath,
+ DeltaLogActionUtils.DeltaAction actionType,
+ RowExtractor extractor) {
CommitRangeImpl commitRange =
(CommitRangeImpl) snapshotManager.getTableChanges(engine, startVersion, endVersionOpt);
// LinkedHashMap to preserve insertion order
- Map versionToMetadata = new LinkedHashMap<>();
+ Map versionToAction = new LinkedHashMap<>();
try (CloseableIterator commitsIter =
- getCommitActionsFromRangeUnsafe(
- engine, commitRange, tablePath, Set.of(DeltaLogActionUtils.DeltaAction.METADATA))) {
+ getCommitActionsFromRangeUnsafe(engine, commitRange, tablePath, Set.of(actionType))) {
while (commitsIter.hasNext()) {
try (CommitActions commit = commitsIter.next()) {
long version = commit.getVersion();
try (CloseableIterator actionsIter = commit.getActions()) {
while (actionsIter.hasNext()) {
- ColumnarBatch columnarBatch = actionsIter.next();
- int numRows = columnarBatch.getSize();
+ ColumnarBatch batch = actionsIter.next();
+ int numRows = batch.getSize();
for (int rowId = 0; rowId < numRows; rowId++) {
- Optional metadataOpt = StreamingHelper.getMetadata(columnarBatch, rowId);
- if (metadataOpt.isPresent()) {
- Metadata existing = versionToMetadata.putIfAbsent(version, metadataOpt.get());
+ Optional actionOpt = extractor.extract(batch, rowId);
+ if (actionOpt.isPresent()) {
+ T existing = versionToAction.putIfAbsent(version, actionOpt.get());
Preconditions.checkArgument(
existing == null,
- "Should not encounter two metadata actions in the same commit of version %d",
+ "Should not encounter two %s actions in the same commit of version %d",
+ actionType.colName,
version);
}
}
@@ -244,8 +323,7 @@ public static Map collectMetadataActionsFromRangeUnsafe(
// CommitActions.close() throws Exception
throw new RuntimeException("Failed to process commits", e);
}
-
- return versionToMetadata;
+ return versionToAction;
}
/** Get explicit CDC file (AddCDCFile) from a batch at the specified row, if present. */
diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java
new file mode 100644
index 00000000000..299933c26f1
--- /dev/null
+++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java
@@ -0,0 +1,972 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.spark.internal.v2.read;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import io.delta.kernel.CommitRange;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.data.ArrayValue;
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.DeltaHistoryManager;
+import io.delta.kernel.internal.SnapshotImpl;
+import io.delta.kernel.internal.actions.Format;
+import io.delta.kernel.internal.actions.Metadata;
+import io.delta.kernel.internal.actions.Protocol;
+import io.delta.kernel.internal.util.InternalUtils;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.internal.util.VectorUtils;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.spark.internal.v2.DeltaV2TestBase;
+import io.delta.spark.internal.v2.adapters.KernelMetadataAdapter;
+import io.delta.spark.internal.v2.adapters.KernelProtocolAdapter;
+import io.delta.spark.internal.v2.exception.VersionNotFoundException;
+import io.delta.spark.internal.v2.snapshot.DeltaSnapshotManager;
+import io.delta.spark.internal.v2.snapshot.PathBasedSnapshotManager;
+import java.io.File;
+import java.util.*;
+import org.apache.spark.sql.delta.DeltaOptions;
+import org.apache.spark.sql.delta.DeltaRuntimeException;
+import org.apache.spark.sql.delta.sources.DeltaSourceMetadataTrackingLog;
+import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
+import org.apache.spark.sql.delta.sources.DeltaStreamUtils;
+import org.apache.spark.sql.delta.sources.PersistedMetadata;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import scala.Option;
+
+/** Unit tests for {@link MetadataEvolutionHandler}. */
+public class MetadataEvolutionHandlerTest extends DeltaV2TestBase {
+
+ // ---------------------------------------------------------------------------
+ // Shared test fixtures
+ // ---------------------------------------------------------------------------
+
+ /** A simple (c1 INT, c2 STRING) schema used as the default for most tests. */
+ private static final StructType DEFAULT_KERNEL_SCHEMA =
+ new StructType().add("c1", IntegerType.INTEGER).add("c2", StringType.STRING);
+
+ private static final String DEFAULT_SCHEMA_JSON =
+ "{\"type\":\"struct\",\"fields\":["
+ + "{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},"
+ + "{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}";
+
+ /** Default kernel Metadata action used as the handler's init metadata in most tests. */
+ private static final Metadata DEFAULT_METADATA =
+ new Metadata(
+ "test-id",
+ Optional.empty(),
+ Optional.empty(),
+ new Format("parquet", Collections.emptyMap()),
+ DEFAULT_SCHEMA_JSON,
+ DEFAULT_KERNEL_SCHEMA,
+ emptyArrayValue(),
+ Optional.empty(),
+ VectorUtils.stringStringMapValue(Collections.emptyMap()));
+
+ /** Default kernel Protocol action: reader=1, writer=2, no features. */
+ private static final Protocol DEFAULT_PROTOCOL = new Protocol(1, 2);
+
+ private static final scala.collection.immutable.Map EMPTY_SCALA_MAP =
+ scala.collection.immutable.Map$.MODULE$.empty();
+
+ private static ArrayValue emptyArrayValue() {
+ return new ArrayValue() {
+ @Override
+ public int getSize() {
+ return 0;
+ }
+
+ @Override
+ public ColumnVector getElements() {
+ return InternalUtils.singletonStringColumnVector("");
+ }
+ };
+ }
+
+ /**
+ * A no-op snapshot manager that throws on any call. Used for tests that only exercise handler
+ * logic (tracking state, offset arithmetic, iterator manipulation) without hitting the delta log.
+ */
+ private static final DeltaSnapshotManager THROWING_SNAPSHOT_MANAGER =
+ new DeltaSnapshotManager() {
+ @Override
+ public Snapshot loadLatestSnapshot() {
+ throw new UnsupportedOperationException("not expected in this test");
+ }
+
+ @Override
+ public Snapshot loadSnapshotAt(long version) {
+ throw new UnsupportedOperationException("not expected in this test");
+ }
+
+ @Override
+ public DeltaHistoryManager.Commit getActiveCommitAtTime(
+ long ts, boolean last, boolean recreatable, boolean earliest) {
+ throw new UnsupportedOperationException("not expected in this test");
+ }
+
+ @Override
+ public void checkVersionExists(long version, boolean recreatable, boolean allowOOR)
+ throws VersionNotFoundException {
+ throw new UnsupportedOperationException("not expected in this test");
+ }
+
+ @Override
+ public CommitRange getTableChanges(
+ Engine engine, long startVersion, Optional endVersion) {
+ throw new UnsupportedOperationException("not expected in this test");
+ }
+ };
+
+ // ---------------------------------------------------------------------------
+ // Builder helpers
+ // ---------------------------------------------------------------------------
+
+ private static DeltaStreamUtils.SchemaReadOptions schemaReadOptions(
+ boolean allowUnsafeColumnMappingRead) {
+ return new DeltaStreamUtils.SchemaReadOptions(
+ /* allowUnsafeStreamingReadOnColumnMappingSchemaChanges= */ allowUnsafeColumnMappingRead,
+ /* allowUnsafeStreamingReadOnPartitionColumnChanges= */ false,
+ /* forceEnableStreamingReadOnReadIncompatibleSchemaChangesDuringStreamStart= */ false,
+ /* forceEnableUnsafeReadOnNullabilityChange= */ false,
+ /* isStreamingFromColumnMappingTable= */ false,
+ /* typeWideningEnabled= */ false,
+ /* enableSchemaTrackingForTypeWidening= */ false);
+ }
+
+ private DeltaOptions emptyDeltaOptions() {
+ return new DeltaOptions(EMPTY_SCALA_MAP, spark.sessionState().conf());
+ }
+
+ /**
+ * Build a lightweight handler that uses {@link #THROWING_SNAPSHOT_MANAGER}. Suitable for tests
+ * that only exercise tracking state, offset arithmetic, or iterator manipulation — NOT for tests
+ * that call private helpers like collectMetadataAtVersion.
+ */
+ private MetadataEvolutionHandler buildLightweightHandler(
+ Option trackingLog,
+ DeltaStreamUtils.SchemaReadOptions readOptions) {
+ return new MetadataEvolutionHandler(
+ spark,
+ "test-table-id",
+ "/tmp/fake-table",
+ THROWING_SNAPSHOT_MANAGER,
+ defaultEngine,
+ emptyDeltaOptions(),
+ readOptions,
+ trackingLog,
+ DEFAULT_METADATA,
+ DEFAULT_PROTOCOL,
+ "/tmp/fake-table/_delta_log/_streaming_metadata");
+ }
+
+ /**
+ * Build a handler backed by a real Delta table on disk, with a real tracking log. The handler's
+ * init state (schema, protocol, config) is captured from the table at {@code initVersion}.
+ *
+ * @param tablePath path to an already-created Delta table
+ * @param initVersion the version whose metadata becomes the handler's init state
+ * @param seedLogWithInitEntry if true, writes an initial entry to the tracking log at initVersion
+ * so that {@code shouldTrackMetadataChange()} returns true
+ */
+ private HandlerWithLog buildHandlerWithRealTable(
+ String tablePath, long initVersion, boolean seedLogWithInitEntry) {
+ PathBasedSnapshotManager snapshotManager =
+ new PathBasedSnapshotManager(tablePath, spark.sessionState().newHadoopConf());
+ SnapshotImpl snapshot = (SnapshotImpl) snapshotManager.loadSnapshotAt(initVersion);
+ Metadata tableMetadata = snapshot.getMetadata();
+ Protocol tableProtocol = snapshot.getProtocol();
+ KernelMetadataAdapter adapter = new KernelMetadataAdapter(tableMetadata);
+
+ String metadataLogPath = tablePath + "/metadata_log";
+ DeltaSourceMetadataTrackingLog trackingLog =
+ DeltaSourceMetadataTrackingLog.create(
+ spark,
+ metadataLogPath,
+ "test-table-id",
+ tablePath,
+ EMPTY_SCALA_MAP,
+ Option.empty(),
+ /* mergeConsecutiveSchemaChanges= */ false,
+ /* consecutiveSchemaChangesMerger= */ Option.empty(),
+ /* initMetadataLogEagerly= */ true);
+
+ if (seedLogWithInitEntry) {
+ PersistedMetadata entry =
+ PersistedMetadata.apply(
+ "test-table-id",
+ initVersion,
+ adapter,
+ new KernelProtocolAdapter(tableProtocol),
+ tablePath + "/_delta_log/_streaming_metadata");
+ trackingLog.writeNewMetadata(entry, false);
+ }
+
+ MetadataEvolutionHandler handler =
+ new MetadataEvolutionHandler(
+ spark,
+ "test-table-id",
+ tablePath,
+ snapshotManager,
+ defaultEngine,
+ emptyDeltaOptions(),
+ schemaReadOptions(/* allowUnsafeColumnMappingRead= */ false),
+ Option.apply(trackingLog),
+ tableMetadata,
+ tableProtocol,
+ tablePath + "/_delta_log/_streaming_metadata");
+
+ return new HandlerWithLog(handler, trackingLog);
+ }
+
+ /**
+ * Create a standalone tracking log on disk (not tied to a real table). Useful for tests that only
+ * need to control the tracking log state without a real Delta table.
+ */
+ private DeltaSourceMetadataTrackingLog createStandaloneTrackingLog(
+ File logDir, boolean seedWithDefaultEntry) {
+ String metadataPath = logDir.getAbsolutePath() + "/metadata_log";
+ DeltaSourceMetadataTrackingLog trackingLog =
+ DeltaSourceMetadataTrackingLog.create(
+ spark,
+ metadataPath,
+ "test-table-id",
+ "/tmp/fake-table",
+ EMPTY_SCALA_MAP,
+ Option.empty(),
+ /* mergeConsecutiveSchemaChanges= */ false,
+ /* consecutiveSchemaChangesMerger= */ Option.empty(),
+ /* initMetadataLogEagerly= */ true);
+ if (seedWithDefaultEntry) {
+ PersistedMetadata entry =
+ PersistedMetadata.apply(
+ "test-table-id",
+ 0L,
+ new KernelMetadataAdapter(DEFAULT_METADATA),
+ new KernelProtocolAdapter(DEFAULT_PROTOCOL),
+ "/tmp/fake-table/_delta_log/_streaming_metadata");
+ trackingLog.writeNewMetadata(entry, false);
+ }
+ return trackingLog;
+ }
+
+ /** Collect all IndexedFiles from a CloseableIterator into a list. */
+ private static List drainIndexedFiles(CloseableIterator iter) {
+ List result = new ArrayList<>();
+ try {
+ while (iter.hasNext()) {
+ result.add(iter.next());
+ }
+ } finally {
+ try {
+ iter.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Assert that a DeltaRuntimeException is a DELTA_STREAMING_METADATA_EVOLUTION error with the
+ * expected message parameters (schema, config, protocol).
+ */
+ private static void assertMetadataEvolutionException(DeltaRuntimeException ex, String context) {
+ assertEquals(
+ "DELTA_STREAMING_METADATA_EVOLUTION",
+ ex.getErrorClass(),
+ "Should throw metadata evolution exception " + context);
+ java.util.Map params = ex.getMessageParameters();
+ assertTrue(params.containsKey("schema"), "Missing 'schema' message parameter");
+ assertTrue(params.containsKey("config"), "Missing 'config' message parameter");
+ assertTrue(params.containsKey("protocol"), "Missing 'protocol' message parameter");
+ }
+
+ /** Pairs a handler with its tracking log so tests can verify log writes after handler calls. */
+ private static class HandlerWithLog {
+ final MetadataEvolutionHandler handler;
+ final DeltaSourceMetadataTrackingLog trackingLog;
+
+ HandlerWithLog(MetadataEvolutionHandler handler, DeltaSourceMetadataTrackingLog trackingLog) {
+ this.handler = handler;
+ this.trackingLog = trackingLog;
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // shouldTrackMetadataChange / shouldInitializeMetadataTrackingEagerly
+ //
+ // These two methods share the same preconditions (tracking log state + unsafe
+ // read flag), so we test them together for each configuration.
+ // ---------------------------------------------------------------------------
+
+ /** Both should be false when unsafe column mapping reads bypass tracking entirely. */
+ @Test
+ public void testTrackingState_disabledWhenUnsafeColumnMappingReadAllowed() {
+ MetadataEvolutionHandler handler =
+ buildLightweightHandler(
+ Option.empty(), schemaReadOptions(/* allowUnsafeColumnMappingRead= */ true));
+ assertFalse(handler.shouldTrackMetadataChange());
+ assertFalse(handler.shouldInitializeMetadataTrackingEagerly());
+ }
+
+ /** Both should be false when no tracking log is provided (Option.empty). */
+ @Test
+ public void testTrackingState_disabledWhenNoTrackingLog() {
+ MetadataEvolutionHandler handler =
+ buildLightweightHandler(
+ Option.empty(), schemaReadOptions(/* allowUnsafeColumnMappingRead= */ false));
+ assertFalse(handler.shouldTrackMetadataChange());
+ assertFalse(handler.shouldInitializeMetadataTrackingEagerly());
+ }
+
+ /**
+ * When the tracking log exists but is empty: not yet tracking (no persisted metadata to read
+ * from), but ready to initialize eagerly on the first batch.
+ */
+ @Test
+ public void testTrackingState_emptyLog_notTrackingButReadyToInitialize(@TempDir File tempDir) {
+ DeltaSourceMetadataTrackingLog emptyLog =
+ createStandaloneTrackingLog(tempDir, /* seedWithDefaultEntry= */ false);
+ MetadataEvolutionHandler handler =
+ buildLightweightHandler(
+ Option.apply(emptyLog), schemaReadOptions(/* allowUnsafeColumnMappingRead= */ false));
+ assertFalse(handler.shouldTrackMetadataChange());
+ assertTrue(handler.shouldInitializeMetadataTrackingEagerly());
+ }
+
+ /**
+ * When the tracking log has a persisted entry: actively tracking (reads schema from log), and no
+ * longer needs eager initialization.
+ */
+ @Test
+ public void testTrackingState_seededLog_trackingActiveAndInitComplete(@TempDir File tempDir) {
+ DeltaSourceMetadataTrackingLog seededLog =
+ createStandaloneTrackingLog(tempDir, /* seedWithDefaultEntry= */ true);
+ MetadataEvolutionHandler handler =
+ buildLightweightHandler(
+ Option.apply(seededLog), schemaReadOptions(/* allowUnsafeColumnMappingRead= */ false));
+ assertTrue(handler.shouldTrackMetadataChange());
+ assertFalse(handler.shouldInitializeMetadataTrackingEagerly());
+ }
+
+ // ---------------------------------------------------------------------------
+ // stopIndexedFileIteratorAtSchemaChangeBarrier
+ //
+ // This method truncates the file action iterator at the METADATA_CHANGE_INDEX
+ // sentinel (inclusive) so a batch never crosses a schema change boundary.
+ // ---------------------------------------------------------------------------
+
+ /** All files pass through when no barrier sentinel is present. */
+ @Test
+ public void testStopAtBarrier_allFilesPassThroughWithoutBarrier() {
+ MetadataEvolutionHandler handler =
+ buildLightweightHandler(
+ Option.empty(), schemaReadOptions(/* allowUnsafeColumnMappingRead= */ true));
+
+ List inputFiles =
+ Arrays.asList(
+ IndexedFile.sentinel(1L, 0L),
+ IndexedFile.sentinel(1L, 1L),
+ IndexedFile.sentinel(1L, 2L));
+
+ List result =
+ drainIndexedFiles(
+ handler.stopIndexedFileIteratorAtSchemaChangeBarrier(
+ Utils.toCloseableIterator(inputFiles.iterator())));
+ assertEquals(3, result.size());
+ }
+
+ /** Files before the barrier + the barrier itself are included; files after are discarded. */
+ @Test
+ public void testStopAtBarrier_includesBarrierAndDiscardsRest() {
+ MetadataEvolutionHandler handler =
+ buildLightweightHandler(
+ Option.empty(), schemaReadOptions(/* allowUnsafeColumnMappingRead= */ true));
+
+ long barrierIndex = DeltaSourceOffset.METADATA_CHANGE_INDEX();
+ List inputFiles =
+ Arrays.asList(
+ IndexedFile.sentinel(1L, 0L),
+ IndexedFile.sentinel(1L, 1L),
+ IndexedFile.sentinel(1L, barrierIndex),
+ IndexedFile.sentinel(1L, 3L),
+ IndexedFile.sentinel(1L, 4L));
+
+ List result =
+ drainIndexedFiles(
+ handler.stopIndexedFileIteratorAtSchemaChangeBarrier(
+ Utils.toCloseableIterator(inputFiles.iterator())));
+ assertEquals(3, result.size());
+ assertEquals(0L, result.get(0).getIndex());
+ assertEquals(1L, result.get(1).getIndex());
+ assertEquals(barrierIndex, result.get(2).getIndex());
+ }
+
+ /** Empty input produces empty output. */
+ @Test
+ public void testStopAtBarrier_handlesEmptyIterator() {
+ MetadataEvolutionHandler handler =
+ buildLightweightHandler(
+ Option.empty(), schemaReadOptions(/* allowUnsafeColumnMappingRead= */ true));
+
+ List result =
+ drainIndexedFiles(
+ handler.stopIndexedFileIteratorAtSchemaChangeBarrier(
+ Utils.toCloseableIterator(Collections.emptyIterator())));
+ assertTrue(result.isEmpty());
+ }
+
+ // ---------------------------------------------------------------------------
+ // getMetadataOrProtocolChangeIndexedFileIterator
+ //
+ // Returns a single METADATA_CHANGE_INDEX sentinel if tracking is on AND the
+ // incoming metadata/protocol differs from the init state. Empty otherwise.
+ // ---------------------------------------------------------------------------
+
+ /** Returns empty even with a different protocol when tracking is disabled (unsafe read). */
+ @Test
+ public void testChangeIterator_emptyWhenTrackingDisabled() {
+ MetadataEvolutionHandler handler =
+ buildLightweightHandler(
+ Option.empty(), schemaReadOptions(/* allowUnsafeColumnMappingRead= */ true));
+
+ Protocol upgradedProtocol = new Protocol(3, 7);
+ List result =
+ drainIndexedFiles(
+ handler.getMetadataOrProtocolChangeIndexedFileIterator(
+ DEFAULT_METADATA, upgradedProtocol, 1L));
+ assertTrue(result.isEmpty());
+ }
+
+ /** Returns empty when tracking is on but metadata and protocol match the init state. */
+ @Test
+ public void testChangeIterator_emptyWhenMetadataAndProtocolUnchanged(@TempDir File tempDir) {
+ DeltaSourceMetadataTrackingLog seededLog =
+ createStandaloneTrackingLog(tempDir, /* seedWithDefaultEntry= */ true);
+ MetadataEvolutionHandler handler =
+ buildLightweightHandler(
+ Option.apply(seededLog), schemaReadOptions(/* allowUnsafeColumnMappingRead= */ false));
+
+ List result =
+ drainIndexedFiles(
+ handler.getMetadataOrProtocolChangeIndexedFileIterator(
+ DEFAULT_METADATA, DEFAULT_PROTOCOL, 1L));
+ assertTrue(result.isEmpty());
+ }
+
+ /** Returns a barrier sentinel when the protocol version has changed. */
+ @Test
+ public void testChangeIterator_returnsBarrierOnProtocolVersionChange(@TempDir File tempDir) {
+ DeltaSourceMetadataTrackingLog seededLog =
+ createStandaloneTrackingLog(tempDir, /* seedWithDefaultEntry= */ true);
+ MetadataEvolutionHandler handler =
+ buildLightweightHandler(
+ Option.apply(seededLog), schemaReadOptions(/* allowUnsafeColumnMappingRead= */ false));
+
+ Protocol upgradedProtocol = new Protocol(3, 7);
+ List result =
+ drainIndexedFiles(
+ handler.getMetadataOrProtocolChangeIndexedFileIterator(
+ DEFAULT_METADATA, upgradedProtocol, 5L));
+ assertEquals(1, result.size());
+ assertEquals(5L, result.get(0).getVersion());
+ assertEquals(DeltaSourceOffset.METADATA_CHANGE_INDEX(), result.get(0).getIndex());
+ assertFalse(result.get(0).hasFileAction());
+ }
+
+ /** Returns a barrier sentinel when a column was added to the schema. */
+ @Test
+ public void testChangeIterator_returnsBarrierOnColumnAdded(@TempDir File tempDir) {
+ DeltaSourceMetadataTrackingLog seededLog =
+ createStandaloneTrackingLog(tempDir, /* seedWithDefaultEntry= */ true);
+ MetadataEvolutionHandler handler =
+ buildLightweightHandler(
+ Option.apply(seededLog), schemaReadOptions(/* allowUnsafeColumnMappingRead= */ false));
+
+ // Add a c3 column to the default (c1, c2) schema
+ StructType schemaWithNewColumn =
+ new StructType()
+ .add("c1", IntegerType.INTEGER)
+ .add("c2", StringType.STRING)
+ .add("c3", IntegerType.INTEGER);
+ String schemaWithNewColumnJson =
+ "{\"type\":\"struct\",\"fields\":["
+ + "{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},"
+ + "{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+ + "{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}";
+ Metadata metadataWithNewColumn =
+ new Metadata(
+ "test-id",
+ Optional.empty(),
+ Optional.empty(),
+ new Format("parquet", Collections.emptyMap()),
+ schemaWithNewColumnJson,
+ schemaWithNewColumn,
+ emptyArrayValue(),
+ Optional.empty(),
+ VectorUtils.stringStringMapValue(Collections.emptyMap()));
+
+ List result =
+ drainIndexedFiles(
+ handler.getMetadataOrProtocolChangeIndexedFileIterator(
+ metadataWithNewColumn, DEFAULT_PROTOCOL, 7L));
+ assertEquals(1, result.size());
+ assertEquals(7L, result.get(0).getVersion());
+ assertEquals(DeltaSourceOffset.METADATA_CHANGE_INDEX(), result.get(0).getIndex());
+ assertFalse(result.get(0).hasFileAction());
+ }
+
+ /** Returns a barrier sentinel when a delta table configuration was added. */
+ @Test
+ public void testChangeIterator_returnsBarrierOnDeltaConfigChange(@TempDir File tempDir) {
+ DeltaSourceMetadataTrackingLog seededLog =
+ createStandaloneTrackingLog(tempDir, /* seedWithDefaultEntry= */ true);
+ MetadataEvolutionHandler handler =
+ buildLightweightHandler(
+ Option.apply(seededLog), schemaReadOptions(/* allowUnsafeColumnMappingRead= */ false));
+
+ // Same schema as default, but with an added delta.* configuration
+ Map configWithCDF = new HashMap<>();
+ configWithCDF.put("delta.enableChangeDataFeed", "true");
+ Metadata metadataWithNewConfig =
+ new Metadata(
+ "test-id",
+ Optional.empty(),
+ Optional.empty(),
+ new Format("parquet", Collections.emptyMap()),
+ DEFAULT_SCHEMA_JSON,
+ DEFAULT_KERNEL_SCHEMA,
+ emptyArrayValue(),
+ Optional.empty(),
+ VectorUtils.stringStringMapValue(configWithCDF));
+
+ List result =
+ drainIndexedFiles(
+ handler.getMetadataOrProtocolChangeIndexedFileIterator(
+ metadataWithNewConfig, DEFAULT_PROTOCOL, 3L));
+ assertEquals(1, result.size());
+ assertEquals(3L, result.get(0).getVersion());
+ assertEquals(DeltaSourceOffset.METADATA_CHANGE_INDEX(), result.get(0).getIndex());
+ assertFalse(result.get(0).hasFileAction());
+ }
+
+ // ---------------------------------------------------------------------------
+ // getNextOffsetFromPreviousOffsetIfPendingSchemaChange
+ //
+ // Implements the two-barrier protocol:
+ // METADATA_CHANGE_INDEX -> advance to POST_METADATA_CHANGE_INDEX
+ // POST_METADATA_CHANGE_INDEX -> block (return same offset) if change persists,
+ // or empty if change was resolved
+ // any other index -> empty (no pending change)
+ // ---------------------------------------------------------------------------
+
+ /**
+ * At METADATA_CHANGE_INDEX, the handler advances to POST_METADATA_CHANGE_INDEX and preserves the
+ * reservoirVersion and isInitialSnapshot flag.
+ */
+ @Test
+ public void testPendingSchemaChange_advancesFromBarrierToPostBarrier() {
+ MetadataEvolutionHandler handler =
+ buildLightweightHandler(
+ Option.empty(), schemaReadOptions(/* allowUnsafeColumnMappingRead= */ true));
+
+ // isInitialSnapshot = false
+ DeltaSourceOffset barrierOffset =
+ DeltaSourceOffset.apply(
+ "test-reservoir", 5L, DeltaSourceOffset.METADATA_CHANGE_INDEX(), false);
+ Optional nextOffset =
+ handler.getNextOffsetFromPreviousOffsetIfPendingSchemaChange(barrierOffset);
+ assertTrue(nextOffset.isPresent());
+ assertEquals(DeltaSourceOffset.POST_METADATA_CHANGE_INDEX(), nextOffset.get().index());
+ assertEquals(5L, nextOffset.get().reservoirVersion());
+ assertFalse(nextOffset.get().isInitialSnapshot());
+
+ // isInitialSnapshot = true — flag must be preserved
+ DeltaSourceOffset initialSnapshotBarrier =
+ DeltaSourceOffset.apply(
+ "test-reservoir", 3L, DeltaSourceOffset.METADATA_CHANGE_INDEX(), true);
+ Optional nextFromSnapshot =
+ handler.getNextOffsetFromPreviousOffsetIfPendingSchemaChange(initialSnapshotBarrier);
+ assertTrue(nextFromSnapshot.isPresent());
+ assertEquals(DeltaSourceOffset.POST_METADATA_CHANGE_INDEX(), nextFromSnapshot.get().index());
+ assertTrue(nextFromSnapshot.get().isInitialSnapshot());
+ }
+
+ /** A regular (non-barrier) index means no pending schema change — returns empty. */
+ @Test
+ public void testPendingSchemaChange_emptyForNonBarrierIndex() {
+ MetadataEvolutionHandler handler =
+ buildLightweightHandler(
+ Option.empty(), schemaReadOptions(/* allowUnsafeColumnMappingRead= */ true));
+
+ DeltaSourceOffset regularOffset = DeltaSourceOffset.apply("test-reservoir", 5L, 10L, false);
+ assertFalse(
+ handler.getNextOffsetFromPreviousOffsetIfPendingSchemaChange(regularOffset).isPresent());
+ }
+
+ /**
+ * At POST_BARRIER, returns empty when metadata at that version matches init (unblocks stream).
+ */
+ @Test
+ public void testPendingSchemaChange_unblockWhenNoActualChange(@TempDir File tempDir) {
+ String tablePath = tempDir.getAbsolutePath();
+ String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
+ createEmptyTestTable(tablePath, tableName);
+
+ HandlerWithLog handlerWithLog =
+ buildHandlerWithRealTable(tablePath, 0L, /* seedLogWithInitEntry= */ false);
+
+ DeltaSourceOffset postBarrierOffset =
+ DeltaSourceOffset.apply(
+ "test-reservoir", 0L, DeltaSourceOffset.POST_METADATA_CHANGE_INDEX(), false);
+ // Version 0 has the same metadata as init -> unblocks
+ assertFalse(
+ handlerWithLog
+ .handler
+ .getNextOffsetFromPreviousOffsetIfPendingSchemaChange(postBarrierOffset)
+ .isPresent());
+ }
+
+ /** At POST_BARRIER with a still-pending schema change, returns the same offset to block. */
+ @Test
+ public void testPendingSchemaChange_blocksAtPostBarrierWhenChangeStillPending(
+ @TempDir File tempDir) {
+ String tablePath = tempDir.getAbsolutePath();
+ String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
+ createEmptyTestTable(tablePath, tableName);
+
+ // Handler captures version 0 as init, with a seed log entry at v0
+ HandlerWithLog handlerWithLog =
+ buildHandlerWithRealTable(tablePath, 0L, /* seedLogWithInitEntry= */ true);
+
+ // Evolve at v1 -> schema at v1 differs from init
+ spark.sql(String.format("ALTER TABLE %s ADD COLUMNS (c3 INT)", tableName));
+
+ DeltaSourceOffset postBarrierAtVersion1 =
+ DeltaSourceOffset.apply(
+ "test-reservoir", 1L, DeltaSourceOffset.POST_METADATA_CHANGE_INDEX(), false);
+ Optional result =
+ handlerWithLog.handler.getNextOffsetFromPreviousOffsetIfPendingSchemaChange(
+ postBarrierAtVersion1);
+ assertTrue(result.isPresent());
+ assertEquals(postBarrierAtVersion1.reservoirVersion(), result.get().reservoirVersion());
+ assertEquals(postBarrierAtVersion1.index(), result.get().index());
+ assertEquals(postBarrierAtVersion1.reservoirId(), result.get().reservoirId());
+ }
+
+ // ---------------------------------------------------------------------------
+ // updateMetadataTrackingLogAndFailTheStreamIfNeeded
+ //
+ // Called during commit(). Writes the new metadata to the tracking log and
+ // throws DELTA_STREAMING_METADATA_EVOLUTION to trigger stream re-analysis.
+ // ---------------------------------------------------------------------------
+
+ /** No-op when tracking is disabled or the offset is not a barrier index. */
+ @Test
+ public void testUpdateLog_noOpWhenTrackingDisabledOrNonBarrierIndex() {
+ MetadataEvolutionHandler handler =
+ buildLightweightHandler(
+ Option.empty(), schemaReadOptions(/* allowUnsafeColumnMappingRead= */ true));
+
+ // Barrier index but tracking is disabled -> no-op
+ DeltaSourceOffset barrierOffset =
+ DeltaSourceOffset.apply(
+ "test-reservoir", 0L, DeltaSourceOffset.METADATA_CHANGE_INDEX(), false);
+ handler.updateMetadataTrackingLogAndFailTheStreamIfNeeded(barrierOffset);
+
+ // Regular index -> no-op regardless of tracking state
+ DeltaSourceOffset regularOffset = DeltaSourceOffset.apply("test-reservoir", 0L, 5L, false);
+ handler.updateMetadataTrackingLogAndFailTheStreamIfNeeded(regularOffset);
+ }
+
+ /**
+ * Offset overload: when the barrier version has a schema change (via ALTER TABLE), reads the
+ * changed metadata from the log, writes it, and throws.
+ */
+ @Test
+ public void testUpdateLog_throwsWhenBarrierVersionHasSchemaChange(@TempDir File tempDir) {
+ String tablePath = tempDir.getAbsolutePath();
+ String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
+ createEmptyTestTable(tablePath, tableName);
+
+ HandlerWithLog handlerWithLog =
+ buildHandlerWithRealTable(tablePath, 0L, /* seedLogWithInitEntry= */ true);
+
+ // Evolve the table schema: version 1 has a new column
+ spark.sql(String.format("ALTER TABLE %s ADD COLUMNS (c3 INT)", tableName));
+
+ DeltaSourceOffset barrierAtVersion1 =
+ DeltaSourceOffset.apply(
+ "test-reservoir", 1L, DeltaSourceOffset.METADATA_CHANGE_INDEX(), false);
+
+ DeltaRuntimeException ex =
+ assertThrows(
+ DeltaRuntimeException.class,
+ () ->
+ handlerWithLog.handler.updateMetadataTrackingLogAndFailTheStreamIfNeeded(
+ barrierAtVersion1));
+ assertMetadataEvolutionException(ex, "when schema changed at barrier version");
+
+ // Verify the new entry was persisted at version 1
+ assertTrue(handlerWithLog.trackingLog.getCurrentTrackedMetadata().isDefined());
+ assertEquals(
+ 1L, handlerWithLog.trackingLog.getCurrentTrackedMetadata().get().deltaCommitVersion());
+ }
+
+ /**
+ * Direct overload: passing a changed protocol throws and writes the new entry to the tracking log
+ * at the specified version.
+ */
+ @Test
+ public void testUpdateLog_throwsAndWritesEntryOnDirectProtocolChange(@TempDir File tempDir) {
+ DeltaSourceMetadataTrackingLog seededLog =
+ createStandaloneTrackingLog(tempDir, /* seedWithDefaultEntry= */ true);
+ MetadataEvolutionHandler handler =
+ buildLightweightHandler(
+ Option.apply(seededLog), schemaReadOptions(/* allowUnsafeColumnMappingRead= */ false));
+
+ Protocol upgradedProtocol = new Protocol(3, 7);
+ DeltaRuntimeException ex =
+ assertThrows(
+ DeltaRuntimeException.class,
+ () ->
+ handler.updateMetadataTrackingLogAndFailTheStreamIfNeeded(
+ null, upgradedProtocol, 5L, /* replace= */ false));
+ assertMetadataEvolutionException(ex, "when protocol changed");
+
+ // Verify the new entry was persisted at version 5
+ assertTrue(seededLog.getCurrentTrackedMetadata().isDefined());
+ assertEquals(5L, seededLog.getCurrentTrackedMetadata().get().deltaCommitVersion());
+ }
+
+ /** Direct overload: no-op when metadata and protocol match the init state. */
+ @Test
+ public void testUpdateLog_noOpOnUnchangedDirectMetadataAndProtocol(@TempDir File tempDir) {
+ DeltaSourceMetadataTrackingLog seededLog =
+ createStandaloneTrackingLog(tempDir, /* seedWithDefaultEntry= */ true);
+ MetadataEvolutionHandler handler =
+ buildLightweightHandler(
+ Option.apply(seededLog), schemaReadOptions(/* allowUnsafeColumnMappingRead= */ false));
+
+ handler.updateMetadataTrackingLogAndFailTheStreamIfNeeded(
+ DEFAULT_METADATA, DEFAULT_PROTOCOL, 1L, /* replace= */ false);
+ }
+
+ /**
+ * With replace=true, the new entry's previousMetadataSeqNum is set so getPreviousTrackedMetadata
+ * returns empty (logically replacing the seed).
+ */
+ @Test
+ public void testUpdateLog_replaceTrueClearsPreviousEntry(@TempDir File tempDir) {
+ DeltaSourceMetadataTrackingLog seededLog =
+ createStandaloneTrackingLog(tempDir, /* seedWithDefaultEntry= */ true);
+ MetadataEvolutionHandler handler =
+ buildLightweightHandler(
+ Option.apply(seededLog), schemaReadOptions(/* allowUnsafeColumnMappingRead= */ false));
+
+ Protocol upgradedProtocol = new Protocol(3, 7);
+ DeltaRuntimeException ex =
+ assertThrows(
+ DeltaRuntimeException.class,
+ () ->
+ handler.updateMetadataTrackingLogAndFailTheStreamIfNeeded(
+ null, upgradedProtocol, 5L, /* replace= */ true));
+ assertMetadataEvolutionException(ex, "when protocol changed with replace=true");
+
+ assertTrue(seededLog.getCurrentTrackedMetadata().isDefined());
+ assertEquals(5L, seededLog.getCurrentTrackedMetadata().get().deltaCommitVersion());
+ // The seed at v0 is logically replaced -> previous tracked is empty.
+ assertTrue(seededLog.getPreviousTrackedMetadata().isEmpty());
+ }
+
+ /** Throw-and-write also fires at POST_METADATA_CHANGE_INDEX, not just METADATA_CHANGE_INDEX. */
+ @Test
+ public void testUpdateLog_throwsAtPostBarrierIndex(@TempDir File tempDir) {
+ String tablePath = tempDir.getAbsolutePath();
+ String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
+ createEmptyTestTable(tablePath, tableName);
+
+ HandlerWithLog handlerWithLog =
+ buildHandlerWithRealTable(tablePath, 0L, /* seedLogWithInitEntry= */ true);
+
+ spark.sql(String.format("ALTER TABLE %s ADD COLUMNS (c3 INT)", tableName));
+
+ DeltaSourceOffset postBarrierAtVersion1 =
+ DeltaSourceOffset.apply(
+ "test-reservoir", 1L, DeltaSourceOffset.POST_METADATA_CHANGE_INDEX(), false);
+
+ DeltaRuntimeException ex =
+ assertThrows(
+ DeltaRuntimeException.class,
+ () ->
+ handlerWithLog.handler.updateMetadataTrackingLogAndFailTheStreamIfNeeded(
+ postBarrierAtVersion1));
+ assertMetadataEvolutionException(ex, "when committing the post-barrier with schema change");
+
+ assertTrue(handlerWithLog.trackingLog.getCurrentTrackedMetadata().isDefined());
+ assertEquals(
+ 1L, handlerWithLog.trackingLog.getCurrentTrackedMetadata().get().deltaCommitVersion());
+ }
+
+ // ---------------------------------------------------------------------------
+ // initializeMetadataTrackingAndExitStream
+ //
+ // Called on the first batch. Writes the initial metadata entry to the tracking
+ // log. Throws DELTA_STREAMING_METADATA_EVOLUTION if the metadata at the batch
+ // version differs from init, or if alwaysFailUponLogInitialized is true.
+ // ---------------------------------------------------------------------------
+
+ /**
+ * When metadata at the batch version matches init and alwaysFail is false, the entry is written
+ * without throwing.
+ */
+ @Test
+ public void testInitialize_writesEntryWithoutThrowingWhenMetadataMatches(@TempDir File tempDir) {
+ String tablePath = tempDir.getAbsolutePath();
+ String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
+ createEmptyTestTable(tablePath, tableName);
+
+ HandlerWithLog handlerWithLog =
+ buildHandlerWithRealTable(tablePath, 0L, /* seedLogWithInitEntry= */ false);
+
+ handlerWithLog.handler.initializeMetadataTrackingAndExitStream(
+ 0L, /* batchEndVersion= */ null, /* alwaysFailUponLogInitialized= */ false);
+
+ assertTrue(handlerWithLog.trackingLog.getCurrentTrackedMetadata().isDefined());
+ assertEquals(
+ 0L, handlerWithLog.trackingLog.getCurrentTrackedMetadata().get().deltaCommitVersion());
+ }
+
+ /**
+ * When alwaysFailUponLogInitialized is true, throws even if metadata matches; entry is still
+ * written before throwing.
+ */
+ @Test
+ public void testInitialize_alwaysThrowsWhenAlwaysFailFlagIsSet(@TempDir File tempDir) {
+ String tablePath = tempDir.getAbsolutePath();
+ String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
+ createEmptyTestTable(tablePath, tableName);
+
+ HandlerWithLog handlerWithLog =
+ buildHandlerWithRealTable(tablePath, 0L, /* seedLogWithInitEntry= */ false);
+
+ DeltaRuntimeException ex =
+ assertThrows(
+ DeltaRuntimeException.class,
+ () ->
+ handlerWithLog.handler.initializeMetadataTrackingAndExitStream(
+ 0L, /* batchEndVersion= */ null, /* alwaysFailUponLogInitialized= */ true));
+ assertMetadataEvolutionException(ex, "when alwaysFailUponLogInitialized is true");
+
+ // Entry should still have been written before throwing
+ assertTrue(handlerWithLog.trackingLog.getCurrentTrackedMetadata().isDefined());
+ }
+
+ /**
+ * When the table schema was evolved (ALTER TABLE ADD COLUMNS) between init and the batch version,
+ * the handler writes the new metadata and throws.
+ */
+ @Test
+ public void testInitialize_throwsWhenSchemaEvolvedSinceInit(@TempDir File tempDir) {
+ String tablePath = tempDir.getAbsolutePath();
+ String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
+ createEmptyTestTable(tablePath, tableName);
+
+ // Handler captures version 0 as init state
+ HandlerWithLog handlerWithLog =
+ buildHandlerWithRealTable(tablePath, 0L, /* seedLogWithInitEntry= */ false);
+
+ // Evolve: version 1 has a new column
+ spark.sql(String.format("ALTER TABLE %s ADD COLUMNS (c3 INT)", tableName));
+
+ DeltaRuntimeException ex =
+ assertThrows(
+ DeltaRuntimeException.class,
+ () ->
+ handlerWithLog.handler.initializeMetadataTrackingAndExitStream(
+ 1L, /* batchEndVersion= */ null, /* alwaysFailUponLogInitialized= */ false));
+ assertMetadataEvolutionException(ex, "when schema evolved since init");
+
+ // Entry should be written at version 1 (the evolved version)
+ assertTrue(handlerWithLog.trackingLog.getCurrentTrackedMetadata().isDefined());
+ assertEquals(
+ 1L, handlerWithLog.trackingLog.getCurrentTrackedMetadata().get().deltaCommitVersion());
+ }
+
+ /**
+ * batchEndVersion overload: when there are no metadata or protocol changes in [start, end],
+ * validation succeeds and the entry is written at endVersion.
+ */
+ @Test
+ public void testInitialize_batchEndVersion_succeedsWhenNoMetadataChangeInRange(
+ @TempDir File tempDir) {
+ String tablePath = tempDir.getAbsolutePath();
+ String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
+ createEmptyTestTable(tablePath, tableName);
+ // INSERT adds an Add action at v1; no metadata or protocol change in [0, 1]
+ spark.sql(String.format("INSERT INTO %s VALUES (1, 'Alice')", tableName));
+
+ HandlerWithLog handlerWithLog =
+ buildHandlerWithRealTable(tablePath, 0L, /* seedLogWithInitEntry= */ false);
+
+ handlerWithLog.handler.initializeMetadataTrackingAndExitStream(
+ 0L, /* batchEndVersion= */ 1L, /* alwaysFailUponLogInitialized= */ false);
+
+ assertTrue(handlerWithLog.trackingLog.getCurrentTrackedMetadata().isDefined());
+ assertEquals(
+ 1L, handlerWithLog.trackingLog.getCurrentTrackedMetadata().get().deltaCommitVersion());
+ }
+
+ /**
+ * batchEndVersion overload: an incompatible schema change (column drop with column mapping)
+ * within [start, end] makes validation throw
+ * DELTA_STREAMING_SCHEMA_LOG_INIT_FAILED_INCOMPATIBLE_METADATA — there's no schema we can use to
+ * safely read the constructed batch.
+ */
+ @Test
+ public void testInitialize_batchEndVersion_throwsOnIncompatibleSchemaChangeInRange(
+ @TempDir File tempDir) {
+ String tablePath = tempDir.getAbsolutePath();
+ String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
+ // Column mapping enables logical column drops (detected as incompatible by validation)
+ spark.sql(
+ String.format(
+ "CREATE TABLE %s (id INT, name STRING, age INT) USING delta LOCATION '%s' "
+ + "TBLPROPERTIES ('delta.columnMapping.mode' = 'name')",
+ tableName, tablePath));
+ spark.sql(String.format("ALTER TABLE %s DROP COLUMN age", tableName));
+
+ HandlerWithLog handlerWithLog =
+ buildHandlerWithRealTable(tablePath, 0L, /* seedLogWithInitEntry= */ false);
+
+ DeltaRuntimeException ex =
+ assertThrows(
+ DeltaRuntimeException.class,
+ () ->
+ handlerWithLog.handler.initializeMetadataTrackingAndExitStream(
+ 0L, /* batchEndVersion= */ 1L, /* alwaysFailUponLogInitialized= */ false));
+ assertEquals(
+ "DELTA_STREAMING_SCHEMA_LOG_INIT_FAILED_INCOMPATIBLE_METADATA",
+ ex.getErrorClass(),
+ "Should throw incompatible-metadata error when range contains a column drop");
+ }
+}
diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/utils/StreamingHelperTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/utils/StreamingHelperTest.java
index ec2373ac517..19aa5107d30 100644
--- a/spark/v2/src/test/java/io/delta/spark/internal/v2/utils/StreamingHelperTest.java
+++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/utils/StreamingHelperTest.java
@@ -22,11 +22,19 @@
import io.delta.kernel.Snapshot;
import io.delta.kernel.internal.DeltaHistoryManager;
+import io.delta.kernel.internal.actions.Metadata;
+import io.delta.kernel.internal.actions.Protocol;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructType;
import io.delta.spark.internal.v2.DeltaV2TestBase;
import io.delta.spark.internal.v2.exception.VersionNotFoundException;
import io.delta.spark.internal.v2.snapshot.PathBasedSnapshotManager;
import java.io.File;
import java.sql.Timestamp;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import java.util.stream.Stream;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.delta.DeltaLog;
@@ -371,4 +379,148 @@ public void testCheckVersionExists(
.checkVersionExists(versionToCheck, Option.empty(), mustBeRecreatable, allowOutOfRange);
}
}
+
+ // ---------------------------------------------------------------------------
+ // collectMetadataActionsFromRangeUnsafe
+ //
+ // Fixture for parameterized cases below: v0 CREATE, v1 INSERT, v2 ALTER ADD COLUMNS.
+ // Versions with a Metadata action: {v0, v2}. v1 has none.
+ // ---------------------------------------------------------------------------
+
+ private void setupTableWithMetadataChangeAtV2(String testTablePath, String testTableName) {
+ createEmptyTestTable(testTablePath, testTableName); // v0
+ spark.sql(String.format("INSERT INTO %s VALUES (1, 'Alice')", testTableName)); // v1
+ spark.sql(String.format("ALTER TABLE %s ADD COLUMNS (c3 INT)", testTableName)); // v2
+ }
+
+ private static Stream collectMetadataTestCases() {
+ return Stream.of(
+ // scenario, startVersion, endVersionOpt, expectedVersions
+ Arguments.of("fullRange_endInclusive", 0L, Optional.of(2L), Set.of(0L, 2L)),
+ Arguments.of("nonTrivialStartExcludesV0", 1L, Optional.of(2L), Set.of(2L)),
+ Arguments.of("emptyEndReachesLatest", 0L, Optional.empty(), Set.of(0L, 2L)),
+ Arguments.of("rangeWithoutMetadataChange", 1L, Optional.of(1L), Set.of()),
+ Arguments.of("endBeforeChange", 0L, Optional.of(1L), Set.of(0L)),
+ Arguments.of("emptyEndWithNonTrivialStart", 1L, Optional.empty(), Set.of(2L)),
+ Arguments.of("singleVersionAtChange", 2L, Optional.of(2L), Set.of(2L)));
+ }
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("collectMetadataTestCases")
+ public void testCollectMetadataActionsFromRangeUnsafe(
+ String scenario,
+ long startVersion,
+ Optional endVersionOpt,
+ Set expectedVersions,
+ @TempDir File tempDir) {
+ String testTablePath = tempDir.getAbsolutePath();
+ String testTableName = "test_collect_metadata_" + scenario;
+ setupTableWithMetadataChangeAtV2(testTablePath, testTableName);
+ snapshotManager =
+ new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf());
+
+ Map result =
+ StreamingHelper.collectMetadataActionsFromRangeUnsafe(
+ startVersion, endVersionOpt, snapshotManager, defaultEngine, testTablePath);
+
+ assertEquals(expectedVersions, result.keySet());
+ }
+
+ /** Verifies that the version → Metadata mapping returns the correct Metadata content. */
+ @Test
+ public void testCollectMetadataActionsFromRangeUnsafe_returnsCorrectMetadataPerVersion(
+ @TempDir File tempDir) {
+ String testTablePath = tempDir.getAbsolutePath();
+ String testTableName = "test_collect_metadata_content";
+ setupTableWithMetadataChangeAtV2(testTablePath, testTableName);
+ snapshotManager =
+ new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf());
+
+ Map result =
+ StreamingHelper.collectMetadataActionsFromRangeUnsafe(
+ 0L, Optional.of(2L), snapshotManager, defaultEngine, testTablePath);
+
+ StructType expectedV0Schema =
+ new StructType().add("id", IntegerType.INTEGER).add("name", StringType.STRING);
+ StructType expectedV2Schema =
+ new StructType()
+ .add("id", IntegerType.INTEGER)
+ .add("name", StringType.STRING)
+ .add("c3", IntegerType.INTEGER);
+ assertEquals(expectedV0Schema, result.get(0L).getSchema());
+ assertEquals(expectedV2Schema, result.get(2L).getSchema());
+ }
+
+ // ---------------------------------------------------------------------------
+ // collectProtocolActionsFromRangeUnsafe
+ //
+ // Fixture for parameterized cases below: v0 CREATE, v1 INSERT, v2 SET TBLPROPERTIES (upgrade).
+ // Versions with a Protocol action: {v0, v2}. v1 has none.
+ // ---------------------------------------------------------------------------
+
+ private void setupTableWithProtocolUpgradeAtV2(String testTablePath, String testTableName) {
+ createEmptyTestTable(testTablePath, testTableName); // v0
+ spark.sql(String.format("INSERT INTO %s VALUES (1, 'Alice')", testTableName)); // v1
+ // Adding a table feature forces a real protocol upgrade; setting only minReader/minWriter to
+ // (3, 7) is normalized back to (1, 2) by Protocol.merge when no features are introduced and
+ // produces no Protocol action.
+ spark.sql(
+ String.format(
+ "ALTER TABLE %s SET TBLPROPERTIES " + "('delta.feature.deletionVectors' = 'supported')",
+ testTableName)); // v2
+ }
+
+ private static Stream collectProtocolTestCases() {
+ return Stream.of(
+ // scenario, startVersion, endVersionOpt, expectedVersions
+ Arguments.of("fullRange_endInclusive", 0L, Optional.of(2L), Set.of(0L, 2L)),
+ Arguments.of("nonTrivialStartExcludesV0", 1L, Optional.of(2L), Set.of(2L)),
+ Arguments.of("emptyEndReachesLatest", 0L, Optional.empty(), Set.of(0L, 2L)),
+ Arguments.of("rangeWithoutProtocolChange", 1L, Optional.of(1L), Set.of()),
+ Arguments.of("endBeforeChange", 0L, Optional.of(1L), Set.of(0L)),
+ Arguments.of("emptyEndWithNonTrivialStart", 1L, Optional.empty(), Set.of(2L)),
+ Arguments.of("singleVersionAtChange", 2L, Optional.of(2L), Set.of(2L)));
+ }
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("collectProtocolTestCases")
+ public void testCollectProtocolActionsFromRangeUnsafe(
+ String scenario,
+ long startVersion,
+ Optional endVersionOpt,
+ Set expectedVersions,
+ @TempDir File tempDir) {
+ String testTablePath = tempDir.getAbsolutePath();
+ String testTableName = "test_collect_protocol_" + scenario;
+ setupTableWithProtocolUpgradeAtV2(testTablePath, testTableName);
+ snapshotManager =
+ new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf());
+
+ Map result =
+ StreamingHelper.collectProtocolActionsFromRangeUnsafe(
+ startVersion, endVersionOpt, snapshotManager, defaultEngine, testTablePath);
+
+ assertEquals(expectedVersions, result.keySet());
+ }
+
+ /** Verifies that the version → Protocol mapping returns the correct Protocol content. */
+ @Test
+ public void testCollectProtocolActionsFromRangeUnsafe_returnsCorrectProtocolPerVersion(
+ @TempDir File tempDir) {
+ String testTablePath = tempDir.getAbsolutePath();
+ String testTableName = "test_collect_protocol_content";
+ setupTableWithProtocolUpgradeAtV2(testTablePath, testTableName);
+ snapshotManager =
+ new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf());
+
+ Map result =
+ StreamingHelper.collectProtocolActionsFromRangeUnsafe(
+ 0L, Optional.of(2L), snapshotManager, defaultEngine, testTablePath);
+
+ // v0 default: reader=1, writer=2; v2 upgraded: reader=3, writer=7
+ assertEquals(1, result.get(0L).getMinReaderVersion());
+ assertEquals(2, result.get(0L).getMinWriterVersion());
+ assertEquals(3, result.get(2L).getMinReaderVersion());
+ assertEquals(7, result.get(2L).getMinWriterVersion());
+ }
}