Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -435,25 +435,11 @@ trait OptimisticTransactionImpl extends TransactionHelper
val shouldUnsetCatalogOwnedConf =
isActiveReplaceCommand && CatalogOwnedTableUtils.defaultCatalogOwnedEnabled(spark)
val conf = if (shouldUnsetCatalogOwnedConf) {
// Unset default CatalogOwned enablement iff:
// 0. `isCreatingNewTable` indicates that this either is a REPLACE or CREATE command.
// 1. `readVersion != 1` indicates the table already exists.
// - 0) and 1) suggest that this is an active REPLACE command.
// 2. Default CC enablement is set in the spark conf.
// This prevents any unintended modifications to the `newProtocol`.
// E.g., [[CatalogOwnedTableFeature]] and its dependent features
// [[InCommitTimestampTableFeature]] & [[VacuumProtocolCheckTableFeature]].
//
// Note that this does *not* affect global spark conf state as we are modifying
// the copy of `spark.sessionState.conf`. Thus, `defaultCatalogOwnedFeatureEnabledKey`
// will remain unchanged for any concurrent operations that use the same SparkSession.
// Isolate the spark conf used by [[DeltaConfigs.mergeGlobalConfigs]].
val clonedConf = spark.sessionState.conf.clone()
val defaultCatalogOwnedFeatureEnabledKey =
TableFeatureProtocolUtils.defaultPropertyKey(CatalogOwnedTableFeature)
// Isolate the spark conf to be used in the subsequent [[DeltaConfigs.mergeGlobalConfigs]]
// by cloning the existing configuration.
// Note: [[SQLConf.clone]] is already atomic so no extra synchronization is needed.
val clonedConf = spark.sessionState.conf.clone()
// Unset default CC conf on the cloned spark conf.
// Default CatalogOwned enablement is ignored during REPLACE.
clonedConf.unsetConf(defaultCatalogOwnedFeatureEnabledKey)
clonedConf
} else {
Expand Down Expand Up @@ -955,8 +941,6 @@ trait OptimisticTransactionImpl extends TransactionHelper
* configurations is finalized.
*/
def updateMetadataForNewTableInReplace(metadata: Metadata): Unit = {
assert(CoordinatedCommitsUtils.getExplicitCCConfigurations(metadata.configuration).isEmpty,
"Command-specified Coordinated Commits configurations should have been blocked earlier.")
assert(!metadata.configuration.contains(UCCommitCoordinatorClient.UC_TABLE_ID_KEY),
"Command-specified Catalog-Owned table UUID (ucTableId) should have been blocked earlier.")
// Extract any existing ucTableId from the snapshot metadata.
Expand All @@ -982,9 +966,7 @@ trait OptimisticTransactionImpl extends TransactionHelper
}
// Update the metadata.
updateMetadataForNewTable(metadata)
// Now the `txn.metadata` contains all the command-specified properties and all the default
// properties. The latter might still contain Coordinated Commits configurations, so we need
// to remove them and retain the Coordinated Commits configurations from the existing table.
// Rebuild the final configuration from the new metadata and retained target-owned metadata.
val newConfsWithoutCC = newMetadata.get.configuration --
CoordinatedCommitsUtils.TABLE_PROPERTY_KEYS
val existingQoLConfsToRetain = existingQoLConfs.filterNot { case (key, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,30 +339,6 @@ abstract class CloneTableBase(
}
}

/**
* Priority of Coordinated Commits configurations:
* - When CLONE into a new table, explicit command specification takes precedence over default
* SparkSession configurations.
* - When CLONE into an existing table, use the existing table's configurations.
*/
private def determineCoordinatedCommitsConfigurations(
spark: SparkSession,
targetSnapshot: SnapshotDescriptor,
validatedOverrides: Map[String, String]): Map[String, String] = {
if (tableExists(targetSnapshot)) {
assert(validatedOverrides.isEmpty,
"Explicit overrides on Coordinated Commits configurations for existing tables" +
" are not supported, and should have been caught earlier.")
CoordinatedCommitsUtils.getExplicitCCConfigurations(targetSnapshot.metadata.configuration)
} else {
if (validatedOverrides.nonEmpty) {
validatedOverrides
} else {
CoordinatedCommitsUtils.getDefaultCCConfigurations(spark)
}
}
}

/**
* Helper function to determine [[UCCommitCoordinatorClient.UC_TABLE_ID_KEY]]
* for the target table.
Expand Down Expand Up @@ -390,22 +366,12 @@ abstract class CloneTableBase(
var metadata = prepareSourceMetadata(targetSnapshot, opName)
val validatedConfigurations = DeltaConfigs.validateConfigurations(tablePropertyOverrides)

// Finalize Coordinated Commits configurations for the target
val coordinatedCommitsConfigurationOverrides =
CoordinatedCommitsUtils.getExplicitCCConfigurations(validatedConfigurations)
val validatedConfigurationsWithoutCoordinatedCommits =
validatedConfigurations -- coordinatedCommitsConfigurationOverrides.keys
val finalCoordinatedCommitsConfigurations = determineCoordinatedCommitsConfigurations(
spark,
targetSnapshot,
coordinatedCommitsConfigurationOverrides)
val finalCatalogOwnedMetadata = finalCoordinatedCommitsConfigurations ++
determineCatalogOwnedUCTableId(targetSnapshot)

// Merge source configuration, table property overrides and coordinated-commits configurations.
val finalCatalogOwnedMetadata = determineCatalogOwnedUCTableId(targetSnapshot)

// Merge source configuration, table property overrides, and CatalogManaged metadata.
metadata = metadata.copy(configuration =
metadata.configuration ++
validatedConfigurationsWithoutCoordinatedCommits ++
validatedConfigurations ++
finalCatalogOwnedMetadata)

verifyMetadataInvariants(targetSnapshot, metadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ case class CreateDeltaTableCommand(
}
val deltaLog = DeltaUtils.getDeltaLogFromTableOrPath(
sparkSession, existingTableOpt, tableLocation, fileSystemOptions)
CoordinatedCommitsUtils.validateConfigurationsForCreateDeltaTableCommand(
sparkSession, deltaLog.tableExists, query, tableWithLocation.properties)
CatalogOwnedTableUtils.validatePropertiesForCreateDeltaTableCommand(
spark = sparkSession,
tableExists = deltaLog.tableExists,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,6 @@ case class AlterTableSetPropertiesDeltaCommand(
true
}.toMap

// For Coordinated Commits table validation
CoordinatedCommitsUtils.validateConfigurationsForAlterTableSetPropertiesDeltaCommand(
existingConfs = metadata.configuration, propertyOverrides = filteredConfs)
// For Catalog Owned table validation
CatalogOwnedTableUtils.validatePropertiesForAlterTableSetPropertiesDeltaCommand(
txn.snapshot, propertyOverrides = filteredConfs)
Expand Down Expand Up @@ -239,8 +236,6 @@ case class AlterTableUnsetPropertiesDeltaCommand(
}

if (!fromDropFeatureCommand) {
CoordinatedCommitsUtils.validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand(
existingConfs = metadata.configuration, propKeysToUnset = normalizedKeys)
CatalogOwnedTableUtils.validatePropertiesForAlterTableUnsetPropertiesDeltaCommand(
txn.snapshot, propKeysToUnset = normalizedKeys)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -855,179 +855,6 @@ object CoordinatedCommitsUtils extends DeltaLogging {
}.toMap
}

/**
* Verifies that the properties contain exactly the Coordinator Name and Coordinator Conf.
* If `fromDefault` is true, then the properties have keys with the default prefix.
*/
private def verifyContainsOnlyCoordinatorNameAndConf(
properties: Map[String, String],
command: String,
fromDefault: Boolean): Unit = {
Seq(DeltaConfigs.COORDINATED_COMMITS_TABLE_CONF).foreach { conf =>
if (fromDefault) {
if (properties.contains(conf.defaultTablePropertyKey)) {
throw new DeltaIllegalArgumentException(
errorClass = "DELTA_CONF_OVERRIDE_NOT_SUPPORTED_IN_SESSION",
messageParameters = Array(
command, conf.defaultTablePropertyKey, conf.defaultTablePropertyKey))
}
} else {
if (properties.contains(conf.key)) {
throw new DeltaIllegalArgumentException(
errorClass = "DELTA_CONF_OVERRIDE_NOT_SUPPORTED_IN_COMMAND",
messageParameters = Array(command, conf.key))
}
}
}
Seq(
DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME,
DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF).foreach { conf =>
if (fromDefault) {
if (!properties.contains(conf.defaultTablePropertyKey)) {
throw new DeltaIllegalArgumentException(
errorClass = "DELTA_MUST_SET_ALL_COORDINATED_COMMITS_CONFS_IN_SESSION",
messageParameters = Array(command, conf.defaultTablePropertyKey))
}
} else {
if (!properties.contains(conf.key)) {
throw new DeltaIllegalArgumentException(
errorClass = "DELTA_MUST_SET_ALL_COORDINATED_COMMITS_CONFS_IN_COMMAND",
messageParameters = Array(command, conf.key))
}
}
}
}

/**
* Verifies that the property keys do not contain any ICT dependencies for Coordinated Commits.
*/
private def verifyNotContainsICTConfigurations(
propKeys: Seq[String], command: String, errorClass: String): Unit = {
ICT_TABLE_PROPERTY_KEYS.foreach { key =>
if (propKeys.contains(key)) {
throw new DeltaIllegalArgumentException(
errorClass,
messageParameters = Array(command))
}
}
}

/**
* Validates the Coordinated Commits configurations in explicit command overrides for
* `AlterTableSetPropertiesDeltaCommand`.
*
* If the table already has Coordinated Commits configurations present, then we do not allow
* users to override them via `ALTER TABLE t SET TBLPROPERTIES ...`. Users must downgrade the
* table and then upgrade it with the new Coordinated Commits configurations.
* If the table is a Coordinated Commits table or will be one via this ALTER command, then we
* do not allow users to disable any ICT properties that Coordinated Commits depends on.
*/
def validateConfigurationsForAlterTableSetPropertiesDeltaCommand(
existingConfs: Map[String, String],
propertyOverrides: Map[String, String]): Unit = {
val existingCoordinatedCommitsConfs = getExplicitCCConfigurations(existingConfs)
val coordinatedCommitsOverrides = getExplicitCCConfigurations(propertyOverrides)
if (coordinatedCommitsOverrides.nonEmpty) {
if (existingCoordinatedCommitsConfs.nonEmpty) {
throw new DeltaIllegalArgumentException(
"DELTA_CANNOT_OVERRIDE_COORDINATED_COMMITS_CONFS",
Array("ALTER"))
}
verifyNotContainsICTConfigurations(propertyOverrides.keys.toSeq, command = "ALTER",
errorClass = "DELTA_CANNOT_SET_COORDINATED_COMMITS_DEPENDENCIES")
verifyContainsOnlyCoordinatorNameAndConf(
coordinatedCommitsOverrides, command = "ALTER", fromDefault = false)
}
if (existingCoordinatedCommitsConfs.nonEmpty) {
verifyNotContainsICTConfigurations(propertyOverrides.keys.toSeq, command = "ALTER",
errorClass = "DELTA_CANNOT_MODIFY_COORDINATED_COMMITS_DEPENDENCIES")
}
}

/**
* Validates the configurations to unset for `AlterTableUnsetPropertiesDeltaCommand`.
*
* If the table already has Coordinated Commits configurations present, then we do not allow users
* to unset them via `ALTER TABLE t UNSET TBLPROPERTIES ...`. Users could only downgrade the table
* via `ALTER TABLE t DROP FEATURE ...`. We also do not allow users to unset any ICT properties
* that Coordinated Commits depends on.
*/
def validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand(
existingConfs: Map[String, String],
propKeysToUnset: Seq[String]): Unit = {
// If the table does not have any Coordinated Commits configurations, then we do not check the
// properties to unset. This is because unsetting non-existent entries would either be caught
// earlier (without `IF EXISTS`) or simply be a no-op (with `IF EXISTS`). Thus, we ignore them
// instead of throwing an exception.
if (getExplicitCCConfigurations(existingConfs).nonEmpty) {
if (propKeysToUnset.exists(TABLE_PROPERTY_KEYS.contains)) {
throw new DeltaIllegalArgumentException(
"DELTA_CANNOT_UNSET_COORDINATED_COMMITS_CONFS",
Array.empty)
}
verifyNotContainsICTConfigurations(propKeysToUnset, command = "ALTER",
errorClass = "DELTA_CANNOT_MODIFY_COORDINATED_COMMITS_DEPENDENCIES")
}
}

/**
* Validates the Coordinated Commits configurations in explicit command overrides and default
* SparkSession properties for `CreateDeltaTableCommand`.
* See `validateConfigurationsForCreateDeltaTableCommandImpl` for details.
*/
def validateConfigurationsForCreateDeltaTableCommand(
spark: SparkSession,
tableExists: Boolean,
query: Option[LogicalPlan],
catalogTableProperties: Map[String, String]): Unit = {
val (command, propertyOverrides) = query match {
// For CLONE, we cannot use the properties from the catalog table, because they are already
// the result of merging the source table properties with the overrides, but we do not
// consider the source table properties for Coordinated Commits.
case Some(cmd: CloneTableCommand) =>
(if (tableExists) "REPLACE with CLONE" else "CREATE with CLONE",
cmd.tablePropertyOverrides)
case _ => (if (tableExists) "REPLACE" else "CREATE", catalogTableProperties)
}
validateConfigurationsForCreateDeltaTableCommandImpl(
spark, propertyOverrides, tableExists, command)
}

/**
* Validates the Coordinated Commits configurations for the table.
* - If the table already exists, the explicit command property overrides must not contain any
* Coordinated Commits configurations.
* - If the table does not exist, the explicit command property overrides must contain exactly
* the Coordinator Name and Coordinator Conf, and no Table Conf. Default configurations are
* checked similarly if none of the three properties is present in explicit overrides.
*/
private[delta] def validateConfigurationsForCreateDeltaTableCommandImpl(
spark: SparkSession,
propertyOverrides: Map[String, String],
tableExists: Boolean,
command: String): Unit = {
val coordinatedCommitsConfs = getExplicitCCConfigurations(propertyOverrides)
if (tableExists) {
if (coordinatedCommitsConfs.nonEmpty) {
throw new DeltaIllegalArgumentException(
"DELTA_CANNOT_OVERRIDE_COORDINATED_COMMITS_CONFS",
Array(command))
}
} else {
if (coordinatedCommitsConfs.nonEmpty) {
verifyContainsOnlyCoordinatorNameAndConf(
coordinatedCommitsConfs, command, fromDefault = false)
} else {
val defaultCoordinatedCommitsConfs = getDefaultCCConfigurations(
spark, withDefaultKey = true)
if (defaultCoordinatedCommitsConfs.nonEmpty) {
verifyContainsOnlyCoordinatorNameAndConf(
defaultCoordinatedCommitsConfs, command, fromDefault = true)
}
}
}
}

/**
* Converts a given Spark [[CatalystTableIdentifier]] to Coordinated Commits [[TableIdentifier]]
*/
Expand Down
Loading
Loading