[kernel-spark] Refactor DeltaSourceMetadataEvolutionSupport to be reuseable in v2#6562

Merged
murali-db merged 1 commit into delta-io:master from PorridgeSwim:stack/RefactorDeltaSourceMetadataEvolutionSupport on May 6, 2026

@PorridgeSwim (Collaborator) commented Apr 14, 2026

🥞 Stacked PR

Use this link to review incremental changes.


Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

PR 3/7 in the non-additive schema evolution for V2 streaming connector stack.

Refactor DeltaSourceMetadataEvolutionSupport and DeltaColumnMapping so the schema change detection logic can be called from V2 without depending on V1 instance state.

DeltaSourceMetadataEvolutionSupport:

  • Extract instance methods (validateAndResolveMetadataEvolution, checkColumnMappingSchemaChangesDuringStreaming, resolveMetadataEvolutionForCommitRange, etc.) to companion object statics that accept explicit parameters instead of accessing V1 DeltaSource via this
  • V1 trait methods now delegate to the companion object statics

DeltaColumnMapping:

  • Widen hasNoColumnMappingSchemaChanges from V1 Metadata to AbstractMetadata so V2 can call it via the adapter layer
  • Extract assignColumnIdAndPhysicalNameToSchema(StructType, Map) from assignColumnIdAndPhysicalName(Metadata, Metadata, ...) — needed for simulating column mapping upgrades during NoMapping-to-NameMapping transitions

All changes are structural refactors with no behavioral change.
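
The extraction described above (instance methods moved to companion-object statics, with the V1 trait delegating to them) can be sketched roughly as follows. This is a minimal illustration only: `TableMetadata`, `EvolutionSupport`, and `hasMetadataChange` are hypothetical stand-ins, not the real Delta classes or signatures.

```scala
// Hypothetical, simplified stand-in for an abstract metadata surface.
trait TableMetadata {
  def schemaJson: String
  def configuration: Map[String, String]
}

object EvolutionSupport {
  /** Static entry point: every input is an explicit parameter, no `this` state. */
  def hasMetadataChange(
      incoming: Option[TableMetadata],
      atSourceInit: TableMetadata): Boolean =
    incoming.exists { m =>
      m.schemaJson != atSourceInit.schemaJson ||
        deltaConf(m) != deltaConf(atSourceInit)
    }

  // Only keys starting with "delta." take part in the comparison.
  private def deltaConf(m: TableMetadata): Map[String, String] =
    m.configuration.filter { case (k, _) => k.startsWith("delta.") }
}

/** V1-style trait: keeps its instance state but delegates the logic. */
trait EvolutionSupport {
  protected def metadataAtSourceInit: TableMetadata

  protected def hasMetadataChange(incoming: Option[TableMetadata]): Boolean =
    EvolutionSupport.hasMetadataChange(incoming, metadataAtSourceInit)
}
```

A second caller (for example a V2 source) could then call `EvolutionSupport.hasMetadataChange` directly with its own metadata implementation, without mixing in the trait.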

How was this patch tested?

Existing tests in DeltaSourceSchemaEvolutionSuite continue to pass. No behavioral changes.

Does this PR introduce any user-facing changes?

No.

@PorridgeSwim (Collaborator, Author)

Range-diff: stack/RefactorMetadataTrackingLog (de14314 -> 6d9822e)
spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala
@@ -160,7 +160,7 @@
 +      metadataChangeOpt.exists { newMetadata =>
 +        hasSchemaChangeComparedToStreamMetadata(
 +          newMetadata.schema, readSchemaAtSourceInit, spark) ||
-+          newMetadata.partitionColumns != readPartitionSchemaAtSourceInit ||
++          newMetadata.partitionSchema != readPartitionSchemaAtSourceInit ||
 +          newMetadata.configuration.filterKeys(_.startsWith("delta.")).toMap !=
 +            readConfigurationsAtSourceInit.filterKeys(_.startsWith("delta.")).toMap
 +      }

Reproduce locally: git range-diff cfcfa95..de14314 cfcfa95..6d9822e | Disable: git config gitstack.push-range-diff false

@PorridgeSwim force-pushed the stack/RefactorDeltaSourceMetadataEvolutionSupport branch from 6d9822e to 2490e84 on April 14, 2026 18:14
@PorridgeSwim force-pushed the stack/RefactorDeltaSourceMetadataEvolutionSupport branch from 2490e84 to 9491023 on April 15, 2026 22:24
@PorridgeSwim (Collaborator, Author)

Range-diff: stack/RefactorMetadataTrackingLog (2490e84 -> 9491023)
spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala
@@ -96,8 +96,10 @@
        DeltaSQLConf.DELTA_TYPE_WIDENING_BYPASS_STREAMING_TYPE_CHANGE_CHECK)
  
 +  /**
-+   * Whether this source should use schema tracking for metadata evolution.
-+   * Shared between v1 and v2 connectors.
++   * Whether this DeltaSource is utilizing a schema log entry as its read schema.
++   *
++   * If user explicitly turn on the flag to fall back to using latest schema to read (i.e. the
++   * legacy mode), we will ignore the schema log.
 +   */
 +  def shouldTrackMetadataChange(
 +      schemaReadOptions: DeltaStreamUtils.SchemaReadOptions,
@@ -107,8 +109,9 @@
 +  }
 +
 +  /**
-+   * Whether the tracking log should be initialized eagerly (log is provided but empty).
-+   * Shared between v1 and v2 connectors.
++   * Whether a schema tracking log is provided (and is empty), so we could initialize eagerly.
++   * This should only be used for the first write to the schema log, after then, schema tracking
++   * should not rely on this state any more.
 +   */
 +  def shouldInitializeMetadataTrackingEagerly(
 +      schemaReadOptions: DeltaStreamUtils.SchemaReadOptions,
@@ -169,7 +172,6 @@
 +
 +  /**
 +   * Check that the given schema is the same as the schema from the initial read snapshot.
-+   * This is shared between v1 and v2 connectors.
 +   */
 +  def hasSchemaChangeComparedToStreamMetadata(
 +      newSchema: StructType,

Reproduce locally: git range-diff 20f7851..2490e84 20f7851..9491023 | Disable: git config gitstack.push-range-diff false

@PorridgeSwim PorridgeSwim marked this pull request as ready for review April 15, 2026 22:24
@PorridgeSwim PorridgeSwim self-assigned this Apr 15, 2026
@PorridgeSwim changed the title from "Refactor DeltaSourceMetadataEvolutionSupport to be reuseable in v2" to "[kernel-spark] Refactor DeltaSourceMetadataEvolutionSupport to be reuseable in v2" on Apr 15, 2026
@PorridgeSwim force-pushed the stack/RefactorDeltaSourceMetadataEvolutionSupport branch from 9491023 to 44b86f0 on April 24, 2026 23:17
@PorridgeSwim (Collaborator, Author)

Range-diff: stack/RefactorMetadataTrackingLog (9491023 -> 44b86f0)
spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
@@ -0,0 +1,11 @@
+diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
+--- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
++++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
+  * Note: Please initialize this class using the companion object's `apply` method, which will
+  * assign correct values (`Set()` vs `None`) to [[readerFeatures]] and [[writerFeatures]].
+  */
+-case class Protocol private (
++case class Protocol (
+     minReaderVersion: Int,
+     minWriterVersion: Int,
+     @JsonInclude(Include.NON_ABSENT) // write to JSON only when the field is not `None`
\ No newline at end of file

Reproduce locally: git range-diff 20f7851..9491023 37e60f2..44b86f0 | Disable: git config gitstack.push-range-diff false

@PorridgeSwim force-pushed the stack/RefactorDeltaSourceMetadataEvolutionSupport branch 2 times, most recently from 685d49e to 38b283e, on April 29, 2026 18:12
@PorridgeSwim (Collaborator, Author)

Range-diff: stack/RefactorMetadataTrackingLog (685d49e -> 38b283e)
spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
@@ -1,11 +0,0 @@
-diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
---- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
-+++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
-  * Note: Please initialize this class using the companion object's `apply` method, which will
-  * assign correct values (`Set()` vs `None`) to [[readerFeatures]] and [[writerFeatures]].
-  */
--case class Protocol private (
-+case class Protocol (
-     minReaderVersion: Int,
-     minWriterVersion: Int,
-     @JsonInclude(Include.NON_ABSENT) // write to JSON only when the field is not `None`
\ No newline at end of file

Reproduce locally: git range-diff 0951054..685d49e b1fa08b..38b283e | Disable: git config gitstack.push-range-diff false

@PorridgeSwim force-pushed the stack/RefactorDeltaSourceMetadataEvolutionSupport branch 2 times, most recently from 67377eb to a80dfae, on May 1, 2026 08:41
@TimothyW553 (Collaborator) left a comment

a few comments.

for testing, existing tests pass through the v1 wrapper, so they would still pass even if the new static has a bug. Can we add one test that calls the static with an anonymous AbstractMetadata / AbstractProtocol directly? That is the test that proves the refactor is real.

p.minReaderVersion != readProtocolAtSourceInit.minReaderVersion ||
p.minWriterVersion != readProtocolAtSourceInit.minWriterVersion ||
p.readerFeatures != readProtocolAtSourceInit.readerFeatures ||
p.writerFeatures != readProtocolAtSourceInit.writerFeatures) ||
Collaborator

if Protocol ever gets a 5th field, this check will silently miss it. how about an equalsByFields on AbstractProtocol?

Collaborator Author

sounds great to me
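
One way the suggested `equalsByFields` could look is sketched below. `ProtocolLike` is a hypothetical stand-in for `AbstractProtocol`; the actual method added in the PR is not shown in this conversation and may be implemented differently.

```scala
// Hypothetical stand-in for the PR's AbstractProtocol; the real trait may differ.
trait ProtocolLike {
  def minReaderVersion: Int
  def minWriterVersion: Int
  def readerFeatures: Option[Set[String]]
  def writerFeatures: Option[Set[String]]

  /** The single place that lists the fields taking part in the comparison. */
  private def fieldValues: Seq[Any] =
    Seq(minReaderVersion, minWriterVersion, readerFeatures, writerFeatures)

  /** Field-wise comparison that works across different implementations of the trait. */
  final def equalsByFields(other: ProtocolLike): Boolean =
    fieldValues == other.fieldValues
}
```

With this shape, call sites no longer enumerate fields themselves. A newly added field still has to be appended to `fieldValues`, but only once, next to the field's declaration.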

readSchemaAtSourceInit: StructType,
readPartitionSchemaAtSourceInit: StructType,
readConfigurationsAtSourceInit: Map[String, String],
spark: SparkSession): Boolean = {
Collaborator

9 params is a lot. can we bundle the *AtSourceInit ones into a small case class before v2 calls in?

Collaborator Author

replaced it with readMetadataAtSourceInit

val configuration: Map[String, String] = oldMetadata.configuration +
(DeltaConfigs.COLUMN_MAPPING_MODE.key -> newMetadata.columnMappingMode.name)
val columnMappingMode: DeltaColumnMappingMode = newMetadata.columnMappingMode
}
Collaborator

9 lines of anonymous trait in the middle of a method. v2 will copy this pattern. can we extract a small AbstractMetadataAdapter case class?

Collaborator Author

There is already a KernelMetadataAdapter in v2, so we won't copy this pattern. Because oldMetadata is typed as the trait AbstractMetadata, we cannot call copy() on it; the current code is the idiomatic way to "copy with overrides" on a trait.
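
The "copy with overrides" idiom mentioned here can be sketched as follows. `MetadataLike` and `withOverrides` are hypothetical names used only for illustration; the PR itself builds the anonymous `AbstractMetadata` inline.

```scala
// Hypothetical trait; a trait has no compiler-generated copy() the way a case class does.
trait MetadataLike {
  def schemaJson: String
  def partitionColumns: Seq[String]
  def configuration: Map[String, String]
}

object MetadataLike {
  /** Returns a new instance that reuses `base` except for the overridden members. */
  def withOverrides(
      base: MetadataLike,
      newSchemaJson: String,
      extraConf: Map[String, String]): MetadataLike = new MetadataLike {
    val schemaJson: String = newSchemaJson
    val partitionColumns: Seq[String] = base.partitionColumns
    // Entries in extraConf win over entries already present in base.
    val configuration: Map[String, String] = base.configuration ++ extraConf
  }
}
```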

val schema: StructType = upgradedSchema
val partitionColumns: Seq[String] = oldMetadata.partitionColumns
val configuration: Map[String, String] = oldMetadata.configuration +
(DeltaConfigs.COLUMN_MAPPING_MODE.key -> newMetadata.columnMappingMode.name)
Collaborator

nit: old path wrote COLUMN_MAPPING_MAX_ID into config, this one forgets it. inert today, but breaks the 'no behavior change' claim - can we add it back?

Collaborator Author

Added back, thank you for pointing it out

val startId = maxId
val newSchema =
SchemaMergingUtils.transformColumns(rawSchema)((path, field, _) => {
isOverwritingSchema: Boolean): (StructType, Long) = {
Collaborator

two StructType then two Map right next to each other - easy to swap new and old by mistake. group them into one case class?

Collaborator Author

Changed to use AbstractMetadata.
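
The swap risk raised above, and the effect of bundling schema and configuration per side, can be illustrated with a small sketch. `SchemaAndConf`, `MaxIdResolver`, and the `example.maxColumnId` key are hypothetical and exist only for this example; they are not Delta identifiers.

```scala
// Hypothetical bundle of the two values that previously travelled as separate parameters.
final case class SchemaAndConf(schemaJson: String, configuration: Map[String, String])

object MaxIdResolver {
  // Hypothetical configuration key, used only in this sketch.
  val MaxIdKey: String = "example.maxColumnId"

  private def maxIdOf(m: SchemaAndConf): Long =
    m.configuration.get(MaxIdKey).map(_.toLong).getOrElse(0L)

  /** Two bundled parameters instead of two schemas plus two maps. */
  def startingMaxId(newMetadata: SchemaAndConf, oldMetadata: SchemaAndConf): Long =
    maxIdOf(newMetadata) max maxIdOf(oldMetadata)
}
```

Two parameters of the same type can still be swapped, so named arguments at the call site (`newMetadata = ..., oldMetadata = ...`) remain a useful safeguard. What bundling rules out is pairing the new schema with the old configuration.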


/** Returns the partitionSchema as a [[StructType]] */
def partitionSchema: StructType =
new StructType(partitionColumns.map(c => schema(c)).toArray)
Collaborator

nit: this rebuilds on every call, and the streaming path hits it once per metadata action. v1 Metadata already has a precomputed partitionSchema -- can the v1 adapter just override this?

Collaborator Author

The override is already added; see the file change in spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
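
The general pattern under discussion (a trait default that recomputes on every call, overridden by a cached value in a concrete implementation) can be sketched like this. `PartitionedMetadata` and `CachedMetadata` are hypothetical; the actual override in actions.scala is not shown in this conversation.

```scala
// Hypothetical trait with a derived member that is recomputed on each call by default.
trait PartitionedMetadata {
  def columns: Seq[String]
  def partitionColumns: Seq[String]

  /** Default implementation: rebuilt on every access. */
  def partitionIndexes: Seq[Int] = partitionColumns.map(c => columns.indexOf(c))
}

final case class CachedMetadata(columns: Seq[String], partitionColumns: Seq[String])
    extends PartitionedMetadata {
  /** Computed once on first access, then reused on later calls. */
  override lazy val partitionIndexes: Seq[Int] = super.partitionIndexes
}
```

Callers written against the trait are unaffected by the override. Only the concrete class decides whether the derived value is cached.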

@PorridgeSwim force-pushed the stack/RefactorDeltaSourceMetadataEvolutionSupport branch from a80dfae to e3eb104 on May 4, 2026 06:25
@PorridgeSwim (Collaborator, Author)

Range-diff: stack/RefactorMetadataTrackingLog (a80dfae -> e3eb104)
spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala
@@ -0,0 +1,10 @@
+diff --git a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala
+--- a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala
++++ b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala
+     "schema / checkpoint location unit tests - special characters in schema location",
+ 
+     // ========== Schema log core ==========
++    "detects metadata/protocol changes through the AbstractMetadata/AbstractProtocol surface",
+     "multiple delta source sharing same schema log is blocked",
+     "schema log is applied",
+     "concurrent schema log modification should be detected",
\ No newline at end of file
spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
@@ -19,8 +19,9 @@
 -   *                                      mapping mode on a existing table
 -   * @return new metadata with Ids and physical names assigned
 +   * Core logic for assigning column IDs and physical names to a schema.
-+   * Takes raw schema and configuration inputs (no v1 Metadata dependency) so it can be
-+   * reused by both v1 and v2 connectors.
++   * Takes [[AbstractMetadata]] (no v1 Metadata dependency) so it can be reused by both v1
++   * and v2 connectors. Bundling schema + configuration on each side avoids the swap footgun
++   * of having two `StructType` and two `Map` parameters next to each other.
 +   *
 +   * @return (upgradedSchema, maxColumnId) - the schema with IDs/physical names assigned,
 +   *         and the final max column ID.
@@ -29,10 +30,8 @@
 -      newMetadata: Metadata,
 -      oldMetadata: Metadata,
 +  private[delta] def assignColumnIdAndPhysicalNameToSchema(
-+      newSchema: StructType,
-+      oldSchema: StructType,
-+      newConfiguration: Map[String, String],
-+      oldConfiguration: Map[String, String],
++      newMetadata: AbstractMetadata,
++      oldMetadata: AbstractMetadata,
        isChangingModeOnExistingTable: Boolean,
 -      isOverwritingSchema: Boolean): Metadata = {
 -    val rawSchema = newMetadata.schema
@@ -43,6 +42,10 @@
 -    val newSchema =
 -      SchemaMergingUtils.transformColumns(rawSchema)((path, field, _) => {
 +      isOverwritingSchema: Boolean): (StructType, Long) = {
++    val newSchema = newMetadata.schema
++    val oldSchema = oldMetadata.schema
++    val newConfiguration = newMetadata.configuration
++    val oldConfiguration = oldMetadata.configuration
 +    var maxId = DeltaConfigs.COLUMN_MAPPING_MAX_ID.fromMap(newConfiguration) max
 +      DeltaConfigs.COLUMN_MAPPING_MAX_ID.fromMap(oldConfiguration) max
 +      findMaxColumnId(newSchema)
@@ -102,9 +105,7 @@
 +      isChangingModeOnExistingTable: Boolean,
 +      isOverwritingSchema: Boolean): Metadata = {
 +    val (finalSchema, newMaxId) = assignColumnIdAndPhysicalNameToSchema(
-+      newMetadata.schema, oldMetadata.schema,
-+      newMetadata.configuration, oldMetadata.configuration,
-+      isChangingModeOnExistingTable, isOverwritingSchema)
++      newMetadata, oldMetadata, isChangingModeOnExistingTable, isOverwritingSchema)
  
      newMetadata.copy(
        schemaString = finalSchema.json,
@@ -135,9 +136,8 @@
 -          Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> newMetadata.columnMappingMode.name)
 -      )
 -      // use the same check
-+      val (upgradedSchema, _) = assignColumnIdAndPhysicalNameToSchema(
-+        oldMetadata.schema, oldMetadata.schema,
-+        oldMetadata.configuration, oldMetadata.configuration,
++      val (upgradedSchema, upgradedMaxId) = assignColumnIdAndPhysicalNameToSchema(
++        newMetadata = oldMetadata, oldMetadata = oldMetadata,
 +        isChangingModeOnExistingTable = true, isOverwritingSchema = false)
 +      // Construct an AbstractMetadata with the upgraded schema and the new column mapping mode
 +      // so the comparison utils below can recognize column mapping metadata.
@@ -148,7 +148,8 @@
 +        val schema: StructType = upgradedSchema
 +        val partitionColumns: Seq[String] = oldMetadata.partitionColumns
 +        val configuration: Map[String, String] = oldMetadata.configuration +
-+          (DeltaConfigs.COLUMN_MAPPING_MODE.key -> newMetadata.columnMappingMode.name)
++          (DeltaConfigs.COLUMN_MAPPING_MODE.key -> newMetadata.columnMappingMode.name,
++            DeltaConfigs.COLUMN_MAPPING_MAX_ID.key -> upgradedMaxId.toString)
 +        val columnMappingMode: DeltaColumnMappingMode = newMetadata.columnMappingMode
 +      }
        !hasColMappingOrPartitionSchemaChangeByMetadata(newMetadata, upgradedMetadata)
spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
@@ -0,0 +1,17 @@
+diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
+--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
++++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
+    */
+   protected val readSchemaAtSourceInit: StructType = readSnapshotDescriptor.metadata.schema
+ 
+-  protected val readPartitionSchemaAtSourceInit: StructType =
+-    readSnapshotDescriptor.metadata.partitionSchema
+-
+   protected val readProtocolAtSourceInit: Protocol = readSnapshotDescriptor.protocol
+ 
+-  protected val readConfigurationsAtSourceInit: Map[String, String] =
+-    readSnapshotDescriptor.metadata.configuration
+-
+   /**
+    * Create a snapshot descriptor, customizing its metadata using metadata tracking if necessary
+    */
\ No newline at end of file
spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala
@@ -72,9 +72,7 @@
 +      newSchemaVersion,
 +      persistedMetadataAtSourceInit,
 +      readProtocolAtSourceInit,
-+      readSchemaAtSourceInit,
-+      readPartitionSchemaAtSourceInit,
-+      readConfigurationsAtSourceInit,
++      readSnapshotDescriptor.metadata,
 +      spark)
    }
  
@@ -137,9 +135,9 @@
 +   * @param newSchemaVersion The version of the incoming change.
 +   * @param persistedMetadataAtSourceInit The persisted metadata at source init, if any.
 +   * @param readProtocolAtSourceInit The protocol at source init.
-+   * @param readSchemaAtSourceInit The schema at source init.
-+   * @param readPartitionSchemaAtSourceInit The partition schema at source init.
-+   * @param readConfigurationsAtSourceInit The table configurations at source init.
++   * @param readMetadataAtSourceInit The metadata at source init (schema, partition schema, and
++   *                                 configuration). Bundled to avoid the swap footgun of three
++   *                                 adjacent params.
 +   * @param spark The SparkSession (used for SQL conf checks).
 +   */
 +  def hasMetadataOrProtocolChangeComparedToStreamMetadata(
@@ -148,24 +146,18 @@
 +      newSchemaVersion: Long,
 +      persistedMetadataAtSourceInit: Option[PersistedMetadata],
 +      readProtocolAtSourceInit: AbstractProtocol,
-+      readSchemaAtSourceInit: StructType,
-+      readPartitionSchemaAtSourceInit: StructType,
-+      readConfigurationsAtSourceInit: Map[String, String],
++      readMetadataAtSourceInit: AbstractMetadata,
 +      spark: SparkSession): Boolean = {
 +    if (persistedMetadataAtSourceInit.exists(_.deltaCommitVersion >= newSchemaVersion)) {
 +      false
 +    } else {
-+      protocolChangeOpt.exists(p =>
-+        p.minReaderVersion != readProtocolAtSourceInit.minReaderVersion ||
-+          p.minWriterVersion != readProtocolAtSourceInit.minWriterVersion ||
-+          p.readerFeatures != readProtocolAtSourceInit.readerFeatures ||
-+          p.writerFeatures != readProtocolAtSourceInit.writerFeatures) ||
++      protocolChangeOpt.exists(p => !p.equalsByFields(readProtocolAtSourceInit)) ||
 +      metadataChangeOpt.exists { newMetadata =>
 +        hasSchemaChangeComparedToStreamMetadata(
-+          newMetadata.schema, readSchemaAtSourceInit, spark) ||
-+          newMetadata.partitionSchema != readPartitionSchemaAtSourceInit ||
++          newMetadata.schema, readMetadataAtSourceInit.schema, spark) ||
++          newMetadata.partitionSchema != readMetadataAtSourceInit.partitionSchema ||
 +          newMetadata.configuration.filterKeys(_.startsWith("delta.")).toMap !=
-+            readConfigurationsAtSourceInit.filterKeys(_.startsWith("delta.")).toMap
++            readMetadataAtSourceInit.configuration.filterKeys(_.startsWith("delta.")).toMap
 +      }
 +    }
 +  }
spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala
@@ -0,0 +1,99 @@
+diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala
+--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala
++++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala
+ import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
+ import org.apache.spark.sql.delta.sources._
+ import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
++import org.apache.spark.sql.delta.v2.interop.{AbstractMetadata, AbstractProtocol}
+ import org.apache.spark.sql.delta.util.JsonUtils
+ import org.apache.commons.io.FileUtils
+ import org.apache.commons.lang3.exception.ExceptionUtils
+   with DeltaSourceSuiteBase with DeltaColumnMappingSelectedTestMixin with DeltaSQLCommandTest {
+ 
+   override protected def runOnlyTests: Seq[String] = Seq(
++    "detects metadata/protocol changes through the AbstractMetadata/AbstractProtocol surface",
+     "schema log initialization with additive schema changes",
+     "detect incompatible schema change while streaming",
+     "trigger.Once with deferred commit should work",
+     ))
+   }
+ 
++  test("detects metadata/protocol changes through the AbstractMetadata/AbstractProtocol " +
++      "surface") {
++    // Anonymous trait impls (not V1 Metadata/Protocol) prove the static actually relies on
++    // the abstract surface. The trait wrapper used in production always passes V1 types,
++    // so this is the only path that would catch a regression specific to non-V1 impls.
++    val baseSchema = new StructType().add("a", StringType, nullable = true)
++
++    def mkMetadata(
++        sch: StructType = baseSchema,
++        partCols: Seq[String] = Seq.empty,
++        conf: Map[String, String] = Map.empty): AbstractMetadata = new AbstractMetadata {
++      override def id: String = "tid"
++      override def name: String = ""
++      override def description: String = ""
++      override def schema: StructType = sch
++      override def partitionColumns: Seq[String] = partCols
++      override def configuration: Map[String, String] = conf
++      override def columnMappingMode: DeltaColumnMappingMode = NoMapping
++    }
++
++    def mkProtocol(
++        readerV: Int = 1,
++        writerV: Int = 2,
++        readerFs: Option[Set[String]] = None,
++        writerFs: Option[Set[String]] = None): AbstractProtocol = new AbstractProtocol {
++      override def minReaderVersion: Int = readerV
++      override def minWriterVersion: Int = writerV
++      override def readerFeatures: Option[Set[String]] = readerFs
++      override def writerFeatures: Option[Set[String]] = writerFs
++    }
++
++    val readMetadata = mkMetadata()
++    val readProtocol = mkProtocol()
++
++    def call(
++        metadataChange: Option[AbstractMetadata] = None,
++        protocolChange: Option[AbstractProtocol] = None,
++        newVer: Long = 1L,
++        persisted: Option[PersistedMetadata] = None): Boolean =
++      DeltaSourceMetadataEvolutionSupport.hasMetadataOrProtocolChangeComparedToStreamMetadata(
++        metadataChange, protocolChange, newVer, persisted, readProtocol, readMetadata, spark)
++
++    // No change: both sides identical anonymous impls -> false.
++    assert(!call(metadataChange = Some(mkMetadata()), protocolChange = Some(mkProtocol())))
++
++    // Schema differs.
++    assert(call(metadataChange =
++      Some(mkMetadata(sch = baseSchema.add("b", StringType, nullable = true)))))
++
++    // Partition schema differs (same data schema, different partition columns).
++    assert(call(metadataChange = Some(mkMetadata(partCols = Seq("a")))))
++
++    // delta.* configuration differs.
++    assert(call(metadataChange = Some(mkMetadata(conf = Map("delta.foo" -> "bar")))))
++
++    // Non-delta.* configuration differs -> filtered out, no change.
++    assert(!call(metadataChange = Some(mkMetadata(conf = Map("foo" -> "bar")))))
++
++    // Protocol differs by a single field (uses equalsByFields under the hood).
++    assert(call(protocolChange = Some(mkProtocol(readerV = 2))))
++
++    // Persisted metadata is at or beyond newSchemaVersion -> short-circuits to false even if
++    // every other input would otherwise indicate a change.
++    val persisted = PersistedMetadata(
++      tableId = "tid",
++      deltaCommitVersion = 5L,
++      dataSchemaJson = baseSchema.json,
++      partitionSchemaJson = new StructType().json,
++      sourceMetadataPath = "")
++    assert(!call(
++      metadataChange = Some(mkMetadata(sch = baseSchema.add("b", StringType, nullable = true))),
++      protocolChange = Some(mkProtocol(readerV = 99)),
++      newVer = 3L,
++      persisted = Some(persisted)))
++  }
++
+   test("forward-compat: older version can read back newer JSON") {
+     val newSchema = PersistedMetadata(
+       tableId = "test",
\ No newline at end of file

Reproduce locally: git range-diff dd2dc63..a80dfae ba9972a..e3eb104 | Disable: git config gitstack.push-range-diff false

@TimothyW553 (Collaborator) left a comment

LGTM. but a note before complete approval: please add a test to validate the hasNoColumnMappingSchemaChanges in DeltaColumnMapping.

* no rename column or drop column has happened in-between.
*/
def hasNoColumnMappingSchemaChanges(newMetadata: Metadata, oldMetadata: Metadata,
def hasNoColumnMappingSchemaChanges(
Collaborator

Can we add one test that calls hasNoColumnMappingSchemaChanges with non-Metadata AbstractMetadata inputs, especially the NoMapping -> NameMapping upgrade path, so we prove this refactor actually works for the V2 caller?

Collaborator

rough idea: add a small DeltaColumnMappingSuite test with anonymous AbstractMetadata wrappers around the same schemas/configs, then call DeltaColumnMapping.hasNoColumnMappingSchemaChanges(newAbstractMetadata, oldAbstractMetadata) and assert the expected true/false result.

@PorridgeSwim force-pushed the stack/RefactorDeltaSourceMetadataEvolutionSupport branch from e3eb104 to 28bb702 on May 5, 2026 20:47
))
}

test("detects metadata/protocol changes through the AbstractMetadata/AbstractProtocol " +
Collaborator

This is a unit test, perhaps it should live in DeltaSourceMetadataEvolutionSupportSuite.scala.

/**
* Check that the given schema is the same as the schema from the initial read snapshot.
*/
def hasSchemaChangeComparedToStreamMetadata(
Collaborator

This should be private.

@PorridgeSwim force-pushed the stack/RefactorDeltaSourceMetadataEvolutionSupport branch from 28bb702 to 83962e6 on May 5, 2026 22:10
@PorridgeSwim (Collaborator, Author)

Range-diff: stack/RefactorMetadataTrackingLog (28bb702 -> 83962e6)
spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala
@@ -165,7 +165,7 @@
 +  /**
 +   * Check that the given schema is the same as the schema from the initial read snapshot.
 +   */
-+  def hasSchemaChangeComparedToStreamMetadata(
++  private def hasSchemaChangeComparedToStreamMetadata(
 +      newSchema: StructType,
 +      readSchemaAtSourceInit: StructType,
 +      spark: SparkSession): Boolean = {
spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala
@@ -0,0 +1,98 @@
+diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala
+--- a/spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala
++++ b/spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala
+ 
+ package org.apache.spark.sql.delta.sources
+ 
+-import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaOptions, DeltaTestUtilsBase, DeltaThrowable}
++import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaColumnMappingMode, DeltaOptions}
++import org.apache.spark.sql.delta.{DeltaTestUtilsBase, DeltaThrowable, NoMapping}
++import org.apache.spark.sql.delta.v2.interop.{AbstractMetadata, AbstractProtocol}
+ 
+ import org.apache.spark.{SparkConf, SparkFunSuite}
+ import org.apache.spark.sql.test.SharedSparkSession
+-import org.apache.spark.sql.types.StructType
++import org.apache.spark.sql.types.{StringType, StructType}
+ 
+ /**
+  * Unit tests covering `DeltaSourceMetadataEvolutionSupport`, which detects non-additive schema
+       previousSchema = persistedMetadata("a byte", Map.empty)
+     )
+   }
++
++  test("detects metadata/protocol changes through the AbstractMetadata/AbstractProtocol " +
++      "surface") {
++    // Anonymous trait impls (not V1 Metadata/Protocol) prove the static actually relies on
++    // the abstract surface. The trait wrapper used in production always passes V1 types,
++    // so this is the only path that would catch a regression specific to non-V1 impls.
++    val baseSchema = new StructType().add("a", StringType, nullable = true)
++
++    def mkMetadata(
++        sch: StructType = baseSchema,
++        partCols: Seq[String] = Seq.empty,
++        conf: Map[String, String] = Map.empty): AbstractMetadata = new AbstractMetadata {
++      override def id: String = "tid"
++      override def name: String = ""
++      override def description: String = ""
++      override def schema: StructType = sch
++      override def partitionColumns: Seq[String] = partCols
++      override def configuration: Map[String, String] = conf
++      override def columnMappingMode: DeltaColumnMappingMode = NoMapping
++    }
++
++    def mkProtocol(
++        readerV: Int = 1,
++        writerV: Int = 2,
++        readerFs: Option[Set[String]] = None,
++        writerFs: Option[Set[String]] = None): AbstractProtocol = new AbstractProtocol {
++      override def minReaderVersion: Int = readerV
++      override def minWriterVersion: Int = writerV
++      override def readerFeatures: Option[Set[String]] = readerFs
++      override def writerFeatures: Option[Set[String]] = writerFs
++    }
++
++    val readMetadata = mkMetadata()
++    val readProtocol = mkProtocol()
++
++    def call(
++        metadataChange: Option[AbstractMetadata] = None,
++        protocolChange: Option[AbstractProtocol] = None,
++        newVer: Long = 1L,
++        persisted: Option[PersistedMetadata] = None): Boolean =
++      DeltaSourceMetadataEvolutionSupport.hasMetadataOrProtocolChangeComparedToStreamMetadata(
++        metadataChange, protocolChange, newVer, persisted, readProtocol, readMetadata, spark)
++
++    // No change: both sides identical anonymous impls -> false.
++    assert(!call(metadataChange = Some(mkMetadata()), protocolChange = Some(mkProtocol())))
++
++    // Schema differs.
++    assert(call(metadataChange =
++      Some(mkMetadata(sch = baseSchema.add("b", StringType, nullable = true)))))
++
++    // Partition schema differs (same data schema, different partition columns).
++    assert(call(metadataChange = Some(mkMetadata(partCols = Seq("a")))))
++
++    // delta.* configuration differs.
++    assert(call(metadataChange = Some(mkMetadata(conf = Map("delta.foo" -> "bar")))))
++
++    // Non-delta.* configuration differs -> filtered out, no change.
++    assert(!call(metadataChange = Some(mkMetadata(conf = Map("foo" -> "bar")))))
++
++    // Protocol differs by a single field (uses equalsByFields under the hood).
++    assert(call(protocolChange = Some(mkProtocol(readerV = 2))))
++
++    // Persisted metadata is at or beyond newSchemaVersion -> short-circuits to false even if
++    // every other input would otherwise indicate a change.
++    val persisted = PersistedMetadata(
++      tableId = "tid",
++      deltaCommitVersion = 5L,
++      dataSchemaJson = baseSchema.json,
++      partitionSchemaJson = new StructType().json,
++      sourceMetadataPath = "")
++    assert(!call(
++      metadataChange = Some(mkMetadata(sch = baseSchema.add("b", StringType, nullable = true))),
++      protocolChange = Some(mkProtocol(readerV = 99)),
++      newVer = 3L,
++      persisted = Some(persisted)))
++  }
+ }
\ No newline at end of file
spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala
@@ -1,10 +0,0 @@
-diff --git a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala
---- a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala
-+++ b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala
-     "schema / checkpoint location unit tests - special characters in schema location",
- 
-     // ========== Schema log core ==========
-+    "detects metadata/protocol changes through the AbstractMetadata/AbstractProtocol surface",
-     "multiple delta source sharing same schema log is blocked",
-     "schema log is applied",
-     "concurrent schema log modification should be detected",
\ No newline at end of file
spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala
@@ -1,99 +0,0 @@
-diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala
---- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala
-+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala
- import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
- import org.apache.spark.sql.delta.sources._
- import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
-+import org.apache.spark.sql.delta.v2.interop.{AbstractMetadata, AbstractProtocol}
- import org.apache.spark.sql.delta.util.JsonUtils
- import org.apache.commons.io.FileUtils
- import org.apache.commons.lang3.exception.ExceptionUtils
-   with DeltaSourceSuiteBase with DeltaColumnMappingSelectedTestMixin with DeltaSQLCommandTest {
- 
-   override protected def runOnlyTests: Seq[String] = Seq(
-+    "detects metadata/protocol changes through the AbstractMetadata/AbstractProtocol surface",
-     "schema log initialization with additive schema changes",
-     "detect incompatible schema change while streaming",
-     "trigger.Once with deferred commit should work",
-     ))
-   }
- 
-+  test("detects metadata/protocol changes through the AbstractMetadata/AbstractProtocol " +
-+      "surface") {
-+    // Anonymous trait impls (not V1 Metadata/Protocol) prove the static actually relies on
-+    // the abstract surface. The trait wrapper used in production always passes V1 types,
-+    // so this is the only path that would catch a regression specific to non-V1 impls.
-+    val baseSchema = new StructType().add("a", StringType, nullable = true)
-+
-+    def mkMetadata(
-+        sch: StructType = baseSchema,
-+        partCols: Seq[String] = Seq.empty,
-+        conf: Map[String, String] = Map.empty): AbstractMetadata = new AbstractMetadata {
-+      override def id: String = "tid"
-+      override def name: String = ""
-+      override def description: String = ""
-+      override def schema: StructType = sch
-+      override def partitionColumns: Seq[String] = partCols
-+      override def configuration: Map[String, String] = conf
-+      override def columnMappingMode: DeltaColumnMappingMode = NoMapping
-+    }
-+
-+    def mkProtocol(
-+        readerV: Int = 1,
-+        writerV: Int = 2,
-+        readerFs: Option[Set[String]] = None,
-+        writerFs: Option[Set[String]] = None): AbstractProtocol = new AbstractProtocol {
-+      override def minReaderVersion: Int = readerV
-+      override def minWriterVersion: Int = writerV
-+      override def readerFeatures: Option[Set[String]] = readerFs
-+      override def writerFeatures: Option[Set[String]] = writerFs
-+    }
-+
-+    val readMetadata = mkMetadata()
-+    val readProtocol = mkProtocol()
-+
-+    def call(
-+        metadataChange: Option[AbstractMetadata] = None,
-+        protocolChange: Option[AbstractProtocol] = None,
-+        newVer: Long = 1L,
-+        persisted: Option[PersistedMetadata] = None): Boolean =
-+      DeltaSourceMetadataEvolutionSupport.hasMetadataOrProtocolChangeComparedToStreamMetadata(
-+        metadataChange, protocolChange, newVer, persisted, readProtocol, readMetadata, spark)
-+
-+    // No change: both sides identical anonymous impls -> false.
-+    assert(!call(metadataChange = Some(mkMetadata()), protocolChange = Some(mkProtocol())))
-+
-+    // Schema differs.
-+    assert(call(metadataChange =
-+      Some(mkMetadata(sch = baseSchema.add("b", StringType, nullable = true)))))
-+
-+    // Partition schema differs (same data schema, different partition columns).
-+    assert(call(metadataChange = Some(mkMetadata(partCols = Seq("a")))))
-+
-+    // delta.* configuration differs.
-+    assert(call(metadataChange = Some(mkMetadata(conf = Map("delta.foo" -> "bar")))))
-+
-+    // Non-delta.* configuration differs -> filtered out, no change.
-+    assert(!call(metadataChange = Some(mkMetadata(conf = Map("foo" -> "bar")))))
-+
-+    // Protocol differs by a single field (uses equalsByFields under the hood).
-+    assert(call(protocolChange = Some(mkProtocol(readerV = 2))))
-+
-+    // Persisted metadata is at or beyond newSchemaVersion -> short-circuits to false even if
-+    // every other input would otherwise indicate a change.
-+    val persisted = PersistedMetadata(
-+      tableId = "tid",
-+      deltaCommitVersion = 5L,
-+      dataSchemaJson = baseSchema.json,
-+      partitionSchemaJson = new StructType().json,
-+      sourceMetadataPath = "")
-+    assert(!call(
-+      metadataChange = Some(mkMetadata(sch = baseSchema.add("b", StringType, nullable = true))),
-+      protocolChange = Some(mkProtocol(readerV = 99)),
-+      newVer = 3L,
-+      persisted = Some(persisted)))
-+  }
-+
-   test("forward-compat: older version can read back newer JSON") {
-     val newSchema = PersistedMetadata(
-       tableId = "test",
\ No newline at end of file

Reproduce locally: git range-diff 8378d33..28bb702 8378d33..83962e6 | Disable: git config gitstack.push-range-diff false
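
For readers skimming the range-diff, the cases asserted by the moved test can be summarised with a toy model. This is only a sketch under stated assumptions: `Meta`, `Proto`, and `hasChange` are hypothetical stand-ins, not the real `AbstractMetadata`/`AbstractProtocol` or the companion-object static, and the model always compares against the read-time metadata rather than a persisted entry.

```scala
object ChangeDetectionSketch {
  // Hypothetical, flattened stand-ins for AbstractMetadata / AbstractProtocol.
  final case class Meta(
      schemaJson: String,
      partitionColumns: Seq[String],
      configuration: Map[String, String])
  final case class Proto(minReaderVersion: Int, minWriterVersion: Int)

  // Only delta.* keys take part in the comparison, as the test comments state.
  private def deltaConf(m: Meta): Map[String, String] =
    m.configuration.filter { case (key, _) => key.startsWith("delta.") }

  def hasChange(
      metadataChange: Option[Meta],
      protocolChange: Option[Proto],
      newVersion: Long,
      persistedVersion: Option[Long],
      readProtocol: Proto,
      readMetadata: Meta): Boolean = {
    if (persistedVersion.exists(_ >= newVersion)) {
      // Persisted entry is at or beyond the new version: nothing left to detect.
      false
    } else {
      val metadataChanged = metadataChange.exists { m =>
        m.schemaJson != readMetadata.schemaJson ||
          m.partitionColumns != readMetadata.partitionColumns ||
          deltaConf(m) != deltaConf(readMetadata)
      }
      val protocolChanged = protocolChange.exists(_ != readProtocol)
      metadataChanged || protocolChanged
    }
  }
}
```

The short-circuit comes first so that a stale change is ignored regardless of what the other inputs say, which is the last case the test asserts.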

@PorridgeSwim PorridgeSwim force-pushed the stack/RefactorDeltaSourceMetadataEvolutionSupport branch from 83962e6 to 9036543 Compare May 5, 2026 22:25
@PorridgeSwim
Collaborator Author

Range-diff: stack/RefactorMetadataTrackingLog (83962e6 -> 9036543)
spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala
@@ -0,0 +1,67 @@
+diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala
+--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala
++++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala
+ import org.apache.spark.sql.delta.sources.DeltaSQLConf
+ import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
+ import org.apache.spark.sql.delta.test.DeltaTestImplicits._
++import org.apache.spark.sql.delta.v2.interop.AbstractMetadata
+ import org.apache.hadoop.fs.Path
+ import org.apache.parquet.format.converter.ParquetMetadataConverter
+ import org.apache.parquet.hadoop.ParquetFileReader
+     }
+   }
+ 
++  test("hasNoColumnMappingSchemaChanges accepts non-Metadata AbstractMetadata inputs") {
++    // Anonymous AbstractMetadata impls (not the V1 Metadata action) prove the API actually
++    // relies on the abstract surface. Especially exercises the NoMapping -> NameMapping upgrade
++    // branch, which synthesizes a fresh AbstractMetadata internally and reads back from it.
++    def mkAbstractMetadata(
++        sch: StructType,
++        mode: DeltaColumnMappingMode,
++        conf: Map[String, String] = Map.empty): AbstractMetadata = new AbstractMetadata {
++      override def id: String = "tid"
++      override def name: String = ""
++      override def description: String = ""
++      override def schema: StructType = sch
++      override def partitionColumns: Seq[String] = Seq.empty
++      override def configuration: Map[String, String] = conf
++      override def columnMappingMode: DeltaColumnMappingMode = mode
++    }
++
++    val baseSchema = new StructType().add("a", IntegerType).add("b", IntegerType)
++    val oldNoMapping = mkAbstractMetadata(baseSchema, NoMapping)
++
++    // Upgrade with no other change: synthesized post-upgrade schema uses logical names as
++    // physical names, matching what we pass for `new` -> read-compatible.
++    val newNameMappingSameSchema = mkAbstractMetadata(
++      DeltaColumnMapping.setPhysicalNames(
++        baseSchema, Map(Seq("a") -> "a", Seq("b") -> "b")),
++      NameMapping)
++    assert(DeltaColumnMapping.hasNoColumnMappingSchemaChanges(
++      newNameMappingSameSchema, oldNoMapping))
++
++    // Upgrade + drop: detected as a non-additive change.
++    val newNameMappingDropped = mkAbstractMetadata(
++      DeltaColumnMapping.setPhysicalNames(
++        new StructType().add("a", IntegerType), Map(Seq("a") -> "a")),
++      NameMapping)
++    assert(!DeltaColumnMapping.hasNoColumnMappingSchemaChanges(
++      newNameMappingDropped, oldNoMapping))
++
++    // Upgrade + rename (renamed column's physical name diverges from its logical name).
++    val newNameMappingRenamed = mkAbstractMetadata(
++      DeltaColumnMapping.setPhysicalNames(
++        new StructType().add("c", IntegerType).add("b", IntegerType),
++        Map(Seq("c") -> "a", Seq("b") -> "b")),
++      NameMapping)
++    assert(!DeltaColumnMapping.hasNoColumnMappingSchemaChanges(
++      newNameMappingRenamed, oldNoMapping))
++
++    // Downgrade NameMapping -> NoMapping is prohibited.
++    assert(!DeltaColumnMapping.hasNoColumnMappingSchemaChanges(
++      oldNoMapping, newNameMappingSameSchema))
++  }
++
+   testColumnMapping("create table through raw schema API should " +
+     "auto bump the version and retain input metadata") { mode =>
+ 
\ No newline at end of file

Reproduce locally: git range-diff 8378d33..83962e6 8378d33..9036543 | Disable: git config gitstack.push-range-diff false
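
The four outcomes asserted by the new `hasNoColumnMappingSchemaChanges` test can be pictured with a toy model. This is a sketch, not the `DeltaColumnMapping` implementation: `Meta` reduces a schema to a hypothetical logical-to-physical name map, and the upgrade branch is assumed to behave as the test comment describes (the pre-upgrade schema is given physical names equal to its logical names before comparing).

```scala
object ColumnMappingSketch {
  sealed trait Mode
  case object NoMapping extends Mode
  case object NameMapping extends Mode

  // Hypothetical reduction of a schema: logical column name -> physical column name.
  final case class Meta(mode: Mode, fields: Map[String, String])

  def hasNoColumnMappingChanges(newMeta: Meta, oldMeta: Meta): Boolean =
    (oldMeta.mode, newMeta.mode) match {
      case (NameMapping, NoMapping) => false // downgrade is prohibited
      case (NoMapping, NoMapping) => true    // no column mapping involved
      case _ =>
        // Simulate the upgrade: without mapping, physical name == logical name.
        val oldFields =
          if (oldMeta.mode == NoMapping) oldMeta.fields.keys.map(k => k -> k).toMap
          else oldMeta.fields
        val newByPhysical = newMeta.fields.map(_.swap)
        // Every old physical column must still exist under the same logical name;
        // extra columns in the new schema are additive and therefore allowed.
        oldFields.forall { case (logical, physical) =>
          newByPhysical.get(physical).contains(logical)
        }
    }
}
```

A rename shows up as a physical name that now resolves to a different logical name, and a drop as a physical name that no longer resolves at all.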

TimothyW553 pushed a commit that referenced this pull request May 5, 2026
## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6546/files) to
review incremental changes.
- [**stack/SparkMetadataAdapter**](#6546) [[Files changed](https://github.com/delta-io/delta/pull/6546/files)]
- [stack/RefactorMetadataTrackingLog](#6550) [[Files changed](https://github.com/delta-io/delta/pull/6550/files/9271a6262f7a2615b977de0319c7238044b7d0a9..8378d33acda70a34a109b35173a968a4b3401ec1)]
- [stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562) [[Files changed](https://github.com/delta-io/delta/pull/6562/files/8378d33acda70a34a109b35173a968a4b3401ec1..90365431b12640de181446ec9c2033fb1b143b03)]
- [stack/MetadataEvolutionHandler2](#6563) [[Files changed](https://github.com/delta-io/delta/pull/6563/files/28bb7021adb12b055e1b281fdfee0ab48a8732ac..578870181fa81a9146b2fa907244e350ffcabb52)]
- [stack/NonAdditiveSchemaEvolution2](#6570) [[Files changed](https://github.com/delta-io/delta/pull/6570/files/578870181fa81a9146b2fa907244e350ffcabb52..c025b7c3c386e8d46d6142d0727dce95582bb0ef)]
- [stack/NonAdditiveSchemaEvolution3](#6697) [[Files changed](https://github.com/delta-io/delta/pull/6697/files/c025b7c3c386e8d46d6142d0727dce95582bb0ef..db16b9fa80a80c105430c93589126ba8b828458f)]
- [stack/consecutiveSchemaChangesMerger](#6698) [[Files changed](https://github.com/delta-io/delta/pull/6698/files/0148020ffe11e7b079e99fa8c5189a19c354f2be..9a360aa819f20d78b5361b2e997d24433fb793d5)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 1/7 in the non-additive schema evolution for V2 streaming connector
stack.

The shared V1 Scala utilities (`DeltaColumnMapping`,
`DeltaSourceMetadataEvolutionSupport`) operate on
`AbstractMetadata`/`AbstractProtocol`, but V2 holds Kernel types. This
PR creates two adapter classes that bridge the gap:

- `KernelMetadataAdapter`: Kernel `Metadata` → `AbstractMetadata`
(schema conversion via `SchemaUtils`, partition columns and
configuration converted to Scala collections)
- `KernelProtocolAdapter`: Kernel `Protocol` → `AbstractProtocol` (maps
reader/writer features to `Option[Set[String]]`)

Also adds `columnMappingMode` and `partitionSchema` to the
`AbstractMetadata` trait. V1's `Metadata` already had these fields; the
trait just didn't expose them.
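
To make the adapter idea concrete, here is a minimal sketch of the protocol side. Everything in it is an assumption for illustration: `KernelLikeProtocol` and `ProtocolView` are hypothetical stand-ins for the Kernel `Protocol` and `AbstractProtocol`, and the version thresholds that decide when a feature set is `Some` are my reading of the Delta protocol rather than something this description states. It also assumes Scala 2.13+ for `scala.jdk.CollectionConverters`.

```scala
import java.util.{Set => JSet}
import scala.jdk.CollectionConverters._

object ProtocolAdapterSketch {
  // Hypothetical stand-in for a Kernel-style protocol that exposes Java collections.
  final case class KernelLikeProtocol(
      minReaderVersion: Int,
      minWriterVersion: Int,
      readerFeatures: JSet[String],
      writerFeatures: JSet[String])

  // Hypothetical mirror of the four members the tests in this PR override.
  trait ProtocolView {
    def minReaderVersion: Int
    def minWriterVersion: Int
    def readerFeatures: Option[Set[String]]
    def writerFeatures: Option[Set[String]]
  }

  // Assumed thresholds at which a protocol carries explicit feature sets.
  val ReaderFeaturesVersion = 3
  val WriterFeaturesVersion = 7

  final class ProtocolAdapter(p: KernelLikeProtocol) extends ProtocolView {
    require(p != null, "protocol must not be null")

    override def minReaderVersion: Int = p.minReaderVersion
    override def minWriterVersion: Int = p.minWriterVersion

    // Legacy protocols report None; table-features protocols report a Scala Set.
    override def readerFeatures: Option[Set[String]] =
      if (minReaderVersion >= ReaderFeaturesVersion) Some(p.readerFeatures.asScala.toSet)
      else None
    override def writerFeatures: Option[Set[String]] =
      if (minWriterVersion >= WriterFeaturesVersion) Some(p.writerFeatures.asScala.toSet)
      else None
  }
}
```

Keeping the conversion in one thin wrapper means the shared Scala utilities never see a Java collection or a Kernel type.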

## How was this patch tested?

Unit tests in `ActionAdaptersTest.java`: table-features protocol, legacy
protocol, full metadata round-trip, null optional fields, and null
constructor rejection.

## Does this PR introduce _any_ user-facing changes?

No.
@PorridgeSwim PorridgeSwim force-pushed the stack/RefactorDeltaSourceMetadataEvolutionSupport branch 2 times, most recently from 624a83e to 027984b Compare May 6, 2026 01:34
TimothyW553 pushed a commit that referenced this pull request May 6, 2026
…#6550)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6550/files) to
review incremental changes.
- [stack/SparkMetadataAdapter](#6546) [[Files changed](https://github.com/delta-io/delta/pull/6546/files)] [MERGED]
- [**stack/RefactorMetadataTrackingLog**](#6550) [[Files changed](https://github.com/delta-io/delta/pull/6550/files)]
- [stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562) [[Files changed](https://github.com/delta-io/delta/pull/6562/files/953f137f8c4ce46d8b8a9605b0c7bed898e30df4..027984b6edcbad0f4731e560425c2ed9bcf8fc27)]
- [stack/MetadataEvolutionHandler2](#6563) [[Files changed](https://github.com/delta-io/delta/pull/6563/files/027984b6edcbad0f4731e560425c2ed9bcf8fc27..ada845895139edcb2727a87b39922c8e16837a99)]
- [stack/NonAdditiveSchemaEvolution2](#6570) [[Files changed](https://github.com/delta-io/delta/pull/6570/files/ada845895139edcb2727a87b39922c8e16837a99..476762fde7b9cb9b9bc3e416c86a260cd29806ed)]
- [stack/NonAdditiveSchemaEvolution3](#6697) [[Files changed](https://github.com/delta-io/delta/pull/6697/files/476762fde7b9cb9b9bc3e416c86a260cd29806ed..13395a7f2a49db4962091e8ee919bebdab5bd4e2)]
- [stack/consecutiveSchemaChangesMerger](#6698) [[Files changed](https://github.com/delta-io/delta/pull/6698/files/13395a7f2a49db4962091e8ee919bebdab5bd4e2..f22ba063eaf35ab69d653a2d5faefdc52f35eab5)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 2/7 in the non-additive schema evolution for V2 streaming connector
stack.

Decouple `DeltaSourceMetadataTrackingLog` and `PersistedMetadata` from
V1-specific types so the schema log can be reused by the V2 connector.

- Replace `SnapshotDescriptor` parameter in `create()` with plain
`sourceTableId` and `sourceDataPath` strings
- Unify `PersistedMetadata.apply` to accept
`AbstractMetadata`/`AbstractProtocol` instead of V1
`Metadata`/`Protocol`
- Extract the consecutive schema changes merger (V1-specific, depends on
`DeltaLog`) out of the companion object into
`DeltaSourceMetadataEvolutionSupport`, and inject it as a function
parameter so V2 can provide its own implementation
- Remove `Protocol`'s `private` constructor modifier to allow
construction from abstract protocol fields
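
The "inject it as a function parameter" point can be sketched as follows. The names here (`Entry`, `Merger`, `open`) are hypothetical and the real `create()` signature is not reproduced; the sketch only shows how a log can stay unaware of where the merged entry comes from.

```scala
object MergerInjectionSketch {
  // Hypothetical, pared-down schema-log entry.
  final case class Entry(deltaCommitVersion: Long, dataSchemaJson: String)

  // Given the latest persisted entry, a merger may return one entry that folds
  // the consecutive schema changes following it; None means nothing to merge.
  type Merger = Entry => Option[Entry]

  // The log takes the merger as a plain function, so a V1 caller can pass one
  // backed by its own log reader and a V2 caller can pass a different one.
  def open(existing: Vector[Entry], merger: Option[Merger]): Vector[Entry] = {
    val merged = for {
      merge <- merger
      latest <- existing.lastOption
      next <- merge(latest)
    } yield existing :+ next
    merged.getOrElse(existing)
  }
}
```

Passing `None` leaves the log untouched, which is how a caller could opt out of merging entirely.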

All changes are structural refactors with no behavioral change.

## How was this patch tested?

Existing tests in `DeltaSourceSchemaEvolutionSuite` updated to use the
new API. No behavioral changes.

## Does this PR introduce _any_ user-facing changes?

No.
@PorridgeSwim PorridgeSwim force-pushed the stack/RefactorDeltaSourceMetadataEvolutionSupport branch from 027984b to 145eaf3 Compare May 6, 2026 04:38
@PorridgeSwim PorridgeSwim force-pushed the stack/RefactorDeltaSourceMetadataEvolutionSupport branch from 145eaf3 to ed92a0f Compare May 6, 2026 17:56
@murali-db murali-db merged commit 8ec947a into delta-io:master May 6, 2026
31 checks passed
@PorridgeSwim PorridgeSwim mentioned this pull request May 10, 2026
murali-db pushed a commit that referenced this pull request May 11, 2026
## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6563/files) to
review incremental changes.
- [stack/SparkMetadataAdapter](#6546) [[Files changed](https://github.com/delta-io/delta/pull/6546/files)] [MERGED]
- [stack/RefactorMetadataTrackingLog](#6550) [[Files changed](https://github.com/delta-io/delta/pull/6550/files)] [MERGED]
- [stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562) [[Files changed](https://github.com/delta-io/delta/pull/6562/files)] [MERGED]
- [**stack/MetadataEvolutionHandler2**](#6563) [[Files changed](https://github.com/delta-io/delta/pull/6563/files)]
- [stack/NonAdditiveSchemaEvolution2](#6570) [[Files changed](https://github.com/delta-io/delta/pull/6570/files/a20f1f3ab452a75fc954e15c57c17327e0cb9267..0e07f87285becd6be416450ae084df454d9c94a9)]
- [stack/NonAdditiveSchemaEvolution3](#6697) [[Files changed](https://github.com/delta-io/delta/pull/6697/files/0e07f87285becd6be416450ae084df454d9c94a9..73e1aa7f4162a3e1480ffd2b88b9ca79d852f2fe)]
- [stack/consecutiveSchemaChangesMerger](#6698) [[Files changed](https://github.com/delta-io/delta/pull/6698/files/73e1aa7f4162a3e1480ffd2b88b9ca79d852f2fe..5e5d260b64d45cc11bcfdb58e5aab1b2d2637b33)]
- [stack/V1V2MixTest](#6759) [[Files changed](https://github.com/delta-io/delta/pull/6759/files/5e5d260b64d45cc11bcfdb58e5aab1b2d2637b33..738379713040986c74f98dbebfdc6c83ec1d3f16)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 4/7 in the non-additive schema evolution for V2 streaming connector
stack.

Introduce `MetadataEvolutionHandler`, a Java class that implements the
V1 barrier protocol for schema evolution in the V2 connector. In V1 this
logic lives in `DeltaSourceMetadataEvolutionSupport`, a Scala trait
mixed into `DeltaSource` that accesses stream state via `this`. Since
V2's `SparkMicroBatchStream` is Java and cannot use Scala trait mixins,
`MetadataEvolutionHandler` receives all dependencies via constructor
injection instead.

The handler covers the full schema evolution lifecycle:
- **Stream start**: eager metadata tracking log initialization on first
batch
- **Offset generation**: injects `METADATA_CHANGE_INDEX` /
`POST_METADATA_CHANGE_INDEX` barrier sentinels into the file change
iterator
- **Pending schema offsets**: returns barrier offsets for in-progress
schema changes
- **Batch commit**: updates the schema log and throws
`DELTA_STREAMING_METADATA_EVOLUTION` to trigger stream restart
- **Batch planning on restart**: validates and re-initializes the schema
log

All detection logic delegates to the shared
`DeltaSourceMetadataEvolutionSupport$` companion object statics
(refactored in PR 3/7). V2-specific orchestration is limited to wiring
the barrier protocol into the `CloseableIterator<IndexedFile>` pipeline
and collecting metadata/protocol from Kernel commit ranges via
`StreamingHelper`.

Also extends `StreamingHelper` with
`getMetadataAndProtocolForVersionRange` to collect metadata and protocol
actions from a range of Kernel commits.
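
As an illustration of the barrier injection described above, here is a minimal sketch. It is not the handler's code: it works on a `Seq` instead of a `CloseableIterator<IndexedFile>`, `IndexedFile` is a hypothetical three-field stand-in, and the sentinel values are placeholders chosen only to be distinguishable from real file indexes.

```scala
object BarrierSketch {
  // Placeholder sentinel values; the real constants live in the Delta source offset code.
  val MetadataChangeIndex: Long = -20L
  val PostMetadataChangeIndex: Long = -19L

  // Hypothetical stand-in for an indexed file change; path is None for sentinels.
  final case class IndexedFile(version: Long, index: Long, path: Option[String])

  // Emits files up to, but not including, the first version that carries a
  // metadata change, then appends a single barrier entry for that version.
  def withBarrier(files: Seq[IndexedFile], changedVersions: Set[Long]): Seq[IndexedFile] = {
    val (before, rest) = files.span(f => !changedVersions.contains(f.version))
    rest.headOption match {
      case Some(first) => before :+ IndexedFile(first.version, MetadataChangeIndex, None)
      case None => before
    }
  }

  // Once the barrier offset has been handled, the stream resumes from the
  // matching post-change sentinel of the same version.
  def afterBarrier(barrier: IndexedFile): IndexedFile =
    barrier.copy(index = PostMetadataChangeIndex)
}
```

Truncating at the barrier keeps a single batch from mixing files written under two different schemas.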

## How was this patch tested?

Unit tests in `MetadataEvolutionHandlerTest.java` covering: barrier
protocol (METADATA_CHANGE_INDEX / POST_METADATA_CHANGE_INDEX offset
generation), tracking state transitions, initialization lifecycle,
offset arithmetic, pending schema change handling, and commit-time
evolution exception.

## Does this PR introduce _any_ user-facing changes?

No.
murali-db pushed a commit that referenced this pull request May 16, 2026
## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6570/files) to
review incremental changes.
- [stack/SparkMetadataAdapter](#6546) [[Files changed](https://github.com/delta-io/delta/pull/6546/files)] [MERGED]
- [stack/RefactorMetadataTrackingLog](#6550) [[Files changed](https://github.com/delta-io/delta/pull/6550/files)] [MERGED]
- [stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562) [[Files changed](https://github.com/delta-io/delta/pull/6562/files)] [MERGED]
- [stack/MetadataEvolutionHandler2](#6563) [[Files changed](https://github.com/delta-io/delta/pull/6563/files)] [MERGED]
- [**stack/NonAdditiveSchemaEvolution2**](#6570) [[Files changed](https://github.com/delta-io/delta/pull/6570/files)]
- [stack/NonAdditiveSchemaEvolution3](#6697) [[Files changed](https://github.com/delta-io/delta/pull/6697/files/b7f6c8ebfc0882e7e2cc580f09f376be23a8d43d..dbb6246c14be1ab7f017ad9fc26455ae599ee676)]
- [stack/consecutiveSchemaChangesMerger](#6698) [[Files changed](https://github.com/delta-io/delta/pull/6698/files/dbb6246c14be1ab7f017ad9fc26455ae599ee676..4bf2fa3fa828bcab0b56c4c26ca51ee9cc40b482)]
- [stack/SchemaTrackingWithCDC](#6801) [[Files changed](https://github.com/delta-io/delta/pull/6801/files/4bf2fa3fa828bcab0b56c4c26ca51ee9cc40b482..a78a4ac2bc9a52605278a36b98804230258c12a2)]
- [stack/V1V2MixTest](#6759) [[Files changed](https://github.com/delta-io/delta/pull/6759/files/7f9b7f2724b2245ab7380908616303cf7ea95fca..e146cdc9ebb0572e8b0a928cc6dd3bfdc198d984)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 5/7 in the non-additive schema evolution for V2 streaming connector
stack.

Wire schema tracking into V2's analysis path so the analyzed plan
reflects the persisted (evolved) schema instead of the live snapshot
schema.

- `DeltaAnalysis.verifyDeltaSourceSchemaLocation`: extend the
duplicate-schema-location check to also visit `StreamingRelationV2`,
keyed on the V2 `Table.name`.
- `SparkTable`: open `DeltaSourceMetadataTrackingLog` once during
construction (gated on `mergeConsecutiveSchemaChanges`) and seed
`SchemaProvider` from the persisted metadata, so analysis-time
`schema()` matches what the stream will read at runtime.
- `ApplyV2ReadOptions` (renamed from `ApplyV2Streaming`): generalize the
CDC-only rebuild to also fire when `schemaTrackingLocation` arrives via
`extraOptions` on the catalog `readStream.table()` path; rebuild
`SparkTable` with merged options so the schema-log lookup actually
fires.
- `MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream`:
V2 port of V1's helper, reused by `SparkTable` (analysis) and
`SparkScan` (execution).

## How was this patch tested?

`SparkTableTest`, `MetadataEvolutionHandlerTest`,
`ApplyV2ReadOptionsSuite`. Unified `DeltaV2SourceSchemaEvolutionSuite`
updated.

## Does this PR introduce _any_ user-facing changes?

No.
murali-db pushed a commit that referenced this pull request May 16, 2026
…6697)

## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/6697/files) to
review incremental changes.
- [stack/SparkMetadataAdapter](#6546) [[Files changed](https://github.com/delta-io/delta/pull/6546/files)] [MERGED]
- [stack/RefactorMetadataTrackingLog](#6550) [[Files changed](https://github.com/delta-io/delta/pull/6550/files)] [MERGED]
- [stack/RefactorDeltaSourceMetadataEvolutionSupport](#6562) [[Files changed](https://github.com/delta-io/delta/pull/6562/files)] [MERGED]
- [stack/MetadataEvolutionHandler2](#6563) [[Files changed](https://github.com/delta-io/delta/pull/6563/files)] [MERGED]
- [stack/NonAdditiveSchemaEvolution2](#6570) [[Files changed](https://github.com/delta-io/delta/pull/6570/files)] [MERGED]
- [**stack/NonAdditiveSchemaEvolution3**](#6697) [[Files changed](https://github.com/delta-io/delta/pull/6697/files)]
- [stack/consecutiveSchemaChangesMerger](#6698) [[Files changed](https://github.com/delta-io/delta/pull/6698/files/f96643aa3cc01e7f70cc13a18b82dc27f277f11d..f612628ad931ec35c237801109f01b6fbd1379f7)]
- [stack/SchemaTrackingWithCDC](#6801) [[Files changed](https://github.com/delta-io/delta/pull/6801/files/f612628ad931ec35c237801109f01b6fbd1379f7..4aeacfb120b33e9cdfe124352290b72f53f7cf89)]
- [stack/V1V2MixTest](#6759) [[Files changed](https://github.com/delta-io/delta/pull/6759/files/f612628ad931ec35c237801109f01b6fbd1379f7..0c818ee431ab417a4f2ffbcc609930be09d25031)]

---------
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

PR 6/7 in the non-additive schema evolution for V2 streaming connector
stack.

Wire `MetadataEvolutionHandler` into `SparkMicroBatchStream` and
`SparkScan` so V2 streaming reads honor non-additive schema evolution
(column rename/drop, type widening).

- `SparkMicroBatchStream`: take `metadataTrackingLog` + `metadataPath`
as constructor inputs; when a persisted entry exists, layer it onto the
freshly loaded `snapshotAtSourceInit` to derive
`readSnapshotAtSourceInit` (mirrors V1's `readSnapshotDescriptor`).
Integrate the schema-evolution barrier protocol into `latestOffset` /
`commit` / `planInputPartitions`. Skip the on-restart schema-validation
check when schema tracking is active — the schema-log evolution
exception covers it.
- `SparkScan.toMicroBatchStream`: reload latest snapshot (the
analysis-time `initialSnapshot` can be stale by stream start), open the
tracking log via
`MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream`
with `mergeConsecutiveSchemaChanges=false` (the merger only runs at
analysis), and pass it through with the checkpoint location.
- `SparkScan` option allow-list: move `allowSourceColumnDrop` / `Rename`
/ `TypeChange` out of the unsupported list now that they are honored.
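
The "layer it onto the freshly loaded snapshot" step in the first bullet can be pictured with a small sketch. `SnapshotView` and `PersistedEntry` are hypothetical reductions (the entry's field names follow `PersistedMetadata` as constructed in the tests of PR 3/7), and keeping the live snapshot's version while swapping in the persisted schemas is an assumption of this sketch.

```scala
object ReadSnapshotSketch {
  // Hypothetical reduction of a loaded snapshot to the fields that matter here.
  final case class SnapshotView(
      version: Long,
      dataSchemaJson: String,
      partitionSchemaJson: String)

  // Hypothetical reduction of a schema-log entry.
  final case class PersistedEntry(
      deltaCommitVersion: Long,
      dataSchemaJson: String,
      partitionSchemaJson: String)

  // With a persisted entry, the stream reads with the tracked schemas;
  // without one, it falls back to the live snapshot unchanged.
  def readSnapshot(live: SnapshotView, persisted: Option[PersistedEntry]): SnapshotView =
    persisted.fold(live) { p =>
      live.copy(
        dataSchemaJson = p.dataSchemaJson,
        partitionSchemaJson = p.partitionSchemaJson)
    }
}
```

This keeps the read schema pinned to what the schema log recorded, even if the table has since moved on.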

## How was this patch tested?

`SparkMicroBatchStreamTest`, `MetadataEvolutionHandlerTest`. Unified
suites (`DeltaV2SourceSchemaEvolutionSuite`,
`TypeWideningStreamingV2SourceSuite`,
`RemoveColumnMappingStreamingReadV2Suite`) move non-merger evolution
scenarios from `shouldFailTests` to `shouldPassTests`; merger-dependent
tests remain pending until PR 7/7.

## Does this PR introduce _any_ user-facing changes?

No.