Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,18 @@ public static List<Column> casePreservingEligibleClusterColumns(
.map(col -> ColumnMapping.getPhysicalColumnNameAndDataType(schema, col))
.collect(Collectors.toList());

// Geo types are skipping-eligible (bbox stats) but have no total order, so cannot cluster.
List<String> geoColumns =
physicalColumnsWithTypes.stream()
.filter(tuple -> tuple._2 instanceof GeometryType || tuple._2 instanceof GeographyType)
.map(tuple -> tuple._1.toString() + " : " + tuple._2)
.collect(Collectors.toList());

if (!geoColumns.isEmpty()) {
throw new KernelException(
format("Clustering is not supported on geometry/geography column(s): %s", geoColumns));
}

List<String> nonSkippingEligibleColumns =
physicalColumnsWithTypes.stream()
.filter(tuple -> !StatsSchemaHelper.isSkippingEligibleDataType(tuple._2))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@ package io.delta.kernel.defaults

import java.io.File

import scala.collection.immutable.Seq

import io.delta.golden.GoldenTableUtils.goldenTablePath
import io.delta.kernel.{Table, TableManager}
import io.delta.kernel.defaults.engine.DefaultEngine
import io.delta.kernel.defaults.utils.{TestRow, TestUtils, WriteUtils}
import io.delta.kernel.defaults.utils.{GeoTestUtils, TestRow, TestUtils, WriteUtils}
import io.delta.kernel.engine.Engine
import io.delta.kernel.exceptions.{CheckpointAlreadyExistsException, TableNotFoundException}
import io.delta.kernel.expressions.Literal
import io.delta.kernel.internal.SnapshotImpl
import io.delta.kernel.expressions.{Column, Literal}
import io.delta.kernel.internal.{SnapshotImpl, TableConfig}
import io.delta.kernel.internal.tablefeatures.TableFeatures.GEOSPATIAL_RW_FEATURE
import io.delta.kernel.statistics.DataFileStatistics
import io.delta.kernel.types.{GeometryType, StructType => KernelStructType}

import org.apache.spark.sql.delta.{DeltaLog, VersionNotFoundException}
import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate
Expand All @@ -40,7 +45,7 @@ import org.scalatest.funsuite.AnyFunSuite
/**
* Test suite for `io.delta.kernel.Table.checkpoint(engine, version)`
*/
class CreateCheckpointSuite extends CheckpointBase {
class CreateCheckpointSuite extends CheckpointBase with GeoTestUtils {

///////////
// Tests //
Expand Down Expand Up @@ -469,6 +474,98 @@ class CreateCheckpointSuite extends CheckpointBase {
}
}

// Same fixture as GeometryDataSkippingSuite (4 quadrants + null-stats f4):
// y=10 +------+ +------+
// | f2 | | f1 |
// y=7 +------+ +------+
// (gap)
// y=3 +------+ +------+
// | f0 | | f3 |
// y=0 +------+ +------+
// x=0 x=3 x=7 x=10
private val geomCheckpointFileExtents: Seq[Option[(Double, Double, Double, Double)]] = Seq(
Some((0.0, 0.0, 3.0, 3.0)),
Some((7.0, 7.0, 10.0, 10.0)),
Some((0.0, 7.0, 3.0, 10.0)),
Some((7.0, 0.0, 10.0, 3.0)),
None)

private val geomColType = new GeometryType("OGC:CRS84")
private val geomCol = new Column("geom")

private def geoCheckpointStatsList: Seq[DataFileStatistics] =
geomCheckpointFileExtents.map {
case Some((minX, minY, maxX, maxY)) =>
geoStats(geomCol, minX, minY, maxX, maxY, geomColType)
case None => emptyStats()
}

// Counts the checkpoint manifest files at a version. Excludes sidecars
// (<version>.checkpoint.NNNN.NNNN.<UUID>.parquet) so both classic and V2 layouts return 1.
private def checkpointManifestCount(tablePath: String, checkpointVersion: Long): Int = {
val deltaLogDir = new java.io.File(tablePath, "_delta_log")
val versionPrefix = f"$checkpointVersion%020d.checkpoint"
deltaLogDir
.listFiles()
.count { f =>
val name = f.getName
name.startsWith(versionPrefix) &&
!name.matches(raw".*\.checkpoint\.\d+\.\d+\..*")
}
}

Seq(
("classic", Map.empty[String, String]),
("v2-typed", Map(TableConfig.CHECKPOINT_POLICY.getKey -> "v2"))).foreach {
case (label, tableProps) =>
test(s"data skipping survives $label checkpoint - geometry column") {
withTempDirAndEngine { (tablePath, engine) =>
val schema = new KernelStructType().add("geom", geomColType)
commitGeoStatsFiles(
tablePath,
engine,
schema,
geoCheckpointStatsList,
tableProperties = tableProps)

val checkpointVersion = (geomCheckpointFileExtents.length - 1).toLong
kernelCheckpoint(engine, tablePath, checkpointVersion)

val manifestCount = checkpointManifestCount(tablePath, checkpointVersion)
assert(
manifestCount == 1,
s"expected exactly 1 checkpoint manifest at v=$checkpointVersion, got $manifestCount")

// Inline delete instead of deleteDeltaFilesBefore: that helper goes through Spark,
// which rejects tables with the geospatial feature it doesn't recognize.
Seq.range(0L, checkpointVersion).foreach { v =>
val p = new Path(f"$tablePath/_delta_log/$v%020d.json")
p.getFileSystem(new Configuration()).delete(p, false)
}

val snapshot = latestSnapshot(tablePath)

assert(snapshot.getSchema.get("geom").getDataType == geomColType)

val features = snapshot.getProtocol.getImplicitlyAndExplicitlySupportedFeatures
assert(
features.contains(GEOSPATIAL_RW_FEATURE),
s"geospatial feature missing post-checkpoint: $features")

// null-stats f4 always falls through (+1 in every count).
assert(boxFilesHit(snapshot, geomCol, geomColType, 1.0, 1.0, 4.0, 4.0) == 2)
assert(boxFilesHit(snapshot, geomCol, geomColType, 8.0, 8.0, 11.0, 11.0) == 2)
assert(boxFilesHit(snapshot, geomCol, geomColType, 4.0, 4.0, 6.0, 6.0) == 1)
assert(boxFilesHit(snapshot, geomCol, geomColType, 0.0, 0.0, 11.0, 11.0) == 5)

assert(
collectScanFileRows(snapshot.getScanBuilder().build()).size ==
geomCheckpointFileExtents.length,
s"checkpoint dropped add actions: expected ${geomCheckpointFileExtents.length}")
}
}
}

test(
"log cleanup: checkpointProtection enabled prevents log cleanup, " +
"even snapshot is built as latest") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import io.delta.kernel.internal.actions.DomainMetadata
import io.delta.kernel.internal.clustering.ClusteringMetadataDomain
import io.delta.kernel.internal.util.ColumnMapping
import io.delta.kernel.internal.util.ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY
import io.delta.kernel.types.{MapType, StructType}
import io.delta.kernel.types.{GeographyType, GeometryType, MapType, StructType}
import io.delta.kernel.types.IntegerType.INTEGER
import io.delta.kernel.utils.CloseableIterable
import io.delta.kernel.utils.CloseableIterable.emptyIterable
Expand Down Expand Up @@ -119,6 +119,92 @@ trait DeltaTableClusteringSuiteBase extends AnyFunSuite with AbstractWriteUtils
}
}

Seq(
("geometry", GeometryType.ofDefault()),
("geography", GeographyType.ofDefault())).foreach { case (label, geoType) =>
test(s"build table txn: clustering on a $label column should be rejected") {
withTempDirAndEngine { (tablePath, engine) =>
val schema = new StructType()
.add("id", INTEGER)
.add("geo", geoType)
val ex = intercept[KernelException] {
getCreateTxn(
engine,
tablePath,
schema,
clusteringColsOpt = Some(List(new Column("geo"))))
}
assert(
ex.getMessage.contains("Clustering is not supported on geometry/geography column(s)"),
s"unexpected error message: ${ex.getMessage}")
assert(ex.getMessage.contains("geo"))
}
}

test(s"build table txn: clustering with mixed non-geo + $label column rejects only the geo") {
withTempDirAndEngine { (tablePath, engine) =>
val schema = new StructType()
.add("id", INTEGER)
.add("geo", geoType)
val ex = intercept[KernelException] {
getCreateTxn(
engine,
tablePath,
schema,
clusteringColsOpt = Some(List(new Column("id"), new Column("geo"))))
}
assert(
ex.getMessage.contains("Clustering is not supported on geometry/geography column(s)"),
s"unexpected error message: ${ex.getMessage}")
assert(ex.getMessage.contains("geo"))
assert(
!ex.getMessage.contains(" id "),
s"non-geo column should not be in the rejection list: ${ex.getMessage}")
}
}

test(s"update a non-clustered table: cannot enable clustering on a $label column") {
withTempDirAndEngine { (tablePath, engine) =>
val schema = new StructType()
.add("id", INTEGER)
.add("geo", geoType)
createEmptyTable(engine, tablePath, schema)
val ex = intercept[KernelException] {
updateTableMetadata(
engine,
tablePath,
clusteringColsOpt = Some(List(new Column("geo"))))
}
assert(
ex.getMessage.contains("Clustering is not supported on geometry/geography column(s)"),
s"unexpected error message: ${ex.getMessage}")
}
}

test(s"build table txn: clustering on a nested $label column should be rejected") {
withTempDirAndEngine { (tablePath, engine) =>
val schema = new StructType()
.add("id", INTEGER)
.add(
"parent",
new StructType().add("geo", geoType))
val ex = intercept[KernelException] {
getCreateTxn(
engine,
tablePath,
schema,
clusteringColsOpt = Some(List(new Column(Array("parent", "geo")))))
}
assert(
ex.getMessage.contains("Clustering is not supported on geometry/geography column(s)"),
s"unexpected error message: ${ex.getMessage}")
assert(
ex.getMessage.contains("parent") && ex.getMessage.contains("geo"),
s"error should name the nested column path: ${ex.getMessage}")
}
}
}

test("create a clustered table should succeed") {
withTempDirAndEngine { (tablePath, engine) =>
val commitResult = createEmptyTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ import io.delta.kernel.expressions.Column
import io.delta.kernel.internal.{SnapshotImpl, TableConfig}
import io.delta.kernel.internal.actions.DomainMetadata
import io.delta.kernel.internal.clustering.ClusteringMetadataDomain
import io.delta.kernel.internal.tablefeatures.TableFeatures.GEOSPATIAL_RW_FEATURE
import io.delta.kernel.internal.util.{ColumnMapping, ColumnMappingSuiteBase}
import io.delta.kernel.types.{ArrayType, CollationIdentifier, DecimalType, FieldMetadata, IntegerType, LongType, MapType, StringType, StructType, TypeChange}
import io.delta.kernel.types.{ArrayType, CollationIdentifier, DataType, DecimalType, FieldMetadata, GeographyType, GeometryType, IntegerType, LongType, MapType, StringType, StructType, TypeChange}
import io.delta.kernel.utils.CloseableIterable
import io.delta.kernel.utils.CloseableIterable.emptyIterable

Expand Down Expand Up @@ -1963,4 +1964,135 @@ trait DeltaTableSchemaEvolutionSuiteBase extends AnyFunSuite with AbstractWriteU
.fromMetadata(getMetadata(engine, tablePath))
}

// Adding a geo column auto-enables GEOSPATIAL_RW_FEATURE and bumps protocol to (3, 7).
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add test cover the case where schema evolution reject the GeometryType due to malformed GeographyType, e.g GeographyType("XyZ", "vincenty") ?

Seq(
("geometry", GeometryType.ofDefault()),
("geometry custom SRID", GeometryType.ofSRID("EPSG:4326")),
("geography", GeographyType.ofDefault()),
("geography custom algorithm", new GeographyType("OGC:CRS84", "vincenty")))
.foreach { case (label, geoType) =>
test(s"adding $label column auto-enables geospatial feature and upgrades protocol") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
val initialSchema = new StructType()
.add("id", IntegerType.INTEGER, true)
.add("name", StringType.STRING, true)

createEmptyTable(
engine,
tablePath,
initialSchema,
tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id"))

val before = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl]
val featuresBefore = before.getProtocol.getImplicitlyAndExplicitlySupportedFeatures
assert(
!featuresBefore.contains(GEOSPATIAL_RW_FEATURE),
s"geospatial should not be present before, got: $featuresBefore")

val currentSchema = before.getSchema
val newSchema = new StructType()
.add("id", IntegerType.INTEGER, true, currentSchema.get("id").getMetadata)
.add("name", StringType.STRING, true, currentSchema.get("name").getMetadata)
.add("geo", geoType, true, fieldMetadataForColumn(3, "geo"))

updateTableMetadata(engine, tablePath, newSchema)

val after = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl]
assert(after.getSchema.get("geo").getDataType == geoType)

val featuresAfter = after.getProtocol.getImplicitlyAndExplicitlySupportedFeatures
assert(
featuresAfter.contains(GEOSPATIAL_RW_FEATURE),
s"geospatial should be present after, got: $featuresAfter")
assert(after.getProtocol.getMinReaderVersion >= 3)
assert(after.getProtocol.getMinWriterVersion >= 7)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert(after.getProtocol.getMinReaderVersion >= 3)
assert(after.getProtocol.getMinWriterVersion >= 7)
assert(after.getProtocol.getMinReaderVersion == 3)
assert(after.getProtocol.getMinWriterVersion == 7)

}
}
}

test("adding a geometry column inside a struct also auto-enables geospatial") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
val initialSchema = new StructType()
.add("id", IntegerType.INTEGER, true)
.add(
"info",
new StructType().add("name", StringType.STRING, true),
true)

createEmptyTable(
engine,
tablePath,
initialSchema,
tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id"))

val before = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl]
val currentSchema = before.getSchema
val currentInner = currentSchema.get("info").getDataType.asInstanceOf[StructType]
val maxIdBefore = getMaxFieldId(engine, tablePath)

val newSchema = new StructType()
.add("id", IntegerType.INTEGER, true, currentSchema.get("id").getMetadata)
.add(
"info",
new StructType()
.add("name", StringType.STRING, true, currentInner.get("name").getMetadata)
.add(
"geom",
GeometryType.ofDefault(),
true,
fieldMetadataForColumn(maxIdBefore + 1, "geom")),
true,
currentSchema.get("info").getMetadata)

updateTableMetadata(engine, tablePath, newSchema)

val after = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl]
val features = after.getProtocol.getImplicitlyAndExplicitlySupportedFeatures
assert(
features.contains(GEOSPATIAL_RW_FEATURE),
s"geospatial should be present, got: $features")
assert(after.getProtocol.getMinReaderVersion >= 3)
assert(after.getProtocol.getMinWriterVersion >= 7)
}
}

// SRID / algorithm changes are not in TypeWideningChecker, so schema evolution rejects them.
Seq(
(
"change geometry SRID",
GeometryType.ofSRID("OGC:CRS84").asInstanceOf[DataType],
GeometryType.ofSRID("EPSG:4326").asInstanceOf[DataType]),
(
"change geography algorithm",
new GeographyType("OGC:CRS84", "spherical").asInstanceOf[DataType],
new GeographyType("OGC:CRS84", "vincenty").asInstanceOf[DataType]))
.foreach { case (label, fromType, toType) =>
test(s"$label is rejected by schema evolution") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
val initialSchema = new StructType()
.add("id", IntegerType.INTEGER, true)
.add("geo", fromType, true)

createEmptyTable(
engine,
tablePath,
initialSchema,
tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id"))

val currentSchema = table.getLatestSnapshot(engine).getSchema
val newSchema = new StructType()
.add("id", IntegerType.INTEGER, true, currentSchema.get("id").getMetadata)
.add("geo", toType, true, currentSchema.get("geo").getMetadata)

assertSchemaEvolutionFails[KernelException](
table,
engine,
newSchema,
"Cannot change the type of existing field")
}
}
}
}
Loading
Loading