diff --git a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala index 60e4644da4e..80e766f4e45 100644 --- a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala +++ b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaV2SourceSchemaEvolutionSuite.scala @@ -70,7 +70,6 @@ trait DeltaV2SourceSchemaEvolutionSuiteBase extends V2ForceTest { } } - // TODO(#5319): Move tests to shouldPassTests as V2 schema tracking log support is implemented. override protected def shouldPassTests: Set[String] = Set( // ========== Schema log unit test ========== "schema location not under checkpoint", @@ -93,11 +92,7 @@ trait DeltaV2SourceSchemaEvolutionSuiteBase extends V2ForceTest { "forward-compat: older version can read back newer JSON", // ========== Schema log core ========== - "multiple delta source sharing same schema log is blocked" - ) - - override protected def shouldFailTests: Set[String] = Set( - // ========== Schema log core ========== + "multiple delta source sharing same schema log is blocked", "schema log is applied", "schema log initialization with additive schema changes", "detect incompatible schema change while streaming", @@ -118,16 +113,21 @@ trait DeltaV2SourceSchemaEvolutionSuiteBase extends V2ForceTest { // ========== Schema evolution scenarios ========== "consecutive schema evolutions without schema merging", - "consecutive schema evolutions", "upgrade and downgrade", "multiple sources with schema evolution", "schema evolution with Delta sink", "latestOffset should not progress before schema evolved", - "unblock with sql conf", "schema tracking interacting with unsafe escape flag", - "streaming with a column mapping upgrade", "partition evolution" ) + + // TODO(#5319): Move to PASS after consecutive schema merger is supported + override protected def shouldFailTests: Set[String] = Set( + // ========== Schema log core ========== + "consecutive schema evolutions", + "unblock with sql conf", + "streaming with a column mapping upgrade" + ) } // Non-CDC suites diff --git a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/columnmapping/RemoveColumnMappingStreamingReadV2Suite.scala b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/columnmapping/RemoveColumnMappingStreamingReadV2Suite.scala index 44cb20e4065..54b7a6c0c79 100644 --- a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/columnmapping/RemoveColumnMappingStreamingReadV2Suite.scala +++ b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/columnmapping/RemoveColumnMappingStreamingReadV2Suite.scala @@ -32,9 +32,8 @@ class RemoveColumnMappingStreamingReadV2Suite override protected def executeDml(sqlText: String): Unit = executeInV1Mode(sqlText) - // Tests that run without schema tracking. These exercise non-additive column-mapping schema - // change detection, which is supported on the V2 connector. override protected def shouldPassTests: Set[String] = Set( + // Tests that run without schema tracking. "Upgrade, StartStreamRead, Downgrade, FailNonAdditiveChange", "Upgrade, Downgrade, StartStreamRead, Success", "StartStreamRead, Upgrade, Rename, Downgrade, FailNonAdditiveChange", @@ -50,18 +49,12 @@ class RemoveColumnMappingStreamingReadV2Suite "Upgrade, Drop, StartStreamRead, Downgrade, FailNonAdditiveChange", "Upgrade, Drop, StartStreamRead, Downgrade, Upgrade, FailNonAdditiveChange", "Upgrade, Rename, Downgrade, StartStreamRead, Success", - "Upgrade, Drop, Downgrade, StartStreamRead, Success" - ) - - // Tests that run with schema tracking enabled. The schema tracking log is not yet supported - // on the V2 connector. - override protected def shouldFailTests: Set[String] = Set( - // TODO(#5319): the three tests are not supported in v2 yet due to the gap of columnMapping - // check util. - "StartStreamRead, Upgrade, Downgrade, SuccessAndFailSchemaTracking", + "Upgrade, Drop, Downgrade, StartStreamRead, Success", "Upgrade, Rename, Downgrade, StartStreamRead, Upgrade, SuccessAndFailSchemaTracking", "Upgrade, Drop, Downgrade, StartStreamRead, Upgrade, SuccessAndFailSchemaTracking", - // TODO(#5319): Move these to shouldPassTests as V2 schema tracking log support is implemented. + "StartStreamRead, Upgrade, Downgrade, SuccessAndFailSchemaTracking", + + // Tests that run with schema tracking enabled. "StartStreamRead, Upgrade, Downgrade, SuccessAndFailSchemaTracking with schema tracking", "Upgrade, StartStreamRead, Downgrade, FailNonAdditiveChange with schema tracking", "Upgrade, Downgrade, StartStreamRead, Success with schema tracking", @@ -88,6 +81,5 @@ class RemoveColumnMappingStreamingReadV2Suite "Upgrade, Rename, Downgrade, StartStreamRead, Upgrade, SuccessAndFailSchemaTracking" + " with schema tracking", "Upgrade, Drop, Downgrade, StartStreamRead, Upgrade, SuccessAndFailSchemaTracking" + - " with schema tracking" - ) + " with schema tracking") } diff --git a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/typewidening/TypeWideningStreamingV2SourceSuite.scala b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/typewidening/TypeWideningStreamingV2SourceSuite.scala index a7fe4c583c6..f0170192cc1 100644 --- a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/typewidening/TypeWideningStreamingV2SourceSuite.scala +++ b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/typewidening/TypeWideningStreamingV2SourceSuite.scala @@ -25,7 +25,10 @@ import org.apache.spark.sql.delta.typewidening.{ /** * Base trait for V2 type widening streaming source tests. - * Provides common shouldFail logic shared by both suites. + * + * The base lists every test from `TypeWideningStreamingSourceTests` as passing: V2 supports + * type-widening streaming reads. Subclasses move specific tests back to `shouldFailTests` when + * there is a concrete V2 limitation (e.g., partition-column schema bug, missing event logging). */ trait TypeWideningStreamingV2SourceSuiteBase extends V2ForceTest { self: TypeWideningStreamingSourceTestMixin => @@ -34,12 +37,7 @@ trait TypeWideningStreamingV2SourceSuiteBase extends V2ForceTest { override protected def executeDml(sqlText: String): Unit = executeInV1Mode(sqlText) - // TODO(#5319): Move tests to shouldPassTests as V2 schema tracking log support is implemented. - override protected def shouldPassTests: Set[String] = Set.empty[String] - - // Tests from TypeWideningStreamingSourceTests, shared by both suites. - // Override in subclasses to add suite-specific tests. - override protected def shouldFailTests: Set[String] = Set( + override protected def shouldPassTests: Set[String] = Set( "type change - filter", "type change - projection", "type change - projection partition column", @@ -59,45 +57,40 @@ trait TypeWideningStreamingV2SourceSuiteBase extends V2ForceTest { "arbitrary type changes are not supported", "type change in delta source writing to a delta sink" ) -} - -class TypeWideningStreamingV2SourceSuite - extends TypeWideningStreamingSourceSuite - with TypeWideningStreamingV2SourceSuiteBase { - - // All tests pass without schema tracking enabled, except where noted in shouldFailTests. - override protected def shouldPassTests: Set[String] = - super.shouldFailTests -- shouldFailTests + // Failures that affect both the schema-tracking and non-schema-tracking suites. override protected def shouldFailTests: Set[String] = Set( // Delta log event is not supported in V2, so event-logging tests are not meaningful. "schema changed event is logged for type widening", - "schema changed event is not logged when there are no schema changes", - // TODO(#5319): Partition column schema has a bug in V2 causing these to fail. - "type change - projection partition column", - "type change - widen aggregation expression partition column", - // TODO(#5319): V2 lacks the implementation of - // validateAndInitMetadataLogForPlannedBatchesDuringStreamStart, so the - // 2nd testStream restart does not throw on the incompatible type change. - "widening type change then restore back", - "narrowing type changes are not supported", - "arbitrary type changes are not supported" + "schema changed event is not logged when there are no schema changes" ) } +class TypeWideningStreamingV2SourceSuite + extends TypeWideningStreamingSourceSuite + with TypeWideningStreamingV2SourceSuiteBase + class TypeWideningStreamingV2SourceSchemaTrackingSuite extends TypeWideningStreamingSourceSchemaTrackingSuite with TypeWideningStreamingV2SourceSuiteBase { - override protected def shouldFailTests: Set[String] = super.shouldFailTests ++ Set( - // Additional tests from TypeWideningStreamingSourceSchemaTrackingTests - "type change first without schemaTrackingLocation and unblock using schemaTrackingLocation", - "unblocking stream with sql conf after type change - unblock all", - "unblocking stream with sql conf after type change - unblock stream", - "unblocking stream with sql conf after type change - unblock version", - "unblocking stream with reader option after type change - unblock stream", - "unblocking stream with reader option after type change - unblock version", - "overwrite schema with type change and dropped column", - "disable schema tracking log using internal conf" - ) + // Schema-tracking-specific tests from TypeWideningStreamingSourceSchemaTrackingTests, on top of + // the base type-widening tests inherited from the trait, minus tests with known V2 issues. + override protected def shouldPassTests: Set[String] = + super.shouldPassTests ++ Set( + "type change first without schemaTrackingLocation and unblock using schemaTrackingLocation", + "unblocking stream with sql conf after type change - unblock all", + "unblocking stream with sql conf after type change - unblock stream", + "unblocking stream with sql conf after type change - unblock version", + "unblocking stream with reader option after type change - unblock stream", + "unblocking stream with reader option after type change - unblock version", + "overwrite schema with type change and dropped column", + "disable schema tracking log using internal conf" + ) -- shouldFailTests + + // TODO(#5319): Move to PASS after consecutive schema merger is supported + override protected def shouldFailTests: Set[String] = + super.shouldFailTests ++ Set( + "type change in delta source writing to a delta sink" + ) } diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json index 9d844b9a2f3..1bd15de7a4b 100644 --- a/spark/src/main/resources/error/delta-error-classes.json +++ b/spark/src/main/resources/error/delta-error-classes.json @@ -2685,15 +2685,6 @@ ], "sqlState" : "42KD4" }, - "DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_V2" : { - "message" : [ - "Streaming read is not supported on tables with read-incompatible schema changes (e.g. rename or drop or datatype changes).", - "Non-additive schema change handling is not supported in Delta source v2 yet.", - "For further information and possible next steps to resolve this issue, please review the documentation at ", - "Read schema: . Incompatible data schema: ." - ], - "sqlState" : "42KD4" - }, "DELTA_STREAMING_INITIAL_SNAPSHOT_TOO_LARGE" : { "message" : [ "Initial snapshot for Delta streaming at table '' (version ) contains files, which exceeds the maximum allowed files.", diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index 5bca996f065..c272cbaef3c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -3248,8 +3248,7 @@ trait DeltaErrorsBase spark: SparkSession, readSchema: StructType, incompatibleSchema: StructType, - detectedDuringStreaming: Boolean, - isV2DataSource: Boolean = false): Throwable = { + detectedDuringStreaming: Boolean): Throwable = { val docLink = "/versioning.html#column-mapping" val enableNonAdditiveSchemaEvolution = spark.sessionState.conf.getConf( DeltaSQLConf.DELTA_STREAMING_ENABLE_SCHEMA_TRACKING) @@ -3259,8 +3258,7 @@ trait DeltaErrorsBase generateDocsLinkOption(spark, docLink).getOrElse("-"), enableNonAdditiveSchemaEvolution, additionalProperties = Map( - "detectedDuringStreaming" -> detectedDuringStreaming.toString, - "isV2DataSource" -> isV2DataSource.toString + "detectedDuringStreaming" -> detectedDuringStreaming.toString )) } @@ -4443,9 +4441,7 @@ class DeltaStreamingNonAdditiveSchemaIncompatibleException( val enableNonAdditiveSchemaEvolution: Boolean = false, val additionalProperties: Map[String, String] = Map.empty) extends DeltaUnsupportedOperationException( - errorClass = if (additionalProperties.getOrElse("isV2DataSource", "false") == "true") { - "DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_V2" - } else if (enableNonAdditiveSchemaEvolution) { + errorClass = if (enableNonAdditiveSchemaEvolution) { "DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_SCHEMA_LOG" } else { "DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE" diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala index 4b5a70bbcd4..ea35c49e616 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSchemaEvolutionSuite.scala @@ -844,12 +844,16 @@ trait StreamingSchemaEvolutionSuiteBase extends ColumnMappingStreamingTestUtils test("identity columns shouldn't cause schema mismatches") { withTable("source") { - executeDml( - s""" - |CREATE TABLE source (key INT, id LONG GENERATED ALWAYS AS IDENTITY) - |USING DELTA - """.stripMargin - ) + io.delta.tables.DeltaTable.create() + .tableName("source") + .addColumn( + io.delta.tables.DeltaTable.columnBuilder("key").dataType("INT").build()) + .addColumn( + io.delta.tables.DeltaTable.columnBuilder("id") + .dataType("LONG") + .generatedAlwaysAsIdentity() + .build()) + .execute() val deltaLog = DeltaLog.forTable(spark, TableIdentifier("source")) deltaLog.update() @@ -857,8 +861,8 @@ trait StreamingSchemaEvolutionSuiteBase extends ColumnMappingStreamingTestUtils val checkpointLocation = getDefaultCheckpoint(deltaLog).toString def addData(values: Seq[Int]): Unit = - spark.createDataFrame(values.map(Row(_)).asJava, StructType.fromDDL("key INT")) - .write.format("delta").mode("append").saveAsTable("source") + executeDml( + s"INSERT INTO source (key) VALUES ${values.map(v => s"($v)").mkString(", ")}") def readStream(): DataFrame = spark.readStream diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java index 32d01b9804c..430762a4c61 100644 --- a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandler.java @@ -20,16 +20,26 @@ import io.delta.kernel.internal.SnapshotImpl; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; +import io.delta.kernel.internal.checksum.CRCInfo; +import io.delta.kernel.internal.lang.Lazy; +import io.delta.kernel.internal.metrics.SnapshotQueryContext; +import io.delta.kernel.internal.replay.LogReplay; +import io.delta.kernel.internal.snapshot.LogSegment; import io.delta.kernel.internal.util.Utils; +import io.delta.kernel.internal.util.VectorUtils; +import io.delta.kernel.types.StringType; import io.delta.kernel.utils.CloseableIterator; import io.delta.kernel.utils.CloseableIterator.BreakableFilterResult; import io.delta.spark.internal.v2.adapters.KernelMetadataAdapter; import io.delta.spark.internal.v2.adapters.KernelProtocolAdapter; import io.delta.spark.internal.v2.snapshot.DeltaSnapshotManager; import io.delta.spark.internal.v2.utils.ScalaUtils; +import io.delta.spark.internal.v2.utils.SchemaUtils; import io.delta.spark.internal.v2.utils.StreamingHelper; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -188,7 +198,7 @@ public CloseableIterator stopIndexedFileIteratorAtSchemaChangeBarri * DeltaSourceMetadataEvolutionSupport.getMetadataOrProtocolChangeIndexedFileIterator}. */ public CloseableIterator getMetadataOrProtocolChangeIndexedFileIterator( - Metadata metadata, Protocol protocol, long version) { + @Nullable Metadata metadata, @Nullable Protocol protocol, long version) { if (shouldTrackMetadataChange() && hasMetadataOrProtocolChangeComparedToStreamMetadata(metadata, protocol, version)) { return Utils.toCloseableIterator( @@ -356,7 +366,7 @@ public void initializeMetadataTrackingAndExitStream( /** Delegates to the shared static method in {@code DeltaSourceMetadataEvolutionSupport}. */ private boolean hasMetadataOrProtocolChangeComparedToStreamMetadata( - Metadata newMetadata, Protocol newProtocol, long newSchemaVersion) { + @Nullable Metadata newMetadata, @Nullable Protocol newProtocol, long newSchemaVersion) { Option metadataOpt = newMetadata != null ? Option.apply((AbstractMetadata) new KernelMetadataAdapter(newMetadata)) @@ -543,6 +553,105 @@ public static boolean shouldPropagateSchemaTrackingToTable( return inReadOptions && !inTableOptions; } + /** + * Builds the effective read snapshot by overlaying {@code customMetadata} from the + * schema-tracking log onto {@code snapshotAtSourceInit}: it reports the persisted schema, + * configuration, protocol, and commit version so downstream reads see the evolved state. + * + *

The returned snapshot is intentionally not consistent for log-replay-driven APIs. + * Its reported {@code version} is the persisted commit version (often older than {@code + * snapshotAtSourceInit.getVersion()}), but there is no aligned log segment available without an + * extra log read. To prevent silent reads against the wrong version, the {@code lazyLogSegment} + * and {@code lazyCrcInfo} fields throw on access. Callers must only read {@code metadata}, {@code + * protocol}, {@code schema}, {@code version}, and {@code dataPath} from the returned snapshot; + * anything that triggers log replay (e.g. {@code getCurrentCrcInfo}, {@code getScanBuilder}) must + * call {@link DeltaSnapshotManager#loadSnapshotAt} instead. + */ + public static SnapshotImpl buildReadSnapshotFromPersistedMetadata( + SnapshotImpl snapshotAtSourceInit, Engine engine, PersistedMetadata customMetadata) { + Metadata sourceMetadata = snapshotAtSourceInit.getMetadata(); + + Map readConfigurations; + if (customMetadata.tableConfigurations().isDefined()) { + readConfigurations = ScalaUtils.toJavaMap(customMetadata.tableConfigurations().get()); + } else { + readConfigurations = sourceMetadata.getConfiguration(); + logger.warn("Using snapshot's table configuration: {}", readConfigurations); + } + + Metadata readMetadata = + new Metadata( + sourceMetadata.getId(), + sourceMetadata.getName(), + sourceMetadata.getDescription(), + sourceMetadata.getFormat(), + customMetadata.dataSchemaJson(), + SchemaUtils.convertSparkSchemaToKernelSchema(customMetadata.dataSchema()), + VectorUtils.buildArrayValue( + Arrays.asList(customMetadata.partitionSchema().fieldNames()), StringType.STRING), + sourceMetadata.getCreatedTime(), + VectorUtils.stringStringMapValue(readConfigurations)); + + Protocol readProtocol; + if (customMetadata.protocol().isDefined()) { + readProtocol = toKernelProtocol(customMetadata.protocol().get()); + } else { + readProtocol = snapshotAtSourceInit.getProtocol(); + logger.warn("Using snapshot's protocol: {}", readProtocol); + } + + // Trap log segment / crc: the synthetic snapshot's version does not match the source-init + // snapshot's log segment, so resolving either would read against the wrong version. Today no + // caller exercises log replay on this snapshot; trapping makes any future regression fail + // loudly instead of silently corrupting reads. + Lazy trapLazyLogSegment = + new Lazy<>( + () -> { + throw new IllegalStateException( + "log segment is not available on the synthetic read snapshot built from " + + "PersistedMetadata"); + }); + Lazy> trapLazyCrcInfo = + new Lazy<>( + () -> { + throw new IllegalStateException( + "CRC info is not available on the synthetic read snapshot built from " + + "PersistedMetadata"); + }); + LogReplay logReplay = + new LogReplay( + engine, snapshotAtSourceInit.getDataPath(), trapLazyLogSegment, trapLazyCrcInfo); + + return new SnapshotImpl( + snapshotAtSourceInit.getDataPath(), + customMetadata.deltaCommitVersion(), + trapLazyLogSegment, + logReplay, + readProtocol, + readMetadata, + snapshotAtSourceInit.getCommitter(), + SnapshotQueryContext.forVersionSnapshot( + snapshotAtSourceInit.getDataPath().toString(), customMetadata.deltaCommitVersion()), + Optional.empty() /* inCommitTimestampOpt */); + } + + private static Protocol toKernelProtocol( + org.apache.spark.sql.delta.actions.Protocol sparkProtocol) { + Set readerFeatures = + sparkProtocol.getReaderFeatures() == null + ? Collections.emptySet() + : new HashSet<>(sparkProtocol.getReaderFeatures()); + Set writerFeatures = + sparkProtocol.getWriterFeatures() == null + ? Collections.emptySet() + : new HashSet<>(sparkProtocol.getWriterFeatures()); + return new Protocol( + sparkProtocol.getMinReaderVersion(), + sparkProtocol.getMinWriterVersion(), + readerFeatures, + writerFeatures); + } + /** * Builds the tracking log from streaming options: empty when {@code schemaTrackingLocation} is * unset; throws when it is set but schema tracking is disabled in config. diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java index 54ee9a3d280..19417d1915a 100644 --- a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkMicroBatchStream.java @@ -34,6 +34,7 @@ import io.delta.kernel.internal.actions.AddFile; import io.delta.kernel.internal.actions.CommitInfo; import io.delta.kernel.internal.actions.Metadata; +import io.delta.kernel.internal.actions.Protocol; import io.delta.kernel.internal.actions.RemoveFile; import io.delta.kernel.internal.util.ColumnMapping; import io.delta.kernel.internal.util.ColumnMapping.ColumnMappingMode; @@ -41,6 +42,7 @@ import io.delta.kernel.internal.util.Utils; import io.delta.kernel.internal.util.VectorUtils; import io.delta.kernel.utils.CloseableIterator; +import io.delta.spark.internal.v2.adapters.KernelMetadataAdapter; import io.delta.spark.internal.v2.snapshot.DeltaSnapshotManager; import io.delta.spark.internal.v2.utils.PartitionUtils; import io.delta.spark.internal.v2.utils.ScalaUtils; @@ -54,13 +56,14 @@ import java.util.*; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.apache.hadoop.conf.Configuration; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.expressions.Literal$; import org.apache.spark.sql.connector.read.InputPartition; import org.apache.spark.sql.connector.read.PartitionReaderFactory; import org.apache.spark.sql.connector.read.streaming.*; -import org.apache.spark.sql.delta.DeltaColumnMapping; +import org.apache.spark.sql.delta.DeltaColumnMapping$; import org.apache.spark.sql.delta.DeltaErrors; import org.apache.spark.sql.delta.DeltaOptions; import org.apache.spark.sql.delta.DeltaStartingVersion; @@ -71,9 +74,11 @@ import org.apache.spark.sql.delta.sources.AdmittableFile; import org.apache.spark.sql.delta.sources.DeltaSQLConf; import org.apache.spark.sql.delta.sources.DeltaSource; +import org.apache.spark.sql.delta.sources.DeltaSourceMetadataTrackingLog; import org.apache.spark.sql.delta.sources.DeltaSourceOffset; import org.apache.spark.sql.delta.sources.DeltaSourceOffset$; import org.apache.spark.sql.delta.sources.DeltaStreamUtils; +import org.apache.spark.sql.delta.sources.PersistedMetadata; import org.apache.spark.sql.execution.datasources.PartitionedFile; import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.sources.Filter; @@ -115,6 +120,7 @@ public class SparkMicroBatchStream DeltaAction.ADD, DeltaAction.REMOVE, DeltaAction.METADATA, + DeltaAction.PROTOCOL, DeltaAction.CDC, DeltaAction.COMMITINFO))); @@ -134,6 +140,7 @@ public class SparkMicroBatchStream private final boolean skipChangeCommits; private final SnapshotImpl snapshotAtSourceInit; private final String tableId; + private final SnapshotImpl readSnapshotAtSourceInit; private final StructType readSchemaAtSourceInit; private final boolean shouldValidateOffsets; private final Optional excludeRegex; @@ -148,6 +155,9 @@ public class SparkMicroBatchStream private final SQLConf sqlConf; private final scala.collection.immutable.Map scalaOptions; + /** Handler for non-additive schema evolution (rename/drop/type widening). */ + private final MetadataEvolutionHandler metadataEvolutionHandler; + /** * Tracks whether this is the first batch for this stream (no checkpointed offset). * @@ -218,7 +228,9 @@ public SparkMicroBatchStream( StructType readDataSchema, StructType ddlOrderedReadOutputSchema, Filter[] dataFilters, - scala.collection.immutable.Map scalaOptions) { + scala.collection.immutable.Map scalaOptions, + Option metadataTrackingLog, + String metadataPath) { this.snapshotManager = Objects.requireNonNull(snapshotManager, "snapshotManager is null"); this.hadoopConf = Objects.requireNonNull(hadoopConf, "hadoopConf is null"); this.spark = Objects.requireNonNull(spark, "spark is null"); @@ -247,11 +259,29 @@ public SparkMicroBatchStream( this.snapshotAtSourceInit = (SnapshotImpl) snapshotAtSourceInit; this.tableId = this.snapshotAtSourceInit.getMetadata().getId(); - // TODO(#5319): schema tracking for non-additive schema changes + + // The effective snapshot for reading, mirroring v1's readSnapshotDescriptor. When schema + // tracking has a persisted entry, layer it onto the freshly loaded snapshotAtSourceInit so + // the read schema/protocol/config reflect the evolved state. The merged entry has already + // been written to the durable schema log during analysis (by SparkTable's call to + // getMetadataTrackingLogForMicroBatchStream with mergeConsecutiveSchemaChanges=true), so + // reading it back from the log here yields the same value V1 obtains. + Option persistedMetadataAtSourceInit = + metadataTrackingLog.isDefined() + ? metadataTrackingLog.get().getCurrentTrackedMetadata() + : Option.empty(); + this.readSnapshotAtSourceInit = + Objects.requireNonNull( + persistedMetadataAtSourceInit.isDefined() + ? MetadataEvolutionHandler.buildReadSnapshotFromPersistedMetadata( + this.snapshotAtSourceInit, engine, persistedMetadataAtSourceInit.get()) + : this.snapshotAtSourceInit, + "readSnapshotAtSourceInit is null"); this.readSchemaAtSourceInit = Objects.requireNonNull( - SchemaUtils.convertKernelSchemaToSparkSchema(snapshotAtSourceInit.getSchema()), + SchemaUtils.convertKernelSchemaToSparkSchema(readSnapshotAtSourceInit.getSchema()), "readSchemaAtSourceInit is null"); + this.shouldValidateOffsets = Objects.requireNonNull( (Boolean) @@ -277,13 +307,29 @@ public SparkMicroBatchStream( DeltaStreamUtils.SchemaReadOptions$.MODULE$.fromSparkSession( spark, isStreamingFromColumnMappingTable, isTypeWideningSupportedInProtocol), "schemaReadOptions is null"); + this.metadataEvolutionHandler = + new MetadataEvolutionHandler( + spark, + tableId, + this.tablePath, + snapshotManager, + engine, + options, + schemaReadOptions, + metadataTrackingLog, + readSnapshotAtSourceInit.getMetadata(), + readSnapshotAtSourceInit.getProtocol(), + metadataPath); boolean shouldValidateSchemaOnRestart = (Boolean) spark .sessionState() .conf() .getConf(DeltaSQLConf.STREAMING_SCHEMA_VALIDATION_ON_RESTART()); - if (shouldValidateSchemaOnRestart) { + // When schema tracking is enabled, the MetadataEvolutionHandler manages schema changes via + // the schema log; the analysis-vs-snapshot mismatch this check guards against is expected on + // restart and is surfaced via the schema-log evolution exception instead. + if (shouldValidateSchemaOnRestart && !metadataEvolutionHandler.shouldTrackMetadataChange()) { validateSchemaCompatibilityOnStartup(dataSchema, partitionSchema, readSchemaAtSourceInit); } } @@ -426,7 +472,40 @@ private Optional getNextOffsetFromPreviousOffset( DeltaSourceOffset previousOffset, Optional limits, boolean isFirstBatch) { - // TODO(#5319): Special handling for schema tracking. + // Initialize schema tracking log eagerly if possible (fresh stream, first batch). + // This mirrors v1's getStartingOffsetFromSpecificDeltaVersion eager init path. + if (isFirstBatch) { + if (metadataEvolutionHandler.shouldInitializeMetadataTrackingEagerly()) { + metadataEvolutionHandler.initializeMetadataTrackingAndExitStream( + previousOffset.reservoirVersion(), + /* batchEndVersion= */ null, + /* alwaysFailUponLogInitialized= */ false); + } + } else { + long startVersionForMetadataLogInit; + if (previousOffset.index() == DeltaSourceOffset.BASE_INDEX()) { + startVersionForMetadataLogInit = previousOffset.reservoirVersion() - 1; + } else { + startVersionForMetadataLogInit = previousOffset.reservoirVersion(); + } + if (metadataEvolutionHandler.shouldInitializeMetadataTrackingEagerly()) { + metadataEvolutionHandler.initializeMetadataTrackingAndExitStream( + startVersionForMetadataLogInit, + /* batchEndVersion= */ null, + /* alwaysFailUponLogInitialized= */ false); + } + checkReadIncompatibleSchemaChangeOnStreamStartOnce(startVersionForMetadataLogInit, null); + } + + // Handle pending schema change offsets (two-barrier protocol). + if (metadataEvolutionHandler.shouldTrackMetadataChange()) { + Optional pendingOffset = + metadataEvolutionHandler.getNextOffsetFromPreviousOffsetIfPendingSchemaChange( + previousOffset); + if (pendingOffset.isPresent()) { + return pendingOffset; + } + } CloseableIterator changes = getFileChangesWithRateLimit( @@ -473,6 +552,10 @@ public InputPartition[] planInputPartitions(Offset start, Offset end) { long fromIndex = startOffset.index(); boolean isInitialSnapshot = startOffset.isInitialSnapshot(); + // Validate and lazily initialize the metadata tracking log for planned batches during + // stream (re)start. Mirrors v1's validateAndInitMetadataLogForPlannedBatchesDuringStreamStart. + validateAndInitMetadataLogForPlannedBatchesDuringStreamStart(fromVersion, endOffset); + List partitionedFiles = new ArrayList<>(); long totalBytesToRead = 0; boolean isCDC = options.readChangeFeed(); @@ -516,8 +599,10 @@ public InputPartition[] planInputPartitions(Offset start, Offset end) { @Override public PartitionReaderFactory createReaderFactory() { + // Use readSnapshotAtSourceInit (which has the evolved schema when schema tracking is active), + // mirroring v1's use of readSnapshotDescriptor in createDataFrame. return PartitionUtils.createDeltaParquetReaderFactory( - snapshotAtSourceInit, + readSnapshotAtSourceInit, dataSchema, partitionSchema, readDataSchema, @@ -555,7 +640,10 @@ private PartitionedFile buildPartitionedFile( @Override public void commit(Offset end) { - // TODO(#5319): update metadata tracking log. + // IMPORTANT: for future developers, please place any work you would like to do in commit() + // before updateMetadataTrackingLogAndFailTheStreamIfNeeded as it may throw an exception. + DeltaSourceOffset offset = DeltaSourceOffset.apply(tableId, end); + metadataEvolutionHandler.updateMetadataTrackingLogAndFailTheStreamIfNeeded(offset); } @Override @@ -752,8 +840,10 @@ CloseableIterator getFileChangesWithRateLimit( changes = changes.takeWhile(admissionLimits::admit); } - // TODO(#5318): Stop at schema change barriers - return changes; + // Stop before any schema change barrier if detected. + // Mirrors v1's stopIndexedFileIteratorAtSchemaChangeBarrier call in + // getFileChangesWithRateLimit. + return metadataEvolutionHandler.stopIndexedFileIteratorAtSchemaChangeBarrier(changes); } /** @@ -958,6 +1048,11 @@ private CloseableIterator filterDeltaLogsWithRateLimitForCDC( /** Resolves the version range and returns commit actions from delta logs. */ private CloseableIterator getCommitsFromRange( long startVersion, Optional endOffset, Set actionSet) { + // Schema tracking needs every metadata-change commit; failOnDataLoss=false may skip one. + Preconditions.checkArgument( + options.failOnDataLoss() || !metadataEvolutionHandler.shouldTrackMetadataChange(), + "Using schema from schema tracking log cannot tolerate missing commit files."); + Optional endVersionOpt = endOffset.isPresent() ? Optional.of(endOffset.get().reservoirVersion()) : Optional.empty(); @@ -1050,35 +1145,47 @@ private CloseableIterator processCommitToIndexedFiles( // commit, downstream would produce incorrect results. // // TODO(#5318): consider caching the commit actions to avoid reading the same commit twice. - // TODO(#5319): don't verify metadata action when schema tracking is enabled - boolean shouldSkipCommit = + // When schema tracking is enabled, skip per-commit metadata verification — the handler + // manages schema changes through the barrier protocol instead. + boolean trackingMetadataChange = metadataEvolutionHandler.shouldTrackMetadataChange(); + CommitValidationResult validationResult = validateCommitAndDecideSkipping( commit, version, startVersion, snapshotAtSourceInit.getPath(), endOffsetOpt, - /* verifyMetadataAction= */ true); + /* verifyMetadataAction= */ !trackingMetadataChange); // Second pass: Build a lazy iterator of IndexedFiles. // - // BEGIN (BASE_INDEX) + actual file actions + END (END_INDEX) + // BEGIN (BASE_INDEX) + [metadata barrier] + actual file actions + END (END_INDEX) // // These sentinel IndexedFiles have null file actions and are used for proper offset // tracking: // - BASE_INDEX: marks "before any files in this version", allowing the offset to // reference the start of a version. + // - METADATA_CHANGE_INDEX: schema change barrier (stops offset generation here). // - END_INDEX: marks end of version, triggers version advancement in // buildOffsetFromIndexedFile to skip re-reading completed versions. // // See DeltaSource.addBeginAndEndIndexOffsetsForVersion for the Scala equivalent. CloseableIterator fileActions = - shouldSkipCommit + validationResult.shouldSkipCommit ? Utils.toCloseableIterator(Collections.emptyIterator()) : getFilesFromCommit(commit, version); + + // If metadata tracking is active, inject schema change barrier sentinel before file + // actions. This mirrors v1's getMetadataOrProtocolChangeIndexedFileIterator concatenation + // in filterAndGetIndexedFiles. + CloseableIterator metadataBarrier = + metadataEvolutionHandler.getMetadataOrProtocolChangeIndexedFileIterator( + validationResult.metadataAction, validationResult.protocolAction, version); + CloseableIterator inner = Utils.singletonCloseableIterator( IndexedFile.sentinel(version, DeltaSourceOffset.BASE_INDEX())) + .combine(metadataBarrier) .combine(fileActions) .combine( Utils.singletonCloseableIterator( @@ -1136,6 +1243,22 @@ private CloseableIterator getFilesFromCommit(CommitActions commit, }); } + /** Result of commit validation, including skip decision and detected metadata/protocol. */ + private static class CommitValidationResult { + final boolean shouldSkipCommit; + @Nullable final Metadata metadataAction; + @Nullable final Protocol protocolAction; + + CommitValidationResult( + boolean shouldSkipCommit, + @Nullable Metadata metadataAction, + @Nullable Protocol protocolAction) { + this.shouldSkipCommit = shouldSkipCommit; + this.metadataAction = metadataAction; + this.protocolAction = protocolAction; + } + } + /** * Validates a commit, fail the stream if it's invalid and decides whether to skip it. Mimics * DeltaSource.validateCommitAndDecideSkipping in Scala. @@ -1146,10 +1269,10 @@ private CloseableIterator getFilesFromCommit(CommitActions commit, * @param tablePath the path to the Delta table * @param endOffsetOpt optional end offset for boundary checking * @param verifyMetadataAction Whether to verify metadata action compatibility - * @return true if the commit should be skipped (no AddFiles emitted), false otherwise + * @return validation result containing skip decision and detected metadata/protocol actions * @throws RuntimeException if the commit is invalid. */ - private boolean validateCommitAndDecideSkipping( + private CommitValidationResult validateCommitAndDecideSkipping( CommitActions commit, long version, long batchStartVersion, @@ -1161,7 +1284,7 @@ private boolean validateCommitAndDecideSkipping( DeltaSourceOffset endOffset = endOffsetOpt.get(); if (endOffset.reservoirVersion() == version && endOffset.index() == DeltaSourceOffset.BASE_INDEX()) { - return false; + return new CommitValidationResult(false, null, null); } } @@ -1176,6 +1299,7 @@ private boolean validateCommitAndDecideSkipping( boolean hasFileAdd = false; boolean shouldSkipCommit = false; Metadata metadataAction = null; + Protocol protocolAction = null; String removeFileActionPath = null; String operation = null; @@ -1213,6 +1337,12 @@ private boolean validateCommitAndDecideSkipping( verifyMetadataAction); } + // Track Protocol for schema evolution barrier detection. + Optional protocolOpt = StreamingHelper.getProtocol(batch, rowId); + if (protocolOpt.isPresent()) { + protocolAction = protocolOpt.get(); + } + // Track CommitInfo for operation details in error messages. Optional commitInfoOpt = StreamingHelper.getCommitInfo(batch, rowId); if (commitInfoOpt.isPresent()) { @@ -1242,7 +1372,7 @@ private boolean validateCommitAndDecideSkipping( } } - return shouldSkipCommit; + return new CommitValidationResult(shouldSkipCommit, metadataAction, protocolAction); } /** @@ -1307,12 +1437,7 @@ private void checkReadIncompatibleSchemaChanges( .toSeq(); } - checkNonAdditiveSchemaChanges( - oldMetadata, - newMetadata, - oldPartitionColumns, - newPartitionColumns, - validatedDuringStreamStart); + checkNonAdditiveSchemaChanges(oldMetadata, newMetadata, validatedDuringStreamStart); // Other standard read compatibility changes if (!validatedDuringStreamStart @@ -1350,17 +1475,11 @@ private void checkReadIncompatibleSchemaChanges( } } - // TODO(#5319): schema tracking for non-additive schema changes - // TODO(#5319): Extract the entire non-additive schema check into a static utility and share it - // with v1 by refactoring DeltaColumnMapping.hasNoColumnMappingSchemaChanges so it can be reused - // by both v1 and v2. - // Non-additive schema changes include rename column, drop column and change column type + // Non-additive schema changes include rename column, drop column and change column type. + // When schema tracking is enabled, this method is NOT called — the MetadataEvolutionHandler + // manages these changes through the barrier protocol instead. private void checkNonAdditiveSchemaChanges( - Metadata oldMetadata, - Metadata newMetadata, - Seq oldPartitionColumns, - Seq newPartitionColumns, - boolean validatedDuringStreamStart) { + Metadata oldMetadata, Metadata newMetadata, boolean validatedDuringStreamStart) { StructType sparkNewSchema = SchemaUtils.convertKernelSchemaToSparkSchema(newMetadata.getSchema()); StructType sparkOldSchema = @@ -1377,41 +1496,45 @@ private void checkNonAdditiveSchemaChanges( } else if (schemaReadOptions.allowUnsafeStreamingReadOnColumnMappingSchemaChanges()) { shouldTrackSchema = false; } else { - ColumnMappingMode NONE = ColumnMappingMode.NONE; - ColumnMappingMode oldMode = - ColumnMapping.getColumnMappingMode(oldMetadata.getConfiguration()); - ColumnMappingMode newMode = - ColumnMapping.getColumnMappingMode(newMetadata.getConfiguration()); - if (oldMode != NONE && newMode != NONE) { - Preconditions.checkArgument(oldMode == newMode, "changing mode is not supported"); - shouldTrackSchema = - DeltaColumnMapping.hasColMappingOrPartitionSchemaChange( - sparkNewSchema, - sparkOldSchema, - newPartitionColumns, - oldPartitionColumns, - /* isBothColumnMappingEnabled */ true); - } else if (oldMode == NONE && newMode != NONE) { - // TODO(#5319): We should disallow user to upgrade column mapping mode for now since we - // don't support schema tracking - shouldTrackSchema = true; - } else { - // Prohibit reading across a downgrade. - shouldTrackSchema = oldMode != NONE && newMode == NONE; - } + // Delegate to the shared utility which handles all column mapping mode transitions + // (both enabled, upgrade from none, downgrade to none). + shouldTrackSchema = + !DeltaColumnMapping$.MODULE$.hasNoColumnMappingSchemaChanges( + new KernelMetadataAdapter(newMetadata), + new KernelMetadataAdapter(oldMetadata), + schemaReadOptions.allowUnsafeStreamingReadOnPartitionColumnChanges()); } if (shouldTrackSchema) { throw (RuntimeException) DeltaErrors.blockStreamingReadsWithIncompatibleNonAdditiveSchemaChanges( - spark, - sparkOldSchema, - sparkNewSchema, - !validatedDuringStreamStart, - /* isV2DataSource= */ true); + spark, sparkOldSchema, sparkNewSchema, !validatedDuringStreamStart); } } + /** + * Validate and lazily initialize the metadata tracking log for planned batches during stream + * (re)start. Mirrors v1's {@code validateAndInitMetadataLogForPlannedBatchesDuringStreamStart}. + * + *

Called from {@code planInputPartitions} (the v2 equivalent of v1's {@code getBatch}). + */ + private void validateAndInitMetadataLogForPlannedBatchesDuringStreamStart( + long startVersion, DeltaSourceOffset endOffset) { + long endVersionForMetadataLogInit = + endOffset.index() == DeltaSourceOffset.BASE_INDEX() + ? endOffset.reservoirVersion() - 1 + : endOffset.reservoirVersion(); + + // For eager initialization, we initialize the log right now. + if (metadataEvolutionHandler.shouldInitializeMetadataTrackingEagerly()) { + metadataEvolutionHandler.initializeMetadataTrackingAndExitStream( + startVersion, endVersionForMetadataLogInit, /* alwaysFailUponLogInitialized= */ false); + } + + // Check for column mapping + streaming incompatible schema changes + checkReadIncompatibleSchemaChangeOnStreamStartOnce(startVersion, endVersionForMetadataLogInit); + } + /** * Check read-incompatible schema changes during stream (re)start so we could fail fast. * @@ -1436,7 +1559,9 @@ private void checkNonAdditiveSchemaChanges( */ private void checkReadIncompatibleSchemaChangeOnStreamStartOnce( long batchStartVersion, Long batchEndVersion) { - // TODO(#5319): skip if enable schema tracking log + // When schema tracking is enabled, the MetadataEvolutionHandler manages schema changes + // through the barrier protocol — skip the traditional compatibility check. + if (metadataEvolutionHandler.shouldTrackMetadataChange()) return; if (hasCheckedReadIncompatibleSchemaChangesOnStreamStart) return; @@ -1594,7 +1719,17 @@ private InitialSnapshotCache getSnapshotFiles(long version) { /** Loads snapshot files at the specified version. */ private InitialSnapshotCache loadAndValidateSnapshot(long version) { - Snapshot snapshot = snapshotManager.loadSnapshotAt(version); + SnapshotImpl snapshot = (SnapshotImpl) snapshotManager.loadSnapshotAt(version); + // If schema tracking is already active and the initial snapshot has advanced since the tracked + // read snapshot, replace the tracked metadata/protocol before reading snapshot files. + if (metadataEvolutionHandler.shouldTrackMetadataChange() + && snapshot.getVersion() >= readSnapshotAtSourceInit.getVersion()) { + metadataEvolutionHandler.updateMetadataTrackingLogAndFailTheStreamIfNeeded( + snapshot.getMetadata(), + snapshot.getProtocol(), + snapshot.getVersion(), + /* replace= */ true); + } long commitTimestamp = snapshot.getTimestamp(engine); Scan scan = snapshot.getScanBuilder().build(); @@ -1924,7 +2059,8 @@ private Metadata validateMetadata( existing == null, "Should not encounter two metadata actions in the same commit of version %d", version); - // TODO(#5319): don't verify metadata action when schema tracking is enabled + // When schema tracking is enabled, verifyMetadataAction is false — the handler manages + // schema changes through the barrier protocol instead. if (verifyMetadataAction) { Long batchEndVersion = endOffsetOpt.map(DeltaSourceOffset::reservoirVersion).orElse(null); checkReadIncompatibleSchemaChanges( diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkScan.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkScan.java index 0fa97ded986..c1b00a23dc3 100644 --- a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkScan.java +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/SparkScan.java @@ -48,6 +48,7 @@ import org.apache.spark.sql.connector.read.colstats.ColumnStatistics; import org.apache.spark.sql.connector.read.streaming.MicroBatchStream; import org.apache.spark.sql.delta.DeltaOptions; +import org.apache.spark.sql.delta.sources.DeltaSourceMetadataTrackingLog; import org.apache.spark.sql.execution.datasources.*; import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils; import org.apache.spark.sql.internal.SQLConf; @@ -56,6 +57,7 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import scala.Option; /** Spark DSV2 Scan implementation backed by Delta Kernel. */ public class SparkScan implements Scan, SupportsReportStatistics, SupportsRuntimeV2Filtering { @@ -78,17 +80,17 @@ public class SparkScan implements Scan, SupportsReportStatistics, SupportsRuntim DeltaOptions.CDC_READ_OPTION_LEGACY(), DeltaOptions.SCHEMA_TRACKING_LOCATION(), DeltaOptions.SCHEMA_TRACKING_LOCATION_ALIAS(), - DeltaOptions.STREAMING_SOURCE_TRACKING_ID())); + DeltaOptions.STREAMING_SOURCE_TRACKING_ID(), + DeltaOptions.ALLOW_SOURCE_COLUMN_DROP(), + DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME(), + DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE())); private static final Set UNSUPPORTED_STREAMING_OPTIONS = Collections.unmodifiableSet( new HashSet<>( Arrays.asList( DeltaOptions.CDC_END_VERSION().toLowerCase(), - DeltaOptions.CDC_END_TIMESTAMP().toLowerCase(), - DeltaOptions.ALLOW_SOURCE_COLUMN_DROP().toLowerCase(), - DeltaOptions.ALLOW_SOURCE_COLUMN_RENAME().toLowerCase(), - DeltaOptions.ALLOW_SOURCE_COLUMN_TYPE_CHANGE().toLowerCase()))); + DeltaOptions.CDC_END_TIMESTAMP().toLowerCase()))); private final DeltaSnapshotManager snapshotManager; private final Snapshot initialSnapshot; @@ -245,15 +247,33 @@ public Batch toBatch() { @Override public MicroBatchStream toMicroBatchStream(String checkpointLocation) { validateStreamingOptions(deltaOptions); + + // Loads a fresh snapshot as the baseline for schema change detection and table identity + // checks. SparkScan's initialSnapshot is from analysis time and may be stale by stream + // start/restart. + // Matches V1's DeltaDataSource.createSource() behavior. + Snapshot latestSnapshot = snapshotManager.loadLatestSnapshot(); + SparkSession spark = SparkSession.active(); + + // Create metadata tracking log for non-additive schema evolution support. + // Mirrors V1's DeltaDataSource.getMetadataTrackingLogForDeltaSource(). At execution time the + // merger is gated off (mergeConsecutiveSchemaChanges=false) — that fold only runs at analysis. + Option metadataTrackingLog = + MetadataEvolutionHandler.getMetadataTrackingLogForMicroBatchStream( + spark, + (io.delta.kernel.internal.SnapshotImpl) latestSnapshot, + options, + snapshotManager, + DefaultEngine.create(hadoopConf), + SparkMicroBatchStream.ACTION_SET, + Option.apply(checkpointLocation), + /* mergeConsecutiveSchemaChanges= */ false); + return new SparkMicroBatchStream( snapshotManager, - // Loads a fresh snapshot as the baseline for schema change detection and table identity - // checks. SparkScan's initialSnapshot is from analysis time and may be stale by stream - // start/restart. - // Matches V1's DeltaDataSource.createSource() behavior. - snapshotManager.loadLatestSnapshot(), + latestSnapshot, hadoopConf, - SparkSession.active(), + spark, deltaOptions, getTablePath(), dataSchema, @@ -261,7 +281,9 @@ public MicroBatchStream toMicroBatchStream(String checkpointLocation) { readDataSchema, ddlOrderedReadOutputSchema, dataFilters != null ? dataFilters : new Filter[0], - scalaOptions != null ? scalaOptions : scala.collection.immutable.Map$.MODULE$.empty()); + scalaOptions != null ? scalaOptions : scala.collection.immutable.Map$.MODULE$.empty(), + metadataTrackingLog, + checkpointLocation); } @Override diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java index 12763ae2d13..2b01afb1b34 100644 --- a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java +++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/MetadataEvolutionHandlerTest.java @@ -1194,4 +1194,139 @@ public void testGetPersistedMetadata_returnsSeededEntry(@TempDir File tempDir) { assertTrue(result.isPresent()); assertEquals(seededVersion, result.get().deltaCommitVersion()); } + + // --------------------------------------------------------------------------- + // buildReadSnapshotFromPersistedMetadata + // --------------------------------------------------------------------------- + + /** Loads the latest snapshot from a fresh empty Delta table created at {@code tempDir/table}. */ + private SnapshotImpl loadSourceSnapshot(File tempDir) { + String tablePath = new File(tempDir, "table").getAbsolutePath(); + String tableName = "t_" + UUID.randomUUID().toString().replace('-', '_'); + createEmptyTestTable(tablePath, tableName); + PathBasedSnapshotManager snapshotManager = + new PathBasedSnapshotManager(tablePath, spark.sessionState().newHadoopConf()); + return (SnapshotImpl) snapshotManager.loadLatestSnapshot(); + } + + /** Persisted metadata with schema, protocol, and configuration distinct from the source. */ + private static PersistedMetadata persistedMetadataWithMarker(long version) { + Metadata customKernelMetadata = + new Metadata( + "persisted-id", + Optional.empty(), + Optional.empty(), + new Format("parquet", Collections.emptyMap()), + DEFAULT_SCHEMA_JSON, + DEFAULT_KERNEL_SCHEMA, + emptyArrayValue(), + Optional.empty(), + VectorUtils.stringStringMapValue(Collections.singletonMap("persisted.marker", "yes"))); + // Use feature-less reader/writer versions (2/5) — different from a fresh table's defaults. + Protocol customKernelProtocol = new Protocol(2, 5); + return PersistedMetadata.apply( + "persisted-id", + version, + new KernelMetadataAdapter(customKernelMetadata), + new KernelProtocolAdapter(customKernelProtocol), + "/fake/metadata/path"); + } + + /** Happy path: persisted metadata overlays version, schema, protocol, and configuration. */ + @Test + public void testBuildReadSnapshot_overlaysCustomMetadataAndProtocol(@TempDir File tempDir) { + SnapshotImpl source = loadSourceSnapshot(tempDir); + long persistedVersion = source.getVersion() + 100; // distinct from source version + PersistedMetadata persisted = persistedMetadataWithMarker(persistedVersion); + + SnapshotImpl read = + MetadataEvolutionHandler.buildReadSnapshotFromPersistedMetadata( + source, defaultEngine, persisted); + + assertEquals(persistedVersion, read.getVersion()); + assertEquals(2, read.getProtocol().getMinReaderVersion()); + assertEquals(5, read.getProtocol().getMinWriterVersion()); + assertEquals("yes", read.getMetadata().getConfiguration().get("persisted.marker")); + assertEquals(DEFAULT_KERNEL_SCHEMA, read.getSchema()); + } + + /** When {@code protocolJson} is absent on the persisted entry, fall back to the source's. */ + @Test + public void testBuildReadSnapshot_fallsBackToSnapshotProtocolWhenAbsent(@TempDir File tempDir) { + SnapshotImpl source = loadSourceSnapshot(tempDir); + PersistedMetadata withBoth = persistedMetadataWithMarker(42L); + PersistedMetadata withoutProtocol = + new PersistedMetadata( + withBoth.tableId(), + withBoth.deltaCommitVersion(), + withBoth.dataSchemaJson(), + withBoth.partitionSchemaJson(), + withBoth.sourceMetadataPath(), + withBoth.tableConfigurations(), + Option.empty(), + withBoth.previousMetadataSeqNum()); + + SnapshotImpl read = + MetadataEvolutionHandler.buildReadSnapshotFromPersistedMetadata( + source, defaultEngine, withoutProtocol); + + assertEquals( + source.getProtocol().getMinReaderVersion(), read.getProtocol().getMinReaderVersion()); + assertEquals( + source.getProtocol().getMinWriterVersion(), read.getProtocol().getMinWriterVersion()); + } + + /** + * When {@code tableConfigurations} is absent on the persisted entry, fall back to the source's. + */ + @Test + public void testBuildReadSnapshot_fallsBackToSnapshotConfigurationWhenAbsent( + @TempDir File tempDir) { + SnapshotImpl source = loadSourceSnapshot(tempDir); + PersistedMetadata withBoth = persistedMetadataWithMarker(42L); + PersistedMetadata withoutConfig = + new PersistedMetadata( + withBoth.tableId(), + withBoth.deltaCommitVersion(), + withBoth.dataSchemaJson(), + withBoth.partitionSchemaJson(), + withBoth.sourceMetadataPath(), + Option.empty(), + withBoth.protocolJson(), + withBoth.previousMetadataSeqNum()); + + SnapshotImpl read = + MetadataEvolutionHandler.buildReadSnapshotFromPersistedMetadata( + source, defaultEngine, withoutConfig); + + assertEquals(source.getMetadata().getConfiguration(), read.getMetadata().getConfiguration()); + // The persisted marker must not have leaked through when we fell back to the source. + assertFalse(read.getMetadata().getConfiguration().containsKey("persisted.marker")); + } + + /** + * The synthetic snapshot's {@code lazyLogSegment} / {@code lazyCrcInfo} are traps: anything that + * resolves them must fail loudly instead of silently reading against the wrong version. + */ + @Test + public void testBuildReadSnapshot_trapsLogReplayAccess(@TempDir File tempDir) { + SnapshotImpl source = loadSourceSnapshot(tempDir); + PersistedMetadata persisted = persistedMetadataWithMarker(42L); + + SnapshotImpl read = + MetadataEvolutionHandler.buildReadSnapshotFromPersistedMetadata( + source, defaultEngine, persisted); + + IllegalStateException crcEx = + assertThrows(IllegalStateException.class, read::getCurrentCrcInfo); + assertTrue( + crcEx.getMessage().contains("CRC info is not available"), + "Unexpected message: " + crcEx.getMessage()); + + IllegalStateException segEx = + assertThrows(IllegalStateException.class, () -> read.getLazyLogSegment().get()); + assertTrue( + segEx.getMessage().contains("log segment is not available"), + "Unexpected message: " + segEx.getMessage()); + } } diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamCDCTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamCDCTest.java index 57f9f9cac29..54e2079dbaa 100644 --- a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamCDCTest.java +++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamCDCTest.java @@ -926,7 +926,9 @@ private SparkMicroBatchStream createTestStreamWithDefaults( /* readDataSchema= */ new StructType(), /* ddlOrderedReadOutputSchema= */ new StructType(), /* dataFilters= */ new org.apache.spark.sql.sources.Filter[0], - /* scalaOptions= */ scala.collection.immutable.Map$.MODULE$.empty()); + /* scalaOptions= */ scala.collection.immutable.Map$.MODULE$.empty(), + /* metadataTrackingLog= */ scala.Option.empty(), + /* metadataPath= */ ""); } private SparkMicroBatchStream createStream(String tablePath) { diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamTest.java index e5fe9466cb9..8df9d91da22 100644 --- a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamTest.java +++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkMicroBatchStreamTest.java @@ -38,6 +38,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; @@ -55,7 +56,9 @@ import org.apache.spark.sql.delta.*; import org.apache.spark.sql.delta.sources.DeltaSQLConf; import org.apache.spark.sql.delta.sources.DeltaSource; +import org.apache.spark.sql.delta.sources.DeltaSourceMetadataTrackingLog; import org.apache.spark.sql.delta.sources.DeltaSourceOffset; +import org.apache.spark.sql.delta.sources.PersistedMetadata; import org.apache.spark.sql.delta.sources.ReadMaxBytes; import org.apache.spark.sql.delta.storage.ClosableIterator; import org.apache.spark.sql.delta.util.JsonUtils; @@ -2062,7 +2065,9 @@ public void testPlanInputPartitions_dataParity( dataSchema, partitionSchema), new org.apache.spark.sql.sources.Filter[0], - Map$.MODULE$.empty()); + Map$.MODULE$.empty(), + Option.empty(), + testTablePath + "/_checkpoint"); InputPartition[] partitions = stream.planInputPartitions(startOffset, planPartitionsEndOffset); PartitionReaderFactory readerFactory = stream.createReaderFactory(); @@ -2248,7 +2253,9 @@ public void testPlanInputPartitions_excludeRegex( dataSchema, partitionSchema), new org.apache.spark.sql.sources.Filter[0], - Map$.MODULE$.empty()); + Map$.MODULE$.empty(), + Option.empty(), + testTablePath + "/_checkpoint"); InputPartition[] partitions = stream.planInputPartitions(startOffset, endOffset); PartitionReaderFactory readerFactory = stream.createReaderFactory(); @@ -3051,18 +3058,10 @@ public void testSchemaEvolution_onForwardNonAdditiveChanges_throwsError( }, String.format("DSv2 should throw on METADATA for scenario: %s", testDescription)); - // TODO(#5319): assertEqual after schema tracking log is supported - String expectedPrefix = "DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE"; - assertTrue( - dsv1Exception.getErrorClass().startsWith(expectedPrefix), - String.format( - "v1 connector error class should start with %s, but got: %s", - expectedPrefix, dsv1Exception.getErrorClass())); - assertTrue( - dsv2Exception.getErrorClass().startsWith(expectedPrefix), - String.format( - "v2 connector error class should start with %s, but got: %s", - expectedPrefix, dsv2Exception.getErrorClass())); + assertEquals( + dsv1Exception.getErrorClass(), + dsv2Exception.getErrorClass(), + "v1 connector and v2 connector should throw the same error class on forward-fill non-additive schema changes"); assertEquals( dsv1Exception.getMessageParameters(), dsv2Exception.getMessageParameters(), @@ -3165,18 +3164,10 @@ public void testSchemaEvolution_onBackfillNonAdditiveChanges_throwsError( }, String.format("DSv2 should throw on METADATA for scenario: %s", testDescription)); - // TODO(#5319): assertEqual after schema tracking log is supported - String expectedPrefix = "DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE"; - assertTrue( - dsv1Exception.getErrorClass().startsWith(expectedPrefix), - String.format( - "v1 connector error class should start with %s, but got: %s", - expectedPrefix, dsv1Exception.getErrorClass())); - assertTrue( - dsv2Exception.getErrorClass().startsWith(expectedPrefix), - String.format( - "v2 connector error class should start with %s, but got: %s", - expectedPrefix, dsv2Exception.getErrorClass())); + assertEquals( + dsv1Exception.getErrorClass(), + dsv2Exception.getErrorClass(), + "v1 connector and v2 connector should throw the same error class on backfill non-additive schema changes"); assertEquals( dsv1Exception.getMessageParameters(), dsv2Exception.getMessageParameters(), @@ -3268,24 +3259,365 @@ public void testSchemaEvolution_onStreamStartOnce(@TempDir File tempDir) throws element -> element.toString().contains("checkReadIncompatibleSchemaChangeOnStreamStartOnce")); - // TODO(#5319): assertEqual after schema tracking log is supported - String expectedPrefix = "DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE"; - assertTrue( - dsv1Exception.getErrorClass().startsWith(expectedPrefix), - String.format( - "v1 connector error class should start with %s, but got: %s", - expectedPrefix, dsv1Exception.getErrorClass())); - assertTrue( - dsv2Exception.getErrorClass().startsWith(expectedPrefix), - String.format( - "v2 connector error class should start with %s, but got: %s", - expectedPrefix, dsv2Exception.getErrorClass())); + assertEquals( + dsv1Exception.getErrorClass(), + dsv2Exception.getErrorClass(), + "v1 connector and v2 connector should throw the same error class on stream start schema changes"); assertEquals( dsv1Exception.getMessageParameters(), dsv2Exception.getMessageParameters(), "v1 connector and v2 connector should throw the same error messages on stream start schema changes"); } + /** + * Forward counterpart: schema change is applied AFTER stream construction, so the source's + * snapshot-at-init is pre-change. Only one restart is needed: + * + *

    + *
  1. First {@code latestOffset} runs the eager-init path, which writes the pre-change schema + * to the empty log. No throw, because the fresh stream's read schema matches what + * eager-init wrote. {@code latestOffset} then scans forward and stops at the barrier. + *
  2. {@code commit(barrier)} writes the post-change entry to the log and throws. + *
  3. Restart with post-change schema: barrier advances to {@code POST_METADATA_CHANGE_INDEX} + * and {@code commit} succeeds. A row written after the schema change is then read through + * the evolved stream to confirm the new schema is honored end-to-end. + *
+ */ + @ParameterizedTest + @MethodSource("nonAdditiveSchemaEvolutionLifecycleScenarios") + public void testSchemaTrackingLifecycle_forward( + ScenarioSetup scenarioSetup, + Map sparkConf, + ScenarioSetup insertPostChangeRow, + Consumer assertPostChangeSchema, + Consumer assertPostInsertRow, + String testDescription, + @TempDir File tempDir) + throws Exception { + String testTablePath = tempDir.getAbsolutePath(); + String testTableName = + "test_forward_lifecycle_" + Math.abs(testDescription.hashCode()) + "_" + System.nanoTime(); + + try { + sparkConf.forEach((k, v) -> spark.conf().set(k, v)); + + createSchemaEvolutionTestTable(testTablePath, testTableName); + DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath)); + long startVersion = deltaLog.update(false, Option.empty(), Option.empty()).version(); + // createSchemaEvolutionTestTable inserts 2 rows. + long preChangeRowCount = 2L; + + Configuration hadoopConf = new Configuration(); + PathBasedSnapshotManager snapshotManager = + new PathBasedSnapshotManager(testTablePath, hadoopConf); + String schemaTrackingLocation = new File(tempDir, "schema_tracking").getAbsolutePath(); + String checkpointLocation = new File(tempDir, "checkpoint").getAbsolutePath(); + java.util.Map optionMap = + Collections.singletonMap("startingVersion", String.valueOf(startVersion)); + DeltaOptions options = createDeltaOptions(optionMap); + DeltaSourceMetadataTrackingLog trackingLog = + createTrackingLog(snapshotManager, schemaTrackingLocation, checkpointLocation, optionMap); + + // Stream constructed BEFORE the schema change — pre-change snapshot is still the latest. + StructType preChangeSchema = loadSparkSchemaAtVersion(snapshotManager, startVersion); + SparkMicroBatchStream streamPreChange = + createSchemaTrackingTestStream( + snapshotManager, + hadoopConf, + options, + testTablePath, + preChangeSchema, + Option.apply(trackingLog), + checkpointLocation); + + // Apply non-additive schema change AFTER stream init. + scenarioSetup.setup(testTableName, tempDir); + long schemaChangeVersion = deltaLog.update(false, Option.empty(), Option.empty()).version(); + + Offset startOffset = streamPreChange.initialOffset(); + DeltaSourceOffset startDelta = (DeltaSourceOffset) startOffset; + assertEquals(startVersion, startDelta.reservoirVersion()); + assertEquals(DeltaSourceOffset.BASE_INDEX(), startDelta.index()); + assertFalse(startDelta.isInitialSnapshot()); + + // First latestOffset eagerly initializes the log to the pre-change schema (no throw because + // the fresh stream's read schema matches). Then it scans forward and stops at the barrier. + Offset barrierOffset = streamPreChange.latestOffset(startOffset, ReadLimit.allAvailable()); + DeltaSourceOffset barrierDelta = (DeltaSourceOffset) barrierOffset; + assertEquals(schemaChangeVersion, barrierDelta.reservoirVersion()); + assertEquals(DeltaSourceOffset.METADATA_CHANGE_INDEX(), barrierDelta.index()); + + PersistedMetadata initialized = trackingLog.getCurrentTrackedMetadata().get(); + assertEquals(startVersion, initialized.deltaCommitVersion()); + + assertEquals( + preChangeRowCount, countRowsBetweenOffsets(streamPreChange, startOffset, barrierOffset)); + + DeltaRuntimeException barrierEx = + assertThrows(DeltaRuntimeException.class, () -> streamPreChange.commit(barrierOffset)); + assertMetadataEvolutionException(barrierEx, "on barrier commit (forward)"); + + PersistedMetadata evolved = trackingLog.getCurrentTrackedMetadata().get(); + assertEquals(schemaChangeVersion, evolved.deltaCommitVersion()); + assertPostChangeSchema.accept(evolved.dataSchema()); + + // Restart with the post-change schema (same trackingLog: it now carries the evolved entry). + StructType postChangeSchema = + io.delta.spark.internal.v2.utils.SchemaUtils.convertKernelSchemaToSparkSchema( + snapshotManager.loadLatestSnapshot().getSchema()); + SparkMicroBatchStream streamPostChange = + createSchemaTrackingTestStream( + snapshotManager, + hadoopConf, + options, + testTablePath, + postChangeSchema, + Option.apply(trackingLog), + checkpointLocation); + + Offset postBarrierOffset = + streamPostChange.latestOffset(barrierOffset, ReadLimit.allAvailable()); + DeltaSourceOffset postBarrierDelta = (DeltaSourceOffset) postBarrierOffset; + assertEquals(schemaChangeVersion, postBarrierDelta.reservoirVersion()); + assertEquals(DeltaSourceOffset.POST_METADATA_CHANGE_INDEX(), postBarrierDelta.index()); + assertEquals(0L, countRowsBetweenOffsets(streamPostChange, barrierOffset, postBarrierOffset)); + assertDoesNotThrow(() -> streamPostChange.commit(postBarrierOffset)); + + // Successful commit must leave the evolved entry in place — not roll back or double-evolve. + PersistedMetadata afterPostBarrierCommit = trackingLog.getCurrentTrackedMetadata().get(); + assertEquals(schemaChangeVersion, afterPostBarrierCommit.deltaCommitVersion()); + assertPostChangeSchema.accept(afterPostBarrierCommit.dataSchema()); + + // Insert a row shaped to the post-change schema and read it through the evolved stream. + // Asserting a column value (not just the row count) verifies the projection actually honors + // the evolved schema — a stale-schema read would also produce 1 row. + insertPostChangeRow.setup(testTableName, tempDir); + Offset postInsertOffset = + streamPostChange.latestOffset(postBarrierOffset, ReadLimit.allAvailable()); + List postInsertRows = + readRowsBetweenOffsets(streamPostChange, postBarrierOffset, postInsertOffset); + assertEquals(1, postInsertRows.size()); + assertPostInsertRow.accept(postInsertRows.get(0)); + assertDoesNotThrow(() -> streamPostChange.commit(postInsertOffset)); + } finally { + sparkConf.keySet().forEach(k -> spark.conf().unset(k)); + } + } + + /** + * Backfill counterpart: schema change is applied BEFORE stream construction, so the source's + * snapshot-at-init is post-change but the streaming starts at a pre-change version. Three + * restarts are needed: + * + *
    + *
  1. First {@code latestOffset} runs the eager-init path, which writes the pre-change schema + * to the empty log and throws (post-change ≠ pre-change). + *
  2. Restart with pre-change schema: {@code latestOffset} returns the barrier; pre-change rows + * are read; {@code commit(barrier)} writes the post-change entry and throws. + *
  3. Restart with post-change schema: barrier advances to {@code POST_METADATA_CHANGE_INDEX} + * and {@code commit} succeeds. A row written after the schema change is then read through + * the evolved stream to confirm the new schema is honored end-to-end. + *
+ */ + @ParameterizedTest + @MethodSource("nonAdditiveSchemaEvolutionLifecycleScenarios") + public void testSchemaTrackingLifecycle_backfill( + ScenarioSetup scenarioSetup, + Map sparkConf, + ScenarioSetup insertPostChangeRow, + Consumer assertPostChangeSchema, + Consumer assertPostInsertRow, + String testDescription, + @TempDir File tempDir) + throws Exception { + String testTablePath = tempDir.getAbsolutePath(); + String testTableName = + "test_backfill_lifecycle_" + Math.abs(testDescription.hashCode()) + "_" + System.nanoTime(); + + try { + sparkConf.forEach((k, v) -> spark.conf().set(k, v)); + + createSchemaEvolutionTestTable(testTablePath, testTableName); + DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath)); + long startVersion = deltaLog.update(false, Option.empty(), Option.empty()).version(); + long preChangeRowCount = 2L; + + // Apply non-additive schema change BEFORE stream construction. + scenarioSetup.setup(testTableName, tempDir); + long schemaChangeVersion = deltaLog.update(false, Option.empty(), Option.empty()).version(); + + Configuration hadoopConf = new Configuration(); + PathBasedSnapshotManager snapshotManager = + new PathBasedSnapshotManager(testTablePath, hadoopConf); + String schemaTrackingLocation = new File(tempDir, "schema_tracking").getAbsolutePath(); + String checkpointLocation = new File(tempDir, "checkpoint").getAbsolutePath(); + java.util.Map optionMap = + Collections.singletonMap("startingVersion", String.valueOf(startVersion)); + DeltaOptions options = createDeltaOptions(optionMap); + DeltaSourceMetadataTrackingLog trackingLog = + createTrackingLog(snapshotManager, schemaTrackingLocation, checkpointLocation, optionMap); + + StructType preChangeSchema = loadSparkSchemaAtVersion(snapshotManager, startVersion); + StructType postChangeSchema = + io.delta.spark.internal.v2.utils.SchemaUtils.convertKernelSchemaToSparkSchema( + snapshotManager.loadLatestSnapshot().getSchema()); + + // Round 1: post-change schema (analysis would bind to the latest snapshot since the log + // is empty). First latestOffset runs eager-init and throws. + SparkMicroBatchStream streamForEagerInit = + createSchemaTrackingTestStream( + snapshotManager, + hadoopConf, + options, + testTablePath, + postChangeSchema, + Option.apply(trackingLog), + checkpointLocation); + Offset eagerInitStartOffset = streamForEagerInit.initialOffset(); + DeltaRuntimeException initEx = + assertThrows( + DeltaRuntimeException.class, + () -> + streamForEagerInit.latestOffset(eagerInitStartOffset, ReadLimit.allAvailable())); + assertMetadataEvolutionException(initEx, "during backfill log initialization"); + + PersistedMetadata afterInit = trackingLog.getCurrentTrackedMetadata().get(); + assertEquals(startVersion, afterInit.deltaCommitVersion()); + + // Round 2: pre-change schema (the log entry now carries it). + SparkMicroBatchStream streamForBarrier = + createSchemaTrackingTestStream( + snapshotManager, + hadoopConf, + options, + testTablePath, + preChangeSchema, + Option.apply(trackingLog), + checkpointLocation); + Offset barrierStartOffset = streamForBarrier.initialOffset(); + DeltaSourceOffset barrierStartDelta = (DeltaSourceOffset) barrierStartOffset; + assertEquals(startVersion, barrierStartDelta.reservoirVersion()); + assertEquals(DeltaSourceOffset.BASE_INDEX(), barrierStartDelta.index()); + assertFalse(barrierStartDelta.isInitialSnapshot()); + + Offset barrierOffset = + streamForBarrier.latestOffset(barrierStartOffset, ReadLimit.allAvailable()); + DeltaSourceOffset barrierDelta = (DeltaSourceOffset) barrierOffset; + assertEquals(schemaChangeVersion, barrierDelta.reservoirVersion()); + assertEquals(DeltaSourceOffset.METADATA_CHANGE_INDEX(), barrierDelta.index()); + + assertEquals( + preChangeRowCount, + countRowsBetweenOffsets(streamForBarrier, barrierStartOffset, barrierOffset)); + + DeltaRuntimeException barrierEx = + assertThrows(DeltaRuntimeException.class, () -> streamForBarrier.commit(barrierOffset)); + assertMetadataEvolutionException(barrierEx, "on barrier commit (backfill)"); + + PersistedMetadata evolved = trackingLog.getCurrentTrackedMetadata().get(); + assertEquals(schemaChangeVersion, evolved.deltaCommitVersion()); + assertPostChangeSchema.accept(evolved.dataSchema()); + + // Round 3: post-change schema. Advance barrier → POST_BARRIER, commit succeeds. + SparkMicroBatchStream streamForPostBarrier = + createSchemaTrackingTestStream( + snapshotManager, + hadoopConf, + options, + testTablePath, + postChangeSchema, + Option.apply(trackingLog), + checkpointLocation); + Offset postBarrierOffset = + streamForPostBarrier.latestOffset(barrierOffset, ReadLimit.allAvailable()); + DeltaSourceOffset postBarrierDelta = (DeltaSourceOffset) postBarrierOffset; + assertEquals(schemaChangeVersion, postBarrierDelta.reservoirVersion()); + assertEquals(DeltaSourceOffset.POST_METADATA_CHANGE_INDEX(), postBarrierDelta.index()); + assertEquals( + 0L, countRowsBetweenOffsets(streamForPostBarrier, barrierOffset, postBarrierOffset)); + assertDoesNotThrow(() -> streamForPostBarrier.commit(postBarrierOffset)); + + // Successful commit must leave the evolved entry in place — not roll back or double-evolve. + PersistedMetadata afterPostBarrierCommit = trackingLog.getCurrentTrackedMetadata().get(); + assertEquals(schemaChangeVersion, afterPostBarrierCommit.deltaCommitVersion()); + assertPostChangeSchema.accept(afterPostBarrierCommit.dataSchema()); + + // Insert a row shaped to the post-change schema and read it through the evolved stream. + // Asserting a column value (not just the row count) verifies the projection actually honors + // the evolved schema — a stale-schema read would also produce 1 row. + insertPostChangeRow.setup(testTableName, tempDir); + Offset postInsertOffset = + streamForPostBarrier.latestOffset(postBarrierOffset, ReadLimit.allAvailable()); + List postInsertRows = + readRowsBetweenOffsets(streamForPostBarrier, postBarrierOffset, postInsertOffset); + assertEquals(1, postInsertRows.size()); + assertPostInsertRow.accept(postInsertRows.get(0)); + assertDoesNotThrow(() -> streamForPostBarrier.commit(postInsertOffset)); + } finally { + sparkConf.keySet().forEach(k -> spark.conf().unset(k)); + } + } + + /** + * Schema tracking and failOnDataLoss=false are incompatible: log-retention pruning could let the + * stream skip past a metadata-change commit, desyncing the tracking log from the data schema. + * Mirrors V1's require() inside filterAndIndexDeltaLogs. + */ + @Test + public void testSchemaTrackingRejectsFailOnDataLossFalse(@TempDir File tempDir) throws Exception { + String testTablePath = tempDir.getAbsolutePath(); + String testTableName = "test_schema_tracking_fail_on_data_loss_" + System.nanoTime(); + createSchemaEvolutionTestTable(testTablePath, testTableName); + + Configuration hadoopConf = new Configuration(); + PathBasedSnapshotManager snapshotManager = + new PathBasedSnapshotManager(testTablePath, hadoopConf); + String schemaTrackingLocation = new File(tempDir, "schema_tracking").getAbsolutePath(); + String checkpointLocation = new File(tempDir, "checkpoint").getAbsolutePath(); + StructType schema = + loadSparkSchemaAtVersion( + snapshotManager, snapshotManager.loadLatestSnapshot().getVersion()); + DeltaSourceMetadataTrackingLog trackingLog = + createTrackingLog( + snapshotManager, schemaTrackingLocation, checkpointLocation, Collections.emptyMap()); + + // Populate the tracking log via eager init on a default-options stream so that + // shouldTrackMetadataChange() returns true on the next stream. + SparkMicroBatchStream streamForInit = + createSchemaTrackingTestStream( + snapshotManager, + hadoopConf, + emptyDeltaOptions(), + testTablePath, + schema, + Option.apply(trackingLog), + checkpointLocation); + Offset initOffset = streamForInit.initialOffset(); + streamForInit.latestOffset(initOffset, ReadLimit.allAvailable()); + assertTrue( + trackingLog.getCurrentTrackedMetadata().nonEmpty(), "eager init should populate log"); + + // With the log populated, a stream configured with failOnDataLoss=false must reject the first + // change-log scan. + DeltaOptions failOnDataLossFalse = createDeltaOptions("failOnDataLoss", "false"); + SparkMicroBatchStream stream = + createSchemaTrackingTestStream( + snapshotManager, + hadoopConf, + failOnDataLossFalse, + testTablePath, + schema, + Option.apply(trackingLog), + checkpointLocation); + Offset startOffset = stream.initialOffset(); + IllegalArgumentException ex = + assertThrows( + IllegalArgumentException.class, + () -> stream.latestOffset(startOffset, ReadLimit.allAvailable())); + assertThat(ex.getMessage()) + .contains("schema tracking log cannot tolerate missing commit files"); + } + /** Provides test scenarios that generate additive schema changes actions. */ private static Stream additiveSchemaEvolutionScenarios() { return Stream.of( @@ -3371,55 +3703,186 @@ private static Stream additiveSchemaEvolutionScenarios() { "Widen INT column to BIGINT")); } - /** Provides test scenarios that generate non-additive schema changes actions. */ - private static Stream nonAdditiveSchemaEvolutionScenarios() { - return Stream.of( - // Rename column - Arguments.of( - (ScenarioSetup) - (tableName, tempDir) -> { - sql("ALTER TABLE %s RENAME COLUMN id TO userId", tableName); - }, - /* sparkConf */ Map.of(), - "Rename column"), + /** + * Single source of truth for non-additive schema-evolution scenarios. Each scenario carries: + * + *
    + *
  • {@code change} — the non-additive ALTER applied to the fixture + *
  • {@code sparkConf} — feature flags required for the change to be classified as + * non-additive (column-mapping unsafe-read flag, type-widening tracking flag, etc.) + *
  • {@code insertPostChangeRow} — INSERT shaped to the post-change schema; used by lifecycle + * tests to verify the evolved stream can read freshly-written data + *
  • {@code assertPostChangeSchema} — predicate over the persisted schema after the evolve; + * used by lifecycle tests + *
  • {@code assertPostInsertRow} — predicate over the {@link InternalRow} produced by reading + * the post-change INSERT through the evolved stream. Verifies that the projection actually + * honors the post-change schema — a row-count check alone can't distinguish "row read with + * evolved schema" from "row silently read with the wrong schema." + *
  • {@code description} — human-readable label used to disambiguate temp tables / failures + *
+ * + * Throws-error tests project {@code (change, sparkConf, description)}; lifecycle tests use the + * full tuple. Pre-change schema is always the fixture's schema, so it doesn't need to be + * parameterized. + */ + private static final class NonAdditiveScenario { + final ScenarioSetup change; + final Map sparkConf; + final ScenarioSetup insertPostChangeRow; + final Consumer assertPostChangeSchema; + final Consumer assertPostInsertRow; + final String description; + + NonAdditiveScenario( + ScenarioSetup change, + Map sparkConf, + ScenarioSetup insertPostChangeRow, + Consumer assertPostChangeSchema, + Consumer assertPostInsertRow, + String description) { + this.change = change; + this.sparkConf = sparkConf; + this.insertPostChangeRow = insertPostChangeRow; + this.assertPostChangeSchema = assertPostChangeSchema; + this.assertPostInsertRow = assertPostInsertRow; + this.description = description; + } + } - // Drop nullable, non-nullable and struct columns - Arguments.of( - (ScenarioSetup) - (tableName, tempDir) -> { - sql("ALTER TABLE %s DROP COLUMNS (id, value, info)", tableName); - }, - /* sparkConf */ Map.of( - DeltaSQLConf - .DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES() - .key(), - "false"), + private static List nonAdditiveScenarios() { + Map unsafeReadFalse = + Map.of( + DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES() + .key(), + "false"); + Map typeWideningTracking = + Map.of(DeltaSQLConf.DELTA_TYPE_WIDENING_ENABLE_STREAMING_SCHEMA_TRACKING().key(), "true"); + + return List.of( + new NonAdditiveScenario( + (tableName, tempDir) -> sql("ALTER TABLE %s DROP COLUMNS (value)", tableName), + unsafeReadFalse, + (tableName, tempDir) -> + sql( + "INSERT INTO %s VALUES (3, 'Carol', named_struct('col1', 50, 'col2', 'SF'))", + tableName), + schema -> assertThat(Arrays.asList(schema.fieldNames())).doesNotContain("value"), + // Post-change projection is (id, name, info) — 3 columns, info at ordinal 2. + row -> { + assertEquals(3, row.getInt(0)); + assertEquals("Carol", row.getUTF8String(1).toString()); + assertEquals(50, row.getStruct(2, 2).getInt(0)); + }, + "Drop column"), + new NonAdditiveScenario( + (tableName, tempDir) -> sql("ALTER TABLE %s DROP COLUMNS (id, value, info)", tableName), + unsafeReadFalse, + (tableName, tempDir) -> sql("INSERT INTO %s VALUES ('Carol')", tableName), + schema -> assertThat(Arrays.asList(schema.fieldNames())).containsExactly("name"), + // Post-change projection is (name) — 1 column. + row -> assertEquals("Carol", row.getUTF8String(0).toString()), "Drop nullable, non-nullable and struct columns"), - - // Drop column in nested struct - Arguments.of( - (ScenarioSetup) - (tableName, tempDir) -> { - sql("ALTER TABLE %s DROP COLUMNS info.col1", tableName); - }, - /* sparkConf */ Map.of( - DeltaSQLConf - .DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES() - .key(), - "false"), + new NonAdditiveScenario( + (tableName, tempDir) -> sql("ALTER TABLE %s RENAME COLUMN id TO userId", tableName), + Map.of(), + (tableName, tempDir) -> + sql( + "INSERT INTO %s VALUES (3, 'Carol', 50.0, " + + "named_struct('col1', 50, 'col2', 'SF'))", + tableName), + schema -> + assertThat(Arrays.asList(schema.fieldNames())) + .contains("userId") + .doesNotContain("id"), + // Renamed column keeps physical column id; new logical name "userId" at ordinal 0. + row -> { + assertEquals(3, row.getInt(0)); + assertEquals("Carol", row.getUTF8String(1).toString()); + }, + "Rename column"), + new NonAdditiveScenario( + (tableName, tempDir) -> sql("ALTER TABLE %s DROP COLUMNS info.col1", tableName), + unsafeReadFalse, + (tableName, tempDir) -> + sql( + "INSERT INTO %s VALUES (3, 'Carol', 50.0, named_struct('col2', 'SF'))", + tableName), + schema -> { + StructType info = (StructType) schema.apply("info").dataType(); + assertThat(Arrays.asList(info.fieldNames())).doesNotContain("col1"); + }, + // info struct has only col2 after drop — 1 field. + row -> { + assertEquals(3, row.getInt(0)); + assertEquals("SF", row.getStruct(3, 1).getUTF8String(0).toString()); + }, "Drop column in nested struct"), + new NonAdditiveScenario( + (tableName, tempDir) -> + sql("ALTER TABLE %s RENAME COLUMN info.col1 TO newCol1", tableName), + unsafeReadFalse, + (tableName, tempDir) -> + sql( + "INSERT INTO %s VALUES (3, 'Carol', 50.0, " + + "named_struct('newCol1', 50, 'col2', 'SF'))", + tableName), + schema -> { + StructType info = (StructType) schema.apply("info").dataType(); + assertThat(Arrays.asList(info.fieldNames())) + .contains("newCol1") + .doesNotContain("col1"); + }, + // info struct keeps two fields; newCol1 (renamed from col1) at struct ordinal 0. + row -> assertEquals(50, row.getStruct(3, 2).getInt(0)), + "Rename column in nested struct"), + new NonAdditiveScenario( + (tableName, tempDir) -> sql("ALTER TABLE %s ALTER COLUMN id TYPE BIGINT", tableName), + typeWideningTracking, + (tableName, tempDir) -> + sql( + "INSERT INTO %s VALUES (3, 'Carol', 50.0, " + + "named_struct('col1', 50, 'col2', 'SF'))", + tableName), + schema -> assertEquals(DataTypes.LongType, schema.apply("id").dataType()), + // id is now BIGINT — must be read as long, not int. + row -> assertEquals(3L, row.getLong(0)), + "Widen INT column to BIGINT"), + new NonAdditiveScenario( + (tableName, tempDir) -> + sql("ALTER TABLE %s ALTER COLUMN info.col1 TYPE BIGINT", tableName), + typeWideningTracking, + (tableName, tempDir) -> + sql( + "INSERT INTO %s VALUES (3, 'Carol', 50.0, " + + "named_struct('col1', 50, 'col2', 'SF'))", + tableName), + schema -> { + StructType info = (StructType) schema.apply("info").dataType(); + assertEquals(DataTypes.LongType, info.apply("col1").dataType()); + }, + // info.col1 is now BIGINT — must be read as long from the struct. + row -> assertEquals(50L, row.getStruct(3, 2).getLong(0)), + "Widen INT column to BIGINT in nested struct")); + } - // Widen INT column to BIGINT - Arguments.of( - (ScenarioSetup) - (tableName, tempDir) -> { - sql("ALTER TABLE %s ALTER COLUMN id TYPE BIGINT", tableName); - }, - // Set enableSchemaTrackingForTypeWidening to be true to treat widening type changes as - // non-additive - /* sparkConf */ Map.of( - DeltaSQLConf.DELTA_TYPE_WIDENING_ENABLE_STREAMING_SCHEMA_TRACKING().key(), "true"), - "Widen INT column to BIGINT")); + /** Provides test scenarios that generate non-additive schema changes actions. */ + private static Stream nonAdditiveSchemaEvolutionScenarios() { + return nonAdditiveScenarios().stream() + .map(s -> Arguments.of(s.change, s.sparkConf, s.description)); + } + + /** Same scenarios as {@link #nonAdditiveSchemaEvolutionScenarios}, plus lifecycle-only fields. */ + private static Stream nonAdditiveSchemaEvolutionLifecycleScenarios() { + return nonAdditiveScenarios().stream() + .map( + s -> + Arguments.of( + s.change, + s.sparkConf, + s.insertPostChangeRow, + s.assertPostChangeSchema, + s.assertPostInsertRow, + s.description)); } // ================================================================================================ @@ -3830,6 +4293,7 @@ private DeltaSource createDeltaSource(DeltaLog deltaLog, String tablePath, Delta private SparkMicroBatchStream createTestStreamWithDefaults( PathBasedSnapshotManager snapshotManager, Configuration hadoopConf, DeltaOptions options) { io.delta.kernel.Snapshot snapshot = snapshotManager.loadLatestSnapshot(); + String tablePath = ((io.delta.kernel.internal.SnapshotImpl) snapshot).getPath(); StructType tableSchema = io.delta.spark.internal.v2.utils.SchemaUtils.convertKernelSchemaToSparkSchema( snapshot.getSchema()); @@ -3839,13 +4303,188 @@ private SparkMicroBatchStream createTestStreamWithDefaults( hadoopConf, spark, options, - /* tablePath= */ "", + /* tablePath= */ tablePath, /* dataSchema= */ tableSchema, /* partitionSchema= */ new StructType(), /* readDataSchema= */ new StructType(), /* ddlOrderedReadOutputSchema= */ new StructType(), /* dataFilters= */ new org.apache.spark.sql.sources.Filter[0], - /* scalaOptions= */ scala.collection.immutable.Map$.MODULE$.empty()); + /* scalaOptions= */ scala.collection.immutable.Map$.MODULE$.empty(), + /* metadataTrackingLog= */ Option.empty(), + /* metadataPath= */ tablePath + "/_checkpoint"); + } + + private SparkMicroBatchStream createSchemaTrackingTestStream( + PathBasedSnapshotManager snapshotManager, + Configuration hadoopConf, + DeltaOptions options, + String tablePath, + StructType dataSchema, + Option metadataTrackingLog, + String metadataPath) { + io.delta.kernel.Snapshot snapshot = snapshotManager.loadLatestSnapshot(); + return new SparkMicroBatchStream( + snapshotManager, + snapshot, + hadoopConf, + spark, + options, + tablePath, + dataSchema, + /* partitionSchema= */ new StructType(), + /* readDataSchema= */ dataSchema, + SchemaUtils.ddlOrderedOutputSchema( + io.delta.spark.internal.v2.utils.SchemaUtils.convertKernelSchemaToSparkSchema( + snapshotManager.loadLatestSnapshot().getSchema()), + dataSchema, + new StructType()), + /* dataFilters= */ new org.apache.spark.sql.sources.Filter[0], + /* scalaOptions= */ scala.collection.immutable.Map$.MODULE$.empty(), + metadataTrackingLog, + metadataPath); + } + + private DeltaSourceMetadataTrackingLog createTrackingLog( + PathBasedSnapshotManager snapshotManager, + String schemaTrackingLocation, + String checkpointLocation, + java.util.Map optionMap) { + io.delta.kernel.internal.SnapshotImpl snapshot = + (io.delta.kernel.internal.SnapshotImpl) snapshotManager.loadLatestSnapshot(); + return DeltaSourceMetadataTrackingLog.create( + spark, + schemaTrackingLocation, + snapshot.getMetadata().getId(), + snapshot.getPath(), + ScalaUtils.toScalaMap(optionMap), + Option.apply(checkpointLocation), + /* mergeConsecutiveSchemaChanges= */ false, + /* consecutiveSchemaChangesMerger= */ Option.empty(), + /* initMetadataLogEagerly= */ true); + } + + private StructType loadSparkSchemaAtVersion( + PathBasedSnapshotManager snapshotManager, long version) { + return io.delta.spark.internal.v2.utils.SchemaUtils.convertKernelSchemaToSparkSchema( + snapshotManager.loadSnapshotAt(version).getSchema()); + } + + private List readIdsBetweenOffsets( + SparkMicroBatchStream stream, Offset startOffset, Offset endOffset) throws Exception { + InputPartition[] partitions = stream.planInputPartitions(startOffset, endOffset); + PartitionReaderFactory readerFactory = stream.createReaderFactory(); + List ids = new ArrayList<>(); + for (InputPartition partition : partitions) { + if (readerFactory.supportColumnarReads(partition)) { + PartitionReader reader = + readerFactory.createColumnarReader(partition); + try { + while (reader.next()) { + org.apache.spark.sql.vectorized.ColumnarBatch batch = reader.get(); + for (int rowId = 0; rowId < batch.numRows(); rowId++) { + ids.add(batch.getRow(rowId).getInt(0)); + } + } + } finally { + reader.close(); + } + } else { + PartitionReader reader = readerFactory.createReader(partition); + try { + while (reader.next()) { + ids.add(reader.get().getInt(0)); + } + } finally { + reader.close(); + } + } + } + Collections.sort(ids); + return ids; + } + + /** + * Reads rows between offsets and returns each row as an {@link InternalRow}. Columnar batches are + * materialized via {@code batch.getRow(i).copy()} so callers can hold onto the rows after the + * reader is closed. + */ + private List readRowsBetweenOffsets( + SparkMicroBatchStream stream, Offset startOffset, Offset endOffset) throws Exception { + InputPartition[] partitions = stream.planInputPartitions(startOffset, endOffset); + PartitionReaderFactory readerFactory = stream.createReaderFactory(); + List rows = new ArrayList<>(); + for (InputPartition partition : partitions) { + if (readerFactory.supportColumnarReads(partition)) { + PartitionReader reader = + readerFactory.createColumnarReader(partition); + try { + while (reader.next()) { + org.apache.spark.sql.vectorized.ColumnarBatch batch = reader.get(); + for (int rowId = 0; rowId < batch.numRows(); rowId++) { + rows.add(batch.getRow(rowId).copy()); + } + } + } finally { + reader.close(); + } + } else { + PartitionReader reader = readerFactory.createReader(partition); + try { + while (reader.next()) { + rows.add(reader.get().copy()); + } + } finally { + reader.close(); + } + } + } + return rows; + } + + /** + * Reads rows between offsets and returns just the count. Used by lifecycle tests where the + * specific column values are not the focus and reading by column name/ordinal would break across + * scenarios that drop or rename the leading column. + */ + private long countRowsBetweenOffsets( + SparkMicroBatchStream stream, Offset startOffset, Offset endOffset) throws Exception { + InputPartition[] partitions = stream.planInputPartitions(startOffset, endOffset); + PartitionReaderFactory readerFactory = stream.createReaderFactory(); + long count = 0L; + for (InputPartition partition : partitions) { + if (readerFactory.supportColumnarReads(partition)) { + PartitionReader reader = + readerFactory.createColumnarReader(partition); + try { + while (reader.next()) { + count += reader.get().numRows(); + } + } finally { + reader.close(); + } + } else { + PartitionReader reader = readerFactory.createReader(partition); + try { + while (reader.next()) { + count++; + } + } finally { + reader.close(); + } + } + } + return count; + } + + private static void assertMetadataEvolutionException(DeltaRuntimeException ex, String context) { + assertEquals( + "DELTA_STREAMING_METADATA_EVOLUTION", + ex.getErrorClass(), + "Should throw metadata evolution exception " + context); + java.util.Map params = ex.getMessageParameters(); + assertTrue(params.containsKey("schema"), "Missing 'schema' message parameter"); + assertTrue(params.containsKey("config"), "Missing 'config' message parameter"); + assertTrue(params.containsKey("protocol"), "Missing 'protocol' message parameter"); } /** Helper method to create DeltaOptions with read option for testing. */ @@ -3861,6 +4500,13 @@ private DeltaOptions createDeltaOptions(String optionName, String optionValue) { } } + private DeltaOptions createDeltaOptions(java.util.Map optionMap) { + if (optionMap == null || optionMap.isEmpty()) { + return emptyDeltaOptions(); + } + return new DeltaOptions(ScalaUtils.toScalaMap(optionMap), spark.sessionState().conf()); + } + /** Helper method to test and compare getStartingVersion results from DSv1 and DSv2. */ private void testAndCompareStartingVersion( String testTablePath, diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkScanTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkScanTest.java index b76cd4db6d8..4e27509daed 100644 --- a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkScanTest.java +++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkScanTest.java @@ -751,7 +751,8 @@ public void testValidateStreamingOptions_UnsupportedOptions() { + "Supported options are: [startingVersion, startingTimestamp, maxFilesPerTrigger, " + "maxBytesPerTrigger, ignoreFileDeletion, ignoreChanges, ignoreDeletes, " + "skipChangeCommits, excludeRegex, failOnDataLoss, readChangeFeed, readChangeData, " - + "schemaTrackingLocation, schemaLocation, streamingSourceTrackingId].", + + "schemaTrackingLocation, schemaLocation, streamingSourceTrackingId, " + + "allowSourceColumnDrop, allowSourceColumnRename, allowSourceColumnTypeChange].", exception.getMessage()); }