Skip to content

feat(amber): add loop-bookkeeping columns to materialized State (dormant)#5900

Open
aglinxinyuan wants to merge 4 commits into
apache:mainfrom
aglinxinyuan:state-format-loop-columns
Open

feat(amber): add loop-bookkeeping columns to materialized State (dormant)#5900
aglinxinyuan wants to merge 4 commits into
apache:mainfrom
aglinxinyuan:state-format-loop-columns

Conversation

@aglinxinyuan

@aglinxinyuan aglinxinyuan commented Jun 23, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this PR?

Extends the cross-region State materialization format from a single content column to 4 columnscontent, loop_counter, loop_start_id, loop_start_state_uri — promoting loop bookkeeping to first-class columns (never inside the content JSON). The transport carries them end to end: the OutputManager state writer + emit_state, the Python network sender/receiver, the materialization reader, and the Scala state.toTuple call sites. In memory the three loop fields ride on the StateFrame envelope; they are materialized/serialized as their own columns (parallel to content), and from_tuple / fromTuple read only content back into the State.

On the Python side the column-name → value mapping is defined once in State.to_columns and reused by both to_tuple (iceberg) and the network sender's StateFrame branch, so adding a column later is a single-line change in one place rather than an edit in every serializer.

Dormant on main — nothing observable changes without the loop operators:

  • to_tuple() / toTuple() and OutputManager.save_state_to_storage_if_needed / emit_state default the three loop columns to 0 / "", so every existing non-loop caller is unchanged.
  • from_tuple / fromTuple read only the content column, so round-trip identity is preserved and the extra columns are inert.

No backward-compatible read of old 1-column State is needed: State materialization is intra-execution only — the iceberg state-document URI is execution-scoped (…/eid/{executionId}/) and recreated fresh each run, and State tuples are never replayed across executions or engine versions, so a 1-column tuple can never reach the 4-column reader.

This is the state-format prerequisite the loop operators build on; the columns carry non-default values only once Loop Start/End set them (follow-up PR).

Any related issues, documentation, discussions?

Extracted from #5700 (loop operators) per @Xiao-zhen-Liu's split request; part of #4442 ("Introduce for loop").

How was this PR tested?

  • Format / round-trip: test_state.py (loop columns are their own columns, never in content JSON, default to 0 / ""), Scala StateSpec (all three loop columns round-trip through a tuple with non-default values, not just content), ArrowUtilsSpec (4-column Arrow vector round-trip), IcebergDocumentSpec (iceberg state-doc round-trip).
  • Transport: test_network_receiver.py, test_input_port_materialization_reader_runnable.py, and test_state_materialization_e2e.py — the e2e (hermetic sqlite catalog) writes non-default values for all three loop columns end-to-end and asserts they replay both on the StateFrame and on the raw iceberg row, exercising the real Tuple/Schema/iceberg path.
  • Dormancy: new test_output_manager.py::test_defaults_loop_columns_when_omitted pins that a no-loop caller (no loop_counter) still produces a valid 4-column tuple with the loop columns at 0 / "".
  • Local: workflow-core + amber compile; StateSpec + ArrowUtilsSpec pass; Python state + transport + e2e tests pass; scalafmt + scalafix + black clean. (IcebergDocumentSpec needs the iceberg catalog backend, so it runs in CI.)

Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.

…ant)

Extend the cross-region State materialization format from 1 column (content)
to 4 columns: content + loop_counter + loop_start_id + loop_start_state_uri.
The loop bookkeeping is promoted to first-class columns (never inside the
content JSON), and the transport carries them: OutputManager state writer +
emit, the Python network sender/receiver, the materialization reader, and the
Scala state.toTuple call sites.

Dormant on main: to_tuple()/toTuple() and OutputManager.save_state_to_storage_if_needed
/ emit_state default the loop columns to 0/"", so every existing non-loop caller
is unchanged, and fromTuple/from_tuple read only the content column. The columns
activate only once the loop operators set them (follow-up PR). State
materialization is intra-execution (execution-scoped iceberg URI, recreated
fresh each run), so no backward-compatible read of old 1-column data is needed.

Extracted from apache#5700 (loop operators); part of apache#4442.
Copilot AI review requested due to automatic review settings June 23, 2026 01:35
@github-actions

Copy link
Copy Markdown
Contributor

Automated Reviewer Suggestions

Based on the git blame history of the changed files, we recommend the following reviewers:

  • Contributors with relevant context: @Yicong-Huang, @Ma77Ball
    You can notify them by mentioning @Yicong-Huang, @Ma77Ball in a comment.

@codecov-commenter

codecov-commenter commented Jun 23, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 95.83333% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 55.02%. Comparing base (8803d08) to head (74eef1d).
⚠️ Report is 41 commits behind head on main.

Files with missing lines Patch % Lines
...ne/architecture/messaginglayer/OutputManager.scala 0.00% 1 Missing ⚠️
.../architecture/pythonworker/PythonProxyClient.scala 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #5900      +/-   ##
============================================
+ Coverage     54.11%   55.02%   +0.91%     
- Complexity     2819     2988     +169     
============================================
  Files          1103     1111       +8     
  Lines         42650    42925     +275     
  Branches       4588     4623      +35     
============================================
+ Hits          23079    23621     +542     
+ Misses        18226    17916     -310     
- Partials       1345     1388      +43     
Flag Coverage Δ *Carryforward flag
access-control-service 70.00% <ø> (-0.45%) ⬇️
agent-service 34.36% <ø> (ø) Carriedforward from 7d5ab80
amber 57.84% <86.66%> (+2.20%) ⬆️
computing-unit-managing-service 0.00% <ø> (-1.66%) ⬇️
config-service 51.56% <ø> (-5.80%) ⬇️
file-service 59.02% <ø> (+0.42%) ⬆️
frontend 48.09% <ø> (-0.03%) ⬇️ Carriedforward from 7d5ab80
notebook-migration-service 78.57% <ø> (?)
pyamber 91.15% <100.00%> (+0.95%) ⬆️
python 90.69% <ø> (-0.08%) ⬇️ Carriedforward from 7d5ab80
workflow-compiling-service 55.14% <ø> (-3.55%) ⬇️

*This pull request uses carry forward flags. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@github-actions

github-actions Bot commented Jun 23, 2026

Copy link
Copy Markdown
Contributor

⚠️ Benchmark changes need a look

🟢 0 better · 🔴 10 worse · ⚪ 5 noise (<±5%) · 0 without baseline

Compared against main 8803d08 benchmarked on this same runner, so the delta is largely free of cross-runner hardware noise. The "7d avg" column still reflects the gh-pages dashboard. Treat <±5% as noise unless repeated.

Dashboard · Run

config throughput MB/s latency max Δ latest / 7d
🔴 bs=10 sw=10 sl=64 391 0.239 25,144/35,671/35,671 us 🔴 +14.1% / 🔴 +132.6%
🔴 bs=100 sw=10 sl=64 809 0.494 122,134/140,551/140,551 us 🔴 +6.7% / 🔴 +27.1%
bs=1000 sw=10 sl=64 941 0.575 1,062,243/1,107,215/1,107,215 us ⚪ within ±5% / ⚪ within ±5%
Baseline details

Latest main 8803d08 from same runner

config metric PR latest main 7d avg Δ latest Δ 7d
bs=10 sw=10 sl=64 throughput 391 tuples/sec 443 tuples/sec 757.16 tuples/sec -11.7% -48.4%
bs=10 sw=10 sl=64 MB/s 0.239 MB/s 0.271 MB/s 0.462 MB/s -11.8% -48.3%
bs=10 sw=10 sl=64 p50 25,144 us 22,045 us 12,971 us +14.1% +93.8%
bs=10 sw=10 sl=64 p95 35,671 us 32,753 us 15,333 us +8.9% +132.6%
bs=10 sw=10 sl=64 p99 35,671 us 32,753 us 18,732 us +8.9% +90.4%
bs=100 sw=10 sl=64 throughput 809 tuples/sec 860 tuples/sec 957.66 tuples/sec -5.9% -15.5%
bs=100 sw=10 sl=64 MB/s 0.494 MB/s 0.525 MB/s 0.585 MB/s -5.9% -15.5%
bs=100 sw=10 sl=64 p50 122,134 us 114,435 us 103,982 us +6.7% +17.5%
bs=100 sw=10 sl=64 p95 140,551 us 132,053 us 110,583 us +6.4% +27.1%
bs=100 sw=10 sl=64 p99 140,551 us 132,053 us 118,369 us +6.4% +18.7%
bs=1000 sw=10 sl=64 throughput 941 tuples/sec 936 tuples/sec 979.6 tuples/sec +0.5% -3.9%
bs=1000 sw=10 sl=64 MB/s 0.575 MB/s 0.571 MB/s 0.598 MB/s +0.7% -3.8%
bs=1000 sw=10 sl=64 p50 1,062,243 us 1,070,452 us 1,024,553 us -0.8% +3.7%
bs=1000 sw=10 sl=64 p95 1,107,215 us 1,115,452 us 1,063,789 us -0.7% +4.1%
bs=1000 sw=10 sl=64 p99 1,107,215 us 1,115,452 us 1,096,239 us -0.7% +1.0%
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,511.44,200,128000,391,0.239,25144.48,35671.08,35671.08
1,100,10,64,20,2472.42,2000,1280000,809,0.494,122133.94,140550.88,140550.88
2,1000,10,64,20,21246.30,20000,12800000,941,0.575,1062242.89,1107214.64,1107214.64

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR updates Amber’s cross-region State materialization wire format from a single content column to a 4-column schema (content, loop_counter, loop_start_id, loop_start_state_uri) so loop bookkeeping is carried as first-class columns rather than embedded in the user state JSON. It touches both Scala and Python runtimes plus tests to keep the change dormant for non-loop workflows via default values.

Changes:

  • Extend Scala/Python State schemas and toTuple/to_tuple writers to emit the 4-column state tuple with defaults for non-loop callers.
  • Update Python materialization reader and Python network sender/receiver to read/write the new loop bookkeeping columns on StateFrame.
  • Add/adjust Scala + Python tests to pin Arrow vector round-trips and state materialization behavior.

Reviewed changes

Copilot reviewed 17 out of 17 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala Expands State.schema to 4 columns and updates toTuple to write loop bookkeeping columns with defaults.
common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala Updates state tuple tests to align with the new toTuple() signature and schema.
common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala Adds a data-level Arrow encode/decode round-trip test to ensure multi-column State tuples survive Arrow vector conversion.
common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala Updates call sites to toTuple() in state-document tests (but some test semantics still use reserved loop keys in JSON).
amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala Updates Scala state persistence path to call state.toTuple().
amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala Updates Scala→Python Arrow send path to call state.toTuple().
amber/src/main/python/core/models/state.py Expands Python State schema and adds loop-bookkeeping parameters to to_tuple(...).
amber/src/main/python/core/models/payload.py Extends Python StateFrame envelope with loop bookkeeping fields (defaulting to no-loop values).
amber/src/main/python/core/architecture/packaging/output_manager.py Threads loop bookkeeping through save_state_to_storage_if_needed(...) and emit_state(...) and refactors writer setup/close logic.
amber/src/main/python/core/runnables/network_sender.py Serializes StateFrame as a 4-column Arrow table (content + loop bookkeeping columns).
amber/src/main/python/core/runnables/network_receiver.py Deserializes StateFrame from a 4-column Arrow table and populates loop bookkeeping fields.
amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py Reads loop bookkeeping columns from the state table rows and carries them on emitted StateFrames.
amber/src/test/python/core/models/test_state.py Updates tests to assert the expanded schema and that loop bookkeeping does not leak into the content JSON.
amber/src/test/python/core/runnables/test_network_receiver.py Updates unit test to verify non-zero loop_counter survives Python sender→receiver serialization.
amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py Updates reader test to assert loop bookkeeping values are carried on emitted StateFrames.
amber/src/test/python/core/architecture/packaging/test_output_manager.py Updates tests for new signature and adds a dormancy test ensuring omitted loop args default to 0/empty strings.
amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py Updates e2e state materialization test to use sqlite-backed catalog and assert loop_counter column round-trip.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread amber/src/main/python/core/runnables/network_sender.py Outdated
Comment thread amber/src/main/python/core/runnables/network_receiver.py
Address review (Copilot): the materialization round-trip tests used
"loop_counter" as a user-state key (landing in the content JSON), which is
misleading now that loop bookkeeping is a dedicated column. Rename the user key
to "i" and write/assert the loop columns via toTuple(loopCounter = ...) +
row.getField("loop_counter"/"loop_start_id"/"loop_start_state_uri"), matching
how StateSpec/test_state were updated.
aglinxinyuan added a commit to aglinxinyuan/texera that referenced this pull request Jun 23, 2026

@Xiao-zhen-Liu Xiao-zhen-Liu left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reviewed the part of the loop-split change that handles the state format. The main logic looks correct:

  • State only lives within a single execution, so no pre-existing single-column state can reach the new four-column reader. I checked this across the restart, fault-recovery, and result-reuse paths.
  • The non-loop defaults keep existing behavior unchanged.
  • The Python and Scala four-column schemas agree on names, order, and types.

The comments below are about test coverage and two design points. I'm not recording an approve / request-changes decision.

One accuracy note on the description: it lists only to_tuple/toTuple and two OutputManager methods, but the change also adds the three fields to the Python StateFrame and widens the wire format to four columns. Behavior is unchanged; just worth stating accurately.

Comment thread amber/src/main/python/core/models/state.py Outdated
# that need these values read the corresponding columns off the tuple.
LOOP_COUNTER = "loop_counter"
LOOP_START_ID = "loop_start_id"
LOOP_START_STATE_URI = "loop_start_state_uri"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design question, not a request: does the saved state need to carry the storage URI, or is loop_start_id enough for the controller to resolve where Loop End writes the next iteration? Raising it partly because it looks different from what we discussed offline.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question — and you're right it differs from the offline sketch. As currently wired in the loop branch, the URI isn't derivable from loop_start_id alone:

  • loop_start_id is just LoopStart's logical op id — it's what JumpToOperatorRegionRequest(OperatorIdentity(loop_start_id)) jumps to.
  • loop_start_state_uri is VFSURIFactory.state_uri(readers[0].uri) — the physical iceberg URI of LoopStart's input port, which also encodes the execution id, the physical layer name, and which input port/reader. LoopEnd writes the next iteration's state straight into that URI, so it needs the full path, not just the logical id.

So the real choice is: (a) carry the resolved URI on the state — current approach, LoopEnd writes back with no controller round-trip; or (b) carry only loop_start_id and have the controller resolve LoopStart's input-port materialization URI during the jump and hand it back. (b) is the leaner payload but couples LoopEnd to a controller resolution step.

Happy to go with (b) if that's what we landed on offline. This PR is only the dormant 4-column format — the consumer logic (and this decision) actually lands in the later loop PR, so flagging it here so we can settle it before then.

🤖 Addressed by Claude Code

…ip tests

Address review feedback on apache#5900:

- Add `State.to_columns`, the single column-name -> value mapping for the
  State wire/storage format, and route both `to_tuple` (iceberg) and the
  network sender's StateFrame branch through it, so adding a column is a
  one-line change rather than an edit in every serializer.
- e2e materialization test and StateSpec now round-trip all three loop
  columns (loop_counter, loop_start_id, loop_start_state_uri) with
  non-default values, not just loop_counter, so a regression in any single
  column's plumbing is caught.
- Document why the e2e deliberately uses a hermetic sqlite catalog while
  the other iceberg tests use postgres/REST.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants