Skip to content

[kernel-spark] Implement consecutiveSchemaChangesMerger#6698

Open
PorridgeSwim wants to merge 2 commits into
delta-io:masterfrom
PorridgeSwim:stack/consecutiveSchemaChangesMerger
Open

[kernel-spark] Implement consecutiveSchemaChangesMerger#6698
PorridgeSwim wants to merge 2 commits into
delta-io:masterfrom
PorridgeSwim:stack/consecutiveSchemaChangesMerger

Conversation

@PorridgeSwim
Copy link
Copy Markdown
Collaborator

@PorridgeSwim PorridgeSwim commented May 1, 2026

🥞 Stacked PR

Use this link to review incremental changes.


Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

PR 7/7 in the non-additive schema evolution for V2 streaming connector stack.

Implement V2's consecutive-schema-changes merger so analysis-time evolution matches V1: runs of consecutive metadata-only commits collapse to a single tracked entry at the latest version. Without the merger, each metadata-only commit produces its own pending schema offset.

  • MetadataEvolutionHandler.getMergedConsecutiveMetadataChanges: V2 port of V1's DeltaSourceMetadataEvolutionSupport.getMergedConsecutiveMetadataChanges. Walks Kernel commits forward via CommitRangeImpl + StreamingHelper.getCommitActionsFromRangeUnsafe; for each commit detects file actions (ADD/REMOVE) and metadata/protocol actions; stops on the first commit with a file action or with neither metadata nor protocol; emits a merged PersistedMetadata at the latest metadata-only version.
  • MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream: pass the merger lambda into DeltaSourceMetadataTrackingLog.create (was a null placeholder).
  • DeltaSourceMetadataTrackingLog (V1): extract PersistedMetadata.toProtocolJson helper so V2's merger can reuse the same protocol-JSON encoding.

How was this patch tested?

MetadataEvolutionHandlerTest covers merger walk semantics — stop-on-file-action, stop-on-no-metadata-or-protocol, multiple folded changes, protocol-only and combined updates. DeltaSourceSchemaEvolutionSuite adds parallel V1 tests for the same scenarios. Unified DeltaV2SourceSchemaEvolutionSuite moves the remaining merger-dependent scenarios (consecutive schema evolutions, unblock with sql conf, streaming with a column mapping upgrade) from shouldFailTests to shouldPassTests.

Does this PR introduce any user-facing changes?

No.

@PorridgeSwim
Copy link
Copy Markdown
Collaborator Author

Range-diff: stack/NonAdditiveSchemaEvolution3 (68709cd -> 381ecd6)
spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala
@@ -2,13 +2,6 @@
 --- a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala
 +++ b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala
  
-   override protected def shouldPassTests: Set[String] = Set(
-     // ========== Schema log unit test ==========
-+    "detects metadata/protocol changes through the AbstractMetadata/AbstractProtocol surface",
-     "schema location not under checkpoint",
-     "schema location same as checkpoint",
-     "schema location using a different file system",
- 
      // ========== Schema evolution scenarios ==========
      "consecutive schema evolutions without schema merging",
 +    "consecutive schema evolutions",
spark-unified/src/test/scala/org/apache/spark/sql/delta/test/typewidening/TypeWideningStreamingV2SourceSuite.scala
@@ -1,15 +1,6 @@
 diff --git a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/typewidening/TypeWideningStreamingV2SourceSuite.scala b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/typewidening/TypeWideningStreamingV2SourceSuite.scala
 --- a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/typewidening/TypeWideningStreamingV2SourceSuite.scala
 +++ b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/typewidening/TypeWideningStreamingV2SourceSuite.scala
-     "type change - widen flatMap groups with state",
-     "widening type change then restore back",
-     "narrowing type changes are not supported",
--    "arbitrary type changes are not supported"
-+    "arbitrary type changes are not supported",
-+    "type change in delta source writing to a delta sink"
-   )
- 
-   // Failures that affect both the schema-tracking and non-schema-tracking suites.
        "unblocking stream with reader option after type change - unblock version",
        "overwrite schema with type change and dropped column",
        "disable schema tracking log using internal conf"
spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java
@@ -23,50 +23,6 @@
  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.List;
- import org.apache.spark.sql.delta.sources.PersistedMetadata;
- import org.apache.spark.sql.delta.v2.interop.AbstractMetadata;
- import org.apache.spark.sql.delta.v2.interop.AbstractProtocol;
--import org.apache.spark.sql.types.StructType;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import scala.Option;
- 
-   // Read-time state captured at source initialization
-   private final Metadata readMetadataAtSourceInit;
--  private final StructType readSchemaAtSourceInit;
--  private final StructType readPartitionSchemaAtSourceInit;
--  private final Map<String, String> readConfigurationsAtSourceInit;
-   private final Protocol readProtocolAtSourceInit;
-   private final String metadataPath;
- 
-       DeltaStreamUtils.SchemaReadOptions schemaReadOptions,
-       Option<DeltaSourceMetadataTrackingLog> metadataTrackingLog,
-       Metadata readMetadataAtSourceInit,
--      StructType readSchemaAtSourceInit,
--      StructType readPartitionSchemaAtSourceInit,
--      Map<String, String> readConfigurationsAtSourceInit,
-       Protocol readProtocolAtSourceInit,
-       String metadataPath) {
-     this.spark = Objects.requireNonNull(spark);
-     this.schemaReadOptions = Objects.requireNonNull(schemaReadOptions);
-     this.metadataTrackingLog = Objects.requireNonNull(metadataTrackingLog);
-     this.readMetadataAtSourceInit = Objects.requireNonNull(readMetadataAtSourceInit);
--    this.readSchemaAtSourceInit = Objects.requireNonNull(readSchemaAtSourceInit);
--    this.readPartitionSchemaAtSourceInit = Objects.requireNonNull(readPartitionSchemaAtSourceInit);
--    this.readConfigurationsAtSourceInit = Objects.requireNonNull(readConfigurationsAtSourceInit);
-     this.readProtocolAtSourceInit = Objects.requireNonNull(readProtocolAtSourceInit);
-     this.metadataPath = Objects.requireNonNull(metadataPath);
-     this.persistedMetadataAtSourceInit =
-             newSchemaVersion,
-             persistedOpt,
-             new KernelProtocolAdapter(readProtocolAtSourceInit),
--            readSchemaAtSourceInit,
--            readPartitionSchemaAtSourceInit,
--            ScalaUtils.toScalaMap(readConfigurationsAtSourceInit),
-+            new KernelMetadataAdapter(readMetadataAtSourceInit),
-             spark);
-   }
- 
              tablePath,
              ScalaUtils.toScalaMap(options),
              sourceMetadataPathOpt,
spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java
@@ -17,31 +17,6 @@
  import scala.Option;
  
  /** Unit tests for {@link MetadataEvolutionHandler}. */
-   private MetadataEvolutionHandler buildLightweightHandler(
-       Option<DeltaSourceMetadataTrackingLog> trackingLog,
-       DeltaStreamUtils.SchemaReadOptions readOptions) {
--    KernelMetadataAdapter adapter = new KernelMetadataAdapter(DEFAULT_METADATA);
-     return new MetadataEvolutionHandler(
-         spark,
-         "test-table-id",
-         readOptions,
-         trackingLog,
-         DEFAULT_METADATA,
--        adapter.schema(),
--        adapter.partitionSchema(),
--        scala.jdk.javaapi.CollectionConverters.asJava(adapter.configuration()),
-         DEFAULT_PROTOCOL,
-         "/tmp/fake-table/_delta_log/_streaming_metadata");
-   }
-             schemaReadOptions(/* allowUnsafeColumnMappingRead= */ false),
-             Option.apply(trackingLog),
-             tableMetadata,
--            adapter.schema(),
--            adapter.partitionSchema(),
--            scala.jdk.javaapi.CollectionConverters.asJava(adapter.configuration()),
-             tableProtocol,
-             tablePath + "/_delta_log/_streaming_metadata");
- 
      assertTrue(result.isDefined());
      assertTrue(result.get().getCurrentTrackedMetadata().isEmpty());
    }
spark-unified/src/test/scala/io/delta/internal/ApplyV2StreamingSuite.scala
@@ -1,45 +0,0 @@
-diff --git a/spark-unified/src/test/scala/io/delta/internal/ApplyV2StreamingSuite.scala b/spark-unified/src/test/scala/io/delta/internal/ApplyV2StreamingSuite.scala
---- a/spark-unified/src/test/scala/io/delta/internal/ApplyV2StreamingSuite.scala
-+++ b/spark-unified/src/test/scala/io/delta/internal/ApplyV2StreamingSuite.scala
-       }
-     }
-   }
-+
-+  test("schema-tracking via V1 StreamingRelation: option propagates through V1 -> V2 conversion") {
-+    // Counterpart to the V2 rebuild tests above: those start from StreamingRelationV2 and exercise
-+    // the rebuild branch. This test starts from a V1 StreamingRelation carrying the schema-tracking
-+    // option in dataSource.options, and verifies the V1 -> V2 conversion branch hands the option to
-+    // the new SparkTable so its schema is driven by the persisted log entry.
-+    withTempDir { tableDir =>
-+      withTempDir { schemaLogDir =>
-+        val tablePath = tableDir.getCanonicalPath
-+        createDeltaTable(tablePath)
-+        val schemaLogPath = schemaLogDir.getCanonicalPath
-+        seedSchemaLogWithExtraColumn(tablePath, schemaLogPath)
-+
-+        val catalogTable = createCatalogTable(tableDir.toURI, ucManaged = false)
-+        val dataSource = DataSource(
-+          sparkSession = spark,
-+          userSpecifiedSchema = None,
-+          className = "delta",
-+          options = Map(
-+            "path" -> tablePath,
-+            DeltaOptions.SCHEMA_TRACKING_LOCATION -> schemaLogPath),
-+          catalogTable = Some(catalogTable))
-+        val plan = StreamingRelation(dataSource)
-+
-+        // STRICT mode forces V1 -> V2 conversion in ApplyV2Streaming.
-+        withSQLConf(DeltaSQLConf.V2_ENABLE_MODE.key -> "STRICT") {
-+          val result = applyRule(plan).asInstanceOf[StreamingRelationV2]
-+          val convertedTable = result.table.asInstanceOf[SparkTable]
-+          assert(convertedTable.getOptions.containsKey(DeltaOptions.SCHEMA_TRACKING_LOCATION))
-+          assert(convertedTable.getOptions.get(DeltaOptions.SCHEMA_TRACKING_LOCATION) ==
-+            schemaLogPath)
-+          // Schema is driven by the seeded log entry, not the underlying snapshot.
-+          assertSchemaMatchesSeededLogEntry(convertedTable)
-+          assert(result.output.map(_.name) == seededFieldNames)
-+        }
-+      }
-+    }
-+  }
- }
\ No newline at end of file
spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
@@ -1,61 +0,0 @@
-diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
---- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
-+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
- 
-   /**
-    * Core logic for assigning column IDs and physical names to a schema.
--   * Takes raw schema and configuration inputs (no v1 Metadata dependency) so it can be
--   * reused by both v1 and v2 connectors.
-+   * Takes [[AbstractMetadata]] (no v1 Metadata dependency) so it can be reused by both v1
-+   * and v2 connectors. Bundling schema + configuration on each side avoids the swap footgun
-+   * of having two `StructType` and two `Map` parameters next to each other.
-    *
-    * @return (upgradedSchema, maxColumnId) - the schema with IDs/physical names assigned,
-    *         and the final max column ID.
-    */
-   private[delta] def assignColumnIdAndPhysicalNameToSchema(
--      newSchema: StructType,
--      oldSchema: StructType,
--      newConfiguration: Map[String, String],
--      oldConfiguration: Map[String, String],
-+      newMetadata: AbstractMetadata,
-+      oldMetadata: AbstractMetadata,
-       isChangingModeOnExistingTable: Boolean,
-       isOverwritingSchema: Boolean): (StructType, Long) = {
-+    val newSchema = newMetadata.schema
-+    val oldSchema = oldMetadata.schema
-+    val newConfiguration = newMetadata.configuration
-+    val oldConfiguration = oldMetadata.configuration
-     var maxId = DeltaConfigs.COLUMN_MAPPING_MAX_ID.fromMap(newConfiguration) max
-       DeltaConfigs.COLUMN_MAPPING_MAX_ID.fromMap(oldConfiguration) max
-       findMaxColumnId(newSchema)
-       isChangingModeOnExistingTable: Boolean,
-       isOverwritingSchema: Boolean): Metadata = {
-     val (finalSchema, newMaxId) = assignColumnIdAndPhysicalNameToSchema(
--      newMetadata.schema, oldMetadata.schema,
--      newMetadata.configuration, oldMetadata.configuration,
--      isChangingModeOnExistingTable, isOverwritingSchema)
-+      newMetadata, oldMetadata, isChangingModeOnExistingTable, isOverwritingSchema)
- 
-     newMetadata.copy(
-       schemaString = finalSchema.json,
-       // the new metadata, as the upgrade would use the logical name as the physical name, we could
-       // easily capture any difference in the schema using the same is{Drop,Rename}ColumnOperation
-       // utils.
--      val (upgradedSchema, _) = assignColumnIdAndPhysicalNameToSchema(
--        oldMetadata.schema, oldMetadata.schema,
--        oldMetadata.configuration, oldMetadata.configuration,
-+      val (upgradedSchema, upgradedMaxId) = assignColumnIdAndPhysicalNameToSchema(
-+        newMetadata = oldMetadata, oldMetadata = oldMetadata,
-         isChangingModeOnExistingTable = true, isOverwritingSchema = false)
-       // Construct an AbstractMetadata with the upgraded schema and the new column mapping mode
-       // so the comparison utils below can recognize column mapping metadata.
-         val schema: StructType = upgradedSchema
-         val partitionColumns: Seq[String] = oldMetadata.partitionColumns
-         val configuration: Map[String, String] = oldMetadata.configuration +
--          (DeltaConfigs.COLUMN_MAPPING_MODE.key -> newMetadata.columnMappingMode.name)
-+          (DeltaConfigs.COLUMN_MAPPING_MODE.key -> newMetadata.columnMappingMode.name,
-+            DeltaConfigs.COLUMN_MAPPING_MAX_ID.key -> upgradedMaxId.toString)
-         val columnMappingMode: DeltaColumnMappingMode = newMetadata.columnMappingMode
-       }
-       !hasColMappingOrPartitionSchemaChangeByMetadata(newMetadata, upgradedMetadata)
\ No newline at end of file
spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
@@ -1,17 +0,0 @@
-diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
---- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
-+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
-    */
-   protected val readSchemaAtSourceInit: StructType = readSnapshotDescriptor.metadata.schema
- 
--  protected val readPartitionSchemaAtSourceInit: StructType =
--    readSnapshotDescriptor.metadata.partitionSchema
--
-   protected val readProtocolAtSourceInit: Protocol = readSnapshotDescriptor.protocol
- 
--  protected val readConfigurationsAtSourceInit: Map[String, String] =
--    readSnapshotDescriptor.metadata.configuration
--
-   /**
-    * Create a snapshot descriptor, customizing its metadata using metadata tracking if necessary
-    */
\ No newline at end of file
spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala
@@ -1,70 +0,0 @@
-diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala
---- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala
-+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala
-       newSchemaVersion,
-       persistedMetadataAtSourceInit,
-       readProtocolAtSourceInit,
--      readSchemaAtSourceInit,
--      readPartitionSchemaAtSourceInit,
--      readConfigurationsAtSourceInit,
-+      readSnapshotDescriptor.metadata,
-       spark)
-   }
- 
-    * @param newSchemaVersion The version of the incoming change.
-    * @param persistedMetadataAtSourceInit The persisted metadata at source init, if any.
-    * @param readProtocolAtSourceInit The protocol at source init.
--   * @param readSchemaAtSourceInit The schema at source init.
--   * @param readPartitionSchemaAtSourceInit The partition schema at source init.
--   * @param readConfigurationsAtSourceInit The table configurations at source init.
-+   * @param readMetadataAtSourceInit The metadata at source init (schema, partition schema, and
-+   *                                 configuration). Bundled to avoid the swap footgun of three
-+   *                                 adjacent params.
-    * @param spark The SparkSession (used for SQL conf checks).
-    */
-   def hasMetadataOrProtocolChangeComparedToStreamMetadata(
-       newSchemaVersion: Long,
-       persistedMetadataAtSourceInit: Option[PersistedMetadata],
-       readProtocolAtSourceInit: AbstractProtocol,
--      readSchemaAtSourceInit: StructType,
--      readPartitionSchemaAtSourceInit: StructType,
--      readConfigurationsAtSourceInit: Map[String, String],
-+      readMetadataAtSourceInit: AbstractMetadata,
-       spark: SparkSession): Boolean = {
-     if (persistedMetadataAtSourceInit.exists(_.deltaCommitVersion >= newSchemaVersion)) {
-       false
-     } else {
--      protocolChangeOpt.exists(p =>
--        p.minReaderVersion != readProtocolAtSourceInit.minReaderVersion ||
--          p.minWriterVersion != readProtocolAtSourceInit.minWriterVersion ||
--          p.readerFeatures != readProtocolAtSourceInit.readerFeatures ||
--          p.writerFeatures != readProtocolAtSourceInit.writerFeatures) ||
-+      protocolChangeOpt.exists(p => !p.equalsByFields(readProtocolAtSourceInit)) ||
-       metadataChangeOpt.exists { newMetadata =>
-         hasSchemaChangeComparedToStreamMetadata(
--          newMetadata.schema, readSchemaAtSourceInit, spark) ||
--          newMetadata.partitionSchema != readPartitionSchemaAtSourceInit ||
-+          newMetadata.schema, readMetadataAtSourceInit.schema, spark) ||
-+          newMetadata.partitionSchema != readMetadataAtSourceInit.partitionSchema ||
-           newMetadata.configuration.filterKeys(_.startsWith("delta.")).toMap !=
--            readConfigurationsAtSourceInit.filterKeys(_.startsWith("delta.")).toMap
-+            readMetadataAtSourceInit.configuration.filterKeys(_.startsWith("delta.")).toMap
-       }
-     }
-   }
-    * 1. A [[Metadata]] action change. OR
-    * 2. A [[Protocol]] change.
-    */
-- def getMergedConsecutiveMetadataChanges(
--     spark: SparkSession,
--     deltaLog: DeltaLog,
--     catalogTableOpt: Option[CatalogTable],
--     currentMetadata: PersistedMetadata): Option[PersistedMetadata] = {
-+  def getMergedConsecutiveMetadataChanges(
-+      spark: SparkSession,
-+      deltaLog: DeltaLog,
-+      catalogTableOpt: Option[CatalogTable],
-+      currentMetadata: PersistedMetadata): Option[PersistedMetadata] = {
-     val currentMetadataVersion = currentMetadata.deltaCommitVersion
-     // We start from the currentSchemaVersion so that we can stop early in case the current
-     // version still has file actions that potentially needs to be processed.
\ No newline at end of file
spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataTrackingLog.scala
@@ -1,38 +0,0 @@
-diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataTrackingLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataTrackingLog.scala
---- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataTrackingLog.scala
-+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataTrackingLog.scala
- 
-   def fromJson(json: String): PersistedMetadata = JsonUtils.fromJson[PersistedMetadata](json)
- 
-+  /**
-+   * Builds a [[PersistedMetadata]] from V2 interop abstractions.
-+   *
-+   * Contract on [[AbstractProtocol]]: `readerFeatures` / `writerFeatures` must be consistent
-+   * with the min protocol versions: `readerFeatures` may only be defined when
-+   * `minReaderVersion >= TABLE_FEATURES_MIN_READER_VERSION`, and `writerFeatures` may only be
-+   * defined when `minWriterVersion >= TABLE_FEATURES_MIN_WRITER_VERSION`. The conversion below
-+   * relies on this invariant; [[Protocol]] will throw a `require` failure if an implementation
-+   * gets it wrong.
-+   */
-   def apply(
-       tableId: String,
-       deltaCommitVersion: Long,
-       parameters: Map[String, String],
-       sourceMetadataPathOpt: Option[String] = None,
-       mergeConsecutiveSchemaChanges: Boolean = false,
--      consecutiveSchemaChangesMerger: PersistedMetadata => Option[PersistedMetadata] = _ => None,
-+      consecutiveSchemaChangesMerger: PersistedMetadata => Option[PersistedMetadata] =
-+        _ => throw new IllegalStateException(
-+          "consecutiveSchemaChangesMerger must be provided when " +
-+            "mergeConsecutiveSchemaChanges is true"),
-       initMetadataLogEagerly: Boolean = true): DeltaSourceMetadataTrackingLog = {
-     val options = new CaseInsensitiveStringMap(parameters.asJava)
-     val sourceTrackingId = Option(options.get(DeltaOptions.STREAMING_SOURCE_TRACKING_ID))
-     )
- 
-     // During initialize schema log, validate against:
--    // 1. table snapshot to check for partition and tahoe id mismatch
-+    // 1. table id mismatch
-     // 2. source metadata path to ensure we are not using the wrong schema log for the source
-     log.getCurrentTrackedMetadata.foreach { schema =>
-       schema.validateAgainstSourceTableId(sourceTableId)
\ No newline at end of file
spark/src/main/scala/org/apache/spark/sql/delta/v2/interop/AbstractProtocol.scala
@@ -1,19 +0,0 @@
-diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/v2/interop/AbstractProtocol.scala b/spark/src/main/scala/org/apache/spark/sql/delta/v2/interop/AbstractProtocol.scala
---- a/spark/src/main/scala/org/apache/spark/sql/delta/v2/interop/AbstractProtocol.scala
-+++ b/spark/src/main/scala/org/apache/spark/sql/delta/v2/interop/AbstractProtocol.scala
-    * Returns None if table features are not enabled for writers.
-    */
-   def writerFeatures: Option[Set[String]]
-+
-+  /**
-+   * Field-wise equality across the abstract surface of [[AbstractProtocol]]. Use this instead of
-+   * comparing fields ad-hoc at call sites so that adding a new field to this trait forces an
-+   * update here rather than silently leaving stale comparisons elsewhere.
-+   */
-+  def equalsByFields(other: AbstractProtocol): Boolean =
-+    minReaderVersion == other.minReaderVersion &&
-+      minWriterVersion == other.minWriterVersion &&
-+      readerFeatures == other.readerFeatures &&
-+      writerFeatures == other.writerFeatures
- }
- 
\ No newline at end of file
spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala
@@ -1,99 +0,0 @@
-diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala
---- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala
-+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala
- import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
- import org.apache.spark.sql.delta.sources._
- import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
-+import org.apache.spark.sql.delta.v2.interop.{AbstractMetadata, AbstractProtocol}
- import org.apache.spark.sql.delta.util.JsonUtils
- import org.apache.commons.io.FileUtils
- import org.apache.commons.lang3.exception.ExceptionUtils
-   with DeltaSourceSuiteBase with DeltaColumnMappingSelectedTestMixin with DeltaSQLCommandTest {
- 
-   override protected def runOnlyTests: Seq[String] = Seq(
-+    "detects metadata/protocol changes through the AbstractMetadata/AbstractProtocol surface",
-     "schema log initialization with additive schema changes",
-     "detect incompatible schema change while streaming",
-     "trigger.Once with deferred commit should work",
-     ))
-   }
- 
-+  test("detects metadata/protocol changes through the AbstractMetadata/AbstractProtocol " +
-+      "surface") {
-+    // Anonymous trait impls (not V1 Metadata/Protocol) prove the static actually relies on
-+    // the abstract surface. The trait wrapper used in production always passes V1 types,
-+    // so this is the only path that would catch a regression specific to non-V1 impls.
-+    val baseSchema = new StructType().add("a", StringType, nullable = true)
-+
-+    def mkMetadata(
-+        sch: StructType = baseSchema,
-+        partCols: Seq[String] = Seq.empty,
-+        conf: Map[String, String] = Map.empty): AbstractMetadata = new AbstractMetadata {
-+      override def id: String = "tid"
-+      override def name: String = ""
-+      override def description: String = ""
-+      override def schema: StructType = sch
-+      override def partitionColumns: Seq[String] = partCols
-+      override def configuration: Map[String, String] = conf
-+      override def columnMappingMode: DeltaColumnMappingMode = NoMapping
-+    }
-+
-+    def mkProtocol(
-+        readerV: Int = 1,
-+        writerV: Int = 2,
-+        readerFs: Option[Set[String]] = None,
-+        writerFs: Option[Set[String]] = None): AbstractProtocol = new AbstractProtocol {
-+      override def minReaderVersion: Int = readerV
-+      override def minWriterVersion: Int = writerV
-+      override def readerFeatures: Option[Set[String]] = readerFs
-+      override def writerFeatures: Option[Set[String]] = writerFs
-+    }
-+
-+    val readMetadata = mkMetadata()
-+    val readProtocol = mkProtocol()
-+
-+    def call(
-+        metadataChange: Option[AbstractMetadata] = None,
-+        protocolChange: Option[AbstractProtocol] = None,
-+        newVer: Long = 1L,
-+        persisted: Option[PersistedMetadata] = None): Boolean =
-+      DeltaSourceMetadataEvolutionSupport.hasMetadataOrProtocolChangeComparedToStreamMetadata(
-+        metadataChange, protocolChange, newVer, persisted, readProtocol, readMetadata, spark)
-+
-+    // No change: both sides identical anonymous impls -> false.
-+    assert(!call(metadataChange = Some(mkMetadata()), protocolChange = Some(mkProtocol())))
-+
-+    // Schema differs.
-+    assert(call(metadataChange =
-+      Some(mkMetadata(sch = baseSchema.add("b", StringType, nullable = true)))))
-+
-+    // Partition schema differs (same data schema, different partition columns).
-+    assert(call(metadataChange = Some(mkMetadata(partCols = Seq("a")))))
-+
-+    // delta.* configuration differs.
-+    assert(call(metadataChange = Some(mkMetadata(conf = Map("delta.foo" -> "bar")))))
-+
-+    // Non-delta.* configuration differs -> filtered out, no change.
-+    assert(!call(metadataChange = Some(mkMetadata(conf = Map("foo" -> "bar")))))
-+
-+    // Protocol differs by a single field (uses equalsByFields under the hood).
-+    assert(call(protocolChange = Some(mkProtocol(readerV = 2))))
-+
-+    // Persisted metadata is at or beyond newSchemaVersion -> short-circuits to false even if
-+    // every other input would otherwise indicate a change.
-+    val persisted = PersistedMetadata(
-+      tableId = "tid",
-+      deltaCommitVersion = 5L,
-+      dataSchemaJson = baseSchema.json,
-+      partitionSchemaJson = new StructType().json,
-+      sourceMetadataPath = "")
-+    assert(!call(
-+      metadataChange = Some(mkMetadata(sch = baseSchema.add("b", StringType, nullable = true))),
-+      protocolChange = Some(mkProtocol(readerV = 99)),
-+      newVer = 3L,
-+      persisted = Some(persisted)))
-+  }
-+
-   test("forward-compat: older version can read back newer JSON") {
-     val newSchema = PersistedMetadata(
-       tableId = "test",
\ No newline at end of file
spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java
@@ -1,12 +0,0 @@
-diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java
---- a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java
-+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java
-             schemaReadOptions,
-             metadataTrackingLog,
-             readSnapshotAtSourceInit.getMetadata(),
--            readSchemaAtSourceInit,
--            this.partitionSchema,
--            readConfigurationsAtSourceInit,
-             readProtocolAtSourceInit,
-             metadataPath);
-     boolean shouldValidateSchemaOnRestart =
\ No newline at end of file
spark/v2/src/test/java/io/delta/spark/internal/v2/adapters/ActionAdaptersTest.java
@@ -1,54 +0,0 @@
-diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/adapters/ActionAdaptersTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/adapters/ActionAdaptersTest.java
---- a/spark/v2/src/test/java/io/delta/spark/internal/v2/adapters/ActionAdaptersTest.java
-+++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/adapters/ActionAdaptersTest.java
-     assertThrows(NullPointerException.class, () -> new KernelProtocolAdapter(null));
-   }
- 
-+  @Test
-+  public void testProtocolAdapterEqualsByFields() {
-+    Set<String> rf = new HashSet<>(Arrays.asList("v2Checkpoint"));
-+    Set<String> wf = new HashSet<>(Arrays.asList("rowTracking"));
-+    KernelProtocolAdapter base = new KernelProtocolAdapter(new Protocol(3, 7, rf, wf));
-+
-+    // Identical fields (built from a fresh Protocol instance) compare equal both directions.
-+    KernelProtocolAdapter same =
-+        new KernelProtocolAdapter(
-+            new Protocol(
-+                3,
-+                7,
-+                new HashSet<>(Arrays.asList("v2Checkpoint")),
-+                new HashSet<>(Arrays.asList("rowTracking"))));
-+    assertTrue(base.equalsByFields(same));
-+    assertTrue(same.equalsByFields(base));
-+
-+    // Each field difference flips the result to false.
-+    assertFalse(
-+        base.equalsByFields(new KernelProtocolAdapter(new Protocol(2, 7, rf, wf))),
-+        "minReaderVersion mismatch should not be equal");
-+    assertFalse(
-+        base.equalsByFields(new KernelProtocolAdapter(new Protocol(3, 6, rf, wf))),
-+        "minWriterVersion mismatch should not be equal");
-+    assertFalse(
-+        base.equalsByFields(
-+            new KernelProtocolAdapter(
-+                new Protocol(3, 7, new HashSet<>(Arrays.asList("columnMapping")), wf))),
-+        "readerFeatures mismatch should not be equal");
-+    assertFalse(
-+        base.equalsByFields(
-+            new KernelProtocolAdapter(
-+                new Protocol(3, 7, rf, new HashSet<>(Arrays.asList("columnMapping"))))),
-+        "writerFeatures mismatch should not be equal");
-+
-+    // Some(empty) vs None — features defined only at table-features versions; the helper must
-+    // distinguish (3,7,Some(empty),Some(empty)) from legacy (1,2,None,None).
-+    KernelProtocolAdapter emptyFeatures =
-+        new KernelProtocolAdapter(
-+            new Protocol(3, 7, Collections.emptySet(), Collections.emptySet()));
-+    KernelProtocolAdapter legacy = new KernelProtocolAdapter(new Protocol(1, 2));
-+    assertFalse(emptyFeatures.equalsByFields(legacy));
-+    assertTrue(legacy.equalsByFields(new KernelProtocolAdapter(new Protocol(1, 2))));
-+  }
-+
-   // ===== KernelMetadataAdapter =====
- 
-   @Test
\ No newline at end of file
spark/v2/src/test/java/io/delta/spark/internal/v2/utils/StreamingHelperTest.java
@@ -1,16 +0,0 @@
-diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/utils/StreamingHelperTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/utils/StreamingHelperTest.java
---- a/spark/v2/src/test/java/io/delta/spark/internal/v2/utils/StreamingHelperTest.java
-+++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/utils/StreamingHelperTest.java
-   private void setupTableWithProtocolUpgradeAtV2(String testTablePath, String testTableName) {
-     createEmptyTestTable(testTablePath, testTableName); // v0
-     spark.sql(String.format("INSERT INTO %s VALUES (1, 'Alice')", testTableName)); // v1
-+    // Adding a table feature forces a real protocol upgrade; setting only minReader/minWriter to
-+    // (3, 7) is normalized back to (1, 2) by Protocol.merge when no features are introduced and
-+    // produces no Protocol action.
-     spark.sql(
-         String.format(
--            "ALTER TABLE %s SET TBLPROPERTIES "
--                + "('delta.minReaderVersion' = '3', 'delta.minWriterVersion' = '7')",
-+            "ALTER TABLE %s SET TBLPROPERTIES " + "('delta.feature.deletionVectors' = 'supported')",
-             testTableName)); // v2
-   }
\ No newline at end of file

Reproduce locally: git range-diff d8362c3..68709cd d344128..381ecd6 | Disable: git config gitstack.push-range-diff false

@PorridgeSwim PorridgeSwim force-pushed the stack/consecutiveSchemaChangesMerger branch from 381ecd6 to 9a360aa Compare May 5, 2026 20:47
TimothyW553 pushed a commit that referenced this pull request May 5, 2026
## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6546/files) to
review incremental changes.
-
[**stack/SparkMetadataAdapter**](#6546)
[[Files changed](https://github.com/delta-io/delta/pull/6546/files)]
-
[stack/RefactorMetadataTrackingLog](#6550)
[[Files
changed](https://github.com/delta-io/delta/pull/6550/files/9271a6262f7a2615b977de0319c7238044b7d0a9..8378d33acda70a34a109b35173a968a4b3401ec1)]
-
[stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562)
[[Files
changed](https://github.com/delta-io/delta/pull/6562/files/8378d33acda70a34a109b35173a968a4b3401ec1..90365431b12640de181446ec9c2033fb1b143b03)]
-
[stack/MetadataEvolutionHandler2](#6563)
[[Files
changed](https://github.com/delta-io/delta/pull/6563/files/28bb7021adb12b055e1b281fdfee0ab48a8732ac..578870181fa81a9146b2fa907244e350ffcabb52)]
-
[stack/NonAdditiveSchemaEvolution2](#6570)
[[Files
changed](https://github.com/delta-io/delta/pull/6570/files/578870181fa81a9146b2fa907244e350ffcabb52..c025b7c3c386e8d46d6142d0727dce95582bb0ef)]
-
[stack/NonAdditiveSchemaEvolution3](#6697)
[[Files
changed](https://github.com/delta-io/delta/pull/6697/files/c025b7c3c386e8d46d6142d0727dce95582bb0ef..db16b9fa80a80c105430c93589126ba8b828458f)]
-
[stack/consecutiveSchemaChangesMerger](#6698)
[[Files
changed](https://github.com/delta-io/delta/pull/6698/files/0148020ffe11e7b079e99fa8c5189a19c354f2be..9a360aa819f20d78b5361b2e997d24433fb793d5)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 1/7 in the non-additive schema evolution for V2 streaming connector
stack.

The shared V1 Scala utilities (`DeltaColumnMapping`,
`DeltaSourceMetadataEvolutionSupport`) operate on
`AbstractMetadata`/`AbstractProtocol`, but V2 holds Kernel types. This
PR creates two adapter classes that bridge the gap:

- `KernelMetadataAdapter`: Kernel `Metadata` → `AbstractMetadata`
(schema conversion via `SchemaUtils`, partition columns and
configuration converted to Scala collections)
- `KernelProtocolAdapter`: Kernel `Protocol` → `AbstractProtocol` (maps
reader/writer features to `Option[Set[String]]`)

Also adds `columnMappingMode` and `partitionSchema` to the
`AbstractMetadata` trait — V1's `Metadata` already had these fields, the
trait just didn't expose them.

## How was this patch tested?

Unit tests in `ActionAdaptersTest.java`: table-features protocol, legacy
protocol, full metadata round-trip, null optional fields, and null
constructor rejection.

## Does this PR introduce _any_ user-facing changes?

No.
@PorridgeSwim PorridgeSwim force-pushed the stack/consecutiveSchemaChangesMerger branch from 9a360aa to 48aa86b Compare May 6, 2026 00:04
@PorridgeSwim
Copy link
Copy Markdown
Collaborator Author

Range-diff: stack/NonAdditiveSchemaEvolution3 (9a360aa -> 48aa86b)
spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java
@@ -15,7 +15,7 @@
 +import io.delta.kernel.internal.commitrange.CommitRangeImpl;
  import io.delta.kernel.internal.util.Utils;
  import io.delta.kernel.utils.CloseableIterator;
- import io.delta.spark.internal.v2.adapters.KernelMetadataAdapter;
+ import io.delta.kernel.utils.CloseableIterator.BreakableFilterResult;
  import io.delta.spark.internal.v2.snapshot.DeltaSnapshotManager;
  import io.delta.spark.internal.v2.utils.ScalaUtils;
  import io.delta.spark.internal.v2.utils.StreamingHelper;

Reproduce locally: git range-diff 0148020..9a360aa ea7cfeb..48aa86b | Disable: git config gitstack.push-range-diff false

@PorridgeSwim PorridgeSwim force-pushed the stack/consecutiveSchemaChangesMerger branch 2 times, most recently from 9c89386 to f22ba06 Compare May 6, 2026 01:34
@PorridgeSwim
Copy link
Copy Markdown
Collaborator Author

Range-diff: stack/NonAdditiveSchemaEvolution3 (9c89386 -> f22ba06)
spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java
@@ -28,11 +28,12 @@
              sourceMetadataPathOpt,
 -            // TODO(#5319): Implement v2 consecutiveSchema schema changes merger
 -            /* mergeConsecutiveSchemaChanges= */ false,
--            /* consecutiveSchemaChangesMerger= */ null,
+-            /* consecutiveSchemaChangesMerger= */ Option.empty(),
 +            /* mergeConsecutiveSchemaChanges= */ mergeConsecutiveSchemaChanges,
-+            /* consecutiveSchemaChangesMerger= */ currentMetadata ->
-+                getMergedConsecutiveMetadataChanges(
-+                    currentMetadata, snapshotManager, engine, tablePath, mergeActionSet),
++            /* consecutiveSchemaChangesMerger= */ Option.apply(
++                currentMetadata ->
++                    getMergedConsecutiveMetadataChanges(
++                        currentMetadata, snapshotManager, engine, tablePath, mergeActionSet)),
              /* initMetadataLogEagerly= */ true));
    }
 +

Reproduce locally: git range-diff 6854223..9c89386 13395a7..f22ba06 | Disable: git config gitstack.push-range-diff false

TimothyW553 pushed a commit that referenced this pull request May 6, 2026
…#6550)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6550/files) to
review incremental changes.
-
[stack/SparkMetadataAdapter](#6546)
[[Files changed](https://github.com/delta-io/delta/pull/6546/files)]
[MERGED]
-
[**stack/RefactorMetadataTrackingLog**](#6550)
[[Files changed](https://github.com/delta-io/delta/pull/6550/files)]
-
[stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562)
[[Files
changed](https://github.com/delta-io/delta/pull/6562/files/953f137f8c4ce46d8b8a9605b0c7bed898e30df4..027984b6edcbad0f4731e560425c2ed9bcf8fc27)]
-
[stack/MetadataEvolutionHandler2](#6563)
[[Files
changed](https://github.com/delta-io/delta/pull/6563/files/027984b6edcbad0f4731e560425c2ed9bcf8fc27..ada845895139edcb2727a87b39922c8e16837a99)]
-
[stack/NonAdditiveSchemaEvolution2](#6570)
[[Files
changed](https://github.com/delta-io/delta/pull/6570/files/ada845895139edcb2727a87b39922c8e16837a99..476762fde7b9cb9b9bc3e416c86a260cd29806ed)]
-
[stack/NonAdditiveSchemaEvolution3](#6697)
[[Files
changed](https://github.com/delta-io/delta/pull/6697/files/476762fde7b9cb9b9bc3e416c86a260cd29806ed..13395a7f2a49db4962091e8ee919bebdab5bd4e2)]
-
[stack/consecutiveSchemaChangesMerger](#6698)
[[Files
changed](https://github.com/delta-io/delta/pull/6698/files/13395a7f2a49db4962091e8ee919bebdab5bd4e2..f22ba063eaf35ab69d653a2d5faefdc52f35eab5)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 2/7 in the non-additive schema evolution for V2 streaming connector
stack.

Decouple `DeltaSourceMetadataTrackingLog` and `PersistedMetadata` from
V1-specific types so the schema log can be reused by the V2 connector.

- Replace `SnapshotDescriptor` parameter in `create()` with plain
`sourceTableId` and `sourceDataPath` strings
- Unify `PersistedMetadata.apply` to accept
`AbstractMetadata`/`AbstractProtocol` instead of V1
`Metadata`/`Protocol`
- Extract the consecutive schema changes merger (V1-specific, depends on
`DeltaLog`) out of the companion object into
`DeltaSourceMetadataEvolutionSupport`, and inject it as a function
parameter so V2 can provide its own implementation
- Remove `Protocol`'s `private` constructor modifier to allow
construction from abstract protocol fields

All changes are structural refactors with no behavioral change.

## How was this patch tested?

Existing tests in `DeltaSourceSchemaEvolutionSuite` updated to use the
new API. No behavioral changes.

## Does this PR introduce _any_ user-facing changes?

No.
@PorridgeSwim PorridgeSwim force-pushed the stack/consecutiveSchemaChangesMerger branch 2 times, most recently from f3777c3 to 8101b33 Compare May 6, 2026 17:56
murali-db pushed a commit that referenced this pull request May 6, 2026
…seable in v2 (#6562)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6562/files) to
review incremental changes.
-
[stack/SparkMetadataAdapter](#6546)
[[Files changed](https://github.com/delta-io/delta/pull/6546/files)]
[MERGED]
-
[stack/RefactorMetadataTrackingLog](#6550)
[[Files changed](https://github.com/delta-io/delta/pull/6550/files)]
[MERGED]
-
[**stack/RefactorDeltaSourceMetadataEvolutionSupport**](#6562)
[[Files changed](https://github.com/delta-io/delta/pull/6562/files)]
-
[stack/MetadataEvolutionHandler2](#6563)
[[Files
changed](https://github.com/delta-io/delta/pull/6563/files/ed92a0fa2051432b6bc5784034df0b7949bbfb98..e5b2c3295843ec85753e07dc0010aa5ccebaabb7)]
-
[stack/NonAdditiveSchemaEvolution2](#6570)
[[Files
changed](https://github.com/delta-io/delta/pull/6570/files/e5b2c3295843ec85753e07dc0010aa5ccebaabb7..7c66bf11a0f1b651cda32ed7f529f552dd9dbfcb)]
-
[stack/NonAdditiveSchemaEvolution3](#6697)
[[Files
changed](https://github.com/delta-io/delta/pull/6697/files/7c66bf11a0f1b651cda32ed7f529f552dd9dbfcb..14956ea304c93d2343ccd7eb89a112966f07f906)]
-
[stack/consecutiveSchemaChangesMerger](#6698)
[[Files
changed](https://github.com/delta-io/delta/pull/6698/files/14956ea304c93d2343ccd7eb89a112966f07f906..8101b335b892a6a5b6d6fe11f4a202d14102721c)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 3/7 in the non-additive schema evolution for V2 streaming connector
stack.

Refactor `DeltaSourceMetadataEvolutionSupport` and `DeltaColumnMapping`
so the schema change detection logic can be called from V2 without
depending on V1 instance state.

**`DeltaSourceMetadataEvolutionSupport`:**
- Extract instance methods (`validateAndResolveMetadataEvolution`,
`checkColumnMappingSchemaChangesDuringStreaming`,
`resolveMetadataEvolutionForCommitRange`, etc.) to companion object
statics that accept explicit parameters instead of accessing V1
`DeltaSource` via `this`
- V1 trait methods now delegate to the companion object statics

**`DeltaColumnMapping`:**
- Widen `hasNoColumnMappingSchemaChanges` from V1 `Metadata` to
`AbstractMetadata` so V2 can call it via the adapter layer
- Extract `assignColumnIdAndPhysicalNameToSchema(StructType, Map)` from
`assignColumnIdAndPhysicalName(Metadata, Metadata, ...)` — needed for
simulating column mapping upgrades during NoMapping-to-NameMapping
transitions

All changes are structural refactors with no behavioral change.

## How was this patch tested?

Existing tests in `DeltaSourceSchemaEvolutionSuite` continue to pass. No
behavioral changes.

## Does this PR introduce _any_ user-facing changes?

No.
@PorridgeSwim PorridgeSwim force-pushed the stack/consecutiveSchemaChangesMerger branch 2 times, most recently from 979efdf to d4932c1 Compare May 6, 2026 23:16
@PorridgeSwim PorridgeSwim force-pushed the stack/consecutiveSchemaChangesMerger branch from d4932c1 to 5e5d260 Compare May 7, 2026 03:05
Copy link
Copy Markdown
Collaborator

@TimothyW553 TimothyW553 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a few comments

}
if ((addVec != null && !addVec.isNullAt(rowId))
|| (removeVec != null && !removeVec.isNullAt(rowId))
|| (cdcVec != null && !cdcVec.isNullAt(rowId))) {
Copy link
Copy Markdown
Collaborator

@TimothyW553 TimothyW553 May 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SparkTable and SparkScan pass ACTION_SET which has no CDC , so cdcIdx is always -1 and a commit with both a CDC action and a metadata change will fold past v1's FileAction stop , could we add CDC to the action set here ?

Copy link
Copy Markdown
Collaborator Author

@PorridgeSwim PorridgeSwim May 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't plan to support CDC in current PR, I will remove cdc check here and add a TODO


CommitRangeImpl commitRange =
(CommitRangeImpl)
snapshotManager.getTableChanges(engine, currentMetadataVersion, Optional.empty());
Copy link
Copy Markdown
Collaborator

@TimothyW553 TimothyW553 May 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getTableChanges returns the CommitRange interface , so this cast will throw ClassCastException if the impl ever changes , could we widen getCommitActionsFromRangeUnsafe to take CommitRange instead ?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is out of scope for this PR

* Replicates {@code PersistedMetadata.apply}'s protocol conversion: build a Spark {@code
* Protocol} from the Kernel protocol's reader/writer versions and features, then serialize.
*/
private static String toSparkProtocolJson(Protocol kernelProtocol) {
Copy link
Copy Markdown
Collaborator

@TimothyW553 TimothyW553 May 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this code does the same kernel Protocol to spark Protocol JSON conversion as PersistedMetadata.apply , could we make one shared helper so both sides stay in sync ?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added in object PersistedMetadata

// metadata-only commits, which would defeat this test's premise that the seeded v0 entry
// wins over the evolved v1 snapshot.
withSQLConf(
DeltaSQLConf.DELTA_STREAMING_ENABLE_SCHEMA_TRACKING_MERGE_CONSECUTIVE_CHANGES().key(),
Copy link
Copy Markdown
Collaborator

@TimothyW553 TimothyW553 May 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the merger on path is not tested , could we add a test that runs with the merger on ?

@PorridgeSwim PorridgeSwim mentioned this pull request May 10, 2026
murali-db pushed a commit that referenced this pull request May 11, 2026
## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6563/files) to
review incremental changes.
-
[stack/SparkMetadataAdapter](#6546)
[[Files changed](https://github.com/delta-io/delta/pull/6546/files)]
[MERGED]
-
[stack/RefactorMetadataTrackingLog](#6550)
[[Files changed](https://github.com/delta-io/delta/pull/6550/files)]
[MERGED]
-
[stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562)
[[Files changed](https://github.com/delta-io/delta/pull/6562/files)]
[MERGED]
-
[**stack/MetadataEvolutionHandler2**](#6563)
[[Files changed](https://github.com/delta-io/delta/pull/6563/files)]
-
[stack/NonAdditiveSchemaEvolution2](#6570)
[[Files
changed](https://github.com/delta-io/delta/pull/6570/files/a20f1f3ab452a75fc954e15c57c17327e0cb9267..0e07f87285becd6be416450ae084df454d9c94a9)]
-
[stack/NonAdditiveSchemaEvolution3](#6697)
[[Files
changed](https://github.com/delta-io/delta/pull/6697/files/0e07f87285becd6be416450ae084df454d9c94a9..73e1aa7f4162a3e1480ffd2b88b9ca79d852f2fe)]
-
[stack/consecutiveSchemaChangesMerger](#6698)
[[Files
changed](https://github.com/delta-io/delta/pull/6698/files/73e1aa7f4162a3e1480ffd2b88b9ca79d852f2fe..5e5d260b64d45cc11bcfdb58e5aab1b2d2637b33)]
- [stack/V1V2MixTest](#6759)
[[Files
changed](https://github.com/delta-io/delta/pull/6759/files/5e5d260b64d45cc11bcfdb58e5aab1b2d2637b33..738379713040986c74f98dbebfdc6c83ec1d3f16)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 4/7 in the non-additive schema evolution for V2 streaming connector
stack.

Introduce `MetadataEvolutionHandler`, a Java class that implements the
V1 barrier protocol for schema evolution in the V2 connector. In V1 this
logic lives in `DeltaSourceMetadataEvolutionSupport`, a Scala trait
mixed into `DeltaSource` that accesses stream state via `this`. Since
V2's `SparkMicroBatchStream` is Java and cannot use Scala trait mixins,
`MetadataEvolutionHandler` receives all dependencies via constructor
injection instead.

The handler covers the full schema evolution lifecycle:
- **Stream start**: eager metadata tracking log initialization on first
batch
- **Offset generation**: injects `METADATA_CHANGE_INDEX` /
`POST_METADATA_CHANGE_INDEX` barrier sentinels into the file change
iterator
- **Pending schema offsets**: returns barrier offsets for in-progress
schema changes
- **Batch commit**: updates the schema log and throws
`DELTA_STREAMING_METADATA_EVOLUTION` to trigger stream restart
- **Batch planning on restart**: validates and re-initializes the schema
log

All detection logic delegates to the shared
`DeltaSourceMetadataEvolutionSupport$` companion object statics
(refactored in PR 3/7). V2-specific orchestration is limited to wiring
the barrier protocol into the `CloseableIterator<IndexedFile>` pipeline
and collecting metadata/protocol from Kernel commit ranges via
`StreamingHelper`.

Also extends `StreamingHelper` with
`getMetadataAndProtocolForVersionRange` to collect metadata and protocol
actions from a range of Kernel commits.

## How was this patch tested?

Unit tests in `MetadataEvolutionHandlerTest.java` covering: barrier
protocol (METADATA_CHANGE_INDEX / POST_METADATA_CHANGE_INDEX offset
generation), tracking state transitions, initialization lifecycle,
offset arithmetic, pending schema change handling, and commit-time
evolution exception.

## Does this PR introduce _any_ user-facing changes?

No.
@PorridgeSwim PorridgeSwim force-pushed the stack/consecutiveSchemaChangesMerger branch 6 times, most recently from f7ebfc4 to 7f9b7f2 Compare May 15, 2026 05:45
@PorridgeSwim PorridgeSwim force-pushed the stack/consecutiveSchemaChangesMerger branch from 7f9b7f2 to 4bf2fa3 Compare May 15, 2026 22:34
murali-db pushed a commit that referenced this pull request May 16, 2026
## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6570/files) to
review incremental changes.
-
[stack/SparkMetadataAdapter](#6546)
[[Files changed](https://github.com/delta-io/delta/pull/6546/files)]
[MERGED]
-
[stack/RefactorMetadataTrackingLog](#6550)
[[Files changed](https://github.com/delta-io/delta/pull/6550/files)]
[MERGED]
-
[stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562)
[[Files changed](https://github.com/delta-io/delta/pull/6562/files)]
[MERGED]
-
[stack/MetadataEvolutionHandler2](#6563)
[[Files changed](https://github.com/delta-io/delta/pull/6563/files)]
[MERGED]
-
[**stack/NonAdditiveSchemaEvolution2**](#6570)
[[Files changed](https://github.com/delta-io/delta/pull/6570/files)]
-
[stack/NonAdditiveSchemaEvolution3](#6697)
[[Files
changed](https://github.com/delta-io/delta/pull/6697/files/b7f6c8ebfc0882e7e2cc580f09f376be23a8d43d..dbb6246c14be1ab7f017ad9fc26455ae599ee676)]
-
[stack/consecutiveSchemaChangesMerger](#6698)
[[Files
changed](https://github.com/delta-io/delta/pull/6698/files/dbb6246c14be1ab7f017ad9fc26455ae599ee676..4bf2fa3fa828bcab0b56c4c26ca51ee9cc40b482)]
-
[stack/SchemaTrackingWithCDC](#6801)
[[Files
changed](https://github.com/delta-io/delta/pull/6801/files/4bf2fa3fa828bcab0b56c4c26ca51ee9cc40b482..a78a4ac2bc9a52605278a36b98804230258c12a2)]
- [stack/V1V2MixTest](#6759)
[[Files
changed](https://github.com/delta-io/delta/pull/6759/files/7f9b7f2724b2245ab7380908616303cf7ea95fca..e146cdc9ebb0572e8b0a928cc6dd3bfdc198d984)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 5/7 in the non-additive schema evolution for V2 streaming connector
stack.

Wire schema tracking into V2's analysis path so the analyzed plan
reflects the persisted (evolved) schema instead of the live snapshot
schema.

- `DeltaAnalysis.verifyDeltaSourceSchemaLocation`: extend the
duplicate-schema-location check to also visit `StreamingRelationV2`,
keyed on the V2 `Table.name`.
- `SparkTable`: open `DeltaSourceMetadataTrackingLog` once during
construction (gated on `mergeConsecutiveSchemaChanges`) and seed
`SchemaProvider` from the persisted metadata, so analysis-time
`schema()` matches what the stream will read at runtime.
- `ApplyV2ReadOptions` (renamed from `ApplyV2Streaming`): generalize the
CDC-only rebuild to also fire when `schemaTrackingLocation` arrives via
`extraOptions` on the catalog `readStream.table()` path; rebuild
`SparkTable` with merged options so the schema-log lookup actually
fires.
- `MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream`:
V2 port of V1's helper, reused by `SparkTable` (analysis) and
`SparkScan` (execution).

## How was this patch tested?

`SparkTableTest`, `MetadataEvolutionHandlerTest`,
`ApplyV2ReadOptionsSuite`. Unified `DeltaV2SourceSchemaEvolutionSuite`
updated.

## Does this PR introduce _any_ user-facing changes?

No.
@PorridgeSwim PorridgeSwim force-pushed the stack/consecutiveSchemaChangesMerger branch from 4bf2fa3 to b074167 Compare May 16, 2026 00:18
@PorridgeSwim PorridgeSwim force-pushed the stack/consecutiveSchemaChangesMerger branch from b074167 to f612628 Compare May 16, 2026 06:54
murali-db pushed a commit that referenced this pull request May 16, 2026
…6697)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6697/files) to
review incremental changes.
-
[stack/SparkMetadataAdapter](#6546)
[[Files changed](https://github.com/delta-io/delta/pull/6546/files)]
[MERGED]
-
[stack/RefactorMetadataTrackingLog](#6550)
[[Files changed](https://github.com/delta-io/delta/pull/6550/files)]
[MERGED]
-
[stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562)
[[Files changed](https://github.com/delta-io/delta/pull/6562/files)]
[MERGED]
-
[stack/MetadataEvolutionHandler2](#6563)
[[Files changed](https://github.com/delta-io/delta/pull/6563/files)]
[MERGED]
-
[stack/NonAdditiveSchemaEvolution2](#6570)
[[Files changed](https://github.com/delta-io/delta/pull/6570/files)]
[MERGED]
-
[**stack/NonAdditiveSchemaEvolution3**](#6697)
[[Files changed](https://github.com/delta-io/delta/pull/6697/files)]
-
[stack/consecutiveSchemaChangesMerger](#6698)
[[Files
changed](https://github.com/delta-io/delta/pull/6698/files/f96643aa3cc01e7f70cc13a18b82dc27f277f11d..f612628ad931ec35c237801109f01b6fbd1379f7)]
-
[stack/SchemaTrackingWithCDC](#6801)
[[Files
changed](https://github.com/delta-io/delta/pull/6801/files/f612628ad931ec35c237801109f01b6fbd1379f7..4aeacfb120b33e9cdfe124352290b72f53f7cf89)]
- [stack/V1V2MixTest](#6759)
[[Files
changed](https://github.com/delta-io/delta/pull/6759/files/f612628ad931ec35c237801109f01b6fbd1379f7..0c818ee431ab417a4f2ffbcc609930be09d25031)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 6/7 in the non-additive schema evolution for V2 streaming connector
stack.

Wire `MetadataEvolutionHandler` into `SparkMicroBatchStream` and
`SparkScan` so V2 streaming reads honor non-additive schema evolution
(column rename/drop, type widening).

- `SparkMicroBatchStream`: take `metadataTrackingLog` + `metadataPath`
as constructor inputs; when a persisted entry exists, layer it onto the
freshly loaded `snapshotAtSourceInit` to derive
`readSnapshotAtSourceInit` (mirrors V1's `readSnapshotDescriptor`).
Integrate the schema-evolution barrier protocol into `latestOffset` /
`commit` / `planInputPartitions`. Skip the on-restart schema-validation
check when schema tracking is active — the schema-log evolution
exception covers it.
- `SparkScan.toMicroBatchStream`: reload latest snapshot (the
analysis-time `initialSnapshot` can be stale by stream start), open the
tracking log via
`MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream`
with `mergeConsecutiveSchemaChanges=false` (the merger only runs at
analysis), and pass it through with the checkpoint location.
- `SparkScan` option allow-list: move `allowSourceColumnDrop` / `Rename`
/ `TypeChange` out of the unsupported list now that they are honored.

## How was this patch tested?

`SparkMicroBatchStreamTest`, `MetadataEvolutionHandlerTest`. Unified
suites (`DeltaV2SourceSchemaEvolutionSuite`,
`TypeWideningStreamingV2SourceSuite`,
`RemoveColumnMappingStreamingReadV2Suite`) move non-merger evolution
scenarios from `shouldFailTests` to `shouldPassTests`; merger-dependent
tests remain pending until PR 7/7.

## Does this PR introduce _any_ user-facing changes?

No.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants