diff --git a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV1V2SourceSchemaTrackingLogCompatibilitySuite.scala b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV1V2SourceSchemaTrackingLogCompatibilitySuite.scala new file mode 100644 index 00000000000..e0235d4ee05 --- /dev/null +++ b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV1V2SourceSchemaTrackingLogCompatibilitySuite.scala @@ -0,0 +1,505 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.test + +import java.io.File + +import org.apache.spark.sql.delta.{ + DeltaColumnMappingEnableIdMode, + DeltaColumnMappingEnableNameMode, + DeltaLog, + StreamingSchemaEvolutionSuiteBase +} +import org.apache.spark.sql.delta.sources.{ + DeltaSourceMetadataTrackingLog, + DeltaSQLConf, + PersistedMetadata +} + +import org.apache.spark.sql.{DataFrame, Row} + +/** + * Verifies that a single stream's schema tracking log is interchangeable between the V1 (legacy + * DeltaSource) and V2 (Kernel-based SparkMicroBatchStream) connectors. Each test alternates the + * connector across stream restarts that share the same checkpoint and schema location, exercising + * schema log initialization, non-additive evolution, and consecutive evolutions. + */ +trait DeltaV1V2SourceSchemaTrackingLogCompatibilitySuiteBase + extends StreamingSchemaEvolutionSuiteBase { + + override protected def isCdcTest: Boolean = false + + // V2ForceTest enforces a per-suite parity contract; this suite tracks neither shouldPass nor + // shouldFail, so leave the column-mapping mixin's filter disabled. + override protected def runAllTests: Boolean = true + override protected def runOnlyTests: Seq[String] = Seq.empty + + // Connector selection is varied per readStream call rather than fixed at the suite level. + // `useDsv2` is read inside `loadStreamWithOptions`, so flipping this var between restarts + // routes the next read through either DeltaSource (V1) or SparkMicroBatchStream (V2). + private var useDsv2Override: Boolean = false + override protected def useDsv2: Boolean = useDsv2Override + + /** Run `f` with the named connector active for both reads and the V2_ENABLE_MODE conf. */ + private def withConnector[T](useV2: Boolean)(f: => T): T = { + val prev = useDsv2Override + useDsv2Override = useV2 + try { + val mode = if (useV2) "STRICT" else "NONE" + withSQLConf(DeltaSQLConf.V2_ENABLE_MODE.key -> mode) { + f + } + } finally { + useDsv2Override = prev + } + } + + protected def withV1[T](f: => T): T = withConnector(useV2 = false)(f) + protected def withV2[T](f: => T): T = withConnector(useV2 = true)(f) + + // SparkTable (V2) is read-only; route DDL/DML through V1 regardless of the active reader. + override protected def executeDml(sqlText: String): Unit = { + withSQLConf(DeltaSQLConf.V2_ENABLE_MODE.key -> "NONE") { + sql(sqlText) + } + } + + // =========================================================================================== + // Strict-equivalence helpers + // =========================================================================================== + + /** Construct a tracking log instance pointing at an arbitrary root location. */ + private def schemaLogAt(rootLocation: String)(implicit log: DeltaLog) + : DeltaSourceMetadataTrackingLog = + DeltaSourceMetadataTrackingLog.create( + spark, + rootLocation, + log.unsafeVolatileTableId, + log.dataPath.toString, + parameters = Map.empty) + + /** + * `sourceMetadataPath` is the only field that legitimately differs between two runs whose + * checkpoints live in different temp dirs. Strip it for cross-run comparison. + */ + private def normalize(metadata: PersistedMetadata): PersistedMetadata = + metadata.copy(sourceMetadataPath = "") + + /** + * Returns up to two entries currently stored in the tracking log: the previous and current. + * Sufficient for the evolution sequences these tests construct (init + post-rename). + */ + private def trackedEntries( + schemaLog: DeltaSourceMetadataTrackingLog): Seq[PersistedMetadata] = { + val current = schemaLog.getCurrentTrackedMetadata + val previous = schemaLog.getPreviousTrackedMetadata + (previous.toSeq ++ current.toSeq.filterNot(c => previous.contains(c))).distinct + } + + /** + * Parameterizes the non-additive evolution tests over rename vs drop. `onStarter` runs against + * the (a, b) starter schema; `afterAdditive` runs against an (a, b, c) schema produced by an + * earlier `addColumn("c")`. The two row builders describe the post-evolution row shape used by + * `addData` so that `CheckAnswer` is correct in each shape. + */ + protected case class NonAdditiveChange( + kind: String, + onStarter: DeltaLog => Unit, + afterAdditive: DeltaLog => Unit, + rowAfterStarter: Int => Row, + rowAfterAdditive: Int => Row) + + protected val nonAdditiveChanges: Seq[NonAdditiveChange] = Seq( + NonAdditiveChange( + kind = "rename", + onStarter = log => renameColumn("b", "c")(log), + afterAdditive = log => renameColumn("c", "d")(log), + rowAfterStarter = i => Row(i.toString, i.toString), + rowAfterAdditive = i => Row(i.toString, i.toString, i.toString)), + NonAdditiveChange( + kind = "drop", + onStarter = log => dropColumn("b")(log), + afterAdditive = log => dropColumn("c")(log), + rowAfterStarter = i => Row(i.toString), + rowAfterAdditive = i => Row(i.toString, i.toString)) + ) + + // =========================================================================================== + // Tests + // =========================================================================================== + + test("V1-initialized schema log can be read by V2") { + withStarterTable { implicit log => + def df: DataFrame = readStream(schemaLocation = Some(getDefaultSchemaLocation.toString)) + + // V1 initializes the schema log on first run. + withV1 { + testStream(df)( + StartStream(checkpointLocation = getDefaultCheckpoint.toString), + ProcessAllAvailable(), + CheckAnswer((-1 until 5).map(i => (i.toString, i.toString)): _*) + ) + } + val v0 = log.update().version + assert(getDefaultSchemaLog().getCurrentTrackedMetadata.get.deltaCommitVersion == v0) + + addData(5 until 10) + + // V2 resumes from the V1-written checkpoint and schema log. + withV2 { + testStream(df)( + StartStream(checkpointLocation = getDefaultCheckpoint.toString), + ProcessAllAvailable(), + CheckAnswer((5 until 10).map(i => (i.toString, i.toString)): _*) + ) + } + } + } + + test("V2-initialized schema log can be read by V1") { + withStarterTable { implicit log => + def df: DataFrame = readStream(schemaLocation = Some(getDefaultSchemaLocation.toString)) + + // V2 initializes the schema log on first run. + withV2 { + testStream(df)( + StartStream(checkpointLocation = getDefaultCheckpoint.toString), + ProcessAllAvailable(), + CheckAnswer((-1 until 5).map(i => (i.toString, i.toString)): _*) + ) + } + val v0 = log.update().version + assert(getDefaultSchemaLog().getCurrentTrackedMetadata.get.deltaCommitVersion == v0) + + addData(5 until 10) + + // V1 resumes from the V2-written checkpoint and schema log. + withV1 { + testStream(df)( + StartStream(checkpointLocation = getDefaultCheckpoint.toString), + ProcessAllAvailable(), + CheckAnswer((5 until 10).map(i => (i.toString, i.toString)): _*) + ) + } + } + } + + nonAdditiveChanges.foreach { change => + test(s"non-additive evolution (${change.kind}) written by V1 is consumed by V2") { + withStarterTable { implicit log => + def df: DataFrame = readStream(schemaLocation = Some(getDefaultSchemaLocation.toString)) + + withV1 { + testStream(df)( + StartStream(checkpointLocation = getDefaultCheckpoint.toString), + ProcessAllAvailable(), + CheckAnswer((-1 until 5).map(i => (i.toString, i.toString)): _*) + ) + } + + change.onStarter(log) + val evolutionVersion = log.update().version + addData(5 until 10) + + // V1 hits the change and writes the new metadata into the schema log. + withV1 { + testStream(df)( + StartStream(checkpointLocation = getDefaultCheckpoint.toString), + ProcessAllAvailableIgnoreError, + CheckAnswer(Nil: _*), + ExpectMetadataEvolutionException + ) + } + assert(getDefaultSchemaLog().getCurrentTrackedMetadata.get.deltaCommitVersion == + evolutionVersion) + + // V2 starts under the V1-evolved schema and processes the post-evolution data. + withV2 { + testStream(df)( + StartStream(checkpointLocation = getDefaultCheckpoint.toString), + ProcessAllAvailable(), + CheckAnswer((5 until 10).map(change.rowAfterStarter): _*) + ) + } + } + } + } + + nonAdditiveChanges.foreach { change => + test(s"non-additive evolution (${change.kind}) written by V2 is consumed by V1") { + withStarterTable { implicit log => + def df: DataFrame = readStream(schemaLocation = Some(getDefaultSchemaLocation.toString)) + + withV2 { + testStream(df)( + StartStream(checkpointLocation = getDefaultCheckpoint.toString), + ProcessAllAvailable(), + CheckAnswer((-1 until 5).map(i => (i.toString, i.toString)): _*) + ) + } + + change.onStarter(log) + val evolutionVersion = log.update().version + addData(5 until 10) + + // V2 hits the change and writes the new metadata into the schema log. + withV2 { + testStream(df)( + StartStream(checkpointLocation = getDefaultCheckpoint.toString), + ProcessAllAvailableIgnoreError, + CheckAnswer(Nil: _*), + ExpectMetadataEvolutionException + ) + } + assert(getDefaultSchemaLog().getCurrentTrackedMetadata.get.deltaCommitVersion == + evolutionVersion) + + // V1 starts under the V2-evolved schema and processes the post-evolution data. + withV1 { + testStream(df)( + StartStream(checkpointLocation = getDefaultCheckpoint.toString), + ProcessAllAvailable(), + CheckAnswer((5 until 10).map(change.rowAfterStarter): _*) + ) + } + } + } + } + + nonAdditiveChanges.foreach { change => + test(s"alternating connectors across consecutive evolutions (${change.kind})") { + withStarterTable { implicit log => + def df: DataFrame = readStream(schemaLocation = Some(getDefaultSchemaLocation.toString)) + + // 1) V1 initializes. + withV1 { + testStream(df)( + StartStream(checkpointLocation = getDefaultCheckpoint.toString), + ProcessAllAvailable(), + CheckAnswer((-1 until 5).map(i => (i.toString, i.toString)): _*) + ) + } + + // 2) Add column, then V2 picks up the additive evolution. + addColumn("c") + val additiveVersion = log.update().version + addData(5 until 10) + + withV2 { + testStream(df)( + StartStream(checkpointLocation = getDefaultCheckpoint.toString), + ProcessAllAvailableIgnoreError, + ExpectMetadataEvolutionException + ) + } + assert(getDefaultSchemaLog().getCurrentTrackedMetadata.get.deltaCommitVersion == + additiveVersion) + + withV2 { + testStream(df)( + StartStream(checkpointLocation = getDefaultCheckpoint.toString), + ProcessAllAvailable(), + CheckAnswer((5 until 10).map(i => (i.toString, i.toString, i.toString)): _*) + ) + } + + // 3) Apply the non-additive change to the post-additive schema; V1 picks it up. + change.afterAdditive(log) + val nonAdditiveVersion = log.update().version + addData(10 until 15) + + withV1 { + testStream(df)( + StartStream(checkpointLocation = getDefaultCheckpoint.toString), + ProcessAllAvailableIgnoreError, + ExpectMetadataEvolutionException + ) + } + assert(getDefaultSchemaLog().getCurrentTrackedMetadata.get.deltaCommitVersion == + nonAdditiveVersion) + + // 4) Final flip back to V2 to drain the post-evolution batch. + withV2 { + testStream(df)( + StartStream(checkpointLocation = getDefaultCheckpoint.toString), + ProcessAllAvailable(), + CheckAnswer((10 until 15).map(change.rowAfterAdditive): _*) + ) + } + } + } + } + + // ------------------------------------------------------------------------------------------- + // Strict equivalence: write path + // ------------------------------------------------------------------------------------------- + + nonAdditiveChanges.foreach { change => + test(s"V1 and V2 write equivalent schema tracking log entries (${change.kind})") { + withStarterTable { implicit log => + // Set up a deterministic evolution sequence: additive add column "c", then the + // parameterized non-additive change applied to that 3-column schema. + addColumn("c") + addData(5 until 10) + change.afterAdditive(log) + val evolutionVersion = log.update().version + addData(10 until 15) + + withTempDir { ckpt1 => + withTempDir { ckpt2 => + def schemaLocation(ckptDir: File): String = + new File(ckptDir, "_schema_location").getCanonicalPath + + // Drive `useV2` end-to-end against `ckptDir`: init log, hit the evolution barrier, + // then drain the post-evolution data. + def driveStream(useV2: Boolean, ckptDir: File): Unit = { + val checkpointPath = ckptDir.getCanonicalPath + val schemaPath = schemaLocation(ckptDir) + def df: DataFrame = readStream(schemaLocation = Some(schemaPath)) + + withConnector(useV2) { + testStream(df)( + StartStream(checkpointLocation = checkpointPath), + ProcessAllAvailableIgnoreError, + ExpectMetadataEvolutionException + ) + } + withConnector(useV2) { + testStream(df)( + StartStream(checkpointLocation = checkpointPath), + ProcessAllAvailable(), + CheckAnswer((10 until 15).map(change.rowAfterAdditive): _*) + ) + } + } + + driveStream(useV2 = false, ckpt1) + driveStream(useV2 = true, ckpt2) + + val v1Entries = trackedEntries(schemaLogAt(schemaLocation(ckpt1))) + val v2Entries = trackedEntries(schemaLogAt(schemaLocation(ckpt2))) + + // Same number of entries, same content per entry (after stripping the per-checkpoint + // sourceMetadataPath, which is the only field that legitimately differs between runs). + assert(v1Entries.size == v2Entries.size, + s"V1 wrote ${v1Entries.size} entries, V2 wrote ${v2Entries.size}") + v1Entries.zip(v2Entries).foreach { case (v1Entry, v2Entry) => + assert(normalize(v1Entry) == normalize(v2Entry), + s"Entry mismatch:\n V1: $v1Entry\n V2: $v2Entry") + } + + // And both should land at the evolution version. + assert(v1Entries.last.deltaCommitVersion == evolutionVersion) + assert(v2Entries.last.deltaCommitVersion == evolutionVersion) + } + } + } + } + } + + // ------------------------------------------------------------------------------------------- + // Strict equivalence: read path + // ------------------------------------------------------------------------------------------- + + nonAdditiveChanges.foreach { change => + test(s"V1 and V2 read identical pre-seeded schema log to the same final state " + + s"(${change.kind})") { + withStarterTable { implicit log => + // Apply the parameterized change after the seed point so each connector has work to do. + val seedVersion = log.update().version + val seedSnapshot = log.update() + change.onStarter(log) + val evolutionVersion = log.update().version + addData(5 until 10) + + withTempDir { ckpt1 => + withTempDir { ckpt2 => + def schemaLocation(ckptDir: File): String = + new File(ckptDir, "_schema_location").getCanonicalPath + + // The metadata path check requires the seed's sourceMetadataPath to match what the + // executing stream computes; disable it so we can pre-seed without prediction. + val seedConfs = + DeltaSQLConf.DELTA_STREAMING_SCHEMA_TRACKING_METADATA_PATH_CHECK_ENABLED.key -> + "false" + + // Pre-seed both schema-log directories with byte-identical content: the pre-evolution + // schema (a, b) at the starter table's latest version. + Seq(ckpt1, ckpt2).foreach { ckptDir => + withSQLConf(seedConfs) { + val seed = PersistedMetadata( + log.unsafeVolatileTableId, + seedVersion, + makeMetadata(seedSnapshot.schema, seedSnapshot.metadata.partitionSchema), + seedSnapshot.protocol, + sourceMetadataPath = "") + schemaLogAt(schemaLocation(ckptDir)).writeNewMetadata(seed) + } + } + + // Confirm the seeds are identical on disk, modulo sourceMetadataPath (empty in both). + val seed1 = trackedEntries(schemaLogAt(schemaLocation(ckpt1))) + val seed2 = trackedEntries(schemaLogAt(schemaLocation(ckpt2))) + assert(seed1.map(normalize) == seed2.map(normalize)) + + // Each connector reads from its own checkpoint, starting from the seed and running + // until it hits the evolution barrier. + def runUntilEvolution(useV2: Boolean, ckptDir: File): Unit = { + val checkpointPath = ckptDir.getCanonicalPath + val schemaPath = schemaLocation(ckptDir) + def df: DataFrame = + readStream(schemaLocation = Some(schemaPath), startingVersion = Some(0L)) + + withConnector(useV2) { + withSQLConf(seedConfs) { + testStream(df)( + StartStream(checkpointLocation = checkpointPath), + ProcessAllAvailableIgnoreError, + ExpectMetadataEvolutionException + ) + } + } + } + + runUntilEvolution(useV2 = false, ckpt1) + runUntilEvolution(useV2 = true, ckpt2) + + // Final tracking log state must match entry-for-entry. + val v1Entries = trackedEntries(schemaLogAt(schemaLocation(ckpt1))) + val v2Entries = trackedEntries(schemaLogAt(schemaLocation(ckpt2))) + assert(v1Entries.size == v2Entries.size, + s"V1 has ${v1Entries.size} entries, V2 has ${v2Entries.size} after read-path run") + v1Entries.zip(v2Entries).foreach { case (v1Entry, v2Entry) => + assert(normalize(v1Entry) == normalize(v2Entry), + s"Entry mismatch:\n V1: $v1Entry\n V2: $v2Entry") + } + // Both connectors converged on the evolution version. + assert(v1Entries.last.deltaCommitVersion == evolutionVersion) + } + } + } + } + } +} + +class DeltaV1V2SourceSchemaTrackingLogCompatibilityNameColumnMappingSuite + extends DeltaV1V2SourceSchemaTrackingLogCompatibilitySuiteBase + with DeltaColumnMappingEnableNameMode + +class DeltaV1V2SourceSchemaTrackingLogCompatibilityIdColumnMappingSuite + extends DeltaV1V2SourceSchemaTrackingLogCompatibilitySuiteBase + with DeltaColumnMappingEnableIdMode diff --git a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala index 80e766f4e45..9dcd8142aaa 100644 --- a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala +++ b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala @@ -113,21 +113,17 @@ trait DeltaV2SourceSchemaEvolutionSuiteBase extends V2ForceTest { // ========== Schema evolution scenarios ========== "consecutive schema evolutions without schema merging", + "consecutive schema evolutions", "upgrade and downgrade", "multiple sources with schema evolution", "schema evolution with Delta sink", "latestOffset should not progress before schema evolved", + "unblock with sql conf", + "unblock with sql conf - nested struct", "schema tracking interacting with unsafe escape flag", + "streaming with a column mapping upgrade", "partition evolution" ) - - // TODO(#5319): Move to PASS after consecutive schema merger is supported - override protected def shouldFailTests: Set[String] = Set( - // ========== Schema log core ========== - "consecutive schema evolutions", - "unblock with sql conf", - "streaming with a column mapping upgrade" - ) } // Non-CDC suites diff --git a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/typewidening/TypeWideningStreamingV2SourceSuite.scala b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/typewidening/TypeWideningStreamingV2SourceSuite.scala index f0170192cc1..9e97ffa75d9 100644 --- a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/typewidening/TypeWideningStreamingV2SourceSuite.scala +++ b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/typewidening/TypeWideningStreamingV2SourceSuite.scala @@ -86,11 +86,5 @@ class TypeWideningStreamingV2SourceSchemaTrackingSuite "unblocking stream with reader option after type change - unblock version", "overwrite schema with type change and dropped column", "disable schema tracking log using internal conf" - ) -- shouldFailTests - - // TODO(#5319): Move to PASS after consecutive schema merger is supported - override protected def shouldFailTests: Set[String] = - super.shouldFailTests ++ Set( - "type change in delta source writing to a delta sink" ) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataTrackingLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataTrackingLog.scala index 61c46302674..92b6ba87a3a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataTrackingLog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataTrackingLog.scala @@ -107,7 +107,8 @@ object PersistedMetadata { def fromJson(json: String): PersistedMetadata = JsonUtils.fromJson[PersistedMetadata](json) /** - * Builds a [[PersistedMetadata]] from V2 interop abstractions. + * Serializes an [[AbstractProtocol]] to the Spark [[Protocol]] JSON format used in + * [[PersistedMetadata.protocolJson]]. * * Contract on [[AbstractProtocol]]: `readerFeatures` / `writerFeatures` must be consistent * with the min protocol versions: `readerFeatures` may only be defined when @@ -116,26 +117,32 @@ object PersistedMetadata { * relies on this invariant; [[Protocol]] will throw a `require` failure if an implementation * gets it wrong. */ + def toProtocolJson(abstractProtocol: AbstractProtocol): String = { + Protocol( + abstractProtocol.minReaderVersion, + abstractProtocol.minWriterVersion + ).copy( + readerFeatures = abstractProtocol.readerFeatures, + writerFeatures = abstractProtocol.writerFeatures + ).json + } + + /** + * Builds a [[PersistedMetadata]] from V2 interop abstractions. + */ def apply( tableId: String, deltaCommitVersion: Long, abstractMetadata: AbstractMetadata, abstractProtocol: AbstractProtocol, sourceMetadataPath: String): PersistedMetadata = { - val protocol = Protocol( - abstractProtocol.minReaderVersion, - abstractProtocol.minWriterVersion - ).copy( - readerFeatures = abstractProtocol.readerFeatures, - writerFeatures = abstractProtocol.writerFeatures - ) PersistedMetadata(tableId, deltaCommitVersion, abstractMetadata.schema.json, abstractMetadata.partitionSchema.json, // The schema is bound to the specific source sourceMetadataPath, // Table configurations come from the Metadata action Some(abstractMetadata.configuration), - Some(protocol.json) + Some(toProtocolJson(abstractProtocol)) ) } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala index ea35c49e616..14b9c71c92f 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala @@ -190,6 +190,22 @@ trait StreamingSchemaEvolutionSuiteBase extends ColumnMappingStreamingTestUtils } } + /** Like `addData`, but also fills nested struct fields recursively. */ + protected def addNestedData(data: Seq[Int])(implicit log: DeltaLog): Unit = { + val schema = log.update().schema + def buildRow(s: StructType, value: String): Row = Row.fromSeq(s.fields.map { f => + f.dataType match { + case sub: StructType => buildRow(sub, value) + case _ => value + } + }) + data.foreach { i => + val row = buildRow(schema, i.toString) + spark.createDataFrame(Seq(row).asJava, schema) + .write.format("delta").mode("append").save(log.dataPath.toString) + } + } + protected def readStream( schemaLocation: Option[String] = None, sourceTrackingId: Option[String] = None, @@ -1928,6 +1944,43 @@ trait StreamingSchemaEvolutionSuiteBase extends ColumnMappingStreamingTestUtils } } + /** + * Same as `withSimpleStreamingDf` but with a nested struct column, so tests can exercise + * drop / rename of fields inside a struct: schema is `a STRING, s STRUCT`. + */ + protected def withNestedStructStreamingDf(f: (() => DataFrame, DeltaLog) => Unit): Unit = { + withTempDir { dir => + val tablePath = dir.getCanonicalPath + val schema = StructType.fromDDL("a STRING, s STRUCT") + val initialRow = Seq(Row("0", Row("0", "0"))) + spark.createDataFrame(initialRow.asJava, schema) + .write.mode("append").format("delta").save(tablePath) + implicit val log = DeltaLog.forTable(spark, dir.getCanonicalPath) + val s0 = log.update() + val schemaLog = getDefaultSchemaLog() + schemaLog.writeNewMetadata( + PersistedMetadata(log.unsafeVolatileTableId, s0.version, s0.metadata, s0.protocol, + sourceMetadataPath = "") + ) + + def read(): DataFrame = + readStream( + Some(getDefaultSchemaLocation.toString), + startingVersion = Some(s0.version)) + + withSQLConf( + DeltaSQLConf.DELTA_STREAMING_SCHEMA_TRACKING_METADATA_PATH_CHECK_ENABLED.key -> "false") { + testStream(read())( + StartStream(checkpointLocation = getDefaultCheckpoint.toString), + ProcessAllAvailable(), + CheckAnswer(Row("0", Row("0", "0"))), + StopStream + ) + f(read, log) + } + } + } + testWithoutAllowStreamRestart("unblock with sql conf") { def testStreamFlow( changeSchema: DeltaLog => Unit, @@ -2037,6 +2090,105 @@ trait StreamingSchemaEvolutionSuiteBase extends ColumnMappingStreamingTestUtils } } + testWithoutAllowStreamRestart("unblock with sql conf - nested struct") { + def testStreamFlow( + changeSchema: DeltaLog => Unit, + schemaChangeType: String, + columnChangeDetails: String, + getConfKV: (Int, Long) => (String, String)): Unit = { + withNestedStructStreamingDf { (readDf, log) => + val ckptHash = (getDefaultCheckpoint(log).toString + "/sources/0").hashCode + changeSchema(log) + val v1 = log.update().version + addNestedData(Seq(1))(log) + // Encounter schema evolution exception + testStream(readDf())( + StartStream(checkpointLocation = getDefaultCheckpoint(log).toString), + ProcessAllAvailableIgnoreError, + CheckAnswer(Nil: _*), + ExpectMetadataEvolutionException + ) + // Restart fails on SQL conf validation + testStream(readDf())( + StartStream(checkpointLocation = getDefaultCheckpoint(log).toString), + ProcessAllAvailableIgnoreError, + CheckAnswer(Nil: _*), + expectSqlConfException(schemaChangeType, v1, columnChangeDetails, ckptHash) + ) + // With SQL Conf set we can move on + val (k, v) = getConfKV(ckptHash, v1) + withSQLConf(k -> v) { + testStream(readDf())( + StartStream(checkpointLocation = getDefaultCheckpoint(log).toString), + ProcessAllAvailable() + ) + } + } + } + + // Test drop column inside a nested struct (s.x) + Seq("allowSourceColumnRenameAndDrop", "allowSourceColumnDrop").foreach { allow => + Seq( + ( + (log: DeltaLog) => { + dropColumn("s.x")(log) + // Revert via add to ensure consecutive schema changes don't affect sql conf validation + addColumn("s.x")(log) + }, + (ckptHash: Int, _: Long) => + (s"${DeltaSQLConf.SQL_CONF_PREFIX}.streaming.$allow.ckpt_$ckptHash", "always") + ), + ( + (log: DeltaLog) => { + dropColumn("s.x")(log) + addColumn("s.x")(log) + }, + (ckptHash: Int, ver: Long) => + (s"${DeltaSQLConf.SQL_CONF_PREFIX}.streaming.$allow.ckpt_$ckptHash", ver.toString) + ) + ).foreach { case (changeSchema, getConfKV) => + testStreamFlow( + changeSchema, + schemaChangeType = "DROP COLUMN", + columnChangeDetails = + s"""Columns dropped: + |'s.x' + |""".stripMargin, + getConfKV) + } + } + + // Test rename column inside a nested struct (s.x -> s.z) + Seq("allowSourceColumnRenameAndDrop", "allowSourceColumnRename").foreach { allow => + Seq( + ( + (log: DeltaLog) => { + renameColumn("s.x", "z")(log) + }, + (ckptHash: Int, _: Long) => + (s"${DeltaSQLConf.SQL_CONF_PREFIX}.streaming.$allow.ckpt_$ckptHash", "always") + ), + ( + (log: DeltaLog) => { + renameColumn("s.x", "z")(log) + }, + (ckptHash: Int, ver: Long) => + (s"${DeltaSQLConf.SQL_CONF_PREFIX}.streaming.$allow.ckpt_$ckptHash", ver.toString) + ) + ).foreach { case (changeSchema, getConfKV) => + testStreamFlow( + changeSchema, + schemaChangeType = "RENAME COLUMN", + columnChangeDetails = + s"""Columns renamed: + |'s.x' -> 's.z' + |""".stripMargin, + getConfKV + ) + } + } + } + testSchemaEvolution( "schema tracking interacting with unsafe escape flag") { implicit log => renameColumn("b", "c") diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java index 430762a4c61..8fc7272c3a0 100644 --- a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java @@ -15,16 +15,21 @@ */ package io.delta.spark.internal.v2.read; +import io.delta.kernel.CommitActions; +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; import io.delta.kernel.engine.Engine; import io.delta.kernel.internal.DeltaLogActionUtils; import io.delta.kernel.internal.SnapshotImpl; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; import io.delta.kernel.internal.checksum.CRCInfo; +import io.delta.kernel.internal.commitrange.CommitRangeImpl; import io.delta.kernel.internal.lang.Lazy; import io.delta.kernel.internal.metrics.SnapshotQueryContext; import io.delta.kernel.internal.replay.LogReplay; import io.delta.kernel.internal.snapshot.LogSegment; +import io.delta.kernel.internal.util.Preconditions; import io.delta.kernel.internal.util.Utils; import io.delta.kernel.internal.util.VectorUtils; import io.delta.kernel.types.StringType; @@ -36,6 +41,7 @@ import io.delta.spark.internal.v2.utils.ScalaUtils; import io.delta.spark.internal.v2.utils.SchemaUtils; import io.delta.spark.internal.v2.utils.StreamingHelper; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -692,9 +698,140 @@ public static Option getMetadataTrackingLogForMi tablePath, ScalaUtils.toScalaMap(options), sourceMetadataPathOpt, - // TODO(#5319): Implement v2 consecutiveSchema schema changes merger - /* mergeConsecutiveSchemaChanges= */ false, - /* consecutiveSchemaChangesMerger= */ Option.empty(), + /* mergeConsecutiveSchemaChanges= */ mergeConsecutiveSchemaChanges, + /* consecutiveSchemaChangesMerger= */ Option.apply( + currentMetadata -> + getMergedConsecutiveMetadataChanges( + currentMetadata, snapshotManager, engine, tablePath, mergeActionSet)), /* initMetadataLogEagerly= */ true)); } + + /** V2 port of {@code DeltaSourceMetadataEvolutionSupport.getMergedConsecutiveMetadataChanges}. */ + public static Option getMergedConsecutiveMetadataChanges( + PersistedMetadata currentMetadata, + DeltaSnapshotManager snapshotManager, + Engine engine, + String tablePath, + Set mergeActionSet) { + final long currentMetadataVersion = currentMetadata.deltaCommitVersion(); + // We start from the currentSchemaVersion so that we can stop early in case the current + // version still has file actions that potentially needs to be processed. + long mergedVersion = currentMetadataVersion; + Metadata mergedMetadata = null; + Protocol mergedProtocol = null; + + CommitRangeImpl commitRange = + (CommitRangeImpl) + snapshotManager.getTableChanges(engine, currentMetadataVersion, Optional.empty()); + + try (CloseableIterator commitsIter = + StreamingHelper.getCommitActionsFromRangeUnsafe( + engine, commitRange, tablePath, mergeActionSet)) { + while (commitsIter.hasNext()) { + try (CommitActions commit = commitsIter.next()) { + long version = commit.getVersion(); + Metadata metadataAction = null; + Protocol protocolAction = null; + boolean hasFileAction = false; + + try (CloseableIterator actionsIter = commit.getActions()) { + outer: + while (actionsIter.hasNext()) { + ColumnarBatch batch = actionsIter.next(); + int numRows = batch.getSize(); + int addIdx = batch.getSchema().indexOf(DeltaLogActionUtils.DeltaAction.ADD.colName); + int removeIdx = + batch.getSchema().indexOf(DeltaLogActionUtils.DeltaAction.REMOVE.colName); + ColumnVector addVec = addIdx >= 0 ? batch.getColumnVector(addIdx) : null; + ColumnVector removeVec = removeIdx >= 0 ? batch.getColumnVector(removeIdx) : null; + + // TODO(#5319): handle AddCDCFile as well + for (int rowId = 0; rowId < numRows; rowId++) { + Optional m = StreamingHelper.getMetadata(batch, rowId); + if (m.isPresent()) { + Preconditions.checkArgument( + metadataAction == null, + "Should not encounter two metadata actions in the same commit of version %d", + version); + metadataAction = m.get(); + } + Optional p = StreamingHelper.getProtocol(batch, rowId); + if (p.isPresent()) { + Preconditions.checkArgument( + protocolAction == null, + "Should not encounter two protocol actions in the same commit of version %d", + version); + protocolAction = p.get(); + } + if ((addVec != null && !addVec.isNullAt(rowId)) + || (removeVec != null && !removeVec.isNullAt(rowId))) { + hasFileAction = true; + break outer; + } + } + } + } catch (IOException e) { + throw new RuntimeException("Failed to process commit at version " + version, e); + } + + // Mirror v1's takeWhile predicate: + // continue while !hasFileAction && (metadata.isDefined || protocol.isDefined) + if (hasFileAction) break; + if (metadataAction == null && protocolAction == null) break; + + mergedVersion = version; + if (metadataAction != null) mergedMetadata = metadataAction; + if (protocolAction != null) mergedProtocol = protocolAction; + } + } + } catch (RuntimeException e) { + throw e; // Rethrow runtime exceptions directly + } catch (Exception e) { + // CommitActions.close() throws Exception + throw new RuntimeException("Failed to merge consecutive metadata changes", e); + } + + if (mergedVersion == currentMetadataVersion) { + return Option.empty(); + } + + logger.info( + "Looked ahead from version {} and will use metadata at version {} to read Delta stream.", + currentMetadataVersion, + mergedVersion); + + return Option.apply( + buildMergedPersistedMetadata( + currentMetadata, mergedVersion, mergedMetadata, mergedProtocol)); + } + + private static PersistedMetadata buildMergedPersistedMetadata( + PersistedMetadata current, long newVersion, Metadata newMetadata, Protocol newProtocol) { + String dataSchemaJson = current.dataSchemaJson(); + String partitionSchemaJson = current.partitionSchemaJson(); + Option> tableConfigurations = + current.tableConfigurations(); + Option protocolJson = current.protocolJson(); + + if (newMetadata != null) { + KernelMetadataAdapter mAdapter = new KernelMetadataAdapter(newMetadata); + dataSchemaJson = mAdapter.schema().json(); + partitionSchemaJson = mAdapter.partitionSchema().json(); + tableConfigurations = Option.apply(mAdapter.configuration()); + } + if (newProtocol != null) { + protocolJson = + Option.apply(PersistedMetadata.toProtocolJson(new KernelProtocolAdapter(newProtocol))); + } + + return new PersistedMetadata( + current.tableId(), + newVersion, + dataSchemaJson, + partitionSchemaJson, + current.sourceMetadataPath(), + tableConfigurations, + protocolJson, + current.previousMetadataSeqNum()); + } } diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java index 2dc8382a095..6c8c01e5ee5 100644 --- a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java @@ -264,7 +264,7 @@ public SparkMicroBatchStream( // tracking has a persisted entry, layer it onto the freshly loaded snapshotAtSourceInit so // the read schema/protocol/config reflect the evolved state. The merged entry has already // been written to the durable schema log during analysis (by SparkTable's call to - // getMetadataTrackingLogForMicroBatchStream with mergeConsecutiveSchemaChanges=true), so + // getPersistedMetadataForMicroBatchStream with mergeConsecutiveSchemaChanges=true), so // reading it back from the log here yields the same value V1 obtains. Option persistedMetadataAtSourceInit = metadataTrackingLog.isDefined() @@ -1340,6 +1340,10 @@ private CommitValidationResult validateCommitAndDecideSkipping( // Track Protocol for schema evolution barrier detection. Optional protocolOpt = StreamingHelper.getProtocol(batch, rowId); if (protocolOpt.isPresent()) { + Preconditions.checkArgument( + protocolAction == null, + "Should not encounter two protocol actions in the same commit of version %d", + version); protocolAction = protocolOpt.get(); } diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/catalog/SparkTableTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/catalog/SparkTableTest.java index a78cf5a7493..965a1c818b9 100644 --- a/spark/v2/src/test/java/io/delta/spark/internal/v2/catalog/SparkTableTest.java +++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/catalog/SparkTableTest.java @@ -649,11 +649,15 @@ private void verifyPersistedEntryDrivesSparkTableSchema( tablePath + "/_delta_log/_streaming_metadata"); trackingLog.writeNewMetadata(seededEntry, false); - // Evolve the table — version 1 has an extra non-partition column. + // INSERT at v1 acts as a file-action barrier so the consecutive-schema-change merger (on by + // default) cannot fast-forward the seeded v0 entry past the upcoming v2 ALTER TABLE. + spark.sql(String.format("INSERT INTO %s VALUES (1, 'a')", tableName)); + + // Evolve the table — version 2 has an extra non-partition column. spark.sql(String.format("ALTER TABLE %s ADD COLUMNS (value DOUBLE)", tableName)); // Construct SparkTable with the schema-tracking option pointing at the seeded log. The - // snapshot is at v1 (3 columns) but the persisted entry is at v0 (2 columns). Default to + // snapshot is at v2 (3 columns) but the persisted entry is at v0 (2 columns). Default to // the catalog-table constructor since that's how production code typically loads the table. Identifier identifier = Identifier.of(new String[] {"default"}, tableName); Map options = new HashMap<>(); @@ -672,7 +676,7 @@ private void verifyPersistedEntryDrivesSparkTableSchema( table = new SparkTable(identifier, catalogTable, options); } - // Persisted metadata wins: schema reflects v0 (2 columns), not v1 (3 columns). + // Persisted metadata wins: schema reflects v0 (2 columns), not v2 (3 columns). // Public schema layout is data fields followed by partition fields, so [id, name]. StructType schema = table.schema(); assertEquals(2, schema.fields().length, "Persisted entry should override snapshot schema"); diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java index 2b01afb1b34..3eb91979e02 100644 --- a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java +++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java @@ -1153,7 +1153,7 @@ public void testGetPersistedMetadata_returnsEmptyWhenLogIsEmpty(@TempDir File te /** Schema-tracking set and the log has a seeded entry → returns that entry. */ @Test - public void testGetPersistedMetadata_returnsSeededEntry(@TempDir File tempDir) { + public void testGetPersistedMetadata_returnsSeededEntry(@TempDir File tempDir) throws Exception { String tablePath = new File(tempDir, "table").getAbsolutePath(); String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_'); createEmptyTestTable(tablePath, tableName); @@ -1165,34 +1165,42 @@ public void testGetPersistedMetadata_returnsSeededEntry(@TempDir File tempDir) { Map options = new HashMap<>(); options.put(DeltaOptions.SCHEMA_TRACKING_LOCATION(), schemaLogPath); - // Open the log through the same code path the util uses, then seed an entry. - DeltaSourceMetadataTrackingLog trackingLog = - MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream( - spark, - snapshot, - options, - snapshotManager, - defaultEngine, - SparkMicroBatchStream.ACTION_SET, - Option.empty(), - /* mergeConsecutiveSchemaChanges= */ false) - .get(); - // Use a non-zero version so it's distinct from a default-init entry. - long seededVersion = 42L; - PersistedMetadata seeded = - PersistedMetadata.apply( - snapshot.getMetadata().getId(), - seededVersion, - new KernelMetadataAdapter(snapshot.getMetadata()), - new KernelProtocolAdapter(snapshot.getProtocol()), - tablePath + "/_delta_log/_streaming_metadata"); - trackingLog.writeNewMetadata(seeded, false); - - Optional result = - MetadataEvolutionHandler.getPersistedMetadataForMicroBatchStream( - spark, snapshot, options, snapshotManager, defaultEngine); - assertTrue(result.isPresent()); - assertEquals(seededVersion, result.get().deltaCommitVersion()); + // Disable consecutive-schema-change merging: this test only verifies that a seeded entry is + // returned. With merging enabled, the merger looks ahead from the seeded version, and the + // artificial seededVersion (below) doesn't exist as a real commit on the table. + withSQLConf( + DeltaSQLConf.DELTA_STREAMING_ENABLE_SCHEMA_TRACKING_MERGE_CONSECUTIVE_CHANGES().key(), + "false", + () -> { + // Open the log through the same code path the util uses, then seed an entry. + DeltaSourceMetadataTrackingLog trackingLog = + MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream( + spark, + snapshot, + options, + snapshotManager, + defaultEngine, + SparkMicroBatchStream.ACTION_SET, + Option.empty(), + /* mergeConsecutiveSchemaChanges= */ false) + .get(); + // Use a non-zero version so it's distinct from a default-init entry. + long seededVersion = 42L; + PersistedMetadata seeded = + PersistedMetadata.apply( + snapshot.getMetadata().getId(), + seededVersion, + new KernelMetadataAdapter(snapshot.getMetadata()), + new KernelProtocolAdapter(snapshot.getProtocol()), + tablePath + "/_delta_log/_streaming_metadata"); + trackingLog.writeNewMetadata(seeded, false); + + Optional result = + MetadataEvolutionHandler.getPersistedMetadataForMicroBatchStream( + spark, snapshot, options, snapshotManager, defaultEngine); + assertTrue(result.isPresent()); + assertEquals(seededVersion, result.get().deltaCommitVersion()); + }); } // --------------------------------------------------------------------------- @@ -1329,4 +1337,122 @@ public void testBuildReadSnapshot_trapsLogReplayAccess(@TempDir File tempDir) { segEx.getMessage().contains("log segment is not available"), "Unexpected message: " + segEx.getMessage()); } + + // --------------------------------------------------------------------------- + // getMergedConsecutiveMetadataChanges + // + // Walks forward from currentMetadata.deltaCommitVersion looking for consecutive + // commits that contain only metadata/protocol changes (no file actions). Returns + // a merged PersistedMetadata at the last such version, or Option.empty if no + // forward progress beyond the current version was possible. + // --------------------------------------------------------------------------- + + /** Builds a {@link PersistedMetadata} snapshot of the table state at v0. */ + private PersistedMetadata buildCurrentMetadataAtV0( + String tablePath, PathBasedSnapshotManager snapshotManager) { + SnapshotImpl v0 = (SnapshotImpl) snapshotManager.loadSnapshotAt(0L); + return PersistedMetadata.apply( + "test-table-id", + 0L, + new KernelMetadataAdapter(v0.getMetadata()), + new KernelProtocolAdapter(v0.getProtocol()), + tablePath + "/_delta_log/_streaming_metadata"); + } + + /** + * -1 sentinel means {@code Option.empty} (no merge). Other values are expected merged version. + */ + private static final long EXPECTED_NO_MERGE = -1L; + + /** + * Creates a v0 table partitioned by {@code id} with column mapping (name) enabled. Partitioning + * exercises {@code partitionSchemaJson} verification; column mapping makes RENAME / DROP COLUMN + * (including renaming the partition column) valid. + */ + private void createPartitionedColumnMappingTable(String tablePath, String tableName) { + spark.sql( + String.format( + "CREATE TABLE %s (id INT, name STRING) USING delta " + + "PARTITIONED BY (id) LOCATION '%s' " + + "TBLPROPERTIES ('delta.columnMapping.mode' = 'name')", + tableName, tablePath)); + } + + private static Stream mergerTestCases() { + String alterAddC3 = "ALTER TABLE %s ADD COLUMNS (c3 INT)"; + String alterRenamePartitionId = "ALTER TABLE %s RENAME COLUMN id TO id_renamed"; + String alterDropNonPartitionName = "ALTER TABLE %s DROP COLUMN name"; + String alterBumpProtocol = + "ALTER TABLE %s SET TBLPROPERTIES " + + "('delta.minReaderVersion' = '3', 'delta.minWriterVersion' = '7')"; + String insert2col = "INSERT INTO %s VALUES (1, 'Alice')"; + String insert3col = "INSERT INTO %s VALUES (1, 'Alice', 10)"; + return Stream.of( + // scenario, postV0Commits, expectedMergedVersion + Arguments.of("noLaterCommits", List.of(), EXPECTED_NO_MERGE), + Arguments.of("nextCommitHasFileAction", List.of(insert2col), EXPECTED_NO_MERGE), + // Renaming the PARTITION column changes both dataSchemaJson and partitionSchemaJson. + Arguments.of("renamePartitionColumn", List.of(alterRenamePartitionId), 1L), + // Dropping a NON-partition column changes dataSchemaJson but not partitionSchemaJson. + Arguments.of("dropNonPartitionColumn", List.of(alterDropNonPartitionName), 1L), + // Schema change (v1) followed by protocol change (v2) — both must merge. + Arguments.of("schemaChangeThenProtocolUpgrade", List.of(alterAddC3, alterBumpProtocol), 2L), + // All three change kinds in one consecutive run: data schema (ADD c3), + // partition schema (RENAME id), and protocol (TBLPROPERTIES bump). + Arguments.of( + "mergesDataSchemaPartitionAndProtocol", + List.of(alterAddC3, alterRenamePartitionId, alterBumpProtocol), + 3L), + // v1=ADD c3, v2=INSERT, v3=DROP name → must stop at v1 (NOT skip to v3) + Arguments.of( + "stopsAtFileActionDoesNotSkipAhead", + List.of(alterAddC3, insert3col, alterDropNonPartitionName), + 1L)); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("mergerTestCases") + public void testGetMergedConsecutive( + String scenario, + List postV0Commits, + long expectedMergedVersion, + @TempDir File tempDir) { + String tablePath = tempDir.getAbsolutePath(); + String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_'); + createPartitionedColumnMappingTable(tablePath, tableName); // v0 + for (String sqlTemplate : postV0Commits) { + spark.sql(String.format(sqlTemplate, tableName)); + } + + PathBasedSnapshotManager snapshotManager = + new PathBasedSnapshotManager(tablePath, spark.sessionState().newHadoopConf()); + PersistedMetadata current = buildCurrentMetadataAtV0(tablePath, snapshotManager); + + Option result = + MetadataEvolutionHandler.getMergedConsecutiveMetadataChanges( + current, snapshotManager, defaultEngine, tablePath, SparkMicroBatchStream.ACTION_SET); + + if (expectedMergedVersion == EXPECTED_NO_MERGE) { + assertTrue(result.isEmpty()); + return; + } + + // Verify the merger's result against the table's actual schema and protocol at the merged + // version. We build the "expected" PersistedMetadata directly from the snapshot at that + // version — if the merger captured the right metadata/protocol actions, the JSONs will match. + assertTrue(result.isDefined()); + SnapshotImpl mergedSnapshot = + (SnapshotImpl) snapshotManager.loadSnapshotAt(expectedMergedVersion); + PersistedMetadata expected = + PersistedMetadata.apply( + "test-table-id", + expectedMergedVersion, + new KernelMetadataAdapter(mergedSnapshot.getMetadata()), + new KernelProtocolAdapter(mergedSnapshot.getProtocol()), + tablePath + "/_delta_log/_streaming_metadata"); + + assertEquals(expected.dataSchemaJson(), result.get().dataSchemaJson()); + assertEquals(expected.partitionSchemaJson(), result.get().partitionSchemaJson()); + assertEquals(expected.protocolJson(), result.get().protocolJson()); + } }