[Storage] Add UC Delta Rest Catalog API loadTable client APIs#6659
[Storage] Add UC Delta Rest Catalog API loadTable client APIs#6659TimothyW553 wants to merge 1 commit into
Conversation
17be210 to
4610c42
Compare
Range-diff: master (17be210 -> 4610c42)
Reproduce locally: |
4610c42 to
5c40a3a
Compare
Range-diff: master (4610c42 -> 5c40a3a)
Reproduce locally: |
5c40a3a to
8361784
Compare
Range-diff: master (5c40a3a -> 8361784)
Reproduce locally: |
4ae951d to
1ca5b16
Compare
Range-diff: master (4ae951d -> 1ca5b16)
Reproduce locally: |
99cc249 to
57e131c
Compare
0a7d129 to
6839aec
Compare
2473c52 to
1d0d53a
Compare
|
|
||
| class UCDeltaRestCatalogUtilsSuite extends AnyFunSuite { | ||
|
|
||
| private val objectMapper = new ObjectMapper().registerModule(new DeltaTypeModule()) |
There was a problem hiding this comment.
This diverges from production code where it is using ApiClient.getObjectMapper.
This code is trying to do the same as ApiClient.getObjectMapper but it may drift away.
There was a problem hiding this comment.
The old test-local mapper is gone. This path parses legacy ColumnInfo.type_json (which uses camelCase), not a DRC response body, so it cannot use ApiClient.getObjectMapper(). The parsing now lives in UCLegacyLoadTableAdapter so its shared.
| */ | ||
| public UCTokenBasedRestClient( | ||
| ApiClient apiClient, | ||
| Optional<io.unitycatalog.client.delta.api.TablesApi> deltaTablesApi) { |
There was a problem hiding this comment.
Since we no longer need the shim, we can always create this UCTokenBasedRestClient with both tables API to make the construction interface simpler.
supportsDeltaRestCatalog can come from another way like a boolean.
There was a problem hiding this comment.
Done. The existing-ApiClient constructor now takes a boolean instead of Optional<...TablesApi>.
| Objects.requireNonNull(schema, "schema must not be null."); | ||
| Objects.requireNonNull(table, "table must not be null."); | ||
|
|
||
| boolean usingLegacyTablesApi = !supportsDeltaRestCatalog; |
There was a problem hiding this comment.
Done. I removed that redundant boolean while cleaning up the loadTable branches.
| catalog, | ||
| schema, | ||
| table); | ||
| } |
There was a problem hiding this comment.
} else { for better readability
d9fd9bf to
38a0fc6
Compare
| /** | ||
| * Interface for Unity Catalog Delta APIs. | ||
| * | ||
| * <p>This keeps UC Delta Rest Catalog API operations separate from legacy UC commit coordination. |
There was a problem hiding this comment.
UC commit coordination -> UC client
There was a problem hiding this comment.
done, changed this to “legacy UC client” because this interface separates Delta API methods from the old UC client surface.
| } | ||
| } | ||
|
|
||
| protected static class UCDeltaRestCatalogApiSupport { |
There was a problem hiding this comment.
It's better to move this class before the first usage.
There was a problem hiding this comment.
done, moved the helper class before the initialization methods that use it.
| } | ||
|
|
||
| private static DataType toKernelType(io.unitycatalog.client.delta.model.DeltaType deltaType) { | ||
| if (deltaType instanceof io.unitycatalog.client.delta.model.PrimitiveType) { |
There was a problem hiding this comment.
This file should only deal with Delta models, not the existing UC models. Then can you import these types?
There was a problem hiding this comment.
callers only see Delta storage models; the fully-qualified UC SDK types are kept only inside this conversion boundary because names like StructType and StructField conflict with Kernel types.
| val adapter = metadata.asInstanceOf[UCDeltaTokenBasedRestClient.TableMetadataAdapter] | ||
| assert(adapter.getLocation === "file:/tmp/uc/table") | ||
| assert(adapter.getCreatedTime === Long.box(10L)) | ||
| assert(adapter.getSchema.at(0).getName === "id") |
There was a problem hiding this comment.
Should have more coverage on this function to test the schema conversion.
There was a problem hiding this comment.
done, added schema conversion coverage for nested struct, array, map, decimal, metadata values, and nullability.
| }); | ||
|
|
||
| ApiClient apiClient = builder.build(); | ||
| // Register the shared SDK client with the UC Delta base class without probing DRC support. |
There was a problem hiding this comment.
done, changed the wording from DRC to Delta API.
| ApiClient apiClient = builder.build(); | ||
| // Register the shared SDK client with the UC Delta base class without probing DRC support. | ||
| // The catalog-aware constructor below performs the optional config probe. | ||
| initializeUCDeltaRestCatalogApi(apiClient, false); |
There was a problem hiding this comment.
Which callers still call this constructor? Can they easily migrate to the one below?
There was a problem hiding this comment.
existing legacy commit-coordinator callers use this constructor and may not have a catalog, so this constructor stays non-probing while the catalog-aware constructor enables Delta API probing.
| } | ||
| exchange.close() | ||
| }) | ||
| server.createContext("/api/2.1/unity-catalog/delta/v1/config", exchange => { |
There was a problem hiding this comment.
Can you move them into a separate test file?
There was a problem hiding this comment.
done, moved the Delta API-specific client tests into UCDeltaTokenBasedRestClientSuite.
|
|
||
| protected static boolean isUCDeltaRestCatalogApiEnabled() { | ||
| String deltaRestApiEnabled = System.getenv(UC_DELTA_REST_CATALOG_API_ENABLED); | ||
| return deltaRestApiEnabled == null || !deltaRestApiEnabled.equalsIgnoreCase("false"); |
There was a problem hiding this comment.
So by default, we will run those integration tests with the new api protocol ?
!deltaRestApiEnabled.equalsIgnoreCase("false");
will it be more easier to read if make it as deltaRestApiEnabled.equalsIgnoreCase("true") ?
There was a problem hiding this comment.
yes, default is enabled. i updated the code to make this explicit: unset means enabled, "true" means enabled, and anything else is disabled.
| import java.util.List; | ||
|
|
||
| /** Delta-owned models for UC Delta table credentials. */ | ||
| public final class UCDeltaModels { |
There was a problem hiding this comment.
So those are the APIs that we will need to carefully maintain in future.
There was a problem hiding this comment.
yes, these are Delta-owned boundary models for credential responses, and they stay small so callers do not depend on UC SDK credential model classes.
| protected final void initializeUCDeltaRestCatalogApi( | ||
| ApiClient apiClient, | ||
| String catalog) { | ||
| initializeUCDeltaRestCatalogApi(apiClient, getUCDeltaRestCatalogApiSupport(apiClient, catalog)); | ||
| } |
There was a problem hiding this comment.
Do we still need this ? I see no usage for this.
There was a problem hiding this comment.
removed the unused helper overload; the token client now probes support through configureUCDeltaRestCatalogApiSupport(catalog).
|
|
||
| UC_DIR="${UC_DIR:-/tmp/unitycatalog}" | ||
| UC_REPO="${UC_REPO:-https://github.com/unitycatalog/unitycatalog.git}" | ||
| UC_REPO="${UC_REPO:-https://github.com/yili-db/unitycatalog.git}" |
There was a problem hiding this comment.
this needs to be updated to some uc master branch reference + hash . .right?
There was a problem hiding this comment.
yes, this is temporary
tdas
left a comment
There was a problem hiding this comment.
LGTM on build changes and high level direction.
| JsonMapper.builder().serializationInclusion(JsonInclude.Include.NON_NULL).build(); | ||
|
|
||
| static { | ||
| DELTA_TYPE_OBJECT_MAPPER.registerModule(new DeltaTypeModule()); |
There was a problem hiding this comment.
DeltaTypeModule is good for ser/deser the RPC shape columns which has kebab casing.
The schemaString is camelCasing. In UC server it used this to properly ser/deser the camelCasing:
private static ObjectMapper createTypeMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new DeltaTypeModule());
mapper.addMixIn(ArrayType.class, CamelCaseArrayMixin.class);
mapper.addMixIn(MapType.class, CamelCaseMapMixin.class);
return mapper;
}
There was a problem hiding this comment.
fixed. added camelCase ArrayType/MapType mixins for schemaString serialization and a test covering kebab-case UC Delta API schema -> camelCase Delta schemaString.
|
|
||
| private boolean supportsUCDeltaRestCatalogApi; | ||
| private volatile boolean closed; | ||
| private ApiClient deltaApiClient; |
There was a problem hiding this comment.
nit: this ApiClient can be the same ApiClient used by UCTokenBasedRestClient. They can share the same ApiClient. It doesn't have to be done in this PR. Can be a follow up.
There was a problem hiding this comment.
done . UCTokenBasedRestClient now passes its ApiClient to the UC Delta API base class, so both paths share the same ApiClient.
There was a problem hiding this comment.
we will do this in a follow up
|
|
||
| @Override | ||
| public String getName() { | ||
| return null; |
There was a problem hiding this comment.
null? Can you save the name in this class?
There was a problem hiding this comment.
fixed. TableMetadataAdapter now stores the table name passed from loadTable/createTable/updateTable and returns it from getName().
| .set("spark.sql.catalog." + catalogName + ".token", uc.serverToken()); | ||
| .set("spark.sql.catalog." + catalogName + ".token", uc.serverToken()) | ||
| .set( | ||
| "spark.sql.catalog." + catalogName + ".deltaRestApi.enabled", |
There was a problem hiding this comment.
qq: this catalog config actually won't affect the old and new credentials api in the unitycatalog's UCSingleCatalog , right ?
I think for next step we will need to add the PR in the unitycatalog repo to enable this catalog level spark config.
There was a problem hiding this comment.
right, this Delta-side config only controls whether DeltaCatalogClient tries the Delta API path. UC-side catalog-level credential config should be handled in the Unity Catalog follow-up.
| /** | ||
| * Gets temporary storage credentials for a table through the UC Delta Rest Catalog API. | ||
| */ | ||
| default CredentialsResponse getTableCredentials( | ||
| CredentialOperation operation, | ||
| String catalog, | ||
| String schema, | ||
| String table) throws IOException { | ||
| throw new UnsupportedOperationException( | ||
| "getTableCredentials requires UC Delta Rest Catalog API support."); | ||
| } |
There was a problem hiding this comment.
@TimothyW553 @yili-db @tdas , do we really need this getTableCredentials API ? since the unitycatalog-hadoop public API will help us handle everything about the credentials, even for the credential initialziation and renewal itself.
My option is: we should entirely remove the getTableCredentials API definition here.
There was a problem hiding this comment.
Right. Since unitycatalog/unitycatalog@ee60cae this is no longer needed here.
And the CredentialsResponse can be gone as well.
There was a problem hiding this comment.
done, I removed getTableCredentials from the Delta storage client surface. table credential setup now goes through unitycatalog-hadoop instead.
There was a problem hiding this comment.
done, CredentialsResponse is gone from this PR and the Delta API storage surface now only exposes loadTable.
| @Override | ||
| public CredentialsResponse getTableCredentials( | ||
| CredentialOperation operation, | ||
| String catalog, | ||
| String schema, | ||
| String table) throws IOException { | ||
| ensureUCDeltaRestCatalogApiSupported("getTableCredentials"); | ||
| Objects.requireNonNull(operation, "operation must not be null."); | ||
| Objects.requireNonNull(catalog, "catalog must not be null."); | ||
| Objects.requireNonNull(schema, "schema must not be null."); | ||
| Objects.requireNonNull(table, "table must not be null."); | ||
|
|
||
| try { | ||
| return toCredentialsResponse(deltaTemporaryCredentialsApi.getTableCredentials( | ||
| toSdkCredentialOperation(operation), | ||
| catalog, | ||
| schema, | ||
| table)); | ||
| } catch (ApiException e) { | ||
| throw new IOException( | ||
| String.format( | ||
| "Failed to get table credentials for %s.%s.%s (HTTP %s): %s", | ||
| catalog, | ||
| schema, | ||
| table, | ||
| e.getCode(), | ||
| e.getResponseBody()), | ||
| e); | ||
| } | ||
| } | ||
|
|
||
| private static TableMetadataAdapter toTableMetadata(String tableName, TableMetadata metadata) { | ||
| if (metadata == null) { | ||
| return null; | ||
| } | ||
| return new TableMetadataAdapter(tableName, metadata); | ||
| } | ||
|
|
||
| protected static CredentialsResponse toCredentialsResponse( | ||
| io.unitycatalog.client.delta.model.CredentialsResponse response) { | ||
| if (response == null) { | ||
| return null; | ||
| } | ||
| List<UCDeltaModels.StorageCredential> credentials = new ArrayList<>(); | ||
| if (response.getStorageCredentials() != null) { | ||
| for (io.unitycatalog.client.delta.model.StorageCredential credential : | ||
| response.getStorageCredentials()) { | ||
| credentials.add(toStorageCredential(credential)); | ||
| } | ||
| } | ||
| return new CredentialsResponse(credentials); | ||
| } | ||
|
|
||
| private static UCDeltaModels.StorageCredential toStorageCredential( | ||
| io.unitycatalog.client.delta.model.StorageCredential credential) { | ||
| if (credential == null) { | ||
| return null; | ||
| } | ||
| return new UCDeltaModels.StorageCredential( | ||
| credential.getPrefix(), | ||
| toCredentialOperation(credential.getOperation()), | ||
| toStorageCredentialConfig(credential.getConfig()), | ||
| credential.getExpirationTimeMs()); | ||
| } | ||
|
|
||
| private static UCDeltaModels.StorageCredentialConfig toStorageCredentialConfig( | ||
| io.unitycatalog.client.delta.model.StorageCredentialConfig config) { | ||
| if (config == null) { | ||
| return null; | ||
| } | ||
| return new UCDeltaModels.StorageCredentialConfig( | ||
| getCredentialConfigValue( | ||
| config, | ||
| config.getS3AccessKeyId(), | ||
| io.unitycatalog.client.delta.model.StorageCredentialConfig | ||
| .JSON_PROPERTY_S3_ACCESS_KEY_ID), | ||
| getCredentialConfigValue( | ||
| config, | ||
| config.getS3SecretAccessKey(), | ||
| io.unitycatalog.client.delta.model.StorageCredentialConfig | ||
| .JSON_PROPERTY_S3_SECRET_ACCESS_KEY), | ||
| getCredentialConfigValue( | ||
| config, | ||
| config.getS3SessionToken(), | ||
| io.unitycatalog.client.delta.model.StorageCredentialConfig | ||
| .JSON_PROPERTY_S3_SESSION_TOKEN), | ||
| getCredentialConfigValue( | ||
| config, | ||
| config.getAzureSasToken(), | ||
| io.unitycatalog.client.delta.model.StorageCredentialConfig | ||
| .JSON_PROPERTY_AZURE_SAS_TOKEN), | ||
| getCredentialConfigValue( | ||
| config, | ||
| config.getGcsOauthToken(), | ||
| io.unitycatalog.client.delta.model.StorageCredentialConfig | ||
| .JSON_PROPERTY_GCS_OAUTH_TOKEN)); | ||
| } | ||
|
|
||
| private static String getCredentialConfigValue( | ||
| io.unitycatalog.client.delta.model.StorageCredentialConfig config, | ||
| String getterValue, | ||
| String key) { | ||
| return getterValue != null ? getterValue : config.get(key); | ||
| } | ||
|
|
||
| private static CredentialOperation toCredentialOperation( | ||
| io.unitycatalog.client.delta.model.CredentialOperation operation) { | ||
| if (operation == null) { | ||
| return null; | ||
| } | ||
| switch (operation) { | ||
| case READ: | ||
| return CredentialOperation.READ; | ||
| case READ_WRITE: | ||
| return CredentialOperation.READ_WRITE; | ||
| default: | ||
| throw new IllegalArgumentException("Unsupported UC Delta credential operation: " + operation); | ||
| } | ||
| } | ||
|
|
||
| protected static io.unitycatalog.client.delta.model.CredentialOperation toSdkCredentialOperation( | ||
| CredentialOperation operation) { | ||
| switch (operation) { | ||
| case READ: | ||
| return io.unitycatalog.client.delta.model.CredentialOperation.READ; | ||
| case READ_WRITE: | ||
| return io.unitycatalog.client.delta.model.CredentialOperation.READ_WRITE; | ||
| default: | ||
| throw new IllegalArgumentException("Unsupported UC Delta credential operation: " + operation); | ||
| } | ||
| } |
There was a problem hiding this comment.
I'm doubting if it's worth for us to duplicate the table credential code in the oss-delta repo again, considering we already have such a completed credentials support inside the oss-unitycatalog.
Anyway, I will take a serious look about the entire stacked PRs, and see if we could eliminate the code duplication. --- this is a serious problem i think.
There was a problem hiding this comment.
This entire block of code of getting table initial credential was written before unitycatalog/unitycatalog@ee60cae and was trying to mirror the same logic that was once in UCSingleCatalog.
It can be greatly simplified now.
There was a problem hiding this comment.
agreed, this was duplicating logic from unitycatalog-hadoop. I simplified this so Delta no longer owns table credential vending logic here.
There was a problem hiding this comment.
done, the table credential initialization moved to unitycatalog-hadoop in the Spark catalog layer, so UCDeltaTokenBasedRestClient only handles loadTable now.
| String baseUri, | ||
| TokenProvider tokenProvider, | ||
| Map<String, String> appVersions, | ||
| String catalog) { |
There was a problem hiding this comment.
Is this catalog still using ?
There was a problem hiding this comment.
yes, the catalog arg is used by UCDeltaTokenBasedRestClient to probe catalog-scoped Delta API support; the shared provider constructor keeps the same signature so OSS and DBR can stay close.
|
|
||
| ApiClientBuilder builder = ApiClientBuilder.create() | ||
| .uri(baseUri) | ||
| .tokenProvider(tokenProvider); |
There was a problem hiding this comment.
unrelated: Do we also need to customized the retry policy ? not a blocker for this PR.
There was a problem hiding this comment.
Should be an easy fix: JitterDelayRetryPolicy.builder().build()
There was a problem hiding this comment.
done, I added the default jitter retry policy through JitterDelayRetryPolicy.builder().build().
There was a problem hiding this comment.
done, added JitterDelayRetryPolicy.builder().build() in the shared ApiClient builder.
🥞 Stacked PR
Use this link to review incremental changes.
Which Delta project/connector is this regarding?
Storage / Unity Catalog
Description
This PR adds the storage-layer client support needed for named-table
loadTablethrough the UC Delta Rest Catalog API.Without this change, Delta has no common storage-level client API for UC Delta Rest Catalog API named-table metadata and credentials. The Spark catalog wiring is added in the next PR.
This PR adds:
UCClient.supportsUCDeltaRestCatalogApi(),loadTable(...), andgetTableCredentials(...).UCDeltaClientandUCDeltaTokenBasedRestClientas the UC Delta Rest Catalog API surface below the existing UC client.UCTokenBasedRestClientprobing of the UC Delta Rest Catalog API config endpoint.UC_DELTA_REST_CATALOG_API_ENABLEDfor Spark UC integration tests; unset means enabled by default, andfalseruns the legacy UC Spark catalog path.How was this patch tested?
Covered by
UCTokenBasedRestClientSuitefor supported endpoints, missing endpoints, config probe failures, default unsupported client behavior, metadata conversion, and table credential handling.Covered by
UnityCatalogSupportTestfor the integration-test environment variable behavior.Local verification:
./build/sbt "storage/testOnly io.delta.storage.commit.uccommitcoordinator.UCTokenBasedRestClientSuite"Does this PR introduce any user-facing changes?
No. This adds internal client APIs and an integration-test toggle.