Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,41 @@

package io.delta.internal

import scala.jdk.OptionConverters._

import io.delta.spark.internal.v2.catalog.SparkTable
import io.delta.spark.internal.v2.read.MetadataEvolutionHandler
import io.delta.spark.internal.v2.read.cdc.CDCSchemaContext
import org.apache.spark.sql.delta.commands.cdc.CDCReader

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.connector.catalog.Identifier

/**
* TODO(#5319): remove this class once Spark supports directly creating a table that reflects
* the cdc/trackingLog read options.
* Plumbs read options into a V2 [[StreamingRelationV2]]'s [[SparkTable]] when those options
* change a property the table derives from them.
*/
class ApplyV2ReadOptions extends Rule[LogicalPlan] {

override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
case r @ StreamingRelationV2(_, _, sparkTable: SparkTable, extraOptions, _, _, Some(ident), _)
if CDCReader.isCDCRead(extraOptions)
&& !r.output.exists(a => CDCSchemaContext.isCDCColumn(a.name)) =>
val newTable = sparkTable.getCatalogTable.toScala match {
case Some(catalogTable) => new SparkTable(ident, catalogTable, extraOptions)
case None => new SparkTable(ident, sparkTable.getTablePath.toString, extraOptions)
case s @ StreamingRelationV2(_, _, table: SparkTable, extraOptions, _, _, _, _)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see a lot of overlap between cdc and schema evolution. Could you help reconcile them? We should only run schema augmentation once.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if (CDCReader.isCDCRead(extraOptions) &&
!s.output.exists(a => CDCSchemaContext.isCDCColumn(a.name))) ||
MetadataEvolutionHandler.shouldPropagateSchemaTrackingToTable(
extraOptions, table.getOptions) =>
val merged = new java.util.HashMap[String, String]()
merged.putAll(table.getOptions)
merged.putAll(extraOptions.asCaseSensitiveMap())
val rebuilt = if (table.getCatalogTable.isPresent) {
new SparkTable(table.getIdentifier, table.getCatalogTable.get, merged)
} else {
new SparkTable(table.getIdentifier, table.getTablePath.toString, merged)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we make sure table.getIdentifier is not null? same for getTablePath.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is requireNonNull in SparkTable's constructor for identifier and tablePath

}
StreamingRelationV2(
s.copy(
source = None,
sourceName = r.sourceName,
table = newTable,
extraOptions = extraOptions,
output = toAttributes(newTable.schema()),
catalog = r.catalog,
identifier = Some(ident),
table = rebuilt,
output = toAttributes(rebuilt.schema),
v1Relation = None)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,24 @@ import java.util.{HashMap => JHashMap}

import scala.jdk.CollectionConverters._

import io.delta.kernel.internal.SnapshotImpl
import io.delta.spark.internal.v2.catalog.SparkTable
import io.delta.spark.internal.v2.snapshot.PathBasedSnapshotManager
import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTableType}
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.delta.DeltaOptions
import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions}
import org.apache.spark.sql.delta.Relocated.StreamingRelation
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.sources.{DeltaSQLConf, DeltaSourceUtils}
import org.apache.spark.sql.delta.sources.{DeltaSourceMetadataTrackingLog, DeltaSourceUtils, DeltaSQLConf, PersistedMetadata}
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class ApplyV2ReadOptionsSuite extends DeltaSQLCommandTest {
Expand Down Expand Up @@ -240,4 +242,268 @@ class ApplyV2ReadOptionsSuite extends DeltaSQLCommandTest {
}
}
}

// ---------------------------------------------------------------------------
// Rebuild StreamingRelationV2 when a schema tracking log is provided
// ---------------------------------------------------------------------------

/**
 * Field names, in order, of the data schema that [[seedSchemaLogWithExtraColumn]] writes into
 * the tracking log. Tests compare rebuilt table schemas and rule output against this sequence.
 */
private val seededFieldNames: Seq[String] = Seq("id", "extra")

/**
 * Wraps `table` in a [[StreamingRelationV2]] whose source name is "delta".
 *
 * The relation's output attributes are derived from the table's schema and its identifier is
 * taken from the table; `source`, `catalog` and `v1Relation` are left as `None`.
 *
 * @param table        the [[SparkTable]] the relation points at
 * @param extraOptions read options to attach, wrapped in a [[CaseInsensitiveStringMap]]
 * @return the assembled relation
 */
private def buildStreamingRelationV2(
    table: SparkTable, extraOptions: Map[String, String]): StreamingRelationV2 = {
  val readOptions = new CaseInsensitiveStringMap(extraOptions.asJava)
  val outputAttributes = toAttributes(table.schema)
  val tableIdentifier = Some(table.getIdentifier)
  StreamingRelationV2(
    source = None,
    sourceName = "delta",
    table = table,
    extraOptions = readOptions,
    output = outputAttributes,
    catalog = None,
    identifier = tableIdentifier,
    v1Relation = None)
}

/**
 * Seeds the schema-tracking log at `schemaLogPath` with a single entry whose data schema has
 * two columns (`id LONG, extra STRING`) and whose partition schema is empty. The seeded schema
 * deliberately differs from the underlying snapshot's 1-column schema.
 *
 * @param tablePath     path of the Delta table; its table id is read from the latest snapshot
 * @param schemaLogPath location of the schema-tracking log to write the entry into
 */
private def seedSchemaLogWithExtraColumn(tablePath: String, schemaLogPath: String): Unit = {
  // Schemas recorded in the seeded entry.
  val seededDataSchemaJson =
    """{"type":"struct","fields":[
      |{"name":"id","type":"long","nullable":true,"metadata":{}},
      |{"name":"extra","type":"string","nullable":true,"metadata":{}}]}""".stripMargin
  val seededPartitionSchemaJson = """{"type":"struct","fields":[]}"""

  // Look up the table id through the latest snapshot of the table.
  val log = DeltaLog.forTable(spark, tablePath)
  val manager = new PathBasedSnapshotManager(tablePath, log.newDeltaHadoopConf())
  val latestSnapshot = manager.loadLatestSnapshot.asInstanceOf[SnapshotImpl]
  val tableId = latestSnapshot.getMetadata.getId

  val schemaLog = DeltaSourceMetadataTrackingLog.create(
    spark, schemaLogPath, tableId, tablePath, parameters = Map.empty[String, String])
  val entry = PersistedMetadata(
    tableId,
    deltaCommitVersion = 0L,
    dataSchemaJson = seededDataSchemaJson,
    partitionSchemaJson = seededPartitionSchemaJson,
    sourceMetadataPath = s"$tablePath/_delta_log/_streaming_metadata")
  schemaLog.writeNewMetadata(entry, replaceCurrent = false)
}

/**
 * Checks that `table` exposes the schema written by [[seedSchemaLogWithExtraColumn]]: its field
 * names equal [[seededFieldNames]] and its second field is a string column.
 */
private def assertSchemaMatchesSeededLogEntry(table: SparkTable): Unit = {
  val actualFieldNames = table.schema.fieldNames.toSeq
  assert(actualFieldNames == seededFieldNames)
  val secondFieldType = table.schema.fields(1).dataType
  assert(secondFieldType == StringType)
}

/**
 * Creates a catalog-backed [[SparkTable]] located at `tableLocationUri`.
 *
 * This mirrors the common production path through DeltaCatalog and is the default for tests
 * that do not specifically distinguish between path-based and catalog-based construction.
 *
 * @param tableLocationUri location the catalog table is rooted at
 * @param options          options handed to the [[SparkTable]] constructor
 */
private def buildCatalogBasedSparkTable(
    tableLocationUri: URI, options: JHashMap[String, String]): SparkTable = {
  val catalogEntry = createCatalogTable(tableLocationUri, ucManaged = false)
  // The V2 identifier reuses the catalog table's database (if any) as namespace.
  val qualifiedName = catalogEntry.identifier
  val namespace = qualifiedName.database.toArray
  new SparkTable(Identifier.of(namespace, qualifiedName.table), catalogEntry, options)
}

/** Applies a freshly constructed [[ApplyV2ReadOptions]] rule to `plan` and returns the result. */
private def applyReadOptions(plan: LogicalPlan): LogicalPlan = {
  val rule = new ApplyV2ReadOptions()
  rule(plan)
}

// Path-based table: the rule must rebuild the SparkTable with the schema-tracking option so its
// schema comes from the seeded log entry, and a second application must not rebuild again.
test("schema-tracking rebuild: path-based SparkTable picks up the persisted schema") {
  withTempDir { tableDir =>
    withTempDir { schemaLogDir =>
      val trackingKey = DeltaOptions.SCHEMA_TRACKING_LOCATION
      val deltaPath = tableDir.getCanonicalPath
      createDeltaTable(deltaPath) // snapshot schema: id BIGINT
      val logPath = schemaLogDir.getCanonicalPath
      seedSchemaLogWithExtraColumn(deltaPath, logPath)

      // Start from a table that does not carry the tracking option yet.
      val originalTable = new SparkTable(Identifier.of(Array("default"), "tbl"), deltaPath)
      assert(!originalTable.getOptions.containsKey(trackingKey))

      val inputPlan = buildStreamingRelationV2(originalTable, Map(trackingKey -> logPath))
      val rewritten = applyReadOptions(inputPlan)
      assertV2(rewritten)
      val rebuilt = rewritten.asInstanceOf[StreamingRelationV2].table.asInstanceOf[SparkTable]

      assert(rebuilt ne originalTable, "rebuild should produce a new SparkTable")
      assert(rebuilt.getOptions.containsKey(trackingKey))
      assert(rebuilt.getOptions.get(trackingKey) == logPath)
      assert(!rebuilt.getCatalogTable.isPresent, "path branch should not have catalogTable")
      // The persisted entry, not the snapshot, determines the rebuilt schema ...
      assertSchemaMatchesSeededLogEntry(rebuilt)
      // ... and the relation's output is recomputed from that schema.
      assert(rewritten.output.map(_.name) == seededFieldNames)

      // Idempotence: a second pass keeps the very same table instance.
      val secondPass = applyReadOptions(rewritten).asInstanceOf[StreamingRelationV2]
      assert(secondPass.table eq rebuilt, "re-applying rule should not rebuild")
    }
  }
}

// Catalog-backed table: the rebuilt SparkTable must carry the tracking option, expose the
// seeded schema, and still hold on to its CatalogTable.
test("schema-tracking rebuild: catalog-based SparkTable picks up the persisted schema and " +
  "keeps its CatalogTable") {
  withTempDir { tableDir =>
    withTempDir { schemaLogDir =>
      val trackingKey = DeltaOptions.SCHEMA_TRACKING_LOCATION
      val deltaPath = tableDir.getCanonicalPath
      createDeltaTable(deltaPath)
      val logPath = schemaLogDir.getCanonicalPath
      seedSchemaLogWithExtraColumn(deltaPath, logPath)

      val originalTable =
        buildCatalogBasedSparkTable(tableDir.toURI, new JHashMap[String, String]())
      assert(!originalTable.getOptions.containsKey(trackingKey))
      assert(originalTable.getCatalogTable.isPresent)

      val inputPlan = buildStreamingRelationV2(originalTable, Map(trackingKey -> logPath))
      val rewritten = applyReadOptions(inputPlan)
      assertV2(rewritten)
      val rebuilt = rewritten.asInstanceOf[StreamingRelationV2].table.asInstanceOf[SparkTable]

      assert(rebuilt.getOptions.containsKey(trackingKey))
      assert(rebuilt.getCatalogTable.isPresent, "catalog branch should keep CatalogTable")
      assertSchemaMatchesSeededLogEntry(rebuilt)
    }
  }
}

// Same rebuild, but the tracking location is supplied under the alias option key.
test("schema-tracking rebuild: triggered by SCHEMA_TRACKING_LOCATION_ALIAS option key") {
  withTempDir { tableDir =>
    withTempDir { schemaLogDir =>
      val aliasKey = DeltaOptions.SCHEMA_TRACKING_LOCATION_ALIAS
      val deltaPath = tableDir.getCanonicalPath
      createDeltaTable(deltaPath)
      val logPath = schemaLogDir.getCanonicalPath
      seedSchemaLogWithExtraColumn(deltaPath, logPath)

      val originalTable =
        buildCatalogBasedSparkTable(tableDir.toURI, new JHashMap[String, String]())

      val inputPlan = buildStreamingRelationV2(originalTable, Map(aliasKey -> logPath))
      val rewritten = applyReadOptions(inputPlan)
      assertV2(rewritten)
      val rebuilt = rewritten.asInstanceOf[StreamingRelationV2].table.asInstanceOf[SparkTable]

      // The alias key itself is what ends up on the rebuilt table.
      assert(rebuilt.getOptions.containsKey(aliasKey))
      assert(rebuilt.getOptions.get(aliasKey) == logPath)
      assertSchemaMatchesSeededLogEntry(rebuilt)
    }
  }
}

// Without any schema-tracking read option the rule must hand back the identical plan instance.
test("schema-tracking rebuild: skipped when extraOptions has no schema-tracking option") {
  withTempDir { tableDir =>
    val deltaPath = tableDir.getCanonicalPath
    createDeltaTable(deltaPath)
    val originalTable =
      buildCatalogBasedSparkTable(tableDir.toURI, new JHashMap[String, String]())

    val inputPlan = buildStreamingRelationV2(originalTable, Map.empty)
    val rewritten = applyReadOptions(inputPlan)
    assert(rewritten eq inputPlan, "no rebuild expected when schema-tracking option not present")
  }
}

// When the table was already constructed with the tracking option, passing the same option as
// a read option must leave the plan untouched.
test("schema-tracking rebuild: skipped when SparkTable already carries the " +
  "schema-tracking option") {
  withTempDir { tableDir =>
    withTempDir { schemaLogDir =>
      val trackingKey = DeltaOptions.SCHEMA_TRACKING_LOCATION
      val deltaPath = tableDir.getCanonicalPath
      createDeltaTable(deltaPath)
      val logPath = schemaLogDir.getCanonicalPath

      // Table options pre-populated with the tracking location.
      val presetOptions = new JHashMap[String, String]()
      presetOptions.put(trackingKey, logPath)
      val originalTable = buildCatalogBasedSparkTable(tableDir.toURI, presetOptions)
      assert(originalTable.getOptions.containsKey(trackingKey))

      val inputPlan = buildStreamingRelationV2(originalTable, Map(trackingKey -> logPath))
      val rewritten = applyReadOptions(inputPlan)
      assert(rewritten eq inputPlan, "no rebuild expected when table already carries the option")
    }
  }
}

test("schema-tracking via V1 StreamingRelation: option propagates through V1 -> V2 conversion") {
  // The V2 rebuild tests start from a StreamingRelationV2 and exercise the rebuild branch.
  // Here the starting point is a V1 StreamingRelation whose dataSource.options carry the
  // schema-tracking option; the V1 -> V2 conversion branch is expected to pass that option on to
  // the new SparkTable so that its schema follows the persisted log entry.
  withTempDir { tableDir =>
    withTempDir { schemaLogDir =>
      val trackingKey = DeltaOptions.SCHEMA_TRACKING_LOCATION
      val deltaPath = tableDir.getCanonicalPath
      createDeltaTable(deltaPath)
      val logPath = schemaLogDir.getCanonicalPath
      seedSchemaLogWithExtraColumn(deltaPath, logPath)

      val catalogEntry = createCatalogTable(tableDir.toURI, ucManaged = false)
      val sourceOptions = Map(
        "path" -> deltaPath,
        trackingKey -> logPath)
      val v1Source = DataSource(
        sparkSession = spark,
        userSpecifiedSchema = None,
        className = "delta",
        options = sourceOptions,
        catalogTable = Some(catalogEntry))
      val v1Plan = StreamingRelation(v1Source)

      // In STRICT mode ApplyV2Streaming is forced to convert V1 -> V2.
      withSQLConf(DeltaSQLConf.V2_ENABLE_MODE.key -> "STRICT") {
        val converted = applyRules(v1Plan)
        assertV2(converted)
        val v2Table =
          converted.asInstanceOf[StreamingRelationV2].table.asInstanceOf[SparkTable]
        assert(v2Table.getOptions.containsKey(trackingKey))
        assert(v2Table.getOptions.get(trackingKey) == logPath)
        // The seeded log entry, not the underlying snapshot, determines the schema.
        assertSchemaMatchesSeededLogEntry(v2Table)
        assert(converted.output.map(_.name) == seededFieldNames)
      }
    }
  }
}

// Both a CDC read option and a schema-tracking option are present: one rebuild must put both
// on the table, and the output must be the seeded columns followed by the CDC columns.
test("CDC + schema-tracking co-occurrence: single rebuild applies both transformations") {
  withTempDir { tableDir =>
    withTempDir { schemaLogDir =>
      val trackingKey = DeltaOptions.SCHEMA_TRACKING_LOCATION
      val cdcKey = DeltaOptions.CDC_READ_OPTION
      val deltaPath = tableDir.getCanonicalPath
      createDeltaTable(deltaPath)
      val logPath = schemaLogDir.getCanonicalPath
      seedSchemaLogWithExtraColumn(deltaPath, logPath)

      val originalTable =
        buildCatalogBasedSparkTable(tableDir.toURI, new JHashMap[String, String]())
      val readOptions = Map(
        trackingKey -> logPath,
        cdcKey -> "true")
      val inputPlan = buildStreamingRelationV2(originalTable, readOptions)

      val rewritten = applyReadOptions(inputPlan)
      assertV2(rewritten)
      val rebuiltTable =
        rewritten.asInstanceOf[StreamingRelationV2].table.asInstanceOf[SparkTable]

      // A single pass places both options on the rebuilt table.
      assert(rebuiltTable.getOptions.get(trackingKey) == logPath)
      assert(rebuiltTable.getOptions.get(cdcKey) == "true")
      // Expected output: columns from the seeded log entry, then the appended CDC columns.
      val expected = Seq(
        "id", "extra",
        CDCReader.CDC_TYPE_COLUMN_NAME,
        CDCReader.CDC_COMMIT_VERSION,
        CDCReader.CDC_COMMIT_TIMESTAMP)
      assert(rewritten.output.map(_.name) == expected,
        s"Expected $expected, got: ${rewritten.output.map(_.name)}")

      // Idempotence: applying the rule again keeps the rebuilt table instance.
      val secondPass = applyReadOptions(rewritten).asInstanceOf[StreamingRelationV2]
      assert(secondPass.table eq rebuiltTable, "re-applying rule should not rebuild")
    }
  }
}
}
Loading
Loading