Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.sql.delta.DataFrameUtils
import org.apache.spark.sql.delta.skipping.clustering.{ClusteredTableUtils, ClusteringColumnInfo}
import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.commands.WriteIntoDelta
import org.apache.spark.sql.delta.commands.{DeltaCommand, WriteIntoDelta}
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.metering.{DeltaLogging, DeltaLoggingProvider}
import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSourceUtils}
Expand All @@ -42,14 +42,16 @@ import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, UnresolvedTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisHelper, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableCatalog, V2TableWithV1Fallback}
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableCatalog, TruncatableTable, V2TableWithV1Fallback}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.catalog.V1Table
import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, V1Write, WriteBuilder}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.SQLMetrics.{createMetric, createTimingMetric}
import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
Expand All @@ -70,9 +72,11 @@ class DeltaTableV2 private(
val options: Map[String, String])
extends Table
with SupportsWrite
with TruncatableTable
with V2TableWithV1Fallback
with DeltaLogging
with DeltaLoggingProvider {
with DeltaLoggingProvider
with DeltaCommand {

case class PathInfo(
rootPath: Path,
Expand Down Expand Up @@ -271,6 +275,44 @@ class DeltaTableV2 private(
V1_BATCH_WRITE, OVERWRITE_BY_FILTER, TRUNCATE, OVERWRITE_DYNAMIC
).asJava

override def truncateTable(): Boolean = recordDeltaOperation(deltaLog, "delta.truncateTable") {
val metrics = Map[String, SQLMetric](
"numRemovedFiles" -> createMetric(spark.sparkContext, "number of files removed."),
"executionTimeMs" ->
createTimingMetric(spark.sparkContext, "time taken to execute the entire operation")
)

deltaLog.withNewTransaction(catalogTable) { txn =>
DeltaLog.assertRemovable(txn.snapshot)
if (hasBeenExecuted(txn, spark)) {
sendDriverMetrics(spark, metrics)
false
} else {
val startTime = System.nanoTime()
val addFiles = txn.filterFiles()
val removedFiles = addFiles.map(_.removeWithTimestamp(System.currentTimeMillis()))
val actions = createSetTransaction(spark, deltaLog).toSeq ++ removedFiles

metrics("numRemovedFiles").set(addFiles.size)
metrics("executionTimeMs").set((System.nanoTime() - startTime) / 1000 / 1000)
txn.registerSQLMetrics(spark, metrics)
sendDriverMetrics(spark, metrics)

val commitVersion = txn.commitIfNeeded(
actions = actions,
op = DeltaOperations.Truncate(),
tags = RowTracking.addPreservedRowTrackingTagIfNotSet(txn.snapshot))
recordDeltaEvent(
deltaLog,
"delta.dml.truncate.stats",
data = TruncateMetric(
numRemovedFiles = addFiles.size,
commitVersion = commitVersion))
commitVersion.isDefined
}
Comment thread
foss-contributor marked this conversation as resolved.
}
Comment thread
foss-contributor marked this conversation as resolved.
}

def tableExists: Boolean = deltaLog.tableExists


Expand Down Expand Up @@ -687,3 +729,7 @@ private class WriteIntoDeltaBuilder(
}
}
}

case class TruncateMetric(
numRemovedFiles: Long,
commitVersion: Option[Long] = None)
Loading
Loading