feat(gateway): add List API with request-summary read model#226
feat(gateway): add List API with request-summary read model#226albertywu wants to merge 5 commits into
Conversation
19d5978 to
b218ec7
Compare
f5c76ca to
0096a76
Compare
b218ec7 to
883bf2f
Compare
|
This PR could not be automatically rebased after its base PR was merged. The rebase hit conflicts that need manual resolution. To fix manually: git fetch origin
git checkout wua/implement-list-api
git rebase --onto origin/main d3f2992d387ce02389f583b2e27953f506608b37 wua/implement-list-api
# resolve conflicts, then:
git push --force-with-leaseThen update this PR's base branch: gh pr edit 226 --base main |
Add a queue-scoped, time-windowed List RPC to the gateway, served from a new gateway-owned request-summary read model rather than by scanning and reconciling the append-only request log per request. - entity: RequestSummary, plus queue + change URIs on RequestLog, QueueFromRequestID parse helper, and IsKnownRequestStatus allow-list for filter validation. - storage: RequestSummaryStore (page-in/page-out List with a stable newest-started-first cursor; UpsertFromLog guarded merge) and mysql impl + schema; queue/change_uri columns on request_log. - core/request: PersistLog helper that pairs RequestLogStore.Insert with a guarded summary upsert, wired into Land, Cancel, and the log sink so all three write paths update both views through one helper. - gateway: ListController + List RPC (proto + generated code) and wiring. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The request-summary store was the only store using a database transaction (BeginTx + SELECT ... FOR UPDATE + Commit), which this repo forbids in favor of optimistic concurrency. Replace it with the same optimistic-locking pattern as requestStore.UpdateState: read the summary unlocked, merge the incoming log in memory, then write back with a conditional update guarded by a new row version column. A concurrent writer (another gateway write path or a redelivered log event) loses the CAS, and the caller re-reads and re-merges. Merges are monotonic and idempotent, so the bounded loop converges; exhausting it surfaces as storage.ErrVersionMismatch. - schema: add optimistic-lock version column to request_summary. - update is now a pure conditional write taking oldVersion/newVersion; insert returns inserted=false on a duplicate-key race so the caller retries into the merge path. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
883bf2f to
0486e5e
Compare
0486e5e to
5959630
Compare
rebased |
There was a problem hiding this comment.
Pull request overview
This PR adds a new gateway List RPC that returns queue-scoped request summaries over a time window, backed by a gateway-owned request_summary read model that’s incrementally maintained from the append-only request log (instead of scanning/reconciling the log per List call).
Changes:
- Introduces
entity.RequestSummaryplusRequestSummaryStore(with MySQL implementation + schema) and addsqueue/change_uricolumns torequest_log. - Adds
core/request.PersistLogand wires gateway write paths (Land/Cancel/log sink) to upsert both the request log and request summary. - Adds gateway List controller + tests, and exposes the List RPC via proto + generated clients/servers and example wiring.
Reviewed changes
Copilot reviewed 30 out of 35 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| submitqueue/gateway/controller/log/log.go | Switch log sink persistence to PersistLog (log + summary). |
| submitqueue/gateway/controller/log/log_test.go | Updates mocks to expect summary upserts. |
| submitqueue/gateway/controller/log/BUILD.bazel | Adds dependency on //submitqueue/core/request. |
| submitqueue/gateway/controller/list.go | New ListController implementing queue/time-window listing + pagination token. |
| submitqueue/gateway/controller/list_test.go | Unit tests for ListController (filters, sort, token validation, errors). |
| submitqueue/gateway/controller/land.go | Land now persists log + summary and includes queue/change URIs in accepted log. |
| submitqueue/gateway/controller/land_test.go | Updates mocks to include summary upserts. |
| submitqueue/gateway/controller/cancel.go | Cancel now uses storage.Storage and PersistLog (log + summary). |
| submitqueue/gateway/controller/cancel_test.go | Updates tests/mocks for new CancelController signature + summary upserts. |
| submitqueue/gateway/controller/BUILD.bazel | Adds list controller sources/tests to Bazel targets. |
| submitqueue/extension/storage/storage.go | Extends Storage interface with GetRequestSummaryStore. |
| submitqueue/extension/storage/request_summary_store.go | New RequestSummaryStore interface + list options/cursor types. |
| submitqueue/extension/storage/mysql/storage.go | Wires MySQL-backed RequestSummaryStore into storage. |
| submitqueue/extension/storage/mysql/schema/request_summary.sql | Adds request_summary table schema. |
| submitqueue/extension/storage/mysql/schema/request_log.sql | Adds queue and change_uri columns to request_log. |
| submitqueue/extension/storage/mysql/schema/README.md | Documents request_summary table key/index tradeoffs. |
| submitqueue/extension/storage/mysql/request_summary_store.go | Implements optimistic upsert + paged list query for request summaries. |
| submitqueue/extension/storage/mysql/request_summary_store_test.go | Tests merge and sort SQL helpers for request_summary store. |
| submitqueue/extension/storage/mysql/request_log_store.go | Persists/loads queue + change_uri for request logs. |
| submitqueue/extension/storage/mysql/BUILD.bazel | Adds request_summary store + new go_test target. |
| submitqueue/extension/storage/mock/storage_mock.go | Updates Storage mock to include GetRequestSummaryStore. |
| submitqueue/extension/storage/mock/request_summary_store_mock.go | New mock for RequestSummaryStore. |
| submitqueue/extension/storage/mock/BUILD.bazel | Adds request_summary store mock to build. |
| submitqueue/extension/storage/BUILD.bazel | Adds request_summary store interface to build. |
| submitqueue/entity/request_summary.go | New RequestSummary entity type. |
| submitqueue/entity/request_log.go | Adds Queue, ChangeURIs, QueueFromRequestID, IsKnownRequestStatus, and NewRequestLogWithDetails. |
| submitqueue/entity/request_log_test.go | Expands serialization tests and adds QueueFromRequestID test. |
| submitqueue/entity/BUILD.bazel | Adds request_summary to entity library. |
| submitqueue/core/request/store.go | New PersistLog helper (log insert + summary upsert). |
| submitqueue/core/request/BUILD.bazel | Adds store.go to core/request build. |
| example/submitqueue/gateway/server/main.go | Wires List controller into example gateway server; updates Cancel wiring. |
| api/submitqueue/gateway/protopb/gateway.pb.yarpc.go | Generated YARPC bindings updated to include List. |
| api/submitqueue/gateway/protopb/gateway.pb.go | Generated proto Go code updated (ListSort, ListRequest/Response, RequestSummary). |
| api/submitqueue/gateway/protopb/gateway_grpc.pb.go | Generated gRPC bindings updated to include List. |
| api/submitqueue/gateway/proto/gateway.proto | Adds List RPC, request/response messages, and sorting enum. |
Files not reviewed (5)
- api/submitqueue/gateway/protopb/gateway.pb.go: Generated file
- api/submitqueue/gateway/protopb/gateway.pb.yarpc.go: Generated file
- api/submitqueue/gateway/protopb/gateway_grpc.pb.go: Generated file
- submitqueue/extension/storage/mock/request_summary_store_mock.go: Generated file
- submitqueue/extension/storage/mock/storage_mock.go: Generated file
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| func PersistLog(ctx context.Context, store storage.Storage, log entity.RequestLog) error { | ||
| if err := store.GetRequestLogStore().Insert(ctx, log); err != nil { | ||
| return fmt.Errorf("failed to insert request log for request_id=%s: %w", log.RequestID, err) | ||
| } | ||
| if err := store.GetRequestSummaryStore().UpsertFromLog(ctx, log); err != nil { | ||
| return fmt.Errorf("failed to upsert request summary for request_id=%s: %w", log.RequestID, err) | ||
| } | ||
| return nil |
| if log.TimestampMs > 0 && (next.summary.StartedAtMs == 0 || log.TimestampMs < next.summary.StartedAtMs) { | ||
| next.summary.StartedAtMs = log.TimestampMs | ||
| } |
| if log.Queue == "" { | ||
| log.Queue = entity.QueueFromRequestID(log.RequestID) | ||
| } |
Summary
Add a queue-scoped, time-windowed List RPC to the gateway, served from a
new gateway-owned request-summary read model rather than by scanning and
reconciling the append-only request log per request.
Implements RFC design in #225
Co-Authored-By: Claude Opus 4.8 (1M context) noreply@anthropic.com
Test Plan
✅
make fmt && make build && make test && make e2e-testStack