Skip to content

[AURON #2362] Support native bit_and / bit_or / bit_xor aggregate#2363

Open
zhuxiangyi wants to merge 1 commit into
apache:masterfrom
zhuxiangyi:support-native-bitwise-aggregate
Open

[AURON #2362] Support native bit_and / bit_or / bit_xor aggregate#2363
zhuxiangyi wants to merge 1 commit into
apache:masterfrom
zhuxiangyi:support-native-bitwise-aggregate

Conversation

@zhuxiangyi

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Closes #2362

Rationale for this change

Auron does not implement bit_and / bit_or / bit_xor natively, so they fall back to the generic UDAF path (a JNI call back into the JVM), losing vectorized acceleration. This PR adds native support.

What changes are included in this PR?

  • native (datafusion-ext-plans): add a generic AggBitwise<P> (agg/bitwise.rs) with AggBitAnd / AggBitOr / AggBitXor aliases. The accumulator is a single column of the input type; the first non-null value initializes the slot and each subsequent value is folded in with the operator. The operators are associative and commutative, so the result is order-independent, and null inputs are skipped (an all-null group yields null). Integral inputs only (Int8/Int16/Int32/Int64). Wired through the AggFunction enum, create_agg, the protobuf contract (BIT_AND / BIT_OR / BIT_XOR), the protobuf::AggFunction -> AggFunction conversion, and the window-aggregate mapping.
  • spark-extension: add the BitAndAgg / BitOrAgg / BitXorAgg expression conversions in NativeConverters.convertAggregateExpr; declare the buffer schema in NativeAggBase.computeNativeAggBufferDataTypes (Seq(dataType)) so the partial -> shuffle -> final buffer schema matches the native side.

Note: proto numbers 12/13/14 are used here; 10/11 are left for the parallel LAST / LAST_IGNORES_NULL PR (#2359) so the two can merge in any order.

Are there any user-facing changes?

Yes. bit_and / bit_or / bit_xor are now executed natively (vectorized); previously they fell back to the UDAF path.

How was this patch tested?

  • Rust unit test agg_exec::test::test_agg_bitwise: partial -> final two-phase aggregation over a nullable integer column, verifying bit_and / bit_or / bit_xor including null skipping.
  • Scala end-to-end test in AuronDataFrameAggregateSuite ("native bit_and / bit_or / bit_xor aggregate", spark34 + spark35): a grouped aggregate exercising the full partial -> shuffle -> final native path (including an all-null group), asserting correct values and that the plan offloads to NativeAggBase.

@weiqingy weiqingy left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for taking this on — a couple of questions inline.

// group key "k" and a nullable integer column "v". bit_* skip nulls and
// are order-independent (associative + commutative).
// k=1: v = [3, 5, 1] -> bit_and=1, bit_or=7, bit_xor=7
// k=2: v = [12, null, 10] -> bit_and=8, bit_or=14, bit_xor=6

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The native test exercises k=1 and k=2, both of which have at least one non-null value, so the all-null group → null path isn't covered here — it's only locked down by the Scala end-to-end suite (the k=3 row). That path is also the most fragile one: a regression where someone pre-initializes the slot in update_value would still pass these two groups but silently turn null into 0 for bit_and. Since the Scala aggregate suites are comparatively expensive to gate on, would it be worth adding a third group here whose values are all None and asserting the output row is null for all three aggregates? That pins the invariant cheaply and deterministically right where the seeding logic lives.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call — done. I added a third group k=3 whose values are all None and assert the output row is null for all three aggregates, directly in the native test (test_agg_bitwise). That pins the skip-nulls / never-seed invariant cheaply, right where the seeding logic lives, without relying on the Scala suite.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks — the k=3 all-None group does exactly that: it pins the skip-nulls/never-seed invariant right where the seeding lives, and asserting null for all three aggregates means a future update_value that pre-seeds the slot would now fail here instead of slipping through to the Scala suite. Resolved.

FIRST = 7;
FIRST_IGNORES_NULL = 8;
BLOOM_FILTER = 9;
// 10 / 11 are reserved for LAST / LAST_IGNORES_NULL (parallel PR)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The // 10 / 11 are reserved for LAST / LAST_IGNORES_NULL (parallel PR) comment holds those numbers by convention only — nothing stops a later edit from reusing them, and the prose goes stale once LAST lands or that PR changes shape. protobuf's reserved 10, 11; does the same documentation but has the compiler enforce the hold. Would you consider swapping the comment for reserved 10, 11;?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point on the fragile comment. Instead of reserved 10, 11;, I dropped the gap entirely and assigned the next sequential numbers — BIT_AND = 10, BIT_OR = 11, BIT_XOR = 12 — so there's no hole left to document or reserve.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dropping the gap does kill the rot risk, agreed. One thing it surfaces though: the gap was holding 10/11 for the LAST work, and #2359 (still open, also branched off master) currently assigns LAST = 10 / LAST_IGNORES_NULL = 11 — so both PRs now claim 10 and 11. Whichever lands second hits a rebase conflict on this enum, and if that gets resolved by leaving both at 10/11 the wire numbers alias. Since both are yours, how would you like to deconflict — e.g. keep LAST = 10/11 as originally planned and bump bitwise to 12/13/14, or renumber #2359? Either's fine; just flagging so it's a deliberate pick rather than a merge-time surprise.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — thanks for flagging it explicitly. Since both are mine, I'll keep this PR sequential at 10/11/12 and renumber #2359 to LAST = 13 / LAST_IGNORES_NULL = 14, so neither PR leaves a gap and the wire numbers don't alias regardless of merge order. Updating #2359 now.

Implement native bit_and / bit_or / bit_xor aggregates:

- native: add a generic AggBitwise<P> (agg/bitwise.rs) with AggBitAnd /
  AggBitOr / AggBitXor aliases. The accumulator is a single column of the
  input type; the first non-null value initializes the slot and each
  subsequent value is folded in with the bitwise operator. The operators
  are associative and commutative, so the result is order-independent, and
  null inputs are skipped (an all-null group yields null). Integral inputs
  only (Int8/Int16/Int32/Int64). Wire through the AggFunction enum,
  create_agg, the protobuf contract (BIT_AND / BIT_OR / BIT_XOR), the
  protobuf->AggFunction conversion, and the window-agg mapping.
- spark-extension: add the BitAndAgg / BitOrAgg / BitXorAgg expression
  conversions in NativeConverters; declare the buffer schema in
  NativeAggBase.computeNativeAggBufferDataTypes (Seq(dataType)) so the
  partial -> shuffle -> final buffer schema matches the native side.

Tests:
- Rust unit test agg_exec::test::test_agg_bitwise (partial -> final),
  including an all-null group asserting null for all three aggregates.
- Scala e2e AuronDataFrameAggregateSuite "native bit_and / bit_or / bit_xor
  aggregate" (spark34 + spark35), covering the partial -> shuffle -> final
  native path (incl. all-null group) and asserting NativeAggBase offload.
@zhuxiangyi zhuxiangyi force-pushed the support-native-bitwise-aggregate branch from 93db901 to a73b664 Compare June 30, 2026 12:49
@zhuxiangyi

Copy link
Copy Markdown
Contributor Author

Both failures are unrelated to this change:

  • spark-3.3 q30–q39 failed at the Download Auron JAR step (CI artifact download); the test itself was skipped.
  • Uniffle q1–q9 failed only on q6, which doesn't use bit_and/bit_or/bit_xor and passes on spark-3.0/3.1 with regular shuffle — a flake on the Uniffle/RSS path. This PR only adds new AggFunction cases and doesn't touch existing aggregate conversion.

All code-relevant checks are green (Rust, Style, all Build Auron JAR, other TPC-DS). Could a committer re-run the failed jobs?

@zhuxiangyi zhuxiangyi requested a review from weiqingy July 1, 2026 16:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support native bit_and / bit_or / bit_xor aggregate

2 participants