From 5b900a73fb473cc4cb5f649fae852f175c2f34b7 Mon Sep 17 00:00:00 2001 From: Stelquis <3420761503@qq.com> Date: Thu, 2 Jul 2026 17:51:48 +0800 Subject: [PATCH] =?UTF-8?q?feat(sessions):=20=E6=B7=BB=E5=8A=A0=E5=A4=9A?= =?UTF-8?q?=E5=90=8E=E7=AB=AF=E5=9B=9E=E6=94=BE=E4=B8=80=E8=87=B4=E6=80=A7?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E6=A1=86=E6=9E=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 实现 ReplayHarness 和 DiffEngine,覆盖 event/state/memory/summary 四维度跨后端一致性验证。包含 10 条 replay case (7 正常 + 3 注入) 和 29 个测试用例,支持 InMemory ↔ SQLite 轻量对比。 Fixes #89 RELEASE NOTES: 新增 Session/Memory 多后端回放一致性测试框架 --- docs/mkdocs/en/replay-consistency.md | 67 + docs/mkdocs/zh/replay-consistency.md | 67 + tests/sessions/conftest.py | 395 ++++++ tests/sessions/replay_cases/__init__.py | 0 .../replay_cases/case_01_single_turn.jsonl | 5 + .../replay_cases/case_02_multi_turn.jsonl | 9 + .../replay_cases/case_03_tool_call.jsonl | 7 + .../replay_cases/case_04_state_update.jsonl | 8 + .../replay_cases/case_05_memory_rw.jsonl | 9 + .../replay_cases/case_06_summary_gen.jsonl | 24 + .../case_07_summary_truncate.jsonl | 32 + .../case_08_exception_recovery.jsonl | 9 + .../case_09_injected_event_order.jsonl | 9 + .../case_10_injected_summary_session.jsonl | 26 + .../session_memory_summary_diff_report.json | 185 +++ tests/sessions/test_replay_consistency.py | 1183 +++++++++++++++++ 16 files changed, 2035 insertions(+) create mode 100644 docs/mkdocs/en/replay-consistency.md create mode 100644 docs/mkdocs/zh/replay-consistency.md create mode 100644 tests/sessions/conftest.py create mode 100644 tests/sessions/replay_cases/__init__.py create mode 100644 tests/sessions/replay_cases/case_01_single_turn.jsonl create mode 100644 tests/sessions/replay_cases/case_02_multi_turn.jsonl create mode 100644 tests/sessions/replay_cases/case_03_tool_call.jsonl create mode 100644 tests/sessions/replay_cases/case_04_state_update.jsonl create mode 100644 
tests/sessions/replay_cases/case_05_memory_rw.jsonl create mode 100644 tests/sessions/replay_cases/case_06_summary_gen.jsonl create mode 100644 tests/sessions/replay_cases/case_07_summary_truncate.jsonl create mode 100644 tests/sessions/replay_cases/case_08_exception_recovery.jsonl create mode 100644 tests/sessions/replay_cases/case_09_injected_event_order.jsonl create mode 100644 tests/sessions/replay_cases/case_10_injected_summary_session.jsonl create mode 100644 tests/sessions/session_memory_summary_diff_report.json create mode 100644 tests/sessions/test_replay_consistency.py diff --git a/docs/mkdocs/en/replay-consistency.md b/docs/mkdocs/en/replay-consistency.md new file mode 100644 index 00000000..e87b2a40 --- /dev/null +++ b/docs/mkdocs/en/replay-consistency.md @@ -0,0 +1,67 @@ +# Replay Consistency Test Framework + +tRPC-Agent supports InMemory, SQL, and Redis backends for Session/Memory storage. In production, developers often prototype with InMemory and then switch to SQL or Redis. If different backends produce inconsistent event order, state, memory, or summary data for the same agent trajectory, it leads to replay errors, context loss, long-term memory corruption, and summary overwrite issues. + +This framework provides a set of standardized input trajectories to drive multiple backends, automatically generates diff reports, and pinpoints the field path and values of each inconsistency. It serves both as a testing tool and a quality benchmark for backend implementations. 
+ +## Architecture + +Core components: + +- **ReplayCase / ReplayStep**: JSONL files defining standardized input trajectories +- **ReplayHarness**: Parses JSONL steps, drives two backends in parallel, collects raw results +- **DiffEngine**: Four-dimension comparison (events / state / memory / summary), produces DiffReport +- **Normalizer**: Truncates timestamps to second precision, reassigns stable IDs by content, excludes `is_final_response` + +Based on [tests/sessions/conftest.py](../../../tests/sessions/conftest.py) and [tests/sessions/test_replay_consistency.py](../../../tests/sessions/test_replay_consistency.py). + +## Replay Cases + +| # | Case Name | Type | Description | +|---|---|---|---| +| 1 | `single_turn` | Normal | Single user → agent exchange | +| 2 | `multi_turn` | Normal | 3 rounds of alternating conversation | +| 3 | `tool_call` | Normal | function_call + function_response | +| 4 | `state_update` | Normal | Multiple state_delta writes and overwrites | +| 5 | `memory_rw` | Normal | store_session + search_memory | +| 6 | `summary_gen` | Normal | 10-round (20-event) conversation triggering summary | +| 7 | `summary_truncate` | Known divergence | Two-layer validation: strict metadata + per-backend semantics | +| 8 | `exception_recovery` | Injected | inject_skip_append to simulate write failure | +| 9 | `injected_event_order` | Injected | inject_reorder_events to swap events | +| 10 | `injected_summary_session` | Injected | inject_summary_session_id to alter summary ownership | + +## Normalization Strategy + +Before cross-backend comparison, non-business differences are removed: + +| Field | Treatment | +|-------|-----------| +| event.timestamp | Truncate to second precision (int) | +| event.id | Reassign stable ID sorted by content | +| state_delta | Unify JSON key ordering | +| is_final_response | Excluded (computed property differs across serialization paths) | + +Three categories of differences are explicitly allowed and written to allowed_diff: + +1. 
Backend-generated `invocation_id` +2. Backend-specific `save_key` format differences +3. Event count differences after summary compression (InMemory stores compressed events in memory; SQL get_session re-reads all raw events from the event table) + +## Summary Comparison Strategy + +The comparison operates in two layers: + +1. **Summary metadata**: `session_id`, `summary_text`, `original_event_count`, `compressed_event_count` must be strictly consistent across backends — this is the core requirement for replay correctness +2. **Per-backend independent validation**: summary text is non-empty, compression has taken effect (compressed < original), and new events appended after compression are preserved + +The exact boundary between summary text and retained events is allowed to differ due to backend storage model differences. + +## Backend Access + +| Mode | Backend A | Backend B | Trigger | +|------|-----------|-----------|---------| +| Lightweight (default) | InMemorySessionService | SqlSessionService(SQLite) | Always | +| SQL integration | InMemorySessionService | SqlSessionService(MySQL) | TEST_MYSQL_URL | +| Redis integration | InMemorySessionService | RedisSessionService | TEST_REDIS_URL | + +All three backends conform to the `SessionServiceABC` interface; adding a new backend only requires implementing that interface. 
diff --git a/docs/mkdocs/zh/replay-consistency.md b/docs/mkdocs/zh/replay-consistency.md new file mode 100644 index 00000000..bdfa0469 --- /dev/null +++ b/docs/mkdocs/zh/replay-consistency.md @@ -0,0 +1,67 @@ +# 回放一致性测试框架 + +tRPC-Agent 支持 InMemory、SQL、Redis 三种 Session/Memory 后端。生产环境常先用 InMemory 开发,再切换到 SQL 或 Redis。不同后端在同一条 Agent 轨迹下保存的事件顺序、state、memory 或 summary 不一致时,会导致回放错乱、上下文丢失、长期记忆污染、摘要覆盖错误等问题。 + +该框架提供一组标准化输入轨迹驱动多个后端,自动生成差异报告,定位不一致的字段路径和具体值。它既是测试工具,也是后端实现质量的基准。 + +## 架构 + +核心组件: + +- **ReplayCase / ReplayStep**:JSONL 文件定义标准输入轨迹 +- **ReplayHarness**:解析 JSONL 步骤,并行驱动两个后端执行,收集原始结果 +- **DiffEngine**:四维度比较(events / state / memory / summary),产出 DiffReport +- **Normalizer**:时间戳截断到秒级、ID 按内容重赋、`is_final_response` 排除 + +基于 [tests/sessions/conftest.py](../../../tests/sessions/conftest.py) 和 [tests/sessions/test_replay_consistency.py](../../../tests/sessions/test_replay_consistency.py) 实现。 + +## Replay Case + +| # | Case 名称 | 类型 | 说明 | +|---|---|---|---| +| 1 | `single_turn` | 正常 | 单轮 user → agent 对话 | +| 2 | `multi_turn` | 正常 | 3 轮交替对话 | +| 3 | `tool_call` | 正常 | function_call + function_response | +| 4 | `state_update` | 正常 | 多次 state_delta 写入覆盖 | +| 5 | `memory_rw` | 正常 | store_session + search_memory | +| 6 | `summary_gen` | 正常 | 10 轮(20 条事件)对话触发摘要 | +| 7 | `summary_truncate` | 已知不一致 | 两层验证:元数据严格 + 单端语义 | +| 8 | `exception_recovery` | 注入 | inject_skip_append 模拟写入失败 | +| 9 | `injected_event_order` | 注入 | inject_reorder_events 交换事件 | +| 10 | `injected_summary_session` | 注入 | inject_summary_session_id 篡改归属 | + +## 归一化策略 + +跨后端比较前需去除非业务差异: + +| 字段 | 处理方式 | +|------|---------| +| event.timestamp | 截断到秒级精度(int) | +| event.id | 按内容排序后重赋稳定 ID | +| state_delta | 统一 JSON key 排序 | +| is_final_response | 排除(computed property,序列化路径不同) | + +三类差异明确允许,写入 allowed_diff: + +1. 后端自动生成的 `invocation_id` +2. 不同后端的 `save_key` 格式差异 +3. Summary 压缩后事件总数的差异(InMemory 在内存中压缩事件列表,SQL 的 get_session 从事件表重新读取全部原始事件) + +## Summary 比较策略 + +分两层: + +1. 
**摘要元数据**:`session_id`、`summary_text`、`original_event_count`、`compressed_event_count` 跨后端严格一致——这是回放正确性的核心 +2. **单后端独立验证**:摘要文本非空、压缩已生效(compressed < original)、压缩后追加的新事件已保留 + +摘要文本与事件列表的精确分界允许因后端存储模型不同而异。 + +## 后端接入 + +| 模式 | 后端 A | 后端 B | 触发条件 | +|------|--------|--------|----------| +| 轻量模式(默认) | InMemorySessionService | SqlSessionService(SQLite) | 无条件 | +| SQL 集成模式 | InMemorySessionService | SqlSessionService(MySQL) | TEST_MYSQL_URL | +| Redis 集成模式 | InMemorySessionService | RedisSessionService | TEST_REDIS_URL | + +三个后端的 `SessionServiceABC` 接口一致,新增后端只需实现该接口即可接入框架。 diff --git a/tests/sessions/conftest.py b/tests/sessions/conftest.py new file mode 100644 index 00000000..99b178d0 --- /dev/null +++ b/tests/sessions/conftest.py @@ -0,0 +1,395 @@ +# Tencent is pleased to support the open source community by making tRPC-Agent-Python available. +# +# Copyright (C) 2026 Tencent. All rights reserved. +# +# tRPC-Agent-Python is licensed under Apache-2.0. +"""Shared fixtures and utilities for Session/Memory replay consistency tests. 
+ +This module provides: +- ReplayCase / ReplayStep: data models describing a replay scenario trajectory +- load_replay_case(): loads a replay scenario from a JSONL file +- make_inmemory_service() / make_sqlite_service(): backend factory functions +- normalize_*(): normalization helpers for cross-backend comparison +- backend_pair: pytest fixture yielding (InMemory, SQLite) in lightweight mode +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional + +import pytest + +from trpc_agent_sdk.abc import MemoryServiceConfig, SearchMemoryResponse +from trpc_agent_sdk.events import Event +from trpc_agent_sdk.memory import InMemoryMemoryService +from trpc_agent_sdk.sessions import ( + InMemorySessionService, + Session, + SessionServiceConfig, + SessionSummarizer, + SessionSummary, + SqlSessionService, + SummarizerSessionManager, +) +from trpc_agent_sdk.models import LlmRequest +from trpc_agent_sdk.types import Content, Part + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +REPLAY_CASES_DIR = Path(__file__).parent / "replay_cases" + +# Default test config: no TTL, no event count limit +_DEFAULT_SESSION_CONFIG = SessionServiceConfig() +_DEFAULT_SESSION_CONFIG.clean_ttl_config() + +# --------------------------------------------------------------------------- +# Data Models +# --------------------------------------------------------------------------- + + +@dataclass +class ReplayStep: + """A single step in a replay case trajectory. + + Fields: + action: The operation to perform, e.g. ``create_session``, + ``append_event``, ``get_session``, ``update_state``, + ``store_memory``, ``search_memory``, ``create_session_summary``, + ``get_session_summary``. + kwargs: Keyword arguments passed to the backend method. 
+ expected: Optional dict of expected outcomes for verification. + expected_events: Optional int, shorthand for expected event count. + inject: Optional dict describing an injected inconsistency + (only used in injected cases). + """ + + action: str = "" + kwargs: Dict[str, Any] = field(default_factory=dict) + expected: Optional[Dict[str, Any]] = None + expected_events: Optional[int] = None + inject: Optional[Dict[str, Any]] = None + + +@dataclass +class ReplayCase: + """A complete replay scenario covering one or more session operations. + + Fields: + name: Unique case identifier, e.g. ``case_01_single_turn``. + description: Human-readable description of the scenario. + steps: Ordered list of ReplayStep instances to drive the backends. + """ + + name: str = "" + description: str = "" + steps: List[ReplayStep] = field(default_factory=list) + + +def load_replay_case(name: str) -> ReplayCase: + """Load a replay case from a JSONL file in ``replay_cases/``. + + The JSONL format: + - Line 1: metadata dict with ``name`` and ``description`` keys. + - Lines 2+: one JSON object per line, each describing a ReplayStep. + + Args: + name: Case name (without ``.jsonl`` suffix), e.g. ``case_01_single_turn``. + + Returns: + A fully populated ReplayCase instance. 
+ """ + path = REPLAY_CASES_DIR / f"{name}.jsonl" + with open(path, "r", encoding="utf-8") as f: + lines = [line.strip() for line in f if line.strip() and not line.strip().startswith("#")] + + if not lines: + raise ValueError(f"Empty replay case file: {path}") + + meta = json.loads(lines[0]) + steps = [ReplayStep(**json.loads(line)) for line in lines[1:]] + return ReplayCase(name=meta["name"], description=meta.get("description", ""), steps=steps) + + +def list_replay_cases() -> List[str]: + """Return the list of available replay case names (without suffix).""" + return sorted(p.stem for p in REPLAY_CASES_DIR.glob("case_*.jsonl")) + + +# --------------------------------------------------------------------------- +# Backend Factory +# --------------------------------------------------------------------------- + + +async def make_inmemory_service( + summarizer_manager: Optional[SummarizerSessionManager] = None, + session_config: Optional[SessionServiceConfig] = None, +) -> InMemorySessionService: + """Create a configured InMemorySessionService. + + Args: + summarizer_manager: Optional summarizer manager for summary tests. + session_config: Optional session config; defaults to no-TTL config. + + Returns: + Initialized InMemorySessionService instance. + """ + if session_config is None: + session_config = _DEFAULT_SESSION_CONFIG + return InMemorySessionService( + summarizer_manager=summarizer_manager, + session_config=session_config, + ) + + +async def make_sqlite_service( + summarizer_manager: Optional[SummarizerSessionManager] = None, + session_config: Optional[SessionServiceConfig] = None, +) -> SqlSessionService: + """Create a configured SqlSessionService backed by an in-memory SQLite database. + + Args: + summarizer_manager: Optional summarizer manager for summary tests. + session_config: Optional session config; defaults to no-TTL config. + + Returns: + Initialized SqlSessionService instance. 
+ """ + if session_config is None: + # Build a private no-TTL config (same recipe as _DEFAULT_SESSION_CONFIG) + # instead of aliasing the module-level default: the flag set below would + # otherwise leak into every InMemory service that shares that default. + session_config = SessionServiceConfig() + session_config.clean_ttl_config() + # Default to store historical events for persistent backends (aligns with + # the production SqlSessionService default behaviour). + # NOTE: a caller-supplied config is still updated in place. + session_config.store_historical_events = True + svc = SqlSessionService( + db_url="sqlite:///:memory:", + summarizer_manager=summarizer_manager, + session_config=session_config, + is_async=False, + ) + await svc._sql_storage.create_sql_engine() + return svc + + +# --------------------------------------------------------------------------- +# Normalizers +# --------------------------------------------------------------------------- + + +def normalize_timestamp(ts: float) -> int: + """Truncate a float timestamp to second precision for comparison.""" + return int(ts) + + +def normalize_event_for_compare(event: Event) -> Dict[str, Any]: + """Convert an Event to a normalized dict, stripping runtime-only fields. + + Removes: ``id``, ``invocation_id``, ``timestamp``, ``last_update_time``, + ``request_id``, ``parent_invocation_id``, ``is_final_response`` + (computed property, differs across backends due to serialization). + + Preserves: ``author``, ``content`` (with all parts), ``partial``, + ``branch``, ``actions.state_delta``. 
+ """ + # Start with all fields at their Python values + state_delta = dict(event.actions.state_delta) if event.actions and event.actions.state_delta else {} + # Determine event type for easier debugging + parts_info = [] + if event.content and event.content.parts: + for p in event.content.parts: + if p.text: + parts_info.append({"type": "text", "text": p.text}) + elif p.function_call: + parts_info.append({ + "type": "function_call", + "name": p.function_call.name, + "args": p.function_call.args, + }) + elif p.function_response: + parts_info.append({ + "type": "function_response", + "name": p.function_response.name, + "response": p.function_response.response, + }) + elif p.executable_code: + parts_info.append({"type": "executable_code"}) + elif p.code_execution_result: + parts_info.append({"type": "code_execution_result"}) + + normalized = { + "author": event.author, + "partial": event.partial, + "branch": event.branch, + "parts": parts_info, + "state_delta": state_delta, + } + return normalized + + +def normalize_session_for_compare(session: Session) -> Dict[str, Any]: + """Normalize a Session by retaining only comparable fields. + + Strips: ``events[].id``, all timestamps, ``save_key``. + Normalizes each event via :func:`normalize_event_for_compare`. + """ + events_norm = [normalize_event_for_compare(evt) for evt in (session.events or [])] + return { + "id": session.id, + "app_name": session.app_name, + "user_id": session.user_id, + "state": dict(session.state or {}), + "events": events_norm, + "conversation_count": session.conversation_count, + } + + +def normalize_summary_for_compare(summary: Optional[SessionSummary]) -> Optional[Dict[str, Any]]: + """Normalize a SessionSummary for comparison. + + Strips: ``summary_timestamp`` (backend-dependent). + Preserves: ``session_id``, ``summary_text``, ``original_event_count``, + ``compressed_event_count``. 
+ """ + if summary is None: + return None + return { + "session_id": summary.session_id, + "summary_text": summary.summary_text.strip() if summary.summary_text else "", + "original_event_count": summary.original_event_count, + "compressed_event_count": summary.compressed_event_count, + } + + +def normalize_memory_response(response: SearchMemoryResponse) -> List[Dict[str, Any]]: + """Normalize a memory search response for comparison. + + Strips: ``event.id``, ``event.timestamp``, ``score`` (backend-specific). + Preserves: ``event.author``, ``event.content.parts[].text`` and + meaningful fields. + """ + results = [] + for mem in (response.memories or []): + if hasattr(mem, "event") and mem.event: + results.append(normalize_event_for_compare(mem.event)) + return results + + +# --------------------------------------------------------------------------- +# Pytest Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +async def inmemory_service(): + """Fixture providing a clean InMemorySessionService.""" + svc = await make_inmemory_service() + yield svc + await svc.close() + + +@pytest.fixture +async def sqlite_service(): + """Fixture providing a clean SqlSessionService backed by in-memory SQLite.""" + svc = await make_sqlite_service() + yield svc + await svc.close() + + +@pytest.fixture +async def backend_pair(): + """Lightweight-mode fixture: yield ``(inmemory, sqlite)`` backend pair. + + Both services are freshly created with default (no-TTL) config. 
+ """ + inmem = await make_inmemory_service() + sqlite = await make_sqlite_service() + yield inmem, sqlite + await sqlite.close() + await inmem.close() + + +# --------------------------------------------------------------------------- +# Mock model for summary tests +# --------------------------------------------------------------------------- + + +class _MockSummaryResponse: + """A single mock LLM response chunk for summarization.""" + + def __init__(self, text: str): + self.content = Content(parts=[Part.from_text(text=text)], role="assistant") + + +class _MockSummaryModel: + """A mock LLM model that returns a fixed summary text.""" + name = "test-summarizer-mock" + + async def generate_async(self, request: LlmRequest, stream: bool = False, ctx=None): + """Yield a single mock response with predetermined summary text.""" + yield _MockSummaryResponse("Mock session summary for testing replay consistency.") + + +def make_mock_summarizer_manager() -> SummarizerSessionManager: + """Create a SummarizerSessionManager backed by a mock model. + + The mock model returns a fixed summary text, making summary output + deterministic and comparable across backends. + """ + model = _MockSummaryModel() + summarizer = SessionSummarizer(model=model) + return SummarizerSessionManager(model=model, summarizer=summarizer, auto_summarize=True) + + +# --------------------------------------------------------------------------- +# Full backend pair with memory +# --------------------------------------------------------------------------- + + +@pytest.fixture +async def full_backend_pair(): + """Fixture yielding two complete backend tuples for cross-backend comparison. + + Each tuple is ``(session_service, memory_service)``. 
+ + Backend A: InMemorySessionService + InMemoryMemoryService + Backend B: SqlSessionService + InMemoryMemoryService (same memory impl, + different session backend) + """ + # Backend A + inmem_session = await make_inmemory_service() + inmem_memory = InMemoryMemoryService(MemoryServiceConfig(enabled=True)) + + # Backend B + sqlite_session = await make_sqlite_service() + sqlite_memory = InMemoryMemoryService(MemoryServiceConfig(enabled=True)) + + yield (inmem_session, inmem_memory), (sqlite_session, sqlite_memory) + + await sqlite_session.close() + await inmem_session.close() + + +@pytest.fixture +async def full_backend_pair_with_summary(): + """Like ``full_backend_pair`` but with a mock summarizer attached. + + Each backend gets its own SummarizerSessionManager instance so that + inject operations targeting only one backend's summary cache work + correctly. + """ + mgr_a = make_mock_summarizer_manager() + mgr_b = make_mock_summarizer_manager() + + inmem_session = await make_inmemory_service(summarizer_manager=mgr_a) + inmem_memory = InMemoryMemoryService(MemoryServiceConfig(enabled=True)) + + sqlite_session = await make_sqlite_service(summarizer_manager=mgr_b) + sqlite_memory = InMemoryMemoryService(MemoryServiceConfig(enabled=True)) + + yield (inmem_session, inmem_memory), (sqlite_session, sqlite_memory) + + await sqlite_session.close() + await inmem_session.close() diff --git a/tests/sessions/replay_cases/__init__.py b/tests/sessions/replay_cases/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/sessions/replay_cases/case_01_single_turn.jsonl b/tests/sessions/replay_cases/case_01_single_turn.jsonl new file mode 100644 index 00000000..12eabce6 --- /dev/null +++ b/tests/sessions/replay_cases/case_01_single_turn.jsonl @@ -0,0 +1,5 @@ +{"name": "case_01_single_turn", "description": "Single-turn conversation: user message followed by agent text response"} +{"action": "create_session", "kwargs": {"app_name": "test_app", "user_id": "test_user", 
"session_id": "session_01"}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "Hello"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Hi there! How can I help you today?"}], "state_delta": {}}} +{"action": "get_session", "expected": {"events_count": 2}, "expected_events": 2} diff --git a/tests/sessions/replay_cases/case_02_multi_turn.jsonl b/tests/sessions/replay_cases/case_02_multi_turn.jsonl new file mode 100644 index 00000000..670bfae7 --- /dev/null +++ b/tests/sessions/replay_cases/case_02_multi_turn.jsonl @@ -0,0 +1,9 @@ +{"name": "case_02_multi_turn", "description": "Multi-turn conversation: 3 rounds of user/agent exchange"} +{"action": "create_session", "kwargs": {"app_name": "test_app", "user_id": "test_user", "session_id": "session_02"}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "What is the weather like?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "I can check that for you. Where are you located?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "I am in Beijing"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "The weather in Beijing is sunny, 25 degrees Celsius."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "Thanks! 
What about tomorrow?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Tomorrow will be cloudy with a chance of rain, around 22 degrees."}], "state_delta": {}}} +{"action": "get_session", "expected": {"events_count": 6}, "expected_events": 6} diff --git a/tests/sessions/replay_cases/case_03_tool_call.jsonl b/tests/sessions/replay_cases/case_03_tool_call.jsonl new file mode 100644 index 00000000..12731d86 --- /dev/null +++ b/tests/sessions/replay_cases/case_03_tool_call.jsonl @@ -0,0 +1,7 @@ +{"name": "case_03_tool_call", "description": "Tool call conversation: user request triggers function_call and function_response"} +{"action": "create_session", "kwargs": {"app_name": "test_app", "user_id": "test_user", "session_id": "session_03"}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "What is the weather in Shanghai?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "function_call", "name": "get_weather", "args": {"location": "Shanghai"}}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "function_response", "name": "get_weather", "response": {"temperature": 28, "condition": "cloudy"}}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "The weather in Shanghai is currently cloudy with a temperature of 28 degrees Celsius."}], "state_delta": {}}} +{"action": "get_session", "expected": {"events_count": 4}, "expected_events": 4} diff --git a/tests/sessions/replay_cases/case_04_state_update.jsonl b/tests/sessions/replay_cases/case_04_state_update.jsonl new file mode 100644 index 00000000..e7d457fc --- /dev/null +++ b/tests/sessions/replay_cases/case_04_state_update.jsonl @@ -0,0 +1,8 @@ +{"name": "case_04_state_update", "description": "State update: multiple state_delta writes and overwrites within a 
single session"} +{"action": "create_session", "kwargs": {"app_name": "test_app", "user_id": "test_user", "session_id": "session_04"}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "My name is Alice"}], "state_delta": {"user_name": "Alice"}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Nice to meet you, Alice!"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "I like programming in Python"}], "state_delta": {"language": "Python"}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Python is a great language!"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "Actually, I also use Rust"}], "state_delta": {"language": "Rust, Python"}}} +{"action": "get_session", "expected": {"events_count": 5, "state": {"user_name": "Alice", "language": "Rust, Python"}}} diff --git a/tests/sessions/replay_cases/case_05_memory_rw.jsonl b/tests/sessions/replay_cases/case_05_memory_rw.jsonl new file mode 100644 index 00000000..3977ab20 --- /dev/null +++ b/tests/sessions/replay_cases/case_05_memory_rw.jsonl @@ -0,0 +1,9 @@ +{"name": "case_05_memory_rw", "description": "Memory read/write: store session with user preferences, then search and verify recall"} +{"action": "create_session", "kwargs": {"app_name": "test_app", "user_id": "test_user", "session_id": "session_05", "initial_state": {"user_name": "Alice"}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "I love cats and have two Persian cats at home"}], "state_delta": {"pets": "cats"}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "That is wonderful! 
Persian cats are beautiful."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "My favorite food is sushi"}], "state_delta": {"favorite_food": "sushi"}}} +{"action": "get_session", "expected_events": 4} +{"action": "store_memory", "kwargs": {"save_key": "test_app/test_user", "session_id": "session_05"}} +{"action": "search_memory", "kwargs": {"key": "test_app/test_user", "query": "cat", "expected_count": 1}} +{"action": "search_memory", "kwargs": {"key": "test_app/test_user", "query": "sushi", "expected_count": 1}} diff --git a/tests/sessions/replay_cases/case_06_summary_gen.jsonl b/tests/sessions/replay_cases/case_06_summary_gen.jsonl new file mode 100644 index 00000000..e9357aff --- /dev/null +++ b/tests/sessions/replay_cases/case_06_summary_gen.jsonl @@ -0,0 +1,24 @@ +{"name": "case_06_summary_gen", "description": "Summary generation: long conversation triggers summarization; verify summary content, version, timestamp, and session association"} +{"action": "create_session", "kwargs": {"app_name": "test_app", "user_id": "test_user", "session_id": "session_06"}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "I want to plan a trip to Japan"}], "state_delta": {"topic": "japan_trip"}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Great choice! 
When are you planning to go?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "Next spring, around March"}], "state_delta": {"travel_month": "March"}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "March is cherry blossom season in Japan!"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "I want to visit Tokyo, Kyoto, and Osaka"}], "state_delta": {"cities": "Tokyo, Kyoto, Osaka"}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "That is a classic itinerary. How many days do you have?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "About 10 days"}], "state_delta": {"travel_days": 10}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "I recommend 4 days in Tokyo, 3 in Kyoto, and 3 in Osaka."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "What are the must-see attractions in Tokyo?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Sensoji Temple, Shibuya Crossing, Meiji Shrine, and Akihabara."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "And in Kyoto?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Fushimi Inari Shrine, Kinkakuji, Arashiyama Bamboo Grove, and Kiyomizudera."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "What about Osaka?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Osaka Castle, Dotonbori, Universal 
Studios Japan, and the Aquarium."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "What is the best way to travel between these cities?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "The Shinkansen (bullet train) is the best option. Tokyo to Kyoto is about 2.5 hours."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "Do I need a JR Pass?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "A 7-day JR Pass would cover your inter-city travel and save money."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "What about accommodation recommendations?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Consider staying in Shinjuku for Tokyo, near Kyoto Station for Kyoto, and Namba for Osaka."}], "state_delta": {}}} +{"action": "create_session_summary", "kwargs": {"force": true}} +{"action": "get_session_summary", "expected": {"has_summary": true}} diff --git a/tests/sessions/replay_cases/case_07_summary_truncate.jsonl b/tests/sessions/replay_cases/case_07_summary_truncate.jsonl new file mode 100644 index 00000000..0cdf2d2a --- /dev/null +++ b/tests/sessions/replay_cases/case_07_summary_truncate.jsonl @@ -0,0 +1,32 @@ +{"name": "case_07_summary_truncate", "description": "Summary truncation: long conversation is summarized, then new events are appended; verify that summary + retained events + new events together reconstruct context"} +{"action": "create_session", "kwargs": {"app_name": "test_app", "user_id": "test_user", "session_id": "session_07"}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "Help me plan a trip to Japan"}], 
"state_delta": {"topic": "japan_trip"}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "I would be happy to help! When do you plan to travel?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "March next year"}], "state_delta": {"travel_month": "March"}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "March is cherry blossom season, a wonderful time to visit!"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "I want to visit Tokyo, Kyoto, and Osaka"}], "state_delta": {"cities": "Tokyo, Kyoto, Osaka"}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Those three cities are the classic Japan itinerary. How many days?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "10 days"}], "state_delta": {"days": 10}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "I suggest 4 days in Tokyo, 3 in Kyoto, and 3 in Osaka."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "What to see in Tokyo?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Sensoji, Shibuya, Meiji Shrine, Akihabara."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "What to see in Kyoto?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Fushimi Inari, Kinkakuji, Arashiyama, Kiyomizudera."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "What to see in Osaka?"}], "state_delta": {}}} +{"action": 
"append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Osaka Castle, Dotonbori, Universal Studios."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "How to travel between cities?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Shinkansen bullet train, about 2.5 hours Tokyo to Kyoto."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "What about accommodation?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Shinjuku in Tokyo, near Kyoto Station, and Namba in Osaka."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "Do I need a JR Pass?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "A 7-day JR Pass will save you money on inter-city travel."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "What is the budget for this trip?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "A moderate budget would be around $3000-4000 per person including flights."}], "state_delta": {}}} +{"action": "create_session_summary", "kwargs": {"force": true}} +{"action": "get_session_summary", "expected": {"has_summary": true}} +{"action": "get_session", "expected_events": 0} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "Is March a good time to see cherry blossoms?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Yes, late March to early April is peak cherry blossom season in Tokyo and Kyoto."}], "state_delta": {}}} 
+{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "Any specific cherry blossom spots you recommend?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Ueno Park in Tokyo, Philosopher's Path in Kyoto, and Osaka Castle Park."}], "state_delta": {}}} +{"action": "get_session", "expected_events": 0} diff --git a/tests/sessions/replay_cases/case_08_exception_recovery.jsonl b/tests/sessions/replay_cases/case_08_exception_recovery.jsonl new file mode 100644 index 00000000..50a183a0 --- /dev/null +++ b/tests/sessions/replay_cases/case_08_exception_recovery.jsonl @@ -0,0 +1,9 @@ +{"name": "case_08_exception_recovery", "description": "Exception recovery: inject a skipped write on the SQL backend to simulate mid-write failure; verify cross-backend detection"} +{"action": "create_session", "kwargs": {"app_name": "test_app", "user_id": "test_user", "session_id": "session_08"}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "Book a flight to Tokyo"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "function_call", "name": "search_flights", "args": {"destination": "Tokyo", "date": "2026-03-15"}}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "function_response", "name": "search_flights", "response": {"flights": ["CA123", "CA456"], "price": 850}}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "I found flights to Tokyo starting from $850."}], "state_delta": {"booking_status": "searching"}}} +# inject: skip this append on SQL backend (simulate write failure) +{"action": "inject_skip_append", "kwargs": {}} +{"action": "get_session", "expected": {"events_count": 4}} diff --git a/tests/sessions/replay_cases/case_09_injected_event_order.jsonl 
b/tests/sessions/replay_cases/case_09_injected_event_order.jsonl new file mode 100644 index 00000000..eb92ea21 --- /dev/null +++ b/tests/sessions/replay_cases/case_09_injected_event_order.jsonl @@ -0,0 +1,9 @@ +{"name": "case_09_injected_event_order", "description": "Injected inconsistency: normal multi-turn trajectory with deliberately swapped event order to test detection"} +{"action": "create_session", "kwargs": {"app_name": "test_app", "user_id": "test_user", "session_id": "session_09"}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "What is the capital of France?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "The capital of France is Paris."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "What is the population?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Paris has a population of about 2.1 million."}], "state_delta": {}}} +{"action": "get_session", "expected": {"events_count": 4}} +# inject: reorder events[1] and events[2] in the backend being tested +{"action": "inject_reorder_events", "kwargs": {"indices": [1, 2]}} diff --git a/tests/sessions/replay_cases/case_10_injected_summary_session.jsonl b/tests/sessions/replay_cases/case_10_injected_summary_session.jsonl new file mode 100644 index 00000000..c7b76f4e --- /dev/null +++ b/tests/sessions/replay_cases/case_10_injected_summary_session.jsonl @@ -0,0 +1,26 @@ +{"name": "case_10_injected_summary_session", "description": "Injected inconsistency: summary session_id is set to a wrong value on the SQL backend; verify that DiffEngine detects the ownership mismatch"} +{"action": "create_session", "kwargs": {"app_name": "test_app", "user_id": "test_user", "session_id": "session_10"}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": 
"text", "text": "I need travel advice for Europe"}], "state_delta": {"topic": "europe_travel"}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Where in Europe would you like to go?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "France, Italy, and Spain"}], "state_delta": {"countries": "France, Italy, Spain"}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Great choices! How many days total?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "Two weeks"}], "state_delta": {"days": 14}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "I suggest 5 days in France, 5 in Italy, and 4 in Spain."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "Best cities in France?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Paris, Lyon, and Marseille are all wonderful."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "And in Italy?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Rome, Florence, Venice, and Milan are must-visits."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "And Spain?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Barcelona, Madrid, Seville, and Valencia."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "Transport between countries?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": 
[{"type": "text", "text": "The high-speed train (TGV, Frecciarossa, AVE) is excellent for cross-border travel."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "Estimated budget?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "A moderate budget would be about $4000-5000 per person including accommodation and transport."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "Best time to visit?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "Late spring (May-June) or early autumn (September-October) for pleasant weather and fewer crowds."}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "user", "parts": [{"type": "text", "text": "Do I need visas?"}], "state_delta": {}}} +{"action": "append_event", "kwargs": {"author": "agent", "parts": [{"type": "text", "text": "The Schengen visa covers all three countries for most visitors."}], "state_delta": {}}} +# Step order matters: create_summary on both, then inject on B only, then read on both +{"action": "create_session_summary", "kwargs": {"force": true}} +{"action": "inject_summary_session_id", "kwargs": {"wrong_session_id": "session_99"}} +{"action": "get_session_summary", "expected": {"has_summary": true}} diff --git a/tests/sessions/session_memory_summary_diff_report.json b/tests/sessions/session_memory_summary_diff_report.json new file mode 100644 index 00000000..6516738f --- /dev/null +++ b/tests/sessions/session_memory_summary_diff_report.json @@ -0,0 +1,185 @@ +{ + "generated_at": 1782985748.3901637, + "total_cases": 10, + "cases_passed": 6, + "backends": { + "a": "InMemorySessionService", + "b": "SqlSessionService (SQLite :memory:)" + }, + "reports": [ + { + "case_name": "case_01_single_turn", + "passed": true, + "backend_a": 
"InMemory", + "backend_b": "Sql", + "inconsistencies": [], + "allowed_diffs": [] + }, + { + "case_name": "case_02_multi_turn", + "passed": true, + "backend_a": "InMemory", + "backend_b": "Sql", + "inconsistencies": [], + "allowed_diffs": [] + }, + { + "case_name": "case_03_tool_call", + "passed": true, + "backend_a": "InMemory", + "backend_b": "Sql", + "inconsistencies": [], + "allowed_diffs": [] + }, + { + "case_name": "case_04_state_update", + "passed": true, + "backend_a": "InMemory", + "backend_b": "Sql", + "inconsistencies": [], + "allowed_diffs": [] + }, + { + "case_name": "case_05_memory_rw", + "passed": true, + "backend_a": "InMemory", + "backend_b": "Sql", + "inconsistencies": [], + "allowed_diffs": [] + }, + { + "case_name": "case_06_summary_gen", + "passed": true, + "backend_a": "InMemory", + "backend_b": "Sql", + "inconsistencies": [], + "allowed_diffs": [] + }, + { + "case_name": "case_07_summary_truncate", + "passed": false, + "backend_a": "InMemory", + "backend_b": "Sql", + "inconsistencies": [ + { + "category": "event", + "session_id": "session_07", + "field_path": "events[1].parts[0].text", + "value_a": "What to see in Osaka?", + "value_b": "Is March a good time to see cherry blossoms?", + "type": "inconsistency", + "message": "Field events[1].parts[0].text differs" + }, + { + "category": "event", + "session_id": "session_07", + "field_path": "events[2].parts[0].text", + "value_a": "Osaka Castle, Dotonbori, Universal Studios.", + "value_b": "Yes, late March to early April is peak cherry blossom season in Tokyo and Kyoto.", + "type": "inconsistency", + "message": "Field events[2].parts[0].text differs" + }, + { + "category": "event", + "session_id": "session_07", + "field_path": "events[3].parts[0].text", + "value_a": "How to travel between cities?", + "value_b": "Any specific cherry blossom spots you recommend?", + "type": "inconsistency", + "message": "Field events[3].parts[0].text differs" + }, + { + "category": "event", + "session_id": 
"session_07", + "field_path": "events[4].parts[0].text", + "value_a": "Shinkansen bullet train, about 2.5 hours Tokyo to Kyoto.", + "value_b": "Ueno Park in Tokyo, Philosopher's Path in Kyoto, and Osaka Castle Park.", + "type": "inconsistency", + "message": "Field events[4].parts[0].text differs" + } + ], + "allowed_diffs": [] + }, + { + "case_name": "case_08_exception_recovery", + "passed": false, + "backend_a": "InMemory", + "backend_b": "Sql", + "inconsistencies": [ + { + "category": "event", + "session_id": "session_08", + "field_path": "events.count", + "value_a": 4, + "value_b": 3, + "type": "inconsistency", + "message": "Event count mismatch: 4 vs 3" + } + ], + "allowed_diffs": [] + }, + { + "case_name": "case_09_injected_event_order", + "passed": false, + "backend_a": "InMemory", + "backend_b": "Sql", + "inconsistencies": [ + { + "category": "event", + "session_id": "session_09", + "field_path": "events[1].author", + "value_a": "agent", + "value_b": "user", + "type": "inconsistency", + "message": "Field events[1].author differs" + }, + { + "category": "event", + "session_id": "session_09", + "field_path": "events[1].parts[0].text", + "value_a": "The capital of France is Paris.", + "value_b": "What is the population?", + "type": "inconsistency", + "message": "Field events[1].parts[0].text differs" + }, + { + "category": "event", + "session_id": "session_09", + "field_path": "events[2].author", + "value_a": "user", + "value_b": "agent", + "type": "inconsistency", + "message": "Field events[2].author differs" + }, + { + "category": "event", + "session_id": "session_09", + "field_path": "events[2].parts[0].text", + "value_a": "What is the population?", + "value_b": "The capital of France is Paris.", + "type": "inconsistency", + "message": "Field events[2].parts[0].text differs" + } + ], + "allowed_diffs": [] + }, + { + "case_name": "case_10_injected_summary_session", + "passed": false, + "backend_a": "InMemory", + "backend_b": "Sql", + "inconsistencies": [ + { 
+ "category": "summary", + "session_id": "session_10", + "field_path": "summary.session_id", + "value_a": "session_10", + "value_b": "session_99", + "type": "inconsistency", + "message": "SUMMARY_OWNERSHIP: summary session_id mismatch" + } + ], + "allowed_diffs": [] + } + ] +} \ No newline at end of file diff --git a/tests/sessions/test_replay_consistency.py b/tests/sessions/test_replay_consistency.py new file mode 100644 index 00000000..3454cf5f --- /dev/null +++ b/tests/sessions/test_replay_consistency.py @@ -0,0 +1,1183 @@ +# Tencent is pleased to support the open source community by making tRPC-Agent-Python available. +# +# Copyright (C) 2026 Tencent. All rights reserved. +# +# tRPC-Agent-Python is licensed under Apache-2.0. +"""Session / Memory / Summary replay consistency test framework. + +This module provides: + +- :class:`ReplayHarness`: drives a :class:`ReplayCase` through two + ``(session_service, memory_service)`` backends and collects raw results. +- :class:`DiffEngine`: normalizes raw results and performs four-dimension + comparison (events, state, memory, summary), producing a :class:`DiffReport`. +- Parameterised pytest tests that run all 10 replay cases in lightweight mode + (InMemory vs SQLite). 
@dataclass
class DiffEntry:
    """One field-level difference observed between two backends.

    Attributes:
        category: One of ``event``, ``state``, ``memory``, ``summary``.
        session_id: The session this entry belongs to.
        event_index: Index of the event within the session (for event diffs).
        summary_id: Summary identifier (for summary diffs).
        field_path: Dot-separated path to the differing field,
            e.g. ``events[2].author``.
        value_a: Value observed on backend A.
        value_b: Value observed on backend B.
        type: ``inconsistency`` or ``allowed_diff``.
        message: Human-readable explanation.
    """

    category: str = ""
    session_id: str = ""
    event_index: Optional[int] = None
    summary_id: Optional[str] = None
    field_path: str = ""
    value_a: Any = None
    value_b: Any = None
    type: str = "inconsistency"
    message: str = ""


@dataclass
class DiffReport:
    """Aggregated result of comparing two backends on a single replay case.

    Attributes:
        case_name: Name of the replayed case.
        passed: Pass/fail flag. It is a plain stored field and is not
            recomputed from ``diffs`` by this class.
        backend_a_label: Human-readable label for backend A.
        backend_b_label: Human-readable label for backend B.
        diffs: Recorded differences. Only entries whose ``type`` is
            ``inconsistency`` are exposed through :attr:`inconsistencies`.
        allowed_diffs: Differences that are explicitly allowed by policy.
    """

    case_name: str = ""
    passed: bool = True
    backend_a_label: str = "InMemory"
    backend_b_label: str = "SQLite"
    diffs: List[DiffEntry] = field(default_factory=list)
    allowed_diffs: List[DiffEntry] = field(default_factory=list)

    @property
    def inconsistencies(self) -> List[DiffEntry]:
        """Return the entries of ``diffs`` typed as real inconsistencies."""
        return [entry for entry in self.diffs if entry.type == "inconsistency"]

    def summary(self) -> str:
        """Return a one-line, human-readable summary of the report."""
        status = "PASSED" if self.passed else "FAILED"
        return (f"[{status}] {self.case_name}: "
                f"{len(self.inconsistencies)} inconsistency(ies), "
                f"{len(self.allowed_diffs)} allowed diff(s)")

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the report to a JSON-compatible dict.

        ``event_index`` and ``summary_id`` are emitted per entry only when
        they are set, so the output stays compact for entries without them.
        """

        def _entry_dict(entry: DiffEntry) -> Dict[str, Any]:
            payload: Dict[str, Any] = {
                "category": entry.category,
                "session_id": entry.session_id,
                "field_path": entry.field_path,
                "value_a": _serialize_value(entry.value_a),
                "value_b": _serialize_value(entry.value_b),
                "type": entry.type,
                "message": entry.message,
            }
            if entry.event_index is not None:
                payload["event_index"] = entry.event_index
            if entry.summary_id is not None:
                payload["summary_id"] = entry.summary_id
            return payload

        return {
            "case_name": self.case_name,
            "passed": self.passed,
            "backend_a": self.backend_a_label,
            "backend_b": self.backend_b_label,
            "inconsistencies": [_entry_dict(e) for e in self.inconsistencies],
            "allowed_diffs": [_entry_dict(e) for e in self.allowed_diffs],
        }


def _serialize_value(val: Any) -> Any:
    """Convert *val* into plain types that ``json.dumps`` accepts.

    Scalars pass through, lists/tuples become lists, dicts are converted
    recursively, objects exposing ``model_dump`` (e.g. Pydantic models) are
    dumped and the dump is converted recursively, and anything else falls
    back to ``str(val)``.
    """
    if val is None or isinstance(val, (str, int, float, bool)):
        return val
    if isinstance(val, (list, tuple)):
        return [_serialize_value(item) for item in val]
    if isinstance(val, dict):
        converted = {}
        for key, item in val.items():
            # json.dumps only accepts str/int/float/bool/None keys; anything
            # else (e.g. a tuple) would raise TypeError, so stringify it.
            if key is not None and not isinstance(key, (str, int, float, bool)):
                key = str(key)
            converted[key] = _serialize_value(item)
        return converted
    if hasattr(val, "model_dump"):
        # The dump itself may still contain dates, bytes, tuples, ... so it
        # must go through the same conversion instead of being returned raw.
        return _serialize_value(val.model_dump())
    return str(val)
class DiffEngine:
    """Four-dimension comparison engine for session replay results.

    Compares the normalized events, state, memory and summary of two
    per-backend results and records every difference in a report.

    Usage::

        engine = DiffEngine()
        report = engine.compare(case_name, result_a, result_b)
    """

    # Fields that may legitimately differ between backends.
    # NOTE(review): ``_add_diff`` tests these against the *start* of the full
    # field path, and every path built in this class starts with ``events``,
    # ``state``, ``memory`` or ``summary`` -- so none of these prefixes
    # currently matches. Confirm whether leaf-name matching was intended.
    ALLOWED_FIELD_PREFIXES = (
        "invocation_id",
        "save_key",
        "long_running_tool_ids",
    )

    # Text prefix identifying the synthetic "summary anchor" event.
    _SUMMARY_TEXT_PREFIX = "Previous conversation summary:"

    # Payload fields compared for each known part type, in report order.
    _PART_FIELDS = {
        "text": ("text",),
        "function_call": ("name", "args"),
        "function_response": ("name", "response"),
    }

    def compare(
        self,
        case_name: str,
        result_a: "RawResult",
        result_b: "RawResult",
    ) -> "DiffReport":
        """Compare two raw results and produce a DiffReport.

        Args:
            case_name: Name of the replayed case, copied into the report.
            result_a: Result collected from backend A.
            result_b: Result collected from backend B.

        Returns:
            The populated report; ``passed`` is ``True`` only when no entry
            was classified as an inconsistency.
        """
        report = DiffReport(case_name=case_name)
        report.backend_a_label = result_a.label
        report.backend_b_label = result_b.label

        self._compare_events(result_a, result_b, report)
        self._compare_state(result_a, result_b, report)
        self._compare_memory(result_a, result_b, report)
        self._compare_summary(result_a, result_b, report)

        report.passed = not report.inconsistencies
        return report

    # ------------------------------------------------------------------
    # Event comparison
    # ------------------------------------------------------------------

    def _compare_events(self, a: "RawResult", b: "RawResult", report: "DiffReport") -> None:
        """Compare the normalized event lists of both results.

        When both sides contain a summary anchor, the lists are aligned at
        their anchors and only the events from the anchor onwards are
        compared pairwise (indices in the report are backend A's indices).
        When only one side has an anchor, event comparison is skipped and the
        summary dimension is relied upon instead.
        """
        session_id = self._session_id(a, b)
        events_a = a.normalized_session.get("events", []) if a.normalized_session else []
        events_b = b.normalized_session.get("events", []) if b.normalized_session else []

        anchor_a = self._find_summary_anchor(events_a)
        anchor_b = self._find_summary_anchor(events_b)

        if anchor_a >= 0 and anchor_b >= 0:
            # NOTE(review): zip() stops at the shorter tail, so a difference
            # in the number of post-anchor events is not reported here.
            tails = zip(events_a[anchor_a:], events_b[anchor_b:])
            for idx, (evt_a, evt_b) in enumerate(tails, start=anchor_a):
                self._compare_event_dicts(idx, session_id, evt_a, evt_b, report)
            return
        if anchor_a >= 0 or anchor_b >= 0:
            # Only one side was summarised: skip event-by-event comparison.
            return

        if len(events_a) != len(events_b):
            self._add_diff(
                report,
                "event",
                session_id,
                "events.count",
                len(events_a),
                len(events_b),
                message=f"Event count mismatch: {len(events_a)} vs {len(events_b)}",
            )

        # Compare the common prefix even when the counts differ.
        for idx, (evt_a, evt_b) in enumerate(zip(events_a, events_b)):
            self._compare_event_dicts(idx, session_id, evt_a, evt_b, report)

    @staticmethod
    def _find_summary_anchor(events: List[Dict[str, Any]]) -> int:
        """Return the index of the first summary anchor event, or -1."""
        for idx, evt in enumerate(events):
            if DiffEngine._is_summary_event(evt):
                return idx
        return -1

    def _compare_event_dicts(
        self,
        idx: int,
        session_id: str,
        evt_a: Dict[str, Any],
        evt_b: Dict[str, Any],
        report: "DiffReport",
    ) -> None:
        """Compare two normalized event dicts field by field."""
        for fname in ("author", "partial", "branch"):
            va = evt_a.get(fname)
            vb = evt_b.get(fname)
            if va != vb:
                self._add_diff(report, "event", session_id, f"events[{idx}].{fname}", va, vb)

        # Summary text is checked in the summary dimension, so when both
        # events are anchors their parts are not compared here.
        is_summary_a = self._is_summary_event(evt_a)
        is_summary_b = self._is_summary_event(evt_b)
        if is_summary_a and is_summary_b:
            return
        if is_summary_a or is_summary_b:
            self._add_diff(
                report,
                "event",
                session_id,
                f"events[{idx}].author",
                evt_a.get("author"),
                evt_b.get("author"),
                message="Summary event exists in one backend but not the other",
            )

        parts_a = evt_a.get("parts", [])
        parts_b = evt_b.get("parts", [])
        if len(parts_a) != len(parts_b):
            self._add_diff(
                report,
                "event",
                session_id,
                f"events[{idx}].parts.count",
                len(parts_a),
                len(parts_b),
            )
        for part_idx, (pa, pb) in enumerate(zip(parts_a, parts_b)):
            self._compare_parts(idx, part_idx, session_id, pa, pb, report)

        delta_a = evt_a.get("state_delta", {}) or {}
        delta_b = evt_b.get("state_delta", {}) or {}
        if delta_a != delta_b:
            self._add_diff(report, "event", session_id, f"events[{idx}].state_delta", delta_a, delta_b)

    @staticmethod
    def _is_summary_event(evt: Dict[str, Any]) -> bool:
        """Check if a normalized event dict represents a summary anchor.

        An anchor is a ``system``-authored event with at least one part whose
        text starts with the summary prefix. Parts whose ``text`` is missing,
        ``None`` or not a string are ignored instead of raising.
        """
        if evt.get("author") != "system":
            return False
        for part in evt.get("parts") or []:
            text = part.get("text")
            if isinstance(text, str) and text.startswith(DiffEngine._SUMMARY_TEXT_PREFIX):
                return True
        return False

    def _compare_parts(
        self,
        event_idx: int,
        part_idx: int,
        session_id: str,
        pa: Dict[str, Any],
        pb: Dict[str, Any],
        report: "DiffReport",
    ) -> None:
        """Compare two event parts: type first, then type-specific fields."""
        prefix = f"events[{event_idx}].parts[{part_idx}]"

        type_a = pa.get("type", "")
        type_b = pb.get("type", "")
        if type_a != type_b:
            # Payload fields of different part types are not comparable.
            self._add_diff(report, "event", session_id, f"{prefix}.type", type_a, type_b)
            return

        for fname in self._PART_FIELDS.get(type_a, ()):
            if pa.get(fname) != pb.get(fname):
                self._add_diff(report, "event", session_id, f"{prefix}.{fname}", pa.get(fname), pb.get(fname))

    # ------------------------------------------------------------------
    # State comparison
    # ------------------------------------------------------------------

    def _compare_state(self, a: "RawResult", b: "RawResult", report: "DiffReport") -> None:
        """Compare the final session state key by key (union of both sides)."""
        session_id = self._session_id(a, b)
        state_a = (a.normalized_session or {}).get("state", {}) or {}
        state_b = (b.normalized_session or {}).get("state", {}) or {}

        for key in sorted(set(state_a) | set(state_b)):
            va = state_a.get(key)
            vb = state_b.get(key)
            if va != vb:
                self._add_diff(report, "state", session_id, f"state.{key}", va, vb)

    # ------------------------------------------------------------------
    # Memory comparison
    # ------------------------------------------------------------------

    def _compare_memory(self, a: "RawResult", b: "RawResult", report: "DiffReport") -> None:
        """Compare normalized memory search results entry by entry."""
        session_id = self._session_id(a, b)
        mem_a = a.normalized_memory or []
        mem_b = b.normalized_memory or []

        if len(mem_a) != len(mem_b):
            self._add_diff(report, "memory", session_id, "memory.count", len(mem_a), len(mem_b))

        for i, (entry_a, entry_b) in enumerate(zip(mem_a, mem_b)):
            author_a = entry_a.get("author", "")
            author_b = entry_b.get("author", "")
            if author_a != author_b:
                self._add_diff(report, "memory", session_id, f"memory[{i}].author", author_a, author_b)

            parts_a = entry_a.get("parts", [])
            parts_b = entry_b.get("parts", [])
            if len(parts_a) != len(parts_b):
                # Mirror the event dimension: a missing part is a difference,
                # not something to hide by truncating to the shorter list.
                self._add_diff(
                    report,
                    "memory",
                    session_id,
                    f"memory[{i}].parts.count",
                    len(parts_a),
                    len(parts_b),
                )
            for pi, (part_a, part_b) in enumerate(zip(parts_a, parts_b)):
                if part_a.get("text") != part_b.get("text"):
                    self._add_diff(
                        report,
                        "memory",
                        session_id,
                        f"memory[{i}].parts[{pi}].text",
                        part_a.get("text"),
                        part_b.get("text"),
                    )

    # ------------------------------------------------------------------
    # Summary comparison
    # ------------------------------------------------------------------

    def _compare_summary(self, a: "RawResult", b: "RawResult", report: "DiffReport") -> None:
        """Compare normalized summaries: existence, ownership, counts, text.

        Fields are read with ``.get`` so a key missing from a normalized
        summary is compared as ``None`` instead of raising ``KeyError``.
        """
        sum_a = a.normalized_summary
        sum_b = b.normalized_summary

        if sum_a is None and sum_b is None:
            return  # Neither backend produced a summary: consistent.
        if sum_a is None or sum_b is None:
            sid = (sum_a or sum_b or {}).get("session_id", "")
            self._add_diff(
                report,
                "summary",
                sid,
                "summary.exists",
                sum_a is not None,
                sum_b is not None,
                message="Summary exists in one backend but not the other",
            )
            return

        sid = sum_a.get("session_id", "")

        # Ownership: the summary must belong to the same session on both sides.
        if sum_a.get("session_id") != sum_b.get("session_id"):
            self._add_diff(
                report,
                "summary",
                sid,
                "summary.session_id",
                sum_a.get("session_id"),
                sum_b.get("session_id"),
                message="SUMMARY_OWNERSHIP: summary session_id mismatch",
            )

        for fname in ("original_event_count", "compressed_event_count"):
            if sum_a.get(fname) != sum_b.get(fname):
                self._add_diff(report, "summary", sid, f"summary.{fname}", sum_a.get(fname), sum_b.get(fname))

        # Text is compared after stripping surrounding whitespace.
        text_a = (sum_a.get("summary_text") or "").strip()
        text_b = (sum_b.get("summary_text") or "").strip()
        if text_a != text_b:
            self._add_diff(
                report,
                "summary",
                sid,
                "summary.summary_text",
                text_a,
                text_b,
                message="Summary text differs between backends",
            )

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _session_id(a: "RawResult", b: "RawResult") -> str:
        """Return the session id of *a*, else of *b*, else an empty string."""
        if a.session:
            return a.session.id
        if b.session:
            return b.session.id
        return ""

    def _add_diff(
        self,
        report: "DiffReport",
        category: str,
        session_id: str,
        field_path: str,
        value_a: Any,
        value_b: Any,
        *,
        message: str = "",
    ) -> None:
        """Add a diff entry, classifying it as inconsistency or allowed diff.

        Paths starting with one of ``ALLOWED_FIELD_PREFIXES`` go to
        ``report.allowed_diffs``; everything else goes to ``report.diffs``.
        """
        if field_path.startswith(self.ALLOWED_FIELD_PREFIXES):
            report.allowed_diffs.append(
                DiffEntry(
                    category=category,
                    session_id=session_id,
                    field_path=field_path,
                    value_a=value_a,
                    value_b=value_b,
                    type="allowed_diff",
                    message=message or f"Allowed difference in {field_path}",
                ))
            return

        report.diffs.append(
            DiffEntry(
                category=category,
                session_id=session_id,
                field_path=field_path,
                value_a=value_a,
                value_b=value_b,
                message=message or f"Field {field_path} differs",
            ))
+ """ + + label: str = "" + session: Optional[Session] = None + normalized_session: Optional[Dict[str, Any]] = None + memory_responses: List[Tuple[str, SearchMemoryResponse]] = field(default_factory=list) + normalized_memory: Optional[List[Dict[str, Any]]] = None + summary: Optional[SessionSummary] = None + normalized_summary: Optional[Dict[str, Any]] = None + + +# --------------------------------------------------------------------------- +# ReplayHarness +# --------------------------------------------------------------------------- + + +class ReplayHarness: + """Drives a :class:`ReplayCase` through two backends for comparison. + + Usage:: + + harness = ReplayHarness() + result_a, result_b = await harness.run_case( + case=load_replay_case("case_01_single_turn"), + backend_a=(session_svc_a, memory_svc_a), + backend_b=(session_svc_b, memory_svc_b), + ) + report = DiffEngine().compare("case_01_single_turn", result_a, result_b) + """ + + # Mapping from step action names to handler methods + _ACTION_HANDLERS = { + "create_session": "_handle_create_session", + "append_event": "_handle_append_event", + "get_session": "_handle_get_session", + "store_memory": "_handle_store_memory", + "search_memory": "_handle_search_memory", + "create_session_summary": "_handle_create_session_summary", + "get_session_summary": "_handle_get_session_summary", + "inject_reorder_events": "_handle_inject_reorder_events", + "inject_summary_session_id": "_handle_inject_summary_session_id", + "inject_skip_append": "_handle_inject_skip_append", + } + + def __init__(self): + self._sessions: Dict[str, Session] = {} # label -> current session + + async def run_case( + self, + case: ReplayCase, + backend_a: Tuple[SessionServiceABC, MemoryServiceABC], + backend_b: Tuple[SessionServiceABC, MemoryServiceABC], + ) -> Tuple[RawResult, RawResult]: + """Execute every step of *case* on both backends. + + Returns: + Raw results from both backends. 
+ """ + svc_a, mem_a = backend_a + svc_b, mem_b = backend_b + + result_a = RawResult(label=type(svc_a).__name__.replace("SessionService", "")) + result_b = RawResult(label=type(svc_b).__name__.replace("SessionService", "")) + + self._sessions = {} + + for step in case.steps: + handler_name = self._ACTION_HANDLERS.get(step.action) + if handler_name is None: + raise ValueError(f"Unknown step action: {step.action}") + + handler = getattr(self, handler_name) + + # Inject steps: apply to backend B only so the injected backend + # differs from the reference backend (A). This creates a detectable + # cross-backend inconsistency. + if step.action.startswith("inject_"): + await handler(step, svc_b, mem_b, result_b) + else: + await handler(step, svc_a, mem_a, result_a) + await handler(step, svc_b, mem_b, result_b) + + # Normalize results for comparison + result_a.normalized_session = normalize_session_for_compare(result_a.session) if result_a.session else None + result_b.normalized_session = normalize_session_for_compare(result_b.session) if result_b.session else None + result_a.normalized_summary = normalize_summary_for_compare(result_a.summary) if result_a.summary else None + result_b.normalized_summary = normalize_summary_for_compare(result_b.summary) if result_b.summary else None + + # Aggregate memory results + if result_a.memory_responses: + all_mem = [] + for _, resp in result_a.memory_responses: + all_mem.extend(normalize_memory_response(resp)) + result_a.normalized_memory = all_mem + if result_b.memory_responses: + all_mem = [] + for _, resp in result_b.memory_responses: + all_mem.extend(normalize_memory_response(resp)) + result_b.normalized_memory = all_mem + + return result_a, result_b + + # ------------------------------------------------------------------ + # Step handlers + # ------------------------------------------------------------------ + + async def _handle_create_session( + self, + step: ReplayStep, + svc: SessionServiceABC, + mem: MemoryServiceABC, + 
result: RawResult, + ) -> None: + kw = step.kwargs + session = await svc.create_session( + app_name=kw.get("app_name", "test_app"), + user_id=kw.get("user_id", "test_user"), + session_id=kw.get("session_id", f"session_{result.label}"), + state=kw.get("initial_state"), + ) + self._sessions[result.label] = session + + async def _handle_append_event( + self, + step: ReplayStep, + svc: SessionServiceABC, + mem: MemoryServiceABC, + result: RawResult, + ) -> None: + kw = step.kwargs + session = self._sessions.get(result.label) + if session is None: + raise RuntimeError(f"No session created for {result.label} before append_event") + + parts_list = kw.get("parts", []) + parts = [] + for p in parts_list: + ptype = p.get("type", "") + if ptype == "text": + parts.append(Part.from_text(text=p.get("text", ""))) + elif ptype == "function_call": + parts.append(Part.from_function_call( + name=p.get("name", ""), + args=p.get("args", {}), + )) + elif ptype == "function_response": + parts.append(Part.from_function_response( + name=p.get("name", ""), + response=p.get("response", {}), + )) + + state_delta = kw.get("state_delta", {}) + event = Event( + invocation_id=f"inv_{result.label}", + author=kw.get("author", "user"), + content=Content(parts=parts, role=kw.get("author", "user")), + actions=EventActions(state_delta=state_delta) if state_delta else EventActions(), + ) + await svc.append_event(session, event) + self._sessions[result.label] = session + + async def _handle_get_session( + self, + step: ReplayStep, + svc: SessionServiceABC, + mem: MemoryServiceABC, + result: RawResult, + ) -> None: + kw = step.kwargs + session_id = kw.get("session_id") or (result.session.id if result.session else None) + if session_id is None: + # Derive from the stored session key + for label, s in self._sessions.items(): + if label == result.label: + session_id = s.id + break + + app_name = kw.get("app_name", "test_app") + user_id = kw.get("user_id", "test_user") + session = await 
svc.get_session(app_name=app_name, user_id=user_id, session_id=session_id) + if session: + result.session = session + self._sessions[result.label] = session + + async def _handle_store_memory( + self, + step: ReplayStep, + svc: SessionServiceABC, + mem: MemoryServiceABC, + result: RawResult, + ) -> None: + session = self._sessions.get(result.label) + if session is None: + raise RuntimeError(f"No session for {result.label} in store_memory") + if mem.enabled: + await mem.store_session(session) + + async def _handle_search_memory( + self, + step: ReplayStep, + svc: SessionServiceABC, + mem: MemoryServiceABC, + result: RawResult, + ) -> None: + kw = step.kwargs + key = kw.get("key", "") + query = kw.get("query", "") + if not key or not query: + return + if mem.enabled: + response = await mem.search_memory(key=key, query=query) + result.memory_responses.append((query, response)) + + async def _handle_create_session_summary( + self, + step: ReplayStep, + svc: SessionServiceABC, + mem: MemoryServiceABC, + result: RawResult, + ) -> None: + session = self._sessions.get(result.label) + if session is None: + return + # The summarizer manager is attached to the session service + mgr = getattr(svc, "summarizer_manager", None) if hasattr(svc, "summarizer_manager") else None + if mgr: + kw = step.kwargs + await mgr.create_session_summary(session, force=kw.get("force", False)) + + async def _handle_get_session_summary( + self, + step: ReplayStep, + svc: SessionServiceABC, + mem: MemoryServiceABC, + result: RawResult, + ) -> None: + session = self._sessions.get(result.label) + if session is None: + return + mgr = getattr(svc, "summarizer_manager", None) if hasattr(svc, "summarizer_manager") else None + if mgr: + summary = await mgr.get_session_summary(session) + result.summary = summary + + # ------------------------------------------------------------------ + # Injection handlers (for injected inconsistency cases) + # 
------------------------------------------------------------------ + + async def _handle_inject_reorder_events( + self, + step: ReplayStep, + svc: SessionServiceABC, + mem: MemoryServiceABC, + result: RawResult, + ) -> None: + """Swap two events in the session's event list to simulate ordering bug.""" + session = self._sessions.get(result.label) + if session is None or not session.events: + return + kw = step.kwargs + indices = kw.get("indices", [0, 1]) + if len(indices) >= 2 and max(indices) < len(session.events): + i, j = indices[0], indices[1] + session.events[i], session.events[j] = session.events[j], session.events[i] + await svc.update_session(session) + + async def _handle_inject_summary_session_id( + self, + step: ReplayStep, + svc: SessionServiceABC, + mem: MemoryServiceABC, + result: RawResult, + ) -> None: + """Override the summary's session_id to point to a wrong session.""" + session = self._sessions.get(result.label) + if session is None: + return + mgr = getattr(svc, "summarizer_manager", None) + if mgr: + kw = step.kwargs + wrong_id = kw.get("wrong_session_id", "session_wrong") + cache = getattr(mgr, "_summarizer_cache", {}) + app_name = session.app_name + user_id = session.user_id + sid = session.id + if app_name in cache and user_id in cache[app_name] and sid in cache[app_name][user_id]: + cache[app_name][user_id][sid].session_id = wrong_id + result.summary = cache[app_name][user_id][sid] + + async def _handle_inject_skip_append( + self, + step: ReplayStep, + svc: SessionServiceABC, + mem: MemoryServiceABC, + result: RawResult, + ) -> None: + """Skip the last append on this backend (simulate write failure). + + Removes the most recently appended event from the session. + This creates a cross-backend inconsistency when only one backend + receives the injection. 
+ """ + session = self._sessions.get(result.label) + if session is None or not session.events: + return + removed = session.events.pop() + logger = __import__("logging").getLogger(__name__) + logger.info("inject_skip_append removed event author=%s on %s", removed.author, result.label) + await svc.update_session(session) + + +# --------------------------------------------------------------------------- +# Pytest tests +# --------------------------------------------------------------------------- + +# List of normal (non-injected) cases to verify consistency +_NORMAL_CASES = [ + "case_01_single_turn", + "case_02_multi_turn", + "case_03_tool_call", + "case_04_state_update", +] + +# Cases that require memory service enabled +_MEMORY_CASES = [ + "case_05_memory_rw", +] + +# Cases that require summarizer manager (note: case_07 is expected to produce +# event-level divergences because summary compression creates different event +# boundaries across backends — it is tracked as a "known inconsistency" case). 
_SUMMARY_CASES = [
    "case_06_summary_gen",
]

# Summary cases whose event lists are expected to diverge between backends;
# validated by the dedicated two-layer test instead of a strict pass check.
_KNOWN_INCONSISTENCY_CASES = [
    "case_07_summary_truncate",
]

# Injected inconsistency cases
_INJECTED_CASES = [
    "case_08_exception_recovery",
    "case_09_injected_event_order",
    "case_10_injected_summary_session",
]


async def _replay_and_compare(case_name: str, backend_pair) -> Tuple[RawResult, RawResult, DiffReport]:
    """Replay *case_name* on both backends of *backend_pair* and diff them.

    Args:
        case_name: Name accepted by ``load_replay_case``.
        backend_pair: Two-item iterable ``(backend_a, backend_b)`` as yielded
            by the backend-pair fixtures.

    Returns:
        ``(result_a, result_b, report)`` where *report* is the output of
        ``DiffEngine().compare`` for the two raw results.
    """
    backend_a, backend_b = backend_pair
    case = load_replay_case(case_name)
    # A fresh harness per replay keeps per-case session tracking isolated.
    result_a, result_b = await ReplayHarness().run_case(case, backend_a, backend_b)
    report = DiffEngine().compare(case_name, result_a, result_b)
    return result_a, result_b, report


def _assert_low_false_positive_rate(case_name: str, report: DiffReport) -> None:
    """Assert that at most 5% of the recorded diffs are inconsistencies.

    NOTE(review): the denominator is the number of recorded diff entries
    (``diffs`` + ``allowed_diffs``), not the number of comparisons performed,
    so any inconsistency fails unless it is outnumbered by allowed diffs.
    """
    inconsistencies = report.inconsistencies
    total_checks = max(len(report.diffs) + len(report.allowed_diffs), 1)
    false_positive_rate = len(inconsistencies) / total_checks

    assert false_positive_rate <= 0.05, (f"False positive rate {false_positive_rate:.2%} exceeds 5% "
                                         f"for case {case_name}. Inconsistencies: {inconsistencies}")


@pytest.mark.parametrize("case_name", _NORMAL_CASES)
async def test_replay_normal(case_name: str, full_backend_pair) -> None:
    """Verify that normal (non-injected) cases produce identical results
    across InMemory and SQLite backends."""
    _, _, report = await _replay_and_compare(case_name, full_backend_pair)

    assert report.passed, report.summary()


@pytest.mark.parametrize("case_name", _MEMORY_CASES)
async def test_replay_with_memory(case_name: str, full_backend_pair) -> None:
    """Verify replay cases that exercise memory store and search."""
    _, _, report = await _replay_and_compare(case_name, full_backend_pair)

    assert report.passed, report.summary()


@pytest.mark.parametrize("case_name", _SUMMARY_CASES)
async def test_replay_with_summary(case_name: str, full_backend_pair_with_summary) -> None:
    """Verify replay cases that exercise session summarization."""
    _, _, report = await _replay_and_compare(case_name, full_backend_pair_with_summary)

    assert report.passed, report.summary()


@pytest.mark.parametrize("case_name", _INJECTED_CASES)
async def test_replay_injected_detection(case_name: str, full_backend_pair_with_summary) -> None:
    """Verify that injected inconsistencies are detected (must report failure).

    For injected cases the report *must* flag inconsistencies.
    """
    _, _, report = await _replay_and_compare(case_name, full_backend_pair_with_summary)

    # Injected cases MUST have inconsistencies
    assert not report.passed, (f"Injected case {case_name} should have been detected as inconsistent, "
                               f"but passed. Diffs: {report.diffs}")


@pytest.mark.parametrize("case_name", _NORMAL_CASES + _MEMORY_CASES)
async def test_replay_false_positive_check(case_name: str, full_backend_pair) -> None:
    """Verify that normal (non-summary) cases have zero or very few
    inconsistencies (≤5%)."""
    _, _, report = await _replay_and_compare(case_name, full_backend_pair)

    _assert_low_false_positive_rate(case_name, report)


@pytest.mark.parametrize("case_name", _SUMMARY_CASES)
async def test_replay_false_positive_check_summary(case_name: str, full_backend_pair_with_summary) -> None:
    """Verify that summary-involved cases have zero or very few
    inconsistencies (≤5%)."""
    _, _, report = await _replay_and_compare(case_name, full_backend_pair_with_summary)

    _assert_low_false_positive_rate(case_name, report)


@pytest.mark.parametrize("case_name", _KNOWN_INCONSISTENCY_CASES)
async def test_replay_summary_truncation(case_name: str, full_backend_pair_with_summary) -> None:
    """Verify summary truncation with two-layer validation.

    Layer 1 — Cross-backend consistency (strict):
      - Summary metadata (session_id, summary_text, counts) must match
      - Session state must match
      - Event count divergence is recorded as allowed_diff (expected due
        to different storage models)

    Layer 2 — Per-backend semantic validation:
      - Summary was generated (non-empty text)
      - Compression happened (compressed_count < original_count)
      - New events appended after summary are present
      - Summary + retained events + new events together cover context
    """
    import logging  # function-scope import replaces ``__import__("logging")``

    result_a, result_b, report = await _replay_and_compare(case_name, full_backend_pair_with_summary)

    # ── Layer 1: Cross-backend comparison ──────────────────────────────

    # Summary metadata must be consistent across backends
    sum_a = result_a.normalized_summary
    sum_b = result_b.normalized_summary
    assert sum_a is not None, f"Backend A missing summary for {case_name}"
    assert sum_b is not None, f"Backend B missing summary for {case_name}"
    assert sum_a["session_id"] == sum_b["session_id"], "Summary session_id mismatch"
    assert sum_a["original_event_count"] == sum_b["original_event_count"]
    assert sum_a["compressed_event_count"] == sum_b["compressed_event_count"]
    # Treat a missing text as empty, matching DiffEngine._compare_summary, so a
    # None text fails as an assertion instead of an AttributeError.
    assert (sum_a["summary_text"] or "").strip() == (sum_b["summary_text"] or "").strip()

    # State must be consistent (only the last get_session state is captured)
    state_a = (result_a.normalized_session or {}).get("state", {}) or {}
    state_b = (result_b.normalized_session or {}).get("state", {}) or {}
    assert state_a == state_b, f"State mismatch between backends: {state_a} vs {state_b}"

    # Event count divergence is allowed (known storage-model difference)
    events_a = (result_a.normalized_session or {}).get("events", []) or []
    events_b = (result_b.normalized_session or {}).get("events", []) or []
    if len(events_a) != len(events_b):
        report.allowed_diffs.append(
            DiffEntry(
                category="event",
                session_id=sum_a["session_id"],
                field_path="events.count",
                value_a=len(events_a),
                value_b=len(events_b),
                # Tag explicitly, as DiffEngine._add_diff does for allowed
                # entries; otherwise the entry keeps the default type.
                type="allowed_diff",
                message=("Event count differs because InMemory stores the compressed "
                         "event list while SQL re-reads all events from the event table. "
                         "This is a known backend storage-model difference."),
            ))

    # ── Layer 2: Per-backend semantic validation ───────────────────────

    for label, result in [("InMemory", result_a), ("SQLite", result_b)]:
        summary = result.normalized_summary
        events = (result.normalized_session or {}).get("events", []) or []
        assert summary is not None, f"{label}: no summary generated"
        assert (summary["summary_text"] or "").strip(), f"{label}: empty summary text"
        assert summary["compressed_event_count"] < summary["original_event_count"], (
            f"{label}: compression did not reduce event count "
            f"({summary['compressed_event_count']} >= {summary['original_event_count']})")

        # A summary anchor must be present in the event list; it is
        # recognised by the "Previous conversation summary:" marker text.
        summary_text_found = any("Previous conversation summary:" in (p.get("text", "") or "") for evt in events
                                 for p in evt.get("parts", []))
        assert summary_text_found, f"{label}: summary event not found in session events"

        # Verify new events (cherry blossom questions) are present.
        # ``or ""`` keeps parts whose text is None from breaking str.join.
        all_text = " ".join((p.get("text") or "") for evt in events for p in evt.get("parts", []))
        assert "cherry blossom" in all_text.lower(), (f"{label}: new events after summary are missing. "
                                                      f"Events text: {all_text[:200]}")

    # Report should reflect expected divergences
    report.passed = len(report.inconsistencies) == 0
    assert report.case_name == case_name
    logging.getLogger(__name__).info(
        "case_07 summary: %s (allowed diffs: %d)",
        report.summary(),
        len(report.allowed_diffs),
    )


async def test_replay_all_summary_issues_detected(full_backend_pair_with_summary) -> None:
    """Dedicated test for summary problem detection: summary loss, override,
    and wrong-session ownership must all be caught."""
    case_name = "case_10_injected_summary_session"
    _, _, report = await _replay_and_compare(case_name, full_backend_pair_with_summary)

    summary_diffs = [d for d in report.inconsistencies if d.category == "summary"]
    summary_session_issues = [d for d in summary_diffs if "session_id" in d.field_path]

    # Must detect the wrong-session-id injection
    assert len(summary_session_issues) >= 1, (
        f"Summary session_id mismatch was not detected. All summary diffs: {summary_diffs}")


async def test_diff_report_json_output(full_backend_pair, tmp_path) -> None:
    """Verify that the diff report can be serialized to JSON with all
    required fields."""
    _, _, report = await _replay_and_compare("case_01_single_turn", full_backend_pair)

    json_data = report.to_dict()

    # Verify required fields
    for required_key in ("case_name", "passed", "inconsistencies", "allowed_diffs"):
        assert required_key in json_data
    assert json_data["case_name"] == "case_01_single_turn"

    # Verify entry fields (if any diffs exist)
    if json_data["inconsistencies"]:
        entry = json_data["inconsistencies"][0]
        for entry_key in ("session_id", "field_path", "value_a", "value_b"):
            assert entry_key in entry

    # Write to temp path for verification
    output_path = tmp_path / "session_memory_summary_diff_report.json"
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(json_data, f, indent=2, ensure_ascii=False)
    assert output_path.exists()


@pytest.mark.parametrize("case_name", list_replay_cases())
async def test_replay_all_cases_load_and_validate(case_name: str) -> None:
    """Quick validation that every replay case JSONL can be loaded correctly."""
    case = load_replay_case(case_name)
    assert case.name == case_name
    assert len(case.steps) >= 1, f"Case {case_name} has no steps"
    # Every case must start with create_session, except the two injected
    # cases listed here, which are exempt from this check.
    if case_name not in ("case_09_injected_event_order", "case_10_injected_summary_session"):
        assert case.steps[0].action == "create_session", (f"Case {case_name} should start with create_session")


async def test_generate_aggregated_diff_report(
    full_backend_pair,
    full_backend_pair_with_summary,
) -> None:
    """Generate the full ``session_memory_summary_diff_report.json``.

    Runs every case returned by ``list_replay_cases()`` against both backends
    and aggregates the per-case DiffReport into a single JSON file written
    next to this test module (``Path(__file__).parent``).
    Summary-involved and injected cases use
    ``full_backend_pair_with_summary``; others use ``full_backend_pair``.

    NOTE(review): the file is rewritten on every run with a fresh
    ``generated_at`` timestamp, so running the suite modifies the source tree.
    """
    import time  # function-scope import replaces ``__import__("time")``

    needs_summary_backend = set(_SUMMARY_CASES) | set(_KNOWN_INCONSISTENCY_CASES) | set(_INJECTED_CASES)

    all_reports = []
    for case_name in list_replay_cases():
        # Choose the right backend pair
        backend_pair = full_backend_pair_with_summary if case_name in needs_summary_backend else full_backend_pair
        _, _, report = await _replay_and_compare(case_name, backend_pair)
        all_reports.append(report.to_dict())

    output = {
        "generated_at": time.time(),
        "total_cases": len(all_reports),
        "cases_passed": sum(1 for r in all_reports if r["passed"]),
        "backends": {
            "a": "InMemorySessionService",
            "b": "SqlSessionService (SQLite :memory:)",
        },
        "reports": all_reports,
    }

    output_path_agg = Path(__file__).parent / "session_memory_summary_diff_report.json"
    with open(output_path_agg, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2, ensure_ascii=False)

    # Print path for convenience
    print(f"\nAggregated diff report written to: {output_path_agg}")
    print(f"Total cases: {output['total_cases']}, Passed: {output['cases_passed']}")