diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java index 5e15c09aeca..850de22b391 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java @@ -18,6 +18,7 @@ import io.delta.storage.commit.Commit; import io.delta.storage.commit.CommitFailedException; +import io.delta.storage.commit.CoordinatedCommitsUtils; import io.delta.storage.commit.GetCommitsResponse; import io.delta.storage.commit.TableIdentifier; import io.delta.storage.commit.actions.AbstractMetadata; @@ -60,6 +61,7 @@ import java.io.IOException; import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -70,6 +72,8 @@ import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; /** * A REST client implementation of {@link UCDeltaClient} that uses the UC Delta REST API for @@ -233,7 +237,7 @@ public void commit( throws IOException, CommitFailedException, UCCommitCoordinatorException { ensureOpen(); Objects.requireNonNull(tableId, "tableId must not be null"); - Objects.requireNonNull(tableIdentifier, "tableIdentifier must not be null"); + ResolvedTableName name = requireThreePartName(tableIdentifier); UpdateTableRequest request = new UpdateTableRequest(); request.addRequirementsItem(new AssertTableUUID() @@ -266,14 +270,10 @@ public void commit( .protocol(toSDKDeltaProtocol(newProtocol.get()))); } - String catalog = tableIdentifier.getNamespace()[0]; - String schema = tableIdentifier.getNamespace()[1]; - String table = tableIdentifier.getName(); - try { - deltaTablesApi.updateTable(catalog, schema, table, request); + deltaTablesApi.updateTable(name.catalog, name.schema, name.table, request); } catch (ApiException e) { - handleUpdateTableException(e, catalog, schema, table); + handleUpdateTableException(e, name.catalog, name.schema, name.table); } } @@ -284,9 +284,75 @@ public GetCommitsResponse getCommits( TableIdentifier tableIdentifier, Optional startVersion, Optional endVersion) throws IOException, UCCommitCoordinatorException { - throw new UnsupportedOperationException( - "getCommits is not yet supported by UCDeltaTokenBasedRestClient. " + - "A separate PR will add this once the tableIdentifier mapping is available."); + ensureOpen(); + Objects.requireNonNull(tableId, "tableId must not be null"); + Objects.requireNonNull(tableUri, "tableUri must not be null"); + Objects.requireNonNull(startVersion, "startVersion must not be null"); + Objects.requireNonNull(endVersion, "endVersion must not be null"); + + ResolvedTableName name = requireThreePartName(tableIdentifier); + + // The UC loadTable endpoint does not support server-side filtering by version range, so + // we fetch the full unbackfilled commit window and filter client-side below. The server + // bounds the window size, so this list is not unbounded in practice. + LoadTableResponse response; + try { + response = deltaTablesApi.loadTable(name.catalog, name.schema, name.table); + } catch (ApiException e) { + if (e.getCode() == HTTP_NOT_FOUND) { + throw new NoSuchTableException( + String.format("Table %s not found in Unity Catalog", name.fullName), e); + } + throw new IOException( + String.format("Failed to load commits for table %s (HTTP %s): %s", + name.fullName, e.getCode(), e.getResponseBody()), + e); + } + + TableMetadata metadata = response.getMetadata(); + String actualTableId = metadata != null && metadata.getTableUuid() != null + ? metadata.getTableUuid().toString() + : null; + if (!tableId.equals(actualTableId)) { + throw new InvalidTargetTableException( + String.format( + "Table UUID mismatch for %s: expected %s but got %s", + name.fullName, + tableId, + actualTableId)); + } + + Path basePath = CoordinatedCommitsUtils.commitDirPath( + CoordinatedCommitsUtils.logDirPath(new Path(tableUri))); + List commits = new ArrayList<>(); + if (response.getCommits() != null) { + for (DeltaCommit deltaCommit : response.getCommits()) { + long version = deltaCommit.getVersion(); + if (startVersion.isPresent() && version < startVersion.get()) { + continue; + } + if (endVersion.isPresent() && version > endVersion.get()) { + continue; + } + commits.add(fromDeltaCommit(deltaCommit, basePath)); + } + } + + long latestTableVersion = response.getLatestTableVersion() != null + ? response.getLatestTableVersion() : -1L; + return new GetCommitsResponse(commits, latestTableVersion); + } + + /** Converts a UC SDK {@link DeltaCommit} to a Delta {@link Commit}. */ + private Commit fromDeltaCommit(DeltaCommit deltaCommit, Path basePath) { + FileStatus fileStatus = new FileStatus( + deltaCommit.getFileSize(), + false /* isdir */, + 0 /* block_replication */, + 0 /* blocksize */, + deltaCommit.getFileModificationTimestamp(), + new Path(basePath, deltaCommit.getFileName())); + return new Commit(deltaCommit.getVersion(), fileStatus, deltaCommit.getTimestamp()); } @Override @@ -339,24 +405,16 @@ public void close() throws IOException { @Override public TableInfo loadTable(TableIdentifier tableIdentifier) throws IOException { ensureOpen(); - Objects.requireNonNull(tableIdentifier, "tableIdentifier must not be null"); - String[] namespace = tableIdentifier.getNamespace(); - if (namespace == null || namespace.length != 2) { - throw new IllegalArgumentException( - "UC tableIdentifier must have a 2-component namespace [catalog, schema]; got " + - (namespace == null ? "null" : java.util.Arrays.toString(namespace))); - } - String catalog = namespace[0]; - String schema = namespace[1]; - String table = tableIdentifier.getName(); + ResolvedTableName name = requireThreePartName(tableIdentifier); try { return toTableInfo( - deltaTablesApi.loadTable(catalog, schema, table), catalog, schema, table); + deltaTablesApi.loadTable(name.catalog, name.schema, name.table), + name.catalog, name.schema, name.table); } catch (ApiException e) { if (e.getCode() == HTTP_NOT_FOUND) { throw new NoSuchTableException( - String.format("Table %s.%s.%s not found in Unity Catalog", catalog, schema, table), e); + String.format("Table %s not found in Unity Catalog", name.fullName), e); } // UC encodes non-Delta-format errors as HTTP 400 with error.type = // "UnsupportedTableFormatException"; substring-match the body to avoid coupling to an @@ -364,13 +422,13 @@ public TableInfo loadTable(TableIdentifier tableIdentifier) throws IOException { String body = e.getResponseBody(); if (body != null && body.contains("UnsupportedTableFormatException")) { throw new UnsupportedTableFormatException( - String.format("Table %s.%s.%s is not in Delta format; the Delta REST API cannot " - + "serve it. Body: %s", catalog, schema, table, body), + String.format("Table %s is not in Delta format; the Delta REST API cannot " + + "serve it. Body: %s", name.fullName, body), e); } throw new IOException( - String.format("Failed to load table %s.%s.%s (HTTP %s): %s", - catalog, schema, table, e.getCode(), e.getResponseBody()), e); + String.format("Failed to load table %s (HTTP %s): %s", + name.fullName, e.getCode(), e.getResponseBody()), e); } } @@ -640,6 +698,25 @@ private void addMetadataUpdates( } } + // =========================== + // Table Identifier Helpers + // =========================== + + /** + * Validates that the given {@code tableIdentifier} is a Unity Catalog three-part + * (catalog.schema.table) name and returns its resolved parts. + */ + private static ResolvedTableName requireThreePartName(TableIdentifier tableIdentifier) { + Objects.requireNonNull(tableIdentifier, "tableIdentifier must not be null"); + String[] namespace = tableIdentifier.getNamespace(); + if (namespace == null || namespace.length != 2) { + throw new IllegalArgumentException( + "UC tableIdentifier must have a 2-component namespace [catalog, schema]; got " + + (namespace == null ? "null" : Arrays.toString(namespace))); + } + return new ResolvedTableName(namespace[0], namespace[1], tableIdentifier.getName()); + } + // =========================== // Exception Handling // =========================== @@ -673,6 +750,21 @@ private void handleUpdateTableException( // Inner Classes // =========================== + /** A Unity Catalog three-part table name resolved from a {@link TableIdentifier}. */ + private static final class ResolvedTableName { + final String catalog; + final String schema; + final String table; + final String fullName; + + ResolvedTableName(String catalog, String schema, String table) { + this.catalog = catalog; + this.schema = schema; + this.table = table; + this.fullName = catalog + "." + schema + "." + table; + } + } + /** * Adapts a UC SDK {@link TableMetadata} to {@link AbstractMetadata}. */ diff --git a/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala b/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala index c414534da8e..23cdba745f2 100644 --- a/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala +++ b/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala @@ -20,6 +20,8 @@ import java.net.{InetSocketAddress, URI} import java.nio.charset.StandardCharsets import java.util.{Collections, Optional, Set => JSet, UUID} +import scala.jdk.CollectionConverters._ + import com.fasterxml.jackson.databind.ObjectMapper import com.sun.net.httpserver.{HttpExchange, HttpServer} import io.delta.storage.commit.{Commit, CommitFailedException, TableIdentifier} @@ -81,7 +83,9 @@ class UCDeltaTokenBasedRestClientSuite tableUuid: UUID = testTableId, format: String = "DELTA", location: String = "s3://bucket/table", - tableType: String = "MANAGED"): String = + tableType: String = "MANAGED", + commitsJson: String = "[]", + latestTableVersion: Long = -1L): String = s"""{"metadata":{"table-uuid":"$tableUuid","data-source-format":"$format",""" + s""""table-type":"$tableType",""" + s""""location":"$location",""" + @@ -89,7 +93,17 @@ class UCDeltaTokenBasedRestClientSuite s"""{"name":"date","type":"string","nullable":true,"metadata":{}},""" + s"""{"name":"value","type":"integer","nullable":true,"metadata":{}}""" + s"""]},""" + - s""""properties":{"key1":"val1"},"partition-columns":["date"],"created-time":1000}}""" + s""""properties":{"key1":"val1"},"partition-columns":["date"],"created-time":1000},""" + + s""""commits":$commitsJson,"latest-table-version":$latestTableVersion}""" + + private def deltaCommitJson( + version: Long, + fileName: String, + fileSize: Long, + timestamp: Long, + fileModificationTimestamp: Long): String = + s"""{"version":$version,"file-name":"$fileName","file-size":$fileSize,""" + + s""""timestamp":$timestamp,"file-modification-timestamp":$fileModificationTimestamp}""" private def readBody(exchange: HttpExchange): String = { val is = exchange.getRequestBody @@ -585,12 +599,110 @@ class UCDeltaTokenBasedRestClientSuite // --------------- getCommits --------------- - test("getCommits throws UnsupportedOperationException") { + test("getCommits loads table by identifier and returns commits") { + var capturedMethod: String = null + var capturedPath: String = null + val commitsJson = "[" + + deltaCommitJson(2L, "00000000000000000002.uuid.json", 200L, 2000L, 2001L) + + "]" + deltaHandler = (exchange, _) => { + capturedMethod = exchange.getRequestMethod + capturedPath = exchange.getRequestURI.getPath + sendJson(exchange, HttpStatus.SC_OK, + loadTableJson(commitsJson = commitsJson, latestTableVersion = 2L)) + } + + withClient { c => + val response = c.getCommits(testTableId.toString, new URI("s3://b/t"), testIdentifier, + Optional.empty(), Optional.empty()) + + assert(capturedMethod === "GET") + assert(capturedPath === + "/api/2.1/unity-catalog/delta/v1/catalogs/cat/schemas/sch/tables/tbl") + assert(response.getLatestTableVersion === 2L) + assert(response.getCommits.size() === 1) + + val commit = response.getCommits.get(0) + assert(commit.getVersion === 2L) + assert(commit.getCommitTimestamp === 2000L) + assert(commit.getFileStatus.getLen === 200L) + assert(commit.getFileStatus.getModificationTime === 2001L) + assert(commit.getFileStatus.getPath.toString === + "s3://b/t/_delta_log/_staged_commits/00000000000000000002.uuid.json") + } + } + + test("getCommits filters loaded commits by requested version range") { + val commitsJson = Seq( + deltaCommitJson(1L, "1.uuid.json", 100L, 1000L, 1001L), + deltaCommitJson(2L, "2.uuid.json", 200L, 2000L, 2001L), + deltaCommitJson(3L, "3.uuid.json", 300L, 3000L, 3001L), + deltaCommitJson(4L, "4.uuid.json", 400L, 4000L, 4001L) + ).mkString("[", ",", "]") + deltaHandler = (exchange, _) => + sendJson(exchange, HttpStatus.SC_OK, + loadTableJson(commitsJson = commitsJson, latestTableVersion = 4L)) + + withClient { c => + val response = c.getCommits(testTableId.toString, new URI("s3://b/t"), testIdentifier, + Optional.of(java.lang.Long.valueOf(2L)), + Optional.of(java.lang.Long.valueOf(3L))) + + assert(response.getLatestTableVersion === 4L) + assert(response.getCommits.asScala.map(_.getVersion).toSeq === Seq(2L, 3L)) + } + } + + test("getCommits validates required parameters") { withClient { c => - intercept[UnsupportedOperationException] { + intercept[NullPointerException] { + c.getCommits(null, new URI("s3://b/t"), testIdentifier, + Optional.empty(), Optional.empty()) + } + intercept[NullPointerException] { + c.getCommits(testTableId.toString, null, testIdentifier, + Optional.empty(), Optional.empty()) + } + intercept[NullPointerException] { + c.getCommits(testTableId.toString, new URI("s3://b/t"), null, + Optional.empty(), Optional.empty()) + } + intercept[NullPointerException] { + c.getCommits(testTableId.toString, new URI("s3://b/t"), testIdentifier, + null, Optional.empty()) + } + intercept[NullPointerException] { + c.getCommits(testTableId.toString, new URI("s3://b/t"), testIdentifier, + Optional.empty(), null) + } + } + } + + test("getCommits throws NoSuchTableException on 404") { + deltaHandler = (exchange, _) => + sendJson(exchange, HttpStatus.SC_NOT_FOUND, """{"error":"not found"}""") + withClient { c => + val e = intercept[NoSuchTableException] { c.getCommits(testTableId.toString, new URI("s3://b/t"), testIdentifier, Optional.empty(), Optional.empty()) } + assert(e.getMessage.contains(s"$testCatalog.$testSchema.$testTable")) + assert(e.getMessage.contains("not found")) + } + } + + test("getCommits throws InvalidTargetTableException when table UUID does not match") { + val actualUuid = UUID.fromString("550e8400-e29b-41d4-a716-446655440001") + deltaHandler = (exchange, _) => + sendJson(exchange, HttpStatus.SC_OK, loadTableJson(tableUuid = actualUuid)) + withClient { c => + val e = intercept[InvalidTargetTableException] { + c.getCommits(testTableId.toString, new URI("s3://b/t"), testIdentifier, + Optional.empty(), Optional.empty()) + } + assert(e.getMessage.contains(s"$testCatalog.$testSchema.$testTable")) + assert(e.getMessage.contains(testTableId.toString)) + assert(e.getMessage.contains(actualUuid.toString)) } }