[AURON #2178] [AURON #2179] Implement native support for first_value and last_value window functions#2200
Conversation
There was a problem hiding this comment.
Pull request overview
Adds native execution support for Spark first_value / last_value window functions (including IGNORE NULLS) by wiring them through Auron’s existing aggregate-window infrastructure and extending the native aggregate set.
Changes:
- Extend Spark-side window/aggregate conversion to emit
FIRST(_IGNORES_NULL)and newLAST(_IGNORES_NULL)agg functions. - Add native-engine Rust implementations for
LastandLastIgnoresNull, plus factory/planner/proto enum wiring. - Add Scala regression tests covering
first_value/last_valuewith RESPECT/IGNORE NULLS.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala | Adds window-function mapping for First/Last aggregates to protobuf agg functions. |
| spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala | Adds group-aggregate conversion support for Last / Last IGNORE NULLS. |
| spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronWindowSuite.scala | Adds regression coverage for first_value / last_value (incl. IGNORE NULLS). |
| native-engine/datafusion-ext-plans/src/agg/mod.rs | Registers new Last / LastIgnoresNull modules and enum variants. |
| native-engine/datafusion-ext-plans/src/agg/last.rs | Implements AggLast (RESPECT NULLS) aggregate behavior. |
| native-engine/datafusion-ext-plans/src/agg/last_ignores_null.rs | Implements AggLastIgnoresNull aggregate behavior. |
| native-engine/datafusion-ext-plans/src/agg/agg.rs | Wires new agg functions into the aggregate factory. |
| native-engine/auron-planner/src/planner.rs | Maps protobuf window agg functions to native AggFunction::Last*. |
| native-engine/auron-planner/src/lib.rs | Adds protobuf-to-native enum conversion for Last*. |
| native-engine/auron-planner/proto/auron.proto | Extends protobuf AggFunction enum with LAST and LAST_IGNORES_NULL. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| package org.apache.auron | ||
|
|
||
| import org.apache.spark.sql.AuronQueryTest | ||
| import org.apache.spark.sql.execution.auron.plan.NativeWindowBase |
There was a problem hiding this comment.
Addressed in 1b08978: after rebasing with the current window tests, NativeWindowBase is used by the lead IGNORE NULLS fallback assertion, so the import is no longer unused.
| let accs = downcast_any!(accs, mut AccScalarValueColumn)?; | ||
| idx_for_zipped! { | ||
| ((acc_idx, partial_arg_idx) in (acc_idx, partial_arg_idx)) => { | ||
| if partial_arg.is_valid(partial_arg_idx) { | ||
| accs.set_value(acc_idx, compacted_scalar_value_from_array(partial_arg, partial_arg_idx)?); | ||
| } else { | ||
| accs.set_value(acc_idx, ScalarValue::try_from(&self.data_type)?); | ||
| } | ||
| } |
There was a problem hiding this comment.
Addressed in 1b08978: AggLast now computes the typed null scalar once before the zipped loop and clones it for null rows.
weiqingy
left a comment
There was a problem hiding this comment.
Thanks for taking this on — nice that first_value needed zero new Rust by reusing the existing AggProcessor / aggregate-window infrastructure, and AggLast reads cleanly as the "always overwrite" inversion of AggFirst. The tests are well-built too: checkSparkAnswerAndOperator both compares against vanilla Spark and asserts the native window operator actually ran. A few questions inline.
| }) | ||
| aggBuilder.addChildren(convertExpr(child)) | ||
|
|
||
| case Last(child, ignoresNullExpr) => |
There was a problem hiding this comment.
This case Last(child, ignoresNullExpr) adds last as a plain group-aggregate, which is a separate path from the window conversion in NativeWindowBase. All four new tests go through the window operator, so this group-agg branch ships without coverage — the new AggLast / AggLastIgnoresNull aggregates are only ever driven through the window path in CI.
The PR is framed around window functions, so I'm curious where this branch fits: is a SQL last(...) / last_value(...) as a GROUP BY aggregate reachable and intended here? If it is, would one checkSparkAnswerAndOperator test through the group-agg path make sense — Spark's own group-agg Last is nondeterministic, so pinning a tested behavior seems worth doing before relying on it. If it isn't reachable yet, would it be cleaner to defer it so it doesn't ship untested?
There was a problem hiding this comment.
Addressed in 1b08978: added deterministic coverage for the last(...) group-aggregate path through checkSparkAnswerAndOperator.
| test("last_value window function") { | ||
| withSQLConf("spark.auron.enable.window" -> "true") { | ||
| withTable("t1") { | ||
| sql("create table t1(id int, grp int, v string) using parquet") |
There was a problem hiding this comment.
The v column is string across all four tests, so only the DataType::Utf8 / handle_bytes! arm of the new aggregates runs in CI. The primitive (handle_primitive!, e.g. int/long/double), DataType::Boolean, and scalar _other arms of partial_update / partial_merge are never exercised — and the primitive merge arm copies unconditionally (including None), which is the "last wins" inversion of First's guarded write, so it'd be good to have it run at least once.
Would it be worth adding one window test over an int column and one over a boolean column? That'd cover the primitive and boolean arms that are currently dead in CI.
There was a problem hiding this comment.
Addressed in 1b08978: added last_value window coverage over both int and boolean columns, including IGNORE NULLS variants.
| withSQLConf("spark.auron.enable.window" -> "true") { | ||
| withTable("t1") { | ||
| sql("create table t1(id int, grp int, v string) using parquet") | ||
| sql("insert into t1 values (1, 1, 'a'), (2, 1, null), (3, 1, 'c'), (4, 2, 'x')") |
There was a problem hiding this comment.
Group 1 here starts with 'a' (non-null), so the leading value is never null. That means the case that actually distinguishes RESPECT NULLS from IGNORE NULLS — a group whose first ordered row is null, where first_value should return that leading null while ignore nulls skips to the first non-null — isn't hit by this test. The ignore-nulls test does lead with a null, but the plain RESPECT-NULLS test doesn't.
Could group 1 start with a null instead (e.g. (1, 1, null), (2, 1, 'b'), ...)? That makes the test show RESPECT NULLS returning the leading null, which is the most diagnostic shape for first_value.
There was a problem hiding this comment.
Addressed in 1b08978: the RESPECT NULLS first_value test now starts group 1 with a null value.
| if partial_arg.is_valid(partial_arg_idx) { | ||
| accs.set_value(acc_idx, compacted_scalar_value_from_array(partial_arg, partial_arg_idx)?); | ||
| } else { | ||
| accs.set_value(acc_idx, ScalarValue::try_from(&self.data_type)?); |
There was a problem hiding this comment.
Reinforcing the earlier note on this line with one more data point: this scalar (_other) null branch rebuilds ScalarValue::try_from(&self.data_type)? per row, whereas AggFirst's equivalent branch writes the cheap constant ScalarValue::Null (first.rs:178), and AccScalarValueColumn already precomputes this exact value once as null_value (acc.rs:577). It's correct as-is and only affects the scalar path, but the per-row try_from could be hoisted above the idx_for_zipped! loop or replaced with the precomputed null.
There was a problem hiding this comment.
Addressed in 1b08978: hoisted the typed null scalar out of the per-row scalar null branch in AggLast.
…first_value and last_value window functions Spark `first_value(...)` and `last_value(...)` are not supported in Auron's native window execution path, causing queries using them to fall back to Spark instead of running natively. This extends native window coverage for both functions through the existing aggregate-window infrastructure. Changes included here: - map Spark window `First` and `Last` expressions to native aggregate window functions - add native `AggLast` and `AggLastIgnoresNull` implementations - extend protobuf, planner, and aggregate factory mappings for `LAST` and `LAST_IGNORES_NULL` - support `last(...)` as a native group aggregate and cover that path with a deterministic test - add Scala regression coverage for leading-null `first_value`, string/int/boolean `last_value`, and IGNORE NULLS variants - hoist the typed-null scalar value in `AggLast` instead of rebuilding it for every null row Signed-off-by: Asish Kumar <officialasishkumar@gmail.com>
6ae981c to
1b08978
Compare
Which issue does this PR close?
Closes #2178
Closes #2179
Rationale for this change
Spark
first_value(...)andlast_value(...)are not supported in Auron's native window execution path, causing queries using them to fall back instead of being executed natively.This PR extends native window coverage to include both functions using the existing aggregate window infrastructure.
What changes are included in this PR?
first_value:
FirstandFirst IGNORE NULLShandling toNativeWindowBaseAggFunction::First/AggFunction::FirstIgnoresNullRust aggregates viaAggProcessorROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROWnaturally produce correct first-value behavior through the existingAggProcessorfirst_valuelast_value:
AggFunction::LastandAggFunction::LastIgnoresNullto the Rust aggregate infrastructureAggLast(always updates accumulator, including with null values) andAggLastIgnoresNull(updates accumulator only for non-null values)AggFunctionenum withLAST = 10andLAST_IGNORES_NULL = 11LastandLast IGNORE NULLShandling toNativeWindowBaseAdditional changes:
Lastimport and conversion case toNativeConverters.scalasolast()as a group aggregate also works nativelyfirst_valueandlast_value(both RESPECT NULLS and IGNORE NULLS variants)Both functions use
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, consistent with all other aggregate window functions in Auron.Are there any user-facing changes?
Yes.
Queries using
first_value(...)andlast_value(...)can now remain on Auron's native window execution path.Both RESPECT NULLS (default) and IGNORE NULLS variants are supported.
How was this patch tested?
CI.