[Storage] Implement UC token-based getCommits#9
Draft
TimothyW553 wants to merge 16 commits into
Draft
Conversation
Please read my added comments in the conversation, they explain differences to the path I'd have expected to cleanly work. #### Which Delta project/connector is this regarding? - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (Build) ## Description Guarded behing a feature flag, this PR Implements B.2 of the [CDC SPIP](https://docs.google.com/document/d/1-4rCS3vsGIyhwnkAwPsEaqyUDg-AuVkdrYLotFPw0U0/edit?tab=t.0#heading=h.m1700lw4wsoj), so the `Changelog` interface. Furthermore, it adds catalog-driven Auto-CDF (`TableCatalog.loadChangelog`) for the DSv2 connector, so the kernel-based V2 reader stack answers `SELECT * FROM t CHANGES FROM VERSION/TIMESTAMP ...` batch queries introduced by [SPARK-55668](https://issues.apache.org/jira/browse/SPARK-55668) ([apache/spark#55508](apache/spark#55508)). The streaming CDC entrypoint shipped in delta-io#6359 is preserved unchanged and coexists with the new batch path through an explicit `CdcReadMode` flag. ### What the PR adds - **`DeltaCatalogChangelogSupport`** (new, in `spark-unified`) — abstract Scala class between `AbstractDeltaCatalog` (sparkV1) and the hybrid `DeltaCatalog` (spark-unified). Overrides `loadChangelog(ident, changelogInfo)` and dispatches on the `ChangelogRange` subtype (version / timestamp / unbounded). Resolves the table through `loadTable` (so it works for both `SparkTable` and `DeltaTableV2`), loads the latest Kernel snapshot via `SnapshotManagerFactory`, validates row tracking is enabled, applies bounds-inclusivity adjustments, and returns a `DeltaChangelog`. The class lives in `spark-unified` because the implementation references sparkV2 classes (`SparkTable`, `DeltaChangelog`, `SnapshotManagerFactory`, `V2SchemaUtils`) and `sparkV1` cannot depend on `sparkV2`. `DeltaCatalog.java` now extends `DeltaCatalogChangelogSupport`. - **`CdcReadMode`** (new enum, `sparkV2`) — replaces the prior `boolean isCDCRead` on `PartitionUtils.createDeltaParquetReaderFactory`: - `NONE` — non-CDC scan (`SparkBatch`). - `STREAMING` — opt-in streaming CDC (`SparkMicroBatchStream` when `readChangeFeed = true`). `PartitionUtils` owns the CDC schema augmentation and `CDCReadFunction` wrap, as before. - `BATCH_CHANGELOG` — Auto-CDF (`DeltaChangelogBatch`). `PartitionUtils` leaves schema and reader untouched; `CDCPartitionReaderFactory` in `DeltaChangelogBatch` injects `_change_type` / `_commit_version` / `_commit_timestamp` as per-partition constants instead. This avoids the double-injection / schema-vs-reader misalignment that the previous shared `isCDCRead=true` path caused. - **Feature flag** `DELTA_CHANGELOG_V2_ENABLED` (`changelogV2.enabled`, internal, default `false`). When disabled, `DeltaCatalogChangelogSupport.loadChangelog` delegates to `super.loadChangelog`, which surfaces the familiar `UNSUPPORTED_FEATURE.CHANGE_DATA_CAPTURE` error. Lets the implementation land without changing user-visible behavior until tests catch up. - **Per-commit ordering in `DeltaChangelogBatch.planInputPartitions`** — emits all `RemoveFile` partitions before `AddFile` partitions within a single commit, so Spark's batch CDC post-processor (`ResolveChangelogTable`) sees preimage → postimage pairs regardless of the action order in the on-disk commit log. The Delta protocol does not contract action order; pinning the partition order here makes the test expectations stable across protocol implementations. ### Why this is split this way The DSv2 Changelog interface ([apache/spark#55426](apache/spark#55426)) covers both batch and streaming, but the two enter Delta through different paths today: - Streaming opt-in via `option("readChangeFeed", "true")` → `SparkMicroBatchStream` → `PartitionUtils` with `CdcReadMode.STREAMING`. Existing surface from delta-io#6359; preserved by this PR. - Batch `CHANGES FROM` (SQL or DataFrame) → analyzer ([SPARK-56686](https://issues.apache.org/jira/browse/SPARK-56686), [SPARK-56687](https://issues.apache.org/jira/browse/SPARK-56687) / [apache/spark#55637](apache/spark#55637)) → `TableCatalog.loadChangelog` → `DeltaCatalog` → `DeltaCatalogChangelogSupport.loadChangelog` → `DeltaChangelog` → `DeltaChangelogScan` → `DeltaChangelogBatch` → `PartitionUtils` with `CdcReadMode.BATCH_CHANGELOG`. The follow-up `DeltaChangelogScan.toMicroBatchStream()` (so the catalog-driven Changelog also drives streaming reads, completing the surface that [apache/spark#55637](apache/spark#55637) builds on for `deduplicationMode = netChanges`) is left as a `TODO` in `DeltaChangelogScan` for a follow-up PR. ### Limitations / known follow-ups - Row tracking must be enabled on the source table (Auto-CDF surfaces a `requires row tracking` analysis error otherwise). - `UnboundedRange` is rejected with `DELTA_CHANGELOG_UNBOUNDED_RANGE`; Auto-CDF always operates over a bounded range. - `DeltaChangelogScan` does not yet implement `toMicroBatchStream()` — see inline `TODO`. - `DeltaChangelogBatch.planInputPartitions` still goes through `StreamingHelper.getCommitActionsFromRangeUnsafe` (marked `TODO` in the file). The helper is generic — only the class name is streaming-flavored. A separate rename / extract pass would be good. ### Build dependency The new code uses the `Changelog` / `ChangelogInfo` / `ChangelogRange` interfaces added in Spark 4.2 ([apache/spark#55426](apache/spark#55426)). This PR is tested by cherry picking changes from delta-io#6657, which re-enables the Spark 4.2 snapshot cross-build row. ## How was this patch tested? New tests (17 total, all green): - `DeltaChangelogDirectBatchExecutionTest` — exercises the `DeltaChangelogScan` → `Batch` → `PartitionReader` path directly, without going through SQL. Covers: initial insert + delete with paired DELETE/INSERT output, UPDATE-as-CoW producing paired preimage/postimage rows at the same commit, range slicing on non-zero start, and rowId / rowVersion field-reference contract. - `DeltaChangelogCatalogIntegrationTest` — exercises the catalog-routed entrypoint (`TableCatalog.loadChangelog`) over both SQL `CHANGES FROM` and the DataFrame API. Covers all four `ChangelogRange` shapes (version range, timestamp range, open-ended, exclusive bounds), boundary inclusivity, and the failure modes (timestamp before earliest commit, timestamp after latest commit, empty exclusive range, unbounded rejection). Pre-existing suites still pass: - `SparkMicroBatchStreamCDCTest` — confirms the streaming CDC entrypoint introduced in delta-io#6359 is unaffected by the `CdcReadMode` refactor. - `PartitionUtilsTest` — updated to pass `CdcReadMode` instead of `boolean`. Build / test command used locally: ``` build/sbt "sparkV2/testOnly io.delta.spark.internal.v2.read.changelog.*" ``` ## Does this PR introduce _any_ user-facing changes? No, not while the feature flag is at its default. With `changelogV2.enabled = true` (internal flag, default `false`): `SELECT * FROM <table> CHANGES FROM VERSION/TIMESTAMP ...` and the DataFrame `.changes(...)` builder become functional on row-tracking-enabled Delta tables via the V2 connector. Previously the V2 catalog rejected these queries with `UNSUPPORTED_FEATURE.CHANGE_DATA_CAPTURE`.
…delta-io#6683) ## What changes were proposed in this pull request? Adding support for `saveAsTable` through the `InMemorySparkTable`, onboarding more generated suites that use it. ## How was this patch tested? This patch onboards a couple more generated tests.
#### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [x] Kernel - [x] Other (Storage) ## Description Adds `TableIdentifier` to `UCClient#getCommits` so UC clients can receive the catalog/schema/table name when fetching commits. The identifier is forwarded from `UCCommitCoordinatorClient` when available in the `TableDescriptor`. Kernel catalog-managed snapshot loading also has an overload that forwards the identifier, and Spark v2 UC snapshot metadata now carries the identifier from `CatalogTable` into that Kernel path. The legacy `UCTokenBasedRestClient` accepts the new argument but keeps sending the existing legacy request fields. Resolves delta-io#6784. Addresses the `getCommits` API follow-up from delta-io#6780. ### tableIdentifier contract `tableIdentifier` is the three-part `catalog.schema.table` name (not the UC UUID `tableId`). Callers pass it when they have catalog context, null otherwise; receivers either require it (rejecting null) or ignore it. | Caller | Passes | |---|---| | Kernel `UCCatalogManagedClient` | Non-null (API requires) | | Spark V2 `UCManagedTableSnapshotManager` | Non-null (from `CatalogTable`) | | Spark V1 catalog access (via `DeltaTableV2`) | Non-null (threaded from `DeltaTableV2.catalogTable`) | | Spark V1 path-based (`DeltaTable.forPath`, `` delta.`s3://...` ``) | Null (no name exists) | | Flink `CatalogManagedTable` | Non-null (from qualified name) | what does each receiver do when it gets this new tableIdentifier? | Receiver | Behavior | |---|---| | `UCTokenBasedRestClient` (legacy) | Ignores | | `InMemoryUCClient` (test) | Ignores | | Future UC Delta REST client like `UCDeltaTokenBasedRestClient` | Will require non-null | ## How was this patch tested? ``` build/sbt javafmtAll scalafmtAll build/sbt "storage/testOnly io.delta.storage.commit.uccommitcoordinator.UCTokenBasedRestClientSuite" build/sbt "spark/testOnly org.apache.spark.sql.delta.coordinatedcommits.UCCommitCoordinatorClientSuite" build/sbt "kernelUnityCatalog/testOnly io.delta.kernel.unitycatalog.InMemoryUCClientSuite" "kernelUnityCatalog/testOnly io.delta.kernel.unitycatalog.UCCatalogManagedClientSuite" build/sbt "sparkV2/testOnly io.delta.spark.internal.v2.snapshot.unitycatalog.UCUtilsSuite" "sparkV2/testOnly io.delta.spark.internal.v2.snapshot.unitycatalog.UCManagedTableSnapshotManagerSuite" "sparkV2/testOnly io.delta.spark.internal.v2.snapshot.unitycatalog.UCTableInfoTest" ``` ## Does this PR introduce _any_ user-facing changes? No.
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [x] Other (Protocol) ## Description This PR adds the Variant Shredding feature to the Delta protocol and makes relevant changes to the Variant Type section to suggest that Variants may also be stored in a shredded representation. Reference implementations: 1. Shredding of Variant in Parquet files was implemented in Spark in [this PR](apache/spark#52406). 2. Collection of per-file statistics on Variant data was implemented in this [Delta-Spark PR](delta-io#6664). 3. Variant shredding with per-file statistics have been used in production by Databricks customers since September 2025 ([doc](https://docs.databricks.com/aws/en/delta/variant-shredding)). ## How was this patch tested? NA ## Does this PR introduce _any_ user-facing changes? No --------- Co-authored-by: Ryan Johnson <scovich@users.noreply.github.com>
29c699c to
36aee29
Compare
…elta-io#6811) ## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/6811/files) to review incremental changes. - [**stack/UCDeltaTokenBasedRestClient_load**](delta-io#6811) [[Files changed](https://github.com/delta-io/delta/pull/6811/files)] - [stack/DeltaCatalogClient_load](delta-io#6796) [[Files changed](https://github.com/delta-io/delta/pull/6796/files/3e2f64f79dbdbbd3a360dbd1c619ee22b9a7417d..d3ae3e7daa1031672f2d974322a3e36c6ad913e6)] - stack/DeltaCatalogClient_create --------- #### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [x] Other (Storage) ## Description [Storage] Extend UCDeltaClient with table-loading ops + exceptions - loadTable / createStagingTable / createTable now take TableIdentifier; createTable takes AbstractMetadata + AbstractProtocol (mirrors commit()). - TableInfo realigned with StagingTableInfo (tableId field + ordering). - New typed exceptions: CredentialFetchFailedException, NoSuchTableException, UnsupportedTableFormatException. - build.sbt: conditional unitycatalog-hadoop dep gated on UC version >= 0.5.0 via a small isAtLeastVersion helper. ## How was this patch tested? Added tests ## Does this PR introduce _any_ user-facing changes? No --------- Signed-off-by: Yi Li <yi.li@databricks.com>
- Compare table UUIDs via UUID equality instead of case-sensitive string compare, matching the existing UUID.fromString pattern in commit(). - Extract fromDeltaCommit helper to mirror UCTokenBasedRestClient's fromDeltaCommitInfo and remove the inline null-check storm inside the Commit/FileStatus constructor. - Document why version filtering is client-side (loadTable does not expose server-side filters). - Drop the dead requireNonNull(response, ...) after a successful loadTable; the SDK never returns null on 2xx.
36aee29 to
473ef06
Compare
- Extract resolveThreePartName helper used by loadTable, commit, and getCommits, replacing three near-identical inline parses of TableIdentifier with one source of truth (per openinx review). - Change getCommits 404 from InvalidTargetTableException to NoSuchTableException, matching loadTable and the typed exception introduced in delta-io#6811. - Update the 404 test to mirror loadTable's NoSuchTableException test (asserts the qualified table name and the response body are in the error message).
57ee576 to
9b9b931
Compare
#### Which Delta project/connector is this regarding? - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description Broadens visibility of often-overriden members like `sparkConf` and `afterEach` from `protected` to `public`. This is required to mixin the `InMemoryTestTableMixinin` some places in the codebase, where e.g. `FooSuite` has overriden `sparkConf` and accidentally broadened visibility to public. ## How was this patch tested? This is a test-only change. ## Does this PR introduce _any_ user-facing changes? No --------- Co-authored-by: Matthis Gördel <matthis.goerdel@databricks.com>
…elta-io#6809) #### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [x] Flink - [ ] Kernel - [ ] Other ## Description Step 1 of the Flink upsert sink stack. This PR adds the public configuration and shared strategy contracts that the implementation PR builds on: - `DeltaSinkConf` primary-key ordinal configuration. - `MergeStrategy`, the sink-side contract for recording upserts/deletes and producing Delta actions. - `RowLocator`, the pluggable contract for finding candidate files by primary key. - `Upsert`, the abstract base class that owns per-checkpoint primary-key bookkeeping and delegates file lookup to a `RowLocator` plus row deletion materialization to concrete subclasses. This PR intentionally does not add concrete locators or rewrite strategies. Those live in delta-io#6818. ## How was this patch tested? - `build/sbt "flink / Compile / doc"` ## Does this PR introduce _any_ user-facing changes? No. --------- Co-authored-by: Hao Jiang <haojiang@Haos-MacBook-Air.local>
…nterface (delta-io#6792) #### Which Delta project/connector is this regarding? - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description Simplify `UCClientFactory` from a multi-method interface (`createUCClient(uri, authConfig)`, `createUCClientWithVersions(...)`, plus Java overloads) to a single unified method: ```scala def createUCClient(ucConfig: Map[String, String]): UCClient ``` **Why?** The previous design required callers to assemble separate `uri`, `authConfig`, and `appVersions` parameters. It also had a compile-time dependency on `UCDeltaTokenBasedRestClient`. This change: 1. Passes a single flat `ucConfig` map containing all keys (`uri`, `auth.*`, `deltaRestApi.enabled`, `ucclient.impl`, `appVersions.*`). 2. Uses reflection (`Utils.classForName`) to load the `UCClient` implementation at runtime, removing the compile-time dependency on `UCDeltaTokenBasedRestClient`. 3. Selects the implementation via `ucclient.impl` key, or `deltaRestApi.enabled` flag, defaulting to `UCTokenBasedRestClient`. ### Changes by file - **`UCCommitCoordinatorBuilder.scala`** - `UCClientFactory` trait: `createUCClient(ucConfig: Map[String, String])`. - `UCTokenBasedRestClientFactory`: single `createUCClient` that extracts URI, auth, impl class, and app versions from `ucConfig`. Reflection-based instantiation. - `getCatalogConfigs`: simplified from `(name, uri, authConfigMap)` to `(name, ucConfig)` — collects all sub-keys under `spark.sql.catalog.<name>.*` into a flat map. - `UCCatalogConfig`: changed from `(catalogName, uri, authConfig)` to `(catalogName, ucConfig)` with computed `uri` and `authConfig` accessors. - **`CreateTableBuilder.java` / `SnapshotManagerFactory.java`**: use `UCTableInfo.toUcConfig()` to build the flat config map. - **`UCTableInfo.java`**: added `toUcConfig()` to centralize the uri + auth config assembly. ## How was this patch tested? - `UCCommitCoordinatorBuilderSuite` — 14 tests pass (7 pre-existing Mockito/Java 21 ByteBuddy failures unrelated to this change). - `UCCommitCoordinatorClientSuite` — 11/11 pass. - New unit tests added for `extractAuthConfig` (auth.* preference over legacy token, legacy fallback) and `extractAppVersions` (merges caller-supplied entries with defaults). - scalastyle, javafmt, checkstyle all clean. ## Does this PR introduce _any_ user-facing changes? No. All changed APIs (`UCClientFactory`, `UCTokenBasedRestClientFactory`, `UCCatalogConfig`, `getCatalogConfigs`) are internal to the Delta coordinated commits module. No user-facing configs or behaviors are changed. --------- Signed-off-by: openinx <openinx@gmail.com> Co-authored-by: Timothy Wang <timothy.art@gmail.com>
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [x] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> ## How was this patch tested? UT <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. -->
- Rename resolveThreePartName to requireThreePartName: the method validates and unpacks, it does not resolve. requireThreePartName matches the requireNonNull idiom. - Drop the per-field Objects.requireNonNull storm inside fromDeltaCommit and from the outer loop's version unbox. The SDK marks every DeltaCommit getter @nonnull; matching the sibling fromDeltaCommitInfo (which trusts the SDK) keeps the two helpers symmetric. - Import java.util.Arrays instead of fully-qualifying inside requireThreePartName. - Move scala.jdk.CollectionConverters._ into its own scala.* import group between java.* and the third-party block (scalafmt order). - Extend the getCommits null-parameter test to cover startVersion and endVersion as well, matching the five requireNonNull calls in the method body. - Assert message contents on the getCommits UUID-mismatch test (qualified table name plus both UUIDs), matching the assertion shape used by the loadTable 404 test.
Per openinx r3263890890: prefer plain String comparison over round-tripping tableId through UUID.fromString. The UC delta spec canonicalizes the UUID form so both sides produce the same string. Drops the upfront UUID.fromString validation step and keeps the mismatch error message in terms of the strings the caller passed in.
…ta-io#6818) ## Summary Step 2 of the Flink upsert sink stack. This PR depends on delta-io#6809 and adds the concrete merge-strategy implementation pieces and tests. `Upsert` itself now lives in delta-io#6809. Relative to that interface/base-strategy PR, this change supplies the concrete locators and rewrite strategies that plug into it. ## Changes - Adds `ExpressionUtils` helpers for building Kernel row `IN` predicates. - Adds `DeltaTable.scan(Predicate)` for row-locator candidate-file scans. - Adds `AppendOnly`, `ScanLocator`, `CoWUpsert`, and placeholder `IndexLocator` / `MoRUpsert` strategies. - Wires concrete upsert strategies to use `ScanLocator` through the `Upsert` base class from step 1. - Adds focused tests for `ScanLocator` and copy-on-write upsert file rewrites. ## Validation - `build/sbt "flink / Test / compile"` - `build/sbt "flink / Compile / doc"` - `build/sbt "flink/testOnly io.delta.flink.sink.mergestrategy.*"` (14 passed) --------- Co-authored-by: Hao Jiang <haojiang@Haos-MacBook-Air.local>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which Delta project/connector is this regarding?
Description
Stacked on delta-io#6788.
This PR implements
getCommitsinUCDeltaTokenBasedRestClientnow that the lower PR passes the Unity Catalog table identifier through theUCClient#getCommitspath.The implementation loads the UC Delta table by
catalog.schema.table, validates that the returned table UUID matches the requested table ID, converts returned UCDeltaCommitentries into storageCommitentries, and applies the optional start/end version filters.Stack diff:
728930d4ad8a122a4952c71ea4093a30ba81a46c29c699c52UCDeltaTokenBasedRestClient.java,UCDeltaTokenBasedRestClientSuite.scalaHow was this patch tested?
Does this PR introduce any user-facing changes?
No.