[kernel-spark] Support schema tracking log in v2 analysis stage#6570

Merged
murali-db merged 1 commit into delta-io:master from PorridgeSwim:stack/NonAdditiveSchemaEvolution2
May 16, 2026

PorridgeSwim (Collaborator) commented Apr 14, 2026

🥞 Stacked PR

Use this link to review incremental changes.


Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

PR 5/7 in the stack that adds non-additive schema evolution to the V2 streaming connector.

Wire schema tracking into V2's analysis path so the analyzed plan reflects the persisted (evolved) schema instead of the live snapshot schema.
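
The precedence described above can be sketched as follows. This is a minimal illustration, not the actual `SparkTable` or `SchemaProvider` code: `ReadSchemaSketch`, `TrackedSchema`, and `resolveReadSchema` are hypothetical names, and schemas are reduced to lists of field names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Illustrative only: every name here is hypothetical, not part of the Delta code base. */
final class ReadSchemaSketch {

  /** Stand-in for a schema entry persisted in the schema tracking log. */
  static final class TrackedSchema {
    final long deltaCommitVersion;
    final List<String> fieldNames;

    TrackedSchema(long deltaCommitVersion, List<String> fieldNames) {
      this.deltaCommitVersion = deltaCommitVersion;
      this.fieldNames = fieldNames;
    }
  }

  /** Prefers the persisted (evolved) schema; falls back to the live snapshot schema. */
  static List<String> resolveReadSchema(
      Optional<TrackedSchema> persisted, List<String> snapshotFieldNames) {
    return persisted.map(tracked -> tracked.fieldNames).orElse(snapshotFieldNames);
  }

  public static void main(String[] args) {
    // The live table has already dropped "value", but the stream has not yet
    // crossed that schema change, so the tracking log still records the column.
    List<String> liveSnapshot = Arrays.asList("id", "name");
    TrackedSchema tracked = new TrackedSchema(3L, Arrays.asList("id", "name", "value"));

    System.out.println(resolveReadSchema(Optional.of(tracked), liveSnapshot));
    System.out.println(resolveReadSchema(Optional.empty(), liveSnapshot));
  }
}
```

With a persisted entry the analyzed schema still contains the dropped column, which is what the stream would read at runtime; without one, analysis falls back to the snapshot.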

  • DeltaAnalysis.verifyDeltaSourceSchemaLocation: extend the duplicate-schema-location check to also visit StreamingRelationV2, keyed on the V2 Table.name.
  • SparkTable: open DeltaSourceMetadataTrackingLog once during construction (gated on mergeConsecutiveSchemaChanges) and seed SchemaProvider from the persisted metadata, so analysis-time schema() matches what the stream will read at runtime.
  • ApplyV2ReadOptions (renamed from ApplyV2Streaming): generalize the CDC-only rebuild to also fire when schemaTrackingLocation arrives via extraOptions on the catalog readStream.table() path; rebuild SparkTable with merged options so the schema-log lookup actually fires.
  • MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream: V2 port of V1's helper, reused by SparkTable (analysis) and SparkScan (execution).
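
The duplicate-schema-location check in the first bullet can be pictured roughly as below. This is a sketch under stated assumptions, not the `DeltaAnalysis` implementation: it assumes the check rejects any two streaming sources in one query that resolve to the same schema tracking location and reports them by table name. `SchemaLocationCheckSketch`, `StreamingSource`, and `verifyUniqueSchemaLocations` are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: every name here is hypothetical, not part of the Delta code base. */
final class SchemaLocationCheckSketch {

  /** Stand-in for a streaming relation found in the plan. */
  static final class StreamingSource {
    final String tableName;
    final String schemaLocation;

    StreamingSource(String tableName, String schemaLocation) {
      this.tableName = tableName;
      this.schemaLocation = schemaLocation;
    }
  }

  /** Throws if two sources in the same query share one schema tracking location. */
  static void verifyUniqueSchemaLocations(List<StreamingSource> sources) {
    Map<String, String> ownerByLocation = new HashMap<>();
    for (StreamingSource source : sources) {
      // putIfAbsent returns the previous owner when the location is already taken.
      String previousOwner =
          ownerByLocation.putIfAbsent(source.schemaLocation, source.tableName);
      if (previousOwner != null) {
        throw new IllegalStateException(
            "Schema location " + source.schemaLocation + " is used by both "
                + previousOwner + " and " + source.tableName);
      }
    }
  }

  public static void main(String[] args) {
    // Distinct locations pass the check.
    verifyUniqueSchemaLocations(
        Arrays.asList(
            new StreamingSource("catalog.db.orders", "/schemas/orders"),
            new StreamingSource("catalog.db.users", "/schemas/users")));

    // A shared location is rejected.
    try {
      verifyUniqueSchemaLocations(
          Arrays.asList(
              new StreamingSource("catalog.db.orders", "/schemas/shared"),
              new StreamingSource("catalog.db.users", "/schemas/shared")));
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

The real check walks the logical plan and raises a Delta-specific error; the map keyed on location is only one way to express the uniqueness constraint.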

How was this patch tested?

Covered by SparkTableTest, MetadataEvolutionHandlerTest, and ApplyV2ReadOptionsSuite. The unified DeltaV2SourceSchemaEvolutionSuite was also updated.

Does this PR introduce any user-facing changes?

No.

@PorridgeSwim (Collaborator, Author)

Range-diff: stack/MetadataEvolutionHandler2 (8b93ccd -> 3e25083)
spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java
@@ -1,6 +1,34 @@
 diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java
 --- a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java
 +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java
+ import io.delta.kernel.internal.actions.AddFile;
+ import io.delta.kernel.internal.actions.CommitInfo;
+ import io.delta.kernel.internal.actions.Metadata;
++import io.delta.kernel.internal.actions.Protocol;
+ import io.delta.kernel.internal.actions.RemoveFile;
++import io.delta.kernel.internal.checksum.CRCInfo;
++import io.delta.kernel.internal.lang.Lazy;
++import io.delta.kernel.internal.metrics.SnapshotQueryContext;
++import io.delta.kernel.internal.replay.LogReplay;
+ import io.delta.kernel.internal.util.ColumnMapping;
+ import io.delta.kernel.internal.util.ColumnMapping.ColumnMappingMode;
+ import io.delta.kernel.internal.util.Preconditions;
+ import io.delta.kernel.internal.util.Utils;
+ import io.delta.kernel.internal.util.VectorUtils;
++import io.delta.kernel.types.StringType;
+ import io.delta.kernel.utils.CloseableIterator;
++import io.delta.spark.internal.v2.adapters.SparkMetadataAdapter;
+ import io.delta.spark.internal.v2.snapshot.DeltaSnapshotManager;
+ import io.delta.spark.internal.v2.utils.PartitionUtils;
+ import io.delta.spark.internal.v2.utils.ScalaUtils;
+ import org.apache.spark.sql.connector.read.InputPartition;
+ import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+ import org.apache.spark.sql.connector.read.streaming.*;
+-import org.apache.spark.sql.delta.DeltaColumnMapping;
++import org.apache.spark.sql.delta.DeltaColumnMapping$;
+ import org.apache.spark.sql.delta.DeltaErrors;
+ import org.apache.spark.sql.delta.DeltaOptions;
+ import org.apache.spark.sql.delta.DeltaStartingVersion;
  import org.apache.spark.sql.delta.TypeWidening;
  import org.apache.spark.sql.delta.sources.DeltaSQLConf;
  import org.apache.spark.sql.delta.sources.DeltaSource;
@@ -23,6 +51,17 @@
  
    private final Engine engine;
    private final DeltaSnapshotManager snapshotManager;
+   private final boolean skipChangeCommits;
+   private final SnapshotImpl snapshotAtSourceInit;
+   private final String tableId;
++  private final SnapshotImpl readSnapshotAtSourceInit;
+   private final StructType readSchemaAtSourceInit;
++  private final StructType readPartitionSchemaAtSourceInit;
++  private final Protocol readProtocolAtSourceInit;
++  private final Map<String, String> readConfigurationsAtSourceInit;
+   private final boolean shouldValidateOffsets;
+   private final Optional<Regex> excludeRegex;
+   private final SparkSession spark;
    private final SQLConf sqlConf;
    private final scala.collection.immutable.Map<String, String> scalaOptions;
  
@@ -46,26 +85,29 @@
      this.snapshotAtSourceInit = (SnapshotImpl) snapshotAtSourceInit;
      this.tableId = this.snapshotAtSourceInit.getMetadata().getId();
 -    // TODO(#5319): schema tracking for non-additive schema changes
--    this.readSchemaAtSourceInit =
--        Objects.requireNonNull(
--            SchemaUtils.convertKernelSchemaToSparkSchema(snapshotAtSourceInit.getSchema()),
--            "readSchemaAtSourceInit is null");
 +
-+    // When the metadata tracking log has a persisted schema, use it as the read schema
-+    // instead of the snapshot schema. This mirrors v1's readSnapshotDescriptor override.
-+    Option<PersistedMetadata> persistedMetadata =
++    // The effective snapshot for reading, mirroring v1's readSnapshotDescriptor.
++    // When schema tracking has persisted metadata, load the snapshot at that version so its
++    // schema/protocol/config reflect the evolved state. Otherwise same as snapshotAtSourceInit.
++    Option<PersistedMetadata> persistedMetadataAtSourceInit =
 +        metadataTrackingLog.isDefined()
 +            ? metadataTrackingLog.get().getCurrentTrackedMetadata()
 +            : Option.empty();
-+    if (persistedMetadata.isDefined()) {
-+      this.readSchemaAtSourceInit =
-+          Objects.requireNonNull(persistedMetadata.get().dataSchema(), "persisted dataSchema");
++    if (persistedMetadataAtSourceInit.isDefined()) {
++      this.readSnapshotAtSourceInit =
++          buildReadSnapshotFromPersistedMetadata(persistedMetadataAtSourceInit.get());
 +    } else {
-+      this.readSchemaAtSourceInit =
-+          Objects.requireNonNull(
-+              SchemaUtils.convertKernelSchemaToSparkSchema(snapshotAtSourceInit.getSchema()),
-+              "readSchemaAtSourceInit is null");
++      this.readSnapshotAtSourceInit = this.snapshotAtSourceInit;
 +    }
+     this.readSchemaAtSourceInit =
+-        Objects.requireNonNull(
+-            SchemaUtils.convertKernelSchemaToSparkSchema(snapshotAtSourceInit.getSchema()),
+-            "readSchemaAtSourceInit is null");
++        SchemaUtils.convertKernelSchemaToSparkSchema(readSnapshotAtSourceInit.getSchema());
++    this.readPartitionSchemaAtSourceInit =
++        new SparkMetadataAdapter(readSnapshotAtSourceInit.getMetadata()).partitionSchema();
++    this.readProtocolAtSourceInit = readSnapshotAtSourceInit.getProtocol();
++    this.readConfigurationsAtSourceInit = readSnapshotAtSourceInit.getMetadata().getConfiguration();
 +
      this.shouldValidateOffsets =
          Objects.requireNonNull(
@@ -74,20 +116,6 @@
                  spark, isStreamingFromColumnMappingTable, isTypeWideningSupportedInProtocol),
              "schemaReadOptions is null");
 +
-+    // Build the read-time state from either persisted metadata or the snapshot.
-+    // This mirrors v1's readSnapshotDescriptor which uses persisted metadata when available.
-+    StructType readPartitionSchemaAtSourceInit =
-+        persistedMetadata.isDefined()
-+            ? persistedMetadata.get().partitionSchema()
-+            : partitionSchema;
-+    Map<String, String> readConfigurationsAtSourceInit =
-+        persistedMetadata.isDefined()
-+            ? ScalaUtils.toJavaMap(
-+                persistedMetadata.get().tableConfigurations().getOrElse(
-+                    () -> ScalaUtils.toScalaMap(
-+                        this.snapshotAtSourceInit.getMetadata().getConfiguration())))
-+            : this.snapshotAtSourceInit.getMetadata().getConfiguration();
-+
 +    this.metadataEvolutionHandler =
 +        new MetadataEvolutionHandler(
 +            spark,
@@ -98,7 +126,7 @@
 +            options,
 +            schemaReadOptions,
 +            metadataTrackingLog,
-+            readMetadataAtSourceInit,
++            readSnapshotAtSourceInit.getMetadata(),
 +            readSchemaAtSourceInit,
 +            readPartitionSchemaAtSourceInit,
 +            readConfigurationsAtSourceInit,
@@ -108,6 +136,82 @@
      validateSchemaCompatibilityOnStartup(dataSchema, partitionSchema, readSchemaAtSourceInit);
    }
  
+     isTriggerAvailableNow = true;
+   }
+ 
++  private SnapshotImpl buildReadSnapshotFromPersistedMetadata(PersistedMetadata customMetadata) {
++    Metadata sourceMetadata = snapshotAtSourceInit.getMetadata();
++
++    Map<String, String> readConfigurations;
++    if (customMetadata.tableConfigurations().isDefined()) {
++      readConfigurations = ScalaUtils.toJavaMap(customMetadata.tableConfigurations().get());
++    } else {
++      readConfigurations = sourceMetadata.getConfiguration();
++      logger.warn("Using snapshot's table configuration: {}", readConfigurations);
++    }
++
++    Metadata readMetadata =
++        new Metadata(
++            sourceMetadata.getId(),
++            sourceMetadata.getName(),
++            sourceMetadata.getDescription(),
++            sourceMetadata.getFormat(),
++            customMetadata.dataSchemaJson(),
++            SchemaUtils.convertSparkSchemaToKernelSchema(customMetadata.dataSchema()),
++            VectorUtils.buildArrayValue(
++                Arrays.asList(customMetadata.partitionSchema().fieldNames()), StringType.STRING),
++            sourceMetadata.getCreatedTime(),
++            VectorUtils.stringStringMapValue(readConfigurations));
++
++    Protocol readProtocol;
++    if (customMetadata.protocol().isDefined()) {
++      readProtocol = toKernelProtocol(customMetadata.protocol().get());
++    } else {
++      readProtocol = snapshotAtSourceInit.getProtocol();
++      logger.warn("Using snapshot's protocol: {}", readProtocol);
++    }
++
++    Lazy<Optional<CRCInfo>> lazyCrcInfo = new Lazy<>(snapshotAtSourceInit::getCurrentCrcInfo);
++    LogReplay logReplay =
++        new LogReplay(
++            engine,
++            snapshotAtSourceInit.getDataPath(),
++            snapshotAtSourceInit.getLazyLogSegment(),
++            lazyCrcInfo);
++
++    return new SnapshotImpl(
++        snapshotAtSourceInit.getDataPath(),
++        customMetadata.deltaCommitVersion(),
++        snapshotAtSourceInit.getLazyLogSegment(),
++        logReplay,
++        readProtocol,
++        readMetadata,
++        snapshotAtSourceInit.getCommitter(),
++        SnapshotQueryContext.forVersionSnapshot(
++            snapshotAtSourceInit.getDataPath().toString(), customMetadata.deltaCommitVersion()),
++        Optional.empty() /* inCommitTimestampOpt */);
++  }
++
++  private static Protocol toKernelProtocol(
++      org.apache.spark.sql.delta.actions.Protocol sparkProtocol) {
++    Set<String> readerFeatures =
++        sparkProtocol.getReaderFeatures() == null
++            ? Collections.emptySet()
++            : new HashSet<>(sparkProtocol.getReaderFeatures());
++    Set<String> writerFeatures =
++        sparkProtocol.getWriterFeatures() == null
++            ? Collections.emptySet()
++            : new HashSet<>(sparkProtocol.getWriterFeatures());
++    return new Protocol(
++        sparkProtocol.getMinReaderVersion(),
++        sparkProtocol.getMinWriterVersion(),
++        readerFeatures,
++        writerFeatures);
++  }
++
+   /**
+    * initialize the internal states for AvailableNow if this method is called first time after
+    * prepareForTriggerAvailableNow.
        DeltaSourceOffset previousOffset,
        Optional<DeltaSource.AdmissionLimits> limits,
        boolean isFirstBatch) {
@@ -116,7 +220,9 @@
 +    // This mirrors v1's getStartingOffsetFromSpecificDeltaVersion eager init path.
 +    if (isFirstBatch && metadataEvolutionHandler.shouldInitializeMetadataTrackingEagerly()) {
 +      metadataEvolutionHandler.initializeMetadataTrackingAndExitStream(
-+          previousOffset.reservoirVersion(), /* batchEndVersion= */ null, /* alwaysFailUponLogInitialized= */ false);
++          previousOffset.reservoirVersion(),
++          /* batchEndVersion= */ null,
++          /* alwaysFailUponLogInitialized= */ false);
 +    }
 +
 +    // Handle pending schema change offsets (two-barrier protocol).
@@ -143,6 +249,21 @@
      try (CloseableIterator<IndexedFile> fileChanges =
  
    @Override
+   public PartitionReaderFactory createReaderFactory() {
++    // Use readSnapshotAtSourceInit (which has the evolved schema when schema tracking is active),
++    // mirroring v1's use of readSnapshotDescriptor in createDataFrame.
+     return PartitionUtils.createDeltaParquetReaderFactory(
+-        snapshotAtSourceInit,
+-        dataSchema,
+-        partitionSchema,
++        readSnapshotAtSourceInit,
++        readSchemaAtSourceInit,
++        readPartitionSchemaAtSourceInit,
+         readDataSchema,
+         dataFilters,
+         scalaOptions,
+ 
+   @Override
    public void commit(Offset end) {
 -    // TODO(#5319): update metadata tracking log.
 +    // IMPORTANT: for future developers, please place any work you would like to do in commit()
@@ -158,7 +279,8 @@
 -    // TODO(#5318): Stop at schema change barriers
 -    return changes;
 +    // Stop before any schema change barrier if detected.
-+    // Mirrors v1's stopIndexedFileIteratorAtSchemaChangeBarrier call in getFileChangesWithRateLimit.
++    // Mirrors v1's stopIndexedFileIteratorAtSchemaChangeBarrier call in
++    // getFileChangesWithRateLimit.
 +    return metadataEvolutionHandler.stopIndexedFileIteratorAtSchemaChangeBarrier(changes);
    }
  
@@ -292,6 +414,19 @@
    }
  
    /**
+               .toSeq();
+     }
+ 
+-    checkNonAdditiveSchemaChanges(
+-        oldMetadata,
+-        newMetadata,
+-        oldPartitionColumns,
+-        newPartitionColumns,
+-        validatedDuringStreamStart);
++    checkNonAdditiveSchemaChanges(oldMetadata, newMetadata, validatedDuringStreamStart);
+ 
+     // Other standard read compatibility changes
+     if (!validatedDuringStreamStart
      }
    }
  
@@ -304,17 +439,50 @@
 +  // When schema tracking is enabled, this method is NOT called — the MetadataEvolutionHandler
 +  // manages these changes through the barrier protocol instead.
    private void checkNonAdditiveSchemaChanges(
-       Metadata oldMetadata,
-       Metadata newMetadata,
-                 oldPartitionColumns,
-                 /* isBothColumnMappingEnabled */ true);
-       } else if (oldMode == NONE && newMode != NONE) {
+-      Metadata oldMetadata,
+-      Metadata newMetadata,
+-      Seq<String> oldPartitionColumns,
+-      Seq<String> newPartitionColumns,
+-      boolean validatedDuringStreamStart) {
++      Metadata oldMetadata, Metadata newMetadata, boolean validatedDuringStreamStart) {
+     StructType sparkNewSchema =
+         SchemaUtils.convertKernelSchemaToSparkSchema(newMetadata.getSchema());
+     StructType sparkOldSchema =
+     } else if (schemaReadOptions.allowUnsafeStreamingReadOnColumnMappingSchemaChanges()) {
+       shouldTrackSchema = false;
+     } else {
+-      ColumnMappingMode NONE = ColumnMappingMode.NONE;
+-      ColumnMappingMode oldMode =
+-          ColumnMapping.getColumnMappingMode(oldMetadata.getConfiguration());
+-      ColumnMappingMode newMode =
+-          ColumnMapping.getColumnMappingMode(newMetadata.getConfiguration());
+-      if (oldMode != NONE && newMode != NONE) {
+-        Preconditions.checkArgument(oldMode == newMode, "changing mode is not supported");
+-        shouldTrackSchema =
+-            DeltaColumnMapping.hasColMappingOrPartitionSchemaChange(
+-                sparkNewSchema,
+-                sparkOldSchema,
+-                newPartitionColumns,
+-                oldPartitionColumns,
+-                /* isBothColumnMappingEnabled */ true);
+-      } else if (oldMode == NONE && newMode != NONE) {
 -        // TODO(#5319): We should disallow user to upgrade column mapping mode for now since we
 -        // don't support schema tracking
-+        // Column mapping mode upgrade requires schema tracking.
-         shouldTrackSchema = true;
-       } else {
-         // Prohibit reading across a downgrade.
+-        shouldTrackSchema = true;
+-      } else {
+-        // Prohibit reading across a downgrade.
+-        shouldTrackSchema = oldMode != NONE && newMode == NONE;
+-      }
++      // Delegate to the shared utility which handles all column mapping mode transitions
++      // (both enabled, upgrade from none, downgrade to none).
++      shouldTrackSchema =
++          !DeltaColumnMapping$.MODULE$.hasNoColumnMappingSchemaChanges(
++              new SparkMetadataAdapter(newMetadata),
++              new SparkMetadataAdapter(oldMetadata),
++              schemaReadOptions.allowUnsafeStreamingReadOnPartitionColumnChanges());
+     }
+ 
+     if (shouldTrackSchema) {
      }
    }
  
@@ -334,14 +502,11 @@
 +    // For eager initialization, we initialize the log right now.
 +    if (metadataEvolutionHandler.shouldInitializeMetadataTrackingEagerly()) {
 +      metadataEvolutionHandler.initializeMetadataTrackingAndExitStream(
-+          startVersion,
-+          endVersionForMetadataLogInit,
-+          /* alwaysFailUponLogInitialized= */ false);
++          startVersion, endVersionForMetadataLogInit, /* alwaysFailUponLogInitialized= */ false);
 +    }
 +
 +    // Check for column mapping + streaming incompatible schema changes
-+    checkReadIncompatibleSchemaChangeOnStreamStartOnce(
-+        startVersion, endVersionForMetadataLogInit);
++    checkReadIncompatibleSchemaChangeOnStreamStartOnce(startVersion, endVersionForMetadataLogInit);
 +  }
 +
    /**
@@ -357,6 +522,24 @@
  
      if (hasCheckedReadIncompatibleSchemaChangesOnStreamStart) return;
  
+ 
+   /** Loads snapshot files at the specified version. */
+   private InitialSnapshotCache loadAndValidateSnapshot(long version) {
+-    Snapshot snapshot = snapshotManager.loadSnapshotAt(version);
++    SnapshotImpl snapshot = (SnapshotImpl) snapshotManager.loadSnapshotAt(version);
++    // If schema tracking is already active and the initial snapshot has advanced since the tracked
++    // read snapshot, replace the tracked metadata/protocol before reading snapshot files.
++    if (metadataEvolutionHandler.shouldTrackMetadataChange()
++        && snapshot.getVersion() >= readSnapshotAtSourceInit.getVersion()) {
++      metadataEvolutionHandler.updateMetadataTrackingLogAndFailIfNeeded(
++          snapshot.getMetadata(),
++          snapshot.getProtocol(),
++          snapshot.getVersion(),
++          /* replace= */ true);
++    }
+     long commitTimestamp = snapshot.getTimestamp(engine);
+ 
+     Scan scan = snapshot.getScanBuilder().build();
          existing == null,
          "Should not encounter two metadata actions in the same commit of version %d",
          version);
spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkScan.java
@@ -52,16 +52,13 @@
 +    // checks. SparkScan's initialSnapshot is from analysis time and may be stale by stream
 +    // start/restart.
 +    // Matches V1's DeltaDataSource.createSource() behavior.
-+    Snapshot freshSnapshot = snapshotManager.loadLatestSnapshot();
++    Snapshot latestSnapshot = snapshotManager.loadLatestSnapshot();
++    SparkSession spark = SparkSession.active();
 +
 +    // Create metadata tracking log for non-additive schema evolution support.
 +    // Mirrors V1's DeltaDataSource.getMetadataTrackingLogForDeltaSource().
 +    Option<DeltaSourceMetadataTrackingLog> metadataTrackingLog =
-+        getMetadataTrackingLogForMicroBatchStream(
-+            SparkSession.active(),
-+            freshSnapshot,
-+            ScalaUtils.toScalaMap(options),
-+            scala.Option.apply(checkpointLocation));
++        getMetadataTrackingLogForMicroBatchStream(spark, latestSnapshot, checkpointLocation);
 +
      return new SparkMicroBatchStream(
          snapshotManager,
@@ -70,10 +67,13 @@
 -        // start/restart.
 -        // Matches V1's DeltaDataSource.createSource() behavior.
 -        snapshotManager.loadLatestSnapshot(),
-+        freshSnapshot,
++        latestSnapshot,
          hadoopConf,
-         SparkSession.active(),
+-        SparkSession.active(),
++        spark,
          deltaOptions,
+         getTablePath(),
+         dataSchema,
          partitionSchema,
          readDataSchema,
          dataFilters != null ? dataFilters : new Filter[0],
@@ -84,17 +84,15 @@
 +  }
 +
 +  /**
-+   * Create a metadata tracking log for the Delta streaming source, if a schema tracking location
-+   * is configured. Mirrors V1's {@code DeltaDataSource.getMetadataTrackingLogForDeltaSource()}.
++   * Create a metadata tracking log for the Delta streaming source, if a schema tracking location is
++   * configured. Mirrors V1's {@code DeltaDataSource.getMetadataTrackingLogForDeltaSource()}.
++   *
++   * <p>All Kernel/Scala type conversions are handled internally — the caller passes plain Java
++   * types only.
 +   */
 +  private Option<DeltaSourceMetadataTrackingLog> getMetadataTrackingLogForMicroBatchStream(
-+      SparkSession spark,
-+      Snapshot sourceSnapshot,
-+      scala.collection.immutable.Map<String, String> parameters,
-+      Option<String> sourceMetadataPathOpt) {
-+    io.delta.kernel.internal.SnapshotImpl snapshotImpl =
-+        (io.delta.kernel.internal.SnapshotImpl) sourceSnapshot;
-+
++      SparkSession spark, Snapshot sourceSnapshot, String checkpointLocation) {
++    // Extract schema tracking location from user options (case-insensitive).
 +    String schemaTrackingLocation = options.get(DeltaOptions.SCHEMA_TRACKING_LOCATION());
 +    if (schemaTrackingLocation == null) {
 +      schemaTrackingLocation = options.get(DeltaOptions.SCHEMA_TRACKING_LOCATION_ALIAS());
@@ -103,23 +101,28 @@
 +      return Option.empty();
 +    }
 +
-+    if (!(boolean) spark.sessionState().conf().getConf(
-+        DeltaSQLConf.DELTA_STREAMING_ENABLE_SCHEMA_TRACKING())) {
++    if (!(boolean)
++        spark
++            .sessionState()
++            .conf()
++            .getConf(DeltaSQLConf.DELTA_STREAMING_ENABLE_SCHEMA_TRACKING())) {
 +      throw new UnsupportedOperationException(
 +          "Schema tracking location is not supported for Delta streaming source");
 +    }
 +
-+    // V2 does not have a DeltaLog, so we pass table ID and data path directly.
-+    // The consecutiveSchemaChangesMerger is a no-op for the execution phase (it is only
-+    // meaningful during analysis for merging consecutive schema changes).
++    // V2 does not have a DeltaLog, so we extract table ID and data path from the Kernel
++    // snapshot directly. The consecutiveSchemaChangesMerger is a no-op for the execution phase
++    // (it is only meaningful during analysis for merging consecutive schema changes).
++    io.delta.kernel.internal.SnapshotImpl snapshotImpl =
++        (io.delta.kernel.internal.SnapshotImpl) sourceSnapshot;
 +    return Option.apply(
 +        DeltaSourceMetadataTrackingLog.create(
 +            spark,
 +            schemaTrackingLocation,
 +            snapshotImpl.getMetadata().getId(),
 +            snapshotImpl.getPath(),
-+            parameters,
-+            sourceMetadataPathOpt,
++            ScalaUtils.toScalaMap(options),
++            Option.apply(checkpointLocation),
 +            /* mergeConsecutiveSchemaChanges= */ false,
 +            currentMetadata -> Option.empty(),
 +            /* initMetadataLogEagerly= */ true));
spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamCDCTest.java
@@ -0,0 +1,13 @@
+diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamCDCTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamCDCTest.java
+--- a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamCDCTest.java
++++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamCDCTest.java
+         /* partitionSchema= */ new StructType(),
+         /* readDataSchema= */ new StructType(),
+         /* dataFilters= */ new org.apache.spark.sql.sources.Filter[0],
+-        /* scalaOptions= */ scala.collection.immutable.Map$.MODULE$.empty());
++        /* scalaOptions= */ scala.collection.immutable.Map$.MODULE$.<String, String>empty(),
++        /* metadataTrackingLog= */ scala.Option.empty(),
++        /* metadataPath= */ "");
+   }
+ 
+   private SparkMicroBatchStream createStream(String tablePath) {
\ No newline at end of file
spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamTest.java
@@ -0,0 +1,257 @@
+diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamTest.java
+--- a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamTest.java
++++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamTest.java
+ import org.apache.spark.sql.delta.*;
+ import org.apache.spark.sql.delta.sources.DeltaSQLConf;
+ import org.apache.spark.sql.delta.sources.DeltaSource;
++import org.apache.spark.sql.delta.sources.DeltaSourceMetadataTrackingLog;
+ import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
++import org.apache.spark.sql.delta.sources.PersistedMetadata;
+ import org.apache.spark.sql.delta.sources.ReadMaxBytes;
+ import org.apache.spark.sql.delta.storage.ClosableIterator;
+ import org.apache.spark.sql.delta.util.JsonUtils;
+             partitionSchema,
+             dataSchema,
+             new org.apache.spark.sql.sources.Filter[0],
+-            Map$.MODULE$.empty());
++            Map$.MODULE$.empty(),
++            Option.empty(),
++            testTablePath + "/_checkpoint");
+ 
+     InputPartition[] partitions = stream.planInputPartitions(startOffset, planPartitionsEndOffset);
+     PartitionReaderFactory readerFactory = stream.createReaderFactory();
+             partitionSchema,
+             fullSchema,
+             new org.apache.spark.sql.sources.Filter[0],
+-            Map$.MODULE$.empty());
++            Map$.MODULE$.empty(),
++            Option.empty(),
++            testTablePath + "/_checkpoint");
+ 
+     InputPartition[] partitions = stream.planInputPartitions(startOffset, endOffset);
+     PartitionReaderFactory readerFactory = stream.createReaderFactory();
+         "v1 connector and v2 connector should throw the same error messages on stream start schema changes");
+   }
+ 
++  @Test
++  public void testSchemaTrackingLifecycle_dropColumn_e2e(@TempDir File tempDir) throws Exception {
++    String tablePath = tempDir.getAbsolutePath();
++    String tableName = "test_schema_tracking_drop_column_" + System.nanoTime();
++    createSchemaEvolutionTestTable(tablePath, tableName);
++
++    DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(tablePath));
++    long startVersion = deltaLog.update(false, Option.empty(), Option.empty()).version();
++
++    Configuration hadoopConf = new Configuration();
++    PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(tablePath, hadoopConf);
++    String schemaTrackingLocation = new File(tempDir, "schema_tracking").getAbsolutePath();
++    String checkpointLocation = new File(tempDir, "checkpoint").getAbsolutePath();
++    java.util.Map<String, String> optionMap =
++        Collections.singletonMap("startingVersion", String.valueOf(startVersion));
++    DeltaOptions options = createDeltaOptions(optionMap);
++    DeltaSourceMetadataTrackingLog trackingLog =
++        createTrackingLog(snapshotManager, schemaTrackingLocation, checkpointLocation, optionMap);
++
++    StructType preDropSchema = loadSparkSchemaAtVersion(snapshotManager, startVersion);
++    SparkMicroBatchStream streamBeforeDrop =
++        createSchemaTrackingTestStream(
++            snapshotManager,
++            hadoopConf,
++            options,
++            tablePath,
++            preDropSchema,
++            Option.apply(trackingLog),
++            checkpointLocation);
++
++    sql("ALTER TABLE %s DROP COLUMNS (value)", tableName);
++    long schemaChangeVersion = deltaLog.update(false, Option.empty(), Option.empty()).version();
++    sql("INSERT INTO %s VALUES (3, 'Cathy', named_struct('col1', 18, 'col2', 'SF'))", tableName);
++
++    Offset startOffset = streamBeforeDrop.initialOffset();
++    DeltaSourceOffset startDeltaOffset = (DeltaSourceOffset) startOffset;
++    assertEquals(startVersion, startDeltaOffset.reservoirVersion());
++    assertEquals(DeltaSourceOffset.BASE_INDEX(), startDeltaOffset.index());
++    assertFalse(startDeltaOffset.isInitialSnapshot());
++
++    Offset barrierOffset = streamBeforeDrop.latestOffset(startOffset, ReadLimit.allAvailable());
++    DeltaSourceOffset barrierDeltaOffset = (DeltaSourceOffset) barrierOffset;
++    assertEquals(schemaChangeVersion, barrierDeltaOffset.reservoirVersion());
++    assertEquals(DeltaSourceOffset.METADATA_CHANGE_INDEX(), barrierDeltaOffset.index());
++
++    PersistedMetadata initializedMetadata = trackingLog.getCurrentTrackedMetadata().get();
++    assertEquals(startVersion, initializedMetadata.deltaCommitVersion());
++    assertThat(Arrays.asList(initializedMetadata.dataSchema().fieldNames())).contains("value");
++    assertEquals(
++        List.of(1, 2), readIdsBetweenOffsets(streamBeforeDrop, startOffset, barrierOffset));
++
++    DeltaRuntimeException evolutionException =
++        assertThrows(DeltaRuntimeException.class, () -> streamBeforeDrop.commit(barrierOffset));
++    assertMetadataEvolutionException(
++        evolutionException, "when committing the metadata change barrier");
++
++    PersistedMetadata evolvedMetadata = trackingLog.getCurrentTrackedMetadata().get();
++    assertEquals(schemaChangeVersion, evolvedMetadata.deltaCommitVersion());
++    assertThat(Arrays.asList(evolvedMetadata.dataSchema().fieldNames())).doesNotContain("value");
++
++    StructType postDropSchema =
++        io.delta.spark.internal.v2.utils.SchemaUtils.convertKernelSchemaToSparkSchema(
++            snapshotManager.loadLatestSnapshot().getSchema());
++    SparkMicroBatchStream adoptedStream =
++        createSchemaTrackingTestStream(
++            snapshotManager,
++            hadoopConf,
++            options,
++            tablePath,
++            postDropSchema,
++            Option.apply(trackingLog),
++            checkpointLocation);
++
++    Offset postBarrierOffset = adoptedStream.latestOffset(barrierOffset, ReadLimit.allAvailable());
++    DeltaSourceOffset postBarrierDeltaOffset = (DeltaSourceOffset) postBarrierOffset;
++    assertEquals(schemaChangeVersion, postBarrierDeltaOffset.reservoirVersion());
++    assertEquals(DeltaSourceOffset.POST_METADATA_CHANGE_INDEX(), postBarrierDeltaOffset.index());
++    assertTrue(readIdsBetweenOffsets(adoptedStream, barrierOffset, postBarrierOffset).isEmpty());
++    assertDoesNotThrow(() -> adoptedStream.commit(postBarrierOffset));
++
++    Offset postDropDataOffset =
++        adoptedStream.latestOffset(postBarrierOffset, ReadLimit.allAvailable());
++    assertNotNull(postDropDataOffset);
++    assertEquals(
++        List.of(3), readIdsBetweenOffsets(adoptedStream, postBarrierOffset, postDropDataOffset));
++    assertDoesNotThrow(() -> adoptedStream.commit(postDropDataOffset));
++  }
++
+   /** Provides test scenarios that generate additive schema changes actions. */
+   private static Stream<Arguments> additiveSchemaEvolutionScenarios() {
+     return Stream.of(
+   private SparkMicroBatchStream createTestStreamWithDefaults(
+       PathBasedSnapshotManager snapshotManager, Configuration hadoopConf, DeltaOptions options) {
+     io.delta.kernel.Snapshot snapshot = snapshotManager.loadLatestSnapshot();
++    String tablePath = ((io.delta.kernel.internal.SnapshotImpl) snapshot).getPath();
+     StructType tableSchema =
+         io.delta.spark.internal.v2.utils.SchemaUtils.convertKernelSchemaToSparkSchema(
+             snapshot.getSchema());
+         hadoopConf,
+         spark,
+         options,
+-        /* tablePath= */ "",
++        /* tablePath= */ tablePath,
+         /* dataSchema= */ tableSchema,
+         /* partitionSchema= */ new StructType(),
+         /* readDataSchema= */ new StructType(),
+         /* dataFilters= */ new org.apache.spark.sql.sources.Filter[0],
+-        /* scalaOptions= */ scala.collection.immutable.Map$.MODULE$.empty());
++        /* scalaOptions= */ scala.collection.immutable.Map$.MODULE$.empty(),
++        /* metadataTrackingLog= */ Option.empty(),
++        /* metadataPath= */ tablePath + "/_checkpoint");
++  }
++
++  private SparkMicroBatchStream createSchemaTrackingTestStream(
++      PathBasedSnapshotManager snapshotManager,
++      Configuration hadoopConf,
++      DeltaOptions options,
++      String tablePath,
++      StructType dataSchema,
++      Option<DeltaSourceMetadataTrackingLog> metadataTrackingLog,
++      String metadataPath) {
++    io.delta.kernel.Snapshot snapshot = snapshotManager.loadLatestSnapshot();
++    return new SparkMicroBatchStream(
++        snapshotManager,
++        snapshot,
++        hadoopConf,
++        spark,
++        options,
++        tablePath,
++        dataSchema,
++        /* partitionSchema= */ new StructType(),
++        /* readDataSchema= */ dataSchema,
++        /* dataFilters= */ new org.apache.spark.sql.sources.Filter[0],
++        /* scalaOptions= */ scala.collection.immutable.Map$.MODULE$.empty(),
++        metadataTrackingLog,
++        metadataPath);
++  }
++
++  private DeltaSourceMetadataTrackingLog createTrackingLog(
++      PathBasedSnapshotManager snapshotManager,
++      String schemaTrackingLocation,
++      String checkpointLocation,
++      java.util.Map<String, String> optionMap) {
++    io.delta.kernel.internal.SnapshotImpl snapshot =
++        (io.delta.kernel.internal.SnapshotImpl) snapshotManager.loadLatestSnapshot();
++    return DeltaSourceMetadataTrackingLog.create(
++        spark,
++        schemaTrackingLocation,
++        snapshot.getMetadata().getId(),
++        snapshot.getPath(),
++        ScalaUtils.toScalaMap(optionMap),
++        Option.apply(checkpointLocation),
++        /* mergeConsecutiveSchemaChanges= */ false,
++        /* consecutiveSchemaChangesMerger= */ null,
++        /* initMetadataLogEagerly= */ true);
++  }
++
++  private StructType loadSparkSchemaAtVersion(
++      PathBasedSnapshotManager snapshotManager, long version) {
++    return io.delta.spark.internal.v2.utils.SchemaUtils.convertKernelSchemaToSparkSchema(
++        snapshotManager.loadSnapshotAt(version).getSchema());
++  }
++
++  private List<Integer> readIdsBetweenOffsets(
++      SparkMicroBatchStream stream, Offset startOffset, Offset endOffset) throws Exception {
++    InputPartition[] partitions = stream.planInputPartitions(startOffset, endOffset);
++    PartitionReaderFactory readerFactory = stream.createReaderFactory();
++    List<Integer> ids = new ArrayList<>();
++    for (InputPartition partition : partitions) {
++      if (readerFactory.supportColumnarReads(partition)) {
++        PartitionReader<org.apache.spark.sql.vectorized.ColumnarBatch> reader =
++            readerFactory.createColumnarReader(partition);
++        try {
++          while (reader.next()) {
++            org.apache.spark.sql.vectorized.ColumnarBatch batch = reader.get();
++            for (int rowId = 0; rowId < batch.numRows(); rowId++) {
++              ids.add(batch.getRow(rowId).getInt(0));
++            }
++          }
++        } finally {
++          reader.close();
++        }
++      } else {
++        PartitionReader<InternalRow> reader = readerFactory.createReader(partition);
++        try {
++          while (reader.next()) {
++            ids.add(reader.get().getInt(0));
++          }
++        } finally {
++          reader.close();
++        }
++      }
++    }
++    Collections.sort(ids);
++    return ids;
++  }
++
++  private static void assertMetadataEvolutionException(DeltaRuntimeException ex, String context) {
++    assertEquals(
++        "DELTA_STREAMING_METADATA_EVOLUTION",
++        ex.getErrorClass(),
++        "Should throw metadata evolution exception " + context);
++    java.util.Map<String, String> params = ex.getMessageParameters();
++    assertTrue(params.containsKey("schema"), "Missing 'schema' message parameter");
++    assertTrue(params.containsKey("config"), "Missing 'config' message parameter");
++    assertTrue(params.containsKey("protocol"), "Missing 'protocol' message parameter");
+   }
+ 
+   /** Helper method to create DeltaOptions with read option for testing. */
+     }
+   }
+ 
++  private DeltaOptions createDeltaOptions(java.util.Map<String, String> optionMap) {
++    if (optionMap == null || optionMap.isEmpty()) {
++      return emptyDeltaOptions();
++    }
++    return new DeltaOptions(ScalaUtils.toScalaMap(optionMap), spark.sessionState().conf());
++  }
++
+   /** Helper method to test and compare getStartingVersion results from DSv1 and DSv2. */
+   private void testAndCompareStartingVersion(
+       String testTablePath,
\ No newline at end of file

Reproduce locally: git range-diff cdae0bb..8b93ccd ab61d5e..3e25083 | Disable: git config gitstack.push-range-diff false

@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution2 branch from 3e25083 to 33e6da2 Compare April 15, 2026 22:26
@PorridgeSwim PorridgeSwim self-assigned this Apr 15, 2026
@PorridgeSwim PorridgeSwim changed the title Support non-additive schema evolution in v2 [kernel-spark] Support non-additive schema evolution in v2 Apr 15, 2026
@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution2 branch from 33e6da2 to f9d52c5 Compare April 24, 2026 23:17
@PorridgeSwim
Collaborator Author

Range-diff: stack/MetadataEvolutionHandler2 (33e6da2 -> f9d52c5)
spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java
@@ -17,7 +17,7 @@
  import io.delta.kernel.internal.util.VectorUtils;
 +import io.delta.kernel.types.StringType;
  import io.delta.kernel.utils.CloseableIterator;
-+import io.delta.spark.internal.v2.adapters.SparkMetadataAdapter;
++import io.delta.spark.internal.v2.adapters.KernelMetadataAdapter;
  import io.delta.spark.internal.v2.snapshot.DeltaSnapshotManager;
  import io.delta.spark.internal.v2.utils.PartitionUtils;
  import io.delta.spark.internal.v2.utils.ScalaUtils;
@@ -29,7 +29,7 @@
  import org.apache.spark.sql.delta.DeltaErrors;
  import org.apache.spark.sql.delta.DeltaOptions;
  import org.apache.spark.sql.delta.DeltaStartingVersion;
- import org.apache.spark.sql.delta.TypeWidening;
+ import org.apache.spark.sql.delta.sources.AdmittableFile;
  import org.apache.spark.sql.delta.sources.DeltaSQLConf;
  import org.apache.spark.sql.delta.sources.DeltaSource;
 +import org.apache.spark.sql.delta.sources.DeltaSourceMetadataTrackingLog;
@@ -40,17 +40,20 @@
  import org.apache.spark.sql.execution.datasources.FilePartition;
  import org.apache.spark.sql.execution.datasources.FilePartition$;
  import org.apache.spark.sql.execution.datasources.PartitionedFile;
+                   DeltaAction.ADD,
+                   DeltaAction.REMOVE,
+                   DeltaAction.METADATA,
++                  DeltaAction.PROTOCOL,
+                   DeltaAction.COMMITINFO)));
  
-   private static final Set<DeltaAction> ACTION_SET =
-       Collections.unmodifiableSet(
--          new HashSet<>(Arrays.asList(DeltaAction.ADD, DeltaAction.REMOVE, DeltaAction.METADATA)));
-+          new HashSet<>(
-+              Arrays.asList(
-+                  DeltaAction.ADD, DeltaAction.REMOVE,
-+                  DeltaAction.METADATA, DeltaAction.PROTOCOL)));
+   /** Action set for CDC reads. */
+                   DeltaAction.ADD,
+                   DeltaAction.REMOVE,
+                   DeltaAction.METADATA,
++                  DeltaAction.PROTOCOL,
+                   DeltaAction.CDC,
+                   DeltaAction.COMMITINFO)));
  
-   private final Engine engine;
-   private final DeltaSnapshotManager snapshotManager;
    private final boolean skipChangeCommits;
    private final SnapshotImpl snapshotAtSourceInit;
    private final String tableId;
@@ -105,7 +108,7 @@
 -            "readSchemaAtSourceInit is null");
 +        SchemaUtils.convertKernelSchemaToSparkSchema(readSnapshotAtSourceInit.getSchema());
 +    this.readPartitionSchemaAtSourceInit =
-+        new SparkMetadataAdapter(readSnapshotAtSourceInit.getMetadata()).partitionSchema();
++        new KernelMetadataAdapter(readSnapshotAtSourceInit.getMetadata()).partitionSchema();
 +    this.readProtocolAtSourceInit = readSnapshotAtSourceInit.getProtocol();
 +    this.readConfigurationsAtSourceInit = readSnapshotAtSourceInit.getMetadata().getConfiguration();
 +
@@ -115,7 +118,6 @@
              DeltaStreamUtils.SchemaReadOptions$.MODULE$.fromSparkSession(
                  spark, isStreamingFromColumnMappingTable, isTypeWideningSupportedInProtocol),
              "schemaReadOptions is null");
-+
 +    this.metadataEvolutionHandler =
 +        new MetadataEvolutionHandler(
 +            spark,
@@ -132,10 +134,9 @@
 +            readConfigurationsAtSourceInit,
 +            readProtocolAtSourceInit,
 +            metadataPath);
-+
-     validateSchemaCompatibilityOnStartup(dataSchema, partitionSchema, readSchemaAtSourceInit);
-   }
- 
+     boolean shouldValidateSchemaOnRestart =
+         (Boolean)
+             spark
      isTriggerAvailableNow = true;
    }
  
@@ -384,28 +385,21 @@
      Metadata metadataAction = null;
 +    io.delta.kernel.internal.actions.Protocol protocolAction = null;
      String removeFileActionPath = null;
+     String operation = null;
  
-     try (CloseableIterator<ColumnarBatch> actionsIter = commit.getActions()) {
-                     endOffsetOpt,
                      verifyMetadataAction);
            }
-+
+ 
 +          // Track Protocol for schema evolution barrier detection.
 +          Optional<io.delta.kernel.internal.actions.Protocol> protocolOpt =
 +              StreamingHelper.getProtocol(batch, rowId);
 +          if (protocolOpt.isPresent()) {
 +            protocolAction = protocolOpt.get();
 +          }
-         }
-       }
-     } catch (IOException e) {
-     if (removeFileActionPath != null) {
-       if (hasFileAdd && !shouldAllowChanges) {
-         // Commit contains data changes (adds + removes) and changes are disallowed.
--        // TODO(#5319): log CommitInfo action's operation instead of path
-         throw (RuntimeException)
-             DeltaErrors.deltaSourceIgnoreChangesError(version, removeFileActionPath, tablePath);
-       } else if (!hasFileAdd && !shouldAllowDeletes) {
++
+           // Track CommitInfo for operation details in error messages.
+           Optional<CommitInfo> commitInfoOpt = StreamingHelper.getCommitInfo(batch, rowId);
+           if (commitInfoOpt.isPresent()) {
        }
      }
  
@@ -477,8 +471,8 @@
 +      // (both enabled, upgrade from none, downgrade to none).
 +      shouldTrackSchema =
 +          !DeltaColumnMapping$.MODULE$.hasNoColumnMappingSchemaChanges(
-+              new SparkMetadataAdapter(newMetadata),
-+              new SparkMetadataAdapter(oldMetadata),
++              new KernelMetadataAdapter(newMetadata),
++              new KernelMetadataAdapter(oldMetadata),
 +              schemaReadOptions.allowUnsafeStreamingReadOnPartitionColumnChanges());
      }
  
spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkScan.java
@@ -1,7 +1,7 @@
 diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkScan.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkScan.java
 --- a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkScan.java
 +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkScan.java
- import org.apache.spark.sql.connector.read.partitioning.UnknownPartitioning;
+ import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
  import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
  import org.apache.spark.sql.delta.DeltaOptions;
 +import org.apache.spark.sql.delta.sources.DeltaSQLConf;
@@ -15,12 +15,12 @@
 +import scala.Option;
  
  /** Spark DSV2 Scan implementation backed by Delta Kernel. */
- public class SparkScan
-               DeltaOptions.IGNORE_CHANGES_OPTION(),
+ public class SparkScan implements Scan, SupportsReportStatistics, SupportsRuntimeV2Filtering {
                DeltaOptions.IGNORE_DELETES_OPTION(),
                DeltaOptions.SKIP_CHANGE_COMMITS_OPTION(),
--              DeltaOptions.EXCLUDE_REGEX_OPTION()));
-+              DeltaOptions.EXCLUDE_REGEX_OPTION(),
+               DeltaOptions.EXCLUDE_REGEX_OPTION(),
+-              DeltaOptions.FAIL_ON_DATA_LOSS_OPTION()));
++              DeltaOptions.FAIL_ON_DATA_LOSS_OPTION(),
 +              DeltaOptions.SCHEMA_TRACKING_LOCATION(),
 +              DeltaOptions.SCHEMA_TRACKING_LOCATION_ALIAS(),
 +              DeltaOptions.STREAMING_SOURCE_TRACKING_ID(),
@@ -29,7 +29,16 @@
 +              DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE()));
  
    /**
-    * Block list of DeltaOptions that are not supported for streaming in V2 connector. Only
+-   * Block list of DeltaOptions that are not supported for streaming in V2 connector. Only
+-   * startingVersion, startingTimestamp, maxFilesPerTrigger, maxBytesPerTrigger, ignoreFileDeletion,
+-   * ignoreChanges, ignoreDeletes, skipChangeCommits, excludeRegex, and failOnDataLoss are
+-   * supported. User-defined custom options (not in DeltaOptions) are allowed to pass through.
++   * Block list of DeltaOptions that are not supported for streaming in V2 connector. The supported
++   * options are enumerated in {@link #SUPPORTED_STREAMING_OPTIONS}; user-defined custom options
++   * (not in DeltaOptions) are allowed to pass through.
+    */
+   private static final Set<String> UNSUPPORTED_STREAMING_OPTIONS =
+       Collections.unmodifiableSet(
                    DeltaOptions.CDC_READ_OPTION().toLowerCase(),
                    DeltaOptions.CDC_READ_OPTION_LEGACY().toLowerCase(),
                    DeltaOptions.CDC_END_VERSION().toLowerCase(),
spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkScanTest.java
@@ -0,0 +1,12 @@
+diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkScanTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkScanTest.java
+--- a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkScanTest.java
++++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkScanTest.java
+         "The following streaming options are not supported: [readchangefeed]. "
+             + "Supported options are: [startingVersion, startingTimestamp, maxFilesPerTrigger, "
+             + "maxBytesPerTrigger, ignoreFileDeletion, ignoreChanges, ignoreDeletes, skipChangeCommits, "
+-            + "excludeRegex, failOnDataLoss].",
++            + "excludeRegex, failOnDataLoss, schemaTrackingLocation, schemaLocation, "
++            + "streamingSourceTrackingId, allowSourceColumnDrop, allowSourceColumnRename, "
++            + "allowSourceColumnTypeChange].",
+         exception.getMessage());
+   }
\ No newline at end of file

Reproduce locally: git range-diff be6d89d..33e6da2 e7376d3..f9d52c5 | Disable: git config gitstack.push-range-diff false

@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution2 branch 4 times, most recently from a448530 to 8497640 Compare April 29, 2026 18:12
@PorridgeSwim PorridgeSwim marked this pull request as ready for review April 29, 2026 18:41
@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution2 branch from 8497640 to efb589e Compare April 29, 2026 19:53
@PorridgeSwim
Collaborator Author

Range-diff: stack/MetadataEvolutionHandler2 (8497640 -> efb589e)
spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java
@@ -37,9 +37,9 @@
  import org.apache.spark.sql.delta.sources.DeltaSourceOffset$;
  import org.apache.spark.sql.delta.sources.DeltaStreamUtils;
 +import org.apache.spark.sql.delta.sources.PersistedMetadata;
- import org.apache.spark.sql.execution.datasources.FilePartition;
- import org.apache.spark.sql.execution.datasources.FilePartition$;
  import org.apache.spark.sql.execution.datasources.PartitionedFile;
+ import org.apache.spark.sql.internal.SQLConf;
+ import org.apache.spark.sql.sources.Filter;
  
    private static final Logger logger = LoggerFactory.getLogger(SparkMicroBatchStream.class);
  

Reproduce locally: git range-diff b637275..8497640 9a48144..efb589e | Disable: git config gitstack.push-range-diff false

@PorridgeSwim
Collaborator Author

Range-diff: stack/MetadataEvolutionHandler2 (5c0c8bf -> 6c83dbc)
spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java
@@ -22,15 +22,31 @@
  import org.apache.spark.sql.delta.sources.DeltaSourceMetadataEvolutionSupport$;
  import org.apache.spark.sql.delta.sources.DeltaSourceMetadataTrackingLog;
  import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
+   }
+ 
+   /**
+-   * V2 port of V1's {@code
++   * Picks the most recent metadata and most supportive protocol in {@code [startVersion,
++   * endVersion]} to seed the tracking log; throws if any earlier change is incompatible.
++   *
++   * <p>V2 port of V1's {@code
+    * DeltaSourceMetadataEvolutionSupport.validateAndResolveMetadataForLogInitialization}.
+    */
+   private ValidatedMetadataAndProtocol validateAndResolveMetadataForLogInitialization(
        this.protocol = protocol;
      }
    }
 +
 +  // ---------------------------------------------------------------------------
-+  // Consecutive metadata-change merging (analysis-phase helper)
++  // Static utilities
 +  // ---------------------------------------------------------------------------
 +
-+  /** V2 port of V1's {@code DeltaDataSource.getMetadataTrackingLogForDeltaSource}. */
++  /**
++   * Builds the tracking log from streaming options: empty when {@code schemaTrackingLocation} is
++   * unset; throws when it is set but schema tracking is disabled in config.
++   *
++   * <p>V2 port of V1's {@code DeltaDataSource.getMetadataTrackingLogForDeltaSource}.
++   */
 +  public static Option<DeltaSourceMetadataTrackingLog> getMetadataTrackingLogForMicroBatchStream(
 +      SparkSession spark,
 +      SnapshotImpl snapshot,
spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java
@@ -8,12 +8,16 @@
  import org.apache.spark.sql.delta.sources.DeltaSourceMetadataTrackingLog;
  import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
  import org.apache.spark.sql.delta.sources.DeltaStreamUtils;
-     assertTrue(hwl.trackingLog.getCurrentTrackedMetadata().isDefined());
-     assertEquals(1L, hwl.trackingLog.getCurrentTrackedMetadata().get().deltaCommitVersion());
+         ex.getErrorClass(),
+         "Should throw incompatible-metadata error when range contains a column drop");
    }
 +
 +  // ---------------------------------------------------------------------------
 +  // getMetadataTrackingLogForMicroBatchStream
++  //
++  // Builds the tracking log from streaming options: empty when SCHEMA_TRACKING_LOCATION
++  // (or its alias) is unset; throws when set but the feature flag is off; otherwise
++  // returns a usable log.
 +  // ---------------------------------------------------------------------------
 +
 +  /**

Reproduce locally: git range-diff 9a48144..5c0c8bf 696ce03..6c83dbc | Disable: git config gitstack.push-range-diff false

@PorridgeSwim PorridgeSwim changed the title [kernel-spark] Support non-additive schema evolution in v2 [kernel-spark] Support schema tracking log in v2 analysis stage May 1, 2026
@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution2 branch from 9c30908 to 7c66bf1 Compare May 6, 2026 17:56
murali-db pushed a commit that referenced this pull request May 6, 2026
…seable in v2 (#6562)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6562/files) to
review incremental changes.
-
[stack/SparkMetadataAdapter](#6546)
[[Files changed](https://github.com/delta-io/delta/pull/6546/files)]
[MERGED]
-
[stack/RefactorMetadataTrackingLog](#6550)
[[Files changed](https://github.com/delta-io/delta/pull/6550/files)]
[MERGED]
-
[**stack/RefactorDeltaSourceMetadataEvolutionSupport**](#6562)
[[Files changed](https://github.com/delta-io/delta/pull/6562/files)]
-
[stack/MetadataEvolutionHandler2](#6563)
[[Files
changed](https://github.com/delta-io/delta/pull/6563/files/ed92a0fa2051432b6bc5784034df0b7949bbfb98..e5b2c3295843ec85753e07dc0010aa5ccebaabb7)]
-
[stack/NonAdditiveSchemaEvolution2](#6570)
[[Files
changed](https://github.com/delta-io/delta/pull/6570/files/e5b2c3295843ec85753e07dc0010aa5ccebaabb7..7c66bf11a0f1b651cda32ed7f529f552dd9dbfcb)]
-
[stack/NonAdditiveSchemaEvolution3](#6697)
[[Files
changed](https://github.com/delta-io/delta/pull/6697/files/7c66bf11a0f1b651cda32ed7f529f552dd9dbfcb..14956ea304c93d2343ccd7eb89a112966f07f906)]
-
[stack/consecutiveSchemaChangesMerger](#6698)
[[Files
changed](https://github.com/delta-io/delta/pull/6698/files/14956ea304c93d2343ccd7eb89a112966f07f906..8101b335b892a6a5b6d6fe11f4a202d14102721c)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 3/7 in the non-additive schema evolution for V2 streaming connector
stack.

Refactor `DeltaSourceMetadataEvolutionSupport` and `DeltaColumnMapping`
so the schema change detection logic can be called from V2 without
depending on V1 instance state.

**`DeltaSourceMetadataEvolutionSupport`:**
- Extract instance methods (`validateAndResolveMetadataEvolution`,
`checkColumnMappingSchemaChangesDuringStreaming`,
`resolveMetadataEvolutionForCommitRange`, etc.) to companion object
statics that accept explicit parameters instead of accessing V1
`DeltaSource` via `this`
- V1 trait methods now delegate to the companion object statics

**`DeltaColumnMapping`:**
- Widen `hasNoColumnMappingSchemaChanges` from V1 `Metadata` to
`AbstractMetadata` so V2 can call it via the adapter layer
- Extract `assignColumnIdAndPhysicalNameToSchema(StructType, Map)` from
`assignColumnIdAndPhysicalName(Metadata, Metadata, ...)` — needed for
simulating column mapping upgrades during NoMapping-to-NameMapping
transitions

All changes are structural refactors with no behavioral change.

## How was this patch tested?

Existing tests in `DeltaSourceSchemaEvolutionSuite` continue to pass. No
behavioral changes.

## Does this PR introduce _any_ user-facing changes?

No.
@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution2 branch 2 times, most recently from 99b2159 to 0e07f87 Compare May 6, 2026 23:16
Collaborator

@TimothyW553 TimothyW553 left a comment

spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java:243-247 (not in this diff): what happens at restart if this snapshot schema differs from what SparkTable.schema() got from the tracking log?

(extraOptions.containsKey(DeltaOptions.SCHEMA_TRACKING_LOCATION) ||
extraOptions.containsKey(DeltaOptions.SCHEMA_TRACKING_LOCATION_ALIAS)) &&
!tableOptions.containsKey(DeltaOptions.SCHEMA_TRACKING_LOCATION) &&
!tableOptions.containsKey(DeltaOptions.SCHEMA_TRACKING_LOCATION_ALIAS)
Collaborator

If extraOptions and the table both set schemaTrackingLocation to different values, the user's value is silently dropped. Error instead?

Collaborator Author

This is intentional. If the table already carries a schemaTrackingLocation, we treat that as the source of truth. In practice there's no conflict to surface: this rule is currently the only path that propagates schemaTrackingLocation onto SparkTable (via the merged-options rebuild below), so by the time tableOptions contains the key it was put there by us. The !tableOptions.containsKey(...) check is what keeps the rule idempotent — without it, we'd rebuild the StreamingRelationV2 on every pass.
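
A minimal sketch of the idempotency argument above, using plain Java maps instead of Spark's `CaseInsensitiveStringMap`. The class and method names (`SchemaTrackingRebuildSketch`, `needsRebuild`, `mergeOptions`) are hypothetical, and letting the extra options overlay the table options during the merge is an assumption; only the two option names come from the PR.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the rebuild check; not the PR's actual class. */
class SchemaTrackingRebuildSketch {
  // Option names as they appear in the supported-options error message.
  static final String SCHEMA_TRACKING_LOCATION = "schemaTrackingLocation";
  static final String SCHEMA_TRACKING_LOCATION_ALIAS = "schemaLocation";

  /** Copies options into a map whose key lookups ignore case. */
  static Map<String, String> caseInsensitive(Map<String, String> options) {
    Map<String, String> copy = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    copy.putAll(options);
    return copy;
  }

  /** True when either the option or its alias is present. */
  static boolean hasLocation(Map<String, String> options) {
    return options.containsKey(SCHEMA_TRACKING_LOCATION)
        || options.containsKey(SCHEMA_TRACKING_LOCATION_ALIAS);
  }

  /** Rebuild only if the reader supplied a location and the table does not carry one yet. */
  static boolean needsRebuild(Map<String, String> extraOptions, Map<String, String> tableOptions) {
    return hasLocation(caseInsensitive(extraOptions))
        && !hasLocation(caseInsensitive(tableOptions));
  }

  /** Assumption: extra options are laid over the table options when rebuilding. */
  static Map<String, String> mergeOptions(
      Map<String, String> extraOptions, Map<String, String> tableOptions) {
    Map<String, String> merged = caseInsensitive(tableOptions);
    merged.putAll(extraOptions);
    return merged;
  }
}
```

On the first pass `needsRebuild` is true; once the merged options are attached to the table, the same call returns false, so a rule that runs repeatedly would stop rewriting the relation.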

// the runtime tableId (V1 uses the Delta UUID, V2 uses Kernel's snapshot id).
// `table.name` is path-aware ("delta.`/path`" for path-based, qualified name for
// catalog-based) and is sufficient to differentiate sources for the conflict check.
val tableId = table.name.replace(":", "").replace("/", "_")
Collaborator

table.name differs for path vs catalog access to the same table, so this misses the conflict V1 catches. Use the path?

Collaborator Author

@PorridgeSwim PorridgeSwim May 12, 2026

Good catch, but this is fail-fast only — at runtime both V1 and V2 use the stable Delta UUID, and DeltaSourceMetadataTrackingLog re-validates, so the conflict still surfaces at first batch.
Getting the path here isn't clean: StreamingRelationV2.table is the generic DSv2 Table (only name()), and SparkTable lives in a module above DeltaAnalysis. A proper fix needs a SparkTable-side change — out of scope. Adding a TODO.
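
To make the reviewer's concern concrete, here is a small sketch of the quoted sanitization applied to a path-based and a catalog-based name. The class name `TableIdSketch` and the example table names are hypothetical; only the `replace` chain and the "delta.`/path`" naming shape come from the snippet above.

```java
/** Hypothetical illustration of the analysis-time table id; not the PR's code. */
class TableIdSketch {
  /** Mirrors the quoted expression: drop ':' and turn '/' into '_'. */
  static String sanitize(String tableName) {
    return tableName.replace(":", "").replace("/", "_");
  }

  public static void main(String[] args) {
    // Path-based access: the name embeds the path.
    System.out.println(sanitize("delta.`/data/events`"));
    // Catalog-based access to what could be the same table: the name is the qualified identifier.
    System.out.println(sanitize("spark_catalog.default.events"));
  }
}
```

The two ids differ even if both names resolve to the same underlying table, which is why the analysis-time check alone cannot catch that case and the runtime re-validation mentioned above still matters.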

DeltaSQLConf
.DELTA_STREAMING_ENABLE_SCHEMA_TRACKING_MERGE_CONSECUTIVE_CHANGES());
scala.Option<DeltaSourceMetadataTrackingLog> trackingLog =
MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream(
Collaborator

Move this out of the constructor, into the streaming entry?

Collaborator Author

SparkTable's schema is fixed at construction, and the logical plan uses that schema as the read schema the Spark engine expects. So we have to resolve the correct read schema at the SparkTable level.

private def needsSchemaTrackingRebuild(
table: SparkTable, extraOptions: CaseInsensitiveStringMap): Boolean = {
val tableOptions = new CaseInsensitiveStringMap(table.getOptions)
(extraOptions.containsKey(DeltaOptions.SCHEMA_TRACKING_LOCATION) ||
Collaborator

Let's turn this into a static method somewhere in a util?

Collaborator Author

@PorridgeSwim PorridgeSwim May 12, 2026

Moved to MetadataEvolutionHandler.

// Keep this None to force the V2 path; we don't want to fall back to V1 here.
v1Relation = None)

// For catalog-loaded relations (readStream.table("foo")), TableCatalog.loadTable has no
Collaborator

I think this explanation is unnecessary here. Let's just use a one-line TODO.

Map<String, String> options,
DeltaSnapshotManager snapshotManager,
Engine engine,
Set<DeltaLogActionUtils.DeltaAction> mergeActionSet,
Collaborator

This is unused?

Collaborator Author

They will be used in #6698. Since I am aiming to merge the whole stack this week, it is fine to keep them unused for now; there is no correctness issue.

Option<String> sourceMetadataPathOpt,
boolean mergeConsecutiveSchemaChanges) {
String location =
options.getOrDefault(
Collaborator

Should this be a case-insensitive lookup?

Collaborator Author

Fixed.
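
For readers following the thread, a case-insensitive lookup with alias fallback could look roughly like the sketch below. `LocationLookupSketch`, `find`, and `resolveLocation` are hypothetical names, and preferring the primary option over its alias is an assumption read off the quoted `getOrDefault` call; this is not the code that landed.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of a case-insensitive option lookup; not the PR's code. */
class LocationLookupSketch {
  /** Returns the value of the first key that matches, ignoring case. */
  static Optional<String> find(Map<String, String> options, String key) {
    for (Map.Entry<String, String> entry : options.entrySet()) {
      if (entry.getKey().equalsIgnoreCase(key)) {
        return Optional.ofNullable(entry.getValue());
      }
    }
    return Optional.empty();
  }

  /** Assumption: the primary option wins, and the alias is only a fallback. */
  static Optional<String> resolveLocation(Map<String, String> options) {
    Optional<String> primary = find(options, "schemaTrackingLocation");
    return primary.isPresent() ? primary : find(options, "schemaLocation");
  }
}
```

With a plain `HashMap`, `get("schemaTrackingLocation")` misses a key supplied as `SchemaTrackingLocation`; the scan above finds it regardless of the caller's casing.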

// consecutive metadata-only commits and writes the merged entry back to the durable schema
// log; the execution-time SparkMicroBatchStream then re-reads the same merged entry from the
// log via DeltaSourceMetadataTrackingLog.getCurrentTrackedMetadata.
SparkSession spark = SparkSession.active();
Collaborator

Could you put the reading-from-schema-tracking-log logic into a util method?

Collaborator Author

Moved to MetadataEvolutionHandler

// Override in CDC suites to add CDC-specific tests.
override protected def shouldFailTests: Set[String] = Set(
// ========== Schema location validation ==========
// TODO(#5319): Move tests to shouldPassTests as V2 schema tracking log support is implemented.c
Collaborator

typo

murali-db pushed a commit that referenced this pull request May 11, 2026
## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6563/files) to
review incremental changes.
- [stack/SparkMetadataAdapter](#6546) [[Files changed](https://github.com/delta-io/delta/pull/6546/files)] [MERGED]
- [stack/RefactorMetadataTrackingLog](#6550) [[Files changed](https://github.com/delta-io/delta/pull/6550/files)] [MERGED]
- [stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562) [[Files changed](https://github.com/delta-io/delta/pull/6562/files)] [MERGED]
- [**stack/MetadataEvolutionHandler2**](#6563) [[Files changed](https://github.com/delta-io/delta/pull/6563/files)]
- [stack/NonAdditiveSchemaEvolution2](#6570) [[Files changed](https://github.com/delta-io/delta/pull/6570/files/a20f1f3ab452a75fc954e15c57c17327e0cb9267..0e07f87285becd6be416450ae084df454d9c94a9)]
- [stack/NonAdditiveSchemaEvolution3](#6697) [[Files changed](https://github.com/delta-io/delta/pull/6697/files/0e07f87285becd6be416450ae084df454d9c94a9..73e1aa7f4162a3e1480ffd2b88b9ca79d852f2fe)]
- [stack/consecutiveSchemaChangesMerger](#6698) [[Files changed](https://github.com/delta-io/delta/pull/6698/files/73e1aa7f4162a3e1480ffd2b88b9ca79d852f2fe..5e5d260b64d45cc11bcfdb58e5aab1b2d2637b33)]
- [stack/V1V2MixTest](#6759) [[Files changed](https://github.com/delta-io/delta/pull/6759/files/5e5d260b64d45cc11bcfdb58e5aab1b2d2637b33..738379713040986c74f98dbebfdc6c83ec1d3f16)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 4/7 in the non-additive schema evolution for V2 streaming connector
stack.

Introduce `MetadataEvolutionHandler`, a Java class that implements the
V1 barrier protocol for schema evolution in the V2 connector. In V1 this
logic lives in `DeltaSourceMetadataEvolutionSupport`, a Scala trait
mixed into `DeltaSource` that accesses stream state via `this`. Since
V2's `SparkMicroBatchStream` is Java and cannot use Scala trait mixins,
`MetadataEvolutionHandler` receives all dependencies via constructor
injection instead.

The handler covers the full schema evolution lifecycle:
- **Stream start**: eager metadata tracking log initialization on first
batch
- **Offset generation**: injects `METADATA_CHANGE_INDEX` /
`POST_METADATA_CHANGE_INDEX` barrier sentinels into the file change
iterator
- **Pending schema offsets**: returns barrier offsets for in-progress
schema changes
- **Batch commit**: updates the schema log and throws
`DELTA_STREAMING_METADATA_EVOLUTION` to trigger stream restart
- **Batch planning on restart**: validates and re-initializes the schema
log
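
The pending-schema-offset step in the list above can be pictured with a small, self-contained sketch. It is not the handler's code: `SketchOffset`, `BarrierProtocolSketch`, and the sentinel values are hypothetical stand-ins, and the transition shown (first barrier advances to second barrier at the same version) is an assumption based on the description, not on the implementation.

```java
import java.util.Optional;

/** Simplified stand-in for a streaming offset: a table version plus an index within it. */
final class SketchOffset {
    // Placeholder sentinel values for this sketch only, not the connector's real constants.
    static final long METADATA_CHANGE_INDEX = Long.MAX_VALUE - 2;
    static final long POST_METADATA_CHANGE_INDEX = Long.MAX_VALUE - 1;

    final long version;
    final long index;

    SketchOffset(long version, long index) {
        this.version = version;
        this.index = index;
    }

    /** True when this offset points at either barrier sentinel rather than at a data file. */
    boolean isBarrier() {
        return index == METADATA_CHANGE_INDEX || index == POST_METADATA_CHANGE_INDEX;
    }
}

final class BarrierProtocolSketch {
    /**
     * Pending-schema-change step (assumed behavior): if the previous offset stopped on the
     * first barrier, the next offset is the second barrier at the same version.
     * Otherwise nothing is pending and the caller falls back to normal offset generation.
     */
    static Optional<SketchOffset> nextOffsetIfPending(SketchOffset previous) {
        if (previous.index == SketchOffset.METADATA_CHANGE_INDEX) {
            return Optional.of(
                new SketchOffset(previous.version, SketchOffset.POST_METADATA_CHANGE_INDEX));
        }
        return Optional.empty();
    }
}
```

In this model a commit-time hook would check `isBarrier()` on the committed end offset before deciding whether to update the schema log; how the real handler makes that decision is not shown in this thread.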

All detection logic delegates to the shared
`DeltaSourceMetadataEvolutionSupport$` companion object statics
(refactored in PR 3/7). V2-specific orchestration is limited to wiring
the barrier protocol into the `CloseableIterator<IndexedFile>` pipeline
and collecting metadata/protocol from Kernel commit ranges via
`StreamingHelper`.

Also extends `StreamingHelper` with
`getMetadataAndProtocolForVersionRange` to collect metadata and protocol
actions from a range of Kernel commits.

## How was this patch tested?

Unit tests in `MetadataEvolutionHandlerTest.java` covering: barrier
protocol (METADATA_CHANGE_INDEX / POST_METADATA_CHANGE_INDEX offset
generation), tracking state transitions, initialization lifecycle,
offset arithmetic, pending schema change handling, and commit-time
evolution exception.

## Does this PR introduce _any_ user-facing changes?

No.
@zikangh zikangh requested a review from BrooksWalls May 11, 2026 20:20
@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution2 branch from 0e07f87 to a85d763 Compare May 13, 2026 21:07
@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution2 branch 3 times, most recently from 818591f to 2a2f77a Compare May 14, 2026 23:21
identifier = Some(ident),
v1Relation = None)

case s @ StreamingRelationV2(_, _, table: SparkTable, extraOptions, _, _, _, _)

Collaborator

I see a lot of overlap between CDC and schema evolution. Could you help reconcile them? We should only run schema augmentation once.

Collaborator Author

done

DeltaAction.ADD,
DeltaAction.REMOVE,
DeltaAction.METADATA,
DeltaAction.PROTOCOL,

Collaborator

Should we add this to the CDC path too? One of us (whoever has time) should make sure we test CDC + schema evolution cases.

Collaborator Author

@PorridgeSwim PorridgeSwim May 15, 2026

I will create a new PR to support CDC + schema evolution and add a TODO in `MetadataEvolutionHandler`.

* schemaTrackingLocation}/{@code schemaLocation} option was observed, so callers must rebuild the
* table with the option folded in for its schema to be driven by the tracking log.
*/
public static boolean shouldPropagateSchemaTrackingToTable(

Collaborator

Should we add a test for this?

Collaborator Author

added

@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution2 branch from 2a2f77a to c13a9c3 Compare May 15, 2026 05:45
val rebuilt = if (table.getCatalogTable.isPresent) {
new SparkTable(table.getIdentifier, table.getCatalogTable.get, merged)
} else {
new SparkTable(table.getIdentifier, table.getTablePath.toString, merged)

Collaborator

Could we make sure `table.getIdentifier` is not null? Same for `getTablePath`.

Collaborator Author

There is a `requireNonNull` in `SparkTable`'s constructor for both `identifier` and `tablePath`.
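
For readers unfamiliar with the pattern, a constructor-level null guard looks roughly like the sketch below. `TableHandleSketch` is a hypothetical class used only for illustration; it is not `SparkTable`, and the field names and messages are assumptions.

```java
import java.util.Objects;

/** Hypothetical table handle that rejects null arguments at construction time. */
final class TableHandleSketch {
    private final String identifier;
    private final String tablePath;

    TableHandleSketch(String identifier, String tablePath) {
        // Fail fast here so callers that rebuild the handle never see a half-initialized object.
        this.identifier = Objects.requireNonNull(identifier, "identifier is null");
        this.tablePath = Objects.requireNonNull(tablePath, "tablePath is null");
    }

    String identifier() {
        return identifier;
    }

    String tablePath() {
        return tablePath;
    }
}
```

With a guard like this, a rebuild call site does not need its own null check: passing a null identifier or path throws a `NullPointerException` from the constructor.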

@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution2 branch from c13a9c3 to b7f6c8e Compare May 15, 2026 22:34
@murali-db murali-db merged commit aa42034 into delta-io:master May 16, 2026
31 checks passed
seewishnew added a commit to murali-db/delta that referenced this pull request May 16, 2026
…ark 4.2

New code from delta-io#6570 (schema tracking log in v2 analysis) uses a direct
StreamingRelationV2 8-arg pattern match that breaks on Spark 4.2 where
the case class has 9 parameters. Use StreamingRelationV2Shim instead.

Co-authored-by: Isaac
murali-db pushed a commit that referenced this pull request May 16, 2026
…6697)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6697/files) to
review incremental changes.
- [stack/SparkMetadataAdapter](#6546) [[Files changed](https://github.com/delta-io/delta/pull/6546/files)] [MERGED]
- [stack/RefactorMetadataTrackingLog](#6550) [[Files changed](https://github.com/delta-io/delta/pull/6550/files)] [MERGED]
- [stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562) [[Files changed](https://github.com/delta-io/delta/pull/6562/files)] [MERGED]
- [stack/MetadataEvolutionHandler2](#6563) [[Files changed](https://github.com/delta-io/delta/pull/6563/files)] [MERGED]
- [stack/NonAdditiveSchemaEvolution2](#6570) [[Files changed](https://github.com/delta-io/delta/pull/6570/files)] [MERGED]
- [**stack/NonAdditiveSchemaEvolution3**](#6697) [[Files changed](https://github.com/delta-io/delta/pull/6697/files)]
- [stack/consecutiveSchemaChangesMerger](#6698) [[Files changed](https://github.com/delta-io/delta/pull/6698/files/f96643aa3cc01e7f70cc13a18b82dc27f277f11d..f612628ad931ec35c237801109f01b6fbd1379f7)]
- [stack/SchemaTrackingWithCDC](#6801) [[Files changed](https://github.com/delta-io/delta/pull/6801/files/f612628ad931ec35c237801109f01b6fbd1379f7..4aeacfb120b33e9cdfe124352290b72f53f7cf89)]
- [stack/V1V2MixTest](#6759) [[Files changed](https://github.com/delta-io/delta/pull/6759/files/f612628ad931ec35c237801109f01b6fbd1379f7..0c818ee431ab417a4f2ffbcc609930be09d25031)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 6/7 in the non-additive schema evolution for V2 streaming connector
stack.

Wire `MetadataEvolutionHandler` into `SparkMicroBatchStream` and
`SparkScan` so V2 streaming reads honor non-additive schema evolution
(column rename/drop, type widening).

- `SparkMicroBatchStream`: take `metadataTrackingLog` + `metadataPath`
as constructor inputs; when a persisted entry exists, layer it onto the
freshly loaded `snapshotAtSourceInit` to derive
`readSnapshotAtSourceInit` (mirrors V1's `readSnapshotDescriptor`).
Integrate the schema-evolution barrier protocol into `latestOffset` /
`commit` / `planInputPartitions`. Skip the on-restart schema-validation
check when schema tracking is active — the schema-log evolution
exception covers it.
- `SparkScan.toMicroBatchStream`: reload latest snapshot (the
analysis-time `initialSnapshot` can be stale by stream start), open the
tracking log via
`MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream`
with `mergeConsecutiveSchemaChanges=false` (the merger only runs at
analysis), and pass it through with the checkpoint location.
- `SparkScan` option allow-list: move `allowSourceColumnDrop` / `Rename`
/ `TypeChange` out of the unsupported list now that they are honored.
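
The "layer the persisted entry onto the freshly loaded snapshot" step in the first bullet can be sketched as follows. This is a minimal illustration under stated assumptions: `SnapshotView` and `ReadSnapshotSketch` are hypothetical types, the schema is modeled as a plain string, and keeping the snapshot's version while swapping in the persisted schema is an assumption rather than something this thread confirms.

```java
import java.util.Optional;

/** Hypothetical, minimal view of a snapshot: just a version and a schema string. */
final class SnapshotView {
    final long version;
    final String schemaJson;

    SnapshotView(long version, String schemaJson) {
        this.version = version;
        this.schemaJson = schemaJson;
    }
}

final class ReadSnapshotSketch {
    /**
     * Prefer the schema persisted in the tracking log; fall back to the live snapshot
     * unchanged when the log has no entry yet. (Assumption: the version is kept as-is.)
     */
    static SnapshotView resolve(
            SnapshotView snapshotAtSourceInit, Optional<String> persistedSchemaJson) {
        return persistedSchemaJson
            .map(schema -> new SnapshotView(snapshotAtSourceInit.version, schema))
            .orElse(snapshotAtSourceInit);
    }
}
```

The point of the sketch is the precedence rule: once a tracking-log entry exists, the stream reads with the persisted schema even if the table's live schema has already moved on.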

## How was this patch tested?

`SparkMicroBatchStreamTest`, `MetadataEvolutionHandlerTest`. Unified
suites (`DeltaV2SourceSchemaEvolutionSuite`,
`TypeWideningStreamingV2SourceSuite`,
`RemoveColumnMappingStreamingReadV2Suite`) move non-merger evolution
scenarios from `shouldFailTests` to `shouldPassTests`; merger-dependent
tests remain pending until PR 7/7.

## Does this PR introduce _any_ user-facing changes?

No.
seewishnew added a commit to murali-db/delta that referenced this pull request May 19, 2026
…ark 4.2

New code from delta-io#6570 (schema tracking log in v2 analysis) uses a direct
StreamingRelationV2 8-arg pattern match that breaks on Spark 4.2 where
the case class has 9 parameters. Use StreamingRelationV2Shim instead.

Co-authored-by: Isaac
murali-db pushed a commit that referenced this pull request May 19, 2026
## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6698/files) to
review incremental changes.
- [stack/SparkMetadataAdapter](#6546) [[Files changed](https://github.com/delta-io/delta/pull/6546/files)] [MERGED]
- [stack/RefactorMetadataTrackingLog](#6550) [[Files changed](https://github.com/delta-io/delta/pull/6550/files)] [MERGED]
- [stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562) [[Files changed](https://github.com/delta-io/delta/pull/6562/files)] [MERGED]
- [stack/MetadataEvolutionHandler2](#6563) [[Files changed](https://github.com/delta-io/delta/pull/6563/files)] [MERGED]
- [stack/NonAdditiveSchemaEvolution2](#6570) [[Files changed](https://github.com/delta-io/delta/pull/6570/files)] [MERGED]
- [stack/NonAdditiveSchemaEvolution3](#6697) [[Files changed](https://github.com/delta-io/delta/pull/6697/files)] [MERGED]
- [**stack/consecutiveSchemaChangesMerger**](#6698) [[Files changed](https://github.com/delta-io/delta/pull/6698/files)]
- [stack/SchemaTrackingWithCDC](#6801) [[Files changed](https://github.com/delta-io/delta/pull/6801/files/e230b46c3acb772d6599662b7c5aaf17e3625498..1ed4903f1b06fd49533dad3a1cf25c9206aef2f3)]
- [stack/V1V2MixTest](#6759) [[Files changed](https://github.com/delta-io/delta/pull/6759/files/e230b46c3acb772d6599662b7c5aaf17e3625498..e3c0d530a63150797b7b882fab2dad2070452683)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 7/7 in the non-additive schema evolution for V2 streaming connector
stack.

Implement V2's consecutive-schema-changes merger so analysis-time
evolution matches V1: runs of consecutive metadata-only commits collapse
to a single tracked entry at the latest version. Without the merger,
each metadata-only commit produces its own pending schema offset.

- `MetadataEvolutionHandler.getMergedConsecutiveMetadataChanges`: V2
port of V1's
`DeltaSourceMetadataEvolutionSupport.getMergedConsecutiveMetadataChanges`.
Walks Kernel commits forward via `CommitRangeImpl` +
`StreamingHelper.getCommitActionsFromRangeUnsafe`; for each commit
detects file actions (ADD/REMOVE) and metadata/protocol actions; stops
on the first commit with a file action or with neither metadata nor
protocol; emits a merged `PersistedMetadata` at the latest metadata-only
version.
- `MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream`:
pass the merger lambda into `DeltaSourceMetadataTrackingLog.create` (was
a `null` placeholder).
- `DeltaSourceMetadataTrackingLog` (V1): extract
`PersistedMetadata.toProtocolJson` helper so V2's merger can reuse the
same protocol-JSON encoding.
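
The merger walk described in the first bullet can be modeled in a few lines. The sketch below is not the Kernel-based implementation: `CommitSummary`, `MergedChange`, and `MergerSketch` are hypothetical types, each commit is pre-summarized into flags instead of being read through `CommitRangeImpl`, and metadata and protocol are plain strings.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical per-commit summary; metadata/protocol are null when the commit has none. */
final class CommitSummary {
    final long version;
    final boolean hasFileAction;
    final String metadata;
    final String protocol;

    CommitSummary(long version, boolean hasFileAction, String metadata, String protocol) {
        this.version = version;
        this.hasFileAction = hasFileAction;
        this.metadata = metadata;
        this.protocol = protocol;
    }
}

/** Result of folding a run of metadata-only commits into one tracked entry. */
final class MergedChange {
    final long version;
    final String metadata;
    final String protocol;

    MergedChange(long version, String metadata, String protocol) {
        this.version = version;
        this.metadata = metadata;
        this.protocol = protocol;
    }
}

final class MergerSketch {
    /** Walks forward and folds commits until one has a file action or no metadata/protocol. */
    static Optional<MergedChange> mergeConsecutive(List<CommitSummary> commits) {
        MergedChange merged = null;
        for (CommitSummary commit : commits) {
            boolean changesMetadata = commit.metadata != null || commit.protocol != null;
            if (commit.hasFileAction || !changesMetadata) {
                break; // stop conditions taken from the PR description
            }
            // Later values win; an absent value keeps whatever was folded so far.
            String metadata = commit.metadata != null
                ? commit.metadata : (merged == null ? null : merged.metadata);
            String protocol = commit.protocol != null
                ? commit.protocol : (merged == null ? null : merged.protocol);
            merged = new MergedChange(commit.version, metadata, protocol);
        }
        return Optional.ofNullable(merged);
    }
}
```

For a run such as metadata at version 5, protocol at version 6, then a data commit at version 7, this model yields a single entry at version 6 carrying both changes, which is the collapsing behavior the description attributes to the merger.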

## How was this patch tested?

`MetadataEvolutionHandlerTest` covers merger walk semantics —
stop-on-file-action, stop-on-no-metadata-or-protocol, multiple folded
changes, protocol-only and combined updates.
`DeltaSourceSchemaEvolutionSuite` adds parallel V1 tests for the same
scenarios. Unified `DeltaV2SourceSchemaEvolutionSuite` moves the
remaining merger-dependent scenarios (`consecutive schema evolutions`,
`unblock with sql conf`, `streaming with a column mapping upgrade`) from
`shouldFailTests` to `shouldPassTests`.

## Does this PR introduce _any_ user-facing changes?

No.
murali-db pushed a commit that referenced this pull request May 21, 2026
## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6801/files) to
review incremental changes.
- [stack/SparkMetadataAdapter](#6546) [[Files changed](https://github.com/delta-io/delta/pull/6546/files)] [MERGED]
- [stack/RefactorMetadataTrackingLog](#6550) [[Files changed](https://github.com/delta-io/delta/pull/6550/files)] [MERGED]
- [stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562) [[Files changed](https://github.com/delta-io/delta/pull/6562/files)] [MERGED]
- [stack/MetadataEvolutionHandler2](#6563) [[Files changed](https://github.com/delta-io/delta/pull/6563/files)] [MERGED]
- [stack/NonAdditiveSchemaEvolution2](#6570) [[Files changed](https://github.com/delta-io/delta/pull/6570/files)] [MERGED]
- [stack/NonAdditiveSchemaEvolution3](#6697) [[Files changed](https://github.com/delta-io/delta/pull/6697/files)] [MERGED]
- [stack/consecutiveSchemaChangesMerger](#6698) [[Files changed](https://github.com/delta-io/delta/pull/6698/files)] [MERGED]
- [**stack/SchemaTrackingWithCDC**](#6801) [[Files changed](https://github.com/delta-io/delta/pull/6801/files)]
- [stack/V1V2MixTest](#6759) [[Files changed](https://github.com/delta-io/delta/pull/6759/files)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Follow-up to the non-additive schema evolution stack: extend V2
streaming schema tracking to CDC reads so a CDC stream stops at
metadata- or protocol-change commits with a barrier sentinel instead of
either silently reading across the change or failing the read-compat
check.

- `SparkMicroBatchStream.collectAndBuildCDCIndexedFiles`: capture
`Protocol` alongside `Metadata` while scanning a commit's actions, then
call
`MetadataEvolutionHandler.getMetadataOrProtocolChangeIndexedFileIterator`
once the scan is done; when the commit diverges from source-init, return
a singleton barrier (`METADATA_CHANGE_INDEX`) in place of BASE + files +
END. Skip the on-commit `verifyMetadataAction` read-compat check when
schema tracking is active — the barrier covers divergence. V1 splits
this between `DeltaSourceCDCSupport.filterAndIndexDeltaLogs` (barrier
injection) and `IndexedChangeFileSeq.filterFiles` (short-circuit); V2
collapses both into this single method.
- `SparkMicroBatchStream.applyPerCommitCDCAdmission`: pass barrier
sentinels through admission unchanged (they can only appear as element 0
of the per-commit list).
- `SparkMicroBatchStream.getFileChangesForCDC`: apply
`metadataEvolutionHandler.stopIndexedFileIteratorAtSchemaChangeBarrier`
after end-boundary filtering so post-barrier commits in the same batch
are truncated. V1's wrap lives in the shared
`DeltaSource.getFileChangesWithRateLimit`; V2 places it inside the
CDC-specific method because both `planInputPartitions` and the outer
`getFileChangesWithRateLimit` reach the CDC iterator through here.
- `MetadataEvolutionHandler.getMergedConsecutiveMetadataChanges`:
include `AddCDCFile` in the action set the merger walks and treat any
non-null CDC column as a file action that stops the merger walk. Drops
the `mergeActionSet` parameter (always `CDC_ACTION_SET` now) so non-CDC
and CDC analysis share the same stop semantics. Resolves the TODO(#5319)
placeholder left by PR 7/7.
- `SparkMicroBatchStream.CDC_ACTION_SET`: promote to `public` so
`MetadataEvolutionHandler` can reuse it.
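
The truncation that `stopIndexedFileIteratorAtSchemaChangeBarrier` is described as performing can be sketched as an iterator wrapper. This is an illustration only: `IndexedEntry` and `StopAtBarrierIterator` are hypothetical names, the sentinel value is a placeholder, and keeping the barrier itself as the last emitted element is an assumption about the intended semantics.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical indexed entry: a commit version plus an index (or a barrier sentinel). */
final class IndexedEntry {
    // Placeholder sentinel for this sketch only, not the connector's real constant.
    static final long METADATA_CHANGE_INDEX = Long.MAX_VALUE - 2;

    final long version;
    final long index;

    IndexedEntry(long version, long index) {
        this.version = version;
        this.index = index;
    }
}

/** Passes entries through until the first barrier, emits the barrier, then stops. */
final class StopAtBarrierIterator implements Iterator<IndexedEntry> {
    private final Iterator<IndexedEntry> delegate;
    private boolean barrierSeen = false;

    StopAtBarrierIterator(Iterator<IndexedEntry> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean hasNext() {
        return !barrierSeen && delegate.hasNext();
    }

    @Override
    public IndexedEntry next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        IndexedEntry entry = delegate.next();
        if (entry.index == IndexedEntry.METADATA_CHANGE_INDEX) {
            barrierSeen = true; // everything after the barrier is dropped
        }
        return entry;
    }

    /** Convenience for the sketch: drain the truncated view into a list. */
    static List<IndexedEntry> drain(List<IndexedEntry> entries) {
        List<IndexedEntry> out = new ArrayList<>();
        Iterator<IndexedEntry> it = new StopAtBarrierIterator(entries.iterator());
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }
}
```

Applying the wrapper after end-boundary filtering, as the description says, means entries from post-barrier commits in the same batch never reach the planner. A real implementation would also need to close the underlying `CloseableIterator`, which this sketch omits.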

## How was this patch tested?

`SparkMicroBatchStreamCDCTest` adds barrier-emission cases:
- `testProcessCommit_emitsBarrierAtSchemaChange`: a metadata-only commit
on a CDC stream with seeded tracking emits `[barrier, END]` from
`processCommitToIndexedFilesForCDC`.
- `testGetFileChangesForCDC_emitsBarrierAtSchemaChange`: end-to-end
check that the barrier fires and the iterator truncates across commits —
exercises metadata/protocol capture, barrier emission, admission
passthrough, and cross-commit truncation in one path.

`MetadataEvolutionHandlerTest` extends the merger walk tests to cover
CDC file actions stopping the walk. The unified
`DeltaV2SourceSchemaEvolutionCDCSuiteBase` moves all evolution scenarios
out of `shouldFailTests` into `shouldPassTests` so the CDC variants of
the streaming schema-evolution suite now run alongside non-CDC.

## Does this PR introduce _any_ user-facing changes?

No.

Co-authored-by: Timothy Wang <timothy.art@gmail.com>
SanJSp pushed a commit to SanJSp/delta that referenced this pull request May 22, 2026
…ark 4.2

New code from delta-io#6570 (schema tracking log in v2 analysis) uses a direct
StreamingRelationV2 8-arg pattern match that breaks on Spark 4.2 where
the case class has 9 parameters. Use StreamingRelationV2Shim instead.

Co-authored-by: Isaac

5 participants