NOTE(review): line breaks and indentation of this patch were reconstructed from a
whitespace-collapsed copy; apply it with `git apply --ignore-whitespace`. The `index`
lines still carry the blob ids of the original patch.

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataFiltersBuilder.scala b/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataFiltersBuilder.scala
index 37e679f1d82..ff52b8cfacf 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataFiltersBuilder.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataFiltersBuilder.scala
@@ -102,7 +102,7 @@ class DataFiltersBuilder(
     filters.reduceOption { (a, b) =>
       (a, b) match {
         case (Some(a), Some(b)) =>
-          Some(DataSkippingPredicate(a.expr || b.expr, a.referencedStats ++ b.referencedStats))
+          Some(DataSkippingPredicate.or(a, b))
         case _ => None
       }
     }.flatten
@@ -263,9 +263,7 @@ class DataFiltersBuilder(
       val e1Filter = constructDataFilters(e1, isNullExpansionDepth)
       val e2Filter = constructDataFilters(e2, isNullExpansionDepth)
       if (e1Filter.isDefined && e2Filter.isDefined) {
-        Some(DataSkippingPredicate(
-          e1Filter.get.expr && e2Filter.get.expr,
-          e1Filter.get.referencedStats ++ e2Filter.get.referencedStats))
+        Some(DataSkippingPredicate.and(e1Filter.get, e2Filter.get))
       } else if (e1Filter.isDefined) {
         e1Filter
       } else {
@@ -312,9 +310,7 @@ class DataFiltersBuilder(
       val e1Filter = constructDataFilters(e1, isNullExpansionDepth)
       val e2Filter = constructDataFilters(e2, isNullExpansionDepth)
       if (e1Filter.isDefined && e2Filter.isDefined) {
-        Some(DataSkippingPredicate(
-          e1Filter.get.expr || e2Filter.get.expr,
-          e1Filter.get.referencedStats ++ e2Filter.get.referencedStats))
+        Some(DataSkippingPredicate.or(e1Filter.get, e2Filter.get))
       } else {
         None
       }
@@ -760,4 +756,3 @@ class DataFiltersBuilder(
     true
   }
 }
-
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala b/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala
index 261fc15dc75..ef770d57dea 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala
@@ -96,27 +96,79 @@ object StatsColumn {
  * NOTE: It would be more accurate to call these "file keeping" predicates, because they specify the
  * set of files a query must examine, not the set of rows a query can safely skip.
  */
-private [sql] case class DataSkippingPredicate(
-    expr: Column,
-    referencedStats: Set[StatsColumn]
-)
+private [sql] class DataSkippingPredicate private(
+    val expr: Column,
+    val referencedStats: Set[StatsColumn],
+    validatedExpr: (Set[StatsColumn] => Column) => Column) {
+  /**
+   * Returns a file-level predicate that is safe to evaluate against stats.
+   *
+   * `expr` can only be trusted when all referenced stats are present for a file. This method adds
+   * the required missing-stats checks while preserving the predicate's boolean shape, so one side
+   * of an AND can still skip a file when the other side's stats are missing.
+   */
+  def withStatsValidation(verifyStatsForFilter: Set[StatsColumn] => Column): Column = {
+    validatedExpr(verifyStatsForFilter)
+  }
+
+  // This type used to be a case class. Keep the rendering a case class would generate, so that
+  // log and debug output does not degrade to an object identity string.
+  override def toString: String = s"DataSkippingPredicate($expr,$referencedStats)"
+}
 
-/**
- * Overloads the constructor for `DataSkippingPredicate`, allowing callers to pass referenced stats
- * as individual arguments, rather than wrapped up as a Set.
- *
- * For example, instead of this:
- *
- *   DataSkippingPredicate(pred, Set(stat1, stat2))
- *
- * We can just do:
- *
- *   DataSkippingPredicate(pred, stat1, stat2)
- */
 private [sql] object DataSkippingPredicate {
+  /**
+   * Builds a leaf predicate. Its validated form keeps a file when `filters` holds or when the
+   * stats check for `referencedStats` fails.
+   */
+  def apply(filters: Column, referencedStats: Set[StatsColumn]): DataSkippingPredicate = {
+    new DataSkippingPredicate(
+      filters,
+      referencedStats,
+      verifyStatsForFilter => filters || !verifyStatsForFilter(referencedStats))
+  }
+
+  /**
+   * Overloads the constructor for `DataSkippingPredicate`, allowing callers to pass referenced
+   * stats as individual arguments, rather than wrapped up as a Set.
+   *
+   * For example, instead of this:
+   *
+   *   DataSkippingPredicate(pred, Set(stat1, stat2))
+   *
+   * We can just do:
+   *
+   *   DataSkippingPredicate(pred, stat1, stat2)
+   */
   def apply(filters: Column, referencedStats: StatsColumn*): DataSkippingPredicate = {
     DataSkippingPredicate(filters, referencedStats.toSet)
   }
+
+  /**
+   * Conjunction of two predicates. Each side is validated on its own, so a side whose stats are
+   * missing validates to TRUE and the other side can still prune the file.
+   */
+  def and(left: DataSkippingPredicate, right: DataSkippingPredicate): DataSkippingPredicate = {
+    new DataSkippingPredicate(
+      left.expr && right.expr,
+      left.referencedStats ++ right.referencedStats,
+      verifyStatsForFilter =>
+        left.withStatsValidation(verifyStatsForFilter) &&
+          right.withStatsValidation(verifyStatsForFilter))
+  }
+
+  /**
+   * Disjunction of two predicates. Each side is validated on its own; the file is kept when
+   * either validated side is TRUE.
+   */
+  def or(left: DataSkippingPredicate, right: DataSkippingPredicate): DataSkippingPredicate = {
+    new DataSkippingPredicate(
+      left.expr || right.expr,
+      left.referencedStats ++ right.referencedStats,
+      verifyStatsForFilter =>
+        left.withStatsValidation(verifyStatsForFilter) ||
+          right.withStatsValidation(verifyStatsForFilter))
+  }
 }
 
 /**
@@ -619,13 +671,14 @@ trait DataSkippingReaderBase
     val (partitionSize, partitionFilter) = buildSizeCollectorFilter()
     val (scanSize, scanFilter) = buildSizeCollectorFilter()
 
-    // NOTE: If any stats are missing, the value of `dataFilters` is untrustworthy -- it could be
-    // NULL or even just plain incorrect. We rely on `verifyStatsForFilter` to be FALSE in that
-    // case, forcing the overall OR to evaluate as TRUE no matter what value `dataFilters` takes.
+    // NOTE: If a filter's required stats are missing, that filter's value is untrustworthy -- it
+    // could be NULL or even just plain incorrect. Missing stats force that filter's file-keeping
+    // predicate to TRUE, but compound filters preserve their boolean shape. This lets one side of
+    // an AND still prune a file even if the other side's stats are missing.
     val filteredFiles = withStats.where(
       totalFilter(trueLiteral) &&
         partitionFilter(partitionFilters) &&
-        scanFilter(dataFilters.expr || !verifyStatsForFilter(dataFilters.referencedStats))
+        scanFilter(dataFilters.withStatsValidation(verifyStatsForFilter))
     )
 
     val statsColumn = if (keepNumRecords) {
@@ -796,9 +849,7 @@ trait DataSkippingReaderBase
 
     val finalSkippingFilters = skippingFilters
       .map(_._2.get)
-      .reduceOption((skip1, skip2) => DataSkippingPredicate(
-        // Fold the filters into a conjunction, while unioning their referencedStats.
-        skip1.expr && skip2.expr, skip1.referencedStats ++ skip2.referencedStats))
+      .reduceOption(DataSkippingPredicate.and)
       .getOrElse(DataSkippingPredicate(trueLiteral))
 
     val (files, sizes) = {
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala b/spark/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala
index 64c2aa835af..d8593bfb7b6 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala
@@ -888,6 +888,31 @@ trait DataSkippingDeltaTestsBase extends QueryTest
     checkAnswer(df.where("value > 0"), Seq(Row(1), Row(2), Row(3)))
   }
 
+  test("data skipping with partially missing stats in AND/OR") {
+    val tempDir = Utils.createTempDir()
+
+    // First file: written with a single indexed column, so it should carry no stats for `b`.
+    withSQLConf(getDataSkippingConfs(indexedCols = 1, deltaStatsColNamesOpt = None).toSeq: _*) {
+      Seq((1, 10)).toDF("a", "b").coalesce(1).write.format("delta").save(tempDir.toString)
+    }
+
+    // Second file: written after raising the number of indexed columns to 2.
+    setNumIndexedColumns(tempDir.toString, numIndexedCols = 2)
+    Seq((2, 20)).toDF("a", "b").coalesce(1).write.format("delta").mode("append")
+      .save(tempDir.toString)
+
+    val log = DeltaLog.forTable(spark, new Path(tempDir.toString))
+
+    // `a < 0` excludes both files on its own, even though the first file has no stats for `b`.
+    assert(filesRead(log, "a < 0 AND b < 100") == 0)
+    // Neither side can exclude a file here, so both files are read.
+    assert(filesRead(log, "a > 0 AND b < 100") == 2)
+    // `b > 100` excludes the second file; the first file has no stats for `b` and must be kept.
+    assert(filesRead(log, "a > 0 AND b > 100") == 1)
+    // OR stays conservative: a side whose stats are missing keeps the file.
+    assert(filesRead(log, "a < 0 OR b > 100") == 1)
+  }
+
   test("data skipping stats before and after optimize") {
     assume(!catalogOwnedDefaultCreationEnabledInTests,
       "OPTIMIZE is blocked on catalog-managed tables")