[kernel-spark] Implement MetadataEvolutionHandler in v2 #6563

Merged
murali-db merged 1 commit into delta-io:master from PorridgeSwim:stack/MetadataEvolutionHandler2 on May 11, 2026


Collaborator

PorridgeSwim commented on Apr 14, 2026

🥞 Stacked PR

Use this link to review incremental changes.


Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

PR 4/7 in the stack that adds non-additive schema evolution to the V2 streaming connector.

Introduce MetadataEvolutionHandler, a Java class that implements the V1 barrier protocol for schema evolution in the V2 connector. In V1 this logic lives in DeltaSourceMetadataEvolutionSupport, a Scala trait mixed into DeltaSource that accesses stream state via this. Since V2's SparkMicroBatchStream is Java and cannot use Scala trait mixins, MetadataEvolutionHandler receives all dependencies via constructor injection instead.
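
To illustrate the constructor-injection point, here is a minimal sketch, not the actual handler. The names ConstructorInjectionSketch, StreamState, Handler, and shouldTrack are hypothetical, and the tracking rule is deliberately simplified (the real handler also checks whether the tracking log already has an entry).

```java
import java.util.Objects;
import java.util.Optional;

/** Sketch only: all class, field, and method names here are hypothetical. */
final class ConstructorInjectionSketch {

  /** Stand-in for the stream state that a Scala trait would read through `this`. */
  static final class StreamState {
    final String tableId;
    final Optional<String> trackingLogPath;

    StreamState(String tableId, Optional<String> trackingLogPath) {
      this.tableId = Objects.requireNonNull(tableId, "tableId");
      this.trackingLogPath = Objects.requireNonNull(trackingLogPath, "trackingLogPath");
    }
  }

  /** Handler with no hidden state: every dependency is a constructor argument. */
  static final class Handler {
    private final StreamState state;
    private final boolean allowUnsafeRead;

    Handler(StreamState state, boolean allowUnsafeRead) {
      this.state = Objects.requireNonNull(state, "state");
      this.allowUnsafeRead = allowUnsafeRead;
    }

    /** Simplified rule: track only when unsafe reads are off and a log is configured. */
    boolean shouldTrack() {
      return !allowUnsafeRead && state.trackingLogPath.isPresent();
    }
  }

  public static void main(String[] args) {
    StreamState withLog = new StreamState("test-table-id", Optional.of("/tmp/metadata_log"));
    StreamState withoutLog = new StreamState("test-table-id", Optional.empty());

    System.out.println(new Handler(withLog, false).shouldTrack());    // true
    System.out.println(new Handler(withLog, true).shouldTrack());     // false
    System.out.println(new Handler(withoutLog, false).shouldTrack()); // false
  }
}
```

Because the handler never reaches into the stream object, a test can build it from plain values, which is how the unit tests in this PR construct it.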

The handler covers the full schema evolution lifecycle:

  • Stream start: eager metadata tracking log initialization on first batch
  • Offset generation: injects METADATA_CHANGE_INDEX / POST_METADATA_CHANGE_INDEX barrier sentinels into the file change iterator
  • Pending schema offsets: returns barrier offsets for in-progress schema changes
  • Batch commit: updates the schema log and throws DELTA_STREAMING_METADATA_EVOLUTION to trigger stream restart
  • Batch planning on restart: validates and re-initializes the schema log
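
The pending-schema-offset step in the lifecycle above can be sketched as follows. This is an illustration under stated assumptions: Offset is a hypothetical trimmed-down type, the two sentinel values are placeholders (the real constants come from DeltaSourceOffset), and Optional stands in for the nullable return of the actual method. The POST_METADATA_CHANGE_INDEX case, which consults the table in the real handler, is left out.

```java
import java.util.Optional;

/** Sketch only: a trimmed-down model of the pending-schema-change offset step. */
final class PendingSchemaOffsetSketch {

  // Placeholder sentinel values for illustration; the real constants come from DeltaSourceOffset.
  static final long METADATA_CHANGE_INDEX = -20L;
  static final long POST_METADATA_CHANGE_INDEX = -19L;

  /** Hypothetical offset holding only the fields this sketch needs. */
  static final class Offset {
    final long version;
    final long index;
    final boolean isInitialSnapshot;

    Offset(long version, long index, boolean isInitialSnapshot) {
      this.version = version;
      this.index = index;
      this.isInitialSnapshot = isInitialSnapshot;
    }
  }

  /**
   * If the previous offset sits on the metadata-change barrier, move to the post-change barrier
   * at the same version, keeping the initial-snapshot flag. Otherwise nothing is pending.
   */
  static Optional<Offset> nextIfPendingSchemaChange(Offset previous) {
    if (previous.index == METADATA_CHANGE_INDEX) {
      return Optional.of(
          new Offset(previous.version, POST_METADATA_CHANGE_INDEX, previous.isInitialSnapshot));
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    Offset atBarrier = new Offset(5L, METADATA_CHANGE_INDEX, false);
    Offset regular = new Offset(5L, 10L, false);

    Offset next = nextIfPendingSchemaChange(atBarrier).get();
    System.out.println(next.version + " " + (next.index == POST_METADATA_CHANGE_INDEX)); // 5 true
    System.out.println(nextIfPendingSchemaChange(regular).isPresent());                   // false
  }
}
```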

All detection logic delegates to the shared DeltaSourceMetadataEvolutionSupport$ companion object statics (refactored in PR 3/7). V2-specific orchestration is limited to wiring the barrier protocol into the CloseableIterator<IndexedFile> pipeline and collecting metadata/protocol from Kernel commit ranges via StreamingHelper.
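
As a rough picture of what wiring the barrier into the iterator pipeline means, the sketch below truncates an iterator at the first barrier entry, inclusive, which matches what the testStopAtBarrier tests assert. It uses plain java.util.Iterator rather than Kernel's CloseableIterator, and the Entry type and the sentinel value are hypothetical placeholders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Sketch only: stop an iterator right after the first barrier sentinel. */
final class BarrierTruncationSketch {

  // Placeholder sentinel value for illustration; the real constant comes from DeltaSourceOffset.
  static final long METADATA_CHANGE_INDEX = -20L;

  /** Hypothetical stand-in for IndexedFile with only a version and an index. */
  static final class Entry {
    final long version;
    final long index;

    Entry(long version, long index) {
      this.version = version;
      this.index = index;
    }
  }

  /** Passes entries through until a barrier is returned; the barrier itself is included. */
  static Iterator<Entry> stopAtBarrier(Iterator<Entry> source) {
    return new Iterator<Entry>() {
      private boolean barrierSeen = false;

      @Override
      public boolean hasNext() {
        return !barrierSeen && source.hasNext();
      }

      @Override
      public Entry next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        Entry entry = source.next();
        if (entry.index == METADATA_CHANGE_INDEX) {
          barrierSeen = true; // everything after the barrier belongs to the next schema
        }
        return entry;
      }
    };
  }

  /** Collects the indexes of all remaining entries. */
  static List<Long> drainIndexes(Iterator<Entry> iter) {
    List<Long> result = new ArrayList<>();
    while (iter.hasNext()) {
      result.add(iter.next().index);
    }
    return result;
  }

  public static void main(String[] args) {
    List<Entry> files =
        Arrays.asList(
            new Entry(1L, 0L),
            new Entry(1L, 1L),
            new Entry(1L, METADATA_CHANGE_INDEX),
            new Entry(1L, 3L));
    System.out.println(drainIndexes(stopAtBarrier(files.iterator()))); // [0, 1, -20]
  }
}
```

Including the barrier in the output lets the caller see that a schema change was reached, while the entries behind it stay unread until the stream restarts with the new schema.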

Also extends StreamingHelper with getMetadataAndProtocolForVersionRange to collect metadata and protocol actions from a range of Kernel commits.
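
Conceptually, collecting over a version range looks something like the sketch below. This is not the signature or return type of getMetadataAndProtocolForVersionRange: the Commit and Collected types, the string payloads, and the inclusive end bound are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;

/** Sketch only: gather metadata and protocol actions per version from a list of commits. */
final class VersionRangeCollectionSketch {

  /** Hypothetical commit: a version plus optional metadata and protocol actions. */
  static final class Commit {
    final long version;
    final Optional<String> metadata;
    final Optional<String> protocol;

    Commit(long version, Optional<String> metadata, Optional<String> protocol) {
      this.version = version;
      this.metadata = metadata;
      this.protocol = protocol;
    }
  }

  /** Hypothetical result: actions keyed by the version that introduced them. */
  static final class Collected {
    final SortedMap<Long, String> metadataByVersion = new TreeMap<>();
    final SortedMap<Long, String> protocolByVersion = new TreeMap<>();
  }

  /** Scans commits in [startVersion, endVersion] (inclusive by assumption). */
  static Collected collect(List<Commit> commits, long startVersion, long endVersion) {
    Collected out = new Collected();
    for (Commit commit : commits) {
      if (commit.version < startVersion || commit.version > endVersion) {
        continue; // outside the requested range
      }
      commit.metadata.ifPresent(m -> out.metadataByVersion.put(commit.version, m));
      commit.protocol.ifPresent(p -> out.protocolByVersion.put(commit.version, p));
    }
    return out;
  }

  public static void main(String[] args) {
    List<Commit> commits =
        Arrays.asList(
            new Commit(0L, Optional.of("schema-v0"), Optional.of("(1,2)")),
            new Commit(1L, Optional.empty(), Optional.empty()),
            new Commit(2L, Optional.of("schema-v2"), Optional.empty()),
            new Commit(3L, Optional.empty(), Optional.of("(3,7)")));

    Collected result = collect(commits, 1L, 3L);
    System.out.println(result.metadataByVersion); // {2=schema-v2}
    System.out.println(result.protocolByVersion); // {3=(3,7)}
  }
}
```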

How was this patch tested?

Unit tests in MetadataEvolutionHandlerTest.java covering: barrier protocol (METADATA_CHANGE_INDEX / POST_METADATA_CHANGE_INDEX offset generation), tracking state transitions, initialization lifecycle, offset arithmetic, pending schema change handling, and commit-time evolution exception.

Does this PR introduce any user-facing changes?

No.

@PorridgeSwim
Collaborator Author

Range-diff: stack/RefactorDeltaSourceMetadataEvolutionSupport (05a2c34 -> 21a7fb0)
spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java
@@ -47,6 +47,8 @@
 +import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
 +import org.apache.spark.sql.delta.sources.DeltaStreamUtils;
 +import org.apache.spark.sql.delta.sources.PersistedMetadata;
++import org.apache.spark.sql.delta.v2.interop.AbstractMetadata;
++import org.apache.spark.sql.delta.v2.interop.AbstractProtocol;
 +import org.apache.spark.sql.types.StructType;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
@@ -340,9 +342,9 @@
 +  /** Delegates to the shared static method in {@code DeltaSourceMetadataEvolutionSupport}. */
 +  private boolean hasMetadataOrProtocolChangeComparedToStreamMetadata(
 +      Metadata newMetadata, Protocol newProtocol, long newSchemaVersion) {
-+    Option<SparkMetadataAdapter> metadataOpt =
++    Option<? extends AbstractMetadata> metadataOpt =
 +        newMetadata != null ? Option.apply(new SparkMetadataAdapter(newMetadata)) : Option.empty();
-+    Option<SparkProtocolAdapter> protocolOpt =
++    Option<? extends AbstractProtocol> protocolOpt =
 +        newProtocol != null ? Option.apply(new SparkProtocolAdapter(newProtocol)) : Option.empty();
 +    Option<PersistedMetadata> persistedOpt =
 +        persistedMetadataAtSourceInit != null

Reproduce locally: git range-diff 2490e84..05a2c34 2490e84..21a7fb0 | Disable: git config gitstack.push-range-diff false

PorridgeSwim force-pushed the stack/MetadataEvolutionHandler2 branch from 21a7fb0 to b69031a on April 14, 2026 18:40
@PorridgeSwim
Collaborator Author

Range-diff: stack/RefactorDeltaSourceMetadataEvolutionSupport (21a7fb0 -> b69031a)
spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java
@@ -342,10 +342,14 @@
 +  /** Delegates to the shared static method in {@code DeltaSourceMetadataEvolutionSupport}. */
 +  private boolean hasMetadataOrProtocolChangeComparedToStreamMetadata(
 +      Metadata newMetadata, Protocol newProtocol, long newSchemaVersion) {
-+    Option<? extends AbstractMetadata> metadataOpt =
-+        newMetadata != null ? Option.apply(new SparkMetadataAdapter(newMetadata)) : Option.empty();
-+    Option<? extends AbstractProtocol> protocolOpt =
-+        newProtocol != null ? Option.apply(new SparkProtocolAdapter(newProtocol)) : Option.empty();
++    Option<AbstractMetadata> metadataOpt =
++        newMetadata != null
++            ? Option.apply((AbstractMetadata) new SparkMetadataAdapter(newMetadata))
++            : Option.empty();
++    Option<AbstractProtocol> protocolOpt =
++        newProtocol != null
++            ? Option.apply((AbstractProtocol) new SparkProtocolAdapter(newProtocol))
++            : Option.empty();
 +    Option<PersistedMetadata> persistedOpt =
 +        persistedMetadataAtSourceInit != null
 +            ? Option.apply(persistedMetadataAtSourceInit)

Reproduce locally: git range-diff 2490e84..21a7fb0 2490e84..b69031a | Disable: git config gitstack.push-range-diff false

PorridgeSwim force-pushed the stack/MetadataEvolutionHandler2 branch from b69031a to 82d0cb9 on April 14, 2026 20:46
@PorridgeSwim
Collaborator Author

Range-diff: stack/RefactorDeltaSourceMetadataEvolutionSupport (b69031a -> 82d0cb9)
spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java
@@ -0,0 +1,922 @@
+diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java
+new file mode 100644
+--- /dev/null
++++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java
++/*
++ * Copyright (2026) The Delta Lake Project Authors.
++ *
++ * Licensed under the Apache License, Version 2.0 (the "License");
++ * you may not use this file except in compliance with the License.
++ * You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package io.delta.spark.internal.v2.read;
++
++import static org.junit.jupiter.api.Assertions.*;
++
++import io.delta.kernel.CommitRange;
++import io.delta.kernel.Snapshot;
++import io.delta.kernel.engine.Engine;
++import io.delta.kernel.internal.DeltaHistoryManager;
++import io.delta.kernel.internal.actions.Format;
++import io.delta.kernel.internal.actions.Metadata;
++import io.delta.kernel.internal.actions.Protocol;
++import io.delta.kernel.internal.util.InternalUtils;
++import io.delta.kernel.internal.util.Utils;
++import io.delta.kernel.internal.util.VectorUtils;
++import io.delta.kernel.data.ArrayValue;
++import io.delta.kernel.data.ColumnVector;
++import io.delta.kernel.types.IntegerType;
++import io.delta.kernel.types.StringType;
++import io.delta.kernel.types.StructType;
++import io.delta.kernel.utils.CloseableIterator;
++import io.delta.spark.internal.v2.DeltaV2TestBase;
++import io.delta.spark.internal.v2.adapters.SparkMetadataAdapter;
++import io.delta.spark.internal.v2.adapters.SparkProtocolAdapter;
++import io.delta.spark.internal.v2.exception.VersionNotFoundException;
++import io.delta.spark.internal.v2.snapshot.DeltaSnapshotManager;
++import io.delta.spark.internal.v2.snapshot.PathBasedSnapshotManager;
++import java.io.File;
++import java.util.*;
++import org.apache.spark.sql.delta.DeltaOptions;
++import org.apache.spark.sql.delta.sources.DeltaSourceMetadataTrackingLog;
++import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
++import org.apache.spark.sql.delta.sources.DeltaStreamUtils;
++import org.apache.spark.sql.delta.sources.PersistedMetadata;
++import org.junit.jupiter.api.Test;
++import org.junit.jupiter.api.io.TempDir;
++import scala.Option;
++
++/** Unit tests for {@link MetadataEvolutionHandler}. */
++public class MetadataEvolutionHandlerTest extends DeltaV2TestBase {
++
++  // ---------------------------------------------------------------------------
++  // Shared constants
++  // ---------------------------------------------------------------------------
++
++  private static final StructType SIMPLE_KERNEL_SCHEMA =
++      new StructType().add("c1", IntegerType.INTEGER).add("c2", StringType.STRING);
++
++  private static final String SIMPLE_SCHEMA_JSON =
++      "{\"type\":\"struct\",\"fields\":["
++          + "{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},"
++          + "{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}";
++
++  private static final org.apache.spark.sql.types.StructType SIMPLE_SPARK_SCHEMA =
++      new org.apache.spark.sql.types.StructType().add("c1", "int").add("c2", "string");
++
++  private static final Metadata DEFAULT_METADATA =
++      new Metadata(
++          "test-id",
++          Optional.empty(),
++          Optional.empty(),
++          new Format("parquet", Collections.emptyMap()),
++          SIMPLE_SCHEMA_JSON,
++          SIMPLE_KERNEL_SCHEMA,
++          emptyArrayValue(),
++          Optional.empty(),
++          VectorUtils.stringStringMapValue(Collections.emptyMap()));
++
++  private static final Protocol DEFAULT_PROTOCOL = new Protocol(1, 2);
++
++  private static ArrayValue emptyArrayValue() {
++    return new ArrayValue() {
++      @Override
++      public int getSize() {
++        return 0;
++      }
++
++      @Override
++      public ColumnVector getElements() {
++        return InternalUtils.singletonStringColumnVector("");
++      }
++    };
++  }
++
++  /** A no-op snapshot manager for tests that never hit the delta log. */
++  private static final DeltaSnapshotManager DUMMY_SNAPSHOT_MANAGER =
++      new DeltaSnapshotManager() {
++        @Override
++        public Snapshot loadLatestSnapshot() {
++          throw new UnsupportedOperationException("not expected in this test");
++        }
++
++        @Override
++        public Snapshot loadSnapshotAt(long version) {
++          throw new UnsupportedOperationException("not expected in this test");
++        }
++
++        @Override
++        public DeltaHistoryManager.Commit getActiveCommitAtTime(
++            long ts, boolean last, boolean recreatable, boolean earliest) {
++          throw new UnsupportedOperationException("not expected in this test");
++        }
++
++        @Override
++        public void checkVersionExists(long version, boolean recreatable, boolean allowOOR)
++            throws VersionNotFoundException {
++          throw new UnsupportedOperationException("not expected in this test");
++        }
++
++        @Override
++        public CommitRange getTableChanges(
++            Engine engine, long startVersion, Optional<Long> endVersion) {
++          throw new UnsupportedOperationException("not expected in this test");
++        }
++      };
++
++  // ---------------------------------------------------------------------------
++  // Builder helpers
++  // ---------------------------------------------------------------------------
++
++  private static DeltaStreamUtils.SchemaReadOptions schemaReadOptions(boolean allowUnsafe) {
++    return new DeltaStreamUtils.SchemaReadOptions(
++        /* allowUnsafeStreamingReadOnColumnMappingSchemaChanges= */ allowUnsafe,
++        /* allowUnsafeStreamingReadOnPartitionColumnChanges= */ false,
++        /* forceEnableStreamingReadOnReadIncompatibleSchemaChangesDuringStreamStart= */ false,
++        /* forceEnableUnsafeReadOnNullabilityChange= */ false,
++        /* isStreamingFromColumnMappingTable= */ false,
++        /* typeWideningEnabled= */ false,
++        /* enableSchemaTrackingForTypeWidening= */ false);
++  }
++
++  /**
++   * Build a handler with the given tracking log and schema read options. Uses a dummy snapshot
++   * manager — only suitable for tests that don't hit private helpers that read the delta log.
++   */
++  private MetadataEvolutionHandler buildHandler(
++      Option<DeltaSourceMetadataTrackingLog> trackingLog,
++      DeltaStreamUtils.SchemaReadOptions readOptions) {
++    return buildHandler(trackingLog, readOptions, DEFAULT_METADATA, DEFAULT_PROTOCOL);
++  }
++
++  private MetadataEvolutionHandler buildHandler(
++      Option<DeltaSourceMetadataTrackingLog> trackingLog,
++      DeltaStreamUtils.SchemaReadOptions readOptions,
++      Metadata initMetadata,
++      Protocol initProtocol) {
++    DeltaOptions deltaOptions =
++        new DeltaOptions(
++            scala.collection.immutable.Map$.MODULE$.<String, String>empty(),
++            spark.sessionState().conf());
++    SparkMetadataAdapter metadataAdapter = new SparkMetadataAdapter(initMetadata);
++    return new MetadataEvolutionHandler(
++        spark,
++        "test-table-id",
++        "/tmp/fake-table",
++        DUMMY_SNAPSHOT_MANAGER,
++        defaultEngine,
++        deltaOptions,
++        readOptions,
++        trackingLog,
++        initMetadata,
++        metadataAdapter.schema(),
++        metadataAdapter.partitionSchema(),
++        scala.jdk.javaapi.CollectionConverters.asJava(metadataAdapter.configuration()),
++        initProtocol,
++        "/tmp/fake-table/_delta_log/_streaming_metadata");
++  }
++
++  /**
++   * Build a handler backed by a real delta table so private helpers (collectMetadataAtVersion,
++   * etc.) work.
++   */
++  private MetadataEvolutionHandler buildHandlerWithRealTable(
++      File tempDir,
++      Option<DeltaSourceMetadataTrackingLog> trackingLog,
++      DeltaStreamUtils.SchemaReadOptions readOptions) {
++    String tablePath = tempDir.getAbsolutePath();
++    String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
++    createEmptyTestTable(tablePath, tableName);
++    PathBasedSnapshotManager sm =
++        new PathBasedSnapshotManager(tablePath, spark.sessionState().newHadoopConf());
++
++    DeltaOptions deltaOptions =
++        new DeltaOptions(
++            scala.collection.immutable.Map$.MODULE$.<String, String>empty(),
++            spark.sessionState().conf());
++    return new MetadataEvolutionHandler(
++        spark,
++        "test-table-id",
++        tablePath,
++        sm,
++        defaultEngine,
++        deltaOptions,
++        readOptions,
++        trackingLog,
++        DEFAULT_METADATA,
++        SIMPLE_SPARK_SCHEMA,
++        new org.apache.spark.sql.types.StructType(),
++        Collections.emptyMap(),
++        DEFAULT_PROTOCOL,
++        tablePath + "/_delta_log/_streaming_metadata");
++  }
++
++  /** Create a tracking log on disk and optionally seed it with an initial entry. */
++  private DeltaSourceMetadataTrackingLog createTrackingLog(
++      File tempDir, boolean writeInitialEntry) {
++    String metadataPath = tempDir.getAbsolutePath() + "/metadata_log";
++    scala.collection.immutable.Map<String, String> emptyScalaMap =
++        scala.collection.immutable.Map$.MODULE$.empty();
++    DeltaSourceMetadataTrackingLog log =
++        DeltaSourceMetadataTrackingLog.create(
++            spark,
++            metadataPath,
++            "test-table-id",
++            "/tmp/fake-table",
++            emptyScalaMap,
++            Option.empty(),
++            false,
++            scala.Function1$.MODULE$.<PersistedMetadata, Option<PersistedMetadata>>const$(
++                Option.empty()),
++            true);
++    if (writeInitialEntry) {
++      PersistedMetadata entry =
++          PersistedMetadata.apply(
++              "test-table-id",
++              0L,
++              new SparkMetadataAdapter(DEFAULT_METADATA),
++              new SparkProtocolAdapter(DEFAULT_PROTOCOL),
++              "/tmp/fake-table/_delta_log/_streaming_metadata");
++      log.writeNewMetadata(entry, false);
++    }
++    return log;
++  }
++
++  private static List<IndexedFile> drain(CloseableIterator<IndexedFile> iter) {
++    List<IndexedFile> result = new ArrayList<>();
++    try {
++      while (iter.hasNext()) {
++        result.add(iter.next());
++      }
++    } finally {
++      try {
++        iter.close();
++      } catch (Exception e) {
++        throw new RuntimeException(e);
++      }
++    }
++    return result;
++  }
++
++  // ---------------------------------------------------------------------------
++  // shouldTrackMetadataChange
++  // ---------------------------------------------------------------------------
++
++  @Test
++  public void testShouldTrackMetadataChange_falseWhenUnsafeReadAllowed() {
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.empty(), schemaReadOptions(/* allowUnsafe= */ true));
++    assertFalse(handler.shouldTrackMetadataChange());
++  }
++
++  @Test
++  public void testShouldTrackMetadataChange_falseWhenNoTrackingLog() {
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.empty(), schemaReadOptions(/* allowUnsafe= */ false));
++    assertFalse(handler.shouldTrackMetadataChange());
++  }
++
++  @Test
++  public void testShouldTrackMetadataChange_falseWhenTrackingLogEmpty(@TempDir File tempDir) {
++    DeltaSourceMetadataTrackingLog log =
++        createTrackingLog(tempDir, /* writeInitialEntry= */ false);
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.apply(log), schemaReadOptions(/* allowUnsafe= */ false));
++    assertFalse(handler.shouldTrackMetadataChange());
++  }
++
++  @Test
++  public void testShouldTrackMetadataChange_trueWhenLogHasEntry(@TempDir File tempDir) {
++    DeltaSourceMetadataTrackingLog log =
++        createTrackingLog(tempDir, /* writeInitialEntry= */ true);
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.apply(log), schemaReadOptions(/* allowUnsafe= */ false));
++    assertTrue(handler.shouldTrackMetadataChange());
++  }
++
++  // ---------------------------------------------------------------------------
++  // shouldInitializeMetadataTrackingEagerly
++  // ---------------------------------------------------------------------------
++
++  @Test
++  public void testShouldInitializeEagerly_falseWhenUnsafeReadAllowed() {
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.empty(), schemaReadOptions(/* allowUnsafe= */ true));
++    assertFalse(handler.shouldInitializeMetadataTrackingEagerly());
++  }
++
++  @Test
++  public void testShouldInitializeEagerly_falseWhenNoTrackingLog() {
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.empty(), schemaReadOptions(/* allowUnsafe= */ false));
++    assertFalse(handler.shouldInitializeMetadataTrackingEagerly());
++  }
++
++  @Test
++  public void testShouldInitializeEagerly_trueWhenLogEmptyAndEagerEnabled(@TempDir File tempDir) {
++    DeltaSourceMetadataTrackingLog log =
++        createTrackingLog(tempDir, /* writeInitialEntry= */ false);
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.apply(log), schemaReadOptions(/* allowUnsafe= */ false));
++    assertTrue(handler.shouldInitializeMetadataTrackingEagerly());
++  }
++
++  @Test
++  public void testShouldInitializeEagerly_falseWhenLogHasEntry(@TempDir File tempDir) {
++    DeltaSourceMetadataTrackingLog log =
++        createTrackingLog(tempDir, /* writeInitialEntry= */ true);
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.apply(log), schemaReadOptions(/* allowUnsafe= */ false));
++    // Log already has an entry -> not "ready to initialize"
++    assertFalse(handler.shouldInitializeMetadataTrackingEagerly());
++  }
++
++  // ---------------------------------------------------------------------------
++  // stopIndexedFileIteratorAtSchemaChangeBarrier
++  // ---------------------------------------------------------------------------
++
++  @Test
++  public void testStopAtBarrier_passesAllWhenNoBarrier() {
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.empty(), schemaReadOptions(/* allowUnsafe= */ true));
++
++    List<IndexedFile> files =
++        Arrays.asList(
++            IndexedFile.sentinel(1L, 0L),
++            IndexedFile.sentinel(1L, 1L),
++            IndexedFile.sentinel(1L, 2L));
++
++    List<IndexedFile> result =
++        drain(
++            handler.stopIndexedFileIteratorAtSchemaChangeBarrier(
++                Utils.toCloseableIterator(files.iterator())));
++    assertEquals(3, result.size());
++  }
++
++  @Test
++  public void testStopAtBarrier_truncatesAtBarrierInclusive() {
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.empty(), schemaReadOptions(/* allowUnsafe= */ true));
++
++    long barrierIdx = DeltaSourceOffset.METADATA_CHANGE_INDEX();
++    List<IndexedFile> files =
++        Arrays.asList(
++            IndexedFile.sentinel(1L, 0L),
++            IndexedFile.sentinel(1L, 1L),
++            IndexedFile.sentinel(1L, barrierIdx),
++            IndexedFile.sentinel(1L, 3L),
++            IndexedFile.sentinel(1L, 4L));
++
++    List<IndexedFile> result =
++        drain(
++            handler.stopIndexedFileIteratorAtSchemaChangeBarrier(
++                Utils.toCloseableIterator(files.iterator())));
++    assertEquals(3, result.size());
++    assertEquals(0L, result.get(0).getIndex());
++    assertEquals(1L, result.get(1).getIndex());
++    assertEquals(barrierIdx, result.get(2).getIndex());
++  }
++
++  @Test
++  public void testStopAtBarrier_barrierIsFirst() {
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.empty(), schemaReadOptions(/* allowUnsafe= */ true));
++
++    long barrierIdx = DeltaSourceOffset.METADATA_CHANGE_INDEX();
++    List<IndexedFile> files =
++        Arrays.asList(IndexedFile.sentinel(1L, barrierIdx), IndexedFile.sentinel(1L, 0L));
++
++    List<IndexedFile> result =
++        drain(
++            handler.stopIndexedFileIteratorAtSchemaChangeBarrier(
++                Utils.toCloseableIterator(files.iterator())));
++    assertEquals(1, result.size());
++    assertEquals(barrierIdx, result.get(0).getIndex());
++  }
++
++  @Test
++  public void testStopAtBarrier_emptyIterator() {
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.empty(), schemaReadOptions(/* allowUnsafe= */ true));
++
++    List<IndexedFile> result =
++        drain(
++            handler.stopIndexedFileIteratorAtSchemaChangeBarrier(
++                Utils.toCloseableIterator(Collections.<IndexedFile>emptyIterator())));
++    assertTrue(result.isEmpty());
++  }
++
++  // ---------------------------------------------------------------------------
++  // getMetadataOrProtocolChangeIndexedFileIterator
++  // ---------------------------------------------------------------------------
++
++  @Test
++  public void testGetChangeIterator_emptyWhenNotTracking() {
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.empty(), schemaReadOptions(/* allowUnsafe= */ true));
++
++    Protocol differentProtocol = new Protocol(3, 7);
++    List<IndexedFile> result =
++        drain(
++            handler.getMetadataOrProtocolChangeIndexedFileIterator(
++                DEFAULT_METADATA, differentProtocol, 1L));
++    assertTrue(result.isEmpty());
++  }
++
++  @Test
++  public void testGetChangeIterator_emptyWhenMetadataUnchanged(@TempDir File tempDir) {
++    // Tracking is on and log has an entry, but metadata/protocol match init values -> no barrier
++    DeltaSourceMetadataTrackingLog log =
++        createTrackingLog(tempDir, /* writeInitialEntry= */ true);
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.apply(log), schemaReadOptions(/* allowUnsafe= */ false));
++
++    List<IndexedFile> result =
++        drain(
++            handler.getMetadataOrProtocolChangeIndexedFileIterator(
++                DEFAULT_METADATA, DEFAULT_PROTOCOL, 1L));
++    assertTrue(result.isEmpty());
++  }
++
++  @Test
++  public void testGetChangeIterator_returnsBarrierOnProtocolChange(@TempDir File tempDir) {
++    DeltaSourceMetadataTrackingLog log =
++        createTrackingLog(tempDir, /* writeInitialEntry= */ true);
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.apply(log), schemaReadOptions(/* allowUnsafe= */ false));
++
++    // Different protocol version -> should detect change and return a barrier
++    Protocol changedProtocol = new Protocol(3, 7);
++    List<IndexedFile> result =
++        drain(
++            handler.getMetadataOrProtocolChangeIndexedFileIterator(
++                DEFAULT_METADATA, changedProtocol, 5L));
++    assertEquals(1, result.size());
++    assertEquals(5L, result.get(0).getVersion());
++    assertEquals(DeltaSourceOffset.METADATA_CHANGE_INDEX(), result.get(0).getIndex());
++    assertFalse(result.get(0).hasFileAction());
++  }
++
++  @Test
++  public void testGetChangeIterator_returnsBarrierOnSchemaChange(@TempDir File tempDir) {
++    DeltaSourceMetadataTrackingLog log =
++        createTrackingLog(tempDir, /* writeInitialEntry= */ true);
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.apply(log), schemaReadOptions(/* allowUnsafe= */ false));
++
++    // Create metadata with a different schema (added column)
++    StructType changedSchema =
++        new StructType()
++            .add("c1", IntegerType.INTEGER)
++            .add("c2", StringType.STRING)
++            .add("c3", IntegerType.INTEGER);
++    String changedSchemaJson =
++        "{\"type\":\"struct\",\"fields\":["
++            + "{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},"
++            + "{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
++            + "{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}";
++    Metadata changedMetadata =
++        new Metadata(
++            "test-id",
++            Optional.empty(),
++            Optional.empty(),
++            new Format("parquet", Collections.emptyMap()),
++            changedSchemaJson,
++            changedSchema,
++            emptyArrayValue(),
++            Optional.empty(),
++            VectorUtils.stringStringMapValue(Collections.emptyMap()));
++
++    List<IndexedFile> result =
++        drain(
++            handler.getMetadataOrProtocolChangeIndexedFileIterator(
++                changedMetadata, DEFAULT_PROTOCOL, 7L));
++    assertEquals(1, result.size());
++    assertEquals(7L, result.get(0).getVersion());
++    assertEquals(DeltaSourceOffset.METADATA_CHANGE_INDEX(), result.get(0).getIndex());
++  }
++
++  @Test
++  public void testGetChangeIterator_returnsBarrierOnConfigChange(@TempDir File tempDir) {
++    DeltaSourceMetadataTrackingLog log =
++        createTrackingLog(tempDir, /* writeInitialEntry= */ true);
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.apply(log), schemaReadOptions(/* allowUnsafe= */ false));
++
++    // Same schema but different delta configuration
++    Map<String, String> changedConfig = new HashMap<>();
++    changedConfig.put("delta.enableChangeDataFeed", "true");
++    Metadata changedMetadata =
++        new Metadata(
++            "test-id",
++            Optional.empty(),
++            Optional.empty(),
++            new Format("parquet", Collections.emptyMap()),
++            SIMPLE_SCHEMA_JSON,
++            SIMPLE_KERNEL_SCHEMA,
++            emptyArrayValue(),
++            Optional.empty(),
++            VectorUtils.stringStringMapValue(changedConfig));
++
++    List<IndexedFile> result =
++        drain(
++            handler.getMetadataOrProtocolChangeIndexedFileIterator(
++                changedMetadata, DEFAULT_PROTOCOL, 3L));
++    assertEquals(1, result.size());
++    assertEquals(DeltaSourceOffset.METADATA_CHANGE_INDEX(), result.get(0).getIndex());
++  }
++
++  // ---------------------------------------------------------------------------
++  // getNextOffsetFromPreviousOffsetIfPendingSchemaChange
++  // ---------------------------------------------------------------------------
++
++  @Test
++  public void testGetNextOffset_advancesToPostMetadataChange() {
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.empty(), schemaReadOptions(/* allowUnsafe= */ true));
++
++    DeltaSourceOffset offset =
++        DeltaSourceOffset.apply(
++            "test-reservoir", 5L, DeltaSourceOffset.METADATA_CHANGE_INDEX(), false);
++
++    DeltaSourceOffset next = handler.getNextOffsetFromPreviousOffsetIfPendingSchemaChange(offset);
++    assertNotNull(next);
++    assertEquals(DeltaSourceOffset.POST_METADATA_CHANGE_INDEX(), next.index());
++    assertEquals(5L, next.reservoirVersion());
++    assertFalse(next.isInitialSnapshot());
++  }
++
++  @Test
++  public void testGetNextOffset_returnsNullForRegularIndex() {
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.empty(), schemaReadOptions(/* allowUnsafe= */ true));
++
++    DeltaSourceOffset offset = DeltaSourceOffset.apply("test-reservoir", 5L, 10L, false);
++    assertNull(handler.getNextOffsetFromPreviousOffsetIfPendingSchemaChange(offset));
++  }
++
++  @Test
++  public void testGetNextOffset_preservesIsInitialSnapshot() {
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.empty(), schemaReadOptions(/* allowUnsafe= */ true));
++
++    DeltaSourceOffset offset =
++        DeltaSourceOffset.apply(
++            "test-reservoir", 3L, DeltaSourceOffset.METADATA_CHANGE_INDEX(), true);
++
++    DeltaSourceOffset next = handler.getNextOffsetFromPreviousOffsetIfPendingSchemaChange(offset);
++    assertNotNull(next);
++    assertEquals(DeltaSourceOffset.POST_METADATA_CHANGE_INDEX(), next.index());
++    assertTrue(next.isInitialSnapshot());
++  }
++
++  @Test
++  public void testGetNextOffset_postMetadataWithNoChange_returnsNull(@TempDir File tempDir) {
++    // Needs real table because POST_METADATA_CHANGE_INDEX calls collectMetadataAtVersion
++    MetadataEvolutionHandler handler =
++        buildHandlerWithRealTable(
++            tempDir, Option.empty(), schemaReadOptions(/* allowUnsafe= */ true));
++
++    DeltaSourceOffset offset =
++        DeltaSourceOffset.apply(
++            "test-reservoir", 0L, DeltaSourceOffset.POST_METADATA_CHANGE_INDEX(), false);
++
++    // Version 0 has same metadata -> no actual change -> returns null
++    assertNull(handler.getNextOffsetFromPreviousOffsetIfPendingSchemaChange(offset));
++  }
++
++  // ---------------------------------------------------------------------------
++  // updateMetadataTrackingLogAndFailIfNeeded (offset-based)
++  // ---------------------------------------------------------------------------
++
++  @Test
++  public void testUpdateTrackingLog_noOpWhenNotTracking() {
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.empty(), schemaReadOptions(/* allowUnsafe= */ true));
++
++    DeltaSourceOffset offset =
++        DeltaSourceOffset.apply(
++            "test-reservoir", 0L, DeltaSourceOffset.METADATA_CHANGE_INDEX(), false);
++    // Should not throw — tracking is disabled
++    handler.updateMetadataTrackingLogAndFailIfNeeded(offset);
++  }
++
++  @Test
++  public void testUpdateTrackingLog_noOpForRegularIndex() {
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.empty(), schemaReadOptions(/* allowUnsafe= */ true));
++
++    DeltaSourceOffset offset = DeltaSourceOffset.apply("test-reservoir", 0L, 5L, false);
++    handler.updateMetadataTrackingLogAndFailIfNeeded(offset);
++  }
++
++  @Test
++  public void testUpdateTrackingLog_throwsOnSchemaChangeAtBarrier(@TempDir File tempDir) {
++    // Create a real table and make a schema change at version 1
++    String tablePath = tempDir.getAbsolutePath();
++    String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
++    createEmptyTestTable(tablePath, tableName);
++    // ALTER TABLE adds a column -> creates version 1 with new metadata
++    spark.sql(String.format("ALTER TABLE %s ADD COLUMNS (c3 INT)", tableName));
++
++    PathBasedSnapshotManager sm =
++        new PathBasedSnapshotManager(tablePath, spark.sessionState().newHadoopConf());
++
++    // Create a tracking log with an initial entry matching version 0 metadata
++    String metadataLogPath = new File(tempDir, "metadata_log").getAbsolutePath();
++    scala.collection.immutable.Map<String, String> emptyScalaMap =
++        scala.collection.immutable.Map$.MODULE$.empty();
++    DeltaSourceMetadataTrackingLog log =
++        DeltaSourceMetadataTrackingLog.create(
++            spark,
++            metadataLogPath,
++            "test-table-id",
++            tablePath,
++            emptyScalaMap,
++            Option.empty(),
++            false,
++            scala.Function1$.MODULE$.<PersistedMetadata, Option<PersistedMetadata>>const$(
++                Option.empty()),
++            true);
++    // Write initial entry so shouldTrackMetadataChange() returns true
++    PersistedMetadata initEntry =
++        PersistedMetadata.apply(
++            "test-table-id",
++            0L,
++            new SparkMetadataAdapter(DEFAULT_METADATA),
++            new SparkProtocolAdapter(DEFAULT_PROTOCOL),
++            tablePath + "/_delta_log/_streaming_metadata");
++    log.writeNewMetadata(initEntry, false);
++
++    DeltaOptions deltaOptions =
++        new DeltaOptions(
++            scala.collection.immutable.Map$.MODULE$.<String, String>empty(),
++            spark.sessionState().conf());
++    MetadataEvolutionHandler handler =
++        new MetadataEvolutionHandler(
++            spark,
++            "test-table-id",
++            tablePath,
++            sm,
++            defaultEngine,
++            deltaOptions,
++            schemaReadOptions(/* allowUnsafe= */ false),
++            Option.apply(log),
++            DEFAULT_METADATA,
++            SIMPLE_SPARK_SCHEMA,
++            new org.apache.spark.sql.types.StructType(),
++            Collections.emptyMap(),
++            DEFAULT_PROTOCOL,
++            tablePath + "/_delta_log/_streaming_metadata");
++
++    // The barrier offset points to version 1 which has a schema change
++    DeltaSourceOffset offset =
++        DeltaSourceOffset.apply(
++            "test-reservoir", 1L, DeltaSourceOffset.METADATA_CHANGE_INDEX(), false);
++
++    RuntimeException ex =
++        assertThrows(
++            RuntimeException.class,
++            () -> handler.updateMetadataTrackingLogAndFailIfNeeded(offset));
++    assertTrue(ex.getMessage().contains("DELTA_STREAMING_METADATA_EVOLUTION"));
++  }
++
++  @Test
++  public void testUpdateTrackingLog_4arg_throwsOnMetadataChange(@TempDir File tempDir) {
++    DeltaSourceMetadataTrackingLog log =
++        createTrackingLog(tempDir, /* writeInitialEntry= */ true);
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.apply(log), schemaReadOptions(/* allowUnsafe= */ false));
++
++    // Pass a changed protocol directly
++    Protocol changedProtocol = new Protocol(3, 7);
++    RuntimeException ex =
++        assertThrows(
++            RuntimeException.class,
++            () ->
++                handler.updateMetadataTrackingLogAndFailIfNeeded(
++                    null, changedProtocol, 5L, /* replace= */ false));
++    assertTrue(ex.getMessage().contains("DELTA_STREAMING_METADATA_EVOLUTION"));
++  }
++
++  @Test
++  public void testUpdateTrackingLog_4arg_noOpWhenMetadataUnchanged(@TempDir File tempDir) {
++    DeltaSourceMetadataTrackingLog log =
++        createTrackingLog(tempDir, /* writeInitialEntry= */ true);
++    MetadataEvolutionHandler handler =
++        buildHandler(Option.apply(log), schemaReadOptions(/* allowUnsafe= */ false));
++
++    // Same metadata/protocol -> no change -> should not throw
++    handler.updateMetadataTrackingLogAndFailIfNeeded(
++        DEFAULT_METADATA, DEFAULT_PROTOCOL, 1L, /* replace= */ false);
++  }
++
++  // ---------------------------------------------------------------------------
++  // initializeMetadataTrackingAndExitStream
++  // ---------------------------------------------------------------------------
++
++  @Test
++  public void testInitialize_writesEntryAndDoesNotThrowWhenMetadataMatches(@TempDir File tempDir) {
++    // Create a real table (version 0)
++    String tablePath = tempDir.getAbsolutePath();
++    String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
++    createEmptyTestTable(tablePath, tableName);
++
++    PathBasedSnapshotManager sm =
++        new PathBasedSnapshotManager(tablePath, spark.sessionState().newHadoopConf());
++
++    // Get the actual metadata from the table so init values match
++    io.delta.kernel.internal.SnapshotImpl snapshot =
++        (io.delta.kernel.internal.SnapshotImpl) sm.loadSnapshotAt(0);
++    Metadata tableMetadata = snapshot.getMetadata();
++    Protocol tableProtocol = snapshot.getProtocol();
++    SparkMetadataAdapter metadataAdapter = new SparkMetadataAdapter(tableMetadata);
++
++    String metadataLogPath = new File(tempDir, "metadata_log").getAbsolutePath();
++    scala.collection.immutable.Map<String, String> emptyScalaMap =
++        scala.collection.immutable.Map$.MODULE$.empty();
++    DeltaSourceMetadataTrackingLog log =
++        DeltaSourceMetadataTrackingLog.create(
++            spark,
++            metadataLogPath,
++            "test-table-id",
++            tablePath,
++            emptyScalaMap,
++            Option.empty(),
++            false,
++            scala.Function1$.MODULE$.<PersistedMetadata, Option<PersistedMetadata>>const$(
++                Option.empty()),
++            true);
++
++    DeltaOptions deltaOptions =
++        new DeltaOptions(
++            scala.collection.immutable.Map$.MODULE$.<String, String>empty(),
++            spark.sessionState().conf());
++    MetadataEvolutionHandler handler =
++        new MetadataEvolutionHandler(
++            spark,
++            "test-table-id",
++            tablePath,
++            sm,
++            defaultEngine,
++            deltaOptions,
++            schemaReadOptions(/* allowUnsafe= */ false),
++            Option.apply(log),
++            tableMetadata,
++            metadataAdapter.schema(),
++            metadataAdapter.partitionSchema(),
++            scala.jdk.javaapi.CollectionConverters.asJava(metadataAdapter.configuration()),
++            tableProtocol,
++            tablePath + "/_delta_log/_streaming_metadata");
++
++    // Metadata at version 0 matches init -> should write entry but NOT throw
++    handler.initializeMetadataTrackingAndExitStream(
++        0L, /* batchEndVersion= */ null, /* alwaysFailUponLogInitialized= */ false);
++
++    // Verify the entry was written to the tracking log
++    assertTrue(log.getCurrentTrackedMetadata().isDefined());
++    assertEquals(0L, log.getCurrentTrackedMetadata().get().deltaCommitVersion());
++  }
++
++  @Test
++  public void testInitialize_throwsWhenAlwaysFailIsTrue(@TempDir File tempDir) {
++    String tablePath = tempDir.getAbsolutePath();
++    String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
++    createEmptyTestTable(tablePath, tableName);
++
++    PathBasedSnapshotManager sm =
++        new PathBasedSnapshotManager(tablePath, spark.sessionState().newHadoopConf());
++    io.delta.kernel.internal.SnapshotImpl snapshot =
++        (io.delta.kernel.internal.SnapshotImpl) sm.loadSnapshotAt(0);
++    Metadata tableMetadata = snapshot.getMetadata();
++    Protocol tableProtocol = snapshot.getProtocol();
++    SparkMetadataAdapter metadataAdapter = new SparkMetadataAdapter(tableMetadata);
++
++    String metadataLogPath = new File(tempDir, "metadata_log").getAbsolutePath();
++    scala.collection.immutable.Map<String, String> emptyScalaMap =
++        scala.collection.immutable.Map$.MODULE$.empty();
++    DeltaSourceMetadataTrackingLog log =
++        DeltaSourceMetadataTrackingLog.create(
++            spark,
++            metadataLogPath,
++            "test-table-id",
++            tablePath,
++            emptyScalaMap,
++            Option.empty(),
++            false,
++            scala.Function1$.MODULE$.<PersistedMetadata, Option<PersistedMetadata>>const$(
++                Option.empty()),
++            true);
++
++    DeltaOptions deltaOptions =
++        new DeltaOptions(
++            scala.collection.immutable.Map$.MODULE$.<String, String>empty(),
++            spark.sessionState().conf());
++    MetadataEvolutionHandler handler =
++        new MetadataEvolutionHandler(
++            spark,
++            "test-table-id",
++            tablePath,
++            sm,
++            defaultEngine,
++            deltaOptions,
++            schemaReadOptions(/* allowUnsafe= */ false),
++            Option.apply(log),
++            tableMetadata,
++            metadataAdapter.schema(),
++            metadataAdapter.partitionSchema(),
++            scala.jdk.javaapi.CollectionConverters.asJava(metadataAdapter.configuration()),
++            tableProtocol,
++            tablePath + "/_delta_log/_streaming_metadata");
++
++    // Even though metadata matches, alwaysFailUponLogInitialized = true -> throws
++    RuntimeException ex =
++        assertThrows(
++            RuntimeException.class,
++            () ->
++                handler.initializeMetadataTrackingAndExitStream(
++                    0L, /* batchEndVersion= */ null, /* alwaysFailUponLogInitialized= */ true));
++    assertTrue(ex.getMessage().contains("DELTA_STREAMING_METADATA_EVOLUTION"));
++
++    // Entry should still have been written before throwing
++    assertTrue(log.getCurrentTrackedMetadata().isDefined());
++  }
++
++  @Test
++  public void testInitialize_throwsWhenMetadataChangedSinceInit(@TempDir File tempDir) {
++    String tablePath = tempDir.getAbsolutePath();
++    String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
++    createEmptyTestTable(tablePath, tableName);
++
++    PathBasedSnapshotManager sm =
++        new PathBasedSnapshotManager(tablePath, spark.sessionState().newHadoopConf());
++
++    // Capture version 0 metadata as the init state
++    io.delta.kernel.internal.SnapshotImpl snapshot0 =
++        (io.delta.kernel.internal.SnapshotImpl) sm.loadSnapshotAt(0);
++    Metadata initMetadata = snapshot0.getMetadata();
++    Protocol initProtocol = snapshot0.getProtocol();
++    SparkMetadataAdapter initAdapter = new SparkMetadataAdapter(initMetadata);
++
++    // Evolve the table: add a column -> creates version 1 with different metadata
++    spark.sql(String.format("ALTER TABLE %s ADD COLUMNS (c3 INT)", tableName));
++
++    String metadataLogPath = new File(tempDir, "metadata_log").getAbsolutePath();
++    scala.collection.immutable.Map<String, String> emptyScalaMap =
++        scala.collection.immutable.Map$.MODULE$.empty();
++    DeltaSourceMetadataTrackingLog log =
++        DeltaSourceMetadataTrackingLog.create(
++            spark,
++            metadataLogPath,
++            "test-table-id",
++            tablePath,
++            emptyScalaMap,
++            Option.empty(),
++            false,
++            scala.Function1$.MODULE$.<PersistedMetadata, Option<PersistedMetadata>>const$(
++                Option.empty()),
++            true);
++
++    DeltaOptions deltaOptions =
++        new DeltaOptions(
++            scala.collection.immutable.Map$.MODULE$.<String, String>empty(),
++            spark.sessionState().conf());
++    MetadataEvolutionHandler handler =
++        new MetadataEvolutionHandler(
++            spark,
++            "test-table-id",
++            tablePath,
++            sm,
++            defaultEngine,
++            deltaOptions,
++            schemaReadOptions(/* allowUnsafe= */ false),
++            Option.apply(log),
++            initMetadata,
++            initAdapter.schema(),
++            initAdapter.partitionSchema(),
++            scala.jdk.javaapi.CollectionConverters.asJava(initAdapter.configuration()),
++            initProtocol,
++            tablePath + "/_delta_log/_streaming_metadata");
++
++    // Initialize at version 1 which has different schema than init (version 0) -> throws
++    RuntimeException ex =
++        assertThrows(
++            RuntimeException.class,
++            () ->
++                handler.initializeMetadataTrackingAndExitStream(
++                    1L, /* batchEndVersion= */ null, /* alwaysFailUponLogInitialized= */ false));
++    assertTrue(ex.getMessage().contains("DELTA_STREAMING_METADATA_EVOLUTION"));
++
++    // Entry should have been written with version 1 metadata
++    assertTrue(log.getCurrentTrackedMetadata().isDefined());
++    assertEquals(1L, log.getCurrentTrackedMetadata().get().deltaCommitVersion());
++  }
++}
\ No newline at end of file

Reproduce locally: git range-diff 2490e84..b69031a 2490e84..82d0cb9 | Disable: git config gitstack.push-range-diff false
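
Note on the barrier hand-off covered by the two `testGetNextOffset_*` cases in the range-diff above: an offset sitting on `METADATA_CHANGE_INDEX` advances to `POST_METADATA_CHANGE_INDEX` at the same version with `isInitialSnapshot` preserved, and a `POST_METADATA_CHANGE_INDEX` offset with no actual change yields no next offset. The sketch below is only an illustration of that state transition. The class, the numeric sentinel values, and the `hasChangeAtVersion` flag are placeholders invented for the example, the "still pending" branch is an assumption about behavior the tests above do not cover, and `Optional` is used where the handler returns `null`.

```java
import java.util.Optional;

/** Hypothetical, simplified stand-in for DeltaSourceOffset; for illustration only. */
final class BarrierOffsetSketch {
  // Placeholder sentinel values for this sketch; the real constants live in DeltaSourceOffset.
  static final long METADATA_CHANGE_INDEX = -20L;
  static final long POST_METADATA_CHANGE_INDEX = -19L;

  final long version;
  final long index;
  final boolean isInitialSnapshot;

  BarrierOffsetSketch(long version, long index, boolean isInitialSnapshot) {
    this.version = version;
    this.index = index;
    this.isInitialSnapshot = isInitialSnapshot;
  }

  /**
   * Returns the next barrier offset when a schema change is pending at {@code previous},
   * or empty otherwise. {@code hasChangeAtVersion} stands in for the real metadata comparison.
   */
  static Optional<BarrierOffsetSketch> nextIfPending(
      BarrierOffsetSketch previous, boolean hasChangeAtVersion) {
    if (previous.index == METADATA_CHANGE_INDEX) {
      // First barrier -> second barrier, same version, snapshot flag carried over.
      return Optional.of(
          new BarrierOffsetSketch(
              previous.version, POST_METADATA_CHANGE_INDEX, previous.isInitialSnapshot));
    }
    if (previous.index == POST_METADATA_CHANGE_INDEX && hasChangeAtVersion) {
      // Assumption: the stream stays parked on the barrier while the change is unapplied.
      return Optional.of(previous);
    }
    // Regular index, or a post-barrier offset with nothing left to apply.
    return Optional.empty();
  }
}
```

With this shape, an offset at version 3 on the first barrier moves to the second barrier at version 3, while a post-barrier offset at version 0 with unchanged metadata produces an empty result, matching the two assertions in the tests.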

@PorridgeSwim PorridgeSwim force-pushed the stack/MetadataEvolutionHandler2 branch from 82d0cb9 to 9af6e73 Compare April 14, 2026 20:54
@PorridgeSwim
Collaborator Author

Range-diff: stack/RefactorDeltaSourceMetadataEvolutionSupport (82d0cb9 -> 9af6e73)
spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java
@@ -23,6 +23,8 @@
 +
 +import io.delta.kernel.CommitRange;
 +import io.delta.kernel.Snapshot;
++import io.delta.kernel.data.ArrayValue;
++import io.delta.kernel.data.ColumnVector;
 +import io.delta.kernel.engine.Engine;
 +import io.delta.kernel.internal.DeltaHistoryManager;
 +import io.delta.kernel.internal.actions.Format;
@@ -31,8 +33,6 @@
 +import io.delta.kernel.internal.util.InternalUtils;
 +import io.delta.kernel.internal.util.Utils;
 +import io.delta.kernel.internal.util.VectorUtils;
-+import io.delta.kernel.data.ArrayValue;
-+import io.delta.kernel.data.ColumnVector;
 +import io.delta.kernel.types.IntegerType;
 +import io.delta.kernel.types.StringType;
 +import io.delta.kernel.types.StructType;
@@ -46,6 +46,7 @@
 +import java.io.File;
 +import java.util.*;
 +import org.apache.spark.sql.delta.DeltaOptions;
++import org.apache.spark.sql.delta.DeltaRuntimeException;
 +import org.apache.spark.sql.delta.sources.DeltaSourceMetadataTrackingLog;
 +import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
 +import org.apache.spark.sql.delta.sources.DeltaStreamUtils;
@@ -266,6 +267,17 @@
 +    return result;
 +  }
 +
++  /**
++   * Assert that a DELTA_STREAMING_METADATA_EVOLUTION exception contains the expected message
++   * parameter keys: schema, config, protocol.
++   */
++  private static void assertMetadataEvolutionMessageParameters(DeltaRuntimeException ex) {
++    java.util.Map<String, String> params = ex.getMessageParameters();
++    assertTrue(params.containsKey("schema"), "Missing 'schema' message parameter");
++    assertTrue(params.containsKey("config"), "Missing 'config' message parameter");
++    assertTrue(params.containsKey("protocol"), "Missing 'protocol' message parameter");
++  }
++
 +  // ---------------------------------------------------------------------------
 +  // shouldTrackMetadataChange
 +  // ---------------------------------------------------------------------------
@@ -286,8 +298,7 @@
 +
 +  @Test
 +  public void testShouldTrackMetadataChange_falseWhenTrackingLogEmpty(@TempDir File tempDir) {
-+    DeltaSourceMetadataTrackingLog log =
-+        createTrackingLog(tempDir, /* writeInitialEntry= */ false);
++    DeltaSourceMetadataTrackingLog log = createTrackingLog(tempDir, /* writeInitialEntry= */ false);
 +    MetadataEvolutionHandler handler =
 +        buildHandler(Option.apply(log), schemaReadOptions(/* allowUnsafe= */ false));
 +    assertFalse(handler.shouldTrackMetadataChange());
@@ -295,8 +306,7 @@
 +
 +  @Test
 +  public void testShouldTrackMetadataChange_trueWhenLogHasEntry(@TempDir File tempDir) {
-+    DeltaSourceMetadataTrackingLog log =
-+        createTrackingLog(tempDir, /* writeInitialEntry= */ true);
++    DeltaSourceMetadataTrackingLog log = createTrackingLog(tempDir, /* writeInitialEntry= */ true);
 +    MetadataEvolutionHandler handler =
 +        buildHandler(Option.apply(log), schemaReadOptions(/* allowUnsafe= */ false));
 +    assertTrue(handler.shouldTrackMetadataChange());
@@ -322,8 +332,7 @@
 +
 +  @Test
 +  public void testShouldInitializeEagerly_trueWhenLogEmptyAndEagerEnabled(@TempDir File tempDir) {
-+    DeltaSourceMetadataTrackingLog log =
-+        createTrackingLog(tempDir, /* writeInitialEntry= */ false);
++    DeltaSourceMetadataTrackingLog log = createTrackingLog(tempDir, /* writeInitialEntry= */ false);
 +    MetadataEvolutionHandler handler =
 +        buildHandler(Option.apply(log), schemaReadOptions(/* allowUnsafe= */ false));
 +    assertTrue(handler.shouldInitializeMetadataTrackingEagerly());
@@ -331,8 +340,7 @@
 +
 +  @Test
 +  public void testShouldInitializeEagerly_falseWhenLogHasEntry(@TempDir File tempDir) {
-+    DeltaSourceMetadataTrackingLog log =
-+        createTrackingLog(tempDir, /* writeInitialEntry= */ true);
++    DeltaSourceMetadataTrackingLog log = createTrackingLog(tempDir, /* writeInitialEntry= */ true);
 +    MetadataEvolutionHandler handler =
 +        buildHandler(Option.apply(log), schemaReadOptions(/* allowUnsafe= */ false));
 +    // Log already has an entry -> not "ready to initialize"
@@ -434,8 +442,7 @@
 +  @Test
 +  public void testGetChangeIterator_emptyWhenMetadataUnchanged(@TempDir File tempDir) {
 +    // Tracking is on and log has an entry, but metadata/protocol match init values -> no barrier
-+    DeltaSourceMetadataTrackingLog log =
-+        createTrackingLog(tempDir, /* writeInitialEntry= */ true);
++    DeltaSourceMetadataTrackingLog log = createTrackingLog(tempDir, /* writeInitialEntry= */ true);
 +    MetadataEvolutionHandler handler =
 +        buildHandler(Option.apply(log), schemaReadOptions(/* allowUnsafe= */ false));
 +
@@ -448,8 +455,7 @@
 +
 +  @Test
 +  public void testGetChangeIterator_returnsBarrierOnProtocolChange(@TempDir File tempDir) {
-+    DeltaSourceMetadataTrackingLog log =
-+        createTrackingLog(tempDir, /* writeInitialEntry= */ true);
++    DeltaSourceMetadataTrackingLog log = createTrackingLog(tempDir, /* writeInitialEntry= */ true);
 +    MetadataEvolutionHandler handler =
 +        buildHandler(Option.apply(log), schemaReadOptions(/* allowUnsafe= */ false));
 +
@@ -467,8 +473,7 @@
 +
 +  @Test
 +  public void testGetChangeIterator_returnsBarrierOnSchemaChange(@TempDir File tempDir) {
-+    DeltaSourceMetadataTrackingLog log =
-+        createTrackingLog(tempDir, /* writeInitialEntry= */ true);
++    DeltaSourceMetadataTrackingLog log = createTrackingLog(tempDir, /* writeInitialEntry= */ true);
 +    MetadataEvolutionHandler handler =
 +        buildHandler(Option.apply(log), schemaReadOptions(/* allowUnsafe= */ false));
 +
@@ -506,8 +511,7 @@
 +
 +  @Test
 +  public void testGetChangeIterator_returnsBarrierOnConfigChange(@TempDir File tempDir) {
-+    DeltaSourceMetadataTrackingLog log =
-+        createTrackingLog(tempDir, /* writeInitialEntry= */ true);
++    DeltaSourceMetadataTrackingLog log = createTrackingLog(tempDir, /* writeInitialEntry= */ true);
 +    MetadataEvolutionHandler handler =
 +        buildHandler(Option.apply(log), schemaReadOptions(/* allowUnsafe= */ false));
 +
@@ -682,35 +686,41 @@
 +        DeltaSourceOffset.apply(
 +            "test-reservoir", 1L, DeltaSourceOffset.METADATA_CHANGE_INDEX(), false);
 +
-+    RuntimeException ex =
++    DeltaRuntimeException ex =
 +        assertThrows(
-+            RuntimeException.class,
++            DeltaRuntimeException.class,
 +            () -> handler.updateMetadataTrackingLogAndFailIfNeeded(offset));
-+    assertTrue(ex.getMessage().contains("DELTA_STREAMING_METADATA_EVOLUTION"));
++    assertEquals(
++        "DELTA_STREAMING_METADATA_EVOLUTION",
++        ex.getErrorClass(),
++        "Should throw metadata evolution exception when schema changed at barrier version");
++    assertMetadataEvolutionMessageParameters(ex);
 +  }
 +
 +  @Test
 +  public void testUpdateTrackingLog_4arg_throwsOnMetadataChange(@TempDir File tempDir) {
-+    DeltaSourceMetadataTrackingLog log =
-+        createTrackingLog(tempDir, /* writeInitialEntry= */ true);
++    DeltaSourceMetadataTrackingLog log = createTrackingLog(tempDir, /* writeInitialEntry= */ true);
 +    MetadataEvolutionHandler handler =
 +        buildHandler(Option.apply(log), schemaReadOptions(/* allowUnsafe= */ false));
 +
 +    // Pass a changed protocol directly
 +    Protocol changedProtocol = new Protocol(3, 7);
-+    RuntimeException ex =
++    DeltaRuntimeException ex =
 +        assertThrows(
-+            RuntimeException.class,
++            DeltaRuntimeException.class,
 +            () ->
 +                handler.updateMetadataTrackingLogAndFailIfNeeded(
 +                    null, changedProtocol, 5L, /* replace= */ false));
-+    assertTrue(ex.getMessage().contains("DELTA_STREAMING_METADATA_EVOLUTION"));
++    assertEquals(
++        "DELTA_STREAMING_METADATA_EVOLUTION",
++        ex.getErrorClass(),
++        "Should throw metadata evolution exception when protocol changed");
++    assertMetadataEvolutionMessageParameters(ex);
 +  }
 +
 +  @Test
 +  public void testUpdateTrackingLog_4arg_noOpWhenMetadataUnchanged(@TempDir File tempDir) {
-+    DeltaSourceMetadataTrackingLog log =
-+        createTrackingLog(tempDir, /* writeInitialEntry= */ true);
++    DeltaSourceMetadataTrackingLog log = createTrackingLog(tempDir, /* writeInitialEntry= */ true);
 +    MetadataEvolutionHandler handler =
 +        buildHandler(Option.apply(log), schemaReadOptions(/* allowUnsafe= */ false));
 +
@@ -838,13 +848,17 @@
 +            tablePath + "/_delta_log/_streaming_metadata");
 +
 +    // Even though metadata matches, alwaysFailUponLogInitialized = true -> throws
-+    RuntimeException ex =
++    DeltaRuntimeException ex =
 +        assertThrows(
-+            RuntimeException.class,
++            DeltaRuntimeException.class,
 +            () ->
 +                handler.initializeMetadataTrackingAndExitStream(
 +                    0L, /* batchEndVersion= */ null, /* alwaysFailUponLogInitialized= */ true));
-+    assertTrue(ex.getMessage().contains("DELTA_STREAMING_METADATA_EVOLUTION"));
++    assertEquals(
++        "DELTA_STREAMING_METADATA_EVOLUTION",
++        ex.getErrorClass(),
++        "Should throw metadata evolution exception when alwaysFailUponLogInitialized is true");
++    assertMetadataEvolutionMessageParameters(ex);
 +
 +    // Entry should still have been written before throwing
 +    assertTrue(log.getCurrentTrackedMetadata().isDefined());
@@ -907,13 +921,17 @@
 +            tablePath + "/_delta_log/_streaming_metadata");
 +
 +    // Initialize at version 1 which has different schema than init (version 0) -> throws
-+    RuntimeException ex =
++    DeltaRuntimeException ex =
 +        assertThrows(
-+            RuntimeException.class,
++            DeltaRuntimeException.class,
 +            () ->
 +                handler.initializeMetadataTrackingAndExitStream(
 +                    1L, /* batchEndVersion= */ null, /* alwaysFailUponLogInitialized= */ false));
-+    assertTrue(ex.getMessage().contains("DELTA_STREAMING_METADATA_EVOLUTION"));
++    assertEquals(
++        "DELTA_STREAMING_METADATA_EVOLUTION",
++        ex.getErrorClass(),
++        "Should throw metadata evolution exception when schema changed since init");
++    assertMetadataEvolutionMessageParameters(ex);
 +
 +    // Entry should have been written with version 1 metadata
 +    assertTrue(log.getCurrentTrackedMetadata().isDefined());

Reproduce locally: git range-diff 2490e84..82d0cb9 2490e84..9af6e73 | Disable: git config gitstack.push-range-diff false
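
Note on the assertion change in the range-diff above: the tests move from `ex.getMessage().contains("DELTA_STREAMING_METADATA_EVOLUTION")` on a plain `RuntimeException` to checking `getErrorClass()` and the `schema` / `config` / `protocol` message parameters on `DeltaRuntimeException`. A rough sketch of why the typed check is stricter follows. `SketchErrorClassException` and `ErrorClassAssertionSketch` are hypothetical stand-ins written for this example, not Delta classes, and the message format is made up.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for an exception exposing an error class and message parameters. */
final class SketchErrorClassException extends RuntimeException {
  private final String errorClass;
  private final Map<String, String> messageParameters;

  SketchErrorClassException(String errorClass, Map<String, String> messageParameters) {
    super("[" + errorClass + "] " + messageParameters);
    this.errorClass = errorClass;
    this.messageParameters =
        Collections.unmodifiableMap(new LinkedHashMap<>(messageParameters));
  }

  String getErrorClass() {
    return errorClass;
  }

  Map<String, String> getMessageParameters() {
    return messageParameters;
  }
}

final class ErrorClassAssertionSketch {
  static final String METADATA_EVOLUTION = "DELTA_STREAMING_METADATA_EVOLUTION";

  /** Builds an exception shaped like the one the tests expect (values are arbitrary). */
  static SketchErrorClassException metadataEvolution(
      String schema, String config, String protocol) {
    Map<String, String> params = new LinkedHashMap<>();
    params.put("schema", schema);
    params.put("config", config);
    params.put("protocol", protocol);
    return new SketchErrorClassException(METADATA_EVOLUTION, params);
  }

  /** Typed check: exception type, exact error class, and all three parameter keys. */
  static boolean isMetadataEvolution(RuntimeException ex) {
    if (!(ex instanceof SketchErrorClassException)) {
      return false;
    }
    SketchErrorClassException typed = (SketchErrorClassException) ex;
    return METADATA_EVOLUTION.equals(typed.getErrorClass())
        && typed
            .getMessageParameters()
            .keySet()
            .containsAll(Arrays.asList("schema", "config", "protocol"));
  }
}
```

An unrelated `RuntimeException` whose message merely quotes the error class name would satisfy the old substring assertion but fails `isMetadataEvolution`, which is the gap the updated tests appear to close.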

@PorridgeSwim PorridgeSwim force-pushed the stack/MetadataEvolutionHandler2 branch from 9af6e73 to cdae0bb Compare April 14, 2026 21:18
@PorridgeSwim
Collaborator Author

Range-diff: stack/RefactorDeltaSourceMetadataEvolutionSupport (9af6e73 -> cdae0bb)

... (truncated, output exceeded 60000 bytes)

Reproduce locally: git range-diff 2490e84..9af6e73 2490e84..cdae0bb | Disable: git config gitstack.push-range-diff false

@PorridgeSwim PorridgeSwim force-pushed the stack/MetadataEvolutionHandler2 branch from cdae0bb to ab61d5e Compare April 14, 2026 22:51
@PorridgeSwim
Collaborator Author

Range-diff: stack/RefactorDeltaSourceMetadataEvolutionSupport (cdae0bb -> ab61d5e)
spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java
@@ -376,7 +376,7 @@
 +  /** Collect all metadata actions between start and end version, both inclusive. */
 +  private Map<Long, Metadata> collectMetadataActions(long startVersion, long endVersion) {
 +    return StreamingHelper.collectMetadataActionsFromRangeUnsafe(
-+        startVersion, Optional.of(endVersion + 1), snapshotManager, engine, tablePath);
++        startVersion, Optional.of(endVersion), snapshotManager, engine, tablePath);
 +  }
 +
 +  /** Collect the protocol action at a specific version. Returns null if none. */
@@ -387,7 +387,7 @@
 +  /** Collect all protocol actions between start and end version, both inclusive. */
 +  private Map<Long, Protocol> collectProtocolActions(long startVersion, long endVersion) {
 +    return StreamingHelper.collectProtocolActionsFromRangeUnsafe(
-+        startVersion, Optional.of(endVersion + 1), snapshotManager, engine, tablePath);
++        startVersion, Optional.of(endVersion), snapshotManager, engine, tablePath);
 +  }
 +
 +  /**
spark/v2/src/main/java/io/delta/spark/internal/v2/utils/StreamingHelper.java
@@ -23,6 +23,14 @@
    /**
     * Gets commit-level actions from a commit range without requiring a snapshot at the exact start
     * version.
+    * metadata action. Throws an exception if multiple metadata actions are found in the same commit.
+    *
+    * @param startVersion the starting version (inclusive) of the commit range
+-   * @param endVersionOpt optional ending version (exclusive) of the commit range
++   * @param endVersionOpt optional ending version (inclusive) of the commit range
+    * @param snapshotManager the Delta snapshot manager
+    * @param engine the Delta engine
+    * @param tablePath the path to the Delta table
      return versionToMetadata;
    }
  
@@ -35,7 +43,7 @@
 +   * protocol action. Throws an exception if multiple protocol actions are found in the same commit.
 +   *
 +   * @param startVersion the starting version (inclusive) of the commit range
-+   * @param endVersionOpt optional ending version (exclusive) of the commit range
++   * @param endVersionOpt optional ending version (inclusive) of the commit range
 +   * @param snapshotManager the Delta snapshot manager
 +   * @param engine the Delta engine
 +   * @param tablePath the path to the Delta table
spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java
@@ -214,8 +214,8 @@
 +            EMPTY_SCALA_MAP,
 +            Option.empty(),
 +            false,
-+            scala.Function1$.MODULE$.<PersistedMetadata, Option<PersistedMetadata>>const$(
-+                Option.empty()),
++            null, // consecutiveSchemaChangesMerger — unused since
++            // mergeConsecutiveSchemaChanges=false
 +            true);
 +
 +    if (seedLogWithInitEntry) {
@@ -265,8 +265,8 @@
 +            EMPTY_SCALA_MAP,
 +            Option.empty(),
 +            false,
-+            scala.Function1$.MODULE$.<PersistedMetadata, Option<PersistedMetadata>>const$(
-+                Option.empty()),
++            null, // consecutiveSchemaChangesMerger — unused since
++            // mergeConsecutiveSchemaChanges=false
 +            true);
 +    if (seedWithDefaultEntry) {
 +      PersistedMetadata entry =

Reproduce locally: git range-diff 2490e84..cdae0bb 2490e84..ab61d5e | Disable: git config gitstack.push-range-diff false
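
Note on the `endVersion + 1` -> `endVersion` change in the range-diff above: the Javadoc for `endVersionOpt` now reads "inclusive", so a caller that still passed `endVersion + 1` would read one commit past the intended range. The sketch below illustrates that off-by-one with an in-memory map. `InclusiveRangeSketch` and `collectInclusive` are hypothetical names for this example and do not reflect how `StreamingHelper` reads commits.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical illustration of inclusive end-version semantics over versioned actions. */
final class InclusiveRangeSketch {

  /**
   * Returns the entries with startVersion <= version <= endVersion (both ends inclusive).
   * Throws IllegalArgumentException if startVersion > endVersion, as NavigableMap.subMap does.
   */
  static <T> NavigableMap<Long, T> collectInclusive(
      NavigableMap<Long, T> actionsByVersion, long startVersion, long endVersion) {
    // Copy the view so later changes to the source map do not leak into the result.
    return new TreeMap<>(actionsByVersion.subMap(startVersion, true, endVersion, true));
  }
}
```

For a table with metadata actions at versions 0, 1, and 2, `collectInclusive(map, 0L, 1L)` returns versions 0 and 1. Passing `1L + 1` against the same inclusive bound would also return version 2, which is the kind of over-read the caller-side fix avoids.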

@PorridgeSwim
Collaborator Author

Range-diff: stack/RefactorDeltaSourceMetadataEvolutionSupport (ab61d5e -> be6d89d)
spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java
@@ -3,7 +3,7 @@
 --- /dev/null
 +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java
 +/*
-+ * Copyright (2025) The Delta Lake Project Authors.
++ * Copyright (2026) The Delta Lake Project Authors.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.

Reproduce locally: git range-diff 2490e84..ab61d5e 9491023..be6d89d | Disable: git config gitstack.push-range-diff false

@PorridgeSwim PorridgeSwim marked this pull request as ready for review April 15, 2026 22:27
@PorridgeSwim PorridgeSwim self-assigned this Apr 15, 2026
@PorridgeSwim PorridgeSwim changed the title Implement MetadataEvolutionHandler in v2 [kernel-spark] Implement MetadataEvolutionHandler in v2 Apr 15, 2026
@PorridgeSwim PorridgeSwim force-pushed the stack/MetadataEvolutionHandler2 branch from be6d89d to e7376d3 Compare April 24, 2026 23:17
@PorridgeSwim
Collaborator Author

Range-diff: stack/RefactorDeltaSourceMetadataEvolutionSupport (be6d89d -> e7376d3)
spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java
@@ -25,8 +25,8 @@
 +import io.delta.kernel.internal.actions.Protocol;
 +import io.delta.kernel.internal.util.Utils;
 +import io.delta.kernel.utils.CloseableIterator;
-+import io.delta.spark.internal.v2.adapters.SparkMetadataAdapter;
-+import io.delta.spark.internal.v2.adapters.SparkProtocolAdapter;
++import io.delta.spark.internal.v2.adapters.KernelMetadataAdapter;
++import io.delta.spark.internal.v2.adapters.KernelProtocolAdapter;
 +import io.delta.spark.internal.v2.snapshot.DeltaSnapshotManager;
 +import io.delta.spark.internal.v2.utils.ScalaUtils;
 +import io.delta.spark.internal.v2.utils.StreamingHelper;
@@ -267,8 +267,8 @@
 +        PersistedMetadata.apply(
 +            tableId,
 +            version,
-+            new SparkMetadataAdapter(metadataToUse),
-+            new SparkProtocolAdapter(protocolToUse),
++            new KernelMetadataAdapter(metadataToUse),
++            new KernelProtocolAdapter(protocolToUse),
 +            metadataPath);
 +
 +    if (replace) {
@@ -320,8 +320,8 @@
 +        PersistedMetadata.apply(
 +            tableId,
 +            version,
-+            new SparkMetadataAdapter(metadata),
-+            new SparkProtocolAdapter(protocol),
++            new KernelMetadataAdapter(metadata),
++            new KernelProtocolAdapter(protocol),
 +            metadataPath);
 +    metadataTrackingLog.get().writeNewMetadata(newMetadata, false);
 +
@@ -344,11 +344,11 @@
 +      Metadata newMetadata, Protocol newProtocol, long newSchemaVersion) {
 +    Option<AbstractMetadata> metadataOpt =
 +        newMetadata != null
-+            ? Option.apply((AbstractMetadata) new SparkMetadataAdapter(newMetadata))
++            ? Option.apply((AbstractMetadata) new KernelMetadataAdapter(newMetadata))
 +            : Option.empty();
 +    Option<AbstractProtocol> protocolOpt =
 +        newProtocol != null
-+            ? Option.apply((AbstractProtocol) new SparkProtocolAdapter(newProtocol))
++            ? Option.apply((AbstractProtocol) new KernelProtocolAdapter(newProtocol))
 +            : Option.empty();
 +    Option<PersistedMetadata> persistedOpt =
 +        persistedMetadataAtSourceInit != null
@@ -361,7 +361,7 @@
 +            protocolOpt,
 +            newSchemaVersion,
 +            persistedOpt,
-+            new SparkProtocolAdapter(readProtocolAtSourceInit),
++            new KernelProtocolAdapter(readProtocolAtSourceInit),
 +            readSchemaAtSourceInit,
 +            readPartitionSchemaAtSourceInit,
 +            ScalaUtils.toScalaMap(readConfigurationsAtSourceInit),
@@ -414,14 +414,14 @@
 +    Metadata mostRecentMetadataChange =
 +        metadataChanges.isEmpty() ? null : metadataChanges.get(metadataChanges.size() - 1);
 +    if (mostRecentMetadataChange != null) {
-+      SparkMetadataAdapter mostRecentAdapter = new SparkMetadataAdapter(mostRecentMetadataChange);
++      KernelMetadataAdapter mostRecentAdapter = new KernelMetadataAdapter(mostRecentMetadataChange);
 +      // Validate startMetadata + all intermediate changes against the most recent
 +      List<Metadata> otherMetadataChanges = new ArrayList<>();
 +      otherMetadataChanges.add(startMetadata);
 +      otherMetadataChanges.addAll(metadataChanges.subList(0, metadataChanges.size() - 1));
 +      for (Metadata potentialSchemaChangeMetadata : otherMetadataChanges) {
-+        SparkMetadataAdapter potentialAdapter =
-+            new SparkMetadataAdapter(potentialSchemaChangeMetadata);
++        KernelMetadataAdapter potentialAdapter =
++            new KernelMetadataAdapter(potentialSchemaChangeMetadata);
 +        if (!DeltaColumnMapping$.MODULE$.hasNoColumnMappingSchemaChanges(
 +                mostRecentAdapter, potentialAdapter, false)
 +            || !SchemaUtils$.MODULE$.isReadCompatible(
spark/v2/src/main/java/io/delta/spark/internal/v2/utils/StreamingHelper.java
@@ -20,9 +20,9 @@
 +    return Optional.ofNullable(protocol);
 +  }
 +
-   /**
-    * Gets commit-level actions from a commit range without requiring a snapshot at the exact start
-    * version.
+   /** Get CommitInfo action from a batch at the specified row, if present. */
+   public static Optional<CommitInfo> getCommitInfo(ColumnarBatch columnarBatch, int rowId) {
+     int commitInfoIdx =
     * metadata action. Throws an exception if multiple metadata actions are found in the same commit.
     *
     * @param startVersion the starting version (inclusive) of the commit range
spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java
@@ -39,8 +39,8 @@
 +import io.delta.kernel.types.StructType;
 +import io.delta.kernel.utils.CloseableIterator;
 +import io.delta.spark.internal.v2.DeltaV2TestBase;
-+import io.delta.spark.internal.v2.adapters.SparkMetadataAdapter;
-+import io.delta.spark.internal.v2.adapters.SparkProtocolAdapter;
++import io.delta.spark.internal.v2.adapters.KernelMetadataAdapter;
++import io.delta.spark.internal.v2.adapters.KernelProtocolAdapter;
 +import io.delta.spark.internal.v2.exception.VersionNotFoundException;
 +import io.delta.spark.internal.v2.snapshot.DeltaSnapshotManager;
 +import io.delta.spark.internal.v2.snapshot.PathBasedSnapshotManager;
@@ -168,7 +168,7 @@
 +  private MetadataEvolutionHandler buildLightweightHandler(
 +      Option<DeltaSourceMetadataTrackingLog> trackingLog,
 +      DeltaStreamUtils.SchemaReadOptions readOptions) {
-+    SparkMetadataAdapter adapter = new SparkMetadataAdapter(DEFAULT_METADATA);
++    KernelMetadataAdapter adapter = new KernelMetadataAdapter(DEFAULT_METADATA);
 +    return new MetadataEvolutionHandler(
 +        spark,
 +        "test-table-id",
@@ -202,7 +202,7 @@
 +    SnapshotImpl snapshot = (SnapshotImpl) snapshotManager.loadSnapshotAt(initVersion);
 +    Metadata tableMetadata = snapshot.getMetadata();
 +    Protocol tableProtocol = snapshot.getProtocol();
-+    SparkMetadataAdapter adapter = new SparkMetadataAdapter(tableMetadata);
++    KernelMetadataAdapter adapter = new KernelMetadataAdapter(tableMetadata);
 +
 +    String metadataLogPath = tablePath + "/metadata_log";
 +    DeltaSourceMetadataTrackingLog trackingLog =
@@ -224,7 +224,7 @@
 +              "test-table-id",
 +              initVersion,
 +              adapter,
-+              new SparkProtocolAdapter(tableProtocol),
++              new KernelProtocolAdapter(tableProtocol),
 +              tablePath + "/_delta_log/_streaming_metadata");
 +      trackingLog.writeNewMetadata(entry, false);
 +    }
@@ -273,8 +273,8 @@
 +          PersistedMetadata.apply(
 +              "test-table-id",
 +              0L,
-+              new SparkMetadataAdapter(DEFAULT_METADATA),
-+              new SparkProtocolAdapter(DEFAULT_PROTOCOL),
++              new KernelMetadataAdapter(DEFAULT_METADATA),
++              new KernelProtocolAdapter(DEFAULT_PROTOCOL),
 +              "/tmp/fake-table/_delta_log/_streaming_metadata");
 +      trackingLog.writeNewMetadata(entry, false);
 +    }

Reproduce locally: git range-diff 9491023..be6d89d 44b86f0..e7376d3 | Disable: git config gitstack.push-range-diff false
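
The `StreamingHelper` hunk in this range-diff relies on the rule that a single commit may carry at most one metadata (or protocol) action. A minimal sketch of that rule, runnable without Kernel, is below. It is not the PR's code: `ActionCollectorSketch`, `Commit`, `collectOnePerVersion`, `asString`, and `asInteger` are hypothetical names, and plain Java objects stand in for `ColumnarBatch` rows. It only illustrates the `putIfAbsent` duplicate check and the version-ordered `LinkedHashMap`.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

final class ActionCollectorSketch {

  /** Hypothetical stand-in for one commit: a version plus its raw action rows. */
  static final class Commit {
    final long version;
    final Object[] rows;

    Commit(long version, Object... rows) {
      this.version = version;
      this.rows = rows;
    }
  }

  /** Hypothetical extractor: treats String rows as "metadata" actions. */
  static Optional<String> asString(Object row) {
    return row instanceof String ? Optional.of((String) row) : Optional.empty();
  }

  /** Hypothetical extractor: treats Integer rows as "protocol" actions. */
  static Optional<Integer> asInteger(Object row) {
    return row instanceof Integer ? Optional.of((Integer) row) : Optional.empty();
  }

  /**
   * Walks commits in order, applies the extractor to every row, and keeps at most one
   * extracted action per version. A second match in the same commit is rejected.
   */
  static <T> Map<Long, T> collectOnePerVersion(
      Iterable<Commit> commits, Function<Object, Optional<T>> extractor, String actionName) {
    // LinkedHashMap preserves insertion order, i.e. the order commits were visited.
    Map<Long, T> versionToAction = new LinkedHashMap<>();
    for (Commit commit : commits) {
      for (Object row : commit.rows) {
        Optional<T> actionOpt = extractor.apply(row);
        if (actionOpt.isPresent()) {
          T existing = versionToAction.putIfAbsent(commit.version, actionOpt.get());
          if (existing != null) {
            throw new IllegalArgumentException(
                String.format(
                    "Should not encounter two %s actions in the same commit of version %d",
                    actionName, commit.version));
          }
        }
      }
    }
    return versionToAction;
  }

  public static void main(String[] args) {
    List<Commit> commits =
        Arrays.asList(
            new Commit(3L, "schema-v3", 7), new Commit(4L, 8), new Commit(5L, "schema-v5"));
    // Version 4 has no String row, so it is omitted from the result: prints [3, 5]
    System.out.println(
        collectOnePerVersion(commits, ActionCollectorSketch::asString, "metadata").keySet());
  }
}
```

Passing the extractor as a method reference is what lets one loop serve both action types; swapping `asString` for `asInteger` changes the value type of the returned map without touching the traversal.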

@PorridgeSwim force-pushed the stack/MetadataEvolutionHandler2 branch from e7376d3 to aee34e1 on April 29, 2026 17:49
@PorridgeSwim
Collaborator Author

Range-diff: stack/RefactorDeltaSourceMetadataEvolutionSupport (9a48144 -> 696ce03)
spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java
@@ -57,14 +57,23 @@
 +import scala.collection.immutable.Seq$;
 +
 +/**
-+ * Handles metadata and schema evolution for the v2 Delta streaming source.
++ * V2 port of V1's {@code DeltaSourceMetadataEvolutionSupport} trait. Handles metadata evolution
++ * (schema, table configuration, or protocol changes) for the v2 Delta streaming source.
 + *
-+ * <p>This is the v2 counterpart of the v1 {@code DeltaSourceMetadataEvolutionSupport} trait. It
-+ * manages the metadata tracking log, detects metadata/protocol changes during streaming, and
-+ * implements the two-barrier offset protocol to ensure safe schema evolution.
++ * <p>To safely evolve schema mid-stream, this class intercepts streaming at several stages to:
 + *
-+ * <p>Delegates to static utilities in {@code DeltaSourceMetadataEvolutionSupport} for validation
-+ * logic that is shared between v1 and v2.
++ * <ol>
++ *   <li>Capture metadata changes within a stream.
++ *   <li>Stop {@code latestOffset} from crossing a metadata change boundary.
++ *   <li>Ensure the batch prior to the change can still be served correctly.
++ *   <li>Fail the stream if and only if the prior batch was served successfully.
++ *   <li>Write the new metadata to the tracking log before the stream fails so the restarted stream
++ *       picks up the updated schema.
++ * </ol>
++ *
++ * <p>See V1's trait doc for the full barrier protocol details. Validation logic shared with v1
++ * lives in the {@code DeltaSourceMetadataEvolutionSupport} companion object; this class delegates
++ * to those static utilities.
 + */
 +public class MetadataEvolutionHandler {
 +
@@ -126,15 +135,23 @@
 +            : null;
 +  }
 +
-+  /** Whether this source should use schema tracking for metadata evolution. */
++  /**
++   * Whether this source uses the metadata tracking log as its read schema. False when the log is
++   * absent/empty or unsafe column-mapping reads are enabled.
++   *
++   * <p>V2 port of V1's {@code DeltaSourceMetadataEvolutionSupport.shouldTrackMetadataChange}.
++   */
 +  public boolean shouldTrackMetadataChange() {
 +    return DeltaSourceMetadataEvolutionSupport$.MODULE$.shouldTrackMetadataChange(
 +        schemaReadOptions, metadataTrackingLog);
 +  }
 +
 +  /**
-+   * Whether the tracking log should be initialized eagerly. This is true when the log is provided
-+   * but empty. Should only be used for the first write to the schema log.
++   * Whether the tracking log is provided but still empty, so it should be initialized eagerly on
++   * the first batch. Should only be consulted before the first write to the log.
++   *
++   * <p>V2 port of V1's {@code
++   * DeltaSourceMetadataEvolutionSupport.shouldInitializeMetadataTrackingEagerly}.
 +   */
 +  public boolean shouldInitializeMetadataTrackingEagerly() {
 +    return DeltaSourceMetadataEvolutionSupport$.MODULE$.shouldInitializeMetadataTrackingEagerly(
@@ -174,10 +191,11 @@
 +  }
 +
 +  /**
-+   * If the version has a metadata or protocol change compared to the current stream metadata,
-+   * return an iterator with a single sentinel IndexedFile at METADATA_CHANGE_INDEX to act as a
-+   * barrier. Otherwise, return an empty iterator. The caller concatenates this into the file change
-+   * stream.
++   * Returns a single barrier {@link IndexedFile} at {@code METADATA_CHANGE_INDEX} when tracking is
++   * on and the given metadata/protocol differ from the init state; empty otherwise.
++   *
++   * <p>V2 port of V1's {@code
++   * DeltaSourceMetadataEvolutionSupport.getMetadataOrProtocolChangeIndexedFileIterator}.
 +   */
 +  public CloseableIterator<IndexedFile> getMetadataOrProtocolChangeIndexedFileIterator(
 +      Metadata metadata, Protocol protocol, long version) {
@@ -192,15 +210,13 @@
 +  }
 +
 +  /**
-+   * Handle pending schema change offsets. Implements the two-barrier protocol:
-+   *
-+   * <ul>
-+   *   <li>If previous offset is at METADATA_CHANGE_INDEX, advance to POST_METADATA_CHANGE_INDEX
-+   *   <li>If previous offset is at POST_METADATA_CHANGE_INDEX and schema evolution hasn't happened
-+   *       yet, block by returning the same offset
-+   * </ul>
++   * Drives the two-barrier protocol when the previous offset sits on a barrier: advances {@code
++   * METADATA_CHANGE_INDEX} to {@code POST_METADATA_CHANGE_INDEX}, blocks at {@code
++   * POST_METADATA_CHANGE_INDEX} if the change is still pending, or returns null when there is no
++   * pending schema change.
 +   *
-+   * @return the next offset if a schema change is pending, empty otherwise
++   * <p>V2 port of V1's {@code
++   * DeltaSourceMetadataEvolutionSupport.getNextOffsetFromPreviousOffsetIfPendingSchemaChange}.
 +   */
 +  public DeltaSourceOffset getNextOffsetFromPreviousOffsetIfPendingSchemaChange(
 +      DeltaSourceOffset previousOffset) {
@@ -230,8 +246,12 @@
 +  // ---------------------------------------------------------------------------
 +
 +  /**
-+   * Called from commit(). If the committed offset is a schema change barrier, write the new
-+   * metadata to the tracking log and fail the stream to trigger re-analysis.
++   * Called from {@code commit()}: when the committed offset is a schema-change barrier, writes the
++   * new metadata to the tracking log and fails the stream to trigger restart under the new schema.
++   * No-op for non-barrier offsets or when tracking is disabled.
++   *
++   * <p>V2 port of V1's {@code
++   * DeltaSourceMetadataEvolutionSupport.updateMetadataTrackingLogAndFailTheStreamIfNeeded(Offset)}.
 +   */
 +  public void updateMetadataTrackingLogAndFailTheStreamIfNeeded(DeltaSourceOffset offset) {
 +    if (!shouldTrackMetadataChange()) {
@@ -250,8 +270,13 @@
 +  }
 +
 +  /**
-+   * Write new metadata into the tracking log and fail the stream if there are changes compared to
-+   * the current stream metadata.
++   * Writes the changed metadata/protocol to the tracking log at {@code version} and throws to fail
++   * the stream. No-op when the change matches the current init state. With {@code replace=true},
++   * the new entry logically replaces the current latest entry instead of being appended.
++   *
++   * <p>V2 port of V1's {@code
++   * DeltaSourceMetadataEvolutionSupport.updateMetadataTrackingLogAndFailTheStreamIfNeeded(Option,
++   * Option, Long, Boolean)}.
 +   */
 +  public void updateMetadataTrackingLogAndFailTheStreamIfNeeded(
 +      Metadata changedMetadata, Protocol changedProtocol, long version, boolean replace) {
@@ -289,12 +314,13 @@
 +  // ---------------------------------------------------------------------------
 +
 +  /**
-+   * Initialize the metadata tracking log on the first batch. Validates that the metadata across the
-+   * version range is safe to read, then writes the initial entry.
++   * Initializes an empty tracking log on the first batch with the metadata at {@code
++   * batchStartVersion}, or — when {@code batchEndVersion} is given for an already-constructed batch
++   * — the most-recent compatible metadata in {@code [start, end]}. Throws to fail the stream if the
++   * initialized metadata differs from init or {@code alwaysFailUponLogInitialized} is set.
 +   *
-+   * @param batchStartVersion start version of the batch
-+   * @param batchEndVersion optional end version (for constructed batches with existing end offset)
-+   * @param alwaysFailUponLogInitialized whether to always fail with schema evolution exception
++   * <p>V2 port of V1's {@code
++   * DeltaSourceMetadataEvolutionSupport.initializeMetadataTrackingAndExitStream}.
 +   */
 +  public void initializeMetadataTrackingAndExitStream(
 +      long batchStartVersion, Long batchEndVersion, boolean alwaysFailUponLogInitialized) {
@@ -391,15 +417,8 @@
 +  }
 +
 +  /**
-+   * Given the version range for an ALREADY fetched batch, check if there are any read-incompatible
-+   * schema changes or protocol changes.
-+   *
-+   * <p>Try to find rename or drop columns in between, or nullability/datatype changes by using the
-+   * last schema as the read schema. If so, we cannot find a good read schema. Otherwise, the most
-+   * recent metadata change will be the most encompassing schema as well.
-+   *
-+   * <p>For protocols, walk through changes and ensure each is a superset of the previous. If not,
-+   * we cannot find a safe protocol.
++   * V2 port of V1's {@code
++   * DeltaSourceMetadataEvolutionSupport.validateAndResolveMetadataForLogInitialization}.
 +   */
 +  private ValidatedMetadataAndProtocol validateAndResolveMetadataForLogInitialization(
 +      long startVersion, long endVersion) {
spark/v2/src/main/java/io/delta/spark/internal/v2/utils/StreamingHelper.java
@@ -23,31 +23,73 @@
    /** Get CommitInfo action from a batch at the specified row, if present. */
    public static Optional<CommitInfo> getCommitInfo(ColumnarBatch columnarBatch, int rowId) {
      int commitInfoIdx =
-    * metadata action. Throws an exception if multiple metadata actions are found in the same commit.
+   }
+ 
+   /**
+-   * Collects metadata actions from a commit range, mapping each version to its metadata.
++   * Collects {@link Metadata} actions from commits in {@code [startVersion, endVersionOpt]}
++   * (inclusive on both ends).
+    *
+-   * <p>This method is "unsafe" because it uses {@code getActionsFromRangeUnsafe()} which bypasses
+-   * the standard snapshot requirement for protocol validation.
++   * <p><b>Returns:</b> a {@link LinkedHashMap} from commit version to its metadata action,
++   * preserving ascending version order. Versions with no metadata action are omitted.
+    *
+-   * <p>Returns a map preserving version order (via LinkedHashMap) where each version maps to its
+-   * metadata action. Throws an exception if multiple metadata actions are found in the same commit.
++   * <p><b>Throws:</b> {@link IllegalArgumentException} if a single commit contains more than one
++   * metadata action; {@link RuntimeException} on underlying I/O errors.
     *
-    * @param startVersion the starting version (inclusive) of the commit range
+-   * @param startVersion the starting version (inclusive) of the commit range
 -   * @param endVersionOpt optional ending version (exclusive) of the commit range
-+   * @param endVersionOpt optional ending version (inclusive) of the commit range
-    * @param snapshotManager the Delta snapshot manager
-    * @param engine the Delta engine
-    * @param tablePath the path to the Delta table
-     return versionToMetadata;
-   }
- 
+-   * @param snapshotManager the Delta snapshot manager
+-   * @param engine the Delta engine
+-   * @param tablePath the path to the Delta table
+-   * @return a map from version number to metadata action, in version order
++   * <p><b>Unsafe:</b> bypasses the snapshot-based protocol validation that {@link
++   * io.delta.kernel.CommitRange#getCommitActions} would normally perform. Callers are responsible
++   * for ensuring protocol compatibility.
++   *
++   * @param startVersion inclusive starting version of the commit range
++   * @param endVersionOpt inclusive ending version, or empty to read through the latest
++   * @param snapshotManager snapshot manager backing the table
++   * @param engine Delta kernel engine
++   * @param tablePath path to the Delta table
+    */
+   public static Map<Long, Metadata> collectMetadataActionsFromRangeUnsafe(
+       long startVersion,
+       DeltaSnapshotManager snapshotManager,
+       Engine engine,
+       String tablePath) {
++    return collectActionsFromRangeUnsafe(
++        startVersion,
++        endVersionOpt,
++        snapshotManager,
++        engine,
++        tablePath,
++        DeltaLogActionUtils.DeltaAction.METADATA,
++        StreamingHelper::getMetadata);
++  }
++
 +  /**
-+   * Collects protocol actions from a commit range, mapping each version to its protocol.
++   * Collects {@link Protocol} actions from commits in {@code [startVersion, endVersionOpt]}
++   * (inclusive on both ends).
 +   *
-+   * <p>This method mirrors {@link #collectMetadataActionsFromRangeUnsafe} but for protocol actions.
++   * <p><b>Returns:</b> a {@link LinkedHashMap} from commit version to its protocol action,
++   * preserving ascending version order. Versions with no protocol action are omitted.
 +   *
-+   * <p>Returns a map preserving version order (via LinkedHashMap) where each version maps to its
-+   * protocol action. Throws an exception if multiple protocol actions are found in the same commit.
++   * <p><b>Throws:</b> {@link IllegalArgumentException} if a single commit contains more than one
++   * protocol action; {@link RuntimeException} on underlying I/O errors.
 +   *
-+   * @param startVersion the starting version (inclusive) of the commit range
-+   * @param endVersionOpt optional ending version (inclusive) of the commit range
-+   * @param snapshotManager the Delta snapshot manager
-+   * @param engine the Delta engine
-+   * @param tablePath the path to the Delta table
-+   * @return a map from version number to protocol action, in version order
++   * <p><b>Unsafe:</b> bypasses the snapshot-based protocol validation that {@link
++   * io.delta.kernel.CommitRange#getCommitActions} would normally perform. Callers are responsible
++   * for ensuring protocol compatibility.
++   *
++   * @param startVersion inclusive starting version of the commit range
++   * @param endVersionOpt inclusive ending version, or empty to read through the latest
++   * @param snapshotManager snapshot manager backing the table
++   * @param engine Delta kernel engine
++   * @param tablePath path to the Delta table
 +   */
 +  public static Map<Long, Protocol> collectProtocolActionsFromRangeUnsafe(
 +      long startVersion,
@@ -55,45 +97,75 @@
 +      DeltaSnapshotManager snapshotManager,
 +      Engine engine,
 +      String tablePath) {
-+    CommitRangeImpl commitRange =
-+        (CommitRangeImpl) snapshotManager.getTableChanges(engine, startVersion, endVersionOpt);
-+    Map<Long, Protocol> versionToProtocol = new LinkedHashMap<>();
++    return collectActionsFromRangeUnsafe(
++        startVersion,
++        endVersionOpt,
++        snapshotManager,
++        engine,
++        tablePath,
++        DeltaLogActionUtils.DeltaAction.PROTOCOL,
++        StreamingHelper::getProtocol);
++  }
++
++  /** Extracts an action of type {@code T} from a single row of a {@link ColumnarBatch}. */
++  @FunctionalInterface
++  private interface RowExtractor<T> {
++    Optional<T> extract(ColumnarBatch batch, int rowId);
++  }
 +
-+    try (CloseableIterator<CommitActions> commitsIter =
-+        getCommitActionsFromRangeUnsafe(
-+            engine, commitRange, tablePath, Set.of(DeltaLogActionUtils.DeltaAction.PROTOCOL))) {
-+      while (commitsIter.hasNext()) {
-+        try (CommitActions commit = commitsIter.next()) {
-+          long version = commit.getVersion();
-+          try (CloseableIterator<ColumnarBatch> actionsIter = commit.getActions()) {
-+            while (actionsIter.hasNext()) {
++  /**
++   * Shared implementation for {@link #collectMetadataActionsFromRangeUnsafe} and {@link
++   * #collectProtocolActionsFromRangeUnsafe}: walks the commit range filtered to {@code actionType},
++   * applies {@code extractor} per row, and rejects commits with more than one matching action.
++   */
++  private static <T> Map<Long, T> collectActionsFromRangeUnsafe(
++      long startVersion,
++      Optional<Long> endVersionOpt,
++      DeltaSnapshotManager snapshotManager,
++      Engine engine,
++      String tablePath,
++      DeltaLogActionUtils.DeltaAction actionType,
++      RowExtractor<T> extractor) {
+     CommitRangeImpl commitRange =
+         (CommitRangeImpl) snapshotManager.getTableChanges(engine, startVersion, endVersionOpt);
+     // LinkedHashMap to preserve insertion order
+-    Map<Long, Metadata> versionToMetadata = new LinkedHashMap<>();
++    Map<Long, T> versionToAction = new LinkedHashMap<>();
+ 
+     try (CloseableIterator<CommitActions> commitsIter =
+-        getCommitActionsFromRangeUnsafe(
+-            engine, commitRange, tablePath, Set.of(DeltaLogActionUtils.DeltaAction.METADATA))) {
++        getCommitActionsFromRangeUnsafe(engine, commitRange, tablePath, Set.of(actionType))) {
+       while (commitsIter.hasNext()) {
+         try (CommitActions commit = commitsIter.next()) {
+           long version = commit.getVersion();
+           try (CloseableIterator<ColumnarBatch> actionsIter = commit.getActions()) {
+             while (actionsIter.hasNext()) {
+-              ColumnarBatch columnarBatch = actionsIter.next();
+-              int numRows = columnarBatch.getSize();
 +              ColumnarBatch batch = actionsIter.next();
 +              int numRows = batch.getSize();
-+              for (int rowId = 0; rowId < numRows; rowId++) {
-+                Optional<Protocol> protocolOpt = StreamingHelper.getProtocol(batch, rowId);
-+                if (protocolOpt.isPresent()) {
-+                  Protocol existing = versionToProtocol.putIfAbsent(version, protocolOpt.get());
-+                  Preconditions.checkArgument(
-+                      existing == null,
-+                      "Should not encounter two protocol actions in the same commit of version %d",
-+                      version);
-+                }
-+              }
-+            }
-+          } catch (IOException e) {
-+            throw new RuntimeException("Failed to process commit at version " + version, e);
-+          }
-+        }
-+      }
-+    } catch (RuntimeException e) {
-+      throw e;
-+    } catch (Exception e) {
-+      throw new RuntimeException("Failed to process commits", e);
-+    }
-+
-+    return versionToProtocol;
-+  }
-+
-   /** Get explicit CDC file (AddCDCFile) from a batch at the specified row, if present. */
-   public static Optional<CDCDataFile> getCDCFile(
-       ColumnarBatch batch, int rowId, long commitTimestamp) {
\ No newline at end of file
+               for (int rowId = 0; rowId < numRows; rowId++) {
+-                Optional<Metadata> metadataOpt = StreamingHelper.getMetadata(columnarBatch, rowId);
+-                if (metadataOpt.isPresent()) {
+-                  Metadata existing = versionToMetadata.putIfAbsent(version, metadataOpt.get());
++                Optional<T> actionOpt = extractor.extract(batch, rowId);
++                if (actionOpt.isPresent()) {
++                  T existing = versionToAction.putIfAbsent(version, actionOpt.get());
+                   Preconditions.checkArgument(
+                       existing == null,
+-                      "Should not encounter two metadata actions in the same commit of version %d",
++                      "Should not encounter two %s actions in the same commit of version %d",
++                      actionType.colName,
+                       version);
+                 }
+               }
+       // CommitActions.close() throws Exception
+       throw new RuntimeException("Failed to process commits", e);
+     }
+-
+-    return versionToMetadata;
++    return versionToAction;
+   }
+ 
+   /** Get explicit CDC file (AddCDCFile) from a batch at the specified row, if present. */
\ No newline at end of file
spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java
@@ -635,27 +635,52 @@
 +    assertNull(handler.getNextOffsetFromPreviousOffsetIfPendingSchemaChange(regularOffset));
 +  }
 +
-+  /**
-+   * At POST_METADATA_CHANGE_INDEX, if the metadata at that version hasn't actually changed compared
-+   * to init (e.g. a no-op metadata action), the handler returns null to unblock. Requires a real
-+   * table because this path calls collectMetadataAtVersion internally.
-+   */
++  /** At POST_BARRIER, returns null when metadata at that version matches init (unblocks stream). */
 +  @Test
 +  public void testPendingSchemaChange_unblockWhenNoActualChange(@TempDir File tempDir) {
 +    String tablePath = tempDir.getAbsolutePath();
 +    String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
 +    createEmptyTestTable(tablePath, tableName);
 +
-+    HandlerWithLog hwl =
++    HandlerWithLog handlerWithLog =
 +        buildHandlerWithRealTable(tablePath, 0L, /* seedLogWithInitEntry= */ false);
 +
 +    DeltaSourceOffset postBarrierOffset =
 +        DeltaSourceOffset.apply(
 +            "test-reservoir", 0L, DeltaSourceOffset.POST_METADATA_CHANGE_INDEX(), false);
 +    // Version 0 has the same metadata as init -> unblocks
-+    assertNull(hwl.handler.getNextOffsetFromPreviousOffsetIfPendingSchemaChange(postBarrierOffset));
++    assertNull(
++        handlerWithLog.handler.getNextOffsetFromPreviousOffsetIfPendingSchemaChange(
++            postBarrierOffset));
 +  }
 +
++  /** At POST_BARRIER with a still-pending schema change, returns the same offset to block. */
++  @Test
++  public void testPendingSchemaChange_blocksAtPostBarrierWhenChangeStillPending(
++      @TempDir File tempDir) {
++    String tablePath = tempDir.getAbsolutePath();
++    String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
++    createEmptyTestTable(tablePath, tableName);
++
++    // Handler captures version 0 as init, with a seed log entry at v0
++    HandlerWithLog handlerWithLog =
++        buildHandlerWithRealTable(tablePath, 0L, /* seedLogWithInitEntry= */ true);
++
++    // Evolve at v1 -> schema at v1 differs from init
++    spark.sql(String.format("ALTER TABLE %s ADD COLUMNS (c3 INT)", tableName));
++
++    DeltaSourceOffset postBarrierAtVersion1 =
++        DeltaSourceOffset.apply(
++            "test-reservoir", 1L, DeltaSourceOffset.POST_METADATA_CHANGE_INDEX(), false);
++    DeltaSourceOffset result =
++        handlerWithLog.handler.getNextOffsetFromPreviousOffsetIfPendingSchemaChange(
++            postBarrierAtVersion1);
++    assertNotNull(result);
++    assertEquals(postBarrierAtVersion1.reservoirVersion(), result.reservoirVersion());
++    assertEquals(postBarrierAtVersion1.index(), result.index());
++    assertEquals(postBarrierAtVersion1.reservoirId(), result.reservoirId());
++  }
++
 +  // ---------------------------------------------------------------------------
 +  // updateMetadataTrackingLogAndFailTheStreamIfNeeded
 +  //
@@ -682,8 +707,8 @@
 +  }
 +
 +  /**
-+   * Offset-based overload: when the barrier version has a schema change (via ALTER TABLE), the
-+   * handler reads the changed metadata from the log, writes it, and throws.
++   * Offset overload: when the barrier version has a schema change (via ALTER TABLE), reads the
++   * changed metadata from the log, writes it, and throws.
 +   */
 +  @Test
 +  public void testUpdateLog_throwsWhenBarrierVersionHasSchemaChange(@TempDir File tempDir) {
@@ -691,7 +716,8 @@
 +    String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
 +    createEmptyTestTable(tablePath, tableName);
 +
-+    HandlerWithLog hwl = buildHandlerWithRealTable(tablePath, 0L, /* seedLogWithInitEntry= */ true);
++    HandlerWithLog handlerWithLog =
++        buildHandlerWithRealTable(tablePath, 0L, /* seedLogWithInitEntry= */ true);
 +
 +    // Evolve the table schema: version 1 has a new column
 +    spark.sql(String.format("ALTER TABLE %s ADD COLUMNS (c3 INT)", tableName));
@@ -703,20 +729,23 @@
 +    DeltaRuntimeException ex =
 +        assertThrows(
 +            DeltaRuntimeException.class,
-+            () -> hwl.handler.updateMetadataTrackingLogAndFailTheStreamIfNeeded(barrierAtVersion1));
++            () ->
++                handlerWithLog.handler.updateMetadataTrackingLogAndFailTheStreamIfNeeded(
++                    barrierAtVersion1));
 +    assertMetadataEvolutionException(ex, "when schema changed at barrier version");
 +
 +    // Verify the new entry was persisted at version 1
-+    assertTrue(hwl.trackingLog.getCurrentTrackedMetadata().isDefined());
-+    assertEquals(1L, hwl.trackingLog.getCurrentTrackedMetadata().get().deltaCommitVersion());
++    assertTrue(handlerWithLog.trackingLog.getCurrentTrackedMetadata().isDefined());
++    assertEquals(
++        1L, handlerWithLog.trackingLog.getCurrentTrackedMetadata().get().deltaCommitVersion());
 +  }
 +
 +  /**
-+   * 4-arg overload: directly passing a changed protocol throws and writes the new entry to the
-+   * tracking log at the specified version.
++   * Direct overload: passing a changed protocol throws and writes the new entry to the tracking log
++   * at the specified version.
 +   */
 +  @Test
-+  public void testUpdateLog_4arg_throwsAndWritesEntryOnProtocolChange(@TempDir File tempDir) {
++  public void testUpdateLog_throwsAndWritesEntryOnDirectProtocolChange(@TempDir File tempDir) {
 +    DeltaSourceMetadataTrackingLog seededLog =
 +        createStandaloneTrackingLog(tempDir, /* seedWithDefaultEntry= */ true);
 +    MetadataEvolutionHandler handler =
@@ -737,9 +766,9 @@
 +    assertEquals(5L, seededLog.getCurrentTrackedMetadata().get().deltaCommitVersion());
 +  }
 +
-+  /** 4-arg overload: no-op when metadata and protocol match the init state. */
++  /** Direct overload: no-op when metadata and protocol match the init state. */
 +  @Test
-+  public void testUpdateLog_4arg_noOpWhenMetadataAndProtocolUnchanged(@TempDir File tempDir) {
++  public void testUpdateLog_noOpOnUnchangedDirectMetadataAndProtocol(@TempDir File tempDir) {
 +    DeltaSourceMetadataTrackingLog seededLog =
 +        createStandaloneTrackingLog(tempDir, /* seedWithDefaultEntry= */ true);
 +    MetadataEvolutionHandler handler =
@@ -750,6 +779,62 @@
 +        DEFAULT_METADATA, DEFAULT_PROTOCOL, 1L, /* replace= */ false);
 +  }
 +
++  /**
++   * With replace=true, the new entry's previousMetadataSeqNum is set so getPreviousTrackedMetadata
++   * returns empty (logically replacing the seed).
++   */
++  @Test
++  public void testUpdateLog_replaceTrueClearsPreviousEntry(@TempDir File tempDir) {
++    DeltaSourceMetadataTrackingLog seededLog =
++        createStandaloneTrackingLog(tempDir, /* seedWithDefaultEntry= */ true);
++    MetadataEvolutionHandler handler =
++        buildLightweightHandler(
++            Option.apply(seededLog), schemaReadOptions(/* allowUnsafeColumnMappingRead= */ false));
++
++    Protocol upgradedProtocol = new Protocol(3, 7);
++    DeltaRuntimeException ex =
++        assertThrows(
++            DeltaRuntimeException.class,
++            () ->
++                handler.updateMetadataTrackingLogAndFailTheStreamIfNeeded(
++                    null, upgradedProtocol, 5L, /* replace= */ true));
++    assertMetadataEvolutionException(ex, "when protocol changed with replace=true");
++
++    assertTrue(seededLog.getCurrentTrackedMetadata().isDefined());
++    assertEquals(5L, seededLog.getCurrentTrackedMetadata().get().deltaCommitVersion());
++    // The seed at v0 is logically replaced -> previous tracked is empty.
++    assertTrue(seededLog.getPreviousTrackedMetadata().isEmpty());
++  }
++
++  /** Throw-and-write also fires at POST_METADATA_CHANGE_INDEX, not just METADATA_CHANGE_INDEX. */
++  @Test
++  public void testUpdateLog_throwsAtPostBarrierIndex(@TempDir File tempDir) {
++    String tablePath = tempDir.getAbsolutePath();
++    String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
++    createEmptyTestTable(tablePath, tableName);
++
++    HandlerWithLog handlerWithLog =
++        buildHandlerWithRealTable(tablePath, 0L, /* seedLogWithInitEntry= */ true);
++
++    spark.sql(String.format("ALTER TABLE %s ADD COLUMNS (c3 INT)", tableName));
++
++    DeltaSourceOffset postBarrierAtVersion1 =
++        DeltaSourceOffset.apply(
++            "test-reservoir", 1L, DeltaSourceOffset.POST_METADATA_CHANGE_INDEX(), false);
++
++    DeltaRuntimeException ex =
++        assertThrows(
++            DeltaRuntimeException.class,
++            () ->
++                handlerWithLog.handler.updateMetadataTrackingLogAndFailTheStreamIfNeeded(
++                    postBarrierAtVersion1));
++    assertMetadataEvolutionException(ex, "when committing the post-barrier with schema change");
++
++    assertTrue(handlerWithLog.trackingLog.getCurrentTrackedMetadata().isDefined());
++    assertEquals(
++        1L, handlerWithLog.trackingLog.getCurrentTrackedMetadata().get().deltaCommitVersion());
++  }
++
 +  // ---------------------------------------------------------------------------
 +  // initializeMetadataTrackingAndExitStream
 +  //
@@ -759,8 +844,8 @@
 +  // ---------------------------------------------------------------------------
 +
 +  /**
-+   * When metadata at the batch version matches the handler's init state and alwaysFail is false,
-+   * the entry is written to the log without throwing.
++   * When metadata at the batch version matches init and alwaysFail is false, the entry is written
++   * without throwing.
 +   */
 +  @Test
 +  public void testInitialize_writesEntryWithoutThrowingWhenMetadataMatches(@TempDir File tempDir) {
@@ -768,19 +853,20 @@
 +    String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
 +    createEmptyTestTable(tablePath, tableName);
 +
-+    HandlerWithLog hwl =
++    HandlerWithLog handlerWithLog =
 +        buildHandlerWithRealTable(tablePath, 0L, /* seedLogWithInitEntry= */ false);
 +
-+    hwl.handler.initializeMetadataTrackingAndExitStream(
++    handlerWithLog.handler.initializeMetadataTrackingAndExitStream(
 +        0L, /* batchEndVersion= */ null, /* alwaysFailUponLogInitialized= */ false);
 +
-+    assertTrue(hwl.trackingLog.getCurrentTrackedMetadata().isDefined());
-+    assertEquals(0L, hwl.trackingLog.getCurrentTrackedMetadata().get().deltaCommitVersion());
++    assertTrue(handlerWithLog.trackingLog.getCurrentTrackedMetadata().isDefined());
++    assertEquals(
++        0L, handlerWithLog.trackingLog.getCurrentTrackedMetadata().get().deltaCommitVersion());
 +  }
 +
 +  /**
-+   * When alwaysFailUponLogInitialized is true, the handler throws even if the metadata matches. The
-+   * entry should still be written before throwing.
++   * When alwaysFailUponLogInitialized is true, throws even if metadata matches; entry is still
++   * written before throwing.
 +   */
 +  @Test
 +  public void testInitialize_alwaysThrowsWhenAlwaysFailFlagIsSet(@TempDir File tempDir) {
@@ -788,19 +874,19 @@
 +    String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
 +    createEmptyTestTable(tablePath, tableName);
 +
-+    HandlerWithLog hwl =
++    HandlerWithLog handlerWithLog =
 +        buildHandlerWithRealTable(tablePath, 0L, /* seedLogWithInitEntry= */ false);
 +
 +    DeltaRuntimeException ex =
 +        assertThrows(
 +            DeltaRuntimeException.class,
 +            () ->
-+                hwl.handler.initializeMetadataTrackingAndExitStream(
++                handlerWithLog.handler.initializeMetadataTrackingAndExitStream(
 +                    0L, /* batchEndVersion= */ null, /* alwaysFailUponLogInitialized= */ true));
 +    assertMetadataEvolutionException(ex, "when alwaysFailUponLogInitialized is true");
 +
 +    // Entry should still have been written before throwing
-+    assertTrue(hwl.trackingLog.getCurrentTrackedMetadata().isDefined());
++    assertTrue(handlerWithLog.trackingLog.getCurrentTrackedMetadata().isDefined());
 +  }
 +
 +  /**
@@ -814,7 +900,7 @@
 +    createEmptyTestTable(tablePath, tableName);
 +
 +    // Handler captures version 0 as init state
-+    HandlerWithLog hwl =
++    HandlerWithLog handlerWithLog =
 +        buildHandlerWithRealTable(tablePath, 0L, /* seedLogWithInitEntry= */ false);
 +
 +    // Evolve: version 1 has a new column
@@ -824,12 +910,71 @@
 +        assertThrows(
 +            DeltaRuntimeException.class,
 +            () ->
-+                hwl.handler.initializeMetadataTrackingAndExitStream(
++                handlerWithLog.handler.initializeMetadataTrackingAndExitStream(
 +                    1L, /* batchEndVersion= */ null, /* alwaysFailUponLogInitialized= */ false));
 +    assertMetadataEvolutionException(ex, "when schema evolved since init");
 +
 +    // Entry should be written at version 1 (the evolved version)
-+    assertTrue(hwl.trackingLog.getCurrentTrackedMetadata().isDefined());
-+    assertEquals(1L, hwl.trackingLog.getCurrentTrackedMetadata().get().deltaCommitVersion());
++    assertTrue(handlerWithLog.trackingLog.getCurrentTrackedMetadata().isDefined());
++    assertEquals(
++        1L, handlerWithLog.trackingLog.getCurrentTrackedMetadata().get().deltaCommitVersion());
++  }
++
++  /**
++   * batchEndVersion overload: when there are no metadata or protocol changes in [start, end],
++   * validation succeeds and the entry is written at endVersion.
++   */
++  @Test
++  public void testInitialize_batchEndVersion_succeedsWhenNoMetadataChangeInRange(
++      @TempDir File tempDir) {
++    String tablePath = tempDir.getAbsolutePath();
++    String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
++    createEmptyTestTable(tablePath, tableName);
++    // INSERT adds an Add action at v1; no metadata or protocol change in [0, 1]
++    spark.sql(String.format("INSERT INTO %s VALUES (1, 'Alice')", tableName));
++
++    HandlerWithLog handlerWithLog =
++        buildHandlerWithRealTable(tablePath, 0L, /* seedLogWithInitEntry= */ false);
++
++    handlerWithLog.handler.initializeMetadataTrackingAndExitStream(
++        0L, /* batchEndVersion= */ 1L, /* alwaysFailUponLogInitialized= */ false);
++
++    assertTrue(handlerWithLog.trackingLog.getCurrentTrackedMetadata().isDefined());
++    assertEquals(
++        1L, handlerWithLog.trackingLog.getCurrentTrackedMetadata().get().deltaCommitVersion());
++  }
++
++  /**
++   * batchEndVersion overload: an incompatible schema change (column drop with column mapping)
++   * within [start, end] makes validation throw
++   * DELTA_STREAMING_SCHEMA_LOG_INIT_FAILED_INCOMPATIBLE_METADATA — there's no schema we can use to
++   * safely read the constructed batch.
++   */
++  @Test
++  public void testInitialize_batchEndVersion_throwsOnIncompatibleSchemaChangeInRange(
++      @TempDir File tempDir) {
++    String tablePath = tempDir.getAbsolutePath();
++    String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_');
++    // Column mapping enables logical column drops (detected as incompatible by validation)
++    spark.sql(
++        String.format(
++            "CREATE TABLE %s (id INT, name STRING, age INT) USING delta LOCATION '%s' "
++                + "TBLPROPERTIES ('delta.columnMapping.mode' = 'name')",
++            tableName, tablePath));
++    spark.sql(String.format("ALTER TABLE %s DROP COLUMN age", tableName));
++
++    HandlerWithLog handlerWithLog =
++        buildHandlerWithRealTable(tablePath, 0L, /* seedLogWithInitEntry= */ false);
++
++    DeltaRuntimeException ex =
++        assertThrows(
++            DeltaRuntimeException.class,
++            () ->
++                handlerWithLog.handler.initializeMetadataTrackingAndExitStream(
++                    0L, /* batchEndVersion= */ 1L, /* alwaysFailUponLogInitialized= */ false));
++    assertEquals(
++        "DELTA_STREAMING_SCHEMA_LOG_INIT_FAILED_INCOMPATIBLE_METADATA",
++        ex.getErrorClass(),
++        "Should throw incompatible-metadata error when range contains a column drop");
 +  }
 +}
\ No newline at end of file
spark/v2/src/test/java/io/delta/spark/internal/v2/utils/StreamingHelperTest.java
@@ -0,0 +1,168 @@
+diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/utils/StreamingHelperTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/utils/StreamingHelperTest.java
+--- a/spark/v2/src/test/java/io/delta/spark/internal/v2/utils/StreamingHelperTest.java
++++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/utils/StreamingHelperTest.java
+ 
+ import io.delta.kernel.Snapshot;
+ import io.delta.kernel.internal.DeltaHistoryManager;
++import io.delta.kernel.internal.actions.Metadata;
++import io.delta.kernel.internal.actions.Protocol;
++import io.delta.kernel.types.IntegerType;
++import io.delta.kernel.types.StringType;
++import io.delta.kernel.types.StructType;
+ import io.delta.spark.internal.v2.DeltaV2TestBase;
+ import io.delta.spark.internal.v2.exception.VersionNotFoundException;
+ import io.delta.spark.internal.v2.snapshot.PathBasedSnapshotManager;
+ import java.io.File;
+ import java.sql.Timestamp;
++import java.util.Map;
++import java.util.Optional;
++import java.util.Set;
+ import java.util.stream.Stream;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.spark.sql.delta.DeltaLog;
+           .checkVersionExists(versionToCheck, Option.empty(), mustBeRecreatable, allowOutOfRange);
+     }
+   }
++
++  // ---------------------------------------------------------------------------
++  // collectMetadataActionsFromRangeUnsafe
++  //
++  // Fixture for parameterized cases below: v0 CREATE, v1 INSERT, v2 ALTER ADD COLUMNS.
++  // Versions with a Metadata action: {v0, v2}. v1 has none.
++  // ---------------------------------------------------------------------------
++
++  private void setupTableWithMetadataChangeAtV2(String testTablePath, String testTableName) {
++    createEmptyTestTable(testTablePath, testTableName); // v0
++    spark.sql(String.format("INSERT INTO %s VALUES (1, 'Alice')", testTableName)); // v1
++    spark.sql(String.format("ALTER TABLE %s ADD COLUMNS (c3 INT)", testTableName)); // v2
++  }
++
++  private static Stream<Arguments> collectMetadataTestCases() {
++    return Stream.of(
++        // scenario, startVersion, endVersionOpt, expectedVersions
++        Arguments.of("fullRange_endInclusive", 0L, Optional.of(2L), Set.of(0L, 2L)),
++        Arguments.of("nonTrivialStartExcludesV0", 1L, Optional.of(2L), Set.of(2L)),
++        Arguments.of("emptyEndReachesLatest", 0L, Optional.empty(), Set.of(0L, 2L)),
++        Arguments.of("rangeWithoutMetadataChange", 1L, Optional.of(1L), Set.of()),
++        Arguments.of("endBeforeChange", 0L, Optional.of(1L), Set.of(0L)),
++        Arguments.of("emptyEndWithNonTrivialStart", 1L, Optional.empty(), Set.of(2L)),
++        Arguments.of("singleVersionAtChange", 2L, Optional.of(2L), Set.of(2L)));
++  }
++
++  @ParameterizedTest(name = "{0}")
++  @MethodSource("collectMetadataTestCases")
++  public void testCollectMetadataActionsFromRangeUnsafe(
++      String scenario,
++      long startVersion,
++      Optional<Long> endVersionOpt,
++      Set<Long> expectedVersions,
++      @TempDir File tempDir) {
++    String testTablePath = tempDir.getAbsolutePath();
++    String testTableName = "test_collect_metadata_" + scenario;
++    setupTableWithMetadataChangeAtV2(testTablePath, testTableName);
++    snapshotManager =
++        new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf());
++
++    Map<Long, Metadata> result =
++        StreamingHelper.collectMetadataActionsFromRangeUnsafe(
++            startVersion, endVersionOpt, snapshotManager, defaultEngine, testTablePath);
++
++    assertEquals(expectedVersions, result.keySet());
++  }
++
++  /** Verifies that the version → Metadata mapping returns the correct Metadata content. */
++  @Test
++  public void testCollectMetadataActionsFromRangeUnsafe_returnsCorrectMetadataPerVersion(
++      @TempDir File tempDir) {
++    String testTablePath = tempDir.getAbsolutePath();
++    String testTableName = "test_collect_metadata_content";
++    setupTableWithMetadataChangeAtV2(testTablePath, testTableName);
++    snapshotManager =
++        new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf());
++
++    Map<Long, Metadata> result =
++        StreamingHelper.collectMetadataActionsFromRangeUnsafe(
++            0L, Optional.of(2L), snapshotManager, defaultEngine, testTablePath);
++
++    StructType expectedV0Schema =
++        new StructType().add("id", IntegerType.INTEGER).add("name", StringType.STRING);
++    StructType expectedV2Schema =
++        new StructType()
++            .add("id", IntegerType.INTEGER)
++            .add("name", StringType.STRING)
++            .add("c3", IntegerType.INTEGER);
++    assertEquals(expectedV0Schema, result.get(0L).getSchema());
++    assertEquals(expectedV2Schema, result.get(2L).getSchema());
++  }
++
++  // ---------------------------------------------------------------------------
++  // collectProtocolActionsFromRangeUnsafe
++  //
++  // Fixture for parameterized cases below: v0 CREATE, v1 INSERT, v2 SET TBLPROPERTIES (upgrade).
++  // Versions with a Protocol action: {v0, v2}. v1 has none.
++  // ---------------------------------------------------------------------------
++
++  private void setupTableWithProtocolUpgradeAtV2(String testTablePath, String testTableName) {
++    createEmptyTestTable(testTablePath, testTableName); // v0
++    spark.sql(String.format("INSERT INTO %s VALUES (1, 'Alice')", testTableName)); // v1
++    spark.sql(
++        String.format(
++            "ALTER TABLE %s SET TBLPROPERTIES "
++                + "('delta.minReaderVersion' = '3', 'delta.minWriterVersion' = '7')",
++            testTableName)); // v2
++  }
++
++  private static Stream<Arguments> collectProtocolTestCases() {
++    return Stream.of(
++        // scenario, startVersion, endVersionOpt, expectedVersions
++        Arguments.of("fullRange_endInclusive", 0L, Optional.of(2L), Set.of(0L, 2L)),
++        Arguments.of("nonTrivialStartExcludesV0", 1L, Optional.of(2L), Set.of(2L)),
++        Arguments.of("emptyEndReachesLatest", 0L, Optional.empty(), Set.of(0L, 2L)),
++        Arguments.of("rangeWithoutProtocolChange", 1L, Optional.of(1L), Set.of()),
++        Arguments.of("endBeforeChange", 0L, Optional.of(1L), Set.of(0L)),
++        Arguments.of("emptyEndWithNonTrivialStart", 1L, Optional.empty(), Set.of(2L)),
++        Arguments.of("singleVersionAtChange", 2L, Optional.of(2L), Set.of(2L)));
++  }
++
++  @ParameterizedTest(name = "{0}")
++  @MethodSource("collectProtocolTestCases")
++  public void testCollectProtocolActionsFromRangeUnsafe(
++      String scenario,
++      long startVersion,
++      Optional<Long> endVersionOpt,
++      Set<Long> expectedVersions,
++      @TempDir File tempDir) {
++    String testTablePath = tempDir.getAbsolutePath();
++    String testTableName = "test_collect_protocol_" + scenario;
++    setupTableWithProtocolUpgradeAtV2(testTablePath, testTableName);
++    snapshotManager =
++        new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf());
++
++    Map<Long, Protocol> result =
++        StreamingHelper.collectProtocolActionsFromRangeUnsafe(
++            startVersion, endVersionOpt, snapshotManager, defaultEngine, testTablePath);
++
++    assertEquals(expectedVersions, result.keySet());
++  }
++
++  /** Verifies that the version → Protocol mapping returns the correct Protocol content. */
++  @Test
++  public void testCollectProtocolActionsFromRangeUnsafe_returnsCorrectProtocolPerVersion(
++      @TempDir File tempDir) {
++    String testTablePath = tempDir.getAbsolutePath();
++    String testTableName = "test_collect_protocol_content";
++    setupTableWithProtocolUpgradeAtV2(testTablePath, testTableName);
++    snapshotManager =
++        new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf());
++
++    Map<Long, Protocol> result =
++        StreamingHelper.collectProtocolActionsFromRangeUnsafe(
++            0L, Optional.of(2L), snapshotManager, defaultEngine, testTablePath);
++
++    // v0 default: reader=1, writer=2; v2 upgraded: reader=3, writer=7
++    assertEquals(1, result.get(0L).getMinReaderVersion());
++    assertEquals(2, result.get(0L).getMinWriterVersion());
++    assertEquals(3, result.get(2L).getMinReaderVersion());
++    assertEquals(7, result.get(2L).getMinWriterVersion());
++  }
+ }
\ No newline at end of file

Reproduce locally: git range-diff 67377eb..9a48144 a80dfae..696ce03 | Disable: git config gitstack.push-range-diff false

@PorridgeSwim PorridgeSwim force-pushed the stack/MetadataEvolutionHandler2 branch from 696ce03 to 6981a40 Compare May 4, 2026 06:25
@PorridgeSwim
Collaborator Author

Range-diff: stack/RefactorDeltaSourceMetadataEvolutionSupport (696ce03 -> 6981a40)
spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java
@@ -28,7 +28,6 @@
 +import io.delta.spark.internal.v2.adapters.KernelMetadataAdapter;
 +import io.delta.spark.internal.v2.adapters.KernelProtocolAdapter;
 +import io.delta.spark.internal.v2.snapshot.DeltaSnapshotManager;
-+import io.delta.spark.internal.v2.utils.ScalaUtils;
 +import io.delta.spark.internal.v2.utils.StreamingHelper;
 +import java.util.ArrayList;
 +import java.util.Collections;
@@ -49,7 +48,6 @@
 +import org.apache.spark.sql.delta.sources.PersistedMetadata;
 +import org.apache.spark.sql.delta.v2.interop.AbstractMetadata;
 +import org.apache.spark.sql.delta.v2.interop.AbstractProtocol;
-+import org.apache.spark.sql.types.StructType;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import scala.Option;
@@ -90,9 +88,6 @@
 +
 +  // Read-time state captured at source initialization
 +  private final Metadata readMetadataAtSourceInit;
-+  private final StructType readSchemaAtSourceInit;
-+  private final StructType readPartitionSchemaAtSourceInit;
-+  private final Map<String, String> readConfigurationsAtSourceInit;
 +  private final Protocol readProtocolAtSourceInit;
 +  private final String metadataPath;
 +
@@ -109,9 +104,6 @@
 +      DeltaStreamUtils.SchemaReadOptions schemaReadOptions,
 +      Option<DeltaSourceMetadataTrackingLog> metadataTrackingLog,
 +      Metadata readMetadataAtSourceInit,
-+      StructType readSchemaAtSourceInit,
-+      StructType readPartitionSchemaAtSourceInit,
-+      Map<String, String> readConfigurationsAtSourceInit,
 +      Protocol readProtocolAtSourceInit,
 +      String metadataPath) {
 +    this.spark = Objects.requireNonNull(spark);
@@ -123,9 +115,6 @@
 +    this.schemaReadOptions = Objects.requireNonNull(schemaReadOptions);
 +    this.metadataTrackingLog = Objects.requireNonNull(metadataTrackingLog);
 +    this.readMetadataAtSourceInit = Objects.requireNonNull(readMetadataAtSourceInit);
-+    this.readSchemaAtSourceInit = Objects.requireNonNull(readSchemaAtSourceInit);
-+    this.readPartitionSchemaAtSourceInit = Objects.requireNonNull(readPartitionSchemaAtSourceInit);
-+    this.readConfigurationsAtSourceInit = Objects.requireNonNull(readConfigurationsAtSourceInit);
 +    this.readProtocolAtSourceInit = Objects.requireNonNull(readProtocolAtSourceInit);
 +    this.metadataPath = Objects.requireNonNull(metadataPath);
 +    this.persistedMetadataAtSourceInit =
@@ -388,9 +377,7 @@
 +            newSchemaVersion,
 +            persistedOpt,
 +            new KernelProtocolAdapter(readProtocolAtSourceInit),
-+            readSchemaAtSourceInit,
-+            readPartitionSchemaAtSourceInit,
-+            ScalaUtils.toScalaMap(readConfigurationsAtSourceInit),
++            new KernelMetadataAdapter(readMetadataAtSourceInit),
 +            spark);
 +  }
 +
spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java
@@ -168,7 +168,6 @@
 +  private MetadataEvolutionHandler buildLightweightHandler(
 +      Option<DeltaSourceMetadataTrackingLog> trackingLog,
 +      DeltaStreamUtils.SchemaReadOptions readOptions) {
-+    KernelMetadataAdapter adapter = new KernelMetadataAdapter(DEFAULT_METADATA);
 +    return new MetadataEvolutionHandler(
 +        spark,
 +        "test-table-id",
@@ -179,9 +178,6 @@
 +        readOptions,
 +        trackingLog,
 +        DEFAULT_METADATA,
-+        adapter.schema(),
-+        adapter.partitionSchema(),
-+        scala.jdk.javaapi.CollectionConverters.asJava(adapter.configuration()),
 +        DEFAULT_PROTOCOL,
 +        "/tmp/fake-table/_delta_log/_streaming_metadata");
 +  }
@@ -240,9 +236,6 @@
 +            schemaReadOptions(/* allowUnsafeColumnMappingRead= */ false),
 +            Option.apply(trackingLog),
 +            tableMetadata,
-+            adapter.schema(),
-+            adapter.partitionSchema(),
-+            scala.jdk.javaapi.CollectionConverters.asJava(adapter.configuration()),
 +            tableProtocol,
 +            tablePath + "/_delta_log/_streaming_metadata");
 +
spark/v2/src/test/java/io/delta/spark/internal/v2/utils/StreamingHelperTest.java
@@ -105,10 +105,12 @@
 +  private void setupTableWithProtocolUpgradeAtV2(String testTablePath, String testTableName) {
 +    createEmptyTestTable(testTablePath, testTableName); // v0
 +    spark.sql(String.format("INSERT INTO %s VALUES (1, 'Alice')", testTableName)); // v1
++    // Adding a table feature forces a real protocol upgrade; setting only minReader/minWriter to
++    // (3, 7) is normalized back to (1, 2) by Protocol.merge when no features are introduced and
++    // produces no Protocol action.
 +    spark.sql(
 +        String.format(
-+            "ALTER TABLE %s SET TBLPROPERTIES "
-+                + "('delta.minReaderVersion' = '3', 'delta.minWriterVersion' = '7')",
++            "ALTER TABLE %s SET TBLPROPERTIES " + "('delta.feature.deletionVectors' = 'supported')",
 +            testTableName)); // v2
 +  }
 +

Reproduce locally: git range-diff a80dfae..696ce03 e3eb104..6981a40 | Disable: git config gitstack.push-range-diff false

Collaborator

@TimothyW553 TimothyW553 left a comment

a few blocking comments

Comment on lines +156 to +176
public CloseableIterator<IndexedFile> stopIndexedFileIteratorAtSchemaChangeBarrier(
    CloseableIterator<IndexedFile> fileActions) {
  // Consume until we hit the barrier, include the barrier itself, discard the rest.
  List<IndexedFile> result = new ArrayList<>();
  try {
    while (fileActions.hasNext()) {
      IndexedFile file = fileActions.next();
      result.add(file);
      if (file.getIndex() == DeltaSourceOffset.METADATA_CHANGE_INDEX()) {
        break;
      }
    }
  } finally {
    try {
      fileActions.close();
    } catch (Exception e) {
      throw new RuntimeException("Failed to close file actions iterator", e);
    }
  }
  return Utils.toCloseableIterator(result.iterator());
}
Collaborator

V1 was lazy here but this version reads the whole iterator into a list first. For large batches this could use a lot of memory - can we make it lazy?

Collaborator Author

Sounds good to me. It now uses breakableFilter to make it lazy.
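
For illustration, a minimal sketch of the lazy behavior discussed in this thread: yield elements up to and including the first barrier, then stop, without buffering anything into a list. It uses plain `java.util.Iterator` rather than Kernel's `CloseableIterator`, so `TakeThroughBarrier` and the `isBarrier` predicate are hypothetical names, and closing the underlying source is deliberately left out.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/** Hypothetical helper (not a Delta Kernel class): lazily yields elements through the first barrier. */
final class TakeThroughBarrier<T> implements Iterator<T> {
  private final Iterator<T> source;
  private final Predicate<T> isBarrier;
  private boolean sawBarrier = false;

  TakeThroughBarrier(Iterator<T> source, Predicate<T> isBarrier) {
    this.source = source;
    this.isBarrier = isBarrier;
  }

  @Override
  public boolean hasNext() {
    // Once the barrier has been emitted, stop without touching the source again.
    return !sawBarrier && source.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    T element = source.next();
    if (isBarrier.test(element)) {
      // The barrier itself is still returned; only later elements are skipped.
      sawBarrier = true;
    }
    return element;
  }
}
```

With a source of `0, 1, -1, 2, 3` and `-1` standing in for the barrier index, iteration yields `0, 1, -1` and never pulls `2` or `3` from the source.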

Comment on lines +168 to +174
} finally {
  try {
    fileActions.close();
  } catch (Exception e) {
    throw new RuntimeException("Failed to close file actions iterator", e);
  }
}
Collaborator

If next() throws and then close() also throws, we lose the original error. Use addSuppressed or try-with-resources?

Collaborator Author

Not an issue anymore now that it is lazy.
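
For reference, a small sketch of the try-with-resources behavior the reviewer mentions: when the body throws and `close()` also throws, the body's exception propagates and the close failure is attached to it as a suppressed exception. `FailingSource` and `readAndCapture` are hypothetical names used only for this illustration.

```java
final class SuppressedCloseDemo {
  /** Hypothetical resource whose read and close both fail. */
  static final class FailingSource implements AutoCloseable {
    String read() {
      throw new IllegalStateException("next() failed");
    }

    @Override
    public void close() {
      throw new IllegalArgumentException("close() failed");
    }
  }

  /** Returns the exception that escapes the try-with-resources block. */
  static RuntimeException readAndCapture() {
    try (FailingSource source = new FailingSource()) {
      source.read();
      return null; // not reached in this demo because read() always throws
    } catch (RuntimeException primary) {
      // The primary exception is the one from read(); the close() failure
      // is available through primary.getSuppressed().
      return primary;
    }
  }
}
```

Here `readAndCapture()` returns the `IllegalStateException` from `read()`, and its `getSuppressed()` array holds the `IllegalArgumentException` thrown by `close()`.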

Comment on lines +832 to +842
val upgradedMetadata = new AbstractMetadata {
  val id: String = oldMetadata.id
  val name: String = oldMetadata.name
  val description: String = oldMetadata.description
  val schema: StructType = upgradedSchema
  val partitionColumns: Seq[String] = oldMetadata.partitionColumns
  val configuration: Map[String, String] = oldMetadata.configuration +
    (DeltaConfigs.COLUMN_MAPPING_MODE.key -> newMetadata.columnMappingMode.name,
      DeltaConfigs.COLUMN_MAPPING_MAX_ID.key -> upgradedMaxId.toString)
  val columnMappingMode: DeltaColumnMappingMode = newMetadata.columnMappingMode
}
Collaborator

If a new field is added to AbstractMetadata, this anonymous impl will silently still compile with a trait default - exactly the drift the new equalsByFields is meant to prevent. Can we use a named class or a copy helper?

Collaborator Author

I don't think this is an issue: Scala requires anonymous trait instantiations to implement all abstract members.
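
The distinction this thread turns on, abstract members versus members with a default, can be sketched in Java, whose interface rules are analogous on this point. This is only an analogy: the code under review is Scala, and `TableInfo` and `copyWithId` are hypothetical stand-ins, not the real `AbstractMetadata`.

```java
final class AnonymousImplDemo {
  /** Hypothetical stand-in for a metadata interface. */
  interface TableInfo {
    // Abstract: every implementation, anonymous or named, must define it.
    String id();

    // Default: implementations that do not override it inherit this body silently.
    default String description() {
      return "";
    }
  }

  static TableInfo copyWithId(String newId) {
    // Omitting id() here would be a compile error; omitting description() is not.
    return new TableInfo() {
      @Override
      public String id() {
        return newId;
      }
    };
  }
}
```

So a newly added abstract member breaks compilation of every anonymous implementation, while a newly added member with a default does not.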

Collaborator

@johanl-db johanl-db left a comment

I didn't do a detailed review and only looked at this at a high level: the change makes sense. The main evaluation at this stage will be that the existing DSV1 tests can be run and pass against this new implementation.

@PorridgeSwim PorridgeSwim force-pushed the stack/MetadataEvolutionHandler2 branch from 6981a40 to 5788701 Compare May 5, 2026 20:47
TimothyW553 pushed a commit that referenced this pull request May 5, 2026
## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6546/files) to
review incremental changes.
-
[**stack/SparkMetadataAdapter**](#6546)
[[Files changed](https://github.com/delta-io/delta/pull/6546/files)]
-
[stack/RefactorMetadataTrackingLog](#6550)
[[Files
changed](https://github.com/delta-io/delta/pull/6550/files/9271a6262f7a2615b977de0319c7238044b7d0a9..8378d33acda70a34a109b35173a968a4b3401ec1)]
-
[stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562)
[[Files
changed](https://github.com/delta-io/delta/pull/6562/files/8378d33acda70a34a109b35173a968a4b3401ec1..90365431b12640de181446ec9c2033fb1b143b03)]
-
[stack/MetadataEvolutionHandler2](#6563)
[[Files
changed](https://github.com/delta-io/delta/pull/6563/files/28bb7021adb12b055e1b281fdfee0ab48a8732ac..578870181fa81a9146b2fa907244e350ffcabb52)]
-
[stack/NonAdditiveSchemaEvolution2](#6570)
[[Files
changed](https://github.com/delta-io/delta/pull/6570/files/578870181fa81a9146b2fa907244e350ffcabb52..c025b7c3c386e8d46d6142d0727dce95582bb0ef)]
-
[stack/NonAdditiveSchemaEvolution3](#6697)
[[Files
changed](https://github.com/delta-io/delta/pull/6697/files/c025b7c3c386e8d46d6142d0727dce95582bb0ef..db16b9fa80a80c105430c93589126ba8b828458f)]
-
[stack/consecutiveSchemaChangesMerger](#6698)
[[Files
changed](https://github.com/delta-io/delta/pull/6698/files/0148020ffe11e7b079e99fa8c5189a19c354f2be..9a360aa819f20d78b5361b2e997d24433fb793d5)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 1/7 in the non-additive schema evolution for V2 streaming connector
stack.

The shared V1 Scala utilities (`DeltaColumnMapping`,
`DeltaSourceMetadataEvolutionSupport`) operate on
`AbstractMetadata`/`AbstractProtocol`, but V2 holds Kernel types. This
PR creates two adapter classes that bridge the gap:

- `KernelMetadataAdapter`: Kernel `Metadata` → `AbstractMetadata`
(schema conversion via `SchemaUtils`, partition columns and
configuration converted to Scala collections)
- `KernelProtocolAdapter`: Kernel `Protocol` → `AbstractProtocol` (maps
reader/writer features to `Option[Set[String]]`)

Also adds `columnMappingMode` and `partitionSchema` to the
`AbstractMetadata` trait — V1's `Metadata` already had these fields, the
trait just didn't expose them.

## How was this patch tested?

Unit tests in `ActionAdaptersTest.java`: table-features protocol, legacy
protocol, full metadata round-trip, null optional fields, and null
constructor rejection.

## Does this PR introduce _any_ user-facing changes?

No.
@PorridgeSwim PorridgeSwim force-pushed the stack/MetadataEvolutionHandler2 branch from 5788701 to 1438111 Compare May 6, 2026 00:04
@PorridgeSwim
Collaborator Author

Range-diff: stack/RefactorDeltaSourceMetadataEvolutionSupport (5788701 -> 1438111)
spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java
@@ -25,6 +25,7 @@
 +import io.delta.kernel.internal.actions.Protocol;
 +import io.delta.kernel.internal.util.Utils;
 +import io.delta.kernel.utils.CloseableIterator;
++import io.delta.kernel.utils.CloseableIterator.BreakableFilterResult;
 +import io.delta.spark.internal.v2.adapters.KernelMetadataAdapter;
 +import io.delta.spark.internal.v2.adapters.KernelProtocolAdapter;
 +import io.delta.spark.internal.v2.snapshot.DeltaSnapshotManager;
@@ -159,24 +160,18 @@
 +   */
 +  public CloseableIterator<IndexedFile> stopIndexedFileIteratorAtSchemaChangeBarrier(
 +      CloseableIterator<IndexedFile> fileActions) {
-+    // Consume until we hit the barrier, include the barrier itself, discard the rest.
-+    List<IndexedFile> result = new ArrayList<>();
-+    try {
-+      while (fileActions.hasNext()) {
-+        IndexedFile file = fileActions.next();
-+        result.add(file);
-+        if (file.getIndex() == DeltaSourceOffset.METADATA_CHANGE_INDEX()) {
-+          break;
-+        }
-+      }
-+    } finally {
-+      try {
-+        fileActions.close();
-+      } catch (Exception e) {
-+        throw new RuntimeException("Failed to close file actions iterator", e);
-+      }
-+    }
-+    return Utils.toCloseableIterator(result.iterator());
++    // Lazily include files up to and including the barrier, then break.
++    boolean[] sawBarrier = {false};
++    return fileActions.breakableFilter(
++        file -> {
++          if (sawBarrier[0]) {
++            return BreakableFilterResult.BREAK;
++          }
++          if (file.getIndex() == DeltaSourceOffset.METADATA_CHANGE_INDEX()) {
++            sawBarrier[0] = true;
++          }
++          return BreakableFilterResult.INCLUDE;
++        });
 +  }
 +
 +  /**

Reproduce locally: git range-diff 28bb702..5788701 624a83e..1438111 | Disable: git config gitstack.push-range-diff false

* <p>V2 port of V1's {@code
* DeltaSourceMetadataEvolutionSupport.getNextOffsetFromPreviousOffsetIfPendingSchemaChange}.
*/
public DeltaSourceOffset getNextOffsetFromPreviousOffsetIfPendingSchemaChange(
Collaborator

SparkMicroBatchStream.getNextOffsetFromPreviousOffset returns an Optional<>; this method should match that.

* DeltaSourceMetadataEvolutionSupport.initializeMetadataTrackingAndExitStream}.
*/
public void initializeMetadataTrackingAndExitStream(
long batchStartVersion, Long batchEndVersion, boolean alwaysFailUponLogInitialized) {
Collaborator

Should this be Optional?

Collaborator Author

@PorridgeSwim PorridgeSwim May 6, 2026

I don't think using Optional<> as a method parameter is encouraged in Java. I can add a @Nullable annotation instead.
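
The two conventions discussed in this thread (an `Optional` return value versus a nullable boxed parameter) can be sketched side by side. This is only an illustration under assumptions: `OffsetApiSketch`, `nextOffsetIfPending`, and `describeRange` are hypothetical names, the offsets are plain `long` values rather than `DeltaSourceOffset`, and nullability is marked with a comment because no annotation library is assumed.

```java
import java.util.Optional;

/** Hypothetical sketch (not Delta code) contrasting Optional returns with nullable parameters. */
final class OffsetApiSketch {
  private OffsetApiSketch() {}

  /** Optional in return position: callers must handle the "no pending change" case explicitly. */
  static Optional<Long> nextOffsetIfPending(long previousIndex, boolean schemaChangePending) {
    return schemaChangePending ? Optional.of(previousIndex + 1) : Optional.empty();
  }

  /** Nullable boxed parameter: null means "no end version" and is wrapped at the boundary. */
  static String describeRange(long batchStartVersion, /* nullable */ Long batchEndVersion) {
    Optional<Long> end = Optional.ofNullable(batchEndVersion);
    return "[" + batchStartVersion + ", " + end.map(v -> v.toString()).orElse("open") + "]";
  }
}
```

Wrapping the nullable argument with `Optional.ofNullable` as the first step keeps the null check in one place while leaving the method signature free of `Optional` parameters.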

new KernelProtocolAdapter(protocolToUse),
metadataPath);

if (replace) {
Collaborator

Can we just do metadataTrackingLog.get().writeNewMetadata(schemaToPersist, replace);?

* picks up the updated schema.
* </ol>
*
* <p>See V1's trait doc for the full barrier protocol details. Validation logic shared with v1
Collaborator

Not immediately clear what this refers to.

Collaborator Author

updated

@PorridgeSwim PorridgeSwim force-pushed the stack/MetadataEvolutionHandler2 branch from 1438111 to ada8458 Compare May 6, 2026 01:34
@PorridgeSwim
Collaborator Author

Range-diff: stack/RefactorDeltaSourceMetadataEvolutionSupport (1438111 -> ada8458)
spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java
@@ -209,10 +209,9 @@
 +            tablePath,
 +            EMPTY_SCALA_MAP,
 +            Option.empty(),
-+            false,
-+            null, // consecutiveSchemaChangesMerger — unused since
-+            // mergeConsecutiveSchemaChanges=false
-+            true);
++            /* mergeConsecutiveSchemaChanges= */ false,
++            /* consecutiveSchemaChangesMerger= */ Option.empty(),
++            /* initMetadataLogEagerly= */ true);
 +
 +    if (seedLogWithInitEntry) {
 +      PersistedMetadata entry =
@@ -257,10 +256,9 @@
 +            "/tmp/fake-table",
 +            EMPTY_SCALA_MAP,
 +            Option.empty(),
-+            false,
-+            null, // consecutiveSchemaChangesMerger — unused since
-+            // mergeConsecutiveSchemaChanges=false
-+            true);
++            /* mergeConsecutiveSchemaChanges= */ false,
++            /* consecutiveSchemaChangesMerger= */ Option.empty(),
++            /* initMetadataLogEagerly= */ true);
 +    if (seedWithDefaultEntry) {
 +      PersistedMetadata entry =
 +          PersistedMetadata.apply(

Reproduce locally: git range-diff 624a83e..1438111 027984b..ada8458 | Disable: git config gitstack.push-range-diff false

TimothyW553 pushed a commit that referenced this pull request May 6, 2026
…#6550)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6550/files) to
review incremental changes.
- [stack/SparkMetadataAdapter](#6546) [[Files changed](https://github.com/delta-io/delta/pull/6546/files)] [MERGED]
- [**stack/RefactorMetadataTrackingLog**](#6550) [[Files changed](https://github.com/delta-io/delta/pull/6550/files)]
- [stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562) [[Files changed](https://github.com/delta-io/delta/pull/6562/files/953f137f8c4ce46d8b8a9605b0c7bed898e30df4..027984b6edcbad0f4731e560425c2ed9bcf8fc27)]
- [stack/MetadataEvolutionHandler2](#6563) [[Files changed](https://github.com/delta-io/delta/pull/6563/files/027984b6edcbad0f4731e560425c2ed9bcf8fc27..ada845895139edcb2727a87b39922c8e16837a99)]
- [stack/NonAdditiveSchemaEvolution2](#6570) [[Files changed](https://github.com/delta-io/delta/pull/6570/files/ada845895139edcb2727a87b39922c8e16837a99..476762fde7b9cb9b9bc3e416c86a260cd29806ed)]
- [stack/NonAdditiveSchemaEvolution3](#6697) [[Files changed](https://github.com/delta-io/delta/pull/6697/files/476762fde7b9cb9b9bc3e416c86a260cd29806ed..13395a7f2a49db4962091e8ee919bebdab5bd4e2)]
- [stack/consecutiveSchemaChangesMerger](#6698) [[Files changed](https://github.com/delta-io/delta/pull/6698/files/13395a7f2a49db4962091e8ee919bebdab5bd4e2..f22ba063eaf35ab69d653a2d5faefdc52f35eab5)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 2/7 in the non-additive schema evolution for V2 streaming connector
stack.

Decouple `DeltaSourceMetadataTrackingLog` and `PersistedMetadata` from
V1-specific types so the schema log can be reused by the V2 connector.

- Replace `SnapshotDescriptor` parameter in `create()` with plain
`sourceTableId` and `sourceDataPath` strings
- Unify `PersistedMetadata.apply` to accept
`AbstractMetadata`/`AbstractProtocol` instead of V1
`Metadata`/`Protocol`
- Extract the consecutive schema changes merger (V1-specific, depends on
`DeltaLog`) out of the companion object into
`DeltaSourceMetadataEvolutionSupport`, and inject it as a function
parameter so V2 can provide its own implementation
- Remove `Protocol`'s `private` constructor modifier to allow
construction from abstract protocol fields

All changes are structural refactors with no behavioral change.
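
To illustrate the "inject it as a function parameter" point from the list above, here is a minimal sketch of a log that receives its merge step from the caller instead of hard-coding one. Everything here is an assumption for illustration: `SchemaLogSketch` is a hypothetical class, schemas are plain strings, and the merger is modeled as a `UnaryOperator<String>`, which is not the signature used by `DeltaSourceMetadataTrackingLog`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.UnaryOperator;

/** Hypothetical sketch (not Delta code): a schema log whose merge step is supplied by the caller. */
final class SchemaLogSketch {
  private final List<String> entries = new ArrayList<>();
  private final UnaryOperator<String> merger;

  /** The merger is injected, so each connector can pass its own implementation. */
  SchemaLogSketch(UnaryOperator<String> merger) {
    this.merger = merger;
  }

  /** Appends a schema entry to the in-memory log. */
  void write(String schema) {
    entries.add(schema);
  }

  /** Returns the latest entry after applying the injected merger, or empty if nothing was written. */
  Optional<String> latest() {
    if (entries.isEmpty()) {
      return Optional.empty();
    }
    String last = entries.get(entries.size() - 1);
    return Optional.of(merger.apply(last));
  }
}
```

A caller that does not need merging can pass `UnaryOperator.identity()`, while another caller can supply a function that depends on its own state, without the log class knowing about either.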

## How was this patch tested?

Existing tests in `DeltaSourceSchemaEvolutionSuite` updated to use the
new API. No behavioral changes.

## Does this PR introduce _any_ user-facing changes?

No.
@PorridgeSwim PorridgeSwim force-pushed the stack/MetadataEvolutionHandler2 branch 2 times, most recently from 54b456f to e5b2c32 Compare May 6, 2026 17:56
murali-db pushed a commit that referenced this pull request May 6, 2026
…seable in v2 (#6562)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6562/files) to
review incremental changes.
- [stack/SparkMetadataAdapter](#6546) [[Files changed](https://github.com/delta-io/delta/pull/6546/files)] [MERGED]
- [stack/RefactorMetadataTrackingLog](#6550) [[Files changed](https://github.com/delta-io/delta/pull/6550/files)] [MERGED]
- [**stack/RefactorDeltaSourceMetadataEvolutionSupport**](#6562) [[Files changed](https://github.com/delta-io/delta/pull/6562/files)]
- [stack/MetadataEvolutionHandler2](#6563) [[Files changed](https://github.com/delta-io/delta/pull/6563/files/ed92a0fa2051432b6bc5784034df0b7949bbfb98..e5b2c3295843ec85753e07dc0010aa5ccebaabb7)]
- [stack/NonAdditiveSchemaEvolution2](#6570) [[Files changed](https://github.com/delta-io/delta/pull/6570/files/e5b2c3295843ec85753e07dc0010aa5ccebaabb7..7c66bf11a0f1b651cda32ed7f529f552dd9dbfcb)]
- [stack/NonAdditiveSchemaEvolution3](#6697) [[Files changed](https://github.com/delta-io/delta/pull/6697/files/7c66bf11a0f1b651cda32ed7f529f552dd9dbfcb..14956ea304c93d2343ccd7eb89a112966f07f906)]
- [stack/consecutiveSchemaChangesMerger](#6698) [[Files changed](https://github.com/delta-io/delta/pull/6698/files/14956ea304c93d2343ccd7eb89a112966f07f906..8101b335b892a6a5b6d6fe11f4a202d14102721c)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 3/7 in the non-additive schema evolution for V2 streaming connector
stack.

Refactor `DeltaSourceMetadataEvolutionSupport` and `DeltaColumnMapping`
so the schema change detection logic can be called from V2 without
depending on V1 instance state.

**`DeltaSourceMetadataEvolutionSupport`:**
- Extract instance methods (`validateAndResolveMetadataEvolution`,
`checkColumnMappingSchemaChangesDuringStreaming`,
`resolveMetadataEvolutionForCommitRange`, etc.) to companion object
statics that accept explicit parameters instead of accessing V1
`DeltaSource` via `this`
- V1 trait methods now delegate to the companion object statics

**`DeltaColumnMapping`:**
- Widen `hasNoColumnMappingSchemaChanges` from V1 `Metadata` to
`AbstractMetadata` so V2 can call it via the adapter layer
- Extract `assignColumnIdAndPhysicalNameToSchema(StructType, Map)` from
`assignColumnIdAndPhysicalName(Metadata, Metadata, ...)` — needed for
simulating column mapping upgrades during NoMapping-to-NameMapping
transitions

All changes are structural refactors with no behavioral change.
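
The refactoring pattern described above (move logic out of an instance method into a static that takes explicit parameters, then have the instance method delegate) can be sketched in Java as follows. The names `EvolutionChecks` and `V1StyleSource` are hypothetical, schemas are plain strings, and the comparison is a placeholder rather than Delta's actual detection logic.

```java
/** Hypothetical sketch (not Delta code): shared logic as a static with explicit parameters. */
final class EvolutionChecks {
  private EvolutionChecks() {}

  /** Placeholder check: reports a change when the two schema strings differ. */
  static boolean hasSchemaChange(String persistedSchema, String candidateSchema) {
    return !persistedSchema.equals(candidateSchema);
  }
}

/** Stands in for a V1-style source whose instance method used to read state via `this`. */
final class V1StyleSource {
  private final String persistedSchema;

  V1StyleSource(String persistedSchema) {
    this.persistedSchema = persistedSchema;
  }

  /** Delegates to the shared static, passing the instance state explicitly. */
  boolean hasSchemaChange(String candidateSchema) {
    return EvolutionChecks.hasSchemaChange(persistedSchema, candidateSchema);
  }
}
```

A second caller that holds its state elsewhere can call `EvolutionChecks.hasSchemaChange` directly, which is what makes this kind of refactor behavior-preserving for the original caller.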

## How was this patch tested?

Existing tests in `DeltaSourceSchemaEvolutionSuite` continue to pass. No
behavioral changes.

## Does this PR introduce _any_ user-facing changes?

No.
@PorridgeSwim PorridgeSwim force-pushed the stack/MetadataEvolutionHandler2 branch 2 times, most recently from f963649 to a20f1f3 Compare May 6, 2026 23:16
@PorridgeSwim PorridgeSwim force-pushed the stack/MetadataEvolutionHandler2 branch from a20f1f3 to 2349027 Compare May 10, 2026 21:43
@PorridgeSwim PorridgeSwim mentioned this pull request May 10, 2026
@murali-db murali-db merged commit 48ba860 into delta-io:master May 11, 2026
31 checks passed
murali-db pushed a commit that referenced this pull request May 16, 2026
## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6570/files) to
review incremental changes.
- [stack/SparkMetadataAdapter](#6546) [[Files changed](https://github.com/delta-io/delta/pull/6546/files)] [MERGED]
- [stack/RefactorMetadataTrackingLog](#6550) [[Files changed](https://github.com/delta-io/delta/pull/6550/files)] [MERGED]
- [stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562) [[Files changed](https://github.com/delta-io/delta/pull/6562/files)] [MERGED]
- [stack/MetadataEvolutionHandler2](#6563) [[Files changed](https://github.com/delta-io/delta/pull/6563/files)] [MERGED]
- [**stack/NonAdditiveSchemaEvolution2**](#6570) [[Files changed](https://github.com/delta-io/delta/pull/6570/files)]
- [stack/NonAdditiveSchemaEvolution3](#6697) [[Files changed](https://github.com/delta-io/delta/pull/6697/files/b7f6c8ebfc0882e7e2cc580f09f376be23a8d43d..dbb6246c14be1ab7f017ad9fc26455ae599ee676)]
- [stack/consecutiveSchemaChangesMerger](#6698) [[Files changed](https://github.com/delta-io/delta/pull/6698/files/dbb6246c14be1ab7f017ad9fc26455ae599ee676..4bf2fa3fa828bcab0b56c4c26ca51ee9cc40b482)]
- [stack/SchemaTrackingWithCDC](#6801) [[Files changed](https://github.com/delta-io/delta/pull/6801/files/4bf2fa3fa828bcab0b56c4c26ca51ee9cc40b482..a78a4ac2bc9a52605278a36b98804230258c12a2)]
- [stack/V1V2MixTest](#6759) [[Files changed](https://github.com/delta-io/delta/pull/6759/files/7f9b7f2724b2245ab7380908616303cf7ea95fca..e146cdc9ebb0572e8b0a928cc6dd3bfdc198d984)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 5/7 in the non-additive schema evolution for V2 streaming connector
stack.

Wire schema tracking into V2's analysis path so the analyzed plan
reflects the persisted (evolved) schema instead of the live snapshot
schema.

- `DeltaAnalysis.verifyDeltaSourceSchemaLocation`: extend the
duplicate-schema-location check to also visit `StreamingRelationV2`,
keyed on the V2 `Table.name`.
- `SparkTable`: open `DeltaSourceMetadataTrackingLog` once during
construction (gated on `mergeConsecutiveSchemaChanges`) and seed
`SchemaProvider` from the persisted metadata, so analysis-time
`schema()` matches what the stream will read at runtime.
- `ApplyV2ReadOptions` (renamed from `ApplyV2Streaming`): generalize the
CDC-only rebuild to also fire when `schemaTrackingLocation` arrives via
`extraOptions` on the catalog `readStream.table()` path; rebuild
`SparkTable` with merged options so the schema-log lookup actually
fires.
- `MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream`:
V2 port of V1's helper, reused by `SparkTable` (analysis) and
`SparkScan` (execution).

## How was this patch tested?

`SparkTableTest`, `MetadataEvolutionHandlerTest`,
`ApplyV2ReadOptionsSuite`. Unified `DeltaV2SourceSchemaEvolutionSuite`
updated.

## Does this PR introduce _any_ user-facing changes?

No.
murali-db pushed a commit that referenced this pull request May 16, 2026
…6697)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6697/files) to
review incremental changes.
- [stack/SparkMetadataAdapter](#6546) [[Files changed](https://github.com/delta-io/delta/pull/6546/files)] [MERGED]
- [stack/RefactorMetadataTrackingLog](#6550) [[Files changed](https://github.com/delta-io/delta/pull/6550/files)] [MERGED]
- [stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562) [[Files changed](https://github.com/delta-io/delta/pull/6562/files)] [MERGED]
- [stack/MetadataEvolutionHandler2](#6563) [[Files changed](https://github.com/delta-io/delta/pull/6563/files)] [MERGED]
- [stack/NonAdditiveSchemaEvolution2](#6570) [[Files changed](https://github.com/delta-io/delta/pull/6570/files)] [MERGED]
- [**stack/NonAdditiveSchemaEvolution3**](#6697) [[Files changed](https://github.com/delta-io/delta/pull/6697/files)]
- [stack/consecutiveSchemaChangesMerger](#6698) [[Files changed](https://github.com/delta-io/delta/pull/6698/files/f96643aa3cc01e7f70cc13a18b82dc27f277f11d..f612628ad931ec35c237801109f01b6fbd1379f7)]
- [stack/SchemaTrackingWithCDC](#6801) [[Files changed](https://github.com/delta-io/delta/pull/6801/files/f612628ad931ec35c237801109f01b6fbd1379f7..4aeacfb120b33e9cdfe124352290b72f53f7cf89)]
- [stack/V1V2MixTest](#6759) [[Files changed](https://github.com/delta-io/delta/pull/6759/files/f612628ad931ec35c237801109f01b6fbd1379f7..0c818ee431ab417a4f2ffbcc609930be09d25031)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 6/7 in the non-additive schema evolution for V2 streaming connector
stack.

Wire `MetadataEvolutionHandler` into `SparkMicroBatchStream` and
`SparkScan` so V2 streaming reads honor non-additive schema evolution
(column rename/drop, type widening).

- `SparkMicroBatchStream`: take `metadataTrackingLog` + `metadataPath`
as constructor inputs; when a persisted entry exists, layer it onto the
freshly loaded `snapshotAtSourceInit` to derive
`readSnapshotAtSourceInit` (mirrors V1's `readSnapshotDescriptor`).
Integrate the schema-evolution barrier protocol into `latestOffset` /
`commit` / `planInputPartitions`. Skip the on-restart schema-validation
check when schema tracking is active — the schema-log evolution
exception covers it.
- `SparkScan.toMicroBatchStream`: reload latest snapshot (the
analysis-time `initialSnapshot` can be stale by stream start), open the
tracking log via
`MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream`
with `mergeConsecutiveSchemaChanges=false` (the merger only runs at
analysis), and pass it through with the checkpoint location.
- `SparkScan` option allow-list: move `allowSourceColumnDrop` / `Rename`
/ `TypeChange` out of the unsupported list now that they are honored.
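
The "layer a persisted entry onto the freshly loaded snapshot" step in the first bullet above can be pictured with a small sketch. This is an assumption-heavy illustration, not the connector's code: `SnapshotView` and `ReadSnapshotSketch` are hypothetical types, a snapshot is reduced to a version plus a schema string, and only the schema is overridden.

```java
import java.util.Optional;

/** Hypothetical, simplified stand-in for a snapshot: just a version and a schema string. */
final class SnapshotView {
  final long version;
  final String schema;

  SnapshotView(long version, String schema) {
    this.version = version;
    this.schema = schema;
  }
}

/** Hypothetical sketch (not Delta code) of deriving the snapshot a stream actually reads with. */
final class ReadSnapshotSketch {
  private ReadSnapshotSketch() {}

  /**
   * If the tracking log holds a persisted schema, use it in place of the live schema while
   * keeping the loaded version; otherwise return the loaded snapshot unchanged.
   */
  static SnapshotView layer(SnapshotView loaded, Optional<String> persistedSchema) {
    return persistedSchema
        .map(schema -> new SnapshotView(loaded.version, schema))
        .orElse(loaded);
  }
}
```

The point of the sketch is only the precedence rule: when a persisted entry exists it wins over the live snapshot's schema, and when it does not, the freshly loaded snapshot is used as is.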

## How was this patch tested?

`SparkMicroBatchStreamTest`, `MetadataEvolutionHandlerTest`. Unified
suites (`DeltaV2SourceSchemaEvolutionSuite`,
`TypeWideningStreamingV2SourceSuite`,
`RemoveColumnMappingStreamingReadV2Suite`) move non-merger evolution
scenarios from `shouldFailTests` to `shouldPassTests`; merger-dependent
tests remain pending until PR 7/7.

## Does this PR introduce _any_ user-facing changes?

No.