-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[KERNEL] Kernel fix clustering and add tests #6749
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -30,7 +30,7 @@ import io.delta.kernel.internal.{SnapshotImpl, TableConfig} | |||||||||
| import io.delta.kernel.internal.actions.DomainMetadata | ||||||||||
| import io.delta.kernel.internal.clustering.ClusteringMetadataDomain | ||||||||||
| import io.delta.kernel.internal.util.{ColumnMapping, ColumnMappingSuiteBase} | ||||||||||
| import io.delta.kernel.types.{ArrayType, CollationIdentifier, DecimalType, FieldMetadata, IntegerType, LongType, MapType, StringType, StructType, TypeChange} | ||||||||||
| import io.delta.kernel.types.{ArrayType, CollationIdentifier, DecimalType, FieldMetadata, GeographyType, GeometryType, IntegerType, LongType, MapType, StringType, StructType, TypeChange} | ||||||||||
| import io.delta.kernel.utils.CloseableIterable | ||||||||||
| import io.delta.kernel.utils.CloseableIterable.emptyIterable | ||||||||||
|
|
||||||||||
|
|
@@ -1963,4 +1963,138 @@ trait DeltaTableSchemaEvolutionSuiteBase extends AnyFunSuite with AbstractWriteU | |||||||||
| .fromMetadata(getMetadata(engine, tablePath)) | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // Adding a geo column auto-enables GEOSPATIAL_RW_FEATURE and bumps protocol to (3, 7). | ||||||||||
| Seq( | ||||||||||
| ("geometry", GeometryType.ofDefault()), | ||||||||||
| ("geometry custom SRID", GeometryType.ofSRID("EPSG:4326")), | ||||||||||
| ("geography", GeographyType.ofDefault()), | ||||||||||
| ("geography custom algorithm", new GeographyType("OGC:CRS84", "vincenty"))) | ||||||||||
| .foreach { case (label, geoType) => | ||||||||||
| test(s"adding $label column auto-enables geospatial feature and upgrades protocol") { | ||||||||||
| withTempDirAndEngine { (tablePath, engine) => | ||||||||||
| val table = Table.forPath(engine, tablePath) | ||||||||||
| val initialSchema = new StructType() | ||||||||||
| .add("id", IntegerType.INTEGER, true) | ||||||||||
| .add("name", StringType.STRING, true) | ||||||||||
|
|
||||||||||
| createEmptyTable( | ||||||||||
| engine, | ||||||||||
| tablePath, | ||||||||||
| initialSchema, | ||||||||||
| tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id")) | ||||||||||
|
|
||||||||||
| val before = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] | ||||||||||
| val featuresBefore = before.getProtocol.getImplicitlyAndExplicitlySupportedFeatures | ||||||||||
| assert( | ||||||||||
| !featuresBefore.contains(io.delta.kernel.internal.tablefeatures.TableFeatures | ||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There are several fully-qualified references in tests (e.g. |
||||||||||
| .GEOSPATIAL_RW_FEATURE), | ||||||||||
| s"geospatial should not be present before, got: $featuresBefore") | ||||||||||
|
|
||||||||||
| val currentSchema = before.getSchema | ||||||||||
| val newSchema = new StructType() | ||||||||||
| .add("id", IntegerType.INTEGER, true, currentSchema.get("id").getMetadata) | ||||||||||
| .add("name", StringType.STRING, true, currentSchema.get("name").getMetadata) | ||||||||||
| .add("geo", geoType, true, fieldMetadataForColumn(3, "geo")) | ||||||||||
|
|
||||||||||
| updateTableMetadata(engine, tablePath, newSchema) | ||||||||||
|
|
||||||||||
| val after = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] | ||||||||||
| assert(after.getSchema.get("geo").getDataType == geoType) | ||||||||||
|
|
||||||||||
| val featuresAfter = after.getProtocol.getImplicitlyAndExplicitlySupportedFeatures | ||||||||||
| assert( | ||||||||||
| featuresAfter.contains(io.delta.kernel.internal.tablefeatures.TableFeatures | ||||||||||
| .GEOSPATIAL_RW_FEATURE), | ||||||||||
| s"geospatial should be present after, got: $featuresAfter") | ||||||||||
| assert(after.getProtocol.getMinReaderVersion >= 3) | ||||||||||
| assert(after.getProtocol.getMinWriterVersion >= 7) | ||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
| } | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| test("adding a geometry column inside a struct also auto-enables geospatial") { | ||||||||||
| withTempDirAndEngine { (tablePath, engine) => | ||||||||||
| val table = Table.forPath(engine, tablePath) | ||||||||||
| val initialSchema = new StructType() | ||||||||||
| .add("id", IntegerType.INTEGER, true) | ||||||||||
| .add( | ||||||||||
| "info", | ||||||||||
| new StructType().add("name", StringType.STRING, true), | ||||||||||
| true) | ||||||||||
|
|
||||||||||
| createEmptyTable( | ||||||||||
| engine, | ||||||||||
| tablePath, | ||||||||||
| initialSchema, | ||||||||||
| tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id")) | ||||||||||
|
|
||||||||||
| val before = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] | ||||||||||
| val currentSchema = before.getSchema | ||||||||||
| val currentInner = currentSchema.get("info").getDataType.asInstanceOf[StructType] | ||||||||||
| val maxIdBefore = getMaxFieldId(engine, tablePath) | ||||||||||
|
|
||||||||||
| val newSchema = new StructType() | ||||||||||
| .add("id", IntegerType.INTEGER, true, currentSchema.get("id").getMetadata) | ||||||||||
| .add( | ||||||||||
| "info", | ||||||||||
| new StructType() | ||||||||||
| .add("name", StringType.STRING, true, currentInner.get("name").getMetadata) | ||||||||||
| .add( | ||||||||||
| "geom", | ||||||||||
| GeometryType.ofDefault(), | ||||||||||
| true, | ||||||||||
| fieldMetadataForColumn(maxIdBefore + 1, "geom")), | ||||||||||
| true, | ||||||||||
| currentSchema.get("info").getMetadata) | ||||||||||
|
|
||||||||||
| updateTableMetadata(engine, tablePath, newSchema) | ||||||||||
|
|
||||||||||
| val after = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] | ||||||||||
| val features = after.getProtocol.getImplicitlyAndExplicitlySupportedFeatures | ||||||||||
| assert( | ||||||||||
| features.contains(io.delta.kernel.internal.tablefeatures.TableFeatures | ||||||||||
| .GEOSPATIAL_RW_FEATURE), | ||||||||||
| s"geospatial should be present, got: $features") | ||||||||||
| assert(after.getProtocol.getMinReaderVersion >= 3) | ||||||||||
| assert(after.getProtocol.getMinWriterVersion >= 7) | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // SRID / algorithm changes are not in TypeWideningChecker, so schema evolution rejects them. | ||||||||||
| Seq( | ||||||||||
| ( | ||||||||||
| "change geometry SRID", | ||||||||||
| GeometryType.ofSRID("OGC:CRS84").asInstanceOf[io.delta.kernel.types.DataType], | ||||||||||
| GeometryType.ofSRID("EPSG:4326").asInstanceOf[io.delta.kernel.types.DataType]), | ||||||||||
| ( | ||||||||||
| "change geography algorithm", | ||||||||||
| new GeographyType("OGC:CRS84", "spherical").asInstanceOf[io.delta.kernel.types.DataType], | ||||||||||
| new GeographyType("OGC:CRS84", "vincenty").asInstanceOf[io.delta.kernel.types.DataType])) | ||||||||||
| .foreach { case (label, fromType, toType) => | ||||||||||
| test(s"$label is rejected by schema evolution") { | ||||||||||
| withTempDirAndEngine { (tablePath, engine) => | ||||||||||
| val table = Table.forPath(engine, tablePath) | ||||||||||
| val initialSchema = new StructType() | ||||||||||
| .add("id", IntegerType.INTEGER, true) | ||||||||||
| .add("geo", fromType, true) | ||||||||||
|
|
||||||||||
| createEmptyTable( | ||||||||||
| engine, | ||||||||||
| tablePath, | ||||||||||
| initialSchema, | ||||||||||
| tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id")) | ||||||||||
|
|
||||||||||
| val currentSchema = table.getLatestSnapshot(engine).getSchema | ||||||||||
| val newSchema = new StructType() | ||||||||||
| .add("id", IntegerType.INTEGER, true, currentSchema.get("id").getMetadata) | ||||||||||
| .add("geo", toType, true, currentSchema.get("geo").getMetadata) | ||||||||||
|
|
||||||||||
| assertSchemaEvolutionFails[KernelException]( | ||||||||||
| table, | ||||||||||
| engine, | ||||||||||
| newSchema, | ||||||||||
| "Cannot change the type of existing field") | ||||||||||
| } | ||||||||||
| } | ||||||||||
| } | ||||||||||
| } | ||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add a test that covers the case where schema evolution rejects the type because of a malformed GeographyType, e.g. GeographyType("XyZ", "vincenty")?