diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/DeltaV2Table.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/DeltaV2Table.java index f6188d41a85..385f96f9093 100644 --- a/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/DeltaV2Table.java +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/DeltaV2Table.java @@ -57,6 +57,7 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import scala.jdk.javaapi.CollectionConverters; /** DataSource V2 Table implementation for Delta Lake using the Delta Kernel API. */ public class DeltaV2Table implements Table, SupportsRead, SupportsWrite, SupportsMetadataColumns { @@ -313,11 +314,19 @@ public Set capabilities() { } /** - * Exposes row-tracking metadata via a single DSv2 metadata struct column. + * Exposes file-level and row-tracking metadata via a single DSv2 metadata struct column. * - *

This always returns one metadata column named {@code _metadata}. When row tracking is - * enabled, the struct contains fields {@code row_id} and {@code row_commit_version}. When row - * tracking is disabled, those fields are omitted from the struct. + *

This always returns one metadata column named {@code _metadata}. The struct contains Spark + * file-source base metadata fields with the same names and types as {@link + * org.apache.spark.sql.execution.datasources.FileFormat#BASE_METADATA_FIELDS} ({@code file_path}, + * {@code file_name}, {@code file_size}, {@code file_block_start}, {@code file_block_length}, + * {@code file_modification_time}); when row tracking is enabled, {@code row_id} and {@code + * row_commit_version} are appended. Values follow {@link + * org.apache.spark.sql.execution.datasources.PartitionedFile} semantics (see {@code + * FileFormat.BASE_METADATA_EXTRACTORS}). + * + *

Field order mirrors Spark's canonical {@code BASE_METADATA_FIELDS} for parity with V1 Delta, + * but resolution is name-based — order is not load-bearing for correctness. */ @Override public MetadataColumn[] metadataColumns() { @@ -325,12 +334,18 @@ public MetadataColumn[] metadataColumns() { boolean rowTrackingEnabled = RowTracking.isEnabled(snapshotImpl.getProtocol(), snapshotImpl.getMetadata()); - final StructType metadataType = - rowTrackingEnabled - ? new StructType() - .add(ROW_ID_METADATA_FIELD_NAME, DataTypes.LongType, false) - .add(ROW_COMMIT_VERSION_METADATA_FIELD_NAME, DataTypes.LongType, false) - : new StructType(); + StructType metadataType = new StructType(); + for (StructField field : + CollectionConverters.asJava(FileFormat$.MODULE$.BASE_METADATA_FIELDS())) { + metadataType = metadataType.add(field); + } + if (rowTrackingEnabled) { + metadataType = + metadataType + .add(ROW_ID_METADATA_FIELD_NAME, DataTypes.LongType, false) + .add(ROW_COMMIT_VERSION_METADATA_FIELD_NAME, DataTypes.LongType, false); + } + final StructType finalMetadataType = metadataType; MetadataColumn[] columns = new MetadataColumn[1]; columns[0] = @@ -342,7 +357,7 @@ public String name() { @Override public DataType dataType() { - return metadataType; + return finalMetadataType; } @Override diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/BoundMetadataValueSetter.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/BoundMetadataValueSetter.java new file mode 100644 index 00000000000..733c46069f1 --- /dev/null +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/BoundMetadataValueSetter.java @@ -0,0 +1,34 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.spark.internal.v2.read.metadata; + +import java.io.Serializable; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; + +/** + * File-scoped resolver produced by {@link MetadataValueSetterBuilder#buildWithFile}. Writes one + * {@code _metadata} struct ordinal per row. + */ +public interface BoundMetadataValueSetter extends Serializable { + + /** + * Writes this field's value at {@code ordinal} of the caller-owned {@code metadataRow}. May read + * from {@code innerRow} (the raw row from the inner Parquet reader, including any row-tracking + * helper columns), or simply write a value captured at file-bind time. + */ + void setValue(GenericInternalRow metadataRow, int ordinal, InternalRow innerRow); +} diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/FileConstantValueSetterBuilder.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/FileConstantValueSetterBuilder.java new file mode 100644 index 00000000000..68e3d4084d2 --- /dev/null +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/FileConstantValueSetterBuilder.java @@ -0,0 +1,77 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.spark.internal.v2.read.metadata; + +import java.io.Serializable; +import java.util.Objects; +import org.apache.spark.sql.catalyst.expressions.Literal; +import org.apache.spark.sql.catalyst.expressions.Literal$; +import org.apache.spark.sql.execution.datasources.PartitionedFile; +import scala.Function1; + +/** + * Builder for a {@code _metadata} field whose value is a per-file constant. Wraps a {@code + * PartitionedFile -> Any} extractor sourced from {@code + * DeltaParquetFileFormat#fileConstantMetadataExtractors} (which augments Spark's base map with + * Delta-specific extractors like {@code base_row_id} and {@code default_row_commit_version}). + * + *

{@link #buildWithFile} reads the value once; the returned bound setter writes the same value + * for every row read from the file. + */ +public final class FileConstantValueSetterBuilder implements MetadataValueSetterBuilder { + + private final Function1 extractor; + + public FileConstantValueSetterBuilder(Function1 extractor) { + this.extractor = Objects.requireNonNull(extractor); + } + + @Override + public BoundMetadataValueSetter buildWithFile(PartitionedFile file) { + // Mirror Spark's FileFormat#updateMetadataInternalRow: wrap the raw extractor value in a + // Catalyst Literal so primitives (e.g. String -> UTF8String) get converted to their internal + // representations once, here, rather than on every row. + Literal literal = Literal$.MODULE$.apply(extractor.apply(file)); + return new ConstantBoundSetter(literal.value()); + } + + /** + * Bound setter that always writes the captured Catalyst value (or null). {@code null} is + * preserved by writing it through {@link + * org.apache.spark.sql.catalyst.expressions.GenericInternalRow#update}, which is equivalent to + * {@code setNullAt} for the read-side use case where downstream reads use type-specific accessors + * against a nullable struct field. + */ + private static final class ConstantBoundSetter implements BoundMetadataValueSetter, Serializable { + private final Object value; + + ConstantBoundSetter(Object value) { + this.value = value; + } + + @Override + public void setValue( + org.apache.spark.sql.catalyst.expressions.GenericInternalRow metadataRow, + int ordinal, + org.apache.spark.sql.catalyst.InternalRow innerRow) { + if (value == null) { + metadataRow.setNullAt(ordinal); + } else { + metadataRow.update(ordinal, value); + } + } + } +} diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/MetadataStructReadFunction.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/MetadataStructReadFunction.java new file mode 100644 index 00000000000..bc868abda25 --- /dev/null +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/MetadataStructReadFunction.java @@ -0,0 +1,105 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.spark.internal.v2.read.metadata; + +import io.delta.spark.internal.v2.utils.CloseableIterator; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Objects; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.ProjectingInternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.expressions.JoinedRow; +import org.apache.spark.sql.execution.datasources.PartitionedFile; +import scala.Function1; +import scala.collection.Iterator; +import scala.runtime.AbstractFunction1; + +/** + * Read-function decorator that materialises the DSv2 {@code _metadata} struct column for each row + * read from a {@link PartitionedFile}. + * + *

One {@link MetadataValueSetterBuilder} per requested {@code _metadata} subfield is held by the + * supplied {@link MetadataStructSchemaContext}. {@link #apply} binds each setter to the current + * file and runs them in pruned-struct order against the inner reader's row, writing into a + * caller-owned {@link GenericInternalRow}. The same row is reused across rows of the same file — + * setters write their slots in place, so there is no per-row {@code Object[]} copy. + * + *

Output row layout: {@code data columns + _metadata + partition columns}. Partition columns are + * projected out of the inner row using the ordinals tracked by {@link MetadataStructSchemaContext}; + * the parquet read schema (with row-tracking helper columns when applicable) is also produced by + * the same context. + */ +public class MetadataStructReadFunction + extends AbstractFunction1> implements Serializable { + + private final Function1> baseReadFunc; + private final MetadataStructSchemaContext metadataContext; + + private MetadataStructReadFunction( + Function1> baseReadFunc, + MetadataStructSchemaContext metadataContext) { + this.baseReadFunc = Objects.requireNonNull(baseReadFunc); + this.metadataContext = Objects.requireNonNull(metadataContext); + } + + /** Wraps a base reader to materialise {@code _metadata}. */ + public static MetadataStructReadFunction wrap( + Function1> baseReadFunc, + MetadataStructSchemaContext metadataContext) { + return new MetadataStructReadFunction(baseReadFunc, metadataContext); + } + + @Override + public Iterator apply(PartitionedFile file) { + BoundMetadataValueSetter[] bound = + Arrays.stream(metadataContext.getValueSetterBuilders()) + .map(builder -> builder.buildWithFile(file)) + .toArray(BoundMetadataValueSetter[]::new); + + GenericInternalRow metadataRow = new GenericInternalRow(bound.length); + GenericInternalRow metadataStruct = new GenericInternalRow(1); + metadataStruct.update(0, metadataRow); + + ProjectingInternalRow dataProjection = + ProjectingInternalRow.apply( + metadataContext.getDataSchema(), metadataContext.getDataColumnsOrdinals()); + ProjectingInternalRow partitionProjection = + ProjectingInternalRow.apply( + metadataContext.getPartitionSchema(), metadataContext.getPartitionColumnsOrdinals()); + JoinedRow joinedDataAndMetadata = new JoinedRow(); + JoinedRow joinedWithPartitions = new JoinedRow(); + boolean hasPartitionColumns = metadataContext.hasPartitionColumns(); + + Iterator baseIterator = baseReadFunc.apply(file); + + return CloseableIterator.wrap(baseIterator) + .mapCloseable( + row -> { + for (int i = 0; i < bound.length; i++) { + bound[i].setValue(metadataRow, i, row); + } + dataProjection.project(row); + InternalRow withMetadata = + (InternalRow) joinedDataAndMetadata.apply(dataProjection, metadataStruct); + if (hasPartitionColumns) { + partitionProjection.project(row); + return (InternalRow) joinedWithPartitions.apply(withMetadata, partitionProjection); + } + return withMetadata; + }); + } +} diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/MetadataStructSchemaContext.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/MetadataStructSchemaContext.java new file mode 100644 index 00000000000..414e4cd5a8f --- /dev/null +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/MetadataStructSchemaContext.java @@ -0,0 +1,281 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.spark.internal.v2.read.metadata; + +import io.delta.kernel.internal.actions.Metadata; +import io.delta.kernel.internal.rowtracking.MaterializedRowTrackingColumn; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Optional; +import org.apache.spark.sql.delta.DeltaParquetFileFormatBase; +import org.apache.spark.sql.delta.RowCommitVersion$; +import org.apache.spark.sql.delta.RowId$; +import org.apache.spark.sql.execution.datasources.FileFormat$; +import org.apache.spark.sql.execution.datasources.PartitionedFile; +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import scala.Function1; +import scala.collection.immutable.Map; +import scala.collection.immutable.Seq; + +/** + * Schema context for the DSv2 {@code _metadata} struct column. + * + *

Single owner of everything needed to materialise the pruned {@code _metadata} struct at read + * time: + * + *

+ * + *

Constructed via the static {@link #forSchema} factory which returns {@code Optional.empty()} + * when the requested read schema does not contain a top-level {@code _metadata} struct — the caller + * therefore avoids paying any context-construction cost when the scan does not request {@code + * _metadata} at all. + * + *

Row tracking is modelled as two specialised setters ({@link RowIdValueSetterBuilder}, {@link + * RowCommitVersionValueSetterBuilder}) registered alongside generic {@link + * FileConstantValueSetterBuilder}s for Spark's file-source base fields and any Delta-specific + * extractors exposed by {@link DeltaParquetFileFormatBase#fileConstantMetadataExtractors}. There is + * no separate row-tracking schema/resolver class — the row-tracking concern lives entirely inside + * its setter implementations. + */ +public class MetadataStructSchemaContext implements Serializable { + + private static final String METADATA_COLUMN_NAME = FileFormat$.MODULE$.METADATA_NAME(); + private static final String ROW_ID_FIELD_NAME = RowId$.MODULE$.ROW_ID(); + private static final String ROW_COMMIT_VERSION_FIELD_NAME = + RowCommitVersion$.MODULE$.METADATA_STRUCT_FIELD_NAME(); + + private final StructType prunedMetadataStruct; + private final StructType parquetReadSchema; + private final StructType dataSchema; + private final StructType partitionSchema; + private final Seq dataColumnsOrdinals; + private final Seq partitionColumnsOrdinals; + private final MetadataValueSetterBuilder[] valueSetterBuilders; + + /** + * Builds a context iff {@code readDataSchema} contains a top-level {@code _metadata} struct. + * + * @param readDataSchema the requested read schema, including {@code _metadata} when requested + * @param partitionSchema the partition columns appended by Spark after the inner read row + * @param deltaFormat the Delta Parquet format whose {@link + * DeltaParquetFileFormatBase#fileConstantMetadataExtractors} drives the file-constant setters + * @param tableMetadata Kernel table metadata used to resolve the materialised row-tracking + * helper-column physical names when row tracking fields are requested + * @return a present {@code MetadataStructSchemaContext} when {@code _metadata} is requested, + * otherwise {@code Optional.empty()} + */ + public static Optional forSchema( + StructType readDataSchema, + StructType partitionSchema, + DeltaParquetFileFormatBase deltaFormat, + Metadata tableMetadata) { + return forSchemaWithExtractors( + readDataSchema, + partitionSchema, + deltaFormat.fileConstantMetadataExtractors(), + tableMetadata); + } + + /** + * Internal factory variant that takes the file-constant extractors map directly. Visible for + * testing — production code should always use {@link #forSchema} which sources the map from + * {@link DeltaParquetFileFormatBase#fileConstantMetadataExtractors}. + */ + static Optional forSchemaWithExtractors( + StructType readDataSchema, + StructType partitionSchema, + Map> extractors, + Metadata tableMetadata) { + StructField metadataColumn = + Arrays.stream(readDataSchema.fields()) + .filter(field -> METADATA_COLUMN_NAME.equals(field.name())) + .findFirst() + .orElse(null); + if (metadataColumn == null || !(metadataColumn.dataType() instanceof StructType)) { + return Optional.empty(); + } + StructType prunedMetadata = (StructType) metadataColumn.dataType(); + StructType baseSchema = + new StructType( + Arrays.stream(readDataSchema.fields()) + .filter(f -> !METADATA_COLUMN_NAME.equals(f.name())) + .toArray(StructField[]::new)); + return Optional.of( + new MetadataStructSchemaContext( + prunedMetadata, baseSchema, partitionSchema, extractors, tableMetadata)); + } + + private MetadataStructSchemaContext( + StructType prunedMetadata, + StructType baseSchema, + StructType partitionSchema, + Map> extractors, + Metadata tableMetadata) { + this.prunedMetadataStruct = prunedMetadata; + this.dataSchema = baseSchema; + this.partitionSchema = partitionSchema; + + boolean rowIdRequested = containsField(prunedMetadata, ROW_ID_FIELD_NAME); + boolean rowCommitVersionRequested = + containsField(prunedMetadata, ROW_COMMIT_VERSION_FIELD_NAME); + + StructType augmented = baseSchema; + int materializedRowIdIdx = -1; + int rowIndexIdx = -1; + int materializedRowCommitVersionIdx = -1; + + if (rowIdRequested) { + String rowIdColumnName = + MaterializedRowTrackingColumn.MATERIALIZED_ROW_ID.getPhysicalColumnName( + tableMetadata.getConfiguration()); + augmented = augmented.add(rowIdColumnName, DataTypes.LongType, true); + materializedRowIdIdx = augmented.fields().length - 1; + + augmented = + augmented.add( + ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME(), DataTypes.LongType, true); + rowIndexIdx = augmented.fields().length - 1; + } + if (rowCommitVersionRequested) { + String rowCommitVersionColumnName = + MaterializedRowTrackingColumn.MATERIALIZED_ROW_COMMIT_VERSION.getPhysicalColumnName( + tableMetadata.getConfiguration()); + augmented = augmented.add(rowCommitVersionColumnName, DataTypes.LongType, true); + materializedRowCommitVersionIdx = augmented.fields().length - 1; + } + this.parquetReadSchema = augmented; + + int dataLen = baseSchema.fields().length; + int partitionStart = augmented.fields().length; + this.dataColumnsOrdinals = buildRangeOrdinals(0, dataLen); + this.partitionColumnsOrdinals = + buildRangeOrdinals(partitionStart, partitionStart + partitionSchema.fields().length); + + this.valueSetterBuilders = + buildValueSetters( + prunedMetadata, + extractors, + materializedRowIdIdx, + rowIndexIdx, + materializedRowCommitVersionIdx); + } + + /** + * Builds the per-field setter array. Each pruned-struct field maps to either a {@link + * FileConstantValueSetterBuilder} (driven by {@code + * DeltaParquetFileFormatBase#fileConstantMetadataExtractors}) or one of the row-tracking setters. + * Throws {@link IllegalStateException} for any unsupported subfield name — fail-fast at context + * construction rather than per-row. + */ + private static MetadataValueSetterBuilder[] buildValueSetters( + StructType prunedMetadata, + Map> extractors, + int materializedRowIdIdx, + int rowIndexIdx, + int materializedRowCommitVersionIdx) { + StructField[] fields = prunedMetadata.fields(); + MetadataValueSetterBuilder[] setters = new MetadataValueSetterBuilder[fields.length]; + for (int i = 0; i < fields.length; i++) { + String name = fields[i].name(); + if (ROW_ID_FIELD_NAME.equals(name)) { + setters[i] = new RowIdValueSetterBuilder(materializedRowIdIdx, rowIndexIdx); + } else if (ROW_COMMIT_VERSION_FIELD_NAME.equals(name)) { + setters[i] = new RowCommitVersionValueSetterBuilder(materializedRowCommitVersionIdx); + } else if (extractors.contains(name)) { + setters[i] = new FileConstantValueSetterBuilder(extractors.apply(name)); + } else { + throw new IllegalStateException( + "Unsupported _metadata subfield '" + + name + + "': expected a file-source base metadata field exposed by " + + "DeltaParquetFileFormatBase#fileConstantMetadataExtractors or a row-tracking field " + + "(row_id, row_commit_version)."); + } + } + return setters; + } + + /** Pruned {@code _metadata} struct type. Field order matches setter / output order. */ + public StructType getPrunedMetadataStruct() { + return prunedMetadataStruct; + } + + /** + * Schema fed to the inner Parquet reader: data columns plus row-tracking helper columns when + * {@code row_id} / {@code row_commit_version} are requested. The DSv2 read pipeline may further + * augment this with a deletion-vector column outside the metadata context. + */ + public StructType getParquetReadSchema() { + return parquetReadSchema; + } + + /** + * Output type of the data-columns projection used to compose {@code data + _metadata + + * partition}. + */ + public StructType getDataSchema() { + return dataSchema; + } + + public Seq getDataColumnsOrdinals() { + return dataColumnsOrdinals; + } + + public StructType getPartitionSchema() { + return partitionSchema; + } + + public Seq getPartitionColumnsOrdinals() { + return partitionColumnsOrdinals; + } + + public boolean hasPartitionColumns() { + return partitionSchema.fields().length > 0; + } + + /** + * Ordered setter array, one per pruned-struct field, used by {@link MetadataStructReadFunction}. + */ + public MetadataValueSetterBuilder[] getValueSetterBuilders() { + return valueSetterBuilders; + } + + private static boolean containsField(StructType struct, String name) { + return Arrays.stream(struct.fields()).anyMatch(field -> name.equals(field.name())); + } + + private static Seq buildRangeOrdinals(int startInclusive, int endExclusive) { + int len = Math.max(0, endExclusive - startInclusive); + int[] ordinals = new int[len]; + for (int i = 0; i < len; i++) { + ordinals[i] = startInclusive + i; + } + return scala.Predef.wrapIntArray(ordinals).toList(); + } +} diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/MetadataValueSetterBuilder.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/MetadataValueSetterBuilder.java new file mode 100644 index 00000000000..37ca5a11e4d --- /dev/null +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/MetadataValueSetterBuilder.java @@ -0,0 +1,51 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.spark.internal.v2.read.metadata; + +import java.io.Serializable; +import org.apache.spark.sql.execution.datasources.PartitionedFile; + +/** + * Builder for a {@link BoundMetadataValueSetter}, parametrised by a {@link PartitionedFile}. + * + *

One instance is registered per requested subfield of the DSv2 {@code _metadata} struct (e.g. + * {@code file_path}, {@code file_size}, {@code row_id}, {@code row_commit_version}). {@link + * MetadataStructSchemaContext} owns the ordered array of builders in pruned-struct field order; + * {@link MetadataStructReadFunction#apply} invokes {@link #buildWithFile} once per file and runs + * the resulting bound setters per row. + * + *

Three implementations exist today, all under {@code v2/read/metadata/}: + * + *

    + *
  • {@link FileConstantValueSetterBuilder} — for fields whose value is a constant of the file + * (Spark file-source base fields plus any Delta-specific file-constant extractors exposed by + * {@code DeltaParquetFileFormat#fileConstantMetadataExtractors}). + *
  • {@link RowIdValueSetterBuilder} / {@link RowCommitVersionValueSetterBuilder} — for the two + * row-tracking fields whose value is computed per row from materialised helper columns plus + * file-constant fallbacks. + *
+ * + *

All three flow through the same per-row materialisation step, keeping the per-field plumbing + * uniform regardless of where the value comes from. + */ +public interface MetadataValueSetterBuilder extends Serializable { + + /** + * Returns a {@link BoundMetadataValueSetter} parametrised by {@code file}'s constant metadata. + * The returned setter is invoked per row by {@link MetadataStructReadFunction}. + */ + BoundMetadataValueSetter buildWithFile(PartitionedFile file); +} diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/RowCommitVersionValueSetterBuilder.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/RowCommitVersionValueSetterBuilder.java new file mode 100644 index 00000000000..c3a185dd208 --- /dev/null +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/RowCommitVersionValueSetterBuilder.java @@ -0,0 +1,78 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.spark.internal.v2.read.metadata; + +import java.io.Serializable; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.delta.DefaultRowCommitVersion$; +import org.apache.spark.sql.execution.datasources.PartitionedFile; + +/** + * Builder for {@code _metadata.row_commit_version}. Per-row coalesce against the materialised + * row-commit-version helper column: if non-null, use it; otherwise fall back to the file's {@code + * default_row_commit_version} constant. + */ +public final class RowCommitVersionValueSetterBuilder implements MetadataValueSetterBuilder { + + private final int materializedRowCommitVersionIdx; + + public RowCommitVersionValueSetterBuilder(int materializedRowCommitVersionIdx) { + this.materializedRowCommitVersionIdx = materializedRowCommitVersionIdx; + } + + @Override + public BoundMetadataValueSetter buildWithFile(PartitionedFile file) { + // default_row_commit_version is required whenever row tracking is enabled on the file; fail + // fast with an actionable error rather than NPE if it's missing. + Object rawDefault = + file.otherConstantMetadataColumnValues() + .get(DefaultRowCommitVersion$.MODULE$.METADATA_STRUCT_FIELD_NAME()) + .getOrElse(() -> null); + if (rawDefault == null) { + throw new IllegalStateException( + "Missing '" + + DefaultRowCommitVersion$.MODULE$.METADATA_STRUCT_FIELD_NAME() + + "' on PartitionedFile '" + + file.filePath() + + "' for row tracking — every Delta " + + "file with row tracking enabled must carry a default_row_commit_version constant."); + } + long defaultCommitVersion = ((Number) rawDefault).longValue(); + return new Bound(defaultCommitVersion, materializedRowCommitVersionIdx); + } + + private static final class Bound implements BoundMetadataValueSetter, Serializable { + private final long defaultCommitVersion; + private final int materializedRowCommitVersionIdx; + + Bound(long defaultCommitVersion, int materializedRowCommitVersionIdx) { + this.defaultCommitVersion = defaultCommitVersion; + this.materializedRowCommitVersionIdx = materializedRowCommitVersionIdx; + } + + @Override + public void setValue(GenericInternalRow metadataRow, int ordinal, InternalRow innerRow) { + long version; + if (innerRow.isNullAt(materializedRowCommitVersionIdx)) { + version = defaultCommitVersion; + } else { + version = innerRow.getLong(materializedRowCommitVersionIdx); + } + metadataRow.setLong(ordinal, version); + } + } +} diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/RowIdValueSetterBuilder.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/RowIdValueSetterBuilder.java new file mode 100644 index 00000000000..6533c8e0177 --- /dev/null +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/metadata/RowIdValueSetterBuilder.java @@ -0,0 +1,89 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.spark.internal.v2.read.metadata; + +import java.io.Serializable; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.delta.RowId$; +import org.apache.spark.sql.execution.datasources.PartitionedFile; + +/** + * Builder for {@code _metadata.row_id}. Per-row coalesce against the materialised row-id helper + * column: if present and non-null in the inner row, use it; otherwise fall back to {@code baseRowId + * + physicalRowIndex}. + */ +public final class RowIdValueSetterBuilder implements MetadataValueSetterBuilder { + + private final int materializedRowIdIdx; + private final int rowIndexIdx; + + public RowIdValueSetterBuilder(int materializedRowIdIdx, int rowIndexIdx) { + this.materializedRowIdIdx = materializedRowIdIdx; + this.rowIndexIdx = rowIndexIdx; + } + + @Override + public BoundMetadataValueSetter buildWithFile(PartitionedFile file) { + // base_row_id is required whenever row tracking is enabled on the file; if it's not present + // (would yield null via Scala's `apply` throwing NoSuchElementException, or returning null + // from a Java-shaped map), fail fast with an actionable error rather than NPE on the + // subsequent .longValue() call. + Object rawBaseRowId = + file.otherConstantMetadataColumnValues() + .get(RowId$.MODULE$.BASE_ROW_ID()) + .getOrElse(() -> null); + if (rawBaseRowId == null) { + throw new IllegalStateException( + "Missing '" + + RowId$.MODULE$.BASE_ROW_ID() + + "' on PartitionedFile '" + + file.filePath() + + "' for row tracking — every Delta file with row tracking " + + "enabled must carry a base_row_id constant."); + } + long baseRowId = ((Number) rawBaseRowId).longValue(); + return new Bound(baseRowId, materializedRowIdIdx, rowIndexIdx); + } + + private static final class Bound implements BoundMetadataValueSetter, Serializable { + private final long baseRowId; + private final int materializedRowIdIdx; + private final int rowIndexIdx; + + Bound(long baseRowId, int materializedRowIdIdx, int rowIndexIdx) { + this.baseRowId = baseRowId; + this.materializedRowIdIdx = materializedRowIdIdx; + this.rowIndexIdx = rowIndexIdx; + } + + @Override + public void setValue(GenericInternalRow metadataRow, int ordinal, InternalRow innerRow) { + long rowId; + // Materialized row IDs are written only for rows rewritten by UPDATE/MERGE; for plain + // INSERTs the column is null and we fall back to baseRowId + physicalRowIndex. We don't + // need to guard against the column being absent from the parquet file — the reader produces + // nulls for any requested column missing from the file, which is the same contract V1's + // Coalesce in GenerateRowIDs.scala relies on. + if (innerRow.isNullAt(materializedRowIdIdx)) { + rowId = baseRowId + innerRow.getLong(rowIndexIdx); + } else { + rowId = innerRow.getLong(materializedRowIdIdx); + } + metadataRow.setLong(ordinal, rowId); + } + } +} diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/rowtracking/RowTrackingReadFunction.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/rowtracking/RowTrackingReadFunction.java deleted file mode 100644 index 5074291731c..00000000000 --- a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/rowtracking/RowTrackingReadFunction.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright (2026) The Delta Lake Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.delta.spark.internal.v2.read.rowtracking; - -import io.delta.spark.internal.v2.utils.CloseableIterator; -import java.io.Serializable; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.ProjectingInternalRow; -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.sql.catalyst.expressions.JoinedRow; -import org.apache.spark.sql.delta.DefaultRowCommitVersion$; -import org.apache.spark.sql.delta.RowId$; -import org.apache.spark.sql.execution.datasources.PartitionedFile; -import scala.Function1; -import scala.collection.Iterator; -import scala.runtime.AbstractFunction1; - -/** - * Read-function decorator that appends row-tracking values under the DSv2 {@code _metadata} struct. - * - *

This wrapper consumes rows that include internal helper columns introduced by {@link - * RowTrackingSchemaContext}, computes requested row-tracking metadata fields using Delta - * null-coalesce semantics, and returns rows in logical output order: {@code data columns + - * _metadata + partition columns}. - */ -public class RowTrackingReadFunction - extends AbstractFunction1> implements Serializable { - private final Function1> baseReadFunc; - private final RowTrackingSchemaContext rowTrackingSchemaContext; - - private RowTrackingReadFunction( - Function1> baseReadFunc, - RowTrackingSchemaContext rowTrackingSchemaContext) { - this.baseReadFunc = baseReadFunc; - this.rowTrackingSchemaContext = rowTrackingSchemaContext; - } - - /** - * Produces rows with a single {@code _metadata} struct column that contains row-tracking values. - * - *

For each row, computes {@code row_id} as {@code COALESCE(materialized_row_id, base_row_id + - * physical_row_index)} and computes {@code row_commit_version} as {@code - * COALESCE(materialized_row_commit_version, default_row_commit_version)}. - */ - @Override - public Iterator apply(PartitionedFile file) { - final long baseRowId; - if (rowTrackingSchemaContext.isRowIdRequested()) { - baseRowId = - ((Number) file.otherConstantMetadataColumnValues().apply(RowId$.MODULE$.BASE_ROW_ID())) - .longValue(); - } else { - baseRowId = 0L; - } - - final long commitVersionId; - if (rowTrackingSchemaContext.isRowCommitVersionRequested()) { - commitVersionId = - ((Number) - file.otherConstantMetadataColumnValues() - .apply(DefaultRowCommitVersion$.MODULE$.METADATA_STRUCT_FIELD_NAME())) - .longValue(); - } else { - commitVersionId = 0L; - } - - int rowTrackingFieldsCount = - (rowTrackingSchemaContext.isRowIdRequested() ? 1 : 0) - + (rowTrackingSchemaContext.isRowCommitVersionRequested() ? 1 : 0); - - Iterator baseIterator = baseReadFunc.apply(file); - - GenericInternalRow metadataStruct = new GenericInternalRow(1); - // The fields inside the metadata structs are ordered: row_id first / row_commit_version second - GenericInternalRow rowTrackingFields = new GenericInternalRow(rowTrackingFieldsCount); - - ProjectingInternalRow dataProjection = - ProjectingInternalRow.apply( - rowTrackingSchemaContext.getDataSchema(), - rowTrackingSchemaContext.getDataColumnsOrdinals()); - ProjectingInternalRow partitionProjection = - ProjectingInternalRow.apply( - rowTrackingSchemaContext.getPartitionSchema(), - rowTrackingSchemaContext.getPartitionColumnsOrdinals()); - JoinedRow joinedDataAndMetadata = new JoinedRow(); - JoinedRow joinedWithPartitions = new JoinedRow(); - - return CloseableIterator.wrap(baseIterator) - .mapCloseable( - row -> { - int index = 0; - if (rowTrackingSchemaContext.isRowIdRequested()) { - int materializedRowIdIndex = rowTrackingSchemaContext.getMaterializedRowIdIndex(); - int rowIndexColumnIndex = rowTrackingSchemaContext.getRowIndexColumnIndex(); - long physicalRowIndex = row.getLong(rowIndexColumnIndex); - // When reading tables with f.e. mixed file history, the materialized RowIds can be - // absent so materializedRowIdIndex can be beyond the row's width. Treat this case - // like null and fall back to baseRowId + physicalRowIndex. - long rowId = - (row.numFields() <= materializedRowIdIndex - || row.isNullAt(materializedRowIdIndex)) - ? baseRowId + physicalRowIndex - : row.getLong(materializedRowIdIndex); - rowTrackingFields.setLong(index++, rowId); - } - - if (rowTrackingSchemaContext.isRowCommitVersionRequested()) { - int materializedCommitVersionIndex = - rowTrackingSchemaContext.getMaterializedRowCommitVersionIndex(); - long rowCommitVersion = - row.isNullAt(materializedCommitVersionIndex) - ? commitVersionId - : row.getLong(materializedCommitVersionIndex); - rowTrackingFields.setLong(index, rowCommitVersion); - } - dataProjection.project(row); - partitionProjection.project(row); - metadataStruct.update(0, rowTrackingFields.copy()); - - // Partition columns are appended after data columns in readSchema, so insert - // `_metadata` between projected data columns and partition columns to preserve - // output column order. - // Assuming that metadata column is always inserted after all data columns in - // readSchema. - // Needs to be revisited if the _metadata struct position can be arbitrary. - InternalRow dataWithMetadata = - (InternalRow) joinedDataAndMetadata.apply(dataProjection, metadataStruct); - if (rowTrackingSchemaContext.hasPartitionColumns()) { - return (InternalRow) - joinedWithPartitions.apply(dataWithMetadata, partitionProjection); - } - return dataWithMetadata; - }); - } - - /** Creates a row-tracking read-function wrapper around a base Parquet read function. */ - public static RowTrackingReadFunction wrap( - Function1> baseReadFunc, - RowTrackingSchemaContext context) { - return new RowTrackingReadFunction(baseReadFunc, context); - } -} diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/rowtracking/RowTrackingSchemaContext.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/read/rowtracking/RowTrackingSchemaContext.java deleted file mode 100644 index 1d84474c4c4..00000000000 --- a/spark/v2/src/main/java/io/delta/spark/internal/v2/read/rowtracking/RowTrackingSchemaContext.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Copyright (2026) The Delta Lake Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.delta.spark.internal.v2.read.rowtracking; - -import io.delta.kernel.internal.actions.Metadata; -import io.delta.kernel.internal.rowtracking.MaterializedRowTrackingColumn; -import java.io.Serializable; -import java.util.Arrays; -import org.apache.spark.sql.delta.RowCommitVersion$; -import org.apache.spark.sql.delta.RowId$; -import org.apache.spark.sql.execution.datasources.FileFormat$; -import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat; -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; -import scala.collection.immutable.Seq; - -/** - * Schema context for row tracking in the V2 connector. - * - *

Parses requested {@code _metadata} row-tracking fields from the read schema, augments the - * physical read schema with the required helper columns, and pre-computes indices/projections used - * by {@link RowTrackingReadFunction}. - * - *

Helper columns are added only for requested fields: - * - *

    - *
  • {@code row_id}: materialized row ID + temporary row-index column - *
  • {@code row_commit_version}: materialized row-commit-version column - *
- */ -public class RowTrackingSchemaContext implements Serializable { - - private static final String METADATA_COLUMN_NAME = FileFormat$.MODULE$.METADATA_NAME(); - private static final String ROW_ID_METADATA_FIELD_NAME = RowId$.MODULE$.ROW_ID(); - private static final String ROW_COMMIT_VERSION_METADATA_FIELD_NAME = - RowCommitVersion$.MODULE$.METADATA_STRUCT_FIELD_NAME(); - - private StructType schemaWithRowTrackingColumns; - private int materializedRowIdIndex = -1; - private int materializedRowCommitVersionIndex = -1; - private int rowIndexColumnIndex = -1; - private StructType dataSchema; - private Seq dataColumnsOrdinals; - private StructType partitionSchema; - private Seq partitionColumnsOrdinals; - - public RowTrackingSchemaContext( - StructType readDataSchema, Metadata metadata, StructType partitionSchema) { - StructField metadataColumn = - Arrays.stream(readDataSchema.fields()) - .filter(field -> METADATA_COLUMN_NAME.equals(field.name())) - .findFirst() - .orElse(null); - if (metadataColumn == null || !(metadataColumn.dataType() instanceof StructType)) { - return; - } - StructType metadataType = (StructType) metadataColumn.dataType(); - - StructType baseSchemaWithoutMetadata = - new StructType( - Arrays.stream(readDataSchema.fields()) - .filter(f -> !METADATA_COLUMN_NAME.equals(f.name())) - .toArray(StructField[]::new)); - - this.schemaWithRowTrackingColumns = baseSchemaWithoutMetadata; - - int internalColumnsStartIndex = baseSchemaWithoutMetadata.fields().length; - int index = internalColumnsStartIndex; - int internalColumnsCount = 0; - - boolean rowIdRequested = - containsRowTrackingMetadataField(metadataType, ROW_ID_METADATA_FIELD_NAME); - boolean rowCommitVersionRequested = - containsRowTrackingMetadataField(metadataType, ROW_COMMIT_VERSION_METADATA_FIELD_NAME); - - if (rowIdRequested) { - String rowIdColumnName = - MaterializedRowTrackingColumn.MATERIALIZED_ROW_ID.getPhysicalColumnName( - metadata.getConfiguration()); - schemaWithRowTrackingColumns = - schemaWithRowTrackingColumns.add(rowIdColumnName, DataTypes.LongType, true); - materializedRowIdIndex = index++; - internalColumnsCount++; - - schemaWithRowTrackingColumns = - schemaWithRowTrackingColumns.add( - ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME(), DataTypes.LongType, true); - rowIndexColumnIndex = index++; - internalColumnsCount++; - } - - if (rowCommitVersionRequested) { - String rowCommitVersionColumnName = - MaterializedRowTrackingColumn.MATERIALIZED_ROW_COMMIT_VERSION.getPhysicalColumnName( - metadata.getConfiguration()); - schemaWithRowTrackingColumns = - schemaWithRowTrackingColumns.add(rowCommitVersionColumnName, DataTypes.LongType, true); - materializedRowCommitVersionIndex = index++; - internalColumnsCount++; - } - - this.dataSchema = baseSchemaWithoutMetadata; - this.dataColumnsOrdinals = buildRangeOrdinals(0, internalColumnsStartIndex); - this.partitionSchema = partitionSchema; - this.partitionColumnsOrdinals = - buildRangeOrdinals( - internalColumnsStartIndex + internalColumnsCount, - schemaWithRowTrackingColumns.fields().length + partitionSchema.fields().length); - } - - public StructType getSchemaWithRowTrackingColumns() { - return schemaWithRowTrackingColumns; - } - - public int getMaterializedRowIdIndex() { - return materializedRowIdIndex; - } - - public int getMaterializedRowCommitVersionIndex() { - return materializedRowCommitVersionIndex; - } - - public int getRowIndexColumnIndex() { - return rowIndexColumnIndex; - } - - public boolean isRowIdRequested() { - return materializedRowIdIndex != -1; - } - - public boolean isRowCommitVersionRequested() { - return materializedRowCommitVersionIndex != -1; - } - - public boolean areRowTrackingMetadataFieldsRequested() { - return isRowIdRequested() || isRowCommitVersionRequested(); - } - - private static Seq buildRangeOrdinals(int startInclusive, int endExclusive) { - int len = Math.max(0, endExclusive - startInclusive); - int[] ordinals = new int[len]; - for (int i = 0; i < len; i++) { - ordinals[i] = startInclusive + i; - } - return scala.Predef.wrapIntArray(ordinals).toList(); - } - - public StructType getDataSchema() { - return dataSchema; - } - - public Seq getDataColumnsOrdinals() { - return dataColumnsOrdinals; - } - - public StructType getPartitionSchema() { - return partitionSchema; - } - - public Seq getPartitionColumnsOrdinals() { - return partitionColumnsOrdinals; - } - - public boolean hasPartitionColumns() { - return partitionSchema.fields().length > 0; - } - - private static boolean containsRowTrackingMetadataField( - StructType metadataType, String metadataFieldName) { - return Arrays.stream(metadataType.fields()) - .anyMatch(field -> metadataFieldName.equals(field.name())); - } -} diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/utils/PartitionUtils.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/utils/PartitionUtils.java index ba17d4d3243..55c7525579a 100644 --- a/spark/v2/src/main/java/io/delta/spark/internal/v2/utils/PartitionUtils.java +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/utils/PartitionUtils.java @@ -30,8 +30,8 @@ import io.delta.spark.internal.v2.read.cdc.CDCSchemaContext; import io.delta.spark.internal.v2.read.deletionvector.DeletionVectorReadFunction; import io.delta.spark.internal.v2.read.deletionvector.DeletionVectorSchemaContext; -import io.delta.spark.internal.v2.read.rowtracking.RowTrackingReadFunction; -import io.delta.spark.internal.v2.read.rowtracking.RowTrackingSchemaContext; +import io.delta.spark.internal.v2.read.metadata.MetadataStructReadFunction; +import io.delta.spark.internal.v2.read.metadata.MetadataStructSchemaContext; import java.time.ZoneId; import java.util.Arrays; import java.util.HashMap; @@ -370,20 +370,31 @@ public static PartitionReaderFactory createDeltaParquetReaderFactory( readDataSchema = cdcSchemaContext.get().getReadDataSchemaWithCDC(); } - boolean metadataColumnRequested = - Arrays.stream(readDataSchema.fields()) - .anyMatch(field -> FileFormat$.MODULE$.METADATA_NAME().equals(field.name())); - Optional rowTrackingSchemaContext = Optional.empty(); - if (metadataColumnRequested) { - RowTrackingSchemaContext context = - new RowTrackingSchemaContext(readDataSchema, snapshotImpl.getMetadata(), partitionSchema); - rowTrackingSchemaContext = Optional.of(context); - readDataSchema = context.getSchemaWithRowTrackingColumns(); + // DV presence governs `optimizationsEnabled` / `useMetadataRowIndex`, both of which feed + // into `DeltaParquetFileFormatV2` construction below. Compute it upfront so the format can + // be built before MetadataStructSchemaContext (which consumes the format's + // `fileConstantMetadataExtractors`). + boolean tableSupportsDV = tableSupportsDeletionVectors(snapshot); + boolean optimizationsEnabled = !tableSupportsDV; + Option useMetadataRowIndex = + tableSupportsDV ? Option.apply(Boolean.FALSE) : Option.empty(); + DeltaParquetFileFormatV2 deltaFormat = + createDeltaParquetFileFormat( + snapshot, tablePath, optimizationsEnabled, useMetadataRowIndex, isWriteTimeCDCRead); + + // Build the metadata context only when the scan requested `_metadata`. The context owns the + // pruned struct, the parquet read schema (with row-tracking helper columns when needed), + // and the per-field MetadataValueSetterBuilder array used to materialise values per row. + Optional metadataSchemaContext = + MetadataStructSchemaContext.forSchema( + readDataSchema, partitionSchema, deltaFormat, snapshotImpl.getMetadata()); + if (metadataSchemaContext.isPresent()) { + readDataSchema = metadataSchemaContext.get().getParquetReadSchema(); } // Create DV schema context if table supports deletion vectors Optional dvSchemaContext = - tableSupportsDeletionVectors(snapshot) + tableSupportsDV ? Optional.of(new DeletionVectorSchemaContext(readDataSchema, partitionSchema)) : Optional.empty(); if (dvSchemaContext.isPresent()) { @@ -391,9 +402,11 @@ public static PartitionReaderFactory createDeltaParquetReaderFactory( } boolean enableVectorizedReader = - // Disabled because RowTrackingReadFunction operates on individual InternalRows to compute - // row_id from _tmp_metadata_row_index and coalesce with materialized columns. - !rowTrackingSchemaContext.isPresent() + // Disabled because MetadataStructReadFunction operates on individual InternalRows (to + // synthesise the `_metadata` struct, including row-tracking field coalesce against + // materialised helper columns). The wrapper does not currently produce ColumnarBatch + // output. + !metadataSchemaContext.isPresent() && ParquetUtils.isBatchReadSupportedForSchema(sqlConf, readDataSchema); scala.collection.immutable.Map optionsWithVectorizedReading = scalaOptions.$plus( @@ -402,14 +415,9 @@ public static PartitionReaderFactory createDeltaParquetReaderFactory( String.valueOf(enableVectorizedReader))); // TODO(https://github.com/delta-io/delta/issues/5859): Enable file splitting for DV tables - boolean optimizationsEnabled = !dvSchemaContext.isPresent(); - - // TODO(https://github.com/delta-io/delta/issues/5859): Support _metadata.row_index for DV - Option useMetadataRowIndex = - dvSchemaContext.isPresent() ? Option.apply(Boolean.FALSE) : Option.empty(); - DeltaParquetFileFormatV2 deltaFormat = - createDeltaParquetFileFormat( - snapshot, tablePath, optimizationsEnabled, useMetadataRowIndex, isWriteTimeCDCRead); + // (`optimizationsEnabled`) and support _metadata.row_index for DV (`useMetadataRowIndex`). + // Both flags are computed above the metadata-context construction so the format can drive + // its `fileConstantMetadataExtractors`. Function1> readFunc = deltaFormat.buildReaderWithPartitionValues( @@ -423,22 +431,28 @@ public static PartitionReaderFactory createDeltaParquetReaderFactory( // Wrap reader to filter deleted rows and remove internal columns if DV is enabled. // DV must be the inner wrapper so it sees the raw reader output with the DV column - // at its expected index, before row tracking changes the column layout. + // at its expected index, before row tracking or metadata injection changes the column + // layout. if (dvSchemaContext.isPresent()) { readFunc = DeletionVectorReadFunction.wrap(readFunc, dvSchemaContext.get(), enableVectorizedReader); } - // Wrap reader to add rowTracking metadata. - // RT wraps DV: _tmp_metadata_row_index values are per-row physical positions generated by - // the Parquet reader, so they remain correct after DV filtering. - if (rowTrackingSchemaContext.isPresent()) { - readFunc = RowTrackingReadFunction.wrap(readFunc, rowTrackingSchemaContext.get()); + // Wrap reader to materialise the `_metadata` struct whenever any `_metadata` subfield is + // requested by the scan. The metadata context owns one MetadataValueSetter per requested + // field; both file-source base fields and Delta row-tracking fields flow through the same + // per-row materialisation step. + if (metadataSchemaContext.isPresent()) { + readFunc = MetadataStructReadFunction.wrap(readFunc, metadataSchemaContext.get()); } // TODO(#5319): add e2e test for CDC reads (full schema + column pruning) when streaming CDC // reads become user-reachable end-to-end. if (cdcSchemaContext.isPresent()) { + if (metadataSchemaContext.isPresent()) { + throw new UnsupportedOperationException( + "CDC reads combined with _metadata reads are not supported"); + } readFunc = CDCReadFunction.wrap(readFunc, cdcSchemaContext.get(), enableVectorizedReader); } diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/V2MetadataReadTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/V2MetadataReadTest.java new file mode 100644 index 00000000000..30a8f852ad4 --- /dev/null +++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/V2MetadataReadTest.java @@ -0,0 +1,294 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.spark.internal.v2; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.nio.file.Paths; +import java.sql.Timestamp; +import java.util.List; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.util.Utils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +/** + * Integration tests for DSv2 {@code _metadata} column reads that are not tied to row tracking. + * + *

This suite exercises generic metadata subfields against real Delta tables, including tables + * where row tracking is disabled, to verify that DSv2 exposes metadata independently of + * row-tracking machinery. + */ +public class V2MetadataReadTest extends DeltaV2TestBase { + + private String tablePath; + + @AfterEach + public void tearDownTable() { + if (tablePath != null) { + Utils.deleteRecursively(new File(tablePath)); + } + } + + // --------------------------------------------------------------------------- + // Single-subfield projection + // --------------------------------------------------------------------------- + + @ParameterizedTest(name = "rowTrackingEnabled={0}") + @ValueSource(booleans = {true, false}) + public void testReadFilePathRegardlessOfRowTracking(boolean rowTrackingEnabled) { + tablePath = newTablePath("delta-test-metadata-rt-" + (rowTrackingEnabled ? "on-" : "off-")); + createTable(tablePath, rowTrackingEnabled); + insertRows(tablePath); + + Dataset df = + spark.sql( + String.format( + "SELECT id, _metadata.file_path FROM dsv2.delta.`%s` ORDER BY id", tablePath)); + assertArrayEquals( + new String[] {"id", "file_path"}, + df.schema().fieldNames(), + "Expected query output columns: id, file_path"); + + assertFilePathRows(df.collectAsList(), tablePath); + } + + /** + * Round-trip every Spark base {@code _metadata} subfield. We assert each value's shape/range + * matches what {@link + * org.apache.spark.sql.execution.datasources.FileFormat.BASE_METADATA_EXTRACTORS} would derive + * from the underlying {@code PartitionedFile}. + */ + @ParameterizedTest(name = "subfield={0}") + @ValueSource( + strings = { + "file_path", + "file_name", + "file_size", + "file_block_start", + "file_block_length", + "file_modification_time" + }) + public void testReadEachBaseMetadataSubfield(String subfield) { + tablePath = newTablePath("delta-test-metadata-subfield-" + subfield + "-"); + createTable(tablePath, /* rowTrackingEnabled= */ false); + insertRows(tablePath); + + Dataset df = + spark.sql( + String.format( + "SELECT id, _metadata.%s AS m FROM dsv2.delta.`%s` ORDER BY id", + subfield, tablePath)); + List rows = df.collectAsList(); + assertEquals(3, rows.size()); + for (Row row : rows) { + Object value = row.get(1); + assertNotNull(value, "Expected _metadata." + subfield + " to be populated"); + assertSubfieldValueShape(subfield, value, tablePath); + } + + // All three rows came from a single INSERT → same file → identical subfield values. + Object firstValue = rows.get(0).get(1); + for (int i = 1; i < rows.size(); i++) { + assertEquals( + firstValue, + rows.get(i).get(1), + "Single-INSERT rows should share the same _metadata." + subfield); + } + } + + // --------------------------------------------------------------------------- + // Whole-struct projection — `SELECT _metadata`, `SELECT _metadata, *` + // --------------------------------------------------------------------------- + + @Test + public void testSelectMetadataStructAlone() { + tablePath = newTablePath("delta-test-metadata-whole-"); + createTable(tablePath, /* rowTrackingEnabled= */ false); + insertRows(tablePath); + + Dataset df = spark.sql(String.format("SELECT _metadata FROM dsv2.delta.`%s`", tablePath)); + assertArrayEquals( + new String[] {"_metadata"}, + df.schema().fieldNames(), + "Output must have a single top-level _metadata column"); + + StructField metadataField = df.schema().fields()[0]; + assertTrue(metadataField.dataType() instanceof StructType, "_metadata must be a struct"); + String[] metadataSubfieldNames = ((StructType) metadataField.dataType()).fieldNames(); + // Spark expands a bare `_metadata` reference to the full base set when row tracking is off. + assertArrayEquals( + new String[] { + "file_path", + "file_name", + "file_size", + "file_block_start", + "file_block_length", + "file_modification_time" + }, + metadataSubfieldNames, + "Bare _metadata should expand to all six Spark base fields"); + + List rows = df.collectAsList(); + assertEquals(3, rows.size()); + for (Row row : rows) { + Row metadata = row.getStruct(0); + assertNotNull(metadata); + assertSubfieldValueShape("file_path", metadata.get(0), tablePath); + assertSubfieldValueShape("file_name", metadata.get(1), tablePath); + assertSubfieldValueShape("file_size", metadata.get(2), tablePath); + assertSubfieldValueShape("file_block_start", metadata.get(3), tablePath); + assertSubfieldValueShape("file_block_length", metadata.get(4), tablePath); + assertSubfieldValueShape("file_modification_time", metadata.get(5), tablePath); + } + } + + @Test + public void testSelectMetadataAlongsideStar() { + tablePath = newTablePath("delta-test-metadata-star-"); + createTable(tablePath, /* rowTrackingEnabled= */ false); + insertRows(tablePath); + + Dataset df = + spark.sql(String.format("SELECT _metadata, * FROM dsv2.delta.`%s` ORDER BY id", tablePath)); + // Output columns: _metadata + data columns (id, name). _metadata position is leading. + String[] outputNames = df.schema().fieldNames(); + assertArrayEquals( + new String[] {"_metadata", "id", "name"}, + outputNames, + "Output should be [_metadata, id, name] in that order"); + + List rows = df.collectAsList(); + assertEquals(3, rows.size()); + for (int i = 0; i < rows.size(); i++) { + Row row = rows.get(i); + Row metadata = row.getStruct(0); + assertNotNull(metadata, "_metadata struct must be populated"); + assertSubfieldValueShape("file_path", metadata.get(0), tablePath); + assertEquals(i + 1L, row.getLong(1), "data column `id` mismatch"); + assertNotNull(row.getString(2), "data column `name` should be populated"); + } + } + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + + private static String newTablePath(String prefix) { + return Paths.get(System.getProperty("java.io.tmpdir"), prefix + System.nanoTime()).toString(); + } + + private void createTable(String path, boolean rowTrackingEnabled) { + spark.sql( + String.format( + "CREATE TABLE delta.`%s` (id LONG, name STRING) USING delta " + + "TBLPROPERTIES ('delta.enableRowTracking' = '%s')", + path, rowTrackingEnabled ? "true" : "false")); + } + + private void insertRows(String path) { + spark.sql( + String.format( + "INSERT INTO delta.`%s` VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie')", path)); + } + + /** + * Per-subfield value-shape assertions. Each metadata subfield has a deterministic type and value + * range; we sanity-check the field is plausible rather than re-computing the expected value from + * the underlying file metadata. + */ + private static void assertSubfieldValueShape(String subfield, Object value, String tablePath) { + switch (subfield) { + case "file_path": + { + String filePath = (String) value; + assertTrue(filePath.startsWith("file:"), "Expected file:// URI, got: " + filePath); + assertTrue(filePath.endsWith(".parquet"), "Expected Parquet extension, got: " + filePath); + assertTrue( + filePath.contains(tablePath), + "file_path should reference the table path, got: " + filePath); + break; + } + case "file_name": + { + String fileName = (String) value; + assertTrue( + fileName.endsWith(".parquet"), + "Expected Parquet extension on file_name, got: " + fileName); + assertTrue( + !fileName.contains("/"), + "file_name must be a leaf name, not a path, got: " + fileName); + break; + } + case "file_size": + { + long size = ((Number) value).longValue(); + assertTrue(size > 0, "file_size must be positive, got: " + size); + break; + } + case "file_block_start": + { + long blockStart = ((Number) value).longValue(); + assertTrue(blockStart >= 0, "file_block_start must be non-negative, got: " + blockStart); + break; + } + case "file_block_length": + { + long blockLength = ((Number) value).longValue(); + assertTrue(blockLength > 0, "file_block_length must be positive, got: " + blockLength); + break; + } + case "file_modification_time": + { + // Spark exposes this as a Timestamp with microsecond granularity. + assertTrue( + value instanceof Timestamp, + "Expected Timestamp for file_modification_time, got: " + value.getClass()); + long millis = ((Timestamp) value).getTime(); + assertTrue(millis > 0, "file_modification_time must be a positive epoch, got: " + millis); + break; + } + default: + throw new IllegalArgumentException("Unknown subfield: " + subfield); + } + } + + private static void assertFilePathRows(List rows, String tablePath) { + assertEquals(3, rows.size()); + for (int i = 0; i < rows.size(); i++) { + assertEquals(i + 1L, rows.get(i).getLong(0)); + String filePath = rows.get(i).getString(1); + assertSubfieldValueShape("file_path", filePath, tablePath); + } + // A single INSERT produces one parquet file, so every row must carry the same file_path. + String firstFilePath = rows.get(0).getString(1); + for (int i = 1; i < rows.size(); i++) { + assertEquals( + firstFilePath, + rows.get(i).getString(1), + "All rows from a single INSERT should share the same _metadata.file_path"); + } + } +} diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/V2RowTrackingReadTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/V2RowTrackingReadTest.java index 5ecc502c57f..f4c4033b447 100644 --- a/spark/v2/src/test/java/io/delta/spark/internal/v2/V2RowTrackingReadTest.java +++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/V2RowTrackingReadTest.java @@ -24,6 +24,7 @@ import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.Row; import org.apache.spark.sql.delta.DeltaLog; +import org.apache.spark.sql.execution.datasources.FileFormat$; import org.apache.spark.sql.types.StructType; import org.apache.spark.util.Utils; import org.junit.jupiter.api.AfterEach; @@ -169,16 +170,27 @@ public void testReadWithRowTrackingDisabledMetadataFieldAccess(@TempDir File tem org.apache.spark.sql.Dataset metadataDf = spark.sql(String.format("SELECT _metadata FROM dsv2.delta.`%s`", noRtPath)); StructType metadataSchema = (StructType) metadataDf.schema().apply("_metadata").dataType(); + // Spark file-source base metadata fields; row tracking fields are absent when RT is disabled. assertEquals( - 0, + 6, metadataSchema.fields().length, - "Expected _metadata to be an empty struct for row-tracking-disabled tables"); + "Expected _metadata to contain six Spark base metadata fields when RT is disabled"); + assertArrayEquals( + new String[] { + FileFormat$.MODULE$.FILE_PATH(), + FileFormat$.MODULE$.FILE_NAME(), + FileFormat$.MODULE$.FILE_SIZE(), + FileFormat$.MODULE$.FILE_BLOCK_START(), + FileFormat$.MODULE$.FILE_BLOCK_LENGTH(), + FileFormat$.MODULE$.FILE_MODIFICATION_TIME() + }, + metadataSchema.fieldNames()); List metadataRows = metadataDf.collectAsList(); assertEquals(2, metadataRows.size()); for (Row row : metadataRows) { assertNotNull(row.getStruct(0), "Expected _metadata struct value to be present"); - assertEquals(0, row.getStruct(0).size(), "Expected _metadata struct to have no fields"); + assertEquals(6, row.getStruct(0).size(), "Expected full base _metadata struct width"); } AnalysisException e = @@ -243,6 +255,47 @@ public void testUpdateKeepsRowIdStable() { originalUpdatedRowId, rows.get(0).getLong(2), "Updated row should keep the same row_id"); } + /** + * Exercises the {@code _metadata.row_id} fallback across mixed file history: one file written by + * INSERT (materialized row_id column null or absent in the parquet file) plus another file + * written by UPDATE rewrite (materialized row_id column populated). Both branches of the {@code + * RowIdValueSetterBuilder} coalesce must produce the right value, and updated rows must keep + * their original row_id. + */ + @Test + public void testMixedFileHistoryRowIdResolves() { + // Snapshot row_id assignments from the initial INSERT (id 1..3) before we mutate the table. + List before = queryRowTrackingWithRowTrackingMetadata(tablePath); + assertEquals(3, before.size()); + long bobRowIdBefore = before.get(1).getLong(2); // ordered by id, so index 1 = Bob (id=2) + + // UPDATE rewrites Bob -> Bobby. With row tracking, the rewritten row materialises its + // original row_id into a hidden helper column. Other untouched files keep the materialised + // column null / absent for the rows they contain. + spark.sql(String.format("UPDATE delta.`%s` SET name = 'Bobby' WHERE id = 2", tablePath)); + + // Add a third file via a fresh INSERT (also materialised-row-id-absent). + spark.sql(String.format("INSERT INTO delta.`%s` VALUES (4, 'David')", tablePath)); + + List rows = queryRowTrackingWithRowTrackingMetadata(tablePath); + assertEquals(4, rows.size()); + + // id=2 must keep its original row_id (came from the materialised column). + assertEquals(2L, rows.get(1).getLong(0), "result ordered by id"); + assertEquals("Bobby", rows.get(1).getString(1)); + assertEquals( + bobRowIdBefore, + rows.get(1).getLong(2), + "UPDATEd row should keep its original row_id (materialised column path)"); + + // All row_ids must be non-null and unique across the mixed history. + Set uniqueRowIds = new HashSet<>(); + for (Row row : rows) { + assertFalse(row.isNullAt(2), "row_id must be non-null for every row in mixed history"); + assertTrue(uniqueRowIds.add(row.getLong(2)), "row_id collision across mixed-history files"); + } + } + // --------------------------------------------------------------------------- // DV + row tracking: wrapper order keeps physical row IDs // --------------------------------------------------------------------------- @@ -338,7 +391,16 @@ public void testPartitionedTableReadWithRowTrackingPreservesPartitionColumns( "Expected output ordering: data columns, _metadata, partition columns"); StructType metadataSchema = (StructType) df.schema().apply("_metadata").dataType(); assertArrayEquals( - new String[] {"row_id", "row_commit_version"}, + new String[] { + FileFormat$.MODULE$.FILE_PATH(), + FileFormat$.MODULE$.FILE_NAME(), + FileFormat$.MODULE$.FILE_SIZE(), + FileFormat$.MODULE$.FILE_BLOCK_START(), + FileFormat$.MODULE$.FILE_BLOCK_LENGTH(), + FileFormat$.MODULE$.FILE_MODIFICATION_TIME(), + "row_id", + "row_commit_version" + }, metadataSchema.fieldNames(), "Unexpected _metadata struct field order"); @@ -354,8 +416,14 @@ public void testPartitionedTableReadWithRowTrackingPreservesPartitionColumns( Row metadata = row.getStruct(3); String date = row.getString(4); String city = row.getString(5); - long rowId = metadata.getLong(0); - long rowCommitVersion = metadata.getLong(1); + // Base metadata (0..5), then row_id (6), row_commit_version (7) + assertNotNull(metadata.getString(0)); + assertTrue( + metadata.getString(0).contains("part-") || metadata.getString(0).endsWith(".parquet"), + "file_path should reference a data file"); + assertTrue(metadata.getLong(2) > 0L, "file_size should be positive"); + long rowId = metadata.getLong(6); + long rowCommitVersion = metadata.getLong(7); if (id == 1L) { assertEquals("Alice", name); diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/catalog/DeltaV2TableTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/catalog/DeltaV2TableTest.java index 5aae329d7c7..85aaaaf963f 100644 --- a/spark/v2/src/test/java/io/delta/spark/internal/v2/catalog/DeltaV2TableTest.java +++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/catalog/DeltaV2TableTest.java @@ -17,6 +17,7 @@ import static org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ; import static org.apache.spark.sql.connector.catalog.TableCapability.BATCH_WRITE; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -48,8 +49,10 @@ import org.apache.hadoop.fs.Path; import org.apache.spark.sql.catalyst.TableIdentifier; import org.apache.spark.sql.catalyst.catalog.CatalogTable; +import org.apache.spark.sql.catalyst.expressions.FileSourceConstantMetadataStructField; import org.apache.spark.sql.connector.catalog.Column; import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.MetadataColumn; import org.apache.spark.sql.connector.catalog.SupportsWrite; import org.apache.spark.sql.connector.expressions.Transform; import org.apache.spark.sql.connector.write.LogicalWriteInfo; @@ -58,6 +61,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf; import org.apache.spark.sql.delta.sources.DeltaSourceMetadataTrackingLog; import org.apache.spark.sql.delta.sources.PersistedMetadata; +import org.apache.spark.sql.execution.datasources.FileFormat$; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; @@ -530,6 +534,70 @@ public CaseInsensitiveStringMap options() { assertNotNull(table.newWriteBuilder(writeInfo), "newWriteBuilder should return non-null"); } + @Test + public void testMetadataColumnsExposeSparkBaseFieldsWhenRowTrackingDisabled( + @TempDir File tempDir) { + String path = tempDir.getAbsolutePath(); + spark.sql( + String.format( + "CREATE TABLE test_metadata_disabled (id INT) USING delta " + + "TBLPROPERTIES ('delta.enableRowTracking' = 'false') LOCATION '%s'", + path)); + + SparkTable table = + new SparkTable(Identifier.of(new String[] {"default"}, "test_metadata_disabled"), path); + + MetadataColumn[] metadataColumns = table.metadataColumns(); + assertEquals(1, metadataColumns.length, "Expected a single _metadata column"); + assertEquals("_metadata", metadataColumns[0].name()); + + StructType metadataType = (StructType) metadataColumns[0].dataType(); + assertArrayEquals( + new String[] { + FileFormat$.MODULE$.FILE_PATH(), + FileFormat$.MODULE$.FILE_NAME(), + FileFormat$.MODULE$.FILE_SIZE(), + FileFormat$.MODULE$.FILE_BLOCK_START(), + FileFormat$.MODULE$.FILE_BLOCK_LENGTH(), + FileFormat$.MODULE$.FILE_MODIFICATION_TIME() + }, + metadataType.fieldNames(), + "Expected Spark base metadata fields (without row tracking) in FileFormat order"); + assertTrue( + FileSourceConstantMetadataStructField.isValid( + metadataType.fields()[0].dataType(), metadataType.fields()[0].metadata()), + "file_path should be surfaced as a file-source constant metadata field"); + } + + @Test + public void testMetadataColumnsAppendRowTrackingAfterSparkBaseFieldsWhenEnabled( + @TempDir File tempDir) { + String path = tempDir.getAbsolutePath(); + spark.sql( + String.format( + "CREATE TABLE test_metadata_enabled (id INT) USING delta " + + "TBLPROPERTIES ('delta.enableRowTracking' = 'true') LOCATION '%s'", + path)); + + SparkTable table = + new SparkTable(Identifier.of(new String[] {"default"}, "test_metadata_enabled"), path); + + StructType metadataType = (StructType) table.metadataColumns()[0].dataType(); + assertArrayEquals( + new String[] { + FileFormat$.MODULE$.FILE_PATH(), + FileFormat$.MODULE$.FILE_NAME(), + FileFormat$.MODULE$.FILE_SIZE(), + FileFormat$.MODULE$.FILE_BLOCK_START(), + FileFormat$.MODULE$.FILE_BLOCK_LENGTH(), + FileFormat$.MODULE$.FILE_MODIFICATION_TIME(), + "row_id", + "row_commit_version" + }, + metadataType.fieldNames(), + "Row-tracking fields should follow the Spark base metadata fields"); + } + @Test public void testSchemaWithReadChangeFeedIncludesCDCColumns(@TempDir File tempDir) { String path = tempDir.getAbsolutePath(); diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkScanTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkScanTest.java index 6fdc2190299..3d4ed475601 100644 --- a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkScanTest.java +++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/SparkScanTest.java @@ -197,7 +197,7 @@ public void testColumnarSupportModeWithMetadataColumnPruned() { StructType prunedSchema = new StructType() .add("name", DataTypes.StringType) - .add("_metadata", new StructType()) + .add("_metadata", new StructType().add("file_path", DataTypes.StringType)) .add("date", DataTypes.StringType) .add("city", DataTypes.StringType) .add("part", DataTypes.IntegerType); diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/metadata/MetadataStructReadFunctionTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/metadata/MetadataStructReadFunctionTest.java new file mode 100644 index 00000000000..b2cecaa034c --- /dev/null +++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/metadata/MetadataStructReadFunctionTest.java @@ -0,0 +1,345 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.spark.internal.v2.read.metadata; + +import static io.delta.spark.internal.v2.InternalRowTestUtils.collectRows; +import static io.delta.spark.internal.v2.InternalRowTestUtils.mockReader; +import static io.delta.spark.internal.v2.InternalRowTestUtils.row; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.delta.kernel.data.ArrayValue; +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.internal.actions.Format; +import io.delta.kernel.internal.actions.Metadata; +import io.delta.kernel.internal.rowtracking.MaterializedRowTrackingColumn; +import java.util.List; +import java.util.Map; +import org.apache.spark.paths.SparkPath; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.delta.DefaultRowCommitVersion$; +import org.apache.spark.sql.delta.RowCommitVersion$; +import org.apache.spark.sql.delta.RowId$; +import org.apache.spark.sql.execution.datasources.FileFormat$; +import org.apache.spark.sql.execution.datasources.PartitionedFile; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.Test; +import scala.Tuple2; +import scala.collection.immutable.Map$; +import scala.jdk.javaapi.CollectionConverters; + +/** + * End-to-end tests for {@link MetadataStructReadFunction} covering Spark base fields and the + * row-tracking fields, all driven through {@link MetadataValueSetterBuilder} via the unified {@link + * MetadataStructSchemaContext}. + */ +public class MetadataStructReadFunctionTest { + + private static final String MATERIALIZED_ROW_ID_COLUMN = "__row_id"; + private static final String MATERIALIZED_ROW_COMMIT_VERSION_COLUMN = "__row_commit_version"; + private static final StructType EMPTY_PARTITION_SCHEMA = new StructType(); + + // --------------------------------------------------------------------------- + // Base Spark file-source metadata fields + // --------------------------------------------------------------------------- + + @Test + public void testFilePathAndFileSizeMatchPartitionedFileAndDisplayPath() { + StructType metadataSchema = + new StructType() + .add(FileFormat$.MODULE$.FILE_PATH(), DataTypes.StringType, false) + .add(FileFormat$.MODULE$.FILE_SIZE(), DataTypes.LongType, false); + StructType readSchema = readSchemaWithMetadata(metadataSchema); + + PartitionedFile file = + new PartitionedFile( + row(), + SparkPath.fromUrlString("file:///tmp/metadata-test.parquet"), + 5L, + 100L, + new String[0], + 1234L, + 9999L, + Map$.MODULE$.empty()); + + MetadataStructReadFunction readFunction = + MetadataStructReadFunction.wrap( + mockReader(List.of(row(1L))), context(readSchema, EMPTY_PARTITION_SCHEMA)); + + List result = collectRows(readFunction.apply(file)); + + assertEquals(1, result.size()); + InternalRow out = result.get(0); + assertEquals(2, out.numFields()); + InternalRow metadata = out.getStruct(1, 2); + assertEquals( + FileFormat$.MODULE$ + .BASE_METADATA_EXTRACTORS() + .apply(FileFormat$.MODULE$.FILE_PATH()) + .apply(file), + metadata.getUTF8String(0)); + assertEquals(9999L, metadata.getLong(1)); + } + + /** + * All six Spark base {@code _metadata} fields must match {@link + * org.apache.spark.sql.execution.datasources.FileFormat#BASE_METADATA_EXTRACTORS} for the same + * {@link PartitionedFile} (distinct start/length/size/mtime so values cannot be confused). + */ + @Test + public void testAllSixBaseMetadataFieldsMatchFileFormatExtractors() { + StructType metadataSchema = new StructType(); + for (StructField f : CollectionConverters.asJava(FileFormat$.MODULE$.BASE_METADATA_FIELDS())) { + metadataSchema = metadataSchema.add(f); + } + StructType readSchema = readSchemaWithMetadata(metadataSchema); + + PartitionedFile file = + new PartitionedFile( + row(), + SparkPath.fromUrlString("file:///tmp/all-six-meta-test-unique.parquet"), + 7L, + 42L, + new String[0], + 87_654_321L, + 999L, + Map$.MODULE$.empty()); + + MetadataStructReadFunction readFunction = + MetadataStructReadFunction.wrap( + mockReader(List.of(row(1L))), context(readSchema, EMPTY_PARTITION_SCHEMA)); + + List result = collectRows(readFunction.apply(file)); + assertEquals(1, result.size()); + InternalRow metadata = result.get(0).getStruct(1, 6); + + assertEquals( + FileFormat$.MODULE$ + .BASE_METADATA_EXTRACTORS() + .apply(FileFormat$.MODULE$.FILE_PATH()) + .apply(file), + metadata.getUTF8String(0)); + assertEquals( + FileFormat$.MODULE$ + .BASE_METADATA_EXTRACTORS() + .apply(FileFormat$.MODULE$.FILE_NAME()) + .apply(file), + metadata.getUTF8String(1)); + assertEquals(999L, metadata.getLong(2)); + assertEquals(7L, metadata.getLong(3)); + assertEquals(42L, metadata.getLong(4)); + assertEquals(87_654_321L * 1000L, metadata.getLong(5)); + } + + @Test + public void testUnsupportedMetadataSubfieldThrowsAtContextConstruction() { + StructType metadataSchema = + new StructType() + .add("not_a_supported_metadata_field", DataTypes.StringType, false) + .add(FileFormat$.MODULE$.FILE_PATH(), DataTypes.StringType, false); + StructType readSchema = readSchemaWithMetadata(metadataSchema); + + IllegalStateException ex = + assertThrows( + IllegalStateException.class, () -> context(readSchema, EMPTY_PARTITION_SCHEMA)); + assertTrue(ex.getMessage().contains("not_a_supported_metadata_field")); + assertTrue( + ex.getMessage().contains("row-tracking"), + "Error should mention the row-tracking fallback so callers know what's allowed"); + } + + @Test + public void testMetadataWithOnlyBaseFieldsHasNoRowTrackingSlots() { + StructType metadataSchema = + new StructType().add(FileFormat$.MODULE$.FILE_PATH(), DataTypes.StringType, false); + StructType readSchema = readSchemaWithMetadata(metadataSchema); + + MetadataStructReadFunction readFunction = + MetadataStructReadFunction.wrap( + mockReader(List.of(row(1L))), context(readSchema, EMPTY_PARTITION_SCHEMA)); + List result = collectRows(readFunction.apply(simplePartitionedFile(null, null))); + + assertEquals(1, result.size()); + InternalRow metadata = result.get(0).getStruct(1, 1); + assertEquals(1, metadata.numFields(), "Only file_path slot should exist"); + } + + // --------------------------------------------------------------------------- + // Row-tracking integration + // --------------------------------------------------------------------------- + + @Test + public void testRowIdProjectionUsesMaterializedRowIdWhenPresent() { + StructType metadataSchema = + new StructType().add(RowId$.MODULE$.ROW_ID(), DataTypes.LongType, true); + StructType readSchema = readSchemaWithMetadata(metadataSchema); + + MetadataStructReadFunction readFunction = + MetadataStructReadFunction.wrap( + mockReader(List.of(row(1L, 101L, 0L))), context(readSchema, EMPTY_PARTITION_SCHEMA)); + + List result = collectRows(readFunction.apply(simplePartitionedFile(50L, null))); + + assertEquals(1, result.size()); + assertRowIdResult(result.get(0), 1L, 101L); + } + + @Test + public void testRowIdProjectionFallsBackToBaseRowIdPlusPhysicalRowIndex() { + StructType metadataSchema = + new StructType().add(RowId$.MODULE$.ROW_ID(), DataTypes.LongType, true); + StructType readSchema = readSchemaWithMetadata(metadataSchema); + + MetadataStructReadFunction readFunction = + MetadataStructReadFunction.wrap( + mockReader(List.of(row(2L, null, 2L))), context(readSchema, EMPTY_PARTITION_SCHEMA)); + + List result = collectRows(readFunction.apply(simplePartitionedFile(50L, null))); + + assertEquals(1, result.size()); + assertRowIdResult(result.get(0), 2L, 52L); + } + + @Test + public void testRowCommitVersionProjectionUsesMaterializedCommitVersionWhenPresent() { + StructType metadataSchema = + new StructType() + .add(RowCommitVersion$.MODULE$.METADATA_STRUCT_FIELD_NAME(), DataTypes.LongType, true); + StructType readSchema = readSchemaWithMetadata(metadataSchema); + + MetadataStructReadFunction readFunction = + MetadataStructReadFunction.wrap( + mockReader(List.of(row(1L, 7L))), context(readSchema, EMPTY_PARTITION_SCHEMA)); + + List result = collectRows(readFunction.apply(simplePartitionedFile(null, 9L))); + + assertEquals(1, result.size()); + assertCommitVersionResult(result.get(0), 1L, 7L); + } + + @Test + public void testRowCommitVersionProjectionFallsBackToDefaultCommitVersion() { + StructType metadataSchema = + new StructType() + .add(RowCommitVersion$.MODULE$.METADATA_STRUCT_FIELD_NAME(), DataTypes.LongType, true); + StructType readSchema = readSchemaWithMetadata(metadataSchema); + + MetadataStructReadFunction readFunction = + MetadataStructReadFunction.wrap( + mockReader(List.of(row(2L, null))), context(readSchema, EMPTY_PARTITION_SCHEMA)); + + List result = collectRows(readFunction.apply(simplePartitionedFile(null, 9L))); + + assertEquals(1, result.size()); + assertCommitVersionResult(result.get(0), 2L, 9L); + } + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + + private static void assertRowIdResult(InternalRow row, long expectedId, long expectedRowId) { + assertEquals(2, row.numFields()); + assertEquals(expectedId, row.getLong(0)); + InternalRow metadata = row.getStruct(1, 1); + assertEquals(expectedRowId, metadata.getLong(0)); + } + + private static void assertCommitVersionResult( + InternalRow row, long expectedId, long expectedCommitVersion) { + assertEquals(2, row.numFields()); + assertEquals(expectedId, row.getLong(0)); + InternalRow metadata = row.getStruct(1, 1); + assertEquals(expectedCommitVersion, metadata.getLong(0)); + } + + private static StructType readSchemaWithMetadata(StructType metadataSchema) { + return new StructType() + .add("id", DataTypes.LongType, false) + .add(FileFormat$.MODULE$.METADATA_NAME(), metadataSchema, false); + } + + private static MetadataStructSchemaContext context( + StructType readSchema, StructType partitionSchema) { + return MetadataStructSchemaContext.forSchemaWithExtractors( + readSchema, + partitionSchema, + FileFormat$.MODULE$.BASE_METADATA_EXTRACTORS(), + createMetadata()) + .orElseThrow(() -> new AssertionError("expected non-empty context")); + } + + private static PartitionedFile simplePartitionedFile( + Long baseRowId, Long defaultRowCommitVersion) { + scala.collection.immutable.Map otherMetadata = Map$.MODULE$.empty(); + if (baseRowId != null) { + otherMetadata = otherMetadata.$plus(new Tuple2<>(RowId$.MODULE$.BASE_ROW_ID(), baseRowId)); + } + if (defaultRowCommitVersion != null) { + otherMetadata = + otherMetadata.$plus( + new Tuple2<>( + DefaultRowCommitVersion$.MODULE$.METADATA_STRUCT_FIELD_NAME(), + defaultRowCommitVersion)); + } + return new PartitionedFile( + row(), + SparkPath.fromUrlString("file:///tmp/metadata-test.parquet"), + 0L, + 1L, + new String[0], + 0L, + 1L, + otherMetadata); + } + + private static Metadata createMetadata() { + Map configuration = + Map.of( + MaterializedRowTrackingColumn.MATERIALIZED_ROW_ID.getMaterializedColumnNameProperty(), + MATERIALIZED_ROW_ID_COLUMN, + MaterializedRowTrackingColumn.MATERIALIZED_ROW_COMMIT_VERSION + .getMaterializedColumnNameProperty(), + MATERIALIZED_ROW_COMMIT_VERSION_COLUMN); + io.delta.kernel.types.StructType kernelSchema = + new io.delta.kernel.types.StructType().add("id", io.delta.kernel.types.LongType.LONG); + ArrayValue emptyPartitionColumns = + new ArrayValue() { + @Override + public int getSize() { + return 0; + } + + @Override + public ColumnVector getElements() { + return null; + } + }; + return new Metadata( + "id", + java.util.Optional.empty(), + java.util.Optional.empty(), + new Format(), + kernelSchema.toJson(), + kernelSchema, + emptyPartitionColumns, + java.util.Optional.empty(), + io.delta.kernel.internal.util.VectorUtils.stringStringMapValue(configuration)); + } +} diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/metadata/MetadataStructSchemaContextTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/metadata/MetadataStructSchemaContextTest.java new file mode 100644 index 00000000000..a30237d63f2 --- /dev/null +++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/metadata/MetadataStructSchemaContextTest.java @@ -0,0 +1,215 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.spark.internal.v2.read.metadata; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.delta.kernel.data.ArrayValue; +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.internal.actions.Format; +import io.delta.kernel.internal.actions.Metadata; +import io.delta.kernel.internal.rowtracking.MaterializedRowTrackingColumn; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.spark.sql.delta.RowCommitVersion$; +import org.apache.spark.sql.delta.RowId$; +import org.apache.spark.sql.execution.datasources.FileFormat$; +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.Test; +import scala.collection.immutable.Seq; +import scala.jdk.javaapi.CollectionConverters; + +public class MetadataStructSchemaContextTest { + + private static final String MATERIALIZED_ROW_ID_COLUMN = "__row_id"; + private static final String MATERIALIZED_ROW_COMMIT_VERSION_COLUMN = "__row_commit_version"; + + private static final StructType BASE_TABLE_SCHEMA = + new StructType().add("id", DataTypes.LongType, false).add("name", DataTypes.StringType, true); + private static final StructType PARTITION_SCHEMA = + new StructType().add("date", DataTypes.StringType, true); + private static final StructType EMPTY_PARTITION_SCHEMA = new StructType(); + + @Test + public void testForSchemaReturnsEmptyWhenNoMetadataColumn() { + Optional ctx = + MetadataStructSchemaContext.forSchemaWithExtractors( + BASE_TABLE_SCHEMA, + PARTITION_SCHEMA, + FileFormat$.MODULE$.BASE_METADATA_EXTRACTORS(), + createMetadata()); + assertFalse(ctx.isPresent(), "no _metadata struct should yield Optional.empty()"); + } + + @Test + public void testFilePathOnlyPrunedStruct() { + StructType readSchema = readSchemaWithMetadata(FileFormat$.MODULE$.FILE_PATH()); + MetadataStructSchemaContext ctx = forSchema(readSchema, EMPTY_PARTITION_SCHEMA); + + assertArrayEquals( + new String[] {FileFormat$.MODULE$.FILE_PATH()}, ctx.getPrunedMetadataStruct().fieldNames()); + // No row tracking helper columns when only base fields are requested. + assertEquals(BASE_TABLE_SCHEMA, ctx.getParquetReadSchema()); + assertEquals(BASE_TABLE_SCHEMA, ctx.getDataSchema()); + assertEquals(List.of(0, 1), toJavaOrdinals(ctx.getDataColumnsOrdinals())); + assertEquals(0, ctx.getPartitionColumnsOrdinals().size()); + assertEquals(1, ctx.getValueSetterBuilders().length); + assertTrue( + ctx.getValueSetterBuilders()[0] instanceof FileConstantValueSetterBuilder, + "file_path should be a FileConstantValueSetterBuilder"); + } + + @Test + public void testRowIdOnlyAddsTwoHelperColumns() { + StructType readSchema = readSchemaWithMetadata(RowId$.MODULE$.ROW_ID()); + MetadataStructSchemaContext ctx = forSchema(readSchema, PARTITION_SCHEMA); + + // Parquet read schema = base data + materialized row id + parquet row index. + assertEquals(4, ctx.getParquetReadSchema().fields().length); + assertEquals(MATERIALIZED_ROW_ID_COLUMN, ctx.getParquetReadSchema().fields()[2].name()); + assertEquals( + ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME(), + ctx.getParquetReadSchema().fields()[3].name()); + + // Data ordinals unchanged; partition ordinals shift past helpers. + assertEquals(BASE_TABLE_SCHEMA, ctx.getDataSchema()); + assertEquals(List.of(0, 1), toJavaOrdinals(ctx.getDataColumnsOrdinals())); + assertEquals(List.of(4), toJavaOrdinals(ctx.getPartitionColumnsOrdinals())); + + assertEquals(1, ctx.getValueSetterBuilders().length); + assertTrue(ctx.getValueSetterBuilders()[0] instanceof RowIdValueSetterBuilder); + } + + @Test + public void testRowCommitVersionOnlyAddsOneHelperColumn() { + StructType readSchema = + readSchemaWithMetadata(RowCommitVersion$.MODULE$.METADATA_STRUCT_FIELD_NAME()); + MetadataStructSchemaContext ctx = forSchema(readSchema, EMPTY_PARTITION_SCHEMA); + + // Parquet read schema = base data + materialized row commit version (no row index column for + // commit version alone). + assertEquals(3, ctx.getParquetReadSchema().fields().length); + assertEquals( + MATERIALIZED_ROW_COMMIT_VERSION_COLUMN, ctx.getParquetReadSchema().fields()[2].name()); + assertEquals(1, ctx.getValueSetterBuilders().length); + assertTrue(ctx.getValueSetterBuilders()[0] instanceof RowCommitVersionValueSetterBuilder); + } + + @Test + public void testBaseFieldsAndRowTrackingFieldsMixed() { + StructType readSchema = + readSchemaWithMetadata( + FileFormat$.MODULE$.FILE_PATH(), + RowId$.MODULE$.ROW_ID(), + RowCommitVersion$.MODULE$.METADATA_STRUCT_FIELD_NAME()); + MetadataStructSchemaContext ctx = forSchema(readSchema, EMPTY_PARTITION_SCHEMA); + + // Parquet schema: base data (2) + row_id helpers (2) + row_commit_version helper (1) = 5. + assertEquals(5, ctx.getParquetReadSchema().fields().length); + assertEquals(3, ctx.getValueSetterBuilders().length); + assertTrue(ctx.getValueSetterBuilders()[0] instanceof FileConstantValueSetterBuilder); + assertTrue(ctx.getValueSetterBuilders()[1] instanceof RowIdValueSetterBuilder); + assertTrue(ctx.getValueSetterBuilders()[2] instanceof RowCommitVersionValueSetterBuilder); + } + + @Test + public void testUnsupportedSubfieldThrowsAtContextConstruction() { + StructType readSchema = readSchemaWithMetadata("not_a_known_metadata_field"); + IllegalStateException ex = + assertThrows( + IllegalStateException.class, () -> forSchema(readSchema, EMPTY_PARTITION_SCHEMA)); + assertTrue(ex.getMessage().contains("not_a_known_metadata_field")); + assertTrue( + ex.getMessage().contains("row-tracking"), + "Error message should reference row-tracking fallback"); + } + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + + private static MetadataStructSchemaContext forSchema( + StructType readSchema, StructType partitionSchema) { + return MetadataStructSchemaContext.forSchemaWithExtractors( + readSchema, + partitionSchema, + FileFormat$.MODULE$.BASE_METADATA_EXTRACTORS(), + createMetadata()) + .orElseThrow(() -> new AssertionError("expected non-empty context")); + } + + private static StructType readSchemaWithMetadata(String... metadataFieldNames) { + StructType metadataSchema = new StructType(); + for (String name : metadataFieldNames) { + metadataSchema = + metadataSchema.add( + name, + FileFormat$.MODULE$.FILE_PATH().equals(name) + ? DataTypes.StringType + : DataTypes.LongType, + true); + } + return BASE_TABLE_SCHEMA.add(FileFormat$.MODULE$.METADATA_NAME(), metadataSchema, false); + } + + private static List toJavaOrdinals(Seq ordinals) { + return CollectionConverters.asJava(ordinals).stream() + .map(v -> ((Number) v).intValue()) + .collect(Collectors.toList()); + } + + private static Metadata createMetadata() { + Map configuration = + Map.of( + MaterializedRowTrackingColumn.MATERIALIZED_ROW_ID.getMaterializedColumnNameProperty(), + MATERIALIZED_ROW_ID_COLUMN, + MaterializedRowTrackingColumn.MATERIALIZED_ROW_COMMIT_VERSION + .getMaterializedColumnNameProperty(), + MATERIALIZED_ROW_COMMIT_VERSION_COLUMN); + io.delta.kernel.types.StructType kernelSchema = + new io.delta.kernel.types.StructType().add("id", io.delta.kernel.types.LongType.LONG); + ArrayValue emptyPartitionColumns = + new ArrayValue() { + @Override + public int getSize() { + return 0; + } + + @Override + public ColumnVector getElements() { + return null; + } + }; + return new Metadata( + "id", + java.util.Optional.empty(), + java.util.Optional.empty(), + new Format(), + kernelSchema.toJson(), + kernelSchema, + emptyPartitionColumns, + java.util.Optional.empty(), + io.delta.kernel.internal.util.VectorUtils.stringStringMapValue(configuration)); + } +} diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/metadata/MetadataValueSetterTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/metadata/MetadataValueSetterTest.java new file mode 100644 index 00000000000..e869f8e8cac --- /dev/null +++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/metadata/MetadataValueSetterTest.java @@ -0,0 +1,213 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.spark.internal.v2.read.metadata; + +import static io.delta.spark.internal.v2.InternalRowTestUtils.row; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.apache.spark.paths.SparkPath; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.delta.DefaultRowCommitVersion$; +import org.apache.spark.sql.delta.RowId$; +import org.apache.spark.sql.execution.datasources.FileFormat$; +import org.apache.spark.sql.execution.datasources.PartitionedFile; +import org.junit.jupiter.api.Test; +import scala.Tuple2; +import scala.collection.immutable.Map$; + +/** + * Unit tests for the three {@link MetadataValueSetterBuilder} implementations. + * + *

Each setter is tested independently of the larger schema-context wiring: bind to a synthetic + * {@link PartitionedFile}, run {@link BoundMetadataValueSetter#setValue} against a known inner row, + * and verify the metadata-row ordinal. + */ +public class MetadataValueSetterTest { + + // --------------------------------------------------------------------------- + // FileConstantValueSetterBuilder + // --------------------------------------------------------------------------- + + @Test + public void testFileConstantValueSetterWritesFilePath() { + FileConstantValueSetterBuilder setter = + new FileConstantValueSetterBuilder( + FileFormat$.MODULE$.BASE_METADATA_EXTRACTORS().apply(FileFormat$.MODULE$.FILE_PATH())); + + PartitionedFile file = simplePartitionedFile(); + BoundMetadataValueSetter bound = setter.buildWithFile(file); + + GenericInternalRow metadataRow = new GenericInternalRow(1); + bound.setValue(metadataRow, 0, row(1L)); + + assertEquals( + FileFormat$.MODULE$ + .BASE_METADATA_EXTRACTORS() + .apply(FileFormat$.MODULE$.FILE_PATH()) + .apply(file), + metadataRow.get(0, null)); + } + + @Test + public void testFileConstantValueSetterWritesFileSize() { + FileConstantValueSetterBuilder setter = + new FileConstantValueSetterBuilder( + FileFormat$.MODULE$.BASE_METADATA_EXTRACTORS().apply(FileFormat$.MODULE$.FILE_SIZE())); + + PartitionedFile file = partitionedFileWithSize(9999L); + BoundMetadataValueSetter bound = setter.buildWithFile(file); + + GenericInternalRow metadataRow = new GenericInternalRow(1); + bound.setValue(metadataRow, 0, row(1L)); + + assertEquals(9999L, metadataRow.getLong(0)); + } + + // --------------------------------------------------------------------------- + // RowIdValueSetterBuilder + // --------------------------------------------------------------------------- + + @Test + public void testRowIdSetterUsesMaterializedValueWhenPresent() { + // Inner row layout: [data | materialized_row_id | row_index] + int materializedIdx = 1; + int rowIndexIdx = 2; + RowIdValueSetterBuilder setter = new RowIdValueSetterBuilder(materializedIdx, rowIndexIdx); + + BoundMetadataValueSetter bound = setter.buildWithFile(partitionedFileWithBaseRowId(50L)); + + GenericInternalRow metadataRow = new GenericInternalRow(1); + InternalRow innerRow = row(1L, 101L, 0L); + bound.setValue(metadataRow, 0, innerRow); + + assertEquals(101L, metadataRow.getLong(0), "materialized row_id should win when present"); + } + + @Test + public void testRowIdSetterFallsBackToBaseRowIdPlusRowIndex() { + int materializedIdx = 1; + int rowIndexIdx = 2; + RowIdValueSetterBuilder setter = new RowIdValueSetterBuilder(materializedIdx, rowIndexIdx); + + BoundMetadataValueSetter bound = setter.buildWithFile(partitionedFileWithBaseRowId(50L)); + + GenericInternalRow metadataRow = new GenericInternalRow(1); + InternalRow innerRow = row(2L, null, 2L); + bound.setValue(metadataRow, 0, innerRow); + + assertEquals(52L, metadataRow.getLong(0), "row_id should fall back to baseRowId + rowIndex"); + } + + // --------------------------------------------------------------------------- + // RowCommitVersionValueSetterBuilder + // --------------------------------------------------------------------------- + + @Test + public void testRowCommitVersionSetterUsesMaterializedValueWhenPresent() { + int materializedIdx = 1; + RowCommitVersionValueSetterBuilder setter = + new RowCommitVersionValueSetterBuilder(materializedIdx); + + BoundMetadataValueSetter bound = + setter.buildWithFile(partitionedFileWithDefaultCommitVersion(9L)); + + GenericInternalRow metadataRow = new GenericInternalRow(1); + InternalRow innerRow = row(1L, 7L); + bound.setValue(metadataRow, 0, innerRow); + + assertEquals(7L, metadataRow.getLong(0)); + } + + @Test + public void testRowCommitVersionSetterFallsBackToDefault() { + int materializedIdx = 1; + RowCommitVersionValueSetterBuilder setter = + new RowCommitVersionValueSetterBuilder(materializedIdx); + + BoundMetadataValueSetter bound = + setter.buildWithFile(partitionedFileWithDefaultCommitVersion(9L)); + + GenericInternalRow metadataRow = new GenericInternalRow(1); + InternalRow innerRow = row(2L, null); + bound.setValue(metadataRow, 0, innerRow); + + assertEquals(9L, metadataRow.getLong(0)); + } + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + + private static PartitionedFile simplePartitionedFile() { + return new PartitionedFile( + row(), + SparkPath.fromUrlString("file:///tmp/setter-test.parquet"), + 0L, + 1L, + new String[0], + 0L, + 1L, + Map$.MODULE$.empty()); + } + + private static PartitionedFile partitionedFileWithSize(long size) { + return new PartitionedFile( + row(), + SparkPath.fromUrlString("file:///tmp/setter-test.parquet"), + 0L, + size, + new String[0], + 0L, + size, + Map$.MODULE$.empty()); + } + + private static PartitionedFile partitionedFileWithBaseRowId(long baseRowId) { + scala.collection.immutable.Map otherMetadata = + Map$.MODULE$ + .empty() + .$plus(new Tuple2<>(RowId$.MODULE$.BASE_ROW_ID(), (Object) baseRowId)); + return new PartitionedFile( + row(), + SparkPath.fromUrlString("file:///tmp/setter-test.parquet"), + 0L, + 1L, + new String[0], + 0L, + 1L, + otherMetadata); + } + + private static PartitionedFile partitionedFileWithDefaultCommitVersion(long version) { + scala.collection.immutable.Map otherMetadata = + Map$.MODULE$ + .empty() + .$plus( + new Tuple2<>( + DefaultRowCommitVersion$.MODULE$.METADATA_STRUCT_FIELD_NAME(), + (Object) version)); + return new PartitionedFile( + row(), + SparkPath.fromUrlString("file:///tmp/setter-test.parquet"), + 0L, + 1L, + new String[0], + 0L, + 1L, + otherMetadata); + } +} diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/rowtracking/RowTrackingReadFunctionTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/rowtracking/RowTrackingReadFunctionTest.java deleted file mode 100644 index 6e49f6ae5fa..00000000000 --- a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/rowtracking/RowTrackingReadFunctionTest.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Copyright (2026) The Delta Lake Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.delta.spark.internal.v2.read.rowtracking; - -import static io.delta.spark.internal.v2.InternalRowTestUtils.collectRows; -import static io.delta.spark.internal.v2.InternalRowTestUtils.mockReader; -import static io.delta.spark.internal.v2.InternalRowTestUtils.row; -import static org.junit.jupiter.api.Assertions.assertEquals; - -import io.delta.kernel.data.ArrayValue; -import io.delta.kernel.data.ColumnVector; -import io.delta.kernel.internal.actions.Format; -import io.delta.kernel.internal.actions.Metadata; -import io.delta.kernel.internal.rowtracking.MaterializedRowTrackingColumn; -import java.util.List; -import java.util.Map; -import org.apache.spark.paths.SparkPath; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.delta.DefaultRowCommitVersion$; -import org.apache.spark.sql.delta.RowCommitVersion$; -import org.apache.spark.sql.delta.RowId$; -import org.apache.spark.sql.execution.datasources.FileFormat$; -import org.apache.spark.sql.execution.datasources.PartitionedFile; -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.StructType; -import org.junit.jupiter.api.Test; -import scala.Tuple2; -import scala.collection.immutable.Map$; - -public class RowTrackingReadFunctionTest { - - private static final String MATERIALIZED_ROW_ID_COLUMN = "__row_id"; - private static final String MATERIALIZED_ROW_COMMIT_VERSION_COLUMN = "__row_commit_version"; - private static final StructType EMPTY_PARTITION_SCHEMA = new StructType(); - - @Test - public void testRowIdProjectionUsesMaterializedRowIdWhenPresent() { - RowTrackingSchemaContext context = - createContext(readSchemaWithMetadata(RowId$.MODULE$.ROW_ID()), EMPTY_PARTITION_SCHEMA); - - List inputRows = List.of(row(1L, 101L, 0L)); - RowTrackingReadFunction readFunction = - RowTrackingReadFunction.wrap(mockReader(inputRows), context); - - List result = collectRows(readFunction.apply(createPartitionedFile(50L, null))); - - assertEquals(1, result.size()); - assertRowIdResult(result.get(0), 1L, 101L); - } - - @Test - public void testRowIdProjectionFallsBackToBaseRowIdPlusPhysicalRowIndex() { - RowTrackingSchemaContext context = - createContext(readSchemaWithMetadata(RowId$.MODULE$.ROW_ID()), EMPTY_PARTITION_SCHEMA); - - List inputRows = List.of(row(2L, null, 2L)); - RowTrackingReadFunction readFunction = - RowTrackingReadFunction.wrap(mockReader(inputRows), context); - - List result = collectRows(readFunction.apply(createPartitionedFile(50L, null))); - - assertEquals(1, result.size()); - assertRowIdResult(result.get(0), 2L, 52L); - } - - @Test - public void testRowCommitVersionProjectionUsesMaterializedCommitVersionWhenPresent() { - RowTrackingSchemaContext context = - createContext( - readSchemaWithMetadata(RowCommitVersion$.MODULE$.METADATA_STRUCT_FIELD_NAME()), - EMPTY_PARTITION_SCHEMA); - - List inputRows = List.of(row(1L, 7L)); - RowTrackingReadFunction readFunction = - RowTrackingReadFunction.wrap(mockReader(inputRows), context); - - List result = collectRows(readFunction.apply(createPartitionedFile(null, 9L))); - - assertEquals(1, result.size()); - assertCommitVersionResult(result.get(0), 1L, 7L); - } - - @Test - public void testRowCommitVersionProjectionFallsBackToDefaultCommitVersion() { - RowTrackingSchemaContext context = - createContext( - readSchemaWithMetadata(RowCommitVersion$.MODULE$.METADATA_STRUCT_FIELD_NAME()), - EMPTY_PARTITION_SCHEMA); - - List inputRows = List.of(row(2L, null)); - RowTrackingReadFunction readFunction = - RowTrackingReadFunction.wrap(mockReader(inputRows), context); - - List result = collectRows(readFunction.apply(createPartitionedFile(null, 9L))); - - assertEquals(1, result.size()); - assertCommitVersionResult(result.get(0), 2L, 9L); - } - - private static void assertRowIdResult(InternalRow row, long expectedId, long expectedRowId) { - assertEquals(2, row.numFields()); - assertEquals(expectedId, row.getLong(0)); - InternalRow metadata = row.getStruct(1, 1); - assertEquals(expectedRowId, metadata.getLong(0)); - } - - private static void assertCommitVersionResult( - InternalRow row, long expectedId, long expectedCommitVersion) { - assertEquals(2, row.numFields()); - assertEquals(expectedId, row.getLong(0)); - InternalRow metadata = row.getStruct(1, 1); - assertEquals(expectedCommitVersion, metadata.getLong(0)); - } - - private static RowTrackingSchemaContext createContext( - StructType readSchema, StructType partitionSchema) { - return new RowTrackingSchemaContext(readSchema, createMetadata(), partitionSchema); - } - - private static StructType readSchemaWithMetadata(String... metadataFieldNames) { - StructType metadataSchema = new StructType(); - for (String metadataFieldName : metadataFieldNames) { - metadataSchema = metadataSchema.add(metadataFieldName, DataTypes.LongType, true); - } - return new StructType() - .add("id", DataTypes.LongType, false) - .add(FileFormat$.MODULE$.METADATA_NAME(), metadataSchema, false); - } - - private static PartitionedFile createPartitionedFile( - Long baseRowId, Long defaultRowCommitVersion) { - scala.collection.immutable.Map otherMetadata = Map$.MODULE$.empty(); - if (baseRowId != null) { - otherMetadata = otherMetadata.$plus(new Tuple2<>(RowId$.MODULE$.BASE_ROW_ID(), baseRowId)); - } - if (defaultRowCommitVersion != null) { - otherMetadata = - otherMetadata.$plus( - new Tuple2<>( - DefaultRowCommitVersion$.MODULE$.METADATA_STRUCT_FIELD_NAME(), - defaultRowCommitVersion)); - } - return new PartitionedFile( - row(), - SparkPath.fromUrlString("file:///tmp/row-tracking-test.parquet"), - 0L, - 1L, - new String[0], - 0L, - 1L, - otherMetadata); - } - - private static Metadata createMetadata() { - return createMetadata( - Map.of( - MaterializedRowTrackingColumn.MATERIALIZED_ROW_ID.getMaterializedColumnNameProperty(), - MATERIALIZED_ROW_ID_COLUMN, - MaterializedRowTrackingColumn.MATERIALIZED_ROW_COMMIT_VERSION - .getMaterializedColumnNameProperty(), - MATERIALIZED_ROW_COMMIT_VERSION_COLUMN)); - } - - private static Metadata createMetadata(Map configuration) { - io.delta.kernel.types.StructType kernelSchema = - new io.delta.kernel.types.StructType().add("id", io.delta.kernel.types.LongType.LONG); - ArrayValue emptyPartitionColumns = - new ArrayValue() { - @Override - public int getSize() { - return 0; - } - - @Override - public ColumnVector getElements() { - return null; - } - }; - return new Metadata( - "id", - java.util.Optional.empty() /* name */, - java.util.Optional.empty() /* description */, - new Format(), - kernelSchema.toJson(), - kernelSchema, - emptyPartitionColumns, - java.util.Optional.empty() /* createdTime */, - io.delta.kernel.internal.util.VectorUtils.stringStringMapValue(configuration)); - } -} diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/rowtracking/RowTrackingSchemaContextTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/read/rowtracking/RowTrackingSchemaContextTest.java deleted file mode 100644 index d69b18679e3..00000000000 --- a/spark/v2/src/test/java/io/delta/spark/internal/v2/read/rowtracking/RowTrackingSchemaContextTest.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Copyright (2026) The Delta Lake Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.delta.spark.internal.v2.read.rowtracking; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import io.delta.kernel.data.ArrayValue; -import io.delta.kernel.data.ColumnVector; -import io.delta.kernel.internal.actions.Format; -import io.delta.kernel.internal.actions.Metadata; -import io.delta.kernel.internal.rowtracking.MaterializedRowTrackingColumn; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import org.apache.spark.sql.delta.RowCommitVersion$; -import org.apache.spark.sql.delta.RowId$; -import org.apache.spark.sql.execution.datasources.FileFormat$; -import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat; -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.StructType; -import org.junit.jupiter.api.Test; -import scala.collection.immutable.Seq; -import scala.jdk.javaapi.CollectionConverters; - -public class RowTrackingSchemaContextTest { - - private static final String MATERIALIZED_ROW_ID_COLUMN = "__row_id"; - private static final String MATERIALIZED_ROW_COMMIT_VERSION_COLUMN = "__row_commit_version"; - private static final StructType BASE_TABLE_SCHEMA = - new StructType().add("id", DataTypes.LongType, false).add("name", DataTypes.StringType, true); - private static final StructType PARTITION_SCHEMA = - new StructType().add("date", DataTypes.StringType, true); - - @Test - public void testNoRequestedRowTrackingFieldsKeepBaseProjectionState() { - RowTrackingSchemaContext context = - new RowTrackingSchemaContext(readSchemaWithMetadata(), createMetadata(), PARTITION_SCHEMA); - - assertEquals(false, context.areRowTrackingMetadataFieldsRequested()); - assertEquals(false, context.isRowIdRequested()); - assertEquals(false, context.isRowCommitVersionRequested()); - assertEquals(-1, context.getMaterializedRowIdIndex()); - assertEquals(-1, context.getRowIndexColumnIndex()); - assertEquals(-1, context.getMaterializedRowCommitVersionIndex()); - - assertEquals(BASE_TABLE_SCHEMA, context.getSchemaWithRowTrackingColumns()); - assertEquals(BASE_TABLE_SCHEMA, context.getDataSchema()); - assertEquals(PARTITION_SCHEMA, context.getPartitionSchema()); - assertEquals(List.of(0, 1), toJavaOrdinals(context.getDataColumnsOrdinals())); - assertEquals(List.of(2), toJavaOrdinals(context.getPartitionColumnsOrdinals())); - } - - @Test - public void testRowIdRequestAddsMaterializedRowIdAndRowIndexColumns() { - RowTrackingSchemaContext context = - new RowTrackingSchemaContext( - readSchemaWithMetadata(RowId$.MODULE$.ROW_ID()), createMetadata(), PARTITION_SCHEMA); - - StructType expectedPhysicalReadSchema = - new StructType() - .add("id", DataTypes.LongType, false) - .add("name", DataTypes.StringType, true) - .add(MATERIALIZED_ROW_ID_COLUMN, DataTypes.LongType, true) - .add(ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME(), DataTypes.LongType, true); - - assertEquals(true, context.areRowTrackingMetadataFieldsRequested()); - assertEquals(true, context.isRowIdRequested()); - assertEquals(false, context.isRowCommitVersionRequested()); - assertEquals(2, context.getMaterializedRowIdIndex()); - assertEquals(3, context.getRowIndexColumnIndex()); - assertEquals(-1, context.getMaterializedRowCommitVersionIndex()); - - assertEquals(expectedPhysicalReadSchema, context.getSchemaWithRowTrackingColumns()); - assertEquals(BASE_TABLE_SCHEMA, context.getDataSchema()); - assertEquals(PARTITION_SCHEMA, context.getPartitionSchema()); - assertEquals(List.of(0, 1), toJavaOrdinals(context.getDataColumnsOrdinals())); - assertEquals(List.of(4), toJavaOrdinals(context.getPartitionColumnsOrdinals())); - } - - @Test - public void testRowCommitVersionRequestOnlyAddsCommitVersionColumn() { - RowTrackingSchemaContext context = - new RowTrackingSchemaContext( - readSchemaWithMetadata(RowCommitVersion$.MODULE$.METADATA_STRUCT_FIELD_NAME()), - createMetadata(), - PARTITION_SCHEMA); - - StructType expectedPhysicalReadSchema = - new StructType() - .add("id", DataTypes.LongType, false) - .add("name", DataTypes.StringType, true) - .add(MATERIALIZED_ROW_COMMIT_VERSION_COLUMN, DataTypes.LongType, true); - - assertEquals(true, context.areRowTrackingMetadataFieldsRequested()); - assertEquals(false, context.isRowIdRequested()); - assertEquals(true, context.isRowCommitVersionRequested()); - assertEquals(-1, context.getMaterializedRowIdIndex()); - assertEquals(-1, context.getRowIndexColumnIndex()); - assertEquals(2, context.getMaterializedRowCommitVersionIndex()); - - assertEquals(expectedPhysicalReadSchema, context.getSchemaWithRowTrackingColumns()); - assertEquals(BASE_TABLE_SCHEMA, context.getDataSchema()); - assertEquals(PARTITION_SCHEMA, context.getPartitionSchema()); - assertEquals(List.of(0, 1), toJavaOrdinals(context.getDataColumnsOrdinals())); - assertEquals(List.of(3), toJavaOrdinals(context.getPartitionColumnsOrdinals())); - } - - @Test - public void testBothRequestsAddAllInternalColumnsInReadOrder() { - RowTrackingSchemaContext context = - new RowTrackingSchemaContext( - readSchemaWithMetadata( - RowId$.MODULE$.ROW_ID(), RowCommitVersion$.MODULE$.METADATA_STRUCT_FIELD_NAME()), - createMetadata(), - PARTITION_SCHEMA); - - StructType expectedPhysicalReadSchema = - new StructType() - .add("id", DataTypes.LongType, false) - .add("name", DataTypes.StringType, true) - .add(MATERIALIZED_ROW_ID_COLUMN, DataTypes.LongType, true) - .add(ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME(), DataTypes.LongType, true) - .add(MATERIALIZED_ROW_COMMIT_VERSION_COLUMN, DataTypes.LongType, true); - - assertEquals(true, context.areRowTrackingMetadataFieldsRequested()); - assertEquals(true, context.isRowIdRequested()); - assertEquals(true, context.isRowCommitVersionRequested()); - assertEquals(2, context.getMaterializedRowIdIndex()); - assertEquals(3, context.getRowIndexColumnIndex()); - assertEquals(4, context.getMaterializedRowCommitVersionIndex()); - - assertEquals(expectedPhysicalReadSchema, context.getSchemaWithRowTrackingColumns()); - assertEquals(BASE_TABLE_SCHEMA, context.getDataSchema()); - assertEquals(PARTITION_SCHEMA, context.getPartitionSchema()); - assertEquals(List.of(0, 1), toJavaOrdinals(context.getDataColumnsOrdinals())); - assertEquals(List.of(5), toJavaOrdinals(context.getPartitionColumnsOrdinals())); - } - - private static StructType readSchemaWithMetadata(String... metadataFieldNames) { - StructType metadataSchema = new StructType(); - for (String metadataFieldName : metadataFieldNames) { - metadataSchema = metadataSchema.add(metadataFieldName, DataTypes.LongType, true); - } - return BASE_TABLE_SCHEMA.add(FileFormat$.MODULE$.METADATA_NAME(), metadataSchema, false); - } - - private static List toJavaOrdinals(Seq ordinals) { - return CollectionConverters.asJava(ordinals).stream() - .map(value -> ((Number) value).intValue()) - .collect(Collectors.toList()); - } - - private static Metadata createMetadata() { - return createMetadata( - Map.of( - MaterializedRowTrackingColumn.MATERIALIZED_ROW_ID.getMaterializedColumnNameProperty(), - MATERIALIZED_ROW_ID_COLUMN, - MaterializedRowTrackingColumn.MATERIALIZED_ROW_COMMIT_VERSION - .getMaterializedColumnNameProperty(), - MATERIALIZED_ROW_COMMIT_VERSION_COLUMN)); - } - - private static Metadata createMetadata(Map configuration) { - io.delta.kernel.types.StructType kernelSchema = - new io.delta.kernel.types.StructType() - .add("id", io.delta.kernel.types.LongType.LONG) - .add("name", io.delta.kernel.types.StringType.STRING); - ArrayValue emptyPartitionColumns = - new ArrayValue() { - @Override - public int getSize() { - return 0; - } - - @Override - public ColumnVector getElements() { - return null; - } - }; - return new Metadata( - "id", - java.util.Optional.empty() /* name */, - java.util.Optional.empty() /* description */, - new Format(), - kernelSchema.toJson(), - kernelSchema, - emptyPartitionColumns, - java.util.Optional.empty() /* createdTime */, - io.delta.kernel.internal.util.VectorUtils.stringStringMapValue(configuration)); - } -} diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/utils/PartitionUtilsTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/utils/PartitionUtilsTest.java index 2b92348d674..a979e24a545 100644 --- a/spark/v2/src/test/java/io/delta/spark/internal/v2/utils/PartitionUtilsTest.java +++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/utils/PartitionUtilsTest.java @@ -30,17 +30,26 @@ import io.delta.kernel.utils.CloseableIterator; import io.delta.spark.internal.v2.DeltaV2TestBase; import io.delta.spark.internal.v2.read.CDCDataFile; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.time.ZoneId; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.spark.paths.SparkPath; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.read.InputPartition; import org.apache.spark.sql.connector.read.PartitionReaderFactory; import org.apache.spark.sql.delta.DeltaParquetFileFormat; import org.apache.spark.sql.delta.RowIndexFilterType; import org.apache.spark.sql.delta.commands.cdc.CDCReader; +import org.apache.spark.sql.execution.datasources.FileFormat$; import org.apache.spark.sql.execution.datasources.PartitionedFile; import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.sources.Filter; @@ -404,6 +413,106 @@ public void testBuildPartitionedFile() throws Exception { assertEquals(1, partitionedFile.partitionValues().numFields()); } + /** + * Verifies that Spark {@code _metadata} base fields for a Kernel-built {@link PartitionedFile} + * align with Delta {@link AddFile} plus table root: path/name via the same {@code Path} + {@link + * SparkPath#fromUrlString} rule as {@link PartitionUtils#buildPartitionedFile}, and size/block + * bounds/mtime from the AddFile when using whole-file splits (start=0, length=size). + */ + @Test + public void testBuildPartitionedFile_OneShotMetadataBaseFieldsFromAddFile() throws Exception { + String tablePath = createTestTable("test_one_shot_metadata_" + System.nanoTime(), true); + Table table = Table.forPath(defaultEngine, tablePath); + Scan scan = table.getLatestSnapshot(defaultEngine).getScanBuilder().build(); + FilteredColumnarBatch batch = scan.getScanFiles(defaultEngine).next(); + CloseableIterator rows = batch.getRows(); + AddFile addFile = new AddFile(rows.next().getStruct(0)); + rows.close(); + + StructType partitionSchema = + new StructType( + new StructField[] {DataTypes.createStructField("part", DataTypes.StringType, true)}); + String normalizedTablePath = tablePath.endsWith("/") ? tablePath : tablePath + "/"; + PartitionedFile pf = + PartitionUtils.buildPartitionedFile( + addFile, partitionSchema, normalizedTablePath, ZoneId.of("UTC")); + + SparkPath reconstructed = + SparkPath.fromUrlString(new Path(normalizedTablePath, addFile.getPath()).toString()); + + PartitionedFile referenceFile = + new PartitionedFile( + pf.partitionValues(), + reconstructed, + pf.start(), + pf.length(), + pf.locations(), + pf.modificationTime(), + pf.fileSize(), + pf.otherConstantMetadataColumnValues()); + assertEquals( + FileFormat$.MODULE$ + .BASE_METADATA_EXTRACTORS() + .apply(FileFormat$.MODULE$.FILE_PATH()) + .apply(referenceFile), + FileFormat$.MODULE$ + .BASE_METADATA_EXTRACTORS() + .apply(FileFormat$.MODULE$.FILE_PATH()) + .apply(pf), + "_metadata.file_path: AddFile.path + tablePath (same as buildPartitionedFile) must match"); + assertEquals( + FileFormat$.MODULE$ + .BASE_METADATA_EXTRACTORS() + .apply(FileFormat$.MODULE$.FILE_NAME()) + .apply(referenceFile), + FileFormat$.MODULE$ + .BASE_METADATA_EXTRACTORS() + .apply(FileFormat$.MODULE$.FILE_NAME()) + .apply(pf), + "_metadata.file_name: must match name derived from the same combined path"); + + assertEquals(0L, pf.start()); + assertEquals(addFile.getSize(), pf.length()); + assertEquals(addFile.getSize(), pf.fileSize()); + assertEquals(addFile.getModificationTime(), pf.modificationTime()); + } + + /** + * Table root containing spaces: {@link PartitionUtils#buildPartitionedFile} still builds a {@link + * SparkPath} string from {@code Path(table, addFile.path)}. The url-encoded form must match a + * direct {@code Path(table, addFile.path)} reconstruction. + */ + @Test + public void testBuildPartitionedFile_SpaceInTableDirectory_PathStringMatchesReconstruction() + throws Exception { + String unique = "meta_space_" + System.nanoTime(); + java.nio.file.Path parent = + Paths.get(System.getProperty("java.io.tmpdir"), "delta test dir " + unique); + Files.createDirectories(parent); + String tablePath = parent.resolve("tbl").toString(); + spark.range(5).write().format("delta").save(tablePath); + + Table table = Table.forPath(defaultEngine, tablePath); + Scan scan = table.getLatestSnapshot(defaultEngine).getScanBuilder().build(); + FilteredColumnarBatch batch = scan.getScanFiles(defaultEngine).next(); + CloseableIterator rows = batch.getRows(); + AddFile addFile = new AddFile(rows.next().getStruct(0)); + rows.close(); + + StructType partitionSchema = new StructType(new StructField[0]); + String normalizedTablePath = tablePath.endsWith("/") ? tablePath : tablePath + "/"; + PartitionedFile pf = + PartitionUtils.buildPartitionedFile( + addFile, partitionSchema, normalizedTablePath, ZoneId.of("UTC")); + + SparkPath reconstructed = + SparkPath.fromUrlString(new Path(normalizedTablePath, addFile.getPath()).toString()); + assertEquals( + reconstructed.urlEncoded(), + pf.filePath().urlEncoded(), + "One-shot path string from AddFile + table root matches PartitionedFile.filePath"); + } + @Test public void testDdlOrderedOutputSchema_PartitionInMiddleInterleavedAtDdlPosition() { String tablePath = getTempTablePath("ddl_order_middle_" + System.nanoTime()); @@ -643,4 +752,41 @@ public void testBuildCDCPartitionedFile_noDv_noFilterMetadata() { metadata.containsKey(DeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED()), "No filter metadata expected when AddFile has no DV"); } + + @Test + public void testPlanInputPartitions_EmptyFiles() { + // An empty file list must return an empty array without throwing. + List files = new ArrayList<>(); + Configuration hadoopConf = new Configuration(); + SQLConf sqlConf = SQLConf.get(); + InputPartition[] partitions = + PartitionUtils.planInputPartitions(spark, files, 0L, hadoopConf, sqlConf); + assertNotNull(partitions, "Result must not be null for empty input"); + assertEquals(0, partitions.length, "Result must be empty for empty file list"); + } + + /** Returns all PartitionedFiles for the files in the given Delta table path. */ + private List getAllPartitionedFiles(String tablePath) throws IOException { + Table table = Table.forPath(defaultEngine, tablePath); + Snapshot snapshot = table.getLatestSnapshot(defaultEngine); + Scan scan = snapshot.getScanBuilder().build(); + StructType emptyPartitionSchema = new StructType(new StructField[0]); + String normalizedPath = tablePath.endsWith("/") ? tablePath : tablePath + "/"; + + List files = new ArrayList<>(); + try (CloseableIterator scanFiles = scan.getScanFiles(defaultEngine)) { + while (scanFiles.hasNext()) { + FilteredColumnarBatch batch = scanFiles.next(); + try (CloseableIterator rows = batch.getRows()) { + while (rows.hasNext()) { + AddFile addFile = new AddFile(rows.next().getStruct(0)); + files.add( + PartitionUtils.buildPartitionedFile( + addFile, emptyPartitionSchema, normalizedPath, ZoneId.of("UTC"))); + } + } + } + } + return files; + } }