feat(temporal): opt-in continue-as-new for long-lived agent workflows#447
feat(temporal): opt-in continue-as-new for long-lived agent workflows#447danielmillerp wants to merge 1 commit into
Conversation
5d63a08 to
4170651
Compare
|
Review the following changes in direct dependencies. Learn more about Socket for GitHub.
|
| role = "assistant" if content.author == "agent" else "user" | ||
| conversation.append({"role": role, "content": content.content}) |
There was a problem hiding this comment.
conversation_from_messages() restores every text task message as model input. The updated workflows also emit welcome or initialization TextContent from on_task_create, and on_task_create runs again after every continue-as-new. When a workflow recycles, those welcome messages are restored as prior assistant turns and another welcome is written to the ledger, so long-lived chats accumulate repeated initialization text in model context and user-visible history. Please filter the ledger to only real conversation turns, or skip re-emitting initialization messages on continued runs.
Artifacts
Repro: focused runtime harness for restored initialization turn
- Contains supporting evidence from the run (text/x-python; charset=utf-8).
Repro: script output showing welcome TextContent restored as assistant turn
- Keeps the command output available without making the summary code-heavy.
Ran code and verified through T-Rex
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/agentex/lib/core/temporal/workflows/workflow.py
Line: 187-188
Comment:
**Filter restored turns**
`conversation_from_messages()` restores every text task message as model input. The updated workflows also emit welcome or initialization `TextContent` from `on_task_create`, and `on_task_create` runs again after every continue-as-new. When a workflow recycles, those welcome messages are restored as prior assistant turns and another welcome is written to the ledger, so long-lived chats accumulate repeated initialization text in model context and user-visible history. Please filter the ledger to only real conversation turns, or skip re-emitting initialization messages on continued runs.
How can I resolve this? If you propose a fix, please make it concise.Long-lived chat/session agents run as a single Temporal workflow that stays open indefinitely, so their event history grows until it hits Temporal's ~50k-event / 50MB limit and the workflow stalls. This adds an opt-in continue-as-new path that recycles the history so a session can stay open forever, plus the discipline of keeping messages/state outside workflow state so they survive the recycle. SDK (BaseWorkflow): - should_continue_as_new(): recycle decision (Temporal's is_continue_as_new_ suggested() or a configurable WORKFLOW_MAX_HISTORY_LENGTH threshold). - drain_and_continue_as_new(): waits all_handlers_finished (so an in-flight turn is never lost/duplicated at the boundary) then continue_as_new. - run_until_complete(): drop-in replacement for the usual wait_condition(timeout=None) tail; gated once behind workflow.patched() so in-flight pre-patch workflows keep the old behaviour (no non-determinism on replay). Identical behaviour unless WORKFLOW_CONTINUE_AS_NEW_ENABLED is set. - conversation_from_messages(): rebuild the conversation from the adk.messages ledger after a recycle (messages live in adk.messages, not workflow state). Config (default off, so existing agents are unaffected): - WORKFLOW_CONTINUE_AS_NEW_ENABLED (bool) - WORKFLOW_MAX_HISTORY_LENGTH (int|None) Examples: all 13 long-lived Temporal tutorial agents adopt run_until_complete. Message-based chat agents rebuild conversation from adk.messages; harness agents with an opaque session handle (claude-code, codex, claude-sdk) or rich history (pydantic-ai via ModelMessagesTypeAdapter, langgraph) persist their non-message state to adk.state and re-hydrate on recycle. Every adk.state / adk.messages round-trip is guarded by the enabled flag, so the default path is byte-for-byte unchanged. Note: continue-as-new bounds history SIZE; it does NOT extend the chain-wide WORKFLOW_EXECUTION_TIMEOUT_SECONDS (raise that to keep workflows long-lived). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
4170651 to
891ef6d
Compare
| if not self._continue_as_new_enabled or not workflow.patched( | ||
| CONTINUE_AS_NEW_PATCH_ID | ||
| ): | ||
| await workflow.wait_condition(is_complete, timeout=None) |
There was a problem hiding this comment.
This patch marker runs only after each workflow's on_task_create has already executed its new rehydration and welcome-message work. When WORKFLOW_CONTINUE_AS_NEW_ENABLED=true, an execution that started before this change and then replays on the new code can schedule new commands before reaching this guard: message-ledger workflows call conversation_from_messages() before run_until_complete, opaque-state workflows call adk.state.get_by_task_and_agent, and several workflows create welcome or workspace activities first. Those commands are not in the old event history, so existing long-lived workflows can still hit Temporal nondeterminism even though this branch intends to preserve the old behavior. Please move the old-run patch decision ahead of any new activity-emitting prologue work, or expose a helper that callers check before both rehydration and the recycle wait.
Artifacts
Repro: focused workflow harness tracing prologue command ordering
- Contains supporting evidence from the run (text/x-python; charset=utf-8).
Repro: harness output showing adk.messages.list before workflow.patched guard
- Keeps the command output available without making the summary code-heavy.
Ran code and verified through T-Rex
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/agentex/lib/core/temporal/workflows/workflow.py
Line: 143-146
Comment:
**Gate the prologue**
This patch marker runs only after each workflow's `on_task_create` has already executed its new rehydration and welcome-message work. When `WORKFLOW_CONTINUE_AS_NEW_ENABLED=true`, an execution that started before this change and then replays on the new code can schedule new commands before reaching this guard: message-ledger workflows call `conversation_from_messages()` before `run_until_complete`, opaque-state workflows call `adk.state.get_by_task_and_agent`, and several workflows create welcome or workspace activities first. Those commands are not in the old event history, so existing long-lived workflows can still hit Temporal nondeterminism even though this branch intends to preserve the old behavior. Please move the old-run patch decision ahead of any new activity-emitting prologue work, or expose a helper that callers check before both rehydration and the recycle wait.
How can I resolve this? If you propose a fix, please make it concise.
Why
Long-lived chat/session agents (e.g. the Emu/FDD researcher) run as a single Temporal workflow that stays open indefinitely. Their event history grows until it hits Temporal's ~50k-event / 50MB limit and the workflow stalls — this is the root cause behind "chats die / state outgrows the 2MB payload" (P0 for EY).
This PR adds an opt-in continue-as-new path so a session can stay open forever by recycling its history, plus the discipline of keeping messages/state outside workflow state so they survive the recycle.
SDK —
BaseWorkflowhelpers (opt-in)should_continue_as_new()— recycle decision: Temporal'sis_continue_as_new_suggested()or a configurableWORKFLOW_MAX_HISTORY_LENGTHthreshold.drain_and_continue_as_new()— waitsall_handlers_finished(so an in-flight turn isn't lost/duplicated at the boundary), thencontinue_as_new.run_until_complete()— drop-in replacement for the usualwait_condition(timeout=None)tail. Gated once behindworkflow.patched()so in-flight pre-patch workflows keep the old behaviour and don't hit a non-determinism error on replay.conversation_from_messages()— rebuild the conversation from theadk.messagesledger after a recycle (messages live inadk.messages, not workflow state).Config (default OFF — existing agents unaffected)
WORKFLOW_CONTINUE_AS_NEW_ENABLED(bool)WORKFLOW_MAX_HISTORY_LENGTH(int | None)Examples
All 13 long-lived Temporal tutorial agents adopt
run_until_complete:adk.messages.adk.stateand re-hydrate on recycle: opaque session handles for claude-sdk (090), claude-code (140), codex (150); richModelMessagehistory for pydantic-ai (110, viaModelMessagesTypeAdapter); langgraph (130) rebuilds from the ledger.Every
adk.state/adk.messagesround-trip is guarded by the enabled flag, so the default path is byte-for-byte unchanged.Verification
tests/lib/core/temporal/test_base_workflow_continue_as_new.py(5 passing).tests/lib/core/temporalsuite: 8 passed, no regressions.py_compile+ruffclean across all 16 changed files.Follow-ups (not in this PR)
drain_and_continue_as_newagainst a Temporal test server.🤖 Generated with Claude Code
Greptile Summary
This PR adds opt-in continue-as-new support for long-lived Temporal agent workflows. The main changes are:
BaseWorkflowhelpers for recycle decisions, draining handlers, and rebuilding conversations.run_until_complete.adk.staterehydration for workflows that need state across recycles.Confidence Score: 3/5
The feature is opt-in and well scoped, but the replay compatibility guard needs to move ahead of any activity-emitting prologue work before this can be safely enabled for existing long-lived workflows.
The implementation has targeted tests for the recycle decision logic and keeps the default path disabled, but replay safety for upgraded executions is a key requirement for Temporal workflow changes and remains unresolved.
src/agentex/lib/core/temporal/workflows/workflow.py and the Temporal tutorial workflow prologues that perform rehydration, state loading, welcome-message, or workspace setup before entering the guarded wait helper.
What T-Rex did
Prompt To Fix All With AI
Reviews (2): Last reviewed commit: "feat(temporal): opt-in continue-as-new f..." | Re-trigger Greptile