Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 115 additions & 14 deletions PROTOCOL.md

Large diffs are not rendered by default.

50 changes: 40 additions & 10 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -480,16 +480,11 @@ lazy val sparkV2 = {
libraryDependencies ++= Seq(
"io.delta" % "delta-kernel-api" % v,
"io.delta" % "delta-kernel-defaults" % v,
"io.delta" % "delta-kernel-unitycatalog" % v
),
// Kernel main classes are pulled from Maven at version `v`, but several
// sparkV2 tests depend on test-only helpers (e.g. InMemoryUCClient,
// UCCatalogManagedTestUtils) that are not published. Build those test
// jars from the in-tree kernel sources and add them to the test classpath.
Test / unmanagedJars ++= Seq(
(kernelApi / Test / packageBin).value,
(kernelDefaults / Test / packageBin).value,
(kernelUnityCatalog / Test / packageBin).value
"io.delta" % "delta-kernel-unitycatalog" % v,
// sparkV2 tests depend on UC test helpers (InMemoryUCClient,
// UCCatalogManagedTestUtils) that live in kernel-unitycatalog's test sources.
// Consume them via the published tests-classifier jar.
"io.delta" % "delta-kernel-unitycatalog" % v % Test classifier "tests"
)
)
}
Expand Down Expand Up @@ -791,6 +786,23 @@ val unityCatalogVersion: String = sys.props.getOrElse(
if (useDefaultUnityCatalogReleaseVersion) defaultUnityCatalogReleaseVersion
else unityCatalogReleaseVersion.getOrElse(pinnedUnityCatalogVersion))

/**
* Returns true when `current` is at least `target`. Numeric segments only; suffix after
* the first `-` (e.g. `-SNAPSHOT-abc1234`) is stripped before comparison.
*/
def isAtLeastVersion(current: String, target: String): Boolean = {
def parts(v: String): Seq[Int] =
v.takeWhile(_ != '-').split('.').iterator
.map(p => scala.util.Try(p.toInt).getOrElse(0)).toSeq
val cur = parts(current)
val tgt = parts(target)
val n = math.max(cur.length, tgt.length)
(0 until n).iterator
.map(i => (cur.lift(i).getOrElse(0), tgt.lift(i).getOrElse(0)))
.find { case (a, b) => a != b }
.forall { case (a, b) => a >= b }
}

val sparkUnityCatalogJacksonVersion = "2.15.4" // We are using Spark 4.0's Jackson version 2.15.x, to override Unity Catalog 0.3.0's version 2.18.x

// Publishes the pinned UC jars to ~/.ivy2/local if they're not already cached there. Hooked
Expand Down Expand Up @@ -1169,6 +1181,11 @@ lazy val kernelUnityCatalog = (project in file("kernel/unitycatalog"))
// Publish the pinned UC jars before sbt tries to resolve them.
update := update.dependsOn(ensurePinnedUnityCatalog).value,

// Also publish a test-jar (classifier = "tests") so consumers (e.g. sparkV2 in
// Maven mode) can depend on UC test helpers (InMemoryUCClient,
// UCCatalogManagedTestUtils) via a published artifact.
Test / publishArtifact := true,

// Put the shaded kernel-api JAR on the classpath (compile & test)
Compile / unmanagedJars += (kernelApi / Compile / packageBin).value,
Test / unmanagedJars += (kernelApi / Compile / packageBin).value,
Expand Down Expand Up @@ -1217,6 +1234,19 @@ lazy val storage = (project in file("storage"))
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
// Jackson datatype module needed for UC SDK tests (excluded from main compile scope)
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.15.4" % "test",
) ++ (
// unitycatalog-hadoop ships from UC 0.5.0 onward; older versions don't publish the
// artifact, so resolving it would fail. Used by UCDeltaTokenBasedRestClient for
// credential vending via UCCredentialHadoopConfs.
if (isAtLeastVersion(unityCatalogVersion, "0.5.0")) {
Seq("io.unitycatalog" % "unitycatalog-hadoop" % unityCatalogVersion excludeAll(
ExclusionRule(organization = "org.openapitools"),
ExclusionRule(organization = "com.fasterxml.jackson.core"),
ExclusionRule(organization = "com.fasterxml.jackson.module"),
ExclusionRule(organization = "com.fasterxml.jackson.datatype"),
ExclusionRule(organization = "com.fasterxml.jackson.dataformat")
))
} else Nil
),

// Publish the pinned UC jars before sbt tries to resolve them. storage is the transitive
Expand Down
29 changes: 29 additions & 0 deletions flink/skills.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Flink Connector Development Skills

## Pull Request Requirements

### Repository
- All newly created PRs must target the repository:
- `github.com/delta-io/delta`

### Pre-push Validation
Before pushing any PR, always run the following sbt tasks successfully:

```bash
build/sbt "flink / javafmt"
build/sbt "flink / Test / javafmt"
build/sbt "flink / test"
build/sbt "flink / Compile / doc"
```

### PR Tracking
For every newly created PR:
- Add the PR link to the tracking issue:
- https://github.com/delta-io/delta/issues/5901

## Development Expectations

- Ensure formatting is clean before push.
- Ensure all Flink tests pass locally before opening or updating a PR.
- Ensure generated documentation compiles successfully.
- Keep PR descriptions clear and scoped to a single logical change whenever possible.
78 changes: 78 additions & 0 deletions flink/src/main/java/io/delta/flink/kernel/ExpressionUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (2026) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.flink.kernel;

import io.delta.kernel.expressions.*;
import java.util.List;
import java.util.stream.Collectors;

/**
* Helpers for building Kernel {@link Predicate} expressions that aren't natively expressible in
* Kernel's expression API. Currently this is just the row-{@code IN} builder used to construct
* partition / primary-key membership filters in the upsert merge path; expand as needed when other
* compound expressions earn their own helper.
*/
public class ExpressionUtils {
/**
* Generate a predicate equivalent to <code>(colNames) IN (values)</code>. Empty {@code values} →
* {@link AlwaysFalse#ALWAYS_FALSE}. Single-column → native {@link In} for engine pushdown.
* Multi-column → left-deep OR-of-ANDs ({@code (c1=v11 AND ...) OR (c1=v21 AND ...) OR ...}),
* since Kernel has no row-IN expression.
*
* @param colNames left-side tuple of column names; must be non-empty
* @param values right-side list of value tuples; each row must have the same arity as {@code
* colNames}
* @throws IllegalArgumentException if {@code colNames} is empty or any tuple arity mismatches
*/
public static Predicate in(List<String> colNames, List<List<Literal>> values) {
if (colNames.isEmpty()) {
throw new IllegalArgumentException("in() requires at least one column");
}
if (values.isEmpty()) {
return AlwaysFalse.ALWAYS_FALSE;
}
for (List<Literal> row : values) {
if (row.size() != colNames.size()) {
throw new IllegalArgumentException(
"Value tuple arity " + row.size() + " does not match column count " + colNames.size());
}
}

// Single-column fast path: use the native IN predicate (the engine can push it down better
// than an OR chain).
if (colNames.size() == 1) {
Column col = new Column(colNames.get(0));
List<Expression> literals =
values.stream().map(row -> (Expression) row.get(0)).collect(Collectors.toList());
return new In(col, literals);
}

// Multi-column: build (c1=v11 AND c2=v12 AND ...) OR (c1=v21 AND c2=v22 AND ...) OR ...
// We accumulate left-deep AND/OR trees rather than building everything as a flat predicate,
// because Kernel's And/Or only support binary children.
Predicate result = null;
for (List<Literal> row : values) {
Predicate rowEq = null;
for (int i = 0; i < colNames.size(); i++) {
Predicate eq = new Predicate("=", new Column(colNames.get(i)), row.get(i));
rowEq = (rowEq == null) ? eq : new And(rowEq, eq);
}
result = (result == null) ? rowEq : new Or(result, rowEq);
}
return result;
}
}
67 changes: 48 additions & 19 deletions flink/src/main/java/io/delta/flink/sink/DeltaCommitter.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
/*
* Copyright (2026) The Delta Lake Project Authors.
* Copyright (2026) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.flink.sink;
Expand All @@ -26,6 +26,7 @@
import java.util.*;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -71,6 +72,8 @@ public class DeltaCommitter implements Committer<DeltaCommittable> {
private final DeltaTable deltaTable;
private final DeltaSinkConf conf;
private final SinkCommitterMetricGroup metricGroup;
private final Counter numActionsCommittedCounter;
private volatile long lastCommitDurationMs;

private DeltaCommitter(
String jobId,
Expand All @@ -85,6 +88,8 @@ private DeltaCommitter(
this.deltaTable = deltaTable;
this.conf = conf;
this.metricGroup = metricGroup;
this.numActionsCommittedCounter = metricGroup.counter("numActionsCommitted");
metricGroup.gauge("lastCommitDurationMs", () -> lastCommitDurationMs);
}

@Override
Expand All @@ -107,6 +112,7 @@ private void commitForSingleCheckpointId(
return;
}
LOG.debug("Committing {} committables on checkpoint {}", committables.size(), checkpointId);
metricGroup.getNumCommittablesTotalCounter().inc(committables.size());

deltaTable.refresh();
StructType latestSchema = deltaTable.getSchema();
Expand All @@ -116,6 +122,12 @@ private void commitForSingleCheckpointId(
"Invalid schema evolution observed. Sink schema: {}, latest table schema: {}",
conf.getSinkSchema(),
latestSchema);
committables.forEach(
c -> {
c.signalFailedWithKnownReason(
new IllegalStateException("Invalid schema evolution observed"));
metricGroup.getNumCommittablesFailureCounter().inc();
});
throw new IllegalStateException("Invalid schema evolution observed, aborting committing");
}

Expand All @@ -137,6 +149,10 @@ private void commitForSingleCheckpointId(
a[1] = Math.max(a[1], b[1]);
return a;
});

long numActions =
committables.stream().mapToLong(req -> req.getCommittable().getDeltaActions().size()).sum();

final CloseableIterable<Row> dataActions =
new CloseableIterable<Row>() {
@Override
Expand All @@ -152,15 +168,28 @@ public void close() throws IOException {
// Nothing to close
}
};
deltaTable.commit(
dataActions,
committerId,
checkpointId,
Map.of(
"flink.low-watermark",
String.valueOf(watermarks[0]),
"flink.high-watermark",
String.valueOf(watermarks[1])));

long startMs = System.currentTimeMillis();
try {
deltaTable.commit(
dataActions,
committerId,
checkpointId,
Map.of(
"flink.low-watermark",
String.valueOf(watermarks[0]),
"flink.high-watermark",
String.valueOf(watermarks[1])));
lastCommitDurationMs = System.currentTimeMillis() - startMs;

numActionsCommittedCounter.inc(numActions);
metricGroup.getNumCommittablesSuccessCounter().inc(committables.size());
} catch (Exception e) {
lastCommitDurationMs = System.currentTimeMillis() - startMs;
metricGroup.getNumCommittablesFailureCounter().inc(committables.size());
committables.forEach(c -> c.signalFailedWithKnownReason(e));
throw e;
}
}

private TreeMap<Long, List<CommitRequest<DeltaCommittable>>> sortCommittablesByCheckpointId(
Expand Down
Loading