Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 65 additions & 3 deletions flink/src/main/java/io/delta/flink/kernel/ColumnVectorUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,77 @@
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.types.BooleanType;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.StructType;
import io.delta.kernel.types.*;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

/**
* Static helpers for Kernel {@link ColumnVector} / {@link ColumnarBatch} / {@link
* FilteredColumnarBatch}: typed single-cell read ({@link #get}), batch wrapping ({@link #wrap},
* {@link #child}, {@link #notNullAt}), and selection-vector construction ({@link #filter}, {@link
* #notNull}).
*
* <p>The selection-vector convention is {@code true = keep, false = drop} — matches {@code
* FilteredColumnarBatch}.
*/
public class ColumnVectorUtils {

/**
* Returns the value at {@code rowId} from {@code input} as a plain Java object.
*
* <p>Mirrors the type coverage of {@link
* io.delta.flink.sink.Conversions.DeltaToJava#data(StructType, io.delta.kernel.data.Row, int)} —
* only primitive Kernel types are supported. Complex types (struct, array, map) throw {@link
* UnsupportedOperationException}; the caller is expected to descend into them explicitly via
* {@link ColumnVector#getChild(int)}, {@link ColumnVector#getArray(int)}, or {@link
* ColumnVector#getMap(int)}.
*
* @param input column vector to read from
* @param rowId 0-based row index within {@code input}
* @return the value at {@code (input, rowId)}; {@code null} if the cell is null
* @throws UnsupportedOperationException for complex types
*/
public static Object get(ColumnVector input, int rowId) {
if (input.isNullAt(rowId)) {
return null;
}
DataType type = input.getDataType();
if (type.equivalent(BooleanType.BOOLEAN)) {
return input.getBoolean(rowId);
} else if (type.equivalent(ByteType.BYTE)) {
return input.getByte(rowId);
} else if (type.equivalent(ShortType.SHORT)) {
return input.getShort(rowId);
} else if (type.equivalent(IntegerType.INTEGER)) {
return input.getInt(rowId);
} else if (type.equivalent(LongType.LONG)) {
return input.getLong(rowId);
} else if (type.equivalent(FloatType.FLOAT)) {
return input.getFloat(rowId);
} else if (type.equivalent(DoubleType.DOUBLE)) {
return input.getDouble(rowId);
} else if (type.equivalent(StringType.STRING)) {
return input.getString(rowId);
} else if (type.equivalent(BinaryType.BINARY)) {
return input.getBinary(rowId);
} else if (type.equivalent(DateType.DATE)) {
// Days since 1970-01-01
return input.getInt(rowId);
} else if (type.equivalent(TimestampType.TIMESTAMP)) {
// Microseconds since epoch, UTC
return input.getLong(rowId);
} else if (type.equivalent(TimestampNTZType.TIMESTAMP_NTZ)) {
// Microseconds since epoch, no time zone
return input.getLong(rowId);
} else if (type instanceof DecimalType) {
return input.getDecimal(rowId);
} else {
// StructType, ArrayType, MapType — caller must navigate these explicitly.
throw new UnsupportedOperationException("Unsupported column vector type: " + type);
}
}

public static FilteredColumnarBatch wrap(ColumnarBatch data) {
return new FilteredColumnarBatch(data, Optional.empty());
}
Expand Down
78 changes: 78 additions & 0 deletions flink/src/main/java/io/delta/flink/kernel/ExpressionUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (2026) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.flink.kernel;

import io.delta.kernel.expressions.*;
import java.util.List;
import java.util.stream.Collectors;

/**
* Helpers for building Kernel {@link Predicate} expressions that aren't natively expressible in
* Kernel's expression API. Currently this is just the row-{@code IN} builder used to construct
* partition / primary-key membership filters in the upsert merge path; expand as needed when other
* compound expressions earn their own helper.
*/
public class ExpressionUtils {
/**
* Generate a predicate equivalent to <code>(colNames) IN (values)</code>. Empty {@code values} →
* {@link AlwaysFalse#ALWAYS_FALSE}. Single-column → native {@link In} for engine pushdown.
* Multi-column → left-deep OR-of-ANDs ({@code (c1=v11 AND ...) OR (c1=v21 AND ...) OR ...}),
* since Kernel has no row-IN expression.
*
* @param colNames left-side tuple of column names; must be non-empty
* @param values right-side list of value tuples; each row must have the same arity as {@code
* colNames}
* @throws IllegalArgumentException if {@code colNames} is empty or any tuple arity mismatches
*/
public static Predicate in(List<String> colNames, List<List<Literal>> values) {
if (colNames.isEmpty()) {
throw new IllegalArgumentException("in() requires at least one column");
}
if (values.isEmpty()) {
return AlwaysFalse.ALWAYS_FALSE;
}
for (List<Literal> row : values) {
if (row.size() != colNames.size()) {
throw new IllegalArgumentException(
"Value tuple arity " + row.size() + " does not match column count " + colNames.size());
}
}

// Single-column fast path: use the native IN predicate (the engine can push it down better
// than an OR chain).
if (colNames.size() == 1) {
Column col = new Column(colNames.get(0));
List<Expression> literals =
values.stream().map(row -> (Expression) row.get(0)).collect(Collectors.toList());
return new In(col, literals);
}

// Multi-column: build (c1=v11 AND c2=v12 AND ...) OR (c1=v21 AND c2=v22 AND ...) OR ...
// We accumulate left-deep AND/OR trees rather than building everything as a flat predicate,
// because Kernel's And/Or only support binary children.
Predicate result = null;
for (List<Literal> row : values) {
Predicate rowEq = null;
for (int i = 0; i < colNames.size(); i++) {
Predicate eq = new Predicate("=", new Column(colNames.get(i)), row.get(i));
rowEq = (rowEq == null) ? eq : new And(rowEq, eq);
}
result = (result == null) ? rowEq : new Or(result, rowEq);
}
return result;
}
}
64 changes: 61 additions & 3 deletions flink/src/main/java/io/delta/flink/sink/Conversions.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@

package io.delta.flink.sink;

import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import io.delta.kernel.types.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.MapType;

/**
* Provide conversions between Flink and Delta data structures. This now includes:
Expand Down Expand Up @@ -235,4 +237,60 @@ public static Object data(RowType rowType, RowData rowData, int colIdx) {
}
}
}

/** Convert Delta Kernel data to plain Java objects. */
public static class DeltaToJava {
/**
* Convert the value at column {@code colIdx} of a Kernel {@link Row} to a plain Java object.
*
* <p>Supports primitive types only. Complex types (struct, array, map) throw {@link
* UnsupportedOperationException}.
*
* @param schema row schema
* @param rowData Kernel row
* @param colIdx column ordinal
* @return the column value as a Java object, or {@code null} if the column is null at this row
* @throws UnsupportedOperationException for complex types (struct, array, map)
*/
public static Object data(StructType schema, Row rowData, int colIdx) {
DataType dataType = schema.fields().get(colIdx).getDataType();
if (rowData.isNullAt(colIdx)) {
return null;
}
if (dataType.equivalent(io.delta.kernel.types.BooleanType.BOOLEAN)) {
return rowData.getBoolean(colIdx);
} else if (dataType.equivalent(ByteType.BYTE)) {
return rowData.getByte(colIdx);
} else if (dataType.equivalent(ShortType.SHORT)) {
return rowData.getShort(colIdx);
} else if (dataType.equivalent(IntegerType.INTEGER)) {
return rowData.getInt(colIdx);
} else if (dataType.equivalent(LongType.LONG)) {
return rowData.getLong(colIdx);
} else if (dataType.equivalent(io.delta.kernel.types.FloatType.FLOAT)) {
return rowData.getFloat(colIdx);
} else if (dataType.equivalent(io.delta.kernel.types.DoubleType.DOUBLE)) {
return rowData.getDouble(colIdx);
} else if (dataType.equivalent(StringType.STRING)) {
return rowData.getString(colIdx);
} else if (dataType.equivalent(io.delta.kernel.types.BinaryType.BINARY)) {
return rowData.getBinary(colIdx);
} else if (dataType.equivalent(io.delta.kernel.types.DateType.DATE)) {
// Days since 1970-01-01
return rowData.getInt(colIdx);
} else if (dataType.equivalent(io.delta.kernel.types.TimestampType.TIMESTAMP)) {
// Microseconds since epoch, UTC
return rowData.getLong(colIdx);
} else if (dataType.equivalent(TimestampNTZType.TIMESTAMP_NTZ)) {
// Microseconds since epoch, no time zone
return rowData.getLong(colIdx);
} else if (dataType instanceof io.delta.kernel.types.DecimalType) {
return rowData.getDecimal(colIdx);
} else {
// StructType, ArrayType, MapType — out of scope per spec.
throw new UnsupportedOperationException(
"DeltaToJava.data does not support complex type: " + dataType);
}
}
}
}
32 changes: 32 additions & 0 deletions flink/src/main/java/io/delta/flink/sink/DeltaSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.*;
import org.apache.flink.core.io.SimpleVersionedSerializer;
Expand Down Expand Up @@ -249,6 +250,37 @@ public Builder withConfigurations(Map<String, String> configurations) {
return this;
}

/**
* Sets the sink {@code write.mode}. Use {@link DeltaSinkConf.WriteMode#APPEND} (default) or
* {@link DeltaSinkConf.WriteMode#UPSERT}.
*
* <p>Sink-only — does not propagate to the Delta table's persisted properties.
*/
public Builder withWriteMode(DeltaSinkConf.WriteMode writeMode) {
this.configurations.put(DeltaSinkConf.WRITE_MODE.key(), writeMode.name());
return this;
}

/**
* Sets the sink primary-key columns by 0-based ordinal in the Flink {@code RowType} (required
* for {@code write.mode = "upsert"}).
*
* <p>Callers are responsible for resolving column names to ordinals against the schema they
* pass via {@link #withFlinkSchema(RowType)}. The Flink SQL path performs this resolution in
* {@link io.delta.flink.sink.sql.DeltaDynamicTableSink}; DataStream API callers can resolve
* with {@code flinkSchema.getFieldIndex(columnName)}.
*
* <p>Sink-only — does not propagate to the Delta table's persisted properties.
*/
public Builder withPrimaryKey(List<Integer> primaryKeyOrdinals) {
if (primaryKeyOrdinals != null && !primaryKeyOrdinals.isEmpty()) {
this.configurations.put(
DeltaSinkConf.PRIMARY_KEY.key(),
primaryKeyOrdinals.stream().map(String::valueOf).collect(Collectors.joining(",")));
}
return this;
}

public DeltaSink build() {
Objects.requireNonNull(flinkSchema, "flinkSchema must not be null");
if (configurations == null) {
Expand Down
Loading
Loading