diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java index 0e500265c67..6d916775d53 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java @@ -293,6 +293,18 @@ public static List casePreservingEligibleClusterColumns( .map(col -> ColumnMapping.getPhysicalColumnNameAndDataType(schema, col)) .collect(Collectors.toList()); + // Geo types are skipping-eligible (bbox stats) but have no total order, so cannot cluster. + List geoColumns = + physicalColumnsWithTypes.stream() + .filter(tuple -> tuple._2 instanceof GeometryType || tuple._2 instanceof GeographyType) + .map(tuple -> tuple._1.toString() + " : " + tuple._2) + .collect(Collectors.toList()); + + if (!geoColumns.isEmpty()) { + throw new KernelException( + format("Clustering is not supported on geometry/geography column(s): %s", geoColumns)); + } + List nonSkippingEligibleColumns = physicalColumnsWithTypes.stream() .filter(tuple -> !StatsSchemaHelper.isSkippingEligibleDataType(tuple._2)) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala index ce778beb260..462b5d4ffbb 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala @@ -17,14 +17,19 @@ package io.delta.kernel.defaults import java.io.File +import scala.collection.immutable.Seq + import io.delta.golden.GoldenTableUtils.goldenTablePath import io.delta.kernel.{Table, TableManager} import io.delta.kernel.defaults.engine.DefaultEngine -import io.delta.kernel.defaults.utils.{TestRow, TestUtils, WriteUtils} +import io.delta.kernel.defaults.utils.{GeoTestUtils, TestRow, TestUtils, WriteUtils} import io.delta.kernel.engine.Engine import io.delta.kernel.exceptions.{CheckpointAlreadyExistsException, TableNotFoundException} -import io.delta.kernel.expressions.Literal -import io.delta.kernel.internal.SnapshotImpl +import io.delta.kernel.expressions.{Column, Literal} +import io.delta.kernel.internal.{SnapshotImpl, TableConfig} +import io.delta.kernel.internal.tablefeatures.TableFeatures.GEOSPATIAL_RW_FEATURE +import io.delta.kernel.statistics.DataFileStatistics +import io.delta.kernel.types.{GeometryType, StructType => KernelStructType} import org.apache.spark.sql.delta.{DeltaLog, VersionNotFoundException} import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate @@ -40,7 +45,7 @@ import org.scalatest.funsuite.AnyFunSuite /** * Test suite for `io.delta.kernel.Table.checkpoint(engine, version)` */ -class CreateCheckpointSuite extends CheckpointBase { +class CreateCheckpointSuite extends CheckpointBase with GeoTestUtils { /////////// // Tests // @@ -469,6 +474,98 @@ class CreateCheckpointSuite extends CheckpointBase { } } + // Same fixture as GeometryDataSkippingSuite (4 quadrants + null-stats f4): + // y=10 +------+ +------+ + // | f2 | | f1 | + // y=7 +------+ +------+ + // (gap) + // y=3 +------+ +------+ + // | f0 | | f3 | + // y=0 +------+ +------+ + // x=0 x=3 x=7 x=10 + private val geomCheckpointFileExtents: Seq[Option[(Double, Double, Double, Double)]] = Seq( + Some((0.0, 0.0, 3.0, 3.0)), + Some((7.0, 7.0, 10.0, 10.0)), + Some((0.0, 7.0, 3.0, 10.0)), + Some((7.0, 0.0, 10.0, 3.0)), + None) + + private val geomColType = new GeometryType("OGC:CRS84") + private val geomCol = new Column("geom") + + private def geoCheckpointStatsList: Seq[DataFileStatistics] = + geomCheckpointFileExtents.map { + case Some((minX, minY, maxX, maxY)) => + geoStats(geomCol, minX, minY, maxX, maxY, geomColType) + case None => emptyStats() + } + + // Counts the checkpoint manifest files at a version. Excludes sidecars + // (.checkpoint.NNNN.NNNN..parquet) so both classic and V2 layouts return 1. + private def checkpointManifestCount(tablePath: String, checkpointVersion: Long): Int = { + val deltaLogDir = new java.io.File(tablePath, "_delta_log") + val versionPrefix = f"$checkpointVersion%020d.checkpoint" + deltaLogDir + .listFiles() + .count { f => + val name = f.getName + name.startsWith(versionPrefix) && + !name.matches(raw".*\.checkpoint\.\d+\.\d+\..*") + } + } + + Seq( + ("classic", Map.empty[String, String]), + ("v2-typed", Map(TableConfig.CHECKPOINT_POLICY.getKey -> "v2"))).foreach { + case (label, tableProps) => + test(s"data skipping survives $label checkpoint - geometry column") { + withTempDirAndEngine { (tablePath, engine) => + val schema = new KernelStructType().add("geom", geomColType) + commitGeoStatsFiles( + tablePath, + engine, + schema, + geoCheckpointStatsList, + tableProperties = tableProps) + + val checkpointVersion = (geomCheckpointFileExtents.length - 1).toLong + kernelCheckpoint(engine, tablePath, checkpointVersion) + + val manifestCount = checkpointManifestCount(tablePath, checkpointVersion) + assert( + manifestCount == 1, + s"expected exactly 1 checkpoint manifest at v=$checkpointVersion, got $manifestCount") + + // Inline delete instead of deleteDeltaFilesBefore: that helper goes through Spark, + // which rejects tables with the geospatial feature it doesn't recognize. + Seq.range(0L, checkpointVersion).foreach { v => + val p = new Path(f"$tablePath/_delta_log/$v%020d.json") + p.getFileSystem(new Configuration()).delete(p, false) + } + + val snapshot = latestSnapshot(tablePath) + + assert(snapshot.getSchema.get("geom").getDataType == geomColType) + + val features = snapshot.getProtocol.getImplicitlyAndExplicitlySupportedFeatures + assert( + features.contains(GEOSPATIAL_RW_FEATURE), + s"geospatial feature missing post-checkpoint: $features") + + // null-stats f4 always falls through (+1 in every count). + assert(boxFilesHit(snapshot, geomCol, geomColType, 1.0, 1.0, 4.0, 4.0) == 2) + assert(boxFilesHit(snapshot, geomCol, geomColType, 8.0, 8.0, 11.0, 11.0) == 2) + assert(boxFilesHit(snapshot, geomCol, geomColType, 4.0, 4.0, 6.0, 6.0) == 1) + assert(boxFilesHit(snapshot, geomCol, geomColType, 0.0, 0.0, 11.0, 11.0) == 5) + + assert( + collectScanFileRows(snapshot.getScanBuilder().build()).size == + geomCheckpointFileExtents.length, + s"checkpoint dropped add actions: expected ${geomCheckpointFileExtents.length}") + } + } + } + test( "log cleanup: checkpointProtection enabled prevents log cleanup, " + "even snapshot is built as latest") { diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableClusteringSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableClusteringSuite.scala index db143108d10..cdf77ff0cc9 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableClusteringSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableClusteringSuite.scala @@ -31,7 +31,7 @@ import io.delta.kernel.internal.actions.DomainMetadata import io.delta.kernel.internal.clustering.ClusteringMetadataDomain import io.delta.kernel.internal.util.ColumnMapping import io.delta.kernel.internal.util.ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY -import io.delta.kernel.types.{MapType, StructType} +import io.delta.kernel.types.{GeographyType, GeometryType, MapType, StructType} import io.delta.kernel.types.IntegerType.INTEGER import io.delta.kernel.utils.CloseableIterable import io.delta.kernel.utils.CloseableIterable.emptyIterable @@ -119,6 +119,92 @@ trait DeltaTableClusteringSuiteBase extends AnyFunSuite with AbstractWriteUtils } } + Seq( + ("geometry", GeometryType.ofDefault()), + ("geography", GeographyType.ofDefault())).foreach { case (label, geoType) => + test(s"build table txn: clustering on a $label column should be rejected") { + withTempDirAndEngine { (tablePath, engine) => + val schema = new StructType() + .add("id", INTEGER) + .add("geo", geoType) + val ex = intercept[KernelException] { + getCreateTxn( + engine, + tablePath, + schema, + clusteringColsOpt = Some(List(new Column("geo")))) + } + assert( + ex.getMessage.contains("Clustering is not supported on geometry/geography column(s)"), + s"unexpected error message: ${ex.getMessage}") + assert(ex.getMessage.contains("geo")) + } + } + + test(s"build table txn: clustering with mixed non-geo + $label column rejects only the geo") { + withTempDirAndEngine { (tablePath, engine) => + val schema = new StructType() + .add("id", INTEGER) + .add("geo", geoType) + val ex = intercept[KernelException] { + getCreateTxn( + engine, + tablePath, + schema, + clusteringColsOpt = Some(List(new Column("id"), new Column("geo")))) + } + assert( + ex.getMessage.contains("Clustering is not supported on geometry/geography column(s)"), + s"unexpected error message: ${ex.getMessage}") + assert(ex.getMessage.contains("geo")) + assert( + !ex.getMessage.contains(" id "), + s"non-geo column should not be in the rejection list: ${ex.getMessage}") + } + } + + test(s"update a non-clustered table: cannot enable clustering on a $label column") { + withTempDirAndEngine { (tablePath, engine) => + val schema = new StructType() + .add("id", INTEGER) + .add("geo", geoType) + createEmptyTable(engine, tablePath, schema) + val ex = intercept[KernelException] { + updateTableMetadata( + engine, + tablePath, + clusteringColsOpt = Some(List(new Column("geo")))) + } + assert( + ex.getMessage.contains("Clustering is not supported on geometry/geography column(s)"), + s"unexpected error message: ${ex.getMessage}") + } + } + + test(s"build table txn: clustering on a nested $label column should be rejected") { + withTempDirAndEngine { (tablePath, engine) => + val schema = new StructType() + .add("id", INTEGER) + .add( + "parent", + new StructType().add("geo", geoType)) + val ex = intercept[KernelException] { + getCreateTxn( + engine, + tablePath, + schema, + clusteringColsOpt = Some(List(new Column(Array("parent", "geo"))))) + } + assert( + ex.getMessage.contains("Clustering is not supported on geometry/geography column(s)"), + s"unexpected error message: ${ex.getMessage}") + assert( + ex.getMessage.contains("parent") && ex.getMessage.contains("geo"), + s"error should name the nested column path: ${ex.getMessage}") + } + } + } + test("create a clustered table should succeed") { withTempDirAndEngine { (tablePath, engine) => val commitResult = createEmptyTable( diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala index 21cb3e86138..1ce066f63da 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala @@ -29,8 +29,9 @@ import io.delta.kernel.expressions.Column import io.delta.kernel.internal.{SnapshotImpl, TableConfig} import io.delta.kernel.internal.actions.DomainMetadata import io.delta.kernel.internal.clustering.ClusteringMetadataDomain +import io.delta.kernel.internal.tablefeatures.TableFeatures.GEOSPATIAL_RW_FEATURE import io.delta.kernel.internal.util.{ColumnMapping, ColumnMappingSuiteBase} -import io.delta.kernel.types.{ArrayType, CollationIdentifier, DecimalType, FieldMetadata, IntegerType, LongType, MapType, StringType, StructType, TypeChange} +import io.delta.kernel.types.{ArrayType, CollationIdentifier, DataType, DecimalType, FieldMetadata, GeographyType, GeometryType, IntegerType, LongType, MapType, StringType, StructType, TypeChange} import io.delta.kernel.utils.CloseableIterable import io.delta.kernel.utils.CloseableIterable.emptyIterable @@ -1963,4 +1964,135 @@ trait DeltaTableSchemaEvolutionSuiteBase extends AnyFunSuite with AbstractWriteU .fromMetadata(getMetadata(engine, tablePath)) } + // Adding a geo column auto-enables GEOSPATIAL_RW_FEATURE and bumps protocol to (3, 7). + Seq( + ("geometry", GeometryType.ofDefault()), + ("geometry custom SRID", GeometryType.ofSRID("EPSG:4326")), + ("geography", GeographyType.ofDefault()), + ("geography custom algorithm", new GeographyType("OGC:CRS84", "vincenty"))) + .foreach { case (label, geoType) => + test(s"adding $label column auto-enables geospatial feature and upgrades protocol") { + withTempDirAndEngine { (tablePath, engine) => + val table = Table.forPath(engine, tablePath) + val initialSchema = new StructType() + .add("id", IntegerType.INTEGER, true) + .add("name", StringType.STRING, true) + + createEmptyTable( + engine, + tablePath, + initialSchema, + tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id")) + + val before = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] + val featuresBefore = before.getProtocol.getImplicitlyAndExplicitlySupportedFeatures + assert( + !featuresBefore.contains(GEOSPATIAL_RW_FEATURE), + s"geospatial should not be present before, got: $featuresBefore") + + val currentSchema = before.getSchema + val newSchema = new StructType() + .add("id", IntegerType.INTEGER, true, currentSchema.get("id").getMetadata) + .add("name", StringType.STRING, true, currentSchema.get("name").getMetadata) + .add("geo", geoType, true, fieldMetadataForColumn(3, "geo")) + + updateTableMetadata(engine, tablePath, newSchema) + + val after = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] + assert(after.getSchema.get("geo").getDataType == geoType) + + val featuresAfter = after.getProtocol.getImplicitlyAndExplicitlySupportedFeatures + assert( + featuresAfter.contains(GEOSPATIAL_RW_FEATURE), + s"geospatial should be present after, got: $featuresAfter") + assert(after.getProtocol.getMinReaderVersion == 3) + assert(after.getProtocol.getMinWriterVersion == 7) + } + } + } + + test("adding a geometry column inside a struct also auto-enables geospatial") { + withTempDirAndEngine { (tablePath, engine) => + val table = Table.forPath(engine, tablePath) + val initialSchema = new StructType() + .add("id", IntegerType.INTEGER, true) + .add( + "info", + new StructType().add("name", StringType.STRING, true), + true) + + createEmptyTable( + engine, + tablePath, + initialSchema, + tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id")) + + val before = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] + val currentSchema = before.getSchema + val currentInner = currentSchema.get("info").getDataType.asInstanceOf[StructType] + val maxIdBefore = getMaxFieldId(engine, tablePath) + + val newSchema = new StructType() + .add("id", IntegerType.INTEGER, true, currentSchema.get("id").getMetadata) + .add( + "info", + new StructType() + .add("name", StringType.STRING, true, currentInner.get("name").getMetadata) + .add( + "geom", + GeometryType.ofDefault(), + true, + fieldMetadataForColumn(maxIdBefore + 1, "geom")), + true, + currentSchema.get("info").getMetadata) + + updateTableMetadata(engine, tablePath, newSchema) + + val after = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] + val features = after.getProtocol.getImplicitlyAndExplicitlySupportedFeatures + assert( + features.contains(GEOSPATIAL_RW_FEATURE), + s"geospatial should be present, got: $features") + assert(after.getProtocol.getMinReaderVersion == 3) + assert(after.getProtocol.getMinWriterVersion == 7) + } + } + + // SRID / algorithm changes are not in TypeWideningChecker, so schema evolution rejects them. + Seq( + ( + "change geometry SRID", + GeometryType.ofSRID("OGC:CRS84").asInstanceOf[DataType], + GeometryType.ofSRID("EPSG:4326").asInstanceOf[DataType]), + ( + "change geography algorithm", + new GeographyType("OGC:CRS84", "spherical").asInstanceOf[DataType], + new GeographyType("OGC:CRS84", "vincenty").asInstanceOf[DataType])) + .foreach { case (label, fromType, toType) => + test(s"$label is rejected by schema evolution") { + withTempDirAndEngine { (tablePath, engine) => + val table = Table.forPath(engine, tablePath) + val initialSchema = new StructType() + .add("id", IntegerType.INTEGER, true) + .add("geo", fromType, true) + + createEmptyTable( + engine, + tablePath, + initialSchema, + tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id")) + + val currentSchema = table.getLatestSnapshot(engine).getSchema + val newSchema = new StructType() + .add("id", IntegerType.INTEGER, true, currentSchema.get("id").getMetadata) + .add("geo", toType, true, currentSchema.get("geo").getMetadata) + + assertSchemaEvolutionFails[KernelException]( + table, + engine, + newSchema, + "Cannot change the type of existing field") + } + } + } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala index 89475a850c5..c0429f63161 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala @@ -30,7 +30,7 @@ import io.delta.kernel.defaults.internal.data.DefaultColumnarBatch import io.delta.kernel.defaults.internal.data.vector.DefaultGenericVector import io.delta.kernel.defaults.internal.data.vector.DefaultStructVector import io.delta.kernel.defaults.internal.parquet.ParquetSuiteBase -import io.delta.kernel.defaults.utils.{AbstractWriteUtils, TestRow, WriteUtils} +import io.delta.kernel.defaults.utils.{AbstractWriteUtils, GeoTestUtils, TestRow, WriteUtils} import io.delta.kernel.engine.Engine import io.delta.kernel.exceptions._ import io.delta.kernel.expressions.{Column, Literal} @@ -39,6 +39,7 @@ import io.delta.kernel.internal.{ScanImpl, SnapshotImpl, TableConfig} import io.delta.kernel.internal.checkpoints.CheckpointerSuite.selectSingleElement import io.delta.kernel.internal.data.GenericRow import io.delta.kernel.internal.table.SnapshotBuilderImpl +import io.delta.kernel.internal.tablefeatures.TableFeatures.GEOSPATIAL_RW_FEATURE import io.delta.kernel.internal.types.DataTypeJsonSerDe import io.delta.kernel.internal.util.{Clock, JsonUtils} import io.delta.kernel.internal.util.SchemaUtils.casePreservingPartitionColNames @@ -67,7 +68,7 @@ class DeltaTableWritesSuite extends AbstractDeltaTableWritesSuite with WriteUtil /** Transaction commit in this suite IS REQUIRED TO use commitTransaction than .commit */ abstract class AbstractDeltaTableWritesSuite extends AnyFunSuite with AbstractWriteUtils - with ParquetSuiteBase { + with GeoTestUtils with ParquetSuiteBase { /////////////////////////////////////////////////////////////////////////// // Create table tests @@ -374,6 +375,27 @@ abstract class AbstractDeltaTableWritesSuite extends AnyFunSuite with AbstractWr } } + Seq( + ("geometry", GeometryType.ofDefault(), "Geometry(srid=OGC:CRS84)"), + ("geography", GeographyType.ofDefault(), "Geography(srid=OGC:CRS84, algorithm=spherical)")) + .foreach { case (label, geoType, typeStr) => + test(s"create partitioned table - $label partition column is rejected") { + withTempDirAndEngine { (tablePath, engine) => + val schema = new StructType() + .add("id", INTEGER) + .add("geo", geoType) + + val ex = intercept[KernelException] { + getCreateTxn(engine, tablePath, schema = schema, partCols = Seq("geo")) + } + assert( + ex.getMessage.contains( + s"Kernel doesn't support writing data with partition column (geo) of type: $typeStr"), + s"unexpected error message: ${ex.getMessage}") + } + } + } + test("create a partitioned table") { withTempDirAndEngine { (tablePath, engine) => val schema = new StructType() @@ -2096,4 +2118,100 @@ abstract class AbstractDeltaTableWritesSuite extends AnyFunSuite with AbstractWr } newStructType } + + // Reads (id INT, geo ) rows; Seq[Byte] avoids Array reference-equality surprises. + private def readGeoTable(tablePath: String): Seq[(Int, Option[Seq[Byte]])] = { + val schema = latestSnapshot(tablePath).getSchema + val out = scala.collection.mutable.ArrayBuffer.empty[(Int, Option[Seq[Byte]])] + readTableUsingKernel(defaultEngine, tablePath, schema).foreach { filteredBatch => + val batch = filteredBatch.getData + val idIdx = batch.getSchema.indexOf("id") + val geoIdx = batch.getSchema.indexOf("geo") + val idCol = batch.getColumnVector(idIdx) + val geoCol = batch.getColumnVector(geoIdx) + val sel = filteredBatch.getSelectionVector + (0 until batch.getSize).foreach { rowId => + val included = !sel.isPresent || + (!sel.get().isNullAt(rowId) && sel.get().getBoolean(rowId)) + if (included) { + val id = idCol.getInt(rowId) + val geo = + if (geoCol.isNullAt(rowId)) None else Some(geoCol.getBinary(rowId).toSeq) + out.append((id, geo)) + } + } + } + out.toSeq + } + + private def insertGeoBatch( + tablePath: String, + schema: StructType, + rows: Seq[(Int, Option[Array[Byte]])], + isNewTable: Boolean): TransactionCommitResult = { + val ids = rows.map(_._1) + val geos = rows.map(_._2) + val geoFieldType = schema.get("geo").getDataType + val batch = new DefaultColumnarBatch( + ids.length, + schema, + Array(intColumnVector(ids), geoColumnVector(geoFieldType, geos))) + val data = + Seq(Map.empty[String, Literal] -> Seq(new FilteredColumnarBatch(batch, Optional.empty()))) + appendData( + defaultEngine, + tablePath, + isNewTable = isNewTable, + schema = if (isNewTable) schema else null, + data = data) + } + + Seq( + ("geometry default SRID", GeometryType.ofDefault()), + ("geometry custom SRID", GeometryType.ofSRID("EPSG:4326")), + ("geography default", GeographyType.ofDefault()), + ("geography custom algorithm", new GeographyType("OGC:CRS84", "vincenty"))) + .foreach { case (label, geoType) => + test(s"create + insert + read roundtrip - $label") { + withTempDirAndEngine { (tablePath, engine) => + val schema = new StructType() + .add("id", INTEGER) + .add("geo", geoType) + + val rowsBatch1 = Seq[(Int, Option[Array[Byte]])]( + (1, Some(pointWkb(1.0, 2.0))), + (2, None), + (3, Some(pointWkb(-3.5, 4.25)))) + val rowsBatch2 = Seq[(Int, Option[Array[Byte]])]( + (4, Some(pointWkb(10.0, 20.0))), + (5, Some(pointWkb(0.0, 0.0)))) + + val res0 = insertGeoBatch(tablePath, schema, rowsBatch1, isNewTable = true) + assert(res0.getVersion === 0) + val res1 = insertGeoBatch(tablePath, schema, rowsBatch2, isNewTable = false) + assert(res1.getVersion === 1) + + val snapshot = latestSnapshot(tablePath) + val loadedGeoType = snapshot.getSchema.get("geo").getDataType + assert(loadedGeoType == geoType, s"loaded $loadedGeoType, expected $geoType") + + val protocol = snapshot.getProtocol + val supported = protocol.getImplicitlyAndExplicitlySupportedFeatures + assert(supported.contains(GEOSPATIAL_RW_FEATURE), s"protocol features: $supported") + assert(protocol.getMinReaderVersion == 3) + assert(protocol.getMinWriterVersion == 7) + + // id-keyed compare; cross-file/cross-partition row order is not guaranteed. + val expected = (rowsBatch1 ++ rowsBatch2).map { case (id, bytes) => + (id, bytes.map(_.toSeq)) + }.toMap + val actual = readGeoTable(tablePath).toMap + assert(actual.size === expected.size) + expected.foreach { case (id, expGeo) => + assert(actual.contains(id), s"missing id=$id") + assert(actual(id) === expGeo, s"WKB mismatch at id=$id") + } + } + } + } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/GeometryDataSkippingSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/GeometryDataSkippingSuite.scala index 0d45e4857a8..45f4daa5ab0 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/GeometryDataSkippingSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/GeometryDataSkippingSuite.scala @@ -15,30 +15,16 @@ */ package io.delta.kernel.defaults -import java.util.{Collections, Optional} - -import scala.collection.JavaConverters._ import scala.collection.immutable.Seq -import io.delta.kernel._ -import io.delta.kernel.defaults.utils.WriteUtils -import io.delta.kernel.expressions.{Column, Literal, StGeometryBoxesIntersect} -import io.delta.kernel.internal.util.Utils.toCloseableIterator +import io.delta.kernel.defaults.utils.{GeoTestUtils, WriteUtils} +import io.delta.kernel.expressions.{And, Column, Literal, Predicate, StGeometryBoxesIntersect} import io.delta.kernel.statistics.DataFileStatistics -import io.delta.kernel.types.{GeometryType, StructType} -import io.delta.kernel.utils.CloseableIterable.inMemoryIterable -import io.delta.kernel.utils.DataFileStatus +import io.delta.kernel.types.{GeometryType, IntegerType, StructType} import org.scalatest.funsuite.AnyFunSuite -/** - * End-to-end tests for data skipping with geometry column types using - * the StGeometryBoxesIntersect predicate. - * - * Tests bypass the Parquet writer by injecting DataFileStatistics with - * geometry min/max literals directly into the Delta log. - */ -class GeometryDataSkippingSuite extends AnyFunSuite with WriteUtils { +class GeometryDataSkippingSuite extends AnyFunSuite with WriteUtils with GeoTestUtils { // 4-quadrant layout + 1 null-stats file (f4, never skipped): // y=10 +------+ +------+ @@ -49,235 +35,101 @@ class GeometryDataSkippingSuite extends AnyFunSuite with WriteUtils { // | f0 | | f3 | // y=0 +------+ +------+ // x=0 x=3 x=7 x=10 - private val fileExtents: Seq[Option[(Double, Double, Double, Double)]] = - Seq( - Some((0.0, 0.0, 3.0, 3.0)), // SW - file 0 - Some((7.0, 7.0, 10.0, 10.0)), // NE - file 1 - Some((0.0, 7.0, 3.0, 10.0)), // NW - file 2 - Some((7.0, 0.0, 10.0, 3.0)), // SE - file 3 - None // null stats - file 4 (never skipped) - ) + private val fileExtents: Seq[Option[(Double, Double, Double, Double)]] = Seq( + Some((0.0, 0.0, 3.0, 3.0)), + Some((7.0, 7.0, 10.0, 10.0)), + Some((0.0, 7.0, 3.0, 10.0)), + Some((7.0, 0.0, 10.0, 3.0)), + None) private val colType = new GeometryType("OGC:CRS84") + private val geomCol = new Column("geom") test("StGeometryBoxesIntersect data skipping on GeometryType column") { withTempDirAndEngine { (tablePath, engine) => val schema = new StructType().add("geom", colType) - writeFilesWithGeometryStats( + val statsList = fileExtents.map { + case Some((minX, minY, maxX, maxY)) => + geoStats(geomCol, minX, minY, maxX, maxY, colType) + case None => emptyStats() + } + commitGeoStatsFiles(tablePath, engine, schema, statsList) + + val snapshot = latestSnapshot(tablePath) + + // null-stats f4 is always returned (+1 in every count below). + assert(boxFilesHit(snapshot, geomCol, colType, 1.0, 1.0, 4.0, 4.0) == 2) // f0 + assert(boxFilesHit(snapshot, geomCol, colType, 8.0, 8.0, 11.0, 11.0) == 2) // f1 + assert(boxFilesHit(snapshot, geomCol, colType, 4.0, 4.0, 6.0, 6.0) == 1) // none + assert(boxFilesHit(snapshot, geomCol, colType, 1.0, 1.0, 4.0, 9.0) == 3) // f0+f2 + assert(boxFilesHit(snapshot, geomCol, colType, 0.0, 8.0, 11.0, 11.0) == 3) // f1+f2 + assert(boxFilesHit(snapshot, geomCol, colType, 0.0, 0.0, 11.0, 11.0) == 5) // all + } + } + + test("file with completely missing stats falls through and is never pruned") { + withTempDirAndEngine { (tablePath, engine) => + val schema = new StructType().add("geom", colType) + commitGeoStatsFiles( tablePath, engine, schema, - fileExtents) + Seq(geoStats(geomCol, 0.0, 0.0, 3.0, 3.0, colType), emptyStats())) val snapshot = latestSnapshot(tablePath) - - def filesHit( - qMinX: Double, - qMinY: Double, - qMaxX: Double, - qMaxY: Double): Int = { - val pred = new StGeometryBoxesIntersect( - new Column("geom"), - Literal.ofGeospatialWKT(s"POINT ($qMinX $qMinY)", colType), - Literal.ofGeospatialWKT(s"POINT ($qMaxX $qMaxY)", colType)) - collectScanFileRows( - snapshot.getScanBuilder().withFilter(pred).build()).size - } - - // null-stats f4 is always included in every query - // SW query - intersects f0 + f4(null) - assert(filesHit(1.0, 1.0, 4.0, 4.0) == 2) - // NE query - intersects f1 + f4(null) - assert(filesHit(8.0, 8.0, 11.0, 11.0) == 2) - // Center gap - no data file hit, only f4(null) - assert(filesHit(4.0, 4.0, 6.0, 6.0) == 1) - // West strip [1,1]-[4,9] - f0 + f2 + f4(null) - assert(filesHit(1.0, 1.0, 4.0, 9.0) == 3) - // Top strip [0,8]-[11,11] - f1 + f2 + f4(null) - assert(filesHit(0.0, 8.0, 11.0, 11.0) == 3) - // Global - all 4 data files + f4(null) - assert(filesHit(0.0, 0.0, 11.0, 11.0) == 5) + assert(boxFilesHit(snapshot, geomCol, colType, 100.0, 100.0, 101.0, 101.0) == 1) } } test("StGeometryBoxesIntersect combined with AND predicate on second column") { withTempDirAndEngine { (tablePath, engine) => val schema = new StructType() - .add("id", io.delta.kernel.types.IntegerType.INTEGER) - .add("geom", new GeometryType("OGC:CRS84")) - + .add("id", IntegerType.INTEGER) + .add("geom", colType) val idCol = new Column("id") - val geomCol = new Column("geom") - // File 0 (SW): id [1,5], geom bbox [0,0]-[3,3] - // File 1 (NE): id [10,20], geom bbox [7,7]-[10,10] - // File 2: null stats (never skipped) - val files: Seq[Option[(Int, Int, Double, Double, Double, Double)]] = - Seq( - Some((1, 5, 0.0, 0.0, 3.0, 3.0)), - Some((10, 20, 7.0, 7.0, 10.0, 10.0)), - None) - - files.zipWithIndex.foreach { - case (fileOpt, idx) => - val txn = if (idx == 0) { - getCreateTxn(engine, tablePath, schema) - } else { - getUpdateTxn(engine, tablePath) - } - - val txnState = txn.getTransactionState(engine) - val writeContext = Transaction.getWriteContext( - engine, - txnState, - Collections.emptyMap()) - - val stats = fileOpt match { - case Some((idMin, idMax, gMinX, gMinY, gMaxX, gMaxY)) => - new DataFileStatistics( - 10, - Map( - idCol -> Literal.ofInt(idMin), - geomCol -> Literal.ofGeospatialWKT( - s"POINT ($gMinX $gMinY)", - colType)).asJava, - Map( - idCol -> Literal.ofInt(idMax), - geomCol -> Literal.ofGeospatialWKT( - s"POINT ($gMaxX $gMaxY)", - colType)).asJava, - Map( - idCol -> (0L: java.lang.Long), - geomCol -> (0L: java.lang.Long)).asJava, - Optional.empty()) - case None => - new DataFileStatistics( - 10, - Collections.emptyMap(), - Collections.emptyMap(), - Collections.emptyMap(), - Optional.empty()) - } - - val filePath = - engine.getFileSystemClient.resolvePath( - writeContext.getTargetDirectory + s"/part-$idx.parquet") - val fileStatus = new DataFileStatus( - filePath, - 1000, - 0L, - Optional.of(stats)) - - val actions = Transaction.generateAppendActions( - engine, - txnState, - toCloseableIterator(Seq(fileStatus).iterator.asJava), - writeContext) - commitTransaction( - txn, - engine, - inMemoryIterable(actions)) - } + // f0: id [1,5], geom [0,0]-[3,3]; f1: id [10,20], geom [7,7]-[10,10]; f2: null stats. + val statsList = Seq[DataFileStatistics]( + stats( + minValues = Map( + idCol -> Literal.ofInt(1), + geomCol -> pointWktLiteral(0.0, 0.0, colType)), + maxValues = Map( + idCol -> Literal.ofInt(5), + geomCol -> pointWktLiteral(3.0, 3.0, colType)), + nullCounts = Map(idCol -> 0L, geomCol -> 0L)), + stats( + minValues = Map( + idCol -> Literal.ofInt(10), + geomCol -> pointWktLiteral(7.0, 7.0, colType)), + maxValues = Map( + idCol -> Literal.ofInt(20), + geomCol -> pointWktLiteral(10.0, 10.0, colType)), + nullCounts = Map(idCol -> 0L, geomCol -> 0L)), + emptyStats()) + commitGeoStatsFiles(tablePath, engine, schema, statsList) val snapshot = latestSnapshot(tablePath) - // null-stats f2 is always included in every query - - // SW geo + id<=5: f0 matches both, f2(null) included - val geoAndId = new io.delta.kernel.expressions.And( + val swGeoAndId = new And( new StGeometryBoxesIntersect( geomCol, - Literal.ofGeospatialWKT("POINT (1.0 1.0)", colType), - Literal.ofGeospatialWKT("POINT (4.0 4.0)", colType)), - new io.delta.kernel.expressions.Predicate( - "<=", - idCol, - Literal.ofInt(5))) + pointWktLiteral(1.0, 1.0, colType), + pointWktLiteral(4.0, 4.0, colType)), + new Predicate("<=", idCol, Literal.ofInt(5))) assert(collectScanFileRows( - snapshot.getScanBuilder().withFilter(geoAndId).build()).size == 2) + snapshot.getScanBuilder().withFilter(swGeoAndId).build()).size == 2) - // NE geo + id>=15: f1 matches both, f2(null) included - val geoAndId2 = new io.delta.kernel.expressions.And( + val neGeoAndId = new And( new StGeometryBoxesIntersect( geomCol, - Literal.ofGeospatialWKT("POINT (8.0 8.0)", colType), - Literal.ofGeospatialWKT("POINT (11.0 11.0)", colType)), - new io.delta.kernel.expressions.Predicate( - ">=", - idCol, - Literal.ofInt(15))) + pointWktLiteral(8.0, 8.0, colType), + pointWktLiteral(11.0, 11.0, colType)), + new Predicate(">=", idCol, Literal.ofInt(15))) assert(collectScanFileRows( - snapshot.getScanBuilder().withFilter(geoAndId2).build()).size == 2) - - // Center geo: both data files skipped, only f2(null) - val geoCenterAny = new StGeometryBoxesIntersect( - geomCol, - Literal.ofGeospatialWKT("POINT (4.0 4.0)", colType), - Literal.ofGeospatialWKT("POINT (6.0 6.0)", colType)) - assert(collectScanFileRows( - snapshot.getScanBuilder().withFilter(geoCenterAny).build()).size == 1) - } - } - - private def writeFilesWithGeometryStats( - tablePath: String, - engine: io.delta.kernel.engine.Engine, - schema: StructType, - extents: Seq[Option[(Double, Double, Double, Double)]]): Unit = { - val geomCol = new Column("geom") - extents.zipWithIndex.foreach { - case (extentOpt, idx) => - val txn = if (idx == 0) { - getCreateTxn(engine, tablePath, schema) - } else { - getUpdateTxn(engine, tablePath) - } - - val txnState = txn.getTransactionState(engine) - val writeContext = Transaction.getWriteContext( - engine, - txnState, - Collections.emptyMap()) - - val stats = extentOpt match { - case Some((minX, minY, maxX, maxY)) => - new DataFileStatistics( - 10, - Map(geomCol -> Literal.ofGeospatialWKT( - s"POINT ($minX $minY)", - colType)).asJava, - Map(geomCol -> Literal.ofGeospatialWKT( - s"POINT ($maxX $maxY)", - colType)).asJava, - Map(geomCol -> (0L: java.lang.Long)).asJava, - Optional.empty()) - case None => - new DataFileStatistics( - 10, - Collections.emptyMap(), - Collections.emptyMap(), - Collections.emptyMap(), - Optional.empty()) - } - - val filePath = - engine.getFileSystemClient.resolvePath( - writeContext.getTargetDirectory + s"/part-$idx.parquet") - val fileStatus = new DataFileStatus( - filePath, - 1000, - 0L, - Optional.of(stats)) + snapshot.getScanBuilder().withFilter(neGeoAndId).build()).size == 2) - val actions = Transaction.generateAppendActions( - engine, - txnState, - toCloseableIterator(Seq(fileStatus).iterator.asJava), - writeContext) - commitTransaction( - txn, - engine, - inMemoryIterable(actions)) + assert(boxFilesHit(snapshot, geomCol, colType, 4.0, 4.0, 6.0, 6.0) == 1) } } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/GeoTestUtils.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/GeoTestUtils.scala new file mode 100644 index 00000000000..0c87f4f281b --- /dev/null +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/GeoTestUtils.scala @@ -0,0 +1,152 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.defaults.utils + +import java.nio.{ByteBuffer, ByteOrder} +import java.util.{Collections, Optional} + +import scala.collection.JavaConverters._ +import scala.collection.immutable.Seq + +import io.delta.kernel.{Snapshot, Transaction} +import io.delta.kernel.data.{ColumnVector, Row} +import io.delta.kernel.engine.Engine +import io.delta.kernel.expressions.{Column, Literal, StGeometryBoxesIntersect} +import io.delta.kernel.internal.util.Utils.toCloseableIterator +import io.delta.kernel.statistics.DataFileStatistics +import io.delta.kernel.types.{DataType, IntegerType, StructType} +import io.delta.kernel.utils.{CloseableIterable, DataFileStatus} +import io.delta.kernel.utils.CloseableIterable.inMemoryIterable + +/** Mixin for geospatial test fixtures: WKB/WKT helpers, stats injection, box-prune queries. */ +trait GeoTestUtils extends AbstractWriteUtils { + + /** 21-byte little-endian WKB for POINT(x y). */ + def pointWkb(x: Double, y: Double): Array[Byte] = { + val buf = ByteBuffer.allocate(21).order(ByteOrder.LITTLE_ENDIAN) + buf.put(1.toByte) + buf.putInt(1) + buf.putDouble(x) + buf.putDouble(y) + buf.array() + } + + def pointWktLiteral(x: Double, y: Double, geoType: DataType): Literal = + Literal.ofGeospatialWKT(s"POINT ($x $y)", geoType) + + def geoColumnVector( + geoType: DataType, + values: Seq[Option[Array[Byte]]]): ColumnVector = new ColumnVector { + override def getDataType: DataType = geoType + override def getSize: Int = values.length + override def close(): Unit = {} + override def isNullAt(rowId: Int): Boolean = values(rowId).isEmpty + override def getBinary(rowId: Int): Array[Byte] = values(rowId).orNull + } + + def intColumnVector(values: Seq[Int]): ColumnVector = new ColumnVector { + override def getDataType: DataType = IntegerType.INTEGER + override def getSize: Int = values.length + override def close(): Unit = {} + override def isNullAt(rowId: Int): Boolean = false + override def getInt(rowId: Int): Int = values(rowId) + } + + def geoStats( + geomCol: Column, + minX: Double, + minY: Double, + maxX: Double, + maxY: Double, + geoType: DataType, + numRecords: Long = 10L): DataFileStatistics = new DataFileStatistics( + numRecords, + Map(geomCol -> pointWktLiteral(minX, minY, geoType)).asJava, + Map(geomCol -> pointWktLiteral(maxX, maxY, geoType)).asJava, + Map(geomCol -> (0L: java.lang.Long)).asJava, + Optional.empty()) + + /** Stats with no min/max recorded - data skipping must always retain such files. */ + def emptyStats(numRecords: Long = 10L): DataFileStatistics = new DataFileStatistics( + numRecords, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Optional.empty()) + + def stats( + minValues: Map[Column, Literal], + maxValues: Map[Column, Literal], + nullCounts: Map[Column, java.lang.Long] = Map.empty, + numRecords: Long = 10L): DataFileStatistics = new DataFileStatistics( + numRecords, + minValues.asJava, + maxValues.asJava, + nullCounts.asJava, + Optional.empty()) + + /** Stages a synthetic add file (no real Parquet bytes) at part-{fileIdx}.parquet. */ + def appendActionsForGeoStatsFile( + engine: Engine, + txn: Transaction, + fileIdx: Int, + stats: DataFileStatistics, + fileSize: Long = 1000L): CloseableIterable[Row] = { + val txnState = txn.getTransactionState(engine) + val writeContext = Transaction.getWriteContext(engine, txnState, Collections.emptyMap()) + val filePath = engine.getFileSystemClient.resolvePath( + writeContext.getTargetDirectory + s"/part-$fileIdx.parquet") + val fileStatus = new DataFileStatus(filePath, fileSize, 0L, Optional.of(stats)) + val actions = Transaction.generateAppendActions( + engine, + txnState, + toCloseableIterator(Seq(fileStatus).iterator.asJava), + writeContext) + inMemoryIterable(actions) + } + + /** One commit per stats entry; first creates the table, rest update. */ + def commitGeoStatsFiles( + tablePath: String, + engine: Engine, + schema: StructType, + stats: Seq[DataFileStatistics], + tableProperties: Map[String, String] = Map.empty): Unit = stats.zipWithIndex.foreach { + case (s, idx) => + val txn = if (idx == 0) { + getCreateTxn(engine, tablePath, schema, tableProperties = tableProperties) + } else { + getUpdateTxn(engine, tablePath) + } + commitTransaction(txn, engine, appendActionsForGeoStatsFile(engine, txn, idx, s)) + } + + /** Count of scan files left after applying StGeometryBoxesIntersect on a query bbox. */ + def boxFilesHit( + snapshot: Snapshot, + geomCol: Column, + geoType: DataType, + qMinX: Double, + qMinY: Double, + qMaxX: Double, + qMaxY: Double): Int = { + val pred = new StGeometryBoxesIntersect( + geomCol, + pointWktLiteral(qMinX, qMinY, geoType), + pointWktLiteral(qMaxX, qMaxY, geoType)) + collectScanFileRows(snapshot.getScanBuilder().withFilter(pred).build()).size + } +}