Skip to content

[kernel-spark] Support non-additive schema evolution in v2 connector#6697

Merged
murali-db merged 1 commit into
delta-io:masterfrom
PorridgeSwim:stack/NonAdditiveSchemaEvolution3
May 16, 2026
Merged

[kernel-spark] Support non-additive schema evolution in v2 connector#6697
murali-db merged 1 commit into
delta-io:masterfrom
PorridgeSwim:stack/NonAdditiveSchemaEvolution3

Conversation

@PorridgeSwim
Copy link
Copy Markdown
Collaborator

@PorridgeSwim PorridgeSwim commented May 1, 2026

🥞 Stacked PR

Use this link to review incremental changes.


Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

PR 6/7 in the non-additive schema evolution for V2 streaming connector stack.

Wire MetadataEvolutionHandler into SparkMicroBatchStream and SparkScan so V2 streaming reads honor non-additive schema evolution (column rename/drop, type widening).

  • SparkMicroBatchStream: take metadataTrackingLog + metadataPath as constructor inputs; when a persisted entry exists, layer it onto the freshly loaded snapshotAtSourceInit to derive readSnapshotAtSourceInit (mirrors V1's readSnapshotDescriptor). Integrate the schema-evolution barrier protocol into latestOffset / commit / planInputPartitions. Skip the on-restart schema-validation check when schema tracking is active — the schema-log evolution exception covers it.
  • SparkScan.toMicroBatchStream: reload latest snapshot (the analysis-time initialSnapshot can be stale by stream start), open the tracking log via MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream with mergeConsecutiveSchemaChanges=false (the merger only runs at analysis), and pass it through with the checkpoint location.
  • SparkScan option allow-list: move allowSourceColumnDrop / Rename / TypeChange out of the unsupported list now that they are honored.

How was this patch tested?

SparkMicroBatchStreamTest, MetadataEvolutionHandlerTest. Unified suites (DeltaV2SourceSchemaEvolutionSuite, TypeWideningStreamingV2SourceSuite, RemoveColumnMappingStreamingReadV2Suite) move non-merger evolution scenarios from shouldFailTests to shouldPassTests; merger-dependent tests remain pending until PR 7/7.

Does this PR introduce any user-facing changes?

No.

@PorridgeSwim
Copy link
Copy Markdown
Collaborator Author

Range-diff: stack/NonAdditiveSchemaEvolution2 (d8362c3 -> d344128)
spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala
@@ -7,7 +7,7 @@
 -  // TODO(#5319): Move tests to shouldPassTests as V2 schema tracking log support is implemented.c
    override protected def shouldPassTests: Set[String] = Set(
      // ========== Schema log unit test ==========
-     "schema location not under checkpoint",
+     "detects metadata/protocol changes through the AbstractMetadata/AbstractProtocol surface",
      "forward-compat: older version can read back newer JSON",
  
      // ========== Schema log core ==========
spark-unified/src/test/scala/org/apache/spark/sql/delta/test/typewidening/TypeWideningStreamingV2SourceSuite.scala
@@ -33,12 +33,8 @@
      "type change - widen aggregation expression after projection",
      "type change - widen limit",
      "type change - widen distinct",
-     "type change - widen flatMap groups with state",
-     "widening type change then restore back",
-     "narrowing type changes are not supported",
--    "arbitrary type changes are not supported",
--    "type change in delta source writing to a delta sink"
-+    "arbitrary type changes are not supported"
+     "arbitrary type changes are not supported",
+     "type change in delta source writing to a delta sink"
    )
 -}
 -
spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java
@@ -122,9 +122,6 @@
 +            schemaReadOptions,
 +            metadataTrackingLog,
 +            readSnapshotAtSourceInit.getMetadata(),
-+            readSchemaAtSourceInit,
-+            this.partitionSchema,
-+            readConfigurationsAtSourceInit,
 +            readProtocolAtSourceInit,
 +            metadataPath);
      boolean shouldValidateSchemaOnRestart =
spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSuite.scala
@@ -1,17 +0,0 @@
-diff --git a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSuite.scala b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSuite.scala
---- a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSuite.scala
-+++ b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSuite.scala
-     "rename column: should fail with non-additive schema change error",
-     "rename column: should throw schema change error with unsafe flag enabled",
-     "type widening: should fail with non-additive schema change error when enable schema tracking",
-+    "handling nullability schema changes", // Uses .table() directly
- 
-     // === Read options ===
-     "excludeRegex works and doesn't mess up offsets across restarts - parquet version",
-     "disallow user specified schema", // Uses .schema() directly
-     "make sure that the delta sources works fine", // Uses .delta() directly
-     "self union a Delta table should pass the catalog table assert", // Uses .table() directly
--    "handling nullability schema changes", // Uses .table() directly
-     "allow user specified schema if consistent: v1 source", // Uses DataSource directly
-     // Calls deltaSource.createSource() directly
-     "createSource should create source with empty or matching table schema provided"
\ No newline at end of file

Reproduce locally: git range-diff 6c83dbc..d8362c3 afe7993..d344128 | Disable: git config gitstack.push-range-diff false

@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution3 branch from d344128 to c4fca36 Compare May 5, 2026 20:16
@PorridgeSwim
Copy link
Copy Markdown
Collaborator Author

Range-diff: stack/NonAdditiveSchemaEvolution2 (d344128 -> c4fca36)
spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamTest.java
@@ -1,6 +1,13 @@
 diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamTest.java
 --- a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamTest.java
 +++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamTest.java
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
++import java.util.function.Consumer;
+ import java.util.stream.Collectors;
+ import java.util.stream.Stream;
+ import org.apache.hadoop.conf.Configuration;
  import org.apache.spark.sql.delta.*;
  import org.apache.spark.sql.delta.sources.DeltaSQLConf;
  import org.apache.spark.sql.delta.sources.DeltaSource;
@@ -33,97 +40,365 @@
          "v1 connector and v2 connector should throw the same error messages on stream start schema changes");
    }
  
-+  @Test
-+  public void testSchemaTrackingLifecycle_dropColumn_e2e(@TempDir File tempDir) throws Exception {
++  @ParameterizedTest(name = "{3}")
++  @MethodSource("nonAdditiveSchemaEvolutionLifecycleScenarios")
++  public void testSchemaTrackingLifecycle_forward(
++      ScenarioSetup scenarioSetup,
++      Map<String, String> sparkConf,
++      Consumer<StructType> assertPostChangeSchema,
++      String description,
++      @TempDir File tempDir)
++      throws Exception {
++    String tablePath = tempDir.getAbsolutePath();
++    String tableName =
++        "test_lifecycle_fwd_" + Math.abs(description.hashCode()) + "_" + System.nanoTime();
++
++    try {
++      sparkConf.forEach((k, v) -> spark.conf().set(k, v));
++
++      createSchemaEvolutionTestTable(tablePath, tableName);
++      DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(tablePath));
++      long startVersion = deltaLog.update(false, Option.empty(), Option.empty()).version();
++      // createSchemaEvolutionTestTable inserts 2 rows.
++      long preChangeRowCount = 2L;
++
++      Configuration hadoopConf = new Configuration();
++      PathBasedSnapshotManager snapshotManager =
++          new PathBasedSnapshotManager(tablePath, hadoopConf);
++      String schemaTrackingLocation = new File(tempDir, "schema_tracking").getAbsolutePath();
++      String checkpointLocation = new File(tempDir, "checkpoint").getAbsolutePath();
++      java.util.Map<String, String> optionMap =
++          Collections.singletonMap("startingVersion", String.valueOf(startVersion));
++      DeltaOptions options = createDeltaOptions(optionMap);
++      DeltaSourceMetadataTrackingLog trackingLog =
++          createTrackingLog(snapshotManager, schemaTrackingLocation, checkpointLocation, optionMap);
++
++      // Stream constructed BEFORE the schema change — pre-change snapshot is still the latest.
++      StructType preChangeSchema = loadSparkSchemaAtVersion(snapshotManager, startVersion);
++      SparkMicroBatchStream streamBefore =
++          createSchemaTrackingTestStream(
++              snapshotManager,
++              hadoopConf,
++              options,
++              tablePath,
++              preChangeSchema,
++              Option.apply(trackingLog),
++              checkpointLocation);
++
++      // Apply non-additive schema change AFTER stream init.
++      scenarioSetup.setup(tableName, tempDir);
++      long schemaChangeVersion = deltaLog.update(false, Option.empty(), Option.empty()).version();
++
++      Offset startOffset = streamBefore.initialOffset();
++      DeltaSourceOffset startDelta = (DeltaSourceOffset) startOffset;
++      assertEquals(startVersion, startDelta.reservoirVersion());
++      assertEquals(DeltaSourceOffset.BASE_INDEX(), startDelta.index());
++      assertFalse(startDelta.isInitialSnapshot());
++
++      // First latestOffset eagerly initializes the log to the pre-change schema (no throw because
++      // the fresh stream's read schema matches). Then it scans forward and stops at the barrier.
++      Offset barrierOffset = streamBefore.latestOffset(startOffset, ReadLimit.allAvailable());
++      DeltaSourceOffset barrierDelta = (DeltaSourceOffset) barrierOffset;
++      assertEquals(schemaChangeVersion, barrierDelta.reservoirVersion());
++      assertEquals(DeltaSourceOffset.METADATA_CHANGE_INDEX(), barrierDelta.index());
++
++      PersistedMetadata initialized = trackingLog.getCurrentTrackedMetadata().get();
++      assertEquals(startVersion, initialized.deltaCommitVersion());
++
++      assertEquals(
++          preChangeRowCount, countRowsBetweenOffsets(streamBefore, startOffset, barrierOffset));
++
++      DeltaRuntimeException barrierEx =
++          assertThrows(DeltaRuntimeException.class, () -> streamBefore.commit(barrierOffset));
++      assertMetadataEvolutionException(barrierEx, "on barrier commit (forward)");
++
++      PersistedMetadata evolved = trackingLog.getCurrentTrackedMetadata().get();
++      assertEquals(schemaChangeVersion, evolved.deltaCommitVersion());
++      assertPostChangeSchema.accept(evolved.dataSchema());
++
++      // Restart with the post-change schema (same trackingLog: it now carries the evolved entry).
++      StructType postChangeSchema =
++          io.delta.spark.internal.v2.utils.SchemaUtils.convertKernelSchemaToSparkSchema(
++              snapshotManager.loadLatestSnapshot().getSchema());
++      SparkMicroBatchStream streamAfter =
++          createSchemaTrackingTestStream(
++              snapshotManager,
++              hadoopConf,
++              options,
++              tablePath,
++              postChangeSchema,
++              Option.apply(trackingLog),
++              checkpointLocation);
++
++      Offset postBarrierOffset = streamAfter.latestOffset(barrierOffset, ReadLimit.allAvailable());
++      DeltaSourceOffset postBarrierDelta = (DeltaSourceOffset) postBarrierOffset;
++      assertEquals(schemaChangeVersion, postBarrierDelta.reservoirVersion());
++      assertEquals(DeltaSourceOffset.POST_METADATA_CHANGE_INDEX(), postBarrierDelta.index());
++      assertEquals(0L, countRowsBetweenOffsets(streamAfter, barrierOffset, postBarrierOffset));
++      assertDoesNotThrow(() -> streamAfter.commit(postBarrierOffset));
++    } finally {
++      sparkConf.keySet().forEach(k -> spark.conf().unset(k));
++    }
++  }
++
++  /**
++   * Backfill counterpart: schema change is applied BEFORE stream construction, so the source's
++   * snapshot-at-init is post-change but the streaming starts at a pre-change version. Three
++   * restarts are needed:
++   *
++   * <ol>
++   *   <li>First {@code latestOffset} runs the eager-init path, which writes the pre-change schema
++   *       to the empty log and throws (post-change ≠ pre-change).
++   *   <li>Restart with pre-change schema: {@code latestOffset} returns the barrier; pre-change rows
++   *       are read; {@code commit(barrier)} writes the post-change entry and throws.
++   *   <li>Restart with post-change schema: barrier advances to {@code POST_METADATA_CHANGE_INDEX}
++   *       and {@code commit} succeeds.
++   * </ol>
++   */
++  @ParameterizedTest(name = "{3}")
++  @MethodSource("nonAdditiveSchemaEvolutionLifecycleScenarios")
++  public void testSchemaTrackingLifecycle_backfill(
++      ScenarioSetup scenarioSetup,
++      Map<String, String> sparkConf,
++      Consumer<StructType> assertPostChangeSchema,
++      String description,
++      @TempDir File tempDir)
++      throws Exception {
 +    String tablePath = tempDir.getAbsolutePath();
-+    String tableName = "test_schema_tracking_drop_column_" + System.nanoTime();
-+    createSchemaEvolutionTestTable(tablePath, tableName);
++    String tableName =
++        "test_lifecycle_backfill_" + Math.abs(description.hashCode()) + "_" + System.nanoTime();
 +
-+    DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(tablePath));
-+    long startVersion = deltaLog.update(false, Option.empty(), Option.empty()).version();
++    try {
++      sparkConf.forEach((k, v) -> spark.conf().set(k, v));
 +
-+    Configuration hadoopConf = new Configuration();
-+    PathBasedSnapshotManager snapshotManager = new PathBasedSnapshotManager(tablePath, hadoopConf);
-+    String schemaTrackingLocation = new File(tempDir, "schema_tracking").getAbsolutePath();
-+    String checkpointLocation = new File(tempDir, "checkpoint").getAbsolutePath();
-+    java.util.Map<String, String> optionMap =
-+        Collections.singletonMap("startingVersion", String.valueOf(startVersion));
-+    DeltaOptions options = createDeltaOptions(optionMap);
-+    DeltaSourceMetadataTrackingLog trackingLog =
-+        createTrackingLog(snapshotManager, schemaTrackingLocation, checkpointLocation, optionMap);
++      createSchemaEvolutionTestTable(tablePath, tableName);
++      DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(tablePath));
++      long startVersion = deltaLog.update(false, Option.empty(), Option.empty()).version();
++      long preChangeRowCount = 2L;
 +
-+    StructType preDropSchema = loadSparkSchemaAtVersion(snapshotManager, startVersion);
-+    SparkMicroBatchStream streamBeforeDrop =
-+        createSchemaTrackingTestStream(
-+            snapshotManager,
-+            hadoopConf,
-+            options,
-+            tablePath,
-+            preDropSchema,
-+            Option.apply(trackingLog),
-+            checkpointLocation);
++      // Apply non-additive schema change BEFORE stream construction.
++      scenarioSetup.setup(tableName, tempDir);
++      long schemaChangeVersion = deltaLog.update(false, Option.empty(), Option.empty()).version();
 +
-+    sql("ALTER TABLE %s DROP COLUMNS (value)", tableName);
-+    long schemaChangeVersion = deltaLog.update(false, Option.empty(), Option.empty()).version();
-+    sql("INSERT INTO %s VALUES (3, 'Cathy', named_struct('col1', 18, 'col2', 'SF'))", tableName);
++      Configuration hadoopConf = new Configuration();
++      PathBasedSnapshotManager snapshotManager =
++          new PathBasedSnapshotManager(tablePath, hadoopConf);
++      String schemaTrackingLocation = new File(tempDir, "schema_tracking").getAbsolutePath();
++      String checkpointLocation = new File(tempDir, "checkpoint").getAbsolutePath();
++      java.util.Map<String, String> optionMap =
++          Collections.singletonMap("startingVersion", String.valueOf(startVersion));
++      DeltaOptions options = createDeltaOptions(optionMap);
++      DeltaSourceMetadataTrackingLog trackingLog =
++          createTrackingLog(snapshotManager, schemaTrackingLocation, checkpointLocation, optionMap);
 +
-+    Offset startOffset = streamBeforeDrop.initialOffset();
-+    DeltaSourceOffset startDeltaOffset = (DeltaSourceOffset) startOffset;
-+    assertEquals(startVersion, startDeltaOffset.reservoirVersion());
-+    assertEquals(DeltaSourceOffset.BASE_INDEX(), startDeltaOffset.index());
-+    assertFalse(startDeltaOffset.isInitialSnapshot());
++      StructType preChangeSchema = loadSparkSchemaAtVersion(snapshotManager, startVersion);
++      StructType postChangeSchema =
++          io.delta.spark.internal.v2.utils.SchemaUtils.convertKernelSchemaToSparkSchema(
++              snapshotManager.loadLatestSnapshot().getSchema());
 +
-+    Offset barrierOffset = streamBeforeDrop.latestOffset(startOffset, ReadLimit.allAvailable());
-+    DeltaSourceOffset barrierDeltaOffset = (DeltaSourceOffset) barrierOffset;
-+    assertEquals(schemaChangeVersion, barrierDeltaOffset.reservoirVersion());
-+    assertEquals(DeltaSourceOffset.METADATA_CHANGE_INDEX(), barrierDeltaOffset.index());
++      // Stream 1: post-change schema (analysis would bind to the latest snapshot since the log
++      // is empty). First latestOffset runs eager-init and throws.
++      SparkMicroBatchStream stream1 =
++          createSchemaTrackingTestStream(
++              snapshotManager,
++              hadoopConf,
++              options,
++              tablePath,
++              postChangeSchema,
++              Option.apply(trackingLog),
++              checkpointLocation);
++      Offset start1 = stream1.initialOffset();
++      DeltaRuntimeException initEx =
++          assertThrows(
++              DeltaRuntimeException.class,
++              () -> stream1.latestOffset(start1, ReadLimit.allAvailable()));
++      assertMetadataEvolutionException(initEx, "during backfill log initialization");
 +
-+    PersistedMetadata initializedMetadata = trackingLog.getCurrentTrackedMetadata().get();
-+    assertEquals(startVersion, initializedMetadata.deltaCommitVersion());
-+    assertThat(Arrays.asList(initializedMetadata.dataSchema().fieldNames())).contains("value");
-+    assertEquals(
-+        List.of(1, 2), readIdsBetweenOffsets(streamBeforeDrop, startOffset, barrierOffset));
++      PersistedMetadata afterInit = trackingLog.getCurrentTrackedMetadata().get();
++      assertEquals(startVersion, afterInit.deltaCommitVersion());
 +
-+    DeltaRuntimeException evolutionException =
-+        assertThrows(DeltaRuntimeException.class, () -> streamBeforeDrop.commit(barrierOffset));
-+    assertMetadataEvolutionException(
-+        evolutionException, "when committing the metadata change barrier");
++      // Stream 2: pre-change schema (the log entry now carries it).
++      SparkMicroBatchStream stream2 =
++          createSchemaTrackingTestStream(
++              snapshotManager,
++              hadoopConf,
++              options,
++              tablePath,
++              preChangeSchema,
++              Option.apply(trackingLog),
++              checkpointLocation);
++      Offset start2 = stream2.initialOffset();
++      Offset barrierOffset = stream2.latestOffset(start2, ReadLimit.allAvailable());
++      DeltaSourceOffset barrierDelta = (DeltaSourceOffset) barrierOffset;
++      assertEquals(schemaChangeVersion, barrierDelta.reservoirVersion());
++      assertEquals(DeltaSourceOffset.METADATA_CHANGE_INDEX(), barrierDelta.index());
 +
-+    PersistedMetadata evolvedMetadata = trackingLog.getCurrentTrackedMetadata().get();
-+    assertEquals(schemaChangeVersion, evolvedMetadata.deltaCommitVersion());
-+    assertThat(Arrays.asList(evolvedMetadata.dataSchema().fieldNames())).doesNotContain("value");
++      assertEquals(preChangeRowCount, countRowsBetweenOffsets(stream2, start2, barrierOffset));
 +
-+    StructType postDropSchema =
-+        io.delta.spark.internal.v2.utils.SchemaUtils.convertKernelSchemaToSparkSchema(
-+            snapshotManager.loadLatestSnapshot().getSchema());
-+    SparkMicroBatchStream adoptedStream =
-+        createSchemaTrackingTestStream(
-+            snapshotManager,
-+            hadoopConf,
-+            options,
-+            tablePath,
-+            postDropSchema,
-+            Option.apply(trackingLog),
-+            checkpointLocation);
++      DeltaRuntimeException barrierEx =
++          assertThrows(DeltaRuntimeException.class, () -> stream2.commit(barrierOffset));
++      assertMetadataEvolutionException(barrierEx, "on barrier commit (backfill)");
 +
-+    Offset postBarrierOffset = adoptedStream.latestOffset(barrierOffset, ReadLimit.allAvailable());
-+    DeltaSourceOffset postBarrierDeltaOffset = (DeltaSourceOffset) postBarrierOffset;
-+    assertEquals(schemaChangeVersion, postBarrierDeltaOffset.reservoirVersion());
-+    assertEquals(DeltaSourceOffset.POST_METADATA_CHANGE_INDEX(), postBarrierDeltaOffset.index());
-+    assertTrue(readIdsBetweenOffsets(adoptedStream, barrierOffset, postBarrierOffset).isEmpty());
-+    assertDoesNotThrow(() -> adoptedStream.commit(postBarrierOffset));
++      PersistedMetadata evolved = trackingLog.getCurrentTrackedMetadata().get();
++      assertEquals(schemaChangeVersion, evolved.deltaCommitVersion());
++      assertPostChangeSchema.accept(evolved.dataSchema());
 +
-+    Offset postDropDataOffset =
-+        adoptedStream.latestOffset(postBarrierOffset, ReadLimit.allAvailable());
-+    assertNotNull(postDropDataOffset);
-+    assertEquals(
-+        List.of(3), readIdsBetweenOffsets(adoptedStream, postBarrierOffset, postDropDataOffset));
-+    assertDoesNotThrow(() -> adoptedStream.commit(postDropDataOffset));
++      // Stream 3: post-change schema. Advance barrier → POST_BARRIER, commit succeeds.
++      SparkMicroBatchStream stream3 =
++          createSchemaTrackingTestStream(
++              snapshotManager,
++              hadoopConf,
++              options,
++              tablePath,
++              postChangeSchema,
++              Option.apply(trackingLog),
++              checkpointLocation);
++      Offset postBarrierOffset = stream3.latestOffset(barrierOffset, ReadLimit.allAvailable());
++      DeltaSourceOffset postBarrierDelta = (DeltaSourceOffset) postBarrierOffset;
++      assertEquals(schemaChangeVersion, postBarrierDelta.reservoirVersion());
++      assertEquals(DeltaSourceOffset.POST_METADATA_CHANGE_INDEX(), postBarrierDelta.index());
++      assertEquals(0L, countRowsBetweenOffsets(stream3, barrierOffset, postBarrierOffset));
++      assertDoesNotThrow(() -> stream3.commit(postBarrierOffset));
++    } finally {
++      sparkConf.keySet().forEach(k -> spark.conf().unset(k));
++    }
 +  }
 +
    /** Provides test scenarios that generate additive schema changes actions. */
    private static Stream<Arguments> additiveSchemaEvolutionScenarios() {
      return Stream.of(
+                 "false"),
+             "Drop column in nested struct"),
+ 
++        // Rename column in nested struct
++        Arguments.of(
++            (ScenarioSetup)
++                (tableName, tempDir) -> {
++                  sql("ALTER TABLE %s RENAME COLUMN info.col1 TO newCol1", tableName);
++                },
++            /* sparkConf */ Map.of(
++                DeltaSQLConf
++                    .DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES()
++                    .key(),
++                "false"),
++            "Rename column in nested struct"),
++
+         // Widen INT column to BIGINT
+         Arguments.of(
+             (ScenarioSetup)
+             // non-additive
+             /* sparkConf */ Map.of(
+                 DeltaSQLConf.DELTA_TYPE_WIDENING_ENABLE_STREAMING_SCHEMA_TRACKING().key(), "true"),
+-            "Widen INT column to BIGINT"));
++            "Widen INT column to BIGINT"),
++
++        // Widen INT column to BIGINT in nested struct
++        Arguments.of(
++            (ScenarioSetup)
++                (tableName, tempDir) -> {
++                  sql("ALTER TABLE %s ALTER COLUMN info.col1 TYPE BIGINT", tableName);
++                },
++            /* sparkConf */ Map.of(
++                DeltaSQLConf.DELTA_TYPE_WIDENING_ENABLE_STREAMING_SCHEMA_TRACKING().key(), "true"),
++            "Widen INT column to BIGINT in nested struct"));
++  }
++
++  /**
++   * Scenarios for the schema-tracking lifecycle tests. Each scenario carries a per-shape predicate
++   * that asserts the post-change schema persisted in the tracking log after the barrier commit.
++   * Pre-change schema is always the fixture's schema, so it doesn't need to be parameterized.
++   */
++  private static Stream<Arguments> nonAdditiveSchemaEvolutionLifecycleScenarios() {
++    return Stream.of(
++        // Drop column
++        Arguments.of(
++            (ScenarioSetup)
++                (tableName, tempDir) -> sql("ALTER TABLE %s DROP COLUMNS (value)", tableName),
++            /* sparkConf */ Map.<String, String>of(),
++            (Consumer<StructType>)
++                schema -> assertThat(Arrays.asList(schema.fieldNames())).doesNotContain("value"),
++            "Drop column"),
++
++        // Drop nullable, non-nullable and struct columns (only `name` remains)
++        Arguments.of(
++            (ScenarioSetup)
++                (tableName, tempDir) ->
++                    sql("ALTER TABLE %s DROP COLUMNS (id, value, info)", tableName),
++            /* sparkConf */ Map.of(
++                DeltaSQLConf
++                    .DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES()
++                    .key(),
++                "false"),
++            (Consumer<StructType>)
++                schema -> assertThat(Arrays.asList(schema.fieldNames())).containsExactly("name"),
++            "Drop nullable, non-nullable and struct columns"),
++
++        // Rename top-level column
++        Arguments.of(
++            (ScenarioSetup)
++                (tableName, tempDir) -> sql("ALTER TABLE %s RENAME COLUMN id TO userId", tableName),
++            /* sparkConf */ Map.<String, String>of(),
++            (Consumer<StructType>)
++                schema ->
++                    assertThat(Arrays.asList(schema.fieldNames()))
++                        .contains("userId")
++                        .doesNotContain("id"),
++            "Rename column"),
++
++        // Drop column in nested struct
++        Arguments.of(
++            (ScenarioSetup)
++                (tableName, tempDir) -> sql("ALTER TABLE %s DROP COLUMNS info.col1", tableName),
++            /* sparkConf */ Map.<String, String>of(),
++            (Consumer<StructType>)
++                schema -> {
++                  StructType info = (StructType) schema.apply("info").dataType();
++                  assertThat(Arrays.asList(info.fieldNames())).doesNotContain("col1");
++                },
++            "Drop column in nested struct"),
++
++        // Rename column in nested struct
++        Arguments.of(
++            (ScenarioSetup)
++                (tableName, tempDir) ->
++                    sql("ALTER TABLE %s RENAME COLUMN info.col1 TO newCol1", tableName),
++            /* sparkConf */ Map.<String, String>of(),
++            (Consumer<StructType>)
++                schema -> {
++                  StructType info = (StructType) schema.apply("info").dataType();
++                  assertThat(Arrays.asList(info.fieldNames()))
++                      .contains("newCol1")
++                      .doesNotContain("col1");
++                },
++            "Rename column in nested struct"),
++
++        // Widen INT column to BIGINT
++        Arguments.of(
++            (ScenarioSetup)
++                (tableName, tempDir) ->
++                    sql("ALTER TABLE %s ALTER COLUMN id TYPE BIGINT", tableName),
++            /* sparkConf */ Map.of(
++                DeltaSQLConf.DELTA_TYPE_WIDENING_ENABLE_STREAMING_SCHEMA_TRACKING().key(), "true"),
++            (Consumer<StructType>)
++                schema -> assertEquals(DataTypes.LongType, schema.apply("id").dataType()),
++            "Widen INT column to BIGINT"),
++
++        // Widen INT column to BIGINT in nested struct
++        Arguments.of(
++            (ScenarioSetup)
++                (tableName, tempDir) ->
++                    sql("ALTER TABLE %s ALTER COLUMN info.col1 TYPE BIGINT", tableName),
++            /* sparkConf */ Map.of(
++                DeltaSQLConf.DELTA_TYPE_WIDENING_ENABLE_STREAMING_SCHEMA_TRACKING().key(), "true"),
++            (Consumer<StructType>)
++                schema -> {
++                  StructType info = (StructType) schema.apply("info").dataType();
++                  assertEquals(DataTypes.LongType, info.apply("col1").dataType());
++                },
++            "Widen INT column to BIGINT in nested struct"));
+   }
+ 
+   // ================================================================================================
    private SparkMicroBatchStream createTestStreamWithDefaults(
        PathBasedSnapshotManager snapshotManager, Configuration hadoopConf, DeltaOptions options) {
      io.delta.kernel.Snapshot snapshot = snapshotManager.loadLatestSnapshot();
@@ -230,6 +505,41 @@
 +    return ids;
 +  }
 +
++  /**
++   * Reads rows between offsets and returns just the count. Used by lifecycle tests where the
++   * specific column values are not the focus and reading by column name/ordinal would break across
++   * scenarios that drop or rename the leading column.
++   */
++  private long countRowsBetweenOffsets(
++      SparkMicroBatchStream stream, Offset startOffset, Offset endOffset) throws Exception {
++    InputPartition[] partitions = stream.planInputPartitions(startOffset, endOffset);
++    PartitionReaderFactory readerFactory = stream.createReaderFactory();
++    long count = 0L;
++    for (InputPartition partition : partitions) {
++      if (readerFactory.supportColumnarReads(partition)) {
++        PartitionReader<org.apache.spark.sql.vectorized.ColumnarBatch> reader =
++            readerFactory.createColumnarReader(partition);
++        try {
++          while (reader.next()) {
++            count += reader.get().numRows();
++          }
++        } finally {
++          reader.close();
++        }
++      } else {
++        PartitionReader<InternalRow> reader = readerFactory.createReader(partition);
++        try {
++          while (reader.next()) {
++            count++;
++          }
++        } finally {
++          reader.close();
++        }
++      }
++    }
++    return count;
++  }
++
 +  private static void assertMetadataEvolutionException(DeltaRuntimeException ex, String context) {
 +    assertEquals(
 +        "DELTA_STREAMING_METADATA_EVOLUTION",

Reproduce locally: git range-diff afe7993..d344128 afe7993..c4fca36 | Disable: git config gitstack.push-range-diff false

@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution3 branch 2 times, most recently from a35f8b0 to 0148020 Compare May 5, 2026 20:47
@PorridgeSwim
Copy link
Copy Markdown
Collaborator Author

Range-diff: stack/NonAdditiveSchemaEvolution2 (a35f8b0 -> 0148020)
spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamTest.java
@@ -40,7 +40,7 @@
          "v1 connector and v2 connector should throw the same error messages on stream start schema changes");
    }
  
-+  @ParameterizedTest(name = "{3}")
++  @ParameterizedTest
 +  @MethodSource("nonAdditiveSchemaEvolutionLifecycleScenarios")
 +  public void testSchemaTrackingLifecycle_forward(
 +      ScenarioSetup scenarioSetup,
@@ -155,7 +155,7 @@
 +   *       and {@code commit} succeeds.
 +   * </ol>
 +   */
-+  @ParameterizedTest(name = "{3}")
++  @ParameterizedTest
 +  @MethodSource("nonAdditiveSchemaEvolutionLifecycleScenarios")
 +  public void testSchemaTrackingLifecycle_backfill(
 +      ScenarioSetup scenarioSetup,

Reproduce locally: git range-diff afe7993..a35f8b0 c025b7c..0148020 | Disable: git config gitstack.push-range-diff false

@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution3 branch from 0148020 to db16b9f Compare May 5, 2026 22:10
@PorridgeSwim
Copy link
Copy Markdown
Collaborator Author

Range-diff: stack/NonAdditiveSchemaEvolution2 (0148020 -> db16b9f)
spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamTest.java
@@ -40,31 +40,46 @@
          "v1 connector and v2 connector should throw the same error messages on stream start schema changes");
    }
  
++  /**
++   * Forward counterpart: schema change is applied AFTER stream construction, so the source's
++   * snapshot-at-init is pre-change. Only one restart is needed:
++   *
++   * <ol>
++   *   <li>First {@code latestOffset} runs the eager-init path, which writes the pre-change schema
++   *       to the empty log. No throw, because the fresh stream's read schema matches what
++   *       eager-init wrote. {@code latestOffset} then scans forward and stops at the barrier.
++   *   <li>{@code commit(barrier)} writes the post-change entry to the log and throws.
++   *   <li>Restart with post-change schema: barrier advances to {@code POST_METADATA_CHANGE_INDEX}
++   *       and {@code commit} succeeds. A row written after the schema change is then read through
++   *       the evolved stream to confirm the new schema is honored end-to-end.
++   * </ol>
++   */
 +  @ParameterizedTest
 +  @MethodSource("nonAdditiveSchemaEvolutionLifecycleScenarios")
 +  public void testSchemaTrackingLifecycle_forward(
 +      ScenarioSetup scenarioSetup,
 +      Map<String, String> sparkConf,
++      ScenarioSetup insertPostChangeRow,
 +      Consumer<StructType> assertPostChangeSchema,
-+      String description,
++      String testDescription,
 +      @TempDir File tempDir)
 +      throws Exception {
-+    String tablePath = tempDir.getAbsolutePath();
-+    String tableName =
-+        "test_lifecycle_fwd_" + Math.abs(description.hashCode()) + "_" + System.nanoTime();
++    String testTablePath = tempDir.getAbsolutePath();
++    String testTableName =
++        "test_forward_lifecycle_" + Math.abs(testDescription.hashCode()) + "_" + System.nanoTime();
 +
 +    try {
 +      sparkConf.forEach((k, v) -> spark.conf().set(k, v));
 +
-+      createSchemaEvolutionTestTable(tablePath, tableName);
-+      DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(tablePath));
++      createSchemaEvolutionTestTable(testTablePath, testTableName);
++      DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
 +      long startVersion = deltaLog.update(false, Option.empty(), Option.empty()).version();
 +      // createSchemaEvolutionTestTable inserts 2 rows.
 +      long preChangeRowCount = 2L;
 +
 +      Configuration hadoopConf = new Configuration();
 +      PathBasedSnapshotManager snapshotManager =
-+          new PathBasedSnapshotManager(tablePath, hadoopConf);
++          new PathBasedSnapshotManager(testTablePath, hadoopConf);
 +      String schemaTrackingLocation = new File(tempDir, "schema_tracking").getAbsolutePath();
 +      String checkpointLocation = new File(tempDir, "checkpoint").getAbsolutePath();
 +      java.util.Map<String, String> optionMap =
@@ -75,21 +90,21 @@
 +
 +      // Stream constructed BEFORE the schema change — pre-change snapshot is still the latest.
 +      StructType preChangeSchema = loadSparkSchemaAtVersion(snapshotManager, startVersion);
-+      SparkMicroBatchStream streamBefore =
++      SparkMicroBatchStream streamPreChange =
 +          createSchemaTrackingTestStream(
 +              snapshotManager,
 +              hadoopConf,
 +              options,
-+              tablePath,
++              testTablePath,
 +              preChangeSchema,
 +              Option.apply(trackingLog),
 +              checkpointLocation);
 +
 +      // Apply non-additive schema change AFTER stream init.
-+      scenarioSetup.setup(tableName, tempDir);
++      scenarioSetup.setup(testTableName, tempDir);
 +      long schemaChangeVersion = deltaLog.update(false, Option.empty(), Option.empty()).version();
 +
-+      Offset startOffset = streamBefore.initialOffset();
++      Offset startOffset = streamPreChange.initialOffset();
 +      DeltaSourceOffset startDelta = (DeltaSourceOffset) startOffset;
 +      assertEquals(startVersion, startDelta.reservoirVersion());
 +      assertEquals(DeltaSourceOffset.BASE_INDEX(), startDelta.index());
@@ -97,7 +112,7 @@
 +
 +      // First latestOffset eagerly initializes the log to the pre-change schema (no throw because
 +      // the fresh stream's read schema matches). Then it scans forward and stops at the barrier.
-+      Offset barrierOffset = streamBefore.latestOffset(startOffset, ReadLimit.allAvailable());
++      Offset barrierOffset = streamPreChange.latestOffset(startOffset, ReadLimit.allAvailable());
 +      DeltaSourceOffset barrierDelta = (DeltaSourceOffset) barrierOffset;
 +      assertEquals(schemaChangeVersion, barrierDelta.reservoirVersion());
 +      assertEquals(DeltaSourceOffset.METADATA_CHANGE_INDEX(), barrierDelta.index());
@@ -106,10 +121,10 @@
 +      assertEquals(startVersion, initialized.deltaCommitVersion());
 +
 +      assertEquals(
-+          preChangeRowCount, countRowsBetweenOffsets(streamBefore, startOffset, barrierOffset));
++          preChangeRowCount, countRowsBetweenOffsets(streamPreChange, startOffset, barrierOffset));
 +
 +      DeltaRuntimeException barrierEx =
-+          assertThrows(DeltaRuntimeException.class, () -> streamBefore.commit(barrierOffset));
++          assertThrows(DeltaRuntimeException.class, () -> streamPreChange.commit(barrierOffset));
 +      assertMetadataEvolutionException(barrierEx, "on barrier commit (forward)");
 +
 +      PersistedMetadata evolved = trackingLog.getCurrentTrackedMetadata().get();
@@ -120,22 +135,36 @@
 +      StructType postChangeSchema =
 +          io.delta.spark.internal.v2.utils.SchemaUtils.convertKernelSchemaToSparkSchema(
 +              snapshotManager.loadLatestSnapshot().getSchema());
-+      SparkMicroBatchStream streamAfter =
++      SparkMicroBatchStream streamPostChange =
 +          createSchemaTrackingTestStream(
 +              snapshotManager,
 +              hadoopConf,
 +              options,
-+              tablePath,
++              testTablePath,
 +              postChangeSchema,
 +              Option.apply(trackingLog),
 +              checkpointLocation);
 +
-+      Offset postBarrierOffset = streamAfter.latestOffset(barrierOffset, ReadLimit.allAvailable());
++      Offset postBarrierOffset =
++          streamPostChange.latestOffset(barrierOffset, ReadLimit.allAvailable());
 +      DeltaSourceOffset postBarrierDelta = (DeltaSourceOffset) postBarrierOffset;
 +      assertEquals(schemaChangeVersion, postBarrierDelta.reservoirVersion());
 +      assertEquals(DeltaSourceOffset.POST_METADATA_CHANGE_INDEX(), postBarrierDelta.index());
-+      assertEquals(0L, countRowsBetweenOffsets(streamAfter, barrierOffset, postBarrierOffset));
-+      assertDoesNotThrow(() -> streamAfter.commit(postBarrierOffset));
++      assertEquals(0L, countRowsBetweenOffsets(streamPostChange, barrierOffset, postBarrierOffset));
++      assertDoesNotThrow(() -> streamPostChange.commit(postBarrierOffset));
++
++      // Successful commit must leave the evolved entry in place — not roll back or double-evolve.
++      PersistedMetadata afterPostBarrierCommit = trackingLog.getCurrentTrackedMetadata().get();
++      assertEquals(schemaChangeVersion, afterPostBarrierCommit.deltaCommitVersion());
++      assertPostChangeSchema.accept(afterPostBarrierCommit.dataSchema());
++
++      // Insert a row shaped to the post-change schema and read it through the evolved stream.
++      insertPostChangeRow.setup(testTableName, tempDir);
++      Offset postInsertOffset =
++          streamPostChange.latestOffset(postBarrierOffset, ReadLimit.allAvailable());
++      assertEquals(
++          1L, countRowsBetweenOffsets(streamPostChange, postBarrierOffset, postInsertOffset));
++      assertDoesNotThrow(() -> streamPostChange.commit(postInsertOffset));
 +    } finally {
 +      sparkConf.keySet().forEach(k -> spark.conf().unset(k));
 +    }
@@ -152,7 +181,8 @@
 +   *   <li>Restart with pre-change schema: {@code latestOffset} returns the barrier; pre-change rows
 +   *       are read; {@code commit(barrier)} writes the post-change entry and throws.
 +   *   <li>Restart with post-change schema: barrier advances to {@code POST_METADATA_CHANGE_INDEX}
-+   *       and {@code commit} succeeds.
++   *       and {@code commit} succeeds. A row written after the schema change is then read through
++   *       the evolved stream to confirm the new schema is honored end-to-end.
 +   * </ol>
 +   */
 +  @ParameterizedTest
@@ -160,29 +190,30 @@
 +  public void testSchemaTrackingLifecycle_backfill(
 +      ScenarioSetup scenarioSetup,
 +      Map<String, String> sparkConf,
++      ScenarioSetup insertPostChangeRow,
 +      Consumer<StructType> assertPostChangeSchema,
-+      String description,
++      String testDescription,
 +      @TempDir File tempDir)
 +      throws Exception {
-+    String tablePath = tempDir.getAbsolutePath();
-+    String tableName =
-+        "test_lifecycle_backfill_" + Math.abs(description.hashCode()) + "_" + System.nanoTime();
++    String testTablePath = tempDir.getAbsolutePath();
++    String testTableName =
++        "test_backfill_lifecycle_" + Math.abs(testDescription.hashCode()) + "_" + System.nanoTime();
 +
 +    try {
 +      sparkConf.forEach((k, v) -> spark.conf().set(k, v));
 +
-+      createSchemaEvolutionTestTable(tablePath, tableName);
-+      DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(tablePath));
++      createSchemaEvolutionTestTable(testTablePath, testTableName);
++      DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
 +      long startVersion = deltaLog.update(false, Option.empty(), Option.empty()).version();
 +      long preChangeRowCount = 2L;
 +
 +      // Apply non-additive schema change BEFORE stream construction.
-+      scenarioSetup.setup(tableName, tempDir);
++      scenarioSetup.setup(testTableName, tempDir);
 +      long schemaChangeVersion = deltaLog.update(false, Option.empty(), Option.empty()).version();
 +
 +      Configuration hadoopConf = new Configuration();
 +      PathBasedSnapshotManager snapshotManager =
-+          new PathBasedSnapshotManager(tablePath, hadoopConf);
++          new PathBasedSnapshotManager(testTablePath, hadoopConf);
 +      String schemaTrackingLocation = new File(tempDir, "schema_tracking").getAbsolutePath();
 +      String checkpointLocation = new File(tempDir, "checkpoint").getAbsolutePath();
 +      java.util.Map<String, String> optionMap =
@@ -196,69 +227,93 @@
 +          io.delta.spark.internal.v2.utils.SchemaUtils.convertKernelSchemaToSparkSchema(
 +              snapshotManager.loadLatestSnapshot().getSchema());
 +
-+      // Stream 1: post-change schema (analysis would bind to the latest snapshot since the log
++      // Round 1: post-change schema (analysis would bind to the latest snapshot since the log
 +      // is empty). First latestOffset runs eager-init and throws.
-+      SparkMicroBatchStream stream1 =
++      SparkMicroBatchStream streamForEagerInit =
 +          createSchemaTrackingTestStream(
 +              snapshotManager,
 +              hadoopConf,
 +              options,
-+              tablePath,
++              testTablePath,
 +              postChangeSchema,
 +              Option.apply(trackingLog),
 +              checkpointLocation);
-+      Offset start1 = stream1.initialOffset();
++      Offset eagerInitStartOffset = streamForEagerInit.initialOffset();
 +      DeltaRuntimeException initEx =
 +          assertThrows(
 +              DeltaRuntimeException.class,
-+              () -> stream1.latestOffset(start1, ReadLimit.allAvailable()));
++              () ->
++                  streamForEagerInit.latestOffset(eagerInitStartOffset, ReadLimit.allAvailable()));
 +      assertMetadataEvolutionException(initEx, "during backfill log initialization");
 +
 +      PersistedMetadata afterInit = trackingLog.getCurrentTrackedMetadata().get();
 +      assertEquals(startVersion, afterInit.deltaCommitVersion());
 +
-+      // Stream 2: pre-change schema (the log entry now carries it).
-+      SparkMicroBatchStream stream2 =
++      // Round 2: pre-change schema (the log entry now carries it).
++      SparkMicroBatchStream streamForBarrier =
 +          createSchemaTrackingTestStream(
 +              snapshotManager,
 +              hadoopConf,
 +              options,
-+              tablePath,
++              testTablePath,
 +              preChangeSchema,
 +              Option.apply(trackingLog),
 +              checkpointLocation);
-+      Offset start2 = stream2.initialOffset();
-+      Offset barrierOffset = stream2.latestOffset(start2, ReadLimit.allAvailable());
++      Offset barrierStartOffset = streamForBarrier.initialOffset();
++      DeltaSourceOffset barrierStartDelta = (DeltaSourceOffset) barrierStartOffset;
++      assertEquals(startVersion, barrierStartDelta.reservoirVersion());
++      assertEquals(DeltaSourceOffset.BASE_INDEX(), barrierStartDelta.index());
++      assertFalse(barrierStartDelta.isInitialSnapshot());
++
++      Offset barrierOffset =
++          streamForBarrier.latestOffset(barrierStartOffset, ReadLimit.allAvailable());
 +      DeltaSourceOffset barrierDelta = (DeltaSourceOffset) barrierOffset;
 +      assertEquals(schemaChangeVersion, barrierDelta.reservoirVersion());
 +      assertEquals(DeltaSourceOffset.METADATA_CHANGE_INDEX(), barrierDelta.index());
 +
-+      assertEquals(preChangeRowCount, countRowsBetweenOffsets(stream2, start2, barrierOffset));
++      assertEquals(
++          preChangeRowCount,
++          countRowsBetweenOffsets(streamForBarrier, barrierStartOffset, barrierOffset));
 +
 +      DeltaRuntimeException barrierEx =
-+          assertThrows(DeltaRuntimeException.class, () -> stream2.commit(barrierOffset));
++          assertThrows(DeltaRuntimeException.class, () -> streamForBarrier.commit(barrierOffset));
 +      assertMetadataEvolutionException(barrierEx, "on barrier commit (backfill)");
 +
 +      PersistedMetadata evolved = trackingLog.getCurrentTrackedMetadata().get();
 +      assertEquals(schemaChangeVersion, evolved.deltaCommitVersion());
 +      assertPostChangeSchema.accept(evolved.dataSchema());
 +
-+      // Stream 3: post-change schema. Advance barrier → POST_BARRIER, commit succeeds.
-+      SparkMicroBatchStream stream3 =
++      // Round 3: post-change schema. Advance barrier → POST_BARRIER, commit succeeds.
++      SparkMicroBatchStream streamForPostBarrier =
 +          createSchemaTrackingTestStream(
 +              snapshotManager,
 +              hadoopConf,
 +              options,
-+              tablePath,
++              testTablePath,
 +              postChangeSchema,
 +              Option.apply(trackingLog),
 +              checkpointLocation);
-+      Offset postBarrierOffset = stream3.latestOffset(barrierOffset, ReadLimit.allAvailable());
++      Offset postBarrierOffset =
++          streamForPostBarrier.latestOffset(barrierOffset, ReadLimit.allAvailable());
 +      DeltaSourceOffset postBarrierDelta = (DeltaSourceOffset) postBarrierOffset;
 +      assertEquals(schemaChangeVersion, postBarrierDelta.reservoirVersion());
 +      assertEquals(DeltaSourceOffset.POST_METADATA_CHANGE_INDEX(), postBarrierDelta.index());
-+      assertEquals(0L, countRowsBetweenOffsets(stream3, barrierOffset, postBarrierOffset));
-+      assertDoesNotThrow(() -> stream3.commit(postBarrierOffset));
++      assertEquals(
++          0L, countRowsBetweenOffsets(streamForPostBarrier, barrierOffset, postBarrierOffset));
++      assertDoesNotThrow(() -> streamForPostBarrier.commit(postBarrierOffset));
++
++      // Successful commit must leave the evolved entry in place — not roll back or double-evolve.
++      PersistedMetadata afterPostBarrierCommit = trackingLog.getCurrentTrackedMetadata().get();
++      assertEquals(schemaChangeVersion, afterPostBarrierCommit.deltaCommitVersion());
++      assertPostChangeSchema.accept(afterPostBarrierCommit.dataSchema());
++
++      // Insert a row shaped to the post-change schema and read it through the evolved stream.
++      insertPostChangeRow.setup(testTableName, tempDir);
++      Offset postInsertOffset =
++          streamForPostBarrier.latestOffset(postBarrierOffset, ReadLimit.allAvailable());
++      assertEquals(
++          1L, countRowsBetweenOffsets(streamForPostBarrier, postBarrierOffset, postInsertOffset));
++      assertDoesNotThrow(() -> streamForPostBarrier.commit(postInsertOffset));
 +    } finally {
 +      sparkConf.keySet().forEach(k -> spark.conf().unset(k));
 +    }
@@ -267,135 +322,202 @@
    /** Provides test scenarios that generate additive schema changes actions. */
    private static Stream<Arguments> additiveSchemaEvolutionScenarios() {
      return Stream.of(
-                 "false"),
-             "Drop column in nested struct"),
+             "Widen INT column to BIGINT"));
+   }
  
-+        // Rename column in nested struct
-+        Arguments.of(
-+            (ScenarioSetup)
-+                (tableName, tempDir) -> {
-+                  sql("ALTER TABLE %s RENAME COLUMN info.col1 TO newCol1", tableName);
-+                },
-+            /* sparkConf */ Map.of(
-+                DeltaSQLConf
-+                    .DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES()
-+                    .key(),
-+                "false"),
-+            "Rename column in nested struct"),
-+
-         // Widen INT column to BIGINT
-         Arguments.of(
-             (ScenarioSetup)
-             // non-additive
-             /* sparkConf */ Map.of(
-                 DeltaSQLConf.DELTA_TYPE_WIDENING_ENABLE_STREAMING_SCHEMA_TRACKING().key(), "true"),
--            "Widen INT column to BIGINT"));
-+            "Widen INT column to BIGINT"),
+-  /** Provides test scenarios that generate non-additive schema changes actions. */
+-  private static Stream<Arguments> nonAdditiveSchemaEvolutionScenarios() {
+-    return Stream.of(
+-        // Rename column
+-        Arguments.of(
+-            (ScenarioSetup)
+-                (tableName, tempDir) -> {
+-                  sql("ALTER TABLE %s RENAME COLUMN id TO userId", tableName);
+-                },
+-            /* sparkConf */ Map.of(),
+-            "Rename column"),
++  /**
++   * Single source of truth for non-additive schema-evolution scenarios. Each scenario carries:
++   *
++   * <ul>
++   *   <li>{@code change} — the non-additive ALTER applied to the fixture
++   *   <li>{@code sparkConf} — feature flags required for the change to be classified as
++   *       non-additive (column-mapping unsafe-read flag, type-widening tracking flag, etc.)
++   *   <li>{@code insertPostChangeRow} — INSERT shaped to the post-change schema; used by lifecycle
++   *       tests to verify the evolved stream can read freshly-written data
++   *   <li>{@code assertPostChangeSchema} — predicate over the persisted schema after the evolve;
++   *       used by lifecycle tests
++   *   <li>{@code description} — human-readable label used to disambiguate temp tables / failures
++   * </ul>
++   *
++   * Throws-error tests project {@code (change, sparkConf, description)}; lifecycle tests use the
++   * full tuple. Pre-change schema is always the fixture's schema, so it doesn't need to be
++   * parameterized.
++   */
++  private static final class NonAdditiveScenario {
++    final ScenarioSetup change;
++    final Map<String, String> sparkConf;
++    final ScenarioSetup insertPostChangeRow;
++    final Consumer<StructType> assertPostChangeSchema;
++    final String description;
 +
-+        // Widen INT column to BIGINT in nested struct
-+        Arguments.of(
-+            (ScenarioSetup)
-+                (tableName, tempDir) -> {
-+                  sql("ALTER TABLE %s ALTER COLUMN info.col1 TYPE BIGINT", tableName);
-+                },
-+            /* sparkConf */ Map.of(
-+                DeltaSQLConf.DELTA_TYPE_WIDENING_ENABLE_STREAMING_SCHEMA_TRACKING().key(), "true"),
-+            "Widen INT column to BIGINT in nested struct"));
++    NonAdditiveScenario(
++        ScenarioSetup change,
++        Map<String, String> sparkConf,
++        ScenarioSetup insertPostChangeRow,
++        Consumer<StructType> assertPostChangeSchema,
++        String description) {
++      this.change = change;
++      this.sparkConf = sparkConf;
++      this.insertPostChangeRow = insertPostChangeRow;
++      this.assertPostChangeSchema = assertPostChangeSchema;
++      this.description = description;
++    }
 +  }
+ 
+-        // Drop nullable, non-nullable and struct columns
+-        Arguments.of(
+-            (ScenarioSetup)
+-                (tableName, tempDir) -> {
+-                  sql("ALTER TABLE %s DROP COLUMNS (id, value, info)", tableName);
+-                },
+-            /* sparkConf */ Map.of(
+-                DeltaSQLConf
+-                    .DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES()
+-                    .key(),
+-                "false"),
++  private static List<NonAdditiveScenario> nonAdditiveScenarios() {
++    Map<String, String> unsafeReadFalse =
++        Map.of(
++            DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES()
++                .key(),
++            "false");
++    Map<String, String> typeWideningTracking =
++        Map.of(DeltaSQLConf.DELTA_TYPE_WIDENING_ENABLE_STREAMING_SCHEMA_TRACKING().key(), "true");
 +
-+  /**
-+   * Scenarios for the schema-tracking lifecycle tests. Each scenario carries a per-shape predicate
-+   * that asserts the post-change schema persisted in the tracking log after the barrier commit.
-+   * Pre-change schema is always the fixture's schema, so it doesn't need to be parameterized.
-+   */
-+  private static Stream<Arguments> nonAdditiveSchemaEvolutionLifecycleScenarios() {
-+    return Stream.of(
-+        // Drop column
-+        Arguments.of(
-+            (ScenarioSetup)
-+                (tableName, tempDir) -> sql("ALTER TABLE %s DROP COLUMNS (value)", tableName),
-+            /* sparkConf */ Map.<String, String>of(),
-+            (Consumer<StructType>)
-+                schema -> assertThat(Arrays.asList(schema.fieldNames())).doesNotContain("value"),
++    return List.of(
++        new NonAdditiveScenario(
++            (tableName, tempDir) -> sql("ALTER TABLE %s DROP COLUMNS (value)", tableName),
++            unsafeReadFalse,
++            (tableName, tempDir) ->
++                sql(
++                    "INSERT INTO %s VALUES (3, 'Carol', named_struct('col1', 50, 'col2', 'SF'))",
++                    tableName),
++            schema -> assertThat(Arrays.asList(schema.fieldNames())).doesNotContain("value"),
 +            "Drop column"),
-+
-+        // Drop nullable, non-nullable and struct columns (only `name` remains)
-+        Arguments.of(
-+            (ScenarioSetup)
-+                (tableName, tempDir) ->
-+                    sql("ALTER TABLE %s DROP COLUMNS (id, value, info)", tableName),
-+            /* sparkConf */ Map.of(
-+                DeltaSQLConf
-+                    .DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES()
-+                    .key(),
-+                "false"),
-+            (Consumer<StructType>)
-+                schema -> assertThat(Arrays.asList(schema.fieldNames())).containsExactly("name"),
-+            "Drop nullable, non-nullable and struct columns"),
-+
-+        // Rename top-level column
-+        Arguments.of(
-+            (ScenarioSetup)
-+                (tableName, tempDir) -> sql("ALTER TABLE %s RENAME COLUMN id TO userId", tableName),
-+            /* sparkConf */ Map.<String, String>of(),
-+            (Consumer<StructType>)
-+                schema ->
-+                    assertThat(Arrays.asList(schema.fieldNames()))
-+                        .contains("userId")
-+                        .doesNotContain("id"),
++        new NonAdditiveScenario(
++            (tableName, tempDir) -> sql("ALTER TABLE %s DROP COLUMNS (id, value, info)", tableName),
++            unsafeReadFalse,
++            (tableName, tempDir) -> sql("INSERT INTO %s VALUES ('Carol')", tableName),
++            schema -> assertThat(Arrays.asList(schema.fieldNames())).containsExactly("name"),
+             "Drop nullable, non-nullable and struct columns"),
+-
+-        // Drop column in nested struct
+-        Arguments.of(
+-            (ScenarioSetup)
+-                (tableName, tempDir) -> {
+-                  sql("ALTER TABLE %s DROP COLUMNS info.col1", tableName);
+-                },
+-            /* sparkConf */ Map.of(
+-                DeltaSQLConf
+-                    .DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES()
+-                    .key(),
+-                "false"),
++        new NonAdditiveScenario(
++            (tableName, tempDir) -> sql("ALTER TABLE %s RENAME COLUMN id TO userId", tableName),
++            Map.of(),
++            (tableName, tempDir) ->
++                sql(
++                    "INSERT INTO %s VALUES (3, 'Carol', 50.0, "
++                        + "named_struct('col1', 50, 'col2', 'SF'))",
++                    tableName),
++            schema ->
++                assertThat(Arrays.asList(schema.fieldNames()))
++                    .contains("userId")
++                    .doesNotContain("id"),
 +            "Rename column"),
-+
-+        // Drop column in nested struct
-+        Arguments.of(
-+            (ScenarioSetup)
-+                (tableName, tempDir) -> sql("ALTER TABLE %s DROP COLUMNS info.col1", tableName),
-+            /* sparkConf */ Map.<String, String>of(),
-+            (Consumer<StructType>)
-+                schema -> {
-+                  StructType info = (StructType) schema.apply("info").dataType();
-+                  assertThat(Arrays.asList(info.fieldNames())).doesNotContain("col1");
-+                },
-+            "Drop column in nested struct"),
-+
-+        // Rename column in nested struct
-+        Arguments.of(
-+            (ScenarioSetup)
-+                (tableName, tempDir) ->
-+                    sql("ALTER TABLE %s RENAME COLUMN info.col1 TO newCol1", tableName),
-+            /* sparkConf */ Map.<String, String>of(),
-+            (Consumer<StructType>)
-+                schema -> {
-+                  StructType info = (StructType) schema.apply("info").dataType();
-+                  assertThat(Arrays.asList(info.fieldNames()))
-+                      .contains("newCol1")
-+                      .doesNotContain("col1");
-+                },
++        new NonAdditiveScenario(
++            (tableName, tempDir) -> sql("ALTER TABLE %s DROP COLUMNS info.col1", tableName),
++            unsafeReadFalse,
++            (tableName, tempDir) ->
++                sql(
++                    "INSERT INTO %s VALUES (3, 'Carol', 50.0, named_struct('col2', 'SF'))",
++                    tableName),
++            schema -> {
++              StructType info = (StructType) schema.apply("info").dataType();
++              assertThat(Arrays.asList(info.fieldNames())).doesNotContain("col1");
++            },
+             "Drop column in nested struct"),
++        new NonAdditiveScenario(
++            (tableName, tempDir) ->
++                sql("ALTER TABLE %s RENAME COLUMN info.col1 TO newCol1", tableName),
++            unsafeReadFalse,
++            (tableName, tempDir) ->
++                sql(
++                    "INSERT INTO %s VALUES (3, 'Carol', 50.0, "
++                        + "named_struct('newCol1', 50, 'col2', 'SF'))",
++                    tableName),
++            schema -> {
++              StructType info = (StructType) schema.apply("info").dataType();
++              assertThat(Arrays.asList(info.fieldNames()))
++                  .contains("newCol1")
++                  .doesNotContain("col1");
++            },
 +            "Rename column in nested struct"),
-+
-+        // Widen INT column to BIGINT
-+        Arguments.of(
-+            (ScenarioSetup)
-+                (tableName, tempDir) ->
-+                    sql("ALTER TABLE %s ALTER COLUMN id TYPE BIGINT", tableName),
-+            /* sparkConf */ Map.of(
-+                DeltaSQLConf.DELTA_TYPE_WIDENING_ENABLE_STREAMING_SCHEMA_TRACKING().key(), "true"),
-+            (Consumer<StructType>)
-+                schema -> assertEquals(DataTypes.LongType, schema.apply("id").dataType()),
++        new NonAdditiveScenario(
++            (tableName, tempDir) -> sql("ALTER TABLE %s ALTER COLUMN id TYPE BIGINT", tableName),
++            typeWideningTracking,
++            (tableName, tempDir) ->
++                sql(
++                    "INSERT INTO %s VALUES (3, 'Carol', 50.0, "
++                        + "named_struct('col1', 50, 'col2', 'SF'))",
++                    tableName),
++            schema -> assertEquals(DataTypes.LongType, schema.apply("id").dataType()),
 +            "Widen INT column to BIGINT"),
++        new NonAdditiveScenario(
++            (tableName, tempDir) ->
++                sql("ALTER TABLE %s ALTER COLUMN info.col1 TYPE BIGINT", tableName),
++            typeWideningTracking,
++            (tableName, tempDir) ->
++                sql(
++                    "INSERT INTO %s VALUES (3, 'Carol', 50.0, "
++                        + "named_struct('col1', 50, 'col2', 'SF'))",
++                    tableName),
++            schema -> {
++              StructType info = (StructType) schema.apply("info").dataType();
++              assertEquals(DataTypes.LongType, info.apply("col1").dataType());
++            },
++            "Widen INT column to BIGINT in nested struct"));
++  }
+ 
+-        // Widen INT column to BIGINT
+-        Arguments.of(
+-            (ScenarioSetup)
+-                (tableName, tempDir) -> {
+-                  sql("ALTER TABLE %s ALTER COLUMN id TYPE BIGINT", tableName);
+-                },
+-            // Set enableSchemaTrackingForTypeWidening to be true to treat widening type changes as
+-            // non-additive
+-            /* sparkConf */ Map.of(
+-                DeltaSQLConf.DELTA_TYPE_WIDENING_ENABLE_STREAMING_SCHEMA_TRACKING().key(), "true"),
+-            "Widen INT column to BIGINT"));
++  /** Provides test scenarios that generate non-additive schema changes actions. */
++  private static Stream<Arguments> nonAdditiveSchemaEvolutionScenarios() {
++    return nonAdditiveScenarios().stream()
++        .map(s -> Arguments.of(s.change, s.sparkConf, s.description));
++  }
 +
-+        // Widen INT column to BIGINT in nested struct
-+        Arguments.of(
-+            (ScenarioSetup)
-+                (tableName, tempDir) ->
-+                    sql("ALTER TABLE %s ALTER COLUMN info.col1 TYPE BIGINT", tableName),
-+            /* sparkConf */ Map.of(
-+                DeltaSQLConf.DELTA_TYPE_WIDENING_ENABLE_STREAMING_SCHEMA_TRACKING().key(), "true"),
-+            (Consumer<StructType>)
-+                schema -> {
-+                  StructType info = (StructType) schema.apply("info").dataType();
-+                  assertEquals(DataTypes.LongType, info.apply("col1").dataType());
-+                },
-+            "Widen INT column to BIGINT in nested struct"));
++  /** Same scenarios as {@link #nonAdditiveSchemaEvolutionScenarios}, plus lifecycle-only fields. */
++  private static Stream<Arguments> nonAdditiveSchemaEvolutionLifecycleScenarios() {
++    return nonAdditiveScenarios().stream()
++        .map(
++            s ->
++                Arguments.of(
++                    s.change,
++                    s.sparkConf,
++                    s.insertPostChangeRow,
++                    s.assertPostChangeSchema,
++                    s.description));
    }
  
    // ================================================================================================

Reproduce locally: git range-diff c025b7c..0148020 c025b7c..db16b9f | Disable: git config gitstack.push-range-diff false

TimothyW553 pushed a commit that referenced this pull request May 5, 2026
## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6546/files) to
review incremental changes.
-
[**stack/SparkMetadataAdapter**](#6546)
[[Files changed](https://github.com/delta-io/delta/pull/6546/files)]
-
[stack/RefactorMetadataTrackingLog](#6550)
[[Files
changed](https://github.com/delta-io/delta/pull/6550/files/9271a6262f7a2615b977de0319c7238044b7d0a9..8378d33acda70a34a109b35173a968a4b3401ec1)]
-
[stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562)
[[Files
changed](https://github.com/delta-io/delta/pull/6562/files/8378d33acda70a34a109b35173a968a4b3401ec1..90365431b12640de181446ec9c2033fb1b143b03)]
-
[stack/MetadataEvolutionHandler2](#6563)
[[Files
changed](https://github.com/delta-io/delta/pull/6563/files/28bb7021adb12b055e1b281fdfee0ab48a8732ac..578870181fa81a9146b2fa907244e350ffcabb52)]
-
[stack/NonAdditiveSchemaEvolution2](#6570)
[[Files
changed](https://github.com/delta-io/delta/pull/6570/files/578870181fa81a9146b2fa907244e350ffcabb52..c025b7c3c386e8d46d6142d0727dce95582bb0ef)]
-
[stack/NonAdditiveSchemaEvolution3](#6697)
[[Files
changed](https://github.com/delta-io/delta/pull/6697/files/c025b7c3c386e8d46d6142d0727dce95582bb0ef..db16b9fa80a80c105430c93589126ba8b828458f)]
-
[stack/consecutiveSchemaChangesMerger](#6698)
[[Files
changed](https://github.com/delta-io/delta/pull/6698/files/0148020ffe11e7b079e99fa8c5189a19c354f2be..9a360aa819f20d78b5361b2e997d24433fb793d5)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 1/7 in the non-additive schema evolution for V2 streaming connector
stack.

The shared V1 Scala utilities (`DeltaColumnMapping`,
`DeltaSourceMetadataEvolutionSupport`) operate on
`AbstractMetadata`/`AbstractProtocol`, but V2 holds Kernel types. This
PR creates two adapter classes that bridge the gap:

- `KernelMetadataAdapter`: Kernel `Metadata` → `AbstractMetadata`
(schema conversion via `SchemaUtils`, partition columns and
configuration converted to Scala collections)
- `KernelProtocolAdapter`: Kernel `Protocol` → `AbstractProtocol` (maps
reader/writer features to `Option[Set[String]]`)

Also adds `columnMappingMode` and `partitionSchema` to the
`AbstractMetadata` trait — V1's `Metadata` already had these fields, the
trait just didn't expose them.

## How was this patch tested?

Unit tests in `ActionAdaptersTest.java`: table-features protocol, legacy
protocol, full metadata round-trip, null optional fields, and null
constructor rejection.

## Does this PR introduce _any_ user-facing changes?

No.
@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution3 branch from db16b9f to ea7cfeb Compare May 6, 2026 00:04
@PorridgeSwim
Copy link
Copy Markdown
Collaborator Author

Range-diff: stack/NonAdditiveSchemaEvolution2 (db16b9f -> ea7cfeb)
spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala
@@ -7,7 +7,7 @@
 -  // TODO(#5319): Move tests to shouldPassTests as V2 schema tracking log support is implemented.c
    override protected def shouldPassTests: Set[String] = Set(
      // ========== Schema log unit test ==========
-     "detects metadata/protocol changes through the AbstractMetadata/AbstractProtocol surface",
+     "schema location not under checkpoint",
      "forward-compat: older version can read back newer JSON",
  
      // ========== Schema log core ==========
spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkScan.java
@@ -37,8 +37,8 @@
  
    private final DeltaSnapshotManager snapshotManager;
    private final Snapshot initialSnapshot;
-     DeltaOptions deltaOptions = new DeltaOptions(scalaOptions, sqlConf);
-     // Validate streaming options immediately after constructing DeltaOptions
+   @Override
+   public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
      validateStreamingOptions(deltaOptions);
 +
 +    // Loads a fresh snapshot as the baseline for schema change detection and table identity

Reproduce locally: git range-diff c025b7c..db16b9f 1888b50..ea7cfeb | Disable: git config gitstack.push-range-diff false

@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution3 branch 2 times, most recently from 6854223 to 13395a7 Compare May 6, 2026 01:34
@PorridgeSwim
Copy link
Copy Markdown
Collaborator Author

Range-diff: stack/NonAdditiveSchemaEvolution2 (6854223 -> 13395a7)
spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamTest.java
@@ -583,7 +583,7 @@
 +        ScalaUtils.toScalaMap(optionMap),
 +        Option.apply(checkpointLocation),
 +        /* mergeConsecutiveSchemaChanges= */ false,
-+        /* consecutiveSchemaChangesMerger= */ null,
++        /* consecutiveSchemaChangesMerger= */ Option.empty(),
 +        /* initMetadataLogEagerly= */ true);
 +  }
 +

Reproduce locally: git range-diff 6430dd8..6854223 476762f..13395a7 | Disable: git config gitstack.push-range-diff false

TimothyW553 pushed a commit that referenced this pull request May 6, 2026
…#6550)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6550/files) to
review incremental changes.
-
[stack/SparkMetadataAdapter](#6546)
[[Files changed](https://github.com/delta-io/delta/pull/6546/files)]
[MERGED]
-
[**stack/RefactorMetadataTrackingLog**](#6550)
[[Files changed](https://github.com/delta-io/delta/pull/6550/files)]
-
[stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562)
[[Files
changed](https://github.com/delta-io/delta/pull/6562/files/953f137f8c4ce46d8b8a9605b0c7bed898e30df4..027984b6edcbad0f4731e560425c2ed9bcf8fc27)]
-
[stack/MetadataEvolutionHandler2](#6563)
[[Files
changed](https://github.com/delta-io/delta/pull/6563/files/027984b6edcbad0f4731e560425c2ed9bcf8fc27..ada845895139edcb2727a87b39922c8e16837a99)]
-
[stack/NonAdditiveSchemaEvolution2](#6570)
[[Files
changed](https://github.com/delta-io/delta/pull/6570/files/ada845895139edcb2727a87b39922c8e16837a99..476762fde7b9cb9b9bc3e416c86a260cd29806ed)]
-
[stack/NonAdditiveSchemaEvolution3](#6697)
[[Files
changed](https://github.com/delta-io/delta/pull/6697/files/476762fde7b9cb9b9bc3e416c86a260cd29806ed..13395a7f2a49db4962091e8ee919bebdab5bd4e2)]
-
[stack/consecutiveSchemaChangesMerger](#6698)
[[Files
changed](https://github.com/delta-io/delta/pull/6698/files/13395a7f2a49db4962091e8ee919bebdab5bd4e2..f22ba063eaf35ab69d653a2d5faefdc52f35eab5)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 2/7 in the non-additive schema evolution for V2 streaming connector
stack.

Decouple `DeltaSourceMetadataTrackingLog` and `PersistedMetadata` from
V1-specific types so the schema log can be reused by the V2 connector.

- Replace `SnapshotDescriptor` parameter in `create()` with plain
`sourceTableId` and `sourceDataPath` strings
- Unify `PersistedMetadata.apply` to accept
`AbstractMetadata`/`AbstractProtocol` instead of V1
`Metadata`/`Protocol`
- Extract the consecutive schema changes merger (V1-specific, depends on
`DeltaLog`) out of the companion object into
`DeltaSourceMetadataEvolutionSupport`, and inject it as a function
parameter so V2 can provide its own implementation
- Remove `Protocol`'s `private` constructor modifier to allow
construction from abstract protocol fields

All changes are structural refactors with no behavioral change.

## How was this patch tested?

Existing tests in `DeltaSourceSchemaEvolutionSuite` updated to use the
new API. No behavioral changes.

## Does this PR introduce _any_ user-facing changes?

No.
@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution3 branch 2 times, most recently from 9a0ad72 to 14956ea Compare May 6, 2026 17:56
murali-db pushed a commit that referenced this pull request May 6, 2026
…seable in v2 (#6562)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6562/files) to
review incremental changes.
-
[stack/SparkMetadataAdapter](#6546)
[[Files changed](https://github.com/delta-io/delta/pull/6546/files)]
[MERGED]
-
[stack/RefactorMetadataTrackingLog](#6550)
[[Files changed](https://github.com/delta-io/delta/pull/6550/files)]
[MERGED]
-
[**stack/RefactorDeltaSourceMetadataEvolutionSupport**](#6562)
[[Files changed](https://github.com/delta-io/delta/pull/6562/files)]
-
[stack/MetadataEvolutionHandler2](#6563)
[[Files
changed](https://github.com/delta-io/delta/pull/6563/files/ed92a0fa2051432b6bc5784034df0b7949bbfb98..e5b2c3295843ec85753e07dc0010aa5ccebaabb7)]
-
[stack/NonAdditiveSchemaEvolution2](#6570)
[[Files
changed](https://github.com/delta-io/delta/pull/6570/files/e5b2c3295843ec85753e07dc0010aa5ccebaabb7..7c66bf11a0f1b651cda32ed7f529f552dd9dbfcb)]
-
[stack/NonAdditiveSchemaEvolution3](#6697)
[[Files
changed](https://github.com/delta-io/delta/pull/6697/files/7c66bf11a0f1b651cda32ed7f529f552dd9dbfcb..14956ea304c93d2343ccd7eb89a112966f07f906)]
-
[stack/consecutiveSchemaChangesMerger](#6698)
[[Files
changed](https://github.com/delta-io/delta/pull/6698/files/14956ea304c93d2343ccd7eb89a112966f07f906..8101b335b892a6a5b6d6fe11f4a202d14102721c)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 3/7 in the non-additive schema evolution for V2 streaming connector
stack.

Refactor `DeltaSourceMetadataEvolutionSupport` and `DeltaColumnMapping`
so the schema change detection logic can be called from V2 without
depending on V1 instance state.

**`DeltaSourceMetadataEvolutionSupport`:**
- Extract instance methods (`validateAndResolveMetadataEvolution`,
`checkColumnMappingSchemaChangesDuringStreaming`,
`resolveMetadataEvolutionForCommitRange`, etc.) to companion object
statics that accept explicit parameters instead of accessing V1
`DeltaSource` via `this`
- V1 trait methods now delegate to the companion object statics

**`DeltaColumnMapping`:**
- Widen `hasNoColumnMappingSchemaChanges` from V1 `Metadata` to
`AbstractMetadata` so V2 can call it via the adapter layer
- Extract `assignColumnIdAndPhysicalNameToSchema(StructType, Map)` from
`assignColumnIdAndPhysicalName(Metadata, Metadata, ...)` — needed for
simulating column mapping upgrades during NoMapping-to-NameMapping
transitions

All changes are structural refactors with no behavioral change.

## How was this patch tested?

Existing tests in `DeltaSourceSchemaEvolutionSuite` continue to pass. No
behavioral changes.

## Does this PR introduce _any_ user-facing changes?

No.
@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution3 branch 2 times, most recently from ebcd911 to 73e1aa7 Compare May 6, 2026 23:16
isTriggerAvailableNow = true;
}

private SnapshotImpl buildReadSnapshotFromPersistedMetadata(PersistedMetadata customMetadata) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move this out of SparkMicroBatchStream. The class is already becoming complex, and this would better fit to a class dedicated to interacting with the schema tracking log

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to MetadataEvolutionHandler

Comment thread spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java Outdated
Comment thread spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java Outdated
if (endOffset.reservoirVersion() == version
&& endOffset.index() == DeltaSourceOffset.BASE_INDEX()) {
return false;
return new CommitValidationResult(false, null, null);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, you'll pass null, null to getMetadataOrProtocolChangeIndexedFileIterator, that doesn't seem particularly safe

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those nulls have a real meaning ("no metadata/protocol action in this commit") and getMetadataOrProtocolChangeIndexedFileIterator is already written to handle them. I annotated the chain (CommitValidationResult fields → getMetadataOrProtocolChangeIndexedFileIteratorhasMetadataOrProtocolChangeComparedToStreamMetadata)
with @Nullable so the contract is enforced by static analysis. Preferred over Optional<> since Optional as a field type is discouraged in Java

Comment thread spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java Outdated
Comment thread spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java Outdated
Comment on lines +1542 to +1546
shouldTrackSchema =
!DeltaColumnMapping$.MODULE$.hasNoColumnMappingSchemaChanges(
new KernelMetadataAdapter(newMetadata),
new KernelMetadataAdapter(oldMetadata),
schemaReadOptions.allowUnsafeStreamingReadOnPartitionColumnChanges());
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice 👍

Comment thread spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java Outdated
Comment on lines +255 to +259
// Loads a fresh snapshot as the baseline for schema change detection and table identity
// checks. SparkScan's initialSnapshot is from analysis time and may be stale by stream
// start/restart.
// Matches V1's DeltaDataSource.createSource() behavior.
Snapshot latestSnapshot = snapshotManager.loadLatestSnapshot();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recall raising something similar on one of @zikangh PRs but can't find it again.

I know we want to stay really close to V1 behavior, but reloading the snapshot mid-analysis/execution is something we should avoid in V2. Ideally we get a snapshot pinned during table resolution, used through analysis, and yes it may become stale. But that shouldn't be an issue because even here, the snapshot could become stale right after you load it.
Having a consistent snapshot throughout avoids a whole class of bugs and dedicated handling due to state going out of sync.

What's the issue here with the using the initialSnapshot?
It's not directly related to your change as you're just moving the call, but still something worth discussing

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The motivation for reloading was to match V1's behavior of letting users adopt additive schema changes without refreshing the DataFrame. But I agree — the DataFrame should stay pinned to its analysis-time snapshot, and requiring an explicit refresh to adopt schema changes is the more correct semantics. V1's behavior here isn't quite right.

Comment thread spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java Outdated
Comment on lines +552 to +566
} else {
long startVersionForMetadataLogInit;
if (previousOffset.index() == DeltaSourceOffset.BASE_INDEX()) {
startVersionForMetadataLogInit = previousOffset.reservoirVersion() - 1;
} else {
startVersionForMetadataLogInit = previousOffset.reservoirVersion();
}
if (metadataEvolutionHandler.shouldInitializeMetadataTrackingEagerly()) {
metadataEvolutionHandler.initializeMetadataTrackingAndExitStream(
startVersionForMetadataLogInit,
/* batchEndVersion= */ null,
/* alwaysFailUponLogInitialized= */ false);
}
checkReadIncompatibleSchemaChangeOnStreamStartOnce(startVersionForMetadataLogInit, null);
}
Copy link
Copy Markdown
Collaborator

@TimothyW553 TimothyW553 May 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

v1 does not eager-init in this else branch (only the first-batch path does) , i think we cn drop it.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a V1/V2 engine-behavior difference. On restart from a fully committed batch, DSv1's engine first calls getBatch on the previous batch (where V1 inits via validateAndInitMetadataLogForPlannedBatchesDuringStreamStart) before calling latestOffset for the next one. DSv2's engine skips that and calls latestOffset directly, so the init has to move here — otherwise the schema-tracking log stays uninitialized on restart for streams that picked it up after they were already running.

Comment on lines +381 to +391
return new SnapshotImpl(
snapshotAtSourceInit.getDataPath(),
customMetadata.deltaCommitVersion(),
snapshotAtSourceInit.getLazyLogSegment(),
logReplay,
readProtocol,
readMetadata,
snapshotAtSourceInit.getCommitter(),
SnapshotQueryContext.forVersionSnapshot(
snapshotAtSourceInit.getDataPath().toString(), customMetadata.deltaCommitVersion()),
Optional.empty() /* inCommitTimestampOpt */);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the version (customMetadata.deltaCommitVersion()) and LazyLogSegment (from snapshotAtSourceInit) come from different commits.

Copy link
Copy Markdown
Collaborator Author

@PorridgeSwim PorridgeSwim May 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — they're from different commits. V1 sidesteps this by exposing only version/metadata/protocol via SnapshotDescriptor; V2 has no equivalent in Kernel, and all current consumers only read dataPath/metadata/protocol/version — none touch the log segment, so the mismatch is inert today.
Aligning the log segment would cost an extra loadSnapshotAt(persistedVersion) at every stream init, which V1 doesn't pay. Instead I replaced lazyLogSegment and lazyCrcInfo with traps that throw on access, so any future log-replay read fails loudly rather than silently against the wrong version.

customMetadata.dataSchemaJson(),
SchemaUtils.convertSparkSchemaToKernelSchema(customMetadata.dataSchema()),
VectorUtils.buildArrayValue(
Arrays.asList(customMetadata.partitionSchema().fieldNames()), StringType.STRING),
Copy link
Copy Markdown
Collaborator

@TimothyW553 TimothyW553 May 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fieldNames() drops column-mapping field ids . could we pass the full partition schema.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no partitionSchema field in v2 Metadata.

Offset postInsertOffset =
streamPostChange.latestOffset(postBarrierOffset, ReadLimit.allAvailable());
assertEquals(
1L, countRowsBetweenOffsets(streamPostChange, postBarrierOffset, postInsertOffset));
Copy link
Copy Markdown
Collaborator

@TimothyW553 TimothyW553 May 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a wrong-schema bug would still match the row count. could we read one post-change column value and assert it.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to exact match

Comment on lines +278 to +281
this.readSchemaAtSourceInit =
Objects.requireNonNull(
SchemaUtils.convertKernelSchemaToSparkSchema(snapshotAtSourceInit.getSchema()),
"readSchemaAtSourceInit is null");
SchemaUtils.convertKernelSchemaToSparkSchema(readSnapshotAtSourceInit.getSchema());
this.readProtocolAtSourceInit = readSnapshotAtSourceInit.getProtocol();
this.readConfigurationsAtSourceInit = readSnapshotAtSourceInit.getMetadata().getConfiguration();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the previous requireNonNull on schema/protocol/configuration was dropped in the rewrite.

@PorridgeSwim PorridgeSwim mentioned this pull request May 10, 2026
murali-db pushed a commit that referenced this pull request May 11, 2026
## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6563/files) to
review incremental changes.
-
[stack/SparkMetadataAdapter](#6546)
[[Files changed](https://github.com/delta-io/delta/pull/6546/files)]
[MERGED]
-
[stack/RefactorMetadataTrackingLog](#6550)
[[Files changed](https://github.com/delta-io/delta/pull/6550/files)]
[MERGED]
-
[stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562)
[[Files changed](https://github.com/delta-io/delta/pull/6562/files)]
[MERGED]
-
[**stack/MetadataEvolutionHandler2**](#6563)
[[Files changed](https://github.com/delta-io/delta/pull/6563/files)]
-
[stack/NonAdditiveSchemaEvolution2](#6570)
[[Files
changed](https://github.com/delta-io/delta/pull/6570/files/a20f1f3ab452a75fc954e15c57c17327e0cb9267..0e07f87285becd6be416450ae084df454d9c94a9)]
-
[stack/NonAdditiveSchemaEvolution3](#6697)
[[Files
changed](https://github.com/delta-io/delta/pull/6697/files/0e07f87285becd6be416450ae084df454d9c94a9..73e1aa7f4162a3e1480ffd2b88b9ca79d852f2fe)]
-
[stack/consecutiveSchemaChangesMerger](#6698)
[[Files
changed](https://github.com/delta-io/delta/pull/6698/files/73e1aa7f4162a3e1480ffd2b88b9ca79d852f2fe..5e5d260b64d45cc11bcfdb58e5aab1b2d2637b33)]
- [stack/V1V2MixTest](#6759)
[[Files
changed](https://github.com/delta-io/delta/pull/6759/files/5e5d260b64d45cc11bcfdb58e5aab1b2d2637b33..738379713040986c74f98dbebfdc6c83ec1d3f16)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 4/7 in the non-additive schema evolution for V2 streaming connector
stack.

Introduce `MetadataEvolutionHandler`, a Java class that implements the
V1 barrier protocol for schema evolution in the V2 connector. In V1 this
logic lives in `DeltaSourceMetadataEvolutionSupport`, a Scala trait
mixed into `DeltaSource` that accesses stream state via `this`. Since
V2's `SparkMicroBatchStream` is Java and cannot use Scala trait mixins,
`MetadataEvolutionHandler` receives all dependencies via constructor
injection instead.

The handler covers the full schema evolution lifecycle:
- **Stream start**: eager metadata tracking log initialization on first
batch
- **Offset generation**: injects `METADATA_CHANGE_INDEX` /
`POST_METADATA_CHANGE_INDEX` barrier sentinels into the file change
iterator
- **Pending schema offsets**: returns barrier offsets for in-progress
schema changes
- **Batch commit**: updates the schema log and throws
`DELTA_STREAMING_METADATA_EVOLUTION` to trigger stream restart
- **Batch planning on restart**: validates and re-initializes the schema
log

All detection logic delegates to the shared
`DeltaSourceMetadataEvolutionSupport$` companion object statics
(refactored in PR 3/7). V2-specific orchestration is limited to wiring
the barrier protocol into the `CloseableIterator<IndexedFile>` pipeline
and collecting metadata/protocol from Kernel commit ranges via
`StreamingHelper`.

Also extends `StreamingHelper` with
`getMetadataAndProtocolForVersionRange` to collect metadata and protocol
actions from a range of Kernel commits.

## How was this patch tested?

Unit tests in `MetadataEvolutionHandlerTest.java` covering: barrier
protocol (METADATA_CHANGE_INDEX / POST_METADATA_CHANGE_INDEX offset
generation), tracking state transitions, initialization lifecycle,
offset arithmetic, pending schema change handling, and commit-time
evolution exception.

## Does this PR introduce _any_ user-facing changes?

No.
@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution3 branch 4 times, most recently from 9defa7b to efbe032 Compare May 14, 2026 21:26
Copy link
Copy Markdown
Collaborator

@TimothyW553 TimothyW553 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution3 branch 2 times, most recently from feb5e79 to c376c08 Compare May 15, 2026 05:45
@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution3 branch from c376c08 to dbb6246 Compare May 15, 2026 22:34
murali-db pushed a commit that referenced this pull request May 16, 2026
## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6570/files) to
review incremental changes.
-
[stack/SparkMetadataAdapter](#6546)
[[Files changed](https://github.com/delta-io/delta/pull/6546/files)]
[MERGED]
-
[stack/RefactorMetadataTrackingLog](#6550)
[[Files changed](https://github.com/delta-io/delta/pull/6550/files)]
[MERGED]
-
[stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562)
[[Files changed](https://github.com/delta-io/delta/pull/6562/files)]
[MERGED]
-
[stack/MetadataEvolutionHandler2](#6563)
[[Files changed](https://github.com/delta-io/delta/pull/6563/files)]
[MERGED]
-
[**stack/NonAdditiveSchemaEvolution2**](#6570)
[[Files changed](https://github.com/delta-io/delta/pull/6570/files)]
-
[stack/NonAdditiveSchemaEvolution3](#6697)
[[Files
changed](https://github.com/delta-io/delta/pull/6697/files/b7f6c8ebfc0882e7e2cc580f09f376be23a8d43d..dbb6246c14be1ab7f017ad9fc26455ae599ee676)]
-
[stack/consecutiveSchemaChangesMerger](#6698)
[[Files
changed](https://github.com/delta-io/delta/pull/6698/files/dbb6246c14be1ab7f017ad9fc26455ae599ee676..4bf2fa3fa828bcab0b56c4c26ca51ee9cc40b482)]
-
[stack/SchemaTrackingWithCDC](#6801)
[[Files
changed](https://github.com/delta-io/delta/pull/6801/files/4bf2fa3fa828bcab0b56c4c26ca51ee9cc40b482..a78a4ac2bc9a52605278a36b98804230258c12a2)]
- [stack/V1V2MixTest](#6759)
[[Files
changed](https://github.com/delta-io/delta/pull/6759/files/7f9b7f2724b2245ab7380908616303cf7ea95fca..e146cdc9ebb0572e8b0a928cc6dd3bfdc198d984)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 5/7 in the non-additive schema evolution for V2 streaming connector
stack.

Wire schema tracking into V2's analysis path so the analyzed plan
reflects the persisted (evolved) schema instead of the live snapshot
schema.

- `DeltaAnalysis.verifyDeltaSourceSchemaLocation`: extend the
duplicate-schema-location check to also visit `StreamingRelationV2`,
keyed on the V2 `Table.name`.
- `SparkTable`: open `DeltaSourceMetadataTrackingLog` once during
construction (gated on `mergeConsecutiveSchemaChanges`) and seed
`SchemaProvider` from the persisted metadata, so analysis-time
`schema()` matches what the stream will read at runtime.
- `ApplyV2ReadOptions` (renamed from `ApplyV2Streaming`): generalize the
CDC-only rebuild to also fire when `schemaTrackingLocation` arrives via
`extraOptions` on the catalog `readStream.table()` path; rebuild
`SparkTable` with merged options so the schema-log lookup actually
fires.
- `MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream`:
V2 port of V1's helper, reused by `SparkTable` (analysis) and
`SparkScan` (execution).

## How was this patch tested?

`SparkTableTest`, `MetadataEvolutionHandlerTest`,
`ApplyV2ReadOptionsSuite`. Unified `DeltaV2SourceSchemaEvolutionSuite`
updated.

## Does this PR introduce _any_ user-facing changes?

No.
@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution3 branch from dbb6246 to 5e57fc8 Compare May 16, 2026 00:18
DeltaStreamUtils.SchemaReadOptions$.MODULE$.fromSparkSession(
spark, isStreamingFromColumnMappingTable, isTypeWideningSupportedInProtocol),
"schemaReadOptions is null");
this.metadataEvolutionHandler =
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

V1 checks that require(options.failOnDataLoss to avoid having log retention affect schema evolution. should we add the same check?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, added the check and a test to verify it

@PorridgeSwim PorridgeSwim force-pushed the stack/NonAdditiveSchemaEvolution3 branch from 5e57fc8 to f96643a Compare May 16, 2026 06:54
@PorridgeSwim PorridgeSwim requested a review from zikangh May 16, 2026 07:17
@murali-db murali-db merged commit b5e5aec into delta-io:master May 16, 2026
31 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants