Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.v2.interop.AbstractMetadata
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._
Expand Down Expand Up @@ -425,32 +426,34 @@ trait DeltaColumnMappingBase extends DeltaLogging {
}

/**
* For each column/field in a Metadata's schema, assign id using the current maximum id
* as the basis and increment from there, and assign physical name using UUID
* @param newMetadata The new metadata to assign Ids and physical names
* @param oldMetadata The old metadata
* @param isChangingModeOnExistingTable whether this is part of a commit that changes the
* mapping mode on a existing table
* @return new metadata with Ids and physical names assigned
* Core logic for assigning column IDs and physical names to a schema.
* Takes [[AbstractMetadata]] (no v1 Metadata dependency) so it can be reused by both v1
* and v2 connectors. Bundling schema + configuration on each side avoids the swap footgun
* of having two `StructType` and two `Map` parameters next to each other.
*
* @return (upgradedSchema, maxColumnId) - the schema with IDs/physical names assigned,
* and the final max column ID.
*/
def assignColumnIdAndPhysicalName(
newMetadata: Metadata,
oldMetadata: Metadata,
private[delta] def assignColumnIdAndPhysicalNameToSchema(
newMetadata: AbstractMetadata,
oldMetadata: AbstractMetadata,
isChangingModeOnExistingTable: Boolean,
isOverwritingSchema: Boolean): Metadata = {
val rawSchema = newMetadata.schema
var maxId = DeltaConfigs.COLUMN_MAPPING_MAX_ID.fromMetaData(newMetadata) max
DeltaConfigs.COLUMN_MAPPING_MAX_ID.fromMetaData(oldMetadata) max
findMaxColumnId(rawSchema)
val startId = maxId
val newSchema =
SchemaMergingUtils.transformColumns(rawSchema)((path, field, _) => {
isOverwritingSchema: Boolean): (StructType, Long) = {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

two StructType then two Map right next to each other - easy to swap new and old by mistake. group them into one case class?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changes to use abstractMetadata

val newSchema = newMetadata.schema
val oldSchema = oldMetadata.schema
val newConfiguration = newMetadata.configuration
val oldConfiguration = oldMetadata.configuration
var maxId = DeltaConfigs.COLUMN_MAPPING_MAX_ID.fromMap(newConfiguration) max
DeltaConfigs.COLUMN_MAPPING_MAX_ID.fromMap(oldConfiguration) max
findMaxColumnId(newSchema)
val resultSchema =
SchemaMergingUtils.transformColumns(newSchema)((path, field, _) => {
val builder = new MetadataBuilder().withMetadata(field.metadata)

lazy val fullName = path :+ field.name
lazy val existingFieldOpt =
SchemaUtils.findNestedFieldIgnoreCase(
oldMetadata.schema, fullName, includeCollections = true)
oldSchema, fullName, includeCollections = true)
lazy val canReuseColumnMappingMetadataDuringOverwrite = {
val canReuse =
isOverwritingSchema &&
Expand Down Expand Up @@ -484,12 +487,12 @@ trait DeltaColumnMappingBase extends DeltaLogging {
if (!hasPhysicalName(field)) {
val physicalName = if (isChangingModeOnExistingTable) {
if (existingFieldOpt.isEmpty) {
if (oldMetadata.schema.isEmpty) {
if (oldSchema.isEmpty) {
// We should relax the check for tables that have both an empty schema
// and no data. Assumption: no schema => no data
generatePhysicalName
} else throw DeltaErrors.schemaChangeDuringMappingModeChangeNotSupported(
oldMetadata.schema, newMetadata.schema)
oldSchema, newSchema)
} else {
// When changing from NoMapping to NameMapping mode, we directly use old display names
// as physical names. This is by design: 1) We don't need to rewrite the
Expand All @@ -509,13 +512,30 @@ trait DeltaColumnMappingBase extends DeltaLogging {
}
field.copy(metadata = builder.build())
})

// Starting from IcebergCompatV2, we require writing field-id for List/Map nested fields
val (finalSchema, newMaxId) = if (IcebergCompat.isGeqEnabled(newMetadata, 2)) {
rewriteFieldIdsForIceberg(newSchema, maxId)
if (IcebergCompat.anyEnabled(newConfiguration).exists(_.version >= 2)) {
rewriteFieldIdsForIceberg(resultSchema, maxId)
} else {
(newSchema, maxId)
(resultSchema, maxId)
}
}

/**
* For each column/field in a Metadata's schema, assign id using the current maximum id
* as the basis and increment from there, and assign physical name using UUID
* @param newMetadata The new metadata to assign Ids and physical names
* @param oldMetadata The old metadata
* @param isChangingModeOnExistingTable whether this is part of a commit that changes the
* mapping mode on a existing table
* @return new metadata with Ids and physical names assigned
*/
def assignColumnIdAndPhysicalName(
newMetadata: Metadata,
oldMetadata: Metadata,
isChangingModeOnExistingTable: Boolean,
isOverwritingSchema: Boolean): Metadata = {
val (finalSchema, newMaxId) = assignColumnIdAndPhysicalNameToSchema(
newMetadata, oldMetadata, isChangingModeOnExistingTable, isOverwritingSchema)

newMetadata.copy(
schemaString = finalSchema.json,
Expand Down Expand Up @@ -775,10 +795,12 @@ trait DeltaColumnMappingBase extends DeltaLogging {
* As of now, `newMetadata` is column mapping read compatible with `oldMetadata` if
* no rename column or drop column has happened in-between.
*/
def hasNoColumnMappingSchemaChanges(newMetadata: Metadata, oldMetadata: Metadata,
def hasNoColumnMappingSchemaChanges(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add one test that calls hasNoColumnMappingSchemaChanges with non-Metadata AbstractMetadata inputs, especially the NoMapping -> NameMapping upgrade path, so we prove this refactor actually works for the V2 caller?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rough idea: add a small DeltaColumnMappingSuite test with anonymous AbstractMetadata wrappers around the same schemas/configs, then call DeltaColumnMapping.hasNoColumnMappingSchemaChanges(newAbstractMetadata, oldAbstractMetadata) and assert the expected true/false result.

newMetadata: AbstractMetadata,
oldMetadata: AbstractMetadata,
allowUnsafeReadOnPartitionChanges: Boolean = false): Boolean = {
def hasColMappingOrPartitionSchemaChangeByMetadata(newMetadata: Metadata,
oldMetadata: Metadata): Boolean = {
def hasColMappingOrPartitionSchemaChangeByMetadata(
newMetadata: AbstractMetadata, oldMetadata: AbstractMetadata): Boolean = {
val isBothColumnMappingEnabled =
newMetadata.columnMappingMode != NoMapping && oldMetadata.columnMappingMode != NoMapping
hasColMappingOrPartitionSchemaChange(
Expand All @@ -802,15 +824,22 @@ trait DeltaColumnMappingBase extends DeltaLogging {
// the new metadata, as the upgrade would use the logical name as the physical name, we could
// easily capture any difference in the schema using the same is{Drop,Rename}ColumnOperation
// utils.
var upgradedMetadata = assignColumnIdAndPhysicalName(
oldMetadata, oldMetadata, isChangingModeOnExistingTable = true, isOverwritingSchema = false
)
// need to change to a column mapping mode too so the utils below can recognize
upgradedMetadata = upgradedMetadata.copy(
configuration = upgradedMetadata.configuration ++
Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> newMetadata.columnMappingMode.name)
)
// use the same check
val (upgradedSchema, upgradedMaxId) = assignColumnIdAndPhysicalNameToSchema(
newMetadata = oldMetadata, oldMetadata = oldMetadata,
isChangingModeOnExistingTable = true, isOverwritingSchema = false)
// Construct an AbstractMetadata with the upgraded schema and the new column mapping mode
// so the comparison utils below can recognize column mapping metadata.
val upgradedMetadata = new AbstractMetadata {
val id: String = oldMetadata.id
val name: String = oldMetadata.name
val description: String = oldMetadata.description
val schema: StructType = upgradedSchema
val partitionColumns: Seq[String] = oldMetadata.partitionColumns
val configuration: Map[String, String] = oldMetadata.configuration +
(DeltaConfigs.COLUMN_MAPPING_MODE.key -> newMetadata.columnMappingMode.name,
DeltaConfigs.COLUMN_MAPPING_MAX_ID.key -> upgradedMaxId.toString)
val columnMappingMode: DeltaColumnMappingMode = newMetadata.columnMappingMode
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

9 lines of anonymous trait in the middle of a method. v2 will copy this pattern. can we extract a small AbstractMetadataAdapter case class?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already KernelMetadataAdapter in v2 so we won't copy this pattern. Because oldMetadata is of trait AbstractMetadata, we cannot call copy() on it, current code is the idiomatic way to "copy with overrides" on a trait

!hasColMappingOrPartitionSchemaChangeByMetadata(newMetadata, upgradedMetadata)
} else {
// Prohibit reading across a downgrade.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,8 @@ trait DeltaSourceBase extends Source
*/
protected val readSchemaAtSourceInit: StructType = readSnapshotDescriptor.metadata.schema

protected val readPartitionSchemaAtSourceInit: StructType =
readSnapshotDescriptor.metadata.partitionSchema

protected val readProtocolAtSourceInit: Protocol = readSnapshotDescriptor.protocol

protected val readConfigurationsAtSourceInit: Map[String, String] =
readSnapshotDescriptor.metadata.configuration

/**
* Create a snapshot descriptor, customizing its metadata using metadata tracking if necessary
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.sql.delta.actions.{Action, FileAction, Metadata, Protoco
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.storage.ClosableIterator
import org.apache.spark.sql.delta.storage.ClosableIterator._
import org.apache.spark.sql.delta.v2.interop.{AbstractMetadata, AbstractProtocol}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
Expand Down Expand Up @@ -89,26 +90,13 @@ import org.apache.spark.sql.types.StructType
*/
trait DeltaSourceMetadataEvolutionSupport extends DeltaSourceBase { base: DeltaSource =>

/**
* Whether this DeltaSource is utilizing a schema log entry as its read schema.
*
* If user explicitly turn on the flag to fall back to using latest schema to read (i.e. the
* legacy mode), we will ignore the schema log.
*/
protected def trackingMetadataChange: Boolean =
!schemaReadOptions.allowUnsafeStreamingReadOnColumnMappingSchemaChanges &&
metadataTrackingLog.flatMap(_.getCurrentTrackedMetadata).nonEmpty
DeltaSourceMetadataEvolutionSupport.shouldTrackMetadataChange(
schemaReadOptions, metadataTrackingLog)

/**
* Whether a schema tracking log is provided (and is empty), so we could initialize eagerly.
* This should only be used for the first write to the schema log, after then, schema tracking
* should not rely on this state any more.
*/
protected def readyToInitializeMetadataTrackingEagerly: Boolean =
!schemaReadOptions.allowUnsafeStreamingReadOnColumnMappingSchemaChanges &&
metadataTrackingLog.exists { log =>
log.getCurrentTrackedMetadata.isEmpty && log.initMetadataLogEagerly
}
DeltaSourceMetadataEvolutionSupport.shouldInitializeMetadataTrackingEagerly(
schemaReadOptions, metadataTrackingLog)


/**
Expand All @@ -126,44 +114,20 @@ trait DeltaSourceMetadataEvolutionSupport extends DeltaSourceBase { base: DeltaS
}
}

/**
* Check the table metadata or protocol changed since the initial read snapshot. We make sure:
* 1. The schema is the same, except for internal metadata, AND
* 2. The delta related table configurations are strictly equal, AND
* 3. The incoming metadata change should not be considered a failure-causing change if we have
* marked the persisted schema and the stream progress is behind that schema version.
* This could happen when we've already merged consecutive schema changes during the analysis
* phase and we are using the merged schema as the read schema. All the schema changes in
* between can be safely ignored because they won't contribute any data.
*/
private def hasMetadataOrProtocolChangeComparedToStreamMetadata(
metadataChangeOpt: Option[Metadata],
protocolChangeOpt: Option[Protocol],
newSchemaVersion: Long): Boolean = {
if (persistedMetadataAtSourceInit.exists(_.deltaCommitVersion >= newSchemaVersion)) {
false
} else {
protocolChangeOpt.exists(_ != readProtocolAtSourceInit) ||
metadataChangeOpt.exists { newMetadata =>
hasSchemaChangeComparedToStreamMetadata(newMetadata.schema) ||
newMetadata.partitionSchema != readPartitionSchemaAtSourceInit ||
newMetadata.configuration.filterKeys(_.startsWith("delta.")).toMap !=
readConfigurationsAtSourceInit.filterKeys(_.startsWith("delta.")).toMap
}
}
DeltaSourceMetadataEvolutionSupport.hasMetadataOrProtocolChangeComparedToStreamMetadata(
metadataChangeOpt,
protocolChangeOpt,
newSchemaVersion,
persistedMetadataAtSourceInit,
readProtocolAtSourceInit,
readSnapshotDescriptor.metadata,
spark)
}

/**
* Check that the give schema is the same as the schema from the initial read snapshot.
*/
private def hasSchemaChangeComparedToStreamMetadata(newSchema: StructType): Boolean =
if (spark.conf.get(DeltaSQLConf.DELTA_STREAMING_IGNORE_INTERNAL_METADATA_FOR_SCHEMA_CHANGE)) {
DeltaTableUtils.removeInternalWriterMetadata(spark, newSchema) !=
DeltaTableUtils.removeInternalWriterMetadata(spark, readSchemaAtSourceInit)
} else {
newSchema != readSchemaAtSourceInit
}

/**
* If the current stream metadata is not equal to the metadata change in [[metadataChangeOpt]],
* return a metadata change barrier [[IndexedFile]].
Expand Down Expand Up @@ -685,6 +649,90 @@ object DeltaSourceMetadataEvolutionSupport extends Logging {
spark.sessionState.conf.getConf(
DeltaSQLConf.DELTA_TYPE_WIDENING_BYPASS_STREAMING_TYPE_CHANGE_CHECK)

/**
* Whether this DeltaSource is utilizing a schema log entry as its read schema.
*
* If user explicitly turn on the flag to fall back to using latest schema to read (i.e. the
* legacy mode), we will ignore the schema log.
*/
def shouldTrackMetadataChange(
schemaReadOptions: DeltaStreamUtils.SchemaReadOptions,
metadataTrackingLog: Option[DeltaSourceMetadataTrackingLog]): Boolean = {
!schemaReadOptions.allowUnsafeStreamingReadOnColumnMappingSchemaChanges &&
metadataTrackingLog.flatMap(_.getCurrentTrackedMetadata).nonEmpty
}

/**
* Whether a schema tracking log is provided (and is empty), so we could initialize eagerly.
* This should only be used for the first write to the schema log, after then, schema tracking
* should not rely on this state any more.
*/
def shouldInitializeMetadataTrackingEagerly(
schemaReadOptions: DeltaStreamUtils.SchemaReadOptions,
metadataTrackingLog: Option[DeltaSourceMetadataTrackingLog]): Boolean = {
!schemaReadOptions.allowUnsafeStreamingReadOnColumnMappingSchemaChanges &&
metadataTrackingLog.exists { log =>
log.getCurrentTrackedMetadata.isEmpty && log.initMetadataLogEagerly
}
}

/**
* Check the table metadata or protocol changed since the initial read snapshot. We make sure:
* 1. The schema is the same, except for internal metadata, AND
* 2. The delta related table configurations are strictly equal, AND
* 3. The incoming metadata change should not be considered a failure-causing change if we have
* marked the persisted schema and the stream progress is behind that schema version.
* This could happen when we've already merged consecutive schema changes during the analysis
* phase and we are using the merged schema as the read schema. All the schema changes in
* between can be safely ignored because they won't contribute any data.
*
* @param metadataChangeOpt New metadata action, if any.
* @param protocolChangeOpt New protocol action, if any.
* @param newSchemaVersion The version of the incoming change.
* @param persistedMetadataAtSourceInit The persisted metadata at source init, if any.
* @param readProtocolAtSourceInit The protocol at source init.
* @param readMetadataAtSourceInit The metadata at source init (schema, partition schema, and
* configuration). Bundled to avoid the swap footgun of three
* adjacent params.
* @param spark The SparkSession (used for SQL conf checks).
*/
def hasMetadataOrProtocolChangeComparedToStreamMetadata(
metadataChangeOpt: Option[AbstractMetadata],
protocolChangeOpt: Option[AbstractProtocol],
newSchemaVersion: Long,
persistedMetadataAtSourceInit: Option[PersistedMetadata],
readProtocolAtSourceInit: AbstractProtocol,
readMetadataAtSourceInit: AbstractMetadata,
spark: SparkSession): Boolean = {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

9 params is a lot. can we bundle the *AtSourceInit ones into a small case class before v2 calls in?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replaced it with readMetadataAtSourceInit

if (persistedMetadataAtSourceInit.exists(_.deltaCommitVersion >= newSchemaVersion)) {
false
} else {
protocolChangeOpt.exists(p => !p.equalsByFields(readProtocolAtSourceInit)) ||
metadataChangeOpt.exists { newMetadata =>
hasSchemaChangeComparedToStreamMetadata(
newMetadata.schema, readMetadataAtSourceInit.schema, spark) ||
newMetadata.partitionSchema != readMetadataAtSourceInit.partitionSchema ||
newMetadata.configuration.filterKeys(_.startsWith("delta.")).toMap !=
readMetadataAtSourceInit.configuration.filterKeys(_.startsWith("delta.")).toMap
}
}
}

/**
* Check that the given schema is the same as the schema from the initial read snapshot.
*/
private def hasSchemaChangeComparedToStreamMetadata(
newSchema: StructType,
readSchemaAtSourceInit: StructType,
spark: SparkSession): Boolean = {
if (spark.conf.get(DeltaSQLConf.DELTA_STREAMING_IGNORE_INTERNAL_METADATA_FOR_SCHEMA_CHANGE)) {
DeltaTableUtils.removeInternalWriterMetadata(spark, newSchema) !=
DeltaTableUtils.removeInternalWriterMetadata(spark, readSchemaAtSourceInit)
} else {
newSchema != readSchemaAtSourceInit
}
}

/**
* Speculate ahead and find the next merged consecutive metadata change if possible.
* A metadata change is either:
Expand Down
Loading
Loading