From 153b88af95fb97b4607a17ae272120857bec0f5e Mon Sep 17 00:00:00 2001 From: Timothy Wang Date: Tue, 19 May 2026 00:21:55 +0000 Subject: [PATCH 1/5] [Storage] Implement UC token-based getCommits --- .../UCDeltaTokenBasedRestClient.java | 87 ++++++++++++++- .../UCDeltaTokenBasedRestClientSuite.scala | 105 +++++++++++++++++- 2 files changed, 185 insertions(+), 7 deletions(-) diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java index 5e15c09aeca..1c089cfbbf5 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java @@ -18,6 +18,7 @@ import io.delta.storage.commit.Commit; import io.delta.storage.commit.CommitFailedException; +import io.delta.storage.commit.CoordinatedCommitsUtils; import io.delta.storage.commit.GetCommitsResponse; import io.delta.storage.commit.TableIdentifier; import io.delta.storage.commit.actions.AbstractMetadata; @@ -70,6 +71,8 @@ import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; /** * A REST client implementation of {@link UCDeltaClient} that uses the UC Delta REST API for @@ -284,9 +287,87 @@ public GetCommitsResponse getCommits( TableIdentifier tableIdentifier, Optional startVersion, Optional endVersion) throws IOException, UCCommitCoordinatorException { - throw new UnsupportedOperationException( - "getCommits is not yet supported by UCDeltaTokenBasedRestClient. " + - "A separate PR will add this once the tableIdentifier mapping is available."); + ensureOpen(); + Objects.requireNonNull(tableId, "tableId must not be null"); + Objects.requireNonNull(tableUri, "tableUri must not be null"); + Objects.requireNonNull(tableIdentifier, "tableIdentifier must not be null"); + Objects.requireNonNull(startVersion, "startVersion must not be null"); + Objects.requireNonNull(endVersion, "endVersion must not be null"); + + String[] namespace = Objects.requireNonNull( + tableIdentifier.getNamespace(), "tableIdentifier namespace must not be null"); + if (namespace.length != 2) { + throw new IllegalArgumentException( + "tableIdentifier must be a three-part Unity Catalog table name"); + } + String catalog = Objects.requireNonNull(namespace[0], "catalog name must not be null"); + String schema = Objects.requireNonNull(namespace[1], "schema name must not be null"); + String table = Objects.requireNonNull(tableIdentifier.getName(), "table name must not be null"); + String fullName = catalog + "." + schema + "." + table; + + LoadTableResponse response; + try { + response = deltaTablesApi.loadTable(catalog, schema, table); + } catch (ApiException e) { + if (e.getCode() == HTTP_NOT_FOUND) { + throw new InvalidTargetTableException( + String.format("Table not found %s: %s", fullName, e.getResponseBody())); + } + throw new IOException( + String.format("Failed to load commits for table %s (HTTP %s): %s", + fullName, e.getCode(), e.getResponseBody()), + e); + } + + Objects.requireNonNull(response, "loadTable response must not be null"); + String actualTableId = response.getMetadata() != null + && response.getMetadata().getTableUuid() != null + ? response.getMetadata().getTableUuid().toString() + : null; + if (!Objects.equals(tableId, actualTableId)) { + throw new InvalidTargetTableException( + String.format( + "Table UUID mismatch for %s: expected %s but got %s", + fullName, + tableId, + actualTableId)); + } + + Path basePath = CoordinatedCommitsUtils.commitDirPath( + CoordinatedCommitsUtils.logDirPath(new Path(tableUri))); + List commits = new ArrayList<>(); + if (response.getCommits() != null) { + for (DeltaCommit deltaCommit : response.getCommits()) { + long version = Objects.requireNonNull( + deltaCommit.getVersion(), "commit version must not be null"); + if (startVersion.isPresent() && version < startVersion.get()) { + continue; + } + if (endVersion.isPresent() && version > endVersion.get()) { + continue; + } + + commits.add(new Commit( + version, + new FileStatus( + Objects.requireNonNull( + deltaCommit.getFileSize(), "commit fileSize must not be null"), + false /* isdir */, + 0 /* block_replication */, + 0 /* blocksize */, + Objects.requireNonNull( + deltaCommit.getFileModificationTimestamp(), + "commit fileModificationTimestamp must not be null"), + new Path(basePath, Objects.requireNonNull( + deltaCommit.getFileName(), "commit fileName must not be null"))), + Objects.requireNonNull( + deltaCommit.getTimestamp(), "commit timestamp must not be null"))); + } + } + + long latestTableVersion = response.getLatestTableVersion() != null + ? response.getLatestTableVersion() : -1L; + return new GetCommitsResponse(commits, latestTableVersion); } @Override diff --git a/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala b/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala index c414534da8e..35d122670d6 100644 --- a/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala +++ b/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.http.HttpStatus import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.funsuite.AnyFunSuite +import scala.jdk.CollectionConverters._ class UCDeltaTokenBasedRestClientSuite extends AnyFunSuite @@ -81,7 +82,9 @@ class UCDeltaTokenBasedRestClientSuite tableUuid: UUID = testTableId, format: String = "DELTA", location: String = "s3://bucket/table", - tableType: String = "MANAGED"): String = + tableType: String = "MANAGED", + commitsJson: String = "[]", + latestTableVersion: Long = -1L): String = s"""{"metadata":{"table-uuid":"$tableUuid","data-source-format":"$format",""" + s""""table-type":"$tableType",""" + s""""location":"$location",""" + @@ -89,7 +92,17 @@ class UCDeltaTokenBasedRestClientSuite s"""{"name":"date","type":"string","nullable":true,"metadata":{}},""" + s"""{"name":"value","type":"integer","nullable":true,"metadata":{}}""" + s"""]},""" + - s""""properties":{"key1":"val1"},"partition-columns":["date"],"created-time":1000}}""" + s""""properties":{"key1":"val1"},"partition-columns":["date"],"created-time":1000},""" + + s""""commits":$commitsJson,"latest-table-version":$latestTableVersion}""" + + private def deltaCommitJson( + version: Long, + fileName: String, + fileSize: Long, + timestamp: Long, + fileModificationTimestamp: Long): String = + s"""{"version":$version,"file-name":"$fileName","file-size":$fileSize,""" + + s""""timestamp":$timestamp,"file-modification-timestamp":$fileModificationTimestamp}""" private def readBody(exchange: HttpExchange): String = { val is = exchange.getRequestBody @@ -585,9 +598,93 @@ class UCDeltaTokenBasedRestClientSuite // --------------- getCommits --------------- - test("getCommits throws UnsupportedOperationException") { + test("getCommits loads table by identifier and returns commits") { + var capturedMethod: String = null + var capturedPath: String = null + val commitsJson = "[" + + deltaCommitJson(2L, "00000000000000000002.uuid.json", 200L, 2000L, 2001L) + + "]" + deltaHandler = (exchange, _) => { + capturedMethod = exchange.getRequestMethod + capturedPath = exchange.getRequestURI.getPath + sendJson(exchange, HttpStatus.SC_OK, + loadTableJson(commitsJson = commitsJson, latestTableVersion = 2L)) + } + + withClient { c => + val response = c.getCommits(testTableId.toString, new URI("s3://b/t"), testIdentifier, + Optional.empty(), Optional.empty()) + + assert(capturedMethod === "GET") + assert(capturedPath === + "/api/2.1/unity-catalog/delta/v1/catalogs/cat/schemas/sch/tables/tbl") + assert(response.getLatestTableVersion === 2L) + assert(response.getCommits.size() === 1) + + val commit = response.getCommits.get(0) + assert(commit.getVersion === 2L) + assert(commit.getCommitTimestamp === 2000L) + assert(commit.getFileStatus.getLen === 200L) + assert(commit.getFileStatus.getModificationTime === 2001L) + assert(commit.getFileStatus.getPath.toString === + "s3://b/t/_delta_log/_staged_commits/00000000000000000002.uuid.json") + } + } + + test("getCommits filters loaded commits by requested version range") { + val commitsJson = Seq( + deltaCommitJson(1L, "1.uuid.json", 100L, 1000L, 1001L), + deltaCommitJson(2L, "2.uuid.json", 200L, 2000L, 2001L), + deltaCommitJson(3L, "3.uuid.json", 300L, 3000L, 3001L), + deltaCommitJson(4L, "4.uuid.json", 400L, 4000L, 4001L) + ).mkString("[", ",", "]") + deltaHandler = (exchange, _) => + sendJson(exchange, HttpStatus.SC_OK, + loadTableJson(commitsJson = commitsJson, latestTableVersion = 4L)) + withClient { c => - intercept[UnsupportedOperationException] { + val response = c.getCommits(testTableId.toString, new URI("s3://b/t"), testIdentifier, + Optional.of(java.lang.Long.valueOf(2L)), + Optional.of(java.lang.Long.valueOf(3L))) + + assert(response.getLatestTableVersion === 4L) + assert(response.getCommits.asScala.map(_.getVersion).toSeq === Seq(2L, 3L)) + } + } + + test("getCommits validates required parameters") { + withClient { c => + intercept[NullPointerException] { + c.getCommits(null, new URI("s3://b/t"), testIdentifier, + Optional.empty(), Optional.empty()) + } + intercept[NullPointerException] { + c.getCommits(testTableId.toString, null, testIdentifier, + Optional.empty(), Optional.empty()) + } + intercept[NullPointerException] { + c.getCommits(testTableId.toString, new URI("s3://b/t"), null, + Optional.empty(), Optional.empty()) + } + } + } + + test("getCommits throws InvalidTargetTableException on 404") { + deltaHandler = (exchange, _) => sendJson(exchange, HttpStatus.SC_NOT_FOUND, "{}") + withClient { c => + intercept[InvalidTargetTableException] { + c.getCommits(testTableId.toString, new URI("s3://b/t"), testIdentifier, + Optional.empty(), Optional.empty()) + } + } + } + + test("getCommits throws InvalidTargetTableException when table UUID does not match") { + deltaHandler = (exchange, _) => + sendJson(exchange, HttpStatus.SC_OK, + loadTableJson(tableUuid = UUID.fromString("550e8400-e29b-41d4-a716-446655440001"))) + withClient { c => + intercept[InvalidTargetTableException] { c.getCommits(testTableId.toString, new URI("s3://b/t"), testIdentifier, Optional.empty(), Optional.empty()) } From 473ef06ee380dbfe4e5d992f8ca4dd3a00cfb90f Mon Sep 17 00:00:00 2001 From: Timothy Wang Date: Tue, 19 May 2026 01:28:44 +0000 Subject: [PATCH 2/5] [Storage] Address review feedback for UC token-based getCommits - Compare table UUIDs via UUID equality instead of case-sensitive string compare, matching the existing UUID.fromString pattern in commit(). - Extract fromDeltaCommit helper to mirror UCTokenBasedRestClient's fromDeltaCommitInfo and remove the inline null-check storm inside the Commit/FileStatus constructor. - Document why version filtering is client-side (loadTable does not expose server-side filters). - Drop the dead requireNonNull(response, ...) after a successful loadTable; the SDK never returns null on 2xx. --- .../UCDeltaTokenBasedRestClient.java | 53 ++++++++++--------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java index 1c089cfbbf5..4f9ff06b5a6 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java @@ -294,6 +294,7 @@ public GetCommitsResponse getCommits( Objects.requireNonNull(startVersion, "startVersion must not be null"); Objects.requireNonNull(endVersion, "endVersion must not be null"); + UUID expectedTableUuid = UUID.fromString(tableId); String[] namespace = Objects.requireNonNull( tableIdentifier.getNamespace(), "tableIdentifier namespace must not be null"); if (namespace.length != 2) { @@ -305,6 +306,9 @@ public GetCommitsResponse getCommits( String table = Objects.requireNonNull(tableIdentifier.getName(), "table name must not be null"); String fullName = catalog + "." + schema + "." + table; + // The UC loadTable endpoint does not support server-side filtering by version range, so + // we fetch the full unbackfilled commit window and filter client-side below. The server + // bounds the window size, so this list is not unbounded in practice. LoadTableResponse response; try { response = deltaTablesApi.loadTable(catalog, schema, table); @@ -319,18 +323,15 @@ public GetCommitsResponse getCommits( e); } - Objects.requireNonNull(response, "loadTable response must not be null"); - String actualTableId = response.getMetadata() != null - && response.getMetadata().getTableUuid() != null - ? response.getMetadata().getTableUuid().toString() - : null; - if (!Objects.equals(tableId, actualTableId)) { + TableMetadata metadata = response.getMetadata(); + UUID actualTableUuid = metadata != null ? metadata.getTableUuid() : null; + if (!expectedTableUuid.equals(actualTableUuid)) { throw new InvalidTargetTableException( String.format( "Table UUID mismatch for %s: expected %s but got %s", fullName, - tableId, - actualTableId)); + expectedTableUuid, + actualTableUuid)); } Path basePath = CoordinatedCommitsUtils.commitDirPath( @@ -346,22 +347,7 @@ public GetCommitsResponse getCommits( if (endVersion.isPresent() && version > endVersion.get()) { continue; } - - commits.add(new Commit( - version, - new FileStatus( - Objects.requireNonNull( - deltaCommit.getFileSize(), "commit fileSize must not be null"), - false /* isdir */, - 0 /* block_replication */, - 0 /* blocksize */, - Objects.requireNonNull( - deltaCommit.getFileModificationTimestamp(), - "commit fileModificationTimestamp must not be null"), - new Path(basePath, Objects.requireNonNull( - deltaCommit.getFileName(), "commit fileName must not be null"))), - Objects.requireNonNull( - deltaCommit.getTimestamp(), "commit timestamp must not be null"))); + commits.add(fromDeltaCommit(deltaCommit, basePath)); } } @@ -370,6 +356,25 @@ public GetCommitsResponse getCommits( return new GetCommitsResponse(commits, latestTableVersion); } + /** Converts a UC SDK {@link DeltaCommit} to a Delta {@link Commit}. */ + private Commit fromDeltaCommit(DeltaCommit deltaCommit, Path basePath) { + FileStatus fileStatus = new FileStatus( + Objects.requireNonNull( + deltaCommit.getFileSize(), "commit fileSize must not be null"), + false /* isdir */, + 0 /* block_replication */, + 0 /* blocksize */, + Objects.requireNonNull( + deltaCommit.getFileModificationTimestamp(), + "commit fileModificationTimestamp must not be null"), + new Path(basePath, Objects.requireNonNull( + deltaCommit.getFileName(), "commit fileName must not be null"))); + return new Commit( + Objects.requireNonNull(deltaCommit.getVersion(), "commit version must not be null"), + fileStatus, + Objects.requireNonNull(deltaCommit.getTimestamp(), "commit timestamp must not be null")); + } + @Override public void finalizeCreate( String tableName, From 9b9b93172a6af8529de21be84a0a6a4365144f40 Mon Sep 17 00:00:00 2001 From: Timothy Wang Date: Tue, 19 May 2026 05:18:32 +0000 Subject: [PATCH 3/5] [Storage] Address openinx review comments - Extract resolveThreePartName helper used by loadTable, commit, and getCommits, replacing three near-identical inline parses of TableIdentifier with one source of truth (per openinx review). - Change getCommits 404 from InvalidTargetTableException to NoSuchTableException, matching loadTable and the typed exception introduced in #6811. - Update the 404 test to mirror loadTable's NoSuchTableException test (asserts the qualified table name and the response body are in the error message). --- .../UCDeltaTokenBasedRestClient.java | 90 +++++++++++-------- .../UCDeltaTokenBasedRestClientSuite.scala | 9 +- 2 files changed, 57 insertions(+), 42 deletions(-) diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java index 4f9ff06b5a6..2d612ef866b 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java @@ -236,7 +236,7 @@ public void commit( throws IOException, CommitFailedException, UCCommitCoordinatorException { ensureOpen(); Objects.requireNonNull(tableId, "tableId must not be null"); - Objects.requireNonNull(tableIdentifier, "tableIdentifier must not be null"); + ResolvedTableName name = resolveThreePartName(tableIdentifier); UpdateTableRequest request = new UpdateTableRequest(); request.addRequirementsItem(new AssertTableUUID() @@ -269,14 +269,10 @@ public void commit( .protocol(toSDKDeltaProtocol(newProtocol.get()))); } - String catalog = tableIdentifier.getNamespace()[0]; - String schema = tableIdentifier.getNamespace()[1]; - String table = tableIdentifier.getName(); - try { - deltaTablesApi.updateTable(catalog, schema, table, request); + deltaTablesApi.updateTable(name.catalog, name.schema, name.table, request); } catch (ApiException e) { - handleUpdateTableException(e, catalog, schema, table); + handleUpdateTableException(e, name.catalog, name.schema, name.table); } } @@ -290,36 +286,26 @@ public GetCommitsResponse getCommits( ensureOpen(); Objects.requireNonNull(tableId, "tableId must not be null"); Objects.requireNonNull(tableUri, "tableUri must not be null"); - Objects.requireNonNull(tableIdentifier, "tableIdentifier must not be null"); Objects.requireNonNull(startVersion, "startVersion must not be null"); Objects.requireNonNull(endVersion, "endVersion must not be null"); UUID expectedTableUuid = UUID.fromString(tableId); - String[] namespace = Objects.requireNonNull( - tableIdentifier.getNamespace(), "tableIdentifier namespace must not be null"); - if (namespace.length != 2) { - throw new IllegalArgumentException( - "tableIdentifier must be a three-part Unity Catalog table name"); - } - String catalog = Objects.requireNonNull(namespace[0], "catalog name must not be null"); - String schema = Objects.requireNonNull(namespace[1], "schema name must not be null"); - String table = Objects.requireNonNull(tableIdentifier.getName(), "table name must not be null"); - String fullName = catalog + "." + schema + "." + table; + ResolvedTableName name = resolveThreePartName(tableIdentifier); // The UC loadTable endpoint does not support server-side filtering by version range, so // we fetch the full unbackfilled commit window and filter client-side below. The server // bounds the window size, so this list is not unbounded in practice. LoadTableResponse response; try { - response = deltaTablesApi.loadTable(catalog, schema, table); + response = deltaTablesApi.loadTable(name.catalog, name.schema, name.table); } catch (ApiException e) { if (e.getCode() == HTTP_NOT_FOUND) { - throw new InvalidTargetTableException( - String.format("Table not found %s: %s", fullName, e.getResponseBody())); + throw new NoSuchTableException( + String.format("Table %s not found in Unity Catalog", name.fullName), e); } throw new IOException( String.format("Failed to load commits for table %s (HTTP %s): %s", - fullName, e.getCode(), e.getResponseBody()), + name.fullName, e.getCode(), e.getResponseBody()), e); } @@ -329,7 +315,7 @@ public GetCommitsResponse getCommits( throw new InvalidTargetTableException( String.format( "Table UUID mismatch for %s: expected %s but got %s", - fullName, + name.fullName, expectedTableUuid, actualTableUuid)); } @@ -425,24 +411,16 @@ public void close() throws IOException { @Override public TableInfo loadTable(TableIdentifier tableIdentifier) throws IOException { ensureOpen(); - Objects.requireNonNull(tableIdentifier, "tableIdentifier must not be null"); - String[] namespace = tableIdentifier.getNamespace(); - if (namespace == null || namespace.length != 2) { - throw new IllegalArgumentException( - "UC tableIdentifier must have a 2-component namespace [catalog, schema]; got " + - (namespace == null ? "null" : java.util.Arrays.toString(namespace))); - } - String catalog = namespace[0]; - String schema = namespace[1]; - String table = tableIdentifier.getName(); + ResolvedTableName name = resolveThreePartName(tableIdentifier); try { return toTableInfo( - deltaTablesApi.loadTable(catalog, schema, table), catalog, schema, table); + deltaTablesApi.loadTable(name.catalog, name.schema, name.table), + name.catalog, name.schema, name.table); } catch (ApiException e) { if (e.getCode() == HTTP_NOT_FOUND) { throw new NoSuchTableException( - String.format("Table %s.%s.%s not found in Unity Catalog", catalog, schema, table), e); + String.format("Table %s not found in Unity Catalog", name.fullName), e); } // UC encodes non-Delta-format errors as HTTP 400 with error.type = // "UnsupportedTableFormatException"; substring-match the body to avoid coupling to an @@ -450,13 +428,13 @@ public TableInfo loadTable(TableIdentifier tableIdentifier) throws IOException { String body = e.getResponseBody(); if (body != null && body.contains("UnsupportedTableFormatException")) { throw new UnsupportedTableFormatException( - String.format("Table %s.%s.%s is not in Delta format; the Delta REST API cannot " - + "serve it. Body: %s", catalog, schema, table, body), + String.format("Table %s is not in Delta format; the Delta REST API cannot " + + "serve it. Body: %s", name.fullName, body), e); } throw new IOException( - String.format("Failed to load table %s.%s.%s (HTTP %s): %s", - catalog, schema, table, e.getCode(), e.getResponseBody()), e); + String.format("Failed to load table %s (HTTP %s): %s", + name.fullName, e.getCode(), e.getResponseBody()), e); } } @@ -726,6 +704,25 @@ private void addMetadataUpdates( } } + // =========================== + // Table Identifier Helpers + // =========================== + + /** + * Validates that the given {@code tableIdentifier} is a Unity Catalog three-part + * (catalog.schema.table) name and returns its resolved parts. + */ + private static ResolvedTableName resolveThreePartName(TableIdentifier tableIdentifier) { + Objects.requireNonNull(tableIdentifier, "tableIdentifier must not be null"); + String[] namespace = tableIdentifier.getNamespace(); + if (namespace == null || namespace.length != 2) { + throw new IllegalArgumentException( + "UC tableIdentifier must have a 2-component namespace [catalog, schema]; got " + + (namespace == null ? "null" : java.util.Arrays.toString(namespace))); + } + return new ResolvedTableName(namespace[0], namespace[1], tableIdentifier.getName()); + } + // =========================== // Exception Handling // =========================== @@ -759,6 +756,21 @@ private void handleUpdateTableException( // Inner Classes // =========================== + /** A Unity Catalog three-part table name resolved from a {@link TableIdentifier}. */ + private static final class ResolvedTableName { + final String catalog; + final String schema; + final String table; + final String fullName; + + ResolvedTableName(String catalog, String schema, String table) { + this.catalog = catalog; + this.schema = schema; + this.table = table; + this.fullName = catalog + "." + schema + "." + table; + } + } + /** * Adapts a UC SDK {@link TableMetadata} to {@link AbstractMetadata}. */ diff --git a/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala b/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala index 35d122670d6..6e311aeae67 100644 --- a/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala +++ b/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala @@ -669,13 +669,16 @@ class UCDeltaTokenBasedRestClientSuite } } - test("getCommits throws InvalidTargetTableException on 404") { - deltaHandler = (exchange, _) => sendJson(exchange, HttpStatus.SC_NOT_FOUND, "{}") + test("getCommits throws NoSuchTableException on 404") { + deltaHandler = (exchange, _) => + sendJson(exchange, HttpStatus.SC_NOT_FOUND, """{"error":"not found"}""") withClient { c => - intercept[InvalidTargetTableException] { + val e = intercept[NoSuchTableException] { c.getCommits(testTableId.toString, new URI("s3://b/t"), testIdentifier, Optional.empty(), Optional.empty()) } + assert(e.getMessage.contains(s"$testCatalog.$testSchema.$testTable")) + assert(e.getMessage.contains("not found")) } } From d6167e4d673fbda00a0e27128808ffe82b163235 Mon Sep 17 00:00:00 2001 From: Timothy Wang Date: Tue, 19 May 2026 18:24:54 +0000 Subject: [PATCH 4/5] [Storage] Address review nits - Rename resolveThreePartName to requireThreePartName: the method validates and unpacks, it does not resolve. requireThreePartName matches the requireNonNull idiom. - Drop the per-field Objects.requireNonNull storm inside fromDeltaCommit and from the outer loop's version unbox. The SDK marks every DeltaCommit getter @Nonnull; matching the sibling fromDeltaCommitInfo (which trusts the SDK) keeps the two helpers symmetric. - Import java.util.Arrays instead of fully-qualifying inside requireThreePartName. - Move scala.jdk.CollectionConverters._ into its own scala.* import group between java.* and the third-party block (scalafmt order). - Extend the getCommits null-parameter test to cover startVersion and endVersion as well, matching the five requireNonNull calls in the method body. - Assert message contents on the getCommits UUID-mismatch test (qualified table name plus both UUIDs), matching the assertion shape used by the loadTable 404 test. --- .../UCDeltaTokenBasedRestClient.java | 29 +++++++------------ .../UCDeltaTokenBasedRestClientSuite.scala | 20 ++++++++++--- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java index 2d612ef866b..b687ed71131 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java @@ -61,6 +61,7 @@ import java.io.IOException; import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -236,7 +237,7 @@ public void commit( throws IOException, CommitFailedException, UCCommitCoordinatorException { ensureOpen(); Objects.requireNonNull(tableId, "tableId must not be null"); - ResolvedTableName name = resolveThreePartName(tableIdentifier); + ResolvedTableName name = requireThreePartName(tableIdentifier); UpdateTableRequest request = new UpdateTableRequest(); request.addRequirementsItem(new AssertTableUUID() @@ -290,7 +291,7 @@ public GetCommitsResponse getCommits( Objects.requireNonNull(endVersion, "endVersion must not be null"); UUID expectedTableUuid = UUID.fromString(tableId); - ResolvedTableName name = resolveThreePartName(tableIdentifier); + ResolvedTableName name = requireThreePartName(tableIdentifier); // The UC loadTable endpoint does not support server-side filtering by version range, so // we fetch the full unbackfilled commit window and filter client-side below. The server @@ -325,8 +326,7 @@ public GetCommitsResponse getCommits( List commits = new ArrayList<>(); if (response.getCommits() != null) { for (DeltaCommit deltaCommit : response.getCommits()) { - long version = Objects.requireNonNull( - deltaCommit.getVersion(), "commit version must not be null"); + long version = deltaCommit.getVersion(); if (startVersion.isPresent() && version < startVersion.get()) { continue; } @@ -345,20 +345,13 @@ public GetCommitsResponse getCommits( /** Converts a UC SDK {@link DeltaCommit} to a Delta {@link Commit}. */ private Commit fromDeltaCommit(DeltaCommit deltaCommit, Path basePath) { FileStatus fileStatus = new FileStatus( - Objects.requireNonNull( - deltaCommit.getFileSize(), "commit fileSize must not be null"), + deltaCommit.getFileSize(), false /* isdir */, 0 /* block_replication */, 0 /* blocksize */, - Objects.requireNonNull( - deltaCommit.getFileModificationTimestamp(), - "commit fileModificationTimestamp must not be null"), - new Path(basePath, Objects.requireNonNull( - deltaCommit.getFileName(), "commit fileName must not be null"))); - return new Commit( - Objects.requireNonNull(deltaCommit.getVersion(), "commit version must not be null"), - fileStatus, - Objects.requireNonNull(deltaCommit.getTimestamp(), "commit timestamp must not be null")); + deltaCommit.getFileModificationTimestamp(), + new Path(basePath, deltaCommit.getFileName())); + return new Commit(deltaCommit.getVersion(), fileStatus, deltaCommit.getTimestamp()); } @Override @@ -411,7 +404,7 @@ public void close() throws IOException { @Override public TableInfo loadTable(TableIdentifier tableIdentifier) throws IOException { ensureOpen(); - ResolvedTableName name = resolveThreePartName(tableIdentifier); + ResolvedTableName name = requireThreePartName(tableIdentifier); try { return toTableInfo( @@ -712,13 +705,13 @@ private void addMetadataUpdates( * Validates that the given {@code tableIdentifier} is a Unity Catalog three-part * (catalog.schema.table) name and returns its resolved parts. */ - private static ResolvedTableName resolveThreePartName(TableIdentifier tableIdentifier) { + private static ResolvedTableName requireThreePartName(TableIdentifier tableIdentifier) { Objects.requireNonNull(tableIdentifier, "tableIdentifier must not be null"); String[] namespace = tableIdentifier.getNamespace(); if (namespace == null || namespace.length != 2) { throw new IllegalArgumentException( "UC tableIdentifier must have a 2-component namespace [catalog, schema]; got " + - (namespace == null ? "null" : java.util.Arrays.toString(namespace))); + (namespace == null ? "null" : Arrays.toString(namespace))); } return new ResolvedTableName(namespace[0], namespace[1], tableIdentifier.getName()); } diff --git a/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala b/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala index 6e311aeae67..23cdba745f2 100644 --- a/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala +++ b/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala @@ -20,6 +20,8 @@ import java.net.{InetSocketAddress, URI} import java.nio.charset.StandardCharsets import java.util.{Collections, Optional, Set => JSet, UUID} +import scala.jdk.CollectionConverters._ + import com.fasterxml.jackson.databind.ObjectMapper import com.sun.net.httpserver.{HttpExchange, HttpServer} import io.delta.storage.commit.{Commit, CommitFailedException, TableIdentifier} @@ -32,7 +34,6 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.http.HttpStatus import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.funsuite.AnyFunSuite -import scala.jdk.CollectionConverters._ class UCDeltaTokenBasedRestClientSuite extends AnyFunSuite @@ -666,6 +667,14 @@ class UCDeltaTokenBasedRestClientSuite c.getCommits(testTableId.toString, new URI("s3://b/t"), null, Optional.empty(), Optional.empty()) } + intercept[NullPointerException] { + c.getCommits(testTableId.toString, new URI("s3://b/t"), testIdentifier, + null, Optional.empty()) + } + intercept[NullPointerException] { + c.getCommits(testTableId.toString, new URI("s3://b/t"), testIdentifier, + Optional.empty(), null) + } } } @@ -683,14 +692,17 @@ class UCDeltaTokenBasedRestClientSuite } test("getCommits throws InvalidTargetTableException when table UUID does not match") { + val actualUuid = UUID.fromString("550e8400-e29b-41d4-a716-446655440001") deltaHandler = (exchange, _) => - sendJson(exchange, HttpStatus.SC_OK, - loadTableJson(tableUuid = UUID.fromString("550e8400-e29b-41d4-a716-446655440001"))) + sendJson(exchange, HttpStatus.SC_OK, loadTableJson(tableUuid = actualUuid)) withClient { c => - intercept[InvalidTargetTableException] { + val e = intercept[InvalidTargetTableException] { c.getCommits(testTableId.toString, new URI("s3://b/t"), testIdentifier, Optional.empty(), Optional.empty()) } + assert(e.getMessage.contains(s"$testCatalog.$testSchema.$testTable")) + assert(e.getMessage.contains(testTableId.toString)) + assert(e.getMessage.contains(actualUuid.toString)) } } From ee014c8cf801fc4a42ebf0a098dcd421be6ec130 Mon Sep 17 00:00:00 2001 From: Timothy Wang Date: Tue, 19 May 2026 18:30:42 +0000 Subject: [PATCH 5/5] [Storage] Compare tableId as String, not UUID Per openinx r3263890890: prefer plain String comparison over round-tripping tableId through UUID.fromString. The UC delta spec canonicalizes the UUID form so both sides produce the same string. Drops the upfront UUID.fromString validation step and keeps the mismatch error message in terms of the strings the caller passed in. --- .../UCDeltaTokenBasedRestClient.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java index b687ed71131..850de22b391 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java @@ -290,7 +290,6 @@ public GetCommitsResponse getCommits( Objects.requireNonNull(startVersion, "startVersion must not be null"); Objects.requireNonNull(endVersion, "endVersion must not be null"); - UUID expectedTableUuid = UUID.fromString(tableId); ResolvedTableName name = requireThreePartName(tableIdentifier); // The UC loadTable endpoint does not support server-side filtering by version range, so @@ -311,14 +310,16 @@ public GetCommitsResponse getCommits( } TableMetadata metadata = response.getMetadata(); - UUID actualTableUuid = metadata != null ? metadata.getTableUuid() : null; - if (!expectedTableUuid.equals(actualTableUuid)) { + String actualTableId = metadata != null && metadata.getTableUuid() != null + ? metadata.getTableUuid().toString() + : null; + if (!tableId.equals(actualTableId)) { throw new InvalidTargetTableException( String.format( "Table UUID mismatch for %s: expected %s but got %s", name.fullName, - expectedTableUuid, - actualTableUuid)); + tableId, + actualTableId)); } Path basePath = CoordinatedCommitsUtils.commitDirPath(