Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ trait DeltaV2SourceSchemaEvolutionSuiteBase extends V2ForceTest {
}
}

// TODO(#5319): Move tests to shouldPassTests as V2 schema tracking log support is implemented.
override protected def shouldPassTests: Set[String] = Set(
// ========== Schema log unit test ==========
"schema location not under checkpoint",
Expand All @@ -93,11 +92,7 @@ trait DeltaV2SourceSchemaEvolutionSuiteBase extends V2ForceTest {
"forward-compat: older version can read back newer JSON",

// ========== Schema log core ==========
"multiple delta source sharing same schema log is blocked"
)

override protected def shouldFailTests: Set[String] = Set(
// ========== Schema log core ==========
"multiple delta source sharing same schema log is blocked",
"schema log is applied",
"schema log initialization with additive schema changes",
"detect incompatible schema change while streaming",
Expand All @@ -124,6 +119,7 @@ trait DeltaV2SourceSchemaEvolutionSuiteBase extends V2ForceTest {
"schema evolution with Delta sink",
"latestOffset should not progress before schema evolved",
"unblock with sql conf",
"unblock with sql conf - nested struct",
"schema tracking interacting with unsafe escape flag",
"streaming with a column mapping upgrade",
"partition evolution"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ class RemoveColumnMappingStreamingReadV2Suite

override protected def executeDml(sqlText: String): Unit = executeInV1Mode(sqlText)

// Tests that run without schema tracking. These exercise non-additive column-mapping schema
// change detection, which is supported on the V2 connector.
override protected def shouldPassTests: Set[String] = Set(
// Tests that run without schema tracking.
"Upgrade, StartStreamRead, Downgrade, FailNonAdditiveChange",
"Upgrade, Downgrade, StartStreamRead, Success",
"StartStreamRead, Upgrade, Rename, Downgrade, FailNonAdditiveChange",
Expand All @@ -50,18 +49,12 @@ class RemoveColumnMappingStreamingReadV2Suite
"Upgrade, Drop, StartStreamRead, Downgrade, FailNonAdditiveChange",
"Upgrade, Drop, StartStreamRead, Downgrade, Upgrade, FailNonAdditiveChange",
"Upgrade, Rename, Downgrade, StartStreamRead, Success",
"Upgrade, Drop, Downgrade, StartStreamRead, Success"
)

// Tests that run with schema tracking enabled. The schema tracking log is not yet supported
// on the V2 connector.
override protected def shouldFailTests: Set[String] = Set(
// TODO(#5319): these three tests are not yet supported in V2 due to a gap in the
// column-mapping check utility.
"StartStreamRead, Upgrade, Downgrade, SuccessAndFailSchemaTracking",
"Upgrade, Drop, Downgrade, StartStreamRead, Success",
"Upgrade, Rename, Downgrade, StartStreamRead, Upgrade, SuccessAndFailSchemaTracking",
"Upgrade, Drop, Downgrade, StartStreamRead, Upgrade, SuccessAndFailSchemaTracking",
// TODO(#5319): Move these to shouldPassTests as V2 schema tracking log support is implemented.
"StartStreamRead, Upgrade, Downgrade, SuccessAndFailSchemaTracking",

// Tests that run with schema tracking enabled.
"StartStreamRead, Upgrade, Downgrade, SuccessAndFailSchemaTracking with schema tracking",
"Upgrade, StartStreamRead, Downgrade, FailNonAdditiveChange with schema tracking",
"Upgrade, Downgrade, StartStreamRead, Success with schema tracking",
Expand All @@ -88,6 +81,5 @@ class RemoveColumnMappingStreamingReadV2Suite
"Upgrade, Rename, Downgrade, StartStreamRead, Upgrade, SuccessAndFailSchemaTracking" +
" with schema tracking",
"Upgrade, Drop, Downgrade, StartStreamRead, Upgrade, SuccessAndFailSchemaTracking" +
" with schema tracking"
)
" with schema tracking")
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ import org.apache.spark.sql.delta.typewidening.{

/**
* Base trait for V2 type widening streaming source tests.
* Provides common shouldFail logic shared by both suites.
*
* The base lists every test from `TypeWideningStreamingSourceTests` as passing: V2 supports
* type-widening streaming reads. Subclasses move specific tests back to `shouldFailTests` when
* there is a concrete V2 limitation (e.g., partition-column schema bug, missing event logging).
*/
trait TypeWideningStreamingV2SourceSuiteBase extends V2ForceTest {
self: TypeWideningStreamingSourceTestMixin =>
Expand All @@ -34,12 +37,7 @@ trait TypeWideningStreamingV2SourceSuiteBase extends V2ForceTest {

override protected def executeDml(sqlText: String): Unit = executeInV1Mode(sqlText)

// TODO(#5319): Move tests to shouldPassTests as V2 schema tracking log support is implemented.
override protected def shouldPassTests: Set[String] = Set.empty[String]

// Tests from TypeWideningStreamingSourceTests, shared by both suites.
// Override in subclasses to add suite-specific tests.
override protected def shouldFailTests: Set[String] = Set(
override protected def shouldPassTests: Set[String] = Set(
"type change - filter",
"type change - projection",
"type change - projection partition column",
Expand All @@ -59,45 +57,34 @@ trait TypeWideningStreamingV2SourceSuiteBase extends V2ForceTest {
"arbitrary type changes are not supported",
"type change in delta source writing to a delta sink"
)
}

class TypeWideningStreamingV2SourceSuite
extends TypeWideningStreamingSourceSuite
with TypeWideningStreamingV2SourceSuiteBase {

// All tests pass without schema tracking enabled, except where noted in shouldFailTests.
override protected def shouldPassTests: Set[String] =
super.shouldFailTests -- shouldFailTests

// Failures that affect both the schema-tracking and non-schema-tracking suites.
override protected def shouldFailTests: Set[String] = Set(
// Delta log event is not supported in V2, so event-logging tests are not meaningful.
"schema changed event is logged for type widening",
"schema changed event is not logged when there are no schema changes",
// TODO(#5319): Partition column schema has a bug in V2 causing these to fail.
"type change - projection partition column",
"type change - widen aggregation expression partition column",
// TODO(#5319): V2 lacks the implementation of
// validateAndInitMetadataLogForPlannedBatchesDuringStreamStart, so the
// 2nd testStream restart does not throw on the incompatible type change.
"widening type change then restore back",
"narrowing type changes are not supported",
"arbitrary type changes are not supported"
"schema changed event is not logged when there are no schema changes"
)
}

class TypeWideningStreamingV2SourceSuite
extends TypeWideningStreamingSourceSuite
with TypeWideningStreamingV2SourceSuiteBase

class TypeWideningStreamingV2SourceSchemaTrackingSuite
extends TypeWideningStreamingSourceSchemaTrackingSuite
with TypeWideningStreamingV2SourceSuiteBase {

override protected def shouldFailTests: Set[String] = super.shouldFailTests ++ Set(
// Additional tests from TypeWideningStreamingSourceSchemaTrackingTests
"type change first without schemaTrackingLocation and unblock using schemaTrackingLocation",
"unblocking stream with sql conf after type change - unblock all",
"unblocking stream with sql conf after type change - unblock stream",
"unblocking stream with sql conf after type change - unblock version",
"unblocking stream with reader option after type change - unblock stream",
"unblocking stream with reader option after type change - unblock version",
"overwrite schema with type change and dropped column",
"disable schema tracking log using internal conf"
)
// Schema-tracking-specific tests from TypeWideningStreamingSourceSchemaTrackingTests, on top of
// the base type-widening tests inherited from the trait, minus tests with known V2 issues.
override protected def shouldPassTests: Set[String] =
super.shouldPassTests ++ Set(
"type change first without schemaTrackingLocation and unblock using schemaTrackingLocation",
"unblocking stream with sql conf after type change - unblock all",
"unblocking stream with sql conf after type change - unblock stream",
"unblocking stream with sql conf after type change - unblock version",
"unblocking stream with reader option after type change - unblock stream",
"unblocking stream with reader option after type change - unblock version",
"overwrite schema with type change and dropped column",
"disable schema tracking log using internal conf"
)
}
9 changes: 0 additions & 9 deletions spark/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -2685,15 +2685,6 @@
],
"sqlState" : "42KD4"
},
"DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_V2" : {
"message" : [
"Streaming read is not supported on tables with read-incompatible schema changes (e.g. rename or drop or datatype changes).",
"Non-additive schema change handling is not supported in Delta source v2 yet.",
"For further information and possible next steps to resolve this issue, please review the documentation at <docLink>",
"Read schema: <readSchema>. Incompatible data schema: <incompatibleSchema>."
],
"sqlState" : "42KD4"
},
"DELTA_STREAMING_INITIAL_SNAPSHOT_TOO_LARGE" : {
"message" : [
"Initial snapshot for Delta streaming at table '<tablePath>' (version <snapshotVersion>) contains <numFiles> files, which exceeds the maximum allowed <maxFiles> files.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3248,8 +3248,7 @@ trait DeltaErrorsBase
spark: SparkSession,
readSchema: StructType,
incompatibleSchema: StructType,
detectedDuringStreaming: Boolean,
isV2DataSource: Boolean = false): Throwable = {
detectedDuringStreaming: Boolean): Throwable = {
val docLink = "/versioning.html#column-mapping"
val enableNonAdditiveSchemaEvolution = spark.sessionState.conf.getConf(
DeltaSQLConf.DELTA_STREAMING_ENABLE_SCHEMA_TRACKING)
Expand All @@ -3259,8 +3258,7 @@ trait DeltaErrorsBase
generateDocsLinkOption(spark, docLink).getOrElse("-"),
enableNonAdditiveSchemaEvolution,
additionalProperties = Map(
"detectedDuringStreaming" -> detectedDuringStreaming.toString,
"isV2DataSource" -> isV2DataSource.toString
"detectedDuringStreaming" -> detectedDuringStreaming.toString
))
}

Expand Down Expand Up @@ -4443,9 +4441,7 @@ class DeltaStreamingNonAdditiveSchemaIncompatibleException(
val enableNonAdditiveSchemaEvolution: Boolean = false,
val additionalProperties: Map[String, String] = Map.empty)
extends DeltaUnsupportedOperationException(
errorClass = if (additionalProperties.getOrElse("isV2DataSource", "false") == "true") {
"DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_V2"
} else if (enableNonAdditiveSchemaEvolution) {
errorClass = if (enableNonAdditiveSchemaEvolution) {
"DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_SCHEMA_LOG"
} else {
"DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ object PersistedMetadata {
def fromJson(json: String): PersistedMetadata = JsonUtils.fromJson[PersistedMetadata](json)

/**
* Builds a [[PersistedMetadata]] from V2 interop abstractions.
* Serializes an [[AbstractProtocol]] to the Spark [[Protocol]] JSON format used in
* [[PersistedMetadata.protocolJson]].
*
* Contract on [[AbstractProtocol]]: `readerFeatures` / `writerFeatures` must be consistent
* with the min protocol versions: `readerFeatures` may only be defined when
Expand All @@ -116,26 +117,32 @@ object PersistedMetadata {
* relies on this invariant; [[Protocol]] will throw a `require` failure if an implementation
* gets it wrong.
*/
def toProtocolJson(abstractProtocol: AbstractProtocol): String = {
  // Construct the Protocol from the two minimum versions only.
  val versionsOnly = Protocol(
    abstractProtocol.minReaderVersion,
    abstractProtocol.minWriterVersion)
  // Attach the reader/writer feature sets through `copy`, then serialize to JSON.
  val withFeatures = versionsOnly.copy(
    readerFeatures = abstractProtocol.readerFeatures,
    writerFeatures = abstractProtocol.writerFeatures)
  withFeatures.json
}

/**
* Builds a [[PersistedMetadata]] from V2 interop abstractions.
*/
def apply(
tableId: String,
deltaCommitVersion: Long,
abstractMetadata: AbstractMetadata,
abstractProtocol: AbstractProtocol,
sourceMetadataPath: String): PersistedMetadata = {
val protocol = Protocol(
abstractProtocol.minReaderVersion,
abstractProtocol.minWriterVersion
).copy(
readerFeatures = abstractProtocol.readerFeatures,
writerFeatures = abstractProtocol.writerFeatures
)
PersistedMetadata(tableId, deltaCommitVersion,
abstractMetadata.schema.json, abstractMetadata.partitionSchema.json,
// The schema is bound to the specific source
sourceMetadataPath,
// Table configurations come from the Metadata action
Some(abstractMetadata.configuration),
Some(protocol.json)
Some(toProtocolJson(abstractProtocol))
)
}
}
Expand Down
Loading
Loading