Skip to content

[AURON #2177] Implement native support for lag window function#2199

Open
officialasishkumar wants to merge 1 commit into
apache:masterfrom
officialasishkumar:feat/native-window-lag
Open

[AURON #2177] Implement native support for lag window function#2199
officialasishkumar wants to merge 1 commit into
apache:masterfrom
officialasishkumar:feat/native-window-lag

Conversation

@officialasishkumar

@officialasishkumar officialasishkumar commented Apr 13, 2026

Copy link
Copy Markdown

Which issue does this PR close?

Closes #2177

Rationale for this change

Auron's native window support did not support Spark lag(...), causing supported lag queries to fall back instead of being executed natively.

Current master already has native support for Spark lead(...). Spark models Lag and Lead as offset window functions, and Spark's Lag.offset is the signed offset that can be evaluated by the existing native LEAD path. This PR therefore maps supported Lag expressions to the existing native LEAD window function instead of adding a parallel native LAG enum, planner arm, or Rust processor.

What changes are included in this PR?

This PR:

  • adds Lag handling in NativeWindowBase
  • maps Lag to WindowFunction.LEAD using Spark's signed e.offset
  • preserves Spark fallback for unsupported lag(...) IGNORE NULLS
  • makes reflective ignoreNulls detection deterministic by defaulting to false if reflection is unavailable or fails
  • hardens the shared LeadProcessor by returning a DataFusionError for malformed child counts and by handling empty batches without reading offset row zero
  • adds Scala regression tests for:
    • native lag(...) execution
    • Spark fallback for lag(...) IGNORE NULLS

The native implementation supports Spark semantics for:

  • lag(input)

    • default offset is 1
    • default value is null
  • lag(input, offset, default)

    • returns the value of input at the offset row before the current row in the same window partition
    • if the target row exists and input there is null, returns null
    • if the target row does not exist, returns default

Supported scope in this PR:

  • standard RESPECT NULLS behavior

Not supported natively in this PR:

  • IGNORE NULLS

Unsupported IGNORE NULLS queries continue to fall back to Spark to preserve correctness.

Are there any user-facing changes?

Yes. Queries using supported lag(...) semantics can now remain on Auron's native window execution path. Queries using unsupported lag(...) IGNORE NULLS behavior continue to fall back to Spark.

How was this patch tested?

CI.

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds native execution support for Spark’s lag(...) window function in Auron’s native window path, including planner/protobuf wiring and regression tests, while explicitly falling back for unsupported IGNORE NULLS semantics.

Changes:

  • Add Lag handling in Spark-side native window plan construction (with Spark fallback for IGNORE NULLS).
  • Extend native planner + protobuf to represent LAG, and implement a native Rust LagProcessor.
  • Add Rust + Scala regression tests, including cross-batch correctness and Spark fallback coverage.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala Adds Spark-side detection/planning for LAG window expressions and blocks IGNORE NULLS.
spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronWindowSuite.scala Adds Scala tests for native lag and Spark fallback for IGNORE NULLS.
native-engine/datafusion-ext-plans/src/window_exec.rs Adds “full-partition” execution path (concat all batches) to support cross-batch lag evaluation.
native-engine/datafusion-ext-plans/src/window/window_context.rs Adds requires_full_partition() helper to drive execution strategy.
native-engine/datafusion-ext-plans/src/window/processors/mod.rs Exposes new lag_processor module.
native-engine/datafusion-ext-plans/src/window/processors/lag_processor.rs Implements LagProcessor computing lag with offset/default semantics.
native-engine/datafusion-ext-plans/src/window/mod.rs Adds WindowFunction::Lag wiring and a requires_full_partition() marker on expressions.
native-engine/auron-planner/src/planner.rs Decodes protobuf LAG into native WindowFunction::Lag.
native-engine/auron-planner/proto/auron.proto Extends protobuf enum with LAG.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +93 to +97
private def lagIgnoreNulls(expr: Lag): Boolean =
expr.getClass.getMethods
.find(method => method.getName == "ignoreNulls" && method.getParameterCount == 0)
.exists(method => method.invoke(expr).asInstanceOf[Boolean])

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in c705d0f: Lag now uses the shared ignoreNulls helper, which wraps the reflective call in Try and defaults to false if reflection is unavailable or fails.

Comment on lines +222 to +239
if window_ctx.requires_full_partition() {
let mut staging_batches = vec![];
while let Some(batch) = input.next().await.transpose()? {
staging_batches.push(batch);
}

let outputs: Vec<ArrayRef> = batch
.columns()
.iter()
.cloned()
.chain(if window_ctx.output_window_cols {
window_cols
} else {
vec![]
})
.zip(window_ctx.output_schema.fields())
.map(|(array, field)| {
if array.data_type() != field.data_type() {
return cast(&array, field.data_type());
}
Ok(array.clone())
})
.collect::<Result<_>>()?;
let output_batch = RecordBatch::try_new_with_options(
window_ctx.output_schema.clone(),
outputs,
&RecordBatchOptions::new().with_row_count(Some(batch.num_rows())),
)?;
if !staging_batches.is_empty() {
let _timer = elapsed_compute.timer();
let batch = concat_batches(&window_ctx.input_schema, &staging_batches)?;
let output_batch =
process_window_batch(batch, &window_ctx, processors.as_mut_slice())?;
exec_ctx
.baseline_metrics()
.record_output(output_batch.num_rows());
sender.send(output_batch).await;
}
return Ok(());
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to this, the full-partition path buffers all input batches, then concat_batches allocates another contiguous copy. Peak memory is ~3x partition size all live simultaneously

Comment on lines +45 to +65
let input_values = self.children[0]
.evaluate(batch)
.and_then(|v| v.into_array(batch.num_rows()))?;

let offset_values = self.children[1]
.evaluate(batch)
.and_then(|v| v.into_array(batch.num_rows()))?;
let offset_values = if offset_values.data_type() == &DataType::Int32 {
offset_values
} else {
cast(&offset_values, &DataType::Int32)?
};
let offset = match ScalarValue::try_from_array(&offset_values, 0)? {
ScalarValue::Int32(Some(offset)) => offset as i64,
other => {
return Err(DataFusionError::Execution(format!(
"lag offset must be a non-null foldable integer, got {other:?}",
)));
}
};

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in c705d0f: lag now reuses the existing LeadProcessor, and that shared processor returns an empty output array for empty batches before reading the offset value.

Comment on lines +39 to +43
assert_eq!(
self.children.len(),
3,
"lag expects input/offset/default children",
);

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in c705d0f: the shared LeadProcessor now returns a DataFusionError::Execution for an invalid child count instead of asserting.

@ShreyeshArangath ShreyeshArangath left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some high-level comments on the approach, I might be missing details here. If you can capture it in the PR description, it'll be useful for other reviewers as well

Comment on lines +222 to +239
if window_ctx.requires_full_partition() {
let mut staging_batches = vec![];
while let Some(batch) = input.next().await.transpose()? {
staging_batches.push(batch);
}

let outputs: Vec<ArrayRef> = batch
.columns()
.iter()
.cloned()
.chain(if window_ctx.output_window_cols {
window_cols
} else {
vec![]
})
.zip(window_ctx.output_schema.fields())
.map(|(array, field)| {
if array.data_type() != field.data_type() {
return cast(&array, field.data_type());
}
Ok(array.clone())
})
.collect::<Result<_>>()?;
let output_batch = RecordBatch::try_new_with_options(
window_ctx.output_schema.clone(),
outputs,
&RecordBatchOptions::new().with_row_count(Some(batch.num_rows())),
)?;
if !staging_batches.is_empty() {
let _timer = elapsed_compute.timer();
let batch = concat_batches(&window_ctx.input_schema, &staging_batches)?;
let output_batch =
process_window_batch(batch, &window_ctx, processors.as_mut_slice())?;
exec_ctx
.baseline_metrics()
.record_output(output_batch.num_rows());
sender.send(output_batch).await;
}
return Ok(());
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to this, the full-partition path buffers all input batches, then concat_batches allocates another contiguous copy. Peak memory is ~3x partition size all live simultaneously

let value = if target_idx >= partition_start && target_idx < partition_end {
ScalarValue::try_from_array(&input_values, target_idx as usize)?
} else {
ScalarValue::try_from_array(&default_values, row_idx)?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, the row-by-row ScalarValue::try_from_array + ScalarValue::iter_to_array pattern creates N heap-allocated scalar objects. For large partitions that full-partition buffering implies, this is will become an issue. Can we use arrow::compute::take with a pre-computed indices array to gather values in O(1) allocations?

)?;
if !staging_batches.is_empty() {
let _timer = elapsed_compute.timer();
let batch = concat_batches(&window_ctx.input_schema, &staging_batches)?;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The full-partition buffering approach works okay but introduces unbounded memory risk for skewed partitions. Since the input is guaranteed sorted by partition keys, lag can be implemented as a streaming processor (like Rank/Agg) with an O(offset) ring buffer. This would eliminate the need for requires_full_partition(), concat_batches, and the dual code paths in execute_window. Wanted to check with you and see if you had considered a streaming approach here?

@weiqingy weiqingy left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for taking this on — the Spark-semantics handling reads carefully (the offset sign, null-in-target vs default, and the IGNORE NULLS fallback are all faithful), and the fallback test that asserts no NativeWindowBase survives is a nice way to lock the boundary down. My main feedback is about the base this is built on rather than the logic itself — a few questions inline.

windowExprBuilder.setFuncType(pb.WindowFunctionType.Window)
windowExprBuilder.setWindowFunc(pb.WindowFunction.LAG)
windowExprBuilder.addChildren(NativeConverters.convertExpr(e.input))
windowExprBuilder.addChildren(NativeConverters.convertExpr(e.inputOffset))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this opened in April, native lead(...) landed on master — the mirror offset function — and with it lag may not need a parallel native path at all. lag_processor.rs here is nearly identical to the merged lead_processor.rs; the only real difference is target_idx = row_idx - offset instead of + offset. Spark models the two as the same thing: Lag and Lead both extend FrameLessOffsetWindowFunction, and Lag carries both an inputOffset() and a separate derived offset() that negates it. The merged Lead case feeds e.offset into WindowFunction.LEADLeadProcessor (row_idx + offset). So a Lag case passing e.offset (rather than e.inputOffset as here) and mapping to WindowFunction.LEAD would reuse LeadProcessor and produce correct lag — with no new proto value, planner arm, or processor. Did you consider folding lag into the Lead path? If you'd rather keep them separate (anticipated divergence, readability), that's fair — worth capturing the rationale, since otherwise it's two copies of the same logic to maintain. Same goes for lagIgnoreNulls just above, which duplicates the existing leadIgnoreNulls / invokeNoArg[Boolean] helper.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in c705d0f: Lag now maps to the existing WindowFunction.LEAD path using Spark's signed e.offset, so the separate LAG proto/planner/processor path is gone.

)?;
window_cols[0] = arrow::compute::filter(&window_cols[0], &limited)?;
batch = arrow::compute::filter_record_batch(&batch, &limited)?;
if window_ctx.requires_full_partition() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This full-partition staging path already exists on master — requires_full_partition(), process_window_batch(), and the concat_batches staging were added when Lead merged, so this part of the diff largely duplicates what's there and will conflict on rebase (the requires_full_partition() helpers on WindowExpr/WindowContext are already defined too). Could you rebase onto current master? The real delta should shrink a lot. It also means the memory/streaming questions on the existing threads can lean on the precedent the merged Lead path already set — full-partition buffering is now the established approach for offset functions, rather than something this PR has to settle.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in c705d0f: the PR is rebased onto current master; the duplicate full-partition execution changes are removed and lag uses the existing Lead offset path.

ROW_NUMBER = 0;
RANK = 1;
DENSE_RANK = 2;
LAG = 3;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LAG = 3 collides with LEAD = 3, which is already on master. Two values sharing the same ordinal in one enum won't compile without allow_alias, so on rebase this needs a fresh number (8 is the next free one). Flagging because a textual merge can hide it — the conflict is on the comment/whitespace, not the = 3, so it can resolve cleanly and still break the build. (If lag ends up reusing the LEAD path per the other comment, this enum entry goes away entirely.)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in c705d0f: there is no separate LAG enum value now because lag reuses the existing LEAD path.

@SteNicholas

Copy link
Copy Markdown
Member

@officialasishkumar, thanks for contribution. Please resolve conficts and address review comments.

Spark `lag(...)` is not supported in Auron's native window execution path, causing queries using it to fall back to Spark instead of running natively.

This maps Spark `Lag` window expressions to the existing native `LEAD` window function by passing Spark's signed `offset`, so lag reuses the native LeadProcessor and offset-window execution path already present on master.

Changes included here:

- add `Lag` handling in `NativeWindowBase`

- keep `lag(... IGNORE NULLS)` on the Spark fallback path

- make reflective `ignoreNulls` detection deterministic by defaulting to false on reflection failures

- make `LeadProcessor` return execution errors for malformed children and handle empty batches without reading offset row zero

- add Scala regression tests for native `lag(...)` and fallback for `lag(... IGNORE NULLS)`

Signed-off-by: Asish Kumar <officialasishkumar@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement native support for lag window function

7 participants