diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala index e4d373b524b..30d29881c78 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.delta.actions.{Metadata, Protocol} import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils} +import org.apache.spark.sql.delta.v2.interop.AbstractMetadata import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods._ @@ -425,32 +426,34 @@ trait DeltaColumnMappingBase extends DeltaLogging { } /** - * For each column/field in a Metadata's schema, assign id using the current maximum id - * as the basis and increment from there, and assign physical name using UUID - * @param newMetadata The new metadata to assign Ids and physical names - * @param oldMetadata The old metadata - * @param isChangingModeOnExistingTable whether this is part of a commit that changes the - * mapping mode on a existing table - * @return new metadata with Ids and physical names assigned + * Core logic for assigning column IDs and physical names to a schema. + * Takes [[AbstractMetadata]] (no v1 Metadata dependency) so it can be reused by both v1 + * and v2 connectors. Bundling schema + configuration on each side avoids the swap footgun + * of having two `StructType` and two `Map` parameters next to each other. + * + * @return (upgradedSchema, maxColumnId) - the schema with IDs/physical names assigned, + * and the final max column ID. 
*/ - def assignColumnIdAndPhysicalName( - newMetadata: Metadata, - oldMetadata: Metadata, + private[delta] def assignColumnIdAndPhysicalNameToSchema( + newMetadata: AbstractMetadata, + oldMetadata: AbstractMetadata, isChangingModeOnExistingTable: Boolean, - isOverwritingSchema: Boolean): Metadata = { - val rawSchema = newMetadata.schema - var maxId = DeltaConfigs.COLUMN_MAPPING_MAX_ID.fromMetaData(newMetadata) max - DeltaConfigs.COLUMN_MAPPING_MAX_ID.fromMetaData(oldMetadata) max - findMaxColumnId(rawSchema) - val startId = maxId - val newSchema = - SchemaMergingUtils.transformColumns(rawSchema)((path, field, _) => { + isOverwritingSchema: Boolean): (StructType, Long) = { + val newSchema = newMetadata.schema + val oldSchema = oldMetadata.schema + val newConfiguration = newMetadata.configuration + val oldConfiguration = oldMetadata.configuration + var maxId = DeltaConfigs.COLUMN_MAPPING_MAX_ID.fromMap(newConfiguration) max + DeltaConfigs.COLUMN_MAPPING_MAX_ID.fromMap(oldConfiguration) max + findMaxColumnId(newSchema) + val resultSchema = + SchemaMergingUtils.transformColumns(newSchema)((path, field, _) => { val builder = new MetadataBuilder().withMetadata(field.metadata) lazy val fullName = path :+ field.name lazy val existingFieldOpt = SchemaUtils.findNestedFieldIgnoreCase( - oldMetadata.schema, fullName, includeCollections = true) + oldSchema, fullName, includeCollections = true) lazy val canReuseColumnMappingMetadataDuringOverwrite = { val canReuse = isOverwritingSchema && @@ -484,12 +487,12 @@ trait DeltaColumnMappingBase extends DeltaLogging { if (!hasPhysicalName(field)) { val physicalName = if (isChangingModeOnExistingTable) { if (existingFieldOpt.isEmpty) { - if (oldMetadata.schema.isEmpty) { + if (oldSchema.isEmpty) { // We should relax the check for tables that have both an empty schema // and no data. 
Assumption: no schema => no data generatePhysicalName } else throw DeltaErrors.schemaChangeDuringMappingModeChangeNotSupported( - oldMetadata.schema, newMetadata.schema) + oldSchema, newSchema) } else { // When changing from NoMapping to NameMapping mode, we directly use old display names // as physical names. This is by design: 1) We don't need to rewrite the @@ -509,13 +512,30 @@ trait DeltaColumnMappingBase extends DeltaLogging { } field.copy(metadata = builder.build()) }) - // Starting from IcebergCompatV2, we require writing field-id for List/Map nested fields - val (finalSchema, newMaxId) = if (IcebergCompat.isGeqEnabled(newMetadata, 2)) { - rewriteFieldIdsForIceberg(newSchema, maxId) + if (IcebergCompat.anyEnabled(newConfiguration).exists(_.version >= 2)) { + rewriteFieldIdsForIceberg(resultSchema, maxId) } else { - (newSchema, maxId) + (resultSchema, maxId) } + } + + /** + * For each column/field in a Metadata's schema, assign id using the current maximum id + * as the basis and increment from there, and assign physical name using UUID + * @param newMetadata The new metadata to assign Ids and physical names + * @param oldMetadata The old metadata + * @param isChangingModeOnExistingTable whether this is part of a commit that changes the + * mapping mode on an existing table + * @return new metadata with Ids and physical names assigned + */ + def assignColumnIdAndPhysicalName( + newMetadata: Metadata, + oldMetadata: Metadata, + isChangingModeOnExistingTable: Boolean, + isOverwritingSchema: Boolean): Metadata = { + val (finalSchema, newMaxId) = assignColumnIdAndPhysicalNameToSchema( + newMetadata, oldMetadata, isChangingModeOnExistingTable, isOverwritingSchema) newMetadata.copy( schemaString = finalSchema.json, @@ -775,10 +795,12 @@ trait DeltaColumnMappingBase extends DeltaLogging { * As of now, `newMetadata` is column mapping read compatible with `oldMetadata` if * no rename column or drop column has happened in-between. 
*/ - def hasNoColumnMappingSchemaChanges(newMetadata: Metadata, oldMetadata: Metadata, + def hasNoColumnMappingSchemaChanges( + newMetadata: AbstractMetadata, + oldMetadata: AbstractMetadata, allowUnsafeReadOnPartitionChanges: Boolean = false): Boolean = { - def hasColMappingOrPartitionSchemaChangeByMetadata(newMetadata: Metadata, - oldMetadata: Metadata): Boolean = { + def hasColMappingOrPartitionSchemaChangeByMetadata( + newMetadata: AbstractMetadata, oldMetadata: AbstractMetadata): Boolean = { val isBothColumnMappingEnabled = newMetadata.columnMappingMode != NoMapping && oldMetadata.columnMappingMode != NoMapping hasColMappingOrPartitionSchemaChange( @@ -802,15 +824,22 @@ trait DeltaColumnMappingBase extends DeltaLogging { // the new metadata, as the upgrade would use the logical name as the physical name, we could // easily capture any difference in the schema using the same is{Drop,Rename}ColumnOperation // utils. - var upgradedMetadata = assignColumnIdAndPhysicalName( - oldMetadata, oldMetadata, isChangingModeOnExistingTable = true, isOverwritingSchema = false - ) - // need to change to a column mapping mode too so the utils below can recognize - upgradedMetadata = upgradedMetadata.copy( - configuration = upgradedMetadata.configuration ++ - Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> newMetadata.columnMappingMode.name) - ) - // use the same check + val (upgradedSchema, upgradedMaxId) = assignColumnIdAndPhysicalNameToSchema( + newMetadata = oldMetadata, oldMetadata = oldMetadata, + isChangingModeOnExistingTable = true, isOverwritingSchema = false) + // Construct an AbstractMetadata with the upgraded schema and the new column mapping mode + // so the comparison utils below can recognize column mapping metadata. 
+ val upgradedMetadata = new AbstractMetadata { + val id: String = oldMetadata.id + val name: String = oldMetadata.name + val description: String = oldMetadata.description + val schema: StructType = upgradedSchema + val partitionColumns: Seq[String] = oldMetadata.partitionColumns + val configuration: Map[String, String] = oldMetadata.configuration ++ + Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> newMetadata.columnMappingMode.name, + DeltaConfigs.COLUMN_MAPPING_MAX_ID.key -> upgradedMaxId.toString) + val columnMappingMode: DeltaColumnMappingMode = newMetadata.columnMappingMode + } !hasColMappingOrPartitionSchemaChangeByMetadata(newMetadata, upgradedMetadata) } else { // Prohibit reading across a downgrade. diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala index b54217d9a7d..b13ec642a3a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala @@ -143,14 +143,8 @@ trait DeltaSourceBase extends Source */ protected val readSchemaAtSourceInit: StructType = readSnapshotDescriptor.metadata.schema - protected val readPartitionSchemaAtSourceInit: StructType = - readSnapshotDescriptor.metadata.partitionSchema - protected val readProtocolAtSourceInit: Protocol = readSnapshotDescriptor.protocol - protected val readConfigurationsAtSourceInit: Map[String, String] = - readSnapshotDescriptor.metadata.configuration - /** * Create a snapshot descriptor, customizing its metadata using metadata tracking if necessary */ diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala index 4085435cb4e..397a11fdb2b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala +++ 
b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupport.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.delta.actions.{Action, FileAction, Metadata, Protoco import org.apache.spark.sql.delta.schema.SchemaUtils import org.apache.spark.sql.delta.storage.ClosableIterator import org.apache.spark.sql.delta.storage.ClosableIterator._ +import org.apache.spark.sql.delta.v2.interop.{AbstractMetadata, AbstractProtocol} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession @@ -89,26 +90,13 @@ import org.apache.spark.sql.types.StructType */ trait DeltaSourceMetadataEvolutionSupport extends DeltaSourceBase { base: DeltaSource => - /** - * Whether this DeltaSource is utilizing a schema log entry as its read schema. - * - * If user explicitly turn on the flag to fall back to using latest schema to read (i.e. the - * legacy mode), we will ignore the schema log. - */ protected def trackingMetadataChange: Boolean = - !schemaReadOptions.allowUnsafeStreamingReadOnColumnMappingSchemaChanges && - metadataTrackingLog.flatMap(_.getCurrentTrackedMetadata).nonEmpty + DeltaSourceMetadataEvolutionSupport.shouldTrackMetadataChange( + schemaReadOptions, metadataTrackingLog) - /** - * Whether a schema tracking log is provided (and is empty), so we could initialize eagerly. - * This should only be used for the first write to the schema log, after then, schema tracking - * should not rely on this state any more. 
- */ protected def readyToInitializeMetadataTrackingEagerly: Boolean = - !schemaReadOptions.allowUnsafeStreamingReadOnColumnMappingSchemaChanges && - metadataTrackingLog.exists { log => - log.getCurrentTrackedMetadata.isEmpty && log.initMetadataLogEagerly - } + DeltaSourceMetadataEvolutionSupport.shouldInitializeMetadataTrackingEagerly( + schemaReadOptions, metadataTrackingLog) /** @@ -126,44 +114,20 @@ trait DeltaSourceMetadataEvolutionSupport extends DeltaSourceBase { base: DeltaS } } - /** - * Check the table metadata or protocol changed since the initial read snapshot. We make sure: - * 1. The schema is the same, except for internal metadata, AND - * 2. The delta related table configurations are strictly equal, AND - * 3. The incoming metadata change should not be considered a failure-causing change if we have - * marked the persisted schema and the stream progress is behind that schema version. - * This could happen when we've already merged consecutive schema changes during the analysis - * phase and we are using the merged schema as the read schema. All the schema changes in - * between can be safely ignored because they won't contribute any data. 
- */ private def hasMetadataOrProtocolChangeComparedToStreamMetadata( metadataChangeOpt: Option[Metadata], protocolChangeOpt: Option[Protocol], newSchemaVersion: Long): Boolean = { - if (persistedMetadataAtSourceInit.exists(_.deltaCommitVersion >= newSchemaVersion)) { - false - } else { - protocolChangeOpt.exists(_ != readProtocolAtSourceInit) || - metadataChangeOpt.exists { newMetadata => - hasSchemaChangeComparedToStreamMetadata(newMetadata.schema) || - newMetadata.partitionSchema != readPartitionSchemaAtSourceInit || - newMetadata.configuration.filterKeys(_.startsWith("delta.")).toMap != - readConfigurationsAtSourceInit.filterKeys(_.startsWith("delta.")).toMap - } - } + DeltaSourceMetadataEvolutionSupport.hasMetadataOrProtocolChangeComparedToStreamMetadata( + metadataChangeOpt, + protocolChangeOpt, + newSchemaVersion, + persistedMetadataAtSourceInit, + readProtocolAtSourceInit, + readSnapshotDescriptor.metadata, + spark) } - /** - * Check that the give schema is the same as the schema from the initial read snapshot. - */ - private def hasSchemaChangeComparedToStreamMetadata(newSchema: StructType): Boolean = - if (spark.conf.get(DeltaSQLConf.DELTA_STREAMING_IGNORE_INTERNAL_METADATA_FOR_SCHEMA_CHANGE)) { - DeltaTableUtils.removeInternalWriterMetadata(spark, newSchema) != - DeltaTableUtils.removeInternalWriterMetadata(spark, readSchemaAtSourceInit) - } else { - newSchema != readSchemaAtSourceInit - } - /** * If the current stream metadata is not equal to the metadata change in [[metadataChangeOpt]], * return a metadata change barrier [[IndexedFile]]. @@ -685,6 +649,90 @@ object DeltaSourceMetadataEvolutionSupport extends Logging { spark.sessionState.conf.getConf( DeltaSQLConf.DELTA_TYPE_WIDENING_BYPASS_STREAMING_TYPE_CHANGE_CHECK) + /** + * Whether this DeltaSource is utilizing a schema log entry as its read schema. + * + * If user explicitly turn on the flag to fall back to using latest schema to read (i.e. the + * legacy mode), we will ignore the schema log. 
+ */ + def shouldTrackMetadataChange( + schemaReadOptions: DeltaStreamUtils.SchemaReadOptions, + metadataTrackingLog: Option[DeltaSourceMetadataTrackingLog]): Boolean = { + !schemaReadOptions.allowUnsafeStreamingReadOnColumnMappingSchemaChanges && + metadataTrackingLog.flatMap(_.getCurrentTrackedMetadata).nonEmpty + } + + /** + * Whether a schema tracking log is provided (and is empty), so we could initialize eagerly. + * This should only be used for the first write to the schema log, after then, schema tracking + * should not rely on this state any more. + */ + def shouldInitializeMetadataTrackingEagerly( + schemaReadOptions: DeltaStreamUtils.SchemaReadOptions, + metadataTrackingLog: Option[DeltaSourceMetadataTrackingLog]): Boolean = { + !schemaReadOptions.allowUnsafeStreamingReadOnColumnMappingSchemaChanges && + metadataTrackingLog.exists { log => + log.getCurrentTrackedMetadata.isEmpty && log.initMetadataLogEagerly + } + } + + /** + * Check the table metadata or protocol changed since the initial read snapshot. We make sure: + * 1. The schema is the same, except for internal metadata, AND + * 2. The delta related table configurations are strictly equal, AND + * 3. The incoming metadata change should not be considered a failure-causing change if we have + * marked the persisted schema and the stream progress is behind that schema version. + * This could happen when we've already merged consecutive schema changes during the analysis + * phase and we are using the merged schema as the read schema. All the schema changes in + * between can be safely ignored because they won't contribute any data. + * + * @param metadataChangeOpt New metadata action, if any. + * @param protocolChangeOpt New protocol action, if any. + * @param newSchemaVersion The version of the incoming change. + * @param persistedMetadataAtSourceInit The persisted metadata at source init, if any. + * @param readProtocolAtSourceInit The protocol at source init. 
+ * @param readMetadataAtSourceInit The metadata at source init (schema, partition schema, and + * configuration). Bundled to avoid the swap footgun of three + * adjacent params. + * @param spark The SparkSession (used for SQL conf checks). + */ + def hasMetadataOrProtocolChangeComparedToStreamMetadata( + metadataChangeOpt: Option[AbstractMetadata], + protocolChangeOpt: Option[AbstractProtocol], + newSchemaVersion: Long, + persistedMetadataAtSourceInit: Option[PersistedMetadata], + readProtocolAtSourceInit: AbstractProtocol, + readMetadataAtSourceInit: AbstractMetadata, + spark: SparkSession): Boolean = { + if (persistedMetadataAtSourceInit.exists(_.deltaCommitVersion >= newSchemaVersion)) { + false + } else { + protocolChangeOpt.exists(p => !p.equalsByFields(readProtocolAtSourceInit)) || + metadataChangeOpt.exists { newMetadata => + hasSchemaChangeComparedToStreamMetadata( + newMetadata.schema, readMetadataAtSourceInit.schema, spark) || + newMetadata.partitionSchema != readMetadataAtSourceInit.partitionSchema || + newMetadata.configuration.filterKeys(_.startsWith("delta.")).toMap != + readMetadataAtSourceInit.configuration.filterKeys(_.startsWith("delta.")).toMap + } + } + } + + /** + * Check that the given schema is the same as the schema from the initial read snapshot. + */ + private def hasSchemaChangeComparedToStreamMetadata( + newSchema: StructType, + readSchemaAtSourceInit: StructType, + spark: SparkSession): Boolean = { + if (spark.conf.get(DeltaSQLConf.DELTA_STREAMING_IGNORE_INTERNAL_METADATA_FOR_SCHEMA_CHANGE)) { + DeltaTableUtils.removeInternalWriterMetadata(spark, newSchema) != + DeltaTableUtils.removeInternalWriterMetadata(spark, readSchemaAtSourceInit) + } else { + newSchema != readSchemaAtSourceInit + } + } + /** * Speculate ahead and find the next merged consecutive metadata change if possible. 
* A metadata change is either: diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala index af9a9b346fd..907d55e76aa 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.schema.SchemaMergingUtils import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils} import org.apache.spark.sql.delta.test.DeltaTestImplicits._ +import org.apache.spark.sql.delta.v2.interop.AbstractMetadata import org.apache.hadoop.fs.Path import org.apache.parquet.format.converter.ParquetMetadataConverter import org.apache.parquet.hadoop.ParquetFileReader @@ -639,6 +640,57 @@ class DeltaColumnMappingSuite extends QueryTest } } + test("hasNoColumnMappingSchemaChanges accepts non-Metadata AbstractMetadata inputs") { + // Anonymous AbstractMetadata impls (not the V1 Metadata action) prove the API actually + // relies on the abstract surface. Especially exercises the NoMapping -> NameMapping upgrade + // branch, which synthesizes a fresh AbstractMetadata internally and reads back from it. 
+ def mkAbstractMetadata( + sch: StructType, + mode: DeltaColumnMappingMode, + conf: Map[String, String] = Map.empty): AbstractMetadata = new AbstractMetadata { + override def id: String = "tid" + override def name: String = "" + override def description: String = "" + override def schema: StructType = sch + override def partitionColumns: Seq[String] = Seq.empty + override def configuration: Map[String, String] = conf + override def columnMappingMode: DeltaColumnMappingMode = mode + } + + val baseSchema = new StructType().add("a", IntegerType).add("b", IntegerType) + val oldNoMapping = mkAbstractMetadata(baseSchema, NoMapping) + + // Upgrade with no other change: synthesized post-upgrade schema uses logical names as + // physical names, matching what we pass for `new` -> read-compatible. + val newNameMappingSameSchema = mkAbstractMetadata( + DeltaColumnMapping.setPhysicalNames( + baseSchema, Map(Seq("a") -> "a", Seq("b") -> "b")), + NameMapping) + assert(DeltaColumnMapping.hasNoColumnMappingSchemaChanges( + newNameMappingSameSchema, oldNoMapping)) + + // Upgrade + drop: detected as a non-additive change. + val newNameMappingDropped = mkAbstractMetadata( + DeltaColumnMapping.setPhysicalNames( + new StructType().add("a", IntegerType), Map(Seq("a") -> "a")), + NameMapping) + assert(!DeltaColumnMapping.hasNoColumnMappingSchemaChanges( + newNameMappingDropped, oldNoMapping)) + + // Upgrade + rename (renamed column's physical name diverges from its logical name). + val newNameMappingRenamed = mkAbstractMetadata( + DeltaColumnMapping.setPhysicalNames( + new StructType().add("c", IntegerType).add("b", IntegerType), + Map(Seq("c") -> "a", Seq("b") -> "b")), + NameMapping) + assert(!DeltaColumnMapping.hasNoColumnMappingSchemaChanges( + newNameMappingRenamed, oldNoMapping)) + + // Downgrade NameMapping -> NoMapping is prohibited. 
+ assert(!DeltaColumnMapping.hasNoColumnMappingSchemaChanges( + oldNoMapping, newNameMappingSameSchema)) + } + testColumnMapping("create table through raw schema API should " + "auto bump the version and retain input metadata") { mode => diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala index 90b85e5499c..2f8676c6008 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala @@ -16,11 +16,13 @@ package org.apache.spark.sql.delta.sources -import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaOptions, DeltaTestUtilsBase, DeltaThrowable} +import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaColumnMappingMode, DeltaOptions} +import org.apache.spark.sql.delta.{DeltaTestUtilsBase, DeltaThrowable, NoMapping} +import org.apache.spark.sql.delta.v2.interop.{AbstractMetadata, AbstractProtocol} import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StringType, StructType} /** * Unit tests covering `DeltaSourceMetadataEvolutionSupport`, which detects non-additive schema @@ -662,4 +664,80 @@ class DeltaSourceMetadataEvolutionSupportSuite previousSchema = persistedMetadata("a byte", Map.empty) ) } + + test("detects metadata/protocol changes through the AbstractMetadata/AbstractProtocol " + + "surface") { + // Anonymous trait impls (not V1 Metadata/Protocol) prove the static actually relies on + // the abstract surface. The trait wrapper used in production always passes V1 types, + // so this is the only path that would catch a regression specific to non-V1 impls. 
+ val baseSchema = new StructType().add("a", StringType, nullable = true) + + def mkMetadata( + sch: StructType = baseSchema, + partCols: Seq[String] = Seq.empty, + conf: Map[String, String] = Map.empty): AbstractMetadata = new AbstractMetadata { + override def id: String = "tid" + override def name: String = "" + override def description: String = "" + override def schema: StructType = sch + override def partitionColumns: Seq[String] = partCols + override def configuration: Map[String, String] = conf + override def columnMappingMode: DeltaColumnMappingMode = NoMapping + } + + def mkProtocol( + readerV: Int = 1, + writerV: Int = 2, + readerFs: Option[Set[String]] = None, + writerFs: Option[Set[String]] = None): AbstractProtocol = new AbstractProtocol { + override def minReaderVersion: Int = readerV + override def minWriterVersion: Int = writerV + override def readerFeatures: Option[Set[String]] = readerFs + override def writerFeatures: Option[Set[String]] = writerFs + } + + val readMetadata = mkMetadata() + val readProtocol = mkProtocol() + + def call( + metadataChange: Option[AbstractMetadata] = None, + protocolChange: Option[AbstractProtocol] = None, + newVer: Long = 1L, + persisted: Option[PersistedMetadata] = None): Boolean = + DeltaSourceMetadataEvolutionSupport.hasMetadataOrProtocolChangeComparedToStreamMetadata( + metadataChange, protocolChange, newVer, persisted, readProtocol, readMetadata, spark) + + // No change: both sides identical anonymous impls -> false. + assert(!call(metadataChange = Some(mkMetadata()), protocolChange = Some(mkProtocol()))) + + // Schema differs. + assert(call(metadataChange = + Some(mkMetadata(sch = baseSchema.add("b", StringType, nullable = true))))) + + // Partition schema differs (same data schema, different partition columns). + assert(call(metadataChange = Some(mkMetadata(partCols = Seq("a"))))) + + // delta.* configuration differs. 
+ assert(call(metadataChange = Some(mkMetadata(conf = Map("delta.foo" -> "bar"))))) + + // Non-delta.* configuration differs -> filtered out, no change. + assert(!call(metadataChange = Some(mkMetadata(conf = Map("foo" -> "bar"))))) + + // Protocol differs by a single field (uses equalsByFields under the hood). + assert(call(protocolChange = Some(mkProtocol(readerV = 2)))) + + // Persisted metadata is at or beyond newSchemaVersion -> short-circuits to false even if + // every other input would otherwise indicate a change. + val persisted = PersistedMetadata( + tableId = "tid", + deltaCommitVersion = 5L, + dataSchemaJson = baseSchema.json, + partitionSchemaJson = new StructType().json, + sourceMetadataPath = "") + assert(!call( + metadataChange = Some(mkMetadata(sch = baseSchema.add("b", StringType, nullable = true))), + protocolChange = Some(mkProtocol(readerV = 99)), + newVer = 3L, + persisted = Some(persisted))) + } }