Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -480,16 +480,11 @@ lazy val sparkV2 = {
libraryDependencies ++= Seq(
"io.delta" % "delta-kernel-api" % v,
"io.delta" % "delta-kernel-defaults" % v,
"io.delta" % "delta-kernel-unitycatalog" % v
),
// Kernel main classes are pulled from Maven at version `v`, but several
// sparkV2 tests depend on test-only helpers (e.g. InMemoryUCClient,
// UCCatalogManagedTestUtils) that are not published. Build those test
// jars from the in-tree kernel sources and add them to the test classpath.
Test / unmanagedJars ++= Seq(
(kernelApi / Test / packageBin).value,
(kernelDefaults / Test / packageBin).value,
(kernelUnityCatalog / Test / packageBin).value
"io.delta" % "delta-kernel-unitycatalog" % v,
Copy link
Copy Markdown
Collaborator Author

@TimothyW553 TimothyW553 May 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is needed because in Maven mode (-DkernelVersion=...), sparkV2 can't reach kernel-unitycatalog's test helpers (InMemoryUCClient, UCCatalogManagedTestUtils) via the source-mode test->test dep, so we publish and consume them as a -tests classifier jar -- same pattern kernelApi already uses.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as the change below.

// sparkV2 tests depend on UC test helpers (InMemoryUCClient,
// UCCatalogManagedTestUtils) that live in kernel-unitycatalog's test sources.
// Consume them via the published tests-classifier jar.
"io.delta" % "delta-kernel-unitycatalog" % v % Test classifier "tests"
)
)
}
Expand Down Expand Up @@ -1169,6 +1164,11 @@ lazy val kernelUnityCatalog = (project in file("kernel/unitycatalog"))
// Publish the pinned UC jars before sbt tries to resolve them.
update := update.dependsOn(ensurePinnedUnityCatalog).value,

// Also publish a test-jar (classifier = "tests") so consumers (e.g. sparkV2 in
// Maven mode) can depend on UC test helpers (InMemoryUCClient,
// UCCatalogManagedTestUtils) via a published artifact.
Test / publishArtifact := true,

// Put the shaded kernel-api JAR on the classpath (compile & test)
Compile / unmanagedJars += (kernelApi / Compile / packageBin).value,
Test / unmanagedJars += (kernelApi / Compile / packageBin).value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.delta.kernel.transaction.CreateTableTransactionBuilder;
import io.delta.kernel.types.StructType;
import io.delta.kernel.unitycatalog.UCCatalogManagedClient;
import io.delta.kernel.unitycatalog.UCTableIdentifier;
import io.delta.kernel.utils.CloseableIterable;
import io.delta.storage.commit.uccommitcoordinator.UCClient;
import io.delta.storage.commit.uccommitcoordinator.UCTokenBasedRestClient;
Expand Down Expand Up @@ -72,6 +73,12 @@ public class CatalogManagedTable extends AbstractKernelTable {
protected transient UCClient ucClient;
protected transient UCCatalogManagedClient catalogManagedClient;

/**
 * Resolves this table's {@code tableId} into a {@link UCTableIdentifier} by delegating to the
 * backing catalog's {@code toUcTableIdentifier}.
 *
 * <p>The catalog must be a {@link UnityCatalog}; the precondition below rejects anything else.
 *
 * @return the identifier the catalog derives from {@code tableId}
 */
private UCTableIdentifier getUcTableIdentifier() {
  final boolean backedByUnityCatalog = catalog instanceof UnityCatalog;
  Preconditions.checkArgument(
      backedByUnityCatalog, "Catalog-managed tables require a UnityCatalog catalog");
  final UnityCatalog unityCatalog = (UnityCatalog) catalog;
  return unityCatalog.toUcTableIdentifier(tableId);
}

public CatalogManagedTable(
DeltaCatalog catalog, String tableId, Map<String, String> conf, URI endpoint, String token) {
this(catalog, tableId, conf, null, null, endpoint, token);
Expand Down Expand Up @@ -141,6 +148,7 @@ protected Snapshot loadLatestSnapshot() {
/* engine */ getEngine(),
/* ucTableId */ tableUUID,
/* tablePath */ tablePath.toString(),
/* ucTableIdentifier */ getUcTableIdentifier(),
/* versionOpt */ Optional.empty(),
/* timestampOpt */ Optional.empty()));
}
Expand Down Expand Up @@ -182,6 +190,7 @@ protected boolean versionExists(Long version) {
/* engine */ getEngine(),
/* ucTableId */ getTableUUID(),
/* tablePath */ getTablePath().toString(),
/* ucTableIdentifier */ getUcTableIdentifier(),
/* startVersionOpt */ Optional.of(version),
/* startTimestampOpt */ Optional.empty(),
/* endVersionOpt */ Optional.empty(),
Expand Down
47 changes: 31 additions & 16 deletions flink/src/main/java/io/delta/flink/table/UnityCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import dev.failsafe.function.CheckedSupplier;
import io.delta.kernel.internal.types.DataTypeJsonSerDe;
import io.delta.kernel.types.*;
import io.delta.kernel.unitycatalog.UCTableIdentifier;
import io.unitycatalog.client.ApiClient;
import io.unitycatalog.client.ApiClientBuilder;
import io.unitycatalog.client.ApiException;
Expand Down Expand Up @@ -231,6 +232,33 @@ public ApiClient getApiClient() {
return apiClient;
}

/**
 * Parses {@code schema.table} or {@code catalog.schema.table} into a {@link UCTableIdentifier}.
 * In the 2-part form the catalog defaults to this catalog's name; in the 3-part form the leading
 * segment must equal this catalog's name.
 *
 * <p>Every dot-separated segment must be non-empty, so names such as {@code schema.table.},
 * {@code .table} or {@code schema..table} are rejected rather than silently normalized.
 *
 * @param qualifiedTableName the 2-part or 3-part dot-separated table name
 * @return the identifier built from the resolved catalog, schema and table names
 * @throws IllegalArgumentException if the name does not have 2 or 3 segments, contains an empty
 *     segment, or names a catalog other than this one
 */
UCTableIdentifier toUcTableIdentifier(String qualifiedTableName) {
  // A negative limit keeps trailing empty segments. The single-argument split drops them, which
  // would let a malformed name like "schema.table." pass as "schema.table".
  String[] namespaces = qualifiedTableName.split("\\.", -1);
  Preconditions.checkArgument(
      namespaces.length == 2 || namespaces.length == 3,
      String.format(
          "table name %s must be of the form schema.table or catalog.schema.table",
          qualifiedTableName));
  for (String segment : namespaces) {
    Preconditions.checkArgument(
        !segment.isEmpty(),
        String.format("table name %s must not contain empty segments", qualifiedTableName));
  }
  String catalogName;
  String schemaName;
  String tableName;
  if (namespaces.length == 3) {
    Preconditions.checkArgument(
        namespaces[0].equals(getName()),
        String.format(
            "table's catalog name %s must match catalog's name %s", namespaces[0], getName()));
    catalogName = namespaces[0];
    schemaName = namespaces[1];
    tableName = namespaces[2];
  } else {
    catalogName = getName();
    schemaName = namespaces[0];
    tableName = namespaces[1];
  }
  return new UCTableIdentifier(catalogName, schemaName, tableName);
}

@Override
public void open() {
if (apiClient == null) {
Expand Down Expand Up @@ -329,22 +357,9 @@ public void createTable(
() -> {
TablesApi tablesApi = new TablesApi(apiClient);
// Obtain names
String[] namespaces = tableId.split("\\.");
Preconditions.checkArgument(namespaces.length == 2 || namespaces.length == 3);
String schemaName;
String tableName;
if (namespaces.length == 3) {
Preconditions.checkArgument(
namespaces[0].equals(getName()),
String.format(
"table's catalog name %s must match catalog's name %s",
namespaces[0], getName()));
schemaName = namespaces[1];
tableName = namespaces[2];
} else {
schemaName = namespaces[0];
tableName = namespaces[1];
}
UCTableIdentifier tableIdentifier = toUcTableIdentifier(tableId);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: okay, so here `tableId` is the table identifier, whereas in this PR https://github.com/delta-io/delta/pull/6796/changes#diff-3faa0eba650a9174ae36b64cd5c3ba38387988ccf586b119bb1386592442dc1aR108 , @yili-db and I discussed using `tableId` to represent the table UUID. I think we may need to unify the naming across all the code.

Not a blocker for this PR; just leaving a comment here for context.

String schemaName = tableIdentifier.getSchemaName();
String tableName = tableIdentifier.getTableName();
// Column Info
List<ColumnInfo> columnInfos =
IntStream.range(0, schema.fields().size())
Expand Down
26 changes: 26 additions & 0 deletions flink/src/test/java/io/delta/flink/table/UnityCatalogTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,42 @@
package io.delta.flink.table;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import io.delta.flink.MockHttp;
import io.delta.flink.TestHelper;
import io.delta.kernel.types.*;
import io.delta.kernel.unitycatalog.UCTableIdentifier;
import java.net.URI;
import org.junit.jupiter.api.Test;

/** JUnit 6 test suite for UnityCatalog. */
class UnityCatalogTest extends TestHelper {

/**
 * Checks that both the 2-part and the 3-part spelling of the same table resolve to the
 * identifier {@code main.default.tbl}.
 */
@Test
void testToUcTableIdentifier() {
  UnityCatalog unityCatalog = new UnityCatalog("main", URI.create("http://localhost"), "");

  // The 2-part form first, then the 3-part form; both are expected to yield the same parts.
  for (String qualifiedName : new String[] {"default.tbl", "main.default.tbl"}) {
    UCTableIdentifier identifier = unityCatalog.toUcTableIdentifier(qualifiedName);
    assertEquals("main", identifier.getCatalogName());
    assertEquals("default", identifier.getSchemaName());
    assertEquals("tbl", identifier.getTableName());
  }
}

/**
 * Checks that a single-segment name and a 3-part name with a foreign catalog are both rejected
 * with {@link IllegalArgumentException}.
 */
@Test
void testToUcTableIdentifierRejectsInvalidNames() {
  UnityCatalog unityCatalog = new UnityCatalog("main", URI.create("http://localhost"), "");

  // "tbl" has too few segments; "other.default.tbl" names a catalog other than "main".
  for (String invalidName : new String[] {"tbl", "other.default.tbl"}) {
    assertThrows(
        IllegalArgumentException.class, () -> unityCatalog.toUcTableIdentifier(invalidName));
  }
}

@Test
void testGetTable() {
withTempDir(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.unitycatalog.InMemoryUCClient;
import io.delta.kernel.unitycatalog.UCCatalogManagedCommitter;
import io.delta.kernel.unitycatalog.UCTableIdentifier;
import io.delta.kernel.utils.FileStatus;
import io.delta.storage.commit.Commit;
import java.io.File;
Expand All @@ -43,6 +44,9 @@
* <pre>{@code
* {
* "uc_table_id": "12345678-1234-1234-1234-123456789abc",
* "catalog_name": "benchmark_catalog",
* "schema_name": "benchmark_schema",
* "table_name": "benchmark_table",
* "max_ratified_version": 2,
* "log_tail": [
* {
Expand Down Expand Up @@ -141,6 +145,18 @@ public Commit toCommit(Engine engine, String tableRoot) throws IOException {
@JsonProperty(value = "uc_table_id", required = true)
private String ucTableId;

/** The Unity Catalog catalog name. */
@JsonProperty(value = "catalog_name", required = true)
private String catalogName;

/** The Unity Catalog schema name. */
@JsonProperty(value = "schema_name", required = true)
private String schemaName;

/** The Unity Catalog table name. */
@JsonProperty(value = "table_name", required = true)
private String tableName;

/** The maximum ratified version for this table in Unity Catalog. */
@JsonProperty(value = "max_ratified_version", required = true)
private long maxRatifiedVersion;
Expand All @@ -167,6 +183,11 @@ public String getUcTableId() {
return ucTableId;
}

/**
 * Builds the three-part Unity Catalog table identifier for this table.
 *
 * @return a new identifier composed of this object's catalog, schema and table names
 */
public UCTableIdentifier getUcTableIdentifier() {
  final UCTableIdentifier identifier =
      new UCTableIdentifier(catalogName, schemaName, tableName);
  return identifier;
}

/**
* Creates an InMemoryUCClient for this table with the staged commits pre-loaded.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ protected Snapshot loadSnapshot(Engine engine, TableInfo tableInfo, Optional<Lon
engine,
ucCatalogInfo.getUcTableId(),
tableUri.toString(),
ucCatalogInfo.getUcTableIdentifier(),
versionOpt,
Optional.empty() /* timestampOpt */);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
{
"uc_table_id": "12345678-1234-1234-1234-123456789abc",
"catalog_name": "benchmark_catalog",
"schema_name": "benchmark_schema",
"table_name": "benchmark_table",
"max_ratified_version": 3,
"log_tail": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.delta.kernel.unitycatalog.metrics.UcLoadSnapshotTelemetry;
import io.delta.storage.commit.Commit;
import io.delta.storage.commit.GetCommitsResponse;
import io.delta.storage.commit.TableIdentifier;
import io.delta.storage.commit.uccommitcoordinator.UCClient;
import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorException;
import java.io.IOException;
Expand Down Expand Up @@ -82,6 +83,7 @@ public UCCatalogManagedClient(UCClient ucClient) {
* @param engine The Delta Kernel {@link Engine} to use for loading the table.
* @param ucTableId The Unity Catalog table ID, which is a unique identifier for the table in UC.
* @param tablePath The path to the Delta table in the underlying storage system.
* @param ucTableIdentifier The three-part Unity Catalog table identifier.
* @param versionOpt The optional version to time-travel to when loading the table. This must be
* mutually exclusive with timestampOpt.
* @param timestampOpt The optional timestamp to time-travel to when loading the table. This must
Expand All @@ -93,11 +95,13 @@ public Snapshot loadSnapshot(
Engine engine,
String ucTableId,
String tablePath,
UCTableIdentifier ucTableIdentifier,
Optional<Long> versionOpt,
Optional<Long> timestampOpt) {
Objects.requireNonNull(engine, "engine is null");
Objects.requireNonNull(ucTableId, "ucTableId is null");
Objects.requireNonNull(tablePath, "tablePath is null");
Objects.requireNonNull(ucTableIdentifier, "ucTableIdentifier is null");
Objects.requireNonNull(versionOpt, "versionOpt is null");
Objects.requireNonNull(timestampOpt, "timestampOpt is null");
versionOpt.ifPresent(version -> checkArgument(version >= 0, "version must be non-negative"));
Expand All @@ -122,7 +126,13 @@ public Snapshot loadSnapshot(
() -> {
final GetCommitsResponse response =
metricsCollector.getCommitsTimer.timeChecked(
() -> getRatifiedCommitsFromUC(ucTableId, tablePath, versionOpt));
() ->
getRatifiedCommitsFromUC(
ucTableId,
tablePath,
versionOpt,
UCCatalogManagedCommitter.toStorageTableIdentifier(
ucTableIdentifier)));

metricsCollector.setNumCatalogCommits(response.getCommits().size());

Expand Down Expand Up @@ -249,6 +259,7 @@ public CreateTableTransactionBuilder buildCreateTableTransaction(
* @param engine The Delta Kernel {@link Engine} to use for loading the table.
* @param ucTableId The Unity Catalog table ID, which is a unique identifier for the table in UC.
* @param tablePath The path to the Delta table in the underlying storage system.
* @param ucTableIdentifier The three-part Unity Catalog table identifier.
* @param startVersionOpt The optional start version boundary. This must be mutually exclusive
* with startTimestampOpt. Either this or startTimestampOpt must be provided.
* @param startTimestampOpt The optional start timestamp boundary. This must be mutually exclusive
Expand All @@ -267,13 +278,15 @@ public CommitRange loadCommitRange(
Engine engine,
String ucTableId,
String tablePath,
UCTableIdentifier ucTableIdentifier,
Optional<Long> startVersionOpt,
Optional<Long> startTimestampOpt,
Optional<Long> endVersionOpt,
Optional<Long> endTimestampOpt) {
Objects.requireNonNull(engine, "engine is null");
Objects.requireNonNull(ucTableId, "ucTableId is null");
Objects.requireNonNull(tablePath, "tablePath is null");
Objects.requireNonNull(ucTableIdentifier, "ucTableIdentifier is null");
Objects.requireNonNull(startVersionOpt, "startVersionOpt is null");
Objects.requireNonNull(startTimestampOpt, "startTimestampOpt is null");
Objects.requireNonNull(endVersionOpt, "endVersionOpt is null");
Expand Down Expand Up @@ -308,7 +321,11 @@ public CommitRange loadCommitRange(
Optional<Long> endVersionOptForCommitQuery =
endVersionOpt.filter(v -> !startTimestampOpt.isPresent());
final GetCommitsResponse response =
getRatifiedCommitsFromUC(ucTableId, tablePath, endVersionOptForCommitQuery);
getRatifiedCommitsFromUC(
ucTableId,
tablePath,
endVersionOptForCommitQuery,
UCCatalogManagedCommitter.toStorageTableIdentifier(ucTableIdentifier));
final long ucTableVersion = response.getLatestTableVersion();
validateVersionBoundariesExist(ucTableId, startVersionOpt, endVersionOpt, ucTableVersion);
final List<ParsedLogData> logData =
Expand Down Expand Up @@ -413,7 +430,11 @@ private String getCommitRangeBoundariesString(
}

private GetCommitsResponse getRatifiedCommitsFromUC(
String ucTableId, String tablePath, Optional<Long> versionOpt) {
String ucTableId,
String tablePath,
Optional<Long> versionOpt,
TableIdentifier tableIdentifier) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if every caller needs to convert the UCTableIdentifier into a TableIdentifier, a better approach may be to have getRatifiedCommitsFromUC accept the UCTableIdentifier directly and push the conversion inside the getRatifiedCommitsFromUC implementation. Not a blocker for this PR, though.

Objects.requireNonNull(tableIdentifier, "tableIdentifier is null");
logger.info(
"[{}] Invoking the UCClient to get ratified commits at version {}",
ucTableId,
Expand All @@ -430,6 +451,7 @@ private GetCommitsResponse getRatifiedCommitsFromUC(
return ucClient.getCommits(
ucTableId,
new Path(tablePath).toUri(),
tableIdentifier,
Optional.empty() /* startVersion */,
versionOpt /* endVersion */);
} catch (IOException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import static java.util.Objects.requireNonNull;

/** Logical Unity Catalog table identifier used for create-time finalization. */
/** Logical Unity Catalog table identifier used for table lifecycle and read operations. */
public final class UCTableIdentifier {
private final String catalogName;
private final String schemaName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ class InMemoryUCClient(ucMetastoreId: String) extends UCClient {
override def getCommits(
tableId: String,
tableUri: URI,
tableIdentifier: TableIdentifier,
startVersion: Optional[JLong],
endVersion: Optional[JLong]): GetCommitsResponse = {
val tableData = getTableDataElseThrow(tableId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ class InMemoryUCClientSuite extends AnyFunSuite with UCCatalogManagedTestUtils {
endVersionOpt: Optional[JLong],
expectedVersions: Seq[Long]): Unit = {
val client = getInMemoryUCClientWithCommitsForTableId("tableId", allVersions)
val response = client.getCommits("tableId", fakeURI, startVersionOpt, endVersionOpt)
val response = client.getCommits(
"tableId",
fakeURI,
/* tableIdentifier = */ null,
startVersionOpt,
endVersionOpt)
val actualVersions = response.getCommits.asScala.map(_.getVersion)

assert(actualVersions == expectedVersions)
Expand Down Expand Up @@ -82,7 +87,12 @@ class InMemoryUCClientSuite extends AnyFunSuite with UCCatalogManagedTestUtils {
// Asks a freshly constructed client for a table id and expects the lookup to fail with
// InvalidTargetTableException whose message names that id.
test("getCommits throws InvalidTargetTableException for non-existent table") {
  val freshClient = new InMemoryUCClient("ucMetastoreId")
  val tableUri = new URI("s3://bucket/table")
  val thrown = intercept[InvalidTargetTableException] {
    freshClient.getCommits(
      "abcd",
      tableUri,
      /* tableIdentifier = */ null,
      Optional.empty(),
      Optional.empty())
  }
  assert(thrown.getMessage.contains("Table not found: abcd"))
}
Expand Down
Loading
Loading