diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 832e2edd389..fe61f717689 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -435,25 +435,11 @@ trait OptimisticTransactionImpl extends TransactionHelper val shouldUnsetCatalogOwnedConf = isActiveReplaceCommand && CatalogOwnedTableUtils.defaultCatalogOwnedEnabled(spark) val conf = if (shouldUnsetCatalogOwnedConf) { - // Unset default CatalogOwned enablement iff: - // 0. `isCreatingNewTable` indicates that this either is a REPLACE or CREATE command. - // 1. `readVersion != 1` indicates the table already exists. - // - 0) and 1) suggest that this is an active REPLACE command. - // 2. Default CC enablement is set in the spark conf. - // This prevents any unintended modifications to the `newProtocol`. - // E.g., [[CatalogOwnedTableFeature]] and its dependent features - // [[InCommitTimestampTableFeature]] & [[VacuumProtocolCheckTableFeature]]. - // - // Note that this does *not* affect global spark conf state as we are modifying - // the copy of `spark.sessionState.conf`. Thus, `defaultCatalogOwnedFeatureEnabledKey` - // will remain unchanged for any concurrent operations that use the same SparkSession. + // Isolate the spark conf used by [[DeltaConfigs.mergeGlobalConfigs]]. + val clonedConf = spark.sessionState.conf.clone() val defaultCatalogOwnedFeatureEnabledKey = TableFeatureProtocolUtils.defaultPropertyKey(CatalogOwnedTableFeature) - // Isolate the spark conf to be used in the subsequent [[DeltaConfigs.mergeGlobalConfigs]] - // by cloning the existing configuration. - // Note: [[SQLConf.clone]] is already atomic so no extra synchronization is needed. - val clonedConf = spark.sessionState.conf.clone() - // Unset default CC conf on the cloned spark conf. + // Default CatalogOwned enablement is ignored during REPLACE. clonedConf.unsetConf(defaultCatalogOwnedFeatureEnabledKey) clonedConf } else { @@ -955,8 +941,6 @@ trait OptimisticTransactionImpl extends TransactionHelper * configurations is finalized. */ def updateMetadataForNewTableInReplace(metadata: Metadata): Unit = { - assert(CoordinatedCommitsUtils.getExplicitCCConfigurations(metadata.configuration).isEmpty, - "Command-specified Coordinated Commits configurations should have been blocked earlier.") assert(!metadata.configuration.contains(UCCommitCoordinatorClient.UC_TABLE_ID_KEY), "Command-specified Catalog-Owned table UUID (ucTableId) should have been blocked earlier.") // Extract any existing ucTableId from the snapshot metadata. @@ -982,9 +966,7 @@ trait OptimisticTransactionImpl extends TransactionHelper } // Update the metadata. updateMetadataForNewTable(metadata) - // Now the `txn.metadata` contains all the command-specified properties and all the default - // properties. The latter might still contain Coordinated Commits configurations, so we need - // to remove them and retain the Coordinated Commits configurations from the existing table. + // Rebuild the final configuration from the new metadata and retained target-owned metadata. val newConfsWithoutCC = newMetadata.get.configuration -- CoordinatedCommitsUtils.TABLE_PROPERTY_KEYS val existingQoLConfsToRetain = existingQoLConfs.filterNot { case (key, _) => diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala index 364e3c21daf..0f1c2f005bf 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala @@ -339,30 +339,6 @@ abstract class CloneTableBase( } } - /** - * Priority of Coordinated Commits configurations: - * - When CLONE into a new table, explicit command specification takes precedence over default - * SparkSession configurations. - * - When CLONE into an existing table, use the existing table's configurations. - */ - private def determineCoordinatedCommitsConfigurations( - spark: SparkSession, - targetSnapshot: SnapshotDescriptor, - validatedOverrides: Map[String, String]): Map[String, String] = { - if (tableExists(targetSnapshot)) { - assert(validatedOverrides.isEmpty, - "Explicit overrides on Coordinated Commits configurations for existing tables" + - " are not supported, and should have been caught earlier.") - CoordinatedCommitsUtils.getExplicitCCConfigurations(targetSnapshot.metadata.configuration) - } else { - if (validatedOverrides.nonEmpty) { - validatedOverrides - } else { - CoordinatedCommitsUtils.getDefaultCCConfigurations(spark) - } - } - } - /** * Helper function to determine [[UCCommitCoordinatorClient.UC_TABLE_ID_KEY]] * for the target table. @@ -390,22 +366,12 @@ abstract class CloneTableBase( var metadata = prepareSourceMetadata(targetSnapshot, opName) val validatedConfigurations = DeltaConfigs.validateConfigurations(tablePropertyOverrides) - // Finalize Coordinated Commits configurations for the target - val coordinatedCommitsConfigurationOverrides = - CoordinatedCommitsUtils.getExplicitCCConfigurations(validatedConfigurations) - val validatedConfigurationsWithoutCoordinatedCommits = - validatedConfigurations -- coordinatedCommitsConfigurationOverrides.keys - val finalCoordinatedCommitsConfigurations = determineCoordinatedCommitsConfigurations( - spark, - targetSnapshot, - coordinatedCommitsConfigurationOverrides) - val finalCatalogOwnedMetadata = finalCoordinatedCommitsConfigurations ++ - determineCatalogOwnedUCTableId(targetSnapshot) - - // Merge source configuration, table property overrides and coordinated-commits configurations. + val finalCatalogOwnedMetadata = determineCatalogOwnedUCTableId(targetSnapshot) + + // Merge source configuration, table property overrides, and CatalogManaged metadata. metadata = metadata.copy(configuration = metadata.configuration ++ - validatedConfigurationsWithoutCoordinatedCommits ++ + validatedConfigurations ++ finalCatalogOwnedMetadata) verifyMetadataInvariants(targetSnapshot, metadata) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala index 70c0bdd4b97..e7f7fba13b6 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala @@ -140,8 +140,6 @@ case class CreateDeltaTableCommand( } val deltaLog = DeltaUtils.getDeltaLogFromTableOrPath( sparkSession, existingTableOpt, tableLocation, fileSystemOptions) - CoordinatedCommitsUtils.validateConfigurationsForCreateDeltaTableCommand( - sparkSession, deltaLog.tableExists, query, tableWithLocation.properties) CatalogOwnedTableUtils.validatePropertiesForCreateDeltaTableCommand( spark = sparkSession, tableExists = deltaLog.tableExists, diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala index 43b3dad183c..56ce7a8f2a3 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala @@ -154,9 +154,6 @@ case class AlterTableSetPropertiesDeltaCommand( true }.toMap - // For Coordinated Commits table validation - CoordinatedCommitsUtils.validateConfigurationsForAlterTableSetPropertiesDeltaCommand( - existingConfs = metadata.configuration, propertyOverrides = filteredConfs) // For Catalog Owned table validation CatalogOwnedTableUtils.validatePropertiesForAlterTableSetPropertiesDeltaCommand( txn.snapshot, propertyOverrides = filteredConfs) @@ -239,8 +236,6 @@ case class AlterTableUnsetPropertiesDeltaCommand( } if (!fromDropFeatureCommand) { - CoordinatedCommitsUtils.validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand( - existingConfs = metadata.configuration, propKeysToUnset = normalizedKeys) CatalogOwnedTableUtils.validatePropertiesForAlterTableUnsetPropertiesDeltaCommand( txn.snapshot, propKeysToUnset = normalizedKeys) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala index 0f41a183605..469fefc2ea8 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala @@ -855,179 +855,6 @@ object CoordinatedCommitsUtils extends DeltaLogging { }.toMap } - /** - * Verifies that the properties contain exactly the Coordinator Name and Coordinator Conf. - * If `fromDefault` is true, then the properties have keys with the default prefix. - */ - private def verifyContainsOnlyCoordinatorNameAndConf( - properties: Map[String, String], - command: String, - fromDefault: Boolean): Unit = { - Seq(DeltaConfigs.COORDINATED_COMMITS_TABLE_CONF).foreach { conf => - if (fromDefault) { - if (properties.contains(conf.defaultTablePropertyKey)) { - throw new DeltaIllegalArgumentException( - errorClass = "DELTA_CONF_OVERRIDE_NOT_SUPPORTED_IN_SESSION", - messageParameters = Array( - command, conf.defaultTablePropertyKey, conf.defaultTablePropertyKey)) - } - } else { - if (properties.contains(conf.key)) { - throw new DeltaIllegalArgumentException( - errorClass = "DELTA_CONF_OVERRIDE_NOT_SUPPORTED_IN_COMMAND", - messageParameters = Array(command, conf.key)) - } - } - } - Seq( - DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME, - DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF).foreach { conf => - if (fromDefault) { - if (!properties.contains(conf.defaultTablePropertyKey)) { - throw new DeltaIllegalArgumentException( - errorClass = "DELTA_MUST_SET_ALL_COORDINATED_COMMITS_CONFS_IN_SESSION", - messageParameters = Array(command, conf.defaultTablePropertyKey)) - } - } else { - if (!properties.contains(conf.key)) { - throw new DeltaIllegalArgumentException( - errorClass = "DELTA_MUST_SET_ALL_COORDINATED_COMMITS_CONFS_IN_COMMAND", - messageParameters = Array(command, conf.key)) - } - } - } - } - - /** - * Verifies that the property keys do not contain any ICT dependencies for Coordinated Commits. - */ - private def verifyNotContainsICTConfigurations( - propKeys: Seq[String], command: String, errorClass: String): Unit = { - ICT_TABLE_PROPERTY_KEYS.foreach { key => - if (propKeys.contains(key)) { - throw new DeltaIllegalArgumentException( - errorClass, - messageParameters = Array(command)) - } - } - } - - /** - * Validates the Coordinated Commits configurations in explicit command overrides for - * `AlterTableSetPropertiesDeltaCommand`. - * - * If the table already has Coordinated Commits configurations present, then we do not allow - * users to override them via `ALTER TABLE t SET TBLPROPERTIES ...`. Users must downgrade the - * table and then upgrade it with the new Coordinated Commits configurations. - * If the table is a Coordinated Commits table or will be one via this ALTER command, then we - * do not allow users to disable any ICT properties that Coordinated Commits depends on. - */ - def validateConfigurationsForAlterTableSetPropertiesDeltaCommand( - existingConfs: Map[String, String], - propertyOverrides: Map[String, String]): Unit = { - val existingCoordinatedCommitsConfs = getExplicitCCConfigurations(existingConfs) - val coordinatedCommitsOverrides = getExplicitCCConfigurations(propertyOverrides) - if (coordinatedCommitsOverrides.nonEmpty) { - if (existingCoordinatedCommitsConfs.nonEmpty) { - throw new DeltaIllegalArgumentException( - "DELTA_CANNOT_OVERRIDE_COORDINATED_COMMITS_CONFS", - Array("ALTER")) - } - verifyNotContainsICTConfigurations(propertyOverrides.keys.toSeq, command = "ALTER", - errorClass = "DELTA_CANNOT_SET_COORDINATED_COMMITS_DEPENDENCIES") - verifyContainsOnlyCoordinatorNameAndConf( - coordinatedCommitsOverrides, command = "ALTER", fromDefault = false) - } - if (existingCoordinatedCommitsConfs.nonEmpty) { - verifyNotContainsICTConfigurations(propertyOverrides.keys.toSeq, command = "ALTER", - errorClass = "DELTA_CANNOT_MODIFY_COORDINATED_COMMITS_DEPENDENCIES") - } - } - - /** - * Validates the configurations to unset for `AlterTableUnsetPropertiesDeltaCommand`. - * - * If the table already has Coordinated Commits configurations present, then we do not allow users - * to unset them via `ALTER TABLE t UNSET TBLPROPERTIES ...`. Users could only downgrade the table - * via `ALTER TABLE t DROP FEATURE ...`. We also do not allow users to unset any ICT properties - * that Coordinated Commits depends on. - */ - def validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand( - existingConfs: Map[String, String], - propKeysToUnset: Seq[String]): Unit = { - // If the table does not have any Coordinated Commits configurations, then we do not check the - // properties to unset. This is because unsetting non-existent entries would either be caught - // earlier (without `IF EXISTS`) or simply be a no-op (with `IF EXISTS`). Thus, we ignore them - // instead of throwing an exception. - if (getExplicitCCConfigurations(existingConfs).nonEmpty) { - if (propKeysToUnset.exists(TABLE_PROPERTY_KEYS.contains)) { - throw new DeltaIllegalArgumentException( - "DELTA_CANNOT_UNSET_COORDINATED_COMMITS_CONFS", - Array.empty) - } - verifyNotContainsICTConfigurations(propKeysToUnset, command = "ALTER", - errorClass = "DELTA_CANNOT_MODIFY_COORDINATED_COMMITS_DEPENDENCIES") - } - } - - /** - * Validates the Coordinated Commits configurations in explicit command overrides and default - * SparkSession properties for `CreateDeltaTableCommand`. - * See `validateConfigurationsForCreateDeltaTableCommandImpl` for details. - */ - def validateConfigurationsForCreateDeltaTableCommand( - spark: SparkSession, - tableExists: Boolean, - query: Option[LogicalPlan], - catalogTableProperties: Map[String, String]): Unit = { - val (command, propertyOverrides) = query match { - // For CLONE, we cannot use the properties from the catalog table, because they are already - // the result of merging the source table properties with the overrides, but we do not - // consider the source table properties for Coordinated Commits. - case Some(cmd: CloneTableCommand) => - (if (tableExists) "REPLACE with CLONE" else "CREATE with CLONE", - cmd.tablePropertyOverrides) - case _ => (if (tableExists) "REPLACE" else "CREATE", catalogTableProperties) - } - validateConfigurationsForCreateDeltaTableCommandImpl( - spark, propertyOverrides, tableExists, command) - } - - /** - * Validates the Coordinated Commits configurations for the table. - * - If the table already exists, the explicit command property overrides must not contain any - * Coordinated Commits configurations. - * - If the table does not exist, the explicit command property overrides must contain exactly - * the Coordinator Name and Coordinator Conf, and no Table Conf. Default configurations are - * checked similarly if none of the three properties is present in explicit overrides. - */ - private[delta] def validateConfigurationsForCreateDeltaTableCommandImpl( - spark: SparkSession, - propertyOverrides: Map[String, String], - tableExists: Boolean, - command: String): Unit = { - val coordinatedCommitsConfs = getExplicitCCConfigurations(propertyOverrides) - if (tableExists) { - if (coordinatedCommitsConfs.nonEmpty) { - throw new DeltaIllegalArgumentException( - "DELTA_CANNOT_OVERRIDE_COORDINATED_COMMITS_CONFS", - Array(command)) - } - } else { - if (coordinatedCommitsConfs.nonEmpty) { - verifyContainsOnlyCoordinatorNameAndConf( - coordinatedCommitsConfs, command, fromDefault = false) - } else { - val defaultCoordinatedCommitsConfs = getDefaultCCConfigurations( - spark, withDefaultKey = true) - if (defaultCoordinatedCommitsConfs.nonEmpty) { - verifyContainsOnlyCoordinatorNameAndConf( - defaultCoordinatedCommitsConfs, command, fromDefault = true) - } - } - } - } - /** * Converts a given Spark [[CatalystTableIdentifier]] to Coordinated Commits [[TableIdentifier]] */ diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsPropertySuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsPropertySuiteBase.scala deleted file mode 100644 index 63373411343..00000000000 --- a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsPropertySuiteBase.scala +++ /dev/null @@ -1,473 +0,0 @@ -/* - * Copyright (2021) The Delta Lake Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.delta.coordinatedcommits - -import org.apache.spark.sql.delta.{DeltaIllegalArgumentException, DeltaLog} -import org.apache.spark.sql.delta.DeltaConfigs.{COORDINATED_COMMITS_COORDINATOR_CONF, COORDINATED_COMMITS_COORDINATOR_NAME, COORDINATED_COMMITS_TABLE_CONF} -import org.apache.spark.sql.delta.test.DeltaSQLCommandTest -import org.apache.spark.sql.delta.util.JsonUtils -import io.delta.storage.commit.CommitCoordinatorClient - -import org.apache.spark.sql.{QueryTest, SparkSession} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.test.SharedSparkSession - -trait CoordinatedCommitsPropertySuiteBase extends QueryTest - with SharedSparkSession - with DeltaSQLCommandTest - with CoordinatedCommitsTestUtils { - - private def getRandomTableName: String = scala.util.Random.alphanumeric.take(10).mkString("") - - override def beforeEach(): Unit = { - super.beforeEach() - target = getRandomTableName - source = getRandomTableName - CommitCoordinatorProvider.clearNonDefaultBuilders() - CommitCoordinatorProvider.registerBuilder(CommitCoordinatorBuilder1()) - CommitCoordinatorProvider.registerBuilder(CommitCoordinatorBuilder2()) - } - - protected val command: String - - protected val cc1: String = "commit-coordinator-1" - - private case class CommitCoordinatorBuilder1() extends CommitCoordinatorBuilder { - private val commitCoordinator = new InMemoryCommitCoordinator(batchSize = 1000L) - override def getName: String = cc1 - override def build(spark: SparkSession, conf: Map[String, String]): CommitCoordinatorClient = - commitCoordinator - } - - protected val cc2: String = "commit-coordinator-2" - - private case class CommitCoordinatorBuilder2() extends CommitCoordinatorBuilder { - private val commitCoordinator = new InMemoryCommitCoordinator(batchSize = 1000L) - override def getName: String = cc2 - override def build(spark: SparkSession, conf: Map[String, String]): CommitCoordinatorClient = - commitCoordinator - } - - protected var target: String = getRandomTableName - protected var source: String = getRandomTableName - - protected val coordinatorNameKey: String = COORDINATED_COMMITS_COORDINATOR_NAME.key - protected val coordinatorConfKey: String = COORDINATED_COMMITS_COORDINATOR_CONF.key - protected val tableConfKey: String = COORDINATED_COMMITS_TABLE_CONF.key - - protected val coordinatorNameDefaultKey: String = - COORDINATED_COMMITS_COORDINATOR_NAME.defaultTablePropertyKey - protected val coordinatorConfDefaultKey: String = - COORDINATED_COMMITS_COORDINATOR_CONF.defaultTablePropertyKey - protected val tableConfDefaultKey: String = - COORDINATED_COMMITS_TABLE_CONF.defaultTablePropertyKey - - protected val randomCoordinatorConf: String = - JsonUtils.toJson(Map("randomCoordinatorConf" -> "randomCoordinatorConfValue")) - protected val randomTableConf: String = - JsonUtils.toJson(Map("randomTableConf" -> "randomTableConfValue")) - - def getCCPropertiesClause(properties: Seq[(String, String)]): String = { - if (properties.nonEmpty) { - " TBLPROPERTIES (" + - properties.map { case (k, v) => s"'$k' = '$v'" }.mkString(", ") + - ")" - } else { - "" - } - } - - def verifyCommitCoordinator(table: String, expectedCoordinator: Option[String]): Unit = { - assert(DeltaLog.forTable(spark, TableIdentifier(table)) - .update().metadata.coordinatedCommitsCoordinatorName == expectedCoordinator) - } - - def testImpl( - commandConfs: Seq[(String, String)] = Seq(), - defaultConfs: Seq[(String, String)] = Seq(), - targetConfs: Seq[(String, String)] = Seq(), - sourceConfs: Seq[(String, String)] = Seq(), - expectedCoordinator: Option[String] = None): Unit -} - -trait CoordinatedCommitsPropertyCreateTableSuiteBase extends CoordinatedCommitsPropertySuiteBase { - - test("Commit coordinators are picked from command specification.") { - testImpl( - commandConfs = Seq( - coordinatorNameKey -> cc1, - coordinatorConfKey -> randomCoordinatorConf), - expectedCoordinator = Some(cc1)) - } - - test("Commit coordinators are picked from default configurations if not specified in command.") { - testImpl( - defaultConfs = Seq( - coordinatorNameDefaultKey -> cc1, - coordinatorConfDefaultKey -> randomCoordinatorConf), - expectedCoordinator = Some(cc1)) - } - - test("Command-specified commit coordinators take precedence over default configurations.") { - testImpl( - commandConfs = Seq( - coordinatorNameKey -> cc1, - coordinatorConfKey -> randomCoordinatorConf), - defaultConfs = Seq( - coordinatorNameDefaultKey -> cc2, - coordinatorConfDefaultKey -> randomCoordinatorConf), - expectedCoordinator = Some(cc1)) - } - - test("Illegal command-specified property combinations throw an exception.") { - var e = intercept[DeltaIllegalArgumentException] { - testImpl( - commandConfs = Seq(coordinatorNameKey -> cc1)) - } - checkError( - exception = e, - "DELTA_MUST_SET_ALL_COORDINATED_COMMITS_CONFS_IN_COMMAND", - sqlState = "42616", - parameters = Map("command" -> command, "configuration" -> coordinatorConfKey)) - - e = intercept[DeltaIllegalArgumentException] { - testImpl( - commandConfs = Seq(coordinatorConfKey -> randomCoordinatorConf)) - } - checkError( - exception = e, - "DELTA_MUST_SET_ALL_COORDINATED_COMMITS_CONFS_IN_COMMAND", - sqlState = "42616", - parameters = Map("command" -> command, "configuration" -> coordinatorNameKey)) - - e = intercept[DeltaIllegalArgumentException] { - testImpl( - commandConfs = Seq( - coordinatorNameKey -> cc1, - coordinatorConfKey -> randomCoordinatorConf, - tableConfKey -> randomTableConf)) - } - checkError( - exception = e, - "DELTA_CONF_OVERRIDE_NOT_SUPPORTED_IN_COMMAND", - sqlState = "42616", - parameters = Map("command" -> command, "configuration" -> tableConfKey)) - } - - test("Illegal default property combinations throw an exception if none specified in command.") { - var e = intercept[DeltaIllegalArgumentException] { - testImpl( - defaultConfs = Seq(coordinatorNameDefaultKey -> cc1)) - } - checkError( - exception = e, - "DELTA_MUST_SET_ALL_COORDINATED_COMMITS_CONFS_IN_SESSION", - sqlState = "42616", - parameters = Map("command" -> command, "configuration" -> coordinatorConfDefaultKey)) - - e = intercept[DeltaIllegalArgumentException] { - testImpl( - defaultConfs = Seq(coordinatorConfDefaultKey -> randomCoordinatorConf)) - } - checkError( - exception = e, - "DELTA_MUST_SET_ALL_COORDINATED_COMMITS_CONFS_IN_SESSION", - sqlState = "42616", - parameters = Map("command" -> command, "configuration" -> coordinatorNameDefaultKey)) - - e = intercept[DeltaIllegalArgumentException] { - testImpl( - defaultConfs = Seq( - coordinatorNameDefaultKey -> cc1, - coordinatorConfDefaultKey -> randomCoordinatorConf, - tableConfDefaultKey -> randomTableConf)) - } - checkError( - exception = e, - "DELTA_CONF_OVERRIDE_NOT_SUPPORTED_IN_SESSION", - sqlState = "42616", - parameters = Map("command" -> command, "configuration" -> tableConfDefaultKey)) - } - - test("Illegal default property combinations are ignored if command specifications are valid.") { - testImpl( - commandConfs = Seq( - coordinatorNameKey -> cc1, - coordinatorConfKey -> randomCoordinatorConf), - defaultConfs = Seq( - coordinatorNameDefaultKey -> cc2, - coordinatorConfDefaultKey -> randomCoordinatorConf, - tableConfDefaultKey -> randomTableConf), - expectedCoordinator = Some(cc1)) - } - test("Illegal command-specified property combinations throw an exception even if default " + - "configurations are valid.") { - val e = intercept[DeltaIllegalArgumentException] { - testImpl( - commandConfs = Seq( - coordinatorNameKey -> cc1, - coordinatorConfKey -> randomCoordinatorConf, - tableConfKey -> randomTableConf), - defaultConfs = Seq( - coordinatorNameDefaultKey -> cc2, - coordinatorConfDefaultKey -> randomCoordinatorConf)) - } - checkError( - exception = e, - "DELTA_CONF_OVERRIDE_NOT_SUPPORTED_IN_COMMAND", - sqlState = "42616", - parameters = Map("command" -> command, "configuration" -> tableConfKey)) - } -} - -class CoordinatedCommitsPropertyCreateTableSuite - extends CoordinatedCommitsPropertyCreateTableSuiteBase { - - override protected val command: String = "CREATE" - - override def testImpl( - commandConfs: Seq[(String, String)], - defaultConfs: Seq[(String, String)], - targetConfs: Seq[(String, String)], - sourceConfs: Seq[(String, String)], - expectedCoordinator: Option[String]): Unit = { - withTable(target) { - withSQLConf(defaultConfs: _*) { - sql(s"CREATE TABLE $target (id LONG) USING delta" + getCCPropertiesClause(commandConfs)) - } - verifyCommitCoordinator(target, expectedCoordinator) - } - } -} - -class CoordinatedCommitsPropertyCreateTableAsSelectSuite - extends CoordinatedCommitsPropertyCreateTableSuiteBase { - - override protected val command: String = "CREATE" - - override def testImpl( - commandConfs: Seq[(String, String)], - defaultConfs: Seq[(String, String)], - targetConfs: Seq[(String, String)], - sourceConfs: Seq[(String, String)], - expectedCoordinator: Option[String]): Unit = { - withTable(target, source) { - sql(s"CREATE TABLE $source (id LONG) USING delta") - sql(s"INSERT INTO $source VALUES (1)") - withSQLConf(defaultConfs: _*) { - sql(s"CREATE TABLE $target USING delta" + - getCCPropertiesClause(commandConfs) + s" AS SELECT * FROM $source") - } - verifyCommitCoordinator(target, expectedCoordinator) - } - } -} - -class CoordinatedCommitsPropertyCreateTableWithShallowCloneSuite - extends CoordinatedCommitsPropertyCreateTableSuiteBase { - - override protected val command: String = "CREATE with CLONE" - - override def testImpl( - commandConfs: Seq[(String, String)] = Seq(), - defaultConfs: Seq[(String, String)] = Seq(), - targetConfs: Seq[(String, String)] = Seq(), - sourceConfs: Seq[(String, String)] = Seq(), - expectedCoordinator: Option[String] = None): Unit = { - withTable(target, source) { - sql(s"CREATE TABLE $source (id LONG) USING delta" + getCCPropertiesClause(sourceConfs)) - withSQLConf(defaultConfs: _*) { - sql(s"CREATE TABLE $target SHALLOW CLONE $source" + getCCPropertiesClause(commandConfs)) - } - verifyCommitCoordinator(target, expectedCoordinator) - } - } - - test("Source table's commit coordinator should never be copied to the target table: no commit " + - "coordinators are specified") { - testImpl( - sourceConfs = Seq( - coordinatorNameKey -> cc1, - coordinatorConfKey -> randomCoordinatorConf), - expectedCoordinator = None) - } - - test("Source table's commit coordinator should never be copied to the target table: command " + - "specifies a commit coordinator") { - testImpl( - commandConfs = Seq( - coordinatorNameKey -> cc1, - coordinatorConfKey -> randomCoordinatorConf), - sourceConfs = Seq( - coordinatorNameKey -> cc2, - coordinatorConfKey -> randomCoordinatorConf), - expectedCoordinator = Some(cc1)) - } - - test("Source table's commit coordinator should never be copied to the target table: default " + - "configurations specify a commit coordinator") { - testImpl( - defaultConfs = Seq( - coordinatorNameDefaultKey -> cc1, - coordinatorConfDefaultKey -> randomCoordinatorConf), - sourceConfs = Seq( - coordinatorNameKey -> cc2, - coordinatorConfKey -> randomCoordinatorConf), - expectedCoordinator = Some(cc1)) - } -} - -trait CoordinatedCommitsPropertyReplaceTableSuiteBase extends CoordinatedCommitsPropertySuiteBase { - - test("Any command-specified Coordinated Commits overrides throw an exception") { - var e = intercept[DeltaIllegalArgumentException] { - testImpl( - commandConfs = Seq( - coordinatorNameKey -> cc1, - coordinatorConfKey -> randomCoordinatorConf)) - } - checkError( - exception = e, - "DELTA_CANNOT_OVERRIDE_COORDINATED_COMMITS_CONFS", - sqlState = "42616", - parameters = Map("Command" -> command)) - - e = intercept[DeltaIllegalArgumentException] { - testImpl( - commandConfs = Seq( - coordinatorNameKey -> cc1, - coordinatorConfKey -> randomCoordinatorConf, - tableConfKey -> randomTableConf)) - } - checkError( - exception = e, - "DELTA_CANNOT_OVERRIDE_COORDINATED_COMMITS_CONFS", - sqlState = "42616", - parameters = Map("Command" -> command)) - } - - test("Default Coordinated Commits configurations from SparkSession are ignored") { - testImpl( - defaultConfs = Seq( - coordinatorNameDefaultKey -> cc1, - coordinatorConfDefaultKey -> randomCoordinatorConf), - expectedCoordinator = None) - - testImpl( - defaultConfs = Seq( - coordinatorNameDefaultKey -> cc1, - coordinatorConfDefaultKey -> randomCoordinatorConf, - tableConfDefaultKey -> randomTableConf), - expectedCoordinator = None) - } - - test("Existing Coordinated Commits configurations from the target table are retained.") { - testImpl( - targetConfs = Seq( - coordinatorNameKey -> cc1, - coordinatorConfKey -> randomCoordinatorConf), - expectedCoordinator = Some(cc1)) - } -} - -class CoordinatedCommitsPropertyReplaceTableSuite - extends CoordinatedCommitsPropertyReplaceTableSuiteBase { - - override protected val command: String = "REPLACE" - - override def testImpl( - commandConfs: Seq[(String, String)], - defaultConfs: Seq[(String, String)], - targetConfs: Seq[(String, String)], - sourceConfs: Seq[(String, String)], - expectedCoordinator: Option[String]): Unit = { - withTable(target) { - sql(s"CREATE TABLE $target (id LONG) USING delta" + getCCPropertiesClause(targetConfs)) - withSQLConf(defaultConfs: _*) { - sql(s"REPLACE TABLE $target (id STRING) USING delta" + getCCPropertiesClause(commandConfs)) - } - verifyCommitCoordinator(target, expectedCoordinator) - } - } -} - -class CoordinatedCommitsPropertyReplaceTableAsSelectSuite - extends CoordinatedCommitsPropertyReplaceTableSuiteBase { - - override protected val command: String = "REPLACE" - - override def testImpl( - commandConfs: Seq[(String, String)], - defaultConfs: Seq[(String, String)], - targetConfs: Seq[(String, String)], - sourceConfs: Seq[(String, String)], - expectedCoordinator: Option[String]): Unit = { - withTable(target, source) { - sql(s"CREATE TABLE $source (id LONG) USING delta") - sql(s"INSERT INTO $source VALUES (1)") - sql(s"CREATE TABLE $target (id LONG) USING delta" + getCCPropertiesClause(targetConfs)) - withSQLConf(defaultConfs: _*) { - sql(s"REPLACE TABLE $target USING delta" + - getCCPropertiesClause(commandConfs) + s" AS SELECT * FROM $source") - } - verifyCommitCoordinator(target, expectedCoordinator) - } - } -} - -class CoordinatedCommitsPropertyReplaceTableWithShallowCloneSuite - extends CoordinatedCommitsPropertyReplaceTableSuiteBase { - - override protected val command: String = "REPLACE with CLONE" - - override def testImpl( - commandConfs: Seq[(String, String)] = Seq(), - defaultConfs: Seq[(String, String)] = Seq(), - targetConfs: Seq[(String, String)] = Seq(), - sourceConfs: Seq[(String, String)] = Seq(), - expectedCoordinator: Option[String] = None): Unit = { - withTable(target, source) { - sql(s"CREATE TABLE $target (id LONG) USING delta" + getCCPropertiesClause(targetConfs)) - sql(s"CREATE TABLE $source (id LONG) USING delta" + getCCPropertiesClause(sourceConfs)) - withSQLConf(defaultConfs: _*) { - sql(s"REPLACE TABLE $target SHALLOW CLONE $source" + getCCPropertiesClause(commandConfs)) - } - verifyCommitCoordinator(target, expectedCoordinator) - } - } - - test("Source table's commit coordinator should never be copied to the target table: target " + - "table does not have any coordinator") { - testImpl( - sourceConfs = Seq( - coordinatorNameKey -> cc1, - coordinatorConfKey -> randomCoordinatorConf), - expectedCoordinator = None) - } - - test("Source table's commit coordinator should never be copied to the target table: target " + - "table has a coordinator") { - testImpl( - targetConfs = Seq( - coordinatorNameKey -> cc1, - coordinatorConfKey -> randomCoordinatorConf), - sourceConfs = Seq( - coordinatorNameKey -> cc2, - coordinatorConfKey -> randomCoordinatorConf), - expectedCoordinator = Some(cc1)) - } -} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsSuite.scala index e202708add3..fc3831d9484 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsSuite.scala @@ -21,13 +21,11 @@ import java.lang.{Long => JLong} import java.util.{Iterator => JIterator, Optional} import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer import com.databricks.spark.util.Log4jUsageLogger import com.databricks.spark.util.UsageRecord import org.apache.spark.sql.delta.{CatalogOwnedTableFeature, CheckpointPolicy, CommitCoordinatorGetCommitsFailedException, CommitStats, CoordinatedCommitsStats, CoordinatedCommitsTableFeature, DeltaIllegalArgumentException, DeltaOperations, DeltaTestUtilsBase, DeltaUnsupportedOperationException, V2CheckpointTableFeature} import org.apache.spark.sql.delta.CoordinatedCommitType._ -import org.apache.spark.sql.delta.DeltaConfigs import org.apache.spark.sql.delta.DeltaConfigs.{CHECKPOINT_INTERVAL, CHECKPOINT_POLICY, COORDINATED_COMMITS_COORDINATOR_CONF, COORDINATED_COMMITS_COORDINATOR_NAME, COORDINATED_COMMITS_TABLE_CONF, IN_COMMIT_TIMESTAMPS_ENABLED} import org.apache.spark.sql.delta.DeltaLog import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile @@ -90,47 +88,6 @@ class CoordinatedCommitsSuite assert(JCoordinatedCommitsUtils.getCoordinatorConf(m4) === Map.empty.asJava) } - test("During ALTER, overriding Coordinated Commits configurations throws an exception.") { - registerBuilder(TrackingInMemoryCommitCoordinatorBuilder(1)) - registerBuilder(InMemoryCommitCoordinatorBuilder(1)) - - withTempDir { tempDir => - sql(s"CREATE TABLE delta.`${tempDir.getAbsolutePath}` (id LONG) USING delta TBLPROPERTIES " + - s"('${COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'tracking-in-memory', " + - s"'${COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '${JsonUtils.toJson(Map())}')") - val e = interceptWithUnwrapping[DeltaIllegalArgumentException] { - sql(s"ALTER TABLE delta.`${tempDir.getAbsolutePath}` SET TBLPROPERTIES " + - s"('${COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'in-memory', " + - s"'${COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '${JsonUtils.toJson(Map())}')") - } - checkError( - e, - "DELTA_CANNOT_OVERRIDE_COORDINATED_COMMITS_CONFS", - sqlState = "42616", - parameters = Map("Command" -> "ALTER")) - } - } - - test("During ALTER, unsetting Coordinated Commits configurations throws an exception.") { - registerBuilder(TrackingInMemoryCommitCoordinatorBuilder(1)) - - withTempDir { tempDir => - sql(s"CREATE TABLE delta.`${tempDir.getAbsolutePath}` (id LONG) USING delta TBLPROPERTIES " + - s"('${COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'tracking-in-memory', " + - s"'${COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '${JsonUtils.toJson(Map())}')") - val e = interceptWithUnwrapping[DeltaIllegalArgumentException] { - sql(s"ALTER TABLE delta.`${tempDir.getAbsolutePath}` UNSET TBLPROPERTIES " + - s"('${COORDINATED_COMMITS_COORDINATOR_NAME.key}', " + - s"'${COORDINATED_COMMITS_COORDINATOR_CONF.key}')") - } - checkError( - e, - "DELTA_CANNOT_UNSET_COORDINATED_COMMITS_CONFS", - sqlState = "42616", - parameters = Map[String, String]()) - } - } - test("tableConf returned from registration API is recorded in deltaLog and passed " + "to CommitCoordinatorClient in future for all the APIs") { val tableConf = Map("tableID" -> "random-u-u-i-d", "1" -> "2").asJava @@ -484,7 +441,6 @@ abstract class CommitCoordinatorSuiteBase Seq(3).toDF.write.format("delta").mode("append").save(tablePath) // version 2 DeltaLog.clearCache() commitCoordinatorClient.numGetCommitsCalled.set(0) - import testImplicits._ val result1 = sql(s"SELECT * FROM delta.`$tablePath`").collect() assert(result1.length === 2 && result1.toSet === Set(Row(2), Row(3))) assert(commitCoordinatorClient.numGetCommitsCalled.get === 2) @@ -1673,11 +1629,14 @@ abstract class CommitCoordinatorSuiteBase // Test coordinated-commits with DeltaLog.getChangeLogFile API ENDS // ///////////////////////////////////////////////////////////////////////////////////////////// - test("During ALTER, overriding ICT configurations on (potential) Coordinated Commits " + - "or Catalog Owned tables throws an exception.") { + test("During ALTER, overriding ICT configurations on CatalogManaged tables throws an " + + "exception.") { + if (!isCatalogOwnedTest) { + cancel("Legacy CCv1 DDL dependency validation is removed.") + } registerBuilder(TrackingInMemoryCommitCoordinatorBuilder(1)) - // For a table that had Coordinated Commits enabled before the ALTER command. + // For a CatalogManaged table before the ALTER command. withTempDir { tempDir => sql(s"CREATE TABLE delta.`${tempDir.getAbsolutePath}` (id LONG) USING delta TBLPROPERTIES " + propertiesString) @@ -1685,45 +1644,19 @@ abstract class CommitCoordinatorSuiteBase sql(s"ALTER TABLE delta.`${tempDir.getAbsolutePath}` SET TBLPROPERTIES " + s"('${IN_COMMIT_TIMESTAMPS_ENABLED.key}' = 'false')") } - if (isCatalogOwnedTest) { - checkError( - e, - "DELTA_CANNOT_MODIFY_CATALOG_MANAGED_DEPENDENCIES", - sqlState = "42616", - parameters = Map[String, String]()) - } else { - checkError( + checkError( e, - "DELTA_CANNOT_MODIFY_COORDINATED_COMMITS_DEPENDENCIES", + "DELTA_CANNOT_MODIFY_CATALOG_MANAGED_DEPENDENCIES", sqlState = "42616", - parameters = Map("Command" -> "ALTER")) - } - } - - if (isCatalogOwnedTest) { - cancel("Upgrade is not yet supported for catalog owned tables") - } - // For a table that is about to enable Coordinated Commits during the same ALTER command. - withoutDefaultCCTableFeature { - withTempDir { tempDir => - sql(s"CREATE TABLE delta.`${tempDir.getAbsolutePath}` (id LONG) USING delta") - val e = interceptWithUnwrapping[DeltaIllegalArgumentException] { - sql(s"ALTER TABLE delta.`${tempDir.getAbsolutePath}` SET TBLPROPERTIES " + - s"('${COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'tracking-in-memory', " + - s"'${COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '${JsonUtils.toJson(Map())}', " + - s"'${IN_COMMIT_TIMESTAMPS_ENABLED.key}' = 'false')") - } - checkError( - e, - "DELTA_CANNOT_SET_COORDINATED_COMMITS_DEPENDENCIES", - sqlState = "42616", - parameters = Map("Command" -> "ALTER")) - } + parameters = Map[String, String]()) } } - test("During ALTER, unsetting ICT configurations on Coordinated Commits tables throws an " + + test("During ALTER, unsetting ICT configurations on CatalogManaged tables throws an " + "exception.") { + if (!isCatalogOwnedTest) { + cancel("Legacy CCv1 DDL dependency validation is removed.") + } registerBuilder(TrackingInMemoryCommitCoordinatorBuilder(1)) withTempDir { tempDir => @@ -1733,50 +1666,11 @@ abstract class CommitCoordinatorSuiteBase sql(s"ALTER TABLE delta.`${tempDir.getAbsolutePath}` UNSET TBLPROPERTIES " + s"('${IN_COMMIT_TIMESTAMPS_ENABLED.key}')") } - if (isCatalogOwnedTest) { - checkError( - e, - "DELTA_CANNOT_MODIFY_CATALOG_MANAGED_DEPENDENCIES", - sqlState = "42616", - parameters = Map[String, String]()) - } else { - checkError( - e, - "DELTA_CANNOT_MODIFY_COORDINATED_COMMITS_DEPENDENCIES", - sqlState = "42616", - parameters = Map("Command" -> "ALTER")) - } - } - } - - test("During REPLACE, for non-CC tables, default CC configurations are ignored, but default " + - "ICT confs are retained, and existing ICT confs are discarded") { - // Non-CC table, REPLACE with default CC and ICT confs => Non-CC, but with ICT confs. - withTempDir { tempDir => - withoutDefaultCCTableFeature { - sql(s"CREATE TABLE delta.`${tempDir.getAbsolutePath}` (id LONG) USING delta") - } - withSQLConf(IN_COMMIT_TIMESTAMPS_ENABLED.defaultTablePropertyKey -> "true") { - sql(s"REPLACE TABLE delta.`${tempDir.getAbsolutePath}` (id STRING) USING delta") - } - assert(DeltaLog.forTable(spark, tempDir).snapshot.tableCommitCoordinatorClientOpt.isEmpty) - assert(!DeltaLog.forTable(spark, tempDir).snapshot.isCatalogOwned) - assert(DeltaLog.forTable(spark, tempDir).snapshot.metadata.configuration.contains( - IN_COMMIT_TIMESTAMPS_ENABLED.key)) - } - - // Non-CC table with ICT confs, REPLACE with only default CC confs => Non-CC, also no ICT confs. - withTempDir { tempDir => - withoutDefaultCCTableFeature { - withSQLConf(IN_COMMIT_TIMESTAMPS_ENABLED.defaultTablePropertyKey -> "true") { - sql(s"CREATE TABLE delta.`${tempDir.getAbsolutePath}` (id LONG) USING delta") - } - } - sql(s"REPLACE TABLE delta.`${tempDir.getAbsolutePath}` (id STRING) USING delta") - val snapshot = DeltaLog.forTable(spark, tempDir).unsafeVolatileSnapshot - assert(snapshot.tableCommitCoordinatorClientOpt.isEmpty) - assert(!snapshot.isCatalogOwned) - assert(!snapshot.metadata.configuration.contains(IN_COMMIT_TIMESTAMPS_ENABLED.key)) + checkError( + e, + "DELTA_CANNOT_MODIFY_CATALOG_MANAGED_DEPENDENCIES", + sqlState = "42616", + parameters = Map[String, String]()) } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtilsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtilsSuite.scala deleted file mode 100644 index 356387f54f5..00000000000 --- a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtilsSuite.scala +++ /dev/null @@ -1,381 +0,0 @@ -/* - * Copyright (2021) The Delta Lake Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.delta.coordinatedcommits - -import scala.jdk.CollectionConverters._ - -import org.apache.spark.sql.delta.DeltaConfigs.{COORDINATED_COMMITS_COORDINATOR_CONF, COORDINATED_COMMITS_COORDINATOR_NAME, COORDINATED_COMMITS_TABLE_CONF} -import org.apache.spark.sql.delta.DeltaIllegalArgumentException -import org.apache.spark.sql.delta.test.shims.GridTestShim - -import org.apache.spark.sql.QueryTest -import org.apache.spark.sql.test.SharedSparkSession - -class CoordinatedCommitsUtilsSuite extends QueryTest - with GridTestShim - with SharedSparkSession - with CoordinatedCommitsTestUtils { - - ///////////////////////////////////////////////////////////////////////////////////////////// - // Test CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl STARTS // - ///////////////////////////////////////////////////////////////////////////////////////////// - - private val cNameKey = COORDINATED_COMMITS_COORDINATOR_NAME.key - private val cConfKey = COORDINATED_COMMITS_COORDINATOR_CONF.key - private val tableConfKey = COORDINATED_COMMITS_TABLE_CONF.key - private val cName = cNameKey -> "some-cc-name" - private val cConf = cConfKey -> "some-cc-conf" - private val tableConf = tableConfKey -> "some-table-conf" - - private val cNameDefaultKey = COORDINATED_COMMITS_COORDINATOR_NAME.defaultTablePropertyKey - private val cConfDefaultKey = COORDINATED_COMMITS_COORDINATOR_CONF.defaultTablePropertyKey - private val tableConfDefaultKey = COORDINATED_COMMITS_TABLE_CONF.defaultTablePropertyKey - private val cNameDefault = cNameDefaultKey -> "some-cc-name" - private val cConfDefault = cConfDefaultKey -> "some-cc-conf" - private val tableConfDefault = tableConfDefaultKey -> "some-table-conf" - - private val command = "CLONE" - - private def errCannotOverride = new DeltaIllegalArgumentException( - "DELTA_CANNOT_OVERRIDE_COORDINATED_COMMITS_CONFS", Array(command)) - - private def errMissingConfInCommand(key: String) = new DeltaIllegalArgumentException( - "DELTA_MUST_SET_ALL_COORDINATED_COMMITS_CONFS_IN_COMMAND", Array(command, key)) - - private def errMissingConfInSession(key: String) = new DeltaIllegalArgumentException( - "DELTA_MUST_SET_ALL_COORDINATED_COMMITS_CONFS_IN_SESSION", Array(command, key)) - - private def errTableConfInCommand = new DeltaIllegalArgumentException( - "DELTA_CONF_OVERRIDE_NOT_SUPPORTED_IN_COMMAND", Array(command, tableConfKey)) - - private def errTableConfInSession = new DeltaIllegalArgumentException( - "DELTA_CONF_OVERRIDE_NOT_SUPPORTED_IN_SESSION", - Array(command, tableConfDefaultKey, tableConfDefaultKey)) - - private def testValidationForCreateDeltaTableCommand( - tableExists: Boolean, - propertyOverrides: Map[String, String], - defaultConfs: Seq[(String, String)], - errorOpt: Option[DeltaIllegalArgumentException]): Unit = { - withoutDefaultCCTableFeature { - withSQLConf(defaultConfs: _*) { - if (errorOpt.isDefined) { - val e = intercept[DeltaIllegalArgumentException] { - CoordinatedCommitsUtils.validateConfigurationsForCreateDeltaTableCommandImpl( - spark, propertyOverrides, tableExists, command) - } - checkError( - e, - errorOpt.get.getErrorClass, - sqlState = errorOpt.get.getSqlState, - parameters = errorOpt.get.getMessageParameters.asScala.toMap) - } else { - CoordinatedCommitsUtils.validateConfigurationsForCreateDeltaTableCommandImpl( - spark, propertyOverrides, tableExists, command) - } - } - } - } - - // tableExists: True - // | False - // - // propertyOverrides: Map.empty - // | Map(cName) - // | Map(cName, cConf) - // | Map(cName, cConf, tableConf) - // | Map(tableConf) - // - // defaultConf: Seq.empty - // | Seq(cNameDefault) - // | Seq(cNameDefault, cConfDefault) - // | Seq(cNameDefault, cConfDefault, tableConfDefault) - // | Seq(tableConfDefault) - // - // errorOpt: None - // | Some(errCannotOverride) - // | Some(errMissingConfInCommand(cConfKey)) - // | Some(errMissingConfInSession(cConfKey)) - // | Some(errTableConfInCommand) - // | Some(errTableConfInSession) - - gridTest("During CLONE, CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl " + - "passes for existing target tables with no explicit Coordinated Commits Configurations.") ( - Seq( - Seq.empty, - // Not having any explicit Coordinated Commits configurations, but having an illegal - // combination of Coordinated Commits configurations in default: pass. - // This is because we don't consider default configurations when the table exists. - Seq(cNameDefault), - Seq(cNameDefault, cConfDefault), - Seq(cNameDefault, cConfDefault, tableConfDefault), - Seq(tableConfDefault) - ) - ) { defaultConfs: Seq[(String, String)] => - testValidationForCreateDeltaTableCommand( - tableExists = true, - propertyOverrides = Map.empty, - defaultConfs, - errorOpt = None) - } - - gridTest("During CLONE, CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl " + - "fails for existing target tables with any explicit Coordinated Commits Configurations.") ( - Seq( - (Map(cName), Seq.empty), - (Map(cName), Seq(cNameDefault)), - (Map(cName), Seq(cNameDefault, cConfDefault)), - (Map(cName), Seq(cNameDefault, cConfDefault, tableConfDefault)), - (Map(cName), Seq(tableConfDefault)), - - (Map(cName, cConf), Seq.empty), - (Map(cName, cConf), Seq(cNameDefault)), - (Map(cName, cConf), Seq(cNameDefault, cConfDefault)), - (Map(cName, cConf), Seq(cNameDefault, cConfDefault, tableConfDefault)), - (Map(cName, cConf), Seq(tableConfDefault)), - - (Map(cName, cConf, tableConf), Seq.empty), - (Map(cName, cConf, tableConf), Seq(cNameDefault)), - (Map(cName, cConf, tableConf), Seq(cNameDefault, cConfDefault)), - (Map(cName, cConf, tableConf), Seq(cNameDefault, cConfDefault, tableConfDefault)), - (Map(cName, cConf, tableConf), Seq(tableConfDefault)), - - (Map(tableConf), Seq.empty), - (Map(tableConf), Seq(cNameDefault)), - (Map(tableConf), Seq(cNameDefault, cConfDefault)), - (Map(tableConf), Seq(cNameDefault, cConfDefault, tableConfDefault)), - (Map(tableConf), Seq(tableConfDefault)) - ) - ) { case ( - propertyOverrides: Map[String, String], - defaultConfs: Seq[(String, String)]) => - testValidationForCreateDeltaTableCommand( - tableExists = true, - propertyOverrides, - defaultConfs, - errorOpt = Some(errCannotOverride)) - } - - gridTest("During CLONE, CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl " + - "works correctly for new target tables with default Coordinated Commits Configurations.") ( - Seq( - (Seq.empty, None), - (Seq(cNameDefault), Some(errMissingConfInSession(cConfDefaultKey))), - (Seq(cNameDefault, cConfDefault), None), - (Seq(cNameDefault, cConfDefault, tableConfDefault), Some(errTableConfInSession)), - (Seq(tableConfDefault), Some(errTableConfInSession)) - ) - ) { case ( - defaultConfs: Seq[(String, String)], - errorOpt: Option[DeltaIllegalArgumentException]) => - testValidationForCreateDeltaTableCommand( - tableExists = false, - propertyOverrides = Map.empty, - defaultConfs, - errorOpt) - } - - gridTest("During CLONE, CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl " + - "fails for new target tables with any illegal explicit Coordinated Commits Configurations.") ( - Seq( - (Map(cName), Seq.empty, Some(errMissingConfInCommand(cConfKey))), - (Map(cName), Seq(cNameDefault), Some(errMissingConfInCommand(cConfKey))), - (Map(cName), Seq(cNameDefault, cConfDefault), Some(errMissingConfInCommand(cConfKey))), - (Map(cName), Seq(cNameDefault, cConfDefault, tableConfDefault), - Some(errMissingConfInCommand(cConfKey))), - (Map(cName), Seq(tableConfDefault), Some(errMissingConfInCommand(cConfKey))), - - (Map(cName, cConf, tableConf), Seq.empty, Some(errTableConfInCommand)), - (Map(cName, cConf, tableConf), Seq(cNameDefault), Some(errTableConfInCommand)), - (Map(cName, cConf, tableConf), Seq(cNameDefault, cConfDefault), Some(errTableConfInCommand)), - (Map(cName, cConf, tableConf), Seq(cNameDefault, cConfDefault, tableConfDefault), - Some(errTableConfInCommand)), - (Map(cName, cConf, tableConf), Seq(tableConfDefault), Some(errTableConfInCommand)), - - (Map(tableConf), Seq.empty, Some(errTableConfInCommand)), - (Map(tableConf), Seq(cNameDefault), Some(errTableConfInCommand)), - (Map(tableConf), Seq(cNameDefault, cConfDefault), Some(errTableConfInCommand)), - (Map(tableConf), Seq(cNameDefault, cConfDefault, tableConfDefault), - Some(errTableConfInCommand)), - (Map(tableConf), Seq(tableConfDefault), Some(errTableConfInCommand)) - ) - ) { case ( - propertyOverrides: Map[String, String], - defaultConfs: Seq[(String, String)], - errorOpt: Option[DeltaIllegalArgumentException]) => - testValidationForCreateDeltaTableCommand( - tableExists = false, - propertyOverrides, - defaultConfs, - errorOpt) - } - - gridTest("During CLONE, CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl " + - "passes for new target tables with legal explicit Coordinated Commits Configurations.") ( - Seq( - // Having exactly Coordinator Name and Coordinator Conf explicitly, but having an illegal - // combination of Coordinated Commits configurations in default: pass. - // This is because we don't consider default configurations when explicit ones are provided. - Seq.empty, - Seq(cNameDefault), - Seq(cNameDefault, cConfDefault), - Seq(cNameDefault, cConfDefault, tableConfDefault), - Seq(tableConfDefault) - ) - ) { defaultConfs: Seq[(String, String)] => - testValidationForCreateDeltaTableCommand( - tableExists = false, - propertyOverrides = Map(cName, cConf), - defaultConfs, - errorOpt = None) - } - - private def testValidateConfigurationsForAlterTableSetPropertiesDeltaCommand( - existingConfs: Map[String, String], - propertyOverrides: Map[String, String], - errorOpt: Option[DeltaIllegalArgumentException]): Unit = { - if (errorOpt.isDefined) { - val e = intercept[DeltaIllegalArgumentException] { - CoordinatedCommitsUtils.validateConfigurationsForAlterTableSetPropertiesDeltaCommand( - existingConfs, propertyOverrides) - } - checkError( - e, - errorOpt.get.getErrorClass, - sqlState = errorOpt.get.getSqlState, - parameters = errorOpt.get.getMessageParameters.asScala.toMap) - } else { - CoordinatedCommitsUtils.validateConfigurationsForAlterTableSetPropertiesDeltaCommand( - existingConfs, propertyOverrides) - } - } - - gridTest("During ALTER, `validateConfigurationsForAlterTableSetPropertiesDeltaCommand` " + - "works correctly for tables without Coordinated Commits configurations.") { - Seq( - (Map.empty, None), - (Map(cName), Some(new DeltaIllegalArgumentException( - "DELTA_MUST_SET_ALL_COORDINATED_COMMITS_CONFS_IN_COMMAND", Array("ALTER", cConfKey)))), - (Map(cName, cConf), None), - (Map(cName, cConf, tableConf), Some(new DeltaIllegalArgumentException( - "DELTA_CONF_OVERRIDE_NOT_SUPPORTED_IN_COMMAND", Array("ALTER", tableConfKey)))), - (Map(tableConf), Some(new DeltaIllegalArgumentException( - "DELTA_CONF_OVERRIDE_NOT_SUPPORTED_IN_COMMAND", Array("ALTER", tableConfKey)))) - ) - } { case ( - propertyOverrides: Map[String, String], - errorOpt: Option[DeltaIllegalArgumentException]) => - testValidateConfigurationsForAlterTableSetPropertiesDeltaCommand( - existingConfs = Map.empty, - propertyOverrides, - errorOpt) - } - - test("During ALTER, `validateConfigurationsForAlterTableSetPropertiesDeltaCommand` " + - "passes with no overrides for tables with Coordinated Commits configurations.") { - testValidateConfigurationsForAlterTableSetPropertiesDeltaCommand( - existingConfs = Map(cName, cConf, tableConf), - propertyOverrides = Map.empty, - errorOpt = None) - } - - gridTest("During ALTER, `validateConfigurationsForAlterTableSetPropertiesDeltaCommand` " + - "fails with overrides for tables with Coordinated Commits configurations.") ( - Seq( - Map(cName), - Map(cName, cConf), - Map(cName, cConf, tableConf), - Map(tableConf) - ) - ) { propertyOverrides: Map[String, String] => - testValidateConfigurationsForAlterTableSetPropertiesDeltaCommand( - existingConfs = Map(cName, cConf, tableConf), - propertyOverrides, - errorOpt = Some(new DeltaIllegalArgumentException( - "DELTA_CANNOT_OVERRIDE_COORDINATED_COMMITS_CONFS", Array("ALTER")))) - } - - private def errCannotUnset = new DeltaIllegalArgumentException( - "DELTA_CANNOT_UNSET_COORDINATED_COMMITS_CONFS", Array.empty) - - private def testValidateConfigurationsForAlterTableUnsetPropertiesDeltaCommand( - existingConfs: Map[String, String], - propKeysToUnset: Seq[String], - errorOpt: Option[DeltaIllegalArgumentException]): Unit = { - if (errorOpt.isDefined) { - val e = intercept[DeltaIllegalArgumentException] { - CoordinatedCommitsUtils.validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand( - existingConfs, propKeysToUnset) - } - checkError( - e, - errorOpt.get.getErrorClass, - sqlState = errorOpt.get.getSqlState, - parameters = errorOpt.get.getMessageParameters.asScala.toMap) - } else { - CoordinatedCommitsUtils.validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand( - existingConfs, propKeysToUnset) - } - } - - gridTest("During ALTER, `validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand` " + - "fails with overrides for tables with Coordinated Commits configurations.") { - Seq( - Seq(cNameKey), - Seq(cNameKey, cConfKey), - Seq(cNameKey, cConfKey, tableConfKey), - Seq(tableConfKey) - ) - } { propKeysToUnset: Seq[String] => - testValidateConfigurationsForAlterTableUnsetPropertiesDeltaCommand( - existingConfs = Map(cName, cConf, tableConf), - propKeysToUnset, - errorOpt = Some(errCannotUnset)) - } - - gridTest("During ALTER, `validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand` " + - "passes with no overrides for tables with or without Coordinated Commits configurations.") { - Seq( - Map.empty, - Map(cName, cConf, tableConf) - ) - } { case existingConfs: Map[String, String] => - testValidateConfigurationsForAlterTableUnsetPropertiesDeltaCommand( - existingConfs, - propKeysToUnset = Seq.empty, - errorOpt = None) - } - - gridTest("During ALTER, `validateConfigurationsForAlterTableUnsetPropertiesDeltaCommand` " + - "passes with overrides for tables without Coordinated Commits configurations.") { - Seq( - Seq(cNameKey), - Seq(cNameKey, cConfKey), - Seq(cNameKey, cConfKey, tableConfKey), - Seq(tableConfKey) - ) - } { propKeysToUnset: Seq[String] => - testValidateConfigurationsForAlterTableUnsetPropertiesDeltaCommand( - existingConfs = Map.empty, - propKeysToUnset, - errorOpt = None) - } - - ///////////////////////////////////////////////////////////////////////////////////////////// - // Test CoordinatedCommitsUtils.validateCoordinatedCommitsConfigurationsImpl ENDS // - ///////////////////////////////////////////////////////////////////////////////////////////// -}