diff --git a/adk/docs/harness.md b/adk/docs/harness.md index d81835a03..62094d469 100644 --- a/adk/docs/harness.md +++ b/adk/docs/harness.md @@ -198,3 +198,9 @@ result = await emitter.auto_send_turn(turn, created_at=workflow.now()) # result.final_text — last text segment # result.usage — TurnUsage (tokens, cost, ...) ``` + +--- + +## Migration + +- [Migrating to `agentex-client` 0.16.0 / `agentex-sdk` 0.15.0](./migration-0.16.0.md) — removed LangGraph/Pydantic-AI tracing handlers (tracing is now derived from the canonical stream), private `_modules` path moves, the OpenAI harness facade relocation, and the new `run_turn` Temporal entry point. diff --git a/adk/docs/migration-0.16.0.md b/adk/docs/migration-0.16.0.md new file mode 100644 index 000000000..b76da55ba --- /dev/null +++ b/adk/docs/migration-0.16.0.md @@ -0,0 +1,272 @@ +# Migration Guide — `agentex-client` 0.16.0 / `agentex-sdk` 0.15.0 + +This release consolidates the LangGraph, Pydantic-AI, and OpenAI Agents harnesses +onto the **unified harness surface** (`UnifiedEmitter` + `SpanDeriver`), introduces +`run_turn` as the single Temporal entry point for OpenAI Agents, renders +hosted/server-side tool calls in the Temporal streaming model, and ships new CLI +init templates. + +Most consumers only need to act on **section 1** (removed tracing handlers). +Sections 2–3 only matter if you import private modules. Section 4 lists the new, +opt-in capabilities. Section 5 documents the defect fixes shipped on top of the +release. + +--- + +## 1. Tracing handlers removed (LangGraph + Pydantic-AI) — **action required** + +The bespoke tracing callback handlers are **gone** from the public +`agentex.lib.adk` surface: + +| Removed | | +|---|---| +| `agentex.lib.adk.create_langgraph_tracing_handler` | + class `AgentexLangGraphTracingHandler` | +| `agentex.lib.adk.create_pydantic_ai_tracing_handler` | + class `AgentexPydanticAITracingHandler` | + +Span tracing is now **derived automatically** from the canonical +`StreamTaskMessage*` stream by `UnifiedEmitter`. You no longer construct or pass a +callback handler — you wrap the run in the harness `*Turn` and drive delivery +through the emitter, and spans fall out of the stream. + +### LangGraph + +**Before** + +```python +from agentex.lib import adk + +handler = adk.create_langgraph_tracing_handler( + trace_id=trace_id, + parent_span_id=parent_span_id, +) +result = await graph.ainvoke(state, config={"callbacks": [handler]}) +``` + +**After** + +```python +from agentex.lib.adk import stream_langgraph_events # facade name unchanged + +# Streaming delivery + tracing are handled for you; no callbacks wiring. +async for event in stream_langgraph_events(graph, state, ...): + ... +``` + +or, when you own the emitter directly: + +```python +from agentex.lib.adk import LangGraphTurn +from agentex.lib.core.harness import UnifiedEmitter + +emitter = UnifiedEmitter(...) +await emitter.auto_send_turn(LangGraphTurn(...)) # or: emitter.yield_turn(...) +``` + +### Pydantic-AI + +**Before** + +```python +handler = adk.create_pydantic_ai_tracing_handler(trace_id=..., parent_span_id=...) +``` + +**After** + +```python +from agentex.lib.adk import PydanticAITurn, stream_pydantic_ai_events +from agentex.lib.core.harness import UnifiedEmitter + +# Wrap in PydanticAITurn and drive UnifiedEmitter.yield_turn / auto_send_turn. +await UnifiedEmitter(...).auto_send_turn(PydanticAITurn(...)) +``` + +The `agentex init` templates were migrated to this pattern. If you scaffolded +from an older template, regenerate (or diff against a fresh template) for the +canonical shape. + +--- + +## 2. Private `_modules` import paths changed — **only if you import privates** + +Each harness now exposes exactly `__sync.py` + `__turn.py` under +`agentex.lib.adk._modules`. Several private modules were deleted and their +functions relocated. If you imported the **public facade names** from +`agentex.lib.adk`, **nothing changes**. Repoint only if you reached into the +private modules directly: + +| Old (deleted) private import | New location | Public facade (unchanged) | +|---|---|---| +| `_modules._langgraph_async.stream_langgraph_events` | `_modules._langgraph_turn` | `adk.stream_langgraph_events` | +| `_modules._langgraph_messages.emit_langgraph_messages` | `_modules._langgraph_sync` | `adk.emit_langgraph_messages` | +| `_modules._langgraph_tracing.*` | **removed** (see §1) | — | +| `_modules._pydantic_ai_async.stream_pydantic_ai_events` | `_modules._pydantic_ai_turn` | `adk.stream_pydantic_ai_events` | +| `_modules._pydantic_ai_tracing.*` | **removed** (see §1) | — | + +✅ These facade names are unchanged and keep working: +`stream_langgraph_events`, `emit_langgraph_messages`, +`convert_langgraph_to_agentex_events`, `LangGraphTurn`, +`stream_pydantic_ai_events`, `convert_pydantic_ai_to_agentex_events`, +`PydanticAITurn`. + +--- + +## 3. OpenAI harness moved into `adk/_modules` + facade export + +The OpenAI Agents harness now lives alongside the others: + +- `OpenAITurn`, `openai_usage_to_turn_usage` → `agentex.lib.adk._modules._openai_turn` +- `convert_openai_to_agentex_events` → `agentex.lib.adk._modules._openai_sync` + +New **public** facade exports (prefer these): + +```python +from agentex.lib.adk import ( + OpenAITurn, + convert_openai_to_agentex_events, + openai_usage_to_turn_usage, +) +``` + +Back-compat shims remain at +`agentex.lib.adk.providers._modules.{openai_turn,sync_provider}` **for one +release** — migrate to the facade names before the next minor. + +--- + +## 4. New capabilities (opt-in, no migration required) + +- **`run_turn` — unified Temporal entry point for OpenAI Agents.** + + ```python + from agentex.lib.core.temporal.plugins.openai_agents import run_turn, OpenAIAgentsTurnResult + + result = await run_turn( + agent, input, + task_id=task_id, + trace_id=trace_id, + parent_span_id=parent_span_id, + ) + result.final_output # raw SDK final_output + result.usage # normalized TurnUsage for the turn span + ``` + + It emits each tool call exactly once (the streaming model is the sole + tool-**request** emitter; hooks emit tool **responses**), traces per-tool spans, + normalizes token usage, and drains orphaned tool spans in a `finally` block if + the run terminates mid-tool. Existing `TemporalStreamingHooks` callers keep + working — `run_turn` is additive. If you pass your own `hooks` subclass, also + set `emit_tool_requests=False` and forward `trace_id` / `parent_span_id` + yourself (they are only auto-applied to the default hooks). + +- **Hosted / server-side tool rendering** in `TemporalStreamingModel`: + web_search, file_search, code_interpreter, image_generation, server-side mcp, + computer, and local_shell calls now surface as ToolRequest/ToolResponse pairs. + +- **New CLI init templates:** `default` / `sync` / `temporal` flavors of + `claude-code` and `codex`, plus `default-openai-agents`. + +--- + +## 5. Defect fixes shipped with this migration + +These fixes harden the newly-added sync OpenAI converter +(`convert_openai_to_agentex_events` / `OpenAITurn`) and the Temporal hosted-tool +path. No API change — behavior only. + +1. **Malformed tool arguments no longer abort the turn.** The converter now + parses raw tool-call arguments through a defensive helper + (`_safe_parse_arguments`): a non-decodable string is preserved under `raw` + and a non-dict JSON value under `value`, instead of raising `JSONDecodeError` + and killing the run before later output is delivered. This matches the + Temporal streaming model's existing fallback. + +2. **Reasoning messages are closed.** Completed reasoning content/summary items + now emit a matching `StreamTaskMessageDone`. Previously the `Done` was + skipped, so `UnifiedEmitter.auto_send` never released the context and the + reasoning span could be marked incomplete (reasoning-model output appeared to + hang). + +3. **Text no longer collides with reasoning.** Every new text `item_id` now + reserves a fresh message index (matching the increment-then-use convention of + the reasoning/tool paths). Previously the first text item reused the current + index, so on reasoning-model streams the final answer could overwrite the + reasoning message, duplicate a `Start`, or route deltas into the wrong context. + +4. **Hosted-tool response shape aligned.** Hosted/server-side tool responses in + `TemporalStreamingModel` now emit `content` as a plain string, matching the + function-tool response path (`on_tool_end`) so hosted and function tools + render identically within the same flow. + +5. **Reasoning text now appears in derived spans.** `SpanDeriver` opened reasoning + spans with empty input and closed them with `output=None`, so reasoning/thinking + text never reached the trace (spans showed blank — read as "0 reasoning traces"). + It now accumulates the `ReasoningContentDelta` / `ReasoningSummaryDelta` text (and + any text seeded on the Start content) and records it as the span output. Affects + every harness that streams reasoning, including the Claude Code tap. + +6. **Claude Code: no more duplicate text messages.** The `stream-json` converter + deduped streamed-vs-materialized blocks by numeric block index and reset that + state after every materialized `assistant` envelope. A single streamed message + that materializes as several envelopes (thinking, then text) lost the dedup + marker between envelopes and re-emitted the text. Dedup is now **content-based** + (match the streamed block's text, consume once), which a numeric index cannot do + reliably. + +> Action: if you adopted `OpenAITurn` for **reasoning models** (o1/o3/gpt-5) on +> the sync path before these fixes, upgrade — fixes 2 and 3 are required for +> correct reasoning rendering. Claude Code agents on the unified harness tap should +> upgrade for fixes 5 and 6. + +--- + +## 6. Legacy Temporal `claude_agents` plugin → unified harness tap + +`agentex.lib.core.temporal.plugins.claude_agents` (`run_claude_agent_activity`, +`create_streaming_hooks`, `TemporalStreamingHooks`, `ClaudeMessageHandler`) is the +**original** Claude Code integration: it drives the Python `claude-agent-sdk` +directly and hand-rolls its own streaming + tracing. It is **superseded** by the +unified harness tap and slated for removal in a future release. It still works +today, so this migration is **recommended, not yet required** — but new Claude Code +agents should use the tap, and existing ones should plan to move. + +Why migrate: the tap routes Claude Code through the same canonical +`StreamTaskMessage*` stream as every other harness, so it gets central span +derivation (tool **and** reasoning spans), the single delivery path +(`UnifiedEmitter`), and fixes like the two above for free. The legacy plugin does +not derive reasoning spans at all and duplicates the streaming/tracing logic. + +**Before — legacy plugin activity:** + +```python +from agentex.lib.core.temporal.plugins.claude_agents import run_claude_agent_activity + +# In the workflow: +result = await workflow.execute_activity( + run_claude_agent_activity, + args=[prompt, workspace_path, allowed_tools, ...], + start_to_close_timeout=..., +) +``` + +**After — unified harness tap.** Run the CLI yourself (`claude -p --output-format +stream-json --include-partial-messages`), wrap its stdout in `ClaudeCodeTurn`, and +deliver through `UnifiedEmitter`: + +```python +from agentex.lib.adk import ClaudeCodeTurn, UnifiedEmitter + +# `stdout_lines` is an async iterator of the CLI's stdout lines (raw JSON strings +# or pre-parsed dicts) — e.g. read from sandbox.exec() / a subprocess. +turn = ClaudeCodeTurn(stdout_lines) + +emitter = UnifiedEmitter(task_id=task_id, trace_id=trace_id, parent_span_id=parent_span_id) +result = await emitter.auto_send_turn(turn, created_at=workflow.now()) +# result.final_text — last text segment +# result.usage — TurnUsage (tokens, cost, num_reasoning_blocks, ...) +``` + +The golden agent is the reference implementation +(`teams/sgp/agents/golden_agent/project/harness/`): it spawns the CLI in a sandbox, +yields stdout lines into `ClaudeCodeTurn`, and drives `auto_send_turn`. Known +remaining consumers to migrate: the `090_claude_agents_sdk_mvp` tutorial and the +`eval_dashboard_agent`. diff --git a/src/agentex/lib/adk/_modules/_claude_code_sync.py b/src/agentex/lib/adk/_modules/_claude_code_sync.py index 4e25503cf..93a639118 100644 --- a/src/agentex/lib/adk/_modules/_claude_code_sync.py +++ b/src/agentex/lib/adk/_modules/_claude_code_sync.py @@ -98,18 +98,15 @@ async def convert_claude_code_to_agentex_events( _text_open = False _text_buf = "" _text_index: int | None = None - # Track which assistant-message block indices were already streamed via - # stream_event triples. Those blocks must not be re-emitted when the full - # assistant message arrives. Reset at each message boundary (see below) so a - # later turn's block indices don't collide with an earlier turn's. - _streamed_block_indexes: set[int] = set() - # Once-guard so a thinking block's pending index is claimed on its first - # thinking_delta only. Reset per turn alongside _streamed_block_indexes. - _saw_thinking_stream = False - # For deferred ReasoningStarted: if a content_block_start(thinking) arrives - # but no thinking_delta ever follows, the final assistant block's thinking - # field fills the reasoning content instead. - _pending_thinking_block_index: int | None = None + # Full text of each block already delivered via stream_event deltas, so the + # materialised assistant envelope does not re-emit it. Matched by CONTENT, + # not block index: a single streamed message can arrive as several assistant + # envelopes (e.g. a thinking block, then the text block), and the per-block + # numeric index does not survive that split while the text does. Each match + # is consumed (one entry removed) so a genuinely repeated later block — a new + # turn that happens to emit identical text — is still delivered. + _streamed_texts: list[str] = [] + _streamed_thinkings: list[str] = [] async for raw in lines: if not raw: @@ -138,43 +135,56 @@ async def convert_claude_code_to_agentex_events( if not isinstance(blocks, list): blocks = [blocks] - for idx, block in enumerate(blocks): + for block in blocks: if not isinstance(block, dict): continue block_type = block.get("type", "") if block_type == "text": - # Skip only the specific blocks already delivered via - # stream_event deltas (per-block, not a turn-wide latch). - if idx in _streamed_block_indexes: - continue text = block.get("text", "") - if text: - msg_index = next_index - next_index += 1 - yield StreamTaskMessageStart( - type="start", - index=msg_index, - content=TextContent( - type="text", - author="agent", - content="", - ), - ) - yield StreamTaskMessageDelta( - type="delta", - index=msg_index, - delta=TextDelta(type="text", text_delta=text), - ) - yield StreamTaskMessageDone(type="done", index=msg_index) + if not text: + continue + # Skip blocks already delivered via stream_event deltas. Two + # cases: (1) the streamed block already finished — its full + # text is recorded in _streamed_texts; (2) the materialised + # envelope arrives INTERLEAVED, mid-stream, before the streamed + # block's content_block_stop records its buffer — the still-open + # block's partial buffer is a prefix of this full text. + if text in _streamed_texts: + _streamed_texts.remove(text) + continue + if _text_open and _text_buf and text.startswith(_text_buf): + continue + msg_index = next_index + next_index += 1 + yield StreamTaskMessageStart( + type="start", + index=msg_index, + content=TextContent( + type="text", + author="agent", + content="", + ), + ) + yield StreamTaskMessageDelta( + type="delta", + index=msg_index, + delta=TextDelta(type="text", text_delta=text), + ) + yield StreamTaskMessageDone(type="done", index=msg_index) elif block_type == "thinking": - # Skip only the specific blocks already delivered via - # stream_event deltas (per-block, not a turn-wide latch). - if idx in _streamed_block_indexes: - continue thinking_text = block.get("thinking", "") if thinking_text: + # Skip blocks already delivered via stream_event deltas. + # Same two cases as text above: finished streamed block + # (recorded), or an interleaved materialised envelope whose + # text the still-open streamed buffer is a prefix of. + if thinking_text in _streamed_thinkings: + _streamed_thinkings.remove(thinking_text) + continue + if _thinking_open and _thinking_buf and thinking_text.startswith(_thinking_buf): + continue summary = _extract_summary(thinking_text) msg_index = next_index next_index += 1 @@ -243,20 +253,12 @@ async def convert_claude_code_to_agentex_events( ), ) - # End of a materialised message: reset per-turn streaming dedup state - # so the next turn's stream_event indices start clean. Without this, - # a block index streamed in an earlier turn would linger in the set - # and silently drop a later turn's non-streamed block at that index. - _streamed_block_indexes = set() - _saw_thinking_stream = False - # ----------------------------------------------------------------------- # stream_event — incremental streaming deltas # ----------------------------------------------------------------------- elif evt_type == "stream_event": se = evt.get("event") or {} se_type = se.get("type", "") - block_index = se.get("index") if se_type == "content_block_start": block = se.get("content_block") or {} @@ -265,11 +267,6 @@ async def convert_claude_code_to_agentex_events( if btype == "thinking": _thinking_open = True _thinking_buf = "" - # Defer marking the block as streamed until we actually - # receive a thinking_delta. Some configurations emit a - # thinking block_start but no deltas — in that case we want - # the final assistant-message handler to fill the text. - _pending_thinking_block_index = block_index if isinstance(block_index, int) else None msg_index = next_index next_index += 1 _thinking_index = msg_index @@ -288,8 +285,6 @@ async def convert_claude_code_to_agentex_events( elif btype == "text": _text_open = True _text_buf = "" - if isinstance(block_index, int): - _streamed_block_indexes.add(block_index) msg_index = next_index next_index += 1 _text_index = msg_index @@ -310,12 +305,6 @@ async def convert_claude_code_to_agentex_events( if dtype == "thinking_delta": chunk = delta.get("thinking", "") if chunk and _thinking_open: - if not _saw_thinking_stream: - _saw_thinking_stream = True - # Now mark the block as claimed so the assistant - # message handler won't re-emit it. - if _pending_thinking_block_index is not None: - _streamed_block_indexes.add(_pending_thinking_block_index) _thinking_buf += chunk if _thinking_index is not None: yield StreamTaskMessageDelta( @@ -342,18 +331,21 @@ async def convert_claude_code_to_agentex_events( elif se_type == "content_block_stop": if _thinking_open: _thinking_open = False + # Record the streamed thinking so the materialised assistant + # envelope doesn't re-emit it. Skip empties: a block_start with + # no deltas leaves the assistant envelope free to fill the text. + if _thinking_buf: + _streamed_thinkings.append(_thinking_buf) _thinking_buf = "" - _pending_thinking_block_index = None - # Reset the once-guard per thinking block: a turn can stream a - # second thinking block, and without this the guard stays True, - # the second block's index is never claimed, and the final - # assistant envelope re-emits it (duplicate Start/Delta/Done). - _saw_thinking_stream = False if _thinking_index is not None: yield StreamTaskMessageDone(type="done", index=_thinking_index) _thinking_index = None elif _text_open: _text_open = False + # Record the streamed text for content-based dedup against the + # materialised assistant envelope (see _streamed_texts). + if _text_buf: + _streamed_texts.append(_text_buf) _text_buf = "" if _text_index is not None: yield StreamTaskMessageDone(type="done", index=_text_index) diff --git a/src/agentex/lib/adk/_modules/_openai_sync.py b/src/agentex/lib/adk/_modules/_openai_sync.py index 75d8f8f2a..ac404bef1 100644 --- a/src/agentex/lib/adk/_modules/_openai_sync.py +++ b/src/agentex/lib/adk/_modules/_openai_sync.py @@ -12,6 +12,7 @@ from __future__ import annotations +import json from typing import Any from openai.types.responses import ( @@ -43,6 +44,33 @@ from agentex.types.reasoning_summary_delta import ReasoningSummaryDelta +def _safe_parse_arguments(arguments: Any) -> dict[str, Any]: + """Coerce a tool call's ``arguments`` into a dict, tolerating bad JSON. + + ``ToolRequestContent.arguments`` is typed ``Dict[str, object]``, so the + result is ALWAYS a dict — a non-dict payload must not abort the turn. + Mirroring the Temporal streaming model: malformed/truncated strings are + preserved under ``raw``, and any other non-dict value (a list, scalar, or + SDK object) is serialized if possible, otherwise wrapped under ``value``. + """ + if not arguments: + return {} + if isinstance(arguments, dict): + return arguments + if isinstance(arguments, str): + try: + parsed = json.loads(arguments) + except (json.JSONDecodeError, ValueError): + return {"raw": arguments} + return parsed if isinstance(parsed, dict) else {"value": parsed} + # Non-string, non-dict (e.g. a provider tool passing a list / scalar / SDK + # object). Prefer the object's own dict form; fall back to wrapping it. + dumped = arguments.model_dump() if hasattr(arguments, "model_dump") else None + if isinstance(dumped, dict): + return dumped + return {"value": arguments} + + def _extract_tool_call_info(tool_call_item: Any) -> tuple[str, str, dict[str, Any]]: """ Extract call_id, tool_name, and tool_arguments from a tool call item. @@ -69,30 +97,12 @@ def _extract_tool_call_info(tool_call_item: Any) -> tuple[str, str, dict[str, An elif isinstance(tool_call_item, ResponseFunctionToolCall): # Handle standard function tool calls tool_name = tool_call_item.name - # Handle the arguments field which might be a string or None - if tool_call_item.arguments: - if isinstance(tool_call_item.arguments, str): - import json - - tool_arguments = json.loads(tool_call_item.arguments) if tool_call_item.arguments else {} - else: - tool_arguments = tool_call_item.arguments - else: - tool_arguments = {} + tool_arguments = _safe_parse_arguments(tool_call_item.arguments) else: # Generic handling for any tool call type tool_name = getattr(tool_call_item, "name", type(tool_call_item).__name__) - # Handle the arguments field which might be a string or None if hasattr(tool_call_item, "arguments"): - arguments = tool_call_item.arguments - if isinstance(arguments, str): - import json - - tool_arguments = json.loads(arguments) if arguments else {} - elif arguments is None: - tool_arguments = {} - else: - tool_arguments = arguments + tool_arguments = _safe_parse_arguments(tool_call_item.arguments) else: tool_arguments = tool_call_item.model_dump() @@ -150,7 +160,6 @@ async def convert_openai_to_agentex_events(stream_response): tool_map = {} event_count = 0 message_index = 0 # Track message index for proper sequencing - seen_tool_output = False # Track if we've seen tool output to know when final text starts item_id_to_index = {} # Map item_id to message index item_id_to_type = {} # Map item_id to content type (text, reasoning_content, reasoning_summary) @@ -172,16 +181,16 @@ async def convert_openai_to_agentex_events(stream_response): elif isinstance(raw_event, ResponseOutputItemDoneEvent): item_id = raw_event.item.id if item_id in item_id_to_index: - # Get the message type to decide whether to send done event - message_type = item_id_to_type.get(item_id, "text") - - # Don't send done events for reasoning content/summary - # They just end with their last delta - if message_type not in ("reasoning_content", "reasoning_summary"): - yield StreamTaskMessageDone( - type="done", - index=item_id_to_index[item_id], - ) + # Close every streamed message — text AND reasoning — with a + # matching Done. UnifiedEmitter.auto_send only releases a + # context on StreamTaskMessageDone; skipping it for reasoning + # left those messages hanging and their spans incomplete. The + # accumulator rebuilds ReasoningContent from the deltas, so the + # Done carries no payload. + yield StreamTaskMessageDone( + type="done", + index=item_id_to_index[item_id], + ) # Skip reasoning summary part added events - we handle them on delta elif isinstance(raw_event, ResponseReasoningSummaryPartAddedEvent): @@ -292,17 +301,14 @@ async def convert_openai_to_agentex_events(stream_response): # Check if this event has an item_id item_id = getattr(raw_event, "item_id", None) - # If this is a new item_id we haven't seen, it's a new message + # If this is a new item_id we haven't seen, it's a new message. + # Reserve a fresh index for every text item_id (matching the + # increment-then-use convention of the reasoning/tool paths). + # Reusing the current index let a final answer collide with the + # preceding reasoning message on reasoning-model streams. if item_id and item_id not in item_id_to_index: - # Check if this is truly a NEW text message after tools - # We need to differentiate between the first text and the final text after tools - if seen_tool_output: - # This is the final text message after tool execution - message_index += 1 - item_id_to_index[item_id] = message_index - else: - item_id_to_index[item_id] = message_index - + message_index += 1 + item_id_to_index[item_id] = message_index item_id_to_type[item_id] = "text" # Send a start event with empty content for this new text message @@ -363,7 +369,6 @@ async def convert_openai_to_agentex_events(stream_response): author="agent", ) message_index += 1 # Increment for new message - seen_tool_output = True # Mark that we've seen tool output so next text gets new index yield StreamTaskMessageFull( type="full", index=message_index, diff --git a/src/agentex/lib/core/harness/span_derivation.py b/src/agentex/lib/core/harness/span_derivation.py index cecb24bcc..c0ed6ee90 100644 --- a/src/agentex/lib/core/harness/span_derivation.py +++ b/src/agentex/lib/core/harness/span_derivation.py @@ -19,6 +19,8 @@ ) from agentex.types.tool_request_content import ToolRequestContent from agentex.types.tool_response_content import ToolResponseContent +from agentex.types.reasoning_content_delta import ReasoningContentDelta +from agentex.types.reasoning_summary_delta import ReasoningSummaryDelta @dataclass @@ -51,6 +53,9 @@ class SpanDeriver: def __init__(self) -> None: self._tool_by_index: dict[int, _ToolReqMeta] = {} self._reasoning_index_open: set[int] = set() + # accumulated reasoning text per open reasoning index, recorded as the + # span output on close (deltas carry the chain-of-thought / summary text). + self._reasoning_text: dict[int, str] = {} # insertion-ordered set of open tool_call_ids (dict keys preserve order) self._open_tool_ids: dict[str, None] = {} @@ -72,8 +77,10 @@ def flush(self) -> list[SpanSignal]: signals.append(CloseSpan(key=tcid, output=None, is_complete=False)) self._open_tool_ids.clear() for idx in sorted(self._reasoning_index_open): - signals.append(CloseSpan(key=f"reasoning:{idx}", output=None, is_complete=False)) + text = self._reasoning_text.pop(idx, "") + signals.append(CloseSpan(key=f"reasoning:{idx}", output=text or None, is_complete=False)) self._reasoning_index_open.clear() + self._reasoning_text.clear() return signals def _on_start(self, event: StreamTaskMessageStart) -> list[SpanSignal]: @@ -90,6 +97,11 @@ def _on_start(self, event: StreamTaskMessageStart) -> list[SpanSignal]: return [] if content.type == "reasoning": self._reasoning_index_open.add(idx) + # Seed from any text already on the Start content — non-streaming + # harnesses may carry the full reasoning up front; deltas append. + summary = getattr(content, "summary", None) or [] + body = getattr(content, "content", None) or [] + self._reasoning_text[idx] = "".join([*summary, *body]) return [OpenSpan(key=f"reasoning:{idx}", kind="reasoning", name="reasoning", input={})] return [] @@ -102,6 +114,12 @@ def _on_delta(self, event: StreamTaskMessageDelta) -> list[SpanSignal]: meta = self._tool_by_index.get(idx) if meta is not None and delta.arguments_delta: meta.args_buf += delta.arguments_delta + elif isinstance(delta, ReasoningContentDelta): + if idx in self._reasoning_index_open and delta.content_delta: + self._reasoning_text[idx] = self._reasoning_text.get(idx, "") + delta.content_delta + elif isinstance(delta, ReasoningSummaryDelta): + if idx in self._reasoning_index_open and delta.summary_delta: + self._reasoning_text[idx] = self._reasoning_text.get(idx, "") + delta.summary_delta return [] def _on_full(self, event: StreamTaskMessageFull) -> list[SpanSignal]: @@ -150,5 +168,6 @@ def _on_done(self, event: StreamTaskMessageDone) -> list[SpanSignal]: return [OpenSpan(key=meta.tool_call_id, kind="tool", name=meta.name, input=args)] if idx in self._reasoning_index_open: self._reasoning_index_open.discard(idx) - return [CloseSpan(key=f"reasoning:{idx}", output=None, is_complete=True)] + text = self._reasoning_text.pop(idx, "") + return [CloseSpan(key=f"reasoning:{idx}", output=text or None, is_complete=True)] return [] diff --git a/src/agentex/lib/core/harness/tracer.py b/src/agentex/lib/core/harness/tracer.py index 0c6167b76..bf37bad30 100644 --- a/src/agentex/lib/core/harness/tracer.py +++ b/src/agentex/lib/core/harness/tracer.py @@ -16,6 +16,21 @@ logger = logging.getLogger(__name__) +def _as_span_payload(value: Any, *, key: str) -> Any: + """Coerce a span input/output payload into a dict. + + The SGP spans API requires ``input`` and ``output`` to be objects: a scalar + or string is rejected with a 422 and the span is dropped by the async + processor. The SpanDeriver legitimately produces non-dict payloads — the + reasoning span's output is the chain-of-thought string, and some harnesses' + tool results are plain strings — so wrap anything that isn't already a dict + (``None`` passes through unchanged so an absent payload stays absent). + """ + if value is None or isinstance(value, dict): + return value + return {key: value} + + class SpanTracer: """Opens/closes adk.tracing child spans in response to span signals. @@ -60,7 +75,7 @@ async def handle(self, signal: SpanSignal) -> None: span = await self._tracing.start_span( trace_id=self.trace_id, name=signal.name, - input=signal.input, + input=_as_span_payload(signal.input, key="input"), parent_id=self.parent_span_id, task_id=self.task_id, ) @@ -73,7 +88,7 @@ async def handle(self, signal: SpanSignal) -> None: # The real TracingModule.end_span signature is: # end_span(trace_id, span, start_to_close_timeout, heartbeat_timeout, retry_policy) # It does not accept an output= kwarg. - span.output = signal.output + span.output = _as_span_payload(signal.output, key="output") # Tool failure status (ToolResponseContent.is_error) is recorded # on span.data when the harness reports one; Span has no dedicated # error field. None means no status was reported, so leave data alone. diff --git a/src/agentex/lib/core/temporal/plugins/claude_agents/__init__.py b/src/agentex/lib/core/temporal/plugins/claude_agents/__init__.py index fd40545ec..1e9ee694a 100644 --- a/src/agentex/lib/core/temporal/plugins/claude_agents/__init__.py +++ b/src/agentex/lib/core/temporal/plugins/claude_agents/__init__.py @@ -1,5 +1,16 @@ """Claude Agents SDK integration with Temporal. +.. deprecated:: + This is the original Claude Code integration: it drives the Python + ``claude-agent-sdk`` directly and hand-rolls its own streaming + tracing + (and does not derive reasoning spans). It is superseded by the unified + harness tap (``agentex.lib.adk.ClaudeCodeTurn`` over the ``claude -p + --output-format stream-json`` CLI stdout, delivered via ``UnifiedEmitter``), + which routes Claude Code through the same canonical ``StreamTaskMessage*`` + stream as every other harness. It still works, but new agents should use the + tap and existing ones should plan to migrate; see + ``adk/docs/migration-0.16.0.md`` for the before/after. + This plugin provides integration between Claude Agents SDK and AgentEx's Temporal-based orchestration platform. diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py index 38e3503d7..7c8690f21 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py @@ -1087,7 +1087,10 @@ async def get_response( author="agent", tool_call_id=call_id, name=name, - content={"result": _hosted_tool_result(item)[:_HOSTED_TOOL_RESULT_CAP]}, + # Plain string, matching the function-tool response + # path (hooks.on_tool_end) so hosted and function + # tools render identically in the same flow. + content=_hosted_tool_result(item)[:_HOSTED_TOOL_RESULT_CAP], ), ) diff --git a/tests/lib/adk/test_claude_code_sync.py b/tests/lib/adk/test_claude_code_sync.py index 6dd36d973..5a78acaf7 100644 --- a/tests/lib/adk/test_claude_code_sync.py +++ b/tests/lib/adk/test_claude_code_sync.py @@ -140,6 +140,84 @@ async def test_streamed_text_not_re_emitted_by_assistant_block(self): text_starts = [e for e in out if isinstance(e, StreamTaskMessageStart) and isinstance(e.content, TextContent)] assert len(text_starts) == 1, "Text block must not be emitted twice" + async def test_streamed_message_split_across_assistant_envelopes_not_duplicated(self): + """Regression: one streamed message (thinking + text) can materialise as + SEPARATE assistant envelopes. Content-based dedup must skip both streamed + blocks even though the text arrives in its own later envelope — an earlier + index-based scheme re-emitted the text (duplicate).""" + envelopes = [ + # Streamed: thinking at block index 0, then text at block index 1. + { + "type": "stream_event", + "event": {"type": "content_block_start", "index": 0, "content_block": {"type": "thinking"}}, + }, + { + "type": "stream_event", + "event": { + "type": "content_block_delta", + "index": 0, + "delta": {"type": "thinking_delta", "thinking": "ponder"}, + }, + }, + {"type": "stream_event", "event": {"type": "content_block_stop", "index": 0}}, + { + "type": "stream_event", + "event": {"type": "content_block_start", "index": 1, "content_block": {"type": "text"}}, + }, + { + "type": "stream_event", + "event": {"type": "content_block_delta", "index": 1, "delta": {"type": "text_delta", "text": "answer"}}, + }, + {"type": "stream_event", "event": {"type": "content_block_stop", "index": 1}}, + # Materialised as two separate assistant envelopes (thinking alone at + # idx 0, then text alone at idx 0) — the shape that caused duplicates. + {"type": "assistant", "message": {"content": [{"type": "thinking", "thinking": "ponder"}]}}, + {"type": "assistant", "message": {"content": [{"type": "text", "text": "answer"}]}}, + ] + out = await _collect(convert_claude_code_to_agentex_events(_aiter(envelopes))) + text_starts = [e for e in out if isinstance(e, StreamTaskMessageStart) and isinstance(e.content, TextContent)] + reasoning_starts = [ + e for e in out if isinstance(e, StreamTaskMessageStart) and isinstance(e.content, ReasoningContent) + ] + assert len(text_starts) == 1, "Streamed text must not be re-emitted by its own materialised envelope" + assert len(reasoning_starts) == 1, "Streamed thinking must not be re-emitted either" + + async def test_interleaved_materialized_block_not_duplicated(self): + """Regression: the materialised `assistant` envelope can arrive MID-stream + (before the streamed block's content_block_stop). Content-recorded dedup + hasn't fired yet, so the still-open block's partial buffer is prefix-matched + against the materialised full text to suppress the duplicate.""" + envelopes = [ + { + "type": "stream_event", + "event": {"type": "content_block_start", "index": 0, "content_block": {"type": "thinking"}}, + }, + { + "type": "stream_event", + "event": { + "type": "content_block_delta", + "index": 0, + "delta": {"type": "thinking_delta", "thinking": "I"}, + }, + }, + # Materialised envelope interleaved before content_block_stop. + {"type": "assistant", "message": {"content": [{"type": "thinking", "thinking": "I need to load tools."}]}}, + { + "type": "stream_event", + "event": { + "type": "content_block_delta", + "index": 0, + "delta": {"type": "thinking_delta", "thinking": " need to load tools."}, + }, + }, + {"type": "stream_event", "event": {"type": "content_block_stop", "index": 0}}, + ] + out = await _collect(convert_claude_code_to_agentex_events(_aiter(envelopes))) + reasoning_starts = [ + e for e in out if isinstance(e, StreamTaskMessageStart) and isinstance(e.content, ReasoningContent) + ] + assert len(reasoning_starts) == 1, "Interleaved materialised reasoning must not duplicate the streamed block" + async def test_later_turn_non_streamed_text_not_dropped(self): """A non-streamed text block in a later turn must not be dropped because an earlier turn streamed a block at the same index.""" diff --git a/tests/lib/adk/test_openai_sync.py b/tests/lib/adk/test_openai_sync.py new file mode 100644 index 000000000..de2a61db8 --- /dev/null +++ b/tests/lib/adk/test_openai_sync.py @@ -0,0 +1,189 @@ +"""Tests for ``convert_openai_to_agentex_events`` and its helpers. + +Focused on three previously-broken behaviors on the sync OpenAI converter: + +- ``_safe_parse_arguments`` never raises on malformed/non-dict JSON (a bad + tool-args string must not abort the whole turn). +- Every streamed item — text AND reasoning — is closed with a matching + ``StreamTaskMessageDone`` (reasoning messages used to hang open). +- Each new text ``item_id`` gets a fresh index, so a final answer cannot + collide with the preceding reasoning message on reasoning-model streams. +""" + +import types as _types + +import pytest +from openai.types.responses import ResponseTextDeltaEvent, ResponseOutputItemDoneEvent +from openai.types.responses.response_output_message import ResponseOutputMessage +from openai.types.responses.response_reasoning_item import ResponseReasoningItem +from openai.types.responses.response_reasoning_text_delta_event import ResponseReasoningTextDeltaEvent + +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.lib.adk._modules._openai_sync import ( + _safe_parse_arguments, + convert_openai_to_agentex_events, +) + +# --------------------------------------------------------------------------- +# _safe_parse_arguments +# --------------------------------------------------------------------------- + + +def test_safe_parse_arguments_valid_dict_json(): + assert _safe_parse_arguments('{"a": 1}') == {"a": 1} + + +def test_safe_parse_arguments_empty_and_none(): + assert _safe_parse_arguments("") == {} + assert _safe_parse_arguments(None) == {} + + +def test_safe_parse_arguments_passthrough_dict(): + d = {"already": "dict"} + assert _safe_parse_arguments(d) is d + + +def test_safe_parse_arguments_malformed_preserved_not_raised(): + # A truncated / malformed payload must be preserved, never raise — raising + # here would abort the whole turn before later output is delivered. + assert _safe_parse_arguments('{"a": ') == {"raw": '{"a": '} + + +def test_safe_parse_arguments_non_dict_json_wrapped(): + # Valid JSON that isn't an object is wrapped so the result stays a dict. + assert _safe_parse_arguments("[1, 2]") == {"value": [1, 2]} + assert _safe_parse_arguments("42") == {"value": 42} + + +def test_safe_parse_arguments_non_string_non_dict_always_returns_dict(): + # A provider tool may pass arguments as a list / scalar / SDK object rather + # than a JSON string. The result must still be a dict so ToolRequestContent + # (arguments: Dict[str, object]) accepts it instead of raising. + assert _safe_parse_arguments([1, 2]) == {"value": [1, 2]} + assert _safe_parse_arguments(7) == {"value": 7} + + class _Args: + def model_dump(self): + return {"q": "hi"} + + assert _safe_parse_arguments(_Args()) == {"q": "hi"} + + # An SDK object whose model_dump is not a dict still degrades to a dict. + class _BadDump: + def model_dump(self): + return ["not", "a", "dict"] + + bad = _BadDump() + assert _safe_parse_arguments(bad) == {"value": bad} + + +# --------------------------------------------------------------------------- +# convert_openai_to_agentex_events — reasoning + text sequencing +# --------------------------------------------------------------------------- + + +def _raw(data): + return _types.SimpleNamespace(type="raw_response_event", data=data) + + +async def _stream(events): + for e in events: + yield e + + +async def _collect(events): + return [e async for e in convert_openai_to_agentex_events(_stream(events))] + + +@pytest.mark.asyncio +async def test_reasoning_item_emits_done(): + """A completed reasoning item must yield a matching Done (it used to be skipped).""" + events = [ + _raw( + ResponseReasoningTextDeltaEvent( + type="response.reasoning_text.delta", + item_id="r1", + content_index=0, + delta="thinking", + output_index=0, + sequence_number=1, + ) + ), + _raw( + ResponseOutputItemDoneEvent( + type="response.output_item.done", + item=ResponseReasoningItem(id="r1", type="reasoning", summary=[]), + output_index=0, + sequence_number=2, + ) + ), + ] + out = await _collect(events) + + starts = [e for e in out if isinstance(e, StreamTaskMessageStart)] + dones = [e for e in out if isinstance(e, StreamTaskMessageDone)] + assert len(starts) == 1 + # The reasoning message is now closed instead of hanging open. + assert [d.index for d in dones] == [starts[0].index] + + +@pytest.mark.asyncio +async def test_reasoning_then_text_use_distinct_indices(): + """Final answer text must not reuse the reasoning message's index.""" + events = [ + _raw( + ResponseReasoningTextDeltaEvent( + type="response.reasoning_text.delta", + item_id="r1", + content_index=0, + delta="thinking", + output_index=0, + sequence_number=1, + ) + ), + _raw( + ResponseOutputItemDoneEvent( + type="response.output_item.done", + item=ResponseReasoningItem(id="r1", type="reasoning", summary=[]), + output_index=0, + sequence_number=2, + ) + ), + _raw( + ResponseTextDeltaEvent( + type="response.output_text.delta", + item_id="t1", + content_index=0, + delta="answer", + output_index=1, + sequence_number=3, + logprobs=[], + ) + ), + _raw( + ResponseOutputItemDoneEvent( + type="response.output_item.done", + item=ResponseOutputMessage(id="t1", type="message", role="assistant", status="completed", content=[]), + output_index=1, + sequence_number=4, + ) + ), + ] + out = await _collect(events) + + starts = [e for e in out if isinstance(e, StreamTaskMessageStart)] + assert len(starts) == 2 + reasoning_index, text_index = starts[0].index, starts[1].index + assert reasoning_index != text_index + + # Text deltas route to the text index, not the reasoning index. + text_deltas = [e for e in out if isinstance(e, StreamTaskMessageDelta) and e.delta.type == "text"] + assert text_deltas and all(d.index == text_index for d in text_deltas) + + # Both messages are closed on their own index. + done_indices = sorted(e.index for e in out if isinstance(e, StreamTaskMessageDone)) + assert done_indices == sorted({reasoning_index, text_index}) diff --git a/tests/lib/core/harness/test_auto_send.py b/tests/lib/core/harness/test_auto_send.py index 764dae8b3..8133a488c 100644 --- a/tests/lib/core/harness/test_auto_send.py +++ b/tests/lib/core/harness/test_auto_send.py @@ -218,7 +218,8 @@ async def test_auto_send_derives_tool_spans_via_tracer(): assert result.final_text == "" assert fake_tracing.started_names == ["Bash"] - assert fake_tracing.ended_outputs == ["ok"] + # String tool output is wrapped in a dict (SGP spans require an object). + assert fake_tracing.ended_outputs == [{"output": "ok"}] # --------------------------------------------------------------------------- diff --git a/tests/lib/core/harness/test_span_derivation.py b/tests/lib/core/harness/test_span_derivation.py index 51e2ede2c..6376dc0c6 100644 --- a/tests/lib/core/harness/test_span_derivation.py +++ b/tests/lib/core/harness/test_span_derivation.py @@ -10,6 +10,8 @@ ) from agentex.types.tool_request_content import ToolRequestContent from agentex.types.tool_response_content import ToolResponseContent +from agentex.types.reasoning_content_delta import ReasoningContentDelta +from agentex.types.reasoning_summary_delta import ReasoningSummaryDelta from agentex.lib.core.harness.span_derivation import SpanDeriver @@ -98,9 +100,86 @@ def test_reasoning_opens_on_start_closes_on_done(): ] sigs = _signals(d, events) assert sigs[0] == OpenSpan(key="reasoning:0", kind="reasoning", name="reasoning", input={}) + # No deltas -> nothing to record, so output stays None (not an empty string). assert sigs[1] == CloseSpan(key="reasoning:0", output=None, is_complete=True) +def test_reasoning_content_deltas_recorded_as_output(): + """The chain-of-thought streamed via ReasoningContentDelta lands on the + reasoning span's output (previously dropped, leaving the span blank).""" + d = SpanDeriver() + events = [ + StreamTaskMessageStart( + type="start", index=0, content=ReasoningContent(type="reasoning", author="agent", summary=[], content=[]) + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=ReasoningContentDelta(type="reasoning_content", content_index=0, content_delta="Let me "), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=ReasoningContentDelta(type="reasoning_content", content_index=0, content_delta="think."), + ), + StreamTaskMessageDone(type="done", index=0), + ] + sigs = _signals(d, events) + assert sigs[0] == OpenSpan(key="reasoning:0", kind="reasoning", name="reasoning", input={}) + assert sigs[1] == CloseSpan(key="reasoning:0", output="Let me think.", is_complete=True) + + +def test_reasoning_summary_deltas_recorded_as_output(): + """Reasoning-model summary deltas (o-series) also land on the span output.""" + d = SpanDeriver() + events = [ + StreamTaskMessageStart( + type="start", index=0, content=ReasoningContent(type="reasoning", author="agent", summary=[], content=[]) + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=ReasoningSummaryDelta(type="reasoning_summary", summary_index=0, summary_delta="Summary text"), + ), + StreamTaskMessageDone(type="done", index=0), + ] + sigs = _signals(d, events) + assert sigs[1] == CloseSpan(key="reasoning:0", output="Summary text", is_complete=True) + + +def test_reasoning_text_seeded_from_start_content(): + """A non-streaming harness that carries the full thinking on the Start + content still records it as output even with no deltas.""" + d = SpanDeriver() + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=ReasoningContent(type="reasoning", author="agent", summary=[], content=["full thought"]), + ), + StreamTaskMessageDone(type="done", index=0), + ] + sigs = _signals(d, events) + assert sigs[1] == CloseSpan(key="reasoning:0", output="full thought", is_complete=True) + + +def test_reasoning_unclosed_flushes_with_text(): + """An unclosed reasoning span flushes incomplete but still carries its text.""" + d = SpanDeriver() + events = [ + StreamTaskMessageStart( + type="start", index=0, content=ReasoningContent(type="reasoning", author="agent", summary=[], content=[]) + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=ReasoningContentDelta(type="reasoning_content", content_index=0, content_delta="partial"), + ), + ] + sigs = _signals(d, events) + assert sigs[-1] == CloseSpan(key="reasoning:0", output="partial", is_complete=False) + + def test_parallel_tools_pair_by_tool_call_id(): d = SpanDeriver() events = [ diff --git a/tests/lib/core/harness/test_tracer.py b/tests/lib/core/harness/test_tracer.py index b3d9002c4..9bd17b90c 100644 --- a/tests/lib/core/harness/test_tracer.py +++ b/tests/lib/core/harness/test_tracer.py @@ -15,7 +15,32 @@ async def test_open_then_close_starts_and_ends_span(): await tracer.handle(OpenSpan(key="call_1", kind="tool", name="Bash", input={"cmd": "ls"})) await tracer.handle(CloseSpan(key="call_1", output="files", is_complete=True)) assert fake.started == [("Bash", "p1", {"cmd": "ls"})] - assert fake.ended == [("Bash", "files")] + # A plain-string output is wrapped in a dict (SGP spans require an object). + assert fake.ended == [("Bash", {"output": "files"})] + + +@pytest.mark.asyncio +async def test_non_dict_payloads_are_wrapped_in_a_dict(): + """SGP spans reject scalar input/output with a 422; the tracer wraps any + non-dict payload so reasoning spans (string output) are not dropped.""" + fake = FakeTracing() + tracer = SpanTracer(trace_id="t1", parent_span_id="p1", tracing=fake) + await tracer.handle(OpenSpan(key="reasoning:0", kind="reasoning", name="reasoning", input={})) + await tracer.handle(CloseSpan(key="reasoning:0", output="chain of thought", is_complete=True)) + # Empty-dict input stays a dict; string output is wrapped. + assert fake.started == [("reasoning", "p1", {})] + assert fake.ended == [("reasoning", {"output": "chain of thought"})] + + +@pytest.mark.asyncio +async def test_dict_and_none_payloads_pass_through_unchanged(): + fake = FakeTracing() + tracer = SpanTracer(trace_id="t1", parent_span_id="p1", tracing=fake) + await tracer.handle(OpenSpan(key="c", kind="tool", name="T", input={"a": 1})) + await tracer.handle(CloseSpan(key="c", output={"result": "x"}, is_complete=True)) + await tracer.handle(OpenSpan(key="d", kind="tool", name="U", input={})) + await tracer.handle(CloseSpan(key="d", output=None, is_complete=False)) + assert fake.ended == [("T", {"result": "x"}), ("U", None)] @pytest.mark.asyncio diff --git a/tests/lib/core/harness/test_yield_delivery.py b/tests/lib/core/harness/test_yield_delivery.py index ef3861a16..21c93a95c 100644 --- a/tests/lib/core/harness/test_yield_delivery.py +++ b/tests/lib/core/harness/test_yield_delivery.py @@ -42,7 +42,8 @@ async def test_yield_passes_events_through_and_traces(): out = [e async for e in yield_events(_gen(events), tracer=tracer)] assert out == events # passthrough unchanged assert fake.started_names == ["Bash"] # span derived + opened - assert fake.ended_outputs == ["ok"] # span closed with response + # String tool output is wrapped in a dict (SGP spans require an object). + assert fake.ended_outputs == [{"output": "ok"}] # span closed with response @pytest.mark.asyncio