Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,23 @@ val unityCatalogVersion: String = sys.props.getOrElse(
if (useDefaultUnityCatalogReleaseVersion) defaultUnityCatalogReleaseVersion
else unityCatalogReleaseVersion.getOrElse(pinnedUnityCatalogVersion))

/**
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move this into the file that defines the unityCatalogReleaseVersion??

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unityCatalogReleaseVersion is defined in this file.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aah .. i thought we have made a separate utility file. i think we should do that and remove all this crud from this master file... but that can happen in a later pr.

* Returns true when `current` is at least `target`. Numeric segments only; suffix after
* the first `-` (e.g. `-SNAPSHOT-abc1234`) is stripped before comparison.
*/
def isAtLeastVersion(current: String, target: String): Boolean = {
def parts(v: String): Seq[Int] =
v.takeWhile(_ != '-').split('.').iterator
.map(p => scala.util.Try(p.toInt).getOrElse(0)).toSeq
val cur = parts(current)
val tgt = parts(target)
val n = math.max(cur.length, tgt.length)
(0 until n).iterator
.map(i => (cur.lift(i).getOrElse(0), tgt.lift(i).getOrElse(0)))
.find { case (a, b) => a != b }
.forall { case (a, b) => a >= b }
}

val sparkUnityCatalogJacksonVersion = "2.15.4" // We are using Spark 4.0's Jackson version 2.15.x, to override Unity Catalog 0.3.0's version 2.18.x

// Publishes the pinned UC jars to ~/.ivy2/local if they're not already cached there. Hooked
Expand Down Expand Up @@ -1217,6 +1234,19 @@ lazy val storage = (project in file("storage"))
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
// Jackson datatype module needed for UC SDK tests (excluded from main compile scope)
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.15.4" % "test",
) ++ (
// unitycatalog-hadoop ships from UC 0.5.0 onward; older versions don't publish the
// artifact, so resolving it would fail. Used by UCDeltaTokenBasedRestClient for
// credential vending via UCCredentialHadoopConfs.
if (isAtLeastVersion(unityCatalogVersion, "0.5.0")) {
Comment thread
yili-db marked this conversation as resolved.
Seq("io.unitycatalog" % "unitycatalog-hadoop" % unityCatalogVersion excludeAll(
ExclusionRule(organization = "org.openapitools"),
ExclusionRule(organization = "com.fasterxml.jackson.core"),
ExclusionRule(organization = "com.fasterxml.jackson.module"),
ExclusionRule(organization = "com.fasterxml.jackson.datatype"),
ExclusionRule(organization = "com.fasterxml.jackson.dataformat")
))
} else Nil
),

// Publish the pinned UC jars before sbt tries to resolve them. storage is the transitive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package io.delta.storage.commit.uccommitcoordinator;

import io.delta.storage.commit.TableIdentifier;
import io.delta.storage.commit.actions.AbstractMetadata;
import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.StagingTableInfo;
import io.delta.storage.commit.uccommitcoordinator.UCDeltaModels.TableInfo;
import java.io.IOException;
import java.util.List;
import java.util.Map;
Expand All @@ -31,13 +33,12 @@ public interface UCDeltaClient extends UCClient {
/**
* Loads a table's metadata from Unity Catalog.
*
* @param catalog the catalog name
* @param schema the schema name
* @param table the table name
* @return the table's {@link AbstractMetadata}
* @param tableIdentifier catalog + schema namespace and table name
* @return the table's {@link TableInfo}, carrying the catalog-supplied storage location and
* metadata
* @throws IOException on network or API errors
*/
AbstractMetadata loadTable(String catalog, String schema, String table) throws IOException;
TableInfo loadTable(TableIdentifier tableIdentifier) throws IOException;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the parameters, should we use the catalog, schema, table directly, rather than the TableIdentifier, so that we can keep align with all the existing public methods definition in this interface ?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to align with:

  @Override
  public void commit(
      String tableId,
      URI tableUri,
      TableIdentifier tableIdentifier,


/**
* Reserves a staging slot for a new Delta table. The returned response contains the table ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@

package io.delta.storage.commit.uccommitcoordinator;

import io.delta.storage.commit.actions.AbstractMetadata;
import io.delta.storage.commit.actions.AbstractProtocol;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

/**
* Delta-owned models for the UC Delta REST Catalog API. These decouple the {@link UCDeltaClient}
* Delta-owned models for the UC Delta REST API. These decouple the {@link UCDeltaClient}
* interface from any generated SDK types.
*/
public final class UCDeltaModels {
Expand Down Expand Up @@ -101,9 +103,54 @@ public int hashCode() {
}
}

/** Result of {@link UCDeltaClient#loadTable}. */
public static final class TableInfo {

private final UUID tableId;
private final TableType tableType;
private final String location;
private final AbstractMetadata metadata;
private final Map<String, String> storageProperties;

public TableInfo(
UUID tableId,
TableType tableType,
String location,
AbstractMetadata metadata,
Map<String, String> storageProperties) {
this.tableId = tableId;
this.tableType = tableType;
this.location = location;
this.metadata = metadata;
this.storageProperties = storageProperties;
}

/** UC's {@code table_uuid}; distinct from {@link AbstractMetadata#getId()} (the Delta id). */
public UUID getTableId() {
return tableId;
}

public TableType getTableType() {
return tableType;
}

public String getLocation() {
return location;
}

public AbstractMetadata getMetadata() {
return metadata;
}

/** Hadoop-style storage options (e.g. catalog-vended credentials). */
public Map<String, String> getStorageProperties() {
return storageProperties == null ? Collections.emptyMap() : storageProperties;
}
}

public static final class StagingTableInfo {

private final String tableId;
private final UUID tableId;
private final TableType tableType;
private final String location;
private final DeltaProtocol requiredProtocol;
Expand All @@ -112,7 +159,7 @@ public static final class StagingTableInfo {
private final Map<String, String> suggestedProperties;

public StagingTableInfo(
String tableId,
UUID tableId,
TableType tableType,
String location,
DeltaProtocol requiredProtocol,
Expand All @@ -128,7 +175,7 @@ public StagingTableInfo(
this.suggestedProperties = suggestedProperties;
}

public String getTableId() {
public UUID getTableId() {
return tableId;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright (2026) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.storage.commit.uccommitcoordinator;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.unitycatalog.client.JSON;
import io.unitycatalog.client.delta.model.ArrayType;
import io.unitycatalog.client.delta.model.DeltaType;
import io.unitycatalog.client.delta.model.MapType;
import io.unitycatalog.client.delta.model.PrimitiveType;
import io.unitycatalog.client.delta.model.StructField;
import io.unitycatalog.client.delta.model.StructType;
import io.unitycatalog.client.delta.serde.DeltaTypeModule;

import java.util.List;
import java.util.Objects;
import java.util.Set;

/**
* Conversion from the UC SDK's Delta schema model ({@link StructType}, {@link DeltaType},
* {@link PrimitiveType}) to Delta's JSON schema wire format. The reverse direction
* ({@link #parseSchemaString}) is reserved for a follow-up and currently throws.
*
* <p>The SDK's default {@link ObjectMapper} (from {@link JSON#getDefault()}) does not emit
* Delta's wire format on its own:
* <ul>
* <li>{@link PrimitiveType} serializes as {@code {"type":"integer"}} by default, but Delta
* expects a bare string ({@code "integer"}). {@link DeltaTypeModule} provides custom
* serializers/deserializers that flatten primitives (and decimal) to bare strings.</li>
* <li>The SDK uses kebab-case JSON keys for nested types ({@code element-type},
* {@code contains-null}, {@code key-type}, etc.). Delta's wire format uses camelCase
* ({@code elementType}, {@code containsNull}, {@code keyType}, ...). The
* {@link CamelCaseArrayMixin} / {@link CamelCaseMapMixin} mixins rename those keys via
* Jackson's {@link com.fasterxml.jackson.databind.ObjectMapper#addMixIn} mechanism.</li>
* </ul>
* The resulting JSON is parseable by Delta's schema readers (e.g. {@code DataType.fromJson}).
*/
final class UCDeltaSchemaConverter {

/** Primitive type names that the legacy create-table path ({@link #toUCStructType}) accepts. */
private static final Set<String> PRIMITIVE_TYPE_NAMES = Set.of(
"BOOLEAN", "BYTE", "SHORT", "INT", "LONG", "FLOAT", "DOUBLE",
"DATE", "TIMESTAMP", "TIMESTAMP_NTZ", "STRING", "BINARY", "DECIMAL");

/**
* Singleton mapper preconfigured to emit Delta's wire format
* (bare-string primitives, camelCase keys for nested types). See class-level docs.
*/
private static final ObjectMapper DELTA_SCHEMA_MAPPER = createDeltaSchemaMapper();

private UCDeltaSchemaConverter() {}

/**
* Serializes the SDK's {@link StructType} to Delta's JSON schema wire format. The resulting
* string is parseable by Delta's schema readers (e.g. {@code DataType.fromJson}).
*
* @return JSON string, or {@code null} if {@code columns} is {@code null}.
* @throws IllegalStateException if Jackson fails to serialize.
*/
static String serializeSchema(StructType columns) {
if (columns == null) {
return null;
}
try {
return DELTA_SCHEMA_MAPPER.writeValueAsString(columns);
} catch (JsonProcessingException e) {
throw new IllegalStateException("Failed to serialize UC schema to Delta JSON", e);
}
}

/**
* Parses a Delta JSON schema string into the SDK's {@link StructType}. Reserved for the
* UpdateTable schema-diff path; not yet implemented.
*/
static StructType parseSchemaString(String schemaString) {
// TODO: implement full Delta schema string -> StructType conversion
throw new UnsupportedOperationException(
"Delta schema string to StructType conversion is not yet implemented.");
}

/**
* Converts the legacy create-table path's {@link UCClient.ColumnDef} list into the UC SDK's
* {@link StructType}. Only primitive types are supported today; complex types throw
* {@link UnsupportedOperationException}.
*
* @throws NullPointerException if {@code columns} is {@code null}.
*/
static StructType toUCStructType(List<UCClient.ColumnDef> columns) {
Objects.requireNonNull(columns, "columns must not be null");
StructType structType = new StructType();
for (UCClient.ColumnDef col : columns) {
structType.addFieldsItem(new StructField()
.name(col.getName())
.nullable(col.isNullable())
.type(toUCDeltaType(col)));
}
return structType;
}

private static PrimitiveType toUCDeltaType(UCClient.ColumnDef col) {
if (!PRIMITIVE_TYPE_NAMES.contains(col.getTypeName())) {
throw new UnsupportedOperationException(
"Complex column type '" + col.getTypeName() + "' for column '" + col.getName() +
"' is not yet supported. Only primitive types are supported.");
}
return new PrimitiveType().type(col.getTypeText());
}

private static ObjectMapper createDeltaSchemaMapper() {
// Copy the SDK's default mapper so we inherit its base config (visibility, naming, etc.)
// without mutating the shared instance.
ObjectMapper m = JSON.getDefault().getMapper().copy();
// DeltaTypeModule flattens PrimitiveType/DecimalType into bare type-name strings.
m.registerModule(new DeltaTypeModule());
// The SDK ships ArrayType/MapType with kebab-case JSON keys; Delta's wire format uses
// camelCase. Mixins rewrite the property names without modifying the generated SDK
// classes themselves.
m.addMixIn(ArrayType.class, CamelCaseArrayMixin.class);
m.addMixIn(MapType.class, CamelCaseMapMixin.class);
return m;
}

/**
* Jackson mixin that renames {@link ArrayType}'s JSON keys from kebab-case to camelCase
* (matching Delta's wire format).
*
* <p>The class is {@code abstract} and the methods abstract because Jackson never
* instantiates the mixin: it only inspects annotated method signatures and projects the
* annotations onto the target class. Making the class abstract makes that contract
* explicit and avoids a no-op constructor.
*/
private abstract static class CamelCaseArrayMixin {
@JsonProperty("elementType")
abstract DeltaType getElementType();
@JsonSetter("elementType")
abstract void setElementType(DeltaType v);
@JsonProperty("containsNull")
abstract Boolean getContainsNull();
@JsonSetter("containsNull")
abstract void setContainsNull(Boolean v);
}

/**
* Jackson mixin that renames {@link MapType}'s JSON keys from kebab-case to camelCase
* (matching Delta's wire format). See {@link CamelCaseArrayMixin} for the mixin pattern.
*/
private abstract static class CamelCaseMapMixin {
@JsonProperty("keyType")
abstract DeltaType getKeyType();
@JsonSetter("keyType")
abstract void setKeyType(DeltaType v);
@JsonProperty("valueType")
abstract DeltaType getValueType();
@JsonSetter("valueType")
abstract void setValueType(DeltaType v);
@JsonProperty("valueContainsNull")
abstract Boolean getValueContainsNull();
@JsonSetter("valueContainsNull")
abstract void setValueContainsNull(Boolean v);
}
Copy link
Copy Markdown
Contributor

@tdas tdas May 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no bidirectional test suite for this?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added testsuite UCDeltaSchemaConverterSuite.scala

}
Loading
Loading