feat(amber): add loop-bookkeeping columns to materialized State (dormant)#5900
aglinxinyuan wants to merge 4 commits into
…ant)
Extend the cross-region State materialization format from 1 column (content) to 4 columns: content + loop_counter + loop_start_id + loop_start_state_uri. The loop bookkeeping is promoted to first-class columns (never inside the content JSON), and the transport carries them: OutputManager state writer + emit, the Python network sender/receiver, the materialization reader, and the Scala state.toTuple call sites.
Dormant on main: to_tuple()/toTuple() and OutputManager.save_state_to_storage_if_needed / emit_state default the loop columns to 0/"", so every existing non-loop caller is unchanged, and fromTuple/from_tuple read only the content column. The columns activate only once the loop operators set them (follow-up PR).
State materialization is intra-execution (execution-scoped iceberg URI, recreated fresh each run), so no backward-compatible read of old 1-column data is needed.
Extracted from apache#5700 (loop operators); part of apache#4442.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #5900 +/- ##
============================================
+ Coverage 54.11% 55.02% +0.91%
- Complexity 2819 2988 +169
============================================
Files 1103 1111 +8
Lines 42650 42925 +275
Branches 4588 4623 +35
============================================
+ Hits 23079 23621 +542
+ Misses 18226 17916 -310
- Partials 1345 1388 +43
| | config | throughput (tuples/sec) | MB/s | latency p50/p95/p99 | max Δ latest / 7d |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 391 | 0.239 | 25,144/35,671/35,671 us | 🔴 +14.1% / 🔴 +132.6% |
| 🔴 | bs=100 sw=10 sl=64 | 809 | 0.494 | 122,134/140,551/140,551 us | 🔴 +6.7% / 🔴 +27.1% |
| ⚪ | bs=1000 sw=10 sl=64 | 941 | 0.575 | 1,062,243/1,107,215/1,107,215 us | ⚪ within ±5% / ⚪ within ±5% |
Baseline details
Latest main 8803d08 from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 391 tuples/sec | 443 tuples/sec | 757.16 tuples/sec | -11.7% | -48.4% |
| bs=10 sw=10 sl=64 | MB/s | 0.239 MB/s | 0.271 MB/s | 0.462 MB/s | -11.8% | -48.3% |
| bs=10 sw=10 sl=64 | p50 | 25,144 us | 22,045 us | 12,971 us | +14.1% | +93.8% |
| bs=10 sw=10 sl=64 | p95 | 35,671 us | 32,753 us | 15,333 us | +8.9% | +132.6% |
| bs=10 sw=10 sl=64 | p99 | 35,671 us | 32,753 us | 18,732 us | +8.9% | +90.4% |
| bs=100 sw=10 sl=64 | throughput | 809 tuples/sec | 860 tuples/sec | 957.66 tuples/sec | -5.9% | -15.5% |
| bs=100 sw=10 sl=64 | MB/s | 0.494 MB/s | 0.525 MB/s | 0.585 MB/s | -5.9% | -15.5% |
| bs=100 sw=10 sl=64 | p50 | 122,134 us | 114,435 us | 103,982 us | +6.7% | +17.5% |
| bs=100 sw=10 sl=64 | p95 | 140,551 us | 132,053 us | 110,583 us | +6.4% | +27.1% |
| bs=100 sw=10 sl=64 | p99 | 140,551 us | 132,053 us | 118,369 us | +6.4% | +18.7% |
| bs=1000 sw=10 sl=64 | throughput | 941 tuples/sec | 936 tuples/sec | 979.6 tuples/sec | +0.5% | -3.9% |
| bs=1000 sw=10 sl=64 | MB/s | 0.575 MB/s | 0.571 MB/s | 0.598 MB/s | +0.7% | -3.8% |
| bs=1000 sw=10 sl=64 | p50 | 1,062,243 us | 1,070,452 us | 1,024,553 us | -0.8% | +3.7% |
| bs=1000 sw=10 sl=64 | p95 | 1,107,215 us | 1,115,452 us | 1,063,789 us | -0.7% | +4.1% |
| bs=1000 sw=10 sl=64 | p99 | 1,107,215 us | 1,115,452 us | 1,096,239 us | -0.7% | +1.0% |
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,511.44,200,128000,391,0.239,25144.48,35671.08,35671.08
1,100,10,64,20,2472.42,2000,1280000,809,0.494,122133.94,140550.88,140550.88
2,1000,10,64,20,21246.30,20000,12800000,941,0.575,1062242.89,1107214.64,1107214.64
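The derived CSV columns and the Δ columns in the baseline table can be recomputed from the raw totals. The sketch below is not taken from the benchmark harness. It assumes that MB/s means 2^20 bytes per second and that Δ is the relative change of the PR value against the baseline; both assumptions reproduce the reported figures for the `bs=10` row. The function names are hypothetical.

```python
def tuples_per_sec(total_tuples: int, total_ms: float) -> float:
    """Throughput in tuples per second from a total count and wall time in ms."""
    return total_tuples / (total_ms / 1000.0)


def mb_per_sec(total_bytes: int, total_ms: float) -> float:
    """Bandwidth, assuming 1 MB = 2**20 bytes (an assumption that matches the CSV)."""
    return total_bytes / (1024 * 1024) / (total_ms / 1000.0)


def pct_delta(pr_value: float, baseline: float) -> float:
    """Relative change of the PR value against a baseline, in percent."""
    return (pr_value - baseline) / baseline * 100.0


# Raw values from CSV row 0 (bs=10): 200 tuples, 128000 bytes, 511.44 ms.
throughput = tuples_per_sec(200, 511.44)
bandwidth = mb_per_sec(128000, 511.44)

# Baseline comparison against "latest main" for the same config.
throughput_delta = pct_delta(391, 443)    # reported as -11.7%
p50_delta = pct_delta(25144, 22045)       # reported as +14.1%

print(round(throughput), round(bandwidth, 3))
print(round(throughput_delta, 1), round(p50_delta, 1))
```

Note that a negative Δ is a regression for throughput and MB/s, while a positive Δ is a regression for latency, which is why the summary table flags both directions in red.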
Pull request overview
This PR updates Amber’s cross-region State materialization wire format from a single content column to a 4-column schema (content, loop_counter, loop_start_id, loop_start_state_uri) so loop bookkeeping is carried as first-class columns rather than embedded in the user state JSON. It touches both Scala and Python runtimes plus tests to keep the change dormant for non-loop workflows via default values.
Changes:
- Extend Scala/Python `State` schemas and `toTuple`/`to_tuple` writers to emit the 4-column state tuple with defaults for non-loop callers.
- Update Python materialization reader and Python network sender/receiver to read/write the new loop bookkeeping columns on `StateFrame`.
- Add/adjust Scala + Python tests to pin Arrow vector round-trips and state materialization behavior.
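The dormancy argument in the changes above can be illustrated with a small stand-in. This is a minimal sketch, not the PR's `State` class: `SketchState`, the plain-tuple row, and the JSON encoding of `content` are assumptions made for illustration; only the four column names and the `0`/`""` defaults come from the PR text.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, Tuple

# Column names and order as listed in the PR; the Python types are assumed.
STATE_COLUMNS = ("content", "loop_counter", "loop_start_id", "loop_start_state_uri")


@dataclass
class SketchState:
    """Hypothetical stand-in for the real State class."""

    data: Dict[str, Any] = field(default_factory=dict)

    def to_tuple(
        self,
        loop_counter: int = 0,
        loop_start_id: str = "",
        loop_start_state_uri: str = "",
    ) -> Tuple[str, int, str, str]:
        # Loop bookkeeping is written to its own columns, never into the JSON.
        return (json.dumps(self.data), loop_counter, loop_start_id, loop_start_state_uri)

    @classmethod
    def from_tuple(cls, row: Tuple[str, int, str, str]) -> "SketchState":
        # Only the content column is read back; the loop columns are ignored.
        return cls(json.loads(row[0]))


# A non-loop caller passes no loop arguments and gets the default columns.
row = SketchState({"total": 3}).to_tuple()
restored = SketchState.from_tuple(row)
```

Because `from_tuple` ignores everything except `content`, a caller that never sets the loop arguments sees the same round-trip result as before the schema was widened.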
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala | Expands State.schema to 4 columns and updates toTuple to write loop bookkeeping columns with defaults. |
| common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala | Updates state tuple tests to align with the new toTuple() signature and schema. |
| common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala | Adds a data-level Arrow encode/decode round-trip test to ensure multi-column State tuples survive Arrow vector conversion. |
| common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala | Updates call sites to toTuple() in state-document tests (but some test semantics still use reserved loop keys in JSON). |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala | Updates Scala state persistence path to call state.toTuple(). |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala | Updates Scala→Python Arrow send path to call state.toTuple(). |
| amber/src/main/python/core/models/state.py | Expands Python State schema and adds loop-bookkeeping parameters to to_tuple(...). |
| amber/src/main/python/core/models/payload.py | Extends Python StateFrame envelope with loop bookkeeping fields (defaulting to no-loop values). |
| amber/src/main/python/core/architecture/packaging/output_manager.py | Threads loop bookkeeping through save_state_to_storage_if_needed(...) and emit_state(...) and refactors writer setup/close logic. |
| amber/src/main/python/core/runnables/network_sender.py | Serializes StateFrame as a 4-column Arrow table (content + loop bookkeeping columns). |
| amber/src/main/python/core/runnables/network_receiver.py | Deserializes StateFrame from a 4-column Arrow table and populates loop bookkeeping fields. |
| amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py | Reads loop bookkeeping columns from the state table rows and carries them on emitted StateFrames. |
| amber/src/test/python/core/models/test_state.py | Updates tests to assert the expanded schema and that loop bookkeeping does not leak into the content JSON. |
| amber/src/test/python/core/runnables/test_network_receiver.py | Updates unit test to verify non-zero loop_counter survives Python sender→receiver serialization. |
| amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py | Updates reader test to assert loop bookkeeping values are carried on emitted StateFrames. |
| amber/src/test/python/core/architecture/packaging/test_output_manager.py | Updates tests for new signature and adds a dormancy test ensuring omitted loop args default to 0/empty strings. |
| amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py | Updates e2e state materialization test to use sqlite-backed catalog and assert loop_counter column round-trip. |
Address review (Copilot): the materialization round-trip tests used
"loop_counter" as a user-state key (landing in the content JSON), which is
misleading now that loop bookkeeping is a dedicated column. Rename the user key
to "i" and write/assert the loop columns via toTuple(loopCounter = ...) +
row.getField("loop_counter"/"loop_start_id"/"loop_start_state_uri"), matching
how StateSpec/test_state were updated.
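The distinction this commit draws, user data under its own key in the content JSON versus bookkeeping in dedicated columns, might look like the following in a test. It is a sketch only: `to_row` is a hypothetical writer that returns one dict per materialized row, and the id and URI values are made up.

```python
import json


def to_row(
    user_state: dict,
    loop_counter: int = 0,
    loop_start_id: str = "",
    loop_start_state_uri: str = "",
) -> dict:
    """Hypothetical writer: one dict per materialized row, keyed by column name."""
    return {
        "content": json.dumps(user_state),
        "loop_counter": loop_counter,
        "loop_start_id": loop_start_id,
        "loop_start_state_uri": loop_start_state_uri,
    }


def test_loop_columns_round_trip() -> None:
    row = to_row(
        {"i": 7},  # user key renamed to "i" so it cannot be mistaken for bookkeeping
        loop_counter=3,
        loop_start_id="loop-start-1",  # made-up id
        loop_start_state_uri="vfs:///hypothetical/state",  # made-up URI
    )
    content = json.loads(row["content"])
    # User data stays in the content JSON ...
    assert content == {"i": 7}
    # ... and no bookkeeping key leaks into it.
    assert not {"loop_counter", "loop_start_id", "loop_start_state_uri"} & content.keys()
    # Bookkeeping is asserted on the dedicated columns instead.
    assert row["loop_counter"] == 3
    assert row["loop_start_id"] == "loop-start-1"
    assert row["loop_start_state_uri"] == "vfs:///hypothetical/state"


test_loop_columns_round_trip()
```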
Xiao-zhen-Liu left a comment
I reviewed the part of the loop-split change that handles the state format. The main logic looks correct:
- State only lives within a single execution, so no pre-existing single-column state can reach the new four-column reader. I checked this across the restart, fault-recovery, and result-reuse paths.
- The non-loop defaults keep existing behavior unchanged.
- The Python and Scala four-column schemas agree on names, order, and types.
The comments below are about test coverage and two design points. I'm not recording an approve / request-changes decision.
One accuracy note on the description: it lists only to_tuple/toTuple and two OutputManager methods, but the change also adds the three fields to the Python StateFrame and widens the wire format to four columns. Behavior is unchanged; just worth stating accurately.
# that need these values read the corresponding columns off the tuple.
LOOP_COUNTER = "loop_counter"
LOOP_START_ID = "loop_start_id"
LOOP_START_STATE_URI = "loop_start_state_uri"
Design question, not a request: does the saved state need to carry the storage URI, or is loop_start_id enough for the controller to resolve where Loop End writes the next iteration? Raising it partly because it looks different from what we discussed offline.
Good question — and you're right it differs from the offline sketch. As currently wired in the loop branch, the URI isn't derivable from loop_start_id alone:
- `loop_start_id` is just LoopStart's logical op id; it's what `JumpToOperatorRegionRequest(OperatorIdentity(loop_start_id))` jumps to.
- `loop_start_state_uri` is `VFSURIFactory.state_uri(readers[0].uri)`: the physical iceberg URI of LoopStart's input port, which also encodes the execution id, the physical layer name, and which input port/reader. LoopEnd writes the next iteration's state straight into that URI, so it needs the full path, not just the logical id.
So the real choice is: (a) carry the resolved URI on the state — current approach, LoopEnd writes back with no controller round-trip; or (b) carry only loop_start_id and have the controller resolve LoopStart's input-port materialization URI during the jump and hand it back. (b) is the leaner payload but couples LoopEnd to a controller resolution step.
Happy to go with (b) if that's what we landed on offline. This PR is only the dormant 4-column format — the consumer logic (and this decision) actually lands in the later loop PR, so flagging it here so we can settle it before then.
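To make the trade-off between (a) and (b) concrete, here is a rough sketch under stated assumptions. None of these names exist in the PR: `LoopBookkeeping`, both resolver functions, and the controller-side lookup table are hypothetical, and the real controller interaction would be an RPC rather than a dict lookup.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class LoopBookkeeping:
    """Hypothetical view of the three loop columns carried with a state."""

    loop_counter: int
    loop_start_id: str
    loop_start_state_uri: str = ""  # populated only under option (a)


def target_uri_option_a(book: LoopBookkeeping) -> str:
    # (a) The resolved URI travels with the state, so LoopEnd needs no lookup.
    return book.loop_start_state_uri


def target_uri_option_b(book: LoopBookkeeping, controller_index: Dict[str, str]) -> str:
    # (b) Only the logical id travels; a controller-side table (modelled here
    # as a plain dict) resolves it to the physical URI during the jump.
    return controller_index[book.loop_start_id]


book = LoopBookkeeping(loop_counter=1, loop_start_id="loop-start-1",
                       loop_start_state_uri="vfs:///hypothetical/state")
uri_a = target_uri_option_a(book)
uri_b = target_uri_option_b(book, {"loop-start-1": "vfs:///hypothetical/state"})
```

In this framing, (a) costs one extra string column per state row, while (b) keeps the row smaller but makes LoopEnd depend on whatever component owns the lookup table.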
🤖 Addressed by Claude Code
…ip tests
Address review feedback on apache#5900:
- Add `State.to_columns`, the single column-name -> value mapping for the State wire/storage format, and route both `to_tuple` (iceberg) and the network sender's StateFrame branch through it, so adding a column is a one-line change rather than an edit in every serializer.
- e2e materialization test and StateSpec now round-trip all three loop columns (loop_counter, loop_start_id, loop_start_state_uri) with non-default values, not just loop_counter, so a regression in any single column's plumbing is caught.
- Document why the e2e deliberately uses a hermetic sqlite catalog while the other iceberg tests use postgres/REST.
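The single-mapping idea behind `State.to_columns` can be sketched with plain Python containers. This is an illustration, not the PR's code: the free function `to_columns`, `to_storage_row`, and `to_wire_table` are hypothetical, and the real serializers build a Tuple and an Arrow table rather than lists and dicts.

```python
import json
from typing import Any, Dict, List


def to_columns(
    content: Dict[str, Any],
    loop_counter: int = 0,
    loop_start_id: str = "",
    loop_start_state_uri: str = "",
) -> Dict[str, Any]:
    """The one place where column names are paired with values (insertion-ordered)."""
    return {
        "content": json.dumps(content),
        "loop_counter": loop_counter,
        "loop_start_id": loop_start_id,
        "loop_start_state_uri": loop_start_state_uri,
    }


def to_storage_row(columns: Dict[str, Any]) -> List[Any]:
    # Storage path: a single row whose fields follow the mapping's order.
    return list(columns.values())


def to_wire_table(columns: Dict[str, Any]) -> Dict[str, List[Any]]:
    # Network path: a one-row, column-oriented table with the same names.
    return {name: [value] for name, value in columns.items()}


columns = to_columns({"total": 3}, loop_counter=2)
storage_row = to_storage_row(columns)
wire_table = to_wire_table(columns)
```

Both serializers iterate over the mapping instead of naming columns themselves, so adding a fifth column would touch only `to_columns` in this sketch.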
What changes were proposed in this PR?
Extends the cross-region State materialization format from a single `content` column to 4 columns (`content`, `loop_counter`, `loop_start_id`, `loop_start_state_uri`), promoting loop bookkeeping to first-class columns (never inside the content JSON). The transport carries them end to end: the `OutputManager` state writer + `emit_state`, the Python network sender/receiver, the materialization reader, and the Scala `state.toTuple` call sites. In memory the three loop fields ride on the `StateFrame` envelope; they are materialized/serialized as their own columns (parallel to `content`), and `from_tuple`/`fromTuple` read only `content` back into the `State`.
On the Python side the column-name → value mapping is defined once in `State.to_columns` and reused by both `to_tuple` (iceberg) and the network sender's `StateFrame` branch, so adding a column later is a single-line change in one place rather than an edit in every serializer.
Dormant on `main` (nothing observable changes without the loop operators):
- `to_tuple()`/`toTuple()` and `OutputManager.save_state_to_storage_if_needed`/`emit_state` default the three loop columns to `0`/`""`, so every existing non-loop caller is unchanged.
- `from_tuple`/`fromTuple` read only the `content` column, so round-trip identity is preserved and the extra columns are inert.
No backward-compatible read of old 1-column State is needed: State materialization is intra-execution only. The iceberg state-document URI is execution-scoped (`…/eid/{executionId}/`) and recreated fresh each run, and State tuples are never replayed across executions or engine versions, so a 1-column tuple can never reach the 4-column reader.
This is the state-format prerequisite the loop operators build on; the columns carry non-default values only once Loop Start/End set them (follow-up PR).
Any related issues, documentation, discussions?
Extracted from #5700 (loop operators) per @Xiao-zhen-Liu's split request; part of #4442 ("Introduce for loop").
How was this PR tested?
- `test_state.py` (loop columns are their own columns, never in content JSON, default to `0`/`""`), Scala `StateSpec` (all three loop columns round-trip through a tuple with non-default values, not just `content`), `ArrowUtilsSpec` (4-column Arrow vector round-trip), `IcebergDocumentSpec` (iceberg state-doc round-trip).
- `test_network_receiver.py`, `test_input_port_materialization_reader_runnable.py`, and `test_state_materialization_e2e.py`: the e2e (hermetic sqlite catalog) writes non-default values for all three loop columns end-to-end and asserts they replay both on the `StateFrame` and on the raw iceberg row, exercising the real Tuple/Schema/iceberg path.
- `test_output_manager.py::test_defaults_loop_columns_when_omitted` pins that a no-loop caller (no `loop_counter`) still produces a valid 4-column tuple with the loop columns at `0`/`""`.
- `workflow-core` + `amber` compile; `StateSpec` + `ArrowUtilsSpec` pass; Python state + transport + e2e tests pass; scalafmt + scalafix + black clean. (`IcebergDocumentSpec` needs the iceberg catalog backend, so it runs in CI.)
Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF.