[Spark] Wire UC Delta Rest Catalog API loadTable into Delta catalog #6660

Open
TimothyW553 wants to merge 2 commits into delta-io:master from TimothyW553:stack/drc-loadtable-catalog

TimothyW553 (Collaborator) commented Apr 23, 2026

🥞 Stacked PR

Use this link to review incremental changes.


Which Delta project/connector is this regarding?

Spark / Unity Catalog

Description

This PR routes loadTable for named UC Delta tables in the Spark Delta catalog through the storage client added in the previous PR.

Flow:

Spark resolves catalog.schema.table
  -> UCSingleCatalog delegates Delta tables to DeltaCatalog
  -> AbstractDeltaCatalog.loadTable(...)
  -> DeltaCatalogClient.loadTable(...)
  -> UCClient.loadTable(...) through UCTokenBasedRestClient
  -> V1Table(CatalogTable) returned to the existing Delta load path
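
The routing decision in this flow can be sketched roughly as below. This is a simplified illustration, not the code in this PR: `Ident`, `Loaded`, `ViaRestApi`, `ViaDelegate`, and `LoadTableRouting` are hypothetical stand-ins for the Spark and Delta classes, and `restFormatOf` stands in for the REST loadTable call. It assumes that only single-level namespaces with DELTA format take the REST path and that everything else goes to the delegate catalog.

```scala
// Hypothetical stand-ins; none of these are the real Spark or Delta classes.
final case class Ident(namespace: Seq[String], name: String)

sealed trait Loaded
final case class ViaRestApi(catalog: String, schema: String, table: String) extends Loaded
final case class ViaDelegate(ident: Ident) extends Loaded

object LoadTableRouting {
  // `restFormatOf` models the REST loadTable call and returns the table's
  // data source format. None models a delegate that exposes no REST client.
  def route(
      catalogName: String,
      ident: Ident,
      restFormatOf: Option[(String, String, String) => String]): Loaded =
    restFormatOf match {
      // Only catalog.schema.table identifiers are eligible for the REST path.
      case Some(formatOf) if ident.namespace.length == 1 =>
        val schema = ident.namespace.head
        if (formatOf(catalogName, schema, ident.name) == "DELTA") {
          ViaRestApi(catalogName, schema, ident.name)
        } else {
          // Non-Delta tables stay on the delegate catalog.
          ViaDelegate(ident)
        }
      case _ => ViaDelegate(ident)
    }
}
```

Under these assumptions, an identifier such as `Ident(Seq("catalog", "schema"), "tbl")` never reaches the REST call at all.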

This PR adds:

  • DeltaCatalogClient for Spark-side UC Delta Rest Catalog API configuration and named-table loading.
  • Kernel StructType to Spark StructType conversion at the Spark boundary.
  • Named-table credential retrieval for cloud-backed UC tables, with READ_WRITE first and READ fallback for read-only principals.
  • Conversion of returned credentials into Hadoop storage properties.
  • Explicit failure when the Spark config enables UC Delta Rest Catalog API but UC does not advertise the required API support.
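
One way to picture the READ_WRITE-first, READ-fallback behavior from the list above is the sketch below. All names here (`Operation`, `Credentials`, `CredentialFallback`) are hypothetical, and the use of `SecurityException` to signal a permission denial is purely an assumption for illustration; the real client may report denied access differently.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical model of the two credential operations.
sealed trait Operation
case object ReadWrite extends Operation
case object Read extends Operation

final case class Credentials(operation: Operation, token: String)

object CredentialFallback {
  // `request` stands in for the credential endpoint call.
  // Assumption: a permission denial surfaces as SecurityException.
  def fetch(request: Operation => Credentials): Credentials =
    Try(request(ReadWrite)) match {
      case Success(credentials) => credentials
      // Read-only principals are denied READ_WRITE, so retry with READ.
      case Failure(_: SecurityException) => request(Read)
      // Any other failure (for example a server error) is propagated as-is.
      case Failure(other) => throw other
    }
}
```

Keeping the fallback limited to permission denials means that unrelated server errors are not masked by a second request.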

This PR covers named-table loadTable only. Raw path-based Delta access has no table identifier and is handled by the next stacked PR, which uses temporary path credentials.

How was this patch tested?

Covered by DeltaCatalogClientSuite for named-table loading, credential handling, local path behavior, server error propagation, disabled config behavior, and unsupported API failures.
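
For reviewers, the credential handling these tests exercise can be approximated by the sketch below: pick the credential whose prefix is the longest match for the table location, then map its S3 fields to Hadoop `fs.s3a.*` keys. `S3Config`, `StorageCredential`, and `CredentialMapping` are simplified, hypothetical stand-ins for the UC client model classes. Only the S3 case is shown.

```scala
// Hypothetical, simplified stand-ins for the UC client model classes.
final case class S3Config(
    accessKeyId: Option[String],
    secretAccessKey: Option[String],
    sessionToken: Option[String])

final case class StorageCredential(prefix: Option[String], config: S3Config)

object CredentialMapping {
  // Keep credentials with no prefix or a prefix covering the location,
  // then prefer the longest (most specific) prefix.
  def select(
      location: String,
      credentials: Seq[StorageCredential]): Option[StorageCredential] =
    credentials
      .filter(c => c.prefix.forall(p => p.isEmpty || location.startsWith(p)))
      .sortBy(_.prefix.map(_.length).getOrElse(0))
      .lastOption

  // Emit only the S3A keys that actually have a value.
  def toHadoopProperties(credential: StorageCredential): Map[String, String] =
    Seq(
      "fs.s3a.access.key" -> credential.config.accessKeyId,
      "fs.s3a.secret.key" -> credential.config.secretAccessKey,
      "fs.s3a.session.token" -> credential.config.sessionToken
    ).collect { case (key, Some(value)) => key -> value }.toMap
}
```

Preferring the longest prefix lets a table-scoped credential win over a broader bucket-scoped one when both are returned.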

Covered by UCDeltaRestCatalogApiSchemaConverterSuite for Kernel schema to Spark schema conversion.
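
The shape of such a schema conversion can be illustrated with the self-contained sketch below. It does not use the Kernel or Spark type classes: `SourceType`, `TargetType`, and `SchemaConversion` are hypothetical ADTs, and the set of supported primitives is deliberately tiny. It only shows the recursive structure and the fail-loudly behavior for unknown types.

```scala
// Hypothetical source-side schema model (not the Kernel classes).
sealed trait SourceType
final case class Primitive(name: String) extends SourceType
final case class ArrayOf(element: SourceType, containsNull: Boolean) extends SourceType
final case class StructOf(fields: Seq[SourceField]) extends SourceType
final case class SourceField(name: String, dataType: SourceType, nullable: Boolean)

// Hypothetical target-side schema model (not the Spark classes).
sealed trait TargetType
case object TLong extends TargetType
case object TString extends TargetType
final case class TDecimal(precision: Int, scale: Int) extends TargetType
final case class TArray(element: TargetType, containsNull: Boolean) extends TargetType
final case class TStruct(fields: Seq[TField]) extends TargetType
final case class TField(name: String, dataType: TargetType, nullable: Boolean)

object SchemaConversion {
  private val Decimal = """decimal\((\d+),\s*(\d+)\)""".r

  // Recurse through nested types; reject anything unrecognized instead of
  // silently dropping or guessing a type.
  def convert(source: SourceType): TargetType = source match {
    case Primitive("long") => TLong
    case Primitive("string") => TString
    case Primitive(Decimal(precision, scale)) => TDecimal(precision.toInt, scale.toInt)
    case Primitive(other) =>
      throw new IllegalArgumentException(s"Unsupported schema type: $other")
    case ArrayOf(element, containsNull) => TArray(convert(element), containsNull)
    case StructOf(fields) =>
      TStruct(fields.map(f => TField(f.name, convert(f.dataType), f.nullable)))
  }
}
```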

Covered by UCDeltaTableReadTest in the Spark UC integration tests.

Local verification:

./build/sbt "spark/testOnly org.apache.spark.sql.delta.catalog.UCDeltaRestCatalogApiSchemaConverterSuite org.apache.spark.sql.delta.catalog.DeltaCatalogClientSuite"

Does this PR introduce any user-facing changes?

No released user-facing change. The new path is controlled by spark.sql.catalog.<catalog>.deltaRestApi.enabled; when that config is disabled, Delta stays on the existing legacy path.
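
As a small illustration of the per-catalog key pattern, the sketch below builds the key for a given catalog name and reads it from a plain configuration map. `DeltaRestApiFlag` is a hypothetical helper, not part of this PR, and treating an unset key as disabled is an assumption.

```scala
object DeltaRestApiFlag {
  // Key pattern quoted in the PR description, with <catalog> filled in.
  def key(catalogName: String): String =
    s"spark.sql.catalog.$catalogName.deltaRestApi.enabled"

  // Assumption: an unset key means the legacy path stays in use.
  def isEnabled(conf: Map[String, String], catalogName: String): Boolean =
    conf.get(key(catalogName)).exists(_.trim.equalsIgnoreCase("true"))
}
```

For a catalog registered as `uc`, the resulting key is `spark.sql.catalog.uc.deltaRestApi.enabled`.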

@TimothyW553
Collaborator Author

Range-diff: stack/drc-loadtable-storage (a215f41 -> 018519b)
spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogLoadTableSuite.scala
@@ -0,0 +1,244 @@
+diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogLoadTableSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogLoadTableSuite.scala
+new file mode 100644
+--- /dev/null
++++ b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogLoadTableSuite.scala
++/*
++ * Copyright (2026) The Delta Lake Project Authors.
++ *
++ * Licensed under the Apache License, Version 2.0 (the "License");
++ * you may not use this file except in compliance with the License.
++ * You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.spark.sql.delta.catalog
++
++import java.net.InetSocketAddress
++import java.nio.charset.StandardCharsets
++import java.util.{Collections, Optional}
++
++import com.fasterxml.jackson.databind.ObjectMapper
++import com.sun.net.httpserver.{HttpExchange, HttpServer}
++import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient.UC_TABLE_ID_KEY
++import io.unitycatalog.client.{ApiClient, ApiClientBuilder}
++import io.unitycatalog.client.auth.TokenProvider
++import io.unitycatalog.client.delta.DeltaRestClientProvider
++import io.unitycatalog.client.delta.api.TablesApi
++
++import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
++
++import org.apache.spark.sql.{QueryTest, connector}
++import org.apache.spark.sql.catalyst.catalog.CatalogTable
++import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange}
++import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
++import org.apache.spark.sql.types.StructType
++import org.apache.spark.sql.util.CaseInsensitiveStringMap
++
++class DeltaCatalogLoadTableSuite
++    extends QueryTest
++    with DeltaSQLCommandTest
++    with BeforeAndAfterAll
++    with BeforeAndAfterEach {
++
++  private val objectMapper = new ObjectMapper()
++
++  private var server: HttpServer = _
++  private var serverUri: String = _
++  private var drcLoadTableHandler: HttpExchange => Unit = _
++
++  override def beforeAll(): Unit = {
++    super.beforeAll()
++    server = HttpServer.create(new InetSocketAddress("localhost", 0), 0)
++    server.createContext("/api/2.1/unity-catalog/delta/v1/catalogs", exchange => {
++      if (drcLoadTableHandler != null) drcLoadTableHandler(exchange)
++      else sendJson(exchange, 404, "{}")
++      exchange.close()
++    })
++    server.start()
++    serverUri = s"http://localhost:${server.getAddress.getPort}"
++  }
++
++  override def afterAll(): Unit = {
++    if (server != null) server.stop(0)
++    super.afterAll()
++  }
++
++  override def beforeEach(): Unit = {
++    super.beforeEach()
++    drcLoadTableHandler = null
++  }
++
++  test("loadTable uses DRC for single-schema identifiers and converts metadata") {
++    drcLoadTableHandler = exchange => {
++      assert(exchange.getRequestMethod === "GET")
++      assert(
++        exchange.getRequestURI.getPath ===
++          "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl")
++      sendJson(exchange, 200, loadTableResponseJson("DELTA"))
++    }
++
++    val fallbackTable = MarkerTable("delegate")
++    val apiClient = createApiClient()
++    val delegate = new TestDeltaRestProviderCatalog(
++      apiClient,
++      Optional.of(new TablesApi(apiClient)),
++      fallbackTable)
++    val catalog = new TestDeltaCatalog()
++    catalog.setDelegateCatalog(delegate)
++
++    val loaded = catalog.loadTable(Identifier.of(Array("default"), "tbl"))
++
++    assert(loaded.name() === "rest")
++    assert(delegate.loadTableCalls === 0)
++    assert(catalog.loadedCatalogTable.identifier.table === "tbl")
++    assert(catalog.loadedCatalogTable.storage.properties(UC_TABLE_ID_KEY) ===
++      "11111111-1111-1111-1111-111111111111")
++  }
++
++  test("loadTable falls back to delegate for non-delta metadata") {
++    drcLoadTableHandler = exchange => {
++      sendJson(exchange, 200, loadTableResponseJson("PARQUET"))
++    }
++
++    val fallbackTable = MarkerTable("delegate")
++    val apiClient = createApiClient()
++    val delegate = new TestDeltaRestProviderCatalog(
++      apiClient,
++      Optional.of(new TablesApi(apiClient)),
++      fallbackTable)
++    val catalog = new TestDeltaCatalog()
++    catalog.setDelegateCatalog(delegate)
++
++    val loaded = catalog.loadTable(Identifier.of(Array("default"), "tbl"))
++
++    assert(loaded eq fallbackTable)
++    assert(delegate.loadTableCalls === 1)
++    assert(catalog.loadedCatalogTable == null)
++  }
++
++  test("loadTable falls back to delegate for non-schema identifiers") {
++    val fallbackTable = MarkerTable("delegate")
++    val apiClient = createApiClient()
++    val delegate = new TestDeltaRestProviderCatalog(
++      apiClient,
++      Optional.of(new TablesApi(apiClient)),
++      fallbackTable)
++    val catalog = new TestDeltaCatalog()
++    catalog.setDelegateCatalog(delegate)
++
++    val loaded = catalog.loadTable(Identifier.of(Array("catalog", "schema"), "tbl"))
++
++    assert(loaded eq fallbackTable)
++    assert(delegate.loadTableCalls === 1)
++  }
++
++  private def createApiClient(): ApiClient = ApiClientBuilder.create()
++    .uri(serverUri)
++    .tokenProvider(new TokenProvider {
++      override def accessToken(): String = "mock-token"
++      override def initialize(configs: java.util.Map[String, String]): Unit = {}
++      override def configs(): java.util.Map[String, String] = Collections.emptyMap()
++    })
++    .build()
++
++  private def loadTableResponseJson(dataSourceFormat: String): String =
++    s"""{
++       |  "metadata": {
++       |    "etag": "etag-1",
++       |    "data-source-format": "$dataSourceFormat",
++       |    "table-type": "MANAGED",
++       |    "table-uuid": "11111111-1111-1111-1111-111111111111",
++       |    "location": "file:/tmp/delta-table",
++       |    "created-time": 10,
++       |    "updated-time": 11,
++       |    "securable-type": "TABLE",
++       |    "columns": {
++       |      "type": "struct",
++       |      "fields": [
++       |        {
++       |          "name": "id",
++       |          "type": "long",
++       |          "nullable": false,
++       |          "metadata": {}
++       |        }
++       |      ]
++       |    },
++       |    "partition-columns": ["id"],
++       |    "properties": {"delta.appendOnly":"true"}
++       |  },
++       |  "commits": []
++       |}""".stripMargin
++
++  private def sendJson(exchange: HttpExchange, status: Int, body: String): Unit = {
++    val bytes = body.getBytes(StandardCharsets.UTF_8)
++    exchange.getResponseHeaders.add("Content-Type", "application/json")
++    exchange.sendResponseHeaders(status, bytes.length)
++    exchange.getResponseBody.write(bytes)
++    exchange.getResponseBody.close()
++  }
++
++  private case class MarkerTable(marker: String) extends Table {
++    override def name(): String = marker
++    override def schema(): StructType = StructType(Nil)
++    override def capabilities(): java.util.Set[connector.catalog.TableCapability] =
++      Collections.emptySet()
++  }
++
++  private class TestDeltaCatalog extends AbstractDeltaCatalog {
++    var loadedCatalogTable: CatalogTable = _
++
++    override def loadCatalogTable(ident: Identifier, catalogTable: CatalogTable): Table = {
++      loadedCatalogTable = catalogTable
++      MarkerTable("rest")
++    }
++  }
++
++  private class TestDeltaRestProviderCatalog(
++      apiClient: ApiClient,
++      deltaTablesApi: Optional[TablesApi],
++      fallbackTable: Table)
++    extends TableCatalog
++    with DeltaRestClientProvider {
++
++    var loadTableCalls: Int = 0
++
++    override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}
++
++    override def name(): String = "uc"
++
++    override def listTables(namespace: Array[String]): Array[Identifier] = Array.empty
++
++    override def loadTable(ident: Identifier): Table = {
++      loadTableCalls += 1
++      fallbackTable
++    }
++
++    override def createTable(
++        ident: Identifier,
++        schema: StructType,
++        partitions: Array[org.apache.spark.sql.connector.expressions.Transform],
++        properties: java.util.Map[String, String]): Table = {
++      throw new UnsupportedOperationException("not needed in this test")
++    }
++
++    override def alterTable(ident: Identifier, changes: TableChange*): Table =
++      throw new UnsupportedOperationException("not needed in this test")
++
++    override def dropTable(ident: Identifier): Boolean =
++      throw new UnsupportedOperationException("not needed in this test")
++
++    override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit =
++      throw new UnsupportedOperationException("not needed in this test")
++
++    override def getDeltaTablesApi(): Optional[TablesApi] = deltaTablesApi
++
++    override def getApiClient(): ApiClient = apiClient
++  }
++}
\ No newline at end of file

Reproduce locally: git range-diff 4610c42..a215f41 4610c42..018519b | Disable: git config gitstack.push-range-diff false

TimothyW553 force-pushed the stack/drc-loadtable-catalog branch from 018519b to 900cb31 on April 23, 2026 07:26
@TimothyW553
Collaborator Author

Range-diff: stack/drc-loadtable-storage (018519b -> 900cb31)
spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogLoadTableSuite.scala
@@ -38,7 +38,7 @@
 +import org.apache.spark.sql.catalyst.catalog.CatalogTable
 +import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange}
 +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
-+import org.apache.spark.sql.types.StructType
++import org.apache.spark.sql.types.{ArrayType, DecimalType, LongType, StringType, StructType}
 +import org.apache.spark.sql.util.CaseInsensitiveStringMap
 +
 +class DeltaCatalogLoadTableSuite
@@ -100,6 +100,19 @@
 +    assert(catalog.loadedCatalogTable.identifier.table === "tbl")
 +    assert(catalog.loadedCatalogTable.storage.properties(UC_TABLE_ID_KEY) ===
 +      "11111111-1111-1111-1111-111111111111")
++    assert(catalog.loadedCatalogTable.partitionColumnNames === Seq("id"))
++
++    val schema = catalog.loadedCatalogTable.schema
++    assert(schema("id").dataType === LongType)
++    assert(schema("payload").metadata.getString("comment") === "payload struct")
++
++    val payloadType = schema("payload").dataType.asInstanceOf[StructType]
++    val tagsField = payloadType("tags")
++    val amountField = payloadType("amount")
++
++    assert(tagsField.dataType === ArrayType(StringType, containsNull = true))
++    assert(tagsField.metadata.getString("comment") === "nested tags")
++    assert(amountField.dataType === DecimalType(10, 2))
 +  }
 +
 +  test("loadTable falls back to delegate for non-delta metadata") {
@@ -166,7 +179,33 @@
 +       |          "name": "id",
 +       |          "type": "long",
 +       |          "nullable": false,
-+       |          "metadata": {}
++       |          "metadata": {"delta.columnMapping.id": 1}
++       |        },
++       |        {
++       |          "name": "payload",
++       |          "type": {
++       |            "type": "struct",
++       |            "fields": [
++       |              {
++       |                "name": "tags",
++       |                "type": {
++       |                  "type": "array",
++       |                  "element-type": "string",
++       |                  "contains-null": true
++       |                },
++       |                "nullable": true,
++       |                "metadata": {"comment": "nested tags"}
++       |              },
++       |              {
++       |                "name": "amount",
++       |                "type": "decimal(10,2)",
++       |                "nullable": false,
++       |                "metadata": {}
++       |              }
++       |            ]
++       |          },
++       |          "nullable": true,
++       |          "metadata": {"comment": "payload struct"}
 +       |        }
 +       |      ]
 +       |    },
spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaRestSchemaConverterSuite.scala
@@ -0,0 +1,46 @@
+diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaRestSchemaConverterSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaRestSchemaConverterSuite.scala
+new file mode 100644
+--- /dev/null
++++ b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaRestSchemaConverterSuite.scala
++/*
++ * Copyright (2026) The Delta Lake Project Authors.
++ *
++ * Licensed under the Apache License, Version 2.0 (the "License");
++ * you may not use this file except in compliance with the License.
++ * You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.spark.sql.delta.catalog
++
++import java.util.Collections
++
++import io.unitycatalog.client.delta.model
++
++import org.scalatest.funsuite.AnyFunSuite
++
++class DeltaRestSchemaConverterSuite extends AnyFunSuite {
++
++  test("toSparkSchema fails loudly for unsupported Delta REST types") {
++    val unsupportedType = new model.DeltaType().`type`("mystery")
++    val field = new model.StructField()
++      .name("bad")
++      .`type`(unsupportedType)
++      .nullable(true)
++      .metadata(Collections.emptyMap())
++    val schema = new model.StructType().fields(Collections.singletonList(field))
++
++    val error = intercept[IllegalArgumentException] {
++      DeltaRestSchemaConverter.toSparkSchema(schema)
++    }
++
++    assert(error.getMessage.contains("Unsupported Delta REST schema type"))
++  }
++}
\ No newline at end of file

Reproduce locally: git range-diff 4610c42..018519b 5c40a3a..900cb31 | Disable: git config gitstack.push-range-diff false

TimothyW553 force-pushed the stack/drc-loadtable-catalog branch from 900cb31 to b2a7aff on April 23, 2026 07:34
@TimothyW553
Collaborator Author

Range-diff: stack/drc-loadtable-storage (900cb31 -> b2a7aff)
spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogLoadTableSuite.scala
@@ -24,9 +24,9 @@
 +import java.nio.charset.StandardCharsets
 +import java.util.{Collections, Optional}
 +
-+import com.fasterxml.jackson.databind.ObjectMapper
 +import com.sun.net.httpserver.{HttpExchange, HttpServer}
 +import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient.UC_TABLE_ID_KEY
++import io.unitycatalog.client.delta.model
 +import io.unitycatalog.client.{ApiClient, ApiClientBuilder}
 +import io.unitycatalog.client.auth.TokenProvider
 +import io.unitycatalog.client.delta.DeltaRestClientProvider
@@ -46,8 +46,6 @@
 +    with DeltaSQLCommandTest
 +    with BeforeAndAfterAll
 +    with BeforeAndAfterEach {
-+
-+  private val objectMapper = new ObjectMapper()
 +
 +  private var server: HttpServer = _
 +  private var serverUri: String = _
@@ -85,13 +83,7 @@
 +    }
 +
 +    val fallbackTable = MarkerTable("delegate")
-+    val apiClient = createApiClient()
-+    val delegate = new TestDeltaRestProviderCatalog(
-+      apiClient,
-+      Optional.of(new TablesApi(apiClient)),
-+      fallbackTable)
-+    val catalog = new TestDeltaCatalog()
-+    catalog.setDelegateCatalog(delegate)
++    val (catalog, delegate) = createCatalog(fallbackTable)
 +
 +    val loaded = catalog.loadTable(Identifier.of(Array("default"), "tbl"))
 +
@@ -121,13 +113,7 @@
 +    }
 +
 +    val fallbackTable = MarkerTable("delegate")
-+    val apiClient = createApiClient()
-+    val delegate = new TestDeltaRestProviderCatalog(
-+      apiClient,
-+      Optional.of(new TablesApi(apiClient)),
-+      fallbackTable)
-+    val catalog = new TestDeltaCatalog()
-+    catalog.setDelegateCatalog(delegate)
++    val (catalog, delegate) = createCatalog(fallbackTable)
 +
 +    val loaded = catalog.loadTable(Identifier.of(Array("default"), "tbl"))
 +
@@ -138,13 +124,7 @@
 +
 +  test("loadTable falls back to delegate for non-schema identifiers") {
 +    val fallbackTable = MarkerTable("delegate")
-+    val apiClient = createApiClient()
-+    val delegate = new TestDeltaRestProviderCatalog(
-+      apiClient,
-+      Optional.of(new TablesApi(apiClient)),
-+      fallbackTable)
-+    val catalog = new TestDeltaCatalog()
-+    catalog.setDelegateCatalog(delegate)
++    val (catalog, delegate) = createCatalog(fallbackTable)
 +
 +    val loaded = catalog.loadTable(Identifier.of(Array("catalog", "schema"), "tbl"))
 +
@@ -152,6 +132,22 @@
 +    assert(delegate.loadTableCalls === 1)
 +  }
 +
++  test("DeltaRestSchemaConverter fails loudly for unsupported Delta REST types") {
++    val unsupportedType = new model.DeltaType().`type`("mystery")
++    val field = new model.StructField()
++      .name("bad")
++      .`type`(unsupportedType)
++      .nullable(true)
++      .metadata(Collections.emptyMap())
++    val schema = new model.StructType().fields(Collections.singletonList(field))
++
++    val error = intercept[IllegalArgumentException] {
++      DeltaRestSchemaConverter.toSparkSchema(schema)
++    }
++
++    assert(error.getMessage.contains("Unsupported Delta REST schema type"))
++  }
++
 +  private def createApiClient(): ApiClient = ApiClientBuilder.create()
 +    .uri(serverUri)
 +    .tokenProvider(new TokenProvider {
@@ -161,6 +157,18 @@
 +    })
 +    .build()
 +
++  private def createCatalog(
++      fallbackTable: Table): (TestDeltaCatalog, TestDeltaRestProviderCatalog) = {
++    val apiClient = createApiClient()
++    val delegate = new TestDeltaRestProviderCatalog(
++      apiClient,
++      Optional.of(new TablesApi(apiClient)),
++      fallbackTable)
++    val catalog = new TestDeltaCatalog()
++    catalog.setDelegateCatalog(delegate)
++    (catalog, delegate)
++  }
++
 +  private def loadTableResponseJson(dataSourceFormat: String): String =
 +    s"""{
 +       |  "metadata": {
spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaRestSchemaConverterSuite.scala
@@ -1,46 +0,0 @@
-diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaRestSchemaConverterSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaRestSchemaConverterSuite.scala
-new file mode 100644
---- /dev/null
-+++ b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaRestSchemaConverterSuite.scala
-+/*
-+ * Copyright (2026) The Delta Lake Project Authors.
-+ *
-+ * Licensed under the Apache License, Version 2.0 (the "License");
-+ * you may not use this file except in compliance with the License.
-+ * You may obtain a copy of the License at
-+ *
-+ * http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
-+
-+package org.apache.spark.sql.delta.catalog
-+
-+import java.util.Collections
-+
-+import io.unitycatalog.client.delta.model
-+
-+import org.scalatest.funsuite.AnyFunSuite
-+
-+class DeltaRestSchemaConverterSuite extends AnyFunSuite {
-+
-+  test("toSparkSchema fails loudly for unsupported Delta REST types") {
-+    val unsupportedType = new model.DeltaType().`type`("mystery")
-+    val field = new model.StructField()
-+      .name("bad")
-+      .`type`(unsupportedType)
-+      .nullable(true)
-+      .metadata(Collections.emptyMap())
-+    val schema = new model.StructType().fields(Collections.singletonList(field))
-+
-+    val error = intercept[IllegalArgumentException] {
-+      DeltaRestSchemaConverter.toSparkSchema(schema)
-+    }
-+
-+    assert(error.getMessage.contains("Unsupported Delta REST schema type"))
-+  }
-+}
\ No newline at end of file

Reproduce locally: git range-diff 5c40a3a..900cb31 8361784..b2a7aff | Disable: git config gitstack.push-range-diff false

@TimothyW553
Collaborator Author

Range-diff: stack/drc-loadtable-storage (b2a7aff -> 62b4288)
spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogClient.scala
@@ -20,12 +20,14 @@
 +
 +package org.apache.spark.sql.delta.catalog
 +
++import java.net.URI
++
 +import scala.collection.JavaConverters._
 +
 +import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient.UC_TABLE_ID_KEY
 +import io.delta.storage.commit.uccommitcoordinator.{UCDeltaClient, UCTokenBasedRestClient}
 +import io.unitycatalog.client.delta.DeltaRestClientProvider
-+import io.unitycatalog.client.delta.model.{DataSourceFormat => DeltaDataSourceFormat, TableType => DeltaTableType}
++import io.unitycatalog.client.delta.model.{CredentialOperation, CredentialsResponse, DataSourceFormat => DeltaDataSourceFormat, StorageCredential, TableType => DeltaTableType}
 +
 +import org.apache.spark.internal.Logging
 +import org.apache.spark.sql.catalyst.TableIdentifier
@@ -41,11 +43,16 @@
 +  def loadTable(ident: Identifier): Table = {
 +    ucDeltaClient match {
 +      case Some(client) if ident.namespace().length == 1 =>
-+        val metadata = client
-+          .loadTable(catalogName, ident.namespace().head, ident.name())
-+          .getMetadata
++        val schemaName = ident.namespace().head
++        val tableName = ident.name()
++        val metadata = client.loadTable(catalogName, schemaName, tableName).getMetadata
 +        if (metadata.getDataSourceFormat == DeltaDataSourceFormat.DELTA) {
-+          V1Table(toCatalogTable(ident, metadata))
++          val credentials = client.getTableCredentials(
++            CredentialOperation.READ,
++            catalogName,
++            schemaName,
++            tableName)
++          V1Table(toCatalogTable(ident, metadata, credentials))
 +        } else {
 +          delegate.loadTable(ident)
 +        }
@@ -56,7 +63,8 @@
 +
 +  private def toCatalogTable(
 +      ident: Identifier,
-+      metadata: io.unitycatalog.client.delta.model.TableMetadata): CatalogTable = {
++      metadata: io.unitycatalog.client.delta.model.TableMetadata,
++      credentials: CredentialsResponse): CatalogTable = {
 +    CatalogTable(
 +      identifier =
 +        TableIdentifier(ident.name(), ident.namespace().lastOption, Some(catalogName)),
@@ -70,7 +78,7 @@
 +        outputFormat = None,
 +        serde = None,
 +        compressed = false,
-+        properties = Map(UC_TABLE_ID_KEY -> metadata.getTableUuid.toString)),
++        properties = toStorageProperties(metadata, credentials)),
 +      schema = DeltaRestSchemaConverter.toSparkSchema(metadata.getColumns),
 +      provider = Some(DeltaSourceUtils.ALT_NAME),
 +      partitionColumnNames = Option(metadata.getPartitionColumns)
@@ -80,13 +88,76 @@
 +        .map(_.asScala.toMap)
 +        .getOrElse(Map.empty))
 +  }
++
++  private def toStorageProperties(
++      metadata: io.unitycatalog.client.delta.model.TableMetadata,
++      credentials: CredentialsResponse): Map[String, String] = {
++    Map(UC_TABLE_ID_KEY -> metadata.getTableUuid.toString) ++
++      selectStorageCredential(metadata.getLocation, credentials).map { credential =>
++        credentialToHadoopProperties(metadata.getLocation, credential)
++      }.getOrElse(Map.empty)
++  }
++
++  private def selectStorageCredential(
++      location: String,
++      credentials: CredentialsResponse): Option[StorageCredential] = {
++    Option(credentials)
++      .flatMap(c => Option(c.getStorageCredentials))
++      .map(_.asScala.toSeq)
++      .getOrElse(Seq.empty)
++      .filter { credential =>
++        val prefix = Option(credential.getPrefix).getOrElse("")
++        prefix.isEmpty || location.startsWith(prefix)
++      }
++      .sortBy(credential => Option(credential.getPrefix).map(_.length).getOrElse(0))
++      .lastOption
++  }
++
++  private def credentialToHadoopProperties(
++      location: String,
++      credential: StorageCredential): Map[String, String] = {
++    val config = credential.getConfig
++    if (config == null) {
++      Map.empty
++    } else if (config.getS3AccessKeyId != null || config.getS3SecretAccessKey != null ||
++        config.getS3SessionToken != null) {
++      Map(
++        "fs.s3a.access.key" -> Option(config.getS3AccessKeyId),
++        "fs.s3a.secret.key" -> Option(config.getS3SecretAccessKey),
++        "fs.s3a.session.token" -> Option(config.getS3SessionToken)
++      ).collect { case (key, Some(value)) => key -> value }
++    } else if (config.getAzureSasToken != null) {
++      val accountHost = storageUriHost(Option(credential.getPrefix).getOrElse(location))
++      Map(
++        "fs.abfs.impl.disable.cache" -> "true",
++        "fs.abfss.impl.disable.cache" -> "true",
++        s"fs.azure.account.auth.type.$accountHost" -> "SAS",
++        s"fs.azure.sas.fixed.token.$accountHost" -> config.getAzureSasToken)
++    } else if (config.getGcsOauthToken != null) {
++      Map(
++        "fs.gs.impl.disable.cache" -> "true",
++        "fs.gs.auth.type" -> "ACCESS_TOKEN_PROVIDER",
++        "fs.gs.auth.access.token" -> config.getGcsOauthToken
++      ) ++ Option(credential.getExpirationTimeMs)
++        .map(expiration => "fs.gs.auth.access.token.expiration.ms" -> expiration.toString)
++    } else {
++      Map.empty
++    }
++  }
++
++  private def storageUriHost(location: String): String = {
++    val uri = new URI(location)
++    Option(uri.getHost).getOrElse {
++      throw new IllegalArgumentException(s"Missing storage host in location: $location")
++    }
++  }
 +}
 +
 +private object DeltaCatalogClient {
 +  def apply(delegatePlugin: CatalogPlugin): DeltaCatalogClient = {
 +    val delegate = delegatePlugin.asInstanceOf[TableCatalog]
 +    val ucDeltaClient = delegatePlugin match {
-+      case provider: DeltaRestClientProvider =>
++      case provider: DeltaRestClientProvider if provider.getDeltaTablesApi.isPresent =>
 +        Some(new UCTokenBasedRestClient(
 +          provider.getApiClient(),
 +          provider.getDeltaTablesApi))
spark/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogLoadTableSuite.scala
@@ -50,6 +50,8 @@
 +  private var server: HttpServer = _
 +  private var serverUri: String = _
 +  private var drcLoadTableHandler: HttpExchange => Unit = _
++  private var loadTableRequestCount: Int = _
++  private var credentialRequestCount: Int = _
 +
 +  override def beforeAll(): Unit = {
 +    super.beforeAll()
@@ -71,15 +73,25 @@
 +  override def beforeEach(): Unit = {
 +    super.beforeEach()
 +    drcLoadTableHandler = null
++    loadTableRequestCount = 0
++    credentialRequestCount = 0
 +  }
 +
 +  test("loadTable uses DRC for single-schema identifiers and converts metadata") {
 +    drcLoadTableHandler = exchange => {
-+      assert(exchange.getRequestMethod === "GET")
-+      assert(
-+        exchange.getRequestURI.getPath ===
-+          "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl")
-+      sendJson(exchange, 200, loadTableResponseJson("DELTA"))
++      exchange.getRequestURI.getPath match {
++        case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" =>
++          loadTableRequestCount += 1
++          assert(exchange.getRequestMethod === "GET")
++          sendJson(exchange, 200, loadTableResponseJson("DELTA"))
++        case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" =>
++          credentialRequestCount += 1
++          assert(exchange.getRequestMethod === "GET")
++          assert(exchange.getRequestURI.getQuery === "operation=READ")
++          sendJson(exchange, 200, credentialsResponseJson())
++        case path =>
++          fail(s"Unexpected DRC request path: $path")
++      }
 +    }
 +
 +    val fallbackTable = MarkerTable("delegate")
@@ -89,9 +101,17 @@
 +
 +    assert(loaded.name() === "rest")
 +    assert(delegate.loadTableCalls === 0)
++    assert(loadTableRequestCount === 1)
++    assert(credentialRequestCount === 1)
 +    assert(catalog.loadedCatalogTable.identifier.table === "tbl")
 +    assert(catalog.loadedCatalogTable.storage.properties(UC_TABLE_ID_KEY) ===
 +      "11111111-1111-1111-1111-111111111111")
++    assert(catalog.loadedCatalogTable.storage.properties("fs.s3a.access.key") ===
++      "fakeAccessKey")
++    assert(catalog.loadedCatalogTable.storage.properties("fs.s3a.secret.key") ===
++      "fakeSecretKey")
++    assert(catalog.loadedCatalogTable.storage.properties("fs.s3a.session.token") ===
++      "fakeSessionToken")
 +    assert(catalog.loadedCatalogTable.partitionColumnNames === Seq("id"))
 +
 +    val schema = catalog.loadedCatalogTable.schema
@@ -109,7 +129,13 @@
 +
 +  test("loadTable falls back to delegate for non-delta metadata") {
 +    drcLoadTableHandler = exchange => {
-+      sendJson(exchange, 200, loadTableResponseJson("PARQUET"))
++      exchange.getRequestURI.getPath match {
++        case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" =>
++          loadTableRequestCount += 1
++          sendJson(exchange, 200, loadTableResponseJson("PARQUET"))
++        case path =>
++          fail(s"Unexpected DRC request path: $path")
++      }
 +    }
 +
 +    val fallbackTable = MarkerTable("delegate")
@@ -119,6 +145,8 @@
 +
 +    assert(loaded eq fallbackTable)
 +    assert(delegate.loadTableCalls === 1)
++    assert(loadTableRequestCount === 1)
++    assert(credentialRequestCount === 0)
 +    assert(catalog.loadedCatalogTable == null)
 +  }
 +
@@ -176,7 +204,7 @@
 +       |    "data-source-format": "$dataSourceFormat",
 +       |    "table-type": "MANAGED",
 +       |    "table-uuid": "11111111-1111-1111-1111-111111111111",
-+       |    "location": "file:/tmp/delta-table",
++       |    "location": "s3://bucket/path/to/table",
 +       |    "created-time": 10,
 +       |    "updated-time": 11,
 +       |    "securable-type": "TABLE",
@@ -223,6 +251,22 @@
 +       |  "commits": []
 +       |}""".stripMargin
 +
++  private def credentialsResponseJson(): String =
++    """{
++      |  "storage-credentials": [
++      |    {
++      |      "prefix": "s3://bucket/path/to/table",
++      |      "operation": "READ",
++      |      "config": {
++      |        "s3.access-key-id": "fakeAccessKey",
++      |        "s3.secret-access-key": "fakeSecretKey",
++      |        "s3.session-token": "fakeSessionToken"
++      |      },
++      |      "expiration-time-ms": 1700000000000
++      |    }
++      |  ]
++      |}""".stripMargin
++
 +  private def sendJson(exchange: HttpExchange, status: Int, body: String): Unit = {
 +    val bytes = body.getBytes(StandardCharsets.UTF_8)
 +    exchange.getResponseHeaders.add("Content-Type", "application/json")

Reproduce locally: git range-diff 8361784..b2a7aff 8361784..62b4288 | Disable: git config gitstack.push-range-diff false

@TimothyW553 TimothyW553 force-pushed the stack/drc-loadtable-catalog branch 2 times, most recently from a2b0fa0 to b91d3d0 Compare April 23, 2026 23:28
@TimothyW553
Collaborator Author

Range-diff: stack/drc-loadtable-storage (a2b0fa0 -> b91d3d0)
build.sbt
@@ -1,11 +0,0 @@
-diff --git a/build.sbt b/build.sbt
---- a/build.sbt
-+++ b/build.sbt
-   ).configureUnidoc()
- 
- 
--val unityCatalogVersion = sys.props.getOrElse("unityCatalogVersion", "0.4.1")
-+val unityCatalogVersion = sys.props.getOrElse("unityCatalogVersion", "0.5.0-SNAPSHOT")
- val sparkUnityCatalogJacksonVersion = "2.15.4" // We are using Spark 4.0's Jackson version 2.15.x, to override Unity Catalog 0.3.0's version 2.18.x
- 
- lazy val sparkUnityCatalog = (project in file("spark/unitycatalog"))
\ No newline at end of file
storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaClient.java
@@ -1,108 +0,0 @@
-diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaClient.java
-new file mode 100644
---- /dev/null
-+++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaClient.java
-+/*
-+ * Copyright (2026) The Delta Lake Project Authors.
-+ *
-+ * Licensed under the Apache License, Version 2.0 (the "License");
-+ * you may not use this file except in compliance with the License.
-+ * You may obtain a copy of the License at
-+ *
-+ * http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
-+
-+package io.delta.storage.commit.uccommitcoordinator;
-+
-+import io.delta.storage.commit.Commit;
-+import io.delta.storage.commit.CommitFailedException;
-+import io.delta.storage.commit.GetCommitsResponse;
-+import io.delta.storage.commit.actions.AbstractMetadata;
-+import io.delta.storage.commit.actions.AbstractProtocol;
-+import io.delta.storage.commit.uniform.UniformMetadata;
-+import io.unitycatalog.client.delta.model.CredentialOperation;
-+import io.unitycatalog.client.delta.model.CredentialsResponse;
-+import io.unitycatalog.client.delta.model.LoadTableResponse;
-+
-+import java.io.IOException;
-+import java.net.URI;
-+import java.util.Optional;
-+
-+/**
-+ * Delta-specific UC client surface layered on top of the legacy {@link UCClient}.
-+ */
-+public interface UCDeltaClient extends UCClient {
-+
-+  /**
-+   * Loads table metadata via the Delta REST Catalog API.
-+   */
-+  LoadTableResponse loadTable(
-+      String catalog,
-+      String schema,
-+      String table) throws IOException;
-+
-+  /**
-+   * Vends temporary storage credentials for a table via the Delta REST Catalog API.
-+   */
-+  CredentialsResponse getTableCredentials(
-+      CredentialOperation operation,
-+      String catalog,
-+      String schema,
-+      String table) throws IOException;
-+
-+  /**
-+   * Placeholder for the future DRC createTable path.
-+   */
-+  default void createTable(
-+      String catalog,
-+      String schema,
-+      String table,
-+      String location,
-+      AbstractMetadata metadata,
-+      AbstractProtocol protocol,
-+      boolean isManaged) throws CommitFailedException {
-+    throw new UnsupportedOperationException("DRC createTable is not implemented yet.");
-+  }
-+
-+  /**
-+   * Placeholder for the future name-based DRC commit path.
-+   */
-+  default void commit(
-+      String catalog,
-+      String schema,
-+      String table,
-+      String tableId,
-+      URI tableUri,
-+      Optional<Commit> commit,
-+      Optional<Long> lastKnownBackfilledVersion,
-+      boolean disown,
-+      Optional<AbstractMetadata> oldMetadata,
-+      Optional<AbstractMetadata> newMetadata,
-+      Optional<AbstractProtocol> oldProtocol,
-+      Optional<AbstractProtocol> newProtocol,
-+      Optional<UniformMetadata> uniform,
-+      Optional<String> etag)
-+      throws IOException, CommitFailedException, UCCommitCoordinatorException {
-+    throw new UnsupportedOperationException("DRC commit is not implemented yet.");
-+  }
-+
-+  /**
-+   * Placeholder for the future name-based DRC getCommits path.
-+   */
-+  default GetCommitsResponse getCommits(
-+      String catalog,
-+      String schema,
-+      String table,
-+      String tableId,
-+      URI tableUri,
-+      Optional<Long> startVersion,
-+      Optional<Long> endVersion) throws IOException, UCCommitCoordinatorException {
-+    throw new UnsupportedOperationException("DRC getCommits is not implemented yet.");
-+  }
-+}
\ No newline at end of file
storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaRestCatalogUtils.java
@@ -1,141 +0,0 @@
-diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaRestCatalogUtils.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaRestCatalogUtils.java
-new file mode 100644
---- /dev/null
-+++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaRestCatalogUtils.java
-+/*
-+ * Copyright (2026) The Delta Lake Project Authors.
-+ *
-+ * Licensed under the Apache License, Version 2.0 (the "License");
-+ * you may not use this file except in compliance with the License.
-+ * You may obtain a copy of the License at
-+ *
-+ * http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ */
-+
-+package io.delta.storage.commit.uccommitcoordinator;
-+
-+import com.fasterxml.jackson.core.JsonProcessingException;
-+import com.fasterxml.jackson.databind.ObjectMapper;
-+import io.unitycatalog.client.ApiException;
-+import io.unitycatalog.client.api.TablesApi;
-+import io.unitycatalog.client.delta.model.LoadTableResponse;
-+import io.unitycatalog.client.model.ColumnInfo;
-+import io.unitycatalog.client.model.TableInfo;
-+
-+import java.io.IOException;
-+import java.util.ArrayList;
-+import java.util.Collections;
-+import java.util.Comparator;
-+import java.util.List;
-+import java.util.UUID;
-+import java.util.stream.Collectors;
-+
-+final class UCDeltaRestCatalogUtils {
-+
-+  private UCDeltaRestCatalogUtils() {}
-+
-+  static LoadTableResponse loadTableViaLegacyApi(
-+      TablesApi tablesApi,
-+      ObjectMapper objectMapper,
-+      String catalog,
-+      String schema,
-+      String table) throws IOException, ApiException {
-+    final String fullName = String.format("%s.%s.%s", catalog, schema, table);
-+    TableInfo tableInfo = tablesApi.getTable(fullName, true, true);
-+    LoadTableResponse response = new LoadTableResponse();
-+    response.setMetadata(toDeltaTableMetadata(tableInfo, objectMapper));
-+    response.setCommits(Collections.emptyList());
-+    return response;
-+  }
-+
-+  private static io.unitycatalog.client.delta.model.TableMetadata toDeltaTableMetadata(
-+      TableInfo tableInfo,
-+      ObjectMapper objectMapper) throws IOException {
-+    io.unitycatalog.client.delta.model.TableMetadata metadata =
-+        new io.unitycatalog.client.delta.model.TableMetadata();
-+    metadata.setEtag("");
-+    metadata.setDataSourceFormat(io.unitycatalog.client.delta.model.DataSourceFormat.fromValue(
-+        tableInfo.getDataSourceFormat() != null ? tableInfo.getDataSourceFormat().getValue() : null));
-+    metadata.setTableType(io.unitycatalog.client.delta.model.TableType.fromValue(
-+        tableInfo.getTableType() != null ? tableInfo.getTableType().getValue() : null));
-+    metadata.setTableUuid(UUID.fromString(tableInfo.getTableId()));
-+    metadata.setLocation(tableInfo.getStorageLocation());
-+    metadata.setCreatedTime(tableInfo.getCreatedAt() != null ? tableInfo.getCreatedAt() : 0L);
-+    metadata.setUpdatedTime(tableInfo.getUpdatedAt() != null ? tableInfo.getUpdatedAt() : 0L);
-+    metadata.setSecurableType(io.unitycatalog.client.delta.model.SecurableType.TABLE);
-+    metadata.setColumns(toDeltaStructType(tableInfo.getColumns(), objectMapper));
-+    metadata.setPartitionColumns(toPartitionColumns(tableInfo.getColumns()));
-+    metadata.setProperties(
-+        tableInfo.getProperties() != null ? tableInfo.getProperties() : Collections.emptyMap());
-+    return metadata;
-+  }
-+
-+  private static io.unitycatalog.client.delta.model.StructType toDeltaStructType(
-+      List<ColumnInfo> columns,
-+      ObjectMapper objectMapper) throws IOException {
-+    io.unitycatalog.client.delta.model.StructType structType =
-+        new io.unitycatalog.client.delta.model.StructType();
-+    if (columns == null) {
-+      structType.setFields(Collections.emptyList());
-+      return structType;
-+    }
-+
-+    List<io.unitycatalog.client.delta.model.StructField> fields = new ArrayList<>();
-+    for (ColumnInfo column : columns) {
-+      io.unitycatalog.client.delta.model.StructField field =
-+          new io.unitycatalog.client.delta.model.StructField();
-+      field.setName(column.getName());
-+      field.setNullable(column.getNullable() == null || column.getNullable());
-+      field.setMetadata(Collections.emptyMap());
-+      field.setType(toDeltaType(column, objectMapper));
-+      fields.add(field);
-+    }
-+    structType.setFields(fields);
-+    return structType;
-+  }
-+
-+  private static io.unitycatalog.client.delta.model.DeltaType toDeltaType(
-+      ColumnInfo column,
-+      ObjectMapper objectMapper) throws IOException {
-+    if (column.getTypeJson() != null && !column.getTypeJson().isEmpty()) {
-+      try {
-+        return objectMapper.readValue(
-+            column.getTypeJson(),
-+            io.unitycatalog.client.delta.model.DeltaType.class);
-+      } catch (JsonProcessingException e) {
-+        throw new IOException(
-+            String.format("Failed to parse legacy column type JSON for column %s: %s",
-+                column.getName(), column.getTypeJson()),
-+            e);
-+      }
-+    }
-+
-+    if (column.getTypeText() != null && !column.getTypeText().isEmpty()) {
-+      io.unitycatalog.client.delta.model.PrimitiveType primitiveType =
-+          new io.unitycatalog.client.delta.model.PrimitiveType();
-+      primitiveType.setType(column.getTypeText());
-+      return primitiveType;
-+    }
-+
-+    throw new IOException(
-+        String.format("Legacy column %s is missing both type_json and type_text.", column.getName()));
-+  }
-+
-+  private static List<String> toPartitionColumns(List<ColumnInfo> columns) {
-+    if (columns == null) {
-+      return Collections.emptyList();
-+    }
-+
-+    return columns.stream()
-+        .filter(column -> column.getPartitionIndex() != null)
-+        .sorted(Comparator.comparingInt(ColumnInfo::getPartitionIndex))
-+        .map(ColumnInfo::getName)
-+        .collect(Collectors.toList());
-+  }
-+}
\ No newline at end of file
storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java
@@ -1,141 +0,0 @@
-diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java
---- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java
-+++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java
- import io.unitycatalog.client.api.MetastoresApi;
- import io.unitycatalog.client.api.TablesApi;
- import io.unitycatalog.client.auth.TokenProvider;
-+import io.unitycatalog.client.delta.model.CredentialOperation;
-+import io.unitycatalog.client.delta.model.CredentialsResponse;
-+import io.unitycatalog.client.delta.model.LoadTableResponse;
- import io.unitycatalog.client.model.DeltaCommit;
- import io.unitycatalog.client.model.DeltaCommitInfo;
- import io.unitycatalog.client.model.DeltaCommitMetadataProperties;
-  * @see GetCommitsResponse
-  * @see TokenProvider
-  */
--public class UCTokenBasedRestClient implements UCClient {
-+public class UCTokenBasedRestClient implements UCDeltaClient {
- 
-+  private final ApiClient apiClient;
-   private DeltaCommitsApi deltaCommitsApi;
-   private MetastoresApi metastoresApi;
-   private TablesApi tablesApi;
-+  private io.unitycatalog.client.delta.api.TablesApi deltaTablesApi;
-+  private io.unitycatalog.client.delta.api.TemporaryCredentialsApi deltaTemporaryCredentialsApi;
- 
-   // HTTP status codes for error handling
-   private static final int HTTP_BAD_REQUEST = 400;
-       }
-     });
- 
--    ApiClient apiClient = builder.build();
-+    this.apiClient = builder.build();
-     this.deltaCommitsApi = new DeltaCommitsApi(apiClient);
-     this.metastoresApi = new MetastoresApi(apiClient);
-     this.tablesApi = new TablesApi(apiClient);
-+    this.deltaTablesApi = new io.unitycatalog.client.delta.api.TablesApi(apiClient);
-+    this.deltaTemporaryCredentialsApi =
-+        new io.unitycatalog.client.delta.api.TemporaryCredentialsApi(apiClient);
-+  }
-+
-+  public UCTokenBasedRestClient(ApiClient apiClient) {
-+    this(
-+        apiClient,
-+        Optional.of(new io.unitycatalog.client.delta.api.TablesApi(apiClient)));
-+  }
-+
-+  public UCTokenBasedRestClient(
-+      ApiClient apiClient,
-+      Optional<io.unitycatalog.client.delta.api.TablesApi> deltaTablesApi) {
-+    Objects.requireNonNull(apiClient, "apiClient must not be null");
-+    Objects.requireNonNull(deltaTablesApi, "deltaTablesApi must not be null");
-+    this.apiClient = apiClient;
-+    this.deltaCommitsApi = new DeltaCommitsApi(apiClient);
-+    this.metastoresApi = new MetastoresApi(apiClient);
-+    this.tablesApi = new TablesApi(apiClient);
-+    this.deltaTablesApi = deltaTablesApi.orElse(null);
-+    this.deltaTemporaryCredentialsApi = deltaTablesApi.isPresent()
-+        ? new io.unitycatalog.client.delta.api.TemporaryCredentialsApi(apiClient)
-+        : null;
-   }
- 
-   /**
-     if (deltaCommitsApi == null || metastoresApi == null || tablesApi == null) {
-       throw new IllegalStateException("UCTokenBasedRestClient has been closed.");
-     }
-+    if (deltaTablesApi != null && deltaTemporaryCredentialsApi == null) {
-+      throw new IllegalStateException("UCTokenBasedRestClient has been closed.");
-+    }
-   }
- 
-   @Override
-     }
-   }
- 
-+  @Override
-+  public LoadTableResponse loadTable(
-+      String catalog,
-+      String schema,
-+      String table) throws IOException {
-+    ensureOpen();
-+    Objects.requireNonNull(catalog, "catalog must not be null.");
-+    Objects.requireNonNull(schema, "schema must not be null.");
-+    Objects.requireNonNull(table, "table must not be null.");
-+
-+    try {
-+      if (deltaTablesApi == null) {
-+        return UCDeltaRestCatalogUtils.loadTableViaLegacyApi(
-+            tablesApi,
-+            apiClient.getObjectMapper(),
-+            catalog,
-+            schema,
-+            table);
-+      }
-+
-+      return deltaTablesApi.loadTable(catalog, schema, table);
-+    } catch (ApiException e) {
-+      String apiPath = deltaTablesApi == null ? "legacy UC API" : "DRC";
-+      throw new IOException(
-+          String.format("Failed to load table via %s (HTTP %s): %s",
-+              apiPath, e.getCode(), e.getResponseBody()),
-+          e);
-+    }
-+  }
-+
-+  @Override
-+  public CredentialsResponse getTableCredentials(
-+      CredentialOperation operation,
-+      String catalog,
-+      String schema,
-+      String table) throws IOException {
-+    ensureOpen();
-+    Objects.requireNonNull(operation, "operation must not be null.");
-+    Objects.requireNonNull(catalog, "catalog must not be null.");
-+    Objects.requireNonNull(schema, "schema must not be null.");
-+    Objects.requireNonNull(table, "table must not be null.");
-+
-+    try {
-+      if (deltaTablesApi == null) {
-+        throw new IOException("Legacy UC table credential vending is not implemented yet.");
-+      }
-+
-+      return deltaTemporaryCredentialsApi.getTableCredentials(operation, catalog, schema, table);
-+    } catch (ApiException e) {
-+      throw new IOException(
-+          String.format("Failed to get table credentials via DRC (HTTP %s): %s",
-+              e.getCode(), e.getResponseBody()),
-+          e);
-+    }
-+  }
-+
-   @Override
-   public void close() throws IOException {
-     // Nulling out the API instances makes them eligible for GC. Once garbage collected,
-     this.deltaCommitsApi = null;
-     this.metastoresApi = null;
-     this.tablesApi = null;
-+    this.deltaTablesApi = null;
-+    this.deltaTemporaryCredentialsApi = null;
-   }
- 
-   /**
\ No newline at end of file
storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClientSuite.scala
@@ -1,123 +0,0 @@
-diff --git a/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClientSuite.scala b/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClientSuite.scala
---- a/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClientSuite.scala
-+++ b/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClientSuite.scala
- import io.delta.storage.commit.{Commit, CommitFailedException}
- import io.delta.storage.commit.actions.AbstractMetadata
- import io.delta.storage.commit.uniform.{IcebergMetadata, UniformMetadata}
-+import io.unitycatalog.client.ApiClientBuilder
- import io.unitycatalog.client.auth.TokenProvider
-+import io.unitycatalog.client.delta.model
- 
- import org.apache.hadoop.fs.{FileStatus, Path}
- import org.apache.http.HttpStatus
-   private var serverUri: String = _
-   private var metastoreHandler: HttpExchange => Unit = _
-   private var commitsHandler: HttpExchange => Unit = _
-+  private var legacyTablesHandler: HttpExchange => Unit = _
-   private val objectMapper = new ObjectMapper()
- 
-   override def beforeAll(): Unit = {
-       }
-       exchange.close()
-     })
-+    server.createContext("/api/2.1/unity-catalog/tables", exchange => {
-+      if (legacyTablesHandler != null) legacyTablesHandler(exchange)
-+      else sendJson(exchange, HttpStatus.SC_NOT_FOUND, "{}")
-+      exchange.close()
-+    })
-     server.start()
-     serverUri = s"http://localhost:${server.getAddress.getPort}"
-   }
-   override def beforeEach(): Unit = {
-     metastoreHandler = null
-     commitsHandler = null
-+    legacyTablesHandler = null
-   }
- 
-   private def readRequestBody(exchange: HttpExchange): String = {
-   private def createClient(): UCTokenBasedRestClient =
-     new UCTokenBasedRestClient(serverUri, createTokenProvider(), Collections.emptyMap())
- 
-+  private def createLegacyClient(): UCTokenBasedRestClient = {
-+    val apiClient = ApiClientBuilder.create()
-+      .uri(serverUri)
-+      .tokenProvider(createTokenProvider())
-+      .build()
-+    new UCTokenBasedRestClient(apiClient, Optional.empty())
-+  }
-+
-   private def withClient(fn: UCTokenBasedRestClient => Unit): Unit = {
-     val client = createClient()
-     try fn(client) finally client.close()
-   }
- 
-+  private def withLegacyClient(fn: UCTokenBasedRestClient => Unit): Unit = {
-+    val client = createLegacyClient()
-+    try fn(client) finally client.close()
-+  }
-+
-   private def createCommit(version: Long): Commit = {
-     val fs = new FileStatus(1024L, false, 1, 4096L, System.currentTimeMillis(),
-       new Path(s"/path/_delta_log/_staged_commits/$version.uuid.json"))
-     }
-   }
- 
-+  test("loadTable falls back to legacy UC API and converts metadata") {
-+    legacyTablesHandler = exchange => {
-+      assert(exchange.getRequestMethod === "GET")
-+      sendJson(
-+        exchange,
-+        HttpStatus.SC_OK,
-+        """{
-+          |  "name": "tbl",
-+          |  "catalog_name": "main",
-+          |  "schema_name": "default",
-+          |  "table_id": "11111111-1111-1111-1111-111111111111",
-+          |  "table_type": "MANAGED",
-+          |  "data_source_format": "DELTA",
-+          |  "storage_location": "s3://bucket/path/to/table",
-+          |  "created_at": 10,
-+          |  "updated_at": 11,
-+          |  "properties": {"delta.appendOnly":"true"},
-+          |  "columns": [
-+          |    {
-+          |      "name":"payload",
-+          |      "nullable":false,
-+          |      "position":0,
-+          |      "type_json":"{\"type\":\"struct\",\"fields\":[{\"name\":\"tags\",\"type\":{\"type\":\"array\",\"element-type\":\"string\",\"contains-null\":true},\"nullable\":true,\"metadata\":{\"comment\":\"nested tags\"}}]}"
-+          |    },
-+          |    {"name":"value","type_text":"string","nullable":true,"position":1},
-+          |    {"name":"region","type_text":"string","nullable":false,"position":2,"partition_index":1},
-+          |    {"name":"date","type_text":"date","nullable":false,"position":3,"partition_index":0}
-+          |  ]
-+          |}""".stripMargin)
-+    }
-+
-+    withLegacyClient { client =>
-+      val response = client.loadTable("main", "default", "tbl")
-+      val metadata = response.getMetadata
-+      val payloadField = metadata.getColumns.getFields.get(0)
-+      val payloadType = payloadField.getType.asInstanceOf[model.StructType]
-+      val nestedField = payloadType.getFields.get(0)
-+      val nestedType = nestedField.getType.asInstanceOf[model.ArrayType]
-+
-+      assert(metadata.getLocation === "s3://bucket/path/to/table")
-+      assert(metadata.getPartitionColumns === java.util.Arrays.asList("date", "region"))
-+      assert(metadata.getProperties.get("delta.appendOnly") === "true")
-+      assert(metadata.getColumns.getFields.size() === 4)
-+      assert(payloadField.getName === "payload")
-+      assert(!payloadField.getNullable)
-+      assert(metadata.getColumns.getFields.get(2).getName === "region")
-+      assert(!metadata.getColumns.getFields.get(2).getNullable)
-+      assert(nestedField.getName === "tags")
-+      assert(nestedField.getMetadata.get("comment") === "nested tags")
-+      assert(nestedType.getElementType.isInstanceOf[model.PrimitiveType])
-+      assert(nestedType.getContainsNull)
-+      assert(
-+        nestedType.getElementType.asInstanceOf[model.PrimitiveType].getType === "string")
-+    }
-+  }
-+
-   // commit tests
-   test("commit succeeds with valid parameters") {
-     withClient { client =>
\ No newline at end of file

Reproduce locally: git range-diff 5f1e465..a2b0fa0 1ca5b16..b91d3d0 | Disable: git config gitstack.push-range-diff false

@TimothyW553 TimothyW553 force-pushed the stack/drc-loadtable-catalog branch 19 times, most recently from e43d057 to 2dda66c Compare April 24, 2026 03:49
@TimothyW553 TimothyW553 force-pushed the stack/drc-loadtable-catalog branch 10 times, most recently from 4f79913 to 2c0be33 Compare April 27, 2026 01:38
}
Some(V1Table(toCatalogTable(ident, metadata, locationUri, credentials)))
case _ =>
None
Collaborator

@yili-db yili-db Apr 27, 2026

I think we need to gracefully handle non-Delta passthrough even in Delta code. See AbstractDeltaCatalog.loadTable:

      val table = super.loadTable(ident)

      table match {
        case v1: V1Table if DeltaTableUtils.isDeltaTable(v1.catalogTable) =>
          loadCatalogTable(ident, v1.catalogTable)
        case o => o
      }

@tdas, please correct me if I am wrong. This also means UC should provide limited support in loadTable so that non-Delta tables can be loaded via DRC.
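
For illustration, the passthrough behavior requested here can be sketched in Java as follows. This is a minimal sketch under stated assumptions, not the PR's implementation: `PassthroughSketch`, `Table`, `loadViaDrc`, and the `delegate` supplier are hypothetical stand-ins, and the only behavior taken from this PR's tests is that a DELTA response yields the REST-backed table while any other format falls back to the delegate.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical stand-ins; none of these types exist in Delta or Unity Catalog. */
final class PassthroughSketch {

  /** Minimal table handle: a name plus a marker for where it was loaded from. */
  static final class Table {
    final String name;
    final String source;

    Table(String name, String source) {
      this.name = name;
      this.source = source;
    }
  }

  /** Returns a REST-backed table only when the reported format is DELTA. */
  static Optional<Table> loadViaDrc(String tableName, String dataSourceFormat) {
    if ("DELTA".equalsIgnoreCase(dataSourceFormat)) {
      return Optional.of(new Table(tableName, "rest"));
    }
    return Optional.empty();
  }

  /** Tries DRC first; anything non-Delta is handed to the delegate unchanged. */
  static Table loadTable(String tableName, String dataSourceFormat, Supplier<Table> delegate) {
    return loadViaDrc(tableName, dataSourceFormat).orElseGet(delegate);
  }
}
```

The delegate is passed as a `Supplier` so that it is only invoked when the DRC path declines the table, which matches the `delegate.loadTableCalls === 0` assertion in the Delta test case.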

s"Unsupported Delta REST table type for " +
s"$catalogName.${ident.namespace().mkString(".")}.${ident.name()}: $other")
},
storage = CatalogStorageFormat(
Collaborator

In UCSingleCatalog this is: CatalogStorageFormat.empty.copy(

Collaborator Author

done

compressed = false,
properties = toStorageProperties(metadata, credentials, locationUri.getScheme)),
schema = DeltaRestSchemaConverter.toSparkType(metadata.getColumns),
provider = Some(DeltaSourceUtils.ALT_NAME),
Collaborator

The provider should be picked up from metadata.

Collaborator Author

Done. The provider is now picked up from metadata and matches UCSingleCatalog:

provider = Some(metadata.getDataSourceFormat.getValue.toLowerCase(Locale.ROOT))
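
As a side note on the `Locale.ROOT` argument in the line above, here is a small Java sketch of why a locale-independent lowercase is the safer choice for identifiers such as provider names. `ProviderNameSketch` and `providerName` are hypothetical names used only for illustration, and the Turkish-locale comparison uses an arbitrary string containing the letter "I" rather than any real data source format.

```java
import java.util.Locale;

/** Hypothetical helper; not part of Delta or Unity Catalog. */
final class ProviderNameSketch {

  /** Lowercases a format name without depending on the JVM's default locale. */
  static String providerName(String dataSourceFormat) {
    return dataSourceFormat.toLowerCase(Locale.ROOT);
  }

  /** Same operation under an explicit locale, shown only for comparison. */
  static String lowerIn(String value, Locale locale) {
    return value.toLowerCase(locale);
  }
}
```

Under a Turkish locale, an uppercase "I" lowercases to a dotless "ı", so a locale-sensitive call could produce a provider string that no longer matches the expected ASCII name.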

partitionColumnNames = Option(metadata.getPartitionColumns)
.map(_.asScala.toSeq)
.getOrElse(Nil),
properties = Map.empty)
Collaborator

@yili-db yili-db Apr 27, 2026

properties = Map.empty is unnecessary.

Collaborator Author

removed

// features here so the Delta load path receives them with the same option.* shape as other
// storage-level UC properties, while CatalogTable.properties stays reserved for Spark metadata.
Option(metadata.getProperties).map(_.asScala.toMap).getOrElse(Map.empty) ++
Map(UC_TABLE_ID_KEY -> metadata.getTableUuid.toString) ++
Collaborator

For managed Delta tables, UC_TABLE_ID_KEY should already be in metadata.getProperties. If it is not, the table is external and doesn't need it anyway.

Collaborator Author

Got it. Removed this so that we no longer synthesize UC_TABLE_ID_KEY.

normalizedLocation.charAt(normalizedPrefix.length) == '/'))
}

private def storageCredentialToProperties(
Collaborator

This function mirrors CredPropsUtil.createTableCredProps from UC-Spark, which happens to be public. Should it just call CredPropsUtil.createTableCredProps instead?
@tdas, please also take a look.

Collaborator Author

Should we depend on UC-Spark? I was thinking of refactoring CredPropsUtil.createTableCredProps and the other cloud-credential utils into uc-client.
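
To make the conversion under discussion concrete, here is a minimal Java sketch of the two steps visible in this PR's diff and tests: checking that a credential's prefix covers the table location at a path-segment boundary, and translating the vended S3 config keys into Hadoop `fs.s3a.*` properties. The class and method names (`CredentialPropsSketch`, `coversLocation`, `toS3aProperties`) are hypothetical. The key names come from the test fixture in this PR; how other clouds or additional keys are handled is not shown here, and this is not a copy of `CredPropsUtil`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch; not the PR's storageCredentialToProperties. */
final class CredentialPropsSketch {

  /** True when location equals prefix or sits below it at a '/' boundary. */
  static boolean coversLocation(String prefix, String location) {
    String p = stripTrailingSlashes(prefix);
    String l = stripTrailingSlashes(location);
    // The charAt check rejects sibling paths such as ".../tab" vs ".../table".
    return l.equals(p) || (l.startsWith(p) && l.charAt(p.length()) == '/');
  }

  /** Maps the S3 keys used in this PR's test fixture to Hadoop S3A keys. */
  static Map<String, String> toS3aProperties(Map<String, String> config) {
    Map<String, String> props = new LinkedHashMap<>();
    copyIfPresent(config, "s3.access-key-id", props, "fs.s3a.access.key");
    copyIfPresent(config, "s3.secret-access-key", props, "fs.s3a.secret.key");
    copyIfPresent(config, "s3.session-token", props, "fs.s3a.session.token");
    return props;
  }

  private static void copyIfPresent(
      Map<String, String> from, String fromKey, Map<String, String> to, String toKey) {
    String value = from.get(fromKey);
    if (value != null) {
      to.put(toKey, value);
    }
  }

  private static String stripTrailingSlashes(String s) {
    int end = s.length();
    while (end > 0 && s.charAt(end - 1) == '/') {
      end--;
    }
    return s.substring(0, end);
  }
}
```

Wherever this logic ends up living (Delta, UC-Spark, or uc-client), keeping the key translation in a single helper means the catalog code only ever sees Hadoop-style property names.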

@TimothyW553 TimothyW553 force-pushed the stack/drc-loadtable-catalog branch from 2c0be33 to 041d54a Compare April 27, 2026 18:56
case _ => false
}

private def isNumeric(value: Any): Boolean = isIntegral(value) || (value match {
Collaborator

Just make it isFloatOrDouble and remove the call to isIntegral.

Collaborator Author

done
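
The suggested split can be sketched in Java as two disjoint predicates. This is an illustration only: the set of types the PR's `isIntegral` accepts is not shown in this thread, so the boxed types listed below are an assumption, and `NumericChecksSketch` is a hypothetical name.

```java
/** Hypothetical sketch of the two predicates after the suggested rename. */
final class NumericChecksSketch {

  /** Assumed definition: whole-number boxed types only. */
  static boolean isIntegral(Object value) {
    return value instanceof Byte
        || value instanceof Short
        || value instanceof Integer
        || value instanceof Long;
  }

  /** Floating-point boxed types only; no longer delegates to isIntegral. */
  static boolean isFloatOrDouble(Object value) {
    return value instanceof Float || value instanceof Double;
  }
}
```

With the two checks disjoint, a caller that wants "any number" composes them explicitly, and each name states exactly what it accepts.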


val spark = SparkSession.active

private lazy val deltaCatalogClient: DeltaCatalogClient = DeltaCatalogClient(delegate, spark)
Collaborator

Can you already enable this in the UC integration tests in UnityCatalogSupport?

Collaborator Author

Sure. So far I've just been enabling it via a compiler flag. Updated.

"For managed tables, path-based access should fail");
} else {
// For EXTERNAL tables, path-based access should work
// TODO: Enable remote external path reads after Delta wires DRC
Collaborator Author

The next PR removes this, since this PR doesn't handle path-based tables.

UCTokenBasedRestClientFactory
}

private class DeltaCatalogClient private (
Contributor

Please add class docs based on our offline discussion. We need a clear idea of how this interface differs from UCClient and UCDeltaClient.

Collaborator Author

@TimothyW553 TimothyW553 May 13, 2026

Added class docs clarifying DeltaCatalogClient.
