diff --git a/flink/skills.md b/flink/skills.md
new file mode 100644
index 00000000000..7865e54405b
--- /dev/null
+++ b/flink/skills.md
@@ -0,0 +1,29 @@
+# Flink Connector Development Skills
+
+## Pull Request Requirements
+
+### Repository
+- All newly created PRs must target the repository:
+ - `github.com/delta-io/delta`
+
+### Pre-push Validation
+Before pushing any PR, always run the following sbt tasks successfully:
+
+```bash
+build/sbt "flink / javafmt"
+build/sbt "flink / Test / javafmt"
+build/sbt "flink / test"
+build/sbt "flink / Compile / doc"
+```
+
+### PR Tracking
+For every newly created PR:
+- Add the PR link to the tracking issue:
+ - https://github.com/delta-io/delta/issues/5901
+
+## Development Expectations
+
+- Ensure formatting is clean before push.
+- Ensure all Flink tests pass locally before opening or updating a PR.
+- Ensure generated documentation compiles successfully.
+- Keep PR descriptions clear and scoped to a single logical change whenever possible.
diff --git a/flink/src/main/java/io/delta/flink/kernel/ColumnVectorUtils.java b/flink/src/main/java/io/delta/flink/kernel/ColumnVectorUtils.java
index fa47be02e28..7f047cd4a89 100644
--- a/flink/src/main/java/io/delta/flink/kernel/ColumnVectorUtils.java
+++ b/flink/src/main/java/io/delta/flink/kernel/ColumnVectorUtils.java
@@ -19,15 +19,77 @@
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.FilteredColumnarBatch;
-import io.delta.kernel.types.BooleanType;
-import io.delta.kernel.types.DataType;
-import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.*;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
+/**
+ * Static helpers for Kernel {@link ColumnVector} / {@link ColumnarBatch} / {@link
+ * FilteredColumnarBatch}: typed single-cell read ({@link #get}), batch wrapping ({@link #wrap},
+ * {@link #child}, {@link #notNullAt}), and selection-vector construction ({@link #filter}, {@link
+ * #notNull}).
+ *
+ *
The selection-vector convention is {@code true = keep, false = drop} — matches {@code
+ * FilteredColumnarBatch}.
+ */
public class ColumnVectorUtils {
+ /**
+ * Returns the value at {@code rowId} from {@code input} as a plain Java object.
+ *
+ *
Mirrors the type coverage of {@link
+ * io.delta.flink.sink.Conversions.DeltaToJava#data(StructType, io.delta.kernel.data.Row, int)} —
+ * only primitive Kernel types are supported. Complex types (struct, array, map) throw {@link
+ * UnsupportedOperationException}; the caller is expected to descend into them explicitly via
+ * {@link ColumnVector#getChild(int)}, {@link ColumnVector#getArray(int)}, or {@link
+ * ColumnVector#getMap(int)}.
+ *
+ * @param input column vector to read from
+ * @param rowId 0-based row index within {@code input}
+ * @return the value at {@code (input, rowId)}; {@code null} if the cell is null
+ * @throws UnsupportedOperationException for complex types
+ */
+ public static Object get(ColumnVector input, int rowId) {
+ if (input.isNullAt(rowId)) {
+ return null;
+ }
+ DataType type = input.getDataType();
+ if (type.equivalent(BooleanType.BOOLEAN)) {
+ return input.getBoolean(rowId);
+ } else if (type.equivalent(ByteType.BYTE)) {
+ return input.getByte(rowId);
+ } else if (type.equivalent(ShortType.SHORT)) {
+ return input.getShort(rowId);
+ } else if (type.equivalent(IntegerType.INTEGER)) {
+ return input.getInt(rowId);
+ } else if (type.equivalent(LongType.LONG)) {
+ return input.getLong(rowId);
+ } else if (type.equivalent(FloatType.FLOAT)) {
+ return input.getFloat(rowId);
+ } else if (type.equivalent(DoubleType.DOUBLE)) {
+ return input.getDouble(rowId);
+ } else if (type.equivalent(StringType.STRING)) {
+ return input.getString(rowId);
+ } else if (type.equivalent(BinaryType.BINARY)) {
+ return input.getBinary(rowId);
+ } else if (type.equivalent(DateType.DATE)) {
+ // Days since 1970-01-01
+ return input.getInt(rowId);
+ } else if (type.equivalent(TimestampType.TIMESTAMP)) {
+ // Microseconds since epoch, UTC
+ return input.getLong(rowId);
+ } else if (type.equivalent(TimestampNTZType.TIMESTAMP_NTZ)) {
+ // Microseconds since epoch, no time zone
+ return input.getLong(rowId);
+ } else if (type instanceof DecimalType) {
+ return input.getDecimal(rowId);
+ } else {
+ // StructType, ArrayType, MapType — caller must navigate these explicitly.
+ throw new UnsupportedOperationException("Unsupported column vector type: " + type);
+ }
+ }
+
public static FilteredColumnarBatch wrap(ColumnarBatch data) {
return new FilteredColumnarBatch(data, Optional.empty());
}
diff --git a/flink/src/main/java/io/delta/flink/kernel/ExpressionUtils.java b/flink/src/main/java/io/delta/flink/kernel/ExpressionUtils.java
new file mode 100644
index 00000000000..bde0e964185
--- /dev/null
+++ b/flink/src/main/java/io/delta/flink/kernel/ExpressionUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.flink.kernel;
+
+import io.delta.kernel.expressions.*;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Helpers for building Kernel {@link Predicate} expressions that aren't natively expressible in
+ * Kernel's expression API. Currently this is just the row-{@code IN} builder used to construct
+ * partition / primary-key membership filters in the upsert merge path; expand as needed when other
+ * compound expressions earn their own helper.
+ */
+public class ExpressionUtils {
+ /**
+ * Generate a predicate equivalent to (colNames) IN (values). Empty {@code values} →
+ * {@link AlwaysFalse#ALWAYS_FALSE}. Single-column → native {@link In} for engine pushdown.
+ * Multi-column → left-deep OR-of-ANDs ({@code (c1=v11 AND ...) OR (c1=v21 AND ...) OR ...}),
+ * since Kernel has no row-IN expression.
+ *
+ * @param colNames left-side tuple of column names; must be non-empty
+ * @param values right-side list of value tuples; each row must have the same arity as {@code
+ * colNames}
+ * @throws IllegalArgumentException if {@code colNames} is empty or any tuple arity mismatches
+ */
+ public static Predicate in(List colNames, List> values) {
+ if (colNames.isEmpty()) {
+ throw new IllegalArgumentException("in() requires at least one column");
+ }
+ if (values.isEmpty()) {
+ return AlwaysFalse.ALWAYS_FALSE;
+ }
+ for (List row : values) {
+ if (row.size() != colNames.size()) {
+ throw new IllegalArgumentException(
+ "Value tuple arity " + row.size() + " does not match column count " + colNames.size());
+ }
+ }
+
+ // Single-column fast path: use the native IN predicate (the engine can push it down better
+ // than an OR chain).
+ if (colNames.size() == 1) {
+ Column col = new Column(colNames.get(0));
+ List literals =
+ values.stream().map(row -> (Expression) row.get(0)).collect(Collectors.toList());
+ return new In(col, literals);
+ }
+
+ // Multi-column: build (c1=v11 AND c2=v12 AND ...) OR (c1=v21 AND c2=v22 AND ...) OR ...
+ // We accumulate left-deep AND/OR trees rather than building everything as a flat predicate,
+ // because Kernel's And/Or only support binary children.
+ Predicate result = null;
+ for (List row : values) {
+ Predicate rowEq = null;
+ for (int i = 0; i < colNames.size(); i++) {
+ Predicate eq = new Predicate("=", new Column(colNames.get(i)), row.get(i));
+ rowEq = (rowEq == null) ? eq : new And(rowEq, eq);
+ }
+ result = (result == null) ? rowEq : new Or(result, rowEq);
+ }
+ return result;
+ }
+}
diff --git a/flink/src/main/java/io/delta/flink/sink/Conversions.java b/flink/src/main/java/io/delta/flink/sink/Conversions.java
index e21f2226c1e..687fda3a3fd 100644
--- a/flink/src/main/java/io/delta/flink/sink/Conversions.java
+++ b/flink/src/main/java/io/delta/flink/sink/Conversions.java
@@ -16,16 +16,18 @@
package io.delta.flink.sink;
+import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Literal;
-import io.delta.kernel.types.DataType;
-import io.delta.kernel.types.StructField;
-import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.*;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.MapType;
/**
* Provide conversions between Flink and Delta data structures. This now includes:
@@ -235,4 +237,60 @@ public static Object data(RowType rowType, RowData rowData, int colIdx) {
}
}
}
+
+ /** Convert Delta Kernel data to plain Java objects. */
+ public static class DeltaToJava {
+ /**
+ * Convert the value at column {@code colIdx} of a Kernel {@link Row} to a plain Java object.
+ *
+ * Supports primitive types only. Complex types (struct, array, map) throw {@link
+ * UnsupportedOperationException}.
+ *
+ * @param schema row schema
+ * @param rowData Kernel row
+ * @param colIdx column ordinal
+ * @return the column value as a Java object, or {@code null} if the column is null at this row
+ * @throws UnsupportedOperationException for complex types (struct, array, map)
+ */
+ public static Object data(StructType schema, Row rowData, int colIdx) {
+ DataType dataType = schema.fields().get(colIdx).getDataType();
+ if (rowData.isNullAt(colIdx)) {
+ return null;
+ }
+ if (dataType.equivalent(io.delta.kernel.types.BooleanType.BOOLEAN)) {
+ return rowData.getBoolean(colIdx);
+ } else if (dataType.equivalent(ByteType.BYTE)) {
+ return rowData.getByte(colIdx);
+ } else if (dataType.equivalent(ShortType.SHORT)) {
+ return rowData.getShort(colIdx);
+ } else if (dataType.equivalent(IntegerType.INTEGER)) {
+ return rowData.getInt(colIdx);
+ } else if (dataType.equivalent(LongType.LONG)) {
+ return rowData.getLong(colIdx);
+ } else if (dataType.equivalent(io.delta.kernel.types.FloatType.FLOAT)) {
+ return rowData.getFloat(colIdx);
+ } else if (dataType.equivalent(io.delta.kernel.types.DoubleType.DOUBLE)) {
+ return rowData.getDouble(colIdx);
+ } else if (dataType.equivalent(StringType.STRING)) {
+ return rowData.getString(colIdx);
+ } else if (dataType.equivalent(io.delta.kernel.types.BinaryType.BINARY)) {
+ return rowData.getBinary(colIdx);
+ } else if (dataType.equivalent(io.delta.kernel.types.DateType.DATE)) {
+ // Days since 1970-01-01
+ return rowData.getInt(colIdx);
+ } else if (dataType.equivalent(io.delta.kernel.types.TimestampType.TIMESTAMP)) {
+ // Microseconds since epoch, UTC
+ return rowData.getLong(colIdx);
+ } else if (dataType.equivalent(TimestampNTZType.TIMESTAMP_NTZ)) {
+ // Microseconds since epoch, no time zone
+ return rowData.getLong(colIdx);
+ } else if (dataType instanceof io.delta.kernel.types.DecimalType) {
+ return rowData.getDecimal(colIdx);
+ } else {
+ // StructType, ArrayType, MapType — out of scope per spec.
+ throw new UnsupportedOperationException(
+ "DeltaToJava.data does not support complex type: " + dataType);
+ }
+ }
+ }
}
diff --git a/flink/src/main/java/io/delta/flink/sink/DeltaSink.java b/flink/src/main/java/io/delta/flink/sink/DeltaSink.java
index 20539405d88..f200b4d55fa 100644
--- a/flink/src/main/java/io/delta/flink/sink/DeltaSink.java
+++ b/flink/src/main/java/io/delta/flink/sink/DeltaSink.java
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.*;
import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -249,6 +250,37 @@ public Builder withConfigurations(Map configurations) {
return this;
}
+ /**
+ * Sets the sink {@code write.mode}. Use {@link DeltaSinkConf.WriteMode#APPEND} (default) or
+ * {@link DeltaSinkConf.WriteMode#UPSERT}.
+ *
+ * Sink-only — does not propagate to the Delta table's persisted properties.
+ */
+ public Builder withWriteMode(DeltaSinkConf.WriteMode writeMode) {
+ this.configurations.put(DeltaSinkConf.WRITE_MODE.key(), writeMode.name());
+ return this;
+ }
+
+ /**
+ * Sets the sink primary-key columns by 0-based ordinal in the Flink {@code RowType} (required
+ * for {@code write.mode = "upsert"}).
+ *
+ *
Callers are responsible for resolving column names to ordinals against the schema they
+ * pass via {@link #withFlinkSchema(RowType)}. The Flink SQL path performs this resolution in
+ * {@link io.delta.flink.sink.sql.DeltaDynamicTableSink}; DataStream API callers can resolve
+ * with {@code flinkSchema.getFieldIndex(columnName)}.
+ *
+ *
Sink-only — does not propagate to the Delta table's persisted properties.
+ */
+ public Builder withPrimaryKey(List primaryKeyOrdinals) {
+ if (primaryKeyOrdinals != null && !primaryKeyOrdinals.isEmpty()) {
+ this.configurations.put(
+ DeltaSinkConf.PRIMARY_KEY.key(),
+ primaryKeyOrdinals.stream().map(String::valueOf).collect(Collectors.joining(",")));
+ }
+ return this;
+ }
+
public DeltaSink build() {
Objects.requireNonNull(flinkSchema, "flinkSchema must not be null");
if (configurations == null) {
diff --git a/flink/src/main/java/io/delta/flink/sink/DeltaSinkConf.java b/flink/src/main/java/io/delta/flink/sink/DeltaSinkConf.java
index 0dc43b427f5..6a1e3b04174 100644
--- a/flink/src/main/java/io/delta/flink/sink/DeltaSinkConf.java
+++ b/flink/src/main/java/io/delta/flink/sink/DeltaSinkConf.java
@@ -16,10 +16,14 @@
package io.delta.flink.sink;
+import io.delta.flink.sink.mergestrategy.AppendOnly;
+import io.delta.flink.sink.mergestrategy.CoWUpsert;
+import io.delta.flink.sink.mergestrategy.Upsert;
import io.delta.kernel.internal.types.DataTypeJsonSerDe;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import java.io.Serializable;
+import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -114,6 +118,56 @@ public class DeltaSinkConf implements Serializable {
"Schema evolution policy: 'no' disallows any change; 'newcolumn' allows adding new "
+ "columns only.");
+ /**
+ * Write mode controlling how the sink interprets the incoming changelog.
+ *
+ * Supported values match the {@link WriteMode} enum constants (case-insensitive):
+ *
+ *
+ * {@code "append"} (default): treat every row as an INSERT. Compatible with {@code
+ * ChangelogMode.insertOnly()}.
+ * {@code "upsert"}: use the row {@code RowKind} together with a declared primary key to
+ * merge incoming changes into the table. {@code INSERT}/{@code UPDATE_AFTER} rows replace
+ * any existing row with the same primary key; {@code DELETE} rows remove the matching row.
+ * {@code UPDATE_BEFORE} rows are ignored.
+ *
+ *
+ * {@code upsert} mode requires a primary key declared on the Flink table (or supplied via
+ * {@link #PRIMARY_KEY}).
+ */
+ public static final ConfigOption WRITE_MODE =
+ ConfigOptions.key("write.mode")
+ .enumType(WriteMode.class)
+ .defaultValue(WriteMode.APPEND)
+ .withDescription(
+ "Write semantics. 'append' (default) treats all incoming rows as INSERTs. "
+ + "'upsert' merges incoming changes by primary key.");
+
+ /**
+ * Internal wire-format carrier for the primary-key column ordinals (0-based field indices into
+ * the sink schema) in {@code upsert} mode.
+ *
+ * Not a public option. Layers above the connector ({@link
+ * io.delta.flink.sink.sql.DeltaDynamicTableSink} for SQL and {@link DeltaSink.Builder} for the
+ * DataStream API) translate logical primary-key column names to ordinals against the resolved
+ * Flink schema and stamp the comma-separated ordinal list into this entry of the per-table
+ * configuration map. The map is serialized to TaskManagers, so this option must remain
+ * String-typed and round-trip cleanly.
+ *
+ *
Format: comma-separated, decimal, non-negative integers. Example: {@code "0,3"} declares a
+ * composite PK on field ordinals 0 and 3.
+ *
+ *
This option is intentionally not registered in {@code optionalOptions()} so that
+ * Flink SQL {@code WITH(...)} clauses cannot override the DDL primary key.
+ */
+ public static final ConfigOption PRIMARY_KEY =
+ ConfigOptions.key("primary_key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Comma-separated primary key column ordinals. Resolved by upper layers "
+ + "from the table's PRIMARY KEY clause. Not a user-facing option.");
+
// ----------------------------------------------------------------------
// State
// ----------------------------------------------------------------------
@@ -123,6 +177,12 @@ public class DeltaSinkConf implements Serializable {
private final Map conf;
private final Configuration configuration;
private final SchemaEvolutionPolicy schemaEvolutionPolicy;
+ private final WriteMode writeMode;
+ /**
+ * Primary-key column ordinals stored as a primitive {@code int[]} to avoid per-row boxing on the
+ * writer's hot path. Treat as read-only; callers must not mutate the returned array.
+ */
+ private final int[] primaryKeyOrdinals;
private transient StructType sinkSchema;
@@ -141,7 +201,7 @@ public DeltaSinkConf(StructType sinkSchema, Map conf) {
// Materialize the raw Map into Flink Configuration for ConfigOption access.
this.configuration = Configuration.fromMap(conf);
- String mode = configuration.get(SCHEMA_EVOLUTION_MODE).toLowerCase();
+ String mode = configuration.get(SCHEMA_EVOLUTION_MODE).toLowerCase(Locale.ROOT);
switch (mode) {
case "newcolumn":
this.schemaEvolutionPolicy = new NewColumnEvolution();
@@ -152,6 +212,67 @@ public DeltaSinkConf(StructType sinkSchema, Map conf) {
default:
throw new IllegalArgumentException("unknown evolution mode:" + mode);
}
+
+ this.writeMode = configuration.get(WRITE_MODE);
+ this.primaryKeyOrdinals = parsePrimaryKeyOrdinals(configuration.get(PRIMARY_KEY), sinkSchema);
+ if (writeMode == WriteMode.UPSERT && primaryKeyOrdinals.length == 0) {
+ throw new IllegalArgumentException(
+ "write.mode = 'upsert' requires a non-empty primary key. Declare "
+ + "'PRIMARY KEY (...) NOT ENFORCED' on the Flink table (SQL), or call "
+ + "DeltaSink.Builder.withPrimaryKey(...) (DataStream API).");
+ }
+ }
+
+ /**
+ * Parses the comma-separated ordinal list stored under {@link #PRIMARY_KEY} into a primitive
+ * {@code int[]}, validating that each ordinal is in range {@code [0, schema.length())}.
+ *
+ * @param raw raw option value; may be null or empty
+ * @param schema sink schema, used to validate ordinal ranges
+ * @return primitive array of validated, in-range ordinals (length 0 if {@code raw} was empty)
+ * @throws IllegalArgumentException if a segment is not a valid integer or is out of range
+ */
+ private static int[] parsePrimaryKeyOrdinals(String raw, StructType schema) {
+ if (raw == null || raw.trim().isEmpty()) {
+ return new int[0];
+ }
+ int width = schema.length();
+ String[] segments = raw.split(",");
+ int[] tmp = new int[segments.length];
+ int n = 0;
+ for (String segment : segments) {
+ String trimmed = segment.trim();
+ if (trimmed.isEmpty()) {
+ continue;
+ }
+ int ordinal;
+ try {
+ ordinal = Integer.parseInt(trimmed);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(
+ "Invalid primary-key ordinal '"
+ + trimmed
+ + "' in '"
+ + PRIMARY_KEY.key()
+ + "'. Expected a non-negative integer.",
+ e);
+ }
+ if (ordinal < 0 || ordinal >= width) {
+ throw new IllegalArgumentException(
+ "Primary-key ordinal "
+ + ordinal
+ + " is out of range for sink schema of width "
+ + width
+ + ".");
+ }
+ tmp[n++] = ordinal;
+ }
+ if (n == segments.length) {
+ return tmp;
+ }
+ int[] result = new int[n];
+ System.arraycopy(tmp, 0, result, 0, n);
+ return result;
}
/**
@@ -188,6 +309,18 @@ public StructType getSinkSchema() {
return this.sinkSchema;
}
+ /**
+ * Creates a fresh {@link MergeStrategy} for the configured {@link WriteMode}.
+ *
+ * Returns a new instance on every call — merge strategies hold per-checkpoint state and must
+ * not be shared across {@link DeltaSinkWriter} instances.
+ *
+ * @return {@link Upsert} when in upsert mode; {@link AppendOnly} otherwise
+ */
+ public MergeStrategy createMergeStrategy() {
+ return isUpsert() ? new CoWUpsert() : new AppendOnly();
+ }
+
/**
* Creates a {@link FileRollingStrategy} based on the per-table configuration.
*
@@ -215,6 +348,33 @@ public SchemaEvolutionPolicy getSchemaEvolutionPolicy() {
return schemaEvolutionPolicy;
}
+ /**
+ * Returns the configured write mode.
+ *
+ * @return the write mode (defaults to {@link WriteMode#APPEND})
+ */
+ public WriteMode getWriteMode() {
+ return writeMode;
+ }
+
+ /** Convenience: {@code true} iff {@link #getWriteMode()} is {@link WriteMode#UPSERT}. */
+ public boolean isUpsert() {
+ return writeMode == WriteMode.UPSERT;
+ }
+
+ /**
+ * Returns the primary-key column ordinals (0-based indices into the sink schema) as a primitive
+ * {@code int[]}.
+ *
+ *
Length 0 for {@link WriteMode#APPEND}; non-zero, validated, and in-range for {@link
+ * WriteMode#UPSERT}. The returned array is the live backing array; callers must not mutate
+ * it (kept as {@code int[]} rather than wrapping per call to avoid hot-path allocations in {@link
+ * io.delta.flink.sink.DeltaSinkWriter}).
+ */
+ public int[] getPrimaryKeyOrdinals() {
+ return primaryKeyOrdinals;
+ }
+
/**
* Returns the raw per-table configuration map.
*
@@ -349,6 +509,27 @@ public boolean shouldRoll(RowData row) {
}
}
+ // ----------------------------------------------------------------------
+ // Write mode
+ // ----------------------------------------------------------------------
+
+ /**
+ * Write semantics selected via {@link #WRITE_MODE}.
+ *
+ *
This is independent from Flink's {@link org.apache.flink.table.connector.ChangelogMode}; the
+ * dynamic table sink translates between the two.
+ */
+ public enum WriteMode {
+ /** Append-only writes; every row treated as INSERT regardless of {@code RowKind}. */
+ APPEND,
+ /**
+ * Upsert writes by primary key. {@code INSERT}/{@code UPDATE_AFTER} rows replace any existing
+ * row with the same primary key; {@code DELETE} rows remove the matching row; {@code
+ * UPDATE_BEFORE} rows are ignored.
+ */
+ UPSERT
+ }
+
/** Rolls files based on number of records written. */
static class CountRolling implements FileRollingStrategy {
diff --git a/flink/src/main/java/io/delta/flink/sink/DeltaSinkWriter.java b/flink/src/main/java/io/delta/flink/sink/DeltaSinkWriter.java
index 0302f3ae12c..1a685bb9918 100644
--- a/flink/src/main/java/io/delta/flink/sink/DeltaSinkWriter.java
+++ b/flink/src/main/java/io/delta/flink/sink/DeltaSinkWriter.java
@@ -22,7 +22,9 @@
import com.github.benmanes.caffeine.cache.RemovalListener;
import io.delta.flink.Conf;
import io.delta.flink.table.DeltaTable;
+import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.types.StructType;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
@@ -79,6 +81,12 @@ public class DeltaSinkWriter
private final SinkWriterMetricGroup metricGroup;
+ /**
+ * Strategy that owns per-checkpoint upsert/delete bookkeeping and turns it into Delta actions.
+ * Selected once at construction from {@link DeltaSinkConf#getWriteMode()}.
+ */
+ private final MergeStrategy mergeStrategy;
+
private DeltaSinkWriter(
String jobId,
int subtaskId,
@@ -108,6 +116,12 @@ private DeltaSinkWriter(
.map(DeltaWriterTask::getResultBuffer)
.mapToLong(List::size)
.sum());
+
+ this.mergeStrategy = conf.createMergeStrategy();
+ LOG.debug(
+ "DeltaSinkWriter created in {} mode (primary-key ordinals = {})",
+ conf.getWriteMode(),
+ Arrays.toString(conf.getPrimaryKeyOrdinals()));
}
/**
@@ -128,13 +142,45 @@ public void write(RowData element, Context context) throws IOException, Interrup
partitionValues.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().toString()));
- writerTasksByPartition
- .get(
- writerKey,
- (key) ->
- new DeltaWriterTask(
- jobId, subtaskId, attemptNumber, deltaTable, conf, partitionValues))
- .write(element, context);
+ // "Trust the provided RowKind" upsert policy:
+ // - INSERT is taken at face value: the source claims this PK is new, so we just append.
+ // This keeps the hot path cheap for INSERT-heavy workloads (e.g. CDC bootstrap).
+ // Trade-off: if the source can redeliver an INSERT for a PK already in the table
+ // across a checkpoint boundary (operator-induced source replay, CDC re-snapshot,
+ // at-least-once source), the sink will produce duplicate rows for that PK. Flink's
+ // own failover within a checkpoint is still safe via the transactional committer.
+ // - UPDATE_AFTER carries a new image for an existing key. We record the PK so the
+ // merge step removes the pre-image, then fall through to the INSERT case to append
+ // the new image as a regular AddFile.
+ // - UPDATE_BEFORE conveys no information the matching UPDATE_AFTER doesn't already
+ // carry, so we drop it. Flink elides it for PK sinks anyway.
+ // - DELETE records the PK; the merge step emits the corresponding RemoveFile/DV
+ // without appending a row.
+ switch (element.getRowKind()) {
+ case UPDATE_AFTER:
+ mergeStrategy.recordUpsert(extractPrimaryKey(element), partitionValues);
+ // fall through: an UPDATE_AFTER is "record the PK for pre-image removal AND append
+ // the new image". The INSERT case below covers the append half.
+ case INSERT:
+ writerTasksByPartition
+ .get(
+ writerKey,
+ (key) ->
+ new DeltaWriterTask(
+ jobId, subtaskId, attemptNumber, deltaTable, conf, partitionValues))
+ .write(element, context);
+ break;
+ case UPDATE_BEFORE:
+ // Dropped — see policy comment above.
+ break;
+ case DELETE:
+ mergeStrategy.recordDelete(extractPrimaryKey(element), partitionValues);
+ break;
+ default:
+ // Defensive: if Flink ever introduces a new RowKind, we'd rather fail loudly than
+ // silently treat the row as a no-op while still incrementing the metric counters.
+ throw new IllegalStateException("Unexpected RowKind: " + element.getRowKind());
+ }
// Recording Metrics
if (element instanceof BinaryRowData) {
@@ -143,17 +189,51 @@ public void write(RowData element, Context context) throws IOException, Interrup
this.metricGroup.getNumRecordsSendCounter().inc();
}
+ /**
+ * Extracts the primary-key values of {@code row} as a {@code List} in PK column order.
+ *
+ * Uses {@link RowData#isNullAt} + {@link RowData}'s typed accessors so primitive types are
+ * boxed without going through the more expensive generic field access path.
+ */
+ private List extractPrimaryKey(RowData row) {
+ StructType schema = conf.getSinkSchema();
+ int[] ordinals = conf.getPrimaryKeyOrdinals();
+ List key = new ArrayList<>(ordinals.length);
+ for (int ord : ordinals) {
+ key.add(Conversions.FlinkToDelta.data(schema, row, ord));
+ }
+ return key;
+ }
+
@Override
public Collection prepareCommit() {
LOG.debug("Preparing commits");
writerTasksByPartition.invalidateAll();
+ runMergeStrategy();
+
List results = List.copyOf(completedWrites);
completedWrites.clear();
return results;
}
+ /**
+ * Invoke the configured {@link MergeStrategy} for the current checkpoint and append any returned
+ * actions to {@code completedWrites}. The strategy owns its own per-checkpoint state and is
+ * responsible for resetting it inside {@link MergeStrategy#merge}, so we don't need a guard here.
+ */
+ private void runMergeStrategy() {
+ try {
+ List extraActions = mergeStrategy.merge(deltaTable, conf);
+ if (!extraActions.isEmpty()) {
+ completedWrites.add(new DeltaWriterResult(extraActions, new WriterResultContext()));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("merge failed for checkpoint", e);
+ }
+ }
+
@Override
public void flush(boolean endOfInput) {}
diff --git a/flink/src/main/java/io/delta/flink/sink/MergeStrategy.java b/flink/src/main/java/io/delta/flink/sink/MergeStrategy.java
new file mode 100644
index 00000000000..3130c620434
--- /dev/null
+++ b/flink/src/main/java/io/delta/flink/sink/MergeStrategy.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.flink.sink;
+
+import io.delta.flink.table.DeltaTable;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.expressions.Literal;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Performing the row-level merge that turns an incoming upsert/delete stream into Delta {@code
+ * Add}/{@code Remove} file actions.
+ *
+ * A strategy owns the per-checkpoint bookkeeping it needs to do its job. {@link DeltaSinkWriter}
+ * pushes primary-key information into the strategy as rows arrive ({@link #recordUpsert} for {@code
+ * INSERT}/{@code UPDATE_AFTER}; {@link #recordDelete} for {@code DELETE}) and calls {@link #merge}
+ * once per checkpoint to materialize that bookkeeping into Delta actions and reset internal state.
+ *
+ *
Implementations are responsible for emitting:
+ *
+ *
+ * {@code RemoveFile} actions for any existing data file that contains rows whose PK matches
+ * an upserted or deleted PK;
+ * {@code AddFile} actions for any rewritten (kept) rows from those files.
+ *
+ *
+ * The newly-written upsert rows themselves are appended by the writer via the standard {@link
+ * DeltaTable#writeParquet} path and are not part of the strategy's responsibility.
+ *
+ *
Implementations must be {@link Serializable} so the writer can be shipped to TaskManagers.
+ * Implementations should keep their internal state empty between checkpoints (i.e. {@link #merge}
+ * must clear whatever {@link #recordUpsert} / {@link #recordDelete} accumulated).
+ */
+public interface MergeStrategy extends Serializable {
+
+ /**
+ * Records the primary key of an {@code INSERT} or {@code UPDATE_AFTER} row written during the
+ * current checkpoint, so the strategy can later remove any pre-existing row that shares this key.
+ * The new row itself has already been appended by the writer via {@link DeltaTable#writeParquet}.
+ *
+ * @param primaryKey the primary-key values, in {@link DeltaSinkConf#getPrimaryKeyOrdinals()}
+ * order; never {@code null} but individual elements may be {@code null}
+ * @param partitionValues the partition column values of the row; empty for unpartitioned tables
+ */
+ void recordUpsert(List primaryKey, Map partitionValues);
+
+ /**
+ * Records the primary key of a {@code DELETE} row observed during the current checkpoint. The
+ * {@code DELETE} row itself is not appended to the table — the strategy must remove the
+ * existing row with this key.
+ *
+ * @param primaryKey the primary-key values, in {@link DeltaSinkConf#getPrimaryKeyOrdinals()}
+ * order; never {@code null} but individual elements may be {@code null}
+ * @param partitionValues the partition column values of the row; empty for unpartitioned tables
+ */
+ void recordDelete(List primaryKey, Map partitionValues);
+
+ /**
+ * Resolves the recorded checkpoint work into Delta actions and clears internal state.
+ *
+ * If no records were observed since the last call, returns an empty list.
+ *
+ * @param table the target Delta table; implementations may use it to read the current snapshot
+ * and to write replacement data files via {@link DeltaTable#writeParquet}
+ * @param conf the sink configuration, providing the schema, primary-key columns, and rolling
+ * strategy
+ * @return additional actions to include in the commit. Empty if no existing files need
+ * modification.
+ * @throws IOException if reading existing files or writing replacement files fails
+ */
+ List merge(DeltaTable table, DeltaSinkConf conf) throws IOException;
+}
diff --git a/flink/src/main/java/io/delta/flink/sink/mergestrategy/AppendOnly.java b/flink/src/main/java/io/delta/flink/sink/mergestrategy/AppendOnly.java
new file mode 100644
index 00000000000..3840698ffa3
--- /dev/null
+++ b/flink/src/main/java/io/delta/flink/sink/mergestrategy/AppendOnly.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.flink.sink.mergestrategy;
+
+import io.delta.flink.sink.DeltaSinkConf;
+import io.delta.flink.sink.MergeStrategy;
+import io.delta.flink.table.DeltaTable;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.expressions.Literal;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * No-op {@link MergeStrategy} used by {@link DeltaSinkConf.WriteMode#APPEND}.
+ *
+ * In append mode every incoming row is written via {@link DeltaTable#writeParquet} and no
+ * existing files need to be modified. The writer must never call {@link #recordUpsert} or {@link
+ * #recordDelete} in append mode; if it does, that's a programming error and we fail loudly.
+ */
+public class AppendOnly implements MergeStrategy {
+
+ @Override
+ public void recordUpsert(List primaryKey, Map partitionValues) {
+ throw new IllegalStateException(
+ "AppendOnlyMergeStrategy received an upsert record; this should only happen in "
+ + "upsert mode.");
+ }
+
+ @Override
+ public void recordDelete(List primaryKey, Map partitionValues) {
+ throw new IllegalStateException(
+ "AppendOnlyMergeStrategy received a delete record; this should only happen in "
+ + "upsert mode.");
+ }
+
+ @Override
+ public List merge(DeltaTable table, DeltaSinkConf conf) {
+ return Collections.emptyList();
+ }
+}
diff --git a/flink/src/main/java/io/delta/flink/sink/mergestrategy/CoWUpsert.java b/flink/src/main/java/io/delta/flink/sink/mergestrategy/CoWUpsert.java
new file mode 100644
index 00000000000..1a18a8a640f
--- /dev/null
+++ b/flink/src/main/java/io/delta/flink/sink/mergestrategy/CoWUpsert.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.flink.sink.mergestrategy;
+
+import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
+
+import io.delta.flink.kernel.ColumnVectorUtils;
+import io.delta.flink.table.DeltaTable;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.internal.actions.AddFile;
+import io.delta.kernel.internal.actions.SingleAction;
+import io.delta.kernel.types.*;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.LocalDate;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiPredicate;
+
+/**
+ * Copy-on-write {@link Upsert}: for each candidate file, read it through Kernel, build a selection
+ * vector that masks out rows matching the deletion filter, write the survivors to a new Parquet
+ * file co-located with the source, and emit a {@code RemoveFile} for the source plus zero-or-more
+ * {@code AddFile}s for the rewrite.
+ *
+ * The {@code RemoveFile} is built via {@link
+ * io.delta.kernel.internal.actions.AddFile#toRemoveFileRow}, so it carries over the source's stats,
+ * partition values, tags, baseRowId, defaultRowCommitVersion and deletion vector — every field
+ * Delta requires for an {@code extendedFileMetadata} RemoveFile.
+ */
+public class CoWUpsert extends Upsert {
+
+ /**
+ * Read the source Parquet, mask out rows matching {@code filter} via a selection vector, stream
+ * the survivors through {@link DeltaTable#writeParquet} (co-located with the source), and prepend
+ * a {@code RemoveFile} for the source to the resulting {@code AddFile} stream. If every row
+ * matches the filter, no {@code AddFile} is emitted — just the {@code RemoveFile}.
+ *
+ * @throws UncheckedIOException if the Parquet read or write fails
+ */
+ @Override
+ protected CloseableIterator deleteRecords(
+ Row addFile, BiPredicate filter) {
+ Engine engine = table.getEngine();
+ final StructType schema = table.getSchema();
+
+ // Source file metadata from the scan file row.
+ final FileStatus source = InternalScanFileUtils.getAddFileStatus(addFile);
+ final Map partStrings = InternalScanFileUtils.getPartitionValues(addFile);
+ // addFile has the SCAN_FILE_SCHEMA shape (add, tableRoot), not the SingleAction shape.
+ // Use InternalScanFileUtils.ADD_FILE_ORDINAL — the ordinal of "add" within SCAN_FILE_SCHEMA.
+ final AddFile sourceAddFile =
+ new AddFile(addFile.getStruct(InternalScanFileUtils.ADD_FILE_ORDINAL));
+
+ // Convert serialized partition values to typed Literals — required by table.writeParquet for
+ // the new AddFile.
+ final Map partLiterals = toLiteralPartitionMap(schema, partStrings);
+
+ // Read → filter → write pipeline. The filter is applied lazily, batch-by-batch, via a
+ // selection vector so we don't materialize whole rows just to drop a few.
+ final CloseableIterator survivors;
+ final CloseableIterator addActions;
+ try {
+ survivors =
+ engine
+ .getParquetHandler()
+ .readParquetFiles(singletonCloseableIterator(source), schema, Optional.empty())
+ .map(result -> applyDeletionFilter(result.getData(), filter));
+
+ // Co-locate the rewrite with the source file so the new file inherits the same partition
+ // directory layout (Delta requires AddFile.partitionValues to match the path-encoded
+ // partition; staying in the same folder is the safest way to keep them consistent).
+ // ParquetFileWriter generates UUID-suffixed filenames so two rewrites of the same source
+ // file can't collide.
+ String pathSuffix = sourceAddFile.getPath();
+ int slash = sourceAddFile.getPath().lastIndexOf('/');
+ if (slash >= 0) {
+ pathSuffix = sourceAddFile.getPath().substring(0, slash);
+ }
+ addActions = table.writeParquet(pathSuffix, survivors, partLiterals);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to rewrite " + sourceAddFile.getPath(), e);
+ }
+
+ // RemoveFile for the source file. AddFile.toRemoveFileRow carries over path, partition
+ // values, size, stats, tags, baseRowId, defaultRowCommitVersion and deletionVector — i.e.
+ // every field Delta requires for an "extendedFileMetadata" RemoveFile.
+ final Row removeAction =
+ SingleAction.createRemoveFileSingleAction(
+ sourceAddFile.toRemoveFileRow(true /* dataChange */, Optional.empty()));
+
+ // Emit RemoveFile first, then the AddFile(s). Delta commits don't care about action order
+ // within the same commit, but emitting Remove-before-Add reads more naturally.
+ return singletonCloseableIterator(removeAction).combine(addActions);
+ }
+
+ /**
+ * Wrap {@code batch} with a selection vector that excludes rows matching {@code filter}.
+ *
+ * {@link FilteredColumnarBatch}'s selection vector follows the SQL convention "true =
+ * keep, false = drop" . Our caller's contract is the opposite — {@code filter.test(...)}
+ * returns {@code true} for rows to be deleted — so we negate when building the selection
+ * vector. The underlying batch's column vectors are reused unmodified; no per-row materialization
+ * on the rewrite path.
+ */
+ private static FilteredColumnarBatch applyDeletionFilter(
+ ColumnarBatch batch, BiPredicate filter) {
+ return new FilteredColumnarBatch(
+ batch, ColumnVectorUtils.filter(batch.getSize(), rowId -> !filter.test(batch, rowId)));
+ }
+
+ /**
+ * Convert the partition string map from a scan-file row into a typed-Literal map keyed by the
+ * partition column names — the form expected by {@link DeltaTable#writeParquet}.
+ */
+ private static Map toLiteralPartitionMap(
+ StructType schema, Map partitionStrings) {
+ final Map out = new HashMap<>(partitionStrings.size());
+ for (Map.Entry e : partitionStrings.entrySet()) {
+ final int idx = schema.indexOf(e.getKey());
+ if (idx < 0) {
+ throw new IllegalStateException(
+ "Partition column " + e.getKey() + " not found in table schema " + schema);
+ }
+ final DataType type = schema.fields().get(idx).getDataType();
+ out.put(e.getKey(), parsePartitionLiteral(type, e.getValue()));
+ }
+ return out;
+ }
+
+ /**
+ * Parse the Delta-serialized string form of a partition value into a typed {@link Literal}.
+ *
+ * Mirrors {@code io.delta.kernel.internal.util.PartitionUtils#literalForPartitionValue}, which
+ * is package-private and not callable from here.
+ *
+ *
Timestamp, decimal, and binary partition values aren't yet covered — they require Delta's
+ * specific string-to-microseconds / string-to-BigDecimal parsing logic; add support for them when
+ * a real workload demands them.
+ */
+ private static Literal parsePartitionLiteral(DataType type, String value) {
+ if (value == null) {
+ return Literal.ofNull(type);
+ }
+ if (type instanceof BooleanType) return Literal.ofBoolean(Boolean.parseBoolean(value));
+ if (type instanceof ByteType) return Literal.ofByte(Byte.parseByte(value));
+ if (type instanceof ShortType) return Literal.ofShort(Short.parseShort(value));
+ if (type instanceof IntegerType) return Literal.ofInt(Integer.parseInt(value));
+ if (type instanceof LongType) return Literal.ofLong(Long.parseLong(value));
+ if (type instanceof FloatType) return Literal.ofFloat(Float.parseFloat(value));
+ if (type instanceof DoubleType) return Literal.ofDouble(Double.parseDouble(value));
+ if (type instanceof StringType) return Literal.ofString(value);
+ if (type instanceof DateType) {
+ return Literal.ofDate((int) LocalDate.parse(value).toEpochDay());
+ }
+ throw new UnsupportedOperationException(
+ "Unsupported partition column type for upsert rewrite: " + type);
+ }
+}
diff --git a/flink/src/main/java/io/delta/flink/sink/mergestrategy/IndexLocator.java b/flink/src/main/java/io/delta/flink/sink/mergestrategy/IndexLocator.java
new file mode 100644
index 00000000000..beac87e85ec
--- /dev/null
+++ b/flink/src/main/java/io/delta/flink/sink/mergestrategy/IndexLocator.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.flink.sink.mergestrategy;
+
+import io.delta.flink.table.DeltaTable;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.utils.CloseableIterator;
+import java.util.List;
+
+/**
+ * {@link RowLocator} that consults an externally-maintained secondary index (Hudi-Bloom-style) over
+ * the PK columns to skip files Bloom-misses. Intended for tables where PKs aren't clustered by file
+ * and {@link ScanLocator} degrades to a full scan.
+ *
+ *
Status: not implemented. {@link #find} throws; needs the index storage, write path, and
+ * lookup logic to be built out.
+ */
+public class IndexLocator implements RowLocator {
+
+ @Override
+ public CloseableIterator find(DeltaTable table, int[] pkIndices, List> pks) {
+ throw new UnsupportedOperationException("Not implemented. WIP");
+ }
+}
diff --git a/flink/src/main/java/io/delta/flink/sink/mergestrategy/MoRUpsert.java b/flink/src/main/java/io/delta/flink/sink/mergestrategy/MoRUpsert.java
new file mode 100644
index 00000000000..8d9289b4704
--- /dev/null
+++ b/flink/src/main/java/io/delta/flink/sink/mergestrategy/MoRUpsert.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.flink.sink.mergestrategy;
+
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.utils.CloseableIterator;
+import java.util.function.BiPredicate;
+
+/**
+ * Merge-on-read {@link Upsert} (stub). Logically deletes rows by writing deletion vectors over the
+ * existing data files; readers combine each base file with its DV at scan time.
+ *
+ * Status: not implemented. {@link #deleteRecords} throws; needs DV writing, the {@code
+ * deletionVectors} table feature, and concurrent-commit retry handling before it can ship.
+ */
+public class MoRUpsert extends Upsert {
+
+ @Override
+ protected CloseableIterator deleteRecords(
+ Row addFile, BiPredicate filter) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+}
diff --git a/flink/src/main/java/io/delta/flink/sink/mergestrategy/RowLocator.java b/flink/src/main/java/io/delta/flink/sink/mergestrategy/RowLocator.java
new file mode 100644
index 00000000000..0ecc7640e3f
--- /dev/null
+++ b/flink/src/main/java/io/delta/flink/sink/mergestrategy/RowLocator.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.flink.sink.mergestrategy;
+
+import io.delta.flink.table.DeltaTable;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.utils.CloseableIterator;
+import java.util.List;
+
+/**
+ * Pluggable strategy for finding Delta data files that may contain rows whose primary key matches a
+ * given set of PK tuples. The "where do I look" half of a merge — {@link Upsert} pairs a locator
+ * with a row-level filter that does the exact intersection.
+ *
+ * Implementations must not return false negatives (would leave stale rows). False positives are
+ * fine — the row-level filter discards non-matches.
+ */
+public interface RowLocator {
+
+ /**
+ * Return scan-file rows for files that may contain at least one row whose PK is in {@code pks}.
+ *
+ * @param table the Delta table to search
+ * @param pkIndices ordinals of the PK columns in the table schema, in PK order
+ * @param pks PK tuples to locate; each inner list has the same arity as {@code pkIndices}. May be
+ * empty, in which case implementations should return an empty iterator.
+ * @return iterator of scan-file rows in Kernel's {@code Scan.getScanFiles(...)} shape
+ */
+ CloseableIterator find(DeltaTable table, int[] pkIndices, List> pks);
+}
diff --git a/flink/src/main/java/io/delta/flink/sink/mergestrategy/ScanLocator.java b/flink/src/main/java/io/delta/flink/sink/mergestrategy/ScanLocator.java
new file mode 100644
index 00000000000..90ed1fc08fa
--- /dev/null
+++ b/flink/src/main/java/io/delta/flink/sink/mergestrategy/ScanLocator.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.flink.sink.mergestrategy;
+
+import static io.delta.flink.kernel.ExpressionUtils.in;
+
+import io.delta.flink.table.DeltaTable;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.expressions.Predicate;
+import io.delta.kernel.utils.CloseableIterator;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Default {@link RowLocator}: builds an {@code (pkCols) IN (pks)} predicate via {@link
+ * io.delta.flink.kernel.ExpressionUtils#in} and runs it through Kernel's snapshot scan-files step,
+ * so only files whose stats overlap the PK set are returned.
+ *
+ * Correct on any Delta table, no extra index required. Pruning quality depends on the Parquet
+ * min/max stats; tight for time-ordered PKs, degrades toward a full scan for UUID-like PKs (use
+ * {@link IndexLocator} once implemented for that case).
+ */
+public class ScanLocator implements RowLocator {
+
+ @Override
+ public CloseableIterator find(DeltaTable table, int[] pkIndices, List> pks) {
+
+ List pkNames =
+ Arrays.stream(pkIndices)
+ .mapToObj(idx -> table.getSchema().fieldNames().get(idx))
+ .collect(Collectors.toList());
+ Predicate filterPk = in(pkNames, pks);
+
+ return table.scan(filterPk);
+ }
+}
diff --git a/flink/src/main/java/io/delta/flink/sink/mergestrategy/Upsert.java b/flink/src/main/java/io/delta/flink/sink/mergestrategy/Upsert.java
new file mode 100644
index 00000000000..318dd32aff6
--- /dev/null
+++ b/flink/src/main/java/io/delta/flink/sink/mergestrategy/Upsert.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.flink.sink.mergestrategy;
+
+import io.delta.flink.kernel.ColumnVectorUtils;
+import io.delta.flink.sink.DeltaSinkConf;
+import io.delta.flink.sink.MergeStrategy;
+import io.delta.flink.table.AbstractKernelTable;
+import io.delta.flink.table.DeltaTable;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.utils.CloseableIterator;
+import java.io.IOException;
+import java.util.*;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract base for upsert merge strategies. Owns the per-checkpoint PK bookkeeping (upserted /
+ * deleted PKs, touched partitions, lookup index) and orchestrates {@link #merge}: ask the
+ * configured {@link RowLocator} for candidate files, hand each one to {@link #deleteRecords},
+ * flatten the resulting Delta actions, and clear internal state.
+ *
+ * Subclasses choose how to materialize deletes: {@link CoWUpsert} rewrites matched files; {@link
+ * MoRUpsert} (stub) writes deletion vectors over them.
+ */
+public abstract class Upsert implements MergeStrategy {
+ /**
+ * Primary-key values (in {@link DeltaSinkConf#getPrimaryKeyOrdinals()} order) of every {@code
+ * INSERT}/{@code UPDATE_AFTER} row recorded during the current checkpoint.
+ */
+ private final List> upsertedPrimaryKeys = new ArrayList<>();
+
+ /** Primary-key values of every {@code DELETE} row recorded during the current checkpoint. */
+ private final List> deletedPrimaryKeys = new ArrayList<>();
+
+ /** Indices for quick search of pks */
+ private final Set> primaryKeyIndices = new HashSet<>();
+ /**
+ * Distinct partition value maps touched during the checkpoint. Keyed by a stringified form of the
+ * partition map so we deduplicate cheaply; the value is the original {@code Literal}-based map
+ * used to bound the scan in {@link #merge}.
+ */
+ private final Map, Map> touchedPartitions =
+ new LinkedHashMap<>();
+
+ protected transient AbstractKernelTable table;
+
+ protected RowLocator rowLocator = new ScanLocator();
+
+ @Override
+ public void recordUpsert(List primaryKey, Map partitionValues) {
+ upsertedPrimaryKeys.add(primaryKey);
+ cachePrimaryKey(primaryKey);
+ cachePartition(partitionValues);
+ }
+
+ @Override
+ public void recordDelete(List primaryKey, Map partitionValues) {
+ deletedPrimaryKeys.add(primaryKey);
+ cachePrimaryKey(primaryKey);
+ cachePartition(partitionValues);
+ }
+
+ @Override
+ public List merge(DeltaTable table, DeltaSinkConf conf) throws IOException {
+ this.table = (AbstractKernelTable) table;
+ boolean hasWork = !upsertedPrimaryKeys.isEmpty() || !deletedPrimaryKeys.isEmpty();
+ try {
+ if (!hasWork) {
+ return Collections.emptyList();
+ }
+
+ // 1. Locate all data files that pending scan
+ List> pkToDelete = new ArrayList<>();
+ pkToDelete.addAll(upsertedPrimaryKeys);
+ pkToDelete.addAll(deletedPrimaryKeys);
+ CloseableIterator addFiles =
+ rowLocator.find(table, conf.getPrimaryKeyOrdinals(), pkToDelete);
+
+ // 2. Delete rows from found data files
+ BiPredicate filter =
+ (batch, rowId) ->
+ primaryKeyIndices.contains(
+ extractPrimaryKey(conf.getPrimaryKeyOrdinals(), batch, rowId));
+ return addFiles.flatMap(addFile -> deleteRecords(addFile, filter)).toInMemoryList();
+ } finally {
+ upsertedPrimaryKeys.clear();
+ deletedPrimaryKeys.clear();
+ primaryKeyIndices.clear();
+ touchedPartitions.clear();
+ }
+ }
+
+ /**
+ * Emit the Delta actions needed to logically delete from {@code addFile} every row matched by
+ * {@code filter}. Called once per candidate file returned by {@link RowLocator#find}; the
+ * returned iterator is flattened into {@link #merge}'s overall result. Implementations may assume
+ * {@link #table} has been initialized by the surrounding {@code merge(...)} call.
+ *
+ * @param addFile scan-file row in Kernel's {@code Scan.getScanFiles(...)} shape
+ * @param filter row-level predicate; {@code true} to remove the row, {@code false} to keep it
+ * @return single-action rows already wrapped as {@code SingleAction}s
+ */
+ protected abstract CloseableIterator deleteRecords(
+ Row addFile, BiPredicate filter);
+
+ private void cachePrimaryKey(List primaryKey) {
+ primaryKeyIndices.add(
+ primaryKey.stream()
+ .map(key -> Optional.ofNullable(key).map(Object::toString).orElse(""))
+ .collect(Collectors.toList()));
+ }
+
+ private void cachePartition(Map partitionValues) {
+ Map dedupKey =
+ partitionValues.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> String.valueOf(e.getValue())));
+ touchedPartitions.putIfAbsent(dedupKey, partitionValues);
+ }
+
+ private List extractPrimaryKey(int[] ordinals, ColumnarBatch data, Integer rowId) {
+ List key = new ArrayList<>(ordinals.length);
+ for (int ord : ordinals) {
+ key.add(
+ Optional.ofNullable(ColumnVectorUtils.get(data.getColumnVector(ord), rowId))
+ .map(Object::toString)
+ .orElse(""));
+ }
+ return key;
+ }
+}
diff --git a/flink/src/main/java/io/delta/flink/sink/sql/DeltaDynamicTableSink.java b/flink/src/main/java/io/delta/flink/sink/sql/DeltaDynamicTableSink.java
index d63cb92849c..87b7be7a6be 100644
--- a/flink/src/main/java/io/delta/flink/sink/sql/DeltaDynamicTableSink.java
+++ b/flink/src/main/java/io/delta/flink/sink/sql/DeltaDynamicTableSink.java
@@ -19,11 +19,14 @@
import static io.delta.flink.sink.sql.DeltaDynamicTableSinkFactory.*;
import io.delta.flink.sink.DeltaSink;
+import io.delta.flink.sink.DeltaSinkConf;
import io.delta.kernel.internal.util.Preconditions;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.ValidationException;
@@ -37,6 +40,7 @@
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
/**
* Flink {@link DynamicTableSink} implementation that writes data to a Delta table via {@link
@@ -62,9 +66,23 @@ public DeltaDynamicTableSink(
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ if (getWriteMode() == DeltaSinkConf.WriteMode.UPSERT) {
+ // Use Flink's standard upsert mode: INSERT, UPDATE_AFTER, DELETE. UPDATE_BEFORE is elided
+ // by Flink when the sink declares a primary key.
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+ }
return ChangelogMode.insertOnly();
}
+ /** Reads {@link DeltaSinkConf#WRITE_MODE} from the options map as a typed enum. */
+ private DeltaSinkConf.WriteMode getWriteMode() {
+ return Configuration.fromMap(options).get(DeltaSinkConf.WRITE_MODE);
+ }
+
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
RowType rowType = (RowType) consumedDataType.getLogicalType();
@@ -72,6 +90,18 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
Preconditions.checkArgument(
options.get(FactoryUtil.CONNECTOR.key()).equals(IDENTIFIER), "Target table must be delta");
+ DeltaSinkConf.WriteMode writeMode = getWriteMode();
+ // The factory already resolved column names to ordinals when building the options map.
+ // Here we just parse them back into Integers and hand them to DeltaSink.Builder.
+ List primaryKeyOrdinals =
+ writeMode == DeltaSinkConf.WriteMode.UPSERT
+ ? Arrays.stream(options.getOrDefault(DeltaSinkConf.PRIMARY_KEY.key(), "").split(","))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .map(Integer::parseInt)
+ .collect(java.util.stream.Collectors.toList())
+ : java.util.Collections.emptyList();
+
DeltaSink deltaSink;
if (options.containsKey(TABLE_PATH.key())) {
// Hadoop-based Sink
@@ -82,6 +112,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
.withTablePath(options.get(TABLE_PATH.key()))
.withPartitionColNames(
Arrays.asList(options.getOrDefault(PARTITIONS.key(), "").split(",")))
+ .withWriteMode(writeMode)
+ .withPrimaryKey(primaryKeyOrdinals)
.build();
} else {
// Fetch catalog and configs
@@ -97,6 +129,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
"unitycatalog.table_name", tableId.asSummaryString()))
.withEndpoint(options.get(FlinkUnityCatalogFactory.ENDPOINT.key()))
.withToken(options.get(FlinkUnityCatalogFactory.TOKEN.key()))
+ .withWriteMode(writeMode)
+ .withPrimaryKey(primaryKeyOrdinals)
.build();
}
diff --git a/flink/src/main/java/io/delta/flink/sink/sql/DeltaDynamicTableSinkFactory.java b/flink/src/main/java/io/delta/flink/sink/sql/DeltaDynamicTableSinkFactory.java
index 67b047abada..c5e382c2930 100644
--- a/flink/src/main/java/io/delta/flink/sink/sql/DeltaDynamicTableSinkFactory.java
+++ b/flink/src/main/java/io/delta/flink/sink/sql/DeltaDynamicTableSinkFactory.java
@@ -17,11 +17,18 @@
package io.delta.flink.sink.sql;
import io.delta.flink.sink.DeltaSinkConf;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
@@ -66,8 +73,45 @@ public DynamicTableSink createDynamicTableSink(Context context) {
DataType consumedDataType = schema.toPhysicalRowDataType();
Integer sinkParallelism = options.getOptional(FactoryUtil.SINK_PARALLELISM).orElse(null);
+
+ Map resolvedOptions = new HashMap<>(options.toMap());
+ DeltaSinkConf.WriteMode writeMode = options.get(DeltaSinkConf.WRITE_MODE);
+
+ if (writeMode == DeltaSinkConf.WriteMode.UPSERT) {
+ Optional pkConstraint = schema.getPrimaryKey();
+ if (pkConstraint.isEmpty()) {
+ throw new ValidationException(
+ "write.mode = 'upsert' requires a 'PRIMARY KEY (...) NOT ENFORCED' clause on the "
+ + "table definition.");
+ }
+ // Resolve PK column names to 0-based ordinals against the physical schema. Lower layers
+ // operate on RowData ordinals; doing the lookup here lets us surface "unknown column"
+ // failures as ValidationException at planning time instead of as a writer-construction
+ // RuntimeException at job startup.
+ List physicalNames = schema.getColumnNames();
+ String ordinals =
+ pkConstraint.get().getColumns().stream()
+ .map(
+ name -> {
+ int idx = physicalNames.indexOf(name);
+ if (idx < 0) {
+ throw new ValidationException(
+ "Primary-key column '"
+ + name
+ + "' does not exist in the table schema "
+ + physicalNames
+ + ".");
+ }
+ return Integer.toString(idx);
+ })
+ .collect(Collectors.joining(","));
+ // PRIMARY_KEY is an internal wire-format option carrying the resolved ordinals to
+ // TaskManagers; it is intentionally not in optionalOptions().
+ resolvedOptions.put(DeltaSinkConf.PRIMARY_KEY.key(), ordinals);
+ }
+
return new DeltaDynamicTableSink(
- context.getObjectIdentifier(), consumedDataType, sinkParallelism, options.toMap());
+ context.getObjectIdentifier(), consumedDataType, sinkParallelism, resolvedOptions);
}
public static final String IDENTIFIER = "delta";
@@ -95,6 +139,7 @@ public Set> optionalOptions() {
DeltaSinkConf.SCHEMA_EVOLUTION_MODE,
DeltaSinkConf.FILE_ROLLING_STRATEGY,
DeltaSinkConf.FILE_ROLLING_SIZE,
- DeltaSinkConf.FILE_ROLLING_COUNT);
+ DeltaSinkConf.FILE_ROLLING_COUNT,
+ DeltaSinkConf.WRITE_MODE);
}
}
diff --git a/flink/src/main/java/io/delta/flink/table/AbstractKernelTable.java b/flink/src/main/java/io/delta/flink/table/AbstractKernelTable.java
index 407c4f4b449..c96f32e3a6d 100644
--- a/flink/src/main/java/io/delta/flink/table/AbstractKernelTable.java
+++ b/flink/src/main/java/io/delta/flink/table/AbstractKernelTable.java
@@ -16,6 +16,8 @@
package io.delta.flink.table;
+import static io.delta.kernel.internal.util.Utils.toCloseableIterator;
+
import dev.failsafe.Failsafe;
import dev.failsafe.Fallback;
import dev.failsafe.RetryPolicy;
@@ -34,6 +36,7 @@
import io.delta.kernel.exceptions.TableAlreadyExistsException;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.DeltaLogActionUtils;
import io.delta.kernel.internal.data.TransactionStateRow;
import io.delta.kernel.metrics.TransactionReport;
@@ -318,6 +321,19 @@ public CloseableIterator writeParquet(
});
}
+ @Override
+ public CloseableIterator scan(Predicate predicate) {
+ return snapshot()
+ .map(
+ s ->
+ s.getScanBuilder()
+ .withFilter(predicate)
+ .build()
+ .getScanFiles(getEngine())
+ .flatMap(FilteredColumnarBatch::getRows))
+ .orElse(toCloseableIterator(Collections.emptyIterator()));
+ }
+
/**
* Load snapshot using a separated thread. This will allow external request to interrupt the
* thread during time-consuming operations in loading snapshot, such as log replay.
diff --git a/flink/src/main/java/io/delta/flink/table/DeltaTable.java b/flink/src/main/java/io/delta/flink/table/DeltaTable.java
index c554873e6e2..adc1330b071 100644
--- a/flink/src/main/java/io/delta/flink/table/DeltaTable.java
+++ b/flink/src/main/java/io/delta/flink/table/DeltaTable.java
@@ -20,6 +20,7 @@
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterable;
import io.delta.kernel.utils.CloseableIterator;
@@ -165,4 +166,6 @@ CloseableIterator writeParquet(
CloseableIterator data,
Map partitionValues)
throws IOException;
+
+ CloseableIterator scan(Predicate predicate);
}
diff --git a/flink/src/test/java/io/delta/flink/inttest/DeltaSinkIntTest.java b/flink/src/test/java/io/delta/flink/inttest/DeltaSinkIntTest.java
index bd8dc50126c..15ceb1bfc6b 100644
--- a/flink/src/test/java/io/delta/flink/inttest/DeltaSinkIntTest.java
+++ b/flink/src/test/java/io/delta/flink/inttest/DeltaSinkIntTest.java
@@ -16,8 +16,8 @@
package io.delta.flink.inttest;
-import static io.delta.flink.sink.ConversionsTest.f;
-import static io.delta.flink.sink.ConversionsTest.row;
+import static io.delta.flink.sink.ConversionsFlinkToDeltaTest.f;
+import static io.delta.flink.sink.ConversionsFlinkToDeltaTest.row;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.delta.flink.sink.DeltaSink;
diff --git a/flink/src/test/java/io/delta/flink/sink/ConversionsDeltaToJavaTest.java b/flink/src/test/java/io/delta/flink/sink/ConversionsDeltaToJavaTest.java
new file mode 100644
index 00000000000..fa2bd3450a3
--- /dev/null
+++ b/flink/src/test/java/io/delta/flink/sink/ConversionsDeltaToJavaTest.java
@@ -0,0 +1,179 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.flink.sink;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import io.delta.kernel.data.Row;
+import io.delta.kernel.internal.data.GenericRow;
+import io.delta.kernel.types.ArrayType;
+import io.delta.kernel.types.BinaryType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.MapType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampNTZType;
+import io.delta.kernel.types.TimestampType;
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/** JUnit test suite for {@link Conversions.DeltaToJava}. */
+public class ConversionsDeltaToJavaTest {
+
+ // -------------------------------------------------------------------------
+ // data(): per-type round-trip — every primitive Delta type in one method
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testData() {
+ // Boolean
+ assertEquals(Boolean.TRUE, dataAt(BooleanType.BOOLEAN, true), "BOOLEAN true");
+ assertEquals(Boolean.FALSE, dataAt(BooleanType.BOOLEAN, false), "BOOLEAN false");
+
+ // Integers
+ assertEquals(Byte.valueOf((byte) 7), dataAt(ByteType.BYTE, (byte) 7), "BYTE");
+ assertEquals(Short.valueOf((short) 1234), dataAt(ShortType.SHORT, (short) 1234), "SHORT");
+ assertEquals(Integer.valueOf(42), dataAt(IntegerType.INTEGER, 42), "INTEGER");
+ assertEquals(
+ Long.valueOf(1_000_000_000_000L), dataAt(LongType.LONG, 1_000_000_000_000L), "LONG");
+
+ // Floating point
+ assertEquals(Float.valueOf(3.5f), dataAt(FloatType.FLOAT, 3.5f), "FLOAT");
+ assertEquals(Double.valueOf(3.14159d), dataAt(DoubleType.DOUBLE, 3.14159d), "DOUBLE");
+
+ // String
+ assertEquals("delta", dataAt(StringType.STRING, "delta"), "STRING");
+
+ // Binary — DeltaToJava returns the raw byte[] (unlike FlinkToJava which base64-encodes).
+ byte[] bytes = new byte[] {0x01, 0x02, 0x03};
+ assertArrayEquals(bytes, (byte[]) dataAt(BinaryType.BINARY, bytes), "BINARY");
+
+ // Decimal
+ BigDecimal decValue = new BigDecimal("12.34");
+ assertEquals(decValue, dataAt(new DecimalType(10, 2), decValue), "DECIMAL");
+ }
+
+ // Temporal types: the production code reads them via getInt(date) / getLong(timestamp), which
+ // works on Row implementations backed by Parquet reads but throws on a strict GenericRow (which
+ // rejects "wide" access for typed columns). Once the helper is tested against a Parquet-backed
+ // Row or GenericRow relaxes its access checks, these can be enabled.
+ @Test
+ @Disabled("GenericRow rejects rowData.getInt(...) on a DateType column")
+ void testDate() {
+ assertEquals(Integer.valueOf(19000), dataAt(DateType.DATE, 19000));
+ }
+
+ @Test
+ @Disabled("GenericRow rejects rowData.getLong(...) on a TimestampType column")
+ void testTimestamp() {
+ assertEquals(
+ Long.valueOf(1_700_000_000_000_000L),
+ dataAt(TimestampType.TIMESTAMP, 1_700_000_000_000_000L));
+ }
+
+ @Test
+ @Disabled("GenericRow rejects rowData.getLong(...) on a TimestampNTZType column")
+ void testTimestampNTZ() {
+ assertEquals(
+ Long.valueOf(1_700_000_000_000_000L),
+ dataAt(TimestampNTZType.TIMESTAMP_NTZ, 1_700_000_000_000_000L));
+ }
+
+ // -------------------------------------------------------------------------
+ // data(): null handling
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testNullReturnsNull() {
+ DataType[] types = {
+ BooleanType.BOOLEAN,
+ ByteType.BYTE,
+ ShortType.SHORT,
+ IntegerType.INTEGER,
+ LongType.LONG,
+ FloatType.FLOAT,
+ DoubleType.DOUBLE,
+ StringType.STRING,
+ BinaryType.BINARY,
+ DateType.DATE,
+ TimestampType.TIMESTAMP,
+ TimestampNTZType.TIMESTAMP_NTZ,
+ new DecimalType(10, 2)
+ };
+ for (DataType type : types) {
+ StructType schema = new StructType().add("col", type);
+ Row row = new GenericRow(schema, new HashMap<>()); // empty map → null at every ordinal
+ assertNull(Conversions.DeltaToJava.data(schema, row, 0), "expected null for type " + type);
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // data(): unsupported (complex) types throw
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testStructTypeThrows() {
+ assertUnsupported(new StructType().add("inner", IntegerType.INTEGER));
+ }
+
+ @Test
+ void testArrayTypeThrows() {
+ assertUnsupported(new ArrayType(IntegerType.INTEGER, true));
+ }
+
+ @Test
+ void testMapTypeThrows() {
+ assertUnsupported(new MapType(StringType.STRING, IntegerType.INTEGER, true));
+ }
+
+ // -------------------------------------------------------------------------
+ // Helpers
+ // -------------------------------------------------------------------------
+
+ /** Build a one-column Row of the given type/value and invoke {@code DeltaToJava.data(...)}. */
+ private static Object dataAt(DataType type, Object value) {
+ StructType schema = new StructType().add("col", type);
+ Map values = new HashMap<>();
+ values.put(0, value);
+ Row row = new GenericRow(schema, values);
+ return Conversions.DeltaToJava.data(schema, row, 0);
+ }
+
+ /** Assert that {@code DeltaToJava.data} throws {@link UnsupportedOperationException}. */
+ private static void assertUnsupported(DataType type) {
+ StructType schema = new StructType().add("col", type);
+ // For complex types we still need a non-null cell so we don't short-circuit on the null
+ // path; use a sentinel object — the function should throw before accessing it.
+ Map values = new HashMap<>();
+ values.put(0, new Object());
+ Row row = new GenericRow(schema, values);
+ assertThrows(
+ UnsupportedOperationException.class, () -> Conversions.DeltaToJava.data(schema, row, 0));
+ }
+}
diff --git a/flink/src/test/java/io/delta/flink/sink/ConversionsTest.java b/flink/src/test/java/io/delta/flink/sink/ConversionsFlinkToDeltaTest.java
similarity index 99%
rename from flink/src/test/java/io/delta/flink/sink/ConversionsTest.java
rename to flink/src/test/java/io/delta/flink/sink/ConversionsFlinkToDeltaTest.java
index d1108561d67..5de4687b84a 100644
--- a/flink/src/test/java/io/delta/flink/sink/ConversionsTest.java
+++ b/flink/src/test/java/io/delta/flink/sink/ConversionsFlinkToDeltaTest.java
@@ -31,7 +31,7 @@
import org.junit.jupiter.api.Test;
/** JUnit test suite for Flink to Delta type conversions. */
-public class ConversionsTest implements FlinkTypeTests {
+public class ConversionsFlinkToDeltaTest implements FlinkTypeTests {
public static RowType row(RowField... fields) {
return new RowType(Arrays.asList(fields));
diff --git a/flink/src/test/java/io/delta/flink/sink/ConversionsFlinkToJavaTest.java b/flink/src/test/java/io/delta/flink/sink/ConversionsFlinkToJavaTest.java
new file mode 100644
index 00000000000..e3bbda0e1e4
--- /dev/null
+++ b/flink/src/test/java/io/delta/flink/sink/ConversionsFlinkToJavaTest.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.flink.sink;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.*;
+import org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.junit.jupiter.api.Test;
+
+/** JUnit test suite for {@link Conversions.FlinkToJava}. */
+public class ConversionsFlinkToJavaTest implements FlinkTypeTests {
+
+ // -------------------------------------------------------------------------
+ // data(): per-type round-trip — every primitive Flink type in one method
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testData() {
+ // Boolean
+ assertEquals(Boolean.TRUE, dataAt(new BooleanType(), Boolean.TRUE), "BOOLEAN true");
+ assertEquals(Boolean.FALSE, dataAt(new BooleanType(), Boolean.FALSE), "BOOLEAN false");
+
+ // Integers
+ assertEquals(Byte.valueOf((byte) 7), dataAt(new TinyIntType(), (byte) 7), "TINYINT");
+ assertEquals(Short.valueOf((short) 1234), dataAt(new SmallIntType(), (short) 1234), "SMALLINT");
+ assertEquals(Integer.valueOf(42), dataAt(new IntType(), 42), "INTEGER");
+ assertEquals(
+ Long.valueOf(1_000_000_000_000L), dataAt(new BigIntType(), 1_000_000_000_000L), "BIGINT");
+
+ // Floating point
+ assertEquals(Float.valueOf(3.5f), dataAt(new FloatType(), 3.5f), "FLOAT");
+ assertEquals(Double.valueOf(3.14159d), dataAt(new DoubleType(), 3.14159d), "DOUBLE");
+
+ // Character
+ assertEquals("hello", dataAt(new CharType(5), StringData.fromString("hello")), "CHAR");
+ assertEquals(
+ "delta",
+ dataAt(new VarCharType(VarCharType.MAX_LENGTH), StringData.fromString("delta")),
+ "VARCHAR");
+
+ // Binary — FlinkToJava base64-encodes (unlike DeltaToJava which returns raw byte[]).
+ assertEquals("AQID", dataAt(new BinaryType(3), new byte[] {0x01, 0x02, 0x03}), "BINARY base64");
+ assertEquals(
+ "ECAwQA==",
+ dataAt(new VarBinaryType(VarBinaryType.MAX_LENGTH), new byte[] {0x10, 0x20, 0x30, 0x40}),
+ "VARBINARY base64");
+
+ // Decimal
+ DecimalData decValue = DecimalData.fromBigDecimal(new BigDecimal("12.34"), 10, 2);
+ assertEquals(decValue, dataAt(new DecimalType(10, 2), decValue), "DECIMAL");
+
+ // Temporal
+ assertEquals(Integer.valueOf(19000), dataAt(new DateType(), 19000), "DATE");
+ assertEquals(Integer.valueOf(123456), dataAt(new TimeType(3), 123456), "TIME");
+ // Timestamps are read via getLong; callers pre-store them as long microseconds.
+ assertEquals(
+ Long.valueOf(1_700_000_000_000_000L),
+ dataAt(new TimestampType(6), 1_700_000_000_000_000L),
+ "TIMESTAMP_WITHOUT_TIME_ZONE");
+ assertEquals(
+ Long.valueOf(1_700_000_000_000_000L),
+ dataAt(new LocalZonedTimestampType(6), 1_700_000_000_000_000L),
+ "TIMESTAMP_WITH_LOCAL_TIME_ZONE");
+
+ // Intervals
+ assertEquals(
+ Integer.valueOf(15),
+ dataAt(new YearMonthIntervalType(YearMonthResolution.YEAR_TO_MONTH, 4), 15),
+ "INTERVAL_YEAR_MONTH");
+ assertEquals(
+ Long.valueOf(86_400_000L),
+ dataAt(new DayTimeIntervalType(DayTimeResolution.DAY, 3, 6), 86_400_000L),
+ "INTERVAL_DAY_TIME");
+ }
+
+ // -------------------------------------------------------------------------
+ // data(): null handling — parameterized across all primitive types
+ // -------------------------------------------------------------------------
+
+ @TestAllFlinkTypes
+ void testPrimitiveDataNull(LogicalType primitiveType) {
+ RowType flinkType = RowType.of(new LogicalType[] {primitiveType}, new String[] {"id"});
+ RowData withNull = GenericRowData.of(new Object[] {null});
+ assertNull(Conversions.FlinkToJava.data(flinkType, withNull, 0));
+ }
+
+ // -------------------------------------------------------------------------
+ // data(): unsupported types
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testUnsupportedTypeThrows() {
+ RowType rowType =
+ RowType.of(new LogicalType[] {new ArrayType(new IntType())}, new String[] {"arr"});
+ RowData row = GenericRowData.of(new Object[] {new Object()});
+ assertThrows(
+ UnsupportedOperationException.class, () -> Conversions.FlinkToJava.data(rowType, row, 0));
+ }
+
+ // -------------------------------------------------------------------------
+ // partitionValues(): shape and null behavior
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testPartitionValuesReturnsOptionalForEachColumn() {
+ RowType schema =
+ RowType.of(
+ new LogicalType[] {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)},
+ new String[] {"id", "part"});
+ RowData row = GenericRowData.of(42, StringData.fromString("p0"));
+
+ Map result =
+ Conversions.FlinkToJava.partitionValues(schema, List.of("id", "part"), row);
+
+ assertEquals(2, result.size());
+ assertEquals(Optional.of(42), result.get("id"));
+ assertEquals(Optional.of("p0"), result.get("part"));
+ }
+
+ @Test
+ void testPartitionValuesWrapsNullAsEmptyOptional() {
+ RowType schema = RowType.of(new LogicalType[] {new IntType()}, new String[] {"id"});
+ RowData row = GenericRowData.of(new Object[] {null});
+
+ Map result =
+ Conversions.FlinkToJava.partitionValues(schema, List.of("id"), row);
+
+ assertEquals(1, result.size());
+ assertEquals(Optional.empty(), result.get("id"));
+ }
+
+ @Test
+ void testPartitionValuesEmptyColumnsReturnsEmptyMap() {
+ RowType schema = RowType.of(new LogicalType[] {new IntType()}, new String[] {"id"});
+ RowData row = GenericRowData.of(42);
+
+ Map result = Conversions.FlinkToJava.partitionValues(schema, List.of(), row);
+
+ assertTrue(result.isEmpty());
+ }
+
+ // -------------------------------------------------------------------------
+ // Helpers
+ // -------------------------------------------------------------------------
+
+ /** Wrap {@code type} in a one-column RowType and invoke {@link Conversions.FlinkToJava#data}. */
+ private static Object dataAt(LogicalType type, Object value) {
+ RowType schema = RowType.of(new LogicalType[] {type}, new String[] {"col"});
+ RowData row = GenericRowData.of(value);
+ return Conversions.FlinkToJava.data(schema, row, 0);
+ }
+}
diff --git a/flink/src/test/java/io/delta/flink/sink/DeltaSinkConfTest.java b/flink/src/test/java/io/delta/flink/sink/DeltaSinkConfTest.java
index 6a97e734614..448b1c4bec0 100644
--- a/flink/src/test/java/io/delta/flink/sink/DeltaSinkConfTest.java
+++ b/flink/src/test/java/io/delta/flink/sink/DeltaSinkConfTest.java
@@ -16,10 +16,14 @@
package io.delta.flink.sink;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import io.delta.kernel.internal.util.ColumnMapping;
import io.delta.kernel.types.*;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;
@@ -107,4 +111,108 @@ void testSchemaEvolutionModeAllowWithPhysicalName() {
.allMatch(
tableSchema -> conf.getSchemaEvolutionPolicy().allowEvolve(tableSchema, schema)));
}
+
+ // ----------------------------------------------------------------------
+ // write.mode / primary_key
+ // ----------------------------------------------------------------------
+
+ @Test
+ void testWriteModeDefaultsToAppend() {
+ StructType schema = new StructType().add("id", IntegerType.INTEGER);
+ DeltaSinkConf conf = new DeltaSinkConf(schema, Map.of());
+
+ assertEquals(DeltaSinkConf.WriteMode.APPEND, conf.getWriteMode());
+ assertEquals(0, conf.getPrimaryKeyOrdinals().length);
+ assertTrue(!conf.isUpsert());
+ }
+
+ @Test
+ void testUpsertModeRequiresPrimaryKey() {
+ StructType schema = new StructType().add("id", IntegerType.INTEGER);
+ Map opts = new HashMap<>();
+ opts.put(DeltaSinkConf.WRITE_MODE.key(), "upsert");
+
+ assertThrows(IllegalArgumentException.class, () -> new DeltaSinkConf(schema, opts));
+ }
+
+ @Test
+ void testUpsertModeWithPrimaryKey() {
+ StructType schema =
+ new StructType().add("id", IntegerType.INTEGER).add("name", StringType.STRING);
+ Map opts = new HashMap<>();
+ opts.put(DeltaSinkConf.WRITE_MODE.key(), "upsert");
+ opts.put(DeltaSinkConf.PRIMARY_KEY.key(), "0");
+
+ DeltaSinkConf conf = new DeltaSinkConf(schema, opts);
+ assertEquals(DeltaSinkConf.WriteMode.UPSERT, conf.getWriteMode());
+ assertTrue(conf.isUpsert());
+ assertArrayEquals(new int[] {0}, conf.getPrimaryKeyOrdinals());
+ }
+
+ @Test
+ void testPrimaryKeyOrdinalParsing() {
+ StructType schema =
+ new StructType()
+ .add("a", IntegerType.INTEGER)
+ .add("b", IntegerType.INTEGER)
+ .add("c", IntegerType.INTEGER);
+ Map opts = new HashMap<>();
+ opts.put(DeltaSinkConf.WRITE_MODE.key(), "upsert");
+ opts.put(DeltaSinkConf.PRIMARY_KEY.key(), " 0 ,1, 2 ");
+
+ DeltaSinkConf conf = new DeltaSinkConf(schema, opts);
+ assertArrayEquals(new int[] {0, 1, 2}, conf.getPrimaryKeyOrdinals());
+ }
+
+ @Test
+ void testNonIntegerPrimaryKeyThrows() {
+ // Stale name-based wire-format value should now fail loudly instead of being silently
+ // misinterpreted; this guards against accidental regressions of the contract.
+ StructType schema = new StructType().add("id", IntegerType.INTEGER);
+ Map opts = new HashMap<>();
+ opts.put(DeltaSinkConf.WRITE_MODE.key(), "upsert");
+ opts.put(DeltaSinkConf.PRIMARY_KEY.key(), "id");
+
+ assertThrows(IllegalArgumentException.class, () -> new DeltaSinkConf(schema, opts));
+ }
+
+ @Test
+ void testOutOfRangePrimaryKeyOrdinalThrows() {
+ StructType schema = new StructType().add("id", IntegerType.INTEGER);
+ Map opts = new HashMap<>();
+ opts.put(DeltaSinkConf.WRITE_MODE.key(), "upsert");
+ opts.put(DeltaSinkConf.PRIMARY_KEY.key(), "5");
+
+ assertThrows(IllegalArgumentException.class, () -> new DeltaSinkConf(schema, opts));
+ }
+
+ @Test
+ void testNegativePrimaryKeyOrdinalThrows() {
+ StructType schema = new StructType().add("id", IntegerType.INTEGER);
+ Map opts = new HashMap<>();
+ opts.put(DeltaSinkConf.WRITE_MODE.key(), "upsert");
+ opts.put(DeltaSinkConf.PRIMARY_KEY.key(), "-1");
+
+ assertThrows(IllegalArgumentException.class, () -> new DeltaSinkConf(schema, opts));
+ }
+
+ @Test
+ void testWriteModeIsCaseInsensitive() {
+ StructType schema = new StructType().add("id", IntegerType.INTEGER);
+ Map opts = new HashMap<>();
+ opts.put(DeltaSinkConf.WRITE_MODE.key(), "UPSERT");
+ opts.put(DeltaSinkConf.PRIMARY_KEY.key(), "0");
+
+ DeltaSinkConf conf = new DeltaSinkConf(schema, opts);
+ assertEquals(DeltaSinkConf.WriteMode.UPSERT, conf.getWriteMode());
+ }
+
+ @Test
+ void testUnknownWriteModeThrows() {
+ StructType schema = new StructType().add("id", IntegerType.INTEGER);
+ Map opts = new HashMap<>();
+ opts.put(DeltaSinkConf.WRITE_MODE.key(), "merge");
+
+ assertThrows(IllegalArgumentException.class, () -> new DeltaSinkConf(schema, opts));
+ }
}
diff --git a/flink/src/test/java/io/delta/flink/sink/DeltaSinkWriterTest.java b/flink/src/test/java/io/delta/flink/sink/DeltaSinkWriterTest.java
index 42e246ddf49..12a966cc93f 100644
--- a/flink/src/test/java/io/delta/flink/sink/DeltaSinkWriterTest.java
+++ b/flink/src/test/java/io/delta/flink/sink/DeltaSinkWriterTest.java
@@ -16,21 +16,37 @@
package io.delta.flink.sink;
+import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
+import static io.delta.kernel.internal.util.Utils.toCloseableIterator;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import io.delta.flink.TestHelper;
import io.delta.flink.table.HadoopTable;
+import io.delta.kernel.Scan;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.TableManager;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.internal.data.ScanStateRow;
import io.delta.kernel.types.*;
+import io.delta.kernel.utils.CloseableIterable;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
import java.lang.management.ManagementFactory;
import java.net.URI;
import java.util.*;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.types.RowKind;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -258,4 +274,335 @@ void testMemoryIsStableOnLargeAmountOfPartitions() {
}
});
}
+
+ // ---------------------------------------------------------------------------
+ // Upsert-mode tests: verify the RowKind routing in DeltaSinkWriter.write and
+ // the wiring between the writer and its MergeStrategy at prepareCommit time.
+ //
+ // These tests assert on the *logical* row content of the table after committing
+ // the actions produced by prepareCommit. We deliberately do NOT inspect the
+ // physical action shape (AddFile / RemoveFile / deletion vectors) so the same
+ // expectations remain valid when the merge strategy switches from Copy-on-Write
+ // to Merge-on-Read.
+ // ---------------------------------------------------------------------------
+
+ @Test
+ void testUpsertInsertOnEmptyTable() throws Exception {
+ withTempDir(
+ dir -> {
+ StructType schema =
+ new StructType().add("id", IntegerType.INTEGER).add("part", StringType.STRING);
+ HadoopTable table =
+ new HadoopTable(
+ URI.create(dir.getAbsolutePath()),
+ Collections.emptyMap(),
+ schema,
+ Collections.emptyList());
+ table.open();
+
+ DeltaSinkWriter sinkWriter = newSinkWriter(table, upsertConf(schema, new int[] {0}));
+
+ GenericRowData row = GenericRowData.of(5, StringData.fromString("p0"));
+ row.setRowKind(RowKind.INSERT);
+ sinkWriter.write(row, new TestSinkWriterContext(0, 0));
+
+ commitResults(table, sinkWriter.prepareCommit());
+
+ List> rows = readAllRows(dir.getAbsolutePath(), schema);
+ assertEquals(List.of(List.of(5, "p0")), rows);
+ });
+ }
+
+ @Test
+ void testUpsertUpdateAfterReplacesPreImage() throws Exception {
+ withTempDir(
+ dir -> {
+ StructType schema =
+ new StructType().add("id", IntegerType.INTEGER).add("part", StringType.STRING);
+ HadoopTable table =
+ new HadoopTable(
+ URI.create(dir.getAbsolutePath()),
+ Collections.emptyMap(),
+ schema,
+ Collections.emptyList());
+ table.open();
+
+ prePopulate(
+ table,
+ schema,
+ List.of(
+ GenericRowData.of(1, StringData.fromString("old")),
+ GenericRowData.of(5, StringData.fromString("old")),
+ GenericRowData.of(9, StringData.fromString("old"))));
+
+ DeltaSinkWriter sinkWriter = newSinkWriter(table, upsertConf(schema, new int[] {0}));
+
+ GenericRowData row = GenericRowData.of(5, StringData.fromString("new"));
+ row.setRowKind(RowKind.UPDATE_AFTER);
+ sinkWriter.write(row, new TestSinkWriterContext(0, 0));
+
+ commitResults(table, sinkWriter.prepareCommit());
+
+ // id=5 now carries the new image; id=1 and id=9 are untouched.
+ List> rows = sortById(readAllRows(dir.getAbsolutePath(), schema));
+ assertEquals(List.of(List.of(1, "old"), List.of(5, "new"), List.of(9, "old")), rows);
+ });
+ }
+
+ @Test
+ void testUpsertDeleteRemovesPreImage() throws Exception {
+ withTempDir(
+ dir -> {
+ StructType schema =
+ new StructType().add("id", IntegerType.INTEGER).add("part", StringType.STRING);
+ HadoopTable table =
+ new HadoopTable(
+ URI.create(dir.getAbsolutePath()),
+ Collections.emptyMap(),
+ schema,
+ Collections.emptyList());
+ table.open();
+
+ prePopulate(
+ table,
+ schema,
+ List.of(
+ GenericRowData.of(1, StringData.fromString("a")),
+ GenericRowData.of(7, StringData.fromString("b")),
+ GenericRowData.of(9, StringData.fromString("c"))));
+
+ DeltaSinkWriter sinkWriter = newSinkWriter(table, upsertConf(schema, new int[] {0}));
+
+ GenericRowData row = GenericRowData.of(7, StringData.fromString("b"));
+ row.setRowKind(RowKind.DELETE);
+ sinkWriter.write(row, new TestSinkWriterContext(0, 0));
+
+ commitResults(table, sinkWriter.prepareCommit());
+
+ List> rows = sortById(readAllRows(dir.getAbsolutePath(), schema));
+ assertEquals(List.of(List.of(1, "a"), List.of(9, "c")), rows);
+ });
+ }
+
+ @Test
+ void testUpsertUpdateBeforeIsDropped() throws Exception {
+ withTempDir(
+ dir -> {
+ StructType schema =
+ new StructType().add("id", IntegerType.INTEGER).add("part", StringType.STRING);
+ HadoopTable table =
+ new HadoopTable(
+ URI.create(dir.getAbsolutePath()),
+ Collections.emptyMap(),
+ schema,
+ Collections.emptyList());
+ table.open();
+
+ prePopulate(table, schema, List.of(GenericRowData.of(11, StringData.fromString("kept"))));
+
+ DeltaSinkWriter sinkWriter = newSinkWriter(table, upsertConf(schema, new int[] {0}));
+
+ GenericRowData row = GenericRowData.of(11, StringData.fromString("dropped"));
+ row.setRowKind(RowKind.UPDATE_BEFORE);
+ sinkWriter.write(row, new TestSinkWriterContext(0, 0));
+
+ // UPDATE_BEFORE produces no work for the writer or the merge strategy. The
+ // pre-existing row must remain in the table untouched.
+ commitResults(table, sinkWriter.prepareCommit());
+
+ List> rows = readAllRows(dir.getAbsolutePath(), schema);
+ assertEquals(List.of(List.of(11, "kept")), rows);
+ });
+ }
+
+ @Test
+ void testAppendModeUpdateAfterThrows() {
+ withTempDir(
+ dir -> {
+ StructType schema =
+ new StructType().add("id", IntegerType.INTEGER).add("part", StringType.STRING);
+ HadoopTable table =
+ new HadoopTable(
+ URI.create(dir.getAbsolutePath()),
+ Collections.emptyMap(),
+ schema,
+ Collections.emptyList());
+ table.open();
+
+ DeltaSinkWriter sinkWriter =
+ newSinkWriter(table, new DeltaSinkConf(schema, Collections.emptyMap()));
+
+ GenericRowData row = GenericRowData.of(1, StringData.fromString("p0"));
+ row.setRowKind(RowKind.UPDATE_AFTER);
+ assertThrows(
+ IllegalStateException.class,
+ () -> sinkWriter.write(row, new TestSinkWriterContext(0, 0)));
+ });
+ }
+
+ @Test
+ void testAppendModeDeleteThrows() {
+ withTempDir(
+ dir -> {
+ StructType schema =
+ new StructType().add("id", IntegerType.INTEGER).add("part", StringType.STRING);
+ HadoopTable table =
+ new HadoopTable(
+ URI.create(dir.getAbsolutePath()),
+ Collections.emptyMap(),
+ schema,
+ Collections.emptyList());
+ table.open();
+
+ DeltaSinkWriter sinkWriter =
+ newSinkWriter(table, new DeltaSinkConf(schema, Collections.emptyMap()));
+
+ GenericRowData row = GenericRowData.of(1, StringData.fromString("p0"));
+ row.setRowKind(RowKind.DELETE);
+ assertThrows(
+ IllegalStateException.class,
+ () -> sinkWriter.write(row, new TestSinkWriterContext(0, 0)));
+ });
+ }
+
+ // ---------------------------------------------------------------------------
+ // Test-only helpers
+ // ---------------------------------------------------------------------------
+
+ /** Build an upsert-mode {@link DeltaSinkConf} for the given schema and PK ordinals. */
+ private static DeltaSinkConf upsertConf(StructType schema, int[] pkOrdinals) {
+ Map opts = new HashMap<>();
+ opts.put("write.mode", "upsert");
+ opts.put(
+ "primary_key",
+ Arrays.stream(pkOrdinals).mapToObj(Integer::toString).collect(Collectors.joining(",")));
+ return new DeltaSinkConf(schema, opts);
+ }
+
+ /** Build a {@link DeltaSinkWriter} with the standard test wiring. */
+ private static DeltaSinkWriter newSinkWriter(HadoopTable table, DeltaSinkConf conf) {
+ return new DeltaSinkWriter.Builder()
+ .withJobId("test-job")
+ .withSubtaskId(0)
+ .withAttemptNumber(1)
+ .withDeltaTable(table)
+ .withConf(conf)
+ .withMetricGroup(UnregisteredMetricsGroup.createSinkWriterMetricGroup())
+ .build();
+ }
+
+ /**
+ * Write the given rows into {@code table} via a {@link DeltaWriterTask} and commit them. After
+ * this returns, {@code table}'s snapshot reflects the new AddFile actions.
+ */
+ private static void prePopulate(HadoopTable table, StructType schema, List rows)
+ throws Exception {
+ DeltaSinkConf conf = new DeltaSinkConf(schema, Collections.emptyMap());
+ DeltaWriterTask task =
+ new DeltaWriterTask(
+ "setup-job",
+ /* subtaskId= */ 0,
+ /* attemptNumber= */ 0,
+ table,
+ conf,
+ Collections.emptyMap());
+ for (GenericRowData row : rows) {
+ task.write(row, new TestSinkWriterContext(0, 0));
+ }
+ commitActions(table, task.complete());
+ }
+
+ /** Commit the actions inside {@code results} into {@code table}. */
+ private static void commitResults(HadoopTable table, Collection results) {
+ if (results.isEmpty()) {
+ return;
+ }
+ commitActions(table, results);
+ }
+
+ private static final AtomicLong TXN_COUNTER = new AtomicLong();
+
+ private static void commitActions(HadoopTable table, Collection results) {
+ List actions = new ArrayList<>();
+ for (DeltaWriterResult r : results) {
+ actions.addAll(r.getDeltaActions());
+ }
+ if (actions.isEmpty()) {
+ return;
+ }
+ // Use a unique txnId per commit. Reusing the same (appId, txnId) pair makes Delta treat
+ // the second commit as an idempotent duplicate and silently skip it, which would mask the
+ // logical effect of the merge strategy from the assertions below.
+ table.commit(
+ CloseableIterable.inMemoryIterable(toCloseableIterator(actions.iterator())),
+ "test-app",
+ TXN_COUNTER.incrementAndGet(),
+ Collections.emptyMap());
+ }
+
+ /**
+ * Read all logical rows of the table at {@code tablePath} through the Kernel scan API. Each row
+ * is returned as a {@code List} whose entries are the column values in schema order.
+ *
+ * This reader honors any deletion vectors / logical row filtering that the strategy may have
+ * applied, so the result reflects the table's logical view regardless of whether the merge ran as
+ * Copy-on-Write or Merge-on-Read.
+ */
+ private static List> readAllRows(String tablePath, StructType schema)
+ throws Exception {
+ Engine engine = DefaultEngine.create(new Configuration());
+ Snapshot snapshot = TableManager.loadSnapshot(tablePath).build(engine);
+ Scan scan = snapshot.getScanBuilder().withReadSchema(schema).build();
+ Row scanState = scan.getScanState(engine);
+ StructType physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(scanState);
+
+ List> rows = new ArrayList<>();
+ try (CloseableIterator scanFileIter = scan.getScanFiles(engine)) {
+ while (scanFileIter.hasNext()) {
+ FilteredColumnarBatch scanFilesBatch = scanFileIter.next();
+ try (CloseableIterator scanFileRows = scanFilesBatch.getRows()) {
+ while (scanFileRows.hasNext()) {
+ Row scanFileRow = scanFileRows.next();
+ FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow);
+ try (CloseableIterator transformedData =
+ Scan.transformPhysicalData(
+ engine,
+ scanState,
+ scanFileRow,
+ engine
+ .getParquetHandler()
+ .readParquetFiles(
+ singletonCloseableIterator(fileStatus),
+ physicalReadSchema,
+ Optional.empty())
+ .map(res -> res.getData()))) {
+ while (transformedData.hasNext()) {
+ FilteredColumnarBatch batch = transformedData.next();
+ try (CloseableIterator rowIter = batch.getRows()) {
+ while (rowIter.hasNext()) {
+ rows.add(rowToList(rowIter.next(), schema));
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ return rows;
+ }
+
+ private static List rowToList(Row row, StructType schema) {
+ List cells = new ArrayList<>(schema.length());
+ for (int i = 0; i < schema.length(); i++) {
+ cells.add(row.isNullAt(i) ? null : Conversions.DeltaToJava.data(schema, row, i));
+ }
+ return cells;
+ }
+
+ /** Sort {@code rows} ascending by the first column (assumed integer-typed id). */
+ private static List> sortById(List> rows) {
+ rows.sort(Comparator.comparingInt(r -> (Integer) r.get(0)));
+ return rows;
+ }
}
diff --git a/flink/src/test/java/io/delta/flink/sink/mergestrategy/CoWUpsertTest.java b/flink/src/test/java/io/delta/flink/sink/mergestrategy/CoWUpsertTest.java
new file mode 100644
index 00000000000..dc211ab9917
--- /dev/null
+++ b/flink/src/test/java/io/delta/flink/sink/mergestrategy/CoWUpsertTest.java
@@ -0,0 +1,516 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.flink.sink.mergestrategy;
+
+import static io.delta.kernel.internal.util.Utils.toCloseableIterator;
+import static org.junit.jupiter.api.Assertions.*;
+
+import io.delta.flink.TestHelper;
+import io.delta.flink.sink.DeltaSinkConf;
+import io.delta.flink.sink.DeltaWriterResult;
+import io.delta.flink.sink.DeltaWriterTask;
+import io.delta.flink.sink.TestSinkWriterContext;
+import io.delta.flink.table.AbstractKernelTable;
+import io.delta.flink.table.HadoopTable;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.data.MapValue;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.expressions.AlwaysTrue;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.internal.actions.AddFile;
+import io.delta.kernel.internal.actions.RemoveFile;
+import io.delta.kernel.internal.actions.SingleAction;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterable;
+import io.delta.kernel.utils.CloseableIterator;
+import java.io.File;
+import java.net.URI;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiPredicate;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Per-file tests for {@link CoWUpsert#deleteRecords}.
+ *
+ * This suite exercises the file-rewrite half of the upsert pipeline in isolation — it supplies a
+ * scan-file row and a row-level filter directly, bypassing {@link
+ * io.delta.flink.sink.MergeStrategy#recordUpsert} / {@link
+ * io.delta.flink.sink.MergeStrategy#recordDelete} / {@link Upsert#merge}. End-to-end tests of the
+ * full merge pipeline (including {@link RowLocator} and the PK-driven filter construction) are out
+ * of scope here.
+ *
+ *
{@link CoWUpsert#deleteRecords} is {@code protected}, so we wrap it in a tiny test-only
+ * subclass ({@link ExposedCoWUpsert}) that exposes the method plus the {@code table} field.
+ */
+class CoWUpsertTest extends TestHelper {
+
+ private static final StructType SCHEMA =
+ new StructType().add("id", IntegerType.INTEGER).add("name", StringType.STRING);
+
+ /** Column ordinal of {@code id} in {@link #SCHEMA}. */
+ private static final int ID_ORDINAL = 0;
+
+ // -------------------------------------------------------------------------
+ // Tests
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testDropSomeRows() {
+ withTempDir(
+ dir -> {
+ HadoopTable table = openTable(dir, SCHEMA, Collections.emptyList());
+ Row scanFileRow = writeAndScan(table, SCHEMA, 10, Collections.emptyMap());
+
+ // Drop rows whose id is even — 5 survivors expected.
+ BiPredicate deleteEvens = idMatches(i -> i % 2 == 0);
+
+ List actions = runDeleteRecords(table, scanFileRow, deleteEvens).toInMemoryList();
+
+ assertEquals(2, actions.size(), "expected RemoveFile + AddFile");
+ assertIsRemoveAction(actions.get(0));
+ assertIsAddAction(actions.get(1));
+
+ AddFile addFile = addFileOf(actions.get(1));
+ Path newFile = dir.toPath().resolve(addFile.getPath()).toAbsolutePath();
+ List survivors = readParquet(newFile, SCHEMA);
+ assertEquals(5, survivors.size());
+ for (Row r : survivors) {
+ assertEquals(1, r.getInt(0) % 2, "even id leaked through filter");
+ }
+
+ // RemoveFile path matches source AddFile path (same relative form).
+ RemoveFile removeFile =
+ new RemoveFile(actions.get(0).getStruct(SingleAction.REMOVE_FILE_ORDINAL));
+ assertEquals(sourceAddFile(scanFileRow).getPath(), removeFile.getPath());
+ });
+ }
+
+ @Test
+ void testDropAllRows() {
+ withTempDir(
+ dir -> {
+ HadoopTable table = openTable(dir, SCHEMA, Collections.emptyList());
+ Row scanFileRow = writeAndScan(table, SCHEMA, 10, Collections.emptyMap());
+
+ List actions =
+ runDeleteRecords(table, scanFileRow, /* drop all */ (b, r) -> true).toInMemoryList();
+
+ // Exactly one RemoveFile is required (the source file must be retired).
+ int removes = 0;
+ int addsWithSurvivors = 0;
+ for (Row action : actions) {
+ if (!action.isNullAt(SingleAction.REMOVE_FILE_ORDINAL)) {
+ removes++;
+ }
+ if (!action.isNullAt(SingleAction.ADD_FILE_ORDINAL)) {
+ // Whether the writer emits an empty-file AddFile is implementation-defined; if it
+ // does, the resulting Parquet file must be row-empty.
+ AddFile addFile = addFileOf(action);
+ Path newFile = dir.toPath().resolve(addFile.getPath()).toAbsolutePath();
+ if (!readParquet(newFile, SCHEMA).isEmpty()) {
+ addsWithSurvivors++;
+ }
+ }
+ }
+ assertEquals(1, removes, "expected exactly one RemoveFile");
+ assertEquals(
+ 0, addsWithSurvivors, "no AddFile should contain any rows when all are deleted");
+ });
+ }
+
+ @Test
+ void testKeepAllRows() {
+ withTempDir(
+ dir -> {
+ HadoopTable table = openTable(dir, SCHEMA, Collections.emptyList());
+ final int numRows = 7;
+ Row scanFileRow = writeAndScan(table, SCHEMA, numRows, Collections.emptyMap());
+
+ List actions =
+ runDeleteRecords(table, scanFileRow, /* keep all */ (b, r) -> false).toInMemoryList();
+
+ assertEquals(2, actions.size(), "expected RemoveFile + AddFile");
+ assertIsRemoveAction(actions.get(0));
+ assertIsAddAction(actions.get(1));
+
+ AddFile addFile = addFileOf(actions.get(1));
+ Path newFile = dir.toPath().resolve(addFile.getPath()).toAbsolutePath();
+ List survivors = readParquet(newFile, SCHEMA);
+ assertEquals(numRows, survivors.size());
+ for (int i = 0; i < numRows; i++) {
+ assertEquals(i, survivors.get(i).getInt(0), "row order changed");
+ }
+ });
+ }
+
+ @Test
+ void testPreservesPartitionValues() {
+ withTempDir(
+ dir -> {
+ StructType partSchema =
+ new StructType().add("id", IntegerType.INTEGER).add("part", StringType.STRING);
+ HadoopTable table = openTable(dir, partSchema, List.of("part"));
+
+ Map partValues = Map.of("part", Literal.ofString("p0"));
+ Row scanFileRow = writeAndScan(table, partSchema, 6, partValues);
+
+ List actions =
+ runDeleteRecords(table, scanFileRow, idMatches(i -> i < 3)).toInMemoryList();
+
+ assertEquals(2, actions.size());
+ AddFile addFile = addFileOf(actions.get(1));
+
+ // Partition values round-trip onto the new AddFile.
+ MapValue pm = addFile.getPartitionValues();
+ assertEquals(1, pm.getSize());
+ assertEquals("part", pm.getKeys().getString(0));
+ assertEquals("p0", pm.getValues().getString(0));
+
+ Path newFile = dir.toPath().resolve(addFile.getPath()).toAbsolutePath();
+ List survivors = readParquet(newFile, partSchema);
+ assertEquals(3, survivors.size());
+ for (Row r : survivors) {
+ assertTrue(r.getInt(0) >= 3);
+ }
+ });
+ }
+
+ @Test
+ void testRewriteCoLocatedWithSource() {
+ withTempDir(
+ dir -> {
+ StructType partSchema =
+ new StructType().add("id", IntegerType.INTEGER).add("part", StringType.STRING);
+ HadoopTable table = openTable(dir, partSchema, List.of("part"));
+
+ Map partValues = Map.of("part", Literal.ofString("p7"));
+ Row scanFileRow = writeAndScan(table, partSchema, 4, partValues);
+
+ List actions =
+ runDeleteRecords(table, scanFileRow, idMatches(i -> i == 0)).toInMemoryList();
+
+ assertEquals(2, actions.size());
+ String sourcePath = sourceAddFile(scanFileRow).getPath();
+ String newPath = addFileOf(actions.get(1)).getPath();
+
+ // The actual co-location guarantee: rewrite shares the source's parent directory.
+ // Note: the on-disk layout for Flink-Delta writes is NOT partition-based — files land
+ // under --/.parquet and the partition is
+ // tracked only in AddFile.partitionValues. So we don't assert anything about a
+ // "part=p7/" prefix in the path; we just check the parent dirs agree.
+ assertEquals(
+ parentDir(sourcePath),
+ parentDir(newPath),
+ "rewrite must land in the same folder as the source");
+ });
+ }
+
+ @Test
+ void testCarriesOverStatsFromSourceAddFileToRemove() {
+ withTempDir(
+ dir -> {
+ HadoopTable table = openTable(dir, SCHEMA, Collections.emptyList());
+ Row scanFileRow = writeAndScan(table, SCHEMA, 10, Collections.emptyMap());
+
+ List actions =
+ runDeleteRecords(table, scanFileRow, /* keep all */ (b, r) -> false).toInMemoryList();
+
+ // The RemoveFile constructed via AddFile.toRemoveFileRow carries over size and path;
+ // verify the metadata wasn't silently dropped during the conversion.
+ AddFile srcAdd = sourceAddFile(scanFileRow);
+ RemoveFile removeFile =
+ new RemoveFile(actions.get(0).getStruct(SingleAction.REMOVE_FILE_ORDINAL));
+
+ // RemoveFile.getSize() is Optional; AddFile.getSize() is a plain long. Compare
+ // the values (the Optional should always be present here because toRemoveFileRow
+ // copies the source's size unconditionally).
+ assertTrue(removeFile.getSize().isPresent(), "RemoveFile.size missing");
+ assertEquals(srcAdd.getSize(), removeFile.getSize().get().longValue());
+ assertEquals(srcAdd.getPath(), removeFile.getPath());
+ });
+ }
+
+ @Test
+ void testFilterReceivesEveryRow() {
+ // Defensive: ensure the filter is invoked for every input row exactly once, in row order.
+ withTempDir(
+ dir -> {
+ HadoopTable table = openTable(dir, SCHEMA, Collections.emptyList());
+ final int numRows = 8;
+ Row scanFileRow = writeAndScan(table, SCHEMA, numRows, Collections.emptyMap());
+
+ List idsSeen = new ArrayList<>();
+ BiPredicate recording =
+ (batch, rowId) -> {
+ idsSeen.add(batch.getColumnVector(ID_ORDINAL).getInt(rowId));
+ return false; // keep all
+ };
+
+ List actions = runDeleteRecords(table, scanFileRow, recording).toInMemoryList();
+ // Force survivor materialization (writeParquet is lazy until the iterator is consumed).
+ AddFile addFile = addFileOf(actions.get(1));
+ readParquet(dir.toPath().resolve(addFile.getPath()).toAbsolutePath(), SCHEMA);
+
+ // 8 ids observed, in ascending order matching how writeAndScan populated the file.
+ assertEquals(numRows, idsSeen.size());
+ for (int i = 0; i < numRows; i++) {
+ assertEquals(Integer.valueOf(i), idsSeen.get(i));
+ }
+ });
+ }
+
+ @Test
+ void testDropByCompositeKey() {
+ // Exercises a multi-column primary-key delete. The filter reads two columns
+ // (org_id, region) and removes the rows whose tuple is in a delete set — the same shape an
+ // Upsert strategy uses when the PK has more than one column.
+ withTempDir(
+ dir -> {
+ StructType schema =
+ new StructType()
+ .add("org_id", IntegerType.INTEGER)
+ .add("region", StringType.STRING)
+ .add("payload", StringType.STRING);
+
+ HadoopTable table = openTable(dir, schema, Collections.emptyList());
+
+ // 6 rows; (org_id, region) is the composite PK.
+ List rows =
+ List.of(
+ GenericRowData.of(1, StringData.fromString("us"), StringData.fromString("p0")),
+ GenericRowData.of(1, StringData.fromString("eu"), StringData.fromString("p1")),
+ GenericRowData.of(2, StringData.fromString("us"), StringData.fromString("p2")),
+ GenericRowData.of(2, StringData.fromString("eu"), StringData.fromString("p3")),
+ GenericRowData.of(3, StringData.fromString("us"), StringData.fromString("p4")),
+ GenericRowData.of(3, StringData.fromString("eu"), StringData.fromString("p5")));
+ Row scanFileRow = writeAndScanRows(table, schema, rows);
+
+ // Delete two specific PKs: (1, "eu") and (3, "us"). 4 survivors expected.
+ Set> deleteKeys = Set.of(List.of(1, "eu"), List.of(3, "us"));
+ BiPredicate filter =
+ (batch, rowId) -> {
+ int orgId = batch.getColumnVector(0).getInt(rowId);
+ String region = batch.getColumnVector(1).getString(rowId);
+ return deleteKeys.contains(List.of(orgId, region));
+ };
+
+ List actions = runDeleteRecords(table, scanFileRow, filter).toInMemoryList();
+
+ assertEquals(2, actions.size(), "expected RemoveFile + AddFile");
+ assertIsRemoveAction(actions.get(0));
+ assertIsAddAction(actions.get(1));
+
+ AddFile addFile = addFileOf(actions.get(1));
+ Path newFile = dir.toPath().resolve(addFile.getPath()).toAbsolutePath();
+ List survivors = readParquet(newFile, schema);
+
+ // 4 survivors, none matching any delete key.
+ assertEquals(4, survivors.size(), "expected 4 surviving rows");
+ for (Row r : survivors) {
+ int orgId = r.getInt(0);
+ String region = r.getString(1);
+ assertFalse(
+ deleteKeys.contains(List.of(orgId, region)),
+ "row (" + orgId + ", " + region + ") should have been deleted");
+ }
+
+ // Order-preserving spot-check by payload: expected survivors in source order are
+ // p0 (1,us), p2 (2,us), p3 (2,eu), p5 (3,eu).
+ assertEquals("p0", survivors.get(0).getString(2));
+ assertEquals("p2", survivors.get(1).getString(2));
+ assertEquals("p3", survivors.get(2).getString(2));
+ assertEquals("p5", survivors.get(3).getString(2));
+ });
+ }
+
+ // -------------------------------------------------------------------------
+ // Helpers
+ // -------------------------------------------------------------------------
+
+ /**
+ * Test-only subclass of {@link CoWUpsert} that exposes the {@code protected} {@code
+ * deleteRecords(...)} method and the inherited {@code table} field. Bypassing {@link
+ * Upsert#merge} lets us hand the strategy a scan-file row and a synthetic filter directly,
+ * isolating the file-rewrite behavior from the locator and PK-bookkeeping concerns.
+ */
+ private static final class ExposedCoWUpsert extends CoWUpsert {
+ ExposedCoWUpsert(AbstractKernelTable backingTable) {
+ this.table = backingTable;
+ }
+
+ CloseableIterator deleteRecordsForTest(
+ Row addFile, BiPredicate filter) {
+ return deleteRecords(addFile, filter);
+ }
+ }
+
+ /** Build a "drop rows where {@code id} matches {@code idTest}" filter. */
+ private static BiPredicate idMatches(
+ java.util.function.IntPredicate idTest) {
+ return (batch, rowId) -> idTest.test(batch.getColumnVector(ID_ORDINAL).getInt(rowId));
+ }
+
+ /** Open a fresh HadoopTable rooted at {@code dir} with the given schema and partition cols. */
+ private HadoopTable openTable(File dir, StructType schema, List partitionCols) {
+ HadoopTable table =
+ new HadoopTable(
+ URI.create(dir.getAbsolutePath()),
+ Collections.emptyMap(),
+ schema,
+ partitionCols.isEmpty() ? Collections.emptyList() : partitionCols);
+ table.open();
+ return table;
+ }
+
+ /**
+ * Write {@code numRows} rows into {@code table} via a {@link DeltaWriterTask}, commit the
+ * resulting AddFile actions, and return the scan-file row for the (single) data file produced.
+ *
+ * Rows are populated as {@code (id=i, name="row-i")} for unpartitioned tables, or {@code
+ * (id=i, part=)} when {@code partitionValues} is non-empty.
+ */
+ private Row writeAndScan(
+ HadoopTable table, StructType schema, int numRows, Map partitionValues) {
+ DeltaSinkConf conf = new DeltaSinkConf(schema, Collections.emptyMap());
+ DeltaWriterTask task =
+ new DeltaWriterTask(
+ "test-job", /* subtaskId= */ 0, /* attemptNumber= */ 0, table, conf, partitionValues);
+
+ String partString =
+ partitionValues.isEmpty()
+ ? null
+ : String.valueOf(partitionValues.values().iterator().next().getValue());
+
+ for (int i = 0; i < numRows; i++) {
+ Object[] cells = new Object[schema.length()];
+ cells[0] = Integer.valueOf(i);
+ cells[1] =
+ partString != null
+ ? StringData.fromString(partString)
+ : StringData.fromString("row-" + i);
+ try {
+ task.write(GenericRowData.of(cells), new TestSinkWriterContext(0, 0));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return commitAndScan(table, task);
+ }
+
+ /**
+ * Variant of {@link #writeAndScan} for callers that have already built the rows themselves —
+ * useful for schemas with more than two columns or non-trivial row payloads.
+ *
+ * The DeltaWriterTask is created with empty partition values (i.e. unpartitioned), so this
+ * helper is only suitable for unpartitioned tables.
+ */
+ private Row writeAndScanRows(HadoopTable table, StructType schema, List rows) {
+ DeltaSinkConf conf = new DeltaSinkConf(schema, Collections.emptyMap());
+ DeltaWriterTask task =
+ new DeltaWriterTask(
+ "test-job", /* subtaskId= */ 0, /* attemptNumber= */ 0, table, conf, Map.of());
+ try {
+ for (GenericRowData row : rows) {
+ task.write(row, new TestSinkWriterContext(0, 0));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return commitAndScan(table, task);
+ }
+
+ /**
+ * Shared tail of {@link #writeAndScan} and {@link #writeAndScanRows}: finalize the writer task,
+ * commit the produced AddFile actions to the table, and return the (single) scan-file row.
+ */
+ private Row commitAndScan(HadoopTable table, DeltaWriterTask task) {
+ List results;
+ try {
+ results = task.complete();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ List actions = new ArrayList<>();
+ for (DeltaWriterResult r : results) {
+ actions.addAll(r.getDeltaActions());
+ }
+ table.commit(
+ CloseableIterable.inMemoryIterable(toCloseableIterator(actions.iterator())),
+ "test-app",
+ /* txnId= */ 0L,
+ Collections.emptyMap());
+
+ // Materialize scan-file rows so they outlive the scan iterator. We don't wrap in
+ // try-with-resources here: toInMemoryList() closes the iterator internally, and an explicit
+ // double-close triggers an IllegalStateException inside Kernel's ScanImpl close path
+ // (it calls hasNext() on an already-closed underlying iterator).
+ List all = table.scan(AlwaysTrue.ALWAYS_TRUE).toInMemoryList();
+ assertEquals(1, all.size(), "expected exactly one scan file from commitAndScan");
+ return all.get(0);
+ }
+
+ /**
+ * Invoke {@link CoWUpsert#deleteRecords} for the given scan-file row and filter, going through
+ * the test-only subclass that exposes the protected method.
+ */
+ private CloseableIterator runDeleteRecords(
+ HadoopTable table, Row scanFileRow, BiPredicate filter) {
+ return new ExposedCoWUpsert(table).deleteRecordsForTest(scanFileRow, filter);
+ }
+
+ private static AddFile addFileOf(Row singleAction) {
+ return new AddFile(singleAction.getStruct(SingleAction.ADD_FILE_ORDINAL));
+ }
+
+ /** Unwrap the {@code add} struct from a scan-file row, using the public ordinal constant. */
+ private static AddFile sourceAddFile(Row scanFileRow) {
+ return new AddFile(scanFileRow.getStruct(InternalScanFileUtils.ADD_FILE_ORDINAL));
+ }
+
+ private static String parentDir(String relativePath) {
+ int slash = relativePath.lastIndexOf('/');
+ return slash >= 0 ? relativePath.substring(0, slash) : "";
+ }
+
+ private static void assertIsRemoveAction(Row action) {
+ assertFalse(
+ action.isNullAt(SingleAction.REMOVE_FILE_ORDINAL),
+ "expected the action to carry a RemoveFile");
+ assertTrue(
+ action.isNullAt(SingleAction.ADD_FILE_ORDINAL),
+ "did not expect the action to also carry an AddFile");
+ }
+
+ private static void assertIsAddAction(Row action) {
+ assertFalse(
+ action.isNullAt(SingleAction.ADD_FILE_ORDINAL), "expected the action to carry an AddFile");
+ assertTrue(
+ action.isNullAt(SingleAction.REMOVE_FILE_ORDINAL),
+ "did not expect the action to also carry a RemoveFile");
+ }
+}
diff --git a/flink/src/test/java/io/delta/flink/sink/mergestrategy/ScanLocatorTest.java b/flink/src/test/java/io/delta/flink/sink/mergestrategy/ScanLocatorTest.java
new file mode 100644
index 00000000000..1d431d5db9f
--- /dev/null
+++ b/flink/src/test/java/io/delta/flink/sink/mergestrategy/ScanLocatorTest.java
@@ -0,0 +1,257 @@
+/*
+ * Copyright (2026) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.flink.sink.mergestrategy;
+
+import static io.delta.kernel.internal.util.Utils.toCloseableIterator;
+import static org.junit.jupiter.api.Assertions.*;
+
+import io.delta.flink.TestHelper;
+import io.delta.flink.sink.DeltaSinkConf;
+import io.delta.flink.sink.DeltaWriterResult;
+import io.delta.flink.sink.DeltaWriterTask;
+import io.delta.flink.sink.TestSinkWriterContext;
+import io.delta.flink.table.HadoopTable;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.internal.actions.AddFile;
+import io.delta.kernel.utils.CloseableIterable;
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.junit.jupiter.api.Test;
+
+/** JUnit test suite for {@link ScanLocator}. */
+class ScanLocatorTest extends TestHelper {
+
+ private static final io.delta.kernel.types.StructType SCHEMA =
+ new io.delta.kernel.types.StructType()
+ .add("id", io.delta.kernel.types.IntegerType.INTEGER)
+ .add("name", io.delta.kernel.types.StringType.STRING);
+
+ /** PK = single column "id" at ordinal 0. */
+ private static final int[] SINGLE_PK = {0};
+
+ // -------------------------------------------------------------------------
+ // Tests
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testFindReturnsFileWhenPkExists() {
+ withTempDir(
+ dir -> {
+ HadoopTable table = openTable(dir, SCHEMA, Collections.emptyList());
+ writeRows(table, SCHEMA, 10, Collections.emptyMap());
+
+ // Single PK 5 — exists in the file (id range 0..9). Expect the file to be returned.
+ List> pks = List.of(List.of(Literal.ofInt(5)));
+ List files = new ScanLocator().find(table, SINGLE_PK, pks).toInMemoryList();
+
+ assertEquals(1, files.size(), "expected exactly one candidate file");
+ // The row should be a scan-file row carrying an `add` substruct.
+ AddFile addFile =
+ new AddFile(files.get(0).getStruct(InternalScanFileUtils.ADD_FILE_ORDINAL));
+ assertNotNull(addFile.getPath(), "scan-file row should carry a non-null AddFile.path");
+ });
+ }
+
+ @Test
+ void testFindReturnsFileForMultiplePks() {
+ withTempDir(
+ dir -> {
+ HadoopTable table = openTable(dir, SCHEMA, Collections.emptyList());
+ writeRows(table, SCHEMA, 10, Collections.emptyMap());
+
+ // Multiple PKs all exist in the (single) file.
+ List> pks =
+ List.of(
+ List.of(Literal.ofInt(2)), List.of(Literal.ofInt(4)), List.of(Literal.ofInt(7)));
+ List files = new ScanLocator().find(table, SINGLE_PK, pks).toInMemoryList();
+
+ assertEquals(1, files.size(), "expected exactly one candidate file");
+ });
+ }
+
+ @Test
+ void testFindReturnsEmptyForEmptyPks() {
+ // Empty pks list → ExpressionUtils.in returns AlwaysFalse → scan returns no files at all.
+ withTempDir(
+ dir -> {
+ HadoopTable table = openTable(dir, SCHEMA, Collections.emptyList());
+ writeRows(table, SCHEMA, 10, Collections.emptyMap());
+
+ List files =
+ new ScanLocator().find(table, SINGLE_PK, Collections.emptyList()).toInMemoryList();
+
+ assertTrue(files.isEmpty(), "empty pks list should yield no candidate files");
+ });
+ }
+
+ @Test
+ void testFindEmptyTableReturnsEmpty() {
+ withTempDir(
+ dir -> {
+ // Open the table without writing any data.
+ HadoopTable table = openTable(dir, SCHEMA, Collections.emptyList());
+ // We need to create the empty table first by going through the open() path; that's
+ // already done by openTable(). No commits → no scan files.
+
+ List> pks = List.of(List.of(Literal.ofInt(5)));
+ List files = new ScanLocator().find(table, SINGLE_PK, pks).toInMemoryList();
+
+ assertTrue(files.isEmpty(), "no files have been committed; scan should return empty");
+ });
+ }
+
+ @Test
+ void testFindWithCompositeKey() {
+ withTempDir(
+ dir -> {
+ io.delta.kernel.types.StructType compositeSchema =
+ new io.delta.kernel.types.StructType()
+ .add("org_id", io.delta.kernel.types.IntegerType.INTEGER)
+ .add("region", io.delta.kernel.types.StringType.STRING)
+ .add("payload", io.delta.kernel.types.StringType.STRING);
+
+ HadoopTable table = openTable(dir, compositeSchema, Collections.emptyList());
+
+ List rows =
+ List.of(
+ GenericRowData.of(1, StringData.fromString("us"), StringData.fromString("p0")),
+ GenericRowData.of(2, StringData.fromString("eu"), StringData.fromString("p1")),
+ GenericRowData.of(3, StringData.fromString("us"), StringData.fromString("p2")));
+ writeRowData(table, compositeSchema, rows, Collections.emptyMap());
+
+ // (org_id, region) → ordinals (0, 1). Find tuples (1, "us") and (3, "us").
+ int[] compositePk = {0, 1};
+ List> pks =
+ List.of(
+ List.of(Literal.ofInt(1), Literal.ofString("us")),
+ List.of(Literal.ofInt(3), Literal.ofString("us")));
+
+ List files = new ScanLocator().find(table, compositePk, pks).toInMemoryList();
+
+ assertEquals(1, files.size(), "expected the single file containing matching tuples");
+ });
+ }
+
+ @Test
+ void testFindPreservesPkOrdinalOrder() {
+ // ScanLocator maps int[] pkIndices → List column names by indexing the schema's
+ // fieldNames. Verify the order is preserved when the caller passes ordinals in reverse.
+ withTempDir(
+ dir -> {
+ io.delta.kernel.types.StructType compositeSchema =
+ new io.delta.kernel.types.StructType()
+ .add("a", io.delta.kernel.types.IntegerType.INTEGER)
+ .add("b", io.delta.kernel.types.IntegerType.INTEGER);
+ HadoopTable table = openTable(dir, compositeSchema, Collections.emptyList());
+
+ // Write rows (a=1, b=10), (a=2, b=20), (a=3, b=30).
+ List rows =
+ List.of(GenericRowData.of(1, 10), GenericRowData.of(2, 20), GenericRowData.of(3, 30));
+ writeRowData(table, compositeSchema, rows, Collections.emptyMap());
+
+ // Pass ordinals in (b, a) order — the value tuples must be (b-value, a-value).
+ int[] reversedPk = {1, 0};
+ List> pks = List.of(List.of(Literal.ofInt(10), Literal.ofInt(1)));
+
+ List files = new ScanLocator().find(table, reversedPk, pks).toInMemoryList();
+ assertEquals(1, files.size(), "expected the file containing (a=1, b=10)");
+ });
+ }
+
+ // -------------------------------------------------------------------------
+ // Helpers (kept consistent with CoWUpsertTest)
+ // -------------------------------------------------------------------------
+
+ /** Open a HadoopTable at {@code dir} with the given schema/partition cols. */
+ private HadoopTable openTable(
+ File dir, io.delta.kernel.types.StructType schema, List partitionCols) {
+ HadoopTable table =
+ new HadoopTable(
+ URI.create(dir.getAbsolutePath()),
+ Collections.emptyMap(),
+ schema,
+ partitionCols.isEmpty() ? Collections.emptyList() : partitionCols);
+ table.open();
+ return table;
+ }
+
+ /** Write {@code numRows} rows {@code (id=i, name="row-i")} via a DeltaWriterTask and commit. */
+ private void writeRows(
+ HadoopTable table,
+ io.delta.kernel.types.StructType schema,
+ int numRows,
+ Map partitionValues) {
+ DeltaSinkConf conf = new DeltaSinkConf(schema, Collections.emptyMap());
+ DeltaWriterTask task = new DeltaWriterTask("test-job", 0, 0, table, conf, partitionValues);
+ for (int i = 0; i < numRows; i++) {
+ try {
+ task.write(
+ GenericRowData.of(i, StringData.fromString("row-" + i)),
+ new TestSinkWriterContext(0, 0));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ commit(table, task);
+ }
+
+ /**
+ * Variant that takes pre-built rows — for schemas that don't fit the {@code (int, string)} mold.
+ */
+ private void writeRowData(
+ HadoopTable table,
+ io.delta.kernel.types.StructType schema,
+ List rows,
+ Map partitionValues) {
+ DeltaSinkConf conf = new DeltaSinkConf(schema, Collections.emptyMap());
+ DeltaWriterTask task = new DeltaWriterTask("test-job", 0, 0, table, conf, partitionValues);
+ try {
+ for (GenericRowData row : rows) {
+ task.write(row, new TestSinkWriterContext(0, 0));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ commit(table, task);
+ }
+
+ private void commit(HadoopTable table, DeltaWriterTask task) {
+ List results;
+ try {
+ results = task.complete();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ List actions = new ArrayList<>();
+ for (DeltaWriterResult r : results) {
+ actions.addAll(r.getDeltaActions());
+ }
+ table.commit(
+ CloseableIterable.inMemoryIterable(toCloseableIterator(actions.iterator())),
+ "test-app",
+ /* txnId= */ 0L,
+ Collections.emptyMap());
+ }
+}
diff --git a/flink/src/test/java/io/delta/flink/sink/sql/DeltaDynamicTableSinkTest.java b/flink/src/test/java/io/delta/flink/sink/sql/DeltaDynamicTableSinkTest.java
index db50b7ebcaf..778013b9f71 100644
--- a/flink/src/test/java/io/delta/flink/sink/sql/DeltaDynamicTableSinkTest.java
+++ b/flink/src/test/java/io/delta/flink/sink/sql/DeltaDynamicTableSinkTest.java
@@ -16,16 +16,27 @@
package io.delta.flink.sink.sql;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import io.delta.flink.TestHelper;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.Test;
/** Test suite for {@link DeltaDynamicTableSink}. */
@@ -37,26 +48,7 @@ void testLoadTable() {
dir -> {
Map options = Map.of("connector", "delta", "table_path", dir.getPath());
- CatalogTable table =
- CatalogTable.newBuilder()
- .schema(
- Schema.newBuilder()
- .column("id", DataTypes.BIGINT())
- .column("dt", DataTypes.STRING())
- .build())
- .comment("test table")
- .partitionKeys(List.of())
- .options(options)
- .build();
-
- ResolvedCatalogTable resolvedTable =
- new ResolvedCatalogTable(
- table,
- ResolvedSchema.physical(
- new String[] {"id", "dt"},
- new org.apache.flink.table.types.DataType[] {
- DataTypes.BIGINT(), DataTypes.STRING()
- }));
+ ResolvedCatalogTable resolvedTable = simpleTable(options, /* pk = */ null);
TestDynamicTableSinkContext context = new TestDynamicTableSinkContext(resolvedTable);
@@ -64,6 +56,113 @@ void testLoadTable() {
var sink = factory.createDynamicTableSink(context);
assertTrue(sink instanceof DeltaDynamicTableSink);
+ // Default is append mode → insert-only changelog.
+ assertEquals(
+ ChangelogMode.insertOnly(), sink.getChangelogMode(ChangelogMode.insertOnly()));
});
}
+
+ @Test
+ void testUpsertModeAdvertisesUpsertChangelogMode() {
+ withTempDir(
+ dir -> {
+ Map options = new HashMap<>();
+ options.put("connector", "delta");
+ options.put("table_path", dir.getPath());
+ options.put("write.mode", "upsert");
+
+ ResolvedCatalogTable resolvedTable = simpleTable(options, List.of("id"));
+
+ DynamicTableSink sink =
+ new DeltaDynamicTableSinkFactory()
+ .createDynamicTableSink(new TestDynamicTableSinkContext(resolvedTable));
+
+ ChangelogMode mode = sink.getChangelogMode(ChangelogMode.upsert());
+ assertTrue(mode.contains(RowKind.INSERT), "upsert mode must allow INSERT");
+ assertTrue(mode.contains(RowKind.UPDATE_AFTER), "upsert mode must allow UPDATE_AFTER");
+ assertTrue(mode.contains(RowKind.DELETE), "upsert mode must allow DELETE");
+ // UPDATE_BEFORE is elided by Flink for PK sinks.
+ assertTrue(
+ !mode.contains(RowKind.UPDATE_BEFORE),
+ "upsert mode should not request UPDATE_BEFORE");
+ });
+ }
+
+ @Test
+ void testUpsertModeWithoutPrimaryKeyThrows() {
+ withTempDir(
+ dir -> {
+ Map options = new HashMap<>();
+ options.put("connector", "delta");
+ options.put("table_path", dir.getPath());
+ options.put("write.mode", "upsert");
+
+ ResolvedCatalogTable resolvedTable = simpleTable(options, /* pk = */ null);
+
+ assertThrows(
+ ValidationException.class,
+ () ->
+ new DeltaDynamicTableSinkFactory()
+ .createDynamicTableSink(new TestDynamicTableSinkContext(resolvedTable)));
+ });
+ }
+
+ @Test
+ void testUpsertModeRejectsUnknownPrimaryKeyOption() {
+ withTempDir(
+ dir -> {
+ // The 'primary_key' option is NOT a public option; the factory's `validate()` call
+ // should reject it as an unknown option.
+ Map options = new HashMap<>();
+ options.put("connector", "delta");
+ options.put("table_path", dir.getPath());
+ options.put("write.mode", "upsert");
+ options.put("primary_key", "id");
+
+ ResolvedCatalogTable resolvedTable = simpleTable(options, List.of("id"));
+
+ assertThrows(
+ ValidationException.class,
+ () ->
+ new DeltaDynamicTableSinkFactory()
+ .createDynamicTableSink(new TestDynamicTableSinkContext(resolvedTable)));
+ });
+ }
+
+ /**
+ * Build a {@link ResolvedCatalogTable} with two columns {@code (id BIGINT, dt STRING)} and the
+ * given options + optional primary-key columns.
+ */
+ private static ResolvedCatalogTable simpleTable(Map options, List pk) {
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .column("id", DataTypes.BIGINT().notNull())
+ .column("dt", DataTypes.STRING());
+ if (pk != null && !pk.isEmpty()) {
+ schemaBuilder.primaryKey(pk);
+ }
+
+ CatalogTable table =
+ CatalogTable.newBuilder()
+ .schema(schemaBuilder.build())
+ .comment("test table")
+ .partitionKeys(Collections.emptyList())
+ .options(options)
+ .build();
+
+ ResolvedSchema resolvedSchema =
+ pk == null || pk.isEmpty()
+ ? ResolvedSchema.physical(
+ new String[] {"id", "dt"},
+ new org.apache.flink.table.types.DataType[] {
+ DataTypes.BIGINT().notNull(), DataTypes.STRING()
+ })
+ : new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("id", DataTypes.BIGINT().notNull()),
+ Column.physical("dt", DataTypes.STRING())),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("pk", pk));
+ return new ResolvedCatalogTable(table, resolvedSchema);
+ }
}