Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/unidoc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@
java-version: "17"
- uses: actions/checkout@f43a0e5ff2bd294095638e18286ca9a3d1956744 # v3.6.0
- name: generate unidoc
run: build/sbt -DuseDefaultUnityCatalogReleaseVersion=true "++ ${{ matrix.scala }}" unidoc
run: build/sbt "++ ${{ matrix.scala }}" unidoc
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,8 @@ lazy val storage = (project in file("storage"))
// Note that the org.apache.hadoop.fs.s3a.Listing::createFileStatusListingIterator 3.3.1 API
// is not compatible with 3.3.2.
"org.apache.hadoop" % "hadoop-aws" % hadoopVersion % "provided",
"org.apache.hadoop" % "hadoop-azure" % hadoopVersion % "provided",
"com.google.cloud.bigdataoss" % "util-hadoop" % "3.0.2" % "provided",
"io.unitycatalog" % "unitycatalog-client" % unityCatalogVersion excludeAll(
ExclusionRule(organization = "org.openapitools"),
ExclusionRule(organization = "com.fasterxml.jackson.core"),
Expand Down
2 changes: 1 addition & 1 deletion project/scripts/setup_unitycatalog_main.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ set -euo pipefail
# The pin. Bump both lines together if UC's version.sbt changed at the new SHA. build.sbt's
# `unityCatalogVersion` is obtained by running this script with `--print-version`, so these two
# values are the single source of truth.
UC_PIN_SHA=e3ab24e815b16a7614ff32044cf51067ef7ad16b
UC_PIN_SHA=30037019749a1b77461616f707cca297520a6988
UC_BASE_VERSION=0.5.0-SNAPSHOT
# ---------------------------------------------------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ private SparkConf configureSparkWithUnityCatalog(SparkConf conf) {
String catalogName = uc.catalogName();
return conf.set("spark.sql.catalog." + catalogName, "io.unitycatalog.spark.UCSingleCatalog")
.set("spark.sql.catalog." + catalogName + ".uri", uc.serverUri())
.set("spark.sql.catalog." + catalogName + ".token", uc.serverToken());
.set("spark.sql.catalog." + catalogName + ".token", uc.serverToken())
.set(
"spark.sql.catalog." + catalogName + ".deltaRestApi.enabled",
String.valueOf(isUCDeltaRestCatalogApiEnabled()));
}

/** Stop the SparkSession after all tests. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
*
* <p>Automatically starts a local Unity Catalog server before tests and stops it after. To use a
* remote server instead, set {@code UC_REMOTE=true} and configure {@code UC_URI}, {@code UC_TOKEN},
* {@code UC_CATALOG_NAME}, {@code UC_SCHEMA_NAME}, and {@code UC_BASE_TABLE_LOCATION}.
* {@code UC_CATALOG_NAME}, {@code UC_SCHEMA_NAME}, and {@code UC_BASE_TABLE_LOCATION}. Set {@code
* UC_DELTA_REST_CATALOG_API_ENABLED=false} to run the same tests through legacy UC Spark catalog
* behavior.
*
* <p>{@code unityCatalogInfo()} is the only API for subclasses, All other methods are internal
* implementation details.
Expand Down Expand Up @@ -129,12 +131,19 @@ public ApiClient createApiClient() {
public static final String UC_CATALOG_NAME = "UC_CATALOG_NAME";
public static final String UC_SCHEMA_NAME = "UC_SCHEMA_NAME";
public static final String UC_BASE_TABLE_LOCATION = "UC_BASE_TABLE_LOCATION";
public static final String UC_DELTA_REST_CATALOG_API_ENABLED =
"UC_DELTA_REST_CATALOG_API_ENABLED";

protected static boolean isUCRemoteConfigured() {
String ucRemote = System.getenv(UC_REMOTE);
return ucRemote != null && ucRemote.equalsIgnoreCase("true");
}

protected static boolean isUCDeltaRestCatalogApiEnabled() {
String deltaRestApiEnabled = System.getenv(UC_DELTA_REST_CATALOG_API_ENABLED);
return deltaRestApiEnabled == null || !deltaRestApiEnabled.equalsIgnoreCase("false");
}

/** The Unity Catalog info instance for subclasses access */
private UnityCatalogInfo ucInfo = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static io.sparkuctest.UnityCatalogSupport.UC_BASE_TABLE_LOCATION;
import static io.sparkuctest.UnityCatalogSupport.UC_CATALOG_NAME;
import static io.sparkuctest.UnityCatalogSupport.UC_DELTA_REST_CATALOG_API_ENABLED;
import static io.sparkuctest.UnityCatalogSupport.UC_REMOTE;
import static io.sparkuctest.UnityCatalogSupport.UC_SCHEMA_NAME;
import static io.sparkuctest.UnityCatalogSupport.UC_TOKEN;
Expand All @@ -37,7 +38,13 @@ public class UnityCatalogSupportTest {

private static final List<String> ALL_ENVS =
ImmutableList.of(
UC_REMOTE, UC_URI, UC_TOKEN, UC_CATALOG_NAME, UC_SCHEMA_NAME, UC_BASE_TABLE_LOCATION);
UC_REMOTE,
UC_URI,
UC_TOKEN,
UC_CATALOG_NAME,
UC_SCHEMA_NAME,
UC_BASE_TABLE_LOCATION,
UC_DELTA_REST_CATALOG_API_ENABLED);

@Test
public void testUnityCatalogInfo() throws Exception {
Expand Down Expand Up @@ -176,6 +183,23 @@ public void testNoBaseTableLocation() throws Exception {
});
}

@Test
public void testUCDeltaRestCatalogApiEnabledFromEnv() throws Exception {
withEnvTesting(
ImmutableMap.of(),
() -> {
TestingUCSupport uc = new TestingUCSupport();
assertThat(uc.isUCDeltaRestCatalogApiEnabledForTest()).isTrue();
});

withEnvTesting(
ImmutableMap.of(UC_DELTA_REST_CATALOG_API_ENABLED, "false"),
() -> {
TestingUCSupport uc = new TestingUCSupport();
assertThat(uc.isUCDeltaRestCatalogApiEnabledForTest()).isFalse();
});
}

public interface TestCall {

void call() throws Exception;
Expand Down Expand Up @@ -225,5 +249,9 @@ public UnityCatalogInfo accessUnityCatalogInfo() throws Exception {
setupServer();
return unityCatalogInfo();
}

public boolean isUCDeltaRestCatalogApiEnabledForTest() {
return isUCDeltaRestCatalogApiEnabled();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.delta.storage.commit.actions.AbstractMetadata;
import io.delta.storage.commit.actions.AbstractProtocol;
import io.delta.storage.commit.uniform.UniformMetadata;
import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.CredentialOperation;
import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.CredentialsResponse;

import java.io.IOException;
import java.net.URI;
Expand All @@ -42,6 +44,13 @@
*/
public interface UCClient extends AutoCloseable {

/**
* Returns whether this client can use UC Delta Rest Catalog API.
*/
default boolean supportsUCDeltaRestCatalogApi() {
return false;
}

/**
* Retrieves the metastore ID associated with this Unity Catalog instance.
*
Expand Down Expand Up @@ -171,6 +180,37 @@ void finalizeCreate(
List<ColumnDef> columns,
Map<String, String> properties) throws CommitFailedException;

/**
* Loads a Delta table from Unity Catalog through the UC Delta Rest Catalog API.
*
* <p>Implementations that do not support UC Delta Rest Catalog API should use the default
* implementation, which fails loudly so callers do not accidentally route UC Delta Rest Catalog
* API operations through a legacy-only client.
*/
default AbstractMetadata loadTable(
String catalog,
String schema,
String table) throws IOException {
throw new UnsupportedOperationException(
"loadTable requires UC Delta Rest Catalog API support.");
}

/**
* Gets temporary storage credentials for a table through the UC Delta Rest Catalog API.
*
* <p>Implementations that do not support UC Delta Rest Catalog API should use the default
* implementation, which fails loudly so callers do not accidentally treat legacy UC clients as
* credential-aware Delta clients.
*/
default CredentialsResponse getTableCredentials(
CredentialOperation operation,
String catalog,
String schema,
String table) throws IOException {
throw new UnsupportedOperationException(
"getTableCredentials requires UC Delta Rest Catalog API support.");
}

/**
* Closes any resources used by this client.
* This method should be called to properly release resources such as network
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (2026) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.storage.commit.uccommitcoordinator;

import java.util.Collections;
import java.util.List;

/** Delta-owned models for UC Delta table credentials. */
public final class UCDeltaModels {
private UCDeltaModels() {}

public enum CredentialOperation {
READ,
READ_WRITE
}

public static final class CredentialsResponse {
private final List<StorageCredential> storageCredentials;

public CredentialsResponse(List<StorageCredential> storageCredentials) {
this.storageCredentials = storageCredentials;
}

public List<StorageCredential> getStorageCredentials() {
return storageCredentials == null ? Collections.emptyList() : storageCredentials;
}
}

public static final class StorageCredential {
private final String prefix;
private final CredentialOperation operation;
private final StorageCredentialConfig config;
private final Long expirationTimeMs;

public StorageCredential(
String prefix,
CredentialOperation operation,
StorageCredentialConfig config,
Long expirationTimeMs) {
this.prefix = prefix;
this.operation = operation;
this.config = config;
this.expirationTimeMs = expirationTimeMs;
}

public String getPrefix() {
return prefix;
}

public CredentialOperation getOperation() {
return operation;
}

public StorageCredentialConfig getConfig() {
return config;
}

public Long getExpirationTimeMs() {
return expirationTimeMs;
}
}

public static final class StorageCredentialConfig {
private final String s3AccessKeyId;
private final String s3SecretAccessKey;
private final String s3SessionToken;
private final String azureSasToken;
private final String gcsOauthToken;

public StorageCredentialConfig(
String s3AccessKeyId,
String s3SecretAccessKey,
String s3SessionToken,
String azureSasToken,
String gcsOauthToken) {
this.s3AccessKeyId = s3AccessKeyId;
this.s3SecretAccessKey = s3SecretAccessKey;
this.s3SessionToken = s3SessionToken;
this.azureSasToken = azureSasToken;
this.gcsOauthToken = gcsOauthToken;
}

public String getS3AccessKeyId() {
return s3AccessKeyId;
}

public String getS3SecretAccessKey() {
return s3SecretAccessKey;
}

public String getS3SessionToken() {
return s3SessionToken;
}

public String getAzureSasToken() {
return azureSasToken;
}

public String getGcsOauthToken() {
return gcsOauthToken;
}
}
}
Loading
Loading