From 2b5464544febdcdc7415df64edc796ba7f56f57c Mon Sep 17 00:00:00 2001 From: emu Date: Fri, 8 May 2026 11:21:12 +0000 Subject: [PATCH 1/6] fixed --- .../kernel/defaults/utils/GeoTestUtils.scala | 202 ++++++++++++++++++ 1 file changed, 202 insertions(+) create mode 100644 kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/GeoTestUtils.scala diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/GeoTestUtils.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/GeoTestUtils.scala new file mode 100644 index 00000000000..a287914345b --- /dev/null +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/GeoTestUtils.scala @@ -0,0 +1,202 @@ +/* + * Copyright (2026) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.defaults.utils + +import java.nio.{ByteBuffer, ByteOrder} +import java.util.{Collections, Optional} + +import scala.collection.JavaConverters._ +import scala.collection.immutable.Seq + +import io.delta.kernel.{Snapshot, Transaction} +import io.delta.kernel.data.{ColumnVector, Row} +import io.delta.kernel.engine.Engine +import io.delta.kernel.expressions.{Column, Literal, StGeometryBoxesIntersect} +import io.delta.kernel.internal.util.Utils.toCloseableIterator +import io.delta.kernel.statistics.DataFileStatistics +import io.delta.kernel.types.{DataType, IntegerType, StructType} +import io.delta.kernel.utils.{CloseableIterable, DataFileStatus} +import io.delta.kernel.utils.CloseableIterable.inMemoryIterable + +/** + * Shared helpers for geospatial tests. Mix into any AnyFunSuite that already mixes in + * AbstractWriteUtils. Provides: + * + * - WKB and WKT literal construction for synthetic geometry/geography points + * - Column vectors carrying WKB binary or scalar int data + * - DataFileStatistics with min/max bounding-box literals + * - The boilerplate to stage a synthetic add file (no real Parquet write) and to + * commit one such file per stats entry against a given table + * - A box-intersect file-pruning shorthand that returns the count of scan files a + * query bounding box leaves after data skipping + * + * Suites that exercise the data-skipping path inject stats directly via + * appendActionsForGeoStatsFile rather than going through the Parquet writer; that + * keeps the test fixtures deterministic and decouples the predicate behavior from + * the write pipeline. + */ +trait GeoTestUtils extends AbstractWriteUtils { + + /** + * Builds a 21-byte little-endian WKB encoding for POINT(x y): + * byteOrder(1) + type=1(4) + x(8) + y(8). 
+ */ + def pointWkb(x: Double, y: Double): Array[Byte] = { + val buf = ByteBuffer.allocate(21).order(ByteOrder.LITTLE_ENDIAN) + buf.put(1.toByte) + buf.putInt(1) + buf.putDouble(x) + buf.putDouble(y) + buf.array() + } + + /** WKT POINT literal of the given type, e.g. for stats min/max or query bounds. */ + def pointWktLiteral(x: Double, y: Double, geoType: DataType): Literal = + Literal.ofGeospatialWKT(s"POINT ($x $y)", geoType) + + /** + * ColumnVector backed by an in-memory Seq of WKB byte arrays. None entries become + * null rows; Some entries return their bytes via getBinary. + */ + def geoColumnVector( + geoType: DataType, + values: Seq[Option[Array[Byte]]]): ColumnVector = new ColumnVector { + override def getDataType: DataType = geoType + override def getSize: Int = values.length + override def close(): Unit = {} + override def isNullAt(rowId: Int): Boolean = values(rowId).isEmpty + override def getBinary(rowId: Int): Array[Byte] = values(rowId).orNull + } + + /** Trivial non-null IntegerType vector. */ + def intColumnVector(values: Seq[Int]): ColumnVector = new ColumnVector { + override def getDataType: DataType = IntegerType.INTEGER + override def getSize: Int = values.length + override def close(): Unit = {} + override def isNullAt(rowId: Int): Boolean = false + override def getInt(rowId: Int): Int = values(rowId) + } + + /** + * DataFileStatistics with a single geo column's min/max set to POINT WKT literals + * for the given bounding box, and null count = 0. 
+ */ + def geoStats( + geomCol: Column, + minX: Double, + minY: Double, + maxX: Double, + maxY: Double, + geoType: DataType, + numRecords: Long = 10L): DataFileStatistics = new DataFileStatistics( + numRecords, + Map(geomCol -> pointWktLiteral(minX, minY, geoType)).asJava, + Map(geomCol -> pointWktLiteral(maxX, maxY, geoType)).asJava, + Map(geomCol -> (0L: java.lang.Long)).asJava, + Optional.empty()) + + /** + * DataFileStatistics with no min/max recorded - the data-skipping path treats + * this file as "always include" because the predicate cannot prove non-intersection. + */ + def emptyStats(numRecords: Long = 10L): DataFileStatistics = new DataFileStatistics( + numRecords, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Optional.empty()) + + /** + * General-purpose DataFileStatistics builder for multi-column stats. Use when a + * single file needs min/max recorded for several columns (e.g. a geo column AND + * an int column to test combined predicates). + */ + def stats( + minValues: Map[Column, Literal], + maxValues: Map[Column, Literal], + nullCounts: Map[Column, java.lang.Long] = Map.empty, + numRecords: Long = 10L): DataFileStatistics = new DataFileStatistics( + numRecords, + minValues.asJava, + maxValues.asJava, + nullCounts.asJava, + Optional.empty()) + + /** + * Stages a synthetic add file at <targetDirectory>/part-{fileIdx}.parquet with the + * provided stats (no real Parquet bytes are written), returning the actions that + * the caller should hand to commitTransaction. fileSize is non-zero so the add + * action looks plausible to downstream consumers. 
+ */ + def appendActionsForGeoStatsFile( + engine: Engine, + txn: Transaction, + fileIdx: Int, + stats: DataFileStatistics, + fileSize: Long = 1000L): CloseableIterable[Row] = { + val txnState = txn.getTransactionState(engine) + val writeContext = Transaction.getWriteContext(engine, txnState, Collections.emptyMap()) + val filePath = engine.getFileSystemClient.resolvePath( + writeContext.getTargetDirectory + s"/part-$fileIdx.parquet") + val fileStatus = new DataFileStatus(filePath, fileSize, 0L, Optional.of(stats)) + val actions = Transaction.generateAppendActions( + engine, + txnState, + toCloseableIterator(Seq(fileStatus).iterator.asJava), + writeContext) + inMemoryIterable(actions) + } + + /** + * Commits one synthetic add file per stats entry, creating the table on the first + * commit and updating it on subsequent ones. Each entry becomes its own commit so + * tests can reason about per-file pruning post-checkpoint. + */ + def commitGeoStatsFiles( + tablePath: String, + engine: Engine, + schema: StructType, + stats: Seq[DataFileStatistics], + tableProperties: Map[String, String] = Map.empty): Unit = stats.zipWithIndex.foreach { + case (s, idx) => + val txn = if (idx == 0) { + getCreateTxn(engine, tablePath, schema, tableProperties = tableProperties) + } else { + getUpdateTxn(engine, tablePath) + } + commitTransaction(txn, engine, appendActionsForGeoStatsFile(engine, txn, idx, s)) + } + + /** + * Returns the count of scan files a snapshot leaves after applying an + * StGeometryBoxesIntersect predicate against the given column with the given query + * bounding box. Files with missing geo stats fall through and count toward the result. 
+ */ + def boxFilesHit( + snapshot: Snapshot, + geomCol: Column, + geoType: DataType, + qMinX: Double, + qMinY: Double, + qMaxX: Double, + qMaxY: Double): Int = { + val pred = new StGeometryBoxesIntersect( + geomCol, + pointWktLiteral(qMinX, qMinY, geoType), + pointWktLiteral(qMaxX, qMaxY, geoType)) + collectScanFileRows(snapshot.getScanBuilder().withFilter(pred).build()).size + } +} From a47595e9df17cfb579cdb63a9d5f2d232b9fcbc1 Mon Sep 17 00:00:00 2001 From: emu Date: Fri, 8 May 2026 11:42:03 +0000 Subject: [PATCH 2/6] tests --- .../kernel/internal/util/SchemaUtils.java | 12 + .../defaults/CreateCheckpointSuite.scala | 98 +++++- .../defaults/DeltaTableClusteringSuite.scala | 65 +++- .../DeltaTableSchemaEvolutionSuite.scala | 136 +++++++- .../defaults/DeltaTableWritesSuite.scala | 124 +++++++- .../defaults/GeometryDataSkippingSuite.scala | 296 +++++------------- .../kernel/defaults/utils/GeoTestUtils.scala | 62 +--- 7 files changed, 505 insertions(+), 288 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java index 0e500265c67..6d916775d53 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java @@ -293,6 +293,18 @@ public static List casePreservingEligibleClusterColumns( .map(col -> ColumnMapping.getPhysicalColumnNameAndDataType(schema, col)) .collect(Collectors.toList()); + // Geo types are skipping-eligible (bbox stats) but have no total order, so cannot cluster. 
+ List geoColumns = + physicalColumnsWithTypes.stream() + .filter(tuple -> tuple._2 instanceof GeometryType || tuple._2 instanceof GeographyType) + .map(tuple -> tuple._1.toString() + " : " + tuple._2) + .collect(Collectors.toList()); + + if (!geoColumns.isEmpty()) { + throw new KernelException( + format("Clustering is not supported on geometry/geography column(s): %s", geoColumns)); + } + List nonSkippingEligibleColumns = physicalColumnsWithTypes.stream() .filter(tuple -> !StatsSchemaHelper.isSkippingEligibleDataType(tuple._2)) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala index ce778beb260..103041a3520 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala @@ -17,14 +17,18 @@ package io.delta.kernel.defaults import java.io.File +import scala.collection.immutable.Seq + import io.delta.golden.GoldenTableUtils.goldenTablePath import io.delta.kernel.{Table, TableManager} import io.delta.kernel.defaults.engine.DefaultEngine -import io.delta.kernel.defaults.utils.{TestRow, TestUtils, WriteUtils} +import io.delta.kernel.defaults.utils.{GeoTestUtils, TestRow, TestUtils, WriteUtils} import io.delta.kernel.engine.Engine import io.delta.kernel.exceptions.{CheckpointAlreadyExistsException, TableNotFoundException} -import io.delta.kernel.expressions.Literal +import io.delta.kernel.expressions.{Column, Literal} import io.delta.kernel.internal.SnapshotImpl +import io.delta.kernel.statistics.DataFileStatistics +import io.delta.kernel.types.{GeometryType, StructType => KernelStructType} import org.apache.spark.sql.delta.{DeltaLog, VersionNotFoundException} import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate @@ -40,7 +44,7 @@ import org.scalatest.funsuite.AnyFunSuite /** * Test 
suite for `io.delta.kernel.Table.checkpoint(engine, version)` */ -class CreateCheckpointSuite extends CheckpointBase { +class CreateCheckpointSuite extends CheckpointBase with GeoTestUtils { /////////// // Tests // @@ -469,6 +473,94 @@ class CreateCheckpointSuite extends CheckpointBase { } } + // Same fixture as GeometryDataSkippingSuite (4 quadrants + null-stats f4). + private val geomCheckpointFileExtents: Seq[Option[(Double, Double, Double, Double)]] = Seq( + Some((0.0, 0.0, 3.0, 3.0)), + Some((7.0, 7.0, 10.0, 10.0)), + Some((0.0, 7.0, 3.0, 10.0)), + Some((7.0, 0.0, 10.0, 3.0)), + None) + + private val geomColType = new GeometryType("OGC:CRS84") + private val geomCol = new Column("geom") + + private def geoCheckpointStatsList: Seq[DataFileStatistics] = + geomCheckpointFileExtents.map { + case Some((minX, minY, maxX, maxY)) => + geoStats(geomCol, minX, minY, maxX, maxY, geomColType) + case None => emptyStats() + } + + // Counts the checkpoint manifest files at a version. Excludes sidecars + // (<version>.checkpoint.NNNN.NNNN.<suffix>.parquet) so both classic and V2 layouts return 1. 
+ private def checkpointManifestCount(tablePath: String, checkpointVersion: Long): Int = { + val deltaLogDir = new java.io.File(tablePath, "_delta_log") + val versionPrefix = f"$checkpointVersion%020d.checkpoint" + deltaLogDir + .listFiles() + .count { f => + val name = f.getName + name.startsWith(versionPrefix) && + !name.matches(raw".*\.checkpoint\.\d+\.\d+\..*") + } + } + + Seq( + ("classic", Map.empty[String, String]), + ( + "v2-typed", + Map( + io.delta.kernel.internal.TableConfig.CHECKPOINT_POLICY.getKey -> "v2"))).foreach { + case (label, tableProps) => + test(s"data skipping survives $label checkpoint - geometry column") { + withTempDirAndEngine { (tablePath, engine) => + val schema = new KernelStructType().add("geom", geomColType) + commitGeoStatsFiles( + tablePath, + engine, + schema, + geoCheckpointStatsList, + tableProperties = tableProps) + + val checkpointVersion = (geomCheckpointFileExtents.length - 1).toLong + kernelCheckpoint(engine, tablePath, checkpointVersion) + + val manifestCount = checkpointManifestCount(tablePath, checkpointVersion) + assert( + manifestCount == 1, + s"expected exactly 1 checkpoint manifest at v=$checkpointVersion, got $manifestCount") + + // Inline delete instead of deleteDeltaFilesBefore: that helper goes through Spark, + // which rejects tables with the geospatial feature it doesn't recognize. + Seq.range(0L, checkpointVersion).foreach { v => + val p = new Path(f"$tablePath/_delta_log/$v%020d.json") + p.getFileSystem(new Configuration()).delete(p, false) + } + + val snapshot = latestSnapshot(tablePath) + + assert(snapshot.getSchema.get("geom").getDataType == geomColType) + + val features = snapshot.getProtocol.getImplicitlyAndExplicitlySupportedFeatures + assert( + features.contains(io.delta.kernel.internal.tablefeatures.TableFeatures + .GEOSPATIAL_RW_FEATURE), + s"geospatial feature missing post-checkpoint: $features") + + // null-stats f4 always falls through (+1 in every count). 
+ assert(boxFilesHit(snapshot, geomCol, geomColType, 1.0, 1.0, 4.0, 4.0) == 2) + assert(boxFilesHit(snapshot, geomCol, geomColType, 8.0, 8.0, 11.0, 11.0) == 2) + assert(boxFilesHit(snapshot, geomCol, geomColType, 4.0, 4.0, 6.0, 6.0) == 1) + assert(boxFilesHit(snapshot, geomCol, geomColType, 0.0, 0.0, 11.0, 11.0) == 5) + + assert( + collectScanFileRows(snapshot.getScanBuilder().build()).size == + geomCheckpointFileExtents.length, + s"checkpoint dropped add actions: expected ${geomCheckpointFileExtents.length}") + } + } + } + test( "log cleanup: checkpointProtection enabled prevents log cleanup, " + "even snapshot is built as latest") { diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableClusteringSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableClusteringSuite.scala index db143108d10..ea522f214a6 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableClusteringSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableClusteringSuite.scala @@ -31,7 +31,7 @@ import io.delta.kernel.internal.actions.DomainMetadata import io.delta.kernel.internal.clustering.ClusteringMetadataDomain import io.delta.kernel.internal.util.ColumnMapping import io.delta.kernel.internal.util.ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY -import io.delta.kernel.types.{MapType, StructType} +import io.delta.kernel.types.{GeographyType, GeometryType, MapType, StructType} import io.delta.kernel.types.IntegerType.INTEGER import io.delta.kernel.utils.CloseableIterable import io.delta.kernel.utils.CloseableIterable.emptyIterable @@ -119,6 +119,69 @@ trait DeltaTableClusteringSuiteBase extends AnyFunSuite with AbstractWriteUtils } } + Seq( + ("geometry", GeometryType.ofDefault()), + ("geography", GeographyType.ofDefault())).foreach { case (label, geoType) => + test(s"build table txn: clustering on a $label column should be rejected") { + withTempDirAndEngine { 
(tablePath, engine) => + val schema = new StructType() + .add("id", INTEGER) + .add("geo", geoType) + val ex = intercept[KernelException] { + getCreateTxn( + engine, + tablePath, + schema, + clusteringColsOpt = Some(List(new Column("geo")))) + } + assert( + ex.getMessage.contains("Clustering is not supported on geometry/geography column(s)"), + s"unexpected error message: ${ex.getMessage}") + assert(ex.getMessage.contains("geo")) + } + } + + test(s"build table txn: clustering with mixed non-geo + $label column rejects only the geo") { + withTempDirAndEngine { (tablePath, engine) => + val schema = new StructType() + .add("id", INTEGER) + .add("geo", geoType) + val ex = intercept[KernelException] { + getCreateTxn( + engine, + tablePath, + schema, + clusteringColsOpt = Some(List(new Column("id"), new Column("geo")))) + } + assert( + ex.getMessage.contains("Clustering is not supported on geometry/geography column(s)"), + s"unexpected error message: ${ex.getMessage}") + assert(ex.getMessage.contains("geo")) + assert( + !ex.getMessage.contains(" id "), + s"non-geo column should not be in the rejection list: ${ex.getMessage}") + } + } + + test(s"update a non-clustered table: cannot enable clustering on a $label column") { + withTempDirAndEngine { (tablePath, engine) => + val schema = new StructType() + .add("id", INTEGER) + .add("geo", geoType) + createEmptyTable(engine, tablePath, schema) + val ex = intercept[KernelException] { + updateTableMetadata( + engine, + tablePath, + clusteringColsOpt = Some(List(new Column("geo")))) + } + assert( + ex.getMessage.contains("Clustering is not supported on geometry/geography column(s)"), + s"unexpected error message: ${ex.getMessage}") + } + } + } + test("create a clustered table should succeed") { withTempDirAndEngine { (tablePath, engine) => val commitResult = createEmptyTable( diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala 
b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala index 21cb3e86138..3eb87fb3ea7 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala @@ -30,7 +30,7 @@ import io.delta.kernel.internal.{SnapshotImpl, TableConfig} import io.delta.kernel.internal.actions.DomainMetadata import io.delta.kernel.internal.clustering.ClusteringMetadataDomain import io.delta.kernel.internal.util.{ColumnMapping, ColumnMappingSuiteBase} -import io.delta.kernel.types.{ArrayType, CollationIdentifier, DecimalType, FieldMetadata, IntegerType, LongType, MapType, StringType, StructType, TypeChange} +import io.delta.kernel.types.{ArrayType, CollationIdentifier, DecimalType, FieldMetadata, GeographyType, GeometryType, IntegerType, LongType, MapType, StringType, StructType, TypeChange} import io.delta.kernel.utils.CloseableIterable import io.delta.kernel.utils.CloseableIterable.emptyIterable @@ -1963,4 +1963,138 @@ trait DeltaTableSchemaEvolutionSuiteBase extends AnyFunSuite with AbstractWriteU .fromMetadata(getMetadata(engine, tablePath)) } + // Adding a geometry/geography column auto-enables GEOSPATIAL_RW_FEATURE and bumps protocol to (3, 7). 
+ Seq( + ("geometry", GeometryType.ofDefault()), + ("geometry custom SRID", GeometryType.ofSRID("EPSG:4326")), + ("geography", GeographyType.ofDefault()), + ("geography custom algorithm", new GeographyType("OGC:CRS84", "vincenty"))) + .foreach { case (label, geoType) => + test(s"adding $label column auto-enables geospatial feature and upgrades protocol") { + withTempDirAndEngine { (tablePath, engine) => + val table = Table.forPath(engine, tablePath) + val initialSchema = new StructType() + .add("id", IntegerType.INTEGER, true) + .add("name", StringType.STRING, true) + + createEmptyTable( + engine, + tablePath, + initialSchema, + tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id")) + + val before = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] + val featuresBefore = before.getProtocol.getImplicitlyAndExplicitlySupportedFeatures + assert( + !featuresBefore.contains(io.delta.kernel.internal.tablefeatures.TableFeatures + .GEOSPATIAL_RW_FEATURE), + s"geospatial should not be present before, got: $featuresBefore") + + val currentSchema = before.getSchema + val newSchema = new StructType() + .add("id", IntegerType.INTEGER, true, currentSchema.get("id").getMetadata) + .add("name", StringType.STRING, true, currentSchema.get("name").getMetadata) + .add("geo", geoType, true, fieldMetadataForColumn(3, "geo")) + + updateTableMetadata(engine, tablePath, newSchema) + + val after = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] + assert(after.getSchema.get("geo").getDataType == geoType) + + val featuresAfter = after.getProtocol.getImplicitlyAndExplicitlySupportedFeatures + assert( + featuresAfter.contains(io.delta.kernel.internal.tablefeatures.TableFeatures + .GEOSPATIAL_RW_FEATURE), + s"geospatial should be present after, got: $featuresAfter") + assert(after.getProtocol.getMinReaderVersion >= 3) + assert(after.getProtocol.getMinWriterVersion >= 7) + } + } + } + + test("adding a geometry column inside a struct also auto-enables 
geospatial") { + withTempDirAndEngine { (tablePath, engine) => + val table = Table.forPath(engine, tablePath) + val initialSchema = new StructType() + .add("id", IntegerType.INTEGER, true) + .add( + "info", + new StructType().add("name", StringType.STRING, true), + true) + + createEmptyTable( + engine, + tablePath, + initialSchema, + tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id")) + + val before = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] + val currentSchema = before.getSchema + val currentInner = currentSchema.get("info").getDataType.asInstanceOf[StructType] + val maxIdBefore = getMaxFieldId(engine, tablePath) + + val newSchema = new StructType() + .add("id", IntegerType.INTEGER, true, currentSchema.get("id").getMetadata) + .add( + "info", + new StructType() + .add("name", StringType.STRING, true, currentInner.get("name").getMetadata) + .add( + "geom", + GeometryType.ofDefault(), + true, + fieldMetadataForColumn(maxIdBefore + 1, "geom")), + true, + currentSchema.get("info").getMetadata) + + updateTableMetadata(engine, tablePath, newSchema) + + val after = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] + val features = after.getProtocol.getImplicitlyAndExplicitlySupportedFeatures + assert( + features.contains(io.delta.kernel.internal.tablefeatures.TableFeatures + .GEOSPATIAL_RW_FEATURE), + s"geospatial should be present, got: $features") + assert(after.getProtocol.getMinReaderVersion >= 3) + assert(after.getProtocol.getMinWriterVersion >= 7) + } + } + + // SRID / algorithm changes are not in TypeWideningChecker, so schema evolution rejects them. 
+ Seq( + ( + "change geometry SRID", + GeometryType.ofSRID("OGC:CRS84").asInstanceOf[io.delta.kernel.types.DataType], + GeometryType.ofSRID("EPSG:4326").asInstanceOf[io.delta.kernel.types.DataType]), + ( + "change geography algorithm", + new GeographyType("OGC:CRS84", "spherical").asInstanceOf[io.delta.kernel.types.DataType], + new GeographyType("OGC:CRS84", "vincenty").asInstanceOf[io.delta.kernel.types.DataType])) + .foreach { case (label, fromType, toType) => + test(s"$label is rejected by schema evolution") { + withTempDirAndEngine { (tablePath, engine) => + val table = Table.forPath(engine, tablePath) + val initialSchema = new StructType() + .add("id", IntegerType.INTEGER, true) + .add("geo", fromType, true) + + createEmptyTable( + engine, + tablePath, + initialSchema, + tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id")) + + val currentSchema = table.getLatestSnapshot(engine).getSchema + val newSchema = new StructType() + .add("id", IntegerType.INTEGER, true, currentSchema.get("id").getMetadata) + .add("geo", toType, true, currentSchema.get("geo").getMetadata) + + assertSchemaEvolutionFails[KernelException]( + table, + engine, + newSchema, + "Cannot change the type of existing field") + } + } + } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala index 89475a850c5..76b4f3d4c9b 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala @@ -30,7 +30,7 @@ import io.delta.kernel.defaults.internal.data.DefaultColumnarBatch import io.delta.kernel.defaults.internal.data.vector.DefaultGenericVector import io.delta.kernel.defaults.internal.data.vector.DefaultStructVector import io.delta.kernel.defaults.internal.parquet.ParquetSuiteBase -import 
io.delta.kernel.defaults.utils.{AbstractWriteUtils, TestRow, WriteUtils} +import io.delta.kernel.defaults.utils.{AbstractWriteUtils, GeoTestUtils, TestRow, WriteUtils} import io.delta.kernel.engine.Engine import io.delta.kernel.exceptions._ import io.delta.kernel.expressions.{Column, Literal} @@ -67,7 +67,7 @@ class DeltaTableWritesSuite extends AbstractDeltaTableWritesSuite with WriteUtil /** Transaction commit in this suite IS REQUIRED TO use commitTransaction than .commit */ abstract class AbstractDeltaTableWritesSuite extends AnyFunSuite with AbstractWriteUtils - with ParquetSuiteBase { + with GeoTestUtils with ParquetSuiteBase { /////////////////////////////////////////////////////////////////////////// // Create table tests @@ -374,6 +374,27 @@ abstract class AbstractDeltaTableWritesSuite extends AnyFunSuite with AbstractWr } } + Seq( + ("geometry", GeometryType.ofDefault(), "Geometry(srid=OGC:CRS84)"), + ("geography", GeographyType.ofDefault(), "Geography(srid=OGC:CRS84, algorithm=spherical)")) + .foreach { case (label, geoType, typeStr) => + test(s"create partitioned table - $label partition column is rejected") { + withTempDirAndEngine { (tablePath, engine) => + val schema = new StructType() + .add("id", INTEGER) + .add("geo", geoType) + + val ex = intercept[KernelException] { + getCreateTxn(engine, tablePath, schema = schema, partCols = Seq("geo")) + } + assert( + ex.getMessage.contains( + s"Kernel doesn't support writing data with partition column (geo) of type: $typeStr"), + s"unexpected error message: ${ex.getMessage}") + } + } + } + test("create a partitioned table") { withTempDirAndEngine { (tablePath, engine) => val schema = new StructType() @@ -2096,4 +2117,103 @@ abstract class AbstractDeltaTableWritesSuite extends AnyFunSuite with AbstractWr } newStructType } + + // Reads (id INT, geo <geoType>) rows; Seq[Byte] avoids Array reference-equality surprises. 
+ private def readGeoTable(tablePath: String): Seq[(Int, Option[Seq[Byte]])] = { + val schema = latestSnapshot(tablePath).getSchema + val out = scala.collection.mutable.ArrayBuffer.empty[(Int, Option[Seq[Byte]])] + readTableUsingKernel(defaultEngine, tablePath, schema).foreach { filteredBatch => + val batch = filteredBatch.getData + val idIdx = batch.getSchema.indexOf("id") + val geoIdx = batch.getSchema.indexOf("geo") + val idCol = batch.getColumnVector(idIdx) + val geoCol = batch.getColumnVector(geoIdx) + val sel = filteredBatch.getSelectionVector + (0 until batch.getSize).foreach { rowId => + val included = !sel.isPresent || + (!sel.get().isNullAt(rowId) && sel.get().getBoolean(rowId)) + if (included) { + val id = idCol.getInt(rowId) + val geo = + if (geoCol.isNullAt(rowId)) None else Some(geoCol.getBinary(rowId).toSeq) + out.append((id, geo)) + } + } + } + out.toSeq + } + + private def insertGeoBatch( + tablePath: String, + schema: StructType, + rows: Seq[(Int, Option[Array[Byte]])], + isNewTable: Boolean): TransactionCommitResult = { + val ids = rows.map(_._1) + val geos = rows.map(_._2) + val geoFieldType = schema.get("geo").getDataType + val batch = new DefaultColumnarBatch( + ids.length, + schema, + Array(intColumnVector(ids), geoColumnVector(geoFieldType, geos))) + val data = + Seq(Map.empty[String, Literal] -> Seq(new FilteredColumnarBatch(batch, Optional.empty()))) + appendData( + defaultEngine, + tablePath, + isNewTable = isNewTable, + schema = if (isNewTable) schema else null, + data = data) + } + + Seq( + ("geometry default SRID", GeometryType.ofDefault()), + ("geometry custom SRID", GeometryType.ofSRID("EPSG:4326")), + ("geography default", GeographyType.ofDefault()), + ("geography custom algorithm", new GeographyType("OGC:CRS84", "vincenty"))) + .foreach { case (label, geoType) => + test(s"create + insert + read roundtrip - $label") { + withTempDirAndEngine { (tablePath, engine) => + val schema = new StructType() + .add("id", INTEGER) + .add("geo", 
geoType) + + val rowsBatch1 = Seq[(Int, Option[Array[Byte]])]( + (1, Some(pointWkb(1.0, 2.0))), + (2, None), + (3, Some(pointWkb(-3.5, 4.25)))) + val rowsBatch2 = Seq[(Int, Option[Array[Byte]])]( + (4, Some(pointWkb(10.0, 20.0))), + (5, Some(pointWkb(0.0, 0.0)))) + + val res0 = insertGeoBatch(tablePath, schema, rowsBatch1, isNewTable = true) + assert(res0.getVersion === 0) + val res1 = insertGeoBatch(tablePath, schema, rowsBatch2, isNewTable = false) + assert(res1.getVersion === 1) + + val snapshot = latestSnapshot(tablePath) + val loadedGeoType = snapshot.getSchema.get("geo").getDataType + assert(loadedGeoType == geoType, s"loaded $loadedGeoType, expected $geoType") + + val protocol = snapshot.getProtocol + val supported = protocol.getImplicitlyAndExplicitlySupportedFeatures + assert( + supported.contains(io.delta.kernel.internal.tablefeatures.TableFeatures + .GEOSPATIAL_RW_FEATURE), + s"protocol features: $supported") + assert(protocol.getMinReaderVersion >= 3) + assert(protocol.getMinWriterVersion >= 7) + + // id-keyed compare; cross-file/cross-partition row order is not guaranteed. 
+ val expected = (rowsBatch1 ++ rowsBatch2).map { case (id, bytes) => + (id, bytes.map(_.toSeq)) + }.toMap + val actual = readGeoTable(tablePath).toMap + assert(actual.size === expected.size) + expected.foreach { case (id, expGeo) => + assert(actual.contains(id), s"missing id=$id") + assert(actual(id) === expGeo, s"WKB mismatch at id=$id") + } + } + } + } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/GeometryDataSkippingSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/GeometryDataSkippingSuite.scala index 0d45e4857a8..f9dbfc77e18 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/GeometryDataSkippingSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/GeometryDataSkippingSuite.scala @@ -15,269 +15,115 @@ */ package io.delta.kernel.defaults -import java.util.{Collections, Optional} - -import scala.collection.JavaConverters._ import scala.collection.immutable.Seq -import io.delta.kernel._ -import io.delta.kernel.defaults.utils.WriteUtils -import io.delta.kernel.expressions.{Column, Literal, StGeometryBoxesIntersect} -import io.delta.kernel.internal.util.Utils.toCloseableIterator +import io.delta.kernel.defaults.utils.{GeoTestUtils, WriteUtils} +import io.delta.kernel.expressions.{And, Column, Literal, Predicate, StGeometryBoxesIntersect} import io.delta.kernel.statistics.DataFileStatistics -import io.delta.kernel.types.{GeometryType, StructType} -import io.delta.kernel.utils.CloseableIterable.inMemoryIterable -import io.delta.kernel.utils.DataFileStatus +import io.delta.kernel.types.{GeometryType, IntegerType, StructType} import org.scalatest.funsuite.AnyFunSuite -/** - * End-to-end tests for data skipping with geometry column types using - * the StGeometryBoxesIntersect predicate. - * - * Tests bypass the Parquet writer by injecting DataFileStatistics with - * geometry min/max literals directly into the Delta log. 
- */ -class GeometryDataSkippingSuite extends AnyFunSuite with WriteUtils { +class GeometryDataSkippingSuite extends AnyFunSuite with WriteUtils with GeoTestUtils { // 4-quadrant layout + 1 null-stats file (f4, never skipped): - // y=10 +------+ +------+ - // | f2 | | f1 | - // y=7 +------+ +------+ - // (gap) - // y=3 +------+ +------+ - // | f0 | | f3 | - // y=0 +------+ +------+ - // x=0 x=3 x=7 x=10 - private val fileExtents: Seq[Option[(Double, Double, Double, Double)]] = - Seq( - Some((0.0, 0.0, 3.0, 3.0)), // SW - file 0 - Some((7.0, 7.0, 10.0, 10.0)), // NE - file 1 - Some((0.0, 7.0, 3.0, 10.0)), // NW - file 2 - Some((7.0, 0.0, 10.0, 3.0)), // SE - file 3 - None // null stats - file 4 (never skipped) - ) + // f2 NW [0,7]-[3,10] f1 NE [7,7]-[10,10] + // f0 SW [0,0]-[3,3] f3 SE [7,0]-[10,3] + private val fileExtents: Seq[Option[(Double, Double, Double, Double)]] = Seq( + Some((0.0, 0.0, 3.0, 3.0)), + Some((7.0, 7.0, 10.0, 10.0)), + Some((0.0, 7.0, 3.0, 10.0)), + Some((7.0, 0.0, 10.0, 3.0)), + None) private val colType = new GeometryType("OGC:CRS84") + private val geomCol = new Column("geom") test("StGeometryBoxesIntersect data skipping on GeometryType column") { withTempDirAndEngine { (tablePath, engine) => val schema = new StructType().add("geom", colType) - writeFilesWithGeometryStats( + val statsList = fileExtents.map { + case Some((minX, minY, maxX, maxY)) => + geoStats(geomCol, minX, minY, maxX, maxY, colType) + case None => emptyStats() + } + commitGeoStatsFiles(tablePath, engine, schema, statsList) + + val snapshot = latestSnapshot(tablePath) + + // null-stats f4 is always returned (+1 in every count below). 
+ assert(boxFilesHit(snapshot, geomCol, colType, 1.0, 1.0, 4.0, 4.0) == 2) // f0 + assert(boxFilesHit(snapshot, geomCol, colType, 8.0, 8.0, 11.0, 11.0) == 2) // f1 + assert(boxFilesHit(snapshot, geomCol, colType, 4.0, 4.0, 6.0, 6.0) == 1) // none + assert(boxFilesHit(snapshot, geomCol, colType, 1.0, 1.0, 4.0, 9.0) == 3) // f0+f2 + assert(boxFilesHit(snapshot, geomCol, colType, 0.0, 8.0, 11.0, 11.0) == 3) // f1+f2 + assert(boxFilesHit(snapshot, geomCol, colType, 0.0, 0.0, 11.0, 11.0) == 5) // all + } + } + + test("file with completely missing stats falls through and is never pruned") { + withTempDirAndEngine { (tablePath, engine) => + val schema = new StructType().add("geom", colType) + commitGeoStatsFiles( tablePath, engine, schema, - fileExtents) + Seq(geoStats(geomCol, 0.0, 0.0, 3.0, 3.0, colType), emptyStats())) val snapshot = latestSnapshot(tablePath) - - def filesHit( - qMinX: Double, - qMinY: Double, - qMaxX: Double, - qMaxY: Double): Int = { - val pred = new StGeometryBoxesIntersect( - new Column("geom"), - Literal.ofGeospatialWKT(s"POINT ($qMinX $qMinY)", colType), - Literal.ofGeospatialWKT(s"POINT ($qMaxX $qMaxY)", colType)) - collectScanFileRows( - snapshot.getScanBuilder().withFilter(pred).build()).size - } - - // null-stats f4 is always included in every query - // SW query - intersects f0 + f4(null) - assert(filesHit(1.0, 1.0, 4.0, 4.0) == 2) - // NE query - intersects f1 + f4(null) - assert(filesHit(8.0, 8.0, 11.0, 11.0) == 2) - // Center gap - no data file hit, only f4(null) - assert(filesHit(4.0, 4.0, 6.0, 6.0) == 1) - // West strip [1,1]-[4,9] - f0 + f2 + f4(null) - assert(filesHit(1.0, 1.0, 4.0, 9.0) == 3) - // Top strip [0,8]-[11,11] - f1 + f2 + f4(null) - assert(filesHit(0.0, 8.0, 11.0, 11.0) == 3) - // Global - all 4 data files + f4(null) - assert(filesHit(0.0, 0.0, 11.0, 11.0) == 5) + assert(boxFilesHit(snapshot, geomCol, colType, 100.0, 100.0, 101.0, 101.0) == 1) } } test("StGeometryBoxesIntersect combined with AND predicate on second 
column") { withTempDirAndEngine { (tablePath, engine) => val schema = new StructType() - .add("id", io.delta.kernel.types.IntegerType.INTEGER) - .add("geom", new GeometryType("OGC:CRS84")) - + .add("id", IntegerType.INTEGER) + .add("geom", colType) val idCol = new Column("id") - val geomCol = new Column("geom") - // File 0 (SW): id [1,5], geom bbox [0,0]-[3,3] - // File 1 (NE): id [10,20], geom bbox [7,7]-[10,10] - // File 2: null stats (never skipped) - val files: Seq[Option[(Int, Int, Double, Double, Double, Double)]] = - Seq( - Some((1, 5, 0.0, 0.0, 3.0, 3.0)), - Some((10, 20, 7.0, 7.0, 10.0, 10.0)), - None) - - files.zipWithIndex.foreach { - case (fileOpt, idx) => - val txn = if (idx == 0) { - getCreateTxn(engine, tablePath, schema) - } else { - getUpdateTxn(engine, tablePath) - } - - val txnState = txn.getTransactionState(engine) - val writeContext = Transaction.getWriteContext( - engine, - txnState, - Collections.emptyMap()) - - val stats = fileOpt match { - case Some((idMin, idMax, gMinX, gMinY, gMaxX, gMaxY)) => - new DataFileStatistics( - 10, - Map( - idCol -> Literal.ofInt(idMin), - geomCol -> Literal.ofGeospatialWKT( - s"POINT ($gMinX $gMinY)", - colType)).asJava, - Map( - idCol -> Literal.ofInt(idMax), - geomCol -> Literal.ofGeospatialWKT( - s"POINT ($gMaxX $gMaxY)", - colType)).asJava, - Map( - idCol -> (0L: java.lang.Long), - geomCol -> (0L: java.lang.Long)).asJava, - Optional.empty()) - case None => - new DataFileStatistics( - 10, - Collections.emptyMap(), - Collections.emptyMap(), - Collections.emptyMap(), - Optional.empty()) - } - - val filePath = - engine.getFileSystemClient.resolvePath( - writeContext.getTargetDirectory + s"/part-$idx.parquet") - val fileStatus = new DataFileStatus( - filePath, - 1000, - 0L, - Optional.of(stats)) - - val actions = Transaction.generateAppendActions( - engine, - txnState, - toCloseableIterator(Seq(fileStatus).iterator.asJava), - writeContext) - commitTransaction( - txn, - engine, - inMemoryIterable(actions)) - } + 
// f0: id [1,5], geom [0,0]-[3,3]; f1: id [10,20], geom [7,7]-[10,10]; f2: null stats. + val statsList = Seq[DataFileStatistics]( + stats( + minValues = Map( + idCol -> Literal.ofInt(1), + geomCol -> pointWktLiteral(0.0, 0.0, colType)), + maxValues = Map( + idCol -> Literal.ofInt(5), + geomCol -> pointWktLiteral(3.0, 3.0, colType)), + nullCounts = Map(idCol -> 0L, geomCol -> 0L)), + stats( + minValues = Map( + idCol -> Literal.ofInt(10), + geomCol -> pointWktLiteral(7.0, 7.0, colType)), + maxValues = Map( + idCol -> Literal.ofInt(20), + geomCol -> pointWktLiteral(10.0, 10.0, colType)), + nullCounts = Map(idCol -> 0L, geomCol -> 0L)), + emptyStats()) + commitGeoStatsFiles(tablePath, engine, schema, statsList) val snapshot = latestSnapshot(tablePath) - // null-stats f2 is always included in every query - - // SW geo + id<=5: f0 matches both, f2(null) included - val geoAndId = new io.delta.kernel.expressions.And( + val swGeoAndId = new And( new StGeometryBoxesIntersect( geomCol, - Literal.ofGeospatialWKT("POINT (1.0 1.0)", colType), - Literal.ofGeospatialWKT("POINT (4.0 4.0)", colType)), - new io.delta.kernel.expressions.Predicate( - "<=", - idCol, - Literal.ofInt(5))) + pointWktLiteral(1.0, 1.0, colType), + pointWktLiteral(4.0, 4.0, colType)), + new Predicate("<=", idCol, Literal.ofInt(5))) assert(collectScanFileRows( - snapshot.getScanBuilder().withFilter(geoAndId).build()).size == 2) + snapshot.getScanBuilder().withFilter(swGeoAndId).build()).size == 2) - // NE geo + id>=15: f1 matches both, f2(null) included - val geoAndId2 = new io.delta.kernel.expressions.And( + val neGeoAndId = new And( new StGeometryBoxesIntersect( geomCol, - Literal.ofGeospatialWKT("POINT (8.0 8.0)", colType), - Literal.ofGeospatialWKT("POINT (11.0 11.0)", colType)), - new io.delta.kernel.expressions.Predicate( - ">=", - idCol, - Literal.ofInt(15))) + pointWktLiteral(8.0, 8.0, colType), + pointWktLiteral(11.0, 11.0, colType)), + new Predicate(">=", idCol, Literal.ofInt(15))) 
assert(collectScanFileRows( - snapshot.getScanBuilder().withFilter(geoAndId2).build()).size == 2) - - // Center geo: both data files skipped, only f2(null) - val geoCenterAny = new StGeometryBoxesIntersect( - geomCol, - Literal.ofGeospatialWKT("POINT (4.0 4.0)", colType), - Literal.ofGeospatialWKT("POINT (6.0 6.0)", colType)) - assert(collectScanFileRows( - snapshot.getScanBuilder().withFilter(geoCenterAny).build()).size == 1) - } - } - - private def writeFilesWithGeometryStats( - tablePath: String, - engine: io.delta.kernel.engine.Engine, - schema: StructType, - extents: Seq[Option[(Double, Double, Double, Double)]]): Unit = { - val geomCol = new Column("geom") - extents.zipWithIndex.foreach { - case (extentOpt, idx) => - val txn = if (idx == 0) { - getCreateTxn(engine, tablePath, schema) - } else { - getUpdateTxn(engine, tablePath) - } - - val txnState = txn.getTransactionState(engine) - val writeContext = Transaction.getWriteContext( - engine, - txnState, - Collections.emptyMap()) - - val stats = extentOpt match { - case Some((minX, minY, maxX, maxY)) => - new DataFileStatistics( - 10, - Map(geomCol -> Literal.ofGeospatialWKT( - s"POINT ($minX $minY)", - colType)).asJava, - Map(geomCol -> Literal.ofGeospatialWKT( - s"POINT ($maxX $maxY)", - colType)).asJava, - Map(geomCol -> (0L: java.lang.Long)).asJava, - Optional.empty()) - case None => - new DataFileStatistics( - 10, - Collections.emptyMap(), - Collections.emptyMap(), - Collections.emptyMap(), - Optional.empty()) - } - - val filePath = - engine.getFileSystemClient.resolvePath( - writeContext.getTargetDirectory + s"/part-$idx.parquet") - val fileStatus = new DataFileStatus( - filePath, - 1000, - 0L, - Optional.of(stats)) + snapshot.getScanBuilder().withFilter(neGeoAndId).build()).size == 2) - val actions = Transaction.generateAppendActions( - engine, - txnState, - toCloseableIterator(Seq(fileStatus).iterator.asJava), - writeContext) - commitTransaction( - txn, - engine, - inMemoryIterable(actions)) + 
assert(boxFilesHit(snapshot, geomCol, colType, 4.0, 4.0, 6.0, 6.0) == 1) } } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/GeoTestUtils.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/GeoTestUtils.scala index a287914345b..0c87f4f281b 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/GeoTestUtils.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/utils/GeoTestUtils.scala @@ -31,29 +31,10 @@ import io.delta.kernel.types.{DataType, IntegerType, StructType} import io.delta.kernel.utils.{CloseableIterable, DataFileStatus} import io.delta.kernel.utils.CloseableIterable.inMemoryIterable -/** - * Shared helpers for geospatial tests. Mix into any AnyFunSuite that already mixes in - * AbstractWriteUtils. Provides: - * - * - WKB and WKT literal construction for synthetic geometry/geography points - * - Column vectors carrying WKB binary or scalar int data - * - DataFileStatistics with min/max bounding-box literals - * - The boilerplate to stage a synthetic add file (no real Parquet write) and to - * commit one such file per stats entry against a given table - * - A box-intersect file-pruning shorthand that returns the count of scan files a - * query bounding box leaves after data skipping - * - * Suites that exercise the data-skipping path inject stats directly via - * appendActionsForGeoStatsFile rather than going through the Parquet writer; that - * keeps the test fixtures deterministic and decouples the predicate behavior from - * the write pipeline. - */ +/** Mixin for geospatial test fixtures: WKB/WKT helpers, stats injection, box-prune queries. */ trait GeoTestUtils extends AbstractWriteUtils { - /** - * Builds a 21-byte little-endian WKB encoding for POINT(x y): - * byteOrder(1) + type=1(4) + x(8) + y(8). - */ + /** 21-byte little-endian WKB for POINT(x y). 
*/ def pointWkb(x: Double, y: Double): Array[Byte] = { val buf = ByteBuffer.allocate(21).order(ByteOrder.LITTLE_ENDIAN) buf.put(1.toByte) @@ -63,14 +44,9 @@ trait GeoTestUtils extends AbstractWriteUtils { buf.array() } - /** WKT POINT literal of the given type, e.g. for stats min/max or query bounds. */ def pointWktLiteral(x: Double, y: Double, geoType: DataType): Literal = Literal.ofGeospatialWKT(s"POINT ($x $y)", geoType) - /** - * ColumnVector backed by an in-memory Seq of WKB byte arrays. None entries become - * null rows; Some entries return their bytes via getBinary. - */ def geoColumnVector( geoType: DataType, values: Seq[Option[Array[Byte]]]): ColumnVector = new ColumnVector { @@ -81,7 +57,6 @@ trait GeoTestUtils extends AbstractWriteUtils { override def getBinary(rowId: Int): Array[Byte] = values(rowId).orNull } - /** Trivial non-null IntegerType vector. */ def intColumnVector(values: Seq[Int]): ColumnVector = new ColumnVector { override def getDataType: DataType = IntegerType.INTEGER override def getSize: Int = values.length @@ -90,10 +65,6 @@ trait GeoTestUtils extends AbstractWriteUtils { override def getInt(rowId: Int): Int = values(rowId) } - /** - * DataFileStatistics with a single geo column's min/max set to POINT WKT literals - * for the given bounding box, and null count = 0. - */ def geoStats( geomCol: Column, minX: Double, @@ -108,10 +79,7 @@ trait GeoTestUtils extends AbstractWriteUtils { Map(geomCol -> (0L: java.lang.Long)).asJava, Optional.empty()) - /** - * DataFileStatistics with no min/max recorded - the data-skipping path treats - * this file as "always include" because the predicate cannot prove non-intersection. - */ + /** Stats with no min/max recorded - data skipping must always retain such files. 
*/ def emptyStats(numRecords: Long = 10L): DataFileStatistics = new DataFileStatistics( numRecords, Collections.emptyMap(), @@ -119,11 +87,6 @@ trait GeoTestUtils extends AbstractWriteUtils { Collections.emptyMap(), Optional.empty()) - /** - * General-purpose DataFileStatistics builder for multi-column stats. Use when a - * single file needs min/max recorded for several columns (e.g. a geo column AND - * an int column to test combined predicates). - */ def stats( minValues: Map[Column, Literal], maxValues: Map[Column, Literal], @@ -135,12 +98,7 @@ trait GeoTestUtils extends AbstractWriteUtils { nullCounts.asJava, Optional.empty()) - /** - * Stages a synthetic add file at /part-{fileIdx}.parquet with the - * provided stats (no real Parquet bytes are written), returning the actions that - * the caller should hand to commitTransaction. fileSize is non-zero so the add - * action looks plausible to downstream consumers. - */ + /** Stages a synthetic add file (no real Parquet bytes) at part-{fileIdx}.parquet. */ def appendActionsForGeoStatsFile( engine: Engine, txn: Transaction, @@ -160,11 +118,7 @@ trait GeoTestUtils extends AbstractWriteUtils { inMemoryIterable(actions) } - /** - * Commits one synthetic add file per stats entry, creating the table on the first - * commit and updating it on subsequent ones. Each entry becomes its own commit so - * tests can reason about per-file pruning post-checkpoint. - */ + /** One commit per stats entry; first creates the table, rest update. */ def commitGeoStatsFiles( tablePath: String, engine: Engine, @@ -180,11 +134,7 @@ trait GeoTestUtils extends AbstractWriteUtils { commitTransaction(txn, engine, appendActionsForGeoStatsFile(engine, txn, idx, s)) } - /** - * Returns the count of scan files a snapshot leaves after applying an - * StGeometryBoxesIntersect predicate against the given column with the given query - * bounding box. Files with missing geo stats fall through and count toward the result. 
- */ + /** Count of scan files left after applying StGeometryBoxesIntersect on a query bbox. */ def boxFilesHit( snapshot: Snapshot, geomCol: Column, From cf71e01170b3a6fa5870de6a9880dd07db61c205 Mon Sep 17 00:00:00 2001 From: emu Date: Fri, 8 May 2026 12:29:39 +0000 Subject: [PATCH 3/6] comment update --- .../delta/kernel/defaults/CreateCheckpointSuite.scala | 10 +++++++++- .../kernel/defaults/GeometryDataSkippingSuite.scala | 10 ++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala index 103041a3520..9fbccc3bcb4 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala @@ -473,7 +473,15 @@ class CreateCheckpointSuite extends CheckpointBase with GeoTestUtils { } } - // Same fixture as GeometryDataSkippingSuite (4 quadrants + null-stats f4). 
+ // Same fixture as GeometryDataSkippingSuite (4 quadrants + null-stats f4): + // y=10 +------+ +------+ + // | f2 | | f1 | + // y=7 +------+ +------+ + // (gap) + // y=3 +------+ +------+ + // | f0 | | f3 | + // y=0 +------+ +------+ + // x=0 x=3 x=7 x=10 private val geomCheckpointFileExtents: Seq[Option[(Double, Double, Double, Double)]] = Seq( Some((0.0, 0.0, 3.0, 3.0)), Some((7.0, 7.0, 10.0, 10.0)), diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/GeometryDataSkippingSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/GeometryDataSkippingSuite.scala index f9dbfc77e18..45f4daa5ab0 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/GeometryDataSkippingSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/GeometryDataSkippingSuite.scala @@ -27,8 +27,14 @@ import org.scalatest.funsuite.AnyFunSuite class GeometryDataSkippingSuite extends AnyFunSuite with WriteUtils with GeoTestUtils { // 4-quadrant layout + 1 null-stats file (f4, never skipped): - // f2 NW [0,7]-[3,10] f1 NE [7,7]-[10,10] - // f0 SW [0,0]-[3,3] f3 SE [7,0]-[10,3] + // y=10 +------+ +------+ + // | f2 | | f1 | + // y=7 +------+ +------+ + // (gap) + // y=3 +------+ +------+ + // | f0 | | f3 | + // y=0 +------+ +------+ + // x=0 x=3 x=7 x=10 private val fileExtents: Seq[Option[(Double, Double, Double, Double)]] = Seq( Some((0.0, 0.0, 3.0, 3.0)), Some((7.0, 7.0, 10.0, 10.0)), From 6fce014dfbe2153a295ac85d2008f4d8fc907d3f Mon Sep 17 00:00:00 2001 From: emu Date: Fri, 8 May 2026 13:44:20 +0000 Subject: [PATCH 4/6] fix scalastyle --- .../delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala index 3eb87fb3ea7..3f964302679 100644 --- 
a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala @@ -1963,7 +1963,7 @@ trait DeltaTableSchemaEvolutionSuiteBase extends AnyFunSuite with AbstractWriteU .fromMetadata(getMetadata(engine, tablePath)) } - // Adding a geometry/geography column auto-enables GEOSPATIAL_RW_FEATURE and bumps protocol to (3, 7). + // Adding a geo column auto-enables GEOSPATIAL_RW_FEATURE and bumps protocol to (3, 7). Seq( ("geometry", GeometryType.ofDefault()), ("geometry custom SRID", GeometryType.ofSRID("EPSG:4326")), From 374049ff91234dac6314dfa1763e7b06f4175e28 Mon Sep 17 00:00:00 2001 From: emu Date: Mon, 11 May 2026 11:56:41 +0000 Subject: [PATCH 5/6] address comments --- .../defaults/CreateCheckpointSuite.scala | 11 ++++----- .../defaults/DeltaTableClusteringSuite.scala | 23 +++++++++++++++++++ .../DeltaTableSchemaEvolutionSuite.scala | 20 ++++++++-------- .../defaults/DeltaTableWritesSuite.scala | 6 ++--- 4 files changed, 38 insertions(+), 22 deletions(-) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala index 9fbccc3bcb4..462b5d4ffbb 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala @@ -26,7 +26,8 @@ import io.delta.kernel.defaults.utils.{GeoTestUtils, TestRow, TestUtils, WriteUt import io.delta.kernel.engine.Engine import io.delta.kernel.exceptions.{CheckpointAlreadyExistsException, TableNotFoundException} import io.delta.kernel.expressions.{Column, Literal} -import io.delta.kernel.internal.SnapshotImpl +import io.delta.kernel.internal.{SnapshotImpl, TableConfig} +import 
io.delta.kernel.internal.tablefeatures.TableFeatures.GEOSPATIAL_RW_FEATURE import io.delta.kernel.statistics.DataFileStatistics import io.delta.kernel.types.{GeometryType, StructType => KernelStructType} @@ -515,10 +516,7 @@ class CreateCheckpointSuite extends CheckpointBase with GeoTestUtils { Seq( ("classic", Map.empty[String, String]), - ( - "v2-typed", - Map( - io.delta.kernel.internal.TableConfig.CHECKPOINT_POLICY.getKey -> "v2"))).foreach { + ("v2-typed", Map(TableConfig.CHECKPOINT_POLICY.getKey -> "v2"))).foreach { case (label, tableProps) => test(s"data skipping survives $label checkpoint - geometry column") { withTempDirAndEngine { (tablePath, engine) => @@ -551,8 +549,7 @@ class CreateCheckpointSuite extends CheckpointBase with GeoTestUtils { val features = snapshot.getProtocol.getImplicitlyAndExplicitlySupportedFeatures assert( - features.contains(io.delta.kernel.internal.tablefeatures.TableFeatures - .GEOSPATIAL_RW_FEATURE), + features.contains(GEOSPATIAL_RW_FEATURE), s"geospatial feature missing post-checkpoint: $features") // null-stats f4 always falls through (+1 in every count). 
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableClusteringSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableClusteringSuite.scala index ea522f214a6..cdf77ff0cc9 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableClusteringSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableClusteringSuite.scala @@ -180,6 +180,29 @@ trait DeltaTableClusteringSuiteBase extends AnyFunSuite with AbstractWriteUtils s"unexpected error message: ${ex.getMessage}") } } + + test(s"build table txn: clustering on a nested $label column should be rejected") { + withTempDirAndEngine { (tablePath, engine) => + val schema = new StructType() + .add("id", INTEGER) + .add( + "parent", + new StructType().add("geo", geoType)) + val ex = intercept[KernelException] { + getCreateTxn( + engine, + tablePath, + schema, + clusteringColsOpt = Some(List(new Column(Array("parent", "geo"))))) + } + assert( + ex.getMessage.contains("Clustering is not supported on geometry/geography column(s)"), + s"unexpected error message: ${ex.getMessage}") + assert( + ex.getMessage.contains("parent") && ex.getMessage.contains("geo"), + s"error should name the nested column path: ${ex.getMessage}") + } + } } test("create a clustered table should succeed") { diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala index 3f964302679..69137e45afb 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala @@ -29,8 +29,9 @@ import io.delta.kernel.expressions.Column import io.delta.kernel.internal.{SnapshotImpl, TableConfig} import 
io.delta.kernel.internal.actions.DomainMetadata import io.delta.kernel.internal.clustering.ClusteringMetadataDomain +import io.delta.kernel.internal.tablefeatures.TableFeatures.GEOSPATIAL_RW_FEATURE import io.delta.kernel.internal.util.{ColumnMapping, ColumnMappingSuiteBase} -import io.delta.kernel.types.{ArrayType, CollationIdentifier, DecimalType, FieldMetadata, GeographyType, GeometryType, IntegerType, LongType, MapType, StringType, StructType, TypeChange} +import io.delta.kernel.types.{ArrayType, CollationIdentifier, DataType, DecimalType, FieldMetadata, GeographyType, GeometryType, IntegerType, LongType, MapType, StringType, StructType, TypeChange} import io.delta.kernel.utils.CloseableIterable import io.delta.kernel.utils.CloseableIterable.emptyIterable @@ -1986,8 +1987,7 @@ trait DeltaTableSchemaEvolutionSuiteBase extends AnyFunSuite with AbstractWriteU val before = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] val featuresBefore = before.getProtocol.getImplicitlyAndExplicitlySupportedFeatures assert( - !featuresBefore.contains(io.delta.kernel.internal.tablefeatures.TableFeatures - .GEOSPATIAL_RW_FEATURE), + !featuresBefore.contains(GEOSPATIAL_RW_FEATURE), s"geospatial should not be present before, got: $featuresBefore") val currentSchema = before.getSchema @@ -2003,8 +2003,7 @@ trait DeltaTableSchemaEvolutionSuiteBase extends AnyFunSuite with AbstractWriteU val featuresAfter = after.getProtocol.getImplicitlyAndExplicitlySupportedFeatures assert( - featuresAfter.contains(io.delta.kernel.internal.tablefeatures.TableFeatures - .GEOSPATIAL_RW_FEATURE), + featuresAfter.contains(GEOSPATIAL_RW_FEATURE), s"geospatial should be present after, got: $featuresAfter") assert(after.getProtocol.getMinReaderVersion >= 3) assert(after.getProtocol.getMinWriterVersion >= 7) @@ -2052,8 +2051,7 @@ trait DeltaTableSchemaEvolutionSuiteBase extends AnyFunSuite with AbstractWriteU val after = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] val features = 
after.getProtocol.getImplicitlyAndExplicitlySupportedFeatures assert( - features.contains(io.delta.kernel.internal.tablefeatures.TableFeatures - .GEOSPATIAL_RW_FEATURE), + features.contains(GEOSPATIAL_RW_FEATURE), s"geospatial should be present, got: $features") assert(after.getProtocol.getMinReaderVersion >= 3) assert(after.getProtocol.getMinWriterVersion >= 7) @@ -2064,12 +2062,12 @@ trait DeltaTableSchemaEvolutionSuiteBase extends AnyFunSuite with AbstractWriteU Seq( ( "change geometry SRID", - GeometryType.ofSRID("OGC:CRS84").asInstanceOf[io.delta.kernel.types.DataType], - GeometryType.ofSRID("EPSG:4326").asInstanceOf[io.delta.kernel.types.DataType]), + GeometryType.ofSRID("OGC:CRS84").asInstanceOf[DataType], + GeometryType.ofSRID("EPSG:4326").asInstanceOf[DataType]), ( "change geography algorithm", - new GeographyType("OGC:CRS84", "spherical").asInstanceOf[io.delta.kernel.types.DataType], - new GeographyType("OGC:CRS84", "vincenty").asInstanceOf[io.delta.kernel.types.DataType])) + new GeographyType("OGC:CRS84", "spherical").asInstanceOf[DataType], + new GeographyType("OGC:CRS84", "vincenty").asInstanceOf[DataType])) .foreach { case (label, fromType, toType) => test(s"$label is rejected by schema evolution") { withTempDirAndEngine { (tablePath, engine) => diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala index 76b4f3d4c9b..4350f88c45c 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala @@ -39,6 +39,7 @@ import io.delta.kernel.internal.{ScanImpl, SnapshotImpl, TableConfig} import io.delta.kernel.internal.checkpoints.CheckpointerSuite.selectSingleElement import io.delta.kernel.internal.data.GenericRow import io.delta.kernel.internal.table.SnapshotBuilderImpl +import 
io.delta.kernel.internal.tablefeatures.TableFeatures.GEOSPATIAL_RW_FEATURE import io.delta.kernel.internal.types.DataTypeJsonSerDe import io.delta.kernel.internal.util.{Clock, JsonUtils} import io.delta.kernel.internal.util.SchemaUtils.casePreservingPartitionColNames @@ -2196,10 +2197,7 @@ abstract class AbstractDeltaTableWritesSuite extends AnyFunSuite with AbstractWr val protocol = snapshot.getProtocol val supported = protocol.getImplicitlyAndExplicitlySupportedFeatures - assert( - supported.contains(io.delta.kernel.internal.tablefeatures.TableFeatures - .GEOSPATIAL_RW_FEATURE), - s"protocol features: $supported") + assert(supported.contains(GEOSPATIAL_RW_FEATURE), s"protocol features: $supported") assert(protocol.getMinReaderVersion >= 3) assert(protocol.getMinWriterVersion >= 7) From 6bb52314ef479fab40521657930eda3cc7948260 Mon Sep 17 00:00:00 2001 From: emu Date: Mon, 18 May 2026 13:38:05 +0000 Subject: [PATCH 6/6] address comment --- .../kernel/defaults/DeltaTableSchemaEvolutionSuite.scala | 8 ++++---- .../io/delta/kernel/defaults/DeltaTableWritesSuite.scala | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala index 69137e45afb..1ce066f63da 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableSchemaEvolutionSuite.scala @@ -2005,8 +2005,8 @@ trait DeltaTableSchemaEvolutionSuiteBase extends AnyFunSuite with AbstractWriteU assert( featuresAfter.contains(GEOSPATIAL_RW_FEATURE), s"geospatial should be present after, got: $featuresAfter") - assert(after.getProtocol.getMinReaderVersion >= 3) - assert(after.getProtocol.getMinWriterVersion >= 7) + assert(after.getProtocol.getMinReaderVersion == 3) + 
assert(after.getProtocol.getMinWriterVersion == 7) } } } @@ -2053,8 +2053,8 @@ trait DeltaTableSchemaEvolutionSuiteBase extends AnyFunSuite with AbstractWriteU assert( features.contains(GEOSPATIAL_RW_FEATURE), s"geospatial should be present, got: $features") - assert(after.getProtocol.getMinReaderVersion >= 3) - assert(after.getProtocol.getMinWriterVersion >= 7) + assert(after.getProtocol.getMinReaderVersion == 3) + assert(after.getProtocol.getMinWriterVersion == 7) } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala index 4350f88c45c..c0429f63161 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala @@ -2198,8 +2198,8 @@ abstract class AbstractDeltaTableWritesSuite extends AnyFunSuite with AbstractWr val protocol = snapshot.getProtocol val supported = protocol.getImplicitlyAndExplicitlySupportedFeatures assert(supported.contains(GEOSPATIAL_RW_FEATURE), s"protocol features: $supported") - assert(protocol.getMinReaderVersion >= 3) - assert(protocol.getMinWriterVersion >= 7) + assert(protocol.getMinReaderVersion == 3) + assert(protocol.getMinWriterVersion == 7) // id-keyed compare; cross-file/cross-partition row order is not guaranteed. val expected = (rowsBatch1 ++ rowsBatch2).map { case (id, bytes) =>