Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.InsertableRelation
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap


/**
Expand All @@ -83,6 +84,22 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension

val spark = SparkSession.active

/**
* Optional client that takes over table operations for this catalog.
*
* When defined, table operations are routed through this client instead of through the
* [[org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension]] delegate that
* `AbstractDeltaCatalog` normally relies on. This lets the catalog inject custom
* interactions (e.g. talking to a REST endpoint, catalog-specific property handling,
* storage-credential vending) rather than going through the Spark
* [[org.apache.spark.sql.connector.catalog.TableCatalog]] API.
*
* Starts as `None` and is assigned in `initialize` from the catalog options.
*/
private[catalog] var deltaCatalogClient: Option[AbstractDeltaCatalogClient] = None

/**
 * Initializes the parent catalog first, then resolves the optional Delta catalog client
 * from the same name and options.
 *
 * @param name    catalog name, forwarded unchanged to the parent and to the client factory
 * @param options catalog options, forwarded unchanged to the parent and to the client factory
 */
override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
  super.initialize(name, options)
  // The parent class's `loadTable` is handed to the client as its fallback loader.
  val clientFromOptions = AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled(
    name,
    options,
    fallbackLoadTableFunc = ident => super.loadTable(ident))
  deltaCatalogClient = clientFromOptions
}

private lazy val isUnityCatalog: Boolean = {
val delegateField = classOf[DelegatingCatalogExtension].getDeclaredField("delegate")
delegateField.setAccessible(true)
Expand Down Expand Up @@ -290,7 +307,9 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
"DeltaCatalog", "loadTable") {
setVariantBlockingConfigIfUC()
try {
val table = super.loadTable(ident)
val table = deltaCatalogClient
.map(_.loadTable(ident))
.getOrElse(super.loadTable(ident))

ServerSidePlannedTable.tryCreate(spark, ident, table, isUnityCatalog).foreach { sspt =>
return sspt
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (2026) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.catalog

import org.apache.spark.internal.Logging
import org.apache.spark.sql.connector.catalog.{Identifier, Table}
import org.apache.spark.sql.delta.coordinatedcommits.UCTokenBasedRestClientFactory
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* Backend hook through which [[AbstractDeltaCatalog]] injects custom catalog interactions
* that bypass the catalog operations normally provided by Spark's
* [[org.apache.spark.sql.connector.catalog.TableCatalog]] interface (the
* [[org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension]] delegate that
* `AbstractDeltaCatalog` extends). Concrete implementations route table operations to a
* catalog-specific path, e.g. talking directly to a REST endpoint instead of the
* configured delegate, applying catalog-specific table-property handling, or vending
* storage credentials on the returned [[Table]]. Keeping these behind a client interface
* isolates that plumbing from `AbstractDeltaCatalog`.
*/
private[catalog] trait AbstractDeltaCatalogClient {

/**
* Loads the table identified by `ident` through this client.
*
* @param ident identifier of the table to load
* @return the loaded [[Table]]
* @throws org.apache.spark.sql.catalyst.analysis.NoSuchTableException if the catalog has
* no record of this identifier
*/
def loadTable(ident: Identifier): Table
}

/**
* Builds a [[AbstractDeltaCatalogClient]] from catalog options.
*
* `fromCatalogOptions` parameters:
*  - `catalogName`: name of the catalog the client is built for
*  - `options`: catalog options the client is configured from
*  - `fallbackLoadTableFunc`: loader the client may delegate to for tables it does not
*    serve itself (NOTE(review): the exact fallback conditions are up to the implementation)
*/
private[catalog] trait AbstractDeltaCatalogClientFactory {
def fromCatalogOptions(
catalogName: String,
options: CaseInsensitiveStringMap,
fallbackLoadTableFunc: Identifier => Table): AbstractDeltaCatalogClient
}

private[catalog] object AbstractDeltaCatalogClient extends Logging {

private val UC_DELTA_CATALOG_CLIENT_IMPL_CLASS_NAME: String =
"org.apache.spark.sql.delta.catalog.UCDeltaCatalogClientImpl"

/**
* Returns a [[AbstractDeltaCatalogClient]] wrapped in [[Some]] when the catalog opted in via
* `deltaRestApi.enabled`, else [[None]]. The concrete impl is loaded reflectively so
* [[AbstractDeltaCatalog]] doesn't compile-depend on it. If opt-in is explicit but reflective
* loading fails, throws [[IllegalStateException]] rather than silently degrading.
*/
def fromCatalogOptionsIfEnabled(
catalogName: String,
options: CaseInsensitiveStringMap,
fallbackLoadTableFunc: Identifier => Table): Option[AbstractDeltaCatalogClient] = {
val key = UCTokenBasedRestClientFactory.DELTA_REST_API_ENABLED_KEY
if (!options.getBoolean(key, false)) {
return None
}
val factory = try {
// scalastyle:off classforname
val cls = Class.forName(UC_DELTA_CATALOG_CLIENT_IMPL_CLASS_NAME + "$")
// scalastyle:on classforname
cls.getField("MODULE$").get(null).asInstanceOf[AbstractDeltaCatalogClientFactory]
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does `getField("MODULE$").get(null)` do here?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, what is the meaning of the `null` argument here? It seems odd.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just reading the javadoc of the Field.java

/**
     * Returns the value of the field represented by this {@code Field}, on
     * the specified object. The value is automatically wrapped in an
     * object if it has a primitive type.
     *
     * <p>The underlying field's value is obtained as follows:
     *
     * <p>If the underlying field is a static field, the {@code obj} argument
     * is ignored; it may be null.
     *
     * <p>Otherwise, the underlying field is an instance field.  If the
     * specified {@code obj} argument is null, the method throws a
     * {@code NullPointerException}. If the specified object is not an
     * instance of the class or interface declaring the underlying
     * field, the method throws an {@code IllegalArgumentException}.
     *
     * <p>If this {@code Field} object is enforcing Java language access control, and
     * the underlying field is inaccessible, the method throws an
     * {@code IllegalAccessException}.
     * If the underlying field is static, the class that declared the
     * field is initialized if it has not already been initialized.
     *
     * <p>Otherwise, the value is retrieved from the underlying instance
     * or static field.  If the field has a primitive type, the value
     * is wrapped in an object before being returned, otherwise it is
     * returned as is.
     *
     * <p>If the field is hidden in the type of {@code obj},
     * the field's value is obtained according to the preceding rules.
     *
     * @param obj object from which the represented field's value is
     * to be extracted
     * @return the value of the represented field in object
     * {@code obj}; primitive values are wrapped in an appropriate
     * object before being returned
     *
     * @throws    IllegalAccessException    if this {@code Field} object
     *              is enforcing Java language access control and the underlying
     *              field is inaccessible.
     * @throws    IllegalArgumentException  if the specified object is not an
     *              instance of the class or interface declaring the underlying
     *              field (or a subclass or implementor thereof).
     * @throws    NullPointerException      if the specified object is null
     *              and the field is an instance field.
     * @throws    ExceptionInInitializerError if the initialization provoked
     *              by this method fails.
     */
    @CallerSensitive
    @ForceInline // to ensure Reflection.getCallerClass optimization
    public Object get(Object obj)
        throws IllegalArgumentException, IllegalAccessException
    {
        if (!override) {
            Class<?> caller = Reflection.getCallerClass();
            checkAccess(caller, obj);
            return getFieldAccessor().get(obj);
        } else {
            return getOverrideFieldAccessor().get(obj);
        }
    }

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, this works because the `UCDeltaCatalogClientImpl` object is accessed through its static `MODULE$` field.

According to this key comment:

* <p>If the underlying field is a static field, the {@code obj} argument
     * is ignored; it may be null.
     *
     * <p>Otherwise, the underlying field is an instance field.  If the
     * specified {@code obj} argument is null, the method throws a
     * {@code NullPointerException}.

} catch {
case e: Exception =>
throw new IllegalStateException(
s"Failed to load $UC_DELTA_CATALOG_CLIENT_IMPL_CLASS_NAME though '$key' is true. " +
"Ensure the implementation JAR is on the classpath, or remove " +
s"'$key' from the catalog options to fall back to the legacy delegate.", e)
}
Some(factory.fromCatalogOptions(catalogName, options, fallbackLoadTableFunc))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/*
* Copyright (2026) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.catalog

import java.net.URI
import java.util.concurrent.atomic.AtomicLong

import scala.jdk.CollectionConverters._

import io.delta.storage.commit.{TableIdentifier => StorageTableIdentifier}
import io.delta.storage.commit.uccommitcoordinator.{UCDeltaClient, UCDeltaModels}
import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.TableInfo
import io.delta.storage.commit.uccommitcoordinator.exceptions.{
CredentialFetchFailedException,
UnsupportedTableFormatException,
NoSuchTableException => StorageNoSuchTableException
}

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.{
CatalogStorageFormat,
CatalogTable,
CatalogTableType
}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, V1Table}
import org.apache.spark.sql.delta.coordinatedcommits.UCTokenBasedRestClientFactory
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* [[AbstractDeltaCatalogClient]] backed by a [[UCDeltaClient]]; translates between
* Spark/Delta types and the storage-side UC types.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please document the `fallbackLoadTableFunc` parameter.

*/
private[catalog] class UCDeltaCatalogClientImpl(
catalogName: String,
ucClient: UCDeltaClient,
serverSidePlanningEnabled: Boolean = false,
fallbackLoadTableFunc: Identifier => Table
= UCDeltaCatalogClientImpl.defaultFallbackLoadTableFunc)
extends AbstractDeltaCatalogClient with Logging {

override def loadTable(ident: Identifier): Table = {
UCDeltaCatalogClientImpl.loadTableInvocationsCounter.incrementAndGet()
val tid = toStorageTableIdent(ident)
val info =
try ucClient.loadTable(tid)
catch {
case _: StorageNoSuchTableException => throw new NoSuchTableException(ident)
case e: UnsupportedTableFormatException =>
logInfo(log"Table ${MDC(DeltaLogKeys.TABLE_NAME, ident)} is not in Delta format; " +
log"falling back to the legacy catalog path. Cause: " +
log"${MDC(DeltaLogKeys.EXCEPTION, e.getMessage)}")
return fallbackLoadTableFunc(ident)
case e: CredentialFetchFailedException if serverSidePlanningEnabled =>
logWarning(log"Credential fetch failed for " +
log"${MDC(DeltaLogKeys.TABLE_NAME, fullQualifiedTableName(tid))}; enabling " +
log"server-side planning fallback. Cause: " +
log"${MDC(DeltaLogKeys.EXCEPTION, e.getMessage)}")
enableServerSidePlanningConfig(ident)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[AI Generated] Two concerns here:

  1. Side-effect at a distance: spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "true") silently mutates global session state during a loadTable call. This is surprising to callers and could affect unrelated tables loaded in the same session. Is this intentional? If SSP should only apply per-table, this approach is incorrect.

  2. Inconsistent logging: logWarning(s"Credential fetch failed for ...") uses unstructured string interpolation. All other log calls in this class use the structured log"..." + MDC(...) pattern. Please align for consistency.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same behavior of UCSingleCatalog.

e.getTableInfoWithoutCredentials
}
UCDeltaCatalogClientImpl.successfulDeltaRestApiLoadsCounter.incrementAndGet()
toV1Table(ident, info)
}

/**
 * Switches on server-side planning in the active Spark session, if there is one.
 *
 * Note that this sets a session-level conf (`DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING`), not
 * a per-table setting; `ident` is only used for logging. When no session is active, nothing
 * is changed and a warning is logged instead.
 *
 * @param ident table whose load triggered the switch (logging only)
 */
private def enableServerSidePlanningConfig(ident: Identifier): Unit = {
  SparkSession.getActiveSession.fold {
    logWarning(log"Server-side planning requested for table " +
      log"${MDC(DeltaLogKeys.TABLE_NAME, ident)} but no active SparkSession found.")
  } { activeSession =>
    activeSession.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "true")
    logInfo(log"Server-side planning enabled for table " +
      log"${MDC(DeltaLogKeys.TABLE_NAME, ident)}; Delta will read via SSP with empty creds.")
  }
}

// ---------- conversions ----------

/**
 * Converts a Spark identifier into a storage-side identifier whose namespace is
 * `[catalogName, schema]`.
 *
 * @param ident Spark identifier; its namespace must have exactly one element (the schema)
 * @return the storage identifier, with this client's catalog name prepended to the namespace
 * @throws IllegalArgumentException if the namespace does not have exactly one element
 */
private def toStorageTableIdent(ident: Identifier): StorageTableIdentifier = {
  val namespaceParts = ident.namespace()
  val depth = namespaceParts.length
  require(
    depth == 1,
    "UC identifiers must be of the form <schema>.<table>; got namespace of length " +
      s"$depth: '${namespaceParts.mkString(".")}' (full identifier: '$ident')")
  val schemaName = namespaceParts(0)
  new StorageTableIdentifier(Array(catalogName, schemaName), ident.name())
}

/**
 * Renders a storage identifier as `catalog.schema.name`, reading the catalog and schema from
 * the first two namespace elements.
 */
private def fullQualifiedTableName(t: StorageTableIdentifier): String = {
  val namespaceParts = t.getNamespace
  Seq(namespaceParts(0), namespaceParts(1), t.getName).mkString(".")
}

/**
 * Converts the table description returned by UC into a Spark [[V1Table]].
 *
 * Nullable metadata getters are mapped to empty defaults: no configuration or storage
 * properties become an empty map, no partition columns an empty list, no schema string an
 * empty [[StructType]], and no created time `0L`.
 *
 * @param ident Spark identifier the table was loaded under; supplies the table and schema
 *              names of the resulting catalog identifier
 * @param info  table description returned by the UC client
 * @return a [[V1Table]] wrapping the equivalent `CatalogTable`
 */
private def toV1Table(ident: Identifier, info: TableInfo): V1Table = {
  val m = info.getMetadata
  val properties = Option(m.getConfiguration)
    .map(_.asScala.toMap)
    .getOrElse(Map.empty[String, String])
  // Guarded like the other nullable getters: a table without storage properties must not
  // fail the whole load with a NullPointerException.
  val storageProperties = Option(info.getStorageProperties)
    .map(_.asScala.toMap)
    .getOrElse(Map.empty[String, String])
  val partitionColumns = Option(m.getPartitionColumns)
    .map(_.asScala.toSeq)
    .getOrElse(Seq.empty[String])
  val schema = Option(m.getSchemaString)
    .map(DataType.fromJson(_).asInstanceOf[StructType])
    .getOrElse(new StructType())
  val storage = CatalogStorageFormat.empty.copy(
    locationUri = Some(new URI(info.getLocation)),
    // Storage properties are applied last, so they win over table configuration on key clashes.
    properties = properties ++ storageProperties)
  val catalogTable = CatalogTable(
    identifier = TableIdentifier(ident.name(), ident.namespace().headOption, Some(catalogName)),
    tableType = fromUcTableType(info.getTableType),
    storage = storage,
    schema = schema,
    provider = Option(m.getProvider).map(_.toLowerCase(java.util.Locale.ROOT)),
    partitionColumnNames = partitionColumns,
    comment = Option(m.getDescription),
    createTime = if (m.getCreatedTime != null) m.getCreatedTime else 0L,
    tracksPartitionsInCatalog = false)
  V1Table(catalogTable)
}

/**
 * Maps a UC table type onto the corresponding Spark [[CatalogTableType]].
 *
 * @param t table type reported by UC
 * @return `MANAGED` or `EXTERNAL`, mirroring the UC value
 * @throws IllegalArgumentException if `t` is null or a table type with no Spark equivalent
 *                                  handled here
 */
private def fromUcTableType(t: UCDeltaModels.TableType): CatalogTableType = t match {
  case UCDeltaModels.TableType.MANAGED => CatalogTableType.MANAGED
  case UCDeltaModels.TableType.EXTERNAL => CatalogTableType.EXTERNAL
  // Fail with a descriptive message instead of an opaque scala.MatchError.
  case other =>
    throw new IllegalArgumentException(s"Unsupported UC table type: $other")
}
}

object UCDeltaCatalogClientImpl extends AbstractDeltaCatalogClientFactory with Logging {
// Test-only instrumentation. The counters are `private`, so only this object and its
// companion class (which increments them in `loadTable`) can touch them directly. Read
// access is exposed via the `*ForTesting` methods below so cross-package integration
// tests (e.g. `io.sparkuctest.*`) don't need reflection. Because they live on the
// object, the counters are shared by every `UCDeltaCatalogClientImpl` instance.

/** Bumped at every `loadTable` entry regardless of outcome. Read via the *ForTesting API. */
private val loadTableInvocationsCounter: AtomicLong = new AtomicLong(0L)

/**
* Bumped only when `loadTable` returned a Delta table via the Delta REST API (no fallback,
* no rethrow). Read via the *ForTesting API.
*/
private val successfulDeltaRestApiLoadsCounter: AtomicLong = new AtomicLong(0L)

/**
* Test-only read accessor for the `loadTable` invocation counter. Used by integration
* tests to verify the Delta REST API code path ran. Not part of any public API; production
* code must not depend on it.
*/
def loadTableInvocationsForTesting: Long = loadTableInvocationsCounter.get()

/**
* Test-only read accessor for the count of `loadTable` calls served by the Delta REST API
* (no fallback, no rethrow). Not part of any public API.
*/
def successfulDeltaRestApiLoadsForTesting: Long = successfulDeltaRestApiLoadsCounter.get()

/**
* Catalog option read by `fromCatalogOptions` (default `false`) and passed to the client as
* `serverSidePlanningEnabled`.
*/
private[catalog] val ServerSidePlanningEnabledKey: String = "serverSidePlanning.enabled"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also have tests verifying that server-side planning works for the Delta REST catalog?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes loadTable falls back to SSP on CredentialFetchFailedException when SSP is enabled and loadTable without serverSidePlanningEnabled rethrows CredentialFetchFailedException


/**
 * Fallback loader used when the caller did not supply one. It never returns a table: every
 * invocation fails with an [[IllegalStateException]] naming the identifier.
 */
private[catalog] val defaultFallbackLoadTableFunc: Identifier => Table = { ident =>
  val message =
    s"Non-Delta table $ident cannot be served via the Delta REST API path and no " +
      "fallback catalog was configured."
  throw new IllegalStateException(message)
}

/**
* Builds a [[UCDeltaCatalogClientImpl]] from catalog options. The `deltaRestApi.enabled` gate
* is the caller's responsibility ([[AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled]]).
* `fallbackLoadTableFunc` is invoked when UC reports `UnsupportedTableFormatException`. UC client
* construction is delegated to [[UCTokenBasedRestClientFactory]] with `renewCredential.enabled`
* defaulted to `true` and `credScopedFs.enabled` defaulted to `false` when not set.
*/
override def fromCatalogOptions(
catalogName: String,
options: CaseInsensitiveStringMap,
fallbackLoadTableFunc: Identifier => Table
): UCDeltaCatalogClientImpl = {
// Pre-flight: keep our user-facing errors instead of the factory's less specific ones.
if (options.get(UriKey) == null) {
throw new IllegalArgumentException(s"'$UriKey' is required (catalog '$catalogName')")
}
validateAuthConfigured(options, catalogName)

// `asCaseSensitiveMap()` preserves the user's original key case; `containsKey` is
// case-insensitive so defaults don't create duplicate keys.
val merged = new java.util.HashMap[String, String](options.asCaseSensitiveMap())
Seq(
UCTokenBasedRestClientFactory.DELTA_REST_API_ENABLED_KEY -> "true",
UCTokenBasedRestClientFactory.RENEW_CREDENTIAL_ENABLED_KEY -> "true",
UCTokenBasedRestClientFactory.CRED_SCOPED_FS_ENABLED_KEY -> "false"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, why is `credScopedFs.enabled` set to false by default? I think that starting from UC 0.5.0 we will have to set it to true: our TPC-DS benchmark already showed that credScopedFs reduces the number of spawned threads and connection pools when different tables use different credentials.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it was in UCSingleCatalog:

  public static final String CRED_SCOPED_FS_ENABLED = "credScopedFs.enabled";
  public static final boolean DEFAULT_CRED_SCOPED_FS_ENABLED = false;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a PR aimed for uc 0.5.0: unitycatalog/unitycatalog#1479 to make this true by default.

).foreach { case (k, v) => if (!options.containsKey(k)) merged.put(k, v) }
val ucClient = UCTokenBasedRestClientFactory
.createUCClient(new CaseInsensitiveStringMap(merged))
.asInstanceOf[UCDeltaClient]

val sspEnabled = options.getBoolean(ServerSidePlanningEnabledKey, false)
new UCDeltaCatalogClientImpl(catalogName, ucClient, sspEnabled, fallbackLoadTableFunc)
}

// Required catalog option; `fromCatalogOptions` throws if it is missing.
private val UriKey: String = "uri"
// Key prefix that `validateAuthConfigured` accepts as evidence of auth configuration.
private val AuthPrefix: String = "auth."
// Legacy option that `validateAuthConfigured` accepts as an alternative to `auth.*` keys.
private val LegacyTokenKey: String = "token"

/**
 * Pre-flight check that the catalog options carry some authentication configuration:
 * either at least one key starting with `auth.` or the legacy `token` option. Failing here
 * gives the user a clear message that includes the catalog name, instead of the factory's
 * internal failure when `TokenProvider.create` is handed an empty config.
 *
 * @param options     catalog options to inspect
 * @param catalogName catalog name, used only in the error message
 * @throws IllegalArgumentException if neither form of auth configuration is present
 */
private[catalog] def validateAuthConfigured(
    options: CaseInsensitiveStringMap,
    catalogName: String): Unit = {
  val authKeyPresent =
    options.entrySet().asScala.iterator.map(_.getKey).exists(_.startsWith(AuthPrefix))
  val legacyTokenPresent = Option(options.get(LegacyTokenKey)).isDefined
  val authConfigured = authKeyPresent || legacyTokenPresent
  if (!authConfigured) {
    val message =
      s"auth configuration is required when 'deltaRestApi.enabled' is true " +
        s"(catalog '$catalogName'). Set either '${AuthPrefix}type' (with the corresponding " +
        s"$AuthPrefix* keys) or the legacy '$LegacyTokenKey' option."
    throw new IllegalArgumentException(message)
  }
}
}
Loading
Loading