Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1253,8 +1253,7 @@ case class Metadata(

/** Returns the partitionSchema as a [[StructType]] */
@JsonIgnore
lazy val partitionSchema: StructType =
new StructType(partitionColumns.map(c => schema(c)).toArray)
override lazy val partitionSchema: StructType = super.partitionSchema

/** Partition value keys in the AddFile map. */
@JsonIgnore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.apache.spark.sql.delta.v2.interop

import org.apache.spark.sql.delta.DeltaColumnMappingMode
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was AbstractMetadata ever used before? Trying to see if SparkMetadataAdapter / SparkProtocolAdapter should be created in this PR of if there are existing adapters that can be reused (not seeing any though)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it is only used to extend v1 metadata and protocol class

import org.apache.spark.sql.types.StructType

/**
Expand All @@ -42,5 +43,12 @@ trait AbstractMetadata {

/** The table properties/configuration defined on the table. */
def configuration: Map[String, String]

/** Column mapping mode for this table. */
def columnMappingMode: DeltaColumnMappingMode

/** Returns the partitionSchema as a [[StructType]] */
def partitionSchema: StructType =
new StructType(partitionColumns.map(c => schema(c)).toArray)
}

Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,16 @@ trait AbstractProtocol {
* Returns None if table features are not enabled for writers.
*/
def writerFeatures: Option[Set[String]]

/**
* Field-wise equality across the abstract surface of [[AbstractProtocol]]. Use this instead of
* comparing fields ad-hoc at call sites so that adding a new field to this trait forces an
* update here rather than silently leaving stale comparisons elsewhere.
*/
def equalsByFields(other: AbstractProtocol): Boolean =
minReaderVersion == other.minReaderVersion &&
minWriterVersion == other.minWriterVersion &&
readerFeatures == other.readerFeatures &&
writerFeatures == other.writerFeatures
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright (2026) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.spark.internal.v2.adapters;

import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.util.ColumnMapping;
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.spark.internal.v2.utils.ScalaUtils;
import io.delta.spark.internal.v2.utils.SchemaUtils;
import java.util.List;
import java.util.Objects;
import org.apache.spark.sql.delta.DeltaColumnMappingMode;
import org.apache.spark.sql.delta.DeltaColumnMappingMode$;
import org.apache.spark.sql.delta.NoMapping$;
import org.apache.spark.sql.delta.v2.interop.AbstractMetadata;
import org.apache.spark.sql.types.StructType;
import scala.collection.immutable.Map;
import scala.collection.immutable.Seq;
import scala.jdk.javaapi.CollectionConverters;

/**
* Adapter from {@link io.delta.kernel.internal.actions.Metadata} to {@link
* org.apache.spark.sql.delta.v2.interop.AbstractMetadata}.
*/
public class KernelMetadataAdapter implements AbstractMetadata {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is there both interop and adapters directories?
whats the difference?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interop is under org.apache.spark.sql define the AbstractMetadata interface that v1 MetadataAction extends, while adapters is under io.delta.spark.internal.v2 that adapts v2 MetadataAction into AbstractMetadata to bridge between v1 and v2 MetadataAction.


private final Metadata kernelMetadata;
private volatile StructType cachedSchema;
private volatile Seq<String> cachedPartitionColumns;
private volatile Map<String, String> cachedConfiguration;
private volatile StructType cachedPartitionSchema;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you cache the partition schema, you may also want to cache the table schema to avoid repeated conversions to Kernel StructType

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cached more fields in both adapters to reduce repeated conversions

private volatile DeltaColumnMappingMode cachedColumnMappingMode;

public KernelMetadataAdapter(Metadata kernelMetadata) {
this.kernelMetadata = Objects.requireNonNull(kernelMetadata, "kernelMetadata is null");
}

@Override
public String id() {
return kernelMetadata.getId();
}

@Override
public String name() {
return kernelMetadata.getName().orElse(null);
}

@Override
public String description() {
return kernelMetadata.getDescription().orElse(null);
}

@Override
public StructType schema() {
if (cachedSchema == null) {
cachedSchema = SchemaUtils.convertKernelSchemaToSparkSchema(kernelMetadata.getSchema());
}
return cachedSchema;
}

@Override
public Seq<String> partitionColumns() {
if (cachedPartitionColumns == null) {
List<String> rawCols = VectorUtils.toJavaList(kernelMetadata.getPartitionColumns());
cachedPartitionColumns = CollectionConverters.asScala(rawCols).toSeq();
}
return cachedPartitionColumns;
}

@Override
public Map<String, String> configuration() {
if (cachedConfiguration == null) {
cachedConfiguration = ScalaUtils.toScalaMap(kernelMetadata.getConfiguration());
}
return cachedConfiguration;
}

@Override
public DeltaColumnMappingMode columnMappingMode() {
if (cachedColumnMappingMode == null) {
String mode = kernelMetadata.getConfiguration().get(ColumnMapping.COLUMN_MAPPING_MODE_KEY);
cachedColumnMappingMode =
mode == null ? NoMapping$.MODULE$ : DeltaColumnMappingMode$.MODULE$.apply(mode);
}
return cachedColumnMappingMode;
}

@Override
public StructType partitionSchema() {
if (cachedPartitionSchema == null) {
cachedPartitionSchema = AbstractMetadata.super.partitionSchema();
}
return cachedPartitionSchema;
}
}
Comment on lines +101 to +108
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a test where partitionColumns = ["Part1"] and the schema field is part1? Note the difference in capitalization.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (2026) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.spark.internal.v2.adapters;

import io.delta.kernel.internal.actions.Protocol;
import java.util.Objects;
import org.apache.spark.sql.delta.v2.interop.AbstractProtocol;
import scala.Option;
import scala.collection.immutable.Set;
import scala.jdk.javaapi.CollectionConverters;

/**
* Adapter from {@link io.delta.kernel.internal.actions.Protocol} to {@link
* org.apache.spark.sql.delta.v2.interop.AbstractProtocol}.
*/
public class KernelProtocolAdapter implements AbstractProtocol {

private final Protocol kernelProtocol;
private volatile Option<Set<String>> cachedReaderFeatures;
private volatile Option<Set<String>> cachedWriterFeatures;

public KernelProtocolAdapter(Protocol kernelProtocol) {
this.kernelProtocol = Objects.requireNonNull(kernelProtocol, "kernelProtocol is null");
}

@Override
public int minReaderVersion() {
return kernelProtocol.getMinReaderVersion();
}

@Override
public int minWriterVersion() {
return kernelProtocol.getMinWriterVersion();
}

@Override
public Option<Set<String>> readerFeatures() {
if (cachedReaderFeatures == null) {
cachedReaderFeatures =
kernelProtocol.supportsReaderFeatures()
? Option.apply(
CollectionConverters.asScala(kernelProtocol.getReaderFeatures()).toSet())
: Option.empty();
}
return cachedReaderFeatures;
}
Comment on lines +50 to +59
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

volatile + check-then-set seems racy (two threads can both compute)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kernelProtocol is declared to be a constant, so it is fine


@Override
public Option<Set<String>> writerFeatures() {
if (cachedWriterFeatures == null) {
cachedWriterFeatures =
kernelProtocol.supportsWriterFeatures()
? Option.apply(
CollectionConverters.asScala(kernelProtocol.getWriterFeatures()).toSet())
: Option.empty();
}
return cachedWriterFeatures;
}
}
Loading
Loading