diff --git a/.github/workflows/unidoc.yaml b/.github/workflows/unidoc.yaml index 73e061fa06d..b70fdc96035 100644 --- a/.github/workflows/unidoc.yaml +++ b/.github/workflows/unidoc.yaml @@ -20,4 +20,4 @@ java-version: "17" - uses: actions/checkout@f43a0e5ff2bd294095638e18286ca9a3d1956744 # v3.6.0 - name: generate unidoc - run: build/sbt -DuseDefaultUnityCatalogReleaseVersion=true "++ ${{ matrix.scala }}" unidoc + run: build/sbt "++ ${{ matrix.scala }}" unidoc diff --git a/build.sbt b/build.sbt index bb837c72321..7101a570a35 100644 --- a/build.sbt +++ b/build.sbt @@ -1171,6 +1171,8 @@ lazy val storage = (project in file("storage")) // Note that the org.apache.hadoop.fs.s3a.Listing::createFileStatusListingIterator 3.3.1 API // is not compatible with 3.3.2. "org.apache.hadoop" % "hadoop-aws" % hadoopVersion % "provided", + "org.apache.hadoop" % "hadoop-azure" % hadoopVersion % "provided", + "com.google.cloud.bigdataoss" % "util-hadoop" % "3.0.2" % "provided", "io.unitycatalog" % "unitycatalog-client" % unityCatalogVersion excludeAll( ExclusionRule(organization = "org.openapitools"), ExclusionRule(organization = "com.fasterxml.jackson.core"), diff --git a/project/scripts/setup_unitycatalog_main.sh b/project/scripts/setup_unitycatalog_main.sh index 6f8faa728f6..2e5cc01e463 100755 --- a/project/scripts/setup_unitycatalog_main.sh +++ b/project/scripts/setup_unitycatalog_main.sh @@ -57,7 +57,7 @@ set -euo pipefail # The pin. Bump both lines together if UC's version.sbt changed at the new SHA. build.sbt's # `unityCatalogVersion` is obtained by running this script with `--print-version`, so these two # values are the single source of truth. -UC_PIN_SHA=e3ab24e815b16a7614ff32044cf51067ef7ad16b +UC_PIN_SHA=30037019749a1b77461616f707cca297520a6988 UC_BASE_VERSION=0.5.0-SNAPSHOT # --------------------------------------------------------------------------------------------- diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java index 0139e56c093..0b5cce98449 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java @@ -144,7 +144,10 @@ private SparkConf configureSparkWithUnityCatalog(SparkConf conf) { String catalogName = uc.catalogName(); return conf.set("spark.sql.catalog." + catalogName, "io.unitycatalog.spark.UCSingleCatalog") .set("spark.sql.catalog." + catalogName + ".uri", uc.serverUri()) - .set("spark.sql.catalog." + catalogName + ".token", uc.serverToken()); + .set("spark.sql.catalog." + catalogName + ".token", uc.serverToken()) + .set( + "spark.sql.catalog." + catalogName + ".deltaRestApi.enabled", + String.valueOf(isUCDeltaRestCatalogApiEnabled())); } /** Stop the SparkSession after all tests. */ diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupport.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupport.java index c6633febd78..93cba86dd86 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupport.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupport.java @@ -45,7 +45,9 @@ * *

Automatically starts a local Unity Catalog server before tests and stops it after. To use a * remote server instead, set {@code UC_REMOTE=true} and configure {@code UC_URI}, {@code UC_TOKEN}, - * {@code UC_CATALOG_NAME}, {@code UC_SCHEMA_NAME}, and {@code UC_BASE_TABLE_LOCATION}. + * {@code UC_CATALOG_NAME}, {@code UC_SCHEMA_NAME}, and {@code UC_BASE_TABLE_LOCATION}. Set {@code + * UC_DELTA_REST_CATALOG_API_ENABLED=false} to run the same tests through legacy UC Spark catalog + * behavior. * *

{@code unityCatalogInfo()} is the only API for subclasses, All other methods are internal * implementation details. @@ -129,12 +131,19 @@ public ApiClient createApiClient() { public static final String UC_CATALOG_NAME = "UC_CATALOG_NAME"; public static final String UC_SCHEMA_NAME = "UC_SCHEMA_NAME"; public static final String UC_BASE_TABLE_LOCATION = "UC_BASE_TABLE_LOCATION"; + public static final String UC_DELTA_REST_CATALOG_API_ENABLED = + "UC_DELTA_REST_CATALOG_API_ENABLED"; protected static boolean isUCRemoteConfigured() { String ucRemote = System.getenv(UC_REMOTE); return ucRemote != null && ucRemote.equalsIgnoreCase("true"); } + protected static boolean isUCDeltaRestCatalogApiEnabled() { + String deltaRestApiEnabled = System.getenv(UC_DELTA_REST_CATALOG_API_ENABLED); + return deltaRestApiEnabled == null || !deltaRestApiEnabled.equalsIgnoreCase("false"); + } + /** The Unity Catalog info instance for subclasses access */ private UnityCatalogInfo ucInfo = null; diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupportTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupportTest.java index 5ad795cae0d..92b78367d73 100644 --- a/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupportTest.java +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogSupportTest.java @@ -18,6 +18,7 @@ import static io.sparkuctest.UnityCatalogSupport.UC_BASE_TABLE_LOCATION; import static io.sparkuctest.UnityCatalogSupport.UC_CATALOG_NAME; +import static io.sparkuctest.UnityCatalogSupport.UC_DELTA_REST_CATALOG_API_ENABLED; import static io.sparkuctest.UnityCatalogSupport.UC_REMOTE; import static io.sparkuctest.UnityCatalogSupport.UC_SCHEMA_NAME; import static io.sparkuctest.UnityCatalogSupport.UC_TOKEN; @@ -37,7 +38,13 @@ public class UnityCatalogSupportTest { private static final List ALL_ENVS = ImmutableList.of( - UC_REMOTE, UC_URI, UC_TOKEN, UC_CATALOG_NAME, UC_SCHEMA_NAME, UC_BASE_TABLE_LOCATION); + UC_REMOTE, + UC_URI, + UC_TOKEN, + UC_CATALOG_NAME, + UC_SCHEMA_NAME, + UC_BASE_TABLE_LOCATION, + UC_DELTA_REST_CATALOG_API_ENABLED); @Test public void testUnityCatalogInfo() throws Exception { @@ -176,6 +183,23 @@ public void testNoBaseTableLocation() throws Exception { }); } + @Test + public void testUCDeltaRestCatalogApiEnabledFromEnv() throws Exception { + withEnvTesting( + ImmutableMap.of(), + () -> { + TestingUCSupport uc = new TestingUCSupport(); + assertThat(uc.isUCDeltaRestCatalogApiEnabledForTest()).isTrue(); + }); + + withEnvTesting( + ImmutableMap.of(UC_DELTA_REST_CATALOG_API_ENABLED, "false"), + () -> { + TestingUCSupport uc = new TestingUCSupport(); + assertThat(uc.isUCDeltaRestCatalogApiEnabledForTest()).isFalse(); + }); + } + public interface TestCall { void call() throws Exception; @@ -225,5 +249,9 @@ public UnityCatalogInfo accessUnityCatalogInfo() throws Exception { setupServer(); return unityCatalogInfo(); } + + public boolean isUCDeltaRestCatalogApiEnabledForTest() { + return isUCDeltaRestCatalogApiEnabled(); + } } } diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCClient.java index 17a526762f9..32055d9983b 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCClient.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCClient.java @@ -22,6 +22,8 @@ import io.delta.storage.commit.actions.AbstractMetadata; import io.delta.storage.commit.actions.AbstractProtocol; import io.delta.storage.commit.uniform.UniformMetadata; +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.CredentialOperation; +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.CredentialsResponse; import java.io.IOException; import java.net.URI; @@ -42,6 +44,13 @@ */ public interface UCClient extends AutoCloseable { + /** + * Returns whether this client can use UC Delta Rest Catalog API. + */ + default boolean supportsUCDeltaRestCatalogApi() { + return false; + } + /** * Retrieves the metastore ID associated with this Unity Catalog instance. * @@ -171,6 +180,37 @@ void finalizeCreate( List columns, Map properties) throws CommitFailedException; + /** + * Loads a Delta table from Unity Catalog through the UC Delta Rest Catalog API. + * + *

Implementations that do not support UC Delta Rest Catalog API should use the default + * implementation, which fails loudly so callers do not accidentally route UC Delta Rest Catalog + * API operations through a legacy-only client. + */ + default AbstractMetadata loadTable( + String catalog, + String schema, + String table) throws IOException { + throw new UnsupportedOperationException( + "loadTable requires UC Delta Rest Catalog API support."); + } + + /** + * Gets temporary storage credentials for a table through the UC Delta Rest Catalog API. + * + *

Implementations that do not support UC Delta Rest Catalog API should use the default + * implementation, which fails loudly so callers do not accidentally treat legacy UC clients as + * credential-aware Delta clients. + */ + default CredentialsResponse getTableCredentials( + CredentialOperation operation, + String catalog, + String schema, + String table) throws IOException { + throw new UnsupportedOperationException( + "getTableCredentials requires UC Delta Rest Catalog API support."); + } + /** * Closes any resources used by this client. * This method should be called to properly release resources such as network diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaModels.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaModels.java new file mode 100644 index 00000000000..b84ebb087c6 --- /dev/null +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaModels.java @@ -0,0 +1,117 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.commit.uccommitcoordinator; + +import java.util.Collections; +import java.util.List; + +/** Delta-owned models for UC Delta table credentials. */ +public final class UCDeltaModels { + private UCDeltaModels() {} + + public enum CredentialOperation { + READ, + READ_WRITE + } + + public static final class CredentialsResponse { + private final List storageCredentials; + + public CredentialsResponse(List storageCredentials) { + this.storageCredentials = storageCredentials; + } + + public List getStorageCredentials() { + return storageCredentials == null ? Collections.emptyList() : storageCredentials; + } + } + + public static final class StorageCredential { + private final String prefix; + private final CredentialOperation operation; + private final StorageCredentialConfig config; + private final Long expirationTimeMs; + + public StorageCredential( + String prefix, + CredentialOperation operation, + StorageCredentialConfig config, + Long expirationTimeMs) { + this.prefix = prefix; + this.operation = operation; + this.config = config; + this.expirationTimeMs = expirationTimeMs; + } + + public String getPrefix() { + return prefix; + } + + public CredentialOperation getOperation() { + return operation; + } + + public StorageCredentialConfig getConfig() { + return config; + } + + public Long getExpirationTimeMs() { + return expirationTimeMs; + } + } + + public static final class StorageCredentialConfig { + private final String s3AccessKeyId; + private final String s3SecretAccessKey; + private final String s3SessionToken; + private final String azureSasToken; + private final String gcsOauthToken; + + public StorageCredentialConfig( + String s3AccessKeyId, + String s3SecretAccessKey, + String s3SessionToken, + String azureSasToken, + String gcsOauthToken) { + this.s3AccessKeyId = s3AccessKeyId; + this.s3SecretAccessKey = s3SecretAccessKey; + this.s3SessionToken = s3SessionToken; + this.azureSasToken = azureSasToken; + this.gcsOauthToken = gcsOauthToken; + } + + public String getS3AccessKeyId() { + return s3AccessKeyId; + } + + public String getS3SecretAccessKey() { + return s3SecretAccessKey; + } + + public String getS3SessionToken() { + return s3SessionToken; + } + + public String getAzureSasToken() { + return azureSasToken; + } + + public String getGcsOauthToken() { + return gcsOauthToken; + } + } +} diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java index 1325935a39a..d76496bf22a 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java @@ -24,6 +24,9 @@ import io.delta.storage.commit.actions.AbstractProtocol; import io.delta.storage.commit.uniform.IcebergMetadata; import io.delta.storage.commit.uniform.UniformMetadata; +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.CredentialOperation; +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.CredentialsResponse; +import io.unitycatalog.client.JSON; import io.unitycatalog.client.ApiClient; import io.unitycatalog.client.ApiClientBuilder; import io.unitycatalog.client.ApiException; @@ -31,6 +34,11 @@ import io.unitycatalog.client.api.MetastoresApi; import io.unitycatalog.client.api.TablesApi; import io.unitycatalog.client.auth.TokenProvider; +import io.unitycatalog.client.delta.api.ConfigurationApi; +import io.unitycatalog.client.delta.api.TemporaryCredentialsApi; +import io.unitycatalog.client.delta.model.CatalogConfig; +import io.unitycatalog.client.delta.model.StructType; +import io.unitycatalog.client.delta.model.TableMetadata; import io.unitycatalog.client.model.DeltaCommit; import io.unitycatalog.client.model.DeltaCommitInfo; import io.unitycatalog.client.model.DeltaCommitMetadataProperties; @@ -47,8 +55,11 @@ import io.unitycatalog.client.model.TableType; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.UncheckedIOException; import java.net.URI; import java.util.*; @@ -62,6 +73,8 @@ *

  • Retrieving metastore information
  • *
  • Committing changes to Delta tables
  • *
  • Fetching unbackfilled commit histories
  • + *
  • Loading table metadata through the UC Delta Rest Catalog API
  • + *
  • Getting table credentials through the UC Delta Rest Catalog API
  • * * *

    All requests are authenticated using a TokenProvider that generates Bearer tokens dynamically. @@ -84,15 +97,27 @@ */ public class UCTokenBasedRestClient implements UCClient { + private static final Logger LOG = LoggerFactory.getLogger(UCTokenBasedRestClient.class); + + private final boolean supportsUCDeltaRestCatalogApi; private DeltaCommitsApi deltaCommitsApi; private MetastoresApi metastoresApi; private TablesApi tablesApi; + private io.unitycatalog.client.delta.api.TablesApi deltaTablesApi; + private TemporaryCredentialsApi deltaTemporaryCredentialsApi; // HTTP status codes for error handling private static final int HTTP_BAD_REQUEST = 400; private static final int HTTP_NOT_FOUND = 404; private static final int HTTP_CONFLICT = 409; private static final int HTTP_TOO_MANY_REQUESTS = 429; + private static final String UC_DELTA_API_PROTOCOL_VERSION = "1.0"; + // Endpoint identifiers advertised by the UC Delta Rest Catalog API /config endpoint, not + // concrete URLs. + private static final List REQUIRED_UC_DELTA_API_ENDPOINT_IDS = + Arrays.asList( + "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}", + "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}/credentials"); /** * Constructs a new UCTokenBasedRestClient with the specified base URI, TokenProvider, @@ -108,6 +133,35 @@ public UCTokenBasedRestClient( String baseUri, TokenProvider tokenProvider, Map appVersions) { + this(buildApiClient(baseUri, tokenProvider, appVersions), true); + } + + /** + * Constructs a new UCTokenBasedRestClient and probes whether the catalog supports UC Delta Rest + * Catalog API. + * + *

    This constructor issues a network request to the UC Delta Rest Catalog API config endpoint. + * + *

    When the config endpoint is absent or does not advertise the required UC Delta Rest Catalog + * API endpoints, this client reports {@link #supportsUCDeltaRestCatalogApi()} as false. UC Delta + * Rest Catalog API table methods then throw {@link UnsupportedOperationException}. + */ + public UCTokenBasedRestClient( + String baseUri, + TokenProvider tokenProvider, + Map appVersions, + String catalog) { + this(buildApiClient(baseUri, tokenProvider, appVersions), catalog); + } + + private UCTokenBasedRestClient(ApiClient apiClient, String catalog) { + this(apiClient, supportsUCDeltaRestCatalogApi(apiClient, catalog)); + } + + private static ApiClient buildApiClient( + String baseUri, + TokenProvider tokenProvider, + Map appVersions) { Objects.requireNonNull(baseUri, "baseUri must not be null"); Objects.requireNonNull(tokenProvider, "tokenProvider must not be null"); Objects.requireNonNull(appVersions, "appVersions must not be null"); @@ -122,10 +176,62 @@ public UCTokenBasedRestClient( } }); - ApiClient apiClient = builder.build(); + return builder.build(); + } + + private static boolean supportsUCDeltaRestCatalogApi(ApiClient apiClient, String catalog) { + Objects.requireNonNull(apiClient, "apiClient must not be null"); + Objects.requireNonNull(catalog, "catalog must not be null"); + try { + CatalogConfig config = + new ConfigurationApi(apiClient).getConfig(catalog, UC_DELTA_API_PROTOCOL_VERSION); + return config != null && + config.getEndpoints() != null && + config.getEndpoints().containsAll(REQUIRED_UC_DELTA_API_ENDPOINT_IDS); + } catch (ApiException e) { + if (e.getCode() == HTTP_NOT_FOUND) { + LOG.warn( + "UC Delta Rest Catalog API config endpoint is unavailable for catalog {}. " + + "UC Delta Rest Catalog API will be disabled.", + catalog, + e); + return false; + } + throw new IllegalArgumentException( + String.format( + "Failed to determine UC Delta Rest Catalog API support for catalog %s (HTTP %s): %s", + catalog, + e.getCode(), + e.getResponseBody()), + e); + } + } + + /** + * Builds a client around an existing UC {@link ApiClient}. + * + *

    If {@code supportsUCDeltaRestCatalogApi} is false, UC Delta Rest Catalog API table methods + * are unsupported. Commit-coordinator methods continue to use the legacy UC APIs. + */ + private UCTokenBasedRestClient( + ApiClient apiClient, + boolean supportsUCDeltaRestCatalogApi) { + Objects.requireNonNull(apiClient, "apiClient must not be null"); + this.supportsUCDeltaRestCatalogApi = supportsUCDeltaRestCatalogApi; this.deltaCommitsApi = new DeltaCommitsApi(apiClient); this.metastoresApi = new MetastoresApi(apiClient); this.tablesApi = new TablesApi(apiClient); + this.deltaTablesApi = supportsUCDeltaRestCatalogApi + ? new io.unitycatalog.client.delta.api.TablesApi(apiClient) + : null; + this.deltaTemporaryCredentialsApi = supportsUCDeltaRestCatalogApi + ? new TemporaryCredentialsApi(apiClient) + : null; + } + + @Override + public boolean supportsUCDeltaRestCatalogApi() { + return supportsUCDeltaRestCatalogApi; } /** @@ -137,6 +243,14 @@ private void ensureOpen() { } } + private void ensureUCDeltaRestCatalogApiSupported(String operation) { + ensureOpen(); + if (!supportsUCDeltaRestCatalogApi) { + throw new UnsupportedOperationException( + operation + " requires UC Delta Rest Catalog API support."); + } + } + @Override public String getMetastoreId() throws IOException { ensureOpen(); @@ -228,6 +342,235 @@ public GetCommitsResponse getCommits( } } + /** + * Loads one table from Unity Catalog. + * + *

    This uses the UC Delta Rest Catalog API. Callers that need legacy UC loadTable behavior + * should use the existing catalog path instead of this API-only method. + */ + @Override + public AbstractMetadata loadTable( + String catalog, + String schema, + String table) throws IOException { + ensureUCDeltaRestCatalogApiSupported("loadTable"); + Objects.requireNonNull(catalog, "catalog must not be null."); + Objects.requireNonNull(schema, "schema must not be null."); + Objects.requireNonNull(table, "table must not be null."); + + try { + io.unitycatalog.client.delta.model.LoadTableResponse response = + deltaTablesApi.loadTable(catalog, schema, table); + return response == null ? null : toTableMetadata(response.getMetadata()); + } catch (ApiException e) { + throw new IOException( + String.format( + "Failed to load table %s.%s.%s via UC Delta Rest Catalog API (HTTP %s): %s", + catalog, + schema, + table, + e.getCode(), + e.getResponseBody()), + e); + } + } + + /** + * Gets temporary credentials for one table through the UC Delta Rest Catalog API. + * + *

    This method is only supported when this client has the UC Delta Rest Catalog API. + */ + @Override + public CredentialsResponse getTableCredentials( + CredentialOperation operation, + String catalog, + String schema, + String table) throws IOException { + ensureUCDeltaRestCatalogApiSupported("getTableCredentials"); + Objects.requireNonNull(operation, "operation must not be null."); + Objects.requireNonNull(catalog, "catalog must not be null."); + Objects.requireNonNull(schema, "schema must not be null."); + Objects.requireNonNull(table, "table must not be null."); + + try { + return toCredentialsResponse(deltaTemporaryCredentialsApi.getTableCredentials( + toSdkCredentialOperation(operation), + catalog, + schema, + table)); + } catch (ApiException e) { + throw new IOException( + String.format( + "Failed to get table credentials for %s.%s.%s (HTTP %s): %s", + catalog, + schema, + table, + e.getCode(), + e.getResponseBody()), + e); + } + } + + private static TableMetadataAdapter toTableMetadata(TableMetadata metadata) { + if (metadata == null) { + return null; + } + return new TableMetadataAdapter(metadata); + } + + private static CredentialsResponse toCredentialsResponse( + io.unitycatalog.client.delta.model.CredentialsResponse response) { + if (response == null) { + return null; + } + List credentials = new ArrayList<>(); + if (response.getStorageCredentials() != null) { + for (io.unitycatalog.client.delta.model.StorageCredential credential : + response.getStorageCredentials()) { + credentials.add(toStorageCredential(credential)); + } + } + return new CredentialsResponse(credentials); + } + + private static UCDeltaModels.StorageCredential toStorageCredential( + io.unitycatalog.client.delta.model.StorageCredential credential) { + if (credential == null) { + return null; + } + return new UCDeltaModels.StorageCredential( + credential.getPrefix(), + toCredentialOperation(credential.getOperation()), + toStorageCredentialConfig(credential.getConfig()), + credential.getExpirationTimeMs()); + } + + private static UCDeltaModels.StorageCredentialConfig toStorageCredentialConfig( + io.unitycatalog.client.delta.model.StorageCredentialConfig config) { + if (config == null) { + return null; + } + return new UCDeltaModels.StorageCredentialConfig( + config.getS3AccessKeyId(), + config.getS3SecretAccessKey(), + config.getS3SessionToken(), + config.getAzureSasToken(), + config.getGcsOauthToken()); + } + + private static CredentialOperation toCredentialOperation( + io.unitycatalog.client.delta.model.CredentialOperation operation) { + if (operation == null) { + return null; + } + switch (operation) { + case READ: + return CredentialOperation.READ; + case READ_WRITE: + return CredentialOperation.READ_WRITE; + default: + throw new IllegalArgumentException("Unsupported UC Delta credential operation: " + operation); + } + } + + private static io.unitycatalog.client.delta.model.CredentialOperation toSdkCredentialOperation( + CredentialOperation operation) { + switch (operation) { + case READ: + return io.unitycatalog.client.delta.model.CredentialOperation.READ; + case READ_WRITE: + return io.unitycatalog.client.delta.model.CredentialOperation.READ_WRITE; + default: + throw new IllegalArgumentException("Unsupported UC Delta credential operation: " + operation); + } + } + + /** + * Adapts UC Delta SDK table metadata to Delta's storage-level metadata interface. + * + *

    The UC SDK schema stays hidden behind this OSS-only client implementation. DRC-aware + * callers can access the raw UC SDK schema through {@link #getSchema()} and convert it directly + * to their local schema representation. + */ + public static final class TableMetadataAdapter implements AbstractMetadata { + private final TableMetadata delegate; + private volatile String cachedSchemaJson; + + private TableMetadataAdapter(TableMetadata delegate) { + this.delegate = Objects.requireNonNull(delegate, "delegate must not be null."); + } + + public StructType getSchema() { + return delegate.getColumns(); + } + + public String getTableType() { + return delegate.getTableType() == null ? null : delegate.getTableType().getValue(); + } + + public UUID getTableUuid() { + return delegate.getTableUuid(); + } + + public String getLocation() { + return delegate.getLocation(); + } + + @Override + public String getId() { + return delegate.getTableUuid() == null ? null : delegate.getTableUuid().toString(); + } + + @Override + public String getName() { + return null; + } + + @Override + public String getDescription() { + return null; + } + + @Override + public String getProvider() { + return delegate.getDataSourceFormat() == null + ? null + : delegate.getDataSourceFormat().getValue().toLowerCase(Locale.ROOT); + } + + @Override + public Map getFormatOptions() { + return Collections.emptyMap(); + } + + @Override + public String getSchemaString() { + if (cachedSchemaJson == null && delegate.getColumns() != null) { + try { + cachedSchemaJson = JSON.getDefault().getMapper().writeValueAsString(delegate.getColumns()); + } catch (IOException e) { + throw new UncheckedIOException("Failed to serialize UC Delta schema to JSON.", e); + } + } + return cachedSchemaJson; + } + + @Override + public List getPartitionColumns() { + return delegate.getPartitionColumns(); + } + + @Override + public Map getConfiguration() { + return delegate.getProperties(); + } + + @Override + public Long getCreatedTime() { + return delegate.getCreatedTime(); + } + } + @Override public void close() throws IOException { // Nulling out the API instances makes them eligible for GC. Once garbage collected, @@ -235,6 +578,8 @@ public void close() throws IOException { this.deltaCommitsApi = null; this.metastoresApi = null; this.tablesApi = null; + this.deltaTablesApi = null; + this.deltaTemporaryCredentialsApi = null; } /** diff --git a/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/CredPropsUtil.java b/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/CredPropsUtil.java new file mode 100644 index 00000000000..5cce77beddb --- /dev/null +++ b/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/CredPropsUtil.java @@ -0,0 +1,643 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.unitycatalog.hadoop; + +import static io.delta.storage.unitycatalog.hadoop.UCDeltaRestCatalogApiCredentialConf.FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME; +import static io.delta.storage.unitycatalog.hadoop.UCDeltaRestCatalogApiCredentialConf.FS_AZURE_ACCOUNT_IS_HNS_ENABLED; +import static io.delta.storage.unitycatalog.hadoop.UCDeltaRestCatalogApiCredentialConf.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE; + +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.CredentialOperation; +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.StorageCredential; +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.StorageCredentialConfig; +import io.unitycatalog.client.auth.TokenProvider; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; + +public class CredPropsUtil { + // Keep these as strings so callers can build credential options without loading cloud SDKs. + private static final String ABFS_VENDED_TOKEN_PROVIDER_CLASS = + "io.delta.storage.unitycatalog.hadoop.credentials.AbfsVendedTokenProvider"; + private static final String AWS_VENDED_TOKEN_PROVIDER_CLASS = + "io.delta.storage.unitycatalog.hadoop.credentials.AwsVendedTokenProvider"; + private static final String GCS_VENDED_TOKEN_PROVIDER_CLASS = + "io.delta.storage.unitycatalog.hadoop.credentials.GcsVendedTokenProvider"; + private static final String CRED_SCOPED_FILE_SYSTEM_CLASS = + "io.delta.storage.unitycatalog.hadoop.fs.CredScopedFileSystem"; + private static final String CRED_SCOPED_FS_CLASS = + "io.delta.storage.unitycatalog.hadoop.fs.CredScopedFs"; + private static final String AZURE_ACCESS_TOKEN_KEY = "fs.azure.sas.fixed.token"; + private static final String GCS_ACCESS_TOKEN_KEY = "fs.gs.auth.access.token.credential"; + private static final String GCS_ACCESS_TOKEN_EXPIRATION_KEY = + "fs.gs.auth.access.token.expiration"; + + private CredPropsUtil() {} + + private abstract static class PropsBuilder> { + private final Map builder = new LinkedHashMap<>(); + + public T set(String key, String value) { + builder.put(key, value); + return self(); + } + + public T uri(String uri) { + builder.put(UCDeltaRestCatalogApiCredentialConf.UC_URI_KEY, uri); + return self(); + } + + public T tokenProvider(TokenProvider tokenProvider) { + tokenProvider + .configs() + .forEach( + (key, value) -> + builder.put(UCDeltaRestCatalogApiCredentialConf.UC_AUTH_PREFIX + key, value)); + return self(); + } + + public T uid(String uid) { + builder.put(UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_UID_KEY, uid); + return self(); + } + + public T credentialType(String credType) { + if (!UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_TYPE_PATH_VALUE.equals(credType) + && !UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_TYPE_TABLE_VALUE.equals(credType)) { + throw new IllegalArgumentException( + String.format("Invalid credential type '%s', must be either 'path' or 'table'.", credType)); + } + builder.put(UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_TYPE_KEY, credType); + return self(); + } + + public T tableId(String tableId) { + builder.put(UCDeltaRestCatalogApiCredentialConf.UC_TABLE_ID_KEY, tableId); + return self(); + } + + public T table(String catalog, String schema, String table, String location) { + builder.put(UCDeltaRestCatalogApiCredentialConf.UC_TABLE_CATALOG_KEY, catalog); + builder.put(UCDeltaRestCatalogApiCredentialConf.UC_TABLE_SCHEMA_KEY, schema); + builder.put(UCDeltaRestCatalogApiCredentialConf.UC_TABLE_NAME_KEY, table); + builder.put(UCDeltaRestCatalogApiCredentialConf.UC_TABLE_LOCATION_KEY, location); + return self(); + } + + public T tableOperation(CredentialOperation tableOp) { + builder.put( + UCDeltaRestCatalogApiCredentialConf.UC_TABLE_OPERATION_KEY, + toTableOperationValue(tableOp)); + return self(); + } + + public T path(String path) { + builder.put(UCDeltaRestCatalogApiCredentialConf.UC_PATH_KEY, path); + return self(); + } + + public T pathOperation(CredentialOperation pathOp) { + builder.put( + UCDeltaRestCatalogApiCredentialConf.UC_PATH_OPERATION_KEY, + toPathOperationValue(pathOp)); + return self(); + } + + public T saveAndOverride( + Map fsImplProps, String key, String defaultOriginal, String newValue) { + builder.put(key + ".original", fsImplProps.getOrDefault(key, defaultOriginal)); + builder.put(key, newValue); + return self(); + } + + protected abstract T self(); + + public Map build() { + return Collections.unmodifiableMap(new LinkedHashMap<>(builder)); + } + } + + private static class S3PropsBuilder extends PropsBuilder { + S3PropsBuilder(boolean credScopedFsEnabled, Map fsImplProps) { + set("fs.s3a.path.style.access", "true"); + set("fs.s3.impl.disable.cache", "true"); + set("fs.s3a.impl.disable.cache", "true"); + + if (credScopedFsEnabled) { + saveAndOverride( + fsImplProps, + "fs.s3.impl", + "org.apache.hadoop.fs.s3a.S3AFileSystem", + CRED_SCOPED_FILE_SYSTEM_CLASS); + saveAndOverride( + fsImplProps, + "fs.s3a.impl", + "org.apache.hadoop.fs.s3a.S3AFileSystem", + CRED_SCOPED_FILE_SYSTEM_CLASS); + saveAndOverride( + fsImplProps, + "fs.AbstractFileSystem.s3.impl", + "org.apache.hadoop.fs.s3a.S3A", + CRED_SCOPED_FS_CLASS); + saveAndOverride( + fsImplProps, + "fs.AbstractFileSystem.s3a.impl", + "org.apache.hadoop.fs.s3a.S3A", + CRED_SCOPED_FS_CLASS); + } + } + + @Override + protected S3PropsBuilder self() { + return this; + } + } + + private static class GcsPropsBuilder extends PropsBuilder { + GcsPropsBuilder(boolean credScopedFsEnabled, Map fsImplProps) { + set("fs.gs.create.items.conflict.check.enable", "true"); + set("fs.gs.impl.disable.cache", "true"); + + if (credScopedFsEnabled) { + saveAndOverride( + fsImplProps, + "fs.gs.impl", + "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", + CRED_SCOPED_FILE_SYSTEM_CLASS); + saveAndOverride( + fsImplProps, + "fs.AbstractFileSystem.gs.impl", + "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", + CRED_SCOPED_FS_CLASS); + } + } + + @Override + protected GcsPropsBuilder self() { + return this; + } + } + + private static class AbfsPropsBuilder extends PropsBuilder { + AbfsPropsBuilder(boolean credScopedFsEnabled, Map fsImplProps) { + set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, "SAS"); + set(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, "true"); + set("fs.abfs.impl.disable.cache", "true"); + set("fs.abfss.impl.disable.cache", "true"); + + if (credScopedFsEnabled) { + saveAndOverride( + fsImplProps, + "fs.abfs.impl", + "org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem", + CRED_SCOPED_FILE_SYSTEM_CLASS); + saveAndOverride( + fsImplProps, + "fs.abfss.impl", + "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem", + CRED_SCOPED_FILE_SYSTEM_CLASS); + saveAndOverride( + fsImplProps, + "fs.AbstractFileSystem.abfs.impl", + "org.apache.hadoop.fs.azurebfs.Abfs", + CRED_SCOPED_FS_CLASS); + saveAndOverride( + fsImplProps, + "fs.AbstractFileSystem.abfss.impl", + "org.apache.hadoop.fs.azurebfs.Abfss", + CRED_SCOPED_FS_CLASS); + } + } + + @Override + protected AbfsPropsBuilder self() { + return this; + } + } + + private static Map s3FixedCredProps( + boolean credScopedFsEnabled, + Map fsImplProps, + StorageCredential credential) { + StorageCredentialConfig config = requireSingleCloudConfig(credential); + return new S3PropsBuilder(credScopedFsEnabled, fsImplProps) + .set("fs.s3a.access.key", requireCredentialField( + config.getS3AccessKeyId(), credential, "S3 access key")) + .set("fs.s3a.secret.key", requireCredentialField( + config.getS3SecretAccessKey(), credential, "S3 secret key")) + .set("fs.s3a.session.token", requireCredentialField( + config.getS3SessionToken(), credential, "S3 session token")) + .build(); + } + + private static S3PropsBuilder s3TempCredPropsBuilder( + boolean credScopedFsEnabled, + Map fsImplProps, + String uri, + TokenProvider tokenProvider, + StorageCredential credential) { + StorageCredentialConfig config = requireSingleCloudConfig(credential); + S3PropsBuilder builder = + new S3PropsBuilder(credScopedFsEnabled, fsImplProps) + .set( + UCDeltaRestCatalogApiCredentialConf.S3A_CREDENTIALS_PROVIDER, + AWS_VENDED_TOKEN_PROVIDER_CLASS) + .uri(uri) + .tokenProvider(tokenProvider) + .uid(UUID.randomUUID().toString()) + .set( + UCDeltaRestCatalogApiCredentialConf.S3A_INIT_ACCESS_KEY, + requireCredentialField(config.getS3AccessKeyId(), credential, "S3 access key")) + .set( + UCDeltaRestCatalogApiCredentialConf.S3A_INIT_SECRET_KEY, + requireCredentialField(config.getS3SecretAccessKey(), credential, "S3 secret key")) + .set( + UCDeltaRestCatalogApiCredentialConf.S3A_INIT_SESSION_TOKEN, + requireCredentialField(config.getS3SessionToken(), credential, "S3 session token")); + + if (credential.getExpirationTimeMs() != null) { + builder.set( + UCDeltaRestCatalogApiCredentialConf.S3A_INIT_CRED_EXPIRED_TIME, + String.valueOf(credential.getExpirationTimeMs())); + } + + return builder; + } + + private static Map s3TableTempCredProps( + boolean credScopedFsEnabled, + Map fsImplProps, + String uri, + TokenProvider tokenProvider, + String tableId, + String catalog, + String schema, + String table, + String location, + StorageCredential credential) { + return s3TempCredPropsBuilder(credScopedFsEnabled, fsImplProps, uri, tokenProvider, credential) + .credentialType(UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_TYPE_TABLE_VALUE) + .tableId(tableId) + .table(catalog, schema, table, location) + .tableOperation(requireOperation(credential)) + .build(); + } + + private static Map s3PathTempCredProps( + boolean credScopedFsEnabled, + Map fsImplProps, + String uri, + TokenProvider tokenProvider, + String path, + StorageCredential credential) { + return s3TempCredPropsBuilder(credScopedFsEnabled, fsImplProps, uri, tokenProvider, credential) + .credentialType(UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_TYPE_PATH_VALUE) + .path(path) + .pathOperation(requireOperation(credential)) + .build(); + } + + private static Map gsFixedCredProps( + boolean credScopedFsEnabled, + Map fsImplProps, + StorageCredential credential) { + StorageCredentialConfig config = requireSingleCloudConfig(credential); + Long expirationTime = + credential.getExpirationTimeMs() == null ? Long.MAX_VALUE : credential.getExpirationTimeMs(); + return new GcsPropsBuilder(credScopedFsEnabled, fsImplProps) + .set(GCS_ACCESS_TOKEN_KEY, requireCredentialField( + config.getGcsOauthToken(), credential, "GCS OAuth token")) + .set(GCS_ACCESS_TOKEN_EXPIRATION_KEY, String.valueOf(expirationTime)) + .build(); + } + + private static GcsPropsBuilder gcsTempCredPropsBuilder( + boolean credScopedFsEnabled, + Map fsImplProps, + String uri, + TokenProvider tokenProvider, + StorageCredential credential) { + StorageCredentialConfig config = requireSingleCloudConfig(credential); + GcsPropsBuilder builder = + new GcsPropsBuilder(credScopedFsEnabled, fsImplProps) + .set("fs.gs.auth.type", "ACCESS_TOKEN_PROVIDER") + .set("fs.gs.auth.access.token.provider", GCS_VENDED_TOKEN_PROVIDER_CLASS) + .uri(uri) + .tokenProvider(tokenProvider) + .uid(UUID.randomUUID().toString()) + .set( + UCDeltaRestCatalogApiCredentialConf.GCS_INIT_OAUTH_TOKEN, + requireCredentialField(config.getGcsOauthToken(), credential, "GCS OAuth token")); + + if (credential.getExpirationTimeMs() != null) { + builder.set( + UCDeltaRestCatalogApiCredentialConf.GCS_INIT_OAUTH_TOKEN_EXPIRATION_TIME, + String.valueOf(credential.getExpirationTimeMs())); + } + + return builder; + } + + private static Map gsTableTempCredProps( + boolean credScopedFsEnabled, + Map fsImplProps, + String uri, + TokenProvider tokenProvider, + String tableId, + String catalog, + String schema, + String table, + String location, + StorageCredential credential) { + return gcsTempCredPropsBuilder(credScopedFsEnabled, fsImplProps, uri, tokenProvider, credential) + .credentialType(UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_TYPE_TABLE_VALUE) + .tableId(tableId) + .table(catalog, schema, table, location) + .tableOperation(requireOperation(credential)) + .build(); + } + + private static Map gsPathTempCredProps( + boolean credScopedFsEnabled, + Map fsImplProps, + String uri, + TokenProvider tokenProvider, + String path, + StorageCredential credential) { + return gcsTempCredPropsBuilder(credScopedFsEnabled, fsImplProps, uri, tokenProvider, credential) + .credentialType(UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_TYPE_PATH_VALUE) + .path(path) + .pathOperation(requireOperation(credential)) + .build(); + } + + private static Map abfsFixedCredProps( + boolean credScopedFsEnabled, + Map fsImplProps, + StorageCredential credential) { + StorageCredentialConfig config = requireSingleCloudConfig(credential); + return new AbfsPropsBuilder(credScopedFsEnabled, fsImplProps) + .set(AZURE_ACCESS_TOKEN_KEY, requireCredentialField( + config.getAzureSasToken(), credential, "Azure SAS token")) + .build(); + } + + private static AbfsPropsBuilder abfsTempCredPropsBuilder( + boolean credScopedFsEnabled, + Map fsImplProps, + String uri, + TokenProvider tokenProvider, + StorageCredential credential) { + StorageCredentialConfig config = requireSingleCloudConfig(credential); + AbfsPropsBuilder builder = + new AbfsPropsBuilder(credScopedFsEnabled, fsImplProps) + .set(FS_AZURE_SAS_TOKEN_PROVIDER_TYPE, ABFS_VENDED_TOKEN_PROVIDER_CLASS) + .uri(uri) + .tokenProvider(tokenProvider) + .uid(UUID.randomUUID().toString()) + .set( + UCDeltaRestCatalogApiCredentialConf.AZURE_INIT_SAS_TOKEN, + requireCredentialField(config.getAzureSasToken(), credential, "Azure SAS token")); + + if (credential.getExpirationTimeMs() != null) { + builder.set( + UCDeltaRestCatalogApiCredentialConf.AZURE_INIT_SAS_TOKEN_EXPIRED_TIME, + String.valueOf(credential.getExpirationTimeMs())); + } + + return builder; + } + + private static Map abfsTableTempCredProps( + boolean credScopedFsEnabled, + Map fsImplProps, + String uri, + TokenProvider tokenProvider, + String tableId, + String catalog, + String schema, + String table, + String location, + StorageCredential credential) { + return abfsTempCredPropsBuilder(credScopedFsEnabled, fsImplProps, uri, tokenProvider, credential) + .credentialType(UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_TYPE_TABLE_VALUE) + .tableId(tableId) + .table(catalog, schema, table, location) + .tableOperation(requireOperation(credential)) + .build(); + } + + private static Map abfsPathTempCredProps( + boolean credScopedFsEnabled, + Map fsImplProps, + String uri, + TokenProvider tokenProvider, + String path, + StorageCredential credential) { + return abfsTempCredPropsBuilder(credScopedFsEnabled, fsImplProps, uri, tokenProvider, credential) + .credentialType(UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_TYPE_PATH_VALUE) + .path(path) + .pathOperation(requireOperation(credential)) + .build(); + } + + public static Map createTableCredProps( + boolean renewCredEnabled, + boolean credScopedFsEnabled, + Map fsImplProps, + String scheme, + String uri, + TokenProvider tokenProvider, + String tableId, + String catalog, + String schema, + String table, + String location, + StorageCredential credential) { + switch (scheme) { + case "s3": + case "s3a": + if (renewCredEnabled) { + return s3TableTempCredProps( + credScopedFsEnabled, + fsImplProps, + uri, + tokenProvider, + tableId, + catalog, + schema, + table, + location, + credential); + } else { + return s3FixedCredProps(credScopedFsEnabled, fsImplProps, credential); + } + case "gs": + if (renewCredEnabled) { + return gsTableTempCredProps( + credScopedFsEnabled, + fsImplProps, + uri, + tokenProvider, + tableId, + catalog, + schema, + table, + location, + credential); + } else { + return gsFixedCredProps(credScopedFsEnabled, fsImplProps, credential); + } + case "abfss": + case "abfs": + if (renewCredEnabled) { + return abfsTableTempCredProps( + credScopedFsEnabled, + fsImplProps, + uri, + tokenProvider, + tableId, + catalog, + schema, + table, + location, + credential); + } else { + return abfsFixedCredProps(credScopedFsEnabled, fsImplProps, credential); + } + default: + return Collections.emptyMap(); + } + } + + public static Map createPathCredProps( + boolean renewCredEnabled, + boolean credScopedFsEnabled, + Map fsImplProps, + String scheme, + String uri, + TokenProvider tokenProvider, + String path, + StorageCredential credential) { + switch (scheme) { + case "s3": + case "s3a": + if (renewCredEnabled) { + return s3PathTempCredProps( + credScopedFsEnabled, fsImplProps, uri, tokenProvider, path, credential); + } else { + return s3FixedCredProps(credScopedFsEnabled, fsImplProps, credential); + } + case "gs": + if (renewCredEnabled) { + return gsPathTempCredProps( + credScopedFsEnabled, fsImplProps, uri, tokenProvider, path, credential); + } else { + return gsFixedCredProps(credScopedFsEnabled, fsImplProps, credential); + } + case "abfss": + case "abfs": + if (renewCredEnabled) { + return abfsPathTempCredProps( + credScopedFsEnabled, fsImplProps, uri, tokenProvider, path, credential); + } else { + return abfsFixedCredProps(credScopedFsEnabled, fsImplProps, credential); + } + default: + return Collections.emptyMap(); + } + } + + private static StorageCredentialConfig requireSingleCloudConfig(StorageCredential credential) { + StorageCredentialConfig config = requireConfig(credential); + boolean hasS3 = + config.getS3AccessKeyId() != null + || config.getS3SecretAccessKey() != null + || config.getS3SessionToken() != null; + boolean hasAzure = config.getAzureSasToken() != null; + boolean hasGcs = config.getGcsOauthToken() != null; + int configuredClouds = (hasS3 ? 1 : 0) + (hasAzure ? 1 : 0) + (hasGcs ? 1 : 0); + if (configuredClouds != 1) { + throw new IllegalArgumentException( + String.format( + "UC Delta Rest Catalog API storage credential for prefix %s must contain exactly " + + "one cloud credential config.", + credential.getPrefix())); + } + return config; + } + + private static StorageCredentialConfig requireConfig(StorageCredential credential) { + Objects.requireNonNull(credential, "credential cannot be null"); + StorageCredentialConfig config = credential.getConfig(); + if (config == null) { + throw new IllegalArgumentException( + String.format( + "UC Delta Rest Catalog API storage credential for prefix %s is missing config.", + credential.getPrefix())); + } + return config; + } + + private static CredentialOperation requireOperation(StorageCredential credential) { + Objects.requireNonNull(credential, "credential cannot be null"); + CredentialOperation operation = credential.getOperation(); + if (operation == null) { + throw new IllegalArgumentException( + String.format( + "UC Delta Rest Catalog API storage credential for prefix %s is missing operation.", + credential.getPrefix())); + } + return operation; + } + + private static String requireCredentialField( + String value, StorageCredential credential, String field) { + if (value == null) { + throw new IllegalArgumentException( + String.format( + "UC Delta Rest Catalog API storage credential for prefix %s is missing %s.", + credential.getPrefix(), field)); + } + return value; + } + + private static String toTableOperationValue(CredentialOperation operation) { + switch (operation) { + case READ: + case READ_WRITE: + return operation.name(); + default: + throw new IllegalArgumentException( + "Unsupported UC Delta Rest Catalog API credential operation: " + operation); + } + } + + private static String toPathOperationValue(CredentialOperation operation) { + switch (operation) { + case READ: + return "PATH_READ"; + case READ_WRITE: + return "PATH_READ_WRITE"; + default: + throw new IllegalArgumentException( + "Unsupported UC Delta Rest Catalog API credential operation: " + operation); + } + } +} diff --git a/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/UCDeltaRestCatalogApiCredentialConf.java b/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/UCDeltaRestCatalogApiCredentialConf.java new file mode 100644 index 00000000000..6e857511956 --- /dev/null +++ b/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/UCDeltaRestCatalogApiCredentialConf.java @@ -0,0 +1,76 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.unitycatalog.hadoop; + +public class UCDeltaRestCatalogApiCredentialConf { + private UCDeltaRestCatalogApiCredentialConf() {} + + public static final String S3A_CREDENTIALS_PROVIDER = "fs.s3a.aws.credentials.provider"; + + public static final String S3A_INIT_ACCESS_KEY = "fs.s3a.init.access.key"; + public static final String S3A_INIT_SECRET_KEY = "fs.s3a.init.secret.key"; + public static final String S3A_INIT_SESSION_TOKEN = "fs.s3a.init.session.token"; + public static final String S3A_INIT_CRED_EXPIRED_TIME = + "fs.s3a.init.credential.expired.time"; + + public static final String AZURE_INIT_SAS_TOKEN = "fs.azure.init.sas.token"; + public static final String AZURE_INIT_SAS_TOKEN_EXPIRED_TIME = + "fs.azure.init.sas.token.expired.time"; + + public static final String FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME = + "fs.azure.account.auth.type"; + public static final String FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "fs.azure.account.hns.enabled"; + public static final String FS_AZURE_SAS_TOKEN_PROVIDER_TYPE = + "fs.azure.sas.token.provider.type"; + + public static final String GCS_INIT_OAUTH_TOKEN = "fs.gs.init.oauth.token"; + public static final String GCS_INIT_OAUTH_TOKEN_EXPIRATION_TIME = + "fs.gs.init.oauth.token.expiration.time"; + + // Delta filters many table options before building Hadoop configurations, so all custom keys + // that must reach FileSystem implementations intentionally use the fs.* namespace. + public static final String UC_URI_KEY = "fs.unitycatalog.uri"; + public static final String UC_AUTH_PREFIX = "fs.unitycatalog.auth."; + public static final String UC_AUTH_TYPE = "fs.unitycatalog.auth.type"; + public static final String UC_AUTH_TOKEN_KEY = "fs.unitycatalog.auth.token"; + + public static final String UC_RENEWAL_LEAD_TIME_KEY = + "fs.unitycatalog.renewal.leadTimeMillis"; + public static final long UC_RENEWAL_LEAD_TIME_DEFAULT_VALUE = 30_000L; + + public static final String UC_TEST_CLOCK_NAME = "fs.unitycatalog.test.clock.name"; + + public static final String UC_CREDENTIALS_UID_KEY = "fs.unitycatalog.credentials.uid"; + + public static final String UC_TABLE_ID_KEY = "fs.unitycatalog.table.id"; + public static final String UC_TABLE_CATALOG_KEY = "fs.unitycatalog.table.catalog"; + public static final String UC_TABLE_SCHEMA_KEY = "fs.unitycatalog.table.schema"; + public static final String UC_TABLE_NAME_KEY = "fs.unitycatalog.table.name"; + public static final String UC_TABLE_LOCATION_KEY = "fs.unitycatalog.table.location"; + public static final String UC_TABLE_OPERATION_KEY = "fs.unitycatalog.table.operation"; + + public static final String UC_PATH_KEY = "fs.unitycatalog.path"; + public static final String UC_PATH_OPERATION_KEY = "fs.unitycatalog.path.operation"; + + public static final String UC_CREDENTIALS_TYPE_KEY = "fs.unitycatalog.credentials.type"; + public static final String UC_CREDENTIALS_TYPE_TABLE_VALUE = "table"; + public static final String UC_CREDENTIALS_TYPE_PATH_VALUE = "path"; + + public static final String UC_CREDENTIAL_CACHE_ENABLED_KEY = + "fs.unitycatalog.credential.cache.enabled"; + public static final boolean UC_CREDENTIAL_CACHE_ENABLED_DEFAULT_VALUE = true; +} diff --git a/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/credentials/AbfsVendedTokenProvider.java b/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/credentials/AbfsVendedTokenProvider.java new file mode 100644 index 00000000000..fadf5e8a260 --- /dev/null +++ b/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/credentials/AbfsVendedTokenProvider.java @@ -0,0 +1,73 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.unitycatalog.hadoop.credentials; + +import io.unitycatalog.client.model.AzureUserDelegationSAS; + +import java.util.Objects; + +import io.delta.storage.unitycatalog.hadoop.UCDeltaRestCatalogApiCredentialConf; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider; + +public class AbfsVendedTokenProvider extends GenericCredentialProvider implements SASTokenProvider { + public static final String ACCESS_TOKEN_KEY = "fs.azure.sas.fixed.token"; + + public AbfsVendedTokenProvider() {} + + @Override + public void initialize(Configuration conf, String accountName) { + initialize(conf); + } + + @Override + public GenericCredential initGenericCredential(Configuration conf) { + if (conf.get(UCDeltaRestCatalogApiCredentialConf.AZURE_INIT_SAS_TOKEN) != null + && conf.get(UCDeltaRestCatalogApiCredentialConf.AZURE_INIT_SAS_TOKEN_EXPIRED_TIME) != null) { + String sasToken = conf.get(UCDeltaRestCatalogApiCredentialConf.AZURE_INIT_SAS_TOKEN); + Objects.requireNonNull( + sasToken, + String.format( + "Azure SAS token not set, please check '%s' in hadoop configuration", + UCDeltaRestCatalogApiCredentialConf.AZURE_INIT_SAS_TOKEN)); + + long expiredTimeMillis = + conf.getLong(UCDeltaRestCatalogApiCredentialConf.AZURE_INIT_SAS_TOKEN_EXPIRED_TIME, 0L); + if (expiredTimeMillis <= 0) { + throw new IllegalStateException( + String.format( + "Azure SAS token expired time must be greater than 0, please check '%s' in " + + "hadoop configuration", + UCDeltaRestCatalogApiCredentialConf.AZURE_INIT_SAS_TOKEN_EXPIRED_TIME)); + } + + return GenericCredential.forAzure(sasToken, expiredTimeMillis); + } else { + return null; + } + } + + @Override + public String getSASToken(String account, String fileSystem, String path, String operation) { + GenericCredential generic = accessCredentials(); + + AzureUserDelegationSAS azureSAS = generic.temporaryCredentials().getAzureUserDelegationSas(); + Objects.requireNonNull(azureSAS, "Azure SAS of generic credential cannot be null"); + + return azureSAS.getSasToken(); + } +} diff --git a/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/credentials/AwsVendedTokenProvider.java b/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/credentials/AwsVendedTokenProvider.java new file mode 100644 index 00000000000..9de63de2513 --- /dev/null +++ b/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/credentials/AwsVendedTokenProvider.java @@ -0,0 +1,73 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.unitycatalog.hadoop.credentials; + +import java.util.Objects; + +import org.apache.hadoop.conf.Configuration; +import io.delta.storage.unitycatalog.hadoop.UCDeltaRestCatalogApiCredentialConf; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; + +public class AwsVendedTokenProvider extends GenericCredentialProvider + implements AwsCredentialsProvider { + + public AwsVendedTokenProvider(Configuration conf) { + initialize(conf); + } + + @Override + public GenericCredential initGenericCredential(Configuration conf) { + if (conf.get(UCDeltaRestCatalogApiCredentialConf.S3A_INIT_ACCESS_KEY) != null + && conf.get(UCDeltaRestCatalogApiCredentialConf.S3A_INIT_SECRET_KEY) != null + && conf.get(UCDeltaRestCatalogApiCredentialConf.S3A_INIT_SESSION_TOKEN) != null) { + String accessKey = conf.get(UCDeltaRestCatalogApiCredentialConf.S3A_INIT_ACCESS_KEY); + String secretKey = conf.get(UCDeltaRestCatalogApiCredentialConf.S3A_INIT_SECRET_KEY); + String sessionToken = conf.get(UCDeltaRestCatalogApiCredentialConf.S3A_INIT_SESSION_TOKEN); + + long expiredTimeMillis = + conf.getLong(UCDeltaRestCatalogApiCredentialConf.S3A_INIT_CRED_EXPIRED_TIME, Long.MAX_VALUE); + if (expiredTimeMillis <= 0) { + throw new IllegalStateException( + String.format( + "Expired time %s must be greater than 0, please check configure key '%s'", + expiredTimeMillis, + UCDeltaRestCatalogApiCredentialConf.S3A_INIT_CRED_EXPIRED_TIME)); + } + + return GenericCredential.forAws(accessKey, secretKey, sessionToken, expiredTimeMillis); + } else { + return null; + } + } + + @Override + public AwsCredentials resolveCredentials() { + GenericCredential generic = accessCredentials(); + + io.unitycatalog.client.model.AwsCredentials awsTempCred = + generic.temporaryCredentials().getAwsTempCredentials(); + Objects.requireNonNull(awsTempCred, "AWS temp credential of generic credentials cannot be null"); + + return AwsSessionCredentials.builder() + .accessKeyId(awsTempCred.getAccessKeyId()) + .secretAccessKey(awsTempCred.getSecretAccessKey()) + .sessionToken(awsTempCred.getSessionToken()) + .build(); + } +} diff --git a/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/credentials/GcsVendedTokenProvider.java b/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/credentials/GcsVendedTokenProvider.java new file mode 100644 index 00000000000..1b2826677ba --- /dev/null +++ b/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/credentials/GcsVendedTokenProvider.java @@ -0,0 +1,96 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.unitycatalog.hadoop.credentials; + +import com.google.cloud.hadoop.util.AccessTokenProvider; +import io.unitycatalog.client.model.GcpOauthToken; + +import java.io.IOException; +import java.time.Instant; +import java.util.Objects; + +import org.apache.hadoop.conf.Configuration; +import io.delta.storage.unitycatalog.hadoop.UCDeltaRestCatalogApiCredentialConf; + +public class GcsVendedTokenProvider extends GenericCredentialProvider + implements AccessTokenProvider { + + public static final String ACCESS_TOKEN_KEY = "fs.gs.auth.access.token.credential"; + public static final String ACCESS_TOKEN_EXPIRATION_KEY = "fs.gs.auth.access.token.expiration"; + private Configuration conf; + + public GcsVendedTokenProvider() {} + + @Override + public GenericCredential initGenericCredential(Configuration conf) { + if (conf.get(UCDeltaRestCatalogApiCredentialConf.GCS_INIT_OAUTH_TOKEN) != null) { + String oauthToken = conf.get(UCDeltaRestCatalogApiCredentialConf.GCS_INIT_OAUTH_TOKEN); + Objects.requireNonNull( + oauthToken, + String.format( + "GCS OAuth token not set, please check '%s' in hadoop configuration", + UCDeltaRestCatalogApiCredentialConf.GCS_INIT_OAUTH_TOKEN)); + + long expiredTimeMillis = + conf.getLong(UCDeltaRestCatalogApiCredentialConf.GCS_INIT_OAUTH_TOKEN_EXPIRATION_TIME, + Long.MAX_VALUE); + if (expiredTimeMillis <= 0) { + throw new IllegalStateException( + String.format( + "Expired time %s must be greater than 0, please check configure key '%s'", + expiredTimeMillis, + UCDeltaRestCatalogApiCredentialConf.GCS_INIT_OAUTH_TOKEN_EXPIRATION_TIME)); + } + + return GenericCredential.forGcs(oauthToken, expiredTimeMillis); + } else { + return null; + } + } + + @Override + public AccessToken getAccessToken() { + GenericCredential generic = accessCredentials(); + + GcpOauthToken gcpToken = generic.temporaryCredentials().getGcpOauthToken(); + Objects.requireNonNull(gcpToken, "GCS OAuth token of generic credential cannot be null"); + + String tokenValue = gcpToken.getOauthToken(); + Objects.requireNonNull(tokenValue, "GCS OAuth token value cannot be null"); + + Long expirationMillis = generic.temporaryCredentials().getExpirationTime(); + Instant expirationInstant = + expirationMillis == null ? null : Instant.ofEpochMilli(expirationMillis); + return new AccessToken(tokenValue, expirationInstant); + } + + @Override + public void refresh() throws IOException { + // Renewal happens when getAccessToken() calls accessCredentials(). + } + + @Override + public void setConf(Configuration configuration) { + this.conf = configuration; + initialize(configuration); + } + + @Override + public Configuration getConf() { + return conf; + } +} diff --git a/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/credentials/GenericCredential.java b/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/credentials/GenericCredential.java new file mode 100644 index 00000000000..39880fcafd9 --- /dev/null +++ b/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/credentials/GenericCredential.java @@ -0,0 +1,126 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.unitycatalog.hadoop.credentials; + +import io.unitycatalog.client.internal.Clock; +import io.unitycatalog.client.model.AwsCredentials; +import io.unitycatalog.client.model.AzureUserDelegationSAS; +import io.unitycatalog.client.model.GcpOauthToken; +import io.unitycatalog.client.model.TemporaryCredentials; + +import java.util.Objects; + +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.StorageCredential; +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.StorageCredentialConfig; + +public class GenericCredential { + private final TemporaryCredentials tempCred; + + public GenericCredential(TemporaryCredentials tempCred) { + this.tempCred = tempCred; + } + + public static GenericCredential forAws( + String accessKey, String secretKey, String sessionToken, long expiredTimeMillis) { + AwsCredentials awsCredentials = new AwsCredentials(); + awsCredentials.setAccessKeyId(accessKey); + awsCredentials.setSecretAccessKey(secretKey); + awsCredentials.setSessionToken(sessionToken); + + TemporaryCredentials tempCred = new TemporaryCredentials(); + tempCred.setAwsTempCredentials(awsCredentials); + tempCred.setExpirationTime(expiredTimeMillis); + + return new GenericCredential(tempCred); + } + + public static GenericCredential forAzure(String sasToken, long expiredTimeMillis) { + AzureUserDelegationSAS azureSAS = new AzureUserDelegationSAS(); + azureSAS.setSasToken(sasToken); + + TemporaryCredentials tempCred = new TemporaryCredentials(); + tempCred.setAzureUserDelegationSas(azureSAS); + tempCred.setExpirationTime(expiredTimeMillis); + + return new GenericCredential(tempCred); + } + + public static GenericCredential forGcs(String oauthToken, long expiredTimeMillis) { + GcpOauthToken gcpOauthToken = new GcpOauthToken(); + gcpOauthToken.setOauthToken(oauthToken); + + TemporaryCredentials tempCred = new TemporaryCredentials(); + tempCred.setGcpOauthToken(gcpOauthToken); + tempCred.setExpirationTime(expiredTimeMillis); + + return new GenericCredential(tempCred); + } + + public static GenericCredential fromStorageCredential(StorageCredential storageCredential) { + Objects.requireNonNull(storageCredential, "storageCredential cannot be null"); + StorageCredentialConfig config = storageCredential.getConfig(); + Objects.requireNonNull(config, "storageCredential config cannot be null"); + long expirationTime = expirationTimeOrNever(storageCredential); + if (config.getS3AccessKeyId() != null + || config.getS3SecretAccessKey() != null + || config.getS3SessionToken() != null) { + return forAws( + config.getS3AccessKeyId(), + config.getS3SecretAccessKey(), + config.getS3SessionToken(), + expirationTime); + } else if (config.getAzureSasToken() != null) { + return forAzure(config.getAzureSasToken(), expirationTime); + } else if (config.getGcsOauthToken() != null) { + return forGcs(config.getGcsOauthToken(), expirationTime); + } + throw new IllegalArgumentException("storageCredential config contains no cloud credential"); + } + + private static long expirationTimeOrNever(StorageCredential storageCredential) { + Long expirationTimeMs = storageCredential.getExpirationTimeMs(); + return expirationTimeMs == null ? Long.MAX_VALUE : expirationTimeMs; + } + + public TemporaryCredentials temporaryCredentials() { + return tempCred; + } + + public boolean readyToRenew(Clock clock, long renewalLeadTimeMillis) { + return tempCred.getExpirationTime() != null + && tempCred.getExpirationTime() <= clock.now().toEpochMilli() + renewalLeadTimeMillis; + } + + @Override + public int hashCode() { + return Objects.hashCode(tempCred); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + GenericCredential that = (GenericCredential) o; + return Objects.equals(tempCred, that.tempCred); + } +} diff --git a/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/credentials/GenericCredentialProvider.java b/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/credentials/GenericCredentialProvider.java new file mode 100644 index 00000000000..9f65ae700e4 --- /dev/null +++ b/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/credentials/GenericCredentialProvider.java @@ -0,0 +1,217 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.unitycatalog.hadoop.credentials; + +import io.unitycatalog.client.auth.TokenProvider; +import io.unitycatalog.client.internal.Clock; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import io.delta.storage.commit.uccommitcoordinator.UCClient; +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.CredentialOperation; +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.CredentialsResponse; +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.StorageCredential; +import io.delta.storage.commit.uccommitcoordinator.UCTokenBasedRestClient; +import io.delta.storage.unitycatalog.hadoop.UCDeltaRestCatalogApiCredentialConf; +import org.apache.hadoop.conf.Configuration; + +public abstract class GenericCredentialProvider { + static final Map globalCache; + private static final String UC_CREDENTIAL_CACHE_MAX_SIZE = + "unitycatalog.credential.cache.maxSize"; + private static final long UC_CREDENTIAL_CACHE_MAX_SIZE_DEFAULT = 1024; + + static { + final long maxSize = + Long.getLong(UC_CREDENTIAL_CACHE_MAX_SIZE, UC_CREDENTIAL_CACHE_MAX_SIZE_DEFAULT); + globalCache = + Collections.synchronizedMap( + new LinkedHashMap(16, 0.75f, true) { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > maxSize; + } + }); + } + + private Configuration conf; + private Clock clock; + private long renewalLeadTimeMillis; + private String ucUri; + private TokenProvider tokenProvider; + private String credUid; + private boolean credCacheEnabled; + + private volatile GenericCredential credential; + private volatile UCClient ucClient; + + protected void initialize(Configuration conf) { + this.conf = conf; + + String clockName = conf.get(UCDeltaRestCatalogApiCredentialConf.UC_TEST_CLOCK_NAME); + this.clock = clockName != null ? Clock.getManualClock(clockName) : Clock.systemClock(); + + this.renewalLeadTimeMillis = + conf.getLong( + UCDeltaRestCatalogApiCredentialConf.UC_RENEWAL_LEAD_TIME_KEY, + UCDeltaRestCatalogApiCredentialConf.UC_RENEWAL_LEAD_TIME_DEFAULT_VALUE); + + String ucUriStr = conf.get(UCDeltaRestCatalogApiCredentialConf.UC_URI_KEY); + Objects.requireNonNull( + ucUriStr, + String.format( + "'%s' is not set in hadoop configuration", + UCDeltaRestCatalogApiCredentialConf.UC_URI_KEY)); + this.ucUri = ucUriStr; + + this.tokenProvider = + TokenProvider.create( + conf.getPropsWithPrefix(UCDeltaRestCatalogApiCredentialConf.UC_AUTH_PREFIX)); + + this.credUid = conf.get(UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_UID_KEY); + checkState( + credUid != null && !credUid.isEmpty(), + "Credential UID cannot be null or empty, '%s' is not set in hadoop configuration", + UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_UID_KEY); + + this.credCacheEnabled = + conf.getBoolean( + UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIAL_CACHE_ENABLED_KEY, + UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIAL_CACHE_ENABLED_DEFAULT_VALUE); + + this.credential = initGenericCredential(conf); + } + + public abstract GenericCredential initGenericCredential(Configuration conf); + + public GenericCredential accessCredentials() { + if (credential == null || credential.readyToRenew(clock, renewalLeadTimeMillis)) { + synchronized (this) { + if (credential == null || credential.readyToRenew(clock, renewalLeadTimeMillis)) { + try { + credential = renewCredential(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + + return credential; + } + + protected UCClient ucClient() { + if (ucClient == null) { + synchronized (this) { + if (ucClient == null) { + ucClient = new UCTokenBasedRestClient(ucUri, tokenProvider, Collections.emptyMap()); + } + } + } + + return ucClient; + } + + private GenericCredential renewCredential() throws IOException { + if (credCacheEnabled) { + synchronized (globalCache) { + GenericCredential cached = globalCache.get(credUid); + if (cached != null && !cached.readyToRenew(clock, renewalLeadTimeMillis)) { + return cached; + } + GenericCredential renewed = createGenericCredentials(); + globalCache.put(credUid, renewed); + return renewed; + } + } else { + return createGenericCredentials(); + } + } + + private GenericCredential createGenericCredentials() throws IOException { + String type = conf.get(UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_TYPE_KEY); + if (UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_TYPE_PATH_VALUE.equals(type)) { + throw new UnsupportedOperationException( + "UC Delta Rest Catalog API path credential renewal is not supported by this Delta " + + "version."); + } else if (UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_TYPE_TABLE_VALUE.equals(type)) { + String catalog = requireConf(UCDeltaRestCatalogApiCredentialConf.UC_TABLE_CATALOG_KEY); + String schema = requireConf(UCDeltaRestCatalogApiCredentialConf.UC_TABLE_SCHEMA_KEY); + String table = requireConf(UCDeltaRestCatalogApiCredentialConf.UC_TABLE_NAME_KEY); + String location = requireConf(UCDeltaRestCatalogApiCredentialConf.UC_TABLE_LOCATION_KEY); + String tableOperation = requireConf(UCDeltaRestCatalogApiCredentialConf.UC_TABLE_OPERATION_KEY); + + CredentialsResponse response = + ucClient() + .getTableCredentials( + CredentialOperation.valueOf(tableOperation), catalog, schema, table); + return GenericCredential.fromStorageCredential( + selectStorageCredential(location, response.getStorageCredentials())); + } else { + throw new IllegalArgumentException( + String.format( + "Unsupported unity catalog temporary credentials type '%s', please check '%s'", + type, UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_TYPE_KEY)); + } + } + + private String requireConf(String key) { + String value = conf.get(key); + checkState(value != null && !value.isEmpty(), "'%s' is not set", key); + return value; + } + + private StorageCredential selectStorageCredential( + String location, List storageCredentials) { + StorageCredential bestMatch = null; + for (StorageCredential credential : storageCredentials) { + if (credential != null + && credential.getPrefix() != null + && matchesCredentialPrefix(location, credential.getPrefix()) + && (bestMatch == null || credential.getPrefix().length() > bestMatch.getPrefix().length())) { + bestMatch = credential; + } + } + checkState( + bestMatch != null, "No UC Delta Rest Catalog API credential matched location '%s'.", location); + return bestMatch; + } + + private boolean matchesCredentialPrefix(String location, String prefix) { + String normalizedLocation = stripTrailingSlash(location); + String normalizedPrefix = stripTrailingSlash(prefix); + return !normalizedPrefix.isEmpty() + && (normalizedLocation.equals(normalizedPrefix) + || (normalizedLocation.startsWith(normalizedPrefix) + && normalizedLocation.charAt(normalizedPrefix.length()) == '/')); + } + + private String stripTrailingSlash(String value) { + return value.endsWith("/") ? value.substring(0, value.length() - 1) : value; + } + + private static void checkState(boolean expression, String message, Object... args) { + if (!expression) { + throw new IllegalStateException(String.format(message, args)); + } + } +} diff --git a/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/fs/CredScopedFileSystem.java b/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/fs/CredScopedFileSystem.java new file mode 100644 index 00000000000..a32bacc12e7 --- /dev/null +++ b/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/fs/CredScopedFileSystem.java @@ -0,0 +1,121 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.unitycatalog.hadoop.fs; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FilterFileSystem; + +/** + * Hadoop {@link FileSystem} wrapper that lets several UC credential scopes coexist in one Spark + * session without forcing one global filesystem cache entry per cloud scheme. + */ +public class CredScopedFileSystem extends FilterFileSystem { + + private static final String CRED_SCOPED_FS_CACHE_MAX_SIZE = + "unitycatalog.credScopedFs.cache.maxSize"; + private static final long CRED_SCOPED_FS_CACHE_MAX_SIZE_DEFAULT = 100; + + static final Map CACHE; + + static { + final long maxSize = + Long.getLong(CRED_SCOPED_FS_CACHE_MAX_SIZE, CRED_SCOPED_FS_CACHE_MAX_SIZE_DEFAULT); + CACHE = + Collections.synchronizedMap( + new LinkedHashMap(16, 0.75f, true) { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + if (size() <= maxSize) { + return false; + } + closeQuietly(eldest.getValue()); + return true; + } + }); + } + + static void clearCacheForTesting() { + synchronized (CACHE) { + CACHE.values().forEach(CredScopedFileSystem::closeQuietly); + CACHE.clear(); + } + } + + FileSystem getDelegate() { + return this.fs; + } + + @Override + public void initialize(URI uri, Configuration conf) throws IOException { + CredScopedKey key = CredScopedKey.create(uri, conf); + synchronized (CACHE) { + FileSystem cachedFs = CACHE.get(key); + if (cachedFs == null) { + cachedFs = newFileSystem(uri, conf); + CACHE.put(key, cachedFs); + } + this.fs = cachedFs; + } + } + + private static void restoreImpl(Configuration fsConf, String key, String defaultImpl) { + fsConf.set(key, fsConf.get(key + ".original", defaultImpl)); + } + + private static FileSystem newFileSystem(URI uri, Configuration conf) throws IOException { + Configuration fsConf = new Configuration(conf); + + restoreImpl(fsConf, "fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"); + restoreImpl(fsConf, "fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"); + restoreImpl(fsConf, "fs.AbstractFileSystem.s3.impl", "org.apache.hadoop.fs.s3a.S3A"); + restoreImpl(fsConf, "fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A"); + fsConf.set("fs.s3.impl.disable.cache", "true"); + fsConf.set("fs.s3a.impl.disable.cache", "true"); + + restoreImpl(fsConf, "fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"); + restoreImpl( + fsConf, "fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"); + fsConf.set("fs.gs.impl.disable.cache", "true"); + + restoreImpl(fsConf, "fs.abfs.impl", "org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem"); + restoreImpl(fsConf, "fs.abfss.impl", "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem"); + restoreImpl(fsConf, "fs.AbstractFileSystem.abfs.impl", "org.apache.hadoop.fs.azurebfs.Abfs"); + restoreImpl(fsConf, "fs.AbstractFileSystem.abfss.impl", "org.apache.hadoop.fs.azurebfs.Abfss"); + fsConf.set("fs.abfs.impl.disable.cache", "true"); + fsConf.set("fs.abfss.impl.disable.cache", "true"); + + return FileSystem.get(uri, fsConf); + } + + private static void closeQuietly(FileSystem fs) { + if (fs == null) { + return; + } + try { + fs.close(); + } catch (IOException e) { + // ignore close failures on eviction + } + } +} diff --git a/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/fs/CredScopedFs.java b/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/fs/CredScopedFs.java new file mode 100644 index 00000000000..b34245a6e58 --- /dev/null +++ b/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/fs/CredScopedFs.java @@ -0,0 +1,30 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.unitycatalog.hadoop.fs; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.DelegateToFileSystem; + +public class CredScopedFs extends DelegateToFileSystem { + protected CredScopedFs(URI uri, Configuration conf) throws IOException, URISyntaxException { + super(uri, new CredScopedFileSystem(), conf, uri.getScheme(), false); + } +} diff --git a/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/fs/CredScopedKey.java b/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/fs/CredScopedKey.java new file mode 100644 index 00000000000..da088d2e38d --- /dev/null +++ b/storage/src/main/java/io/delta/storage/unitycatalog/hadoop/fs/CredScopedKey.java @@ -0,0 +1,134 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.unitycatalog.hadoop.fs; + +import java.net.URI; +import java.util.Objects; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import io.delta.storage.unitycatalog.hadoop.UCDeltaRestCatalogApiCredentialConf; + +public interface CredScopedKey { + + static CredScopedKey create(URI uri, Configuration conf) { + String type = conf.get(UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_TYPE_KEY); + if (UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_TYPE_PATH_VALUE.equals(type)) { + String path = conf.get(UCDeltaRestCatalogApiCredentialConf.UC_PATH_KEY); + String pathOperation = conf.get(UCDeltaRestCatalogApiCredentialConf.UC_PATH_OPERATION_KEY); + + return new PathCredScopedKey(path, pathOperation); + } else if (UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_TYPE_TABLE_VALUE.equals(type)) { + String tableId = conf.get(UCDeltaRestCatalogApiCredentialConf.UC_TABLE_ID_KEY); + String tableOperation = conf.get(UCDeltaRestCatalogApiCredentialConf.UC_TABLE_OPERATION_KEY); + return new TableCredScopedKey(tableId, tableOperation); + } + + return new DefaultCredScopedKey(uri, conf); + } + + class PathCredScopedKey implements CredScopedKey { + private final String path; + private final String pathOperation; + + public PathCredScopedKey(String path, String pathOperation) { + this.path = path; + this.pathOperation = pathOperation; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof PathCredScopedKey)) return false; + PathCredScopedKey that = (PathCredScopedKey) o; + return Objects.equals(path, that.path) && Objects.equals(pathOperation, that.pathOperation); + } + + @Override + public int hashCode() { + return Objects.hash(path, pathOperation); + } + + @Override + public String toString() { + return "PathCredScopedKey{path=" + path + ", op=" + pathOperation + "}"; + } + } + + class TableCredScopedKey implements CredScopedKey { + private final String tableId; + private final String tableOperation; + + public TableCredScopedKey(String tableId, String tableOperation) { + this.tableId = tableId; + this.tableOperation = tableOperation; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof TableCredScopedKey)) return false; + TableCredScopedKey that = (TableCredScopedKey) o; + return Objects.equals(tableId, that.tableId) + && Objects.equals(tableOperation, that.tableOperation); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, tableOperation); + } + + @Override + public String toString() { + return "TableCredScopedKey{tableId=" + tableId + ", op=" + tableOperation + "}"; + } + } + + class DefaultCredScopedKey implements CredScopedKey { + private final String scheme; + private final String authority; + + public DefaultCredScopedKey(URI uri, Configuration conf) { + if (uri.getScheme() == null && uri.getAuthority() == null) { + URI defaultUri = FileSystem.getDefaultUri(conf); + this.scheme = defaultUri.getScheme(); + this.authority = defaultUri.getAuthority(); + } else { + this.scheme = uri.getScheme(); + this.authority = uri.getAuthority(); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof DefaultCredScopedKey)) return false; + DefaultCredScopedKey that = (DefaultCredScopedKey) o; + return Objects.equals(scheme, that.scheme) && Objects.equals(authority, that.authority); + } + + @Override + public int hashCode() { + return Objects.hash(scheme, authority); + } + + @Override + public String toString() { + return "DefaultCredScopedKey{scheme=" + scheme + ", authority=" + authority + "}"; + } + } +} diff --git a/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClientSuite.scala b/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClientSuite.scala index 43c56d41ed2..a16a66a48f8 100644 --- a/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClientSuite.scala +++ b/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClientSuite.scala @@ -22,9 +22,10 @@ import java.util.{Collections, Optional} import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} import com.sun.net.httpserver.{HttpExchange, HttpServer} -import io.delta.storage.commit.{Commit, CommitFailedException} -import io.delta.storage.commit.actions.AbstractMetadata +import io.delta.storage.commit.{Commit, CommitFailedException, GetCommitsResponse} +import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol} import io.delta.storage.commit.uniform.{IcebergMetadata, UniformMetadata} +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.CredentialOperation import io.unitycatalog.client.auth.TokenProvider import org.apache.hadoop.fs.{FileStatus, Path} @@ -45,6 +46,9 @@ class UCTokenBasedRestClientSuite private var serverUri: String = _ private var metastoreHandler: HttpExchange => Unit = _ private var commitsHandler: HttpExchange => Unit = _ + private var deltaConfigHandler: HttpExchange => Unit = _ + private var deltaTablesHandler: HttpExchange => Unit = _ + private var legacyTablesHandler: HttpExchange => Unit = _ private val objectMapper = new ObjectMapper() override def beforeAll(): Unit = { @@ -63,6 +67,21 @@ class UCTokenBasedRestClientSuite } exchange.close() }) + server.createContext("/api/2.1/unity-catalog/delta/v1/config", exchange => { + if (deltaConfigHandler != null) deltaConfigHandler(exchange) + else sendJson(exchange, HttpStatus.SC_NOT_FOUND, "{}") + exchange.close() + }) + server.createContext("/api/2.1/unity-catalog/delta/v1/catalogs", exchange => { + if (deltaTablesHandler != null) deltaTablesHandler(exchange) + else sendJson(exchange, HttpStatus.SC_NOT_FOUND, "{}") + exchange.close() + }) + server.createContext("/api/2.1/unity-catalog/tables", exchange => { + if (legacyTablesHandler != null) legacyTablesHandler(exchange) + else sendJson(exchange, HttpStatus.SC_NOT_FOUND, "{}") + exchange.close() + }) server.start() serverUri = s"http://localhost:${server.getAddress.getPort}" } @@ -72,6 +91,9 @@ class UCTokenBasedRestClientSuite override def beforeEach(): Unit = { metastoreHandler = null commitsHandler = null + deltaConfigHandler = null + deltaTablesHandler = null + legacyTablesHandler = null } private def readRequestBody(exchange: HttpExchange): String = { @@ -123,6 +145,71 @@ class UCTokenBasedRestClientSuite new UniformMetadata( new IcebergMetadata("s3://bucket/metadata/v1.json", 42L, "2025-01-04T03:13:11.423Z")) + private def loadTableResponseJson: String = + """{ + | "metadata": { + | "name": "tbl", + | "catalog-name": "main", + | "schema-name": "default", + | "table-type": "MANAGED", + | "data-source-format": "DELTA", + | "table-uuid": "11111111-1111-1111-1111-111111111111", + | "location": "file:/tmp/uc/table", + | "created-at": 10, + | "updated-at": 11, + | "columns": { + | "type": "struct", + | "fields": [ + | { + | "name": "id", + | "type": "long", + | "nullable": false, + | "metadata": {} + | } + | ] + | }, + | "partition-columns": [], + | "properties": {} + | }, + | "commits": [] + |}""".stripMargin + + test("UCClient defaults fail loudly for UC Delta Rest Catalog API") { + val client = new UCClient { + override def getMetastoreId(): String = testMetastoreId + override def commit( + tableId: String, + tableUri: URI, + commit: Optional[Commit], + lastKnownBackfilledVersion: Optional[java.lang.Long], + disown: Boolean, + newMetadata: Optional[AbstractMetadata], + newProtocol: Optional[AbstractProtocol], + uniform: Optional[UniformMetadata]): Unit = {} + override def getCommits( + tableId: String, + tableUri: URI, + startVersion: Optional[java.lang.Long], + endVersion: Optional[java.lang.Long]): GetCommitsResponse = null + override def finalizeCreate( + tableName: String, + catalogName: String, + schemaName: String, + storageLocation: String, + columns: java.util.List[UCClient.ColumnDef], + properties: java.util.Map[String, String]): Unit = {} + override def close(): Unit = {} + } + + assert(!client.supportsUCDeltaRestCatalogApi()) + assert(intercept[UnsupportedOperationException] { + client.loadTable("main", "default", "tbl") + }.getMessage === "loadTable requires UC Delta Rest Catalog API support.") + assert(intercept[UnsupportedOperationException] { + client.getTableCredentials(CredentialOperation.READ, "main", "default", "tbl") + }.getMessage === "getTableCredentials requires UC Delta Rest Catalog API support.") + } + // Constructor tests test("constructor validates required parameters") { intercept[NullPointerException] { @@ -150,6 +237,113 @@ class UCTokenBasedRestClientSuite } } + test("catalog-aware constructor uses UC Delta Rest Catalog API when config lists required endpoints") { + deltaConfigHandler = exchange => { + assert(exchange.getRequestURI.getQuery.contains("catalog=main")) + assert(exchange.getRequestURI.getQuery.contains("protocol-versions=1.0")) + sendJson(exchange, HttpStatus.SC_OK, + """{ + | "endpoints": [ + | "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}", + | "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}/credentials" + | ], + | "protocol-version": "1.0" + |}""".stripMargin) + } + deltaTablesHandler = exchange => { + assert(exchange.getRequestURI.getPath === + "/api/2.1/unity-catalog/delta/v1/catalogs/main/schemas/default/tables/tbl") + sendJson(exchange, HttpStatus.SC_OK, loadTableResponseJson) + } + legacyTablesHandler = exchange => fail(s"Unexpected legacy request: ${exchange.getRequestURI}") + + val client = + new UCTokenBasedRestClient(serverUri, createTokenProvider(), Collections.emptyMap(), "main") + try { + assert(client.supportsUCDeltaRestCatalogApi()) + val metadata = client.loadTable("main", "default", "tbl") + assert(metadata.getId === "11111111-1111-1111-1111-111111111111") + assert(metadata.getProvider === "delta") + val adapter = metadata.asInstanceOf[UCTokenBasedRestClient.TableMetadataAdapter] + assert(adapter.getLocation === "file:/tmp/uc/table") + assert(adapter.getSchema.getFields.get(0).getName === "id") + } finally { + client.close() + } + } + + test("catalog-aware constructor fails UC Delta Rest Catalog API loadTable when config is unavailable") { + deltaConfigHandler = exchange => sendJson(exchange, HttpStatus.SC_NOT_FOUND, "{}") + deltaTablesHandler = exchange => + fail(s"Unexpected UC Delta Rest Catalog API request: ${exchange.getRequestURI}") + legacyTablesHandler = exchange => + fail(s"Unexpected legacy request: ${exchange.getRequestURI}") + + val client = + new UCTokenBasedRestClient(serverUri, createTokenProvider(), Collections.emptyMap(), "main") + try { + assert(!client.supportsUCDeltaRestCatalogApi()) + val e = intercept[UnsupportedOperationException] { + client.loadTable("main", "default", "tbl") + } + assert(e.getMessage === "loadTable requires UC Delta Rest Catalog API support.") + val credentialsError = intercept[UnsupportedOperationException] { + client.getTableCredentials(CredentialOperation.READ, "main", "default", "tbl") + } + assert(credentialsError.getMessage === + "getTableCredentials requires UC Delta Rest Catalog API support.") + } finally { + client.close() + } + } + + test("catalog-aware constructor disables UC Delta Rest Catalog API when config does not list loadTable") { + deltaConfigHandler = exchange => sendJson(exchange, HttpStatus.SC_OK, + """{ + | "endpoints": [ + | "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}/credentials" + | ], + | "protocol-version": "1.0" + |}""".stripMargin) + + val client = + new UCTokenBasedRestClient(serverUri, createTokenProvider(), Collections.emptyMap(), "main") + try { + assert(!client.supportsUCDeltaRestCatalogApi()) + } finally { + client.close() + } + } + + test("catalog-aware constructor disables UC Delta Rest Catalog API when config does not list credentials") { + deltaConfigHandler = exchange => sendJson(exchange, HttpStatus.SC_OK, + """{ + | "endpoints": [ + | "GET /v1/catalogs/{catalog}/schemas/{schema}/tables/{table}" + | ], + | "protocol-version": "1.0" + |}""".stripMargin) + + val client = + new UCTokenBasedRestClient(serverUri, createTokenProvider(), Collections.emptyMap(), "main") + try { + assert(!client.supportsUCDeltaRestCatalogApi()) + } finally { + client.close() + } + } + + test("catalog-aware constructor fails when config probe fails") { + deltaConfigHandler = exchange => + sendJson(exchange, HttpStatus.SC_INTERNAL_SERVER_ERROR, """{"error":"boom"}""") + + val e = intercept[IllegalArgumentException] { + new UCTokenBasedRestClient(serverUri, createTokenProvider(), Collections.emptyMap(), "main") + } + assert(e.getMessage.contains("Failed to determine UC Delta Rest Catalog API support")) + assert(e.getMessage.contains("HTTP 500")) + } + // commit tests test("commit succeeds with valid parameters") { withClient { client => diff --git a/storage/src/test/scala/io/delta/storage/unitycatalog/hadoop/UCDeltaRestCatalogApiCredentialPropsUtilSuite.scala b/storage/src/test/scala/io/delta/storage/unitycatalog/hadoop/UCDeltaRestCatalogApiCredentialPropsUtilSuite.scala new file mode 100644 index 00000000000..4969839924f --- /dev/null +++ b/storage/src/test/scala/io/delta/storage/unitycatalog/hadoop/UCDeltaRestCatalogApiCredentialPropsUtilSuite.scala @@ -0,0 +1,371 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.unitycatalog.hadoop + +import java.net.InetSocketAddress +import java.nio.charset.StandardCharsets + +import scala.collection.JavaConverters._ + +import com.sun.net.httpserver.{HttpExchange, HttpServer} +import io.unitycatalog.client.auth.TokenProvider +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.{ + CredentialOperation, + StorageCredential, + StorageCredentialConfig +} +import org.apache.hadoop.conf.Configuration +import org.scalatest.funsuite.AnyFunSuite + +import io.delta.storage.unitycatalog.hadoop.{ + CredPropsUtil, + UCDeltaRestCatalogApiCredentialConf} +import io.delta.storage.unitycatalog.hadoop.fs.{CredScopedFileSystem, CredScopedFs} +import io.delta.storage.unitycatalog.hadoop.credentials.{ + AbfsVendedTokenProvider, + AwsVendedTokenProvider, + GcsVendedTokenProvider +} + +class UCDeltaRestCatalogApiCredentialPropsUtilSuite extends AnyFunSuite { + private val TableId = "table-id" + private val TableCatalog = "uc" + private val TableSchema = "default" + private val TableName = "tbl" + private val TableLocation = "s3://bucket/table" + + test("creates static S3 credential properties") { + val props = CredPropsUtil.createTableCredProps( + false, + false, + Map.empty[String, String].asJava, + "s3a", + "https://uc.example", + tokenProvider(), + TableId, + TableCatalog, + TableSchema, + TableName, + TableLocation, + awsCredentials()).asScala.toMap + + assert(props === Map( + "fs.s3a.path.style.access" -> "true", + "fs.s3.impl.disable.cache" -> "true", + "fs.s3a.impl.disable.cache" -> "true", + "fs.s3a.access.key" -> "ak", + "fs.s3a.secret.key" -> "sk", + "fs.s3a.session.token" -> "st")) + } + + test("creates static Azure and GCS credential properties") { + val azureProps = CredPropsUtil.createTableCredProps( + false, + false, + Map.empty[String, String].asJava, + "abfss", + "https://uc.example", + tokenProvider(), + TableId, + TableCatalog, + TableSchema, + TableName, + "abfss://container/path/table", + azureCredentials()).asScala.toMap + val gcsProps = CredPropsUtil.createTableCredProps( + false, + false, + Map.empty[String, String].asJava, + "gs", + "https://uc.example", + tokenProvider(), + TableId, + TableCatalog, + TableSchema, + TableName, + "gs://bucket/table", + gcsCredentials()).asScala.toMap + + assert(azureProps === Map( + "fs.azure.account.auth.type" -> "SAS", + "fs.azure.account.hns.enabled" -> "true", + "fs.abfs.impl.disable.cache" -> "true", + "fs.abfss.impl.disable.cache" -> "true", + "fs.azure.sas.fixed.token" -> "sas")) + assert(gcsProps === Map( + "fs.gs.create.items.conflict.check.enable" -> "true", + "fs.gs.impl.disable.cache" -> "true", + "fs.gs.auth.access.token.credential" -> "token", + "fs.gs.auth.access.token.expiration" -> Long.MaxValue.toString)) + } + + test("creates renewable table credential properties") { + val props = CredPropsUtil.createTableCredProps( + true, + false, + Map.empty[String, String].asJava, + "s3", + "https://uc.example", + tokenProvider(), + TableId, + TableCatalog, + TableSchema, + TableName, + TableLocation, + awsCredentials(expirationTimeMs = 123L)).asScala.toMap + + assert(props(UCDeltaRestCatalogApiCredentialConf.S3A_CREDENTIALS_PROVIDER) === + classOf[AwsVendedTokenProvider].getName) + assert(props(UCDeltaRestCatalogApiCredentialConf.UC_URI_KEY) === "https://uc.example") + assert(props(UCDeltaRestCatalogApiCredentialConf.UC_AUTH_PREFIX + "type") === "static") + assert(props(UCDeltaRestCatalogApiCredentialConf.UC_AUTH_PREFIX + "token") === "token") + assert(props(UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_TYPE_KEY) === + UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_TYPE_TABLE_VALUE) + assert(props(UCDeltaRestCatalogApiCredentialConf.UC_TABLE_ID_KEY) === TableId) + assert(props(UCDeltaRestCatalogApiCredentialConf.UC_TABLE_CATALOG_KEY) === TableCatalog) + assert(props(UCDeltaRestCatalogApiCredentialConf.UC_TABLE_SCHEMA_KEY) === TableSchema) + assert(props(UCDeltaRestCatalogApiCredentialConf.UC_TABLE_NAME_KEY) === TableName) + assert(props(UCDeltaRestCatalogApiCredentialConf.UC_TABLE_LOCATION_KEY) === TableLocation) + assert(props(UCDeltaRestCatalogApiCredentialConf.UC_TABLE_OPERATION_KEY) === "READ_WRITE") + assert(props(UCDeltaRestCatalogApiCredentialConf.S3A_INIT_ACCESS_KEY) === "ak") + assert(props(UCDeltaRestCatalogApiCredentialConf.S3A_INIT_SECRET_KEY) === "sk") + assert(props(UCDeltaRestCatalogApiCredentialConf.S3A_INIT_SESSION_TOKEN) === "st") + assert(props(UCDeltaRestCatalogApiCredentialConf.S3A_INIT_CRED_EXPIRED_TIME) === "123") + assert(props.contains(UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_UID_KEY)) + } + + test("renews table credentials through UC Delta Rest Catalog API") { + var sawDeltaCredentialRequest = false + var sawLegacyCredentialRequest = false + var deltaCredentialRequestQuery: String = null + + withHttpServer { server => + server.createContext("/", exchange => { + exchange.getRequestURI.getPath match { + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl" | + "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/" => + sendJson(exchange, 500, "{}") + case "/api/2.1/unity-catalog/delta/v1/catalogs/uc/schemas/default/tables/tbl/credentials" => + sawDeltaCredentialRequest = true + deltaCredentialRequestQuery = exchange.getRequestURI.getQuery + sendJson(exchange, 200, s3CredentialsResponseJson) + case "/api/2.1/unity-catalog/temporary-table-credentials" => + sawLegacyCredentialRequest = true + sendJson(exchange, 500, "{}") + case path => + sendJson(exchange, 404, s"""{"unexpected_uri":"${exchange.getRequestURI}"}""") + } + }) + + val ucUri = s"http://127.0.0.1:${server.getAddress.getPort}" + val props = CredPropsUtil.createTableCredProps( + true, + false, + Map.empty[String, String].asJava, + "s3", + ucUri, + tokenProvider(), + TableId, + TableCatalog, + TableSchema, + TableName, + TableLocation, + awsCredentials(expirationTimeMs = 1L)).asScala.toMap + val conf = new Configuration(false) + props.foreach { case (key, value) => conf.set(key, value) } + + val credentials = new AwsVendedTokenProvider(conf).resolveCredentials() + + assert(credentials.accessKeyId() === "renewed-ak") + assert(sawDeltaCredentialRequest) + assert(deltaCredentialRequestQuery.contains("operation=READ_WRITE")) + assert(!sawLegacyCredentialRequest) + } + } + + test("creates renewable path credential properties") { + val azureProps = CredPropsUtil.createPathCredProps( + true, + false, + Map.empty[String, String].asJava, + "abfs", + "https://uc.example", + tokenProvider(), + "abfs://container/path/table", + azureCredentials(operation = CredentialOperation.READ, expirationTimeMs = 123L)) + .asScala.toMap + val gcsProps = CredPropsUtil.createPathCredProps( + true, + false, + Map.empty[String, String].asJava, + "gs", + "https://uc.example", + tokenProvider(), + "gs://bucket/table", + gcsCredentials(operation = CredentialOperation.READ, expirationTimeMs = 456L)) + .asScala.toMap + + assert(azureProps(UCDeltaRestCatalogApiCredentialConf.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE) === + classOf[AbfsVendedTokenProvider].getName) + assert(azureProps(UCDeltaRestCatalogApiCredentialConf.AZURE_INIT_SAS_TOKEN) === "sas") + assert( + azureProps(UCDeltaRestCatalogApiCredentialConf.AZURE_INIT_SAS_TOKEN_EXPIRED_TIME) === "123") + assert( + azureProps(UCDeltaRestCatalogApiCredentialConf.UC_PATH_OPERATION_KEY) === + "PATH_READ") + + assert(gcsProps("fs.gs.auth.type") === "ACCESS_TOKEN_PROVIDER") + assert(gcsProps("fs.gs.auth.access.token.provider") === + classOf[GcsVendedTokenProvider].getName) + assert(gcsProps(UCDeltaRestCatalogApiCredentialConf.GCS_INIT_OAUTH_TOKEN) === "token") + assert( + gcsProps(UCDeltaRestCatalogApiCredentialConf.GCS_INIT_OAUTH_TOKEN_EXPIRATION_TIME) === "456") + assert(gcsProps(UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_TYPE_KEY) === + UCDeltaRestCatalogApiCredentialConf.UC_CREDENTIALS_TYPE_PATH_VALUE) + assert(gcsProps(UCDeltaRestCatalogApiCredentialConf.UC_PATH_KEY) === "gs://bucket/table") + } + + test("adds credential-scoped filesystem overrides when enabled") { + val props = CredPropsUtil.createTableCredProps( + true, + true, + Map("fs.s3a.impl" -> "com.example.CustomS3A").asJava, + "s3", + "https://uc.example", + tokenProvider(), + TableId, + TableCatalog, + TableSchema, + TableName, + TableLocation, + awsCredentials(operation = CredentialOperation.READ)).asScala.toMap + + assert(props("fs.s3a.impl.original") === "com.example.CustomS3A") + assert(props("fs.s3.impl.original") === "org.apache.hadoop.fs.s3a.S3AFileSystem") + assert(props("fs.s3a.impl") === classOf[CredScopedFileSystem].getName) + assert(props("fs.AbstractFileSystem.s3a.impl") === classOf[CredScopedFs].getName) + } + + test("returns no credential properties for non-UC-vended schemes") { + val props = CredPropsUtil.createTableCredProps( + true, + false, + Map.empty[String, String].asJava, + "file", + "https://uc.example", + tokenProvider(), + TableId, + TableCatalog, + TableSchema, + TableName, + TableLocation, + awsCredentials(operation = CredentialOperation.READ)).asScala.toMap + + assert(props.isEmpty) + } + + private def tokenProvider(): TokenProvider = { + TokenProvider.create(Map("type" -> "static", "token" -> "token").asJava) + } + + private def awsCredentials( + operation: CredentialOperation = CredentialOperation.READ_WRITE, + expirationTimeMs: java.lang.Long = null): StorageCredential = { + newCredential( + operation = operation, + s3AccessKeyId = "ak", + s3SecretAccessKey = "sk", + s3SessionToken = "st", + expirationTimeMs = expirationTimeMs) + } + + private def azureCredentials( + operation: CredentialOperation = CredentialOperation.READ_WRITE, + expirationTimeMs: java.lang.Long = null): StorageCredential = { + newCredential( + operation = operation, + azureSasToken = "sas", + expirationTimeMs = expirationTimeMs) + } + + private def gcsCredentials( + operation: CredentialOperation = CredentialOperation.READ_WRITE, + expirationTimeMs: java.lang.Long = null): StorageCredential = { + newCredential( + operation = operation, + gcsOauthToken = "token", + expirationTimeMs = expirationTimeMs) + } + + private def newCredential( + operation: CredentialOperation, + s3AccessKeyId: String = null, + s3SecretAccessKey: String = null, + s3SessionToken: String = null, + azureSasToken: String = null, + gcsOauthToken: String = null, + expirationTimeMs: java.lang.Long = null): StorageCredential = { + new StorageCredential( + TableLocation, + operation, + new StorageCredentialConfig( + s3AccessKeyId, + s3SecretAccessKey, + s3SessionToken, + azureSasToken, + gcsOauthToken), + expirationTimeMs) + } + + private def withHttpServer[T](body: HttpServer => T): T = { + val server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0) + try { + server.start() + body(server) + } finally { + server.stop(0) + } + } + + private def sendJson(exchange: HttpExchange, status: Int, json: String): Unit = { + val bytes = json.getBytes(StandardCharsets.UTF_8) + exchange.getResponseHeaders.add("Content-Type", "application/json") + exchange.sendResponseHeaders(status, bytes.length) + val body = exchange.getResponseBody + try { + body.write(bytes) + } finally { + body.close() + } + } + + private def s3CredentialsResponseJson: String = + """{ + | "storage-credentials": [ + | { + | "prefix": "s3://bucket/table", + | "operation": "READ_WRITE", + | "expiration-time-ms": 9999999999999, + | "config": { + | "s3.access-key-id": "renewed-ak", + | "s3.secret-access-key": "renewed-sk", + | "s3.session-token": "renewed-st" + | } + | } + | ] + |}""".stripMargin +}