Skip to content

feat: partial project fallback (JVM expression detour)#4827

Open
schenksj wants to merge 3 commits into
apache:mainfrom
schenksj:feat/partial-project-fallback
Open

feat: partial project fallback (JVM expression detour)#4827
schenksj wants to merge 3 commits into
apache:mainfrom
schenksj:feat/partial-project-fallback

Conversation

@schenksj

@schenksj schenksj commented Jul 5, 2026

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Closes #4825.

Rationale for this change

When one expression in a ProjectExec / FilterExec has no native translation, Comet abandons the whole operator and with it the native island below (a ColumnarToRow transition materializes the scan output; supported sibling expressions run row-wise in Spark). This PR evaluates only the unsupported subexpression in the JVM and keeps the operator — and the pipeline — native, by routing that subtree through the codegen dispatcher / JvmScalarUdfExpr callback that already ships on main. No proto or native changes.

This is experimental and default-off (spark.comet.exec.jvmDetour.enabled = false).

What changes are included in this PR?

  • CometConf: new spark.comet.exec.jvmDetour.enabled (COMET_EXEC_JVM_DETOUR_ENABLED), default false.
  • QueryPlanSerde: a withJvmDetour thread-local gate (a plain non-inheritable ThreadLocal, not DynamicVariable, to avoid leaking the flag to child planning threads) plus a last-resort .orElse hook in exprToProtoInternal that retries any unsupported node via emitJvmCodegenDispatch. On a successful detour it tags the node withInfo and clears its speculative FALLBACK_REASONS so a detoured node isn't also labeled a fallback.
  • CometScalaUDF: emitJvmCodegenDispatch now refuses a tree containing a subquery (PlanExpression). A subquery inside the detoured tree is closure-serialized at plan time, before waitForSubqueries populates its result, so evaluating it in the kernel would fail; refusing makes it a clean fallback. (This also hardens the pre-existing ScalaUDF / regex / HOF / date-time dispatch paths, which shared the same latent hazard.)
  • operators.scala: wrap CometProjectExec / CometFilterExec convert bodies in withJvmDetour, scoping the detour to those two contexts in v1.

How it works

The hook fires at the outermost unsupported node (serde recursion is handler-driven). Supported ancestors stay native and reference the detour's output; the detoured subtree's argument columns cross to the JVM over Arrow FFI and are evaluated by Spark's own doGenCode / eval inside a Janino kernel, so results match Spark by construction. Plan-time eligibility uses the dispatcher's existing canHandle (rejects aggregates, generators, Unevaluable, unsupported types, oversized trees). If the detour can't fire, None propagates and the operator falls back exactly as before.

How are these changes tested?

  • CometPartialProjectFallbackSuite (16 tests): flag-on stays native / flag-off falls back; outermost-node granularity; filter-predicate detour; canHandle-refused clean fallback; serde-level gate scoping; parity through the hook for NULL-heavy input, empty batches, decimals, timestamps, nested array/map, and a nondeterministic tree (per-partition seeding); subquery-inside-tree falls back while a sibling subquery stays native; a mixed projection that falls back whole when one sibling can't detour; island preservation (no columnar→row transition below the native projection).
  • CometCodegenFuzzSuite: two added tests fuzz a disabled abs serde through the hook over every numeric column of a random schema and over the decimal precision/scale/null-density sweep.
  • CometPartialProjectFallbackBenchmark: Spark vs. detour-off vs. detour-on across three shapes.

CometCodegenSuite (73), CometCodegenFuzzSuite (30), and the new suite (16) all pass locally under the spark-4.1 profile.

Benchmark

16M-row projections/filter over a native Parquet scan, Abs's serde disabled to stand in for an unsupported expression (only the detour flag differs between the two Comet cases). Local Apple M1:

Shape Spark (best) Comet, detour off (best) Comet, detour on (best) detour-on vs Spark
lone unsupported expr, wide passthrough 414 ms 419 ms 310 ms 1.3×
mixed projection (7 native + 1 unsupported) 434 ms 429 ms 370 ms 1.2×
filter predicate with unsupported subexpr 350 ms 364 ms 278 ms 1.3×

Detour-on preserves the native island (verified by a plan check in the benchmark) and beats both Spark and the whole-operator fallback on all three shapes.

Notes / follow-ups

  • Filters: a detoured sub-predicate is evaluated for all rows, the same as any native Comet predicate; under ANSI this can differ from Spark's per-row AND/OR short-circuit. Documented in the config doc; default-off.
  • v2 follow-up: rewrite supported children (including subqueries) of the detoured node to native BoundReference args so only their values cross the FFI — this is the path to real subquery support and finer granularity.

🤖 Generated with Claude Code

schenksj and others added 3 commits July 4, 2026 11:09
When a projection or filter contains an expression with no native
translation, evaluate just that subtree in the JVM via Comet's
Arrow-direct codegen dispatcher instead of falling the whole operator
back to Spark. The operator, and the native island around it, stays
native. Gated behind spark.comet.exec.jvmDetour.enabled (default off,
experimental).

- CometConf: new COMET_EXEC_JVM_DETOUR_ENABLED config.
- QueryPlanSerde: a withJvmDetour thread-local gate plus a last-resort
  .orElse hook in exprToProtoInternal that retries any unsupported node
  via emitJvmCodegenDispatch and clears FALLBACK_REASONS on success.
- CometScalaUDF: refuse subquery-bearing trees in emitJvmCodegenDispatch
  (they are closure-serialized at plan time, before waitForSubqueries)
  so they fall back cleanly instead of failing in the kernel.
- operators: wrap CometProjectExec/CometFilterExec convert bodies in
  withJvmDetour so the detour is scoped to those two contexts.
- tests: CometPartialProjectFallbackSuite (serde, parity, island
  preservation, gate scoping), CometCodegenFuzzSuite hook fuzz, and
  CometPartialProjectFallbackBenchmark.

No proto or native changes: reuses the existing JvmScalarUdfExpr JVM
callback machinery.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01JnvYuZKVMTeY2rewk94ZFe
dev/ci/check-suites.py requires every test suite to be enumerated in the
PR build workflows. Add the new suite to pr_build_linux.yml and
pr_build_macos.yml next to the other codegen suites.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01JnvYuZKVMTeY2rewk94ZFe
`withDetour[T](enabled)(f: => T): T = withSQLConf(...)(f)` failed to compile on Spark
3.4/3.5: those versions type `withSQLConf(...)(f)` as `Unit`, which cannot unify with the
declared return `T` (Spark 4.x infers it, so only the 3.x jobs were red). Capture the
block's result in a local instead of returning `withSQLConf`'s value, which compiles on
all supported Spark versions (verified against spark-3.5 and spark-4.1).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Partial project fallback: keep a projection/filter native by evaluating an unsupported subexpression in the JVM

1 participant