diff --git a/build.sbt b/build.sbt index 7ea13d2b24d..b0dea5b121d 100644 --- a/build.sbt +++ b/build.sbt @@ -786,6 +786,23 @@ val unityCatalogVersion: String = sys.props.getOrElse( if (useDefaultUnityCatalogReleaseVersion) defaultUnityCatalogReleaseVersion else unityCatalogReleaseVersion.getOrElse(pinnedUnityCatalogVersion)) +/** + * Returns true when `current` is at least `target`. Numeric segments only; suffix after + * the first `-` (e.g. `-SNAPSHOT-abc1234`) is stripped before comparison. + */ +def isAtLeastVersion(current: String, target: String): Boolean = { + def parts(v: String): Seq[Int] = + v.takeWhile(_ != '-').split('.').iterator + .map(p => scala.util.Try(p.toInt).getOrElse(0)).toSeq + val cur = parts(current) + val tgt = parts(target) + val n = math.max(cur.length, tgt.length) + (0 until n).iterator + .map(i => (cur.lift(i).getOrElse(0), tgt.lift(i).getOrElse(0))) + .find { case (a, b) => a != b } + .forall { case (a, b) => a >= b } +} + val sparkUnityCatalogJacksonVersion = "2.15.4" // We are using Spark 4.0's Jackson version 2.15.x, to override Unity Catalog 0.3.0's version 2.18.x // Publishes the pinned UC jars to ~/.ivy2/local if they're not already cached there. Hooked @@ -1217,6 +1234,19 @@ lazy val storage = (project in file("storage")) "org.scalatest" %% "scalatest" % scalaTestVersion % "test", // Jackson datatype module needed for UC SDK tests (excluded from main compile scope) "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.15.4" % "test", + ) ++ ( + // unitycatalog-hadoop ships from UC 0.5.0 onward; older versions don't publish the + // artifact, so resolving it would fail. Used by UCDeltaTokenBasedRestClient for + // credential vending via UCCredentialHadoopConfs. 
+ if (isAtLeastVersion(unityCatalogVersion, "0.5.0")) { + Seq("io.unitycatalog" % "unitycatalog-hadoop" % unityCatalogVersion excludeAll( + ExclusionRule(organization = "org.openapitools"), + ExclusionRule(organization = "com.fasterxml.jackson.core"), + ExclusionRule(organization = "com.fasterxml.jackson.module"), + ExclusionRule(organization = "com.fasterxml.jackson.datatype"), + ExclusionRule(organization = "com.fasterxml.jackson.dataformat") + )) + } else Nil ), // Publish the pinned UC jars before sbt tries to resolve them. storage is the transitive diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaClient.java index 0cb061b920b..04f19608c99 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaClient.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaClient.java @@ -16,8 +16,10 @@ package io.delta.storage.commit.uccommitcoordinator; +import io.delta.storage.commit.TableIdentifier; import io.delta.storage.commit.actions.AbstractMetadata; import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.StagingTableInfo; +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.TableInfo; import java.io.IOException; import java.util.List; import java.util.Map; @@ -31,13 +33,12 @@ public interface UCDeltaClient extends UCClient { /** * Loads a table's metadata from Unity Catalog. 
* - * @param catalog the catalog name - * @param schema the schema name - * @param table the table name - * @return the table's {@link AbstractMetadata} + * @param tableIdentifier catalog + schema namespace and table name + * @return the table's {@link TableInfo}, carrying the catalog-supplied storage location and + * metadata * @throws IOException on network or API errors */ - AbstractMetadata loadTable(String catalog, String schema, String table) throws IOException; + TableInfo loadTable(TableIdentifier tableIdentifier) throws IOException; /** * Reserves a staging slot for a new Delta table. The returned response contains the table ID, diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaModels.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaModels.java index 083b9796a5d..69e18fbf3e5 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaModels.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaModels.java @@ -16,6 +16,7 @@ package io.delta.storage.commit.uccommitcoordinator; +import io.delta.storage.commit.actions.AbstractMetadata; import io.delta.storage.commit.actions.AbstractProtocol; import java.util.Collection; import java.util.Collections; @@ -23,9 +24,10 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.UUID; /** - * Delta-owned models for the UC Delta REST Catalog API. These decouple the {@link UCDeltaClient} + * Delta-owned models for the UC Delta REST API. These decouple the {@link UCDeltaClient} * interface from any generated SDK types. */ public final class UCDeltaModels { @@ -101,9 +103,54 @@ public int hashCode() { } } + /** Result of {@link UCDeltaClient#loadTable}. 
*/ + public static final class TableInfo { + + private final UUID tableId; + private final TableType tableType; + private final String location; + private final AbstractMetadata metadata; + private final Map storageProperties; + + public TableInfo( + UUID tableId, + TableType tableType, + String location, + AbstractMetadata metadata, + Map storageProperties) { + this.tableId = tableId; + this.tableType = tableType; + this.location = location; + this.metadata = metadata; + this.storageProperties = storageProperties; + } + + /** UC's {@code table_uuid}; distinct from {@link AbstractMetadata#getId()} (the Delta id). */ + public UUID getTableId() { + return tableId; + } + + public TableType getTableType() { + return tableType; + } + + public String getLocation() { + return location; + } + + public AbstractMetadata getMetadata() { + return metadata; + } + + /** Hadoop-style storage options (e.g. catalog-vended credentials). */ + public Map getStorageProperties() { + return storageProperties == null ? 
Collections.emptyMap() : storageProperties; + } + } + public static final class StagingTableInfo { - private final String tableId; + private final UUID tableId; private final TableType tableType; private final String location; private final DeltaProtocol requiredProtocol; @@ -112,7 +159,7 @@ public static final class StagingTableInfo { private final Map suggestedProperties; public StagingTableInfo( - String tableId, + UUID tableId, TableType tableType, String location, DeltaProtocol requiredProtocol, @@ -128,7 +175,7 @@ public StagingTableInfo( this.suggestedProperties = suggestedProperties; } - public String getTableId() { + public UUID getTableId() { return tableId; } diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaSchemaConverter.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaSchemaConverter.java new file mode 100644 index 00000000000..970b5d8a400 --- /dev/null +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaSchemaConverter.java @@ -0,0 +1,178 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.delta.storage.commit.uccommitcoordinator; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSetter; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.unitycatalog.client.JSON; +import io.unitycatalog.client.delta.model.ArrayType; +import io.unitycatalog.client.delta.model.DeltaType; +import io.unitycatalog.client.delta.model.MapType; +import io.unitycatalog.client.delta.model.PrimitiveType; +import io.unitycatalog.client.delta.model.StructField; +import io.unitycatalog.client.delta.model.StructType; +import io.unitycatalog.client.delta.serde.DeltaTypeModule; + +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Conversion from the UC SDK's Delta schema model ({@link StructType}, {@link DeltaType}, + * {@link PrimitiveType}) to Delta's JSON schema wire format. The reverse direction + * ({@link #parseSchemaString}) is reserved for a follow-up and currently throws. + * + *

<p>The SDK's default {@link ObjectMapper} (from {@link JSON#getDefault()}) does not emit + * Delta's wire format on its own: + *
<ul> + *
<li>{@link PrimitiveType} serializes as {@code {"type":"integer"}} by default, but Delta + * expects a bare string ({@code "integer"}). {@link DeltaTypeModule} provides custom + * serializers/deserializers that flatten primitives (and decimal) to bare strings.</li> + *
<li>The SDK uses kebab-case JSON keys for nested types ({@code element-type}, + * {@code contains-null}, {@code key-type}, etc.). Delta's wire format uses camelCase + * ({@code elementType}, {@code containsNull}, {@code keyType}, ...). The + * {@link CamelCaseArrayMixin} / {@link CamelCaseMapMixin} mixins rename those keys via + * Jackson's {@link com.fasterxml.jackson.databind.ObjectMapper#addMixIn} mechanism.</li> + *
</ul>
+ * The resulting JSON is parseable by Delta's schema readers (e.g. {@code DataType.fromJson}). + */ +final class UCDeltaSchemaConverter { + + /** Primitive type names that the legacy create-table path ({@link #toUCStructType}) accepts. */ + private static final Set PRIMITIVE_TYPE_NAMES = Set.of( + "BOOLEAN", "BYTE", "SHORT", "INT", "LONG", "FLOAT", "DOUBLE", + "DATE", "TIMESTAMP", "TIMESTAMP_NTZ", "STRING", "BINARY", "DECIMAL"); + + /** + * Singleton mapper preconfigured to emit Delta's wire format + * (bare-string primitives, camelCase keys for nested types). See class-level docs. + */ + private static final ObjectMapper DELTA_SCHEMA_MAPPER = createDeltaSchemaMapper(); + + private UCDeltaSchemaConverter() {} + + /** + * Serializes the SDK's {@link StructType} to Delta's JSON schema wire format. The resulting + * string is parseable by Delta's schema readers (e.g. {@code DataType.fromJson}). + * + * @return JSON string, or {@code null} if {@code columns} is {@code null}. + * @throws IllegalStateException if Jackson fails to serialize. + */ + static String serializeSchema(StructType columns) { + if (columns == null) { + return null; + } + try { + return DELTA_SCHEMA_MAPPER.writeValueAsString(columns); + } catch (JsonProcessingException e) { + throw new IllegalStateException("Failed to serialize UC schema to Delta JSON", e); + } + } + + /** + * Parses a Delta JSON schema string into the SDK's {@link StructType}. Reserved for the + * UpdateTable schema-diff path; not yet implemented. + */ + static StructType parseSchemaString(String schemaString) { + // TODO: implement full Delta schema string -> StructType conversion + throw new UnsupportedOperationException( + "Delta schema string to StructType conversion is not yet implemented."); + } + + /** + * Converts the legacy create-table path's {@link UCClient.ColumnDef} list into the UC SDK's + * {@link StructType}. Only primitive types are supported today; complex types throw + * {@link UnsupportedOperationException}. 
+ * + * @throws NullPointerException if {@code columns} is {@code null}. + */ + static StructType toUCStructType(List columns) { + Objects.requireNonNull(columns, "columns must not be null"); + StructType structType = new StructType(); + for (UCClient.ColumnDef col : columns) { + structType.addFieldsItem(new StructField() + .name(col.getName()) + .nullable(col.isNullable()) + .type(toUCDeltaType(col))); + } + return structType; + } + + private static PrimitiveType toUCDeltaType(UCClient.ColumnDef col) { + if (!PRIMITIVE_TYPE_NAMES.contains(col.getTypeName())) { + throw new UnsupportedOperationException( + "Complex column type '" + col.getTypeName() + "' for column '" + col.getName() + + "' is not yet supported. Only primitive types are supported."); + } + return new PrimitiveType().type(col.getTypeText()); + } + + private static ObjectMapper createDeltaSchemaMapper() { + // Copy the SDK's default mapper so we inherit its base config (visibility, naming, etc.) + // without mutating the shared instance. + ObjectMapper m = JSON.getDefault().getMapper().copy(); + // DeltaTypeModule flattens PrimitiveType/DecimalType into bare type-name strings. + m.registerModule(new DeltaTypeModule()); + // The SDK ships ArrayType/MapType with kebab-case JSON keys; Delta's wire format uses + // camelCase. Mixins rewrite the property names without modifying the generated SDK + // classes themselves. + m.addMixIn(ArrayType.class, CamelCaseArrayMixin.class); + m.addMixIn(MapType.class, CamelCaseMapMixin.class); + return m; + } + + /** + * Jackson mixin that renames {@link ArrayType}'s JSON keys from kebab-case to camelCase + * (matching Delta's wire format). + * + *
<p>
The class is {@code abstract} and the methods abstract because Jackson never + * instantiates the mixin: it only inspects annotated method signatures and projects the + * annotations onto the target class. Making the class abstract makes that contract + * explicit and avoids a no-op constructor. + */ + private abstract static class CamelCaseArrayMixin { + @JsonProperty("elementType") + abstract DeltaType getElementType(); + @JsonSetter("elementType") + abstract void setElementType(DeltaType v); + @JsonProperty("containsNull") + abstract Boolean getContainsNull(); + @JsonSetter("containsNull") + abstract void setContainsNull(Boolean v); + } + + /** + * Jackson mixin that renames {@link MapType}'s JSON keys from kebab-case to camelCase + * (matching Delta's wire format). See {@link CamelCaseArrayMixin} for the mixin pattern. + */ + private abstract static class CamelCaseMapMixin { + @JsonProperty("keyType") + abstract DeltaType getKeyType(); + @JsonSetter("keyType") + abstract void setKeyType(DeltaType v); + @JsonProperty("valueType") + abstract DeltaType getValueType(); + @JsonSetter("valueType") + abstract void setValueType(DeltaType v); + @JsonProperty("valueContainsNull") + abstract Boolean getValueContainsNull(); + @JsonSetter("valueContainsNull") + abstract void setValueContainsNull(Boolean v); + } +} diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java index dedd6abae5e..5e15c09aeca 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java @@ -22,6 +22,10 @@ import io.delta.storage.commit.TableIdentifier; import io.delta.storage.commit.actions.AbstractMetadata; import io.delta.storage.commit.actions.AbstractProtocol; +import 
io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.TableInfo; +import io.delta.storage.commit.uccommitcoordinator.exceptions.CredentialFetchFailedException; +import io.delta.storage.commit.uccommitcoordinator.exceptions.NoSuchTableException; +import io.delta.storage.commit.uccommitcoordinator.exceptions.UnsupportedTableFormatException; import io.unitycatalog.client.ApiClient; import io.unitycatalog.client.ApiClientBuilder; import io.unitycatalog.client.ApiException; @@ -35,7 +39,6 @@ import io.unitycatalog.client.delta.model.DeltaCommit; import io.unitycatalog.client.delta.model.DeltaProtocol; import io.unitycatalog.client.delta.model.LoadTableResponse; -import io.unitycatalog.client.delta.model.PrimitiveType; import io.unitycatalog.client.delta.model.RemovePropertiesUpdate; import io.unitycatalog.client.delta.model.SetLatestBackfilledVersionUpdate; import io.unitycatalog.client.delta.model.SetPartitionColumnsUpdate; @@ -46,13 +49,13 @@ import io.unitycatalog.client.delta.model.StagingTableResponse; import io.unitycatalog.client.delta.model.StagingTableResponseRequiredProtocol; import io.unitycatalog.client.delta.model.StagingTableResponseSuggestedProtocol; -import io.unitycatalog.client.delta.model.StructField; -import io.unitycatalog.client.delta.model.StructType; import io.unitycatalog.client.delta.model.TableMetadata; import io.unitycatalog.client.delta.model.UniformMetadata; import io.unitycatalog.client.delta.model.UniformMetadataIceberg; import io.unitycatalog.client.delta.model.UpdateTableRequest; import io.unitycatalog.client.model.GetMetastoreSummaryResponse; +import io.unitycatalog.hadoop.UCCredentialHadoopConfs; +import io.unitycatalog.hadoop.UCCredentialHadoopConfs.TableOperation; import java.io.IOException; import java.net.URI; @@ -63,11 +66,13 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Set; import java.util.UUID; +import java.util.function.Supplier; + +import 
org.apache.hadoop.conf.Configuration; /** - * A REST client implementation of {@link UCDeltaClient} that uses the UC Delta REST Catalog API for + * A REST client implementation of {@link UCDeltaClient} that uses the UC Delta REST API for * all table lifecycle and commit coordination operations. * *
<p>
This client uses {@code io.unitycatalog.client.delta.api.TablesApi} for Delta-specific @@ -80,24 +85,34 @@ public class UCDeltaTokenBasedRestClient implements UCDeltaClient { private static final int HTTP_CONFLICT = 409; private static final int HTTP_NOT_FOUND = 404; - private static final Set PRIMITIVE_TYPE_NAMES = Set.of( - "BOOLEAN", "BYTE", "SHORT", "INT", "LONG", "FLOAT", "DOUBLE", - "DATE", "TIMESTAMP", "TIMESTAMP_NTZ", "STRING", "BINARY", "DECIMAL"); - private TablesApi deltaTablesApi; private MetastoresApi metastoresApi; + private final ApiClient apiClient; + private final String baseUri; + private final TokenProvider tokenProvider; + private final Map appVersions; + private final boolean credentialRenewalEnabled; + private final boolean credentialScopedFsEnabled; + private final Supplier hadoopConfSupplier; + + public UCDeltaTokenBasedRestClient( + String baseUri, + TokenProvider tokenProvider, + Map appVersions) { + this(baseUri, tokenProvider, appVersions, false, false, null); + } /** - * Constructs a new UCDeltaTokenBasedRestClient. - * - * @param baseUri The base URI of the Unity Catalog server - * @param tokenProvider The TokenProvider to use for authentication - * @param appVersions A map of application name to version string for telemetry + * @param hadoopConfSupplier called once per request so engine-level changes are picked up; + * {@code null} defaults to {@code () -> new Configuration()}. 
*/ public UCDeltaTokenBasedRestClient( String baseUri, TokenProvider tokenProvider, - Map appVersions) { + Map appVersions, + boolean credentialRenewalEnabled, + boolean credentialScopedFsEnabled, + Supplier hadoopConfSupplier) { Objects.requireNonNull(baseUri, "baseUri must not be null"); Objects.requireNonNull(tokenProvider, "tokenProvider must not be null"); Objects.requireNonNull(appVersions, "appVersions must not be null"); @@ -112,9 +127,72 @@ public UCDeltaTokenBasedRestClient( } }); - ApiClient apiClient = builder.build(); - this.deltaTablesApi = new TablesApi(apiClient); - this.metastoresApi = new MetastoresApi(apiClient); + this.apiClient = builder.build(); + this.deltaTablesApi = new TablesApi(this.apiClient); + this.metastoresApi = new MetastoresApi(this.apiClient); + this.baseUri = baseUri; + this.tokenProvider = tokenProvider; + this.appVersions = appVersions; + this.credentialRenewalEnabled = credentialRenewalEnabled; + this.credentialScopedFsEnabled = credentialScopedFsEnabled; + this.hadoopConfSupplier = hadoopConfSupplier != null ? hadoopConfSupplier : Configuration::new; + } + + /** + * Factory for callers that can't depend on {@code io.unitycatalog.client} directly: pass + * a flat {@code authConfigs} map ({@code type} + provider-specific keys) and the factory + * constructs the {@link TokenProvider} internally. 
+ */ + public static UCDeltaTokenBasedRestClient create( + String baseUri, + Map authConfigs, + Map appVersions, + boolean credentialRenewalEnabled, + boolean credentialScopedFsEnabled, + Supplier hadoopConfSupplier) { + Objects.requireNonNull(authConfigs, "authConfigs must not be null"); + if (authConfigs.isEmpty()) { + throw new IllegalArgumentException( + "authConfigs must not be empty; expected at least a 'type' key plus the keys " + + "required by that TokenProvider type."); + } + return new UCDeltaTokenBasedRestClient( + baseUri, + TokenProvider.create(authConfigs), + appVersions, + credentialRenewalEnabled, + credentialScopedFsEnabled, + hadoopConfSupplier); + } + + /** Fresh builder per call: scheme depends on the table's location, hadoopConf is live. */ + private UCCredentialHadoopConfs.Builder newCredBuilder(String scheme) { + return UCCredentialHadoopConfs.builder(baseUri, scheme) + .tokenProvider(tokenProvider) + .apiClient(apiClient) + .enableCredentialRenewal(credentialRenewalEnabled) + .enableCredentialScopedFs(credentialScopedFsEnabled) + .addAppVersions(appVersions) + .hadoopConf(hadoopConfSupplier.get()); + } + + private static String schemeOf(String location) { + int colon = location.indexOf(':'); + return colon > 0 ? location.substring(0, colon) : ""; + } + + private Map fetchTableCredentials( + String catalog, String schema, String table, String location) throws ApiException { + UCCredentialHadoopConfs.Builder b = newCredBuilder(schemeOf(location)); + try { + return b.buildForTable(catalog, schema, table, TableOperation.READ_WRITE, location); + } catch (ApiException rw) { + return b.buildForTable(catalog, schema, table, TableOperation.READ, location); + } catch (IllegalArgumentException malformed) { + // UC Hadoop's response validator (DeltaStorageCredentialUtil.requireSingleCloudConfig) + // throws when the scheme has no cloud cred (e.g. file://). Treat as no creds. 
+ return Collections.emptyMap(); + } } private void ensureOpen() { @@ -233,7 +311,7 @@ public void finalizeCreate( .properties(properties); if (!columns.isEmpty()) { - sdkRequest.columns(toSDKStructType(columns)); + sdkRequest.columns(UCDeltaSchemaConverter.toUCStructType(columns)); } try { @@ -259,17 +337,37 @@ public void close() throws IOException { // =========================== @Override - public AbstractMetadata loadTable( - String catalog, String schema, String table) throws IOException { + public TableInfo loadTable(TableIdentifier tableIdentifier) throws IOException { ensureOpen(); - Objects.requireNonNull(catalog, "catalog must not be null"); - Objects.requireNonNull(schema, "schema must not be null"); - Objects.requireNonNull(table, "table must not be null"); + Objects.requireNonNull(tableIdentifier, "tableIdentifier must not be null"); + String[] namespace = tableIdentifier.getNamespace(); + if (namespace == null || namespace.length != 2) { + throw new IllegalArgumentException( + "UC tableIdentifier must have a 2-component namespace [catalog, schema]; got " + + (namespace == null ? "null" : java.util.Arrays.toString(namespace))); + } + String catalog = namespace[0]; + String schema = namespace[1]; + String table = tableIdentifier.getName(); try { - LoadTableResponse response = deltaTablesApi.loadTable(catalog, schema, table); - return new DeltaTableMetadata(table, response.getMetadata()); + return toTableInfo( + deltaTablesApi.loadTable(catalog, schema, table), catalog, schema, table); } catch (ApiException e) { + if (e.getCode() == HTTP_NOT_FOUND) { + throw new NoSuchTableException( + String.format("Table %s.%s.%s not found in Unity Catalog", catalog, schema, table), e); + } + // UC encodes non-Delta-format errors as HTTP 400 with error.type = + // "UnsupportedTableFormatException"; substring-match the body to avoid coupling to an + // ErrorResponse parser. 
+ String body = e.getResponseBody(); + if (body != null && body.contains("UnsupportedTableFormatException")) { + throw new UnsupportedTableFormatException( + String.format("Table %s.%s.%s is not in Delta format; the Delta REST API cannot " + + "serve it. Body: %s", catalog, schema, table, body), + e); + } throw new IOException( String.format("Failed to load table %s.%s.%s (HTTP %s): %s", catalog, schema, table, e.getCode(), e.getResponseBody()), e); @@ -347,6 +445,40 @@ public AbstractMetadata createTable( // Response Conversion Methods // =========================== + private TableInfo toTableInfo( + LoadTableResponse response, String catalog, String schema, String name) throws IOException { + TableMetadata m = response.getMetadata(); + String location = m.getLocation(); + if (location == null) { + throw new IOException("UC returned null location for table " + name); + } + UUID ucTableId = m.getTableUuid(); + if (ucTableId == null) { + throw new IOException("UC returned null table ID for table " + name); + } + if (m.getTableType() == null) { + throw new IOException("UC returned null table type for table " + name); + } + UCDeltaModels.TableType tableType = + UCDeltaModels.TableType.valueOf(m.getTableType().getValue()); + DeltaTableMetadata adapted = new DeltaTableMetadata(name, m); + Map storageProps; + try { + storageProps = fetchTableCredentials(catalog, schema, name, location); + } catch (ApiException e) { + // Surface as a typed failure so callers with a fallback (e.g. server-side planning) can + // recover. The exception carries the catalog-side TableInfo (with empty storageProperties) + // so the caller can still build a CatalogTable. 
+ TableInfo withoutCreds = new TableInfo( + ucTableId, tableType, location, adapted, Collections.emptyMap()); + throw new CredentialFetchFailedException( + String.format("Credential fetch failed for table %s.%s.%s (HTTP %s): %s", + catalog, schema, name, e.getCode(), e.getResponseBody()), + e, withoutCreds); + } + return new TableInfo(ucTableId, tableType, location, adapted, storageProps); + } + private UCDeltaModels.StagingTableInfo toStagingTableInfo(StagingTableResponse r) { UCDeltaModels.TableType tableType = null; if (r.getTableType() != null) { @@ -354,7 +486,7 @@ private UCDeltaModels.StagingTableInfo toStagingTableInfo(StagingTableResponse r } return new UCDeltaModels.StagingTableInfo( - r.getTableId() != null ? r.getTableId().toString() : null, + r.getTableId(), tableType, r.getLocation(), toDeltaProtocol(r.getRequiredProtocol()), @@ -463,7 +595,7 @@ private void addMetadataUpdates( if (!Objects.equals(oldMetadata.getSchemaString(), newMetadata.getSchemaString())) { request.addUpdatesItem(new SetSchemaUpdate() .action("set-columns") - .columns(parseSchemaString(newMetadata.getSchemaString()))); + .columns(UCDeltaSchemaConverter.parseSchemaString(newMetadata.getSchemaString()))); } if (!Objects.equals(oldMetadata.getPartitionColumns(), newMetadata.getPartitionColumns())) { request.addUpdatesItem(new SetPartitionColumnsUpdate() @@ -508,32 +640,6 @@ private void addMetadataUpdates( } } - private StructType toSDKStructType(List columns) { - StructType structType = new StructType(); - for (ColumnDef col : columns) { - structType.addFieldsItem(new StructField() - .name(col.getName()) - .nullable(col.isNullable()) - .type(toSDKDeltaType(col))); - } - return structType; - } - - private PrimitiveType toSDKDeltaType(ColumnDef col) { - if (!PRIMITIVE_TYPE_NAMES.contains(col.getTypeName())) { - throw new UnsupportedOperationException( - "Complex column type '" + col.getTypeName() + "' for column '" + col.getName() + - "' is not yet supported. 
Only primitive types are supported."); - } - return new PrimitiveType().type(col.getTypeText()); - } - - private StructType parseSchemaString(String schemaString) { - // TODO: implement full Delta schema string -> StructType conversion - throw new UnsupportedOperationException( - "Delta schema string to StructType conversion is not yet implemented."); - } - // =========================== // Exception Handling // =========================== @@ -582,7 +688,10 @@ private static final class DeltaTableMetadata implements AbstractMetadata { @Override public String getId() { - return m.getTableUuid() != null ? m.getTableUuid().toString() : null; + // UC's loadTable response carries the UC table_uuid (exposed via TableInfo.getTableId), + // not the Delta Metadata.id. The Delta id only lives in the Delta log Metadata action and + // is never sent to UC; callers that need it must read the log. + return null; } @Override @@ -607,7 +716,7 @@ public Map getFormatOptions() { @Override public String getSchemaString() { - return m.getColumns() != null ? m.getColumns().toString() : null; + return UCDeltaSchemaConverter.serializeSchema(m.getColumns()); } @Override diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/CredentialFetchFailedException.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/CredentialFetchFailedException.java new file mode 100644 index 00000000000..35742005bd3 --- /dev/null +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/CredentialFetchFailedException.java @@ -0,0 +1,40 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.commit.uccommitcoordinator.exceptions; + +import io.delta.storage.commit.uccommitcoordinator.UCDeltaClient; +import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.TableInfo; +import java.io.IOException; + +/** + * Thrown by {@link UCDeltaClient} when credential vending exhausts retries. Carries a + * cred-less {@link TableInfo} so callers with a fallback (e.g. SSP) can recover. + */ +public class CredentialFetchFailedException extends IOException { + + private final TableInfo tableInfoWithoutCredentials; + + public CredentialFetchFailedException( + String message, Throwable cause, TableInfo tableInfoWithoutCredentials) { + super(message, cause); + this.tableInfoWithoutCredentials = tableInfoWithoutCredentials; + } + + public TableInfo getTableInfoWithoutCredentials() { + return tableInfoWithoutCredentials; + } +} diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/NoSuchTableException.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/NoSuchTableException.java new file mode 100644 index 00000000000..c2f3d7c3d33 --- /dev/null +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/NoSuchTableException.java @@ -0,0 +1,33 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.commit.uccommitcoordinator.exceptions; + +import java.io.IOException; + +/** + * Thrown by {@link io.delta.storage.commit.uccommitcoordinator.UCDeltaClient} operations when the + * catalog reports that the requested table does not exist (HTTP 404). + */ +public class NoSuchTableException extends IOException { + public NoSuchTableException(String message) { + super(message); + } + + public NoSuchTableException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/UnsupportedTableFormatException.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/UnsupportedTableFormatException.java new file mode 100644 index 00000000000..02bedc8a0a9 --- /dev/null +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/exceptions/UnsupportedTableFormatException.java @@ -0,0 +1,34 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.storage.commit.uccommitcoordinator.exceptions; + +import java.io.IOException; + +/** + * Thrown when the catalog refuses to serve a non-Delta table. Callers should fall back to a + * non-Delta-REST-API load path. Emitted as HTTP 400 with {@code error.type = + * "UnsupportedTableFormatException"} on the wire. + */ +public class UnsupportedTableFormatException extends IOException { + public UnsupportedTableFormatException(String message) { + super(message); + } + + public UnsupportedTableFormatException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaSchemaConverterSuite.scala b/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaSchemaConverterSuite.scala new file mode 100644 index 00000000000..3c72978b7c6 --- /dev/null +++ b/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaSchemaConverterSuite.scala @@ -0,0 +1,266 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.delta.storage.commit.uccommitcoordinator + +import java.util.{Arrays, Collections} + +import com.fasterxml.jackson.databind.ObjectMapper +import io.unitycatalog.client.delta.model.{ArrayType, DeltaType, MapType, PrimitiveType} +import io.unitycatalog.client.delta.model.{StructField, StructType} + +import org.scalatest.funsuite.AnyFunSuite + +class UCDeltaSchemaConverterSuite extends AnyFunSuite { + + private val objectMapper = new ObjectMapper() + + private def prim(t: String): PrimitiveType = new PrimitiveType().`type`(t) + private def field(name: String, t: DeltaType, nullable: Boolean = true): StructField = + new StructField().name(name).nullable(nullable).`type`(t) + + // ---------------------------------------- + // serializeSchema + // ---------------------------------------- + + test("serializeSchema: null input returns null") { + assert(UCDeltaSchemaConverter.serializeSchema(null) === null) + } + + test("serializeSchema: empty struct produces an empty fields array") { + val json = UCDeltaSchemaConverter.serializeSchema(new StructType()) + val parsed = objectMapper.readTree(json) + assert(parsed.get("type").asText() === "struct") + assert(parsed.get("fields").isArray) + assert(parsed.get("fields").size() === 0) + } + + test("serializeSchema: primitives serialize as bare-string types (Delta wire format)") { + val s = new StructType() + .addFieldsItem(field("a", prim("integer"))) + .addFieldsItem(field("b", prim("string"), nullable = false)) + val parsed = objectMapper.readTree(UCDeltaSchemaConverter.serializeSchema(s)) + val fields = parsed.get("fields") + assert(fields.get(0).get("name").asText() === "a") + assert(fields.get(0).get("nullable").asBoolean() === true) + // bare string, not {"type":"integer"} + assert(fields.get(0).get("type").isTextual) + assert(fields.get(0).get("type").asText() === "integer") + assert(fields.get(1).get("name").asText() === "b") + assert(fields.get(1).get("nullable").asBoolean() === false) + 
assert(fields.get(1).get("type").asText() === "string") + } + + test("serializeSchema: decimal serializes as bare-string with parameters") { + val s = new StructType() + .addFieldsItem(field("d", prim("decimal(10,2)"))) + val parsed = objectMapper.readTree(UCDeltaSchemaConverter.serializeSchema(s)) + val t = parsed.get("fields").get(0).get("type") + assert(t.isTextual) + assert(t.asText() === "decimal(10,2)") + } + + test("serializeSchema: array uses camelCase keys (elementType, containsNull)") { + val arr = new ArrayType().elementType(prim("string")).containsNull(true) + val s = new StructType().addFieldsItem(field("a", arr)) + val parsed = objectMapper.readTree(UCDeltaSchemaConverter.serializeSchema(s)) + val t = parsed.get("fields").get(0).get("type") + assert(t.get("type").asText() === "array") + assert(t.get("elementType").asText() === "string") + assert(t.get("containsNull").asBoolean() === true) + // kebab-case keys should NOT appear + assert(!t.has("element-type")) + assert(!t.has("contains-null")) + } + + test("serializeSchema: map uses camelCase keys (keyType, valueType, valueContainsNull)") { + val m = new MapType() + .keyType(prim("string")) + .valueType(prim("integer")) + .valueContainsNull(false) + val s = new StructType().addFieldsItem(field("m", m)) + val parsed = objectMapper.readTree(UCDeltaSchemaConverter.serializeSchema(s)) + val t = parsed.get("fields").get(0).get("type") + assert(t.get("type").asText() === "map") + assert(t.get("keyType").asText() === "string") + assert(t.get("valueType").asText() === "integer") + assert(t.get("valueContainsNull").asBoolean() === false) + assert(!t.has("key-type")) + assert(!t.has("value-type")) + assert(!t.has("value-contains-null")) + } + + test("serializeSchema: nested array of map uses camelCase at every level") { + val inner = new MapType() + .keyType(prim("string")) + .valueType(prim("long")) + .valueContainsNull(true) + val outer = new ArrayType().elementType(inner).containsNull(false) + val s = new 
StructType().addFieldsItem(field("nested", outer)) + val parsed = objectMapper.readTree(UCDeltaSchemaConverter.serializeSchema(s)) + val arrType = parsed.get("fields").get(0).get("type") + assert(arrType.get("type").asText() === "array") + assert(arrType.get("containsNull").asBoolean() === false) + val mapType = arrType.get("elementType") + assert(mapType.get("type").asText() === "map") + assert(mapType.get("keyType").asText() === "string") + assert(mapType.get("valueType").asText() === "long") + assert(mapType.get("valueContainsNull").asBoolean() === true) + } + + // ---------------------------------------- + // parseSchemaString + // ---------------------------------------- + + test("parseSchemaString: throws UnsupportedOperationException (not yet implemented)") { + val e = intercept[UnsupportedOperationException] { + UCDeltaSchemaConverter.parseSchemaString("""{"type":"struct","fields":[]}""") + } + assert(e.getMessage.contains("not yet implemented")) + } + + // ---------------------------------------- + // toUCStructType (ColumnDef path) + // ---------------------------------------- + + private def col( + name: String, + typeName: String, + typeText: String = null, + nullable: Boolean = true): UCClient.ColumnDef = + new UCClient.ColumnDef( + name, typeName, if (typeText == null) typeName.toLowerCase else typeText, "{}", nullable, 0) + + test("toUCStructType: null input throws NullPointerException with descriptive message") { + val e = intercept[NullPointerException] { + UCDeltaSchemaConverter.toUCStructType(null) + } + assert(e.getMessage.contains("columns")) + } + + test("toUCStructType: empty list returns empty struct") { + val s = UCDeltaSchemaConverter.toUCStructType(Collections.emptyList()) + assert(s.getFields == null || s.getFields.isEmpty) + } + + test("toUCStructType: primitive columns map to StructFields with PrimitiveType") { + val cols = Arrays.asList( + col("a", "INT", "integer", nullable = false), + col("b", "STRING", "string", nullable = true), + 
col("c", "DECIMAL", "decimal(10,2)", nullable = true)) + val s = UCDeltaSchemaConverter.toUCStructType(cols) + assert(s.getFields.size() === 3) + val a = s.getFields.get(0) + assert(a.getName === "a") + assert(a.getNullable === false) + assert(a.getType.isInstanceOf[PrimitiveType]) + assert(a.getType.asInstanceOf[PrimitiveType].getType === "integer") + val c = s.getFields.get(2) + assert(c.getType.asInstanceOf[PrimitiveType].getType === "decimal(10,2)") + } + + test("toUCStructType: complex column type throws UnsupportedOperationException") { + val cols = Arrays.asList(col("a", "ARRAY", "array")) + val e = intercept[UnsupportedOperationException] { + UCDeltaSchemaConverter.toUCStructType(cols) + } + assert(e.getMessage.contains("Complex column type 'ARRAY'")) + assert(e.getMessage.contains("column 'a'")) + } + + test("toUCStructType: every primitive type name is accepted") { + val typeNames = Seq( + "BOOLEAN", "BYTE", "SHORT", "INT", "LONG", "FLOAT", "DOUBLE", + "DATE", "TIMESTAMP", "TIMESTAMP_NTZ", "STRING", "BINARY", "DECIMAL") + typeNames.foreach { tn => + val s = UCDeltaSchemaConverter.toUCStructType( + Arrays.asList(col("x", tn))) + assert(s.getFields.size() === 1) + } + } + + // ---------------------------------------- + // End-to-end: a complex schema with every supported shape, fully validated + // ---------------------------------------- + + test("serializeSchema: mixed schema preserves every field's name, type, and nullability") { + val arr = new ArrayType().elementType(prim("double")).containsNull(false) + val map = new MapType() + .keyType(prim("string")) + .valueType(prim("date")) + .valueContainsNull(true) + val arrOfMap = new ArrayType().elementType(map).containsNull(true) + + val s = new StructType() + .addFieldsItem(field("z_int", prim("integer"), nullable = false)) + .addFieldsItem(field("a_str", prim("string"))) + .addFieldsItem(field("dec", prim("decimal(38,18)"), nullable = false)) + .addFieldsItem(field("arr_dbl", arr)) + 
.addFieldsItem(field("map_sd", map, nullable = false)) + .addFieldsItem(field("nested", arrOfMap)) + + val parsed = objectMapper.readTree(UCDeltaSchemaConverter.serializeSchema(s)) + assert(parsed.get("type").asText() === "struct") + val fields = parsed.get("fields") + assert(fields.size() === 6) + + // Field ordering is preserved verbatim from the input list. + val expectedNames = Seq("z_int", "a_str", "dec", "arr_dbl", "map_sd", "nested") + for (i <- 0 until fields.size()) { + assert(fields.get(i).get("name").asText() === expectedNames(i)) + } + + // Primitive fields: bare-string type, correct nullability. + assert(fields.get(0).get("nullable").asBoolean() === false) + assert(fields.get(0).get("type").asText() === "integer") + assert(fields.get(1).get("nullable").asBoolean() === true) + assert(fields.get(1).get("type").asText() === "string") + assert(fields.get(2).get("nullable").asBoolean() === false) + assert(fields.get(2).get("type").asText() === "decimal(38,18)") + + // Array, non-nullable elements. + val arrJson = fields.get(3).get("type") + assert(arrJson.get("type").asText() === "array") + assert(arrJson.get("elementType").asText() === "double") + assert(arrJson.get("containsNull").asBoolean() === false) + + // Map, nullable values, non-nullable field itself. + val mapJson = fields.get(4).get("type") + assert(fields.get(4).get("nullable").asBoolean() === false) + assert(mapJson.get("type").asText() === "map") + assert(mapJson.get("keyType").asText() === "string") + assert(mapJson.get("valueType").asText() === "date") + assert(mapJson.get("valueContainsNull").asBoolean() === true) + + // Array<Map<string, date>> nested two deep, with the inner map's nullability flowing through.
+ val nestedArr = fields.get(5).get("type") + assert(nestedArr.get("type").asText() === "array") + assert(nestedArr.get("containsNull").asBoolean() === true) + val nestedMap = nestedArr.get("elementType") + assert(nestedMap.get("type").asText() === "map") + assert(nestedMap.get("keyType").asText() === "string") + assert(nestedMap.get("valueType").asText() === "date") + assert(nestedMap.get("valueContainsNull").asBoolean() === true) + + // Negative: confirm no kebab-case keys leaked through at any level. + val json = UCDeltaSchemaConverter.serializeSchema(s) + val kebabKeys = Seq( + "element-type", "contains-null", "key-type", "value-type", "value-contains-null") + kebabKeys.foreach { k => + assert(!json.contains("\"" + k + "\""), s"unexpected kebab-case key '$k'") + } + } +} diff --git a/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala b/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala index 47551d18ad7..c414534da8e 100644 --- a/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala +++ b/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala @@ -18,12 +18,13 @@ package io.delta.storage.commit.uccommitcoordinator import java.net.{InetSocketAddress, URI} import java.nio.charset.StandardCharsets -import java.util.{Collections, Optional, Set => JSet} +import java.util.{Collections, Optional, Set => JSet, UUID} import com.fasterxml.jackson.databind.ObjectMapper import com.sun.net.httpserver.{HttpExchange, HttpServer} import io.delta.storage.commit.{Commit, CommitFailedException, TableIdentifier} import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol} +import io.delta.storage.commit.uccommitcoordinator.exceptions.NoSuchTableException import io.delta.storage.commit.uniform.{IcebergMetadata, UniformMetadata} import 
io.unitycatalog.client.auth.TokenProvider @@ -37,7 +38,7 @@ class UCDeltaTokenBasedRestClientSuite with BeforeAndAfterAll with BeforeAndAfterEach { - private val testTableId = "550e8400-e29b-41d4-a716-446655440000" + private val testTableId = UUID.fromString("550e8400-e29b-41d4-a716-446655440000") private val testMetastoreId = "test-metastore-123" private val testCatalog = "cat" private val testSchema = "sch" @@ -77,9 +78,17 @@ class UCDeltaTokenBasedRestClientSuite // --------------- helpers --------------- private def loadTableJson( - tableUuid: String = testTableId, - format: String = "DELTA"): String = + tableUuid: UUID = testTableId, + format: String = "DELTA", + location: String = "s3://bucket/table", + tableType: String = "MANAGED"): String = s"""{"metadata":{"table-uuid":"$tableUuid","data-source-format":"$format",""" + + s""""table-type":"$tableType",""" + + s""""location":"$location",""" + + s""""columns":{"type":"struct","fields":[""" + + s"""{"name":"date","type":"string","nullable":true,"metadata":{}},""" + + s"""{"name":"value","type":"integer","nullable":true,"metadata":{}}""" + + s"""]},""" + s""""properties":{"key1":"val1"},"partition-columns":["date"],"created-time":1000}}""" private def readBody(exchange: HttpExchange): String = { @@ -183,26 +192,92 @@ class UCDeltaTokenBasedRestClientSuite // --------------- loadTable --------------- - test("loadTable returns AbstractMetadata with correct fields") { + test("loadTable returns TableInfo with catalog identity and Delta metadata") { withClient { c => - val m = c.loadTable(testCatalog, testSchema, testTable) + val info = c.loadTable(testIdentifier) + assert(info.getLocation === "s3://bucket/table") + assert(info.getTableId === testTableId) + assert(info.getTableType === UCDeltaModels.TableType.MANAGED) + val m = info.getMetadata assert(m.getName === testTable) - assert(m.getId === testTableId) + // UC's loadTable response does not carry the Delta Metadata.id; UC's table_uuid is exposed + // 
separately as TableInfo.getTableId. + assert(m.getId === null) assert(m.getProvider === "DELTA") assert(m.getConfiguration.get("key1") === "val1") assert(m.getPartitionColumns.get(0) === "date") assert(m.getCreatedTime === 1000L) + // Schema is a JSON string in Delta's wire format; parseable by Delta's schema readers. + val parsed = objectMapper.readTree(m.getSchemaString) + assert(parsed.get("type").asText() === "struct") + assert(parsed.get("fields").size() === 2) + assert(parsed.get("fields").get(0).get("name").asText() === "date") + assert(parsed.get("fields").get(1).get("type").asText() === "integer") + } + } + + test("loadTable schema emits Delta camelCase wire format for array and map") { + val nested = + s"""{"metadata":{"table-uuid":"$testTableId","data-source-format":"DELTA",""" + + s""""table-type":"MANAGED","location":"s3://b/t","partition-columns":[],""" + + s""""properties":{},"created-time":1,""" + + s""""columns":{"type":"struct","fields":[""" + + s"""{"name":"a","type":{"type":"array","element-type":"string","contains-null":true},""" + + s""""nullable":true,"metadata":{}},""" + + s"""{"name":"m","type":{"type":"map","key-type":"string","value-type":"integer",""" + + s""""value-contains-null":false},"nullable":true,"metadata":{}}""" + + s"""]}}}""" + deltaHandler = (exchange, _) => sendJson(exchange, HttpStatus.SC_OK, nested) + withClient { c => + val schema = c.loadTable(testIdentifier).getMetadata.getSchemaString + val parsed = objectMapper.readTree(schema) + val aType = parsed.get("fields").get(0).get("type") + assert(aType.get("type").asText() === "array") + assert(aType.get("elementType").asText() === "string") + assert(aType.get("containsNull").asBoolean() === true) + assert(aType.has("element-type") === false) + val mType = parsed.get("fields").get(1).get("type") + assert(mType.get("type").asText() === "map") + assert(mType.get("keyType").asText() === "string") + assert(mType.get("valueType").asText() === "integer") + 
assert(mType.get("valueContainsNull").asBoolean() === false) + assert(mType.has("key-type") === false) } } test("loadTable throws IOException on server error") { deltaHandler = (exchange, _) => sendJson(exchange, 500, """{"error":"fail"}""") withClient { c => - val e = intercept[java.io.IOException] { c.loadTable(testCatalog, testSchema, testTable) } + val e = intercept[java.io.IOException] { c.loadTable(testIdentifier) } assert(e.getMessage.contains("HTTP 500")) } } + test("loadTable throws NoSuchTableException on 404") { + deltaHandler = (exchange, _) => sendJson(exchange, 404, """{"error":"not found"}""") + withClient { c => + val e = intercept[NoSuchTableException] { + c.loadTable(testIdentifier) + } + assert(e.getMessage.contains(s"$testCatalog.$testSchema.$testTable")) + assert(e.getMessage.contains("not found")) + } + } + + test("loadTable throws UnsupportedTableFormatException on 400 with that error type") { + deltaHandler = (exchange, _) => sendJson( + exchange, 400, + """{"error":{"code":400,"type":"UnsupportedTableFormatException",""" + + s""""message":"Table is not a Delta table: ${testCatalog}.${testSchema}.${testTable}"}}""") + withClient { c => + val e = intercept[exceptions.UnsupportedTableFormatException] { + c.loadTable(testIdentifier) + } + assert(e.getMessage.contains(s"$testCatalog.$testSchema.$testTable")) + assert(e.getMessage.contains("not in Delta format")) + } + } + // --------------- commit via updateTable --------------- test("commit sends ASSERT_TABLE_UUID, ADD_COMMIT, and SET_LATEST_BACKFILLED_VERSION") { @@ -215,7 +290,7 @@ class UCDeltaTokenBasedRestClientSuite } withClient { c => - c.commit(testTableId, new URI("s3://bucket/table"), testIdentifier, + c.commit(testTableId.toString, new URI("s3://bucket/table"), testIdentifier, Optional.of(createCommit(5L)), Optional.of(java.lang.Long.valueOf(3L)), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()) } @@ -224,7 +299,7 @@ class 
UCDeltaTokenBasedRestClientSuite val reqs = json.get("requirements") assert(reqs.size() === 1) assert(reqs.get(0).get("type").asText() === "assert-table-uuid") - assert(reqs.get(0).get("uuid").asText() === testTableId) + assert(reqs.get(0).get("uuid").asText() === testTableId.toString) val updates = json.get("updates") val actions = (0 until updates.size()).map(i => updates.get(i).get("action").asText()).toSet @@ -256,7 +331,7 @@ class UCDeltaTokenBasedRestClientSuite }) withClient { c => - c.commit(testTableId, new URI("s3://b/t"), testIdentifier, + c.commit(testTableId.toString, new URI("s3://b/t"), testIdentifier, Optional.of(createCommit(1L)), Optional.empty(), Optional.of(oldMeta), Optional.of(newMeta), Optional.empty(), Optional.empty(), Optional.empty()) @@ -285,7 +360,7 @@ class UCDeltaTokenBasedRestClientSuite java.util.Set.of("columnMapping"), java.util.Set.of("columnMapping", "v2Checkpoint")) withClient { c => - c.commit(testTableId, new URI("s3://b/t"), testIdentifier, + c.commit(testTableId.toString, new URI("s3://b/t"), testIdentifier, Optional.of(createCommit(1L)), Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(oldProto), Optional.of(newProto), Optional.empty()) @@ -312,7 +387,7 @@ class UCDeltaTokenBasedRestClientSuite assert(m1 ne m2, "must be different objects") withClient { c => - c.commit(testTableId, new URI("s3://b/t"), testIdentifier, + c.commit(testTableId.toString, new URI("s3://b/t"), testIdentifier, Optional.of(createCommit(1L)), Optional.empty(), Optional.of(m1), Optional.of(m2), Optional.empty(), Optional.empty(), Optional.empty()) @@ -338,7 +413,7 @@ class UCDeltaTokenBasedRestClientSuite assert(p1 ne p2, "must be different objects") withClient { c => - c.commit(testTableId, new URI("s3://b/t"), testIdentifier, + c.commit(testTableId.toString, new URI("s3://b/t"), testIdentifier, Optional.of(createCommit(1L)), Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(p1), Optional.of(p2), Optional.empty()) @@ 
-362,7 +437,7 @@ class UCDeltaTokenBasedRestClientSuite new IcebergMetadata("s3://bucket/v1.json", 42L, "1704337991423")) withClient { c => - c.commit(testTableId, new URI("s3://b/t"), testIdentifier, + c.commit(testTableId.toString, new URI("s3://b/t"), testIdentifier, Optional.of(createCommit(1L)), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(uniform)) @@ -392,7 +467,7 @@ class UCDeltaTokenBasedRestClientSuite new IcebergMetadata("s3://bucket/v1.json", 42L, "2025-01-04T03:13:11.423Z")) withClient { c => - c.commit(testTableId, new URI("s3://b/t"), testIdentifier, + c.commit(testTableId.toString, new URI("s3://b/t"), testIdentifier, Optional.of(createCommit(1L)), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(uniform)) @@ -420,7 +495,7 @@ class UCDeltaTokenBasedRestClientSuite } withClient { c => val e = intercept[CommitFailedException] { - c.commit(testTableId, new URI("s3://b/t"), testIdentifier, + c.commit(testTableId.toString, new URI("s3://b/t"), testIdentifier, Optional.of(createCommit(1L)), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()) } @@ -439,7 +514,7 @@ class UCDeltaTokenBasedRestClientSuite } withClient { c => intercept[InvalidTargetTableException] { - c.commit(testTableId, new URI("s3://b/t"), testIdentifier, + c.commit(testTableId.toString, new URI("s3://b/t"), testIdentifier, Optional.of(createCommit(1L)), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()) } @@ -513,7 +588,7 @@ class UCDeltaTokenBasedRestClientSuite test("getCommits throws UnsupportedOperationException") { withClient { c => intercept[UnsupportedOperationException] { - c.getCommits(testTableId, new URI("s3://b/t"), testIdentifier, + c.getCommits(testTableId.toString, new URI("s3://b/t"), testIdentifier, Optional.empty(), Optional.empty()) } } @@ -582,6 
+657,8 @@ class UCDeltaTokenBasedRestClientSuite serverUri, tokenProvider(), Collections.emptyMap()) client.close() intercept[IllegalStateException] { client.getMetastoreId() } - intercept[IllegalStateException] { client.loadTable("c", "s", "t") } + intercept[IllegalStateException] { + client.loadTable(new TableIdentifier("c", "s", "t")) + } } }