diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala index 86a0f9b4ee9..8502108d671 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala @@ -63,6 +63,7 @@ import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.InsertableRelation import org.apache.spark.sql.types.{IntegerType, StructField, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** @@ -83,6 +84,22 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension val spark = SparkSession.active + /** + * When defined, table operations are routed through this client instead of through the + * [[org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension]] delegate that + * `AbstractDeltaCatalog` normally relies on. This lets the catalog inject custom + * interactions (e.g. talking to a REST endpoint, catalog-specific property handling, + * storage-credential vending) rather than going through the Spark + * [[org.apache.spark.sql.connector.catalog.TableCatalog]] API. + */ + private[catalog] var deltaCatalogClient: Option[AbstractDeltaCatalogClient] = None + + override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { + super.initialize(name, options) + deltaCatalogClient = + AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled(name, options, super.loadTable) + } + private lazy val isUnityCatalog: Boolean = { val delegateField = classOf[DelegatingCatalogExtension].getDeclaredField("delegate") delegateField.setAccessible(true) @@ -290,7 +307,9 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension "DeltaCatalog", "loadTable") { setVariantBlockingConfigIfUC() try { - val table = super.loadTable(ident) + val table = deltaCatalogClient + .map(_.loadTable(ident)) + .getOrElse(super.loadTable(ident)) ServerSidePlannedTable.tryCreate(spark, ident, table, isUnityCatalog).foreach { sspt => return sspt diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClient.scala new file mode 100644 index 00000000000..defce5356a5 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClient.scala @@ -0,0 +1,85 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.catalog + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.connector.catalog.{Identifier, Table} +import org.apache.spark.sql.delta.coordinatedcommits.UCTokenBasedRestClientFactory +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * Backend hook through which [[AbstractDeltaCatalog]] injects custom catalog interactions + * that bypass the catalog operations normally provided by Spark's + * [[org.apache.spark.sql.connector.catalog.TableCatalog]] interface (the + * [[org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension]] delegate that + * `AbstractDeltaCatalog` extends). Concrete implementations route table operations to a + * catalog-specific path, e.g. talking directly to a REST endpoint instead of the + * configured delegate, applying catalog-specific table-property handling, or vending + * storage credentials on the returned [[Table]]. Keeping these behind a client interface + * isolates that plumbing from `AbstractDeltaCatalog`. + */ +private[catalog] trait AbstractDeltaCatalogClient { + + /** + * @throws org.apache.spark.sql.catalyst.analysis.NoSuchTableException if the catalog has + * no record of this identifier + */ + def loadTable(ident: Identifier): Table +} + +/** Builds a [[AbstractDeltaCatalogClient]] from catalog options. */ +private[catalog] trait AbstractDeltaCatalogClientFactory { + def fromCatalogOptions( + catalogName: String, + options: CaseInsensitiveStringMap, + fallbackLoadTableFunc: Identifier => Table): AbstractDeltaCatalogClient +} + +private[catalog] object AbstractDeltaCatalogClient extends Logging { + + private val UC_DELTA_CATALOG_CLIENT_IMPL_CLASS_NAME: String = + "org.apache.spark.sql.delta.catalog.UCDeltaCatalogClientImpl" + + /** + * Returns a [[AbstractDeltaCatalogClient]] wrapped in [[Some]] when the catalog opted in via + * `deltaRestApi.enabled`, else [[None]]. The concrete impl is loaded reflectively so + * [[AbstractDeltaCatalog]] doesn't compile-depend on it. If opt-in is explicit but reflective + * loading fails, throws [[IllegalStateException]] rather than silently degrading. + */ + def fromCatalogOptionsIfEnabled( + catalogName: String, + options: CaseInsensitiveStringMap, + fallbackLoadTableFunc: Identifier => Table): Option[AbstractDeltaCatalogClient] = { + val key = UCTokenBasedRestClientFactory.DELTA_REST_API_ENABLED_KEY + if (!options.getBoolean(key, false)) { + return None + } + val factory = try { + // scalastyle:off classforname + val cls = Class.forName(UC_DELTA_CATALOG_CLIENT_IMPL_CLASS_NAME + "$") + // scalastyle:on classforname + cls.getField("MODULE$").get(null).asInstanceOf[AbstractDeltaCatalogClientFactory] + } catch { + case e: Exception => + throw new IllegalStateException( + s"Failed to load $UC_DELTA_CATALOG_CLIENT_IMPL_CLASS_NAME though '$key' is true. " + + "Ensure the implementation JAR is on the classpath, or remove " + + s"'$key' from the catalog options to fall back to the legacy delegate.", e) + } + Some(factory.fromCatalogOptions(catalogName, options, fallbackLoadTableFunc)) + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClientImpl.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClientImpl.scala new file mode 100644 index 00000000000..aa1620bcde6 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClientImpl.scala @@ -0,0 +1,237 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.catalog + +import java.net.URI +import java.util.concurrent.atomic.AtomicLong + +import scala.jdk.CollectionConverters._ + +import io.delta.storage.commit.{TableIdentifier => StorageTableIdentifier} +import io.delta.storage.commit.uccommitcoordinator.{UCDeltaClient, UCDeltaModels} +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.TableInfo +import io.delta.storage.commit.uccommitcoordinator.exceptions.{ + CredentialFetchFailedException, + UnsupportedTableFormatException, + NoSuchTableException => StorageNoSuchTableException +} + +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.catalyst.catalog.{ + CatalogStorageFormat, + CatalogTable, + CatalogTableType +} +import org.apache.spark.sql.connector.catalog.{Identifier, Table, V1Table} +import org.apache.spark.sql.delta.coordinatedcommits.UCTokenBasedRestClientFactory +import org.apache.spark.sql.delta.logging.DeltaLogKeys +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * [[AbstractDeltaCatalogClient]] backed by a [[UCDeltaClient]]; translates between + * Spark/Delta types and the storage-side UC types. + */ +private[catalog] class UCDeltaCatalogClientImpl( + catalogName: String, + ucClient: UCDeltaClient, + serverSidePlanningEnabled: Boolean = false, + fallbackLoadTableFunc: Identifier => Table + = UCDeltaCatalogClientImpl.defaultFallbackLoadTableFunc) + extends AbstractDeltaCatalogClient with Logging { + + override def loadTable(ident: Identifier): Table = { + UCDeltaCatalogClientImpl.loadTableInvocationsCounter.incrementAndGet() + val tid = toStorageTableIdent(ident) + val info = + try ucClient.loadTable(tid) + catch { + case _: StorageNoSuchTableException => throw new NoSuchTableException(ident) + case e: UnsupportedTableFormatException => + logInfo(log"Table ${MDC(DeltaLogKeys.TABLE_NAME, ident)} is not in Delta format; " + + log"falling back to the legacy catalog path. Cause: " + + log"${MDC(DeltaLogKeys.EXCEPTION, e.getMessage)}") + return fallbackLoadTableFunc(ident) + case e: CredentialFetchFailedException if serverSidePlanningEnabled => + logWarning(log"Credential fetch failed for " + + log"${MDC(DeltaLogKeys.TABLE_NAME, fullQualifiedTableName(tid))}; enabling " + + log"server-side planning fallback. Cause: " + + log"${MDC(DeltaLogKeys.EXCEPTION, e.getMessage)}") + enableServerSidePlanningConfig(ident) + e.getTableInfoWithoutCredentials + } + UCDeltaCatalogClientImpl.successfulDeltaRestApiLoadsCounter.incrementAndGet() + toV1Table(ident, info) + } + + private def enableServerSidePlanningConfig(ident: Identifier): Unit = { + SparkSession.getActiveSession match { + case Some(spark) => + spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "true") + logInfo(log"Server-side planning enabled for table " + + log"${MDC(DeltaLogKeys.TABLE_NAME, ident)}; Delta will read via SSP with empty creds.") + case None => + logWarning(log"Server-side planning requested for table " + + log"${MDC(DeltaLogKeys.TABLE_NAME, ident)} but no active SparkSession found.") + } + } + + // ---------- conversions ---------- + + private def toStorageTableIdent(ident: Identifier): StorageTableIdentifier = { + val ns = ident.namespace() + require( + ns.length == 1, + s"UC identifiers must be of the form .; got namespace of length " + + s"${ns.length}: '${ns.mkString(".")}' (full identifier: '${ident.toString}')") + new StorageTableIdentifier(Array(catalogName, ns(0)), ident.name()) + } + + /** Three-part dotted name from a `[catalog, schema]` + `name` storage identifier. */ + private def fullQualifiedTableName(t: StorageTableIdentifier): String = { + val ns = t.getNamespace + s"${ns(0)}.${ns(1)}.${t.getName}" + } + + private def toV1Table(ident: Identifier, info: TableInfo): V1Table = { + val m = info.getMetadata + val properties = Option(m.getConfiguration) + .map(_.asScala.toMap) + .getOrElse(Map.empty[String, String]) + val partitionColumns = Option(m.getPartitionColumns) + .map(_.asScala.toSeq) + .getOrElse(Seq.empty[String]) + val schema = Option(m.getSchemaString) + .map(DataType.fromJson(_).asInstanceOf[StructType]) + .getOrElse(new StructType()) + val storage = CatalogStorageFormat.empty.copy( + locationUri = Some(new URI(info.getLocation)), + properties = properties ++ info.getStorageProperties.asScala.toMap) + val catalogTable = CatalogTable( + identifier = TableIdentifier(ident.name(), ident.namespace().headOption, Some(catalogName)), + tableType = fromUcTableType(info.getTableType), + storage = storage, + schema = schema, + provider = Option(m.getProvider).map(_.toLowerCase(java.util.Locale.ROOT)), + partitionColumnNames = partitionColumns, + comment = Option(m.getDescription), + createTime = if (m.getCreatedTime != null) m.getCreatedTime else 0L, + tracksPartitionsInCatalog = false) + V1Table(catalogTable) + } + + private def fromUcTableType(t: UCDeltaModels.TableType): CatalogTableType = t match { + case UCDeltaModels.TableType.MANAGED => CatalogTableType.MANAGED + case UCDeltaModels.TableType.EXTERNAL => CatalogTableType.EXTERNAL + } +} + +object UCDeltaCatalogClientImpl extends AbstractDeltaCatalogClientFactory with Logging { + // Test-only instrumentation. The mutable counters are encapsulated so production code + // can neither read nor write them; read access is exposed via the `*ForTesting` methods + // below so cross-package integration tests (e.g. `io.sparkuctest.*`) don't need + // reflection. + + /** Bumped at every `loadTable` entry regardless of outcome. Read via the *ForTesting API. */ + private val loadTableInvocationsCounter: AtomicLong = new AtomicLong(0L) + + /** + * Bumped only when `loadTable` returned a Delta table via the Delta REST API (no fallback, + * no rethrow). Read via the *ForTesting API. + */ + private val successfulDeltaRestApiLoadsCounter: AtomicLong = new AtomicLong(0L) + + /** + * Test-only read accessor for the `loadTable` invocation counter. Used by integration + * tests to verify the Delta REST API code path ran. Not part of any public API; production + * code must not depend on it. + */ + def loadTableInvocationsForTesting: Long = loadTableInvocationsCounter.get() + + /** + * Test-only read accessor for the count of `loadTable` calls served by the Delta REST API + * (no fallback, no rethrow). Not part of any public API. + */ + def successfulDeltaRestApiLoadsForTesting: Long = successfulDeltaRestApiLoadsCounter.get() + + private[catalog] val ServerSidePlanningEnabledKey: String = "serverSidePlanning.enabled" + + private[catalog] val defaultFallbackLoadTableFunc: Identifier => Table = ident => + throw new IllegalStateException( + s"Non-Delta table $ident cannot be served via the Delta REST API path and no " + + "fallback catalog was configured.") + + /** + * Builds a [[UCDeltaCatalogClientImpl]] from catalog options. The `deltaRestApi.enabled` gate + * is the caller's responsibility ([[AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled]]). + * `fallbackLoadTableFunc` is invoked when UC reports `UnsupportedTableFormatException`. UC client + * construction is delegated to [[UCTokenBasedRestClientFactory]] with `renewCredential.enabled` + * defaulted to `true` and `credScopedFs.enabled` defaulted to `false` when not set. + */ + override def fromCatalogOptions( + catalogName: String, + options: CaseInsensitiveStringMap, + fallbackLoadTableFunc: Identifier => Table + ): UCDeltaCatalogClientImpl = { + // Pre-flight: keep our user-facing errors instead of the factory's less specific ones. + if (options.get(UriKey) == null) { + throw new IllegalArgumentException(s"'$UriKey' is required (catalog '$catalogName')") + } + validateAuthConfigured(options, catalogName) + + // `asCaseSensitiveMap()` preserves the user's original key case; `containsKey` is + // case-insensitive so defaults don't create duplicate keys. + val merged = new java.util.HashMap[String, String](options.asCaseSensitiveMap()) + Seq( + UCTokenBasedRestClientFactory.DELTA_REST_API_ENABLED_KEY -> "true", + UCTokenBasedRestClientFactory.RENEW_CREDENTIAL_ENABLED_KEY -> "true", + UCTokenBasedRestClientFactory.CRED_SCOPED_FS_ENABLED_KEY -> "false" + ).foreach { case (k, v) => if (!options.containsKey(k)) merged.put(k, v) } + val ucClient = UCTokenBasedRestClientFactory + .createUCClient(new CaseInsensitiveStringMap(merged)) + .asInstanceOf[UCDeltaClient] + + val sspEnabled = options.getBoolean(ServerSidePlanningEnabledKey, false) + new UCDeltaCatalogClientImpl(catalogName, ucClient, sspEnabled, fallbackLoadTableFunc) + } + + private val UriKey: String = "uri" + private val AuthPrefix: String = "auth." + private val LegacyTokenKey: String = "token" + + /** + * Pre-flight: ensure at least one of `auth.*` or legacy `token` is present, so the user + * sees a clear error (and catalog name) instead of the factory's internal failure when + * `TokenProvider.create` is handed an empty config. + */ + private[catalog] def validateAuthConfigured( + options: CaseInsensitiveStringMap, + catalogName: String): Unit = { + val hasAuthPrefix = options.entrySet().asScala.exists(_.getKey.startsWith(AuthPrefix)) + val hasLegacyToken = options.get(LegacyTokenKey) != null + if (!hasAuthPrefix && !hasLegacyToken) { + throw new IllegalArgumentException( + s"auth configuration is required when 'deltaRestApi.enabled' is true " + + s"(catalog '$catalogName'). Set either '${AuthPrefix}type' (with the corresponding " + + s"$AuthPrefix* keys) or the legacy '$LegacyTokenKey' option.") + } + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala index f9b752ba6e0..b291849fe8d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala @@ -18,12 +18,14 @@ package org.apache.spark.sql.delta.coordinatedcommits import java.net.{URI, URISyntaxException} import java.util.concurrent.ConcurrentHashMap +import java.util.function.Supplier import scala.collection.JavaConverters._ import scala.util.control.NonFatal import io.delta.storage.commit.CommitCoordinatorClient import io.delta.storage.commit.uccommitcoordinator.{UCClient, UCCommitCoordinatorClient, UCTokenBasedRestClient} +import org.apache.hadoop.conf.Configuration import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging @@ -31,6 +33,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging import io.unitycatalog.client.auth.TokenProvider import org.apache.spark.internal.MDC import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.Utils /** @@ -82,7 +85,7 @@ object UCCommitCoordinatorBuilder spark: SparkSession, catalogName: String): CommitCoordinatorClient = { val client = getCatalogConfigs(spark).find(_._1 == catalogName) match { - case Some((_, ucConfig)) => ucClientFactory.createUCClient(ucConfig) + case Some((_, ucConfig)) => ucClientFactory.createUCClient(ucConfig.asJava) case None => throw new IllegalArgumentException( s"Catalog $catalogName not found in the provided SparkSession configurations.") @@ -107,7 +110,7 @@ object UCCommitCoordinatorBuilder matchingConfigs match { case Nil => throw noMatchingCatalogException(metastoreId) - case ucConfig :: Nil => ucClientFactory.createUCClient(ucConfig) + case ucConfig :: Nil => ucClientFactory.createUCClient(ucConfig.asJava) case multiple => throw multipleMatchingCatalogs(metastoreId, multiple.map(_.getOrElse("uri", ""))) } @@ -125,7 +128,7 @@ object UCCommitCoordinatorBuilder val metastoreId = ucConfigToMetastoreIdCache.computeIfAbsent( ucConfig, _ => { - val ucClient = ucClientFactory.createUCClient(ucConfig) + val ucClient = ucClientFactory.createUCClient(ucConfig.asJava) try { ucClient.getMetastoreId } finally { @@ -246,7 +249,7 @@ object UCCommitCoordinatorBuilder /** Factory trait for creating [[UCClient]] instances from a unified configuration map. */ trait UCClientFactory { - def createUCClient(ucConfig: Map[String, String]): UCClient + def createUCClient(ucConfig: java.util.Map[String, String]): UCClient } /** @@ -288,21 +291,25 @@ object UCTokenBasedRestClientFactory extends UCClientFactory { final val AUTH_PREFIX = "auth." final val DELTA_REST_API_ENABLED_KEY = "deltaRestApi.enabled" final val APP_VERSIONS_PREFIX = "appVersions." + /** Opt-in: caller wants `UCDeltaTokenBasedRestClient` constructed with credential renewal. */ + final val RENEW_CREDENTIAL_ENABLED_KEY = "renewCredential.enabled" + /** Opt-in: caller wants `UCDeltaTokenBasedRestClient` constructed with cred-scoped FS. */ + final val CRED_SCOPED_FS_ENABLED_KEY = "credScopedFs.enabled" private val DEFAULT_UC_CLIENT_CLASS: String = classOf[UCTokenBasedRestClient].getName private val DELTA_UC_CLIENT_CLASS: String = "io.delta.storage.commit.uccommitcoordinator.UCDeltaTokenBasedRestClient" - override def createUCClient(ucConfig: Map[String, String]): UCClient = { - val uri = ucConfig.getOrElse(URI_KEY, + override def createUCClient(ucConfig: java.util.Map[String, String]): UCClient = { + val uri = Option(ucConfig.get(URI_KEY)).getOrElse( throw new IllegalArgumentException(s"UC config must contain '$URI_KEY'")) val authConfig = extractAuthConfig(ucConfig) val tokenProvider = TokenProvider.create(authConfig.asJava) val className = - if (ucConfig.get(DELTA_REST_API_ENABLED_KEY).exists(_.equalsIgnoreCase("true"))) { + if (Option(ucConfig.get(DELTA_REST_API_ENABLED_KEY)).exists(_.equalsIgnoreCase("true"))) { DELTA_UC_CLIENT_CLASS } else { DEFAULT_UC_CLIENT_CLASS @@ -312,14 +319,28 @@ object UCTokenBasedRestClientFactory extends UCClientFactory { require(classOf[UCClient].isAssignableFrom(cls), s"$className does not implement ${classOf[UCClient].getName}") val appVersions = extractAppVersions(ucConfig) + val renewCred = Option(ucConfig.get(RENEW_CREDENTIAL_ENABLED_KEY)).exists(_.toBoolean) + val credScopedFs = Option(ucConfig.get(CRED_SCOPED_FS_ENABLED_KEY)).exists(_.toBoolean) + val hadoopConfSupplier: Supplier[Configuration] = () => + SparkSession.getActiveSession + .map(_.sparkContext.hadoopConfiguration) + .getOrElse(new Configuration()) val ctor = cls.getConstructor( - classOf[String], classOf[TokenProvider], classOf[java.util.Map[_, _]]) - ctor.newInstance(uri, tokenProvider, appVersions.asJava).asInstanceOf[UCClient] - } - - /** Java-friendly overload that accepts a java.util.Map. */ - def createUCClient(ucConfig: java.util.Map[String, String]): UCClient = { - createUCClient(ucConfig.asScala.toMap) + classOf[String], + classOf[TokenProvider], + classOf[java.util.Map[_, _]], + java.lang.Boolean.TYPE, + java.lang.Boolean.TYPE, + classOf[Supplier[_]]) + ctor + .newInstance( + uri, + tokenProvider, + appVersions.asJava, + java.lang.Boolean.valueOf(renewCred), + java.lang.Boolean.valueOf(credScopedFs), + hadoopConfSupplier) + .asInstanceOf[UCClient] } /** @@ -327,16 +348,20 @@ object UCTokenBasedRestClientFactory extends UCClientFactory { * Prefers `auth.*` keys; falls back to legacy `token` key. */ private[coordinatedcommits] def extractAuthConfig( - ucConfig: Map[String, String]): Map[String, String] = { - val authConfig = ucConfig - .filterKeys(_.startsWith(AUTH_PREFIX)) - .map { case (k, v) => (k.stripPrefix(AUTH_PREFIX), v) } + ucConfig: java.util.Map[String, String]): Map[String, String] = { + val authPrefixLower = AUTH_PREFIX.toLowerCase(java.util.Locale.ROOT) + val authConfig = ucConfig.entrySet().asScala.iterator + .collect { + case e if e.getKey.toLowerCase(java.util.Locale.ROOT).startsWith(authPrefixLower) => + val suffix = e.getKey.substring(authPrefixLower.length) + suffix -> e.getValue + } .toMap if (authConfig.nonEmpty) { authConfig } else { - ucConfig.get("token") match { + Option(ucConfig.get("token")) match { case Some(token) => Map("type" -> "static", "token" -> token) case None => Map.empty } @@ -348,10 +373,13 @@ object UCTokenBasedRestClientFactory extends UCClientFactory { * Caller-supplied entries override defaults with the same key. */ private[coordinatedcommits] def extractAppVersions( - ucConfig: Map[String, String]): Map[String, String] = { - val extra = ucConfig - .filterKeys(_.startsWith(APP_VERSIONS_PREFIX)) - .map { case (k, v) => (k.stripPrefix(APP_VERSIONS_PREFIX), v) } + ucConfig: java.util.Map[String, String]): Map[String, String] = { + val appPrefixLower = APP_VERSIONS_PREFIX.toLowerCase(java.util.Locale.ROOT) + val extra = ucConfig.entrySet().asScala.iterator + .collect { + case e if e.getKey.toLowerCase(java.util.Locale.ROOT).startsWith(appPrefixLower) => + e.getKey.substring(appPrefixLower.length) -> e.getValue + } .toMap defaultAppVersions ++ extra } @@ -382,5 +410,5 @@ case class UCCatalogConfig(catalogName: String, ucConfig: Map[String, String]) { * Prefers `auth.*` keys; falls back to legacy `token` key. */ def authConfig: Map[String, String] = - UCTokenBasedRestClientFactory.extractAuthConfig(ucConfig) + UCTokenBasedRestClientFactory.extractAuthConfig(ucConfig.asJava) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClientRoutingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClientRoutingSuite.scala new file mode 100644 index 00000000000..25910d6ddaa --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClientRoutingSuite.scala @@ -0,0 +1,275 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.catalog + +import java.net.URI +import java.util +import java.util.{Collections, Optional, UUID} + +import io.delta.storage.commit.{Commit, GetCommitsResponse, TableIdentifier => StorageTableIdentifier} +import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol} +import io.delta.storage.commit.uccommitcoordinator.{UCClient, UCDeltaClient, UCDeltaModels} +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.{DeltaProtocol, StagingTableInfo, TableInfo, TableType => UcTableType} +import io.delta.storage.commit.uccommitcoordinator.exceptions.CredentialFetchFailedException +import io.delta.storage.commit.uniform.UniformMetadata + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.connector.catalog.{Identifier, Table, V1Table} +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * Unit tests for the Delta REST API client wiring on [[AbstractDeltaCatalog]]. These verify + * only the catalog-option / initialize plumbing and the loadTable dispatch decision; they do + * not require a UC server. + */ +class AbstractDeltaCatalogClientRoutingSuite extends QueryTest with DeltaSQLCommandTest { + + private def options(kv: (String, String)*): CaseInsensitiveStringMap = { + val m = new util.HashMap[String, String]() + kv.foreach { case (k, v) => m.put(k, v) } + new CaseInsensitiveStringMap(m) + } + + test("deltaRestApi.enabled=false leaves deltaCatalogClient empty") { + val catalog = new AbstractDeltaCatalog + catalog.initialize("test_cat", options()) + assert(catalog.deltaCatalogClient.isEmpty, + "Delta REST API client should not be constructed when the catalog opts out") + } + + test("deltaRestApi.enabled=true requires uri") { + val catalog = new AbstractDeltaCatalog + val e = intercept[IllegalArgumentException] { + catalog.initialize("test_cat", options("deltaRestApi.enabled" -> "true")) + } + assert(e.getMessage.contains("'uri' is required")) + } + + test("deltaRestApi.enabled=true requires an auth configuration") { + val catalog = new AbstractDeltaCatalog + val e = intercept[IllegalArgumentException] { + catalog.initialize("test_cat", + options("deltaRestApi.enabled" -> "true", "uri" -> "http://uc")) + } + assert(e.getMessage.contains("auth configuration is required")) + } + + test("auth.* options are passed through to TokenProvider (new format)") { + val catalog = new AbstractDeltaCatalog + catalog.initialize("test_cat", + options( + "deltaRestApi.enabled" -> "true", + "uri" -> "http://uc", + "auth.type" -> "static", + "auth.token" -> "tok")) + assert(catalog.deltaCatalogClient.isDefined) + } + + test("deltaRestApi.enabled=true with uri+token constructs the Delta REST API client") { + val catalog = new AbstractDeltaCatalog + catalog.initialize("test_cat", + options("deltaRestApi.enabled" -> "true", "uri" -> "http://uc", "token" -> "tok")) + val client = catalog.deltaCatalogClient.getOrElse( + fail("Delta REST API client should be constructed when the catalog opts in")) + assert(client.isInstanceOf[UCDeltaCatalogClientImpl], + s"Delta REST API client should be UCDeltaCatalogClientImpl, was ${client.getClass}") + } + + test("AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled returns None when flag is off") { + val result = AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled( + "test_cat", options(), noFallback) + assert(result.isEmpty) + } + + test("AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled returns Some when flag is on") { + val result = AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled( + "test_cat", + options("deltaRestApi.enabled" -> "true", "uri" -> "http://uc", "token" -> "tok"), + noFallback) + assert(result.isDefined) + } + + private val noFallback: Identifier => Table = + _ => throw new UnsupportedOperationException("fallback not expected in this test") + + test("loadTable converts TableInfo to V1Table with catalog-supplied fields") { + val tableId = UUID.randomUUID() + val metadata = new AbstractMetadata { + override def getId: String = null + override def getName: String = "tbl" + override def getDescription: String = "a test table" + override def getProvider: String = "DELTA" + override def getFormatOptions: util.Map[String, String] = Collections.emptyMap() + override def getSchemaString: String = + """{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]}""" + override def getPartitionColumns: util.List[String] = Collections.emptyList() + override def getConfiguration: util.Map[String, String] = + util.Map.of("ucTableId", tableId.toString, "delta.feature.x", "supported") + override def getCreatedTime: java.lang.Long = 42L + } + val info = new TableInfo( + tableId, + UCDeltaModels.TableType.EXTERNAL, + "s3://bucket/table", + metadata, + util.Map.of("fs.s3a.access.key", "key")) + + val client = new UCDeltaCatalogClientImpl( + catalogName = "main", + ucClient = new StubUCDeltaClient(info)) + + val table = client.loadTable(Identifier.of(Array("sch"), "tbl")) + val v1 = table.asInstanceOf[V1Table].catalogTable + assert(v1.identifier.table === "tbl") + assert(v1.identifier.database === Some("sch")) + assert(v1.identifier.catalog === Some("main")) + assert(v1.tableType === CatalogTableType.EXTERNAL) + assert(v1.storage.locationUri.map(_.toString) === Some("s3://bucket/table")) + assert(v1.provider === Some("delta")) + assert(v1.comment === Some("a test table")) + assert(v1.createTime === 42L) + assert(!v1.tracksPartitionsInCatalog) + assert(v1.schema.fieldNames.toSeq === Seq("id")) + val merged = v1.storage.properties + assert(merged.get("ucTableId") === Some(tableId.toString)) + assert(merged.get("fs.s3a.access.key") === Some("key")) + } + + test("loadTable falls back to SSP on CredentialFetchFailedException when SSP is enabled") { + val tableId = UUID.randomUUID() + val metadata = new AbstractMetadata { + override def getId: String = null + override def getName: String = "tbl" + override def getDescription: String = null + override def getProvider: String = "DELTA" + override def getFormatOptions: util.Map[String, String] = Collections.emptyMap() + override def getSchemaString: String = + """{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]}""" + override def getPartitionColumns: util.List[String] = Collections.emptyList() + // No credential properties; this is the "without credentials" TableInfo. + override def getConfiguration: util.Map[String, String] = Collections.emptyMap() + override def getCreatedTime: java.lang.Long = 0L + } + val tableInfoNoCreds = new TableInfo( + tableId, + UCDeltaModels.TableType.EXTERNAL, + "s3://bucket/no-creds-table", + metadata, + Collections.emptyMap()) // no storage properties either + val credEx = new CredentialFetchFailedException( + "creds exhausted", new RuntimeException("simulated"), tableInfoNoCreds) + + val client = new UCDeltaCatalogClientImpl( + catalogName = "main", + ucClient = new StubUCDeltaClient(throw credEx), + serverSidePlanningEnabled = true) + + // Capture and restore the SSP conf so this test doesn't leak into others. + val sspKey = DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key + val originalSsp = spark.conf.getOption(sspKey) + spark.conf.unset(sspKey) + try { + val table = client.loadTable(Identifier.of(Array("sch"), "tbl")) + val v1 = table.asInstanceOf[V1Table].catalogTable + assert(v1.identifier.table === "tbl") + assert(v1.storage.locationUri.map(_.toString) === Some("s3://bucket/no-creds-table")) + assert(v1.storage.properties.isEmpty, + s"no credentials should be set on the V1Table; got ${v1.storage.properties}") + // The fallback path must have flipped SSP on. + assert(spark.conf.get(sspKey) === "true", + "Server-side planning conf should be set after CredentialFetchFailedException fallback") + } finally { + originalSsp match { + case Some(value) => spark.conf.set(sspKey, value) + case None => spark.conf.unset(sspKey) + } + } + } + + test("loadTable without serverSidePlanningEnabled rethrows CredentialFetchFailedException") { + val ex = new CredentialFetchFailedException( + "creds exhausted", new RuntimeException("simulated"), null) + val client = new UCDeltaCatalogClientImpl( + catalogName = "main", + ucClient = new StubUCDeltaClient(throw ex), + serverSidePlanningEnabled = false) + val thrown = intercept[CredentialFetchFailedException] { + client.loadTable(Identifier.of(Array("sch"), "tbl")) + } + assert(thrown eq ex) + } +} + +/** + * Returns the result of {@code loadTableResult} (a by-name parameter) from + * {@code loadTable}; throws on every other method. Pass a [[TableInfo]] to get a successful + * load, or {@code throw new ...} to simulate UC-side failures. + * + *

Because {@code loadTableResult} is by-name, the body re-evaluates on every + * {@code loadTable} invocation: a {@code throw} expression re-throws each time; a + * {@link TableInfo} reference is rebound (cheap). For tests that need to vary the result + * across calls, replace this with a {@code Supplier}-shaped constructor. + */ +private class StubUCDeltaClient(loadTableResult: => TableInfo) extends UCDeltaClient { + override def getMetastoreId(): String = throw new UnsupportedOperationException + override def loadTable(tableIdentifier: StorageTableIdentifier): TableInfo = loadTableResult + override def createStagingTable( + catalog: String, schema: String, table: String): StagingTableInfo = + throw new UnsupportedOperationException + override def createTable( + catalog: String, + schema: String, + name: String, + location: String, + tableType: UcTableType, + comment: String, + partitionColumns: util.List[String], + protocol: DeltaProtocol, + properties: util.Map[String, String]): AbstractMetadata = + throw new UnsupportedOperationException + override def commit( + tableId: String, + tableUri: URI, + tableIdentifier: StorageTableIdentifier, + commit: Optional[Commit], + lastKnownBackfilledVersion: Optional[java.lang.Long], + oldMetadata: Optional[AbstractMetadata], + newMetadata: Optional[AbstractMetadata], + oldProtocol: Optional[AbstractProtocol], + newProtocol: Optional[AbstractProtocol], + uniform: Optional[UniformMetadata]): Unit = + throw new UnsupportedOperationException + override def getCommits( + tableId: String, + tableUri: URI, + tableIdentifier: StorageTableIdentifier, + startVersion: Optional[java.lang.Long], + endVersion: Optional[java.lang.Long]): GetCommitsResponse = + throw new UnsupportedOperationException + override def finalizeCreate( + tableName: String, + catalogName: String, + schemaName: String, + storageLocation: String, + columns: util.List[UCClient.ColumnDef], + properties: util.Map[String, String]): Unit = + throw new UnsupportedOperationException + override def close(): Unit = () +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilderSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilderSuite.scala index a073a70a625..1da0da4eead 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilderSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilderSuite.scala @@ -16,6 +16,8 @@ package org.apache.spark.sql.delta.coordinatedcommits +import scala.collection.JavaConverters._ + import io.delta.storage.commit.uccommitcoordinator.{UCClient, UCCommitCoordinatorClient} import org.mockito.{Mock, Mockito} import org.mockito.ArgumentMatchers.{any, eq => meq} @@ -50,9 +52,9 @@ class UCCommitCoordinatorBuilderSuite extends SparkFunSuite with SharedSparkSess * parsing: all sub-keys under spark.sql.catalog..* * with the prefix stripped. Includes `uri` when present. */ - def expectedUcConfig: Map[String, String] = { + def expectedUcConfig: java.util.Map[String, String] = { val base = configMap - uri.map(u => base + ("uri" -> u)).getOrElse(base) + uri.map(u => base + ("uri" -> u)).getOrElse(base).asJava } } @@ -241,7 +243,7 @@ class UCCommitCoordinatorBuilderSuite extends SparkFunSuite with SharedSparkSess val metastoreId = "shared-metastore-id" val sharedUri = "https://shared-test-uri.com" val sharedConfigMap = Map("type" -> "static", "token" -> "shared-test-token") - val sharedUcConfig = sharedConfigMap + ("uri" -> sharedUri) + val sharedUcConfig = (sharedConfigMap + ("uri" -> sharedUri)).asJava val catalog1 = CatalogTestConfig( name = "catalog1", uri = Some(sharedUri), @@ -292,7 +294,7 @@ class UCCommitCoordinatorBuilderSuite extends SparkFunSuite with SharedSparkSess } private def registerMetastoreId( - ucConfig: Map[String, String], + ucConfig: java.util.Map[String, String], metastoreId: String): Unit = { val mockClient = org.mockito.Mockito.mock(classOf[UCClient]) when(mockClient.getMetastoreId).thenReturn(metastoreId) @@ -300,7 +302,7 @@ class UCCommitCoordinatorBuilderSuite extends SparkFunSuite with SharedSparkSess } private def registerMetastoreIdException( - ucConfig: Map[String, String], + ucConfig: java.util.Map[String, String], exception: Throwable): Unit = { val mockClient = org.mockito.Mockito.mock(classOf[UCClient]) when(mockClient.getMetastoreId).thenThrow(exception) @@ -461,7 +463,7 @@ class UCCommitCoordinatorBuilderSuite extends SparkFunSuite with SharedSparkSess "auth.type" -> "static", "auth.token" -> "new-token" ) - val auth = UCTokenBasedRestClientFactory.extractAuthConfig(ucConfig) + val auth = UCTokenBasedRestClientFactory.extractAuthConfig(ucConfig.asJava) assert(auth("type") == "static") assert(auth("token") == "new-token") } @@ -471,7 +473,7 @@ class UCCommitCoordinatorBuilderSuite extends SparkFunSuite with SharedSparkSess "uri" -> "https://test.com", "token" -> "legacy-token" ) - val auth = UCTokenBasedRestClientFactory.extractAuthConfig(ucConfig) + val auth = UCTokenBasedRestClientFactory.extractAuthConfig(ucConfig.asJava) assert(auth("type") == "static") assert(auth("token") == "legacy-token") } @@ -490,7 +492,7 @@ class UCCommitCoordinatorBuilderSuite extends SparkFunSuite with SharedSparkSess assert(result.isInstanceOf[UCCommitCoordinatorClient]) verify(mockFactory).createUCClient( - any[Map[String, String]]() + any[java.util.Map[String, String]]() ) } } @@ -510,7 +512,7 @@ class UCCommitCoordinatorBuilderSuite extends SparkFunSuite with SharedSparkSess assert(result.isInstanceOf[UCCommitCoordinatorClient]) verify(mockFactory).createUCClient( - any[Map[String, String]]() + any[java.util.Map[String, String]]() ) } } @@ -521,7 +523,7 @@ class UCCommitCoordinatorBuilderSuite extends SparkFunSuite with SharedSparkSess "appVersions.Kernel" -> "0.7.0", "appVersions.Delta V2 connector" -> "true" ) - val versions = UCTokenBasedRestClientFactory.extractAppVersions(ucConfig) + val versions = UCTokenBasedRestClientFactory.extractAppVersions(ucConfig.asJava) assert(versions("Delta") === io.delta.VERSION) assert(versions("Spark") === org.apache.spark.SPARK_VERSION) assert(versions("Kernel") === "0.7.0") diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorClientSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorClientSuiteBase.scala index 1653fdac1e4..a99fefb1c9e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorClientSuiteBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorClientSuiteBase.scala @@ -84,7 +84,7 @@ trait UCCommitCoordinatorClientSuiteBase extends CommitCoordinatorClientImplSuit CommitCoordinatorProvider.registerBuilder(UCCommitCoordinatorBuilder) ucCommitCoordinator = new InMemoryUCCommitCoordinator() ucClient = new InMemoryUCClient(metastoreId.toString, ucCommitCoordinator) - when(mockFactory.createUCClient(any[Map[String, String]]())).thenReturn(ucClient) + when(mockFactory.createUCClient(any[java.util.Map[String, String]]())).thenReturn(ucClient) } override protected def createTableCommitCoordinatorClient( deltaLog: DeltaLog): TableCommitCoordinatorClient = { diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java index 0139e56c093..2f6d97d43e8 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java @@ -32,10 +32,13 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.fs.Path; +import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.delta.catalog.UCDeltaCatalogClientImpl; +import org.apache.spark.sql.delta.coordinatedcommits.UCTokenBasedRestClientFactory; import org.assertj.core.api.ThrowableAssert.ThrowingCallable; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -142,9 +145,76 @@ private SparkConf configureSparkWithUnityCatalog(SparkConf conf) { // Set the catalog specific configs. UnityCatalogInfo uc = unityCatalogInfo(); String catalogName = uc.catalogName(); - return conf.set("spark.sql.catalog." + catalogName, "io.unitycatalog.spark.UCSingleCatalog") - .set("spark.sql.catalog." + catalogName + ".uri", uc.serverUri()) - .set("spark.sql.catalog." + catalogName + ".token", uc.serverToken()); + conf = + conf.set("spark.sql.catalog." + catalogName, "io.unitycatalog.spark.UCSingleCatalog") + .set("spark.sql.catalog." + catalogName + ".uri", uc.serverUri()) + .set("spark.sql.catalog." + catalogName + ".token", uc.serverToken()); + if (useDeltaRestApiForTests()) { + conf = + conf.set( + "spark.sql.catalog." + + catalogName + + "." + + UCTokenBasedRestClientFactory.DELTA_REST_API_ENABLED_KEY(), + "true"); + } + return conf; + } + + /** Subclasses can override to false for A/B comparison with the legacy path. */ + protected boolean useDeltaRestApiForTests() { + // TODO: turn this on once the Delta API is fully integrated. + return false; + } + + /** + * Whether the class-level @AfterAll should assert that the Delta REST API actually served at + * least one load. Override to false in classes that intentionally exercise only the fallback path + * (which does NOT bump the successfulDeltaRestApiLoads counter), so the class-level check doesn't + * false-positive when test sharding distributes methods across CI shards. + */ + protected boolean expectDeltaRestApiSuccessAtClassLevel() { + return true; + } + + private static final Logger LOG = Logger.getLogger(UCDeltaTableIntegrationBaseTest.class); + + private long deltaRestApiLoadsAtClassStart; + private long loadTableInvocationsAtClassStart; + + @BeforeAll + public void captureDeltaRestApiBaseline() { + deltaRestApiLoadsAtClassStart = + UCDeltaCatalogClientImpl.successfulDeltaRestApiLoadsForTesting(); + loadTableInvocationsAtClassStart = UCDeltaCatalogClientImpl.loadTableInvocationsForTesting(); + } + + @AfterAll + public void verifyDeltaRestApiExercisedAtClassLevel() { + if (!useDeltaRestApiForTests() || !expectDeltaRestApiSuccessAtClassLevel()) { + return; + } + long loadInvocationsAfter = UCDeltaCatalogClientImpl.loadTableInvocationsForTesting(); + if (loadInvocationsAfter <= loadTableInvocationsAtClassStart) { + // Every test in the suite was aborted (e.g. via Assumption.assumeTrue) before any + // loadTable call ran, so there is nothing to assert about the Delta REST API path. + return; + } + long after = UCDeltaCatalogClientImpl.successfulDeltaRestApiLoadsForTesting(); + if (after <= deltaRestApiLoadsAtClassStart) { + throw new AssertionError( + "Suite finished but no UCDeltaCatalogClientImpl.loadTable call actually returned a " + + "Delta table via the Delta REST API. deltaRestApi.enabled is on but every " + + "load either fell back to the legacy delegate or threw. baseline=" + + deltaRestApiLoadsAtClassStart + + ", after=" + + after); + } + LOG.info( + "[delta-api] " + + getClass().getSimpleName() + + " successful Delta REST API loads: " + + (after - deltaRestApiLoadsAtClassStart)); } /** Stop the SparkSession after all tests. */ @@ -202,6 +272,18 @@ protected void check(String tableName, List> expected) { getSqlExecutor().checkWithSQL("SELECT * FROM " + tableName + " ORDER BY 1", expected); } + /** + * Verify that {@code actual} equals {@code expected}, with an error message that includes both. + * Use this overload when the caller has already run the query and just needs to compare the row + * list (e.g. queries that aren't a plain {@code SELECT *}). + */ + protected void check(List> actual, List> expected) { + if (!actual.equals(expected)) { + throw new AssertionError( + String.format("Query results do not match.\nExpected: %s\nActual: %s", expected, actual)); + } + } + /** Helper method to run code with a temporary directory that gets cleaned up. */ protected void withTempDir(TempDirCode code) throws Exception { UnityCatalogInfo uc = unityCatalogInfo(); diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableNonDeltaFallbackTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableNonDeltaFallbackTest.java new file mode 100644 index 00000000000..213f9d269df --- /dev/null +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableNonDeltaFallbackTest.java @@ -0,0 +1,108 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sparkuctest; + +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.delta.catalog.UCDeltaCatalogClientImpl; +import org.junit.jupiter.api.Test; + +/** + * Integration test for the non-Delta fallback path in {@code UCDeltaCatalogClientImpl}. + * + *

When {@code deltaRestApi.enabled=true}, every {@code loadTable} call first asks the Delta REST + * API. If the target table is not in Delta format, UC returns {@code + * UnsupportedTableFormatException}; the client must then fall back to the legacy {@code + * TableCatalog} delegate so the non-Delta table is still readable. + * + *

This class disables the class-level "Delta REST API served at least one load" assertion (see + * {@link #expectDeltaRestApiSuccessAtClassLevel()}) because its tests intentionally exercise only + * the fallback path, which does not bump the successfulDeltaRestApiLoads counter. CI sharding also + * makes a same-file "sanity" Delta test unreliable: methods can be distributed across shards, so + * each shard's @AfterAll runs without a guarantee of seeing both methods. + */ +public class UCDeltaTableNonDeltaFallbackTest extends UCDeltaTableIntegrationBaseTest { + + @Override + protected boolean useDeltaRestApiForTests() { + return true; + } + + @Override + protected boolean expectDeltaRestApiSuccessAtClassLevel() { + return false; + } + + @Test + public void testLoadNonDeltaParquetExternalTableFallsBackToLegacyCatalog() throws Exception { + String tableName = "non_delta_parquet_fallback"; + String fullTableName = fullTableName(tableName); + withTempDir( + (Path dir) -> { + Path tablePath = new Path(dir, tableName); + sql("DROP TABLE IF EXISTS %s", fullTableName); + try { + // Create a non-Delta EXTERNAL Parquet table. UC accepts external non-Delta tables; + // managed non-Delta tables are rejected upstream, so EXTERNAL is the only shape + // that reaches the loadTable fallback path. + sql( + "CREATE TABLE %s (id INT, name STRING) USING parquet LOCATION '%s'", + fullTableName, tablePath); + sql("INSERT INTO %s VALUES (1, 'alpha'), (2, 'beta'), (3, 'gamma')", fullTableName); + + long invocationsBefore = UCDeltaCatalogClientImpl.loadTableInvocationsForTesting(); + long successesBefore = UCDeltaCatalogClientImpl.successfulDeltaRestApiLoadsForTesting(); + + // The Delta REST API path runs first: ucClient.loadTable -> UC server returns + // UnsupportedTableFormatException (table isn't Delta-format) -> the catch handler + // calls fallbackLoadTableFunc(ident) which is super.loadTable from AbstractDeltaCatalog + // (i.e. the legacy DelegatingCatalogExtension delegate). The SELECT below succeeds + // only if that fallback hands back a usable V1 table for the Parquet data. + List> rows = sql("SELECT id, name FROM %s ORDER BY id", fullTableName); + check( + rows, List.of(List.of("1", "alpha"), List.of("2", "beta"), List.of("3", "gamma"))); + + // Counter delta: loadTable was invoked (catch handler ran) but the Delta REST API + // did NOT successfully serve the load. A future regression that silently returned + // a (wrong) Delta table from the REST path would bump the success counter and fail + // this assertion, even though the row data check above would also still pass. + long invocationsAfter = UCDeltaCatalogClientImpl.loadTableInvocationsForTesting(); + long successesAfter = UCDeltaCatalogClientImpl.successfulDeltaRestApiLoadsForTesting(); + if (invocationsAfter <= invocationsBefore) { + throw new AssertionError( + "Expected loadTableInvocations to increase during the SELECT, but it did not" + + " (before=" + + invocationsBefore + + ", after=" + + invocationsAfter + + ")"); + } + if (successesAfter != successesBefore) { + throw new AssertionError( + "Expected successfulDeltaRestApiLoads to be unchanged (fallback path took" + + " over), but it changed (before=" + + successesBefore + + ", after=" + + successesAfter + + ")"); + } + } finally { + sql("DROP TABLE IF EXISTS %s", fullTableName); + } + }); + } +} diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableReadTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableReadTest.java index 66a9169ff35..8520b6e9c4f 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableReadTest.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableReadTest.java @@ -134,11 +134,4 @@ public void testDeltaTableForPath(TableType tableType) throws Exception { } }); } - - private void check(List> actual, List> expected) { - if (!actual.equals(expected)) { - throw new AssertionError( - String.format("Query results do not match.\nExpected: %s\nActual: %s", expected, actual)); - } - } } diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java index 4a385cd9338..53c8be864dd 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java @@ -46,12 +46,14 @@ import io.unitycatalog.client.model.DataSourceFormat; import io.unitycatalog.client.model.GetMetastoreSummaryResponse; import io.unitycatalog.client.model.TableType; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import java.io.IOException; import java.net.URI; import java.util.*; +import java.util.function.Supplier; /** * A REST client implementation of {@link UCClient} for interacting with Unity Catalog's commit @@ -129,6 +131,22 @@ public UCTokenBasedRestClient( this.tablesApi = new TablesApi(apiClient); } + /** + * 6-arg constructor for symmetry with {@link UCDeltaTokenBasedRestClient}. The + * {@code credentialRenewalEnabled}, {@code credentialScopedFsEnabled}, and + * {@code hadoopConfSupplier} parameters are not used by this client and are accepted only so + * that callers can construct either client uniformly by reflection. + */ + public UCTokenBasedRestClient( + String baseUri, + TokenProvider tokenProvider, + Map appVersions, + boolean credentialRenewalEnabled, + boolean credentialScopedFsEnabled, + Supplier hadoopConfSupplier) { + this(baseUri, tokenProvider, appVersions); + } + /** * Ensures the client has not been closed. Must be called before any API operation. */ diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/CredentialFetchFailedException.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/CredentialFetchFailedException.java index 35742005bd3..ecdb7a4e9ba 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/CredentialFetchFailedException.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/CredentialFetchFailedException.java @@ -18,13 +18,12 @@ import io.delta.storage.commit.uccommitcoordinator.UCDeltaClient; import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.TableInfo; -import java.io.IOException; /** * Thrown by {@link UCDeltaClient} when credential vending exhausts retries. Carries a * cred-less {@link TableInfo} so callers with a fallback (e.g. SSP) can recover. */ -public class CredentialFetchFailedException extends IOException { +public class CredentialFetchFailedException extends RuntimeException { private final TableInfo tableInfoWithoutCredentials; diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/NoSuchTableException.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/NoSuchTableException.java index c2f3d7c3d33..6f45a774255 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/NoSuchTableException.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/NoSuchTableException.java @@ -16,13 +16,11 @@ package io.delta.storage.commit.uccommitcoordinator.exceptions; -import java.io.IOException; - /** * Thrown by {@link io.delta.storage.commit.uccommitcoordinator.UCDeltaClient} operations when the * catalog reports that the requested table does not exist (HTTP 404). */ -public class NoSuchTableException extends IOException { +public class NoSuchTableException extends RuntimeException { public NoSuchTableException(String message) { super(message); } diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/UnsupportedTableFormatException.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/UnsupportedTableFormatException.java index 02bedc8a0a9..329a2e125de 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/UnsupportedTableFormatException.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/UnsupportedTableFormatException.java @@ -16,14 +16,12 @@ package io.delta.storage.commit.uccommitcoordinator.exceptions; -import java.io.IOException; - /** * Thrown when the catalog refuses to serve a non-Delta table. Callers should fall back to a * non-Delta-REST-API load path. Emitted as HTTP 400 with {@code error.type = * "UnsupportedTableFormatException"} on the wire. */ -public class UnsupportedTableFormatException extends IOException { +public class UnsupportedTableFormatException extends RuntimeException { public UnsupportedTableFormatException(String message) { super(message); }