fix(streaming): StreamTaskMessageFull closes the coalescing buffer#426
Open
eberki-scale wants to merge 2 commits into
Open
fix(streaming): StreamTaskMessageFull closes the coalescing buffer#426eberki-scale wants to merge 2 commits into
eberki-scale wants to merge 2 commits into
Conversation
1d86e8a to
b30a90b
Compare
A StreamTaskMessageFull ends the stream and marks the context done, but it did not close the coalescing buffer. __aexit__'s close() then short-circuits on _is_closed and never stops the buffer's ticker, leaving an orphaned background task. Buffered deltas could also publish after the terminal Full, which a consumer treating Full as final reads as a stale duplicate tail. Drain and stop the buffer before publishing the Full (deltas -> Full ordering), and reap the buffer in close() before the _is_closed short-circuit so it can't be orphaned on any path. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
d15800e to
a75aa68
Compare
…ng delta add() checked _closed before taking the lock, so a delta racing a Full-driven buffer close() could pass the check, then append after the buffer was drained and its ticker shut down — stranding the delta, never published. Re-check _closed under the lock before appending. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
A
StreamTaskMessageFullends the stream and marks theStreamingTaskMessageContextdone, but it did not close the coalescing buffer. When the context later exits,__aexit__ → close()hits itsif self._is_closed: returnguard and never stops the buffer's background ticker — leaving an orphaned task per such stream. Separately, any deltas still buffered when theFullarrives could be published after the terminalFull, which a consumer treatingFullas the final message reads as a stale duplicate tail.Fix
Two small changes to
StreamingTaskMessageContextinagentex/lib/core/services/adk/streaming.py:stream_update— when aStreamTaskMessageFullarrives, drain and close the buffer before publishing theFull, so leftover deltas land in order (deltas → Full) and the ticker is stopped.close()— reap the buffer before the_is_closedshort-circuit, so a context already marked done (by aFullon another path) can never leave the ticker orphaned.No change to
CoalescingBufferitself.Tests
New
TestFullMessageClosesBuffer:test_full_message_stops_ticker— after aFull, the buffer is reaped and its ticker task isdone()(not orphaned).test_full_is_terminal_publish_no_trailing_deltas— buffered deltas publish before theFull; theFullis the terminal publish.Full streaming suite: 33 passed (
tests/lib/core/services/adk/test_streaming.py).Scope
Deliberately narrow — just the
StreamTaskMessageFullorphan + ordering fix, split out from the broader buffer work in #418 for easier review.Greptile Summary
StreamingTaskMessageContextso terminalStreamTaskMessageFullmessages drain and close the coalescing buffer before publishing the full message.close()reaps the coalescing buffer even when the context was already marked closed.Confidence Score: 5/5
The streaming lifecycle changes are narrow, directly covered by targeted tests, and do not introduce broader API or persistence risk.
The changes address buffer draining, ticker cleanup, and terminal publish ordering in the affected context, with focused tests exercising the intended edge cases.
What T-Rex did
Reviews (3): Last reviewed commit: "fix(streaming): re-check _closed under l..." | Re-trigger Greptile