diff --git a/build.sbt b/build.sbt index 27cf4cacd61..7ea13d2b24d 100644 --- a/build.sbt +++ b/build.sbt @@ -480,16 +480,11 @@ lazy val sparkV2 = { libraryDependencies ++= Seq( "io.delta" % "delta-kernel-api" % v, "io.delta" % "delta-kernel-defaults" % v, - "io.delta" % "delta-kernel-unitycatalog" % v - ), - // Kernel main classes are pulled from Maven at version `v`, but several - // sparkV2 tests depend on test-only helpers (e.g. InMemoryUCClient, - // UCCatalogManagedTestUtils) that are not published. Build those test - // jars from the in-tree kernel sources and add them to the test classpath. - Test / unmanagedJars ++= Seq( - (kernelApi / Test / packageBin).value, - (kernelDefaults / Test / packageBin).value, - (kernelUnityCatalog / Test / packageBin).value + "io.delta" % "delta-kernel-unitycatalog" % v, + // sparkV2 tests depend on UC test helpers (InMemoryUCClient, + // UCCatalogManagedTestUtils) that live in kernel-unitycatalog's test sources. + // Consume them via the published tests-classifier jar. + "io.delta" % "delta-kernel-unitycatalog" % v % Test classifier "tests" ) ) } @@ -1169,6 +1164,11 @@ lazy val kernelUnityCatalog = (project in file("kernel/unitycatalog")) // Publish the pinned UC jars before sbt tries to resolve them. update := update.dependsOn(ensurePinnedUnityCatalog).value, + // Also publish a test-jar (classifier = "tests") so consumers (e.g. sparkV2 in + // Maven mode) can depend on UC test helpers (InMemoryUCClient, + // UCCatalogManagedTestUtils) via a published artifact. + Test / publishArtifact := true, + // Put the shaded kernel-api JAR on the classpath (compile & test) Compile / unmanagedJars += (kernelApi / Compile / packageBin).value, Test / unmanagedJars += (kernelApi / Compile / packageBin).value, diff --git a/flink/src/main/java/io/delta/flink/table/CatalogManagedTable.java b/flink/src/main/java/io/delta/flink/table/CatalogManagedTable.java index f37df3fdabd..6756886288f 100644 --- a/flink/src/main/java/io/delta/flink/table/CatalogManagedTable.java +++ b/flink/src/main/java/io/delta/flink/table/CatalogManagedTable.java @@ -24,6 +24,7 @@ import io.delta.kernel.transaction.CreateTableTransactionBuilder; import io.delta.kernel.types.StructType; import io.delta.kernel.unitycatalog.UCCatalogManagedClient; +import io.delta.kernel.unitycatalog.UCTableIdentifier; import io.delta.kernel.utils.CloseableIterable; import io.delta.storage.commit.uccommitcoordinator.UCClient; import io.delta.storage.commit.uccommitcoordinator.UCTokenBasedRestClient; @@ -72,6 +73,12 @@ public class CatalogManagedTable extends AbstractKernelTable { protected transient UCClient ucClient; protected transient UCCatalogManagedClient catalogManagedClient; + private UCTableIdentifier getUcTableIdentifier() { + Preconditions.checkArgument( + catalog instanceof UnityCatalog, "Catalog-managed tables require a UnityCatalog catalog"); + return ((UnityCatalog) catalog).toUcTableIdentifier(tableId); + } + public CatalogManagedTable( DeltaCatalog catalog, String tableId, Map conf, URI endpoint, String token) { this(catalog, tableId, conf, null, null, endpoint, token); @@ -141,6 +148,7 @@ protected Snapshot loadLatestSnapshot() { /* engine */ getEngine(), /* ucTableId */ tableUUID, /* tablePath */ tablePath.toString(), + /* ucTableIdentifier */ getUcTableIdentifier(), /* versionOpt */ Optional.empty(), /* timestampOpt */ Optional.empty())); } @@ -182,6 +190,7 @@ protected boolean versionExists(Long version) { /* engine */ getEngine(), /* ucTableId */ getTableUUID(), /* tablePath */ getTablePath().toString(), + /* ucTableIdentifier */ getUcTableIdentifier(), /* startVersionOpt */ Optional.of(version), /* startTimestampOpt */ Optional.empty(), /* endVersionOpt */ Optional.empty(), diff --git a/flink/src/main/java/io/delta/flink/table/UnityCatalog.java b/flink/src/main/java/io/delta/flink/table/UnityCatalog.java index 68152be128f..f3b6b0d8c79 100644 --- a/flink/src/main/java/io/delta/flink/table/UnityCatalog.java +++ b/flink/src/main/java/io/delta/flink/table/UnityCatalog.java @@ -21,6 +21,7 @@ import dev.failsafe.function.CheckedSupplier; import io.delta.kernel.internal.types.DataTypeJsonSerDe; import io.delta.kernel.types.*; +import io.delta.kernel.unitycatalog.UCTableIdentifier; import io.unitycatalog.client.ApiClient; import io.unitycatalog.client.ApiClientBuilder; import io.unitycatalog.client.ApiException; @@ -231,6 +232,33 @@ public ApiClient getApiClient() { return apiClient; } + /** + * Parses {@code schema.table} or {@code catalog.schema.table} into a {@link UCTableIdentifier}. + * In the 2-part form the catalog defaults to this catalog's name; in the 3-part form the leading + * segment must equal this catalog's name. + */ + UCTableIdentifier toUcTableIdentifier(String qualifiedTableName) { + String[] namespaces = qualifiedTableName.split("\\."); + Preconditions.checkArgument(namespaces.length == 2 || namespaces.length == 3); + String catalogName; + String schemaName; + String tableName; + if (namespaces.length == 3) { + Preconditions.checkArgument( + namespaces[0].equals(getName()), + String.format( + "table's catalog name %s must match catalog's name %s", namespaces[0], getName())); + catalogName = namespaces[0]; + schemaName = namespaces[1]; + tableName = namespaces[2]; + } else { + catalogName = getName(); + schemaName = namespaces[0]; + tableName = namespaces[1]; + } + return new UCTableIdentifier(catalogName, schemaName, tableName); + } + @Override public void open() { if (apiClient == null) { @@ -329,22 +357,9 @@ public void createTable( () -> { TablesApi tablesApi = new TablesApi(apiClient); // Obtain names - String[] namespaces = tableId.split("\\."); - Preconditions.checkArgument(namespaces.length == 2 || namespaces.length == 3); - String schemaName; - String tableName; - if (namespaces.length == 3) { - Preconditions.checkArgument( - namespaces[0].equals(getName()), - String.format( - "table's catalog name %s must match catalog's name %s", - namespaces[0], getName())); - schemaName = namespaces[1]; - tableName = namespaces[2]; - } else { - schemaName = namespaces[0]; - tableName = namespaces[1]; - } + UCTableIdentifier tableIdentifier = toUcTableIdentifier(tableId); + String schemaName = tableIdentifier.getSchemaName(); + String tableName = tableIdentifier.getTableName(); // Column Info List columnInfos = IntStream.range(0, schema.fields().size()) diff --git a/flink/src/test/java/io/delta/flink/table/UnityCatalogTest.java b/flink/src/test/java/io/delta/flink/table/UnityCatalogTest.java index 021d2e5c86c..c420f754d03 100644 --- a/flink/src/test/java/io/delta/flink/table/UnityCatalogTest.java +++ b/flink/src/test/java/io/delta/flink/table/UnityCatalogTest.java @@ -17,16 +17,42 @@ package io.delta.flink.table; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import io.delta.flink.MockHttp; import io.delta.flink.TestHelper; import io.delta.kernel.types.*; +import io.delta.kernel.unitycatalog.UCTableIdentifier; import java.net.URI; import org.junit.jupiter.api.Test; /** JUnit 6 test suite for UnityCatalog. */ class UnityCatalogTest extends TestHelper { + @Test + void testToUcTableIdentifier() { + UnityCatalog catalog = new UnityCatalog("main", URI.create("http://localhost"), ""); + + UCTableIdentifier twoPartIdentifier = catalog.toUcTableIdentifier("default.tbl"); + assertEquals("main", twoPartIdentifier.getCatalogName()); + assertEquals("default", twoPartIdentifier.getSchemaName()); + assertEquals("tbl", twoPartIdentifier.getTableName()); + + UCTableIdentifier threePartIdentifier = catalog.toUcTableIdentifier("main.default.tbl"); + assertEquals("main", threePartIdentifier.getCatalogName()); + assertEquals("default", threePartIdentifier.getSchemaName()); + assertEquals("tbl", threePartIdentifier.getTableName()); + } + + @Test + void testToUcTableIdentifierRejectsInvalidNames() { + UnityCatalog catalog = new UnityCatalog("main", URI.create("http://localhost"), ""); + + assertThrows(IllegalArgumentException.class, () -> catalog.toUcTableIdentifier("tbl")); + assertThrows( + IllegalArgumentException.class, () -> catalog.toUcTableIdentifier("other.default.tbl")); + } + @Test void testGetTable() { withTempDir( diff --git a/kernel/kernel-benchmarks/src/test/java/io/delta/kernel/benchmarks/models/UcCatalogInfo.java b/kernel/kernel-benchmarks/src/test/java/io/delta/kernel/benchmarks/models/UcCatalogInfo.java index 21e9c7b5f80..6ea9657abd2 100644 --- a/kernel/kernel-benchmarks/src/test/java/io/delta/kernel/benchmarks/models/UcCatalogInfo.java +++ b/kernel/kernel-benchmarks/src/test/java/io/delta/kernel/benchmarks/models/UcCatalogInfo.java @@ -24,6 +24,7 @@ import io.delta.kernel.internal.util.FileNames; import io.delta.kernel.unitycatalog.InMemoryUCClient; import io.delta.kernel.unitycatalog.UCCatalogManagedCommitter; +import io.delta.kernel.unitycatalog.UCTableIdentifier; import io.delta.kernel.utils.FileStatus; import io.delta.storage.commit.Commit; import java.io.File; @@ -43,6 +44,9 @@ *
{@code
  * {
  *   "uc_table_id": "12345678-1234-1234-1234-123456789abc",
+ *   "catalog_name": "benchmark_catalog",
+ *   "schema_name": "benchmark_schema",
+ *   "table_name": "benchmark_table",
  *   "max_ratified_version": 2,
  *   "log_tail": [
  *     {
@@ -141,6 +145,18 @@ public Commit toCommit(Engine engine, String tableRoot) throws IOException {
   @JsonProperty(value = "uc_table_id", required = true)
   private String ucTableId;
 
+  /** The Unity Catalog catalog name. */
+  @JsonProperty(value = "catalog_name", required = true)
+  private String catalogName;
+
+  /** The Unity Catalog schema name. */
+  @JsonProperty(value = "schema_name", required = true)
+  private String schemaName;
+
+  /** The Unity Catalog table name. */
+  @JsonProperty(value = "table_name", required = true)
+  private String tableName;
+
   /** The maximum ratified version for this table in Unity Catalog. */
   @JsonProperty(value = "max_ratified_version", required = true)
   private long maxRatifiedVersion;
@@ -167,6 +183,11 @@ public String getUcTableId() {
     return ucTableId;
   }
 
+  /** @return the three-part Unity Catalog table identifier */
+  public UCTableIdentifier getUcTableIdentifier() {
+    return new UCTableIdentifier(catalogName, schemaName, tableName);
+  }
+
   /**
    * Creates an InMemoryUCClient for this table with the staged commits pre-loaded.
    *
diff --git a/kernel/kernel-benchmarks/src/test/java/io/delta/kernel/benchmarks/workloadrunners/WorkloadRunner.java b/kernel/kernel-benchmarks/src/test/java/io/delta/kernel/benchmarks/workloadrunners/WorkloadRunner.java
index 932ed4a814d..f2421517699 100644
--- a/kernel/kernel-benchmarks/src/test/java/io/delta/kernel/benchmarks/workloadrunners/WorkloadRunner.java
+++ b/kernel/kernel-benchmarks/src/test/java/io/delta/kernel/benchmarks/workloadrunners/WorkloadRunner.java
@@ -111,6 +111,7 @@ protected Snapshot loadSnapshot(Engine engine, TableInfo tableInfo, Optional versionOpt,
       Optional timestampOpt) {
     Objects.requireNonNull(engine, "engine is null");
     Objects.requireNonNull(ucTableId, "ucTableId is null");
     Objects.requireNonNull(tablePath, "tablePath is null");
+    Objects.requireNonNull(ucTableIdentifier, "ucTableIdentifier is null");
     Objects.requireNonNull(versionOpt, "versionOpt is null");
     Objects.requireNonNull(timestampOpt, "timestampOpt is null");
     versionOpt.ifPresent(version -> checkArgument(version >= 0, "version must be non-negative"));
@@ -122,7 +126,13 @@ public Snapshot loadSnapshot(
               () -> {
                 final GetCommitsResponse response =
                     metricsCollector.getCommitsTimer.timeChecked(
-                        () -> getRatifiedCommitsFromUC(ucTableId, tablePath, versionOpt));
+                        () ->
+                            getRatifiedCommitsFromUC(
+                                ucTableId,
+                                tablePath,
+                                versionOpt,
+                                UCCatalogManagedCommitter.toStorageTableIdentifier(
+                                    ucTableIdentifier)));
 
                 metricsCollector.setNumCatalogCommits(response.getCommits().size());
 
@@ -249,6 +259,7 @@ public CreateTableTransactionBuilder buildCreateTableTransaction(
    * @param engine The Delta Kernel {@link Engine} to use for loading the table.
    * @param ucTableId The Unity Catalog table ID, which is a unique identifier for the table in UC.
    * @param tablePath The path to the Delta table in the underlying storage system.
+   * @param ucTableIdentifier The three-part Unity Catalog table identifier.
    * @param startVersionOpt The optional start version boundary. This must be mutually exclusive
    *     with startTimestampOpt. Either this or startTimestampOpt must be provided.
    * @param startTimestampOpt The optional start timestamp boundary. This must be mutually exclusive
@@ -267,6 +278,7 @@ public CommitRange loadCommitRange(
       Engine engine,
       String ucTableId,
       String tablePath,
+      UCTableIdentifier ucTableIdentifier,
       Optional startVersionOpt,
       Optional startTimestampOpt,
       Optional endVersionOpt,
@@ -274,6 +286,7 @@ public CommitRange loadCommitRange(
     Objects.requireNonNull(engine, "engine is null");
     Objects.requireNonNull(ucTableId, "ucTableId is null");
     Objects.requireNonNull(tablePath, "tablePath is null");
+    Objects.requireNonNull(ucTableIdentifier, "ucTableIdentifier is null");
     Objects.requireNonNull(startVersionOpt, "startVersionOpt is null");
     Objects.requireNonNull(startTimestampOpt, "startTimestampOpt is null");
     Objects.requireNonNull(endVersionOpt, "endVersionOpt is null");
@@ -308,7 +321,11 @@ public CommitRange loadCommitRange(
     Optional endVersionOptForCommitQuery =
         endVersionOpt.filter(v -> !startTimestampOpt.isPresent());
     final GetCommitsResponse response =
-        getRatifiedCommitsFromUC(ucTableId, tablePath, endVersionOptForCommitQuery);
+        getRatifiedCommitsFromUC(
+            ucTableId,
+            tablePath,
+            endVersionOptForCommitQuery,
+            UCCatalogManagedCommitter.toStorageTableIdentifier(ucTableIdentifier));
     final long ucTableVersion = response.getLatestTableVersion();
     validateVersionBoundariesExist(ucTableId, startVersionOpt, endVersionOpt, ucTableVersion);
     final List logData =
@@ -413,7 +430,11 @@ private String getCommitRangeBoundariesString(
   }
 
   private GetCommitsResponse getRatifiedCommitsFromUC(
-      String ucTableId, String tablePath, Optional versionOpt) {
+      String ucTableId,
+      String tablePath,
+      Optional versionOpt,
+      TableIdentifier tableIdentifier) {
+    Objects.requireNonNull(tableIdentifier, "tableIdentifier is null");
     logger.info(
         "[{}] Invoking the UCClient to get ratified commits at version {}",
         ucTableId,
@@ -430,6 +451,7 @@ private GetCommitsResponse getRatifiedCommitsFromUC(
                 return ucClient.getCommits(
                     ucTableId,
                     new Path(tablePath).toUri(),
+                    tableIdentifier,
                     Optional.empty() /* startVersion */,
                     versionOpt /* endVersion */);
               } catch (IOException ex) {
diff --git a/kernel/unitycatalog/src/main/java/io/delta/kernel/unitycatalog/UCTableIdentifier.java b/kernel/unitycatalog/src/main/java/io/delta/kernel/unitycatalog/UCTableIdentifier.java
index 8e5bcc8c25e..bb6fbfe8145 100644
--- a/kernel/unitycatalog/src/main/java/io/delta/kernel/unitycatalog/UCTableIdentifier.java
+++ b/kernel/unitycatalog/src/main/java/io/delta/kernel/unitycatalog/UCTableIdentifier.java
@@ -18,7 +18,7 @@
 
 import static java.util.Objects.requireNonNull;
 
-/** Logical Unity Catalog table identifier used for create-time finalization. */
+/** Logical Unity Catalog table identifier used for table lifecycle and read operations. */
 public final class UCTableIdentifier {
   private final String catalogName;
   private final String schemaName;
diff --git a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/InMemoryUCClient.scala b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/InMemoryUCClient.scala
index 2e7b07da450..94b4e8ba73c 100644
--- a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/InMemoryUCClient.scala
+++ b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/InMemoryUCClient.scala
@@ -206,6 +206,7 @@ class InMemoryUCClient(ucMetastoreId: String) extends UCClient {
   override def getCommits(
       tableId: String,
       tableUri: URI,
+      tableIdentifier: TableIdentifier,
       startVersion: Optional[JLong],
       endVersion: Optional[JLong]): GetCommitsResponse = {
     val tableData = getTableDataElseThrow(tableId)
diff --git a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/InMemoryUCClientSuite.scala b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/InMemoryUCClientSuite.scala
index a593d4609c0..fb04a29fe5b 100644
--- a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/InMemoryUCClientSuite.scala
+++ b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/InMemoryUCClientSuite.scala
@@ -37,7 +37,12 @@ class InMemoryUCClientSuite extends AnyFunSuite with UCCatalogManagedTestUtils {
       endVersionOpt: Optional[JLong],
       expectedVersions: Seq[Long]): Unit = {
     val client = getInMemoryUCClientWithCommitsForTableId("tableId", allVersions)
-    val response = client.getCommits("tableId", fakeURI, startVersionOpt, endVersionOpt)
+    val response = client.getCommits(
+      "tableId",
+      fakeURI,
+      /* tableIdentifier = */ null,
+      startVersionOpt,
+      endVersionOpt)
     val actualVersions = response.getCommits.asScala.map(_.getVersion)
 
     assert(actualVersions == expectedVersions)
@@ -82,7 +87,12 @@ class InMemoryUCClientSuite extends AnyFunSuite with UCCatalogManagedTestUtils {
   test("getCommits throws InvalidTargetTableException for non-existent table") {
     val client = new InMemoryUCClient("ucMetastoreId")
     val exception = intercept[InvalidTargetTableException] {
-      client.getCommits("abcd", new URI("s3://bucket/table"), Optional.empty(), Optional.empty())
+      client.getCommits(
+        "abcd",
+        new URI("s3://bucket/table"),
+        /* tableIdentifier = */ null,
+        Optional.empty(),
+        Optional.empty())
     }
     assert(exception.getMessage.contains(s"Table not found: abcd"))
   }
diff --git a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientCommitRangeSuite.scala b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientCommitRangeSuite.scala
index 54046489e89..8bcfffc69e2 100644
--- a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientCommitRangeSuite.scala
+++ b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientCommitRangeSuite.scala
@@ -19,7 +19,7 @@ import java.util.Optional
 
 import io.delta.kernel.engine.Engine
 import io.delta.kernel.exceptions.KernelException
-import io.delta.storage.commit.uccommitcoordinator.{InvalidTargetTableException, UCClient}
+import io.delta.storage.commit.uccommitcoordinator.InvalidTargetTableException
 
 import org.scalatest.funsuite.AnyFunSuite
 
@@ -31,6 +31,7 @@ class UCCatalogManagedClientCommitRangeSuite extends AnyFunSuite with UCCatalogM
       engine: Engine = defaultEngine,
       ucTableId: String = "testUcTableId",
       tablePath: String = "testUcTablePath",
+      ucTableIdentifier: UCTableIdentifier = new UCTableIdentifier("cat", "sch", "tbl"),
       startVersionOpt: Optional[java.lang.Long] = emptyLongOpt,
       startTimestampOpt: Optional[java.lang.Long] = emptyLongOpt,
       endVersionOpt: Optional[java.lang.Long] = emptyLongOpt,
@@ -39,6 +40,7 @@ class UCCatalogManagedClientCommitRangeSuite extends AnyFunSuite with UCCatalogM
       engine,
       ucTableId,
       tablePath,
+      ucTableIdentifier,
       startVersionOpt,
       startTimestampOpt,
       endVersionOpt,
@@ -84,6 +86,10 @@ class UCCatalogManagedClientCommitRangeSuite extends AnyFunSuite with UCCatalogM
       // tablePath is null
       loadCommitRange(ucCatalogManagedClient, tablePath = null)
     }
+    assertThrows[NullPointerException] {
+      // ucTableIdentifier is null
+      loadCommitRange(ucCatalogManagedClient, ucTableIdentifier = null)
+    }
     assertThrows[NullPointerException] {
       // startVersionOpt is null
       loadCommitRange(ucCatalogManagedClient, startVersionOpt = null)
diff --git a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientSuite.scala b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientSuite.scala
index 86699717c1d..2e1d509d80e 100644
--- a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientSuite.scala
+++ b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientSuite.scala
@@ -89,6 +89,10 @@ class UCCatalogManagedClientSuite extends AnyFunSuite with UCCatalogManagedTestU
       // tablePath is null
       loadSnapshot(ucCatalogManagedClient, tablePath = null)
     }
+    assertThrows[NullPointerException] {
+      // ucTableIdentifier is null
+      loadSnapshot(ucCatalogManagedClient, ucTableIdentifier = null)
+    }
     assertThrows[NullPointerException] {
       // versionToLoad is null
       loadSnapshot(ucCatalogManagedClient, versionToLoad = null)
@@ -153,6 +157,26 @@ class UCCatalogManagedClientSuite extends AnyFunSuite with UCCatalogManagedTestU
     testCatalogManagedTable()
   }
 
+  test("loadTable forwards table identifier to getCommits") {
+    withUCClientAndTestTable { (ucClient, tablePath, _) =>
+      val ucCatalogManagedClient = new UCCatalogManagedClient(ucClient)
+      val ucTableIdentifier = new UCTableIdentifier("cat", "sch", "tbl")
+
+      ucCatalogManagedClient.loadSnapshot(
+        defaultEngine,
+        testUcTableId,
+        tablePath,
+        ucTableIdentifier,
+        emptyLongOpt,
+        emptyLongOpt)
+
+      val tableIdentifier = ucClient.getLastGetCommitsTableIdentifier
+      assert(tableIdentifier != null)
+      assert(tableIdentifier.getNamespace.toSeq == Seq("cat", "sch"))
+      assert(tableIdentifier.getName == "tbl")
+    }
+  }
+
   /* ---- Time-travel-by-version tests --- */
   test("loadTable correctly loads a UC table -- versionToLoad is a ratified commit (the max)") {
     testCatalogManagedTable(versionToLoad = Optional.of(2L))
diff --git a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedTestUtils.scala b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedTestUtils.scala
index 83c4a9ecf0f..cdce4b4f24c 100644
--- a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedTestUtils.scala
+++ b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedTestUtils.scala
@@ -38,7 +38,7 @@ import io.delta.kernel.internal.util.Utils.singletonCloseableIterator
 import io.delta.kernel.metrics.MetricsReport
 import io.delta.kernel.test.{ActionUtils, TestFixtures}
 import io.delta.kernel.utils.CloseableIterator
-import io.delta.storage.commit.{Commit, GetCommitsResponse}
+import io.delta.storage.commit.{Commit, GetCommitsResponse, TableIdentifier}
 
 import InMemoryUCClient.TableData
 import org.apache.hadoop.conf.Configuration
@@ -91,12 +91,14 @@ trait UCCatalogManagedTestUtils
       engine: Engine = defaultEngine,
       ucTableId: String = "testUcTableId",
       tablePath: String = "testUcTablePath",
+      ucTableIdentifier: UCTableIdentifier = new UCTableIdentifier("cat", "sch", "tbl"),
       versionToLoad: Optional[java.lang.Long] = emptyLongOpt,
       timestampToLoad: Optional[java.lang.Long] = emptyLongOpt): SnapshotImpl = {
     ucCatalogManagedClient.loadSnapshot(
       engine,
       ucTableId,
       tablePath,
+      ucTableIdentifier,
       versionToLoad,
       timestampToLoad).asInstanceOf[SnapshotImpl]
   }
@@ -204,16 +206,21 @@ trait UCCatalogManagedTestUtils
   /** Wrapper class around InMemoryUCClient that tracks number of getCommit calls made */
   class InMemoryUCClientWithMetrics(ucMetastoreId: String) extends InMemoryUCClient(ucMetastoreId) {
     private var numGetCommitsCalls: Long = 0
+    private var lastGetCommitsTableIdentifier: TableIdentifier = _
 
     override def getCommits(
         tableId: String,
         tableUri: URI,
+        tableIdentifier: TableIdentifier,
         startVersion: Optional[JLong],
         endVersion: Optional[JLong]): GetCommitsResponse = {
       numGetCommitsCalls += 1
-      super.getCommits(tableId, tableUri, startVersion, endVersion)
+      lastGetCommitsTableIdentifier = tableIdentifier
+      super.getCommits(tableId, tableUri, tableIdentifier, startVersion, endVersion)
     }
 
     def getNumGetCommitCalls: Long = numGetCommitsCalls
+
+    def getLastGetCommitsTableIdentifier: TableIdentifier = lastGetCommitsTableIdentifier
   }
 }
diff --git a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCE2ESuite.scala b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCE2ESuite.scala
index 591885d63fe..40081a71e5d 100644
--- a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCE2ESuite.scala
+++ b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCE2ESuite.scala
@@ -19,7 +19,6 @@ package io.delta.kernel.unitycatalog
 import java.util.Optional
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
 
 import io.delta.kernel.{CommitRange, Operation}
 import io.delta.kernel.Snapshot
@@ -29,9 +28,8 @@ import io.delta.kernel.internal.SnapshotImpl
 import io.delta.kernel.internal.util.FileNames
 import io.delta.kernel.unitycatalog.UCCatalogManagedCommitter
 import io.delta.kernel.utils.CloseableIterable
-import io.delta.storage.commit.{Commit, GetCommitsResponse}
+import io.delta.storage.commit.{GetCommitsResponse, TableIdentifier}
 
-import InMemoryUCClient.TableData
 import org.scalatest.funsuite.AnyFunSuite
 
 class UCE2ESuite extends AnyFunSuite with UCCatalogManagedTestUtils {
@@ -385,6 +383,7 @@ class UCE2ESuite extends AnyFunSuite with UCCatalogManagedTestUtils {
         ucClient,
         expCommitVersion = 3,
         expNumCatalogCommits = 3)
+      val ucTableIdentifier = new UCTableIdentifier("cat", "sch", "tbl")
 
       // Step 3: Publish all versions
       postCommitSnapshot3.publish(engine)
@@ -394,6 +393,7 @@ class UCE2ESuite extends AnyFunSuite with UCCatalogManagedTestUtils {
         engine,
         testUcTableId,
         tablePath,
+        ucTableIdentifier,
         Optional.of(0),
         emptyLongOpt,
         emptyLongOpt,
@@ -410,6 +410,7 @@ class UCE2ESuite extends AnyFunSuite with UCCatalogManagedTestUtils {
         engine,
         testUcTableId,
         tablePath,
+        ucTableIdentifier,
         Optional.of(0),
         emptyLongOpt,
         emptyLongOpt,
@@ -423,6 +424,7 @@ class UCE2ESuite extends AnyFunSuite with UCCatalogManagedTestUtils {
         engine,
         testUcTableId,
         tablePath,
+        ucTableIdentifier,
         Optional.of(2),
         emptyLongOpt,
         emptyLongOpt,
@@ -446,9 +448,10 @@ object UCE2ESuite {
     override def getCommits(
         tableId: String,
         tableUri: java.net.URI,
+        tableIdentifier: TableIdentifier,
         startVersion: Optional[java.lang.Long],
         endVersion: Optional[java.lang.Long]): GetCommitsResponse = {
-      val response = super.getCommits(tableId, tableUri, startVersion, endVersion)
+      val response = super.getCommits(tableId, tableUri, tableIdentifier, startVersion, endVersion)
       maxVersionLimit match {
         case Some(limit) =>
           // Filter commits and limit maxRatifiedVersion
diff --git a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UcLoadSnapshotTelemetrySuite.scala b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UcLoadSnapshotTelemetrySuite.scala
index 338a12695b9..89180d982a0 100644
--- a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UcLoadSnapshotTelemetrySuite.scala
+++ b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UcLoadSnapshotTelemetrySuite.scala
@@ -71,6 +71,7 @@ class UcLoadSnapshotTelemetrySuite
       val engine = createEngineWithMetricsCapture(reporter)
       val tablePath = engine.getFileSystemClient.resolvePath(tablePathUnresolved)
       val (ucClient, ucCatalogManagedClient) = createUCClientAndCatalogManagedClient()
+      val ucTableIdentifier = new UCTableIdentifier("cat", "sch", "tbl")
       setupTableWithCommits(engine, tablePath, ucClient, ucCatalogManagedClient)
       reporter.reports.clear()
 
@@ -79,6 +80,7 @@ class UcLoadSnapshotTelemetrySuite
         engine,
         "testUcTableId",
         tablePath,
+        ucTableIdentifier,
         Optional.empty(),
         Optional.empty())
 
@@ -112,6 +114,7 @@ class UcLoadSnapshotTelemetrySuite
       val engine = createEngineWithMetricsCapture(reporter)
       val tablePath = engine.getFileSystemClient.resolvePath(tablePathUnresolved)
       val (ucClient, ucCatalogManagedClient) = createUCClientAndCatalogManagedClient()
+      val ucTableIdentifier = new UCTableIdentifier("cat", "sch", "tbl")
 
       val timestampBetweenV1AndV2 =
         setupTableWithCommits(engine, tablePath, ucClient, ucCatalogManagedClient)
@@ -122,6 +125,7 @@ class UcLoadSnapshotTelemetrySuite
         engine,
         "testUcTableId",
         tablePath,
+        ucTableIdentifier,
         Optional.empty(),
         Optional.of(timestampBetweenV1AndV2))
 
@@ -157,6 +161,7 @@ class UcLoadSnapshotTelemetrySuite
 
       val tablePath = engine.getFileSystemClient.resolvePath(tablePathUnresolved)
       val (_, ucCatalogManagedClient) = createUCClientAndCatalogManagedClient()
+      val ucTableIdentifier = new UCTableIdentifier("cat", "sch", "tbl")
 
       intercept[RuntimeException] {
         // Try to load snapshot from non-existent table
@@ -164,6 +169,7 @@ class UcLoadSnapshotTelemetrySuite
           engineWithReporter,
           "nonExistentTableId",
           tablePath,
+          ucTableIdentifier,
           Optional.empty(),
           Optional.empty())
       }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/InMemoryUCClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/InMemoryUCClient.scala
index 939e95a0e87..2d13edb4724 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/InMemoryUCClient.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/InMemoryUCClient.scala
@@ -44,6 +44,7 @@ import io.delta.storage.commit.uniform.UniformMetadata
  * val getCommitsResponse = client.getCommits(
  *     "tableId",
  *     new URI("tableUri"),
+ *     /* tableIdentifier = */ null,
  *     Optional.empty(),
  *     Optional.empty())
  * }}}
@@ -87,6 +88,7 @@ class InMemoryUCClient(
   override def getCommits(
       tableId: String,
       tableUri: URI,
+      tableIdentifier: TableIdentifier,
       startVersion: Optional[JLong],
       endVersion: Optional[JLong]): JGetCommitsResponse = {
     ucCommitCoordinator.getCommitsFromCoordinator(
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorClientSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorClientSuite.scala
index 743a95d1865..640ba1af396 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorClientSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorClientSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.delta.coordinatedcommits
 
 import java.io.IOException
 import java.lang.{Long => JLong}
-import java.util.{List => JList, Optional}
+import java.util.{Collections, List => JList, Optional}
 
 import scala.collection.JavaConverters._
 import scala.jdk.OptionConverters._
@@ -42,7 +42,9 @@ import io.delta.storage.commit.{
   Commit => JCommit,
   CommitFailedException => JCommitFailedException,
   CoordinatedCommitsUtils => JCoordinatedCommitsUtils,
+  GetCommitsResponse => JGetCommitsResponse,
   TableDescriptor,
+  TableIdentifier => JTableIdentifier,
   UpdatedActions
 }
 import io.delta.storage.commit.uccommitcoordinator.{
@@ -89,6 +91,36 @@ class UCCommitCoordinatorClientSuite extends UCCommitCoordinatorClientSuiteBase
     })
   }
 
+  test("getCommits forwards table identifier to UCClient") {
+    withTempTableDir { tempDir =>
+      val log = DeltaLog.forTable(spark, tempDir.toString)
+      var capturedTableIdentifier: JTableIdentifier = null
+      val capturingUCClient = new InMemoryUCClient(metastoreId.toString, ucCommitCoordinator) {
+        override def getCommits(
+            tableId: String,
+            tableUri: java.net.URI,
+            tableIdentifier: JTableIdentifier,
+            startVersion: Optional[JLong],
+            endVersion: Optional[JLong]): JGetCommitsResponse = {
+          capturedTableIdentifier = tableIdentifier
+          new JGetCommitsResponse(Collections.emptyList(), -1L)
+        }
+      }
+      val tableCommitCoordinatorClient = TableCommitCoordinatorClient(
+        new UCCommitCoordinatorClient(Map.empty[String, String].asJava, capturingUCClient),
+        log,
+        Map(UCCommitCoordinatorClient.UC_TABLE_ID_KEY -> tableUUID.toString)
+      )
+      val tableIdentifier = TableIdentifier("tbl", Some("default"), Some("main"))
+
+      tableCommitCoordinatorClient.getCommits(Some(tableIdentifier))
+
+      assert(capturedTableIdentifier != null)
+      assert(capturedTableIdentifier.getNamespace.toSeq == Seq("main", "default"))
+      assert(capturedTableIdentifier.getName == "tbl")
+    }
+  }
+
   test("incorrect last known backfilled version") {
     withTempTableDir { tempDir =>
       val log = DeltaLog.forTable(spark, tempDir.toString)
diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/snapshot/unitycatalog/UCManagedTableSnapshotManager.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/snapshot/unitycatalog/UCManagedTableSnapshotManager.java
index 57c90984882..1b2a4622440 100644
--- a/spark/v2/src/main/java/io/delta/spark/internal/v2/snapshot/unitycatalog/UCManagedTableSnapshotManager.java
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/snapshot/unitycatalog/UCManagedTableSnapshotManager.java
@@ -24,6 +24,7 @@
 import io.delta.kernel.internal.SnapshotImpl;
 import io.delta.kernel.internal.files.ParsedCatalogCommitData;
 import io.delta.kernel.unitycatalog.UCCatalogManagedClient;
+import io.delta.kernel.unitycatalog.UCTableIdentifier;
 import io.delta.spark.internal.v2.exception.VersionNotFoundException;
 import io.delta.spark.internal.v2.snapshot.DeltaSnapshotManager;
 import java.util.List;
@@ -40,6 +41,7 @@ public class UCManagedTableSnapshotManager implements DeltaSnapshotManager {
   private final UCCatalogManagedClient ucCatalogManagedClient;
   private final String tableId;
   private final String tablePath;
+  private final UCTableIdentifier tableIdentifier;
   private final Engine engine;
 
   /**
@@ -56,6 +58,7 @@ public UCManagedTableSnapshotManager(
     requireNonNull(tableInfo, "tableInfo is null");
     this.tableId = tableInfo.getTableId();
     this.tablePath = tableInfo.getTablePath();
+    this.tableIdentifier = tableInfo.getTableIdentifier();
     this.engine = requireNonNull(engine, "engine is null");
   }
 
@@ -66,18 +69,22 @@ public UCManagedTableSnapshotManager(
    */
   @Override
   public Snapshot loadLatestSnapshot() {
-    return ucCatalogManagedClient.loadSnapshot(
-        engine,
-        tableId,
-        tablePath,
-        Optional.empty() /* versionOpt */,
-        Optional.empty() /* timestampOpt */);
+    return loadSnapshot(Optional.empty() /* versionOpt */);
   }
 
   @Override
   public Snapshot loadSnapshotAt(long version) {
+    return loadSnapshot(Optional.of(version));
+  }
+
+  private Snapshot loadSnapshot(Optional versionOpt) {
     return ucCatalogManagedClient.loadSnapshot(
-        engine, tableId, tablePath, Optional.of(version), Optional.empty() /* timestampOpt */);
+        engine,
+        tableId,
+        tablePath,
+        tableIdentifier,
+        versionOpt,
+        Optional.empty() /* timestampOpt */);
   }
 
   /**
@@ -172,6 +179,7 @@ public CommitRange getTableChanges(Engine engine, long startVersion, Optional authConfig;
 
   public UCTableInfo(
-      String tableId, String tablePath, String ucUri, Map authConfig) {
+      String tableId,
+      String tablePath,
+      UCTableIdentifier tableIdentifier,
+      String ucUri,
+      Map authConfig) {
     this.tableId = requireNonNull(tableId, "tableId is null");
     this.tablePath = requireNonNull(tablePath, "tablePath is null");
+    this.tableIdentifier = requireNonNull(tableIdentifier, "tableIdentifier is null");
     this.ucUri = requireNonNull(ucUri, "ucUri is null");
     this.authConfig = Collections.unmodifiableMap(requireNonNull(authConfig, "authConfig is null"));
   }
@@ -49,6 +56,10 @@ public String getTablePath() {
     return tablePath;
   }
 
+  public UCTableIdentifier getTableIdentifier() {
+    return tableIdentifier;
+  }
+
   public String getUcUri() {
     return ucUri;
   }
diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/snapshot/unitycatalog/UCUtils.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/snapshot/unitycatalog/UCUtils.java
index f0f6133b577..dcdba945e9f 100644
--- a/spark/v2/src/main/java/io/delta/spark/internal/v2/snapshot/unitycatalog/UCUtils.java
+++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/snapshot/unitycatalog/UCUtils.java
@@ -18,6 +18,7 @@
 import static java.util.Objects.requireNonNull;
 import static scala.jdk.javaapi.CollectionConverters.asJava;
 
+import io.delta.kernel.unitycatalog.UCTableIdentifier;
 import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient;
 import java.util.Map;
 import java.util.Optional;
@@ -87,7 +88,13 @@ public static Optional extractTableInfo(
     UCCatalogConfig config = configOpt.get();
     String ucUri = config.uri();
 
-    return Optional.of(new UCTableInfo(tableId, tablePath, ucUri, asJava(config.authConfig())));
+    return Optional.of(
+        new UCTableInfo(
+            tableId,
+            tablePath,
+            extractTableIdentifier(catalogTable),
+            ucUri,
+            asJava(config.authConfig())));
   }
 
   private static String extractUCTableId(CatalogTable catalogTable) {
@@ -110,4 +117,23 @@ private static String extractTablePath(CatalogTable catalogTable) {
     }
     return catalogTable.location().toString();
   }
+
+  private static UCTableIdentifier extractTableIdentifier(CatalogTable catalogTable) {
+    scala.Option catalogOption = catalogTable.identifier().catalog();
+    if (catalogOption.isEmpty()) {
+      throw new IllegalArgumentException(
+          "Unable to determine Unity Catalog for table "
+              + catalogTable.identifier()
+              + ": catalog name is missing.");
+    }
+    scala.Option schemaOption = catalogTable.identifier().database();
+    if (schemaOption.isEmpty()) {
+      throw new IllegalArgumentException(
+          "Unable to determine Unity Catalog schema for table "
+              + catalogTable.identifier()
+              + ": schema name is missing.");
+    }
+    return new UCTableIdentifier(
+        catalogOption.get(), schemaOption.get(), catalogTable.identifier().table());
+  }
 }
diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/ddl/DDLRequestContextTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/ddl/DDLRequestContextTest.java
index b48e4292bda..7d08bcf80fc 100644
--- a/spark/v2/src/test/java/io/delta/spark/internal/v2/ddl/DDLRequestContextTest.java
+++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/ddl/DDLRequestContextTest.java
@@ -28,6 +28,7 @@
 import io.delta.kernel.types.StructField;
 import io.delta.kernel.types.StructType;
 import io.delta.kernel.types.TimestampType;
+import io.delta.kernel.unitycatalog.UCTableIdentifier;
 import io.delta.spark.internal.v2.snapshot.unitycatalog.UCTableInfo;
 import java.util.HashMap;
 import java.util.List;
@@ -55,7 +56,11 @@ public void testConstructionWithAllFields() {
         DataLayoutSpec.clustered(List.of(new Column("user_id"), new Column("event_ts")));
     UCTableInfo ucInfo =
         new UCTableInfo(
-            "uc-table-id-123", tablePath, "https://uc.example.com", Map.of("token", "fake-token"));
+            "uc-table-id-123",
+            tablePath,
+            new UCTableIdentifier("prod_catalog", "analytics", "page_views"),
+            "https://uc.example.com",
+            Map.of("token", "fake-token"));
     CreateTableTransactionBuilder txnBuilder =
         TableManager.buildCreateTableTransaction(tablePath, schema, "test");
 
diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/snapshot/unitycatalog/UCTableInfoTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/snapshot/unitycatalog/UCTableInfoTest.java
index 1e52030457f..1b21790647f 100644
--- a/spark/v2/src/test/java/io/delta/spark/internal/v2/snapshot/unitycatalog/UCTableInfoTest.java
+++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/snapshot/unitycatalog/UCTableInfoTest.java
@@ -16,7 +16,9 @@
 package io.delta.spark.internal.v2.snapshot.unitycatalog;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import io.delta.kernel.unitycatalog.UCTableIdentifier;
 import java.util.HashMap;
 import java.util.Map;
 import org.junit.jupiter.api.Test;
@@ -29,6 +31,8 @@ void testConstructor() {
     // Use distinctive values that would fail if implementation had hardcoded defaults
     String tableId = "uc_tbl_7f3a9b2c-e8d1-4f6a";
     String tablePath = "abfss://container@acct.dfs.core.windows.net/delta/v2";
+    UCTableIdentifier tableIdentifier =
+        new UCTableIdentifier("prod_catalog", "analytics", "page_views");
     String ucUri = "https://uc-server.example.net/api/2.1/uc";
     String ucToken = "dapi_Kx9mN$2pQr#7vWz";
 
@@ -36,14 +40,34 @@ void testConstructor() {
     authConfig.put("type", "static");
     authConfig.put("token", ucToken);
 
-    UCTableInfo info = new UCTableInfo(tableId, tablePath, ucUri, authConfig);
+    UCTableInfo info = new UCTableInfo(tableId, tablePath, tableIdentifier, ucUri, authConfig);
 
     assertEquals(tableId, info.getTableId(), "Table ID should be stored correctly");
     assertEquals(tablePath, info.getTablePath(), "Table path should be stored correctly");
+    assertEquals(tableIdentifier, info.getTableIdentifier(), "Identifier should be stored");
     assertEquals(ucUri, info.getUcUri(), "UC URI should be stored correctly");
 
     Map ret = info.getAuthConfig();
     assertEquals("static", ret.get("type"), "Type should be static");
     assertEquals(ucToken, ret.get("token"), "UC token should be stored correctly in configMap");
   }
+
+  @Test
+  void testConstructorRequiresTableIdentifier() {
+    Map authConfig = new HashMap<>();
+    authConfig.put("token", "fake-token");
+
+    NullPointerException ex =
+        assertThrows(
+            NullPointerException.class,
+            () ->
+                new UCTableInfo(
+                    "uc_tbl_123",
+                    "s3://bucket/table",
+                    null,
+                    "https://uc-server.example.net/api/2.1/uc",
+                    authConfig));
+
+    assertEquals("tableIdentifier is null", ex.getMessage());
+  }
 }
diff --git a/spark/v2/src/test/scala/io/delta/spark/internal/v2/snapshot/unitycatalog/UCManagedTableSnapshotManagerSuite.scala b/spark/v2/src/test/scala/io/delta/spark/internal/v2/snapshot/unitycatalog/UCManagedTableSnapshotManagerSuite.scala
index a801dfb139d..534edaa4b0f 100644
--- a/spark/v2/src/test/scala/io/delta/spark/internal/v2/snapshot/unitycatalog/UCManagedTableSnapshotManagerSuite.scala
+++ b/spark/v2/src/test/scala/io/delta/spark/internal/v2/snapshot/unitycatalog/UCManagedTableSnapshotManagerSuite.scala
@@ -20,7 +20,7 @@ import java.util.Optional
 import scala.jdk.CollectionConverters._
 
 import io.delta.kernel.exceptions.KernelException
-import io.delta.kernel.unitycatalog.{InMemoryUCClient, UCCatalogManagedClient, UCCatalogManagedTestUtils}
+import io.delta.kernel.unitycatalog.{InMemoryUCClient, UCCatalogManagedClient, UCCatalogManagedTestUtils, UCTableIdentifier}
 import io.delta.spark.internal.v2.exception.VersionNotFoundException
 import io.delta.storage.commit.uccommitcoordinator.InvalidTargetTableException
 
@@ -32,6 +32,7 @@ class UCManagedTableSnapshotManagerSuite
     with UCCatalogManagedTestUtils {
 
   private val testUcTableId = "testUcTableId"
+  private val testTableIdentifier = new UCTableIdentifier("cat", "sch", "tbl")
   private val testUcUri = "https://test-uc.example.com"
   private val testUcToken = "test-token"
   private val testUcAuthConfig = Map("token" -> testUcToken).asJava
@@ -40,7 +41,8 @@ class UCManagedTableSnapshotManagerSuite
       ucClient: InMemoryUCClient,
       tablePath: String) = {
     val client = new UCCatalogManagedClient(ucClient)
-    val tableInfo = new UCTableInfo(testUcTableId, tablePath, testUcUri, testUcAuthConfig)
+    val tableInfo =
+      new UCTableInfo(testUcTableId, tablePath, testTableIdentifier, testUcUri, testUcAuthConfig)
     new UCManagedTableSnapshotManager(client, tableInfo, defaultEngine)
   }
 
@@ -49,7 +51,13 @@ class UCManagedTableSnapshotManagerSuite
   test("constructor rejects null arguments") {
     val ucClient = new InMemoryUCClient("testMetastore")
     val client = new UCCatalogManagedClient(ucClient)
-    val tableInfo = new UCTableInfo(testUcTableId, "/test/path", testUcUri, testUcAuthConfig)
+    val tableInfo =
+      new UCTableInfo(
+        testUcTableId,
+        "/test/path",
+        testTableIdentifier,
+        testUcUri,
+        testUcAuthConfig)
 
     val ex1 = intercept[NullPointerException] {
       new UCManagedTableSnapshotManager(null, tableInfo, defaultEngine)
@@ -76,12 +84,19 @@ class UCManagedTableSnapshotManagerSuite
       val snapshot = manager.loadLatestSnapshot()
 
       assert(snapshot.getVersion == maxRatifiedVersion)
+      assert(ucClient.getLastGetCommitsTableIdentifier.getNamespace.toSeq == Seq("cat", "sch"))
+      assert(ucClient.getLastGetCommitsTableIdentifier.getName == "tbl")
     }
   }
 
   test("loadLatestSnapshot: throws when table does not exist in catalog") {
     val ucClient = new InMemoryUCClient("ucMetastoreId")
-    val tableInfo = new UCTableInfo("nonExistentTableId", "/fake/path", testUcUri, testUcAuthConfig)
+    val tableInfo = new UCTableInfo(
+      "nonExistentTableId",
+      "/fake/path",
+      testTableIdentifier,
+      testUcUri,
+      testUcAuthConfig)
     val client = new UCCatalogManagedClient(ucClient)
     val manager = new UCManagedTableSnapshotManager(client, tableInfo, defaultEngine)
 
@@ -256,6 +271,8 @@ class UCManagedTableSnapshotManagerSuite
       val fullRange = manager.getTableChanges(defaultEngine, 0L, Optional.of(maxRatifiedVersion))
       assert(fullRange.getStartVersion == 0L)
       assert(fullRange.getEndVersion == maxRatifiedVersion)
+      assert(ucClient.getLastGetCommitsTableIdentifier.getNamespace.toSeq == Seq("cat", "sch"))
+      assert(ucClient.getLastGetCommitsTableIdentifier.getName == "tbl")
 
       val toLatest = manager.getTableChanges(defaultEngine, 1L, Optional.empty())
       assert(toLatest.getStartVersion == 1L)
@@ -294,7 +311,12 @@ class UCManagedTableSnapshotManagerSuite
 
   test("operations propagate InvalidTargetTableException from client") {
     val ucClient = new InMemoryUCClient("ucMetastoreId")
-    val tableInfo = new UCTableInfo("nonExistentTableId", "/fake/path", testUcUri, testUcAuthConfig)
+    val tableInfo = new UCTableInfo(
+      "nonExistentTableId",
+      "/fake/path",
+      testTableIdentifier,
+      testUcUri,
+      testUcAuthConfig)
     val client = new UCCatalogManagedClient(ucClient)
     val manager = new UCManagedTableSnapshotManager(client, tableInfo, defaultEngine)
 
diff --git a/spark/v2/src/test/scala/io/delta/spark/internal/v2/snapshot/unitycatalog/UCUtilsSuite.scala b/spark/v2/src/test/scala/io/delta/spark/internal/v2/snapshot/unitycatalog/UCUtilsSuite.scala
index 9def2d32602..64486098f85 100644
--- a/spark/v2/src/test/scala/io/delta/spark/internal/v2/snapshot/unitycatalog/UCUtilsSuite.scala
+++ b/spark/v2/src/test/scala/io/delta/spark/internal/v2/snapshot/unitycatalog/UCUtilsSuite.scala
@@ -163,6 +163,9 @@ class UCUtilsSuite extends SparkFunSuite with SharedSparkSession {
       assert(
         info.getTablePath == TABLE_PATH_ALPHA,
         s"Table path mismatch: got ${info.getTablePath}")
+      assert(info.getTableIdentifier.getCatalogName == CATALOG_ALPHA)
+      assert(info.getTableIdentifier.getSchemaName == "default")
+      assert(info.getTableIdentifier.getTableName == "tbl")
       assert(info.getUcUri == UC_URI_ALPHA, s"UC URI mismatch: got ${info.getUcUri}")
       val configMap = info.getAuthConfig
       assert(
@@ -223,6 +226,9 @@ class UCUtilsSuite extends SparkFunSuite with SharedSparkSession {
       assert(
         info.getTablePath == tablePathBeta,
         s"Should extract tablePathBeta, got: ${info.getTablePath}")
+      assert(info.getTableIdentifier.getCatalogName == catalogBeta)
+      assert(info.getTableIdentifier.getSchemaName == "default")
+      assert(info.getTableIdentifier.getTableName == "tbl")
     } finally {
       configs.foreach { case (key, _) =>
         originalValues.get(key).flatten match {
diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCClient.java
index 7343dc2d84b..a32d0028abc 100644
--- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCClient.java
+++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCClient.java
@@ -63,7 +63,9 @@ public interface UCClient extends AutoCloseable {
    * @param tableUri The URI of the storage location of the table (e.g.,
    *                 {@code s3://bucket/path/to/table}, not the {@code _delta_log} path).
    * @param tableIdentifier The three-part table identifier (catalog, schema, table name)
-   *                        for the table in Unity Catalog, or null if unavailable.
+   *                        for the table in Unity Catalog, or null when the caller has no
+   *                        catalog context. Implementations that require this value must
+   *                        reject null; implementations that don't use it may ignore it.
    * @param commit An Optional containing the Commit object with the changes to be committed.
    *               If empty, it indicates that no new data is being added in this commit.
    * @param lastKnownBackfilledVersion An Optional containing the last known backfilled version
@@ -110,6 +112,10 @@ void commit(
    *                 (and not s3://bucket/path/to/table/_delta_log).
    *                 If the tableId exists but the tableUri is different from the one previously
    *                 registered (e.g., if the table as moved), the request will fail.
+   * @param tableIdentifier The three-part table identifier (catalog, schema, table name)
+   *                        for the table in Unity Catalog, or null when the caller has no
+   *                        catalog context. Implementations that require this value must
+   *                        reject null; implementations that don't use it may ignore it.
    * @param startVersion An Optional containing the start version of the range of commits to
    *                     retrieve.
    * @param endVersion An Optional containing the end version of the range of commits to retrieve.
@@ -124,6 +130,7 @@ void commit(
   GetCommitsResponse getCommits(
       String tableId,
       URI tableUri,
+      TableIdentifier tableIdentifier,
       Optional startVersion,
       Optional endVersion) throws IOException, UCCommitCoordinatorException;
 
diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCommitCoordinatorClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCommitCoordinatorClient.java
index f387b1c9805..7cca7efb171 100644
--- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCommitCoordinatorClient.java
+++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCommitCoordinatorClient.java
@@ -853,6 +853,7 @@ protected GetCommitsResponse getCommitsFromUCImpl(
       return ucClient.getCommits(
         extractUCTableId(tableDesc),
         CoordinatedCommitsUtils.getTablePath(tableDesc.getLogPath()).toUri(),
+        tableDesc.getTableIdentifier().orElse(null),
         startVersion,
         endVersion);
     } catch (IOException | UCCommitCoordinatorException e) {
diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java
index 1f19840106f..dedd6abae5e 100644
--- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java
+++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClient.java
@@ -203,6 +203,7 @@ public void commit(
   public GetCommitsResponse getCommits(
       String tableId,
       URI tableUri,
+      TableIdentifier tableIdentifier,
       Optional startVersion,
       Optional endVersion) throws IOException, UCCommitCoordinatorException {
     throw new UnsupportedOperationException(
diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java
index 67b582a1b00..4a385cd9338 100644
--- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java
+++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java
@@ -199,6 +199,7 @@ public void commit(
   public GetCommitsResponse getCommits(
       String tableId,
       URI tableUri,
+      TableIdentifier tableIdentifier,
       Optional startVersion,
       Optional endVersion) throws IOException, UCCommitCoordinatorException {
     ensureOpen();
diff --git a/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala b/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala
index 04d55788f98..47551d18ad7 100644
--- a/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala
+++ b/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCDeltaTokenBasedRestClientSuite.scala
@@ -513,7 +513,8 @@ class UCDeltaTokenBasedRestClientSuite
   test("getCommits throws UnsupportedOperationException") {
     withClient { c =>
       intercept[UnsupportedOperationException] {
-        c.getCommits(testTableId, new URI("s3://b/t"), Optional.empty(), Optional.empty())
+        c.getCommits(testTableId, new URI("s3://b/t"), testIdentifier,
+          Optional.empty(), Optional.empty())
       }
     }
   }
diff --git a/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClientSuite.scala b/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClientSuite.scala
index 8d0a52ba2d2..cdc6ffc93ff 100644
--- a/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClientSuite.scala
+++ b/storage/src/test/scala/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClientSuite.scala
@@ -228,7 +228,11 @@ class UCTokenBasedRestClientSuite
     commitsHandler = exchange => sendJson(exchange, HttpStatus.SC_OK, responseJson)
     withClient { client =>
       val response = client.getCommits(
-        testTableId, testTableUri, Optional.empty(), Optional.empty())
+        testTableId,
+        testTableUri,
+        /* tableIdentifier = */ null,
+        Optional.empty(),
+        Optional.empty())
       assert(response.getCommits.size() === 1)
       assert(response.getCommits.get(0).getVersion === 1L)
       assert(response.getLatestTableVersion === 1L)
@@ -238,10 +242,20 @@ class UCTokenBasedRestClientSuite
   test("getCommits validates required parameters") {
     withClient { client =>
       intercept[NullPointerException] {
-        client.getCommits(null, testTableUri, Optional.empty(), Optional.empty())
+        client.getCommits(
+          null,
+          testTableUri,
+          /* tableIdentifier = */ null,
+          Optional.empty(),
+          Optional.empty())
       }
       intercept[NullPointerException] {
-        client.getCommits(testTableId, null, Optional.empty(), Optional.empty())
+        client.getCommits(
+          testTableId,
+          null,
+          /* tableIdentifier = */ null,
+          Optional.empty(),
+          Optional.empty())
       }
     }
   }
@@ -250,7 +264,12 @@ class UCTokenBasedRestClientSuite
     commitsHandler = exchange => sendJson(exchange, HttpStatus.SC_NOT_FOUND, "{}")
     withClient { client =>
       intercept[InvalidTargetTableException] {
-        client.getCommits(testTableId, testTableUri, Optional.empty(), Optional.empty())
+        client.getCommits(
+          testTableId,
+          testTableUri,
+          /* tableIdentifier = */ null,
+          Optional.empty(),
+          Optional.empty())
       }
     }
   }