diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
index 9508477ad28..5faeacccafc 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.delta.DataFrameUtils
 import org.apache.spark.sql.delta.skipping.clustering.{ClusteredTableUtils, ClusteringColumnInfo}
 import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec
 import org.apache.spark.sql.delta._
-import org.apache.spark.sql.delta.commands.WriteIntoDelta
+import org.apache.spark.sql.delta.commands.{DeltaCommand, WriteIntoDelta}
 import org.apache.spark.sql.delta.commands.cdc.CDCReader
 import org.apache.spark.sql.delta.metering.{DeltaLogging, DeltaLoggingProvider}
 import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSourceUtils}
@@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, UnresolvedTable}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
 import org.apache.spark.sql.catalyst.plans.logical.{AnalysisHelper, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
-import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableCatalog, V2TableWithV1Fallback}
+import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableCatalog, TruncatableTable, V2TableWithV1Fallback}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.catalog.V1Table
@@ -50,6 +50,8 @@ import org.apache.spark.sql.connector.expressions._
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, V1Write, WriteBuilder}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.execution.metric.SQLMetrics.{createMetric, createTimingMetric}
 import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -70,9 +72,11 @@ class DeltaTableV2 private(
     val options: Map[String, String])
   extends Table
   with SupportsWrite
+  with TruncatableTable
   with V2TableWithV1Fallback
   with DeltaLogging
-  with DeltaLoggingProvider {
+  with DeltaLoggingProvider
+  with DeltaCommand {
 
   case class PathInfo(
       rootPath: Path,
@@ -271,6 +275,58 @@ class DeltaTableV2 private(
     V1_BATCH_WRITE, OVERWRITE_BY_FILTER, TRUNCATE, OVERWRITE_DYNAMIC
   ).asJava
 
+  /**
+   * Truncates the table by removing every file returned by `txn.filterFiles()` in one commit.
+   * The commit contains only the optional `SetTransaction` action and the `remove` actions;
+   * no metadata or protocol action is added here.
+   *
+   * @return true if a commit was written (`commitIfNeeded` returned a version); false if
+   *         `hasBeenExecuted` reported the transaction as already applied or no commit was made.
+   */
+  override def truncateTable(): Boolean = recordDeltaOperation(deltaLog, "delta.truncateTable") {
+    val metrics = Map[String, SQLMetric](
+      "numRemovedFiles" -> createMetric(spark.sparkContext, "number of files removed."),
+      "executionTimeMs" ->
+        createTimingMetric(spark.sparkContext, "time taken to execute the entire operation")
+    )
+
+    deltaLog.withNewTransaction(catalogTable) { txn =>
+      // Rejects the truncate up front for tables that do not allow removals (e.g. append-only).
+      DeltaLog.assertRemovable(txn.snapshot)
+      if (hasBeenExecuted(txn, spark)) {
+        sendDriverMetrics(spark, metrics)
+        false
+      } else {
+        val startTime = System.nanoTime()
+        val addFiles = txn.filterFiles()
+        val numRemovedFiles: Long = addFiles.size
+        // Read the clock once so that every remove action of this commit carries the same
+        // deletion timestamp.
+        val operationTimestamp = System.currentTimeMillis()
+        val removedFiles = addFiles.map(_.removeWithTimestamp(operationTimestamp))
+        val actions = createSetTransaction(spark, deltaLog).toSeq ++ removedFiles
+
+        // Metrics are populated and registered before the commit is attempted.
+        metrics("numRemovedFiles").set(numRemovedFiles)
+        metrics("executionTimeMs").set((System.nanoTime() - startTime) / 1000 / 1000)
+        txn.registerSQLMetrics(spark, metrics)
+        sendDriverMetrics(spark, metrics)
+
+        val commitVersion = txn.commitIfNeeded(
+          actions = actions,
+          op = DeltaOperations.Truncate(),
+          tags = RowTracking.addPreservedRowTrackingTagIfNotSet(txn.snapshot))
+        recordDeltaEvent(
+          deltaLog,
+          "delta.dml.truncate.stats",
+          data = TruncateMetric(
+            numRemovedFiles = numRemovedFiles,
+            commitVersion = commitVersion))
+        commitVersion.isDefined
+      }
+    }
+  }
+
   def tableExists: Boolean = deltaLog.tableExists
 
 
@@ -687,3 +743,13 @@ private class WriteIntoDeltaBuilder(
     }
   }
 }
+
+/**
+ * Payload of the `delta.dml.truncate.stats` usage event.
+ *
+ * @param numRemovedFiles number of files removed by the truncate
+ * @param commitVersion version written by the commit, if one was made
+ */
+case class TruncateMetric(
+    numRemovedFiles: Long,
+    commitVersion: Option[Long] = None)
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTruncateTableSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTruncateTableSuite.scala
new file mode 100644
index 00000000000..df61fee14cd
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTruncateTableSuite.scala
@@ -0,0 +1,324 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta
+
+import java.sql.Timestamp
+import java.util.Locale
+
+import org.apache.spark.sql.delta.coordinatedcommits.CatalogOwnedTestBaseSuite
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.types.StructType
+
+/**
+ * End-to-end tests for `TRUNCATE TABLE` on Delta tables. Each scenario is run against both the
+ * catalog table name and the path-based identifier.
+ */
+class DeltaTruncateTableSuite
+  extends QueryTest
+  with DeltaSQLCommandTest
+  with CatalogOwnedTestBaseSuite {
+
+  import DeltaTruncateTableSuite._
+
+  // Properties excluded from before/after comparisons by stableTableProperties.
+  private final val MutableCatalogProperties: Set[String] = Set(
+    "delta.lastCommitTimestamp",
+    "delta.lastUpdateVersion",
+    "transient_lastDdlTime")
+
+  private def deltaLogForTable(tableName: String): DeltaLog =
+    DeltaLog.forTable(spark, TableIdentifier(tableName))
+
+  private def activeFileCount(log: DeltaLog): Long = log.update().allFiles.count()
+
+  private def latestHistory(tableName: String): Row =
+    sql(s"DESCRIBE HISTORY $tableName LIMIT 1").head()
+
+  private def tableProperties(tableName: String): Map[String, String] = {
+    sql(s"SHOW TBLPROPERTIES $tableName")
+      .collect()
+      .map(row => row.getString(0) -> row.getString(1))
+      .toMap
+  }
+
+  private def stableTableProperties(properties: Map[String, String]): Map[String, String] = {
+    properties -- MutableCatalogProperties
+  }
+
+  private def getStringSeq(row: Row, fieldName: String): Seq[String] = {
+    row.getAs[scala.collection.Seq[String]](fieldName).toVector
+  }
+
+  private def getStringMap(row: Row, fieldName: String): Map[String, String] = {
+    row.getAs[scala.collection.Map[String, String]](fieldName).toMap
+  }
+
+  /** Reads DESCRIBE DETAIL; numFiles and sizeInBytes are kept apart from the other fields. */
+  private def getDetails(tableName: String): TableDetails = {
+    val row = sql(s"DESCRIBE DETAIL $tableName").head()
+    val stable = StableTableDetails(
+      format = row.getAs[String]("format"),
+      id = row.getAs[String]("id"),
+      name = row.getAs[String]("name"),
+      description = row.getAs[String]("description"),
+      location = row.getAs[String]("location"),
+      createdAt = row.getAs[Timestamp]("createdAt"),
+      partitionColumns = getStringSeq(row, "partitionColumns"),
+      clusteringColumns = getStringSeq(row, "clusteringColumns"),
+      properties = getStringMap(row, "properties"),
+      minReaderVersion = row.getAs[Int]("minReaderVersion"),
+      minWriterVersion = row.getAs[Int]("minWriterVersion"),
+      tableFeatures = getStringSeq(row, "tableFeatures"))
+
+    TableDetails(
+      stable = stable,
+      numFiles = row.getAs[Long]("numFiles"),
+      sizeInBytes = row.getAs[Long]("sizeInBytes"))
+  }
+
+  private def getState(log: DeltaLog): TableState = {
+    val snapshot = log.update()
+    TableState(
+      minReaderVersion = snapshot.protocol.minReaderVersion,
+      minWriterVersion = snapshot.protocol.minWriterVersion,
+      readerFeatures = snapshot.protocol.readerFeatures.getOrElse(Set.empty),
+      writerFeatures = snapshot.protocol.writerFeatures.getOrElse(Set.empty),
+      configuration = snapshot.metadata.configuration,
+      schema = snapshot.metadata.schema,
+      partitionColumns = snapshot.metadata.partitionColumns)
+  }
+
+  /** Captures protocol/metadata state, DESCRIBE DETAIL fields and filtered table properties. */
+  private def captureSnapshot(log: DeltaLog, tableName: String): TableSnapshot = {
+    val state = getState(log)
+    val details = getDetails(tableName)
+    val props = tableProperties(tableName)
+    TableSnapshot(state, details.stable, stableTableProperties(props))
+  }
+
+  /** Asserts that the table's current snapshot equals `expected` in every captured component. */
+  private def assertSnapshot(
+      log: DeltaLog,
+      tableName: String,
+      expected: TableSnapshot): Unit = {
+    val actual = captureSnapshot(log, tableName)
+    assert(actual.state === expected.state)
+    assert(actual.stableDetails === expected.stableDetails)
+    assert(actual.stableProperties === expected.stableProperties)
+  }
+
+  Seq(false, true).foreach { partitioned =>
+    val desc = if (partitioned) "partitioned" else "unpartitioned"
+    test(s"truncate $desc table removes all data and preserves metadata") {
+      val tableName = s"truncate_$desc"
+      withTable(tableName) {
+        val partitionClause = if (partitioned) "PARTITIONED BY (part)" else ""
+        sql(
+          s"""
+             |CREATE TABLE $tableName (id LONG, part LONG)
+             |USING delta
+             |$partitionClause
+             |TBLPROPERTIES ('truncateTestProp' = 'preserved')
+             |COMMENT 'metadata survives truncate'
+             |""".stripMargin)
+        val log = deltaLogForTable(tableName)
+        val pathRef = s"delta.`${log.dataPath}`"
+        val snapshotBefore = captureSnapshot(log, tableName)
+
+        Seq(tableName, pathRef).foreach { truncateTarget =>
+          sql(s"INSERT INTO $tableName VALUES (0, 0), (1, 1), (2, 0), (3, 1)")
+
+          sql(s"TRUNCATE TABLE $truncateTarget")
+
+          checkAnswer(sql(s"SELECT * FROM $tableName"), Nil)
+          assert(activeFileCount(log) === 0)
+
+          val history = latestHistory(tableName)
+          assert(history.getAs[String]("operation") === "TRUNCATE")
+
+          assertSnapshot(log, tableName, snapshotBefore)
+          val detailsAfter = getDetails(tableName)
+          assert(detailsAfter.numFiles === 0)
+          assert(detailsAfter.sizeInBytes === 0)
+        }
+      }
+    }
+  }
+
+  test("truncate empty delta table is a no-op") {
+    val tableName = "truncate_empty"
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName (id LONG) USING delta")
+
+      val log = deltaLogForTable(tableName)
+      val pathRef = s"delta.`${log.dataPath}`"
+      val versionBeforeTruncate = log.update().version
+
+      val snapshotBefore = captureSnapshot(log, tableName)
+
+      Seq(tableName, pathRef).foreach { truncateTarget =>
+        sql(s"TRUNCATE TABLE $truncateTarget")
+
+        assert(log.update().version === versionBeforeTruncate)
+
+        assertSnapshot(log, tableName, snapshotBefore)
+      }
+    }
+  }
+
+  test("truncate is idempotent: second truncate on empty table is a no-op") {
+    val tableName = "truncate_idempotent"
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName (id LONG) USING delta")
+      sql(s"INSERT INTO $tableName VALUES (1), (2)")
+
+      val log = deltaLogForTable(tableName)
+      val pathRef = s"delta.`${log.dataPath}`"
+      val snapshotBefore = captureSnapshot(log, tableName)
+
+      Seq(tableName, pathRef).foreach { truncateTarget =>
+        sql(s"INSERT INTO $tableName VALUES (1), (2)")
+
+        sql(s"TRUNCATE TABLE $truncateTarget")
+        val versionAfterFirstTruncate = log.update().version
+        checkAnswer(sql(s"SELECT * FROM $tableName"), Nil)
+        assertSnapshot(log, tableName, snapshotBefore)
+
+        sql(s"TRUNCATE TABLE $truncateTarget")
+        assert(log.update().version === versionAfterFirstTruncate)
+        checkAnswer(sql(s"SELECT * FROM $tableName"), Nil)
+        assertSnapshot(log, tableName, snapshotBefore)
+      }
+    }
+  }
+
+  test("truncate append-only table fails atomically") {
+    val tableName = "truncate_append_only"
+    withTable(tableName) {
+      sql(
+        s"""
+           |CREATE TABLE $tableName (id LONG)
+           |USING delta
+           |TBLPROPERTIES ('delta.appendOnly' = 'true')
+           |""".stripMargin)
+      sql(s"INSERT INTO $tableName VALUES (1), (2)")
+
+      val log = deltaLogForTable(tableName)
+      val pathRef = s"delta.`${log.dataPath}`"
+      val versionBeforeTruncate = log.update().version
+
+      val snapshotBefore = captureSnapshot(log, tableName)
+
+      Seq(tableName, pathRef).foreach { truncateTarget =>
+        val e = intercept[DeltaUnsupportedOperationException] {
+          sql(s"TRUNCATE TABLE $truncateTarget")
+        }
+        // NOTE(review): the expected table_name is the literal string "null" -- confirm that
+        // reporting no table name for this error is intended rather than a missing identifier.
+        checkError(
+          e,
+          "DELTA_CANNOT_MODIFY_APPEND_ONLY",
+          parameters = Map("table_name" -> "null", "config" -> DeltaConfigs.IS_APPEND_ONLY.key))
+
+        checkAnswer(sql(s"SELECT id FROM $tableName ORDER BY id"), Seq(Row(1), Row(2)))
+        assert(log.update().version === versionBeforeTruncate)
+
+        assertSnapshot(log, tableName, snapshotBefore)
+      }
+    }
+  }
+
+
+  test("truncate with partition spec is rejected and leaves table unchanged") {
+    val tableName = "truncate_partition_spec"
+    withTable(tableName) {
+      spark.range(start = 0, end = 4, step = 1, numPartitions = 2)
+        .selectExpr("id", "id % 2 AS part")
+        .write.format("delta").partitionBy("part").saveAsTable(tableName)
+
+      val log = deltaLogForTable(tableName)
+      val pathRef = s"delta.`${log.dataPath}`"
+      val versionBeforeTruncate = log.update().version
+
+      val snapshotBefore = captureSnapshot(log, tableName)
+
+      Seq(tableName, pathRef).foreach { truncateTarget =>
+        val e = intercept[Exception] {
+          sql(s"TRUNCATE TABLE $truncateTarget PARTITION (part = 1)")
+        }
+        assert(
+          Option(e.getMessage).exists { message =>
+            val lowerCaseMessage = message.toLowerCase(Locale.ROOT)
+            message.contains("DELTA_TRUNCATE_TABLE_PARTITION_NOT_SUPPORTED") ||
+              (lowerCaseMessage.contains("truncate") && lowerCaseMessage.contains("partition"))
+          })
+
+        checkAnswer(
+          sql(s"SELECT id, part FROM $tableName ORDER BY id"),
+          Seq(Row(0, 0), Row(1, 1), Row(2, 0), Row(3, 1)))
+        assert(log.update().version === versionBeforeTruncate)
+
+        assertSnapshot(log, tableName, snapshotBefore)
+      }
+    }
+  }
+}
+
+/** Re-runs the suite with the catalog-owned coordinator backfill batch size set to 1. */
+class DeltaTruncateTableWithCatalogOwnedBatch1Suite extends DeltaTruncateTableSuite {
+  override def catalogOwnedCoordinatorBackfillBatchSize: Option[Int] = Some(1)
+}
+
+/** Value types used to compare table state before and after a truncate. */
+object DeltaTruncateTableSuite {
+
+  private final case class TableState(
+      minReaderVersion: Int,
+      minWriterVersion: Int,
+      readerFeatures: Set[String],
+      writerFeatures: Set[String],
+      configuration: Map[String, String],
+      schema: StructType,
+      partitionColumns: Seq[String])
+
+  private final case class StableTableDetails(
+      format: String,
+      id: String,
+      name: String,
+      description: String,
+      location: String,
+      createdAt: Timestamp,
+      partitionColumns: Seq[String],
+      clusteringColumns: Seq[String],
+      properties: Map[String, String],
+      minReaderVersion: Int,
+      minWriterVersion: Int,
+      tableFeatures: Seq[String])
+
+  private final case class TableDetails(
+      stable: StableTableDetails,
+      numFiles: Long,
+      sizeInBytes: Long)
+
+  private final case class TableSnapshot(
+      state: TableState,
+      stableDetails: StableTableDetails,
+      stableProperties: Map[String, String])
+}
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/TruncateTableMetricsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/TruncateTableMetricsSuite.scala
new file mode 100644
index 00000000000..0295d334d9c
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/TruncateTableMetricsSuite.scala
@@ -0,0 +1,91 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta
+
+import org.apache.spark.sql.delta.DeltaLog
+import org.apache.spark.sql.delta.catalog.TruncateMetric
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.delta.util.JsonUtils
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Checks the operation metrics and the `delta.dml.truncate.stats` usage log produced by
+ * `TRUNCATE TABLE`, addressing the table both by name and by path.
+ */
+class TruncateTableMetricsSuite extends QueryTest
+  with SharedSparkSession
+  with DeltaSQLCommandTest {
+
+  private final val numPartitionsToTest: Seq[Int] = Seq(1, 2, 4, 8, 16, 32, 64)
+  private final val testTable: String = "t"
+
+  /**
+   * Creates an empty Delta table, then for each way of addressing it (table name, path) appends
+   * `numPartitions * 10` rows written from `numPartitions` partitions and invokes `action`.
+   */
+  private def forEachTruncateTarget(numPartitions: Int)(action: String => Unit): Unit =
+    withTable(testTable) {
+      spark.range(0L).write.format("delta").saveAsTable(testTable)
+      val pathRef = s"delta.`${DeltaLog.forTable(spark, TableIdentifier(testTable)).dataPath}`"
+      Seq(testTable, pathRef).foreach { truncateTarget =>
+        spark.range(start = 0, end = numPartitions * 10L, step = 1, numPartitions = numPartitions)
+          .write.mode("append").format("delta").saveAsTable(testTable)
+        action(truncateTarget)
+      }
+    }
+
+  /** Truncates `truncateTarget` and checks the metrics of the last operation on the table. */
+  private def truncateAndCheck(truncateTarget: String, expectedNumRemovedFiles: Long): Unit = {
+    spark.sql(s"TRUNCATE TABLE $truncateTarget")
+    val metrics = DeltaMetricsUtils.getLastOperationMetrics(testTable)
+    assert(metrics.keySet === Set("numRemovedFiles", "executionTimeMs"))
+    assert(metrics("numRemovedFiles") === expectedNumRemovedFiles)
+    assert(metrics("executionTimeMs") >= 0) // default value is -1 in SQLMetric
+  }
+
+  numPartitionsToTest.foreach { numPartitions =>
+    test(s"truncate metrics: truncate non-empty table then empty table" +
+      s" (numPartitions=$numPartitions)") {
+      withSQLConf(
+        DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "true",
+        DeltaSQLConf.DELTA_SKIP_RECORDING_EMPTY_COMMITS.key -> "false"
+      ) {
+        forEachTruncateTarget(numPartitions) { truncateTarget =>
+          truncateAndCheck(truncateTarget, numPartitions)
+          truncateAndCheck(truncateTarget, 0)
+        }
+      }
+    }
+
+    test(s"truncate emits delta.dml.truncate.stats usage log event" +
+      s" (numPartitions=$numPartitions)") {
+      forEachTruncateTarget(numPartitions) { truncateTarget =>
+        val events = DeltaTestUtils.collectUsageLogs("delta.dml.truncate.stats") {
+          spark.sql(s"TRUNCATE TABLE $truncateTarget")
+        }
+        assert(events.size === 1)
+        val truncateMetric = JsonUtils.fromJson[TruncateMetric](events.head.blob)
+        assert(truncateMetric.numRemovedFiles === numPartitions)
+        assert(truncateMetric.commitVersion.isDefined)
+      }
+    }
+  }
+}
diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableDDLTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableDDLTest.java
new file mode 100644
index 00000000000..ff014b70cae
--- /dev/null
+++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableDDLTest.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sparkuctest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Integration tests for DDL statements (currently TRUNCATE TABLE) on Unity Catalog tables. */
+public class UCDeltaTableDDLTest extends UCDeltaTableIntegrationBaseTest {
+
+  // Keys dropped by stableTableProperties before table properties are compared.
+  private static final Set<String> MUTABLE_CATALOG_PROPERTIES =
+      Set.of("delta.lastCommitTimestamp", "delta.lastUpdateVersion", "transient_lastDdlTime");
+
+  // ---------------------------------------------------------------------------
+  // TRUNCATE TABLE
+  // ---------------------------------------------------------------------------
+
+  @TestAllTableTypes
+  public void testTruncatePreservesMetadata(TableType tableType) throws Exception {
+    for (boolean partitioned : List.of(false, true)) {
+      String desc = partitioned ? "partitioned" : "unpartitioned";
+      withNewTable(
+          "ddl_truncate_" + desc,
+          "id INT, name STRING, part INT",
+          partitioned ? "part" : null,
+          tableType,
+          tableName -> {
+            Map<String, String> snapshotBefore = stableTableProperties(tableName);
+            for (String truncateTarget : truncateTargets(tableName, tableType)) {
+              sql(
+                  "INSERT INTO %s VALUES (1, 'alpha', 0), (2, 'beta', 1), (3, 'gamma', 1)",
+                  tableName);
+              truncateTable(truncateTarget);
+              check(tableName, List.of());
+              assertPreservedTableSnapshot(tableName, snapshotBefore);
+            }
+            sql("INSERT INTO %s VALUES (4, 'delta', 0), (5, 'epsilon', 2)", tableName);
+            check(tableName, List.of(row("4", "delta", "0"), row("5", "epsilon", "2")));
+          });
+    }
+  }
+
+  @Test
+  public void testTruncateByPathBlockedForManagedTable() throws Exception {
+    withNewTable(
+        "ddl_truncate_path_blocked",
+        "id INT",
+        TableType.MANAGED,
+        tableName -> {
+          sql("INSERT INTO %s VALUES (1), (2), (3)", tableName);
+          String tablePath = tableLocation(tableName);
+          Map<String, String> snapshotBefore = stableTableProperties(tableName);
+
+          Assertions.assertThrows(
+              Exception.class, () -> truncateTable(String.format("delta.`%s`", tablePath)));
+
+          check(tableName, List.of(row("1"), row("2"), row("3")));
+          assertPreservedTableSnapshot(tableName, snapshotBefore);
+        });
+  }
+
+  /** Identifiers to truncate through: the table name, plus the path form for EXTERNAL tables. */
+  private List<String> truncateTargets(String tableName, TableType tableType) {
+    List<String> targets = new ArrayList<>();
+    targets.add(tableName);
+    if (tableType == TableType.EXTERNAL) {
+      targets.add(String.format("delta.`%s`", tableLocation(tableName)));
+    }
+    return targets;
+  }
+
+  /**
+   * Runs TRUNCATE TABLE on the target. For path-based targets the S3 credential check is turned
+   * off for the duration of the statement and then restored to its previous value.
+   */
+  private void truncateTable(String truncateTarget) {
+    if (truncateTarget.startsWith("delta.`")) {
+      boolean previousCheckEnabled = S3CredentialFileSystem.credentialCheckEnabled;
+      S3CredentialFileSystem.credentialCheckEnabled = false;
+      try {
+        sql("TRUNCATE TABLE %s", truncateTarget);
+      } finally {
+        S3CredentialFileSystem.credentialCheckEnabled = previousCheckEnabled;
+      }
+    } else {
+      sql("TRUNCATE TABLE %s", truncateTarget);
+    }
+  }
+
+  /** Returns the value of the "Location" row of DESCRIBE FORMATTED, or throws if absent. */
+  private String tableLocation(String tableName) {
+    return sql("DESCRIBE FORMATTED %s", tableName).stream()
+        .filter(row -> row.size() >= 2 && "Location".equalsIgnoreCase(row.get(0).trim()))
+        .map(row -> row.get(1).trim())
+        .findFirst()
+        .orElseThrow(() -> new AssertionError("Could not retrieve table location"));
+  }
+
+  /** Returns SHOW TBLPROPERTIES output without the keys in MUTABLE_CATALOG_PROPERTIES. */
+  private Map<String, String> stableTableProperties(String tableName) throws Exception {
+    Map<String, String> stable = new LinkedHashMap<>(tableProperties(tableName));
+    stable.keySet().removeAll(MUTABLE_CATALOG_PROPERTIES);
+    return stable;
+  }
+
+  private void assertPreservedTableSnapshot(String tableName, Map<String, String> expected)
+      throws Exception {
+    assertThat(stableTableProperties(tableName)).isEqualTo(expected);
+  }
+
+  private Map<String, String> tableProperties(String tableName) {
+    Map<String, String> properties = new LinkedHashMap<>();
+    for (List<String> row : sql("SHOW TBLPROPERTIES %s", tableName)) {
+      if (row.size() >= 2) {
+        properties.put(row.get(0), row.get(1));
+      }
+    }
+    return properties;
+  }
+}