Skip to content

Add metadata evolution handler in v2#6554

Closed
PorridgeSwim wants to merge 3 commits into
delta-io:masterfrom
PorridgeSwim:stack/MetadataEvolutionHandler
Closed

Add metadata evolution handler in v2#6554
PorridgeSwim wants to merge 3 commits into
delta-io:masterfrom
PorridgeSwim:stack/MetadataEvolutionHandler

Conversation

@PorridgeSwim
Copy link
Copy Markdown
Collaborator

@PorridgeSwim PorridgeSwim commented Apr 13, 2026

🥞 Stacked PR

Use this link to review incremental changes.


Which Delta project/connector is this regarding?

Description

How was this patch tested?

Does this PR introduce any user-facing changes?

@PorridgeSwim
Copy link
Copy Markdown
Collaborator Author

Range-diff: stack/RefactorMetadataTrackingLog (c68732b -> e85d7bd)
spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala
@@ -86,8 +86,7 @@
 +      metadataChangeOpt.exists { newMetadata =>
 +        hasSchemaChangeComparedToStreamMetadata(
 +          newMetadata.schema, readSchemaAtSourceInit, spark) ||
-+          new StructType(newMetadata.partitionColumns
-+            .map(c => newMetadata.schema(c)).toArray) != readPartitionSchemaAtSourceInit ||
++          newMetadata.partitionColumns != readPartitionSchemaAtSourceInit ||
 +          newMetadata.configuration.filterKeys(_.startsWith("delta.")).toMap !=
 +            readConfigurationsAtSourceInit.filterKeys(_.startsWith("delta.")).toMap
 +      }

Reproduce locally: git range-diff 30ac78e..c68732b 0c97c0f..e85d7bd | Disable: git config gitstack.push-range-diff false

@PorridgeSwim PorridgeSwim self-assigned this Apr 13, 2026
@PorridgeSwim PorridgeSwim force-pushed the stack/MetadataEvolutionHandler branch from e85d7bd to b737b1d Compare April 13, 2026 21:31
@PorridgeSwim
Copy link
Copy Markdown
Collaborator Author

Range-diff: stack/RefactorMetadataTrackingLog (e85d7bd -> b737b1d)
spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala
@@ -8,6 +8,50 @@
  
  import org.apache.spark.internal.Logging
  import org.apache.spark.sql.SparkSession
+  */
+ trait DeltaSourceMetadataEvolutionSupport extends DeltaSourceBase { base: DeltaSource =>
+ 
+-  /**
+-   * Whether this DeltaSource is utilizing a schema log entry as its read schema.
+-   *
+-   * If user explicitly turn on the flag to fall back to using latest schema to read (i.e. the
+-   * legacy mode), we will ignore the schema log.
+-   */
+   protected def trackingMetadataChange: Boolean =
+-    !schemaReadOptions.allowUnsafeStreamingReadOnColumnMappingSchemaChanges &&
+-      metadataTrackingLog.flatMap(_.getCurrentTrackedMetadata).nonEmpty
++    DeltaSourceMetadataEvolutionSupport.shouldTrackMetadataChange(
++      schemaReadOptions, metadataTrackingLog)
+ 
+-  /**
+-   * Whether a schema tracking log is provided (and is empty), so we could initialize eagerly.
+-   * This should only be used for the first write to the schema log, after then, schema tracking
+-   * should not rely on this state any more.
+-   */
+   protected def readyToInitializeMetadataTrackingEagerly: Boolean =
+-    !schemaReadOptions.allowUnsafeStreamingReadOnColumnMappingSchemaChanges &&
+-      metadataTrackingLog.exists { log =>
+-        log.getCurrentTrackedMetadata.isEmpty && log.initMetadataLogEagerly
+-      }
++    DeltaSourceMetadataEvolutionSupport.shouldInitializeMetadataTrackingEagerly(
++      schemaReadOptions, metadataTrackingLog)
+ 
+ 
+   /**
+     }
+   }
+ 
+-  /**
+-   * Check the table metadata or protocol changed since the initial read snapshot. We make sure:
+-   * 1. The schema is the same, except for internal metadata, AND
+-   * 2. The delta related table configurations are strictly equal, AND
+-   * 3. The incoming metadata change should not be considered a failure-causing change if we have
+-   *    marked the persisted schema and the stream progress is behind that schema version.
+-   *    This could happen when we've already merged consecutive schema changes during the analysis
+-   *    phase and we are using the merged schema as the read schema. All the schema changes in
+-   *    between can be safely ignored because they won't contribute any data.
+-   */
+   private def hasMetadataOrProtocolChangeComparedToStreamMetadata(
        metadataChangeOpt: Option[Metadata],
        protocolChangeOpt: Option[Protocol],
        newSchemaVersion: Long): Boolean = {
@@ -52,8 +96,38 @@
        DeltaSQLConf.DELTA_TYPE_WIDENING_BYPASS_STREAMING_TYPE_CHANGE_CHECK)
  
 +  /**
-+   * Check if the metadata or protocol has changed compared to the stream's current metadata.
-+   * This is shared between v1 and v2 connectors.
++   * Whether this source should use schema tracking for metadata evolution.
++   * Shared between v1 and v2 connectors.
++   */
++  def shouldTrackMetadataChange(
++      schemaReadOptions: DeltaStreamUtils.SchemaReadOptions,
++      metadataTrackingLog: Option[DeltaSourceMetadataTrackingLog]): Boolean = {
++    !schemaReadOptions.allowUnsafeStreamingReadOnColumnMappingSchemaChanges &&
++      metadataTrackingLog.flatMap(_.getCurrentTrackedMetadata).nonEmpty
++  }
++
++  /**
++   * Whether the tracking log should be initialized eagerly (log is provided but empty).
++   * Shared between v1 and v2 connectors.
++   */
++  def shouldInitializeMetadataTrackingEagerly(
++      schemaReadOptions: DeltaStreamUtils.SchemaReadOptions,
++      metadataTrackingLog: Option[DeltaSourceMetadataTrackingLog]): Boolean = {
++    !schemaReadOptions.allowUnsafeStreamingReadOnColumnMappingSchemaChanges &&
++      metadataTrackingLog.exists { log =>
++        log.getCurrentTrackedMetadata.isEmpty && log.initMetadataLogEagerly
++      }
++  }
++
++  /**
++   * Check the table metadata or protocol changed since the initial read snapshot. We make sure:
++   * 1. The schema is the same, except for internal metadata, AND
++   * 2. The delta related table configurations are strictly equal, AND
++   * 3. The incoming metadata change should not be considered a failure-causing change if we have
++   *    marked the persisted schema and the stream progress is behind that schema version.
++   *    This could happen when we've already merged consecutive schema changes during the analysis
++   *    phase and we are using the merged schema as the read schema. All the schema changes in
++   *    between can be safely ignored because they won't contribute any data.
 +   *
 +   * @param metadataChangeOpt New metadata action, if any.
 +   * @param protocolChangeOpt New protocol action, if any.
spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java
@@ -19,17 +19,19 @@
 + */
 +package io.delta.spark.internal.v2.read;
 +
-+import io.delta.kernel.Snapshot;
 +import io.delta.kernel.engine.Engine;
 +import io.delta.kernel.internal.SnapshotImpl;
 +import io.delta.kernel.internal.actions.Metadata;
 +import io.delta.kernel.internal.actions.Protocol;
++import io.delta.kernel.internal.util.Utils;
 +import io.delta.kernel.utils.CloseableIterator;
 +import io.delta.spark.internal.v2.adapters.SparkMetadataAdapter;
 +import io.delta.spark.internal.v2.adapters.SparkProtocolAdapter;
 +import io.delta.spark.internal.v2.snapshot.DeltaSnapshotManager;
++import io.delta.spark.internal.v2.utils.ScalaUtils;
 +import io.delta.spark.internal.v2.utils.StreamingHelper;
 +import java.util.ArrayList;
++import java.util.Collections;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Objects;
@@ -45,7 +47,7 @@
 +import org.apache.spark.sql.types.StructType;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
-+import scala.collection.JavaConverters;
++import scala.Option;
 +
 +/**
 + * Handles metadata and schema evolution for the v2 Delta streaming source.
@@ -68,7 +70,7 @@
 +  private final Engine engine;
 +  private final DeltaOptions options;
 +  private final DeltaStreamUtils.SchemaReadOptions schemaReadOptions;
-+  private final DeltaSourceMetadataTrackingLog metadataTrackingLog;
++  private final Option<DeltaSourceMetadataTrackingLog> metadataTrackingLog;
 +
 +  // Read-time state captured at source initialization
 +  private final StructType readSchemaAtSourceInit;
@@ -88,7 +90,7 @@
 +      Engine engine,
 +      DeltaOptions options,
 +      DeltaStreamUtils.SchemaReadOptions schemaReadOptions,
-+      DeltaSourceMetadataTrackingLog metadataTrackingLog,
++      Option<DeltaSourceMetadataTrackingLog> metadataTrackingLog,
 +      StructType readSchemaAtSourceInit,
 +      StructType readPartitionSchemaAtSourceInit,
 +      Map<String, String> readConfigurationsAtSourceInit,
@@ -101,26 +103,23 @@
 +    this.engine = Objects.requireNonNull(engine);
 +    this.options = Objects.requireNonNull(options);
 +    this.schemaReadOptions = Objects.requireNonNull(schemaReadOptions);
-+    this.metadataTrackingLog = metadataTrackingLog;
++    this.metadataTrackingLog = Objects.requireNonNull(metadataTrackingLog);
 +    this.readSchemaAtSourceInit = Objects.requireNonNull(readSchemaAtSourceInit);
-+    this.readPartitionSchemaAtSourceInit =
-+        Objects.requireNonNull(readPartitionSchemaAtSourceInit);
-+    this.readConfigurationsAtSourceInit =
-+        Objects.requireNonNull(readConfigurationsAtSourceInit);
++    this.readPartitionSchemaAtSourceInit = Objects.requireNonNull(readPartitionSchemaAtSourceInit);
++    this.readConfigurationsAtSourceInit = Objects.requireNonNull(readConfigurationsAtSourceInit);
 +    this.readProtocolAtSourceInit = Objects.requireNonNull(readProtocolAtSourceInit);
 +    this.metadataPath = Objects.requireNonNull(metadataPath);
 +    this.persistedMetadataAtSourceInit =
-+        metadataTrackingLog != null
-+                && metadataTrackingLog.getCurrentTrackedMetadata().isDefined()
-+            ? metadataTrackingLog.getCurrentTrackedMetadata().get()
++        metadataTrackingLog.isDefined()
++                && metadataTrackingLog.get().getCurrentTrackedMetadata().isDefined()
++            ? metadataTrackingLog.get().getCurrentTrackedMetadata().get()
 +            : null;
 +  }
 +
 +  /** Whether this source should use schema tracking for metadata evolution. */
 +  public boolean shouldTrackMetadataChange() {
-+    return !schemaReadOptions.allowUnsafeStreamingReadOnColumnMappingSchemaChanges()
-+        && metadataTrackingLog != null
-+        && metadataTrackingLog.getCurrentTrackedMetadata().isDefined();
++    return DeltaSourceMetadataEvolutionSupport$.MODULE$.shouldTrackMetadataChange(
++        schemaReadOptions, metadataTrackingLog);
 +  }
 +
 +  /**
@@ -128,10 +127,8 @@
 +   * but empty. Should only be used for the first write to the schema log.
 +   */
 +  public boolean shouldInitializeMetadataTrackingEagerly() {
-+    return !schemaReadOptions.allowUnsafeStreamingReadOnColumnMappingSchemaChanges()
-+        && metadataTrackingLog != null
-+        && metadataTrackingLog.getCurrentTrackedMetadata().isEmpty()
-+        && metadataTrackingLog.initMetadataLogEagerly();
++    return DeltaSourceMetadataEvolutionSupport$.MODULE$.shouldInitializeMetadataTrackingEagerly(
++        schemaReadOptions, metadataTrackingLog);
 +  }
 +
 +  // ---------------------------------------------------------------------------
@@ -163,22 +160,25 @@
 +        throw new RuntimeException("Failed to close file actions iterator", e);
 +      }
 +    }
-+    return io.delta.kernel.internal.util.Utils.toCloseableIterator(result.iterator());
++    return Utils.toCloseableIterator(result.iterator());
 +  }
 +
 +  /**
 +   * If the version has a metadata or protocol change compared to the current stream metadata,
-+   * return a sentinel IndexedFile at METADATA_CHANGE_INDEX to act as a barrier.
++   * return an iterator with a single sentinel IndexedFile at METADATA_CHANGE_INDEX to act as a
++   * barrier. Otherwise, return an empty iterator. The caller concatenates this into the file change
++   * stream.
 +   */
-+  public Optional<IndexedFile> getMetadataChangeBarrierIfNeeded(
++  public CloseableIterator<IndexedFile> getMetadataOrProtocolChangeIndexedFileIterator(
 +      Metadata metadata, Protocol protocol, long version) {
-+    if (!shouldTrackMetadataChange()) {
-+      return Optional.empty();
-+    }
-+    if (hasMetadataOrProtocolChangeComparedToStreamMetadata(metadata, protocol, version)) {
-+      return Optional.of(IndexedFile.sentinel(version, DeltaSourceOffset.METADATA_CHANGE_INDEX()));
++    if (shouldTrackMetadataChange()
++        && hasMetadataOrProtocolChangeComparedToStreamMetadata(metadata, protocol, version)) {
++      return Utils.toCloseableIterator(
++          Collections.singletonList(
++                  IndexedFile.sentinel(version, DeltaSourceOffset.METADATA_CHANGE_INDEX()))
++              .iterator());
 +    }
-+    return Optional.empty();
++    return Utils.toCloseableIterator(Collections.emptyIterator());
 +  }
 +
 +  /**
@@ -205,8 +205,8 @@
 +
 +    if (previousOffset.index() == DeltaSourceOffset.POST_METADATA_CHANGE_INDEX()) {
 +      // Check if schema evolution has actually occurred; if not, block.
-+      Metadata metadata = collectMetadataAtVersion(previousOffset.reservoirVersion()).orElse(null);
-+      Protocol protocol = collectProtocolAtVersion(previousOffset.reservoirVersion()).orElse(null);
++      Metadata metadata = collectMetadataAtVersion(previousOffset.reservoirVersion());
++      Protocol protocol = collectProtocolAtVersion(previousOffset.reservoirVersion());
 +      if (hasMetadataOrProtocolChangeComparedToStreamMetadata(
 +          metadata, protocol, previousOffset.reservoirVersion())) {
 +        return Optional.of(previousOffset);
@@ -233,8 +233,8 @@
 +      return;
 +    }
 +
-+    Metadata changedMetadata = collectMetadataAtVersion(offset.reservoirVersion()).orElse(null);
-+    Protocol changedProtocol = collectProtocolAtVersion(offset.reservoirVersion()).orElse(null);
++    Metadata changedMetadata = collectMetadataAtVersion(offset.reservoirVersion());
++    Protocol changedProtocol = collectProtocolAtVersion(offset.reservoirVersion());
 +
 +    updateMetadataTrackingLogAndFailIfNeeded(
 +        changedMetadata, changedProtocol, offset.reservoirVersion(), /* replace= */ false);
@@ -251,12 +251,10 @@
 +      return;
 +    }
 +
-+    Metadata metadataToUse =
-+        changedMetadata != null ? changedMetadata : getMetadataAtSourceInit();
-+    Protocol protocolToUse =
-+        changedProtocol != null ? changedProtocol : readProtocolAtSourceInit;
++    Metadata metadataToUse = changedMetadata != null ? changedMetadata : getMetadataAtSourceInit();
++    Protocol protocolToUse = changedProtocol != null ? changedProtocol : readProtocolAtSourceInit;
 +
-+    assert metadataTrackingLog != null
++    assert metadataTrackingLog.isDefined()
 +        : "Metadata tracking log must be present to update metadata.";
 +
 +    PersistedMetadata schemaToPersist =
@@ -268,9 +266,9 @@
 +            metadataPath);
 +
 +    if (replace) {
-+      metadataTrackingLog.writeNewMetadata(schemaToPersist, true);
++      metadataTrackingLog.get().writeNewMetadata(schemaToPersist, true);
 +    } else {
-+      metadataTrackingLog.writeNewMetadata(schemaToPersist, false);
++      metadataTrackingLog.get().writeNewMetadata(schemaToPersist, false);
 +    }
 +
 +    throw (RuntimeException)
@@ -285,8 +283,8 @@
 +  // ---------------------------------------------------------------------------
 +
 +  /**
-+   * Initialize the metadata tracking log on the first batch. Validates that the metadata across
-+   * the version range is safe to read, then writes the initial entry.
++   * Initialize the metadata tracking log on the first batch. Validates that the metadata across the
++   * version range is safe to read, then writes the initial entry.
 +   *
 +   * @param batchStartVersion start version of the batch
 +   * @param batchEndVersion optional end version (for constructed batches with existing end offset)
@@ -312,7 +310,7 @@
 +      protocol = snapshot.getProtocol();
 +    }
 +
-+    assert metadataTrackingLog != null
++    assert metadataTrackingLog.isDefined()
 +        : "Metadata tracking log must be present to initialize metadata tracking.";
 +
 +    PersistedMetadata newMetadata =
@@ -322,7 +320,7 @@
 +            new SparkMetadataAdapter(metadata),
 +            new SparkProtocolAdapter(protocol),
 +            metadataPath);
-+    metadataTrackingLog.writeNewMetadata(newMetadata, false);
++    metadataTrackingLog.get().writeNewMetadata(newMetadata, false);
 +
 +    if (hasMetadataOrProtocolChangeComparedToStreamMetadata(metadata, protocol, version)
 +        || alwaysFailUponLogInitialized) {
@@ -338,20 +336,17 @@
 +  // Private helpers
 +  // ---------------------------------------------------------------------------
 +
-+  /**
-+   * Delegates to the shared static method in {@code DeltaSourceMetadataEvolutionSupport}.
-+   */
++  /** Delegates to the shared static method in {@code DeltaSourceMetadataEvolutionSupport}. */
 +  private boolean hasMetadataOrProtocolChangeComparedToStreamMetadata(
 +      Metadata newMetadata, Protocol newProtocol, long newSchemaVersion) {
-+    scala.Option<SparkMetadataAdapter> metadataOpt = newMetadata != null
-+        ? scala.Option.apply(new SparkMetadataAdapter(newMetadata))
-+        : scala.Option.empty();
-+    scala.Option<SparkProtocolAdapter> protocolOpt = newProtocol != null
-+        ? scala.Option.apply(new SparkProtocolAdapter(newProtocol))
-+        : scala.Option.empty();
-+    scala.Option<PersistedMetadata> persistedOpt = persistedMetadataAtSourceInit != null
-+        ? scala.Option.apply(persistedMetadataAtSourceInit)
-+        : scala.Option.empty();
++    Option<SparkMetadataAdapter> metadataOpt =
++        newMetadata != null ? Option.apply(new SparkMetadataAdapter(newMetadata)) : Option.empty();
++    Option<SparkProtocolAdapter> protocolOpt =
++        newProtocol != null ? Option.apply(new SparkProtocolAdapter(newProtocol)) : Option.empty();
++    Option<PersistedMetadata> persistedOpt =
++        persistedMetadataAtSourceInit != null
++            ? Option.apply(persistedMetadataAtSourceInit)
++            : Option.empty();
 +
 +    return DeltaSourceMetadataEvolutionSupport$.MODULE$
 +        .hasMetadataOrProtocolChangeComparedToStreamMetadata(
@@ -362,59 +357,79 @@
 +            new SparkProtocolAdapter(readProtocolAtSourceInit),
 +            readSchemaAtSourceInit,
 +            readPartitionSchemaAtSourceInit,
-+            JavaConverters.mapAsScalaMap(readConfigurationsAtSourceInit).toMap(
-+                scala.Predef.<scala.Tuple2<String, String>>conforms()),
++            ScalaUtils.toScalaMap(readConfigurationsAtSourceInit),
 +            spark);
 +  }
 +
-+  /** Collect the metadata action at a specific version using Kernel APIs. */
-+  private Optional<Metadata> collectMetadataAtVersion(long version) {
-+    Map<Long, Metadata> metadataMap =
-+        StreamingHelper.collectMetadataActionsFromRangeUnsafe(
-+            version, Optional.of(version + 1), snapshotManager, engine, tablePath);
-+    return Optional.ofNullable(metadataMap.get(version));
++  /** Collect the metadata action at a specific version. Returns null if none. */
++  private Metadata collectMetadataAtVersion(long version) {
++    return collectMetadataActions(version, version).get(version);
 +  }
 +
-+  /** Collect the protocol action at a specific version using Kernel APIs. */
-+  private Optional<Protocol> collectProtocolAtVersion(long version) {
-+    // TODO: add protocol collection to StreamingHelper similar to metadata collection
-+    // For now, load the snapshot at this version and use its protocol
-+    SnapshotImpl snapshot = (SnapshotImpl) snapshotManager.loadSnapshotAt(version);
-+    return Optional.of(snapshot.getProtocol());
++  /** Collect all metadata actions between start and end version, both inclusive. */
++  private Map<Long, Metadata> collectMetadataActions(long startVersion, long endVersion) {
++    return StreamingHelper.collectMetadataActionsFromRangeUnsafe(
++        startVersion, Optional.of(endVersion + 1), snapshotManager, engine, tablePath);
++  }
++
++  /** Collect the protocol action at a specific version. Returns null if none. */
++  private Protocol collectProtocolAtVersion(long version) {
++    return collectProtocolActions(version, version).get(version);
++  }
++
++  /** Collect all protocol actions between start and end version, both inclusive. */
++  private Map<Long, Protocol> collectProtocolActions(long startVersion, long endVersion) {
++    return StreamingHelper.collectProtocolActionsFromRangeUnsafe(
++        startVersion, Optional.of(endVersion + 1), snapshotManager, engine, tablePath);
 +  }
 +
 +  /**
-+   * Validate metadata changes across a version range for log initialization. Ensures no
-+   * read-incompatible schema changes exist between start and end versions.
++   * Given the version range for an ALREADY fetched batch, check if there are any read-incompatible
++   * schema changes or protocol changes.
++   *
++   * <p>Try to find rename or drop columns in between, or nullability/datatype changes by using the
++   * last schema as the read schema. If so, we cannot find a good read schema. Otherwise, the most
++   * recent metadata change will be the most encompassing schema as well.
++   *
++   * <p>For protocols, walk through changes and ensure each is a superset of the previous. If not,
++   * we cannot find a safe protocol.
 +   */
 +  private ValidatedMetadataAndProtocol validateAndResolveMetadataForLogInitialization(
 +      long startVersion, long endVersion) {
-+    // Collect all metadata changes in the range
-+    Map<Long, Metadata> metadataChanges =
-+        StreamingHelper.collectMetadataActionsFromRangeUnsafe(
-+            startVersion, Optional.of(endVersion + 1), snapshotManager, engine, tablePath);
-+
++    List<Metadata> metadataChanges = new ArrayList<>(
++        collectMetadataActions(startVersion, endVersion).values());
 +    SnapshotImpl startSnapshot = (SnapshotImpl) snapshotManager.loadSnapshotAt(startVersion);
++    Metadata startMetadata = startSnapshot.getMetadata();
 +
-+    // Use the most recent metadata change if any, otherwise fall back to start snapshot
-+    Metadata resolvedMetadata;
-+    if (!metadataChanges.isEmpty()) {
-+      // Get the last entry (highest version) from the ordered map
-+      Metadata lastMetadata = null;
-+      for (Metadata m : metadataChanges.values()) {
-+        lastMetadata = m;
++    // Try to use the most recent metadata change as the read schema and validate all prior
++    // schemas are read-compatible against it.
++    Metadata mostRecentMetadataChange = metadataChanges.isEmpty()
++        ? null : metadataChanges.get(metadataChanges.size() - 1);
++    if (mostRecentMetadataChange != null) {
++      // Validate startMetadata + all intermediate changes against the most recent
++      List<Metadata> otherMetadataChanges = new ArrayList<>();
++      otherMetadataChanges.add(startMetadata);
++      otherMetadataChanges.addAll(metadataChanges.subList(0, metadataChanges.size() - 1));
++      for (Metadata prior : otherMetadataChanges) {
++        // TODO: add column mapping schema change validation (hasNoColumnMappingSchemaChanges)
++        // and read-compatibility validation (SchemaUtils.isReadCompatible) using Kernel APIs
++        // For now, this is a placeholder matching the v1 structure.
 +      }
-+      // TODO: validate read-compatibility across all intermediate metadata changes
-+      // similar to v1's validateAndResolveMetadataForLogInitialization
-+      resolvedMetadata = lastMetadata;
-+    } else {
-+      resolvedMetadata = startSnapshot.getMetadata();
 +    }
 +
-+    // Use the start snapshot's protocol as the base — protocol validation is simpler
-+    Protocol resolvedProtocol = startSnapshot.getProtocol();
++    // Check protocol changes and use the most supportive protocol
++    Protocol mostSupportiveProtocol = startSnapshot.getProtocol();
++    List<Protocol> protocolChanges = new ArrayList<>(
++        collectProtocolActions(startVersion, endVersion).values());
++    for (Protocol p : protocolChanges) {
++      // TODO: add readerAndWriterFeatureNames subset check using Kernel Protocol APIs
++      // similar to v1's mostSupportiveProtocol.readerAndWriterFeatureNames.subsetOf(...)
++      mostSupportiveProtocol = p;
++    }
 +
-+    return new ValidatedMetadataAndProtocol(resolvedMetadata, resolvedProtocol);
++    Metadata resolvedMetadata = mostRecentMetadataChange != null
++        ? mostRecentMetadataChange : startMetadata;
++    return new ValidatedMetadataAndProtocol(resolvedMetadata, mostSupportiveProtocol);
 +  }
 +
 +  private Metadata getMetadataAtSourceInit() {
spark/v2/src/main/java/io/delta/spark/internal/v2/utils/StreamingHelper.java
@@ -0,0 +1,91 @@
+diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/utils/StreamingHelper.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/utils/StreamingHelper.java
+--- a/spark/v2/src/main/java/io/delta/spark/internal/v2/utils/StreamingHelper.java
++++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/utils/StreamingHelper.java
+ import io.delta.kernel.internal.actions.AddFile;
+ import io.delta.kernel.internal.actions.CommitInfo;
+ import io.delta.kernel.internal.actions.Metadata;
++import io.delta.kernel.internal.actions.Protocol;
+ import io.delta.kernel.internal.actions.RemoveFile;
+ import io.delta.kernel.internal.commitrange.CommitRangeImpl;
+ import io.delta.kernel.internal.data.StructRow;
+     return Optional.ofNullable(metadata);
+   }
+ 
++  /** Get Protocol action from a batch at the specified row, if present. */
++  public static Optional<Protocol> getProtocol(ColumnarBatch batch, int rowId) {
++    int protocolIdx = getFieldIndex(batch, DeltaLogActionUtils.DeltaAction.PROTOCOL.colName);
++    ColumnVector protocolVector = batch.getColumnVector(protocolIdx);
++    Protocol protocol = Protocol.fromColumnVector(protocolVector, rowId);
++
++    return Optional.ofNullable(protocol);
++  }
++
+   /**
+    * Gets commit-level actions from a commit range without requiring a snapshot at the exact start
+    * version.
+     return versionToMetadata;
+   }
+ 
++  /**
++   * Collects protocol actions from a commit range, mapping each version to its protocol.
++   *
++   * <p>This method mirrors {@link #collectMetadataActionsFromRangeUnsafe} but for protocol actions.
++   *
++   * <p>Returns a map preserving version order (via LinkedHashMap) where each version maps to its
++   * protocol action. Throws an exception if multiple protocol actions are found in the same commit.
++   *
++   * @param startVersion the starting version (inclusive) of the commit range
++   * @param endVersionOpt optional ending version (exclusive) of the commit range
++   * @param snapshotManager the Delta snapshot manager
++   * @param engine the Delta engine
++   * @param tablePath the path to the Delta table
++   * @return a map from version number to protocol action, in version order
++   */
++  public static Map<Long, Protocol> collectProtocolActionsFromRangeUnsafe(
++      long startVersion,
++      Optional<Long> endVersionOpt,
++      DeltaSnapshotManager snapshotManager,
++      Engine engine,
++      String tablePath) {
++    CommitRangeImpl commitRange =
++        (CommitRangeImpl) snapshotManager.getTableChanges(engine, startVersion, endVersionOpt);
++    Map<Long, Protocol> versionToProtocol = new LinkedHashMap<>();
++
++    try (CloseableIterator<CommitActions> commitsIter =
++        getCommitActionsFromRangeUnsafe(
++            engine, commitRange, tablePath, Set.of(DeltaLogActionUtils.DeltaAction.PROTOCOL))) {
++      while (commitsIter.hasNext()) {
++        try (CommitActions commit = commitsIter.next()) {
++          long version = commit.getVersion();
++          try (CloseableIterator<ColumnarBatch> actionsIter = commit.getActions()) {
++            while (actionsIter.hasNext()) {
++              ColumnarBatch batch = actionsIter.next();
++              int numRows = batch.getSize();
++              for (int rowId = 0; rowId < numRows; rowId++) {
++                Optional<Protocol> protocolOpt = StreamingHelper.getProtocol(batch, rowId);
++                if (protocolOpt.isPresent()) {
++                  Protocol existing = versionToProtocol.putIfAbsent(version, protocolOpt.get());
++                  Preconditions.checkArgument(
++                      existing == null,
++                      "Should not encounter two protocol actions in the same commit of version %d",
++                      version);
++                }
++              }
++            }
++          } catch (IOException e) {
++            throw new RuntimeException("Failed to process commit at version " + version, e);
++          }
++        }
++      }
++    } catch (RuntimeException e) {
++      throw e;
++    } catch (Exception e) {
++      throw new RuntimeException("Failed to process commits", e);
++    }
++
++    return versionToProtocol;
++  }
++
+   /** Get explicit CDC file (AddCDCFile) from a batch at the specified row, if present. */
+   public static Optional<CDCDataFile> getCDCFile(
+       ColumnarBatch batch, int rowId, long commitTimestamp) {
\ No newline at end of file

Reproduce locally: git range-diff 0c97c0f..e85d7bd 0c97c0f..b737b1d | Disable: git config gitstack.push-range-diff false

@PorridgeSwim PorridgeSwim force-pushed the stack/MetadataEvolutionHandler branch from b737b1d to ab1198e Compare April 13, 2026 21:42
@PorridgeSwim PorridgeSwim deleted the stack/MetadataEvolutionHandler branch April 14, 2026 01:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant