[AURON #2358] Support native last / last(ignoreNulls) aggregate #2359
zhuxiangyi wants to merge 1 commit into
605140f to
bcb152a
Compare
Pull request overview
Adds native (vectorized) execution support for Spark’s last / last(ignoreNulls) aggregates by implementing corresponding native accumulators in datafusion-ext-plans and wiring the end-to-end Spark ↔ protobuf ↔ planner ↔ native execution path, with coverage in both Rust unit tests and Spark (Scala) integration tests.
Changes:
- Implement native `AggLast` and `AggLastIgnoresNull` columnar accumulators (including two-phase partial→final merge behavior) and register them in the native agg factory.
- Extend the protobuf contract and planner mappings to recognize `LAST` / `LAST_IGNORES_NULL`, including window aggregate mapping.
- Add Spark-side aggregate conversion and native buffer schema declarations, plus Spark 3.4/3.5 end-to-end tests validating correctness and offload.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala | Declares native agg buffer schema for Last to match native accumulator layouts. |
| spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala | Converts Spark Last aggregate into native protobuf LAST / LAST_IGNORES_NULL. |
| native-engine/datafusion-ext-plans/src/agg/mod.rs | Registers new last modules and extends AggFunction enum with Last variants. |
| native-engine/datafusion-ext-plans/src/agg/last.rs | Implements native last(col) accumulator with value + “visited” flag semantics. |
| native-engine/datafusion-ext-plans/src/agg/last_ignores_null.rs | Implements native last(col, ignoreNulls=true) accumulator with overwrite-on-non-null semantics. |
| native-engine/datafusion-ext-plans/src/agg/agg.rs | Wires AggFunction::{Last,LastIgnoresNull} into create_agg. |
| native-engine/datafusion-ext-plans/src/agg_exec.rs | Adds Rust unit test validating two-phase aggregation semantics for both last modes. |
| native-engine/auron-planner/src/planner.rs | Adds window-function mapping for protobuf Last variants to native AggFunction. |
| native-engine/auron-planner/src/lib.rs | Adds protobuf→native AggFunction conversions for Last variants. |
| native-engine/auron-planner/proto/auron.proto | Extends AggFunction protobuf enum with LAST / LAST_IGNORES_NULL. |
| auron-spark-tests/spark35/src/test/scala/org/apache/spark/sql/AuronDataFrameAggregateSuite.scala | Adds Spark 3.5 E2E test asserting correctness and native offload for last. |
| auron-spark-tests/spark34/src/test/scala/org/apache/spark/sql/AuronDataFrameAggregateSuite.scala | Adds Spark 3.4 E2E test asserting correctness and native offload for last. |
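The two update rules named in the table ("value + visited flag" for last(col), "overwrite-on-non-null" for last(col, ignoreNulls=true)) can be illustrated with a minimal sketch. It works on plain `Option<i32>` rows rather than Arrow columns, and the names `LastState`, `LastIgnoresNullState`, and `update` are hypothetical, not taken from the PR's `last.rs` / `last_ignores_null.rs`.

```rust
/// Hypothetical state for `last(col)`: the most recent row value, which may
/// be null, plus a flag recording whether any row has been seen at all.
#[derive(Debug, Default, Clone, PartialEq)]
struct LastState {
    value: Option<i32>,
    visited: bool,
}

impl LastState {
    /// Every row overwrites the state, including null rows.
    fn update(&mut self, row: Option<i32>) {
        self.value = row;
        self.visited = true;
    }
}

/// Hypothetical state for `last(col, ignoreNulls = true)`: no flag is needed
/// because a null value already means "no non-null row seen yet".
#[derive(Debug, Default, Clone, PartialEq)]
struct LastIgnoresNullState {
    value: Option<i32>,
}

impl LastIgnoresNullState {
    /// Only non-null rows overwrite the state.
    fn update(&mut self, row: Option<i32>) {
        if row.is_some() {
            self.value = row;
        }
    }
}
```

Feeding the rows `1, 2, null` through both states leaves `last` holding null with `visited = true`, while the ignore-nulls state holds `2`. The flag is what lets a later merge distinguish "the last row was null" from "this partial state saw no rows".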
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_agg_last() -> Result<()> {
The changes look good to me — just one comment. test_agg_last covers the Int32 arm, but the utf8/binary arms take a different partial_merge path — the take_value move-out in last.rs / last_ignores_null.rs — that Int32 never exercises, and there's no first-family unit test covering it either. Would a string-typed group in this same test be worth adding? Genuinely optional — the code reads as a faithful mirror, so this is durability against a future edit rather than a suspected bug. I'll leave the final call to the maintainers.
Done — added a string-typed group to test_agg_last. It adds a nullable Utf8 column s alongside v, with last(s) / last(s, ignoreNulls) aggregates, so the merge now routes through the AccBytesColumn take_value move-out in partial_merge that the Int32 column never exercised, for both last variants (k=1 → last(s)="c", k=2 → last(s)=null / last(s, ign)="x"). Thanks for the durability nudge.
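For readers unfamiliar with the move-out path discussed here, the following is a rough sketch of what a merge that moves a string value out of the incoming partial state (instead of cloning it) might look like. `BytesSlot`, `merge_last`, and `merge_last_ignores_null` are hypothetical names for illustration only; they are not the `AccBytesColumn` API, and the sample values below are made up to match the k=2 case described above.

```rust
/// Hypothetical per-group slot of a string accumulator column.
#[derive(Debug, Default)]
struct BytesSlot {
    value: Option<String>,
    visited: bool,
}

/// Merge for `last(col)`: a later partial state that saw any row wins, even
/// if its value is null. `Option::take` moves the string out of `other`,
/// leaving `None` behind, so no copy of the bytes is made.
fn merge_last(acc: &mut BytesSlot, other: &mut BytesSlot) {
    if other.visited {
        acc.value = other.value.take();
        acc.visited = true;
    }
}

/// Merge for `last(col, ignoreNulls = true)`: only a non-null later value
/// overwrites the accumulated one.
fn merge_last_ignores_null(acc: &mut Option<String>, other: &mut Option<String>) {
    if other.is_some() {
        *acc = other.take();
    }
}
```

With an accumulated state holding "x" and a later partial state whose last row was null, `merge_last` yields null while `merge_last_ignores_null` keeps "x". A fixed-width type such as Int32 would be copied rather than moved, which is why an Int32-only test cannot reach this branch.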
I also renumbered the proto enum here to LAST = 13 / LAST_IGNORES_NULL = 14 to deconflict with #2363 (which now takes 10/11/12), so the two PRs no longer alias wire numbers regardless of merge order.
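To make the aliasing concern concrete, here is a small sketch of decoding a wire number into an enum variant. Only the two values stated above (13 and 14) come from this PR; `ProtoAggFunction` and `agg_from_wire` are hypothetical stand-ins for the generated protobuf code, and every other entry of the real enum is deliberately left out.

```rust
/// Hypothetical mirror of the two new entries in the protobuf enum.
/// The real enum has more variants, omitted here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i32)]
enum ProtoAggFunction {
    Last = 13,
    LastIgnoresNull = 14,
}

/// Decode a wire number. Each number maps to exactly one variant, so two
/// branches that assign the same number to different functions would decode
/// the same bytes into different aggregates.
fn agg_from_wire(n: i32) -> Option<ProtoAggFunction> {
    match n {
        13 => Some(ProtoAggFunction::Last),
        14 => Some(ProtoAggFunction::LastIgnoresNull),
        _ => None, // unknown to this sketch
    }
}
```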
Implement native last / last(ignoreNulls) aggregates, mirroring the existing first implementation with "later value wins" semantics:
- native: add AggLast / AggLastIgnoresNull (agg/last.rs, agg/last_ignores_null.rs); wire through the AggFunction enum, create_agg, the protobuf contract (LAST / LAST_IGNORES_NULL), the protobuf->AggFunction conversion, and the window-agg mapping.
- spark-extension: add the Last expression conversion in NativeConverters; declare the Last native aggregate buffer schema in NativeAggBase.computeNativeAggBufferDataTypes ([dataType] for ignoreNulls, [dataType, Boolean] otherwise) so the partial -> shuffle -> final buffer schema matches the native side.

Tests:
- Rust unit test agg_exec::test::test_agg_last (partial -> final, nulls).
- Scala e2e AuronDataFrameAggregateSuite "native last / last(ignoreNulls) aggregate" (spark34 + spark35), covering the partial -> shuffle -> final native path and asserting NativeAggBase offload.
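The buffer schema rule in the commit message ([dataType] for ignoreNulls, [dataType, Boolean] otherwise) can be sketched as a small function. The actual declaration lives in Scala in NativeAggBase.computeNativeAggBufferDataTypes; the Rust `DataType` enum and `last_buffer_types` below are simplified, hypothetical stand-ins written only to show the shape of the rule.

```rust
/// Simplified, hypothetical stand-in for a column data type.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int32,
    Utf8,
    Boolean,
}

/// Buffer columns exchanged between the partial and final stages:
/// - ignoreNulls: the value column alone, since null means "nothing kept yet";
/// - otherwise: the value column plus a Boolean column, so a null last row
///   can be told apart from a state that saw no rows.
fn last_buffer_types(data_type: DataType, ignore_nulls: bool) -> Vec<DataType> {
    if ignore_nulls {
        vec![data_type]
    } else {
        vec![data_type, DataType::Boolean]
    }
}
```

If the Spark side declared a different column count than the native accumulator writes, the shuffled partial buffers would not line up with what the final stage expects, which is the mismatch this declaration is meant to prevent.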
bcb152a to
24dda85
Compare
Which issue does this PR close?
Closes #2358
Rationale for this change
Auron currently accelerates `first` / `first(ignoreNulls)` natively, but `last` / `last(ignoreNulls)` fall back to the generic UDAF path (a JNI call back into the JVM), losing vectorized acceleration. This PR adds native `last` support.
What changes are included in this PR?
- native (`datafusion-ext-plans`): add `AggLast` / `AggLastIgnoresNull` (`agg/last.rs`, `agg/last_ignores_null.rs`), mirroring the `first` columnar accumulators with "later value wins" semantics. Wire them through the `AggFunction` enum, `create_agg`, the protobuf contract (`LAST` / `LAST_IGNORES_NULL`), the `protobuf::AggFunction -> AggFunction` conversion, and the window-aggregate mapping.
- spark-extension: add the `Last` expression conversion in `NativeConverters.convertAggregateExpr`; declare the `Last` native aggregate buffer schema in `NativeAggBase.computeNativeAggBufferDataTypes` (`[dataType]` for `ignoreNulls`, `[dataType, Boolean]` otherwise) so the partial -> shuffle -> final buffer schema matches the native side.
Are there any user-facing changes?
Yes. `last(col)` and `last(col, ignoreNulls = true)` are now executed natively (vectorized); previously they fell back to the UDAF path.
How was this patch tested?
- `agg_exec::test::test_agg_last`: partial -> final two-phase aggregation over a nullable column, verifying `last` (keeps the last visited row including null) and `last(ignoreNulls)` (keeps the last non-null value).
- `AuronDataFrameAggregateSuite` ("native last / last(ignoreNulls) aggregate", spark34 + spark35): a grouped aggregate exercising the full partial -> shuffle -> final native path, asserting correct values and that the plan offloads to `NativeAggBase`.