Skip to content

fix(streaming): StreamTaskMessageFull closes the coalescing buffer#426

Open
eberki-scale wants to merge 2 commits into
nextfrom
endre/full-closes-buffer
Open

fix(streaming): StreamTaskMessageFull closes the coalescing buffer#426
eberki-scale wants to merge 2 commits into
nextfrom
endre/full-closes-buffer

Conversation

@eberki-scale

@eberki-scale eberki-scale commented Jun 23, 2026

Copy link
Copy Markdown
Contributor

Summary

A StreamTaskMessageFull ends the stream and marks the StreamingTaskMessageContext done, but it did not close the coalescing buffer. When the context later exits, __aexit__ → close() hits its if self._is_closed: return guard and never stops the buffer's background ticker — leaving an orphaned task per such stream. Separately, any deltas still buffered when the Full arrives could be published after the terminal Full, which a consumer treating Full as the final message reads as a stale duplicate tail.

Fix

Two small changes to StreamingTaskMessageContext in agentex/lib/core/services/adk/streaming.py:

  1. stream_update — when a StreamTaskMessageFull arrives, drain and close the buffer before publishing the Full, so leftover deltas land in order (deltas → Full) and the ticker is stopped.
  2. close() — reap the buffer before the _is_closed short-circuit, so a context already marked done (by a Full on another path) can never leave the ticker orphaned.

No change to CoalescingBuffer itself.

Tests

New TestFullMessageClosesBuffer:

  • test_full_message_stops_ticker — after a Full, the buffer is reaped and its ticker task is done() (not orphaned).
  • test_full_is_terminal_publish_no_trailing_deltas — buffered deltas publish before the Full; the Full is the terminal publish.

Full streaming suite: 33 passed (tests/lib/core/services/adk/test_streaming.py).

Scope

Deliberately narrow — just the StreamTaskMessageFull orphan + ordering fix, split out from the broader buffer work in #418 for easier review.

Greptile Summary

  • Updates StreamingTaskMessageContext so terminal StreamTaskMessageFull messages drain and close the coalescing buffer before publishing the full message.
  • Ensures close() reaps the coalescing buffer even when the context was already marked closed.
  • Adds streaming tests covering ticker cleanup, terminal publish ordering, and add/close race handling.

Confidence Score: 5/5

The streaming lifecycle changes are narrow, directly covered by targeted tests, and do not introduce broader API or persistence risk.

The changes address buffer draining, ticker cleanup, and terminal publish ordering in the affected context, with focused tests exercising the intended edge cases.

T-Rex T-Rex Logs

What T-Rex did

  • Before the artifact for proof 0, the run shows base published start, delta:first-, full:FINAL, followed by a stale trailing delta:buffered-tail.
  • After the artifact for proof 0, head published start, delta:first-, delta:buffered-tail, then full:FINAL; buffer_after_full_is_none was true, ticker_done_after_full was true, and no coalescing-buffer tasks remained alive after close.
  • Before the artifact for proof 1, trex-artifacts/coalescing-race-guard-01-before.log shows the commit ending with buf_len=1, buf_contents=['racing'], flushed=0, RESULT=STRANDED at the race observation point.
  • After the artifact for proof 1, trex-artifacts/coalescing-race-guard-02-after.log shows the commit ending with buf_len=0, buf_contents=[], flushed=0, RESULT=DROPPED at the same race observation point.

View all artifacts

T-Rex Ran code and verified through T-Rex

Reviews (3): Last reviewed commit: "fix(streaming): re-check _closed under l..." | Re-trigger Greptile

@stainless-app stainless-app Bot force-pushed the next branch 2 times, most recently from 1d86e8a to b30a90b Compare June 23, 2026 22:17
A StreamTaskMessageFull ends the stream and marks the context done, but it did
not close the coalescing buffer. __aexit__'s close() then short-circuits on
_is_closed and never stops the buffer's ticker, leaving an orphaned background
task. Buffered deltas could also publish after the terminal Full, which a
consumer treating Full as final reads as a stale duplicate tail.

Drain and stop the buffer before publishing the Full (deltas -> Full ordering),
and reap the buffer in close() before the _is_closed short-circuit so it can't
be orphaned on any path.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@eberki-scale eberki-scale force-pushed the endre/full-closes-buffer branch from d15800e to a75aa68 Compare June 25, 2026 14:30
Comment thread src/agentex/lib/core/services/adk/streaming.py
…ng delta

add() checked _closed before taking the lock, so a delta racing a Full-driven
buffer close() could pass the check, then append after the buffer was drained
and its ticker shut down — stranding the delta, never published. Re-check
_closed under the lock before appending.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant