From 84f5ce6dd87266555f45c5e5253094a03cea8704 Mon Sep 17 00:00:00 2001 From: You Zhou Date: Fri, 10 Apr 2026 21:24:34 +0000 Subject: [PATCH 1/2] create v2 adapters for metadata and protocol --- .../spark/sql/delta/actions/actions.scala | 3 +- .../delta/v2/interop/AbstractMetadata.scala | 8 + .../delta/v2/interop/AbstractProtocol.scala | 11 + .../v2/adapters/KernelMetadataAdapter.java | 108 +++++++ .../v2/adapters/KernelProtocolAdapter.java | 72 +++++ .../v2/adapters/ActionAdaptersTest.java | 282 ++++++++++++++++++ 6 files changed, 482 insertions(+), 2 deletions(-) create mode 100644 spark/v2/src/main/java/io/delta/spark/internal/v2/adapters/KernelMetadataAdapter.java create mode 100644 spark/v2/src/main/java/io/delta/spark/internal/v2/adapters/KernelProtocolAdapter.java create mode 100644 spark/v2/src/test/java/io/delta/spark/internal/v2/adapters/ActionAdaptersTest.java diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala index 86e2a1d0694..e80eff1cbec 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala @@ -1253,8 +1253,7 @@ case class Metadata( /** Returns the partitionSchema as a [[StructType]] */ @JsonIgnore - lazy val partitionSchema: StructType = - new StructType(partitionColumns.map(c => schema(c)).toArray) + override lazy val partitionSchema: StructType = super.partitionSchema /** Partition value keys in the AddFile map. */ @JsonIgnore diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/v2/interop/AbstractMetadata.scala b/spark/src/main/scala/org/apache/spark/sql/delta/v2/interop/AbstractMetadata.scala index a53ad2bf127..8573663ac08 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/v2/interop/AbstractMetadata.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/v2/interop/AbstractMetadata.scala @@ -16,6 +16,7 @@ package org.apache.spark.sql.delta.v2.interop +import org.apache.spark.sql.delta.DeltaColumnMappingMode import org.apache.spark.sql.types.StructType /** @@ -42,5 +43,12 @@ trait AbstractMetadata { /** The table properties/configuration defined on the table. */ def configuration: Map[String, String] + + /** Column mapping mode for this table. */ + def columnMappingMode: DeltaColumnMappingMode + + /** Returns the partitionSchema as a [[StructType]] */ + def partitionSchema: StructType = + new StructType(partitionColumns.map(c => schema(c)).toArray) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/v2/interop/AbstractProtocol.scala b/spark/src/main/scala/org/apache/spark/sql/delta/v2/interop/AbstractProtocol.scala index 8c25bd5b821..e054d2f30c4 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/v2/interop/AbstractProtocol.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/v2/interop/AbstractProtocol.scala @@ -40,5 +40,16 @@ trait AbstractProtocol { * Returns None if table features are not enabled for writers. */ def writerFeatures: Option[Set[String]] + + /** + * Field-wise equality across the abstract surface of [[AbstractProtocol]]. Use this instead of + * comparing fields ad-hoc at call sites so that adding a new field to this trait forces an + * update here rather than silently leaving stale comparisons elsewhere. + */ + def equalsByFields(other: AbstractProtocol): Boolean = + minReaderVersion == other.minReaderVersion && + minWriterVersion == other.minWriterVersion && + readerFeatures == other.readerFeatures && + writerFeatures == other.writerFeatures } diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/adapters/KernelMetadataAdapter.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/adapters/KernelMetadataAdapter.java new file mode 100644 index 00000000000..de1c585247e --- /dev/null +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/adapters/KernelMetadataAdapter.java @@ -0,0 +1,108 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.spark.internal.v2.adapters; + +import io.delta.kernel.internal.actions.Metadata; +import io.delta.kernel.internal.util.ColumnMapping; +import io.delta.kernel.internal.util.VectorUtils; +import io.delta.spark.internal.v2.utils.ScalaUtils; +import io.delta.spark.internal.v2.utils.SchemaUtils; +import java.util.List; +import java.util.Objects; +import org.apache.spark.sql.delta.DeltaColumnMappingMode; +import org.apache.spark.sql.delta.DeltaColumnMappingMode$; +import org.apache.spark.sql.delta.NoMapping$; +import org.apache.spark.sql.delta.v2.interop.AbstractMetadata; +import org.apache.spark.sql.types.StructType; +import scala.collection.immutable.Map; +import scala.collection.immutable.Seq; +import scala.jdk.javaapi.CollectionConverters; + +/** + * Adapter from {@link io.delta.kernel.internal.actions.Metadata} to {@link + * org.apache.spark.sql.delta.v2.interop.AbstractMetadata}. + */ +public class KernelMetadataAdapter implements AbstractMetadata { + + private final Metadata kernelMetadata; + private volatile StructType cachedSchema; + private volatile Seq cachedPartitionColumns; + private volatile Map cachedConfiguration; + private volatile StructType cachedPartitionSchema; + private volatile DeltaColumnMappingMode cachedColumnMappingMode; + + public KernelMetadataAdapter(Metadata kernelMetadata) { + this.kernelMetadata = Objects.requireNonNull(kernelMetadata, "kernelMetadata is null"); + } + + @Override + public String id() { + return kernelMetadata.getId(); + } + + @Override + public String name() { + return kernelMetadata.getName().orElse(null); + } + + @Override + public String description() { + return kernelMetadata.getDescription().orElse(null); + } + + @Override + public StructType schema() { + if (cachedSchema == null) { + cachedSchema = SchemaUtils.convertKernelSchemaToSparkSchema(kernelMetadata.getSchema()); + } + return cachedSchema; + } + + @Override + public Seq partitionColumns() { + if (cachedPartitionColumns == null) { + List rawCols = VectorUtils.toJavaList(kernelMetadata.getPartitionColumns()); + cachedPartitionColumns = CollectionConverters.asScala(rawCols).toSeq(); + } + return cachedPartitionColumns; + } + + @Override + public Map configuration() { + if (cachedConfiguration == null) { + cachedConfiguration = ScalaUtils.toScalaMap(kernelMetadata.getConfiguration()); + } + return cachedConfiguration; + } + + @Override + public DeltaColumnMappingMode columnMappingMode() { + if (cachedColumnMappingMode == null) { + String mode = kernelMetadata.getConfiguration().get(ColumnMapping.COLUMN_MAPPING_MODE_KEY); + cachedColumnMappingMode = + mode == null ? NoMapping$.MODULE$ : DeltaColumnMappingMode$.MODULE$.apply(mode); + } + return cachedColumnMappingMode; + } + + @Override + public StructType partitionSchema() { + if (cachedPartitionSchema == null) { + cachedPartitionSchema = AbstractMetadata.super.partitionSchema(); + } + return cachedPartitionSchema; + } +} diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/adapters/KernelProtocolAdapter.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/adapters/KernelProtocolAdapter.java new file mode 100644 index 00000000000..86713cb9ef3 --- /dev/null +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/adapters/KernelProtocolAdapter.java @@ -0,0 +1,72 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.spark.internal.v2.adapters; + +import io.delta.kernel.internal.actions.Protocol; +import java.util.Objects; +import org.apache.spark.sql.delta.v2.interop.AbstractProtocol; +import scala.Option; +import scala.collection.immutable.Set; +import scala.jdk.javaapi.CollectionConverters; + +/** + * Adapter from {@link io.delta.kernel.internal.actions.Protocol} to {@link + * org.apache.spark.sql.delta.v2.interop.AbstractProtocol}. + */ +public class KernelProtocolAdapter implements AbstractProtocol { + + private final Protocol kernelProtocol; + private volatile Option> cachedReaderFeatures; + private volatile Option> cachedWriterFeatures; + + public KernelProtocolAdapter(Protocol kernelProtocol) { + this.kernelProtocol = Objects.requireNonNull(kernelProtocol, "kernelProtocol is null"); + } + + @Override + public int minReaderVersion() { + return kernelProtocol.getMinReaderVersion(); + } + + @Override + public int minWriterVersion() { + return kernelProtocol.getMinWriterVersion(); + } + + @Override + public Option> readerFeatures() { + if (cachedReaderFeatures == null) { + cachedReaderFeatures = + kernelProtocol.supportsReaderFeatures() + ? Option.apply( + CollectionConverters.asScala(kernelProtocol.getReaderFeatures()).toSet()) + : Option.empty(); + } + return cachedReaderFeatures; + } + + @Override + public Option> writerFeatures() { + if (cachedWriterFeatures == null) { + cachedWriterFeatures = + kernelProtocol.supportsWriterFeatures() + ? Option.apply( + CollectionConverters.asScala(kernelProtocol.getWriterFeatures()).toSet()) + : Option.empty(); + } + return cachedWriterFeatures; + } +} diff --git a/spark/v2/src/test/java/io/delta/spark/internal/v2/adapters/ActionAdaptersTest.java b/spark/v2/src/test/java/io/delta/spark/internal/v2/adapters/ActionAdaptersTest.java new file mode 100644 index 00000000000..8090951e54f --- /dev/null +++ b/spark/v2/src/test/java/io/delta/spark/internal/v2/adapters/ActionAdaptersTest.java @@ -0,0 +1,282 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.spark.internal.v2.adapters; + +import static org.junit.jupiter.api.Assertions.*; + +import io.delta.kernel.data.ArrayValue; +import io.delta.kernel.internal.actions.Format; +import io.delta.kernel.internal.actions.Metadata; +import io.delta.kernel.internal.actions.Protocol; +import io.delta.kernel.internal.util.VectorUtils; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructType; +import java.util.*; +import org.apache.spark.sql.delta.IdMapping$; +import org.apache.spark.sql.delta.NameMapping$; +import org.apache.spark.sql.delta.NoMapping$; +import org.junit.jupiter.api.Test; +import scala.jdk.javaapi.CollectionConverters; + +/** Unit tests for {@link KernelMetadataAdapter} and {@link KernelProtocolAdapter}. */ +public class ActionAdaptersTest { + + // ===== KernelProtocolAdapter ===== + + @Test + public void testProtocolAdapterWithTableFeatures() { + // Reader features: supported but empty (version >= 3 means features are supported, even with + // an empty set). Writer features: supported and populated. + Set readerFeatures = Collections.emptySet(); + Set writerFeatures = new HashSet<>(Arrays.asList("v2Checkpoint", "rowTracking")); + Protocol kernelProtocol = new Protocol(3, 7, readerFeatures, writerFeatures); + + KernelProtocolAdapter adapter = new KernelProtocolAdapter(kernelProtocol); + + assertEquals(3, adapter.minReaderVersion()); + assertEquals(7, adapter.minWriterVersion()); + assertTrue(adapter.readerFeatures().isDefined()); + assertTrue(CollectionConverters.asJava(adapter.readerFeatures().get()).isEmpty()); + assertTrue(adapter.writerFeatures().isDefined()); + assertEquals( + new HashSet<>(Arrays.asList("v2Checkpoint", "rowTracking")), + CollectionConverters.asJava(adapter.writerFeatures().get())); + } + + @Test + public void testProtocolAdapterLegacyProtocol() { + Protocol kernelProtocol = new Protocol(1, 2); + + KernelProtocolAdapter adapter = new KernelProtocolAdapter(kernelProtocol); + + assertEquals(1, adapter.minReaderVersion()); + assertEquals(2, adapter.minWriterVersion()); + assertTrue(adapter.readerFeatures().isEmpty()); + assertTrue(adapter.writerFeatures().isEmpty()); + } + + @Test + public void testProtocolAdapterNullThrows() { + assertThrows(NullPointerException.class, () -> new KernelProtocolAdapter(null)); + } + + @Test + public void testProtocolAdapterEqualsByFields() { + Set rf = new HashSet<>(Arrays.asList("v2Checkpoint")); + Set wf = new HashSet<>(Arrays.asList("rowTracking")); + KernelProtocolAdapter base = new KernelProtocolAdapter(new Protocol(3, 7, rf, wf)); + + // Identical fields (built from a fresh Protocol instance) compare equal both directions. + KernelProtocolAdapter same = + new KernelProtocolAdapter( + new Protocol( + 3, + 7, + new HashSet<>(Arrays.asList("v2Checkpoint")), + new HashSet<>(Arrays.asList("rowTracking")))); + assertTrue(base.equalsByFields(same)); + assertTrue(same.equalsByFields(base)); + + // Each field difference flips the result to false. + assertFalse( + base.equalsByFields(new KernelProtocolAdapter(new Protocol(2, 7, rf, wf))), + "minReaderVersion mismatch should not be equal"); + assertFalse( + base.equalsByFields(new KernelProtocolAdapter(new Protocol(3, 6, rf, wf))), + "minWriterVersion mismatch should not be equal"); + assertFalse( + base.equalsByFields( + new KernelProtocolAdapter( + new Protocol(3, 7, new HashSet<>(Arrays.asList("columnMapping")), wf))), + "readerFeatures mismatch should not be equal"); + assertFalse( + base.equalsByFields( + new KernelProtocolAdapter( + new Protocol(3, 7, rf, new HashSet<>(Arrays.asList("columnMapping"))))), + "writerFeatures mismatch should not be equal"); + + // Some(empty) vs None — features defined only at table-features versions; the helper must + // distinguish (3,7,Some(empty),Some(empty)) from legacy (1,2,None,None). + KernelProtocolAdapter emptyFeatures = + new KernelProtocolAdapter( + new Protocol(3, 7, Collections.emptySet(), Collections.emptySet())); + KernelProtocolAdapter legacy = new KernelProtocolAdapter(new Protocol(1, 2)); + assertFalse(emptyFeatures.equalsByFields(legacy)); + assertTrue(legacy.equalsByFields(new KernelProtocolAdapter(new Protocol(1, 2)))); + } + + // ===== KernelMetadataAdapter ===== + + @Test + public void testMetadataAdapter() { + ArrayValue partCols = + VectorUtils.buildArrayValue(Arrays.asList("part1", "part2"), StringType.STRING); + Map formatOptions = Collections.singletonMap("foo", "bar"); + Format format = new Format("parquet", formatOptions); + Map configuration = new HashMap<>(); + configuration.put("zip", "zap"); + configuration.put("delta.columnMapping.mode", "name"); + + Metadata kernelMetadata = + new Metadata( + "id", + Optional.of("name"), + Optional.of("description"), + format, + "{\"type\":\"struct\",\"fields\":" + + "[{\"name\":\"part1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"part2\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," + + "{\"name\":\"col1\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}", + new StructType() + .add("part1", IntegerType.INTEGER) + .add("part2", StringType.STRING, false /* nullable */) + .add("col1", StringType.STRING, false /* nullable */), + partCols, + Optional.of(42L), + VectorUtils.stringStringMapValue(configuration)); + + KernelMetadataAdapter adapter = new KernelMetadataAdapter(kernelMetadata); + + assertEquals("id", adapter.id()); + assertEquals("name", adapter.name()); + assertEquals("description", adapter.description()); + assertEquals(3, adapter.schema().fields().length); + assertEquals("integer", adapter.schema().apply("part1").dataType().typeName()); + assertTrue(adapter.schema().apply("part1").nullable()); + assertEquals("string", adapter.schema().apply("part2").dataType().typeName()); + assertFalse(adapter.schema().apply("part2").nullable()); + assertEquals("string", adapter.schema().apply("col1").dataType().typeName()); + assertFalse(adapter.schema().apply("col1").nullable()); + assertEquals( + Arrays.asList("part1", "part2"), CollectionConverters.asJava(adapter.partitionColumns())); + org.apache.spark.sql.types.StructType partSchema = adapter.partitionSchema(); + assertEquals(2, partSchema.fields().length); + assertEquals("part1", partSchema.fields()[0].name()); + assertEquals("integer", partSchema.fields()[0].dataType().typeName()); + assertTrue(partSchema.fields()[0].nullable()); + assertEquals("part2", partSchema.fields()[1].name()); + assertEquals("string", partSchema.fields()[1].dataType().typeName()); + assertFalse(partSchema.fields()[1].nullable()); + assertEquals(configuration, CollectionConverters.asJava(adapter.configuration())); + assertEquals(NameMapping$.MODULE$, adapter.columnMappingMode()); + } + + @Test + public void testMetadataAdapterWithNullOptionalFields() { + ArrayValue emptyPartCols = + VectorUtils.buildArrayValue(Collections.emptyList(), StringType.STRING); + Format format = new Format("parquet", Collections.emptyMap()); + + Metadata kernelMetadata = + new Metadata( + "id2", + Optional.empty(), + Optional.empty(), + format, + "{\"type\":\"struct\",\"fields\":[]}", + new StructType(), + emptyPartCols, + Optional.empty(), + VectorUtils.stringStringMapValue(Collections.emptyMap())); + + KernelMetadataAdapter adapter = new KernelMetadataAdapter(kernelMetadata); + + assertEquals("id2", adapter.id()); + assertNull(adapter.name()); + assertNull(adapter.description()); + assertEquals(0, adapter.schema().fields().length); + assertTrue(CollectionConverters.asJava(adapter.partitionColumns()).isEmpty()); + assertEquals(0, adapter.partitionSchema().fields().length); + assertTrue(CollectionConverters.asJava(adapter.configuration()).isEmpty()); + assertEquals(NoMapping$.MODULE$, adapter.columnMappingMode()); + } + + @Test + public void testMetadataAdapterNullThrows() { + assertThrows(NullPointerException.class, () -> new KernelMetadataAdapter(null)); + } + + @Test + public void testMetadataAdapterIdColumnMappingMode() { + KernelMetadataAdapter adapter = + new KernelMetadataAdapter( + buildMinimalKernelMetadata(Collections.singletonMap("delta.columnMapping.mode", "id"))); + + assertEquals(IdMapping$.MODULE$, adapter.columnMappingMode()); + } + + @Test + public void testMetadataAdapterUnknownColumnMappingModeThrows() { + KernelMetadataAdapter adapter = + new KernelMetadataAdapter( + buildMinimalKernelMetadata( + Collections.singletonMap("delta.columnMapping.mode", "bogus"))); + + assertThrows(UnsupportedOperationException.class, adapter::columnMappingMode); + } + + @Test + public void testMetadataAdapterPartitionColumnCaseMismatch() { + // Kernel stores partition column names verbatim; a connector may persist a different + // capitalization than the schema field. The adapter must not silently normalize the case, + // and the default Spark `StructType.apply` lookup is case-sensitive — exercise both. + ArrayValue partCols = + VectorUtils.buildArrayValue(Collections.singletonList("Part1"), StringType.STRING); + Format format = new Format("parquet", Collections.emptyMap()); + + Metadata kernelMetadata = + new Metadata( + "id", + Optional.empty(), + Optional.empty(), + format, + "{\"type\":\"struct\",\"fields\":" + + "[{\"name\":\"part1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}", + new StructType().add("part1", IntegerType.INTEGER), + partCols, + Optional.empty(), + VectorUtils.stringStringMapValue(Collections.emptyMap())); + + KernelMetadataAdapter adapter = new KernelMetadataAdapter(kernelMetadata); + + assertEquals( + Collections.singletonList("Part1"), + CollectionConverters.asJava(adapter.partitionColumns())); + IllegalArgumentException ex = + assertThrows(IllegalArgumentException.class, adapter::partitionSchema); + assertTrue( + ex.getMessage().contains("[FIELD_NOT_FOUND]") + && ex.getMessage().contains("`Part1`") + && ex.getMessage().contains("`part1`"), + "Unexpected exception message: " + ex.getMessage()); + } + + private static Metadata buildMinimalKernelMetadata(Map configuration) { + ArrayValue emptyPartCols = + VectorUtils.buildArrayValue(Collections.emptyList(), StringType.STRING); + return new Metadata( + "id", + Optional.empty(), + Optional.empty(), + new Format("parquet", Collections.emptyMap()), + "{\"type\":\"struct\",\"fields\":[]}", + new StructType(), + emptyPartCols, + Optional.empty(), + VectorUtils.stringStringMapValue(configuration)); + } +} From 9271a6262f7a2615b977de0319c7238044b7d0a9 Mon Sep 17 00:00:00 2001 From: You Zhou Date: Tue, 5 May 2026 20:22:59 +0000 Subject: [PATCH 2/2] trigger CI Co-authored-by: Isaac