[AURON #2362] Support native bit_and / bit_or / bit_xor aggregate#2363
[AURON #2362] Support native bit_and / bit_or / bit_xor aggregate#2363zhuxiangyi wants to merge 1 commit into
Conversation
weiqingy
left a comment
There was a problem hiding this comment.
Thanks for taking this on — a couple of questions inline.
| // group key "k" and a nullable integer column "v". bit_* skip nulls and | ||
| // are order-independent (associative + commutative). | ||
| // k=1: v = [3, 5, 1] -> bit_and=1, bit_or=7, bit_xor=7 | ||
| // k=2: v = [12, null, 10] -> bit_and=8, bit_or=14, bit_xor=6 |
There was a problem hiding this comment.
The native test exercises k=1 and k=2, both of which have at least one non-null value, so the all-null group → null path isn't covered here — it's only locked down by the Scala end-to-end suite (the k=3 row). That path is also the most fragile one: a regression where someone pre-initializes the slot in update_value would still pass these two groups but silently turn null into 0 for bit_and. Since the Scala aggregate suites are comparatively expensive to gate on, would it be worth adding a third group here whose values are all None and asserting the output row is null for all three aggregates? That pins the invariant cheaply and deterministically right where the seeding logic lives.
There was a problem hiding this comment.
Good call — done. I added a third group k=3 whose values are all None and assert the output row is null for all three aggregates, directly in the native test (test_agg_bitwise). That pins the skip-nulls / never-seed invariant cheaply, right where the seeding logic lives, without relying on the Scala suite.
There was a problem hiding this comment.
Thanks — the k=3 all-None group does exactly that: it pins the skip-nulls/never-seed invariant right where the seeding lives, and asserting null for all three aggregates means a future update_value that pre-seeds the slot would now fail here instead of slipping through to the Scala suite. Resolved.
| FIRST = 7; | ||
| FIRST_IGNORES_NULL = 8; | ||
| BLOOM_FILTER = 9; | ||
| // 10 / 11 are reserved for LAST / LAST_IGNORES_NULL (parallel PR) |
There was a problem hiding this comment.
The // 10 / 11 are reserved for LAST / LAST_IGNORES_NULL (parallel PR) comment holds those numbers by convention only — nothing stops a later edit from reusing them, and the prose goes stale once LAST lands or that PR changes shape. protobuf's reserved 10, 11; does the same documentation but has the compiler enforce the hold. Would you consider swapping the comment for reserved 10, 11;?
There was a problem hiding this comment.
Good point on the fragile comment. Instead of reserved 10, 11;, I dropped the gap entirely and assigned the next sequential numbers — BIT_AND = 10, BIT_OR = 11, BIT_XOR = 12 — so there's no hole left to document or reserve.
There was a problem hiding this comment.
Dropping the gap does kill the rot risk, agreed. One thing it surfaces though: the gap was holding 10/11 for the LAST work, and #2359 (still open, also branched off master) currently assigns LAST = 10 / LAST_IGNORES_NULL = 11 — so both PRs now claim 10 and 11. Whichever lands second hits a rebase conflict on this enum, and if that gets resolved by leaving both at 10/11 the wire numbers alias. Since both are yours, how would you like to deconflict — e.g. keep LAST = 10/11 as originally planned and bump bitwise to 12/13/14, or renumber #2359? Either's fine; just flagging so it's a deliberate pick rather than a merge-time surprise.
There was a problem hiding this comment.
Implement native bit_and / bit_or / bit_xor aggregates: - native: add a generic AggBitwise<P> (agg/bitwise.rs) with AggBitAnd / AggBitOr / AggBitXor aliases. The accumulator is a single column of the input type; the first non-null value initializes the slot and each subsequent value is folded in with the bitwise operator. The operators are associative and commutative, so the result is order-independent, and null inputs are skipped (an all-null group yields null). Integral inputs only (Int8/Int16/Int32/Int64). Wire through the AggFunction enum, create_agg, the protobuf contract (BIT_AND / BIT_OR / BIT_XOR), the protobuf->AggFunction conversion, and the window-agg mapping. - spark-extension: add the BitAndAgg / BitOrAgg / BitXorAgg expression conversions in NativeConverters; declare the buffer schema in NativeAggBase.computeNativeAggBufferDataTypes (Seq(dataType)) so the partial -> shuffle -> final buffer schema matches the native side. Tests: - Rust unit test agg_exec::test::test_agg_bitwise (partial -> final), including an all-null group asserting null for all three aggregates. - Scala e2e AuronDataFrameAggregateSuite "native bit_and / bit_or / bit_xor aggregate" (spark34 + spark35), covering the partial -> shuffle -> final native path (incl. all-null group) and asserting NativeAggBase offload.
93db901 to
a73b664
Compare
|
Both failures are unrelated to this change:
All code-relevant checks are green (Rust, Style, all Build Auron JAR, other TPC-DS). Could a committer re-run the failed jobs? |
Which issue does this PR close?
Closes #2362
Rationale for this change
Auron does not implement
bit_and/bit_or/bit_xornatively, so they fall back to the generic UDAF path (a JNI call back into the JVM), losing vectorized acceleration. This PR adds native support.What changes are included in this PR?
datafusion-ext-plans): add a genericAggBitwise<P>(agg/bitwise.rs) withAggBitAnd/AggBitOr/AggBitXoraliases. The accumulator is a single column of the input type; the first non-null value initializes the slot and each subsequent value is folded in with the operator. The operators are associative and commutative, so the result is order-independent, and null inputs are skipped (an all-null group yields null). Integral inputs only (Int8/Int16/Int32/Int64). Wired through theAggFunctionenum,create_agg, the protobuf contract (BIT_AND/BIT_OR/BIT_XOR), theprotobuf::AggFunction -> AggFunctionconversion, and the window-aggregate mapping.BitAndAgg/BitOrAgg/BitXorAggexpression conversions inNativeConverters.convertAggregateExpr; declare the buffer schema inNativeAggBase.computeNativeAggBufferDataTypes(Seq(dataType)) so the partial -> shuffle -> final buffer schema matches the native side.Are there any user-facing changes?
Yes.
bit_and/bit_or/bit_xorare now executed natively (vectorized); previously they fell back to the UDAF path.How was this patch tested?
agg_exec::test::test_agg_bitwise: partial -> final two-phase aggregation over a nullable integer column, verifying bit_and / bit_or / bit_xor including null skipping.AuronDataFrameAggregateSuite("native bit_and / bit_or / bit_xor aggregate", spark34 + spark35): a grouped aggregate exercising the full partial -> shuffle -> final native path (including an all-null group), asserting correct values and that the plan offloads toNativeAggBase.