Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package org.apache.spark.sql.delta.catalog;

import io.delta.spark.internal.v2.catalog.SparkTable;
import io.delta.spark.internal.v2.catalog.DeltaV2Table;
import org.apache.spark.sql.delta.DeltaV2Mode;
import java.util.HashMap;
import java.util.function.Supplier;
Expand Down Expand Up @@ -57,7 +57,7 @@
* <p>The unified module can access both implementations:</p>
* <ul>
* <li>V1 connector: {@link DeltaTableV2} - Legacy connector using DeltaLog, full read/write support</li>
* <li>V2 connector: {@link SparkTable} - sparkV2 connector, read-only support</li>
* <li>V2 connector: {@link DeltaV2Table} - sparkV2 connector, read-only support</li>
* </ul>
*
* <p>See {@link DeltaV2Mode} for V1 vs V2 connector definitions and enable mode configuration.</p>
Expand All @@ -69,18 +69,18 @@ public class DeltaCatalog extends AbstractDeltaCatalog {
*
* <p>Routing logic based on {@link DeltaV2Mode}:
* <ul>
* <li>STRICT: Returns sparkV2 {@link SparkTable} (V2 connector)</li>
* <li>STRICT: Returns sparkV2 {@link DeltaV2Table} (V2 connector)</li>
* <li>NONE (default): Returns {@link DeltaTableV2} (V1 connector)</li>
* </ul>
*
* @param ident The identifier of the table in the catalog.
* @param catalogTable The catalog table metadata containing table properties and location.
* @return Table instance (SparkTable for V2, DeltaTableV2 for V1).
* @return Table instance (DeltaV2Table for V2, DeltaTableV2 for V1).
*/
@Override
public Table loadCatalogTable(Identifier ident, CatalogTable catalogTable) {
return loadTableInternal(
() -> new SparkTable(ident, catalogTable, new HashMap<>()),
() -> new DeltaV2Table(ident, catalogTable, new HashMap<>()),
() -> super.loadCatalogTable(ident, catalogTable));
}

Expand All @@ -90,18 +90,18 @@ public Table loadCatalogTable(Identifier ident, CatalogTable catalogTable) {
*
* <p>Routing logic based on {@link DeltaV2Mode}:
* <ul>
* <li>STRICT: Returns sparkV2 {@link SparkTable} (V2 connector)</li>
* <li>STRICT: Returns sparkV2 {@link DeltaV2Table} (V2 connector)</li>
* <li>NONE (default): Returns {@link DeltaTableV2} (V1 connector)</li>
* </ul>
*
* @param ident The identifier whose name contains the path to the Delta table.
* @return Table instance (SparkTable for V2, DeltaTableV2 for V1).
* @return Table instance (DeltaV2Table for V2, DeltaTableV2 for V1).
*/
@Override
public Table loadPathTable(Identifier ident) {
return loadTableInternal(
// delta.`/path/to/table`, where ident.name() is `/path/to/table`
() -> new SparkTable(ident, ident.name()),
() -> new DeltaV2Table(ident, ident.name()),
() -> super.loadPathTable(ident));
}

Expand All @@ -110,13 +110,14 @@ public Table loadPathTable(Identifier ident) {
*
* <p>This method checks the configuration and delegates to the appropriate supplier:
* <ul>
* <li>STRICT mode: Uses V2 connector (sparkV2 SparkTable) - for testing V2 capabilities</li>
* <li>STRICT mode: Uses V2 connector (sparkV2 DeltaV2Table) - for testing V2 capabilities</li>
* <li>NONE mode (default): Uses V1 connector (DeltaTableV2) - production default with full features</li>
* </ul>
*
* <p>See {@link DeltaV2Mode} for detailed V1 vs V2 connector definitions.
*
* @param v2ConnectorSupplier Supplier for V2 connector (sparkV2 SparkTable) - used in STRICT mode
* @param v2ConnectorSupplier Supplier for V2 connector (sparkV2 DeltaV2Table)
* - used in STRICT mode
* @param v1ConnectorSupplier Supplier for V1 connector (DeltaTableV2) - used in NONE mode (default)
* @return Table instance from the selected supplier
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package io.delta.internal
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._

import io.delta.spark.internal.v2.catalog.SparkTable
import io.delta.spark.internal.v2.catalog.DeltaV2Table
import io.delta.spark.internal.v2.utils.ScalaUtils
import org.apache.spark.sql.delta.DeltaV2Mode
import org.apache.spark.sql.delta.sources.DeltaSourceUtils
Expand All @@ -35,12 +35,12 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* Rule for applying the V2 streaming path by rewriting V1 StreamingRelation
* with Delta DataSource to StreamingRelationV2 with SparkTable.
* with Delta DataSource to StreamingRelationV2 with DeltaV2Table.
*
* This rule handles the case where Spark's FindDataSourceTable rule has converted
* a StreamingRelationV2 (with DeltaTableV2) back to a StreamingRelation because
* DeltaTableV2 doesn't advertise STREAMING_READ capability. We convert it back to
* StreamingRelationV2 with SparkTable (from sparkV2) which does support streaming.
* StreamingRelationV2 with DeltaV2Table (from sparkV2) which does support streaming.
*
* See [[DeltaV2Mode]] for configuration behavior.
*
Expand Down Expand Up @@ -79,11 +79,11 @@ class ApplyV2Streaming(
val ident =
Identifier.of(catalogTable.identifier.database.toArray, catalogTable.identifier.table)
val table =
new SparkTable(
new DeltaV2Table(
ident,
catalogTable,
// Use user-specified streaming options to override catalog storage properties.
// SparkTable handles merging catalogTable storage props internally.
// DeltaV2Table handles merging catalogTable storage props internally.
ScalaUtils.toJavaMap(s.dataSource.options))
val catalog = catalogTable.identifier.catalog.map(
session.sessionState.catalogManager.catalog)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class DeltaSparkSessionExtension extends AbstractDeltaSparkSessionExtension {
super.apply(extensions)

// Register a post-hoc resolution rule that rewrites V1 StreamingRelation plans that
// read catalog owned Delta tables into V2 StreamingRelationV2 plans backed by SparkTable.
// read catalog-owned Delta tables into V2 StreamingRelationV2 plans backed by DeltaV2Table.
//
// NOTE: This rule is functional (not a placeholder). Binary compatibility concerns are
// handled separately via the nested NoOpRule class below (kept for MiMa).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.{HashMap => JHashMap}

import scala.jdk.CollectionConverters._

import io.delta.spark.internal.v2.catalog.SparkTable
import io.delta.spark.internal.v2.catalog.DeltaV2Table
import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
Expand All @@ -45,7 +45,7 @@ class ApplyV2StreamingSuite extends DeltaSQLCommandTest {

private def assertV2(result: LogicalPlan): Unit = {
result match {
case StreamingRelationV2(_, _, _: SparkTable, _, _, _, _, v1Relation) =>
case StreamingRelationV2(_, _, _: DeltaV2Table, _, _, _, _, v1Relation) =>
assert(v1Relation.isEmpty)
case other =>
fail(s"Expected StreamingRelationV2, got $other")
Expand Down Expand Up @@ -100,7 +100,7 @@ class ApplyV2StreamingSuite extends DeltaSQLCommandTest {
val ident = Identifier.of(
catalogTable.identifier.database.toArray,
catalogTable.identifier.table)
val table = new SparkTable(ident, catalogTable, new JHashMap[String, String]())
val table = new DeltaV2Table(ident, catalogTable, new JHashMap[String, String]())
DataSourceV2Relation.create(
table,
None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class DataFrameWriterV2WithV2ConnectorSuite
* Tests that we expect to fail because they require write operations after initial
* table creation.
*
* Kernel's SparkTable (V2 connector) only implements SupportsRead, not SupportsWrite.
* Kernel's DeltaV2Table (V2 connector) only implements SupportsRead, not SupportsWrite.
* Tests that perform append/replace operations after table creation are expected to fail.
*/
override protected def shouldFail(testName: String): Boolean = {
Expand All @@ -49,8 +49,8 @@ class DataFrameWriterV2WithV2ConnectorSuite
"OverwritePartitions: overwrite all rows if not partitioned",
"OverwritePartitions: by name not position",

// Create operations - TODO: fix SparkTable's name() to match DeltaTableV2
// SparkTable.name() returns simple table name, but tests expect catalog.schema.table format
// Create operations - TODO: fix DeltaV2Table's name() to match DeltaTableV2
// DeltaV2Table.name() returns simple table name, but tests expect catalog.schema.table format
"Create: basic behavior",
"Create: with using",
"Create: with property",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package org.apache.spark.sql.delta.catalog

import io.delta.spark.internal.v2.catalog.SparkTable
import io.delta.spark.internal.v2.catalog.DeltaV2Table
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest

Expand All @@ -28,13 +28,13 @@ import java.util.Locale
*
* Verifies that DeltaCatalog correctly routes table loading based on
* DeltaSQLConf.V2_ENABLE_MODE:
* - STRICT mode: Kernel's SparkTable (V2 connector)
* - STRICT mode: Kernel's DeltaV2Table (V2 connector)
* - NONE mode (default): DeltaTableV2 (V1 connector)
*/
class DeltaCatalogSuite extends DeltaSQLCommandTest {

private val modeTestCases = Seq(
("STRICT", classOf[SparkTable], "Kernel SparkTable"),
("STRICT", classOf[DeltaV2Table], "Kernel DeltaV2Table"),
("NONE", classOf[DeltaTableV2], "DeltaTableV2")
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.collection.mutable

/**
* Trait that forces Delta V2 connector mode to STRICT, ensuring all operations
* use the Kernel-based SparkTable implementation (V2 connector) instead of
* use the Kernel-based DeltaV2Table implementation (V2 connector) instead of
* DeltaTableV2 (V1 connector).
*
* See [[DeltaSQLConf.V2_ENABLE_MODE]] for V1 vs V2 connector definitions.
Expand Down Expand Up @@ -85,7 +85,7 @@ trait V2ForceTest extends DeltaSQLCommandTest {

/**
* Override `sparkConf` to set V2_ENABLE_MODE to "STRICT".
* This ensures all catalog operations use Kernel SparkTable (V2 connector).
* This ensures all catalog operations use Kernel DeltaV2Table (V2 connector).
*/
abstract override protected def sparkConf: SparkConf = {
super.sparkConf
Expand All @@ -94,7 +94,7 @@ trait V2ForceTest extends DeltaSQLCommandTest {

/**
* Run a SQL statement through the V1 connector by temporarily setting
* V2_ENABLE_MODE to NONE. Useful for DDL/DML that SparkTable (V2) doesn't support.
* V2_ENABLE_MODE to NONE. Useful for DDL/DML that DeltaV2Table (V2) doesn't support.
*/
protected def executeInV1Mode(sqlText: String): Unit = {
withSQLConf(DeltaSQLConf.V2_ENABLE_MODE.key -> "NONE") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.sql.delta.test.V2ForceTest
* Test suite that runs [[RemoveColumnMappingStreamingReadSuite]] using the V2 connector
* (V2_ENABLE_MODE=STRICT).
*
* SparkTable (V2) is read-only and does not support DDL, so DDL/DML operations are routed
* DeltaV2Table (V2) is read-only and does not support DDL, so DDL/DML operations are routed
* through the V1 connector via `executeDml`. Only streaming reads use the V2 connector.
*/
class RemoveColumnMappingStreamingReadV2Suite
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public boolean isStreamingReadsEnabled(Optional<CatalogTable> catalogTable) {
}

/**
* Determines if catalog should return sparkV2 (SparkTable) or sparkV1 (DeltaTableV2) tables.
* Determines if catalog should return sparkV2 (DeltaV2Table) or sparkV1 (DeltaTableV2) tables.
*
* @return true if catalog should return sparkV2 tables
*/
Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala-shims/spark-4.0/SparkTableShims.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.spark.sql.delta

import org.apache.spark.sql.connector.catalog.TableCapability

/** Shim to build [[SparkTable]] against different Spark versions. */
/**
* Shim to build [[io.delta.spark.internal.v2.catalog.DeltaV2Table]] against different Spark
* versions.
*/
object SparkTableShims {
// Capability [[TableCapability.AUTOMATIC_SCHEMA_EVOLUTION]] is not available in Spark 4.0.
val schemaEvolutionCapability: Option[TableCapability] = None
Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala-shims/spark-4.1/SparkTableShims.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.spark.sql.delta

import org.apache.spark.sql.connector.catalog.TableCapability

/** Shim to build [[SparkTable]] against different Spark versions. */
/**
* Shim to build [[io.delta.spark.internal.v2.catalog.DeltaV2Table]] against different Spark
* versions.
*/
object SparkTableShims {
// Capability [[TableCapability.AUTOMATIC_SCHEMA_EVOLUTION]] is available in Spark 4.1, but
// schema evolution isn't properly supported yet in MERGE/INSERT there, so ignore it.
Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala-shims/spark-4.2/SparkTableShims.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.delta
import org.apache.spark.sql.connector.catalog.TableCapability

/**
* Shim to build [[SparkTable]] against different Spark versions.
* Shim to build [[io.delta.spark.internal.v2.catalog.DeltaV2Table]] against different Spark
* versions.
* This is the shim for the latest version - Spark 4.2.
*/
object SparkTableShims {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3354,15 +3354,15 @@ trait DeltaSQLConfBase extends DeltaSQLConfUtils {
*
* Valid values:
* - NONE: sparkV2 connector is disabled, always use sparkV1 connector (DeltaTableV2) - default
* - AUTO: Automatically use sparkV2 connector (SparkTable) for Unity Catalog managed tables
* - AUTO: Automatically use sparkV2 connector (DeltaV2Table) for Unity Catalog managed tables
* in streaming queries and sparkV1 connector (DeltaTableV2) for all other tables
* - STRICT: sparkV2 connector is strictly enforced, always use sparkV2 connector (SparkTable).
* - STRICT: sparkV2 connector is strictly enforced, always use sparkV2 connector (DeltaV2Table).
* Intended for testing sparkV2 connector capabilities
*
* sparkV1 vs sparkV2 Connectors:
* - sparkV1 Connector (DeltaTableV2): Legacy Delta connector with full read/write support,
* uses DeltaLog for metadata management
* - sparkV2 Connector (SparkTable): New kernel-based connector with read-only support,
* - sparkV2 Connector (DeltaV2Table): New kernel-based connector with read-only support,
* uses Kernel's Table API for metadata management
*
* See [[org.apache.spark.sql.delta.DeltaV2Mode]] for the centralized logic that interprets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ trait DeltaSourceDeletionVectorTests extends StreamTest
/**
* Executes a DML SQL statement (DELETE, INSERT, etc.).
* Overridable so that V2 suites can route DML through the V1 connector,
* since SparkTable (V2) is read-only and does not support writes.
* since DeltaV2Table (V2) is read-only and does not support writes.
*/
protected def executeDml(sqlText: String): Unit = sql(sqlText)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ trait StreamingSchemaEvolutionSuiteBase extends ColumnMappingStreamingTestUtils

/**
* Executes a DDL/DML SQL statement. Overridable so that V2 suites can route it through the V1
* connector, since SparkTable (V2) is read-only and does not support writes/DDL.
* connector, since DeltaV2Table (V2) is read-only and does not support writes/DDL.
*/
protected def executeDml(sqlText: String): Unit = sql(sqlText)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

/**
* In-memory DSv2 table used as a test stand-in for SparkTable (the Kernel-based Delta V2
* In-memory DSv2 table used as a test stand-in for DeltaV2Table (the Kernel-based Delta V2
* connector).
*
* Created by [[InMemoryDeltaCatalog]] when used as the session catalog in tests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ trait TypeWideningStreamingSourceTestMixin

/**
* Executes a DDL/DML SQL statement. Overridable so that V2 suites can route it through the V1
* connector, since SparkTable (V2) is read-only and does not support writes/DDL.
* connector, since DeltaV2Table (V2) is read-only and does not support writes/DDL.
*/
protected def executeDml(sqlText: String): Unit = sql(sqlText)

Expand Down
Loading
Loading