From 5a06eff7e950bb03c85022c29d7d4981c575d298 Mon Sep 17 00:00:00 2001 From: Timothy Wang Date: Thu, 14 May 2026 07:06:32 +0000 Subject: [PATCH 1/3] storage: add UC Delta Rest Catalog API loadTable client APIs --- .../UCTokenBasedApiClientProvider.java | 76 +++++++++++++++++++ .../UCTokenBasedRestClient.java | 31 ++++---- 2 files changed, 91 insertions(+), 16 deletions(-) create mode 100644 storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedApiClientProvider.java diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedApiClientProvider.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedApiClientProvider.java new file mode 100644 index 00000000000..37ac19c8719 --- /dev/null +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedApiClientProvider.java @@ -0,0 +1,76 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.commit.uccommitcoordinator; + +import io.unitycatalog.client.ApiClient; +import io.unitycatalog.client.ApiClientBuilder; +import io.unitycatalog.client.auth.TokenProvider; +import io.unitycatalog.client.retry.JitterDelayRetryPolicy; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +abstract class UCTokenBasedApiClientProvider { + private ApiClient apiClient; + + protected UCTokenBasedApiClientProvider( + String baseUri, + TokenProvider tokenProvider, + Map appVersions) { + this.apiClient = buildApiClient(baseUri, tokenProvider, appVersions); + } + + protected UCTokenBasedApiClientProvider( + String baseUri, + TokenProvider tokenProvider, + Map appVersions, + String catalog) { + // The catalog is consumed by subclasses that probe catalog-scoped Delta API support. + this(baseUri, tokenProvider, appVersions); + } + + private static ApiClient buildApiClient( + String baseUri, + TokenProvider tokenProvider, + Map appVersions) { + Objects.requireNonNull(baseUri, "baseUri must not be null"); + Objects.requireNonNull(tokenProvider, "tokenProvider must not be null"); + Objects.requireNonNull(appVersions, "appVersions must not be null"); + + ApiClientBuilder builder = ApiClientBuilder.create() + .uri(baseUri) + .tokenProvider(tokenProvider) + .retryPolicy(JitterDelayRetryPolicy.builder().build()); + + appVersions.forEach((name, version) -> { + if (version != null) { + builder.addAppVersion(name, version); + } + }); + + return builder.build(); + } + + protected ApiClient getApiClient() { + return apiClient; + } + + public void close() throws IOException { + apiClient = null; + } +} diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java index 67b582a1b00..a60f9d67290 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java @@ -26,7 +26,6 @@ import io.delta.storage.commit.uniform.IcebergMetadata; import io.delta.storage.commit.uniform.UniformMetadata; import io.unitycatalog.client.ApiClient; -import io.unitycatalog.client.ApiClientBuilder; import io.unitycatalog.client.ApiException; import io.unitycatalog.client.api.DeltaCommitsApi; import io.unitycatalog.client.api.MetastoresApi; @@ -83,7 +82,7 @@ * @see GetCommitsResponse * @see TokenProvider */ -public class UCTokenBasedRestClient implements UCClient { +public class UCTokenBasedRestClient extends UCTokenBasedApiClientProvider implements UCClient { private DeltaCommitsApi deltaCommitsApi; private MetastoresApi metastoresApi; @@ -109,21 +108,20 @@ public UCTokenBasedRestClient( String baseUri, TokenProvider tokenProvider, Map appVersions) { - Objects.requireNonNull(baseUri, "baseUri must not be null"); - Objects.requireNonNull(tokenProvider, "tokenProvider must not be null"); - Objects.requireNonNull(appVersions, "appVersions must not be null"); - - ApiClientBuilder builder = ApiClientBuilder.create() - .uri(baseUri) - .tokenProvider(tokenProvider); - - appVersions.forEach((name, version) -> { - if (version != null) { - builder.addAppVersion(name, version); - } - }); + super(baseUri, tokenProvider, appVersions); + ApiClient apiClient = getApiClient(); + this.deltaCommitsApi = new DeltaCommitsApi(apiClient); + this.metastoresApi = new MetastoresApi(apiClient); + this.tablesApi = new TablesApi(apiClient); + } - ApiClient apiClient = builder.build(); + public UCTokenBasedRestClient( + String baseUri, + TokenProvider tokenProvider, + Map appVersions, + String catalog) { + super(baseUri, tokenProvider, appVersions, catalog); + ApiClient apiClient = getApiClient(); this.deltaCommitsApi = new DeltaCommitsApi(apiClient); this.metastoresApi = new MetastoresApi(apiClient); this.tablesApi = new TablesApi(apiClient); @@ -233,6 +231,7 @@ public GetCommitsResponse getCommits( @Override public void close() throws IOException { + super.close(); // Nulling out the API instances makes them eligible for GC. Once garbage collected, // the underlying connection pool is freed and destroyed. this.deltaCommitsApi = null; From 466d2eb1adeb23510b7fa4b2552cdab0ba378fb8 Mon Sep 17 00:00:00 2001 From: Timothy Wang Date: Thu, 14 May 2026 16:53:00 +0000 Subject: [PATCH 2/3] spark: wire UC Delta Rest Catalog API loadTable into Delta catalog --- build.sbt | 62 ++- project/scripts/setup_unitycatalog_main.sh | 22 +- .../delta/catalog/AbstractDeltaCatalog.scala | 5 +- .../delta/catalog/DeltaCatalogClient.scala | 29 ++ .../delta/catalog/UCDeltaCatalogClient.scala | 332 ++++++++++++ ...UCDeltaRestCatalogApiSchemaConverter.scala | 29 ++ .../UCCommitCoordinatorBuilder.scala | 32 +- .../catalog/DeltaCatalogClientSuite.scala | 472 ++++++++++++++++++ ...taRestCatalogApiSchemaConverterSuite.scala | 63 +++ .../sparkuctest/S3CredentialFileSystem.java | 30 +- .../UCDeltaTableIntegrationBaseTest.java | 5 +- .../io/sparkuctest/UCDeltaTableReadTest.java | 7 + .../io/sparkuctest/UnityCatalogSupport.java | 7 + .../sparkuctest/UnityCatalogSupportTest.java | 37 +- 14 files changed, 1101 insertions(+), 31 deletions(-) create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClient.scala create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClient.scala create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaRestCatalogApiSchemaConverter.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClientSuite.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/delta/catalog/UCDeltaRestCatalogApiSchemaConverterSuite.scala diff --git a/build.sbt b/build.sbt index 27cf4cacd61..831f2f9c5b7 100644 --- a/build.sbt +++ b/build.sbt @@ -340,8 +340,22 @@ lazy val sparkV1 = (project in file("spark")) "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided", // For DynamoDBCommitStore - "com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided", - + "com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided" + ) ++ { + if (unityCatalogVersion >= "0.5.0") { + Seq( + "io.unitycatalog" % "unitycatalog-hadoop" % unityCatalogVersion excludeAll( + ExclusionRule(organization = "org.openapitools"), + ExclusionRule(organization = "com.fasterxml.jackson.core"), + ExclusionRule(organization = "com.fasterxml.jackson.module"), + ExclusionRule(organization = "com.fasterxml.jackson.datatype"), + ExclusionRule(organization = "com.fasterxml.jackson.dataformat") + ) + ) + } else { + Seq.empty + } + } ++ Seq( // Test deps "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "org.scalatestplus" %% "scalacheck-1-15" % "3.2.9.0" % "test", @@ -660,8 +674,22 @@ lazy val spark = (project in file("spark-unified")) "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided", - "com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided", - + "com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided" + ) ++ { + if (unityCatalogVersion >= "0.5.0") { + Seq( + "io.unitycatalog" % "unitycatalog-hadoop" % unityCatalogVersion excludeAll( + ExclusionRule(organization = "org.openapitools"), + ExclusionRule(organization = "com.fasterxml.jackson.core"), + ExclusionRule(organization = "com.fasterxml.jackson.module"), + ExclusionRule(organization = "com.fasterxml.jackson.datatype"), + ExclusionRule(organization = "com.fasterxml.jackson.dataformat") + ) + ) + } else { + Seq.empty + } + } ++ Seq( "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "org.scalatestplus" %% "scalacheck-1-15" % "3.2.9.0" % "test", "junit" % "junit" % "4.13.2" % "test", @@ -669,7 +697,17 @@ lazy val spark = (project in file("spark-unified")) "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests", "org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests", "org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests", - "org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests", + "org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests" + ) ++ { + if (unityCatalogVersion >= "0.5.0") { + Seq( + // unitycatalog-hadoop references the ABFS token-provider interface during classloading. + "org.apache.hadoop" % "hadoop-azure" % hadoopVersion % "test" + ) + } else { + Seq.empty + } + } ++ Seq( "org.mockito" % "mockito-inline" % "4.11.0" % "test", ), @@ -843,13 +881,19 @@ Global / ensurePinnedUnityCatalog := { val home = file(sys.props("user.home")) // Check both layouts: a restored sbt cache can pre-populate ivy alone, leaving m2 empty - // checking only ivy would silently skip the slow publish and break mvn-based consumers. - val ivy2Canary = home / ".ivy2" / "local" / "io.unitycatalog" / + val ivy2ClientCanary = home / ".ivy2" / "local" / "io.unitycatalog" / "unitycatalog-client" / unityCatalogVersion / "ivys" / "ivy.xml" - val m2Canary = home / ".m2" / "repository" / "io" / "unitycatalog" / + val m2ClientCanary = home / ".m2" / "repository" / "io" / "unitycatalog" / "unitycatalog-client" / unityCatalogVersion / s"unitycatalog-client-$unityCatalogVersion.pom" - if (!ivy2Canary.exists || !m2Canary.exists) { - publishPinnedUnityCatalog(log, ivy2Canary) + val ivy2HadoopCanary = home / ".ivy2" / "local" / "io.unitycatalog" / + "unitycatalog-hadoop" / unityCatalogVersion / "ivys" / "ivy.xml" + val m2HadoopCanary = home / ".m2" / "repository" / "io" / "unitycatalog" / + "unitycatalog-hadoop" / unityCatalogVersion / + s"unitycatalog-hadoop-$unityCatalogVersion.pom" + if (!Seq(ivy2ClientCanary, m2ClientCanary, ivy2HadoopCanary, m2HadoopCanary) + .forall(_.exists)) { + publishPinnedUnityCatalog(log, ivy2ClientCanary) } } } diff --git a/project/scripts/setup_unitycatalog_main.sh b/project/scripts/setup_unitycatalog_main.sh index d66896365a7..685dcf48b7c 100755 --- a/project/scripts/setup_unitycatalog_main.sh +++ b/project/scripts/setup_unitycatalog_main.sh @@ -89,7 +89,7 @@ if [[ "${1:-}" == "--print-version" ]]; then exit 0 fi -# Canonical Ivy + Maven artifact paths. Delta depends on all three UC modules; sbt resolves from +# Canonical Ivy + Maven artifact paths. Delta depends on these UC modules; sbt resolves from # ~/.ivy2/local, mvn (kernel-examples integration tests) resolves from ~/.m2/repository. If any # is missing in either layout we must re-publish. IVY_LOCAL="$HOME/.ivy2/local/io.unitycatalog" @@ -118,24 +118,26 @@ if [[ "$UC_FORCE" != "1" ]] && all_canaries_present; then exit 0 fi -echo ">>> Fetching Unity Catalog main from $UC_REPO" +echo ">>> Fetching Unity Catalog from $UC_REPO" rm -rf "$UC_DIR" mkdir -p "$UC_DIR" -# Fetch main's full history so we can run `git merge-base --is-ancestor` below to verify the -# pinned SHA is actually on main. UC's repo is small; full fetch of one branch is cheap. +# Fetch the target branch so we can verify the pinned SHA is reachable. git -C "$UC_DIR" init --quiet git -C "$UC_DIR" remote add origin "$UC_REPO" -git -C "$UC_DIR" fetch --quiet origin main +if [[ "$UC_REF" == "main" ]]; then + git -C "$UC_DIR" fetch --quiet origin main +else + git -C "$UC_DIR" fetch --quiet origin "$UC_PIN_SHA" +fi cd "$UC_DIR" -# Safety check: the pinned SHA must be reachable from UC main. Local `merge-base --is-ancestor` +# Safety check: the pinned SHA must be reachable from the fetched branch. Local `merge-base --is-ancestor` # on the history we just fetched - no GitHub API, no token needed. Only applies when UC_REF is # the pinned SHA; UC_REF=main is trivially on main. if [[ "$UC_REF" == "$UC_PIN_SHA" ]]; then - if ! git merge-base --is-ancestor "$UC_PIN_SHA" origin/main 2>/dev/null; then - echo "ERROR: UC_PIN_SHA=$UC_PIN_SHA is not reachable from unitycatalog/unitycatalog main." >&2 - echo " Pin must reference a commit on https://github.com/unitycatalog/unitycatalog/commits/main" >&2 + if ! git rev-parse --verify "$UC_PIN_SHA^{commit}" >/dev/null 2>&1; then + echo "ERROR: UC_PIN_SHA=$UC_PIN_SHA could not be fetched from $UC_REPO." >&2 exit 1 fi fi @@ -161,7 +163,7 @@ fi # coordinate. Applied as a persistent setting so it sticks across the two sbt invocations below. SET_VERSION_CMD="set ThisBuild / version := \"$UC_VERSION\"" -echo ">>> Building and publishing UC client + server to local Maven repo" +echo ">>> Building and publishing UC client + server + hadoop to local Maven repo" ./build/sbt \ "$SET_VERSION_CMD" \ "set client / Compile / packageDoc / publishArtifact := false" \ diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala index 86a0f9b4ee9..b5dacd69a33 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala @@ -83,6 +83,9 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension val spark = SparkSession.active + private lazy val deltaCatalogClient: Option[DeltaCatalogClient] = + Some(UCDeltaCatalogClient(delegate, spark)) + private lazy val isUnityCatalog: Boolean = { val delegateField = classOf[DelegatingCatalogExtension].getDeclaredField("delegate") delegateField.setAccessible(true) @@ -290,7 +293,7 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension "DeltaCatalog", "loadTable") { setVariantBlockingConfigIfUC() try { - val table = super.loadTable(ident) + val table = deltaCatalogClient.flatMap(_.loadTable(ident)).getOrElse(super.loadTable(ident)) ServerSidePlannedTable.tryCreate(spark, ident, table, isUnityCatalog).foreach { sspt => return sspt diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClient.scala new file mode 100644 index 00000000000..e495152a129 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClient.scala @@ -0,0 +1,29 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.catalog + +import org.apache.spark.sql.connector.catalog.{Identifier, Table} + +/** + * Spark-facing Delta catalog API hook. + * + *

The interface is intentionally free of UC SDK and Hadoop credential dependencies so the shared + * catalog path does not depend on a specific UC client implementation. + */ +private[catalog] trait DeltaCatalogClient { + def loadTable(ident: Identifier): Option[Table] +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClient.scala new file mode 100644 index 00000000000..314eb811d57 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClient.scala @@ -0,0 +1,332 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.catalog + +import java.io.IOException +import java.lang.reflect.InvocationTargetException +import java.net.URI +import java.util.{Locale, Map => JMap} + +import scala.collection.JavaConverters._ + +import io.delta.storage.commit.actions.AbstractMetadata +import io.delta.storage.commit.uccommitcoordinator.UCDeltaClient +import io.unitycatalog.client.ApiException +import io.unitycatalog.client.auth.TokenProvider +import io.unitycatalog.hadoop.UCCredentialHadoopConfs +import io.unitycatalog.hadoop.UCCredentialHadoopConfs.TableOperation +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils} +import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, V1Table} +import org.apache.spark.sql.delta.coordinatedcommits.{ + UCCommitCoordinatorBuilder, + UCTokenBasedRestClientFactory +} +import org.apache.spark.sql.delta.sources.DeltaSourceUtils + +/** + * Spark-side client for using UC Delta Rest Catalog API responses inside `DeltaCatalog`. + * + * This class owns the UC-specific implementation behind the Spark-facing [[DeltaCatalogClient]] + * interface: choosing when to try the Delta API path, translating Delta API metadata into Spark + * catalog objects, fetching credentials for Spark/Hadoop, and returning `None` when + * `DeltaCatalog` should fall back to the legacy catalog path. + */ +private class UCDeltaCatalogClient private ( + private val ucDeltaClient: Option[UCDeltaClient], + catalogName: String, + credentialContext: Option[UCDeltaRestCatalogApiCredentialContext]) extends DeltaCatalogClient { + + import UCDeltaCatalogClient._ + + def loadTable(ident: Identifier): Option[Table] = { + ucDeltaClient match { + case Some(client) if isNamedTableIdentifier(ident) => + val schemaName = ident.namespace().head + val tableName = ident.name() + val metadata = try { + client.loadTable(catalogName, schemaName, tableName) + } catch { + case e: IOException if isUnsupportedTableFormat(e) => + return None + case e: IOException => + throw translateLoadTableException(ident, e) + } + val location = reflectedString(metadata, "getLocation") + val locationUri = CatalogUtils.stringToURI(location) + Some(V1Table(buildCatalogTableFromUCDeltaMetadata( + ident, + metadata, + location, + locationUri))) + case _ => + // UC Delta Rest Catalog API only supports catalog.schema.table identifiers for named + // tables. + None + } + } + + private def translateLoadTableException(ident: Identifier, e: IOException): Throwable = { + e.getCause match { + case api: ApiException if api.getCode == 404 => + new NoSuchTableException(ident) + case _ => + e + } + } + + /** + * UC Delta APIs use this explicit 501 response when a table exists in UC but cannot be served + * through the Delta endpoint, such as a metric view or another non-Delta table format. + */ + private def isUnsupportedTableFormat(e: IOException): Boolean = e.getCause match { + case api: ApiException => + api.getCode == 501 && + Option(api.getResponseBody).exists(_.contains(UnsupportedTableFormatExceptionType)) + case _ => false + } + + private def isNamedTableIdentifier(ident: Identifier): Boolean = { + ident.namespace().length == 1 && !isDeltaPathIdentifier(ident) + } + + private def isDeltaPathIdentifier(ident: Identifier): Boolean = { + try { + ident.namespace().length == 1 && + DeltaSourceUtils.isDeltaDataSourceName(ident.namespace().head) && + new Path(ident.name()).isAbsolute + } catch { + case _: IllegalArgumentException => false + } + } + + /** + * Builds the Spark V1 catalog table returned from UC Delta Rest Catalog API metadata. + * The UC Delta response supplies the Spark table type, schema, provider, and storage metadata. + */ + private def buildCatalogTableFromUCDeltaMetadata( + ident: Identifier, + metadata: AbstractMetadata, + location: String, + locationUri: URI): CatalogTable = { + val schemaName = ident.namespace().head + val tableName = ident.name() + CatalogTable( + identifier = + TableIdentifier(ident.name(), ident.namespace().lastOption, Some(catalogName)), + tableType = reflectedString(metadata, "getTableType") match { + case ManagedTableType => + CatalogTableType.MANAGED + case ExternalTableType => + CatalogTableType.EXTERNAL + case other => + throw new IllegalArgumentException( + s"Unsupported UC Delta Rest Catalog API table type for " + + s"$catalogName.${ident.namespace().mkString(".")}.${ident.name()}: $other") + }, + storage = CatalogStorageFormat.empty.copy( + locationUri = Some(locationUri), + properties = buildCatalogStorageProperties( + metadata, + location, + locationUri.getScheme, + schemaName, + tableName)), + schema = UCDeltaRestCatalogApiSchemaConverter.toSparkType(metadata.getSchemaString), + provider = Option(metadata.getProvider), + partitionColumnNames = Option(metadata.getPartitionColumns) + .map(_.asScala.toSeq) + .getOrElse(Nil)) + } + + /** + * Builds CatalogStorageFormat.properties for the Spark V1 table. + * V1Table later exposes these to Delta as option.* table properties. + */ + private def buildCatalogStorageProperties( + metadata: AbstractMetadata, + location: String, + locationScheme: String, + schemaName: String, + tableName: String): Map[String, String] = { + val credentialProperties = buildHadoopCredentialPropertiesForTable( + location, + locationScheme, + schemaName, + tableName) + // V1Table exposes storage properties as option.* table properties. Keep UC Delta Rest Catalog + // API table features here so the Delta load path receives them with the same option.* shape as + // other storage-level UC properties, while CatalogTable.properties stays reserved for Spark + // metadata. + Option(metadata.getConfiguration).map(_.asScala.toMap).getOrElse(Map.empty) ++ + credentialProperties + } + + private def buildHadoopCredentialPropertiesForTable( + location: String, + locationScheme: String, + schemaName: String, + tableName: String): Map[String, String] = { + if (!isCloudScheme(locationScheme)) { + Map.empty[String, String] + } else { + val context = credentialContext.getOrElse { + throw new IllegalStateException( + "UC Delta Rest Catalog API credential context is missing for cloud location " + location) + } + val builder = UCCredentialHadoopConfs.builder( + context.uri, + locationScheme.toLowerCase(Locale.ROOT)) + .tokenProvider(context.tokenProvider) + .enableCredentialRenewal(context.renewCredentialEnabled) + .enableCredentialScopedFs(context.credScopedFsEnabled) + .hadoopConf(context.hadoopConf) + .addAppVersions(context.appVersions) + try { + // Prefer READ_WRITE so a loaded table can be used for writes without reloading + // credentials; read-only principals fall back to READ below. + builder.buildForTable( + catalogName, + schemaName, + tableName, + TableOperation.READ_WRITE, + location) + .asScala + .toMap + } catch { + case e: ApiException if e.getCode == 401 || e.getCode == 403 => + builder.buildForTable(catalogName, schemaName, tableName, TableOperation.READ, location) + .asScala + .toMap + } + } + } +} + +private case class UCDeltaRestCatalogApiCredentialContext( + uri: String, + tokenProvider: TokenProvider, + renewCredentialEnabled: Boolean, + credScopedFsEnabled: Boolean, + hadoopConf: Configuration, + appVersions: JMap[String, String]) + +private object UCDeltaCatalogClient { + private[catalog] val UCDeltaRestCatalogApiEnabledKey = "deltaRestApi.enabled" + private[catalog] val RenewCredentialEnabledKey = "renewCredential.enabled" + private[catalog] val CredScopedFsEnabledKey = "credScopedFs.enabled" + private val ManagedTableType = "MANAGED" + private val ExternalTableType = "EXTERNAL" + private val UnsupportedTableFormatExceptionType = "UnsupportedTableFormatException" + private val DefaultRenewCredentialEnabled = true + private val DefaultCredScopedFsEnabled = false + private val CloudSchemes = Set("s3", "s3a", "gs", "abfs", "abfss") + + private[catalog] def deltaRestApiEnabledConf(catalogName: String): String = { + s"spark.sql.catalog.$catalogName.$UCDeltaRestCatalogApiEnabledKey" + } + + private[catalog] def renewCredentialEnabledConf(catalogName: String): String = { + s"spark.sql.catalog.$catalogName.$RenewCredentialEnabledKey" + } + + private[catalog] def credScopedFsEnabledConf(catalogName: String): String = { + s"spark.sql.catalog.$catalogName.$CredScopedFsEnabledKey" + } + + private def isCloudScheme(scheme: String): Boolean = { + Option(scheme).exists(s => CloudSchemes.contains(s.toLowerCase(Locale.ROOT))) + } + + private def reflectedString(metadata: AbstractMetadata, methodName: String): String = { + try { + metadata.getClass.getMethod(methodName).invoke(metadata).asInstanceOf[String] + } catch { + case e: NoSuchMethodException => + throw new IllegalStateException( + s"UC Delta metadata is missing required method $methodName.", e) + case e: InvocationTargetException => + e.getCause match { + case runtime: RuntimeException => throw runtime + case error: Error => throw error + case cause => throw new IllegalStateException( + s"Failed to read $methodName from UC Delta metadata.", cause) + } + } + } + + def apply(delegatePlugin: CatalogPlugin, spark: SparkSession): UCDeltaCatalogClient = { + val catalogName = delegatePlugin.name() + var credentialContext = Option.empty[UCDeltaRestCatalogApiCredentialContext] + val ucDeltaClient = if (spark.conf + .get(deltaRestApiEnabledConf(catalogName), "false") + .toBoolean) { + val (_, uri, authConfig) = UCCommitCoordinatorBuilder.getCatalogConfigs(spark) + .collectFirst { case (`catalogName`, configuredUri, configuredAuthConfig) => + (catalogName, configuredUri, configuredAuthConfig) + } + .getOrElse { + throw new IllegalArgumentException( + "UC Delta Rest Catalog API is enabled for catalog " + + s"$catalogName, but its Unity Catalog " + + "configuration is missing or incomplete.") + } + val tokenProvider = TokenProvider.create(authConfig.asJava) + // Catalog load has no DeltaLog yet, so pass the Spark session Hadoop conf to the UC + // credential builder. + // scalastyle:off deltahadoopconfiguration + val hadoopConf = spark.sessionState.newHadoopConf() + // scalastyle:on deltahadoopconfiguration + val appVersions = UCTokenBasedRestClientFactory.defaultAppVersionsAsJava + credentialContext = Some(UCDeltaRestCatalogApiCredentialContext( + uri.toString, + tokenProvider, + spark.conf.get( + renewCredentialEnabledConf(catalogName), + DefaultRenewCredentialEnabled.toString).toBoolean, + spark.conf.get( + credScopedFsEnabledConf(catalogName), + DefaultCredScopedFsEnabled.toString).toBoolean, + hadoopConf, + appVersions)) + UCTokenBasedRestClientFactory.createUCDeltaClient( + uri, + tokenProvider, + appVersions, + catalogName) match { + case Some(client) if client.supportsUCDeltaRestCatalogApi() => + Some(client) + case Some(client) => + client.close() + throw new IllegalArgumentException( + s"UC Delta Rest Catalog API is enabled for catalog $catalogName, but the Unity " + + "Catalog server does not support the required UC Delta Rest Catalog API endpoints.") + case None => + None + } + } else { + None + } + new UCDeltaCatalogClient(ucDeltaClient, catalogName, credentialContext) + } + +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaRestCatalogApiSchemaConverter.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaRestCatalogApiSchemaConverter.scala new file mode 100644 index 00000000000..809ce876acc --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaRestCatalogApiSchemaConverter.scala @@ -0,0 +1,29 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.catalog + +import org.apache.spark.sql.types.{DataType, StructType} + +private[catalog] object UCDeltaRestCatalogApiSchemaConverter { + + def toSparkType(schemaString: String): StructType = { + if (schemaString == null) { + throw new IllegalArgumentException("UC Delta Rest Catalog API table schema is missing.") + } + DataType.fromJson(schemaString).asInstanceOf[StructType] + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala index 0f92e15f870..e15ac7cb692 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala @@ -23,7 +23,12 @@ import scala.collection.JavaConverters._ import scala.util.control.NonFatal import io.delta.storage.commit.CommitCoordinatorClient -import io.delta.storage.commit.uccommitcoordinator.{UCClient, UCCommitCoordinatorClient, UCTokenBasedRestClient} +import io.delta.storage.commit.uccommitcoordinator.{ + UCClient, + UCCommitCoordinatorClient, + UCDeltaClient, + UCTokenBasedRestClient +} import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging @@ -31,6 +36,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging import io.unitycatalog.client.auth.TokenProvider import org.apache.spark.internal.MDC import org.apache.spark.sql.SparkSession +import org.apache.spark.util.Utils /** * Builder for Unity Catalog Commit Coordinator Clients. @@ -291,6 +297,9 @@ trait UCClientFactory { } object UCTokenBasedRestClientFactory extends UCClientFactory { + private val UCDeltaTokenBasedRestClientClassName = + "io.delta.storage.commit.uccommitcoordinator.UCDeltaTokenBasedRestClient" + override def createUCClient(uri: String, authConfig: Map[String, String]): UCClient = { createUCClientWithVersions(uri, authConfig, defaultAppVersions) } @@ -311,6 +320,27 @@ object UCTokenBasedRestClientFactory extends UCClientFactory { new UCTokenBasedRestClient(uri, tokenProvider, appVersions.asJava) } + private[delta] def createUCDeltaClient( + uri: String, + tokenProvider: TokenProvider, + appVersions: java.util.Map[String, String], + catalogName: String): Option[UCDeltaClient] = { + try { + val clientClass = Utils.classForName(UCDeltaTokenBasedRestClientClassName) + val constructor = clientClass.getConstructor( + classOf[String], + classOf[TokenProvider], + classOf[java.util.Map[String, String]], + classOf[String]) + Some(constructor + .newInstance(uri, tokenProvider, appVersions, catalogName) + .asInstanceOf[UCDeltaClient]) + } catch { + case _: ClassNotFoundException => + None + } + } + private[coordinatedcommits] def defaultAppVersions: Map[String, String] = { Map( "Delta" -> io.delta.VERSION, diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClientSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClientSuite.scala new file mode 100644 index 00000000000..28fcb000f58 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClientSuite.scala @@ -0,0 +1,472 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.catalog + +import java.io.IOException +import java.net.InetSocketAddress +import java.nio.charset.StandardCharsets + +import scala.collection.JavaConverters._ + +import com.sun.net.httpserver.{HttpExchange, HttpServer} +import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient.UC_TABLE_ID_KEY + +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.connector.catalog.{ + Identifier, + Table, + TableCatalog, + TableChange, + V1Table +} +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class DeltaCatalogClientSuite + extends QueryTest + with DeltaSQLCommandTest + with BeforeAndAfterAll + with BeforeAndAfterEach { + + private var server: HttpServer = _ + private var serverUri: String = _ + private var configHandler: HttpExchange => Unit = _ + private var handler: HttpExchange => Unit = _ + private var credentialRequestCount: Int = _ + + private val AwsVendedTokenProviderClass = + "io.unitycatalog.hadoop.internal.auth.AwsVendedTokenProvider" + private val S3ACredentialsProviderKey = "fs.s3a.aws.credentials.provider" + private val S3AInitAccessKey = "fs.s3a.init.access.key" + private val UCTableOperationKey = "fs.unitycatalog.table.operation" + + override def beforeAll(): Unit = { + super.beforeAll() + server = HttpServer.create(new InetSocketAddress("localhost", 0), 0) + server.createContext("/api/2.1/unity-catalog/delta/v1/config", exchange => { + try { + if (configHandler != null) { + configHandler(exchange) + } else { + sendJson(exchange, 200, + """{ + | "endpoints": [ + | "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}", + | "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}/credentials" + | ], + | "protocol-version": "1.0" + |}""".stripMargin) + } + } finally { + exchange.close() + } + }) + server.createContext("/api/2.1/unity-catalog/delta/v1/catalogs", exchange => { + try { + if (handler != null) handler(exchange) else sendJson(exchange, 404, "{}") + } finally { + exchange.close() + } + }) + server.start() + serverUri = s"http://localhost:${server.getAddress.getPort}" + } + + override def afterAll(): Unit = { + if (server != null) server.stop(0) + super.afterAll() + } + + override def beforeEach(): Unit = { + super.beforeEach() + configHandler = null + handler = null + credentialRequestCount = 0 + } + + test("loadTable skips credentials for local Delta locations") { + handler = exchange => exchange.getRequestURI.getPath match { + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" => + sendJson(exchange, 200, loadTableResponseJson("file:/tmp/uc/table")) + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" => + credentialRequestCount += 1 + sendJson(exchange, 500, "{}") + case path => + fail(s"Unexpected UC Delta Rest Catalog API request path: $path") + } + + val table = loadWithUCDeltaRestCatalogApi() + val properties = table.catalogTable.storage.properties + val idMetadata = table.catalogTable.schema("id").metadata + + assert(credentialRequestCount === 0) + assert(properties === Map( + "delta.feature.catalogManaged" -> "supported", + UC_TABLE_ID_KEY -> "11111111-1111-1111-1111-111111111111")) + assert(idMetadata.getLong("delta.columnMapping.id") === 1L) + assert(idMetadata.getString("delta.columnMapping.physicalName") === "col-123") + } + + test("loadTable fails loudly when cloud credentials are empty") { + handler = exchange => exchange.getRequestURI.getPath match { + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" => + sendJson(exchange, 200, loadTableResponseJson("s3://bucket/table")) + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" => + credentialRequestCount += 1 + sendJson(exchange, 200, """{"storage-credentials": []}""") + case path => + fail(s"Unexpected UC Delta Rest Catalog API request path: $path") + } + + val error = intercept[IllegalArgumentException] { + loadWithUCDeltaRestCatalogApi() + } + + assert(credentialRequestCount === 1) + assert(error.getMessage.contains("no storage credentials")) + } + + test("loadTable accepts trailing-slash cloud credential prefixes") { + handler = exchange => exchange.getRequestURI.getPath match { + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" => + sendJson(exchange, 200, loadTableResponseJson("s3://bucket/path/to/table")) + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" => + credentialRequestCount += 1 + assert(exchange.getRequestURI.getQuery === "operation=READ_WRITE") + sendJson(exchange, 200, s3CredentialsResponseJson( + "s3://bucket/path/to/table/", + "READ_WRITE")) + case path => + fail(s"Unexpected UC Delta Rest Catalog API request path: $path") + } + + val table = loadWithUCDeltaRestCatalogApi() + val storageProperties = table.catalogTable.storage.properties + val tableProperties = table.properties.asScala + + assert(credentialRequestCount === 1) + assert(storageProperties(S3ACredentialsProviderKey) === + AwsVendedTokenProviderClass) + assert(storageProperties(S3AInitAccessKey) === "ak") + assert(tableProperties( + s"option.${S3ACredentialsProviderKey}") === + AwsVendedTokenProviderClass) + assert(tableProperties( + s"option.${S3AInitAccessKey}") === "ak") + assert(storageProperties("delta.feature.catalogManaged") === "supported") + assert(!tableProperties.contains("delta.feature.catalogManaged")) + assert(!tableProperties.contains( + s"option.option.${S3AInitAccessKey}")) + } + + test("loadTable falls back to READ credentials when READ_WRITE is denied") { + var credentialQueries = Seq.empty[String] + handler = exchange => exchange.getRequestURI.getPath match { + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" => + sendJson(exchange, 200, loadTableResponseJson("s3://bucket/path/to/table")) + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" => + credentialQueries :+= exchange.getRequestURI.getQuery + exchange.getRequestURI.getQuery match { + case "operation=READ_WRITE" => + sendJson(exchange, 403, """{"error_code": "PERMISSION_DENIED"}""") + case "operation=READ" => + sendJson(exchange, 200, s3CredentialsResponseJson("s3://bucket/path/to/table", "READ")) + case other => + fail(s"Unexpected credential query: $other") + } + case path => + fail(s"Unexpected UC Delta Rest Catalog API request path: $path") + } + + val table = loadWithUCDeltaRestCatalogApi() + + assert(credentialQueries === Seq("operation=READ_WRITE", "operation=READ")) + assert(table.catalogTable.storage.properties( + UCTableOperationKey) === "READ") + } + + test("loadTable uses static credential properties when renewal is disabled") { + handler = exchange => exchange.getRequestURI.getPath match { + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" => + sendJson(exchange, 200, loadTableResponseJson("s3://bucket/path/to/table")) + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" => + credentialRequestCount += 1 + sendJson( + exchange, + 200, + s3CredentialsResponseJson("s3://bucket/path/to/table", "READ_WRITE")) + case path => + fail(s"Unexpected UC Delta Rest Catalog API request path: $path") + } + + val table = withUCDeltaRestCatalogApiRenewalDisabled { catalog => + catalog.loadTable(Identifier.of(Array("default"), "tbl")).get.asInstanceOf[V1Table] + } + + assert(credentialRequestCount === 1) + assert(table.catalogTable.storage.properties("fs.s3a.access.key") === "ak") + assert(!table.catalogTable.storage.properties.contains( + S3ACredentialsProviderKey)) + } + + test("loadTable maps missing provider to None") { + handler = exchange => exchange.getRequestURI.getPath match { + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" => + sendJson( + exchange, + 200, + loadTableResponseJson("file:/tmp/uc/table") + .replace("\"data-source-format\": \"DELTA\"", "\"data-source-format\": null")) + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" => + fail("Unexpected credentials request for local path") + case path => + fail(s"Unexpected UC Delta Rest Catalog API request path: $path") + } + + val table = loadWithUCDeltaRestCatalogApi() + + assert(table.catalogTable.provider.isEmpty) + } + + test("loadTable falls back when UC Delta Rest Catalog API reports unsupported table format") { + handler = exchange => exchange.getRequestURI.getPath match { + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" => + sendJson(exchange, 501, + """{ + | "error": { + | "message": "Table exists but is not supported by the Delta endpoint.", + | "type": "UnsupportedTableFormatException", + | "code": 501 + | } + |}""".stripMargin) + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" => + fail("Unexpected credentials request after unsupported table format") + case path => + fail(s"Unexpected UC Delta Rest Catalog API request path: $path") + } + + val loaded = withUCDeltaRestCatalogApi { catalog => + catalog.loadTable(Identifier.of(Array("default"), "tbl")) + } + + assert(loaded.isEmpty) + } + + test("loadTable propagates generic UC Delta Rest Catalog API 501 errors") { + handler = exchange => exchange.getRequestURI.getPath match { + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" => + sendJson(exchange, 501, + """{ + | "error": { + | "message": "Not implemented.", + | "type": "NotImplementedException", + | "code": 501 + | } + |}""".stripMargin) + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" => + fail("Unexpected credentials request after loadTable failure") + case path => + fail(s"Unexpected UC Delta Rest Catalog API request path: $path") + } + + val error = intercept[IOException] { + loadWithUCDeltaRestCatalogApi() + } + + assert(error.getMessage.contains("Failed to load table uc.default.tbl")) + assert(error.getMessage.contains("HTTP 501")) + assert(error.getMessage.contains("NotImplementedException")) + } + + test("loadTable propagates UC Delta Rest Catalog API server errors") { + handler = exchange => exchange.getRequestURI.getPath match { + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" => + sendJson(exchange, 500, """{"error_code":"INTERNAL_ERROR"}""") + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" => + fail("Unexpected credentials request after loadTable failure") + case path => + fail(s"Unexpected UC Delta Rest Catalog API request path: $path") + } + + val error = intercept[IOException] { + loadWithUCDeltaRestCatalogApi() + } + + assert(error.getMessage.contains("Failed to load table uc.default.tbl")) + assert(error.getMessage.contains("HTTP 500")) + } + + test("apply fails when UC Delta Rest Catalog API is enabled but unsupported") { + configHandler = exchange => sendJson(exchange, 200, + """{ + | "endpoints": [], + | "protocol-version": "1.0" + |}""".stripMargin) + handler = exchange => + fail(s"Unexpected UC Delta Rest Catalog API request path: ${exchange.getRequestURI.getPath}") + + val error = intercept[IllegalArgumentException] { + withUCDeltaRestCatalogApi { catalog => + catalog.loadTable(Identifier.of(Array("default"), "tbl")) + } + } + + assert(error.getMessage.contains("UC Delta Rest Catalog API is enabled for catalog uc")) + assert(error.getMessage.contains( + "does not support the required UC Delta Rest Catalog API endpoints")) + } + + test("loadTable does not probe UC Delta Rest Catalog API when disabled") { + configHandler = exchange => + fail(s"Unexpected UC Delta Rest Catalog API config request: ${exchange.getRequestURI}") + handler = exchange => + fail(s"Unexpected UC Delta Rest Catalog API request path: ${exchange.getRequestURI.getPath}") + + withSQLConf( + "spark.sql.catalog.uc" -> "io.unitycatalog.spark.UCSingleCatalog", + "spark.sql.catalog.uc.uri" -> serverUri, + "spark.sql.catalog.uc.token" -> "mock-token") { + val catalog = UCDeltaCatalogClient(new TestDelegateCatalog, spark) + assert(catalog.loadTable(Identifier.of(Array("default"), "tbl")).isEmpty) + } + } + + test("apply fails when UC Delta Rest Catalog API is enabled without UC config") { + withSQLConf( + "spark.sql.catalog.uc" -> "io.unitycatalog.spark.UCSingleCatalog", + UCDeltaCatalogClient.deltaRestApiEnabledConf("uc") -> "true") { + val error = intercept[IllegalArgumentException] { + UCDeltaCatalogClient(new TestDelegateCatalog, spark) + } + assert(error.getMessage.contains("configuration is missing or incomplete")) + } + } + + private def loadWithUCDeltaRestCatalogApi(): V1Table = { + withUCDeltaRestCatalogApi { catalog => + catalog.loadTable(Identifier.of(Array("default"), "tbl")).get.asInstanceOf[V1Table] + } + } + + private def withUCDeltaRestCatalogApi[T](f: DeltaCatalogClient => T): T = { + withUCDeltaRestCatalogApi(new TestDelegateCatalog, renewCredentialEnabled = true)(f) + } + + private def withUCDeltaRestCatalogApiRenewalDisabled[T](f: DeltaCatalogClient => T): T = { + withUCDeltaRestCatalogApi(new TestDelegateCatalog, renewCredentialEnabled = false)(f) + } + + private def withUCDeltaRestCatalogApi[T]( + delegate: TableCatalog)( + f: DeltaCatalogClient => T): T = { + withUCDeltaRestCatalogApi(delegate, renewCredentialEnabled = true)(f) + } + + private def withUCDeltaRestCatalogApi[T]( + delegate: TableCatalog, + renewCredentialEnabled: Boolean)( + f: DeltaCatalogClient => T): T = { + withSQLConf( + "spark.sql.catalog.uc" -> "io.unitycatalog.spark.UCSingleCatalog", + "spark.sql.catalog.uc.uri" -> serverUri, + "spark.sql.catalog.uc.token" -> "mock-token", + UCDeltaCatalogClient.renewCredentialEnabledConf("uc") -> renewCredentialEnabled.toString, + UCDeltaCatalogClient.deltaRestApiEnabledConf("uc") -> "true") { + val catalog = UCDeltaCatalogClient(delegate, spark) + f(catalog) + } + } + + private def loadTableResponseJson( + location: String, + dataSourceFormat: String = "DELTA"): String = + s"""{ + | "metadata": { + | "data-source-format": "$dataSourceFormat", + | "table-type": "MANAGED", + | "table-uuid": "11111111-1111-1111-1111-111111111111", + | "location": "$location", + | "columns": { + | "type": "struct", + | "fields": [ + | { + | "name": "id", + | "type": "long", + | "nullable": false, + | "metadata": { + | "delta.columnMapping.id": 1, + | "delta.columnMapping.physicalName": "col-123" + | } + | } + | ] + | }, + | "partition-columns": [], + | "properties": { + | "delta.feature.catalogManaged": "supported", + | "$UC_TABLE_ID_KEY": "11111111-1111-1111-1111-111111111111" + | } + | }, + | "commits": [] + |}""".stripMargin + + private def s3CredentialsResponseJson(prefix: String, operation: String): String = + s"""{ + | "storage-credentials": [ + | { + | "prefix": "$prefix", + | "operation": "$operation", + | "config": { + | "s3.access-key-id": "ak", + | "s3.secret-access-key": "sk", + | "s3.session-token": "st" + | } + | } + | ] + |}""".stripMargin + + private def sendJson(exchange: HttpExchange, status: Int, body: String): Unit = { + val bytes = body.getBytes(StandardCharsets.UTF_8) + exchange.getResponseHeaders.add("Content-Type", "application/json") + exchange.sendResponseHeaders(status, bytes.length) + exchange.getResponseBody.write(bytes) + exchange.getResponseBody.close() + } + + private class TestDelegateCatalog extends TableCatalog { + override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {} + override def name(): String = "uc" + override def listTables(namespace: Array[String]): Array[Identifier] = Array.empty + override def loadTable(ident: Identifier): Table = + throw new IllegalStateException("unexpected loadTable call") + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[org.apache.spark.sql.connector.expressions.Transform], + properties: java.util.Map[String, String]): Table = + throw new UnsupportedOperationException("not needed in this test") + override def alterTable(ident: Identifier, changes: TableChange*): Table = + throw new UnsupportedOperationException("not needed in this test") + override def dropTable(ident: Identifier): Boolean = + throw new UnsupportedOperationException("not needed in this test") + override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = + throw new UnsupportedOperationException("not needed in this test") + } + +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/catalog/UCDeltaRestCatalogApiSchemaConverterSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/UCDeltaRestCatalogApiSchemaConverterSuite.scala new file mode 100644 index 00000000000..e115e8d5856 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/UCDeltaRestCatalogApiSchemaConverterSuite.scala @@ -0,0 +1,63 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.catalog + +import org.scalatest.funsuite.AnyFunSuite + +import org.apache.spark.sql.types.{ + ArrayType, + BooleanType, + DecimalType, + IntegerType, + LongType, + MapType, + MetadataBuilder, + StringType, + StructField, + StructType +} + +class UCDeltaRestCatalogApiSchemaConverterSuite extends AnyFunSuite { + + test("converts Delta schema JSON to Spark schema") { + val fieldMetadata = new MetadataBuilder() + .putLong("delta.columnMapping.id", 1L) + .putString("delta.columnMapping.physicalName", "col-1") + .build() + val inputSchema = StructType(Seq( + StructField("id", LongType, nullable = false, fieldMetadata), + StructField("amount", DecimalType(10, 2)), + StructField("values", ArrayType(IntegerType, containsNull = true)), + StructField("tags", MapType(StringType, BooleanType, valueContainsNull = false)), + StructField("nested", StructType(Seq(StructField("name", StringType)))))) + + val schema = UCDeltaRestCatalogApiSchemaConverter.toSparkType(inputSchema.json) + + assert(schema === inputSchema) + assert(!schema("id").nullable) + assert(schema("id").metadata.getLong("delta.columnMapping.id") === 1L) + assert(schema("id").metadata.getString("delta.columnMapping.physicalName") === "col-1") + } + + test("rejects missing schema JSON") { + val e = intercept[IllegalArgumentException] { + UCDeltaRestCatalogApiSchemaConverter.toSparkType(null) + } + + assert(e.getMessage === "UC Delta Rest Catalog API table schema is missing.") + } +} diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/S3CredentialFileSystem.java b/spark/unitycatalog/src/test/java/io/sparkuctest/S3CredentialFileSystem.java index c4e6d9c1501..6b2dec556d9 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/S3CredentialFileSystem.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/S3CredentialFileSystem.java @@ -149,14 +149,28 @@ private void assertCredentials() { private synchronized AwsCredentialsProvider resolveProvider(Configuration conf) { if (provider != null) return provider; - String clazz = conf.get(S3A_CREDENTIALS_PROVIDER); - if (clazz == null) return null; - try { - provider = - (AwsCredentialsProvider) - Class.forName(clazz).getConstructor(Configuration.class).newInstance(conf); - } catch (Exception e) { - throw new RuntimeException("Failed to instantiate credential provider: " + clazz, e); + String classes = conf.get(S3A_CREDENTIALS_PROVIDER); + if (classes == null) return null; + + // S3A accepts a comma-separated provider chain. This fake filesystem only understands + // AWS SDK v2 providers; if none are present, assert static UC-vended Hadoop keys instead. + for (String clazz : classes.split(",")) { + String trimmed = clazz.trim(); + if (trimmed.isEmpty()) continue; + try { + Class candidate = Class.forName(trimmed); + if (!AwsCredentialsProvider.class.isAssignableFrom(candidate)) continue; + provider = + (AwsCredentialsProvider) + candidate.getConstructor(Configuration.class).newInstance(conf); + return provider; + } catch (ClassNotFoundException e) { + // Ignore providers that are valid for real S3A but absent from this test classpath. + } catch (NoSuchMethodException e) { + // Ignore AWS providers that real S3A can construct without a Hadoop Configuration. + } catch (ReflectiveOperationException e) { + throw new RuntimeException("Failed to instantiate credential provider: " + trimmed, e); + } } return provider; } diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java index 0139e56c093..0b5cce98449 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java @@ -144,7 +144,10 @@ private SparkConf configureSparkWithUnityCatalog(SparkConf conf) { String catalogName = uc.catalogName(); return conf.set("spark.sql.catalog." + catalogName, "io.unitycatalog.spark.UCSingleCatalog") .set("spark.sql.catalog." + catalogName + ".uri", uc.serverUri()) - .set("spark.sql.catalog." + catalogName + ".token", uc.serverToken()); + .set("spark.sql.catalog." + catalogName + ".token", uc.serverToken()) + .set( + "spark.sql.catalog." + catalogName + ".deltaRestApi.enabled", + String.valueOf(isUCDeltaRestCatalogApiEnabled())); } /** Stop the SparkSession after all tests. */ diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableReadTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableReadTest.java index 66a9169ff35..a51d7535410 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableReadTest.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableReadTest.java @@ -18,6 +18,7 @@ import java.util.List; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Assumptions; /** * Read operation test suite for Delta Table operations through Unity Catalog. @@ -123,6 +124,12 @@ public void testDeltaTableForPath(TableType tableType) throws Exception { "For managed tables, path-based access should fail"); } else { // For EXTERNAL tables, path-based access should work + // TODO: Enable remote external path reads after Delta wires DRC + // GET /delta/v1/temporary-path-credentials for delta.`path` access. + Assumptions.assumeFalse( + isUCRemoteConfigured(), + "Remote external delta.`path` reads require DRC temporary path credentials; " + + "this PR only wires DRC table credentials for named UC table reads."); S3CredentialFileSystem.credentialCheckEnabled = false; try { check( diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupport.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupport.java index c6633febd78..22a32975eaa 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupport.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupport.java @@ -129,12 +129,19 @@ public ApiClient createApiClient() { public static final String UC_CATALOG_NAME = "UC_CATALOG_NAME"; public static final String UC_SCHEMA_NAME = "UC_SCHEMA_NAME"; public static final String UC_BASE_TABLE_LOCATION = "UC_BASE_TABLE_LOCATION"; + public static final String UC_DELTA_REST_CATALOG_API_ENABLED = + "UC_DELTA_REST_CATALOG_API_ENABLED"; protected static boolean isUCRemoteConfigured() { String ucRemote = System.getenv(UC_REMOTE); return ucRemote != null && ucRemote.equalsIgnoreCase("true"); } + protected static boolean isUCDeltaRestCatalogApiEnabled() { + String deltaRestApiEnabled = System.getenv(UC_DELTA_REST_CATALOG_API_ENABLED); + return deltaRestApiEnabled == null || deltaRestApiEnabled.equalsIgnoreCase("true"); + } + /** The Unity Catalog info instance for subclasses access */ private UnityCatalogInfo ucInfo = null; diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupportTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupportTest.java index 5ad795cae0d..cd1b62aaae5 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupportTest.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupportTest.java @@ -18,6 +18,7 @@ import static io.sparkuctest.UnityCatalogSupport.UC_BASE_TABLE_LOCATION; import static io.sparkuctest.UnityCatalogSupport.UC_CATALOG_NAME; +import static io.sparkuctest.UnityCatalogSupport.UC_DELTA_REST_CATALOG_API_ENABLED; import static io.sparkuctest.UnityCatalogSupport.UC_REMOTE; import static io.sparkuctest.UnityCatalogSupport.UC_SCHEMA_NAME; import static io.sparkuctest.UnityCatalogSupport.UC_TOKEN; @@ -37,7 +38,13 @@ public class UnityCatalogSupportTest { private static final List ALL_ENVS = ImmutableList.of( - UC_REMOTE, UC_URI, UC_TOKEN, UC_CATALOG_NAME, UC_SCHEMA_NAME, UC_BASE_TABLE_LOCATION); + UC_REMOTE, + UC_URI, + UC_TOKEN, + UC_CATALOG_NAME, + UC_SCHEMA_NAME, + UC_BASE_TABLE_LOCATION, + UC_DELTA_REST_CATALOG_API_ENABLED); @Test public void testUnityCatalogInfo() throws Exception { @@ -176,6 +183,30 @@ public void testNoBaseTableLocation() throws Exception { }); } + @Test + public void testUCDeltaRestCatalogApiEnabledFromEnv() throws Exception { + withEnvTesting( + ImmutableMap.of(), + () -> { + TestingUCSupport uc = new TestingUCSupport(); + assertThat(uc.isUCDeltaRestCatalogApiEnabledForTest()).isTrue(); + }); + + withEnvTesting( + ImmutableMap.of(UC_DELTA_REST_CATALOG_API_ENABLED, "false"), + () -> { + TestingUCSupport uc = new TestingUCSupport(); + assertThat(uc.isUCDeltaRestCatalogApiEnabledForTest()).isFalse(); + }); + + withEnvTesting( + ImmutableMap.of(UC_DELTA_REST_CATALOG_API_ENABLED, "unexpected"), + () -> { + TestingUCSupport uc = new TestingUCSupport(); + assertThat(uc.isUCDeltaRestCatalogApiEnabledForTest()).isFalse(); + }); + } + public interface TestCall { void call() throws Exception; @@ -225,5 +256,9 @@ public UnityCatalogInfo accessUnityCatalogInfo() throws Exception { setupServer(); return unityCatalogInfo(); } + + public boolean isUCDeltaRestCatalogApiEnabledForTest() { + return isUCDeltaRestCatalogApiEnabled(); + } } } From 4b796b90cceeef08daadd479b35f7ebf11e2f46e Mon Sep 17 00:00:00 2001 From: Timothy Wang Date: Thu, 7 May 2026 05:06:04 +0000 Subject: [PATCH 3/3] spark: add UC Delta Rest Catalog API path credentials for raw path reads --- .../delta/catalog/AbstractDeltaCatalog.scala | 9 +- .../delta/catalog/DeltaCatalogClient.scala | 30 +++ .../sql/delta/catalog/DeltaTableV2.scala | 12 +- .../delta/catalog/UCDeltaCatalogClient.scala | 139 +++++++++---- .../ServerSidePlannedTable.scala | 4 + .../sql/delta/sources/DeltaDataSource.scala | 9 +- .../catalog/DeltaCatalogClientSuite.scala | 186 +++++++++++++++++- .../ServerSidePlannedTableSuite.scala | 29 +++ .../UCDeltaTableIntegrationBaseTest.java | 17 +- .../io/sparkuctest/UCDeltaTableReadTest.java | 14 +- 10 files changed, 386 insertions(+), 63 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala index b5dacd69a33..533f724e8f6 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala @@ -293,7 +293,14 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension "DeltaCatalog", "loadTable") { setVariantBlockingConfigIfUC() try { - val table = deltaCatalogClient.flatMap(_.loadTable(ident)).getOrElse(super.loadTable(ident)) + val table = + if (isPathIdentifier(ident)) { + loadPathTable(ident) + } else if (isIcebergPathIdentifier(ident)) { + newIcebergPathTable(ident) + } else { + deltaCatalogClient.flatMap(_.loadTable(ident)).getOrElse(super.loadTable(ident)) + } ServerSidePlannedTable.tryCreate(spark, ident, table, isUnityCatalog).foreach { sspt => return sspt diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClient.scala index e495152a129..516fea229d1 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClient.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClient.scala @@ -16,6 +16,9 @@ package org.apache.spark.sql.delta.catalog +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.catalog.{Identifier, Table} /** @@ -27,3 +30,30 @@ import org.apache.spark.sql.connector.catalog.{Identifier, Table} private[catalog] trait DeltaCatalogClient { def loadTable(ident: Identifier): Option[Table] } + +private[delta] object DeltaCatalogClient { + private[catalog] val UCDeltaRestCatalogApiEnabledKey = + UCDeltaCatalogClient.UCDeltaRestCatalogApiEnabledKey + private[catalog] val RenewCredentialEnabledKey = + UCDeltaCatalogClient.RenewCredentialEnabledKey + private[catalog] val CredScopedFsEnabledKey = + UCDeltaCatalogClient.CredScopedFsEnabledKey + + private[catalog] def deltaRestApiEnabledConf(catalogName: String): String = { + UCDeltaCatalogClient.deltaRestApiEnabledConf(catalogName) + } + + private[catalog] def renewCredentialEnabledConf(catalogName: String): String = { + UCDeltaCatalogClient.renewCredentialEnabledConf(catalogName) + } + + private[catalog] def credScopedFsEnabledConf(catalogName: String): String = { + UCDeltaCatalogClient.credScopedFsEnabledConf(catalogName) + } + + private[delta] def pathCredentialOptions( + spark: SparkSession, + path: Path): Map[String, String] = { + UCDeltaCatalogClient.pathCredentialOptions(spark, path) + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala index 9508477ad28..796ce96c633 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala @@ -86,11 +86,19 @@ class DeltaTableV2 private( PathInfo(new Path(catalogTable.get.location), Seq.empty, None) } else { val (rootPath, filters, timeTravel) = - DeltaDataSource.parsePathIdentifier(spark, path.toString, options) + DeltaDataSource.parsePathIdentifier(spark, path.toString, pathBasedOptions) PathInfo(rootPath, filters, timeTravel) } } + private lazy val pathBasedOptions: Map[String, String] = { + if (catalogTable.isDefined) { + options + } else { + DeltaCatalogClient.pathCredentialOptions(spark, path) ++ options + } + } + private def rootPath = pathInfo.rootPath private def partitionFilters = pathInfo.partitionFilters @@ -122,7 +130,7 @@ class DeltaTableV2 private( } fileSystemOptions ++ options } else { - options + pathBasedOptions } DeltaLog.forTable( spark, diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClient.scala index 314eb811d57..e5b68a59e5b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClient.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClient.scala @@ -28,7 +28,7 @@ import io.delta.storage.commit.uccommitcoordinator.UCDeltaClient import io.unitycatalog.client.ApiException import io.unitycatalog.client.auth.TokenProvider import io.unitycatalog.hadoop.UCCredentialHadoopConfs -import io.unitycatalog.hadoop.UCCredentialHadoopConfs.TableOperation +import io.unitycatalog.hadoop.UCCredentialHadoopConfs.{PathOperation, TableOperation} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -230,13 +230,14 @@ private case class UCDeltaRestCatalogApiCredentialContext( hadoopConf: Configuration, appVersions: JMap[String, String]) -private object UCDeltaCatalogClient { +private[catalog] object UCDeltaCatalogClient { private[catalog] val UCDeltaRestCatalogApiEnabledKey = "deltaRestApi.enabled" private[catalog] val RenewCredentialEnabledKey = "renewCredential.enabled" private[catalog] val CredScopedFsEnabledKey = "credScopedFs.enabled" private val ManagedTableType = "MANAGED" private val ExternalTableType = "EXTERNAL" private val UnsupportedTableFormatExceptionType = "UnsupportedTableFormatException" + private val DefaultCatalogConf = "spark.sql.defaultCatalog" private val DefaultRenewCredentialEnabled = true private val DefaultCredScopedFsEnabled = false private val CloudSchemes = Set("s3", "s3a", "gs", "abfs", "abfss") @@ -257,6 +258,43 @@ private object UCDeltaCatalogClient { Option(scheme).exists(s => CloudSchemes.contains(s.toLowerCase(Locale.ROOT))) } + /** + * Returns UC Delta Rest Catalog API path credential options for raw path-based Delta access. + * + * Path-based access has no catalog identifier, so this uses the UC Delta Rest Catalog API-enabled + * default catalog as the credential authority. If the session has no such default catalog, path + * reads keep their original options. + */ + private[delta] def pathCredentialOptions( + spark: SparkSession, + path: Path): Map[String, String] = { + val location = path.toString + val locationScheme = path.toUri.getScheme + if (!isCloudScheme(locationScheme)) { + return Map.empty[String, String] + } + + selectedUCDeltaRestCatalogApiConfigForPathCredentials(spark) + .map { context => + try { + buildHadoopCredentialPropertiesForPath(location, locationScheme, context) + } catch { + case e: ApiException if e.getCode == 404 => + Map.empty[String, String] + } + } + .getOrElse(Map.empty[String, String]) + } + + private def selectedUCDeltaRestCatalogApiConfigForPathCredentials( + spark: SparkSession): Option[UCDeltaRestCatalogApiCredentialContext] = { + spark.conf.getOption(DefaultCatalogConf) + .filter(_.nonEmpty) + .filter(catalogName => + spark.conf.get(deltaRestApiEnabledConf(catalogName), "false").toBoolean) + .flatMap(catalogName => ucDeltaRestCatalogApiCredentialContext(spark, catalogName)) + } + private def reflectedString(metadata: AbstractMetadata, methodName: String): String = { try { metadata.getClass.getMethod(methodName).invoke(metadata).asInstanceOf[String] @@ -274,44 +312,67 @@ private object UCDeltaCatalogClient { } } + private def ucDeltaRestCatalogApiCredentialContext( + spark: SparkSession, + catalogName: String): Option[UCDeltaRestCatalogApiCredentialContext] = { + if (!spark.conf.get(deltaRestApiEnabledConf(catalogName), "false").toBoolean) { + return None + } + + val (_, uri, authConfig) = UCCommitCoordinatorBuilder.getCatalogConfigs(spark) + .collectFirst { case (`catalogName`, configuredUri, configuredAuthConfig) => + (catalogName, configuredUri, configuredAuthConfig) + } + .getOrElse { + throw new IllegalArgumentException( + "UC Delta Rest Catalog API is enabled for catalog " + + s"$catalogName, but its Unity Catalog " + + "configuration is missing or incomplete.") + } + val tokenProvider = TokenProvider.create(authConfig.asJava) + // Catalog load has no DeltaLog yet, so pass the Spark session Hadoop conf to the UC + // credential builder. + // scalastyle:off deltahadoopconfiguration + val hadoopConf = spark.sessionState.newHadoopConf() + // scalastyle:on deltahadoopconfiguration + Some(UCDeltaRestCatalogApiCredentialContext( + uri.toString, + tokenProvider, + spark.conf.get( + renewCredentialEnabledConf(catalogName), + DefaultRenewCredentialEnabled.toString).toBoolean, + spark.conf.get( + credScopedFsEnabledConf(catalogName), + DefaultCredScopedFsEnabled.toString).toBoolean, + hadoopConf, + UCTokenBasedRestClientFactory.defaultAppVersionsAsJava)) + } + + private def buildHadoopCredentialPropertiesForPath( + location: String, + locationScheme: String, + credentialContext: UCDeltaRestCatalogApiCredentialContext): Map[String, String] = { + UCCredentialHadoopConfs.builder( + credentialContext.uri, + locationScheme.toLowerCase(Locale.ROOT)) + .tokenProvider(credentialContext.tokenProvider) + .addAppVersions(credentialContext.appVersions) + .enableCredentialRenewal(credentialContext.renewCredentialEnabled) + .enableCredentialScopedFs(credentialContext.credScopedFsEnabled) + .hadoopConf(credentialContext.hadoopConf) + .buildForPath(location, PathOperation.PATH_READ) + .asScala + .toMap + } + def apply(delegatePlugin: CatalogPlugin, spark: SparkSession): UCDeltaCatalogClient = { val catalogName = delegatePlugin.name() - var credentialContext = Option.empty[UCDeltaRestCatalogApiCredentialContext] - val ucDeltaClient = if (spark.conf - .get(deltaRestApiEnabledConf(catalogName), "false") - .toBoolean) { - val (_, uri, authConfig) = UCCommitCoordinatorBuilder.getCatalogConfigs(spark) - .collectFirst { case (`catalogName`, configuredUri, configuredAuthConfig) => - (catalogName, configuredUri, configuredAuthConfig) - } - .getOrElse { - throw new IllegalArgumentException( - "UC Delta Rest Catalog API is enabled for catalog " + - s"$catalogName, but its Unity Catalog " + - "configuration is missing or incomplete.") - } - val tokenProvider = TokenProvider.create(authConfig.asJava) - // Catalog load has no DeltaLog yet, so pass the Spark session Hadoop conf to the UC - // credential builder. - // scalastyle:off deltahadoopconfiguration - val hadoopConf = spark.sessionState.newHadoopConf() - // scalastyle:on deltahadoopconfiguration - val appVersions = UCTokenBasedRestClientFactory.defaultAppVersionsAsJava - credentialContext = Some(UCDeltaRestCatalogApiCredentialContext( - uri.toString, - tokenProvider, - spark.conf.get( - renewCredentialEnabledConf(catalogName), - DefaultRenewCredentialEnabled.toString).toBoolean, - spark.conf.get( - credScopedFsEnabledConf(catalogName), - DefaultCredScopedFsEnabled.toString).toBoolean, - hadoopConf, - appVersions)) + val credentialContext = ucDeltaRestCatalogApiCredentialContext(spark, catalogName) + val ucDeltaClient = credentialContext.flatMap { context => UCTokenBasedRestClientFactory.createUCDeltaClient( - uri, - tokenProvider, - appVersions, + context.uri, + context.tokenProvider, + context.appVersions, catalogName) match { case Some(client) if client.supportsUCDeltaRestCatalogApi() => Some(client) @@ -322,9 +383,7 @@ private object UCDeltaCatalogClient { "Catalog server does not support the required UC Delta Rest Catalog API endpoints.") case None => None - } - } else { - None + } } new UCDeltaCatalogClient(ucDeltaClient, catalogName, credentialContext) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala index e6e0057fcaf..1da1ee756bd 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala @@ -95,6 +95,10 @@ object ServerSidePlannedTable extends DeltaLogging { // Check if we should enable server-side planning (for testing) val enableServerSidePlanning = spark.conf.get(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "false").toBoolean + if (!enableServerSidePlanning) { + return None + } + val hasTableCredentials = hasCredentials(table) // Check if we should use server-side planning diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala index 96c222d1c85..d275d37b290 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala @@ -24,7 +24,7 @@ import scala.util.{Failure, Success, Try} import com.databricks.spark.util.DatabricksLogging import org.apache.spark.internal.MDC import org.apache.spark.sql.delta._ -import org.apache.spark.sql.delta.catalog.DeltaTableV2 +import org.apache.spark.sql.delta.catalog.{DeltaCatalogClient, DeltaTableV2} import org.apache.spark.sql.delta.commands.{ DeltaInsertReplaceOnOrUsingCommand, InsertReplaceOnOrUsingAPIOrigin, @@ -95,7 +95,12 @@ class DeltaDataSource catalogTableOpt .map(catalogTable => DeltaLog.forTableWithSnapshot( sparkSession, catalogTable, options)) - .getOrElse(DeltaLog.forTableWithSnapshot(sparkSession, path, options))._2 + .getOrElse { + DeltaLog.forTableWithSnapshot( + sparkSession, + path, + DeltaCatalogClient.pathCredentialOptions(sparkSession, path) ++ options) + }._2 } def inferSchema: StructType = new StructType() // empty diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClientSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClientSuite.scala index 28fcb000f58..32c7e39fafc 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClientSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClientSuite.scala @@ -17,13 +17,14 @@ package org.apache.spark.sql.delta.catalog import java.io.IOException -import java.net.InetSocketAddress +import java.net.{InetSocketAddress, URLDecoder} import java.nio.charset.StandardCharsets import scala.collection.JavaConverters._ import com.sun.net.httpserver.{HttpExchange, HttpServer} import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient.UC_TABLE_ID_KEY +import org.apache.hadoop.fs.Path import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} @@ -49,6 +50,7 @@ class DeltaCatalogClientSuite private var serverUri: String = _ private var configHandler: HttpExchange => Unit = _ private var handler: HttpExchange => Unit = _ + private var pathCredentialsHandler: HttpExchange => Unit = _ private var credentialRequestCount: Int = _ private val AwsVendedTokenProviderClass = @@ -56,6 +58,10 @@ class DeltaCatalogClientSuite private val S3ACredentialsProviderKey = "fs.s3a.aws.credentials.provider" private val S3AInitAccessKey = "fs.s3a.init.access.key" private val UCTableOperationKey = "fs.unitycatalog.table.operation" + private val UCCredentialsTypeKey = "fs.unitycatalog.credentials.type" + private val UCCredentialsTypePathValue = "path" + private val UCPathKey = "fs.unitycatalog.path" + private val UCPathOperationKey = "fs.unitycatalog.path.operation" override def beforeAll(): Unit = { super.beforeAll() @@ -68,8 +74,7 @@ class DeltaCatalogClientSuite sendJson(exchange, 200, """{ | "endpoints": [ - | "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}", - | "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}/credentials" + | "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}" | ], | "protocol-version": "1.0" |}""".stripMargin) @@ -85,6 +90,17 @@ class DeltaCatalogClientSuite exchange.close() } }) + server.createContext("/api/2.1/unity-catalog/temporary-path-credentials", exchange => { + try { + if (pathCredentialsHandler != null) { + pathCredentialsHandler(exchange) + } else { + sendJson(exchange, 404, "{}") + } + } finally { + exchange.close() + } + }) server.start() serverUri = s"http://localhost:${server.getAddress.getPort}" } @@ -98,6 +114,7 @@ class DeltaCatalogClientSuite super.beforeEach() configHandler = null handler = null + pathCredentialsHandler = null credentialRequestCount = 0 } @@ -359,6 +376,135 @@ class DeltaCatalogClientSuite } } + test("loadTable skips UC Delta Rest Catalog API for delta path identifiers") { + handler = exchange => + fail(s"Unexpected UC Delta Rest Catalog API table request: ${exchange.getRequestURI}") + + withUCDeltaRestCatalogApi { catalog => + assert(catalog.loadTable( + Identifier.of(Array("delta"), "s3://bucket/path/to/table")).isEmpty) + } + } + + test( + "pathCredentialOptions returns UC Delta Rest Catalog API path credential properties " + + "for cloud paths") { + configHandler = exchange => { + assert(queryParams(exchange)("catalog") === "uc") + sendJson(exchange, 200, + """{ + | "endpoints": [ + | "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}" + | ], + | "protocol-version": "1.0" + |}""".stripMargin) + } + pathCredentialsHandler = exchange => { + assert(exchange.getRequestMethod === "POST") + assertJsonContains(exchange, Seq( + "\"url\":\"s3://bucket/path/to/table\"", + "\"operation\":\"PATH_READ\"")) + sendJson(exchange, 200, s3TemporaryCredentialsResponseJson()) + } + + withSQLConf( + "spark.sql.catalog.uc" -> "io.unitycatalog.spark.UCSingleCatalog", + "spark.sql.catalog.uc.uri" -> serverUri, + "spark.sql.catalog.uc.token" -> "mock-token", + "spark.sql.defaultCatalog" -> "uc", + DeltaCatalogClient.deltaRestApiEnabledConf("uc") -> "true") { + val props = DeltaCatalogClient.pathCredentialOptions( + spark, + new Path("s3://bucket/path/to/table")) + + assert(props(S3ACredentialsProviderKey) === + AwsVendedTokenProviderClass) + assert(props(S3AInitAccessKey) === "ak") + assert(props(UCCredentialsTypeKey) === UCCredentialsTypePathValue) + assert(props(UCPathKey) === + "s3://bucket/path/to/table") + assert(props(UCPathOperationKey) === "PATH_READ") + } + } + + test("pathCredentialOptions returns empty when path credentials are unavailable") { + configHandler = exchange => { + assert(queryParams(exchange)("catalog") === "uc") + sendJson(exchange, 200, + """{ + | "endpoints": [ + | "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}" + | ], + | "protocol-version": "1.0" + |}""".stripMargin) + } + pathCredentialsHandler = exchange => sendJson(exchange, 404, "{}") + + withSQLConf( + "spark.sql.catalog.uc" -> "io.unitycatalog.spark.UCSingleCatalog", + "spark.sql.catalog.uc.uri" -> serverUri, + "spark.sql.catalog.uc.token" -> "mock-token", + "spark.sql.defaultCatalog" -> "uc", + DeltaCatalogClient.deltaRestApiEnabledConf("uc") -> "true") { + val props = DeltaCatalogClient.pathCredentialOptions( + spark, + new Path("s3://bucket/path/to/table")) + + assert(props.isEmpty) + } + } + + test("pathCredentialOptions returns empty when path is not governed by UC") { + configHandler = exchange => { + assert(queryParams(exchange)("catalog") === "uc") + sendJson(exchange, 200, + """{ + | "endpoints": [ + | "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}" + | ], + | "protocol-version": "1.0" + |}""".stripMargin) + } + pathCredentialsHandler = exchange => { + assert(exchange.getRequestMethod === "POST") + assertJsonContains(exchange, Seq("\"url\":\"s3://other-bucket/path/to/table\"")) + sendJson(exchange, 404, """{"error_code":"NOT_FOUND"}""") + } + + withSQLConf( + "spark.sql.catalog.uc" -> "io.unitycatalog.spark.UCSingleCatalog", + "spark.sql.catalog.uc.uri" -> serverUri, + "spark.sql.catalog.uc.token" -> "mock-token", + "spark.sql.defaultCatalog" -> "uc", + DeltaCatalogClient.deltaRestApiEnabledConf("uc") -> "true") { + val props = DeltaCatalogClient.pathCredentialOptions( + spark, + new Path("s3://other-bucket/path/to/table")) + + assert(props.isEmpty) + } + } + + test( + "pathCredentialOptions returns empty when no UC Delta Rest Catalog API catalog is " + + "configured") { + configHandler = exchange => + fail(s"Unexpected UC Delta Rest Catalog API config request: ${exchange.getRequestURI}") + pathCredentialsHandler = exchange => + fail(s"Unexpected temporary path credentials request: ${exchange.getRequestURI}") + + withSQLConf( + "spark.sql.catalog.uc" -> "io.unitycatalog.spark.UCSingleCatalog", + "spark.sql.catalog.uc.uri" -> serverUri, + "spark.sql.catalog.uc.token" -> "mock-token") { + val props = DeltaCatalogClient.pathCredentialOptions( + spark, + new Path("s3://bucket/path/to/table")) + + assert(props.isEmpty) + } + } + private def loadWithUCDeltaRestCatalogApi(): V1Table = { withUCDeltaRestCatalogApi { catalog => catalog.loadTable(Identifier.of(Array("default"), "tbl")).get.asInstanceOf[V1Table] @@ -441,6 +587,40 @@ class DeltaCatalogClientSuite | ] |}""".stripMargin + private def s3TemporaryCredentialsResponseJson(): String = + """{ + | "aws_temp_credentials": { + | "access_key_id": "ak", + | "secret_access_key": "sk", + | "session_token": "st" + | }, + | "expiration_time": 1710000000000 + |}""".stripMargin + + private def assertJsonContains(exchange: HttpExchange, expectedSnippets: Seq[String]): Unit = { + val body = new String(exchange.getRequestBody.readAllBytes(), StandardCharsets.UTF_8) + .replaceAll("\\s+", "") + expectedSnippets.foreach { snippet => + assert(body.contains(snippet), s"Expected request body $body to contain $snippet") + } + } + + private def queryParams(exchange: HttpExchange): Map[String, String] = { + Option(exchange.getRequestURI.getRawQuery).toSeq + .flatMap(_.split("&")) + .filter(_.nonEmpty) + .map { kv => + val pair = kv.split("=", 2) + val key = URLDecoder.decode(pair(0), StandardCharsets.UTF_8) + val value = if (pair.length == 2) { + URLDecoder.decode(pair(1), StandardCharsets.UTF_8) + } else { + "" + } + key -> value + }.toMap + } + private def sendJson(exchange: HttpExchange, status: Int, body: String): Unit = { val bytes = body.getBytes(StandardCharsets.UTF_8) exchange.getResponseHeaders.add("Content-Type", "application/json") diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala index 231ea7493ed..dcf94676748 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql.delta.serverSidePlanning import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCapability} import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.sources.{And, EqualTo, Filter, GreaterThan, LessThan} +import org.apache.spark.sql.types.StructType /** * Tests for server-side planning with a mock client. @@ -142,6 +144,33 @@ class ServerSidePlannedTableSuite extends QueryTest with DeltaSQLCommandTest { s"Expected normal table but got ServerSidePlannedTable when config is disabled") } + test("disabled server-side planning does not inspect table credentials") { + val throwingTable = new Table { + override def name(): String = "throwing_table" + override def schema(): StructType = new StructType() + override def properties(): java.util.Map[String, String] = + throw new IllegalStateException("properties should not be called") + override def capabilities(): java.util.Set[TableCapability] = + java.util.Collections.emptySet() + } + + val originalConfig = spark.conf.getOption(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key) + spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "false") + try { + val plannedTable = ServerSidePlannedTable.tryCreate( + spark, + Identifier.of(Array("db"), "throwing_table"), + throwingTable, + isUnityCatalog = true) + assert(plannedTable.isEmpty) + } finally { + originalConfig match { + case Some(value) => spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, value) + case None => spark.conf.unset(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key) + } + } + } + test("shouldUseServerSidePlanning() decision logic") { // ============================================================ // Production mode: skipUCRequirementForTests = false diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java index 0b5cce98449..fb7dac55a41 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java @@ -142,12 +142,17 @@ private SparkConf configureSparkWithUnityCatalog(SparkConf conf) { // Set the catalog specific configs. UnityCatalogInfo uc = unityCatalogInfo(); String catalogName = uc.catalogName(); - return conf.set("spark.sql.catalog." + catalogName, "io.unitycatalog.spark.UCSingleCatalog") - .set("spark.sql.catalog." + catalogName + ".uri", uc.serverUri()) - .set("spark.sql.catalog." + catalogName + ".token", uc.serverToken()) - .set( - "spark.sql.catalog." + catalogName + ".deltaRestApi.enabled", - String.valueOf(isUCDeltaRestCatalogApiEnabled())); + conf = + conf.set("spark.sql.catalog." + catalogName, "io.unitycatalog.spark.UCSingleCatalog") + .set("spark.sql.catalog." + catalogName + ".uri", uc.serverUri()) + .set("spark.sql.catalog." + catalogName + ".token", uc.serverToken()) + .set( + "spark.sql.catalog." + catalogName + ".deltaRestApi.enabled", + String.valueOf(isUCDeltaRestCatalogApiEnabled())); + if (isUCRemoteConfigured()) { + conf.set("spark.sql.defaultCatalog", catalogName); + } + return conf; } /** Stop the SparkSession after all tests. */ diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableReadTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableReadTest.java index a51d7535410..8b17c275f2f 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableReadTest.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableReadTest.java @@ -18,7 +18,6 @@ import java.util.List; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Assumptions; /** * Read operation test suite for Delta Table operations through Unity Catalog. @@ -123,14 +122,11 @@ public void testDeltaTableForPath(TableType tableType) throws Exception { () -> sql("SELECT * FROM delta.`%s`", tablePath), "For managed tables, path-based access should fail"); } else { - // For EXTERNAL tables, path-based access should work - // TODO: Enable remote external path reads after Delta wires DRC - // GET /delta/v1/temporary-path-credentials for delta.`path` access. - Assumptions.assumeFalse( - isUCRemoteConfigured(), - "Remote external delta.`path` reads require DRC temporary path credentials; " - + "this PR only wires DRC table credentials for named UC table reads."); - S3CredentialFileSystem.credentialCheckEnabled = false; + // Local UC OSS does not implement the UC Delta Rest Catalog API path credentials + // handler yet. Remote runs keep credential checks enabled and validate UC Delta Rest + // Catalog API path credential + // propagation. + S3CredentialFileSystem.credentialCheckEnabled = isUCRemoteConfigured(); try { check( sql("SELECT * FROM delta.`%s` ORDER BY id", tablePath),