diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedApiClientProvider.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedApiClientProvider.java new file mode 100644 index 00000000000..37ac19c8719 --- /dev/null +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedApiClientProvider.java @@ -0,0 +1,76 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.commit.uccommitcoordinator; + +import io.unitycatalog.client.ApiClient; +import io.unitycatalog.client.ApiClientBuilder; +import io.unitycatalog.client.auth.TokenProvider; +import io.unitycatalog.client.retry.JitterDelayRetryPolicy; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +abstract class UCTokenBasedApiClientProvider { + private ApiClient apiClient; + + protected UCTokenBasedApiClientProvider( + String baseUri, + TokenProvider tokenProvider, + Map appVersions) { + this.apiClient = buildApiClient(baseUri, tokenProvider, appVersions); + } + + protected UCTokenBasedApiClientProvider( + String baseUri, + TokenProvider tokenProvider, + Map appVersions, + String catalog) { + // The catalog is consumed by subclasses that probe catalog-scoped Delta API support. + this(baseUri, tokenProvider, appVersions); + } + + private static ApiClient buildApiClient( + String baseUri, + TokenProvider tokenProvider, + Map appVersions) { + Objects.requireNonNull(baseUri, "baseUri must not be null"); + Objects.requireNonNull(tokenProvider, "tokenProvider must not be null"); + Objects.requireNonNull(appVersions, "appVersions must not be null"); + + ApiClientBuilder builder = ApiClientBuilder.create() + .uri(baseUri) + .tokenProvider(tokenProvider) + .retryPolicy(JitterDelayRetryPolicy.builder().build()); + + appVersions.forEach((name, version) -> { + if (version != null) { + builder.addAppVersion(name, version); + } + }); + + return builder.build(); + } + + protected ApiClient getApiClient() { + return apiClient; + } + + public void close() throws IOException { + apiClient = null; + } +} diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java index 67b582a1b00..a60f9d67290 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java @@ -26,7 +26,6 @@ import io.delta.storage.commit.uniform.IcebergMetadata; import io.delta.storage.commit.uniform.UniformMetadata; import io.unitycatalog.client.ApiClient; -import io.unitycatalog.client.ApiClientBuilder; import io.unitycatalog.client.ApiException; import io.unitycatalog.client.api.DeltaCommitsApi; import io.unitycatalog.client.api.MetastoresApi; @@ -83,7 +82,7 @@ * @see GetCommitsResponse * @see TokenProvider */ -public class UCTokenBasedRestClient implements UCClient { +public class UCTokenBasedRestClient extends UCTokenBasedApiClientProvider implements UCClient { private DeltaCommitsApi deltaCommitsApi; private MetastoresApi metastoresApi; @@ -109,21 +108,20 @@ public UCTokenBasedRestClient( String baseUri, TokenProvider tokenProvider, Map appVersions) { - Objects.requireNonNull(baseUri, "baseUri must not be null"); - Objects.requireNonNull(tokenProvider, "tokenProvider must not be null"); - Objects.requireNonNull(appVersions, "appVersions must not be null"); - - ApiClientBuilder builder = ApiClientBuilder.create() - .uri(baseUri) - .tokenProvider(tokenProvider); - - appVersions.forEach((name, version) -> { - if (version != null) { - builder.addAppVersion(name, version); - } - }); + super(baseUri, tokenProvider, appVersions); + ApiClient apiClient = getApiClient(); + this.deltaCommitsApi = new DeltaCommitsApi(apiClient); + this.metastoresApi = new MetastoresApi(apiClient); + this.tablesApi = new TablesApi(apiClient); + } - ApiClient apiClient = builder.build(); + public UCTokenBasedRestClient( + String baseUri, + TokenProvider tokenProvider, + Map appVersions, + String catalog) { + super(baseUri, tokenProvider, appVersions, catalog); + ApiClient apiClient = getApiClient(); this.deltaCommitsApi = new DeltaCommitsApi(apiClient); this.metastoresApi = new MetastoresApi(apiClient); this.tablesApi = new TablesApi(apiClient); @@ -233,6 +231,7 @@ public GetCommitsResponse getCommits( @Override public void close() throws IOException { + super.close(); // Nulling out the API instances makes them eligible for GC. Once garbage collected, // the underlying connection pool is freed and destroyed. this.deltaCommitsApi = null;