feat: partial project fallback (JVM expression detour) #4827
Open
schenksj wants to merge 3 commits into
When a projection or filter contains an expression with no native translation, evaluate just that subtree in the JVM via Comet's Arrow-direct codegen dispatcher instead of falling the whole operator back to Spark. The operator, and the native island around it, stays native. Gated behind spark.comet.exec.jvmDetour.enabled (default off, experimental).

- CometConf: new COMET_EXEC_JVM_DETOUR_ENABLED config.
- QueryPlanSerde: a withJvmDetour thread-local gate plus a last-resort .orElse hook in exprToProtoInternal that retries any unsupported node via emitJvmCodegenDispatch and clears FALLBACK_REASONS on success.
- CometScalaUDF: refuse subquery-bearing trees in emitJvmCodegenDispatch (they are closure-serialized at plan time, before waitForSubqueries) so they fall back cleanly instead of failing in the kernel.
- operators: wrap CometProjectExec/CometFilterExec convert bodies in withJvmDetour so the detour is scoped to those two contexts.
- tests: CometPartialProjectFallbackSuite (serde, parity, island preservation, gate scoping), CometCodegenFuzzSuite hook fuzz, and CometPartialProjectFallbackBenchmark.

No proto or native changes: reuses the existing JvmScalarUdfExpr JVM callback machinery.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01JnvYuZKVMTeY2rewk94ZFe
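For readers unfamiliar with the gating pattern, here is a minimal sketch of what a scoped thread-local gate like `withJvmDetour` could look like. The object name `DetourGate` and the helper `visibleInChildThread` are hypothetical and the body is an assumption, not the code in this PR; it only illustrates that a plain `ThreadLocal` is restored on exit and is not seen by threads spawned while the gate is open.

```scala
// Hypothetical stand-in for the gate; only the method name withJvmDetour comes from the PR text.
object DetourGate {
  // Plain ThreadLocal: the value is per-thread and is NOT copied into child threads.
  private val enabled: ThreadLocal[Boolean] = new ThreadLocal[Boolean] {
    override def initialValue(): Boolean = false
  }

  def isEnabled: Boolean = enabled.get()

  // Open the gate for the duration of `body`, then restore the previous value,
  // even if `body` throws. Nested calls therefore unwind correctly.
  def withJvmDetour[T](body: => T): T = {
    val previous = enabled.get()
    enabled.set(true)
    try body
    finally enabled.set(previous)
  }

  // Spawns a thread while the gate is open and reports what that thread observes.
  def visibleInChildThread(): Boolean = {
    var seen = false
    withJvmDetour {
      val t = new Thread(new Runnable {
        override def run(): Unit = { seen = isEnabled }
      })
      t.start()
      t.join() // join() makes the child's write to `seen` visible here
    }
    seen
  }
}
```

In this sketch the flag is only ever true on the thread that entered `withJvmDetour`, which is the scoping property the operator wrappers rely on.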
dev/ci/check-suites.py requires every test suite to be enumerated in the PR build workflows. Add the new suite to pr_build_linux.yml and pr_build_macos.yml next to the other codegen suites. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01JnvYuZKVMTeY2rewk94ZFe
`withDetour[T](enabled)(f: => T): T = withSQLConf(...)(f)` failed to compile on Spark 3.4/3.5: those versions type `withSQLConf(...)(f)` as `Unit`, which cannot unify with the declared return `T` (Spark 4.x infers it, so only the 3.x jobs were red). Capture the block's result in a local instead of returning `withSQLConf`'s value, which compiles on all supported Spark versions (verified against spark-3.5 and spark-4.1). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
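A minimal sketch of the result-capture pattern this commit describes. The `withSQLConf` below is a local stand-in written for illustration (an assumption that mimics a helper whose call is typed `Unit`, not Spark's actual test helper), and `confValue` is a hypothetical accessor added only so the example can be checked.

```scala
import scala.collection.mutable

object WithDetourSketch {
  // Toy config store standing in for the session conf.
  private val conf = mutable.Map.empty[String, String]

  def confValue(key: String): Option[String] = conf.get(key)

  // Stand-in for a helper that sets config for the duration of a block and returns Unit.
  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
    val saved = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf(k) = v }
    try f
    finally saved.foreach {
      case (k, Some(v)) => conf(k) = v
      case (k, None)    => conf.remove(k)
    }
  }

  // Returning withSQLConf(...)(f) directly would not typecheck as T here,
  // so the block's result is captured in a local and returned afterwards.
  def withDetour[T](enabled: Boolean)(f: => T): T = {
    var result: Option[T] = None
    withSQLConf("spark.comet.exec.jvmDetour.enabled" -> enabled.toString) {
      result = Some(f)
    }
    result.get
  }
}
```

Because the assignment inside the block has type `Unit`, this shape does not depend on how the helper's return type is declared.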
Which issue does this PR close?
Closes #4825.
Rationale for this change
When one expression in a `ProjectExec`/`FilterExec` has no native translation, Comet abandons the whole operator and with it the native island below (a `ColumnarToRow` transition materializes the scan output; supported sibling expressions run row-wise in Spark). This PR evaluates only the unsupported subexpression in the JVM and keeps the operator, and the pipeline, native by routing that subtree through the codegen dispatcher / `JvmScalarUdfExpr` callback that already ships on `main`. No proto or native changes.

This is experimental and default-off (`spark.comet.exec.jvmDetour.enabled = false`).

What changes are included in this PR?
- `CometConf`: new `spark.comet.exec.jvmDetour.enabled` (`COMET_EXEC_JVM_DETOUR_ENABLED`), default `false`.
- `QueryPlanSerde`: a `withJvmDetour` thread-local gate (a plain non-inheritable `ThreadLocal`, not `DynamicVariable`, to avoid leaking the flag to child planning threads) plus a last-resort `.orElse` hook in `exprToProtoInternal` that retries any unsupported node via `emitJvmCodegenDispatch`. On a successful detour it tags the node `withInfo` and clears its speculative `FALLBACK_REASONS` so a detoured node isn't also labeled a fallback.
- `CometScalaUDF`: `emitJvmCodegenDispatch` now refuses a tree containing a subquery (`PlanExpression`). A subquery inside the detoured tree is closure-serialized at plan time, before `waitForSubqueries` populates its result, so evaluating it in the kernel would fail; refusing makes it a clean fallback. (This also hardens the pre-existing ScalaUDF / regex / HOF / date-time dispatch paths, which shared the same latent hazard.)
- `operators.scala`: wrap `CometProjectExec`/`CometFilterExec` convert bodies in `withJvmDetour`, scoping the detour to those two contexts in v1.

How it works
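As a rough illustration of the last-resort `.orElse` hook and the subquery refusal listed in the changes, here is a toy serializer. Every type and name except `withJvmDetour` (`Expr`, `Exotic`, `Proto`, `JvmCall`, `jvmDispatch`, and so on) is a hypothetical stand-in, not Comet's classes, and eligibility is reduced to "gate open and no subquery" for brevity.

```scala
object DetourHookSketch {
  // Toy expression tree.
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr
  final case class Exotic(child: Expr) extends Expr // has no native translation
  final case class Subquery(id: Int) extends Expr   // stands in for a subquery node

  // Toy serialized form: a native node, or an opaque JVM call with its input columns.
  sealed trait Proto
  final case class Native(op: String, children: Seq[Proto]) extends Proto
  final case class JvmCall(tree: Expr, args: Seq[String]) extends Proto

  private val gate: ThreadLocal[Boolean] = new ThreadLocal[Boolean] {
    override def initialValue(): Boolean = false
  }

  def withJvmDetour[T](body: => T): T = {
    val previous = gate.get()
    gate.set(true)
    try body finally gate.set(previous)
  }

  private def containsSubquery(e: Expr): Boolean = e match {
    case Subquery(_) => true
    case Col(_)      => false
    case Add(l, r)   => containsSubquery(l) || containsSubquery(r)
    case Exotic(c)   => containsSubquery(c)
  }

  private def columns(e: Expr): Seq[String] = e match {
    case Col(n)      => Seq(n)
    case Add(l, r)   => columns(l) ++ columns(r)
    case Exotic(c)   => columns(c)
    case Subquery(_) => Nil
  }

  // Native handlers recurse through exprToProto, so children get their own chance to detour.
  private def nativeProto(e: Expr): Option[Proto] = e match {
    case Col(n)    => Some(Native(s"col:$n", Nil))
    case Add(l, r) => for (lp <- exprToProto(l); rp <- exprToProto(r)) yield Native("add", Seq(lp, rp))
    case _         => None
  }

  // Last resort: ship the whole subtree to the JVM, unless the gate is closed
  // or the subtree contains a subquery.
  private def jvmDispatch(e: Expr): Option[Proto] =
    if (!gate.get() || containsSubquery(e)) None
    else Some(JvmCall(e, columns(e).distinct))

  def exprToProto(e: Expr): Option[Proto] = nativeProto(e).orElse(jvmDispatch(e))
}
```

With the gate open, `Add(Col("a"), Exotic(Exotic(Col("b"))))` serializes to a native `add` whose right child is a single `JvmCall` covering the outer `Exotic`; with the gate closed, or with a `Subquery` inside the unsupported subtree, the result is `None` and the caller falls back.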
The hook fires at the outermost unsupported node (serde recursion is handler-driven). Supported ancestors stay native and reference the detour's output; the detoured subtree's argument columns cross to the JVM over Arrow FFI and are evaluated by Spark's own `doGenCode`/`eval` inside a Janino kernel, so results match Spark by construction. Plan-time eligibility uses the dispatcher's existing `canHandle` (rejects aggregates, generators, `Unevaluable`, unsupported types, oversized trees). If the detour can't fire, `None` propagates and the operator falls back exactly as before.

How are these changes tested?
- `CometPartialProjectFallbackSuite` (16 tests): flag-on stays native / flag-off falls back; outermost-node granularity; filter-predicate detour; `canHandle`-refused clean fallback; serde-level gate scoping; parity through the hook for NULL-heavy input, empty batches, decimals, timestamps, nested array/map, and a nondeterministic tree (per-partition seeding); subquery-inside-tree falls back while a sibling subquery stays native; a mixed projection that falls back whole when one sibling can't detour; island preservation (no columnar→row transition below the native projection).
- `CometCodegenFuzzSuite`: two added tests fuzz a disabled `abs` serde through the hook over every numeric column of a random schema and over the decimal precision/scale/null-density sweep.
- `CometPartialProjectFallbackBenchmark`: Spark vs. detour-off vs. detour-on across three shapes.

`CometCodegenSuite` (73), `CometCodegenFuzzSuite` (30), and the new suite (16) all pass locally under the `spark-4.1` profile.

Benchmark
16M-row projections/filter over a native Parquet scan, with `Abs`'s serde disabled to stand in for an unsupported expression (only the detour flag differs between the two Comet cases). Local Apple M1:

Detour-on preserves the native island (verified by a plan check in the benchmark) and beats both Spark and the whole-operator fallback on all three shapes.
Notes / follow-ups
`BoundReference` args so only their values cross the FFI; this is the path to real subquery support and finer granularity.

🤖 Generated with Claude Code