diff --git a/build.sbt b/build.sbt index 27cf4cacd61..831f2f9c5b7 100644 --- a/build.sbt +++ b/build.sbt @@ -340,8 +340,22 @@ lazy val sparkV1 = (project in file("spark")) "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided", // For DynamoDBCommitStore - "com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided", - + "com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided" + ) ++ { + if (unityCatalogVersion >= "0.5.0") { + Seq( + "io.unitycatalog" % "unitycatalog-hadoop" % unityCatalogVersion excludeAll( + ExclusionRule(organization = "org.openapitools"), + ExclusionRule(organization = "com.fasterxml.jackson.core"), + ExclusionRule(organization = "com.fasterxml.jackson.module"), + ExclusionRule(organization = "com.fasterxml.jackson.datatype"), + ExclusionRule(organization = "com.fasterxml.jackson.dataformat") + ) + ) + } else { + Seq.empty + } + } ++ Seq( // Test deps "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "org.scalatestplus" %% "scalacheck-1-15" % "3.2.9.0" % "test", @@ -660,8 +674,22 @@ lazy val spark = (project in file("spark-unified")) "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided", - "com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided", - + "com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided" + ) ++ { + if (unityCatalogVersion >= "0.5.0") { + Seq( + "io.unitycatalog" % "unitycatalog-hadoop" % unityCatalogVersion excludeAll( + ExclusionRule(organization = "org.openapitools"), + ExclusionRule(organization = "com.fasterxml.jackson.core"), + ExclusionRule(organization = "com.fasterxml.jackson.module"), + ExclusionRule(organization = "com.fasterxml.jackson.datatype"), + ExclusionRule(organization = "com.fasterxml.jackson.dataformat") + ) + ) + } else { + Seq.empty + } + } ++ Seq( "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "org.scalatestplus" %% "scalacheck-1-15" % "3.2.9.0" % "test", "junit" % "junit" % "4.13.2" % "test", @@ -669,7 +697,17 @@ lazy val spark = (project in file("spark-unified")) "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests", "org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests", "org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests", - "org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests", + "org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests" + ) ++ { + if (unityCatalogVersion >= "0.5.0") { + Seq( + // unitycatalog-hadoop references the ABFS token-provider interface during classloading. + "org.apache.hadoop" % "hadoop-azure" % hadoopVersion % "test" + ) + } else { + Seq.empty + } + } ++ Seq( "org.mockito" % "mockito-inline" % "4.11.0" % "test", ), @@ -843,13 +881,19 @@ Global / ensurePinnedUnityCatalog := { val home = file(sys.props("user.home")) // Check both layouts: a restored sbt cache can pre-populate ivy alone, leaving m2 empty - // checking only ivy would silently skip the slow publish and break mvn-based consumers. - val ivy2Canary = home / ".ivy2" / "local" / "io.unitycatalog" / + val ivy2ClientCanary = home / ".ivy2" / "local" / "io.unitycatalog" / "unitycatalog-client" / unityCatalogVersion / "ivys" / "ivy.xml" - val m2Canary = home / ".m2" / "repository" / "io" / "unitycatalog" / + val m2ClientCanary = home / ".m2" / "repository" / "io" / "unitycatalog" / "unitycatalog-client" / unityCatalogVersion / s"unitycatalog-client-$unityCatalogVersion.pom" - if (!ivy2Canary.exists || !m2Canary.exists) { - publishPinnedUnityCatalog(log, ivy2Canary) + val ivy2HadoopCanary = home / ".ivy2" / "local" / "io.unitycatalog" / + "unitycatalog-hadoop" / unityCatalogVersion / "ivys" / "ivy.xml" + val m2HadoopCanary = home / ".m2" / "repository" / "io" / "unitycatalog" / + "unitycatalog-hadoop" / unityCatalogVersion / + s"unitycatalog-hadoop-$unityCatalogVersion.pom" + if (!Seq(ivy2ClientCanary, m2ClientCanary, ivy2HadoopCanary, m2HadoopCanary) + .forall(_.exists)) { + publishPinnedUnityCatalog(log, ivy2ClientCanary) } } } diff --git a/project/scripts/setup_unitycatalog_main.sh b/project/scripts/setup_unitycatalog_main.sh index d66896365a7..685dcf48b7c 100755 --- a/project/scripts/setup_unitycatalog_main.sh +++ b/project/scripts/setup_unitycatalog_main.sh @@ -89,7 +89,7 @@ if [[ "${1:-}" == "--print-version" ]]; then exit 0 fi -# Canonical Ivy + Maven artifact paths. Delta depends on all three UC modules; sbt resolves from +# Canonical Ivy + Maven artifact paths. Delta depends on these UC modules; sbt resolves from # ~/.ivy2/local, mvn (kernel-examples integration tests) resolves from ~/.m2/repository. If any # is missing in either layout we must re-publish. IVY_LOCAL="$HOME/.ivy2/local/io.unitycatalog" @@ -118,24 +118,26 @@ if [[ "$UC_FORCE" != "1" ]] && all_canaries_present; then exit 0 fi -echo ">>> Fetching Unity Catalog main from $UC_REPO" +echo ">>> Fetching Unity Catalog from $UC_REPO" rm -rf "$UC_DIR" mkdir -p "$UC_DIR" -# Fetch main's full history so we can run `git merge-base --is-ancestor` below to verify the -# pinned SHA is actually on main. UC's repo is small; full fetch of one branch is cheap. +# Fetch the target branch so we can verify the pinned SHA is reachable. git -C "$UC_DIR" init --quiet git -C "$UC_DIR" remote add origin "$UC_REPO" -git -C "$UC_DIR" fetch --quiet origin main +if [[ "$UC_REF" == "main" ]]; then + git -C "$UC_DIR" fetch --quiet origin main +else + git -C "$UC_DIR" fetch --quiet origin "$UC_PIN_SHA" +fi cd "$UC_DIR" -# Safety check: the pinned SHA must be reachable from UC main. Local `merge-base --is-ancestor` +# Safety check: the pinned SHA must be reachable from the fetched branch. Local `merge-base --is-ancestor` # on the history we just fetched - no GitHub API, no token needed. Only applies when UC_REF is # the pinned SHA; UC_REF=main is trivially on main. if [[ "$UC_REF" == "$UC_PIN_SHA" ]]; then - if ! git merge-base --is-ancestor "$UC_PIN_SHA" origin/main 2>/dev/null; then - echo "ERROR: UC_PIN_SHA=$UC_PIN_SHA is not reachable from unitycatalog/unitycatalog main." >&2 - echo " Pin must reference a commit on https://github.com/unitycatalog/unitycatalog/commits/main" >&2 + if ! git rev-parse --verify "$UC_PIN_SHA^{commit}" >/dev/null 2>&1; then + echo "ERROR: UC_PIN_SHA=$UC_PIN_SHA could not be fetched from $UC_REPO." >&2 exit 1 fi fi @@ -161,7 +163,7 @@ fi # coordinate. Applied as a persistent setting so it sticks across the two sbt invocations below. SET_VERSION_CMD="set ThisBuild / version := \"$UC_VERSION\"" -echo ">>> Building and publishing UC client + server to local Maven repo" +echo ">>> Building and publishing UC client + server + hadoop to local Maven repo" ./build/sbt \ "$SET_VERSION_CMD" \ "set client / Compile / packageDoc / publishArtifact := false" \ diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala index 86a0f9b4ee9..b5dacd69a33 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala @@ -83,6 +83,9 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension val spark = SparkSession.active + private lazy val deltaCatalogClient: Option[DeltaCatalogClient] = + Some(UCDeltaCatalogClient(delegate, spark)) + private lazy val isUnityCatalog: Boolean = { val delegateField = classOf[DelegatingCatalogExtension].getDeclaredField("delegate") delegateField.setAccessible(true) @@ -290,7 +293,7 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension "DeltaCatalog", "loadTable") { setVariantBlockingConfigIfUC() try { - val table = super.loadTable(ident) + val table = deltaCatalogClient.flatMap(_.loadTable(ident)).getOrElse(super.loadTable(ident)) ServerSidePlannedTable.tryCreate(spark, ident, table, isUnityCatalog).foreach { sspt => return sspt diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClient.scala new file mode 100644 index 00000000000..e495152a129 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClient.scala @@ -0,0 +1,29 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.catalog + +import org.apache.spark.sql.connector.catalog.{Identifier, Table} + +/** + * Spark-facing Delta catalog API hook. + * + *

The interface is intentionally free of UC SDK and Hadoop credential dependencies so the shared + * catalog path does not depend on a specific UC client implementation. + */ +private[catalog] trait DeltaCatalogClient { + def loadTable(ident: Identifier): Option[Table] +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClient.scala new file mode 100644 index 00000000000..314eb811d57 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClient.scala @@ -0,0 +1,332 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.catalog + +import java.io.IOException +import java.lang.reflect.InvocationTargetException +import java.net.URI +import java.util.{Locale, Map => JMap} + +import scala.collection.JavaConverters._ + +import io.delta.storage.commit.actions.AbstractMetadata +import io.delta.storage.commit.uccommitcoordinator.UCDeltaClient +import io.unitycatalog.client.ApiException +import io.unitycatalog.client.auth.TokenProvider +import io.unitycatalog.hadoop.UCCredentialHadoopConfs +import io.unitycatalog.hadoop.UCCredentialHadoopConfs.TableOperation +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils} +import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, V1Table} +import org.apache.spark.sql.delta.coordinatedcommits.{ + UCCommitCoordinatorBuilder, + UCTokenBasedRestClientFactory +} +import org.apache.spark.sql.delta.sources.DeltaSourceUtils + +/** + * Spark-side client for using UC Delta Rest Catalog API responses inside `DeltaCatalog`. + * + * This class owns the UC-specific implementation behind the Spark-facing [[DeltaCatalogClient]] + * interface: choosing when to try the Delta API path, translating Delta API metadata into Spark + * catalog objects, fetching credentials for Spark/Hadoop, and returning `None` when + * `DeltaCatalog` should fall back to the legacy catalog path. + */ +private class UCDeltaCatalogClient private ( + private val ucDeltaClient: Option[UCDeltaClient], + catalogName: String, + credentialContext: Option[UCDeltaRestCatalogApiCredentialContext]) extends DeltaCatalogClient { + + import UCDeltaCatalogClient._ + + def loadTable(ident: Identifier): Option[Table] = { + ucDeltaClient match { + case Some(client) if isNamedTableIdentifier(ident) => + val schemaName = ident.namespace().head + val tableName = ident.name() + val metadata = try { + client.loadTable(catalogName, schemaName, tableName) + } catch { + case e: IOException if isUnsupportedTableFormat(e) => + return None + case e: IOException => + throw translateLoadTableException(ident, e) + } + val location = reflectedString(metadata, "getLocation") + val locationUri = CatalogUtils.stringToURI(location) + Some(V1Table(buildCatalogTableFromUCDeltaMetadata( + ident, + metadata, + location, + locationUri))) + case _ => + // UC Delta Rest Catalog API only supports catalog.schema.table identifiers for named + // tables. + None + } + } + + private def translateLoadTableException(ident: Identifier, e: IOException): Throwable = { + e.getCause match { + case api: ApiException if api.getCode == 404 => + new NoSuchTableException(ident) + case _ => + e + } + } + + /** + * UC Delta APIs use this explicit 501 response when a table exists in UC but cannot be served + * through the Delta endpoint, such as a metric view or another non-Delta table format. + */ + private def isUnsupportedTableFormat(e: IOException): Boolean = e.getCause match { + case api: ApiException => + api.getCode == 501 && + Option(api.getResponseBody).exists(_.contains(UnsupportedTableFormatExceptionType)) + case _ => false + } + + private def isNamedTableIdentifier(ident: Identifier): Boolean = { + ident.namespace().length == 1 && !isDeltaPathIdentifier(ident) + } + + private def isDeltaPathIdentifier(ident: Identifier): Boolean = { + try { + ident.namespace().length == 1 && + DeltaSourceUtils.isDeltaDataSourceName(ident.namespace().head) && + new Path(ident.name()).isAbsolute + } catch { + case _: IllegalArgumentException => false + } + } + + /** + * Builds the Spark V1 catalog table returned from UC Delta Rest Catalog API metadata. + * The UC Delta response supplies the Spark table type, schema, provider, and storage metadata. + */ + private def buildCatalogTableFromUCDeltaMetadata( + ident: Identifier, + metadata: AbstractMetadata, + location: String, + locationUri: URI): CatalogTable = { + val schemaName = ident.namespace().head + val tableName = ident.name() + CatalogTable( + identifier = + TableIdentifier(ident.name(), ident.namespace().lastOption, Some(catalogName)), + tableType = reflectedString(metadata, "getTableType") match { + case ManagedTableType => + CatalogTableType.MANAGED + case ExternalTableType => + CatalogTableType.EXTERNAL + case other => + throw new IllegalArgumentException( + s"Unsupported UC Delta Rest Catalog API table type for " + + s"$catalogName.${ident.namespace().mkString(".")}.${ident.name()}: $other") + }, + storage = CatalogStorageFormat.empty.copy( + locationUri = Some(locationUri), + properties = buildCatalogStorageProperties( + metadata, + location, + locationUri.getScheme, + schemaName, + tableName)), + schema = UCDeltaRestCatalogApiSchemaConverter.toSparkType(metadata.getSchemaString), + provider = Option(metadata.getProvider), + partitionColumnNames = Option(metadata.getPartitionColumns) + .map(_.asScala.toSeq) + .getOrElse(Nil)) + } + + /** + * Builds CatalogStorageFormat.properties for the Spark V1 table. + * V1Table later exposes these to Delta as option.* table properties. + */ + private def buildCatalogStorageProperties( + metadata: AbstractMetadata, + location: String, + locationScheme: String, + schemaName: String, + tableName: String): Map[String, String] = { + val credentialProperties = buildHadoopCredentialPropertiesForTable( + location, + locationScheme, + schemaName, + tableName) + // V1Table exposes storage properties as option.* table properties. Keep UC Delta Rest Catalog + // API table features here so the Delta load path receives them with the same option.* shape as + // other storage-level UC properties, while CatalogTable.properties stays reserved for Spark + // metadata. + Option(metadata.getConfiguration).map(_.asScala.toMap).getOrElse(Map.empty) ++ + credentialProperties + } + + private def buildHadoopCredentialPropertiesForTable( + location: String, + locationScheme: String, + schemaName: String, + tableName: String): Map[String, String] = { + if (!isCloudScheme(locationScheme)) { + Map.empty[String, String] + } else { + val context = credentialContext.getOrElse { + throw new IllegalStateException( + "UC Delta Rest Catalog API credential context is missing for cloud location " + location) + } + val builder = UCCredentialHadoopConfs.builder( + context.uri, + locationScheme.toLowerCase(Locale.ROOT)) + .tokenProvider(context.tokenProvider) + .enableCredentialRenewal(context.renewCredentialEnabled) + .enableCredentialScopedFs(context.credScopedFsEnabled) + .hadoopConf(context.hadoopConf) + .addAppVersions(context.appVersions) + try { + // Prefer READ_WRITE so a loaded table can be used for writes without reloading + // credentials; read-only principals fall back to READ below. + builder.buildForTable( + catalogName, + schemaName, + tableName, + TableOperation.READ_WRITE, + location) + .asScala + .toMap + } catch { + case e: ApiException if e.getCode == 401 || e.getCode == 403 => + builder.buildForTable(catalogName, schemaName, tableName, TableOperation.READ, location) + .asScala + .toMap + } + } + } +} + +private case class UCDeltaRestCatalogApiCredentialContext( + uri: String, + tokenProvider: TokenProvider, + renewCredentialEnabled: Boolean, + credScopedFsEnabled: Boolean, + hadoopConf: Configuration, + appVersions: JMap[String, String]) + +private object UCDeltaCatalogClient { + private[catalog] val UCDeltaRestCatalogApiEnabledKey = "deltaRestApi.enabled" + private[catalog] val RenewCredentialEnabledKey = "renewCredential.enabled" + private[catalog] val CredScopedFsEnabledKey = "credScopedFs.enabled" + private val ManagedTableType = "MANAGED" + private val ExternalTableType = "EXTERNAL" + private val UnsupportedTableFormatExceptionType = "UnsupportedTableFormatException" + private val DefaultRenewCredentialEnabled = true + private val DefaultCredScopedFsEnabled = false + private val CloudSchemes = Set("s3", "s3a", "gs", "abfs", "abfss") + + private[catalog] def deltaRestApiEnabledConf(catalogName: String): String = { + s"spark.sql.catalog.$catalogName.$UCDeltaRestCatalogApiEnabledKey" + } + + private[catalog] def renewCredentialEnabledConf(catalogName: String): String = { + s"spark.sql.catalog.$catalogName.$RenewCredentialEnabledKey" + } + + private[catalog] def credScopedFsEnabledConf(catalogName: String): String = { + s"spark.sql.catalog.$catalogName.$CredScopedFsEnabledKey" + } + + private def isCloudScheme(scheme: String): Boolean = { + Option(scheme).exists(s => CloudSchemes.contains(s.toLowerCase(Locale.ROOT))) + } + + private def reflectedString(metadata: AbstractMetadata, methodName: String): String = { + try { + metadata.getClass.getMethod(methodName).invoke(metadata).asInstanceOf[String] + } catch { + case e: NoSuchMethodException => + throw new IllegalStateException( + s"UC Delta metadata is missing required method $methodName.", e) + case e: InvocationTargetException => + e.getCause match { + case runtime: RuntimeException => throw runtime + case error: Error => throw error + case cause => throw new IllegalStateException( + s"Failed to read $methodName from UC Delta metadata.", cause) + } + } + } + + def apply(delegatePlugin: CatalogPlugin, spark: SparkSession): UCDeltaCatalogClient = { + val catalogName = delegatePlugin.name() + var credentialContext = Option.empty[UCDeltaRestCatalogApiCredentialContext] + val ucDeltaClient = if (spark.conf + .get(deltaRestApiEnabledConf(catalogName), "false") + .toBoolean) { + val (_, uri, authConfig) = UCCommitCoordinatorBuilder.getCatalogConfigs(spark) + .collectFirst { case (`catalogName`, configuredUri, configuredAuthConfig) => + (catalogName, configuredUri, configuredAuthConfig) + } + .getOrElse { + throw new IllegalArgumentException( + "UC Delta Rest Catalog API is enabled for catalog " + + s"$catalogName, but its Unity Catalog " + + "configuration is missing or incomplete.") + } + val tokenProvider = TokenProvider.create(authConfig.asJava) + // Catalog load has no DeltaLog yet, so pass the Spark session Hadoop conf to the UC + // credential builder. + // scalastyle:off deltahadoopconfiguration + val hadoopConf = spark.sessionState.newHadoopConf() + // scalastyle:on deltahadoopconfiguration + val appVersions = UCTokenBasedRestClientFactory.defaultAppVersionsAsJava + credentialContext = Some(UCDeltaRestCatalogApiCredentialContext( + uri.toString, + tokenProvider, + spark.conf.get( + renewCredentialEnabledConf(catalogName), + DefaultRenewCredentialEnabled.toString).toBoolean, + spark.conf.get( + credScopedFsEnabledConf(catalogName), + DefaultCredScopedFsEnabled.toString).toBoolean, + hadoopConf, + appVersions)) + UCTokenBasedRestClientFactory.createUCDeltaClient( + uri, + tokenProvider, + appVersions, + catalogName) match { + case Some(client) if client.supportsUCDeltaRestCatalogApi() => + Some(client) + case Some(client) => + client.close() + throw new IllegalArgumentException( + s"UC Delta Rest Catalog API is enabled for catalog $catalogName, but the Unity " + + "Catalog server does not support the required UC Delta Rest Catalog API endpoints.") + case None => + None + } + } else { + None + } + new UCDeltaCatalogClient(ucDeltaClient, catalogName, credentialContext) + } + +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaRestCatalogApiSchemaConverter.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaRestCatalogApiSchemaConverter.scala new file mode 100644 index 00000000000..809ce876acc --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaRestCatalogApiSchemaConverter.scala @@ -0,0 +1,29 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.catalog + +import org.apache.spark.sql.types.{DataType, StructType} + +private[catalog] object UCDeltaRestCatalogApiSchemaConverter { + + def toSparkType(schemaString: String): StructType = { + if (schemaString == null) { + throw new IllegalArgumentException("UC Delta Rest Catalog API table schema is missing.") + } + DataType.fromJson(schemaString).asInstanceOf[StructType] + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala index 0f92e15f870..e15ac7cb692 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala @@ -23,7 +23,12 @@ import scala.collection.JavaConverters._ import scala.util.control.NonFatal import io.delta.storage.commit.CommitCoordinatorClient -import io.delta.storage.commit.uccommitcoordinator.{UCClient, UCCommitCoordinatorClient, UCTokenBasedRestClient} +import io.delta.storage.commit.uccommitcoordinator.{ + UCClient, + UCCommitCoordinatorClient, + UCDeltaClient, + UCTokenBasedRestClient +} import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging @@ -31,6 +36,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging import io.unitycatalog.client.auth.TokenProvider import org.apache.spark.internal.MDC import org.apache.spark.sql.SparkSession +import org.apache.spark.util.Utils /** * Builder for Unity Catalog Commit Coordinator Clients. @@ -291,6 +297,9 @@ trait UCClientFactory { } object UCTokenBasedRestClientFactory extends UCClientFactory { + private val UCDeltaTokenBasedRestClientClassName = + "io.delta.storage.commit.uccommitcoordinator.UCDeltaTokenBasedRestClient" + override def createUCClient(uri: String, authConfig: Map[String, String]): UCClient = { createUCClientWithVersions(uri, authConfig, defaultAppVersions) } @@ -311,6 +320,27 @@ object UCTokenBasedRestClientFactory extends UCClientFactory { new UCTokenBasedRestClient(uri, tokenProvider, appVersions.asJava) } + private[delta] def createUCDeltaClient( + uri: String, + tokenProvider: TokenProvider, + appVersions: java.util.Map[String, String], + catalogName: String): Option[UCDeltaClient] = { + try { + val clientClass = Utils.classForName(UCDeltaTokenBasedRestClientClassName) + val constructor = clientClass.getConstructor( + classOf[String], + classOf[TokenProvider], + classOf[java.util.Map[String, String]], + classOf[String]) + Some(constructor + .newInstance(uri, tokenProvider, appVersions, catalogName) + .asInstanceOf[UCDeltaClient]) + } catch { + case _: ClassNotFoundException => + None + } + } + private[coordinatedcommits] def defaultAppVersions: Map[String, String] = { Map( "Delta" -> io.delta.VERSION, diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClientSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClientSuite.scala new file mode 100644 index 00000000000..28fcb000f58 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClientSuite.scala @@ -0,0 +1,472 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.catalog + +import java.io.IOException +import java.net.InetSocketAddress +import java.nio.charset.StandardCharsets + +import scala.collection.JavaConverters._ + +import com.sun.net.httpserver.{HttpExchange, HttpServer} +import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient.UC_TABLE_ID_KEY + +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.connector.catalog.{ + Identifier, + Table, + TableCatalog, + TableChange, + V1Table +} +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class DeltaCatalogClientSuite + extends QueryTest + with DeltaSQLCommandTest + with BeforeAndAfterAll + with BeforeAndAfterEach { + + private var server: HttpServer = _ + private var serverUri: String = _ + private var configHandler: HttpExchange => Unit = _ + private var handler: HttpExchange => Unit = _ + private var credentialRequestCount: Int = _ + + private val AwsVendedTokenProviderClass = + "io.unitycatalog.hadoop.internal.auth.AwsVendedTokenProvider" + private val S3ACredentialsProviderKey = "fs.s3a.aws.credentials.provider" + private val S3AInitAccessKey = "fs.s3a.init.access.key" + private val UCTableOperationKey = "fs.unitycatalog.table.operation" + + override def beforeAll(): Unit = { + super.beforeAll() + server = HttpServer.create(new InetSocketAddress("localhost", 0), 0) + server.createContext("/api/2.1/unity-catalog/delta/v1/config", exchange => { + try { + if (configHandler != null) { + configHandler(exchange) + } else { + sendJson(exchange, 200, + """{ + | "endpoints": [ + | "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}", + | "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}/credentials" + | ], + | "protocol-version": "1.0" + |}""".stripMargin) + } + } finally { + exchange.close() + } + }) + server.createContext("/api/2.1/unity-catalog/delta/v1/catalogs", exchange => { + try { + if (handler != null) handler(exchange) else sendJson(exchange, 404, "{}") + } finally { + exchange.close() + } + }) + server.start() + serverUri = s"http://localhost:${server.getAddress.getPort}" + } + + override def afterAll(): Unit = { + if (server != null) server.stop(0) + super.afterAll() + } + + override def beforeEach(): Unit = { + super.beforeEach() + configHandler = null + handler = null + credentialRequestCount = 0 + } + + test("loadTable skips credentials for local Delta locations") { + handler = exchange => exchange.getRequestURI.getPath match { + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" => + sendJson(exchange, 200, loadTableResponseJson("file:/tmp/uc/table")) + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" => + credentialRequestCount += 1 + sendJson(exchange, 500, "{}") + case path => + fail(s"Unexpected UC Delta Rest Catalog API request path: $path") + } + + val table = loadWithUCDeltaRestCatalogApi() + val properties = table.catalogTable.storage.properties + val idMetadata = table.catalogTable.schema("id").metadata + + assert(credentialRequestCount === 0) + assert(properties === Map( + "delta.feature.catalogManaged" -> "supported", + UC_TABLE_ID_KEY -> "11111111-1111-1111-1111-111111111111")) + assert(idMetadata.getLong("delta.columnMapping.id") === 1L) + assert(idMetadata.getString("delta.columnMapping.physicalName") === "col-123") + } + + test("loadTable fails loudly when cloud credentials are empty") { + handler = exchange => exchange.getRequestURI.getPath match { + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" => + sendJson(exchange, 200, loadTableResponseJson("s3://bucket/table")) + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" => + credentialRequestCount += 1 + sendJson(exchange, 200, """{"storage-credentials": []}""") + case path => + fail(s"Unexpected UC Delta Rest Catalog API request path: $path") + } + + val error = intercept[IllegalArgumentException] { + loadWithUCDeltaRestCatalogApi() + } + + assert(credentialRequestCount === 1) + assert(error.getMessage.contains("no storage credentials")) + } + + test("loadTable accepts trailing-slash cloud credential prefixes") { + handler = exchange => exchange.getRequestURI.getPath match { + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" => + sendJson(exchange, 200, loadTableResponseJson("s3://bucket/path/to/table")) + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" => + credentialRequestCount += 1 + assert(exchange.getRequestURI.getQuery === "operation=READ_WRITE") + sendJson(exchange, 200, s3CredentialsResponseJson( + "s3://bucket/path/to/table/", + "READ_WRITE")) + case path => + fail(s"Unexpected UC Delta Rest Catalog API request path: $path") + } + + val table = loadWithUCDeltaRestCatalogApi() + val storageProperties = table.catalogTable.storage.properties + val tableProperties = table.properties.asScala + + assert(credentialRequestCount === 1) + assert(storageProperties(S3ACredentialsProviderKey) === + AwsVendedTokenProviderClass) + assert(storageProperties(S3AInitAccessKey) === "ak") + assert(tableProperties( + s"option.${S3ACredentialsProviderKey}") === + AwsVendedTokenProviderClass) + assert(tableProperties( + s"option.${S3AInitAccessKey}") === "ak") + assert(storageProperties("delta.feature.catalogManaged") === "supported") + assert(!tableProperties.contains("delta.feature.catalogManaged")) + assert(!tableProperties.contains( + s"option.option.${S3AInitAccessKey}")) + } + + test("loadTable falls back to READ credentials when READ_WRITE is denied") { + var credentialQueries = Seq.empty[String] + handler = exchange => exchange.getRequestURI.getPath match { + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" => + sendJson(exchange, 200, loadTableResponseJson("s3://bucket/path/to/table")) + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" => + credentialQueries :+= exchange.getRequestURI.getQuery + exchange.getRequestURI.getQuery match { + case "operation=READ_WRITE" => + sendJson(exchange, 403, """{"error_code": "PERMISSION_DENIED"}""") + case "operation=READ" => + sendJson(exchange, 200, s3CredentialsResponseJson("s3://bucket/path/to/table", "READ")) + case other => + fail(s"Unexpected credential query: $other") + } + case path => + fail(s"Unexpected UC Delta Rest Catalog API request path: $path") + } + + val table = loadWithUCDeltaRestCatalogApi() + + assert(credentialQueries === Seq("operation=READ_WRITE", "operation=READ")) + assert(table.catalogTable.storage.properties( + UCTableOperationKey) === "READ") + } + + test("loadTable uses static credential properties when renewal is disabled") { + handler = exchange => exchange.getRequestURI.getPath match { + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" => + sendJson(exchange, 200, loadTableResponseJson("s3://bucket/path/to/table")) + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" => + credentialRequestCount += 1 + sendJson( + exchange, + 200, + s3CredentialsResponseJson("s3://bucket/path/to/table", "READ_WRITE")) + case path => + fail(s"Unexpected UC Delta Rest Catalog API request path: $path") + } + + val table = withUCDeltaRestCatalogApiRenewalDisabled { catalog => + catalog.loadTable(Identifier.of(Array("default"), "tbl")).get.asInstanceOf[V1Table] + } + + assert(credentialRequestCount === 1) + assert(table.catalogTable.storage.properties("fs.s3a.access.key") === "ak") + assert(!table.catalogTable.storage.properties.contains( + S3ACredentialsProviderKey)) + } + + test("loadTable maps missing provider to None") { + handler = exchange => exchange.getRequestURI.getPath match { + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" => + sendJson( + exchange, + 200, + loadTableResponseJson("file:/tmp/uc/table") + .replace("\"data-source-format\": \"DELTA\"", "\"data-source-format\": null")) + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" => + fail("Unexpected credentials request for local path") + case path => + fail(s"Unexpected UC Delta Rest Catalog API request path: $path") + } + + val table = loadWithUCDeltaRestCatalogApi() + + assert(table.catalogTable.provider.isEmpty) + } + + test("loadTable falls back when UC Delta Rest Catalog API reports unsupported table format") { + handler = exchange => exchange.getRequestURI.getPath match { + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" => + sendJson(exchange, 501, + """{ + | "error": { + | "message": "Table exists but is not supported by the Delta endpoint.", + | "type": "UnsupportedTableFormatException", + | "code": 501 + | } + |}""".stripMargin) + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" => + fail("Unexpected credentials request after unsupported table format") + case path => + fail(s"Unexpected UC Delta Rest Catalog API request path: $path") + } + + val loaded = withUCDeltaRestCatalogApi { catalog => + catalog.loadTable(Identifier.of(Array("default"), "tbl")) + } + + assert(loaded.isEmpty) + } + + test("loadTable propagates generic UC Delta Rest Catalog API 501 errors") { + handler = exchange => exchange.getRequestURI.getPath match { + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" => + sendJson(exchange, 501, + """{ + | "error": { + | "message": "Not implemented.", + | "type": "NotImplementedException", + | "code": 501 + | } + |}""".stripMargin) + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" => + fail("Unexpected credentials request after loadTable failure") + case path => + fail(s"Unexpected UC Delta Rest Catalog API request path: $path") + } + + val error = intercept[IOException] { + loadWithUCDeltaRestCatalogApi() + } + + assert(error.getMessage.contains("Failed to load table uc.default.tbl")) + assert(error.getMessage.contains("HTTP 501")) + assert(error.getMessage.contains("NotImplementedException")) + } + + test("loadTable propagates UC Delta Rest Catalog API server errors") { + handler = exchange => exchange.getRequestURI.getPath match { + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" => + sendJson(exchange, 500, """{"error_code":"INTERNAL_ERROR"}""") + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" => + fail("Unexpected credentials request after loadTable failure") + case path => + fail(s"Unexpected UC Delta Rest Catalog API request path: $path") + } + + val error = intercept[IOException] { + loadWithUCDeltaRestCatalogApi() + } + + assert(error.getMessage.contains("Failed to load table uc.default.tbl")) + assert(error.getMessage.contains("HTTP 500")) + } + + test("apply fails when UC Delta Rest Catalog API is enabled but unsupported") { + configHandler = exchange => sendJson(exchange, 200, + """{ + | "endpoints": [], + | "protocol-version": "1.0" + |}""".stripMargin) + handler = exchange => + fail(s"Unexpected UC Delta Rest Catalog API request path: ${exchange.getRequestURI.getPath}") + + val error = intercept[IllegalArgumentException] { + withUCDeltaRestCatalogApi { catalog => + catalog.loadTable(Identifier.of(Array("default"), "tbl")) + } + } + + assert(error.getMessage.contains("UC Delta Rest Catalog API is enabled for catalog uc")) + assert(error.getMessage.contains( + "does not support the required UC Delta Rest Catalog API endpoints")) + } + + test("loadTable does not probe UC Delta Rest Catalog API when disabled") { + configHandler = exchange => + fail(s"Unexpected UC Delta Rest Catalog API config request: ${exchange.getRequestURI}") + handler = exchange => + fail(s"Unexpected UC Delta Rest Catalog API request path: ${exchange.getRequestURI.getPath}") + + withSQLConf( + "spark.sql.catalog.uc" -> "io.unitycatalog.spark.UCSingleCatalog", + "spark.sql.catalog.uc.uri" -> serverUri, + "spark.sql.catalog.uc.token" -> "mock-token") { + val catalog = UCDeltaCatalogClient(new TestDelegateCatalog, spark) + assert(catalog.loadTable(Identifier.of(Array("default"), "tbl")).isEmpty) + } + } + + test("apply fails when UC Delta Rest Catalog API is enabled without UC config") { + withSQLConf( + "spark.sql.catalog.uc" -> "io.unitycatalog.spark.UCSingleCatalog", + UCDeltaCatalogClient.deltaRestApiEnabledConf("uc") -> "true") { + val error = intercept[IllegalArgumentException] { + UCDeltaCatalogClient(new TestDelegateCatalog, spark) + } + assert(error.getMessage.contains("configuration is missing or incomplete")) + } + } + + private def loadWithUCDeltaRestCatalogApi(): V1Table = { + withUCDeltaRestCatalogApi { catalog => + catalog.loadTable(Identifier.of(Array("default"), "tbl")).get.asInstanceOf[V1Table] + } + } + + private def withUCDeltaRestCatalogApi[T](f: DeltaCatalogClient => T): T = { + withUCDeltaRestCatalogApi(new TestDelegateCatalog, renewCredentialEnabled = true)(f) + } + + private def withUCDeltaRestCatalogApiRenewalDisabled[T](f: DeltaCatalogClient => T): T = { + withUCDeltaRestCatalogApi(new TestDelegateCatalog, renewCredentialEnabled = false)(f) + } + + private def withUCDeltaRestCatalogApi[T]( + delegate: TableCatalog)( + f: DeltaCatalogClient => T): T = { + withUCDeltaRestCatalogApi(delegate, renewCredentialEnabled = true)(f) + } + + private def withUCDeltaRestCatalogApi[T]( + delegate: TableCatalog, + renewCredentialEnabled: Boolean)( + f: DeltaCatalogClient => T): T = { + withSQLConf( + "spark.sql.catalog.uc" -> "io.unitycatalog.spark.UCSingleCatalog", + "spark.sql.catalog.uc.uri" -> serverUri, + "spark.sql.catalog.uc.token" -> "mock-token", + UCDeltaCatalogClient.renewCredentialEnabledConf("uc") -> renewCredentialEnabled.toString, + UCDeltaCatalogClient.deltaRestApiEnabledConf("uc") -> "true") { + val catalog = UCDeltaCatalogClient(delegate, spark) + f(catalog) + } + } + + private def loadTableResponseJson( + location: String, + dataSourceFormat: String = "DELTA"): String = + s"""{ + | "metadata": { + | "data-source-format": "$dataSourceFormat", + | "table-type": "MANAGED", + | "table-uuid": "11111111-1111-1111-1111-111111111111", + | "location": "$location", + | "columns": { + | "type": "struct", + | "fields": [ + | { + | "name": "id", + | "type": "long", + | "nullable": false, + | "metadata": { + | "delta.columnMapping.id": 1, + | "delta.columnMapping.physicalName": "col-123" + | } + | } + | ] + | }, + | "partition-columns": [], + | "properties": { + | "delta.feature.catalogManaged": "supported", + | "$UC_TABLE_ID_KEY": "11111111-1111-1111-1111-111111111111" + | } + | }, + | "commits": [] + |}""".stripMargin + + private def s3CredentialsResponseJson(prefix: String, operation: String): String = + s"""{ + | "storage-credentials": [ + | { + | "prefix": "$prefix", + | "operation": "$operation", + | "config": { + | "s3.access-key-id": "ak", + | "s3.secret-access-key": "sk", + | "s3.session-token": "st" + | } + | } + | ] + |}""".stripMargin + + private def sendJson(exchange: HttpExchange, status: Int, body: String): Unit = { + val bytes = body.getBytes(StandardCharsets.UTF_8) + exchange.getResponseHeaders.add("Content-Type", "application/json") + exchange.sendResponseHeaders(status, bytes.length) + exchange.getResponseBody.write(bytes) + exchange.getResponseBody.close() + } + + private class TestDelegateCatalog extends TableCatalog { + override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {} + override def name(): String = "uc" + override def listTables(namespace: Array[String]): Array[Identifier] = Array.empty + override def loadTable(ident: Identifier): Table = + throw new IllegalStateException("unexpected loadTable call") + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[org.apache.spark.sql.connector.expressions.Transform], + properties: java.util.Map[String, String]): Table = + throw new UnsupportedOperationException("not needed in this test") + override def alterTable(ident: Identifier, changes: TableChange*): Table = + throw new UnsupportedOperationException("not needed in this test") + override def dropTable(ident: Identifier): Boolean = + throw new UnsupportedOperationException("not needed in this test") + override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = + throw new UnsupportedOperationException("not needed in this test") + } + +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/catalog/UCDeltaRestCatalogApiSchemaConverterSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/UCDeltaRestCatalogApiSchemaConverterSuite.scala new file mode 100644 index 00000000000..e115e8d5856 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/UCDeltaRestCatalogApiSchemaConverterSuite.scala @@ -0,0 +1,63 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.catalog + +import org.scalatest.funsuite.AnyFunSuite + +import org.apache.spark.sql.types.{ + ArrayType, + BooleanType, + DecimalType, + IntegerType, + LongType, + MapType, + MetadataBuilder, + StringType, + StructField, + StructType +} + +class UCDeltaRestCatalogApiSchemaConverterSuite extends AnyFunSuite { + + test("converts Delta schema JSON to Spark schema") { + val fieldMetadata = new MetadataBuilder() + .putLong("delta.columnMapping.id", 1L) + .putString("delta.columnMapping.physicalName", "col-1") + .build() + val inputSchema = StructType(Seq( + StructField("id", LongType, nullable = false, fieldMetadata), + StructField("amount", DecimalType(10, 2)), + StructField("values", ArrayType(IntegerType, containsNull = true)), + StructField("tags", MapType(StringType, BooleanType, valueContainsNull = false)), + StructField("nested", StructType(Seq(StructField("name", StringType)))))) + + val schema = UCDeltaRestCatalogApiSchemaConverter.toSparkType(inputSchema.json) + + assert(schema === inputSchema) + assert(!schema("id").nullable) + assert(schema("id").metadata.getLong("delta.columnMapping.id") === 1L) + assert(schema("id").metadata.getString("delta.columnMapping.physicalName") === "col-1") + } + + test("rejects missing schema JSON") { + val e = intercept[IllegalArgumentException] { + UCDeltaRestCatalogApiSchemaConverter.toSparkType(null) + } + + assert(e.getMessage === "UC Delta Rest Catalog API table schema is missing.") + } +} diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/S3CredentialFileSystem.java b/spark/unitycatalog/src/test/java/io/sparkuctest/S3CredentialFileSystem.java index c4e6d9c1501..6b2dec556d9 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/S3CredentialFileSystem.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/S3CredentialFileSystem.java @@ -149,14 +149,28 @@ private void assertCredentials() { private synchronized AwsCredentialsProvider resolveProvider(Configuration conf) { if (provider != null) return provider; - String clazz = conf.get(S3A_CREDENTIALS_PROVIDER); - if (clazz == null) return null; - try { - provider = - (AwsCredentialsProvider) - Class.forName(clazz).getConstructor(Configuration.class).newInstance(conf); - } catch (Exception e) { - throw new RuntimeException("Failed to instantiate credential provider: " + clazz, e); + String classes = conf.get(S3A_CREDENTIALS_PROVIDER); + if (classes == null) return null; + + // S3A accepts a comma-separated provider chain. This fake filesystem only understands + // AWS SDK v2 providers; if none are present, assert static UC-vended Hadoop keys instead. + for (String clazz : classes.split(",")) { + String trimmed = clazz.trim(); + if (trimmed.isEmpty()) continue; + try { + Class candidate = Class.forName(trimmed); + if (!AwsCredentialsProvider.class.isAssignableFrom(candidate)) continue; + provider = + (AwsCredentialsProvider) + candidate.getConstructor(Configuration.class).newInstance(conf); + return provider; + } catch (ClassNotFoundException e) { + // Ignore providers that are valid for real S3A but absent from this test classpath. + } catch (NoSuchMethodException e) { + // Ignore AWS providers that real S3A can construct without a Hadoop Configuration. + } catch (ReflectiveOperationException e) { + throw new RuntimeException("Failed to instantiate credential provider: " + trimmed, e); + } } return provider; } diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java index 0139e56c093..0b5cce98449 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java @@ -144,7 +144,10 @@ private SparkConf configureSparkWithUnityCatalog(SparkConf conf) { String catalogName = uc.catalogName(); return conf.set("spark.sql.catalog." + catalogName, "io.unitycatalog.spark.UCSingleCatalog") .set("spark.sql.catalog." + catalogName + ".uri", uc.serverUri()) - .set("spark.sql.catalog." + catalogName + ".token", uc.serverToken()); + .set("spark.sql.catalog." + catalogName + ".token", uc.serverToken()) + .set( + "spark.sql.catalog." + catalogName + ".deltaRestApi.enabled", + String.valueOf(isUCDeltaRestCatalogApiEnabled())); } /** Stop the SparkSession after all tests. */ diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableReadTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableReadTest.java index 66a9169ff35..a51d7535410 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableReadTest.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableReadTest.java @@ -18,6 +18,7 @@ import java.util.List; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Assumptions; /** * Read operation test suite for Delta Table operations through Unity Catalog. @@ -123,6 +124,12 @@ public void testDeltaTableForPath(TableType tableType) throws Exception { "For managed tables, path-based access should fail"); } else { // For EXTERNAL tables, path-based access should work + // TODO: Enable remote external path reads after Delta wires DRC + // GET /delta/v1/temporary-path-credentials for delta.`path` access. + Assumptions.assumeFalse( + isUCRemoteConfigured(), + "Remote external delta.`path` reads require DRC temporary path credentials; " + + "this PR only wires DRC table credentials for named UC table reads."); S3CredentialFileSystem.credentialCheckEnabled = false; try { check( diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupport.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupport.java index c6633febd78..22a32975eaa 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupport.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupport.java @@ -129,12 +129,19 @@ public ApiClient createApiClient() { public static final String UC_CATALOG_NAME = "UC_CATALOG_NAME"; public static final String UC_SCHEMA_NAME = "UC_SCHEMA_NAME"; public static final String UC_BASE_TABLE_LOCATION = "UC_BASE_TABLE_LOCATION"; + public static final String UC_DELTA_REST_CATALOG_API_ENABLED = + "UC_DELTA_REST_CATALOG_API_ENABLED"; protected static boolean isUCRemoteConfigured() { String ucRemote = System.getenv(UC_REMOTE); return ucRemote != null && ucRemote.equalsIgnoreCase("true"); } + protected static boolean isUCDeltaRestCatalogApiEnabled() { + String deltaRestApiEnabled = System.getenv(UC_DELTA_REST_CATALOG_API_ENABLED); + return deltaRestApiEnabled == null || deltaRestApiEnabled.equalsIgnoreCase("true"); + } + /** The Unity Catalog info instance for subclasses access */ private UnityCatalogInfo ucInfo = null; diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupportTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupportTest.java index 5ad795cae0d..cd1b62aaae5 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupportTest.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupportTest.java @@ -18,6 +18,7 @@ import static io.sparkuctest.UnityCatalogSupport.UC_BASE_TABLE_LOCATION; import static io.sparkuctest.UnityCatalogSupport.UC_CATALOG_NAME; +import static io.sparkuctest.UnityCatalogSupport.UC_DELTA_REST_CATALOG_API_ENABLED; import static io.sparkuctest.UnityCatalogSupport.UC_REMOTE; import static io.sparkuctest.UnityCatalogSupport.UC_SCHEMA_NAME; import static io.sparkuctest.UnityCatalogSupport.UC_TOKEN; @@ -37,7 +38,13 @@ public class UnityCatalogSupportTest { private static final List ALL_ENVS = ImmutableList.of( - UC_REMOTE, UC_URI, UC_TOKEN, UC_CATALOG_NAME, UC_SCHEMA_NAME, UC_BASE_TABLE_LOCATION); + UC_REMOTE, + UC_URI, + UC_TOKEN, + UC_CATALOG_NAME, + UC_SCHEMA_NAME, + UC_BASE_TABLE_LOCATION, + UC_DELTA_REST_CATALOG_API_ENABLED); @Test public void testUnityCatalogInfo() throws Exception { @@ -176,6 +183,30 @@ public void testNoBaseTableLocation() throws Exception { }); } + @Test + public void testUCDeltaRestCatalogApiEnabledFromEnv() throws Exception { + withEnvTesting( + ImmutableMap.of(), + () -> { + TestingUCSupport uc = new TestingUCSupport(); + assertThat(uc.isUCDeltaRestCatalogApiEnabledForTest()).isTrue(); + }); + + withEnvTesting( + ImmutableMap.of(UC_DELTA_REST_CATALOG_API_ENABLED, "false"), + () -> { + TestingUCSupport uc = new TestingUCSupport(); + assertThat(uc.isUCDeltaRestCatalogApiEnabledForTest()).isFalse(); + }); + + withEnvTesting( + ImmutableMap.of(UC_DELTA_REST_CATALOG_API_ENABLED, "unexpected"), + () -> { + TestingUCSupport uc = new TestingUCSupport(); + assertThat(uc.isUCDeltaRestCatalogApiEnabledForTest()).isFalse(); + }); + } + public interface TestCall { void call() throws Exception; @@ -225,5 +256,9 @@ public UnityCatalogInfo accessUnityCatalogInfo() throws Exception { setupServer(); return unityCatalogInfo(); } + + public boolean isUCDeltaRestCatalogApiEnabledForTest() { + return isUCDeltaRestCatalogApiEnabled(); + } } } diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedApiClientProvider.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedApiClientProvider.java new file mode 100644 index 00000000000..37ac19c8719 --- /dev/null +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedApiClientProvider.java @@ -0,0 +1,76 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.commit.uccommitcoordinator; + +import io.unitycatalog.client.ApiClient; +import io.unitycatalog.client.ApiClientBuilder; +import io.unitycatalog.client.auth.TokenProvider; +import io.unitycatalog.client.retry.JitterDelayRetryPolicy; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +abstract class UCTokenBasedApiClientProvider { + private ApiClient apiClient; + + protected UCTokenBasedApiClientProvider( + String baseUri, + TokenProvider tokenProvider, + Map appVersions) { + this.apiClient = buildApiClient(baseUri, tokenProvider, appVersions); + } + + protected UCTokenBasedApiClientProvider( + String baseUri, + TokenProvider tokenProvider, + Map appVersions, + String catalog) { + // The catalog is consumed by subclasses that probe catalog-scoped Delta API support. + this(baseUri, tokenProvider, appVersions); + } + + private static ApiClient buildApiClient( + String baseUri, + TokenProvider tokenProvider, + Map appVersions) { + Objects.requireNonNull(baseUri, "baseUri must not be null"); + Objects.requireNonNull(tokenProvider, "tokenProvider must not be null"); + Objects.requireNonNull(appVersions, "appVersions must not be null"); + + ApiClientBuilder builder = ApiClientBuilder.create() + .uri(baseUri) + .tokenProvider(tokenProvider) + .retryPolicy(JitterDelayRetryPolicy.builder().build()); + + appVersions.forEach((name, version) -> { + if (version != null) { + builder.addAppVersion(name, version); + } + }); + + return builder.build(); + } + + protected ApiClient getApiClient() { + return apiClient; + } + + public void close() throws IOException { + apiClient = null; + } +} diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java index 67b582a1b00..a60f9d67290 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java @@ -26,7 +26,6 @@ import io.delta.storage.commit.uniform.IcebergMetadata; import io.delta.storage.commit.uniform.UniformMetadata; import io.unitycatalog.client.ApiClient; -import io.unitycatalog.client.ApiClientBuilder; import io.unitycatalog.client.ApiException; import io.unitycatalog.client.api.DeltaCommitsApi; import io.unitycatalog.client.api.MetastoresApi; @@ -83,7 +82,7 @@ * @see GetCommitsResponse * @see TokenProvider */ -public class UCTokenBasedRestClient implements UCClient { +public class UCTokenBasedRestClient extends UCTokenBasedApiClientProvider implements UCClient { private DeltaCommitsApi deltaCommitsApi; private MetastoresApi metastoresApi; @@ -109,21 +108,20 @@ public UCTokenBasedRestClient( String baseUri, TokenProvider tokenProvider, Map appVersions) { - Objects.requireNonNull(baseUri, "baseUri must not be null"); - Objects.requireNonNull(tokenProvider, "tokenProvider must not be null"); - Objects.requireNonNull(appVersions, "appVersions must not be null"); - - ApiClientBuilder builder = ApiClientBuilder.create() - .uri(baseUri) - .tokenProvider(tokenProvider); - - appVersions.forEach((name, version) -> { - if (version != null) { - builder.addAppVersion(name, version); - } - }); + super(baseUri, tokenProvider, appVersions); + ApiClient apiClient = getApiClient(); + this.deltaCommitsApi = new DeltaCommitsApi(apiClient); + this.metastoresApi = new MetastoresApi(apiClient); + this.tablesApi = new TablesApi(apiClient); + } - ApiClient apiClient = builder.build(); + public UCTokenBasedRestClient( + String baseUri, + TokenProvider tokenProvider, + Map appVersions, + String catalog) { + super(baseUri, tokenProvider, appVersions, catalog); + ApiClient apiClient = getApiClient(); this.deltaCommitsApi = new DeltaCommitsApi(apiClient); this.metastoresApi = new MetastoresApi(apiClient); this.tablesApi = new TablesApi(apiClient); @@ -233,6 +231,7 @@ public GetCommitsResponse getCommits( @Override public void close() throws IOException { + super.close(); // Nulling out the API instances makes them eligible for GC. Once garbage collected, // the underlying connection pool is freed and destroyed. this.deltaCommitsApi = null;