diff --git a/build.sbt b/build.sbt index 27cf4cacd61..fba7537afe8 100644 --- a/build.sbt +++ b/build.sbt @@ -340,8 +340,22 @@ lazy val sparkV1 = (project in file("spark")) "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided", // For DynamoDBCommitStore - "com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided", - + "com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided" + ) ++ { + if (unityCatalogVersion >= "0.5.0") { + Seq( + "io.unitycatalog" % "unitycatalog-hadoop" % unityCatalogVersion excludeAll( + ExclusionRule(organization = "org.openapitools"), + ExclusionRule(organization = "com.fasterxml.jackson.core"), + ExclusionRule(organization = "com.fasterxml.jackson.module"), + ExclusionRule(organization = "com.fasterxml.jackson.datatype"), + ExclusionRule(organization = "com.fasterxml.jackson.dataformat") + ) + ) + } else { + Seq.empty + } + } ++ Seq( // Test deps "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "org.scalatestplus" %% "scalacheck-1-15" % "3.2.9.0" % "test", @@ -660,8 +674,22 @@ lazy val spark = (project in file("spark-unified")) "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided", - "com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided", - + "com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided" + ) ++ { + if (unityCatalogVersion >= "0.5.0") { + Seq( + "io.unitycatalog" % "unitycatalog-hadoop" % unityCatalogVersion excludeAll( + ExclusionRule(organization = "org.openapitools"), + ExclusionRule(organization = "com.fasterxml.jackson.core"), + ExclusionRule(organization = "com.fasterxml.jackson.module"), + ExclusionRule(organization = "com.fasterxml.jackson.datatype"), + ExclusionRule(organization = "com.fasterxml.jackson.dataformat") + ) + ) + } else { + Seq.empty + } + } ++ Seq( "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "org.scalatestplus" %% "scalacheck-1-15" % "3.2.9.0" % "test", "junit" % "junit" % "4.13.2" % "test", @@ -669,7 +697,17 @@ lazy val spark = (project in file("spark-unified")) "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests", "org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests", "org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests", - "org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests", + "org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests" + ) ++ { + if (unityCatalogVersion >= "0.5.0") { + Seq( + // unitycatalog-hadoop references the ABFS token-provider interface during classloading. + "org.apache.hadoop" % "hadoop-azure" % hadoopVersion % "test" + ) + } else { + Seq.empty + } + } ++ Seq( "org.mockito" % "mockito-inline" % "4.11.0" % "test", ), @@ -843,13 +881,19 @@ Global / ensurePinnedUnityCatalog := { val home = file(sys.props("user.home")) // Check both layouts: a restored sbt cache can pre-populate ivy alone, leaving m2 empty - // checking only ivy would silently skip the slow publish and break mvn-based consumers. - val ivy2Canary = home / ".ivy2" / "local" / "io.unitycatalog" / + val ivy2ClientCanary = home / ".ivy2" / "local" / "io.unitycatalog" / "unitycatalog-client" / unityCatalogVersion / "ivys" / "ivy.xml" - val m2Canary = home / ".m2" / "repository" / "io" / "unitycatalog" / + val m2ClientCanary = home / ".m2" / "repository" / "io" / "unitycatalog" / "unitycatalog-client" / unityCatalogVersion / s"unitycatalog-client-$unityCatalogVersion.pom" - if (!ivy2Canary.exists || !m2Canary.exists) { - publishPinnedUnityCatalog(log, ivy2Canary) + val ivy2HadoopCanary = home / ".ivy2" / "local" / "io.unitycatalog" / + "unitycatalog-hadoop" / unityCatalogVersion / "ivys" / "ivy.xml" + val m2HadoopCanary = home / ".m2" / "repository" / "io" / "unitycatalog" / + "unitycatalog-hadoop" / unityCatalogVersion / + s"unitycatalog-hadoop-$unityCatalogVersion.pom" + if (!Seq(ivy2ClientCanary, m2ClientCanary, ivy2HadoopCanary, m2HadoopCanary) + .forall(_.exists)) { + publishPinnedUnityCatalog(log, ivy2ClientCanary) } } } @@ -1197,6 +1241,7 @@ lazy val storage = (project in file("storage")) commonSettings, exportJars := true, javaOnlyReleaseSettings, + libraryDependencies ++= Seq( // User can provide any 2.x or 3.x version. We don't use any new fancy APIs. Watch out for // versions with known vulnerabilities. diff --git a/project/scripts/setup_unitycatalog_main.sh b/project/scripts/setup_unitycatalog_main.sh index d66896365a7..8c9b442ebcf 100755 --- a/project/scripts/setup_unitycatalog_main.sh +++ b/project/scripts/setup_unitycatalog_main.sh @@ -57,7 +57,7 @@ set -euo pipefail # The pin. Bump both lines together if UC's version.sbt changed at the new SHA. build.sbt's # `unityCatalogVersion` is obtained by running this script with `--print-version`, so these two # values are the single source of truth. -UC_PIN_SHA=e6deb37e890a0a6fb8ae495b5bec52326731f6a6 +UC_PIN_SHA=9844a3002d7fdf41e8ad65ff3c07117fc2a9eba0 UC_BASE_VERSION=0.5.0-SNAPSHOT # --------------------------------------------------------------------------------------------- @@ -89,7 +89,7 @@ if [[ "${1:-}" == "--print-version" ]]; then exit 0 fi -# Canonical Ivy + Maven artifact paths. Delta depends on all three UC modules; sbt resolves from +# Canonical Ivy + Maven artifact paths. Delta depends on these UC modules; sbt resolves from # ~/.ivy2/local, mvn (kernel-examples integration tests) resolves from ~/.m2/repository. If any # is missing in either layout we must re-publish. IVY_LOCAL="$HOME/.ivy2/local/io.unitycatalog" @@ -118,24 +118,26 @@ if [[ "$UC_FORCE" != "1" ]] && all_canaries_present; then exit 0 fi -echo ">>> Fetching Unity Catalog main from $UC_REPO" +echo ">>> Fetching Unity Catalog from $UC_REPO" rm -rf "$UC_DIR" mkdir -p "$UC_DIR" -# Fetch main's full history so we can run `git merge-base --is-ancestor` below to verify the -# pinned SHA is actually on main. UC's repo is small; full fetch of one branch is cheap. +# Fetch the target branch so we can verify the pinned SHA is reachable. git -C "$UC_DIR" init --quiet git -C "$UC_DIR" remote add origin "$UC_REPO" -git -C "$UC_DIR" fetch --quiet origin main +if [[ "$UC_REF" == "main" ]]; then + git -C "$UC_DIR" fetch --quiet origin main +else + git -C "$UC_DIR" fetch --quiet origin "$UC_PIN_SHA" +fi cd "$UC_DIR" -# Safety check: the pinned SHA must be reachable from UC main. Local `merge-base --is-ancestor` +# Safety check: the pinned SHA must be reachable from the fetched branch. Local `merge-base --is-ancestor` # on the history we just fetched - no GitHub API, no token needed. Only applies when UC_REF is # the pinned SHA; UC_REF=main is trivially on main. if [[ "$UC_REF" == "$UC_PIN_SHA" ]]; then - if ! git merge-base --is-ancestor "$UC_PIN_SHA" origin/main 2>/dev/null; then - echo "ERROR: UC_PIN_SHA=$UC_PIN_SHA is not reachable from unitycatalog/unitycatalog main." >&2 - echo " Pin must reference a commit on https://github.com/unitycatalog/unitycatalog/commits/main" >&2 + if ! git rev-parse --verify "$UC_PIN_SHA^{commit}" >/dev/null 2>&1; then + echo "ERROR: UC_PIN_SHA=$UC_PIN_SHA could not be fetched from $UC_REPO." >&2 exit 1 fi fi @@ -161,7 +163,7 @@ fi # coordinate. Applied as a persistent setting so it sticks across the two sbt invocations below. SET_VERSION_CMD="set ThisBuild / version := \"$UC_VERSION\"" -echo ">>> Building and publishing UC client + server to local Maven repo" +echo ">>> Building and publishing UC client + server + hadoop to local Maven repo" ./build/sbt \ "$SET_VERSION_CMD" \ "set client / Compile / packageDoc / publishArtifact := false" \ diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala index 86a0f9b4ee9..1d724beefb4 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala @@ -83,6 +83,9 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension val spark = SparkSession.active + private lazy val deltaCatalogClient: Option[DeltaCatalogClient] = + Some(UCDeltaCatalogClient(delegate, spark)) + private lazy val isUnityCatalog: Boolean = { val delegateField = classOf[DelegatingCatalogExtension].getDeclaredField("delegate") delegateField.setAccessible(true) @@ -178,7 +181,6 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension base } } - var locUriOpt = location.map(CatalogUtils.stringToURI) val existingTableOpt = getExistingTableIfExists(id, Some(ident), operation) // PROP_IS_MANAGED_LOCATION indicates that the table location is not user-specified but // system-generated. The table should be created as managed table in this case. @@ -193,10 +195,24 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension } else { CatalogTableType.EXTERNAL } + // operation.isCreate covers CREATE and CREATE OR REPLACE when no existing table was found. + val ucDeltaApiCreate = if (isUnityCatalog && existingTableOpt.isEmpty && operation.isCreate) { + deltaCatalogClient.flatMap(_.prepareCreateTable( + ident, + tableType, + location.map(CatalogUtils.stringToURI))) + } else { + None + } + val locUriOpt = ucDeltaApiCreate.map(_.location).orElse(location.map(CatalogUtils.stringToURI)) + val tablePropertiesWithUCDeltaApi = + tableProperties ++ ucDeltaApiCreate.map(_.tableProperties).getOrElse(Map.empty) + val writeOptionsWithUCDeltaApi = + writeOptions ++ ucDeltaApiCreate.map(_.storageProperties).getOrElse(Map.empty) val loc = locUriOpt .orElse(existingTableOpt.flatMap(_.storage.locationUri)) .getOrElse(spark.sessionState.catalog.defaultTablePath(id)) - val storage = DataSource.buildStorageFormatFromOptions(writeOptions) + val storage = DataSource.buildStorageFormatFromOptions(writeOptionsWithUCDeltaApi) .copy(locationUri = Option(loc)) val commentOpt = Option(allTableProperties.get("comment")) @@ -209,7 +225,7 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension provider = Some(DeltaSourceUtils.ALT_NAME), partitionColumnNames = newPartitionColumns, bucketSpec = newBucketSpec, - properties = tableProperties, + properties = tablePropertiesWithUCDeltaApi, comment = commentOpt ) @@ -223,7 +239,7 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension val writer = sourceQuery.map { df => val catalogTbl = Some(tableDesc) // For safety, only extract the file system options here, to create deltaLog. - val fileSystemOptions = writeOptions.filter { case (k, _) => + val fileSystemOptions = writeOptionsWithUCDeltaApi.filter { case (k, _) => DeltaTableUtils.validDeltaTableHadoopPrefixes.exists(k.startsWith) } val deltaOptions = new DeltaOptions( @@ -277,9 +293,14 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension // Before this bug is fixed, we should only call the catalog plugin API to create tables // if UC is enabled to replace `V2SessionCatalog`. createTableFunc = Option.when(isUnityCatalog) { - v1Table => { - val t = V1Table(v1Table) - super.createTable(ident, t.columns(), t.partitioning, t.properties) + (v1Table, snapshot) => { + ucDeltaApiCreate match { + case Some(_) => + deltaCatalogClient.foreach(_.createTable(ident, v1Table, snapshot)) + case None => + val t = V1Table(v1Table) + super.createTable(ident, t.columns(), t.partitioning, t.properties) + } } }).run(spark) @@ -290,7 +311,14 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension "DeltaCatalog", "loadTable") { setVariantBlockingConfigIfUC() try { - val table = super.loadTable(ident) + val table = + if (isPathIdentifier(ident)) { + loadPathTable(ident) + } else if (isIcebergPathIdentifier(ident)) { + newIcebergPathTable(ident) + } else { + deltaCatalogClient.flatMap(_.loadTable(ident)).getOrElse(super.loadTable(ident)) + } ServerSidePlannedTable.tryCreate(spark, ident, table, isUnityCatalog).foreach { sspt => return sspt diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClient.scala new file mode 100644 index 00000000000..43af9ff31fc --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClient.scala @@ -0,0 +1,87 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.catalog + +import java.net.URI + +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} +import org.apache.spark.sql.connector.catalog.{Identifier, Table} +import org.apache.spark.sql.delta.Snapshot + +/** + * Values returned by the UC Delta Rest Catalog API prepare-create step. + * + * @param location UC-chosen location where Delta should write the initial log. + * @param tableProperties properties added to the CatalogTable so the Delta commit uses the + * server-required protocol/features and UC table id. + * @param storageProperties Hadoop storage options, usually UC-vended credentials, added to the + * write options for the initial Delta commit. + */ +private[catalog] case class PreparedUCDeltaRestCatalogApiCreate( + location: URI, + tableProperties: Map[String, String], + storageProperties: Map[String, String]) + +/** + * Spark-facing Delta catalog API hook. + * + *
The interface is intentionally free of UC SDK and Hadoop credential dependencies so the shared
+ * catalog path does not depend on a specific UC client implementation.
+ */
+private[catalog] trait DeltaCatalogClient {
+ def loadTable(ident: Identifier): Option[Table]
+
+ def prepareCreateTable(
+ ident: Identifier,
+ tableType: CatalogTableType,
+ location: Option[URI]): Option[PreparedUCDeltaRestCatalogApiCreate]
+
+ def createTable(
+ ident: Identifier,
+ table: CatalogTable,
+ snapshot: Snapshot): Unit
+}
+
+private[delta] object DeltaCatalogClient {
+ private[catalog] val UCDeltaRestCatalogApiEnabledKey =
+ UCDeltaCatalogClient.UCDeltaRestCatalogApiEnabledKey
+ private[catalog] val RenewCredentialEnabledKey =
+ UCDeltaCatalogClient.RenewCredentialEnabledKey
+ private[catalog] val CredScopedFsEnabledKey =
+ UCDeltaCatalogClient.CredScopedFsEnabledKey
+
+ private[catalog] def deltaRestApiEnabledConf(catalogName: String): String = {
+ UCDeltaCatalogClient.deltaRestApiEnabledConf(catalogName)
+ }
+
+ private[catalog] def renewCredentialEnabledConf(catalogName: String): String = {
+ UCDeltaCatalogClient.renewCredentialEnabledConf(catalogName)
+ }
+
+ private[catalog] def credScopedFsEnabledConf(catalogName: String): String = {
+ UCDeltaCatalogClient.credScopedFsEnabledConf(catalogName)
+ }
+
+ private[delta] def pathCredentialOptions(
+ spark: SparkSession,
+ path: Path): Map[String, String] = {
+ UCDeltaCatalogClient.pathCredentialOptions(spark, path)
+ }
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
index 9508477ad28..796ce96c633 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
@@ -86,11 +86,19 @@ class DeltaTableV2 private(
PathInfo(new Path(catalogTable.get.location), Seq.empty, None)
} else {
val (rootPath, filters, timeTravel) =
- DeltaDataSource.parsePathIdentifier(spark, path.toString, options)
+ DeltaDataSource.parsePathIdentifier(spark, path.toString, pathBasedOptions)
PathInfo(rootPath, filters, timeTravel)
}
}
+ private lazy val pathBasedOptions: Map[String, String] = {
+ if (catalogTable.isDefined) {
+ options
+ } else {
+ DeltaCatalogClient.pathCredentialOptions(spark, path) ++ options
+ }
+ }
+
private def rootPath = pathInfo.rootPath
private def partitionFilters = pathInfo.partitionFilters
@@ -122,7 +130,7 @@ class DeltaTableV2 private(
}
fileSystemOptions ++ options
} else {
- options
+ pathBasedOptions
}
DeltaLog.forTable(
spark,
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClient.scala
new file mode 100644
index 00000000000..dd5d40d1bc2
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClient.scala
@@ -0,0 +1,558 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.catalog
+
+import java.io.IOException
+import java.lang.reflect.InvocationTargetException
+import java.net.URI
+import java.util.{Locale, Map => JMap}
+
+import scala.collection.JavaConverters._
+
+import io.delta.storage.commit.actions.AbstractMetadata
+import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient.UC_TABLE_ID_KEY
+import io.delta.storage.commit.uccommitcoordinator.UCDeltaClient
+import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.{
+ CreateTableRequest,
+ DataSourceFormat => DeltaDataSourceFormat,
+ DeltaProtocol => UCDeltaRestCatalogApiProtocol,
+ StagingTableResponse,
+ TableType => DeltaTableType
+}
+import io.unitycatalog.client.ApiException
+import io.unitycatalog.client.auth.TokenProvider
+import io.unitycatalog.hadoop.UCCredentialHadoopConfs
+import io.unitycatalog.hadoop.UCCredentialHadoopConfs.{PathOperation, TableOperation}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, TableCatalog, V1Table}
+import org.apache.spark.sql.delta.Snapshot
+import org.apache.spark.sql.delta.actions.Protocol
+import org.apache.spark.sql.delta.coordinatedcommits.{
+ UCCommitCoordinatorBuilder,
+ UCTokenBasedRestClientFactory
+}
+import org.apache.spark.sql.delta.sources.DeltaSourceUtils
+
+/**
+ * Spark-side client for using UC Delta Rest Catalog API responses inside `DeltaCatalog`.
+ *
+ * This class owns the UC-specific implementation behind the Spark-facing [[DeltaCatalogClient]]
+ * interface: choosing when to try the Delta API path, translating Delta API metadata into Spark
+ * catalog objects, fetching credentials for Spark/Hadoop, and returning `None` when
+ * `DeltaCatalog` should fall back to the legacy catalog path.
+ */
+private class UCDeltaCatalogClient private (
+ private val ucDeltaClient: Option[UCDeltaClient],
+ catalogName: String,
+ credentialContext: Option[UCDeltaRestCatalogApiCredentialContext]) extends DeltaCatalogClient {
+
+ import UCDeltaCatalogClient._
+
+ def loadTable(ident: Identifier): Option[Table] = {
+ ucDeltaClient match {
+ case Some(client) if isNamedTableIdentifier(ident) =>
+ val schemaName = ident.namespace().head
+ val tableName = ident.name()
+ val metadata = try {
+ client.loadTable(catalogName, schemaName, tableName)
+ } catch {
+ case e: IOException if isUnsupportedTableFormat(e) =>
+ return None
+ case e: IOException =>
+ throw translateLoadTableException(ident, e)
+ }
+ val location = reflectedString(metadata, "getLocation")
+ val locationUri = CatalogUtils.stringToURI(location)
+ Some(V1Table(buildCatalogTableFromUCDeltaMetadata(
+ ident,
+ metadata,
+ location,
+ locationUri)))
+ case _ =>
+ // UC Delta Rest Catalog API only supports catalog.schema.table identifiers for named
+ // tables.
+ None
+ }
+ }
+
+ /**
+ * Prepares a UC Delta Rest Catalog API-backed CREATE TABLE before Delta writes the initial log.
+ */
+ def prepareCreateTable(
+ ident: Identifier,
+ tableType: CatalogTableType,
+ location: Option[URI]): Option[PreparedUCDeltaRestCatalogApiCreate] = {
+ ucDeltaClient match {
+ case Some(client) if isNamedTableIdentifier(ident) =>
+ val schemaName = ident.namespace().head
+ val tableName = ident.name()
+ (tableType, location) match {
+ case (CatalogTableType.MANAGED, None) =>
+ val staging = client.createStagingTable(catalogName, schemaName, tableName)
+ val stagingLocation = CatalogUtils.stringToURI(staging.getLocation)
+ Some(PreparedUCDeltaRestCatalogApiCreate(
+ location = stagingLocation,
+ tableProperties = toTableProperties(staging),
+ storageProperties = buildHadoopCredentialPropertiesForTable(
+ staging.getLocation,
+ stagingLocation.getScheme,
+ schemaName,
+ tableName)))
+ case (CatalogTableType.EXTERNAL, Some(externalLocation))
+ if isCloudScheme(externalLocation.getScheme) =>
+ val locationText = externalLocation.toString
+ // External create must write the initial _delta_log, so READ fallback would be wrong.
+ Some(PreparedUCDeltaRestCatalogApiCreate(
+ location = externalLocation,
+ tableProperties = Map.empty,
+ storageProperties = buildHadoopCredentialPropertiesForPath(
+ locationText,
+ externalLocation.getScheme,
+ PathOperation.PATH_CREATE_TABLE,
+ credentialContext)))
+ case _ =>
+ None
+ }
+ case _ =>
+ // UC Delta Rest Catalog API only supports catalog.schema.table identifiers for create.
+ None
+ }
+ }
+
+ /**
+ * Finalizes a UC Delta Rest Catalog API-backed CREATE TABLE after Delta has written the
+ * initial log.
+ */
+ def createTable(
+ ident: Identifier,
+ table: CatalogTable,
+ snapshot: Snapshot): Unit = {
+ ucDeltaClient match {
+ case Some(client) if isNamedTableIdentifier(ident) =>
+ client.createTable(
+ catalogName,
+ ident.namespace().head,
+ toCreateTableRequest(ident, table, snapshot))
+ case _ =>
+ // Safety net: AbstractDeltaCatalog only calls this after prepareCreateTable returned Some.
+ throw new IllegalStateException(
+ s"UC Delta Rest Catalog API createTable is not available for $ident.")
+ }
+ }
+
+ private def translateLoadTableException(ident: Identifier, e: IOException): Throwable = {
+ e.getCause match {
+ case api: ApiException if api.getCode == 404 =>
+ new NoSuchTableException(ident)
+ case _ =>
+ e
+ }
+ }
+
+ /**
+ * UC Delta APIs use this explicit 501 response when a table exists in UC but cannot be served
+ * through the Delta endpoint, such as a metric view or another non-Delta table format.
+ */
+ private def isUnsupportedTableFormat(e: IOException): Boolean = e.getCause match {
+ case api: ApiException =>
+ api.getCode == 501 &&
+ Option(api.getResponseBody).exists(_.contains(UnsupportedTableFormatExceptionType))
+ case _ => false
+ }
+
+ private def isNamedTableIdentifier(ident: Identifier): Boolean = {
+ ident.namespace().length == 1 && !isDeltaPathIdentifier(ident)
+ }
+
+ private def isDeltaPathIdentifier(ident: Identifier): Boolean = {
+ try {
+ ident.namespace().length == 1 &&
+ DeltaSourceUtils.isDeltaDataSourceName(ident.namespace().head) &&
+ new Path(ident.name()).isAbsolute
+ } catch {
+ case _: IllegalArgumentException => false
+ }
+ }
+
+ /**
+ * Builds the Spark V1 catalog table returned from UC Delta Rest Catalog API metadata.
+ * The UC Delta response supplies the Spark table type, schema, provider, and storage metadata.
+ */
+ private def buildCatalogTableFromUCDeltaMetadata(
+ ident: Identifier,
+ metadata: AbstractMetadata,
+ location: String,
+ locationUri: URI): CatalogTable = {
+ val schemaName = ident.namespace().head
+ val tableName = ident.name()
+ CatalogTable(
+ identifier =
+ TableIdentifier(ident.name(), ident.namespace().lastOption, Some(catalogName)),
+ tableType = reflectedString(metadata, "getTableType") match {
+ case ManagedTableType =>
+ CatalogTableType.MANAGED
+ case ExternalTableType =>
+ CatalogTableType.EXTERNAL
+ case other =>
+ throw new IllegalArgumentException(
+ s"Unsupported UC Delta Rest Catalog API table type for " +
+ s"$catalogName.${ident.namespace().mkString(".")}.${ident.name()}: $other")
+ },
+ storage = CatalogStorageFormat.empty.copy(
+ locationUri = Some(locationUri),
+ properties = buildCatalogStorageProperties(
+ metadata,
+ location,
+ locationUri.getScheme,
+ schemaName,
+ tableName)),
+ schema = UCDeltaRestCatalogApiSchemaConverter.toSparkType(metadata.getSchemaString),
+ provider = Option(metadata.getProvider),
+ partitionColumnNames = Option(metadata.getPartitionColumns)
+ .map(_.asScala.toSeq)
+ .getOrElse(Nil))
+ }
+
+ /**
+ * Builds CatalogStorageFormat.properties for the Spark V1 table.
+ * V1Table later exposes these to Delta as option.* table properties.
+ */
+ private def buildCatalogStorageProperties(
+ metadata: AbstractMetadata,
+ location: String,
+ locationScheme: String,
+ schemaName: String,
+ tableName: String): Map[String, String] = {
+ val credentialProperties = buildHadoopCredentialPropertiesForTable(
+ location,
+ locationScheme,
+ schemaName,
+ tableName)
+ // V1Table exposes storage properties as option.* table properties. Keep UC Delta Rest Catalog
+ // API table features here so the Delta load path receives them with the same option.* shape as
+ // other storage-level UC properties, while CatalogTable.properties stays reserved for Spark
+ // metadata.
+ Option(metadata.getConfiguration).map(_.asScala.toMap).getOrElse(Map.empty) ++
+ credentialProperties
+ }
+
+ private def buildHadoopCredentialPropertiesForTable(
+ location: String,
+ locationScheme: String,
+ schemaName: String,
+ tableName: String): Map[String, String] = {
+ if (!isCloudScheme(locationScheme)) {
+ Map.empty[String, String]
+ } else {
+ val context = credentialContext.getOrElse {
+ throw new IllegalStateException(
+ "UC Delta Rest Catalog API credential context is missing for cloud location " + location)
+ }
+ val builder = UCCredentialHadoopConfs.builder(
+ context.uri,
+ locationScheme.toLowerCase(Locale.ROOT))
+ .tokenProvider(context.tokenProvider)
+ .enableCredentialRenewal(context.renewCredentialEnabled)
+ .enableCredentialScopedFs(context.credScopedFsEnabled)
+ .hadoopConf(context.hadoopConf)
+ .addAppVersions(context.appVersions)
+ try {
+ // Prefer READ_WRITE so a loaded table can be used for writes without reloading
+ // credentials; read-only principals fall back to READ below.
+ builder.buildForTable(
+ catalogName,
+ schemaName,
+ tableName,
+ TableOperation.READ_WRITE,
+ location)
+ .asScala
+ .toMap
+ } catch {
+ case e: ApiException if e.getCode == 401 || e.getCode == 403 =>
+ builder.buildForTable(catalogName, schemaName, tableName, TableOperation.READ, location)
+ .asScala
+ .toMap
+ }
+ }
+ }
+}
+
+private case class UCDeltaRestCatalogApiCredentialContext(
+ uri: String,
+ tokenProvider: TokenProvider,
+ renewCredentialEnabled: Boolean,
+ credScopedFsEnabled: Boolean,
+ hadoopConf: Configuration,
+ appVersions: JMap[String, String])
+
+private[catalog] object UCDeltaCatalogClient {
+ private[catalog] val UCDeltaRestCatalogApiEnabledKey = "deltaRestApi.enabled"
+ private[catalog] val RenewCredentialEnabledKey = "renewCredential.enabled"
+ private[catalog] val CredScopedFsEnabledKey = "credScopedFs.enabled"
+ private val ManagedTableType = "MANAGED"
+ private val ExternalTableType = "EXTERNAL"
+ private val UnsupportedTableFormatExceptionType = "UnsupportedTableFormatException"
+ private val DefaultCatalogConf = "spark.sql.defaultCatalog"
+ private val DefaultRenewCredentialEnabled = true
+ private val DefaultCredScopedFsEnabled = false
+ private val CloudSchemes = Set("s3", "s3a", "gs", "abfs", "abfss")
+
+ private[catalog] def deltaRestApiEnabledConf(catalogName: String): String = {
+ s"spark.sql.catalog.$catalogName.$UCDeltaRestCatalogApiEnabledKey"
+ }
+
+ private[catalog] def renewCredentialEnabledConf(catalogName: String): String = {
+ s"spark.sql.catalog.$catalogName.$RenewCredentialEnabledKey"
+ }
+
+ private[catalog] def credScopedFsEnabledConf(catalogName: String): String = {
+ s"spark.sql.catalog.$catalogName.$CredScopedFsEnabledKey"
+ }
+
+ private def isCloudScheme(scheme: String): Boolean = {
+ Option(scheme).exists(s => CloudSchemes.contains(s.toLowerCase(Locale.ROOT)))
+ }
+
+ /**
+ * Returns UC Delta Rest Catalog API path credential options for raw path-based Delta access.
+ *
+ * Path-based access has no catalog identifier, so this uses the UC Delta Rest Catalog API-enabled
+ * default catalog as the credential authority. If the session has no such default catalog, path
+ * reads keep their original options.
+ */
+ private[delta] def pathCredentialOptions(
+ spark: SparkSession,
+ path: Path): Map[String, String] = {
+ val location = path.toString
+ val locationScheme = path.toUri.getScheme
+ if (!isCloudScheme(locationScheme)) {
+ return Map.empty[String, String]
+ }
+
+ selectedUCDeltaRestCatalogApiConfigForPathCredentials(spark)
+ .map { context =>
+ try {
+ buildHadoopCredentialPropertiesForPath(
+ location,
+ locationScheme,
+ PathOperation.PATH_READ,
+ context)
+ } catch {
+ case e: ApiException if e.getCode == 404 =>
+ Map.empty[String, String]
+ }
+ }
+ .getOrElse(Map.empty[String, String])
+ }
+
+ private def selectedUCDeltaRestCatalogApiConfigForPathCredentials(
+ spark: SparkSession): Option[UCDeltaRestCatalogApiCredentialContext] = {
+ spark.conf.getOption(DefaultCatalogConf)
+ .filter(_.nonEmpty)
+ .filter(catalogName =>
+ spark.conf.get(deltaRestApiEnabledConf(catalogName), "false").toBoolean)
+ .flatMap(catalogName => ucDeltaRestCatalogApiCredentialContext(spark, catalogName))
+ }
+
+ private def reflectedString(metadata: AbstractMetadata, methodName: String): String = {
+ try {
+ metadata.getClass.getMethod(methodName).invoke(metadata).asInstanceOf[String]
+ } catch {
+ case e: NoSuchMethodException =>
+ throw new IllegalStateException(
+ s"UC Delta metadata is missing required method $methodName.", e)
+ case e: InvocationTargetException =>
+ e.getCause match {
+ case runtime: RuntimeException => throw runtime
+ case error: Error => throw error
+ case cause => throw new IllegalStateException(
+ s"Failed to read $methodName from UC Delta metadata.", cause)
+ }
+ }
+ }
+
+ private def ucDeltaRestCatalogApiCredentialContext(
+ spark: SparkSession,
+ catalogName: String): Option[UCDeltaRestCatalogApiCredentialContext] = {
+ if (!spark.conf.get(deltaRestApiEnabledConf(catalogName), "false").toBoolean) {
+ return None
+ }
+
+ val (_, uri, authConfig) = UCCommitCoordinatorBuilder.getCatalogConfigs(spark)
+ .collectFirst { case (`catalogName`, configuredUri, configuredAuthConfig) =>
+ (catalogName, configuredUri, configuredAuthConfig)
+ }
+ .getOrElse {
+ throw new IllegalArgumentException(
+ "UC Delta Rest Catalog API is enabled for catalog " +
+ s"$catalogName, but its Unity Catalog " +
+ "configuration is missing or incomplete.")
+ }
+ val tokenProvider = TokenProvider.create(authConfig.asJava)
+ // Catalog load has no DeltaLog yet, so pass the Spark session Hadoop conf to the UC
+ // credential builder.
+ // scalastyle:off deltahadoopconfiguration
+ val hadoopConf = spark.sessionState.newHadoopConf()
+ // scalastyle:on deltahadoopconfiguration
+ Some(UCDeltaRestCatalogApiCredentialContext(
+ uri.toString,
+ tokenProvider,
+ spark.conf.get(
+ renewCredentialEnabledConf(catalogName),
+ DefaultRenewCredentialEnabled.toString).toBoolean,
+ spark.conf.get(
+ credScopedFsEnabledConf(catalogName),
+ DefaultCredScopedFsEnabled.toString).toBoolean,
+ hadoopConf,
+ UCTokenBasedRestClientFactory.defaultAppVersionsAsJava))
+ }
+
+ private def buildHadoopCredentialPropertiesForPath(
+ location: String,
+ locationScheme: String,
+ pathOperation: PathOperation,
+ credentialContext: UCDeltaRestCatalogApiCredentialContext): Map[String, String] = {
+ UCCredentialHadoopConfs.builder(
+ credentialContext.uri,
+ locationScheme.toLowerCase(Locale.ROOT))
+ .tokenProvider(credentialContext.tokenProvider)
+ .addAppVersions(credentialContext.appVersions)
+ .enableCredentialRenewal(credentialContext.renewCredentialEnabled)
+ .enableCredentialScopedFs(credentialContext.credScopedFsEnabled)
+ .hadoopConf(credentialContext.hadoopConf)
+ .buildForPath(location, pathOperation)
+ .asScala
+ .toMap
+ }
+
+ private def buildHadoopCredentialPropertiesForPath(
+ location: String,
+ locationScheme: String,
+ pathOperation: PathOperation,
+ credentialContext: Option[UCDeltaRestCatalogApiCredentialContext]): Map[String, String] = {
+ val context = credentialContext.getOrElse {
+ throw new IllegalStateException(
+ "UC Delta Rest Catalog API credential context is missing for cloud path location " +
+ s"$location.")
+ }
+ buildHadoopCredentialPropertiesForPath(location, locationScheme, pathOperation, context)
+ }
+
+ private def toTableProperties(staging: StagingTableResponse): Map[String, String] = {
+ val stagingTableId = staging.getTableId.toString
+ val requiredProperties = Option(staging.getRequiredProperties)
+ .map(_.asScala.collect { case (key, value) if value != null => key -> value }.toMap)
+ .getOrElse(Map.empty)
+ requiredProperties.get(UC_TABLE_ID_KEY).foreach { requiredTableId =>
+ if (requiredTableId != stagingTableId) {
+ throw new IllegalArgumentException(
+ s"UC Delta Rest Catalog API staging response table id $stagingTableId does not match " +
+ s"required property $UC_TABLE_ID_KEY=$requiredTableId.")
+ }
+ }
+ // Later maps win so UC stays authoritative for table identity and managed-location markers.
+ protocolFeatureProperties(staging.getRequiredProtocol) ++
+ requiredProperties ++
+ Map(
+ TableCatalog.PROP_IS_MANAGED_LOCATION -> "true",
+ UC_TABLE_ID_KEY -> stagingTableId)
+ }
+
+ private def protocolFeatureProperties(
+ protocol: UCDeltaRestCatalogApiProtocol): Map[String, String] = {
+ Option(protocol).map { p =>
+ (Option(p.getReaderFeatures).map(_.asScala).getOrElse(Nil) ++
+ Option(p.getWriterFeatures).map(_.asScala).getOrElse(Nil))
+ .map(feature => s"delta.feature.$feature" -> "supported")
+ .toMap
+ }.getOrElse(Map.empty)
+ }
+
+ /**
+ * Builds the final UC Delta Rest Catalog API createTable request from the post-commit Delta
+ * state.
+ */
+ private def toCreateTableRequest(
+ ident: Identifier,
+ table: CatalogTable,
+ snapshot: Snapshot): CreateTableRequest = {
+ new CreateTableRequest()
+ .name(ident.name())
+ .location(table.storage.locationUri
+ .getOrElse {
+ throw new IllegalArgumentException(
+ "UC Delta Rest Catalog API createTable requires a location for " +
+ s"${ident.toString}.")
+ }
+ .toString)
+ .tableType(toDeltaTableType(table.tableType))
+ .dataSourceFormat(DeltaDataSourceFormat.DELTA)
+ .comment(table.comment.orNull)
+ .schemaString(snapshot.schema.json)
+ .partitionColumns(snapshot.metadata.partitionColumns.asJava)
+ .protocol(toDeltaProtocol(snapshot.protocol))
+ .properties(snapshot.metadata.configuration.asJava)
+ .lastCommitTimestampMs(snapshot.timestamp)
+ }
+
+ private def toDeltaTableType(tableType: CatalogTableType): DeltaTableType = tableType match {
+ case CatalogTableType.MANAGED => DeltaTableType.MANAGED
+ case CatalogTableType.EXTERNAL => DeltaTableType.EXTERNAL
+ case other =>
+ throw new IllegalArgumentException(
+ s"Unsupported UC Delta Rest Catalog API table type: $other")
+ }
+
+ private def toDeltaProtocol(protocol: Protocol): UCDeltaRestCatalogApiProtocol = {
+ new UCDeltaRestCatalogApiProtocol()
+ .minReaderVersion(protocol.minReaderVersion)
+ .minWriterVersion(protocol.minWriterVersion)
+ // Keep wire JSON deterministic even though Protocol stores features as sets.
+ .readerFeatures(protocol.readerFeatureNames.toSeq.sorted.asJava)
+ .writerFeatures(protocol.writerFeatureNames.toSeq.sorted.asJava)
+ }
+
+ def apply(delegatePlugin: CatalogPlugin, spark: SparkSession): UCDeltaCatalogClient = {
+ val catalogName = delegatePlugin.name()
+ val credentialContext = ucDeltaRestCatalogApiCredentialContext(spark, catalogName)
+ val ucDeltaClient = credentialContext.flatMap { context =>
+ UCTokenBasedRestClientFactory.createUCDeltaClient(
+ context.uri,
+ context.tokenProvider,
+ context.appVersions,
+ catalogName) match {
+ case Some(client) if client.supportsUCDeltaRestCatalogApi() =>
+ Some(client)
+ case Some(client) =>
+ client.close()
+ throw new IllegalArgumentException(
+ s"UC Delta Rest Catalog API is enabled for catalog $catalogName, but the Unity " +
+ "Catalog server does not support the required UC Delta Rest Catalog API endpoints.")
+ case None =>
+ None
+ }
+ }
+ new UCDeltaCatalogClient(ucDeltaClient, catalogName, credentialContext)
+ }
+
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaRestCatalogApiSchemaConverter.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaRestCatalogApiSchemaConverter.scala
new file mode 100644
index 00000000000..809ce876acc
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaRestCatalogApiSchemaConverter.scala
@@ -0,0 +1,29 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.catalog
+
+import org.apache.spark.sql.types.{DataType, StructType}
+
+private[catalog] object UCDeltaRestCatalogApiSchemaConverter {
+
+ def toSparkType(schemaString: String): StructType = {
+ if (schemaString == null) {
+ throw new IllegalArgumentException("UC Delta Rest Catalog API table schema is missing.")
+ }
+ DataType.fromJson(schemaString).asInstanceOf[StructType]
+ }
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
index fa40034cc4e..2d44f1641fc 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
@@ -67,8 +67,9 @@ import org.apache.spark.util.Utils
* @param output SQL output of the command
* @param protocol This is used to create a table with specific protocol version
* @param allowCatalogManaged This is used to create UC managed table with catalogManaged feature
- * @param createTableFunc If specified, call this function to create the table, instead of
- * Spark `SessionCatalog#createTable` which is backed by Hive Metastore.
+ * @param createTableFunc If specified, call this function with the post-commit snapshot to create
+ * the table, instead of Spark `SessionCatalog#createTable` which is backed
+ * by Hive Metastore.
*/
case class CreateDeltaTableCommand(
override val table: CatalogTable,
@@ -80,7 +81,7 @@ case class CreateDeltaTableCommand(
override val output: Seq[Attribute] = Nil,
protocol: Option[Protocol] = None,
override val allowCatalogManaged: Boolean = false,
- createTableFunc: Option[CatalogTable => Unit] = None)
+ createTableFunc: Option[(CatalogTable, Snapshot) => Unit] = None)
extends LeafRunnableCommand
with DeltaCommand
with DeltaLogging
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableLike.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableLike.scala
index d0e38f95e65..512339e1ddc 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableLike.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableLike.scala
@@ -91,14 +91,14 @@ trait CreateDeltaTableLike extends SQLConfHelper {
snapshot: Snapshot,
query: Option[LogicalPlan],
didNotChangeMetadata: Boolean,
- createTableFunc: Option[CatalogTable => Unit] = None
+ createTableFunc: Option[(CatalogTable, Snapshot) => Unit] = None
): Unit = {
val cleaned = cleanupTableDefinition(spark, table, snapshot)
operation match {
case _ if tableByPath => // do nothing with the metastore if this is by path
case TableCreationModes.Create =>
if (createTableFunc.isDefined) {
- createTableFunc.get.apply(cleaned)
+ createTableFunc.get.apply(cleaned, snapshot)
} else {
spark.sessionState.catalog.createTable(
cleaned,
@@ -120,7 +120,7 @@ trait CreateDeltaTableLike extends SQLConfHelper {
case Some(createFunc) =>
// This is the new missing-table path where creation is delegated through the V2
// catalog plugin (for example Unity Catalog) instead of SessionCatalog.createTable().
- createFunc(cleaned)
+ createFunc(cleaned, snapshot)
case None =>
spark.sessionState.catalog.createTable(
cleaned,
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala
index 0f92e15f870..e15ac7cb692 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala
@@ -23,7 +23,12 @@ import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import io.delta.storage.commit.CommitCoordinatorClient
-import io.delta.storage.commit.uccommitcoordinator.{UCClient, UCCommitCoordinatorClient, UCTokenBasedRestClient}
+import io.delta.storage.commit.uccommitcoordinator.{
+ UCClient,
+ UCCommitCoordinatorClient,
+ UCDeltaClient,
+ UCTokenBasedRestClient
+}
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -31,6 +36,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging
import io.unitycatalog.client.auth.TokenProvider
import org.apache.spark.internal.MDC
import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.Utils
/**
* Builder for Unity Catalog Commit Coordinator Clients.
@@ -291,6 +297,9 @@ trait UCClientFactory {
}
object UCTokenBasedRestClientFactory extends UCClientFactory {
+ private val UCDeltaTokenBasedRestClientClassName =
+ "io.delta.storage.commit.uccommitcoordinator.UCDeltaTokenBasedRestClient"
+
override def createUCClient(uri: String, authConfig: Map[String, String]): UCClient = {
createUCClientWithVersions(uri, authConfig, defaultAppVersions)
}
@@ -311,6 +320,27 @@ object UCTokenBasedRestClientFactory extends UCClientFactory {
new UCTokenBasedRestClient(uri, tokenProvider, appVersions.asJava)
}
+ private[delta] def createUCDeltaClient(
+ uri: String,
+ tokenProvider: TokenProvider,
+ appVersions: java.util.Map[String, String],
+ catalogName: String): Option[UCDeltaClient] = {
+ try {
+ val clientClass = Utils.classForName(UCDeltaTokenBasedRestClientClassName)
+ val constructor = clientClass.getConstructor(
+ classOf[String],
+ classOf[TokenProvider],
+ classOf[java.util.Map[String, String]],
+ classOf[String])
+ Some(constructor
+ .newInstance(uri, tokenProvider, appVersions, catalogName)
+ .asInstanceOf[UCDeltaClient])
+ } catch {
+ case _: ClassNotFoundException =>
+ None
+ }
+ }
+
private[coordinatedcommits] def defaultAppVersions: Map[String, String] = {
Map(
"Delta" -> io.delta.VERSION,
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala
index e6e0057fcaf..1da1ee756bd 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala
@@ -95,6 +95,10 @@ object ServerSidePlannedTable extends DeltaLogging {
// Check if we should enable server-side planning (for testing)
val enableServerSidePlanning =
spark.conf.get(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "false").toBoolean
+ if (!enableServerSidePlanning) {
+ return None
+ }
+
val hasTableCredentials = hasCredentials(table)
// Check if we should use server-side planning
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala
index 96c222d1c85..d275d37b290 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala
@@ -24,7 +24,7 @@ import scala.util.{Failure, Success, Try}
import com.databricks.spark.util.DatabricksLogging
import org.apache.spark.internal.MDC
import org.apache.spark.sql.delta._
-import org.apache.spark.sql.delta.catalog.DeltaTableV2
+import org.apache.spark.sql.delta.catalog.{DeltaCatalogClient, DeltaTableV2}
import org.apache.spark.sql.delta.commands.{
DeltaInsertReplaceOnOrUsingCommand,
InsertReplaceOnOrUsingAPIOrigin,
@@ -95,7 +95,12 @@ class DeltaDataSource
catalogTableOpt
.map(catalogTable => DeltaLog.forTableWithSnapshot(
sparkSession, catalogTable, options))
- .getOrElse(DeltaLog.forTableWithSnapshot(sparkSession, path, options))._2
+ .getOrElse {
+ DeltaLog.forTableWithSnapshot(
+ sparkSession,
+ path,
+ DeltaCatalogClient.pathCredentialOptions(sparkSession, path) ++ options)
+ }._2
}
def inferSchema: StructType = new StructType() // empty
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCreateTableLikeSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCreateTableLikeSuite.scala
index 396822e2068..b6dcd717d7f 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCreateTableLikeSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCreateTableLikeSuite.scala
@@ -380,7 +380,7 @@ class DeltaCreateTableLikeSuite extends QueryTest
snapshot,
query = None,
didNotChangeMetadata = true,
- createTableFunc = Some((_: CatalogTable) => {
+ createTableFunc = Some((_: CatalogTable, _: Snapshot) => {
createCallbackCalls += 1
}))
}
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClientSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClientSuite.scala
new file mode 100644
index 00000000000..ed3913a831a
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClientSuite.scala
@@ -0,0 +1,975 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.catalog
+
+import java.io.IOException
+import java.net.{InetSocketAddress, URI, URLDecoder}
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import com.sun.net.httpserver.{HttpExchange, HttpServer}
+import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient.UC_TABLE_ID_KEY
+import org.apache.hadoop.fs.Path
+
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.connector.catalog.{
+ Identifier,
+ Table,
+ TableCatalog,
+ TableChange,
+ V1Table
+}
+import org.apache.spark.sql.delta.{CatalogOwnedTableFeature, DeltaLog, DummySnapshot}
+import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.types.{ArrayType, IntegerType, LongType, StringType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class DeltaCatalogClientSuite
+ extends QueryTest
+ with DeltaSQLCommandTest
+ with BeforeAndAfterAll
+ with BeforeAndAfterEach {
+
+ private var server: HttpServer = _
+ private var serverUri: String = _
+ private var configHandler: HttpExchange => Unit = _
+ private var handler: HttpExchange => Unit = _
+ private var pathCredentialsHandler: HttpExchange => Unit = _
+ private var credentialRequestCount: Int = _
+ private val objectMapper = new ObjectMapper()
+
+ private val AwsVendedTokenProviderClass =
+ "io.unitycatalog.hadoop.internal.auth.AwsVendedTokenProvider"
+ private val S3ACredentialsProviderKey = "fs.s3a.aws.credentials.provider"
+ private val S3AInitAccessKey = "fs.s3a.init.access.key"
+ private val UCTableOperationKey = "fs.unitycatalog.table.operation"
+ private val UCCredentialsTypeKey = "fs.unitycatalog.credentials.type"
+ private val UCCredentialsTypePathValue = "path"
+ private val UCPathKey = "fs.unitycatalog.path"
+ private val UCPathOperationKey = "fs.unitycatalog.path.operation"
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ server = HttpServer.create(new InetSocketAddress("localhost", 0), 0)
+ server.createContext("/api/2.1/unity-catalog/delta/v1/config", exchange => {
+ try {
+ if (configHandler != null) {
+ configHandler(exchange)
+ } else {
+ sendJson(exchange, 200,
+ """{
+ | "endpoints": [
+ | "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}"
+ | ],
+ | "protocol-version": "1.0"
+ |}""".stripMargin)
+ }
+ } finally {
+ exchange.close()
+ }
+ })
+ server.createContext("/api/2.1/unity-catalog/delta/v1/catalogs", exchange => {
+ try {
+ if (handler != null) handler(exchange) else sendJson(exchange, 404, "{}")
+ } finally {
+ exchange.close()
+ }
+ })
+ server.createContext("/api/2.1/unity-catalog/temporary-path-credentials", exchange => {
+ try {
+ if (pathCredentialsHandler != null) {
+ pathCredentialsHandler(exchange)
+ } else {
+ sendJson(exchange, 404, "{}")
+ }
+ } finally {
+ exchange.close()
+ }
+ })
+ server.start()
+ serverUri = s"http://localhost:${server.getAddress.getPort}"
+ }
+
+ override def afterAll(): Unit = {
+ if (server != null) server.stop(0)
+ super.afterAll()
+ }
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ configHandler = null
+ handler = null
+ pathCredentialsHandler = null
+ credentialRequestCount = 0
+ }
+
+ test("loadTable skips credentials for local Delta locations") {
+ handler = exchange => exchange.getRequestURI.getPath match {
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" =>
+ sendJson(exchange, 200, loadTableResponseJson("file:/tmp/uc/table"))
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" =>
+ credentialRequestCount += 1
+ sendJson(exchange, 500, "{}")
+ case path =>
+ fail(s"Unexpected UC Delta Rest Catalog API request path: $path")
+ }
+
+ val table = loadWithUCDeltaRestCatalogApi()
+ val properties = table.catalogTable.storage.properties
+ val idMetadata = table.catalogTable.schema("id").metadata
+
+ assert(credentialRequestCount === 0)
+ assert(properties === Map(
+ "delta.feature.catalogManaged" -> "supported",
+ UC_TABLE_ID_KEY -> "11111111-1111-1111-1111-111111111111"))
+ assert(idMetadata.getLong("delta.columnMapping.id") === 1L)
+ assert(idMetadata.getString("delta.columnMapping.physicalName") === "col-123")
+ }
+
+ test("loadTable fails loudly when cloud credentials are empty") {
+ handler = exchange => exchange.getRequestURI.getPath match {
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" =>
+ sendJson(exchange, 200, loadTableResponseJson("s3://bucket/table"))
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" =>
+ credentialRequestCount += 1
+ sendJson(exchange, 200, """{"storage-credentials": []}""")
+ case path =>
+ fail(s"Unexpected UC Delta Rest Catalog API request path: $path")
+ }
+
+ val error = intercept[IllegalArgumentException] {
+ loadWithUCDeltaRestCatalogApi()
+ }
+
+ assert(credentialRequestCount === 1)
+ assert(error.getMessage.contains("no storage credentials"))
+ }
+
+ test("loadTable accepts trailing-slash cloud credential prefixes") {
+ handler = exchange => exchange.getRequestURI.getPath match {
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" =>
+ sendJson(exchange, 200, loadTableResponseJson("s3://bucket/path/to/table"))
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" =>
+ credentialRequestCount += 1
+ assert(exchange.getRequestURI.getQuery === "operation=READ_WRITE")
+ sendJson(exchange, 200, s3CredentialsResponseJson(
+ "s3://bucket/path/to/table/",
+ "READ_WRITE"))
+ case path =>
+ fail(s"Unexpected UC Delta Rest Catalog API request path: $path")
+ }
+
+ val table = loadWithUCDeltaRestCatalogApi()
+ val storageProperties = table.catalogTable.storage.properties
+ val tableProperties = table.properties.asScala
+
+ assert(credentialRequestCount === 1)
+ assert(storageProperties(S3ACredentialsProviderKey) ===
+ AwsVendedTokenProviderClass)
+ assert(storageProperties(S3AInitAccessKey) === "ak")
+ assert(tableProperties(
+ s"option.${S3ACredentialsProviderKey}") ===
+ AwsVendedTokenProviderClass)
+ assert(tableProperties(
+ s"option.${S3AInitAccessKey}") === "ak")
+ assert(storageProperties("delta.feature.catalogManaged") === "supported")
+ assert(!tableProperties.contains("delta.feature.catalogManaged"))
+ assert(!tableProperties.contains(
+ s"option.option.${S3AInitAccessKey}"))
+ }
+
+ test("loadTable falls back to READ credentials when READ_WRITE is denied") {
+ var credentialQueries = Seq.empty[String]
+ handler = exchange => exchange.getRequestURI.getPath match {
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" =>
+ sendJson(exchange, 200, loadTableResponseJson("s3://bucket/path/to/table"))
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" =>
+ credentialQueries :+= exchange.getRequestURI.getQuery
+ exchange.getRequestURI.getQuery match {
+ case "operation=READ_WRITE" =>
+ sendJson(exchange, 403, """{"error_code": "PERMISSION_DENIED"}""")
+ case "operation=READ" =>
+ sendJson(exchange, 200, s3CredentialsResponseJson("s3://bucket/path/to/table", "READ"))
+ case other =>
+ fail(s"Unexpected credential query: $other")
+ }
+ case path =>
+ fail(s"Unexpected UC Delta Rest Catalog API request path: $path")
+ }
+
+ val table = loadWithUCDeltaRestCatalogApi()
+
+ assert(credentialQueries === Seq("operation=READ_WRITE", "operation=READ"))
+ assert(table.catalogTable.storage.properties(
+ UCTableOperationKey) === "READ")
+ }
+
+ test("loadTable uses static credential properties when renewal is disabled") {
+ handler = exchange => exchange.getRequestURI.getPath match {
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" =>
+ sendJson(exchange, 200, loadTableResponseJson("s3://bucket/path/to/table"))
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" =>
+ credentialRequestCount += 1
+ sendJson(
+ exchange,
+ 200,
+ s3CredentialsResponseJson("s3://bucket/path/to/table", "READ_WRITE"))
+ case path =>
+ fail(s"Unexpected UC Delta Rest Catalog API request path: $path")
+ }
+
+ val table = withUCDeltaRestCatalogApiRenewalDisabled { catalog =>
+ catalog.loadTable(Identifier.of(Array("default"), "tbl")).get.asInstanceOf[V1Table]
+ }
+
+ assert(credentialRequestCount === 1)
+ assert(table.catalogTable.storage.properties("fs.s3a.access.key") === "ak")
+ assert(!table.catalogTable.storage.properties.contains(
+ S3ACredentialsProviderKey))
+ }
+
+ test("loadTable maps missing provider to None") {
+ handler = exchange => exchange.getRequestURI.getPath match {
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" =>
+ sendJson(
+ exchange,
+ 200,
+ loadTableResponseJson("file:/tmp/uc/table")
+ .replace("\"data-source-format\": \"DELTA\"", "\"data-source-format\": null"))
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" =>
+ fail("Unexpected credentials request for local path")
+ case path =>
+ fail(s"Unexpected UC Delta Rest Catalog API request path: $path")
+ }
+
+ val table = loadWithUCDeltaRestCatalogApi()
+
+ assert(table.catalogTable.provider.isEmpty)
+ }
+
+ test("loadTable falls back when UC Delta Rest Catalog API reports unsupported table format") {
+ handler = exchange => exchange.getRequestURI.getPath match {
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" =>
+ sendJson(exchange, 501,
+ """{
+ | "error": {
+ | "message": "Table exists but is not supported by the Delta endpoint.",
+ | "type": "UnsupportedTableFormatException",
+ | "code": 501
+ | }
+ |}""".stripMargin)
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" =>
+ fail("Unexpected credentials request after unsupported table format")
+ case path =>
+ fail(s"Unexpected UC Delta Rest Catalog API request path: $path")
+ }
+
+ val loaded = withUCDeltaRestCatalogApi { catalog =>
+ catalog.loadTable(Identifier.of(Array("default"), "tbl"))
+ }
+
+ assert(loaded.isEmpty)
+ }
+
+ test("loadTable propagates generic UC Delta Rest Catalog API 501 errors") {
+ handler = exchange => exchange.getRequestURI.getPath match {
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" =>
+ sendJson(exchange, 501,
+ """{
+ | "error": {
+ | "message": "Not implemented.",
+ | "type": "NotImplementedException",
+ | "code": 501
+ | }
+ |}""".stripMargin)
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" =>
+ fail("Unexpected credentials request after loadTable failure")
+ case path =>
+ fail(s"Unexpected UC Delta Rest Catalog API request path: $path")
+ }
+
+ val error = intercept[IOException] {
+ loadWithUCDeltaRestCatalogApi()
+ }
+
+ assert(error.getMessage.contains("Failed to load table uc.default.tbl"))
+ assert(error.getMessage.contains("HTTP 501"))
+ assert(error.getMessage.contains("NotImplementedException"))
+ }
+
+ test("loadTable propagates UC Delta Rest Catalog API server errors") {
+ handler = exchange => exchange.getRequestURI.getPath match {
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" =>
+ sendJson(exchange, 500, """{"error_code":"INTERNAL_ERROR"}""")
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" =>
+ fail("Unexpected credentials request after loadTable failure")
+ case path =>
+ fail(s"Unexpected UC Delta Rest Catalog API request path: $path")
+ }
+
+ val error = intercept[IOException] {
+ loadWithUCDeltaRestCatalogApi()
+ }
+
+ assert(error.getMessage.contains("Failed to load table uc.default.tbl"))
+ assert(error.getMessage.contains("HTTP 500"))
+ }
+
+ test("apply fails when UC Delta Rest Catalog API is enabled but unsupported") {
+ configHandler = exchange => sendJson(exchange, 200,
+ """{
+ | "endpoints": [],
+ | "protocol-version": "1.0"
+ |}""".stripMargin)
+ handler = exchange =>
+ fail(s"Unexpected UC Delta Rest Catalog API request path: ${exchange.getRequestURI.getPath}")
+
+ val error = intercept[IllegalArgumentException] {
+ withUCDeltaRestCatalogApi { catalog =>
+ catalog.loadTable(Identifier.of(Array("default"), "tbl"))
+ }
+ }
+
+ assert(error.getMessage.contains("UC Delta Rest Catalog API is enabled for catalog uc"))
+ assert(error.getMessage.contains(
+ "does not support the required UC Delta Rest Catalog API endpoints"))
+ }
+
+ test("loadTable does not probe UC Delta Rest Catalog API when disabled") {
+ configHandler = exchange =>
+ fail(s"Unexpected UC Delta Rest Catalog API config request: ${exchange.getRequestURI}")
+ handler = exchange =>
+ fail(s"Unexpected UC Delta Rest Catalog API request path: ${exchange.getRequestURI.getPath}")
+
+ withSQLConf(
+ "spark.sql.catalog.uc" -> "io.unitycatalog.spark.UCSingleCatalog",
+ "spark.sql.catalog.uc.uri" -> serverUri,
+ "spark.sql.catalog.uc.token" -> "mock-token") {
+ val catalog = UCDeltaCatalogClient(new TestDelegateCatalog, spark)
+ assert(catalog.loadTable(Identifier.of(Array("default"), "tbl")).isEmpty)
+ }
+ }
+
+ test("apply fails when UC Delta Rest Catalog API is enabled without UC config") {
+ withSQLConf(
+ "spark.sql.catalog.uc" -> "io.unitycatalog.spark.UCSingleCatalog",
+ UCDeltaCatalogClient.deltaRestApiEnabledConf("uc") -> "true") {
+ val error = intercept[IllegalArgumentException] {
+ UCDeltaCatalogClient(new TestDelegateCatalog, spark)
+ }
+ assert(error.getMessage.contains("configuration is missing or incomplete"))
+ }
+ }
+
+ test("loadTable skips UC Delta Rest Catalog API for delta path identifiers") {
+ handler = exchange =>
+ fail(s"Unexpected UC Delta Rest Catalog API table request: ${exchange.getRequestURI}")
+
+ withUCDeltaRestCatalogApi { catalog =>
+ assert(catalog.loadTable(
+ Identifier.of(Array("delta"), "s3://bucket/path/to/table")).isEmpty)
+ }
+ }
+
+ test(
+ "pathCredentialOptions returns UC Delta Rest Catalog API path credential properties " +
+ "for cloud paths") {
+ configHandler = exchange => {
+ assert(queryParams(exchange)("catalog") === "uc")
+ sendJson(exchange, 200,
+ """{
+ | "endpoints": [
+ | "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}"
+ | ],
+ | "protocol-version": "1.0"
+ |}""".stripMargin)
+ }
+ pathCredentialsHandler = exchange => {
+ assert(exchange.getRequestMethod === "POST")
+ assertJsonContains(exchange, Seq(
+ "\"url\":\"s3://bucket/path/to/table\"",
+ "\"operation\":\"PATH_READ\""))
+ sendJson(exchange, 200, s3TemporaryCredentialsResponseJson())
+ }
+
+ withSQLConf(
+ "spark.sql.catalog.uc" -> "io.unitycatalog.spark.UCSingleCatalog",
+ "spark.sql.catalog.uc.uri" -> serverUri,
+ "spark.sql.catalog.uc.token" -> "mock-token",
+ "spark.sql.defaultCatalog" -> "uc",
+ DeltaCatalogClient.deltaRestApiEnabledConf("uc") -> "true") {
+ val props = DeltaCatalogClient.pathCredentialOptions(
+ spark,
+ new Path("s3://bucket/path/to/table"))
+
+ assert(props(S3ACredentialsProviderKey) ===
+ AwsVendedTokenProviderClass)
+ assert(props(S3AInitAccessKey) === "ak")
+ assert(props(UCCredentialsTypeKey) === UCCredentialsTypePathValue)
+ assert(props(UCPathKey) ===
+ "s3://bucket/path/to/table")
+ assert(props(UCPathOperationKey) === "PATH_READ")
+ }
+ }
+
+ test("pathCredentialOptions returns empty when path credentials are unavailable") {
+ configHandler = exchange => {
+ assert(queryParams(exchange)("catalog") === "uc")
+ sendJson(exchange, 200,
+ """{
+ | "endpoints": [
+ | "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}"
+ | ],
+ | "protocol-version": "1.0"
+ |}""".stripMargin)
+ }
+ pathCredentialsHandler = exchange => sendJson(exchange, 404, "{}")
+
+ withSQLConf(
+ "spark.sql.catalog.uc" -> "io.unitycatalog.spark.UCSingleCatalog",
+ "spark.sql.catalog.uc.uri" -> serverUri,
+ "spark.sql.catalog.uc.token" -> "mock-token",
+ "spark.sql.defaultCatalog" -> "uc",
+ DeltaCatalogClient.deltaRestApiEnabledConf("uc") -> "true") {
+ val props = DeltaCatalogClient.pathCredentialOptions(
+ spark,
+ new Path("s3://bucket/path/to/table"))
+
+ assert(props.isEmpty)
+ }
+ }
+
+ test("pathCredentialOptions returns empty when path is not governed by UC") {
+ configHandler = exchange => {
+ assert(queryParams(exchange)("catalog") === "uc")
+ sendJson(exchange, 200,
+ """{
+ | "endpoints": [
+ | "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}"
+ | ],
+ | "protocol-version": "1.0"
+ |}""".stripMargin)
+ }
+ pathCredentialsHandler = exchange => {
+ assert(exchange.getRequestMethod === "POST")
+ assertJsonContains(exchange, Seq("\"url\":\"s3://other-bucket/path/to/table\""))
+ sendJson(exchange, 404, """{"error_code":"NOT_FOUND"}""")
+ }
+
+ withSQLConf(
+ "spark.sql.catalog.uc" -> "io.unitycatalog.spark.UCSingleCatalog",
+ "spark.sql.catalog.uc.uri" -> serverUri,
+ "spark.sql.catalog.uc.token" -> "mock-token",
+ "spark.sql.defaultCatalog" -> "uc",
+ DeltaCatalogClient.deltaRestApiEnabledConf("uc") -> "true") {
+ val props = DeltaCatalogClient.pathCredentialOptions(
+ spark,
+ new Path("s3://other-bucket/path/to/table"))
+
+ assert(props.isEmpty)
+ }
+ }
+
+ test(
+ "pathCredentialOptions returns empty when no UC Delta Rest Catalog API catalog is " +
+ "configured") {
+ configHandler = exchange =>
+ fail(s"Unexpected UC Delta Rest Catalog API config request: ${exchange.getRequestURI}")
+ pathCredentialsHandler = exchange =>
+ fail(s"Unexpected temporary path credentials request: ${exchange.getRequestURI}")
+
+ withSQLConf(
+ "spark.sql.catalog.uc" -> "io.unitycatalog.spark.UCSingleCatalog",
+ "spark.sql.catalog.uc.uri" -> serverUri,
+ "spark.sql.catalog.uc.token" -> "mock-token") {
+ val props = DeltaCatalogClient.pathCredentialOptions(
+ spark,
+ new Path("s3://bucket/path/to/table"))
+
+ assert(props.isEmpty)
+ }
+ }
+
+ test("prepareCreateTable uses UC Delta Rest Catalog API staging response for managed tables") {
+ handler = exchange => exchange.getRequestURI.getPath match {
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/staging-tables" =>
+ sendJson(exchange, 200,
+ """{
+ | "table-id": "11111111-1111-1111-1111-111111111111",
+ | "table-type": "MANAGED",
+ | "location": "s3://bucket/table",
+ | "storage-credentials": [],
+ | "required-protocol": {
+ | "min-reader-version": 3,
+ | "min-writer-version": 7,
+ | "reader-features": ["catalogManaged"],
+ | "writer-features": ["catalogManaged"]
+ | },
+ | "required-properties": {
+ | "delta.enableDeletionVectors": "true",
+ | "io.unitycatalog.tableId": "11111111-1111-1111-1111-111111111111"
+ | }
+ |}""".stripMargin)
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" =>
+ credentialRequestCount += 1
+ assert(exchange.getRequestURI.getQuery === "operation=READ_WRITE")
+ sendJson(exchange, 200, s3CredentialsResponseJson("s3://bucket/table", "READ_WRITE"))
+ case path =>
+ fail(s"Unexpected UC Delta Rest Catalog API request path: $path")
+ }
+
+ val prepared = withUCDeltaRestCatalogApi { catalog =>
+ catalog.prepareCreateTable(
+ Identifier.of(Array("default"), "tbl"),
+ CatalogTableType.MANAGED,
+ location = None).get
+ }
+
+ assert(prepared.location.toString === "s3://bucket/table")
+ assert(prepared.tableProperties(TableCatalog.PROP_IS_MANAGED_LOCATION) === "true")
+ assert(prepared.tableProperties("delta.feature.catalogManaged") === "supported")
+ assert(prepared.tableProperties("delta.enableDeletionVectors") === "true")
+ assert(prepared.tableProperties("io.unitycatalog.tableId") ===
+ "11111111-1111-1111-1111-111111111111")
+ assert(credentialRequestCount === 1)
+ assert(prepared.storageProperties(S3ACredentialsProviderKey) ===
+ AwsVendedTokenProviderClass)
+ assert(prepared.storageProperties(S3AInitAccessKey) === "ak")
+ }
+
+ test("prepareCreateTable allows local managed staging without credentials") {
+ handler = exchange => exchange.getRequestURI.getPath match {
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/staging-tables" =>
+ sendJson(exchange, 200,
+ """{
+ | "table-id": "11111111-1111-1111-1111-111111111111",
+ | "table-type": "MANAGED",
+ | "location": "/tmp/uc-managed-tables/default/tbl",
+ | "storage-credentials": [],
+ | "required-properties": {}
+ |}""".stripMargin)
+ case path =>
+ fail(s"Unexpected UC Delta Rest Catalog API request path: $path")
+ }
+
+ val prepared = withUCDeltaRestCatalogApi { catalog =>
+ catalog.prepareCreateTable(
+ Identifier.of(Array("default"), "tbl"),
+ CatalogTableType.MANAGED,
+ location = None).get
+ }
+
+ assert(prepared.location.toString === "/tmp/uc-managed-tables/default/tbl")
+ assert(prepared.storageProperties.isEmpty)
+ }
+
+ test("prepareCreateTable does not intercept unsupported create shapes") {
+ handler = exchange =>
+ fail(s"Unexpected UC Delta Rest Catalog API request path: ${exchange.getRequestURI.getPath}")
+
+ withUCDeltaRestCatalogApi { catalog =>
+ assert(catalog.prepareCreateTable(
+ Identifier.of(Array("default"), "tbl"),
+ CatalogTableType.MANAGED,
+ location = Some(new URI("file:/tmp/user-location"))).isEmpty)
+ assert(catalog.prepareCreateTable(
+ Identifier.of(Array("default"), "tbl"),
+ CatalogTableType.EXTERNAL,
+ location = None).isEmpty)
+ assert(catalog.prepareCreateTable(
+ Identifier.of(Array("default"), "tbl"),
+ CatalogTableType.EXTERNAL,
+ location = Some(new URI("file:/tmp/external"))).isEmpty)
+ assert(catalog.prepareCreateTable(
+ Identifier.of(Array("nested", "default"), "tbl"),
+ CatalogTableType.MANAGED,
+ location = None).isEmpty)
+ }
+ }
+
+ test("prepareCreateTable returns None when UC Delta Rest Catalog API is disabled") {
+ handler = exchange =>
+ fail(s"Unexpected UC Delta Rest Catalog API request path: ${exchange.getRequestURI.getPath}")
+
+ withSQLConf(
+ "spark.sql.catalog.uc" -> "io.unitycatalog.spark.UCSingleCatalog",
+ "spark.sql.catalog.uc.uri" -> serverUri,
+ "spark.sql.catalog.uc.token" -> "mock-token") {
+ val catalog = UCDeltaCatalogClient(new TestDelegateCatalog, spark)
+ assert(catalog.prepareCreateTable(
+ Identifier.of(Array("default"), "tbl"),
+ CatalogTableType.MANAGED,
+ location = None).isEmpty)
+ }
+ }
+
+ test("prepareCreateTable propagates staging errors and rejects table id mismatches") {
+ handler = exchange => exchange.getRequestURI.getPath match {
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/staging-tables" =>
+ sendJson(exchange, 500, """{"error_code":"INTERNAL_ERROR"}""")
+ case path =>
+ fail(s"Unexpected UC Delta Rest Catalog API request path: $path")
+ }
+
+ val stagingError = intercept[IOException] {
+ withUCDeltaRestCatalogApi { catalog =>
+ catalog.prepareCreateTable(
+ Identifier.of(Array("default"), "tbl"),
+ CatalogTableType.MANAGED,
+ location = None)
+ }
+ }
+ assert(stagingError.getMessage.contains("Failed to create staging table"))
+ assert(stagingError.getMessage.contains("HTTP 500"))
+
+ handler = exchange => exchange.getRequestURI.getPath match {
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/staging-tables" =>
+ sendJson(exchange, 200,
+ """{
+ | "table-id": "11111111-1111-1111-1111-111111111111",
+ | "table-type": "MANAGED",
+ | "location": "/tmp/uc-managed-tables/default/tbl",
+ | "storage-credentials": [],
+ | "required-properties": {
+ | "io.unitycatalog.tableId": "22222222-2222-2222-2222-222222222222"
+ | }
+ |}""".stripMargin)
+ case path =>
+ fail(s"Unexpected UC Delta Rest Catalog API request path: $path")
+ }
+
+ val tableIdError = intercept[IllegalArgumentException] {
+ withUCDeltaRestCatalogApi { catalog =>
+ catalog.prepareCreateTable(
+ Identifier.of(Array("default"), "tbl"),
+ CatalogTableType.MANAGED,
+ location = None)
+ }
+ }
+ assert(tableIdError.getMessage.contains("does not match"))
+ assert(tableIdError.getMessage.contains("io.unitycatalog.tableId"))
+ }
+
+ test("createTable fails when called without a prepared UC Delta Rest Catalog API create") {
+ val error = intercept[IllegalStateException] {
+ withSQLConf(
+ "spark.sql.catalog.uc" -> "io.unitycatalog.spark.UCSingleCatalog",
+ "spark.sql.catalog.uc.uri" -> serverUri,
+ "spark.sql.catalog.uc.token" -> "mock-token") {
+ val catalog = UCDeltaCatalogClient(new TestDelegateCatalog, spark)
+ catalog.createTable(Identifier.of(Array("default"), "tbl"), null, null)
+ }
+ }
+
+ assert(error.getMessage.contains("UC Delta Rest Catalog API createTable is not available"))
+ }
+
+ test("createTable posts Delta metadata to the expected UC namespace") {
+ var requestJson: JsonNode = null
+ var expectedLocation: String = null
+ val tableLocation = "file:/tmp/uc-created-table"
+ handler = exchange => exchange.getRequestURI.getPath match {
+ case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables" =>
+ assert(exchange.getRequestMethod === "POST")
+ requestJson = objectMapper.readTree(readRequestBody(exchange))
+ sendJson(exchange, 200, loadTableResponseJson(tableLocation))
+ case path =>
+ fail(s"Unexpected UC Delta Rest Catalog API request path: $path")
+ }
+
+ withTempDir { dir =>
+ val schema = new StructType()
+ .add("id", LongType)
+ .add("payload", new StructType()
+ .add("name", StringType, nullable = true)
+ .add("scores", ArrayType(IntegerType)))
+ .add("p", StringType)
+ val metadata = Metadata(
+ schemaString = schema.json,
+ partitionColumns = Seq("p"),
+ configuration = Map("user.prop" -> "kept"))
+ val tablePath = new Path(dir.getCanonicalPath)
+ expectedLocation = dir.toURI.toString
+ val snapshot = new DummySnapshot(
+ new Path(tablePath, "_delta_log"),
+ DeltaLog.forTable(spark, tablePath),
+ metadata,
+ Some(Protocol.forTableFeature(CatalogOwnedTableFeature)))
+ val table = CatalogTable(
+ identifier = TableIdentifier("tbl", Some("default"), Some("uc")),
+ tableType = CatalogTableType.MANAGED,
+ storage = CatalogStorageFormat.empty.copy(locationUri = Some(dir.toURI)),
+ schema = schema,
+ provider = Some("delta"),
+ partitionColumnNames = Seq("p"),
+ properties = Map(
+ TableCatalog.PROP_PROVIDER -> "delta",
+ TableCatalog.PROP_COMMENT -> "ignored-property-comment",
+ TableCatalog.PROP_LOCATION -> "ignored-location",
+ TableCatalog.PROP_IS_MANAGED_LOCATION -> "true",
+ "path" -> "ignored-path",
+ "option.path" -> "ignored-option-path"),
+ comment = Some("table comment"))
+
+ withUCDeltaRestCatalogApi { catalog =>
+ catalog.createTable(Identifier.of(Array("default"), "tbl"), table, snapshot)
+ }
+ }
+
+ assert(requestJson.get("name").asText === "tbl")
+ assert(requestJson.get("location").asText === expectedLocation)
+ assert(requestJson.get("table-type").asText === "MANAGED")
+ assert(requestJson.get("data-source-format").asText === "DELTA")
+ assert(requestJson.get("comment").asText === "table comment")
+
+ val protocol = requestJson.get("protocol")
+ assert(protocol.get("min-reader-version").asInt === 3)
+ assert(protocol.get("min-writer-version").asInt === 7)
+ assert(protocol.get("reader-features").elements().asScala.map(_.asText).toSeq ===
+ Seq("catalogManaged", "vacuumProtocolCheck"))
+ assert(protocol.get("writer-features").elements().asScala.map(_.asText).toSeq ===
+ Seq("catalogManaged", "inCommitTimestamp", "vacuumProtocolCheck"))
+
+ assert(requestJson.get("partition-columns").elements().asScala.map(_.asText).toSeq === Seq("p"))
+
+ val fields = requestJson.get("columns").get("fields")
+ assert(fields.size() === 3)
+ assert(fields.get(0).get("name").asText === "id")
+ assert(deltaTypeName(fields.get(0).get("type")) === "long")
+ assert(fields.get(1).get("name").asText === "payload")
+ assert(deltaTypeName(fields.get(1).get("type")) === "struct")
+ assert(deltaTypeName(fields.get(1).get("type").get("fields").get(1).get("type")) === "array")
+
+ val properties = requestJson.get("properties")
+ assert(properties.get("user.prop").asText === "kept")
+ Seq(
+ TableCatalog.PROP_PROVIDER,
+ TableCatalog.PROP_COMMENT,
+ TableCatalog.PROP_LOCATION,
+ TableCatalog.PROP_IS_MANAGED_LOCATION,
+ "path",
+ "option.path").foreach { key =>
+ assert(!properties.has(key), s"CreateTableRequest should not include $key")
+ }
+ }
+
+ test("prepareCreateTable uses temporary path credentials for external cloud tables") {
+ val location = "s3://bucket/external/tbl"
+ configHandler = exchange => {
+ assert(queryParams(exchange)("catalog") === "uc")
+ sendJson(exchange, 200,
+ """{
+ | "endpoints": [
+ | "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}",
+ | "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}/credentials",
+ | "GET /v1/temporary-path-credentials"
+ | ],
+ | "protocol-version": "1.0"
+ |}""".stripMargin)
+ }
+ pathCredentialsHandler = exchange => {
+ credentialRequestCount += 1
+ assert(exchange.getRequestMethod === "POST")
+ assertJsonContains(exchange, Seq(
+ s""""url":"$location"""",
+ """"operation":"PATH_CREATE_TABLE""""))
+ sendJson(exchange, 200, s3TemporaryCredentialsResponseJson())
+ }
+
+ val prepared = withUCDeltaRestCatalogApi { catalog =>
+ catalog.prepareCreateTable(
+ Identifier.of(Array("default"), "tbl"),
+ CatalogTableType.EXTERNAL,
+ location = Some(java.net.URI.create(location))).get
+ }
+
+ assert(credentialRequestCount === 1)
+ assert(prepared.location.toString === location)
+ assert(prepared.tableProperties.isEmpty)
+ assert(prepared.storageProperties(S3ACredentialsProviderKey) ===
+ AwsVendedTokenProviderClass)
+ assert(prepared.storageProperties(UCCredentialsTypeKey) === UCCredentialsTypePathValue)
+ assert(prepared.storageProperties(UCPathOperationKey) ===
+ "PATH_CREATE_TABLE")
+ assert(prepared.storageProperties(UCPathKey) === location)
+ }
+
+ private def loadWithUCDeltaRestCatalogApi(): V1Table = {
+ withUCDeltaRestCatalogApi { catalog =>
+ catalog.loadTable(Identifier.of(Array("default"), "tbl")).get.asInstanceOf[V1Table]
+ }
+ }
+
+ private def withUCDeltaRestCatalogApi[T](f: DeltaCatalogClient => T): T = {
+ withUCDeltaRestCatalogApi(new TestDelegateCatalog, renewCredentialEnabled = true)(f)
+ }
+
+ private def withUCDeltaRestCatalogApiRenewalDisabled[T](f: DeltaCatalogClient => T): T = {
+ withUCDeltaRestCatalogApi(new TestDelegateCatalog, renewCredentialEnabled = false)(f)
+ }
+
+ private def withUCDeltaRestCatalogApi[T](
+ delegate: TableCatalog)(
+ f: DeltaCatalogClient => T): T = {
+ withUCDeltaRestCatalogApi(delegate, renewCredentialEnabled = true)(f)
+ }
+
+ private def withUCDeltaRestCatalogApi[T](
+ delegate: TableCatalog,
+ renewCredentialEnabled: Boolean)(
+ f: DeltaCatalogClient => T): T = {
+ withSQLConf(
+ "spark.sql.catalog.uc" -> "io.unitycatalog.spark.UCSingleCatalog",
+ "spark.sql.catalog.uc.uri" -> serverUri,
+ "spark.sql.catalog.uc.token" -> "mock-token",
+ UCDeltaCatalogClient.renewCredentialEnabledConf("uc") -> renewCredentialEnabled.toString,
+ UCDeltaCatalogClient.deltaRestApiEnabledConf("uc") -> "true") {
+ val catalog = UCDeltaCatalogClient(delegate, spark)
+ f(catalog)
+ }
+ }
+
+ private def loadTableResponseJson(
+ location: String,
+ dataSourceFormat: String = "DELTA"): String =
+ s"""{
+ | "metadata": {
+ | "data-source-format": "$dataSourceFormat",
+ | "table-type": "MANAGED",
+ | "table-uuid": "11111111-1111-1111-1111-111111111111",
+ | "location": "$location",
+ | "columns": {
+ | "type": "struct",
+ | "fields": [
+ | {
+ | "name": "id",
+ | "type": "long",
+ | "nullable": false,
+ | "metadata": {
+ | "delta.columnMapping.id": 1,
+ | "delta.columnMapping.physicalName": "col-123"
+ | }
+ | }
+ | ]
+ | },
+ | "partition-columns": [],
+ | "properties": {
+ | "delta.feature.catalogManaged": "supported",
+ | "$UC_TABLE_ID_KEY": "11111111-1111-1111-1111-111111111111"
+ | }
+ | },
+ | "commits": []
+ |}""".stripMargin
+
+ private def s3CredentialsResponseJson(prefix: String, operation: String): String =
+ s"""{
+ | "storage-credentials": [
+ | {
+ | "prefix": "$prefix",
+ | "operation": "$operation",
+ | "config": {
+ | "s3.access-key-id": "ak",
+ | "s3.secret-access-key": "sk",
+ | "s3.session-token": "st"
+ | }
+ | }
+ | ]
+ |}""".stripMargin
+
+ private def s3TemporaryCredentialsResponseJson(): String =
+ """{
+ | "aws_temp_credentials": {
+ | "access_key_id": "ak",
+ | "secret_access_key": "sk",
+ | "session_token": "st"
+ | },
+ | "expiration_time": 1710000000000
+ |}""".stripMargin
+
+ private def assertJsonContains(exchange: HttpExchange, expectedSnippets: Seq[String]): Unit = {
+ val body = new String(exchange.getRequestBody.readAllBytes(), StandardCharsets.UTF_8)
+ .replaceAll("\\s+", "")
+ expectedSnippets.foreach { snippet =>
+ assert(body.contains(snippet), s"Expected request body $body to contain $snippet")
+ }
+ }
+
+ private def queryParams(exchange: HttpExchange): Map[String, String] = {
+ Option(exchange.getRequestURI.getRawQuery).toSeq
+ .flatMap(_.split("&"))
+ .filter(_.nonEmpty)
+ .map { kv =>
+ val pair = kv.split("=", 2)
+ val key = URLDecoder.decode(pair(0), StandardCharsets.UTF_8)
+ val value = if (pair.length == 2) {
+ URLDecoder.decode(pair(1), StandardCharsets.UTF_8)
+ } else {
+ ""
+ }
+ key -> value
+ }.toMap
+ }
+
+ private def readRequestBody(exchange: HttpExchange): String = {
+ val input = exchange.getRequestBody
+ try {
+ new String(input.readAllBytes(), StandardCharsets.UTF_8)
+ } finally {
+ input.close()
+ }
+ }
+
+ private def deltaTypeName(deltaType: JsonNode): String = {
+ if (deltaType.isTextual) deltaType.asText else deltaType.get("type").asText
+ }
+
+ private def sendJson(exchange: HttpExchange, status: Int, body: String): Unit = {
+ val bytes = body.getBytes(StandardCharsets.UTF_8)
+ exchange.getResponseHeaders.add("Content-Type", "application/json")
+ exchange.sendResponseHeaders(status, bytes.length)
+ exchange.getResponseBody.write(bytes)
+ exchange.getResponseBody.close()
+ }
+
+ private class TestDelegateCatalog extends TableCatalog {
+ override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}
+ override def name(): String = "uc"
+ override def listTables(namespace: Array[String]): Array[Identifier] = Array.empty
+ override def loadTable(ident: Identifier): Table =
+ throw new IllegalStateException("unexpected loadTable call")
+ override def createTable(
+ ident: Identifier,
+ schema: StructType,
+ partitions: Array[org.apache.spark.sql.connector.expressions.Transform],
+ properties: java.util.Map[String, String]): Table =
+ throw new UnsupportedOperationException("not needed in this test")
+ override def alterTable(ident: Identifier, changes: TableChange*): Table =
+ throw new UnsupportedOperationException("not needed in this test")
+ override def dropTable(ident: Identifier): Boolean =
+ throw new UnsupportedOperationException("not needed in this test")
+ override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit =
+ throw new UnsupportedOperationException("not needed in this test")
+ }
+
+}
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/catalog/UCDeltaRestCatalogApiSchemaConverterSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/UCDeltaRestCatalogApiSchemaConverterSuite.scala
new file mode 100644
index 00000000000..e115e8d5856
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/UCDeltaRestCatalogApiSchemaConverterSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.catalog
+
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.spark.sql.types.{
+ ArrayType,
+ BooleanType,
+ DecimalType,
+ IntegerType,
+ LongType,
+ MapType,
+ MetadataBuilder,
+ StringType,
+ StructField,
+ StructType
+}
+
+class UCDeltaRestCatalogApiSchemaConverterSuite extends AnyFunSuite {
+
+ test("converts Delta schema JSON to Spark schema") {
+ val fieldMetadata = new MetadataBuilder()
+ .putLong("delta.columnMapping.id", 1L)
+ .putString("delta.columnMapping.physicalName", "col-1")
+ .build()
+ val inputSchema = StructType(Seq(
+ StructField("id", LongType, nullable = false, fieldMetadata),
+ StructField("amount", DecimalType(10, 2)),
+ StructField("values", ArrayType(IntegerType, containsNull = true)),
+ StructField("tags", MapType(StringType, BooleanType, valueContainsNull = false)),
+ StructField("nested", StructType(Seq(StructField("name", StringType))))))
+
+ val schema = UCDeltaRestCatalogApiSchemaConverter.toSparkType(inputSchema.json)
+
+ assert(schema === inputSchema)
+ assert(!schema("id").nullable)
+ assert(schema("id").metadata.getLong("delta.columnMapping.id") === 1L)
+ assert(schema("id").metadata.getString("delta.columnMapping.physicalName") === "col-1")
+ }
+
+ test("rejects missing schema JSON") {
+ val e = intercept[IllegalArgumentException] {
+ UCDeltaRestCatalogApiSchemaConverter.toSparkType(null)
+ }
+
+ assert(e.getMessage === "UC Delta Rest Catalog API table schema is missing.")
+ }
+}
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala
index 231ea7493ed..dcf94676748 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala
@@ -17,9 +17,11 @@
package org.apache.spark.sql.delta.serverSidePlanning
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCapability}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.sources.{And, EqualTo, Filter, GreaterThan, LessThan}
+import org.apache.spark.sql.types.StructType
/**
* Tests for server-side planning with a mock client.
@@ -142,6 +144,33 @@ class ServerSidePlannedTableSuite extends QueryTest with DeltaSQLCommandTest {
s"Expected normal table but got ServerSidePlannedTable when config is disabled")
}
+ test("disabled server-side planning does not inspect table credentials") {
+ val throwingTable = new Table {
+ override def name(): String = "throwing_table"
+ override def schema(): StructType = new StructType()
+ override def properties(): java.util.Map[String, String] =
+ throw new IllegalStateException("properties should not be called")
+ override def capabilities(): java.util.Set[TableCapability] =
+ java.util.Collections.emptySet()
+ }
+
+ val originalConfig = spark.conf.getOption(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key)
+ spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "false")
+ try {
+ val plannedTable = ServerSidePlannedTable.tryCreate(
+ spark,
+ Identifier.of(Array("db"), "throwing_table"),
+ throwingTable,
+ isUnityCatalog = true)
+ assert(plannedTable.isEmpty)
+ } finally {
+ originalConfig match {
+ case Some(value) => spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, value)
+ case None => spark.conf.unset(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key)
+ }
+ }
+ }
+
test("shouldUseServerSidePlanning() decision logic") {
// ============================================================
// Production mode: skipUCRequirementForTests = false
diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/S3CredentialFileSystem.java b/spark/unitycatalog/src/test/java/io/sparkuctest/S3CredentialFileSystem.java
index c4e6d9c1501..6b2dec556d9 100644
--- a/spark/unitycatalog/src/test/java/io/sparkuctest/S3CredentialFileSystem.java
+++ b/spark/unitycatalog/src/test/java/io/sparkuctest/S3CredentialFileSystem.java
@@ -149,14 +149,28 @@ private void assertCredentials() {
private synchronized AwsCredentialsProvider resolveProvider(Configuration conf) {
if (provider != null) return provider;
- String clazz = conf.get(S3A_CREDENTIALS_PROVIDER);
- if (clazz == null) return null;
- try {
- provider =
- (AwsCredentialsProvider)
- Class.forName(clazz).getConstructor(Configuration.class).newInstance(conf);
- } catch (Exception e) {
- throw new RuntimeException("Failed to instantiate credential provider: " + clazz, e);
+ String classes = conf.get(S3A_CREDENTIALS_PROVIDER);
+ if (classes == null) return null;
+
+ // S3A accepts a comma-separated provider chain. This fake filesystem only understands
+ // AWS SDK v2 providers; if none are present, assert static UC-vended Hadoop keys instead.
+ for (String clazz : classes.split(",")) {
+ String trimmed = clazz.trim();
+ if (trimmed.isEmpty()) continue;
+ try {
+ Class> candidate = Class.forName(trimmed);
+ if (!AwsCredentialsProvider.class.isAssignableFrom(candidate)) continue;
+ provider =
+ (AwsCredentialsProvider)
+ candidate.getConstructor(Configuration.class).newInstance(conf);
+ return provider;
+ } catch (ClassNotFoundException e) {
+ // Ignore providers that are valid for real S3A but absent from this test classpath.
+ } catch (NoSuchMethodException e) {
+ // Ignore AWS providers that real S3A can construct without a Hadoop Configuration.
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException("Failed to instantiate credential provider: " + trimmed, e);
+ }
}
return provider;
}
diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableCreationTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableCreationTest.java
index bc819ecff22..cace5f3f48d 100644
--- a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableCreationTest.java
+++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableCreationTest.java
@@ -24,6 +24,7 @@
import io.unitycatalog.client.ApiException;
import io.unitycatalog.client.api.TablesApi;
import io.unitycatalog.client.model.ColumnInfo;
+import io.unitycatalog.client.model.ColumnTypeName;
import io.unitycatalog.client.model.DataSourceFormat;
import io.unitycatalog.client.model.TableInfo;
import java.util.ArrayList;
@@ -556,6 +557,9 @@ public void testTableWithComplexTypes(TableType tableType) throws Exception {
Map.of(),
null,
null);
+ if (tableType == TableType.MANAGED) {
+ assertComplexColumnMetadata(tableName);
+ }
// Verify data can be queried
check(
@@ -625,8 +629,7 @@ private void assertUCTableInfo(
String schemaName = uc.schemaName();
// Verify that properties are set on server. This can not be done by DESC EXTENDED.
- TablesApi tablesApi = new TablesApi(uc.createApiClient());
- TableInfo tableInfo = tablesApi.getTable(fullTableName, false, false);
+ TableInfo tableInfo = loadUCTableInfo(fullTableName);
assertThat(tableInfo.getCatalogName()).isEqualTo(catalogName);
assertThat(tableInfo.getName()).isEqualTo(parseTableName(fullTableName));
assertThat(tableInfo.getSchemaName()).isEqualTo(schemaName);
@@ -640,21 +643,22 @@ private void assertUCTableInfo(
List This keeps UC Delta Rest Catalog API operations separate from the legacy UC client.
+ * Implementations that do not support these APIs should use the default methods, which fail loudly.
*/
public interface UCDeltaClient extends UCClient {
/**
- * Loads a table's metadata from Unity Catalog.
- *
- * @param catalog the catalog name
- * @param schema the schema name
- * @param table the table name
- * @return the table's {@link AbstractMetadata}
- * @throws IOException on network or API errors
+ * Returns whether this client can use UC Delta Rest Catalog API.
+ */
+ default boolean supportsUCDeltaRestCatalogApi() {
+ return false;
+ }
+
+ /**
+ * Loads a Delta table from Unity Catalog through the UC Delta Rest Catalog API.
+ */
+ default AbstractMetadata loadTable(
+ String catalog,
+ String schema,
+ String table) throws IOException {
+ throw new UnsupportedOperationException(
+ "loadTable requires UC Delta Rest Catalog API support.");
+ }
+
+ /**
+ * Creates a Delta staging table in Unity Catalog through the UC Delta Rest Catalog API.
*/
- AbstractMetadata loadTable(String catalog, String schema, String table) throws IOException;
+ default StagingTableResponse createStagingTable(
+ String catalog,
+ String schema,
+ String table) throws IOException {
+ throw new UnsupportedOperationException(
+ "createStagingTable requires UC Delta Rest Catalog API support.");
+ }
/**
- * Reserves a staging slot for a new Delta table. The returned response contains the table ID,
- * storage location, and protocol/property requirements that the caller must honor when
- * finalizing the table with {@link #createTable}.
- *
- * @param catalog the catalog name
- * @param schema the schema name
- * @param table the table name
- * @return a {@link StagingTableInfo} with the reserved table details
- * @throws IOException on network or API errors
+ * Finalizes a staged Delta table in Unity Catalog through the UC Delta Rest Catalog API.
*/
- StagingTableInfo createStagingTable(String catalog, String schema, String table)
- throws IOException;
+ default AbstractMetadata createTable(
+ String catalog,
+ String schema,
+ CreateTableRequest request) throws IOException {
+ throw new UnsupportedOperationException(
+ "createTable requires UC Delta Rest Catalog API support.");
+ }
/**
- * Finalizes a previously staged Delta table, making it visible in the catalog.
- *
- * @param catalog the catalog name
- * @param schema the schema name
- * @param name the table name
- * @param location the storage location
- * @param tableType the table type (MANAGED or EXTERNAL), or {@code null}
- * @param comment the table comment, or {@code null}
- * @param partitionColumns the partition column names, or {@code null}
- * @param protocol the required Delta protocol, or {@code null}
- * @param properties the table properties, or {@code null}
- * @return the newly created table's {@link AbstractMetadata}
- * @throws IOException on network or API errors
+ * Updates a Delta table in Unity Catalog through the UC Delta Rest Catalog API.
*/
- AbstractMetadata createTable(
+ default AbstractMetadata updateTable(
String catalog,
String schema,
- String name,
- String location,
- UCDeltaModels.TableType tableType,
- String comment,
- List This client uses {@code io.unitycatalog.client.delta.api.TablesApi} for Delta-specific
- * table operations (load, create, update) and {@link MetastoresApi} for metastore queries.
- *
- * @see UCDeltaClient
+ * Token-based REST client implementation for UC Delta Rest Catalog API operations.
*/
-public class UCDeltaTokenBasedRestClient implements UCDeltaClient {
+public class UCDeltaTokenBasedRestClient
+ extends UCTokenBasedRestClient
+ implements UCDeltaClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(UCDeltaTokenBasedRestClient.class);
+ private static final ObjectMapper DELTA_TYPE_OBJECT_MAPPER =
+ JsonMapper.builder().serializationInclusion(JsonInclude.Include.NON_NULL).build()
+ .registerModule(new DeltaTypeModule());
+ private static final ObjectMapper DELTA_SCHEMA_OBJECT_MAPPER =
+ createDeltaSchemaObjectMapper();
+
+ private static ObjectMapper createDeltaSchemaObjectMapper() {
+ ObjectMapper mapper =
+ JsonMapper.builder().serializationInclusion(JsonInclude.Include.NON_NULL).build();
+ mapper.registerModule(new DeltaTypeModule());
+ mapper.addMixIn(ArrayType.class, CamelCaseArrayMixin.class);
+ mapper.addMixIn(MapType.class, CamelCaseMapMixin.class);
+ return mapper;
+ }
- private static final int HTTP_CONFLICT = 409;
- private static final int HTTP_NOT_FOUND = 404;
+ private abstract static class CamelCaseArrayMixin {
+ @JsonProperty("elementType")
+ abstract DeltaType getElementType();
- private static final Set This uses the UC Delta Rest Catalog API. Callers that need legacy UC loadTable behavior
+ * should use the existing catalog path instead of this API-only method.
+ */
@Override
public AbstractMetadata loadTable(
- String catalog, String schema, String table) throws IOException {
- ensureOpen();
- Objects.requireNonNull(catalog, "catalog must not be null");
- Objects.requireNonNull(schema, "schema must not be null");
- Objects.requireNonNull(table, "table must not be null");
+ String catalog,
+ String schema,
+ String table) throws IOException {
+ ensureUCDeltaRestCatalogApiSupported("loadTable");
+ Objects.requireNonNull(catalog, "catalog must not be null.");
+ Objects.requireNonNull(schema, "schema must not be null.");
+ Objects.requireNonNull(table, "table must not be null.");
try {
- LoadTableResponse response = deltaTablesApi.loadTable(catalog, schema, table);
- return new DeltaTableMetadata(table, response.getMetadata());
+ io.unitycatalog.client.delta.model.LoadTableResponse response =
+ deltaTablesApi.loadTable(catalog, schema, table);
+ if (response == null || response.getMetadata() == null) {
+ throw new IOException(
+ String.format(
+ "Malformed UC Delta Rest Catalog API loadTable response for table %s.%s.%s: "
+ + "missing table metadata.",
+ catalog,
+ schema,
+ table));
+ }
+ if (response.getMetadata().getColumns() == null) {
+ throw new IOException(
+ String.format(
+ "Malformed UC Delta Rest Catalog API loadTable response for table %s.%s.%s: "
+ + "missing table schema columns.",
+ catalog,
+ schema,
+ table));
+ }
+ return toTableMetadata(table, response.getMetadata());
} catch (ApiException e) {
throw new IOException(
- String.format("Failed to load table %s.%s.%s (HTTP %s): %s",
- catalog, schema, table, e.getCode(), e.getResponseBody()), e);
+ String.format(
+ "Failed to load table %s.%s.%s via UC Delta Rest Catalog API (HTTP %s): %s",
+ catalog,
+ schema,
+ table,
+ e.getCode(),
+ e.getResponseBody()),
+ e);
}
}
+ /**
+ * Creates a Delta staging table in Unity Catalog through the UC Delta Rest Catalog API.
+ */
@Override
- public UCDeltaModels.StagingTableInfo createStagingTable(
- String catalog, String schema, String table) throws IOException {
- ensureOpen();
- Objects.requireNonNull(catalog, "catalog must not be null");
- Objects.requireNonNull(schema, "schema must not be null");
- Objects.requireNonNull(table, "table must not be null");
+ public StagingTableResponse createStagingTable(
+ String catalog,
+ String schema,
+ String table) throws IOException {
+ ensureUCDeltaRestCatalogApiSupported("createStagingTable");
+ Objects.requireNonNull(catalog, "catalog must not be null.");
+ Objects.requireNonNull(schema, "schema must not be null.");
+ Objects.requireNonNull(table, "table must not be null.");
try {
- CreateStagingTableRequest request = new CreateStagingTableRequest().name(table);
- StagingTableResponse response =
- deltaTablesApi.createStagingTable(catalog, schema, request);
- return toStagingTableInfo(response);
+ return toStagingTableResponse(deltaTablesApi.createStagingTable(
+ catalog,
+ schema,
+ new CreateStagingTableRequest().name(table)));
} catch (ApiException e) {
throw new IOException(
- String.format("Failed to create staging table %s.%s.%s (HTTP %s): %s",
- catalog, schema, table, e.getCode(), e.getResponseBody()), e);
+ String.format(
+ "Failed to create staging table %s.%s.%s via UC Delta Rest Catalog API (HTTP %s): %s",
+ catalog,
+ schema,
+ table,
+ e.getCode(),
+ e.getResponseBody()),
+ e);
}
}
+ /**
+ * Finalizes a Delta table in Unity Catalog through the UC Delta Rest Catalog API.
+ */
@Override
public AbstractMetadata createTable(
String catalog,
String schema,
- String name,
- String location,
- UCDeltaModels.TableType tableType,
- String comment,
- List The UC SDK schema stays hidden behind this client implementation. Callers see the schema
+ * through Delta's storage-level schema JSON.
*/
- private static final class DeltaTableMetadata implements AbstractMetadata {
+ public static final class TableMetadataAdapter implements AbstractMetadata {
+ private final String tableName;
+ private final TableMetadata delegate;
+ private final String schemaString;
+
+ private TableMetadataAdapter(String tableName, TableMetadata delegate) {
+ this.tableName = Objects.requireNonNull(tableName, "tableName must not be null.");
+ this.delegate = Objects.requireNonNull(delegate, "delegate must not be null.");
+ this.schemaString = toSchemaString(
+ Objects.requireNonNull(delegate.getColumns(), "UC Delta table schema is missing."));
+ }
+
+ public String getTableType() {
+ return delegate.getTableType() == null ? null : delegate.getTableType().getValue();
+ }
- private final String name;
- private final TableMetadata m;
+ public UUID getTableUuid() {
+ return delegate.getTableUuid();
+ }
- DeltaTableMetadata(String name, TableMetadata m) {
- this.name = name;
- this.m = m;
+ public String getLocation() {
+ return delegate.getLocation();
}
@Override
public String getId() {
- return m.getTableUuid() != null ? m.getTableUuid().toString() : null;
+ return delegate.getTableUuid() == null ? null : delegate.getTableUuid().toString();
}
@Override
public String getName() {
- return name;
+ return tableName;
}
@Override
@@ -596,7 +718,9 @@ public String getDescription() {
@Override
public String getProvider() {
- return m.getDataSourceFormat() != null ? m.getDataSourceFormat().getValue() : null;
+ return delegate.getDataSourceFormat() == null
+ ? null
+ : delegate.getDataSourceFormat().getValue().toLowerCase(Locale.ROOT);
}
@Override
@@ -606,43 +730,38 @@ public Map