feat(python/sedonadb): add DataFrame.agg for global aggregation#887
Conversation
There was a problem hiding this comment.
Pull request overview
Adds Python DataFrame.agg(*exprs) support for global, ungrouped aggregation, using the existing function-registry expression dispatch and a new Rust binding over DataFusion aggregation.
Changes:
- Adds
InternalDataFrame::aggregate(group_exprs, agg_exprs)in Rust. - Adds Python
DataFrame.agg()validation and lazy DataFrame return path. - Adds coverage for aggregate execution, errors, lazy return, and filter composition.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
python/sedonadb/src/dataframe.rs |
Adds the Rust aggregate binding over DataFusion DataFrame::aggregate. |
python/sedonadb/python/sedonadb/dataframe.py |
Adds the public Python DataFrame.agg(*exprs) API. |
python/sedonadb/tests/expr/test_dataframe_agg.py |
Adds tests for global aggregation behavior and validation. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| df.agg("x") | ||
|
|
||
|
|
||
| def test_agg_chains_with_select(con): |
There was a problem hiding this comment.
Renamed to test_agg_chains_with_filter in 6fdc21c.
| /// The Python side guarantees `agg_exprs` is non-empty. Argument | ||
| /// shape validation (every entry being an aggregate-shaped `Expr`) | ||
| /// happens Python-side. DataFusion's plan-build raises a clear | ||
| /// error if a non-aggregate Expr is passed in `agg_exprs`, so we | ||
| /// don't try to enforce that here. |
There was a problem hiding this comment.
Rewrote the comment in 6fdc21c to drop the contradictory "Python-side validation" sentence — the Python wrapper only checks isinstance(e, Expr), not aggregate-shapedness, and DataFusion's plan-build catches the rest.
paleolimbot
left a comment
There was a problem hiding this comment.
Exciting...thank you!
Mostly nits...I'm hoping we can rename to aggregate (which is what DuckDB and Ibis call this).
| For grouped aggregation use `DataFrame.group_by(...).agg(...)` | ||
| (lands in a follow-up PR). | ||
|
|
There was a problem hiding this comment.
| For grouped aggregation use `DataFrame.group_by(...).agg(...)` | |
| (lands in a follow-up PR). |
There was a problem hiding this comment.
Dropped in 2196d03 — no more forward-reference to grouped agg in the docstring.
| >>> from sedonadb.expr import col | ||
| >>> sd = sedona.db.connect() | ||
| >>> df = sd.sql("SELECT * FROM (VALUES (1), (2), (3), (4)) AS t(x)") | ||
| >>> df.agg(sd.funcs.sum(col("x")).alias("total")).show() |
There was a problem hiding this comment.
| >>> from sedonadb.expr import col | |
| >>> sd = sedona.db.connect() | |
| >>> df = sd.sql("SELECT * FROM (VALUES (1), (2), (3), (4)) AS t(x)") | |
| >>> df.agg(sd.funcs.sum(col("x")).alias("total")).show() | |
| >>> sd = sedona.db.connect() | |
| >>> df = sd.sql("SELECT * FROM (VALUES (1), (2), (3), (4)) AS t(x)") | |
| >>> df.agg(sd.funcs.sum(sd.col("x")).alias("total")).show() |
There was a problem hiding this comment.
Switched to sd.col("x") in 2196d03 — the from sedonadb.expr import col line is gone from the doctest.
| def agg(self, *exprs: Expr) -> "DataFrame": | ||
| """Aggregate the entire DataFrame to a single row. |
There was a problem hiding this comment.
- Can we call this
aggregate()? (Ibis, DuckDB) - Can we expose
**kwargslike is done inselect()?df.aggregate(x_sum=df.x.sum())is much more compact thandf.aggregate(df.x.sum().alias("x_sum"))and is allowed by Ibis.
There was a problem hiding this comment.
PySpark, Pandas and Polars all use agg. I'd like to keep it that way.
There was a problem hiding this comment.
kwargs added in 2196d03 — df.agg(total=sd.funcs.sum(sd.col("x"))) now desugars to …sum(sd.col("x")).alias("total"), and positional + named can mix. Three new tests cover the kwarg path, mixed positional/kwarg, and the non-Expr kwarg value rejection.
| /// from `DataFrame.agg`) and grouped aggregation (called from | ||
| /// `DataFrame.group_by(...).agg(...)` once that lands). |
There was a problem hiding this comment.
| /// from `DataFrame.agg`) and grouped aggregation (called from | |
| /// `DataFrame.group_by(...).agg(...)` once that lands). | |
| /// from `DataFrame.agg`) and grouped aggregation. |
First DataFrame consumer of the function-registry dispatch landed in apache#885. Builds the call site that grouped aggregation will also use. API: df.agg(sd.funcs.sum(col("x")).alias("total")) df.agg( sd.funcs.sum(col("x")).alias("sum_x"), sd.funcs.count(col("y")).alias("n"), sd.funcs.min(col("x")).alias("lo"), sd.funcs.max(col("x")).alias("hi"), ) - Varargs of aggregate `Expr` values. Aggregate exprs come from `sd.funcs.<name>(args)` via apache#885; no per-aggregate plumbing in this PR (or any future PR — that's the whole point of the registry dispatch). - Strings rejected — `df.agg("x")` has no meaning since a bare column isn't an aggregate. No auto-promotion. - Empty `df.agg()` → ValueError; non-Expr arg → TypeError. - Returns a one-row DataFrame. Rust side: `InternalDataFrame::aggregate(group_exprs, agg_exprs)` is the generic binding for both `DataFrame.agg` (this PR — passes an empty `group_exprs`) and `DataFrame.group_by(*keys).agg(*aggs)` (next PR — same Rust call, with `group_exprs` populated). One binding serves both surfaces. Tests: 9 covering single-aggregate (sum/count), min+max paired, avg over a compound expression, multiple-aggregates-one-row, lazy return, both error paths, and chained `filter().agg()` for plan composition.
paleolimbot
left a comment
There was a problem hiding this comment.
PySpark, Pandas and Polars all use agg. I'd like to keep it that way
At this point our API has little in common with PySpark, Pandas, and Polars but we can always alias it later if LLMs or their humans get confused. Either works for me 🙂
(Later we can add sedona.db.pyspark.connect() if that's a compatibility layer that is important)
Grouped aggregation on top of the registry-driven function dispatch (apache#885) and the global-aggregation binding (apache#887). API: df.group_by("k").agg(total=sd.funcs.sum(sd.col("v"))) df.group_by("k1", "k2").agg( sd.funcs.sum(col("x")).alias("sum_x"), n=sd.funcs.count(col("y")), ) df.group_by(col("x") + col("y")).agg(...) df.group_by(col("k"), "other_key").agg(...) - `df.group_by(*keys)` — varargs of `str | Expr`. Strings auto-promote to `col(name)`; arbitrary `Expr` values are accepted as computed group keys. Empty keys → ValueError; non-str/non-Expr → TypeError. - Returns a new `GroupedDataFrame` — a thin holder for the parent df plus the resolved group exprs. Single method `.agg(*exprs, **named_exprs)` with the same shape as `DataFrame.agg`. Pure Python — the Rust `InternalDataFrame::aggregate(group_exprs, agg_exprs)` from apache#887 already handles the grouped case; this PR just populates `group_exprs` when constructing the aggregation. The `GroupedDataFrame` intermediate is kept minimal (one method beyond `__init__`) so it stays a clean place to add convenience aggregates (`count`, `size`, etc.) later without polluting `DataFrame`. Tests: 12 covering single/multi string keys, Expr keys, computed Expr keys, mixed str/Expr, positional + kwarg agg, lazy return type, and the empty/bad-type error paths for both `group_by` and its `.agg`.
First DataFrame consumer of the function-registry dispatch landed in #885. Adds global (ungrouped) aggregation; grouped aggregation (
DataFrame.group_by(*keys).agg(*aggs)) is the next small PR, sharing the same Rust binding.API
Exprvalues, built viasd.funcs.<name>(args)from feat(python/sedonadb): Expose scalar and aggregate udfs from context registry #885.df.agg()→ValueError; non-Expr arg →TypeError.Why this is so small
The function-registry dispatch in #885 means
sd.funcs.sum,sd.funcs.count,sd.funcs.min,sd.funcs.max,sd.funcs.avg— and every other built-in / plugin / Python-registered aggregate — are already callable. This PR doesn't need any per-aggregate plumbing on either the Rust or Python side. One Rust binding, one Python method, a test file.Implementation
python/sedonadb/src/dataframe.rsInternalDataFrame::aggregate(group_exprs, agg_exprs). Generic wrapper over DataFusion'sDataFrame::aggregate. Shared with the upcominggroup_byPR — that path passes a populatedgroup_exprs.python/sedonadb/python/sedonadb/dataframe.pyDataFrame.agg(*exprs). Calls the Rust binding with an emptygroup_exprs.Test plan
9 tests in
tests/expr/test_dataframe_agg.py:sum; singlecount; pairedmin/max;avgover a compound expressioncol("x") + col("y"); four aggregates yielding a one-row four-column result.isinstance(out, DataFrame).agg()→ValueError; non-Expr arg →TypeError.filter().agg()produces the right result.All assertions use
pd.testing.assert_frame_equalfor outputs.Local: 9 unit + 22 doctests +
ruff format+ruff checkall clean.