From 13e9a38bc715648b87847ac213bb17b2dcaf852c Mon Sep 17 00:00:00 2001 From: Yi Li Date: Fri, 15 May 2026 12:59:27 -0700 Subject: [PATCH 1/3] [Spark] Route Delta loadTable through the Delta REST API When `deltaRestApi.enabled` is set on a UC catalog, AbstractDeltaCatalog routes table loads through UCDeltaCatalogClientImpl, backed by UCDeltaTokenBasedRestClient. Non-Delta tables fall back to the legacy delegate. Signed-off-by: Yi Li --- .../delta/catalog/AbstractDeltaCatalog.scala | 23 +- .../catalog/AbstractDeltaCatalogClient.scala | 84 +++++++ .../catalog/UCDeltaCatalogClientImpl.scala | 231 ++++++++++++++++++ ...stractDeltaCatalogClientRoutingSuite.scala | 198 +++++++++++++++ .../UCDeltaTableIntegrationBaseTest.java | 58 ++++- 5 files changed, 590 insertions(+), 4 deletions(-) create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClient.scala create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClientImpl.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClientRoutingSuite.scala diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala index 86a0f9b4ee9..773b2876037 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala @@ -63,6 +63,7 @@ import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.InsertableRelation import org.apache.spark.sql.types.{IntegerType, StructField, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** @@ -83,6 +84,22 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension val spark = SparkSession.active + /** + * When non-null, table 
operations are routed through this client instead of through the + * [[org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension]] delegate that + * `AbstractDeltaCatalog` normally relies on. This lets the catalog inject custom + * interactions (e.g. talking to a REST endpoint, catalog-specific property handling, + * storage-credential vending) rather than going through the Spark + * [[org.apache.spark.sql.connector.catalog.TableCatalog]] API. + */ + private[catalog] var deltaCatalogClient: AbstractDeltaCatalogClient = null + + override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { + super.initialize(name, options) + deltaCatalogClient = + AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled(name, options, super.loadTable) + } + private lazy val isUnityCatalog: Boolean = { val delegateField = classOf[DelegatingCatalogExtension].getDeclaredField("delegate") delegateField.setAccessible(true) @@ -290,7 +307,11 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension "DeltaCatalog", "loadTable") { setVariantBlockingConfigIfUC() try { - val table = super.loadTable(ident) + val table = if (deltaCatalogClient != null) { + deltaCatalogClient.loadTable(ident) + } else { + super.loadTable(ident) + } ServerSidePlannedTable.tryCreate(spark, ident, table, isUnityCatalog).foreach { sspt => return sspt diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClient.scala new file mode 100644 index 00000000000..74e06115f71 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClient.scala @@ -0,0 +1,84 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.catalog + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.connector.catalog.{Identifier, Table} +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * Backend hook through which [[AbstractDeltaCatalog]] injects custom catalog interactions + * that bypass the catalog operations normally provided by Spark's + * [[org.apache.spark.sql.connector.catalog.TableCatalog]] interface (the + * [[org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension]] delegate that + * `AbstractDeltaCatalog` extends). Concrete implementations route table operations to a + * catalog-specific path, e.g. talking directly to a REST endpoint instead of the + * configured delegate, applying catalog-specific table-property handling, or vending + * storage credentials on the returned [[Table]]. Keeping these behind a client interface + * isolates that plumbing from `AbstractDeltaCatalog`. + */ +private[catalog] trait AbstractDeltaCatalogClient { + + /** + * @throws org.apache.spark.sql.catalyst.analysis.NoSuchTableException if the catalog has + * no record of this identifier + */ + def loadTable(ident: Identifier): Table +} + +/** Builds a [[AbstractDeltaCatalogClient]] from catalog options. 
*/ +private[catalog] trait AbstractDeltaCatalogClientFactory { + def fromCatalogOptions( + catalogName: String, + options: CaseInsensitiveStringMap, + fallbackLoadTable: Identifier => Table): AbstractDeltaCatalogClient +} + +private[catalog] object AbstractDeltaCatalogClient extends Logging { + + private val UC_DELTA_REST_API_ENABLED_KEY: String = "deltaRestApi.enabled" + private val UC_DELTA_CATALOG_CLIENT_IMPL_CLASS_NAME: String = + "org.apache.spark.sql.delta.catalog.UCDeltaCatalogClientImpl" + + /** + * Returns a [[AbstractDeltaCatalogClient]] when the catalog opted in via `deltaRestApi.enabled`, + * else `null`. The concrete impl is loaded reflectively so [[AbstractDeltaCatalog]] doesn't + * compile-depend on it; environments that don't ship [[UCDeltaCatalogClientImpl]] degrade + * to `null`. + */ + def fromCatalogOptionsIfEnabled( + catalogName: String, + options: CaseInsensitiveStringMap, + fallbackLoadTable: Identifier => Table): AbstractDeltaCatalogClient = { + if (options.getBoolean(UC_DELTA_REST_API_ENABLED_KEY, false)) { + val factory = try { + // scalastyle:off classforname + val cls = Class.forName(UC_DELTA_CATALOG_CLIENT_IMPL_CLASS_NAME + "$") + // scalastyle:on classforname + cls.getField("MODULE$").get(null).asInstanceOf[AbstractDeltaCatalogClientFactory] + } catch { + case _: ClassNotFoundException => + logWarning(s"'$UC_DELTA_REST_API_ENABLED_KEY' is true but " + + s"$UC_DELTA_CATALOG_CLIENT_IMPL_CLASS_NAME is not on the classpath; skipping it.") + return null + } + factory.fromCatalogOptions(catalogName, options, fallbackLoadTable) + } else { + null + } + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClientImpl.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClientImpl.scala new file mode 100644 index 00000000000..0b8ae731427 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClientImpl.scala @@ -0,0 +1,231 @@ +/* + * Copyright (2026) 
The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.catalog + +import java.net.URI +import java.util.concurrent.atomic.AtomicLong +import java.util.function.Supplier + +import scala.jdk.CollectionConverters._ + +import io.delta.storage.commit.{TableIdentifier => StorageTableIdentifier} +import io.delta.storage.commit.uccommitcoordinator.{ + UCDeltaClient, + UCDeltaModels, + UCDeltaTokenBasedRestClient +} +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.TableInfo +import io.delta.storage.commit.uccommitcoordinator.exceptions.{ + CredentialFetchFailedException, + UnsupportedTableFormatException, + NoSuchTableException => StorageNoSuchTableException +} +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.catalyst.catalog.{ + CatalogStorageFormat, + CatalogTable, + CatalogTableType +} +import org.apache.spark.sql.connector.catalog.{Identifier, Table, V1Table} +import org.apache.spark.sql.delta.coordinatedcommits.UCTokenBasedRestClientFactory +import org.apache.spark.sql.delta.logging.DeltaLogKeys +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.types.{DataType, StructType} +import 
org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * [[AbstractDeltaCatalogClient]] backed by a [[UCDeltaClient]]; translates between + * Spark/Delta types and the storage-side UC types. + */ +private[catalog] class UCDeltaCatalogClientImpl( + catalogName: String, + ucClient: UCDeltaClient, + serverSidePlanningEnabled: Boolean = false, + fallbackLoadTable: Identifier => Table = UCDeltaCatalogClientImpl.defaultFallbackLoadTable) + extends AbstractDeltaCatalogClient with Logging { + + override def loadTable(ident: Identifier): Table = { + UCDeltaCatalogClientImpl.LOAD_TABLE_INVOCATIONS.incrementAndGet() + val tid = toStorageTableIdent(ident) + val info = + try ucClient.loadTable(tid) + catch { + case _: StorageNoSuchTableException => throw new NoSuchTableException(ident) + case e: UnsupportedTableFormatException => + logInfo(log"Table ${MDC(DeltaLogKeys.TABLE_NAME, ident)} is not in Delta format; " + + log"falling back to the legacy catalog path. Cause: " + + log"${MDC(DeltaLogKeys.EXCEPTION, e.getMessage)}") + return fallbackLoadTable(ident) + case e: CredentialFetchFailedException if serverSidePlanningEnabled => + logWarning( + s"Credential fetch failed for ${fullQualifiedTableName(tid)}; enabling " + + s"server-side planning fallback. 
Cause: ${e.getMessage}") + enableServerSidePlanningConfig(ident) + e.getTableInfoWithoutCredentials + } + UCDeltaCatalogClientImpl.SUCCESSFUL_DELTA_REST_API_LOADS.incrementAndGet() + toV1Table(ident, info) + } + + private def enableServerSidePlanningConfig(ident: Identifier): Unit = { + SparkSession.getActiveSession match { + case Some(spark) => + spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "true") + logInfo(log"Server-side planning enabled for table " + + log"${MDC(DeltaLogKeys.TABLE_NAME, ident)}; Delta will read via SSP with empty creds.") + case None => + logWarning(log"Server-side planning requested for table " + + log"${MDC(DeltaLogKeys.TABLE_NAME, ident)} but no active SparkSession found.") + } + } + + // ---------- conversions ---------- + + private def toStorageTableIdent(ident: Identifier): StorageTableIdentifier = { + val ns = ident.namespace() + require( + ns.length == 1, + s"UC identifiers must be of the form .; got namespace ${ns.mkString(".")}") + new StorageTableIdentifier(Array(catalogName, ns(0)), ident.name()) + } + + /** Three-part dotted name from a `[catalog, schema]` + `name` storage identifier. 
*/ + private def fullQualifiedTableName(t: StorageTableIdentifier): String = { + val ns = t.getNamespace + s"${ns(0)}.${ns(1)}.${t.getName}" + } + + private def toV1Table(ident: Identifier, info: TableInfo): V1Table = { + val m = info.getMetadata + val properties = Option(m.getConfiguration) + .map(_.asScala.toMap) + .getOrElse(Map.empty[String, String]) + val partitionColumns = Option(m.getPartitionColumns) + .map(_.asScala.toSeq) + .getOrElse(Seq.empty[String]) + val schema = Option(m.getSchemaString) + .map(DataType.fromJson(_).asInstanceOf[StructType]) + .getOrElse(new StructType()) + val storage = CatalogStorageFormat.empty.copy( + locationUri = Some(new URI(info.getLocation)), + properties = properties ++ info.getStorageProperties.asScala.toMap) + val catalogTable = CatalogTable( + identifier = TableIdentifier(ident.name(), ident.namespace().headOption, Some(catalogName)), + tableType = fromUcTableType(info.getTableType), + storage = storage, + schema = schema, + provider = Option(m.getProvider).map(_.toLowerCase(java.util.Locale.ROOT)), + partitionColumnNames = partitionColumns, + comment = Option(m.getDescription), + createTime = if (m.getCreatedTime != null) m.getCreatedTime else 0L, + tracksPartitionsInCatalog = false) + V1Table(catalogTable) + } + + private def fromUcTableType(t: UCDeltaModels.TableType): CatalogTableType = t match { + case UCDeltaModels.TableType.MANAGED => CatalogTableType.MANAGED + case UCDeltaModels.TableType.EXTERNAL => CatalogTableType.EXTERNAL + } +} + +object UCDeltaCatalogClientImpl extends AbstractDeltaCatalogClientFactory with Logging { + /** Bumped at every loadTable entry, regardless of outcome. */ + val LOAD_TABLE_INVOCATIONS: AtomicLong = new AtomicLong(0L) + + /** + * Bumped only when loadTable returned a Delta table from the Delta REST API (no fallback, + * no rethrow). Use this for "Delta REST actually served the load" assertions. 
+ */ + val SUCCESSFUL_DELTA_REST_API_LOADS: AtomicLong = new AtomicLong(0L) + + private[catalog] val RenewCredentialEnabledKey: String = "renewCredential.enabled" + private[catalog] val CredScopedFsEnabledKey: String = "credScopedFs.enabled" + private[catalog] val ServerSidePlanningEnabledKey: String = "serverSidePlanning.enabled" + + private[catalog] val defaultFallbackLoadTable: Identifier => Table = ident => + throw new IllegalStateException( + s"Non-Delta table $ident cannot be served via the Delta REST API path and no " + + "fallback catalog was configured.") + + /** + * Builds a [[UCDeltaCatalogClientImpl]] from catalog options. The `deltaRestApi.enabled` + * gate is the caller's responsibility + * ([[AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled]]). + * {@code fallbackLoadTable} is invoked when UC reports {@code UnsupportedTableFormatException}. + */ + override def fromCatalogOptions( + catalogName: String, + options: CaseInsensitiveStringMap, + fallbackLoadTable: Identifier => Table + ): UCDeltaCatalogClientImpl = { + val uri = Option(options.get("uri")).getOrElse(throw new IllegalArgumentException( + s"'uri' is required when 'deltaRestApi.enabled' is true (catalog '$catalogName')")) + val authConfigs = extractAuthConfigs(options, catalogName) + val appVersions = UCTokenBasedRestClientFactory.defaultAppVersionsAsJava + val renewCredEnabled = options.getBoolean(RenewCredentialEnabledKey, true) + val credScopedFsEnabled = options.getBoolean(CredScopedFsEnabledKey, false) + val sspEnabled = options.getBoolean(ServerSidePlanningEnabledKey, false) + val hadoopConfSupplier: Supplier[Configuration] = + () => SparkSession.getActiveSession + .map(_.sparkContext.hadoopConfiguration) + .getOrElse(new Configuration()) + val restClient = UCDeltaTokenBasedRestClient.create( + uri, + authConfigs, + appVersions, + renewCredEnabled, + credScopedFsEnabled, + hadoopConfSupplier) + new UCDeltaCatalogClientImpl(catalogName, restClient, sspEnabled, fallbackLoadTable) + 
} + + /** + * `auth.*` sub-keys (prefix stripped) feed `TokenProvider.create`. Legacy bare `token` + * is translated to `{type=static, token=<token>}`, only when no `auth.*` is present. + */ + private[catalog] def extractAuthConfigs( + options: CaseInsensitiveStringMap, + catalogName: String): java.util.Map[String, String] = { + val authConfigs = new java.util.HashMap[String, String]() + val authPrefix = "auth." + // CaseInsensitiveStringMap.entrySet() returns keys already lowercased. + options.entrySet().asScala.foreach { e => + val key = e.getKey + if (key.startsWith(authPrefix)) { + authConfigs.put(key.substring(authPrefix.length), e.getValue) + } + } + if (authConfigs.isEmpty) { + Option(options.get("token")).foreach { tok => + authConfigs.put("type", "static") + authConfigs.put("token", tok) + } + } + if (authConfigs.isEmpty) { + throw new IllegalArgumentException( + s"auth configuration is required when 'deltaRestApi.enabled' is true " + + s"(catalog '$catalogName'). Set either 'auth.type' (with the corresponding " + + s"auth.* keys) or the legacy 'token' option.") + } + authConfigs + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClientRoutingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClientRoutingSuite.scala new file mode 100644 index 00000000000..c8b5696f136 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClientRoutingSuite.scala @@ -0,0 +1,198 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.catalog + +import java.util +import java.util.UUID +import io.delta.storage.commit.{TableIdentifier => StorageTableIdentifier} +import io.delta.storage.commit.uccommitcoordinator.{UCDeltaClient, UCDeltaModels} +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.{StagingTableInfo, TableInfo} +import io.delta.storage.commit.actions.AbstractMetadata +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.connector.catalog.{Identifier, Table, V1Table} +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * Unit tests for the Delta REST API client wiring on [[AbstractDeltaCatalog]]. These verify + * only the catalog-option / initialize plumbing and the loadTable dispatch decision; they do + * not require a UC server. 
+ */ +class AbstractDeltaCatalogClientRoutingSuite extends QueryTest with DeltaSQLCommandTest { + + private def options(kv: (String, String)*): CaseInsensitiveStringMap = { + val m = new util.HashMap[String, String]() + kv.foreach { case (k, v) => m.put(k, v) } + new CaseInsensitiveStringMap(m) + } + + test("deltaRestApi.enabled=false leaves deltaCatalogClient null") { + val catalog = new AbstractDeltaCatalog + catalog.initialize("test_cat", options()) + assert(catalog.deltaCatalogClient == null, + "Delta REST API client should not be constructed when the catalog opts out") + } + + test("deltaRestApi.enabled=true requires uri") { + val catalog = new AbstractDeltaCatalog + val e = intercept[IllegalArgumentException] { + catalog.initialize("test_cat", options("deltaRestApi.enabled" -> "true")) + } + assert(e.getMessage.contains("'uri' is required")) + } + + test("deltaRestApi.enabled=true requires an auth configuration") { + val catalog = new AbstractDeltaCatalog + val e = intercept[IllegalArgumentException] { + catalog.initialize("test_cat", + options("deltaRestApi.enabled" -> "true", "uri" -> "http://uc")) + } + assert(e.getMessage.contains("auth configuration is required")) + } + + test("auth.* options are passed through to TokenProvider (new format)") { + val catalog = new AbstractDeltaCatalog + catalog.initialize("test_cat", + options( + "deltaRestApi.enabled" -> "true", + "uri" -> "http://uc", + "auth.type" -> "static", + "auth.token" -> "tok")) + assert(catalog.deltaCatalogClient != null) + } + + test("deltaRestApi.enabled=true with uri+token constructs the Delta REST API client") { + val catalog = new AbstractDeltaCatalog + catalog.initialize("test_cat", + options("deltaRestApi.enabled" -> "true", "uri" -> "http://uc", "token" -> "tok")) + assert(catalog.deltaCatalogClient != null, + "Delta REST API client should be constructed when the catalog opts in") + assert(catalog.deltaCatalogClient.isInstanceOf[UCDeltaCatalogClientImpl], + s"Delta REST API client 
should be UCDeltaCatalogClientImpl, " + + s"was ${catalog.deltaCatalogClient.getClass}") + } + + test("DeltaCatalogClient.fromCatalogOptionsIfEnabled returns null when the flag is off") { + val result = AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled( + "test_cat", options(), noFallback) + assert(result == null) + } + + test("DeltaCatalogClient.fromCatalogOptionsIfEnabled returns non-null when the flag is on") { + val result = AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled( + "test_cat", + options("deltaRestApi.enabled" -> "true", "uri" -> "http://uc", "token" -> "tok"), + noFallback) + assert(result != null) + } + + private val noFallback: Identifier => Table = + _ => throw new UnsupportedOperationException("fallback not expected in this test") + + test("loadTable converts TableInfo to V1Table with catalog-supplied fields") { + val tableId = UUID.randomUUID() + val metadata = new AbstractMetadata { + override def getId: String = null + override def getName: String = "tbl" + override def getDescription: String = "a test table" + override def getProvider: String = "DELTA" + override def getFormatOptions: java.util.Map[String, String] = + java.util.Collections.emptyMap() + override def getSchemaString: String = + """{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]}""" + override def getPartitionColumns: java.util.List[String] = java.util.Collections.emptyList() + override def getConfiguration: java.util.Map[String, String] = + java.util.Map.of("ucTableId", tableId.toString, "delta.feature.x", "supported") + override def getCreatedTime: java.lang.Long = 42L + } + val info = new TableInfo( + tableId, + UCDeltaModels.TableType.EXTERNAL, + "s3://bucket/table", + metadata, + java.util.Map.of("fs.s3a.access.key", "key")) + + val client = new UCDeltaCatalogClientImpl( + catalogName = "main", + ucClient = new StubUCDeltaClient(info)) + + val table = client.loadTable(Identifier.of(Array("sch"), "tbl")) + val v1 = 
table.asInstanceOf[V1Table].catalogTable + assert(v1.identifier.table === "tbl") + assert(v1.identifier.database === Some("sch")) + assert(v1.identifier.catalog === Some("main")) + assert(v1.tableType === CatalogTableType.EXTERNAL) + assert(v1.storage.locationUri.map(_.toString) === Some("s3://bucket/table")) + assert(v1.provider === Some("delta")) + assert(v1.comment === Some("a test table")) + assert(v1.createTime === 42L) + assert(!v1.tracksPartitionsInCatalog) + assert(v1.schema.fieldNames.toSeq === Seq("id")) + val merged = v1.storage.properties + assert(merged.get("ucTableId") === Some(tableId.toString)) + assert(merged.get("fs.s3a.access.key") === Some("key")) + } +} + +/** Returns a fixed [[TableInfo]] from {@code loadTable}; throws elsewhere. */ +private class StubUCDeltaClient(info: TableInfo) extends UCDeltaClient { + override def getMetastoreId(): String = throw new UnsupportedOperationException + override def loadTable(tableIdentifier: StorageTableIdentifier): TableInfo = info + override def createStagingTable( + catalog: String, schema: String, table: String): StagingTableInfo = + throw new UnsupportedOperationException + override def createTable( + catalog: String, + schema: String, + name: String, + location: String, + tableType: UCDeltaModels.TableType, + comment: String, + partitionColumns: java.util.List[String], + protocol: UCDeltaModels.DeltaProtocol, + properties: java.util.Map[String, String]): AbstractMetadata = + throw new UnsupportedOperationException + override def commit( + tableId: String, + tableUri: java.net.URI, + tableIdentifier: io.delta.storage.commit.TableIdentifier, + commit: java.util.Optional[io.delta.storage.commit.Commit], + lastKnownBackfilledVersion: java.util.Optional[java.lang.Long], + oldMetadata: java.util.Optional[AbstractMetadata], + newMetadata: java.util.Optional[AbstractMetadata], + oldProtocol: java.util.Optional[io.delta.storage.commit.actions.AbstractProtocol], + newProtocol: 
java.util.Optional[io.delta.storage.commit.actions.AbstractProtocol], + uniform: java.util.Optional[io.delta.storage.commit.uniform.UniformMetadata]): Unit = + throw new UnsupportedOperationException + override def getCommits( + tableId: String, + tableUri: java.net.URI, + tableIdentifier: io.delta.storage.commit.TableIdentifier, + startVersion: java.util.Optional[java.lang.Long], + endVersion: java.util.Optional[java.lang.Long]): io.delta.storage.commit.GetCommitsResponse = + throw new UnsupportedOperationException + override def finalizeCreate( + tableName: String, + catalogName: String, + schemaName: String, + storageLocation: String, + columns: java.util.List[io.delta.storage.commit.uccommitcoordinator.UCClient.ColumnDef], + properties: java.util.Map[String, String]): Unit = + throw new UnsupportedOperationException + override def close(): Unit = () +} diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java index 0139e56c093..a0696436e86 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java @@ -32,10 +32,12 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.fs.Path; +import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.delta.catalog.UCDeltaCatalogClientImpl; import org.assertj.core.api.ThrowableAssert.ThrowingCallable; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -142,9 +144,59 @@ private SparkConf configureSparkWithUnityCatalog(SparkConf conf) { // Set the catalog specific configs. 
UnityCatalogInfo uc = unityCatalogInfo(); String catalogName = uc.catalogName(); - return conf.set("spark.sql.catalog." + catalogName, "io.unitycatalog.spark.UCSingleCatalog") - .set("spark.sql.catalog." + catalogName + ".uri", uc.serverUri()) - .set("spark.sql.catalog." + catalogName + ".token", uc.serverToken()); + conf = + conf.set("spark.sql.catalog." + catalogName, "io.unitycatalog.spark.UCSingleCatalog") + .set("spark.sql.catalog." + catalogName + ".uri", uc.serverUri()) + .set("spark.sql.catalog." + catalogName + ".token", uc.serverToken()); + if (useDeltaRestApiForTests()) { + conf = conf.set("spark.sql.catalog." + catalogName + ".deltaRestApi.enabled", "true"); + } + return conf; + } + + /** Subclasses can override to false for A/B comparison with the legacy path. */ + protected boolean useDeltaRestApiForTests() { + return true; + } + + private static final Logger LOG = Logger.getLogger(UCDeltaTableIntegrationBaseTest.class); + + private long deltaRestApiLoadsAtClassStart; + private long loadTableInvocationsAtClassStart; + + @BeforeAll + public void captureDeltaRestApiBaseline() { + deltaRestApiLoadsAtClassStart = + UCDeltaCatalogClientImpl.SUCCESSFUL_DELTA_REST_API_LOADS().get(); + loadTableInvocationsAtClassStart = UCDeltaCatalogClientImpl.LOAD_TABLE_INVOCATIONS().get(); + } + + @AfterAll + public void verifyDeltaRestApiExercisedAtClassLevel() { + if (!useDeltaRestApiForTests()) { + return; + } + long loadInvocationsAfter = UCDeltaCatalogClientImpl.LOAD_TABLE_INVOCATIONS().get(); + if (loadInvocationsAfter <= loadTableInvocationsAtClassStart) { + // Every test in the suite was aborted (e.g. via Assumption.assumeTrue) before any + // loadTable call ran, so there is nothing to assert about the Delta REST API path. 
+ return; + } + long after = UCDeltaCatalogClientImpl.SUCCESSFUL_DELTA_REST_API_LOADS().get(); + if (after <= deltaRestApiLoadsAtClassStart) { + throw new AssertionError( + "Suite finished but no UCDeltaCatalogClientImpl.loadTable call actually returned a " + + "Delta table via the Delta REST API. deltaRestApi.enabled is on but every " + + "load either fell back to the legacy delegate or threw. baseline=" + + deltaRestApiLoadsAtClassStart + + ", after=" + + after); + } + LOG.info( + "[delta-api] " + + getClass().getSimpleName() + + " successful Delta REST API loads: " + + (after - deltaRestApiLoadsAtClassStart)); } /** Stop the SparkSession after all tests. */ From 316797b253f57d34079839be8893bd20186b2109 Mon Sep 17 00:00:00 2001 From: Yi Li Date: Tue, 19 May 2026 00:49:10 -0700 Subject: [PATCH 2/3] Address comments Signed-off-by: Yi Li --- .../delta/catalog/AbstractDeltaCatalog.scala | 12 +- .../catalog/AbstractDeltaCatalogClient.scala | 45 ++--- .../catalog/UCDeltaCatalogClientImpl.scala | 25 ++- ...stractDeltaCatalogClientRoutingSuite.scala | 167 +++++++++++++----- .../UCDeltaTableIntegrationBaseTest.java | 24 ++- .../UCDeltaTableNonDeltaFallbackTest.java | 103 +++++++++++ .../io/sparkuctest/UCDeltaTableReadTest.java | 7 - 7 files changed, 296 insertions(+), 87 deletions(-) create mode 100644 spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableNonDeltaFallbackTest.java diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala index 773b2876037..8502108d671 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala @@ -85,14 +85,14 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension val spark = SparkSession.active /** - * When non-null, table operations are routed through this client instead of 
through the + * When defined, table operations are routed through this client instead of through the * [[org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension]] delegate that * `AbstractDeltaCatalog` normally relies on. This lets the catalog inject custom * interactions (e.g. talking to a REST endpoint, catalog-specific property handling, * storage-credential vending) rather than going through the Spark * [[org.apache.spark.sql.connector.catalog.TableCatalog]] API. */ - private[catalog] var deltaCatalogClient: AbstractDeltaCatalogClient = null + private[catalog] var deltaCatalogClient: Option[AbstractDeltaCatalogClient] = None override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { super.initialize(name, options) @@ -307,11 +307,9 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension "DeltaCatalog", "loadTable") { setVariantBlockingConfigIfUC() try { - val table = if (deltaCatalogClient != null) { - deltaCatalogClient.loadTable(ident) - } else { - super.loadTable(ident) - } + val table = deltaCatalogClient + .map(_.loadTable(ident)) + .getOrElse(super.loadTable(ident)) ServerSidePlannedTable.tryCreate(spark, ident, table, isUnityCatalog).foreach { sspt => return sspt diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClient.scala index 74e06115f71..7ec07cf8a85 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClient.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClient.scala @@ -55,30 +55,35 @@ private[catalog] object AbstractDeltaCatalogClient extends Logging { "org.apache.spark.sql.delta.catalog.UCDeltaCatalogClientImpl" /** - * Returns a [[AbstractDeltaCatalogClient]] when the catalog opted in via `deltaRestApi.enabled`, - * else `null`. 
The concrete impl is loaded reflectively so [[AbstractDeltaCatalog]] doesn't - * compile-depend on it; environments that don't ship [[UCDeltaCatalogClientImpl]] degrade - * to `null`. + * Returns a [[AbstractDeltaCatalogClient]] wrapped in [[Some]] when the catalog opted in via + * `deltaRestApi.enabled`, else [[None]]. The concrete impl is loaded reflectively so + * [[AbstractDeltaCatalog]] doesn't compile-depend on it. + * + * When opt-in is explicit but reflective loading fails (missing class, wrong type, missing + * MODULE$ field, etc.), this throws [[IllegalStateException]] rather than silently degrading + * to the legacy delegate. Following the [[deltaCatalogClient]] is `null` path when the user + * configured the opposite would mask a misconfiguration. */ def fromCatalogOptionsIfEnabled( catalogName: String, options: CaseInsensitiveStringMap, - fallbackLoadTable: Identifier => Table): AbstractDeltaCatalogClient = { - if (options.getBoolean(UC_DELTA_REST_API_ENABLED_KEY, false)) { - val factory = try { - // scalastyle:off classforname - val cls = Class.forName(UC_DELTA_CATALOG_CLIENT_IMPL_CLASS_NAME + "$") - // scalastyle:on classforname - cls.getField("MODULE$").get(null).asInstanceOf[AbstractDeltaCatalogClientFactory] - } catch { - case _: ClassNotFoundException => - logWarning(s"'$UC_DELTA_REST_API_ENABLED_KEY' is true but " + - s"$UC_DELTA_CATALOG_CLIENT_IMPL_CLASS_NAME is not on the classpath; skipping it.") - return null - } - factory.fromCatalogOptions(catalogName, options, fallbackLoadTable) - } else { - null + fallbackLoadTable: Identifier => Table): Option[AbstractDeltaCatalogClient] = { + if (!options.getBoolean(UC_DELTA_REST_API_ENABLED_KEY, false)) { + return None } + val factory = try { + // scalastyle:off classforname + val cls = Class.forName(UC_DELTA_CATALOG_CLIENT_IMPL_CLASS_NAME + "$") + // scalastyle:on classforname + cls.getField("MODULE$").get(null).asInstanceOf[AbstractDeltaCatalogClientFactory] + } catch { + case e: Exception => + 
throw new IllegalStateException( + s"Failed to load $UC_DELTA_CATALOG_CLIENT_IMPL_CLASS_NAME though " + + s"'$UC_DELTA_REST_API_ENABLED_KEY' is true. Ensure the implementation JAR is on " + + s"the classpath, or remove '$UC_DELTA_REST_API_ENABLED_KEY' from the catalog " + + s"options to fall back to the legacy delegate.", e) + } + Some(factory.fromCatalogOptions(catalogName, options, fallbackLoadTable)) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClientImpl.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClientImpl.scala index 0b8ae731427..afdef0729f2 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClientImpl.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClientImpl.scala @@ -76,9 +76,10 @@ private[catalog] class UCDeltaCatalogClientImpl( log"${MDC(DeltaLogKeys.EXCEPTION, e.getMessage)}") return fallbackLoadTable(ident) case e: CredentialFetchFailedException if serverSidePlanningEnabled => - logWarning( - s"Credential fetch failed for ${fullQualifiedTableName(tid)}; enabling " + - s"server-side planning fallback. Cause: ${e.getMessage}") + logWarning(log"Credential fetch failed for " + + log"${MDC(DeltaLogKeys.TABLE_NAME, fullQualifiedTableName(tid))}; enabling " + + log"server-side planning fallback. Cause: " + + log"${MDC(DeltaLogKeys.EXCEPTION, e.getMessage)}") enableServerSidePlanningConfig(ident) e.getTableInfoWithoutCredentials } @@ -104,7 +105,8 @@ private[catalog] class UCDeltaCatalogClientImpl( val ns = ident.namespace() require( ns.length == 1, - s"UC identifiers must be of the form .
; got namespace ${ns.mkString(".")}") + s"UC identifiers must be of the form .
; got namespace of length " + + s"${ns.length}: '${ns.mkString(".")}' (full identifier: '${ident.toString}')") new StorageTableIdentifier(Array(catalogName, ns(0)), ident.name()) } @@ -148,12 +150,21 @@ private[catalog] class UCDeltaCatalogClientImpl( } object UCDeltaCatalogClientImpl extends AbstractDeltaCatalogClientFactory with Logging { - /** Bumped at every loadTable entry, regardless of outcome. */ + /** + * Test-only instrumentation. Bumped at every `loadTable` entry regardless of outcome. + * Read by integration tests (e.g. {@code UCDeltaTableIntegrationBaseTest}) to verify the + * Delta REST API path was exercised. Not part of any public API; do not depend on this + * from production code. Kept public so cross-package integration tests + * ({@code io.sparkuctest.*}) can read it without reflection. + */ val LOAD_TABLE_INVOCATIONS: AtomicLong = new AtomicLong(0L) /** - * Bumped only when loadTable returned a Delta table from the Delta REST API (no fallback, - * no rethrow). Use this for "Delta REST actually served the load" assertions. + * Test-only instrumentation. Bumped only when `loadTable` returned a Delta table via the + * Delta REST API (no fallback, no rethrow). Read by integration tests to assert "Delta REST + * actually served the load." Not part of any public API; do not depend on this from + * production code. Kept public so cross-package integration tests + * ({@code io.sparkuctest.*}) can read it without reflection. 
*/ val SUCCESSFUL_DELTA_REST_API_LOADS: AtomicLong = new AtomicLong(0L) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClientRoutingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClientRoutingSuite.scala index c8b5696f136..25910d6ddaa 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClientRoutingSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClientRoutingSuite.scala @@ -16,15 +16,21 @@ package org.apache.spark.sql.delta.catalog +import java.net.URI import java.util -import java.util.UUID -import io.delta.storage.commit.{TableIdentifier => StorageTableIdentifier} -import io.delta.storage.commit.uccommitcoordinator.{UCDeltaClient, UCDeltaModels} -import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.{StagingTableInfo, TableInfo} -import io.delta.storage.commit.actions.AbstractMetadata +import java.util.{Collections, Optional, UUID} + +import io.delta.storage.commit.{Commit, GetCommitsResponse, TableIdentifier => StorageTableIdentifier} +import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol} +import io.delta.storage.commit.uccommitcoordinator.{UCClient, UCDeltaClient, UCDeltaModels} +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.{DeltaProtocol, StagingTableInfo, TableInfo, TableType => UcTableType} +import io.delta.storage.commit.uccommitcoordinator.exceptions.CredentialFetchFailedException +import io.delta.storage.commit.uniform.UniformMetadata + import org.apache.spark.sql.QueryTest import org.apache.spark.sql.catalyst.catalog.CatalogTableType import org.apache.spark.sql.connector.catalog.{Identifier, Table, V1Table} +import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -41,10 +47,10 @@ class 
AbstractDeltaCatalogClientRoutingSuite extends QueryTest with DeltaSQLComm new CaseInsensitiveStringMap(m) } - test("deltaRestApi.enabled=false leaves deltaCatalogClient null") { + test("deltaRestApi.enabled=false leaves deltaCatalogClient empty") { val catalog = new AbstractDeltaCatalog catalog.initialize("test_cat", options()) - assert(catalog.deltaCatalogClient == null, + assert(catalog.deltaCatalogClient.isEmpty, "Delta REST API client should not be constructed when the catalog opts out") } @@ -73,32 +79,31 @@ class AbstractDeltaCatalogClientRoutingSuite extends QueryTest with DeltaSQLComm "uri" -> "http://uc", "auth.type" -> "static", "auth.token" -> "tok")) - assert(catalog.deltaCatalogClient != null) + assert(catalog.deltaCatalogClient.isDefined) } test("deltaRestApi.enabled=true with uri+token constructs the Delta REST API client") { val catalog = new AbstractDeltaCatalog catalog.initialize("test_cat", options("deltaRestApi.enabled" -> "true", "uri" -> "http://uc", "token" -> "tok")) - assert(catalog.deltaCatalogClient != null, - "Delta REST API client should be constructed when the catalog opts in") - assert(catalog.deltaCatalogClient.isInstanceOf[UCDeltaCatalogClientImpl], - s"Delta REST API client should be UCDeltaCatalogClientImpl, " + - s"was ${catalog.deltaCatalogClient.getClass}") + val client = catalog.deltaCatalogClient.getOrElse( + fail("Delta REST API client should be constructed when the catalog opts in")) + assert(client.isInstanceOf[UCDeltaCatalogClientImpl], + s"Delta REST API client should be UCDeltaCatalogClientImpl, was ${client.getClass}") } - test("DeltaCatalogClient.fromCatalogOptionsIfEnabled returns null when the flag is off") { + test("AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled returns None when flag is off") { val result = AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled( "test_cat", options(), noFallback) - assert(result == null) + assert(result.isEmpty) } - test("DeltaCatalogClient.fromCatalogOptionsIfEnabled 
returns non-null when the flag is on") { + test("AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled returns Some when flag is on") { val result = AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled( "test_cat", options("deltaRestApi.enabled" -> "true", "uri" -> "http://uc", "token" -> "tok"), noFallback) - assert(result != null) + assert(result.isDefined) } private val noFallback: Identifier => Table = @@ -111,13 +116,12 @@ class AbstractDeltaCatalogClientRoutingSuite extends QueryTest with DeltaSQLComm override def getName: String = "tbl" override def getDescription: String = "a test table" override def getProvider: String = "DELTA" - override def getFormatOptions: java.util.Map[String, String] = - java.util.Collections.emptyMap() + override def getFormatOptions: util.Map[String, String] = Collections.emptyMap() override def getSchemaString: String = """{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]}""" - override def getPartitionColumns: java.util.List[String] = java.util.Collections.emptyList() - override def getConfiguration: java.util.Map[String, String] = - java.util.Map.of("ucTableId", tableId.toString, "delta.feature.x", "supported") + override def getPartitionColumns: util.List[String] = Collections.emptyList() + override def getConfiguration: util.Map[String, String] = + util.Map.of("ucTableId", tableId.toString, "delta.feature.x", "supported") override def getCreatedTime: java.lang.Long = 42L } val info = new TableInfo( @@ -125,7 +129,7 @@ class AbstractDeltaCatalogClientRoutingSuite extends QueryTest with DeltaSQLComm UCDeltaModels.TableType.EXTERNAL, "s3://bucket/table", metadata, - java.util.Map.of("fs.s3a.access.key", "key")) + util.Map.of("fs.s3a.access.key", "key")) val client = new UCDeltaCatalogClientImpl( catalogName = "main", @@ -147,12 +151,85 @@ class AbstractDeltaCatalogClientRoutingSuite extends QueryTest with DeltaSQLComm assert(merged.get("ucTableId") === Some(tableId.toString)) 
assert(merged.get("fs.s3a.access.key") === Some("key")) } + + test("loadTable falls back to SSP on CredentialFetchFailedException when SSP is enabled") { + val tableId = UUID.randomUUID() + val metadata = new AbstractMetadata { + override def getId: String = null + override def getName: String = "tbl" + override def getDescription: String = null + override def getProvider: String = "DELTA" + override def getFormatOptions: util.Map[String, String] = Collections.emptyMap() + override def getSchemaString: String = + """{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]}""" + override def getPartitionColumns: util.List[String] = Collections.emptyList() + // No credential properties; this is the "without credentials" TableInfo. + override def getConfiguration: util.Map[String, String] = Collections.emptyMap() + override def getCreatedTime: java.lang.Long = 0L + } + val tableInfoNoCreds = new TableInfo( + tableId, + UCDeltaModels.TableType.EXTERNAL, + "s3://bucket/no-creds-table", + metadata, + Collections.emptyMap()) // no storage properties either + val credEx = new CredentialFetchFailedException( + "creds exhausted", new RuntimeException("simulated"), tableInfoNoCreds) + + val client = new UCDeltaCatalogClientImpl( + catalogName = "main", + ucClient = new StubUCDeltaClient(throw credEx), + serverSidePlanningEnabled = true) + + // Capture and restore the SSP conf so this test doesn't leak into others. 
+ val sspKey = DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key + val originalSsp = spark.conf.getOption(sspKey) + spark.conf.unset(sspKey) + try { + val table = client.loadTable(Identifier.of(Array("sch"), "tbl")) + val v1 = table.asInstanceOf[V1Table].catalogTable + assert(v1.identifier.table === "tbl") + assert(v1.storage.locationUri.map(_.toString) === Some("s3://bucket/no-creds-table")) + assert(v1.storage.properties.isEmpty, + s"no credentials should be set on the V1Table; got ${v1.storage.properties}") + // The fallback path must have flipped SSP on. + assert(spark.conf.get(sspKey) === "true", + "Server-side planning conf should be set after CredentialFetchFailedException fallback") + } finally { + originalSsp match { + case Some(value) => spark.conf.set(sspKey, value) + case None => spark.conf.unset(sspKey) + } + } + } + + test("loadTable without serverSidePlanningEnabled rethrows CredentialFetchFailedException") { + val ex = new CredentialFetchFailedException( + "creds exhausted", new RuntimeException("simulated"), null) + val client = new UCDeltaCatalogClientImpl( + catalogName = "main", + ucClient = new StubUCDeltaClient(throw ex), + serverSidePlanningEnabled = false) + val thrown = intercept[CredentialFetchFailedException] { + client.loadTable(Identifier.of(Array("sch"), "tbl")) + } + assert(thrown eq ex) + } } -/** Returns a fixed [[TableInfo]] from {@code loadTable}; throws elsewhere. */ -private class StubUCDeltaClient(info: TableInfo) extends UCDeltaClient { +/** + * Returns the result of {@code loadTableResult} (a by-name parameter) from + * {@code loadTable}; throws on every other method. Pass a [[TableInfo]] to get a successful + * load, or {@code throw new ...} to simulate UC-side failures. + * + *

Because {@code loadTableResult} is by-name, the body re-evaluates on every + * {@code loadTable} invocation: a {@code throw} expression re-throws each time; a + * {@link TableInfo} reference is rebound (cheap). For tests that need to vary the result + * across calls, replace this with a {@code Supplier}-shaped constructor. + */ +private class StubUCDeltaClient(loadTableResult: => TableInfo) extends UCDeltaClient { override def getMetastoreId(): String = throw new UnsupportedOperationException - override def loadTable(tableIdentifier: StorageTableIdentifier): TableInfo = info + override def loadTable(tableIdentifier: StorageTableIdentifier): TableInfo = loadTableResult override def createStagingTable( catalog: String, schema: String, table: String): StagingTableInfo = throw new UnsupportedOperationException @@ -161,38 +238,38 @@ private class StubUCDeltaClient(info: TableInfo) extends UCDeltaClient { schema: String, name: String, location: String, - tableType: UCDeltaModels.TableType, + tableType: UcTableType, comment: String, - partitionColumns: java.util.List[String], - protocol: UCDeltaModels.DeltaProtocol, - properties: java.util.Map[String, String]): AbstractMetadata = + partitionColumns: util.List[String], + protocol: DeltaProtocol, + properties: util.Map[String, String]): AbstractMetadata = throw new UnsupportedOperationException override def commit( tableId: String, - tableUri: java.net.URI, - tableIdentifier: io.delta.storage.commit.TableIdentifier, - commit: java.util.Optional[io.delta.storage.commit.Commit], - lastKnownBackfilledVersion: java.util.Optional[java.lang.Long], - oldMetadata: java.util.Optional[AbstractMetadata], - newMetadata: java.util.Optional[AbstractMetadata], - oldProtocol: java.util.Optional[io.delta.storage.commit.actions.AbstractProtocol], - newProtocol: java.util.Optional[io.delta.storage.commit.actions.AbstractProtocol], - uniform: java.util.Optional[io.delta.storage.commit.uniform.UniformMetadata]): Unit = + tableUri: URI, + 
tableIdentifier: StorageTableIdentifier, + commit: Optional[Commit], + lastKnownBackfilledVersion: Optional[java.lang.Long], + oldMetadata: Optional[AbstractMetadata], + newMetadata: Optional[AbstractMetadata], + oldProtocol: Optional[AbstractProtocol], + newProtocol: Optional[AbstractProtocol], + uniform: Optional[UniformMetadata]): Unit = throw new UnsupportedOperationException override def getCommits( tableId: String, - tableUri: java.net.URI, - tableIdentifier: io.delta.storage.commit.TableIdentifier, - startVersion: java.util.Optional[java.lang.Long], - endVersion: java.util.Optional[java.lang.Long]): io.delta.storage.commit.GetCommitsResponse = + tableUri: URI, + tableIdentifier: StorageTableIdentifier, + startVersion: Optional[java.lang.Long], + endVersion: Optional[java.lang.Long]): GetCommitsResponse = throw new UnsupportedOperationException override def finalizeCreate( tableName: String, catalogName: String, schemaName: String, storageLocation: String, - columns: java.util.List[io.delta.storage.commit.uccommitcoordinator.UCClient.ColumnDef], - properties: java.util.Map[String, String]): Unit = + columns: util.List[UCClient.ColumnDef], + properties: util.Map[String, String]): Unit = throw new UnsupportedOperationException override def close(): Unit = () } diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java index a0696436e86..62375568d7c 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java @@ -159,6 +159,16 @@ protected boolean useDeltaRestApiForTests() { return true; } + /** + * Whether the class-level @AfterAll should assert that the Delta REST API actually served at + * least one load. 
Override to false in classes that intentionally exercise only the fallback path + * (which does NOT bump {@code SUCCESSFUL_DELTA_REST_API_LOADS}), so the class-level check doesn't + * false-positive when test sharding distributes methods across CI shards. + */ + protected boolean expectDeltaRestApiSuccessAtClassLevel() { + return true; + } + private static final Logger LOG = Logger.getLogger(UCDeltaTableIntegrationBaseTest.class); private long deltaRestApiLoadsAtClassStart; @@ -173,7 +183,7 @@ public void captureDeltaRestApiBaseline() { @AfterAll public void verifyDeltaRestApiExercisedAtClassLevel() { - if (!useDeltaRestApiForTests()) { + if (!useDeltaRestApiForTests() || !expectDeltaRestApiSuccessAtClassLevel()) { return; } long loadInvocationsAfter = UCDeltaCatalogClientImpl.LOAD_TABLE_INVOCATIONS().get(); @@ -254,6 +264,18 @@ protected void check(String tableName, List> expected) { getSqlExecutor().checkWithSQL("SELECT * FROM " + tableName + " ORDER BY 1", expected); } + /** + * Verify that {@code actual} equals {@code expected}, with an error message that includes both. + * Use this overload when the caller has already run the query and just needs to compare the row + * list (e.g. queries that aren't a plain {@code SELECT *}). + */ + protected void check(List> actual, List> expected) { + if (!actual.equals(expected)) { + throw new AssertionError( + String.format("Query results do not match.\nExpected: %s\nActual: %s", expected, actual)); + } + } + /** Helper method to run code with a temporary directory that gets cleaned up. 
*/ protected void withTempDir(TempDirCode code) throws Exception { UnityCatalogInfo uc = unityCatalogInfo(); diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableNonDeltaFallbackTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableNonDeltaFallbackTest.java new file mode 100644 index 00000000000..74e4c9f4f5a --- /dev/null +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableNonDeltaFallbackTest.java @@ -0,0 +1,103 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sparkuctest; + +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.delta.catalog.UCDeltaCatalogClientImpl; +import org.junit.jupiter.api.Test; + +/** + * Integration test for the non-Delta fallback path in {@code UCDeltaCatalogClientImpl}. + * + *

When {@code deltaRestApi.enabled=true}, every {@code loadTable} call first asks the Delta REST + * API. If the target table is not in Delta format, UC returns {@code + * UnsupportedTableFormatException}; the client must then fall back to the legacy {@code + * TableCatalog} delegate so the non-Delta table is still readable. + * + *

This class disables the class-level "Delta REST API served at least one load" assertion (see + * {@link #expectDeltaRestApiSuccessAtClassLevel()}) because its tests intentionally exercise only + * the fallback path, which does not bump {@code SUCCESSFUL_DELTA_REST_API_LOADS}. CI sharding also + * makes a same-file "sanity" Delta test unreliable: methods can be distributed across shards, so + * each shard's @AfterAll runs without a guarantee of seeing both methods. + */ +public class UCDeltaTableNonDeltaFallbackTest extends UCDeltaTableIntegrationBaseTest { + + @Override + protected boolean expectDeltaRestApiSuccessAtClassLevel() { + return false; + } + + @Test + public void testLoadNonDeltaParquetExternalTableFallsBackToLegacyCatalog() throws Exception { + String tableName = "non_delta_parquet_fallback"; + String fullTableName = fullTableName(tableName); + withTempDir( + (Path dir) -> { + Path tablePath = new Path(dir, tableName); + sql("DROP TABLE IF EXISTS %s", fullTableName); + try { + // Create a non-Delta EXTERNAL Parquet table. UC accepts external non-Delta tables; + // managed non-Delta tables are rejected upstream, so EXTERNAL is the only shape + // that reaches the loadTable fallback path. + sql( + "CREATE TABLE %s (id INT, name STRING) USING parquet LOCATION '%s'", + fullTableName, tablePath); + sql("INSERT INTO %s VALUES (1, 'alpha'), (2, 'beta'), (3, 'gamma')", fullTableName); + + long invocationsBefore = UCDeltaCatalogClientImpl.LOAD_TABLE_INVOCATIONS().get(); + long successesBefore = UCDeltaCatalogClientImpl.SUCCESSFUL_DELTA_REST_API_LOADS().get(); + + // The Delta REST API path runs first: ucClient.loadTable -> UC server returns + // UnsupportedTableFormatException (table isn't Delta-format) -> the catch handler + // calls fallbackLoadTable(ident) which is super.loadTable from AbstractDeltaCatalog + // (i.e. the legacy DelegatingCatalogExtension delegate). 
The SELECT below succeeds + // only if that fallback hands back a usable V1 table for the Parquet data. + List> rows = sql("SELECT id, name FROM %s ORDER BY id", fullTableName); + check( + rows, List.of(List.of("1", "alpha"), List.of("2", "beta"), List.of("3", "gamma"))); + + // Counter delta: loadTable was invoked (catch handler ran) but the Delta REST API + // did NOT successfully serve the load. A future regression that silently returned + // a (wrong) Delta table from the REST path would bump the success counter and fail + // this assertion, even though the row data check above would also still pass. + long invocationsAfter = UCDeltaCatalogClientImpl.LOAD_TABLE_INVOCATIONS().get(); + long successesAfter = UCDeltaCatalogClientImpl.SUCCESSFUL_DELTA_REST_API_LOADS().get(); + if (invocationsAfter <= invocationsBefore) { + throw new AssertionError( + "Expected LOAD_TABLE_INVOCATIONS to increase during the SELECT, but it did not" + + " (before=" + + invocationsBefore + + ", after=" + + invocationsAfter + + ")"); + } + if (successesAfter != successesBefore) { + throw new AssertionError( + "Expected SUCCESSFUL_DELTA_REST_API_LOADS to be unchanged (fallback path took" + + " over), but it changed (before=" + + successesBefore + + ", after=" + + successesAfter + + ")"); + } + } finally { + sql("DROP TABLE IF EXISTS %s", fullTableName); + } + }); + } +} diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableReadTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableReadTest.java index 66a9169ff35..8520b6e9c4f 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableReadTest.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableReadTest.java @@ -134,11 +134,4 @@ public void testDeltaTableForPath(TableType tableType) throws Exception { } }); } - - private void check(List> actual, List> expected) { - if (!actual.equals(expected)) { - throw new AssertionError( - String.format("Query results do not 
match.\nExpected: %s\nActual: %s", expected, actual)); - } - } } From 32b44f904c8c7df81e06fe3d204fad6f7da380cd Mon Sep 17 00:00:00 2001 From: Yi Li Date: Tue, 19 May 2026 15:50:05 -0700 Subject: [PATCH 3/3] Address comments Signed-off-by: Yi Li --- .../catalog/AbstractDeltaCatalogClient.scala | 26 ++-- .../catalog/UCDeltaCatalogClientImpl.scala | 139 +++++++++--------- .../UCCommitCoordinatorBuilder.scala | 76 +++++++--- .../UCCommitCoordinatorBuilderSuite.scala | 22 +-- .../UCCommitCoordinatorClientSuiteBase.scala | 2 +- .../UCDeltaTableIntegrationBaseTest.java | 22 ++- .../UCDeltaTableNonDeltaFallbackTest.java | 21 ++- .../UCTokenBasedRestClient.java | 18 +++ .../CredentialFetchFailedException.java | 3 +- .../exceptions/NoSuchTableException.java | 4 +- .../UnsupportedTableFormatException.java | 4 +- 11 files changed, 192 insertions(+), 145 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClient.scala index 7ec07cf8a85..defce5356a5 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClient.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClient.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.delta.catalog import org.apache.spark.internal.Logging import org.apache.spark.sql.connector.catalog.{Identifier, Table} +import org.apache.spark.sql.delta.coordinatedcommits.UCTokenBasedRestClientFactory import org.apache.spark.sql.util.CaseInsensitiveStringMap /** @@ -45,30 +46,26 @@ private[catalog] trait AbstractDeltaCatalogClientFactory { def fromCatalogOptions( catalogName: String, options: CaseInsensitiveStringMap, - fallbackLoadTable: Identifier => Table): AbstractDeltaCatalogClient + fallbackLoadTableFunc: Identifier => Table): AbstractDeltaCatalogClient } private[catalog] object AbstractDeltaCatalogClient extends Logging { - private val 
UC_DELTA_REST_API_ENABLED_KEY: String = "deltaRestApi.enabled" private val UC_DELTA_CATALOG_CLIENT_IMPL_CLASS_NAME: String = "org.apache.spark.sql.delta.catalog.UCDeltaCatalogClientImpl" /** * Returns a [[AbstractDeltaCatalogClient]] wrapped in [[Some]] when the catalog opted in via * `deltaRestApi.enabled`, else [[None]]. The concrete impl is loaded reflectively so - * [[AbstractDeltaCatalog]] doesn't compile-depend on it. - * - * When opt-in is explicit but reflective loading fails (missing class, wrong type, missing - * MODULE$ field, etc.), this throws [[IllegalStateException]] rather than silently degrading - * to the legacy delegate. Following the [[deltaCatalogClient]] is `null` path when the user - * configured the opposite would mask a misconfiguration. + * [[AbstractDeltaCatalog]] doesn't compile-depend on it. If opt-in is explicit but reflective + * loading fails, throws [[IllegalStateException]] rather than silently degrading. */ def fromCatalogOptionsIfEnabled( catalogName: String, options: CaseInsensitiveStringMap, - fallbackLoadTable: Identifier => Table): Option[AbstractDeltaCatalogClient] = { - if (!options.getBoolean(UC_DELTA_REST_API_ENABLED_KEY, false)) { + fallbackLoadTableFunc: Identifier => Table): Option[AbstractDeltaCatalogClient] = { + val key = UCTokenBasedRestClientFactory.DELTA_REST_API_ENABLED_KEY + if (!options.getBoolean(key, false)) { return None } val factory = try { @@ -79,11 +76,10 @@ private[catalog] object AbstractDeltaCatalogClient extends Logging { } catch { case e: Exception => throw new IllegalStateException( - s"Failed to load $UC_DELTA_CATALOG_CLIENT_IMPL_CLASS_NAME though " + - s"'$UC_DELTA_REST_API_ENABLED_KEY' is true. Ensure the implementation JAR is on " + - s"the classpath, or remove '$UC_DELTA_REST_API_ENABLED_KEY' from the catalog " + - s"options to fall back to the legacy delegate.", e) + s"Failed to load $UC_DELTA_CATALOG_CLIENT_IMPL_CLASS_NAME though '$key' is true. 
" + + "Ensure the implementation JAR is on the classpath, or remove " + + s"'$key' from the catalog options to fall back to the legacy delegate.", e) } - Some(factory.fromCatalogOptions(catalogName, options, fallbackLoadTable)) + Some(factory.fromCatalogOptions(catalogName, options, fallbackLoadTableFunc)) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClientImpl.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClientImpl.scala index afdef0729f2..aa1620bcde6 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClientImpl.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClientImpl.scala @@ -18,23 +18,17 @@ package org.apache.spark.sql.delta.catalog import java.net.URI import java.util.concurrent.atomic.AtomicLong -import java.util.function.Supplier import scala.jdk.CollectionConverters._ import io.delta.storage.commit.{TableIdentifier => StorageTableIdentifier} -import io.delta.storage.commit.uccommitcoordinator.{ - UCDeltaClient, - UCDeltaModels, - UCDeltaTokenBasedRestClient -} +import io.delta.storage.commit.uccommitcoordinator.{UCDeltaClient, UCDeltaModels} import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.TableInfo import io.delta.storage.commit.uccommitcoordinator.exceptions.{ CredentialFetchFailedException, UnsupportedTableFormatException, NoSuchTableException => StorageNoSuchTableException } -import org.apache.hadoop.conf.Configuration import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.sql.SparkSession @@ -60,11 +54,12 @@ private[catalog] class UCDeltaCatalogClientImpl( catalogName: String, ucClient: UCDeltaClient, serverSidePlanningEnabled: Boolean = false, - fallbackLoadTable: Identifier => Table = UCDeltaCatalogClientImpl.defaultFallbackLoadTable) + fallbackLoadTableFunc: Identifier => Table + = UCDeltaCatalogClientImpl.defaultFallbackLoadTableFunc) extends AbstractDeltaCatalogClient with 
Logging { override def loadTable(ident: Identifier): Table = { - UCDeltaCatalogClientImpl.LOAD_TABLE_INVOCATIONS.incrementAndGet() + UCDeltaCatalogClientImpl.loadTableInvocationsCounter.incrementAndGet() val tid = toStorageTableIdent(ident) val info = try ucClient.loadTable(tid) @@ -74,7 +69,7 @@ private[catalog] class UCDeltaCatalogClientImpl( logInfo(log"Table ${MDC(DeltaLogKeys.TABLE_NAME, ident)} is not in Delta format; " + log"falling back to the legacy catalog path. Cause: " + log"${MDC(DeltaLogKeys.EXCEPTION, e.getMessage)}") - return fallbackLoadTable(ident) + return fallbackLoadTableFunc(ident) case e: CredentialFetchFailedException if serverSidePlanningEnabled => logWarning(log"Credential fetch failed for " + log"${MDC(DeltaLogKeys.TABLE_NAME, fullQualifiedTableName(tid))}; enabling " + @@ -83,7 +78,7 @@ private[catalog] class UCDeltaCatalogClientImpl( enableServerSidePlanningConfig(ident) e.getTableInfoWithoutCredentials } - UCDeltaCatalogClientImpl.SUCCESSFUL_DELTA_REST_API_LOADS.incrementAndGet() + UCDeltaCatalogClientImpl.successfulDeltaRestApiLoadsCounter.incrementAndGet() toV1Table(ident, info) } @@ -150,93 +145,93 @@ private[catalog] class UCDeltaCatalogClientImpl( } object UCDeltaCatalogClientImpl extends AbstractDeltaCatalogClientFactory with Logging { + // Test-only instrumentation. The mutable counters are encapsulated so production code + // can neither read nor write them; read access is exposed via the `*ForTesting` methods + // below so cross-package integration tests (e.g. `io.sparkuctest.*`) don't need + // reflection. + + /** Bumped at every `loadTable` entry regardless of outcome. Read via the *ForTesting API. */ + private val loadTableInvocationsCounter: AtomicLong = new AtomicLong(0L) + /** - * Test-only instrumentation. Bumped at every `loadTable` entry regardless of outcome. - * Read by integration tests (e.g. {@code UCDeltaTableIntegrationBaseTest}) to verify the - * Delta REST API path was exercised. 
Not part of any public API; do not depend on this - * from production code. Kept public so cross-package integration tests - * ({@code io.sparkuctest.*}) can read it without reflection. + * Bumped only when `loadTable` returned a Delta table via the Delta REST API (no fallback, + * no rethrow). Read via the *ForTesting API. */ - val LOAD_TABLE_INVOCATIONS: AtomicLong = new AtomicLong(0L) + private val successfulDeltaRestApiLoadsCounter: AtomicLong = new AtomicLong(0L) /** - * Test-only instrumentation. Bumped only when `loadTable` returned a Delta table via the - * Delta REST API (no fallback, no rethrow). Read by integration tests to assert "Delta REST - * actually served the load." Not part of any public API; do not depend on this from - * production code. Kept public so cross-package integration tests - * ({@code io.sparkuctest.*}) can read it without reflection. + * Test-only read accessor for the `loadTable` invocation counter. Used by integration + * tests to verify the Delta REST API code path ran. Not part of any public API; production + * code must not depend on it. */ - val SUCCESSFUL_DELTA_REST_API_LOADS: AtomicLong = new AtomicLong(0L) + def loadTableInvocationsForTesting: Long = loadTableInvocationsCounter.get() + + /** + * Test-only read accessor for the count of `loadTable` calls served by the Delta REST API + * (no fallback, no rethrow). Not part of any public API. 
+ */ + def successfulDeltaRestApiLoadsForTesting: Long = successfulDeltaRestApiLoadsCounter.get() - private[catalog] val RenewCredentialEnabledKey: String = "renewCredential.enabled" - private[catalog] val CredScopedFsEnabledKey: String = "credScopedFs.enabled" private[catalog] val ServerSidePlanningEnabledKey: String = "serverSidePlanning.enabled" - private[catalog] val defaultFallbackLoadTable: Identifier => Table = ident => + private[catalog] val defaultFallbackLoadTableFunc: Identifier => Table = ident => throw new IllegalStateException( s"Non-Delta table $ident cannot be served via the Delta REST API path and no " + "fallback catalog was configured.") /** - * Builds a [[UCDeltaCatalogClientImpl]] from catalog options. The `deltaRestApi.enabled` - * gate is the caller's responsibility - * ([[AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled]]). - * {@code fallbackLoadTable} is invoked when UC reports {@code UnsupportedTableFormatException}. + * Builds a [[UCDeltaCatalogClientImpl]] from catalog options. The `deltaRestApi.enabled` gate + * is the caller's responsibility ([[AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled]]). + * `fallbackLoadTableFunc` is invoked when UC reports `UnsupportedTableFormatException`. UC client + * construction is delegated to [[UCTokenBasedRestClientFactory]] with `renewCredential.enabled` + * defaulted to `true` and `credScopedFs.enabled` defaulted to `false` when not set. 
*/ override def fromCatalogOptions( catalogName: String, options: CaseInsensitiveStringMap, - fallbackLoadTable: Identifier => Table + fallbackLoadTableFunc: Identifier => Table ): UCDeltaCatalogClientImpl = { - val uri = Option(options.get("uri")).getOrElse(throw new IllegalArgumentException( - s"'uri' is required when 'deltaRestApi.enabled' is true (catalog '$catalogName')")) - val authConfigs = extractAuthConfigs(options, catalogName) - val appVersions = UCTokenBasedRestClientFactory.defaultAppVersionsAsJava - val renewCredEnabled = options.getBoolean(RenewCredentialEnabledKey, true) - val credScopedFsEnabled = options.getBoolean(CredScopedFsEnabledKey, false) + // Pre-flight: keep our user-facing errors instead of the factory's less specific ones. + if (options.get(UriKey) == null) { + throw new IllegalArgumentException(s"'$UriKey' is required (catalog '$catalogName')") + } + validateAuthConfigured(options, catalogName) + + // `asCaseSensitiveMap()` preserves the user's original key case; `containsKey` is + // case-insensitive so defaults don't create duplicate keys. 
+ val merged = new java.util.HashMap[String, String](options.asCaseSensitiveMap()) + Seq( + UCTokenBasedRestClientFactory.DELTA_REST_API_ENABLED_KEY -> "true", + UCTokenBasedRestClientFactory.RENEW_CREDENTIAL_ENABLED_KEY -> "true", + UCTokenBasedRestClientFactory.CRED_SCOPED_FS_ENABLED_KEY -> "false" + ).foreach { case (k, v) => if (!options.containsKey(k)) merged.put(k, v) } + val ucClient = UCTokenBasedRestClientFactory + .createUCClient(new CaseInsensitiveStringMap(merged)) + .asInstanceOf[UCDeltaClient] + val sspEnabled = options.getBoolean(ServerSidePlanningEnabledKey, false) - val hadoopConfSupplier: Supplier[Configuration] = - () => SparkSession.getActiveSession - .map(_.sparkContext.hadoopConfiguration) - .getOrElse(new Configuration()) - val restClient = UCDeltaTokenBasedRestClient.create( - uri, - authConfigs, - appVersions, - renewCredEnabled, - credScopedFsEnabled, - hadoopConfSupplier) - new UCDeltaCatalogClientImpl(catalogName, restClient, sspEnabled, fallbackLoadTable) + new UCDeltaCatalogClientImpl(catalogName, ucClient, sspEnabled, fallbackLoadTableFunc) } + private val UriKey: String = "uri" + private val AuthPrefix: String = "auth." + private val LegacyTokenKey: String = "token" + /** - * `auth.*` sub-keys (prefix stripped) feed `TokenProvider.create`. Legacy bare `token` - * is translated to `{type=static, token=}`, only when no `auth.*` is present. + * Pre-flight: ensure at least one of `auth.*` or legacy `token` is present, so the user + * sees a clear error (and catalog name) instead of the factory's internal failure when + * `TokenProvider.create` is handed an empty config. */ - private[catalog] def extractAuthConfigs( + private[catalog] def validateAuthConfigured( options: CaseInsensitiveStringMap, - catalogName: String): java.util.Map[String, String] = { - val authConfigs = new java.util.HashMap[String, String]() - val authPrefix = "auth." - // CaseInsensitiveStringMap.entrySet() returns keys already lowercased. 
- options.entrySet().asScala.foreach { e => - val key = e.getKey - if (key.startsWith(authPrefix)) { - authConfigs.put(key.substring(authPrefix.length), e.getValue) - } - } - if (authConfigs.isEmpty) { - Option(options.get("token")).foreach { tok => - authConfigs.put("type", "static") - authConfigs.put("token", tok) - } - } - if (authConfigs.isEmpty) { + catalogName: String): Unit = { + val hasAuthPrefix = options.entrySet().asScala.exists(_.getKey.startsWith(AuthPrefix)) + val hasLegacyToken = options.get(LegacyTokenKey) != null + if (!hasAuthPrefix && !hasLegacyToken) { throw new IllegalArgumentException( s"auth configuration is required when 'deltaRestApi.enabled' is true " + - s"(catalog '$catalogName'). Set either 'auth.type' (with the corresponding " + - s"auth.* keys) or the legacy 'token' option.") + s"(catalog '$catalogName'). Set either '${AuthPrefix}type' (with the corresponding " + + s"$AuthPrefix* keys) or the legacy '$LegacyTokenKey' option.") } - authConfigs } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala index f9b752ba6e0..b291849fe8d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala @@ -18,12 +18,14 @@ package org.apache.spark.sql.delta.coordinatedcommits import java.net.{URI, URISyntaxException} import java.util.concurrent.ConcurrentHashMap +import java.util.function.Supplier import scala.collection.JavaConverters._ import scala.util.control.NonFatal import io.delta.storage.commit.CommitCoordinatorClient import io.delta.storage.commit.uccommitcoordinator.{UCClient, UCCommitCoordinatorClient, UCTokenBasedRestClient} +import org.apache.hadoop.conf.Configuration import org.apache.spark.sql.delta.logging.DeltaLogKeys 
import org.apache.spark.sql.delta.metering.DeltaLogging @@ -31,6 +33,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging import io.unitycatalog.client.auth.TokenProvider import org.apache.spark.internal.MDC import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.Utils /** @@ -82,7 +85,7 @@ object UCCommitCoordinatorBuilder spark: SparkSession, catalogName: String): CommitCoordinatorClient = { val client = getCatalogConfigs(spark).find(_._1 == catalogName) match { - case Some((_, ucConfig)) => ucClientFactory.createUCClient(ucConfig) + case Some((_, ucConfig)) => ucClientFactory.createUCClient(ucConfig.asJava) case None => throw new IllegalArgumentException( s"Catalog $catalogName not found in the provided SparkSession configurations.") @@ -107,7 +110,7 @@ object UCCommitCoordinatorBuilder matchingConfigs match { case Nil => throw noMatchingCatalogException(metastoreId) - case ucConfig :: Nil => ucClientFactory.createUCClient(ucConfig) + case ucConfig :: Nil => ucClientFactory.createUCClient(ucConfig.asJava) case multiple => throw multipleMatchingCatalogs(metastoreId, multiple.map(_.getOrElse("uri", ""))) } @@ -125,7 +128,7 @@ object UCCommitCoordinatorBuilder val metastoreId = ucConfigToMetastoreIdCache.computeIfAbsent( ucConfig, _ => { - val ucClient = ucClientFactory.createUCClient(ucConfig) + val ucClient = ucClientFactory.createUCClient(ucConfig.asJava) try { ucClient.getMetastoreId } finally { @@ -246,7 +249,7 @@ object UCCommitCoordinatorBuilder /** Factory trait for creating [[UCClient]] instances from a unified configuration map. */ trait UCClientFactory { - def createUCClient(ucConfig: Map[String, String]): UCClient + def createUCClient(ucConfig: java.util.Map[String, String]): UCClient } /** @@ -288,21 +291,25 @@ object UCTokenBasedRestClientFactory extends UCClientFactory { final val AUTH_PREFIX = "auth." 
final val DELTA_REST_API_ENABLED_KEY = "deltaRestApi.enabled" final val APP_VERSIONS_PREFIX = "appVersions." + /** Opt-in: caller wants `UCDeltaTokenBasedRestClient` constructed with credential renewal. */ + final val RENEW_CREDENTIAL_ENABLED_KEY = "renewCredential.enabled" + /** Opt-in: caller wants `UCDeltaTokenBasedRestClient` constructed with cred-scoped FS. */ + final val CRED_SCOPED_FS_ENABLED_KEY = "credScopedFs.enabled" private val DEFAULT_UC_CLIENT_CLASS: String = classOf[UCTokenBasedRestClient].getName private val DELTA_UC_CLIENT_CLASS: String = "io.delta.storage.commit.uccommitcoordinator.UCDeltaTokenBasedRestClient" - override def createUCClient(ucConfig: Map[String, String]): UCClient = { - val uri = ucConfig.getOrElse(URI_KEY, + override def createUCClient(ucConfig: java.util.Map[String, String]): UCClient = { + val uri = Option(ucConfig.get(URI_KEY)).getOrElse( throw new IllegalArgumentException(s"UC config must contain '$URI_KEY'")) val authConfig = extractAuthConfig(ucConfig) val tokenProvider = TokenProvider.create(authConfig.asJava) val className = - if (ucConfig.get(DELTA_REST_API_ENABLED_KEY).exists(_.equalsIgnoreCase("true"))) { + if (Option(ucConfig.get(DELTA_REST_API_ENABLED_KEY)).exists(_.equalsIgnoreCase("true"))) { DELTA_UC_CLIENT_CLASS } else { DEFAULT_UC_CLIENT_CLASS @@ -312,14 +319,28 @@ object UCTokenBasedRestClientFactory extends UCClientFactory { require(classOf[UCClient].isAssignableFrom(cls), s"$className does not implement ${classOf[UCClient].getName}") val appVersions = extractAppVersions(ucConfig) + val renewCred = Option(ucConfig.get(RENEW_CREDENTIAL_ENABLED_KEY)).exists(_.toBoolean) + val credScopedFs = Option(ucConfig.get(CRED_SCOPED_FS_ENABLED_KEY)).exists(_.toBoolean) + val hadoopConfSupplier: Supplier[Configuration] = () => + SparkSession.getActiveSession + .map(_.sparkContext.hadoopConfiguration) + .getOrElse(new Configuration()) val ctor = cls.getConstructor( - classOf[String], classOf[TokenProvider], 
classOf[java.util.Map[_, _]]) - ctor.newInstance(uri, tokenProvider, appVersions.asJava).asInstanceOf[UCClient] - } - - /** Java-friendly overload that accepts a java.util.Map. */ - def createUCClient(ucConfig: java.util.Map[String, String]): UCClient = { - createUCClient(ucConfig.asScala.toMap) + classOf[String], + classOf[TokenProvider], + classOf[java.util.Map[_, _]], + java.lang.Boolean.TYPE, + java.lang.Boolean.TYPE, + classOf[Supplier[_]]) + ctor + .newInstance( + uri, + tokenProvider, + appVersions.asJava, + java.lang.Boolean.valueOf(renewCred), + java.lang.Boolean.valueOf(credScopedFs), + hadoopConfSupplier) + .asInstanceOf[UCClient] } /** @@ -327,16 +348,20 @@ object UCTokenBasedRestClientFactory extends UCClientFactory { * Prefers `auth.*` keys; falls back to legacy `token` key. */ private[coordinatedcommits] def extractAuthConfig( - ucConfig: Map[String, String]): Map[String, String] = { - val authConfig = ucConfig - .filterKeys(_.startsWith(AUTH_PREFIX)) - .map { case (k, v) => (k.stripPrefix(AUTH_PREFIX), v) } + ucConfig: java.util.Map[String, String]): Map[String, String] = { + val authPrefixLower = AUTH_PREFIX.toLowerCase(java.util.Locale.ROOT) + val authConfig = ucConfig.entrySet().asScala.iterator + .collect { + case e if e.getKey.toLowerCase(java.util.Locale.ROOT).startsWith(authPrefixLower) => + val suffix = e.getKey.substring(authPrefixLower.length) + suffix -> e.getValue + } .toMap if (authConfig.nonEmpty) { authConfig } else { - ucConfig.get("token") match { + Option(ucConfig.get("token")) match { case Some(token) => Map("type" -> "static", "token" -> token) case None => Map.empty } @@ -348,10 +373,13 @@ object UCTokenBasedRestClientFactory extends UCClientFactory { * Caller-supplied entries override defaults with the same key. 
*/ private[coordinatedcommits] def extractAppVersions( - ucConfig: Map[String, String]): Map[String, String] = { - val extra = ucConfig - .filterKeys(_.startsWith(APP_VERSIONS_PREFIX)) - .map { case (k, v) => (k.stripPrefix(APP_VERSIONS_PREFIX), v) } + ucConfig: java.util.Map[String, String]): Map[String, String] = { + val appPrefixLower = APP_VERSIONS_PREFIX.toLowerCase(java.util.Locale.ROOT) + val extra = ucConfig.entrySet().asScala.iterator + .collect { + case e if e.getKey.toLowerCase(java.util.Locale.ROOT).startsWith(appPrefixLower) => + e.getKey.substring(appPrefixLower.length) -> e.getValue + } .toMap defaultAppVersions ++ extra } @@ -382,5 +410,5 @@ case class UCCatalogConfig(catalogName: String, ucConfig: Map[String, String]) { * Prefers `auth.*` keys; falls back to legacy `token` key. */ def authConfig: Map[String, String] = - UCTokenBasedRestClientFactory.extractAuthConfig(ucConfig) + UCTokenBasedRestClientFactory.extractAuthConfig(ucConfig.asJava) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilderSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilderSuite.scala index a073a70a625..1da0da4eead 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilderSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilderSuite.scala @@ -16,6 +16,8 @@ package org.apache.spark.sql.delta.coordinatedcommits +import scala.collection.JavaConverters._ + import io.delta.storage.commit.uccommitcoordinator.{UCClient, UCCommitCoordinatorClient} import org.mockito.{Mock, Mockito} import org.mockito.ArgumentMatchers.{any, eq => meq} @@ -50,9 +52,9 @@ class UCCommitCoordinatorBuilderSuite extends SparkFunSuite with SharedSparkSess * parsing: all sub-keys under spark.sql.catalog..* * with the prefix stripped. Includes `uri` when present. 
*/ - def expectedUcConfig: Map[String, String] = { + def expectedUcConfig: java.util.Map[String, String] = { val base = configMap - uri.map(u => base + ("uri" -> u)).getOrElse(base) + uri.map(u => base + ("uri" -> u)).getOrElse(base).asJava } } @@ -241,7 +243,7 @@ class UCCommitCoordinatorBuilderSuite extends SparkFunSuite with SharedSparkSess val metastoreId = "shared-metastore-id" val sharedUri = "https://shared-test-uri.com" val sharedConfigMap = Map("type" -> "static", "token" -> "shared-test-token") - val sharedUcConfig = sharedConfigMap + ("uri" -> sharedUri) + val sharedUcConfig = (sharedConfigMap + ("uri" -> sharedUri)).asJava val catalog1 = CatalogTestConfig( name = "catalog1", uri = Some(sharedUri), @@ -292,7 +294,7 @@ class UCCommitCoordinatorBuilderSuite extends SparkFunSuite with SharedSparkSess } private def registerMetastoreId( - ucConfig: Map[String, String], + ucConfig: java.util.Map[String, String], metastoreId: String): Unit = { val mockClient = org.mockito.Mockito.mock(classOf[UCClient]) when(mockClient.getMetastoreId).thenReturn(metastoreId) @@ -300,7 +302,7 @@ class UCCommitCoordinatorBuilderSuite extends SparkFunSuite with SharedSparkSess } private def registerMetastoreIdException( - ucConfig: Map[String, String], + ucConfig: java.util.Map[String, String], exception: Throwable): Unit = { val mockClient = org.mockito.Mockito.mock(classOf[UCClient]) when(mockClient.getMetastoreId).thenThrow(exception) @@ -461,7 +463,7 @@ class UCCommitCoordinatorBuilderSuite extends SparkFunSuite with SharedSparkSess "auth.type" -> "static", "auth.token" -> "new-token" ) - val auth = UCTokenBasedRestClientFactory.extractAuthConfig(ucConfig) + val auth = UCTokenBasedRestClientFactory.extractAuthConfig(ucConfig.asJava) assert(auth("type") == "static") assert(auth("token") == "new-token") } @@ -471,7 +473,7 @@ class UCCommitCoordinatorBuilderSuite extends SparkFunSuite with SharedSparkSess "uri" -> "https://test.com", "token" -> "legacy-token" ) - val auth = 
UCTokenBasedRestClientFactory.extractAuthConfig(ucConfig) + val auth = UCTokenBasedRestClientFactory.extractAuthConfig(ucConfig.asJava) assert(auth("type") == "static") assert(auth("token") == "legacy-token") } @@ -490,7 +492,7 @@ class UCCommitCoordinatorBuilderSuite extends SparkFunSuite with SharedSparkSess assert(result.isInstanceOf[UCCommitCoordinatorClient]) verify(mockFactory).createUCClient( - any[Map[String, String]]() + any[java.util.Map[String, String]]() ) } } @@ -510,7 +512,7 @@ class UCCommitCoordinatorBuilderSuite extends SparkFunSuite with SharedSparkSess assert(result.isInstanceOf[UCCommitCoordinatorClient]) verify(mockFactory).createUCClient( - any[Map[String, String]]() + any[java.util.Map[String, String]]() ) } } @@ -521,7 +523,7 @@ class UCCommitCoordinatorBuilderSuite extends SparkFunSuite with SharedSparkSess "appVersions.Kernel" -> "0.7.0", "appVersions.Delta V2 connector" -> "true" ) - val versions = UCTokenBasedRestClientFactory.extractAppVersions(ucConfig) + val versions = UCTokenBasedRestClientFactory.extractAppVersions(ucConfig.asJava) assert(versions("Delta") === io.delta.VERSION) assert(versions("Spark") === org.apache.spark.SPARK_VERSION) assert(versions("Kernel") === "0.7.0") diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorClientSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorClientSuiteBase.scala index 1653fdac1e4..a99fefb1c9e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorClientSuiteBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorClientSuiteBase.scala @@ -84,7 +84,7 @@ trait UCCommitCoordinatorClientSuiteBase extends CommitCoordinatorClientImplSuit CommitCoordinatorProvider.registerBuilder(UCCommitCoordinatorBuilder) ucCommitCoordinator = new InMemoryUCCommitCoordinator() ucClient = new 
InMemoryUCClient(metastoreId.toString, ucCommitCoordinator) - when(mockFactory.createUCClient(any[Map[String, String]]())).thenReturn(ucClient) + when(mockFactory.createUCClient(any[java.util.Map[String, String]]())).thenReturn(ucClient) } override protected def createTableCommitCoordinatorClient( deltaLog: DeltaLog): TableCommitCoordinatorClient = { diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java index 62375568d7c..2f6d97d43e8 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java @@ -38,6 +38,7 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.delta.catalog.UCDeltaCatalogClientImpl; +import org.apache.spark.sql.delta.coordinatedcommits.UCTokenBasedRestClientFactory; import org.assertj.core.api.ThrowableAssert.ThrowingCallable; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -149,20 +150,27 @@ private SparkConf configureSparkWithUnityCatalog(SparkConf conf) { .set("spark.sql.catalog." + catalogName + ".uri", uc.serverUri()) .set("spark.sql.catalog." + catalogName + ".token", uc.serverToken()); if (useDeltaRestApiForTests()) { - conf = conf.set("spark.sql.catalog." + catalogName + ".deltaRestApi.enabled", "true"); + conf = + conf.set( + "spark.sql.catalog." + + catalogName + + "." + + UCTokenBasedRestClientFactory.DELTA_REST_API_ENABLED_KEY(), + "true"); } return conf; } /** Subclasses can override to false for A/B comparison with the legacy path. */ protected boolean useDeltaRestApiForTests() { - return true; + // TODO: turn this on once the Delta API is fully integrated. + return false; } /** * Whether the class-level @AfterAll should assert that the Delta REST API actually served at * least one load. 
Override to false in classes that intentionally exercise only the fallback path - * (which does NOT bump {@code SUCCESSFUL_DELTA_REST_API_LOADS}), so the class-level check doesn't + * (which does NOT bump the successfulDeltaRestApiLoads counter), so the class-level check doesn't * false-positive when test sharding distributes methods across CI shards. */ protected boolean expectDeltaRestApiSuccessAtClassLevel() { @@ -177,8 +185,8 @@ protected boolean expectDeltaRestApiSuccessAtClassLevel() { @BeforeAll public void captureDeltaRestApiBaseline() { deltaRestApiLoadsAtClassStart = - UCDeltaCatalogClientImpl.SUCCESSFUL_DELTA_REST_API_LOADS().get(); - loadTableInvocationsAtClassStart = UCDeltaCatalogClientImpl.LOAD_TABLE_INVOCATIONS().get(); + UCDeltaCatalogClientImpl.successfulDeltaRestApiLoadsForTesting(); + loadTableInvocationsAtClassStart = UCDeltaCatalogClientImpl.loadTableInvocationsForTesting(); } @AfterAll @@ -186,13 +194,13 @@ public void verifyDeltaRestApiExercisedAtClassLevel() { if (!useDeltaRestApiForTests() || !expectDeltaRestApiSuccessAtClassLevel()) { return; } - long loadInvocationsAfter = UCDeltaCatalogClientImpl.LOAD_TABLE_INVOCATIONS().get(); + long loadInvocationsAfter = UCDeltaCatalogClientImpl.loadTableInvocationsForTesting(); if (loadInvocationsAfter <= loadTableInvocationsAtClassStart) { // Every test in the suite was aborted (e.g. via Assumption.assumeTrue) before any // loadTable call ran, so there is nothing to assert about the Delta REST API path. 
return; } - long after = UCDeltaCatalogClientImpl.SUCCESSFUL_DELTA_REST_API_LOADS().get(); + long after = UCDeltaCatalogClientImpl.successfulDeltaRestApiLoadsForTesting(); if (after <= deltaRestApiLoadsAtClassStart) { throw new AssertionError( "Suite finished but no UCDeltaCatalogClientImpl.loadTable call actually returned a " diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableNonDeltaFallbackTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableNonDeltaFallbackTest.java index 74e4c9f4f5a..213f9d269df 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableNonDeltaFallbackTest.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableNonDeltaFallbackTest.java @@ -31,12 +31,17 @@ * *

This class disables the class-level "Delta REST API served at least one load" assertion (see * {@link #expectDeltaRestApiSuccessAtClassLevel()}) because its tests intentionally exercise only - * the fallback path, which does not bump {@code SUCCESSFUL_DELTA_REST_API_LOADS}. CI sharding also + * the fallback path, which does not bump the successfulDeltaRestApiLoads counter. CI sharding also * makes a same-file "sanity" Delta test unreliable: methods can be distributed across shards, so * each shard's @AfterAll runs without a guarantee of seeing both methods. */ public class UCDeltaTableNonDeltaFallbackTest extends UCDeltaTableIntegrationBaseTest { + @Override + protected boolean useDeltaRestApiForTests() { + return true; + } + @Override protected boolean expectDeltaRestApiSuccessAtClassLevel() { return false; @@ -59,12 +64,12 @@ public void testLoadNonDeltaParquetExternalTableFallsBackToLegacyCatalog() throw fullTableName, tablePath); sql("INSERT INTO %s VALUES (1, 'alpha'), (2, 'beta'), (3, 'gamma')", fullTableName); - long invocationsBefore = UCDeltaCatalogClientImpl.LOAD_TABLE_INVOCATIONS().get(); - long successesBefore = UCDeltaCatalogClientImpl.SUCCESSFUL_DELTA_REST_API_LOADS().get(); + long invocationsBefore = UCDeltaCatalogClientImpl.loadTableInvocationsForTesting(); + long successesBefore = UCDeltaCatalogClientImpl.successfulDeltaRestApiLoadsForTesting(); // The Delta REST API path runs first: ucClient.loadTable -> UC server returns // UnsupportedTableFormatException (table isn't Delta-format) -> the catch handler - // calls fallbackLoadTable(ident) which is super.loadTable from AbstractDeltaCatalog + // calls fallbackLoadTableFunc(ident) which is super.loadTable from AbstractDeltaCatalog // (i.e. the legacy DelegatingCatalogExtension delegate). The SELECT below succeeds // only if that fallback hands back a usable V1 table for the Parquet data. 
List<List<String>> rows = sql("SELECT id, name FROM %s ORDER BY id", fullTableName); @@ -75,11 +80,11 @@ public void testLoadNonDeltaParquetExternalTableFallsBackToLegacyCatalog() throw // did NOT successfully serve the load. A future regression that silently returned // a (wrong) Delta table from the REST path would bump the success counter and fail // this assertion, even though the row data check above would also still pass. - long invocationsAfter = UCDeltaCatalogClientImpl.LOAD_TABLE_INVOCATIONS().get(); - long successesAfter = UCDeltaCatalogClientImpl.SUCCESSFUL_DELTA_REST_API_LOADS().get(); + long invocationsAfter = UCDeltaCatalogClientImpl.loadTableInvocationsForTesting(); + long successesAfter = UCDeltaCatalogClientImpl.successfulDeltaRestApiLoadsForTesting(); if (invocationsAfter <= invocationsBefore) { throw new AssertionError( - "Expected LOAD_TABLE_INVOCATIONS to increase during the SELECT, but it did not" + "Expected loadTableInvocations to increase during the SELECT, but it did not" + " (before=" + invocationsBefore + ", after=" @@ -88,7 +93,7 @@ public void testLoadNonDeltaParquetExternalTableFallsBackToLegacyCatalog() throw } if (successesAfter != successesBefore) { throw new AssertionError( - "Expected SUCCESSFUL_DELTA_REST_API_LOADS to be unchanged (fallback path took" + "Expected successfulDeltaRestApiLoads to be unchanged (fallback path took" + " over), but it changed (before=" + successesBefore + ", after=" diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java index 4a385cd9338..53c8be864dd 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java @@ -46,12 +46,14 @@ import io.unitycatalog.client.model.DataSourceFormat; import 
io.unitycatalog.client.model.GetMetastoreSummaryResponse; import io.unitycatalog.client.model.TableType; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import java.io.IOException; import java.net.URI; import java.util.*; +import java.util.function.Supplier; /** * A REST client implementation of {@link UCClient} for interacting with Unity Catalog's commit @@ -129,6 +131,22 @@ public UCTokenBasedRestClient( this.tablesApi = new TablesApi(apiClient); } + /** + * 6-arg constructor for symmetry with {@link UCDeltaTokenBasedRestClient}. The + * {@code credentialRenewalEnabled}, {@code credentialScopedFsEnabled}, and + * {@code hadoopConfSupplier} parameters are not used by this client and are accepted only so + * that callers can construct either client uniformly by reflection. + */ + public UCTokenBasedRestClient( + String baseUri, + TokenProvider tokenProvider, + Map<String, String> appVersions, + boolean credentialRenewalEnabled, + boolean credentialScopedFsEnabled, + Supplier<Configuration> hadoopConfSupplier) { + this(baseUri, tokenProvider, appVersions); + } + /** * Ensures the client has not been closed. Must be called before any API operation. */ diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/CredentialFetchFailedException.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/CredentialFetchFailedException.java index 35742005bd3..ecdb7a4e9ba 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/CredentialFetchFailedException.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/CredentialFetchFailedException.java @@ -18,13 +18,12 @@ import io.delta.storage.commit.uccommitcoordinator.UCDeltaClient; import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.TableInfo; -import java.io.IOException; /** * Thrown by {@link UCDeltaClient} when credential vending exhausts retries. 
Carries a * cred-less {@link TableInfo} so callers with a fallback (e.g. SSP) can recover. */ -public class CredentialFetchFailedException extends IOException { +public class CredentialFetchFailedException extends RuntimeException { private final TableInfo tableInfoWithoutCredentials; diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/NoSuchTableException.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/NoSuchTableException.java index c2f3d7c3d33..6f45a774255 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/NoSuchTableException.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/NoSuchTableException.java @@ -16,13 +16,11 @@ package io.delta.storage.commit.uccommitcoordinator.exceptions; -import java.io.IOException; - /** * Thrown by {@link io.delta.storage.commit.uccommitcoordinator.UCDeltaClient} operations when the * catalog reports that the requested table does not exist (HTTP 404). */ -public class NoSuchTableException extends IOException { +public class NoSuchTableException extends RuntimeException { public NoSuchTableException(String message) { super(message); } diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/UnsupportedTableFormatException.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/UnsupportedTableFormatException.java index 02bedc8a0a9..329a2e125de 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/UnsupportedTableFormatException.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/UnsupportedTableFormatException.java @@ -16,14 +16,12 @@ package io.delta.storage.commit.uccommitcoordinator.exceptions; -import java.io.IOException; - /** * Thrown when the catalog refuses to serve a non-Delta table. Callers should fall back to a * non-Delta-REST-API load path. 
Emitted as HTTP 400 with {@code error.type = * "UnsupportedTableFormatException"} on the wire. */ -public class UnsupportedTableFormatException extends IOException { +public class UnsupportedTableFormatException extends RuntimeException { public UnsupportedTableFormatException(String message) { super(message); }