diff --git a/api/v1_notifications.go b/api/v1_notifications.go index 6543977d..f6a6a5db 100644 --- a/api/v1_notifications.go +++ b/api/v1_notifications.go @@ -1,9 +1,12 @@ package api import ( + "context" "encoding/json" + "errors" "slices" "strings" + "time" "api.audius.co/api/dbv1" "api.audius.co/trashid" @@ -21,14 +24,32 @@ import ( // surfaces every target — only the actor list is bounded by this cap. const notificationRelatedActorsPerGroup = 1 +const ( + // Keep notification reads well below the production replica's + // max_standby_streaming_delay so pathological reads fail fast instead of + // holding hot-standby snapshots and making the replica fall behind. + notificationReadTimeout = 8 * time.Second + + // Historically `limit=0` fell back to the default limit of 20, so unread + // polling counted at most the first page of notification groups. + notificationUnreadPollLimit = 20 +) + type GetNotificationsQueryParams struct { - // Note that when limit is 0, we return 20 items to calculate unread count Limit int `query:"limit" default:"20" validate:"min=0,max=100"` Types []string `query:"types" validate:"dive,oneof=announcement follow repost save remix cosign create tip_receive tip_send challenge_reward repost_of_repost save_of_repost tastemaker reaction supporter_dethroned supporter_rank_up supporting_rank_up milestone track_added_to_playlist tier_change trending trending_playlist trending_underground usdc_purchase_buyer usdc_purchase_seller track_added_to_purchased_album request_manager approve_manager_request track_collaborator_invite track_collaborator_accept claimable_reward comment comment_thread comment_mention comment_reaction listen_streak_reminder fan_remix_contest_started fan_remix_contest_ended fan_remix_contest_ending_soon fan_remix_contest_winners_selected fan_remix_contest_submission artist_remix_contest_ended artist_remix_contest_ending_soon artist_remix_contest_submissions fan_club_text_post remix_contest_update"` GroupID string `query:"group_id" validate:"omitempty"` Timestamp float64 `query:"timestamp" validate:"omitempty,min=0"` } +type notificationRow struct { + Type string `json:"type"` + GroupID string `json:"group_id"` + Actions []json.RawMessage `json:"actions"` + IsSeen bool `json:"is_seen"` + SeenAt interface{} `json:"seen_at"` +} + var unsupportedNotificationTypes = []string{ // No frontend support "usdc_transfer", @@ -43,12 +64,87 @@ var unsupportedNotificationTypes = []string{ "remix", } +func (app *ApiServer) v1NotificationsUnreadPoll(c *fiber.Ctx, params GetNotificationsQueryParams) error { + sql := ` +WITH latest_seen AS ( + SELECT MAX(seen_at) AS seen_at + FROM notification_seen + WHERE user_id = @user_id +), +-- Equivalent to ARRAY[@user_id] && n.user_ids, split so single-recipient rows +-- can use notification_single_recipient_user_timestamp_idx while +-- multi-recipient arrays keep the existing overlap semantics. +matched_notifications AS ( + SELECT n.type, n.group_id + FROM notification n + CROSS JOIN latest_seen + WHERE array_length(n.user_ids, 1) = 1 + AND n.user_ids[1] = @user_id + AND n.timestamp > (now()::timestamp - interval '90 days') + AND n.timestamp > COALESCE(latest_seen.seen_at, '-infinity'::timestamp) + AND (n.type = ANY(@types) OR @types IS NULL) + AND (n.type != ALL(@unsupported_types)) + UNION ALL + SELECT n.type, n.group_id + FROM notification n + CROSS JOIN latest_seen + WHERE COALESCE(array_length(n.user_ids, 1), 0) != 1 + AND ARRAY[@user_id] && n.user_ids + AND n.timestamp > (now()::timestamp - interval '90 days') + AND n.timestamp > COALESCE(latest_seen.seen_at, '-infinity'::timestamp) + AND (n.type = ANY(@types) OR @types IS NULL) + AND (n.type != ALL(@unsupported_types)) +) +SELECT COUNT(*) +FROM ( + SELECT 1 + FROM matched_notifications + GROUP BY type, group_id + LIMIT @limit +) unread_notifications; +` + ctx, cancel := context.WithTimeout(c.Context(), notificationReadTimeout) + defer cancel() + + var unreadCount int + err := app.pool.QueryRow(ctx, sql, pgx.NamedArgs{ + "user_id": app.getUserId(c), + "limit": notificationUnreadPollLimit, + "types": params.Types, + "unsupported_types": unsupportedNotificationTypes, + }).Scan(&unreadCount) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return fiber.NewError(fiber.StatusGatewayTimeout, "notifications unread query timed out") + } + return err + } + + return c.JSON(fiber.Map{ + "data": fiber.Map{ + "notifications": []notificationRow{}, + "unread_count": unreadCount, + }, + "related": fiber.Map{ + "users": []any{}, + "tracks": []any{}, + "playlists": []any{}, + }, + }) +} + func (app *ApiServer) v1Notifications(c *fiber.Ctx) error { + limitZeroPoll := c.Query("limit") == "0" + params := GetNotificationsQueryParams{} if err := app.ParseAndValidateQueryParams(c, ¶ms); err != nil { return err } + if limitZeroPoll { + return app.v1NotificationsUnreadPoll(c, params) + } + sql := ` -- user_seen is a window function that gets windows between seen events. -- @@ -68,6 +164,62 @@ WITH user_seen as ( ORDER BY seen_at desc LIMIT 10 +), +-- Equivalent to ARRAY[@user_id] && n.user_ids, split so single-recipient rows +-- can use notification_single_recipient_user_timestamp_idx while +-- multi-recipient arrays keep the existing overlap semantics. +matched_notifications AS ( + SELECT + n.id, + n.specifier, + n.group_id, + n.type, + n.timestamp, + n.data + FROM notification n + WHERE + array_length(n.user_ids, 1) = 1 + AND n.user_ids[1] = @user_id + AND (n.type = ANY(@types) OR @types IS NULL) + AND (n.type != ALL(@unsupported_types)) + AND ( + -- Initial load: bound to the last 90 days so heavy users don't fan out + -- over their entire notification history. Pagination (timestamp_offset > 0) + -- is unbounded so scrolling further back still works. + (@timestamp_offset = 0 AND @group_id_offset = '' AND n.timestamp > (now()::timestamp - interval '90 days')) OR + (@timestamp_offset = 0 AND @group_id_offset != '' AND n.group_id < @group_id_offset) OR + (@timestamp_offset > 0 AND n.timestamp < to_timestamp(@timestamp_offset)) OR + ( + @group_id_offset != '' AND @timestamp_offset > 0 AND + (n.timestamp = to_timestamp(@timestamp_offset) AND n.group_id < @group_id_offset) + ) + ) + UNION ALL + SELECT + n.id, + n.specifier, + n.group_id, + n.type, + n.timestamp, + n.data + FROM notification n + WHERE + COALESCE(array_length(n.user_ids, 1), 0) != 1 + AND ARRAY[@user_id] && n.user_ids + AND (n.type = ANY(@types) OR @types IS NULL) + AND (n.type != ALL(@unsupported_types)) + AND ( + -- Initial load: bound to the last 90 days so heavy users don't fan out + -- over their entire notification history. Pagination (timestamp_offset > 0) + -- is unbounded so scrolling further back still works. + (@timestamp_offset = 0 AND @group_id_offset = '' AND n.timestamp > (now()::timestamp - interval '90 days')) OR + (@timestamp_offset = 0 AND @group_id_offset != '' AND n.group_id < @group_id_offset) OR + (@timestamp_offset > 0 AND n.timestamp < to_timestamp(@timestamp_offset)) OR + ( + @group_id_offset != '' AND @timestamp_offset > 0 AND + (n.timestamp = to_timestamp(@timestamp_offset) AND n.group_id < @group_id_offset) + ) + ) ) SELECT n.type, @@ -103,7 +255,7 @@ SELECT ELSE null END AS seen_at FROM - notification n + matched_notifications n LEFT JOIN user_seen ON user_seen.seen_at >= n.timestamp AND user_seen.prev_seen_at < n.timestamp -- Join with tracks table to filter out deleted tracks for "create" notifications that have track_id @@ -125,12 +277,8 @@ LEFT JOIN track_collaborators tc ON tc.track_id = (n.data->>'track_id')::integer AND tc.collaborator_user_id = (n.data->>'collaborator_user_id')::integer WHERE - (ARRAY[@user_id] && n.user_ids) - AND (n.type = ANY(@types) OR @types IS NULL) - -- Ignore notification types not supported by frontend - AND (n.type != ALL(@unsupported_types)) -- Filter out notifications for deleted tracks (only for create notifications that have track_id) - AND ( + ( n.type != 'create' OR NOT (n.data ? 'track_id') OR (t.is_delete = false AND t.is_unlisted = false) @@ -204,18 +352,6 @@ WHERE ) ) ) - AND ( - -- Initial load: bound to the last 90 days so heavy users don't fan out - -- over their entire notification history. Pagination (timestamp_offset > 0) - -- is unbounded so scrolling further back still works. - (@timestamp_offset = 0 AND @group_id_offset = '' AND n.timestamp > (now()::timestamp - interval '90 days')) OR - (@timestamp_offset = 0 AND @group_id_offset != '' AND n.group_id < @group_id_offset) OR - (@timestamp_offset > 0 AND n.timestamp < to_timestamp(@timestamp_offset)) OR - ( - @group_id_offset != '' AND @timestamp_offset > 0 AND - (n.timestamp = to_timestamp(@timestamp_offset) AND n.group_id < @group_id_offset) - ) - ) GROUP BY n.type, n.group_id, user_seen.seen_at, user_seen.prev_seen_at, CASE @@ -234,15 +370,11 @@ limit @limit::int ; ` userId := app.getUserId(c) - type GetNotifsRow struct { - Type string `json:"type"` - GroupID string `json:"group_id"` - Actions []json.RawMessage `json:"actions"` - IsSeen bool `json:"is_seen"` - SeenAt interface{} `json:"seen_at"` - } - rows, err := app.pool.Query(c.Context(), sql, pgx.NamedArgs{ + ctx, cancel := context.WithTimeout(c.Context(), notificationReadTimeout) + defer cancel() + + rows, err := app.pool.Query(ctx, sql, pgx.NamedArgs{ "user_id": userId, "limit": params.Limit, "types": params.Types, @@ -251,11 +383,17 @@ limit @limit::int "unsupported_types": unsupportedNotificationTypes, }) if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return fiber.NewError(fiber.StatusGatewayTimeout, "notifications query timed out") + } return err } - notifs, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByNameLax[GetNotifsRow]) + notifs, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByNameLax[notificationRow]) if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return fiber.NewError(fiber.StatusGatewayTimeout, "notifications query timed out") + } return err } diff --git a/api/v1_notifications_test.go b/api/v1_notifications_test.go index 4ca6f196..2f419b32 100644 --- a/api/v1_notifications_test.go +++ b/api/v1_notifications_test.go @@ -74,6 +74,84 @@ func TestV1Notifications(t *testing.T) { }) } +func TestV1Notifications_LimitZeroReturnsUnreadOnly(t *testing.T) { + app := emptyTestApp(t) + + fixtures := database.FixtureMap{ + "notification": []map[string]any{ + { + "id": 1, + "specifier": "111", + "group_id": "tip_send:user_id:111:signature:eee", + "type": "tip_send", + "user_ids": []int{1}, + "data": []byte(`{"amount": 100000000, "tx_signature": "asdf", "sender_user_id": 111, "receiver_user_id": 222}`), + }, + }, + } + + database.Seed(app.pool.Replicas[0], fixtures) + + status, body := testGet(t, app, "/v1/notifications/"+trashid.MustEncodeHashID(1)+"?limit=0") + assert.Equal(t, 200, status) + + jsonAssert(t, body, map[string]any{ + "data.notifications.#": 0, + "data.unread_count": 1, + "related.users.#": 0, + "related.tracks.#": 0, + "related.playlists.#": 0, + }) +} + +func TestV1Notifications_ReturnsMultiRecipientRows(t *testing.T) { + app := emptyTestApp(t) + + fixtures := database.FixtureMap{ + "notification": []map[string]any{ + { + "id": 1, + "specifier": "single", + "group_id": "single:1", + "type": "milestone", + "user_ids": []int{1}, + "timestamp": time.Now().Add(-1 * time.Minute), + "data": []byte(`{"type": "TRACK_REPOST_COUNT", "threshold": 10, "track_id": 101}`), + }, + { + "id": 2, + "specifier": "multi", + "group_id": "multi:1", + "type": "milestone", + "user_ids": []int{1, 2}, + "timestamp": time.Now(), + "data": []byte(`{"type": "TRACK_SAVE_COUNT", "threshold": 10, "track_id": 102}`), + }, + }, + } + + database.Seed(app.pool.Replicas[0], fixtures) + + status, body := testGet(t, app, "/v1/notifications/"+trashid.MustEncodeHashID(1)) + assert.Equal(t, 200, status) + + jsonAssert(t, body, map[string]any{ + "data.notifications.#": 2, + "data.notifications.0.type": "milestone", + "data.notifications.0.group_id": "multi:1", + "data.notifications.1.type": "milestone", + "data.notifications.1.group_id": "single:1", + }) + + status, body = testGet(t, app, "/v1/notifications/"+trashid.MustEncodeHashID(1)+"?limit=0") + assert.Equal(t, 200, status) + + jsonAssert(t, body, map[string]any{ + "data.notifications.#": 0, + "data.unread_count": 2, + }) +} + func TestV1Notifications_NotDeletedTrack(t *testing.T) { app := emptyTestApp(t) diff --git a/ddl/migrations/0227_notification_single_recipient_user_timestamp_idx.sql b/ddl/migrations/0227_notification_single_recipient_user_timestamp_idx.sql new file mode 100644 index 00000000..58762846 --- /dev/null +++ b/ddl/migrations/0227_notification_single_recipient_user_timestamp_idx.sql @@ -0,0 +1,16 @@ +-- Production stats show notification rows are overwhelmingly single-recipient: +-- user_ids usually contains exactly one recipient. Querying those common rows +-- through the broad GIN array index is expensive for high-fanout users, and the +-- planner can fall back to full table scans. Give the API a btree path keyed by +-- recipient and recency. +-- +-- NOTE: intentionally NOT wrapped in BEGIN/COMMIT so CREATE INDEX +-- CONCURRENTLY can run without holding an ACCESS EXCLUSIVE lock on notification. +-- IF NOT EXISTS makes the migration idempotent. + +CREATE INDEX CONCURRENTLY IF NOT EXISTS notification_single_recipient_user_timestamp_idx + ON public.notification ((user_ids[1]), "timestamp" DESC, group_id DESC, type) + WHERE array_length(user_ids, 1) = 1; + +COMMENT ON INDEX public.notification_single_recipient_user_timestamp_idx IS + 'Covers notification reads for the common single-recipient user_ids array path.';