Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import scala.jdk.javaapi.CollectionConverters;

/** DataSource V2 Table implementation for Delta Lake using the Delta Kernel API. */
public class SparkTable implements Table, SupportsRead, SupportsWrite, SupportsMetadataColumns {
Expand Down Expand Up @@ -263,24 +264,38 @@ public Set<TableCapability> capabilities() {
}

/**
* Exposes row-tracking metadata via a single DSv2 metadata struct column.
* Exposes file-level and row-tracking metadata via a single DSv2 metadata struct column.
*
* <p>This always returns one metadata column named {@code _metadata}. When row tracking is
* enabled, the struct contains fields {@code row_id} and {@code row_commit_version}. When row
* tracking is disabled, those fields are omitted from the struct.
* <p>This always returns one metadata column named {@code _metadata}. The struct contains Spark
* file-source base metadata fields with the same names and types as {@link
* org.apache.spark.sql.execution.datasources.FileFormat#BASE_METADATA_FIELDS} ({@code file_path},
* {@code file_name}, {@code file_size}, {@code file_block_start}, {@code file_block_length},
* {@code file_modification_time}); when row tracking is enabled, {@code row_id} and {@code
* row_commit_version} are appended. Values follow {@link
* org.apache.spark.sql.execution.datasources.PartitionedFile} semantics (see {@code
* FileFormat.BASE_METADATA_EXTRACTORS}).
*
* <p>Field order mirrors Spark's canonical {@code BASE_METADATA_FIELDS} for parity with V1 Delta,
* but resolution is name-based — order is not load-bearing for correctness.
*/
@Override
public MetadataColumn[] metadataColumns() {
SnapshotImpl snapshotImpl = (SnapshotImpl) initialSnapshot;
boolean rowTrackingEnabled =
RowTracking.isEnabled(snapshotImpl.getProtocol(), snapshotImpl.getMetadata());

final StructType metadataType =
rowTrackingEnabled
? new StructType()
.add(ROW_ID_METADATA_FIELD_NAME, DataTypes.LongType, false)
.add(ROW_COMMIT_VERSION_METADATA_FIELD_NAME, DataTypes.LongType, false)
: new StructType();
StructType metadataType = new StructType();
for (StructField field :
CollectionConverters.asJava(FileFormat$.MODULE$.BASE_METADATA_FIELDS())) {
metadataType = metadataType.add(field);
}
if (rowTrackingEnabled) {
metadataType =
metadataType
.add(ROW_ID_METADATA_FIELD_NAME, DataTypes.LongType, false)
.add(ROW_COMMIT_VERSION_METADATA_FIELD_NAME, DataTypes.LongType, false);
}
final StructType finalMetadataType = metadataType;

MetadataColumn[] columns = new MetadataColumn[1];
columns[0] =
Expand All @@ -292,7 +307,7 @@ public String name() {

@Override
public DataType dataType() {
return metadataType;
return finalMetadataType;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (2026) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.spark.internal.v2.read.metadata;

import java.io.Serializable;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;

/**
* File-scoped resolver produced by {@link MetadataValueSetterBuilder#buildWithFile}. Writes one
* {@code _metadata} struct ordinal per row.
*/
public interface BoundMetadataValueSetter extends Serializable {

/**
* Writes this field's value at {@code ordinal} of the caller-owned {@code metadataRow}. May read
* from {@code innerRow} (the raw row from the inner Parquet reader, including any row-tracking
* helper columns), or simply write a value captured at file-bind time.
*/
void setValue(GenericInternalRow metadataRow, int ordinal, InternalRow innerRow);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (2026) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.spark.internal.v2.read.metadata;

import java.io.Serializable;
import java.util.Objects;
import org.apache.spark.sql.catalyst.expressions.Literal;
import org.apache.spark.sql.catalyst.expressions.Literal$;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
import scala.Function1;

/**
* Builder for a {@code _metadata} field whose value is a per-file constant. Wraps a {@code
* PartitionedFile -> Any} extractor sourced from {@code
* DeltaParquetFileFormat#fileConstantMetadataExtractors} (which augments Spark's base map with
* Delta-specific extractors like {@code base_row_id} and {@code default_row_commit_version}).
*
* <p>{@link #buildWithFile} reads the value once; the returned bound setter writes the same value
* for every row read from the file.
*/
public final class FileConstantValueSetterBuilder implements MetadataValueSetterBuilder {

private final Function1<PartitionedFile, Object> extractor;

public FileConstantValueSetterBuilder(Function1<PartitionedFile, Object> extractor) {
this.extractor = Objects.requireNonNull(extractor);
}

@Override
public BoundMetadataValueSetter buildWithFile(PartitionedFile file) {
// Mirror Spark's FileFormat#updateMetadataInternalRow: wrap the raw extractor value in a
// Catalyst Literal so primitives (e.g. String -> UTF8String) get converted to their internal
// representations once, here, rather than on every row.
Literal literal = Literal$.MODULE$.apply(extractor.apply(file));
return new ConstantBoundSetter(literal.value());
}

/**
* Bound setter that always writes the captured Catalyst value (or null). {@code null} is
* preserved by writing it through {@link
* org.apache.spark.sql.catalyst.expressions.GenericInternalRow#update}, which is equivalent to
* {@code setNullAt} for the read-side use case where downstream reads use type-specific accessors
* against a nullable struct field.
*/
private static final class ConstantBoundSetter implements BoundMetadataValueSetter, Serializable {
private final Object value;

ConstantBoundSetter(Object value) {
this.value = value;
}

@Override
public void setValue(
org.apache.spark.sql.catalyst.expressions.GenericInternalRow metadataRow,
int ordinal,
org.apache.spark.sql.catalyst.InternalRow innerRow) {
if (value == null) {
metadataRow.setNullAt(ordinal);
} else {
metadataRow.update(ordinal, value);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (2026) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.spark.internal.v2.read.metadata;

import io.delta.spark.internal.v2.utils.CloseableIterator;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.ProjectingInternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.expressions.JoinedRow;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
import scala.Function1;
import scala.collection.Iterator;
import scala.runtime.AbstractFunction1;

/**
* Read-function decorator that materialises the DSv2 {@code _metadata} struct column for each row
* read from a {@link PartitionedFile}.
*
* <p>One {@link MetadataValueSetterBuilder} per requested {@code _metadata} subfield is held by the
* supplied {@link MetadataStructSchemaContext}. {@link #apply} binds each setter to the current
* file and runs them in pruned-struct order against the inner reader's row, writing into a
* caller-owned {@link GenericInternalRow}. The same row is reused across rows of the same file —
* setters write their slots in place, so there is no per-row {@code Object[]} copy.
*
* <p>Output row layout: {@code data columns + _metadata + partition columns}. Partition columns are
* projected out of the inner row using the ordinals tracked by {@link MetadataStructSchemaContext};
* the parquet read schema (with row-tracking helper columns when applicable) is also produced by
* the same context.
*/
public class MetadataStructReadFunction
extends AbstractFunction1<PartitionedFile, Iterator<InternalRow>> implements Serializable {

private final Function1<PartitionedFile, Iterator<InternalRow>> baseReadFunc;
private final MetadataStructSchemaContext metadataContext;

private MetadataStructReadFunction(
Function1<PartitionedFile, Iterator<InternalRow>> baseReadFunc,
MetadataStructSchemaContext metadataContext) {
this.baseReadFunc = Objects.requireNonNull(baseReadFunc);
this.metadataContext = Objects.requireNonNull(metadataContext);
}

/** Wraps a base reader to materialise {@code _metadata}. */
public static MetadataStructReadFunction wrap(
Function1<PartitionedFile, Iterator<InternalRow>> baseReadFunc,
MetadataStructSchemaContext metadataContext) {
return new MetadataStructReadFunction(baseReadFunc, metadataContext);
}

@Override
public Iterator<InternalRow> apply(PartitionedFile file) {
BoundMetadataValueSetter[] bound =
Arrays.stream(metadataContext.getValueSetterBuilders())
.map(builder -> builder.buildWithFile(file))
.toArray(BoundMetadataValueSetter[]::new);

GenericInternalRow metadataRow = new GenericInternalRow(bound.length);
GenericInternalRow metadataStruct = new GenericInternalRow(1);
metadataStruct.update(0, metadataRow);

ProjectingInternalRow dataProjection =
ProjectingInternalRow.apply(
metadataContext.getDataSchema(), metadataContext.getDataColumnsOrdinals());
ProjectingInternalRow partitionProjection =
ProjectingInternalRow.apply(
metadataContext.getPartitionSchema(), metadataContext.getPartitionColumnsOrdinals());
JoinedRow joinedDataAndMetadata = new JoinedRow();
JoinedRow joinedWithPartitions = new JoinedRow();
boolean hasPartitionColumns = metadataContext.hasPartitionColumns();

Iterator<InternalRow> baseIterator = baseReadFunc.apply(file);

return CloseableIterator.wrap(baseIterator)
.mapCloseable(
row -> {
for (int i = 0; i < bound.length; i++) {
bound[i].setValue(metadataRow, i, row);
}
dataProjection.project(row);
InternalRow withMetadata =
(InternalRow) joinedDataAndMetadata.apply(dataProjection, metadataStruct);
if (hasPartitionColumns) {
partitionProjection.project(row);
return (InternalRow) joinedWithPartitions.apply(withMetadata, partitionProjection);
}
return withMetadata;
});
}
}
Loading