Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 166 additions & 28 deletions api/v1_notifications.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package api

import (
"context"
"encoding/json"
"errors"
"slices"
"strings"
"time"

"api.audius.co/api/dbv1"
"api.audius.co/trashid"
Expand All @@ -21,14 +24,32 @@ import (
// surfaces every target — only the actor list is bounded by this cap.
const notificationRelatedActorsPerGroup = 1

const (
// Keep notification reads well below the production replica's
// max_standby_streaming_delay so pathological reads fail fast instead of
// holding hot-standby snapshots and making the replica fall behind.
notificationReadTimeout = 8 * time.Second

// Historically `limit=0` fell back to the default limit of 20, so unread
// polling counted at most the first page of notification groups.
notificationUnreadPollLimit = 20
)

type GetNotificationsQueryParams struct {
// Note that when limit is 0, we return 20 items to calculate unread count
Limit int `query:"limit" default:"20" validate:"min=0,max=100"`
Types []string `query:"types" validate:"dive,oneof=announcement follow repost save remix cosign create tip_receive tip_send challenge_reward repost_of_repost save_of_repost tastemaker reaction supporter_dethroned supporter_rank_up supporting_rank_up milestone track_added_to_playlist tier_change trending trending_playlist trending_underground usdc_purchase_buyer usdc_purchase_seller track_added_to_purchased_album request_manager approve_manager_request track_collaborator_invite track_collaborator_accept claimable_reward comment comment_thread comment_mention comment_reaction listen_streak_reminder fan_remix_contest_started fan_remix_contest_ended fan_remix_contest_ending_soon fan_remix_contest_winners_selected fan_remix_contest_submission artist_remix_contest_ended artist_remix_contest_ending_soon artist_remix_contest_submissions fan_club_text_post remix_contest_update"`
GroupID string `query:"group_id" validate:"omitempty"`
Timestamp float64 `query:"timestamp" validate:"omitempty,min=0"`
}

type notificationRow struct {
Type string `json:"type"`
GroupID string `json:"group_id"`
Actions []json.RawMessage `json:"actions"`
IsSeen bool `json:"is_seen"`
SeenAt interface{} `json:"seen_at"`
}

var unsupportedNotificationTypes = []string{
// No frontend support
"usdc_transfer",
Expand All @@ -43,12 +64,87 @@ var unsupportedNotificationTypes = []string{
"remix",
}

func (app *ApiServer) v1NotificationsUnreadPoll(c *fiber.Ctx, params GetNotificationsQueryParams) error {
sql := `
WITH latest_seen AS (
SELECT MAX(seen_at) AS seen_at
FROM notification_seen
WHERE user_id = @user_id
),
-- Equivalent to ARRAY[@user_id] && n.user_ids, split so single-recipient rows
-- can use notification_single_recipient_user_timestamp_idx while
-- multi-recipient arrays keep the existing overlap semantics.
matched_notifications AS (
SELECT n.type, n.group_id
FROM notification n
CROSS JOIN latest_seen
WHERE array_length(n.user_ids, 1) = 1
AND n.user_ids[1] = @user_id
AND n.timestamp > (now()::timestamp - interval '90 days')
AND n.timestamp > COALESCE(latest_seen.seen_at, '-infinity'::timestamp)
AND (n.type = ANY(@types) OR @types IS NULL)
AND (n.type != ALL(@unsupported_types))
UNION ALL
SELECT n.type, n.group_id
FROM notification n
CROSS JOIN latest_seen
WHERE COALESCE(array_length(n.user_ids, 1), 0) != 1
AND ARRAY[@user_id] && n.user_ids
AND n.timestamp > (now()::timestamp - interval '90 days')
AND n.timestamp > COALESCE(latest_seen.seen_at, '-infinity'::timestamp)
AND (n.type = ANY(@types) OR @types IS NULL)
AND (n.type != ALL(@unsupported_types))
)
SELECT COUNT(*)
FROM (
SELECT 1
FROM matched_notifications
GROUP BY type, group_id
LIMIT @limit
) unread_notifications;
`
ctx, cancel := context.WithTimeout(c.Context(), notificationReadTimeout)
defer cancel()

var unreadCount int
err := app.pool.QueryRow(ctx, sql, pgx.NamedArgs{
"user_id": app.getUserId(c),
"limit": notificationUnreadPollLimit,
"types": params.Types,
"unsupported_types": unsupportedNotificationTypes,
}).Scan(&unreadCount)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return fiber.NewError(fiber.StatusGatewayTimeout, "notifications unread query timed out")
}
return err
}

return c.JSON(fiber.Map{
"data": fiber.Map{
"notifications": []notificationRow{},
"unread_count": unreadCount,
},
"related": fiber.Map{
"users": []any{},
"tracks": []any{},
"playlists": []any{},
},
})
}

func (app *ApiServer) v1Notifications(c *fiber.Ctx) error {
limitZeroPoll := c.Query("limit") == "0"

params := GetNotificationsQueryParams{}
if err := app.ParseAndValidateQueryParams(c, &params); err != nil {
return err
}

if limitZeroPoll {
return app.v1NotificationsUnreadPoll(c, params)
}

sql := `
-- user_seen is a window function that gets windows between seen events.
--
Expand All @@ -68,6 +164,62 @@ WITH user_seen as (
ORDER BY
seen_at desc
LIMIT 10
),
-- Equivalent to ARRAY[@user_id] && n.user_ids, split so single-recipient rows
-- can use notification_single_recipient_user_timestamp_idx while
-- multi-recipient arrays keep the existing overlap semantics.
matched_notifications AS (
SELECT
n.id,
n.specifier,
n.group_id,
n.type,
n.timestamp,
n.data
FROM notification n
WHERE
array_length(n.user_ids, 1) = 1
AND n.user_ids[1] = @user_id
AND (n.type = ANY(@types) OR @types IS NULL)
AND (n.type != ALL(@unsupported_types))
AND (
-- Initial load: bound to the last 90 days so heavy users don't fan out
-- over their entire notification history. Pagination (timestamp_offset > 0)
-- is unbounded so scrolling further back still works.
(@timestamp_offset = 0 AND @group_id_offset = '' AND n.timestamp > (now()::timestamp - interval '90 days')) OR
(@timestamp_offset = 0 AND @group_id_offset != '' AND n.group_id < @group_id_offset) OR
(@timestamp_offset > 0 AND n.timestamp < to_timestamp(@timestamp_offset)) OR
(
@group_id_offset != '' AND @timestamp_offset > 0 AND
(n.timestamp = to_timestamp(@timestamp_offset) AND n.group_id < @group_id_offset)
)
)
UNION ALL
SELECT
n.id,
n.specifier,
n.group_id,
n.type,
n.timestamp,
n.data
FROM notification n
WHERE
COALESCE(array_length(n.user_ids, 1), 0) != 1
AND ARRAY[@user_id] && n.user_ids
AND (n.type = ANY(@types) OR @types IS NULL)
AND (n.type != ALL(@unsupported_types))
AND (
-- Initial load: bound to the last 90 days so heavy users don't fan out
-- over their entire notification history. Pagination (timestamp_offset > 0)
-- is unbounded so scrolling further back still works.
(@timestamp_offset = 0 AND @group_id_offset = '' AND n.timestamp > (now()::timestamp - interval '90 days')) OR
(@timestamp_offset = 0 AND @group_id_offset != '' AND n.group_id < @group_id_offset) OR
(@timestamp_offset > 0 AND n.timestamp < to_timestamp(@timestamp_offset)) OR
(
@group_id_offset != '' AND @timestamp_offset > 0 AND
(n.timestamp = to_timestamp(@timestamp_offset) AND n.group_id < @group_id_offset)
)
)
)
SELECT
n.type,
Expand Down Expand Up @@ -103,7 +255,7 @@ SELECT
ELSE null
END AS seen_at
FROM
notification n
matched_notifications n
LEFT JOIN user_seen ON
user_seen.seen_at >= n.timestamp AND user_seen.prev_seen_at < n.timestamp
-- Join with tracks table to filter out deleted tracks for "create" notifications that have track_id
Expand All @@ -125,12 +277,8 @@ LEFT JOIN track_collaborators tc ON
tc.track_id = (n.data->>'track_id')::integer AND
tc.collaborator_user_id = (n.data->>'collaborator_user_id')::integer
WHERE
(ARRAY[@user_id] && n.user_ids)
AND (n.type = ANY(@types) OR @types IS NULL)
-- Ignore notification types not supported by frontend
AND (n.type != ALL(@unsupported_types))
-- Filter out notifications for deleted tracks (only for create notifications that have track_id)
AND (
(
n.type != 'create'
OR NOT (n.data ? 'track_id')
OR (t.is_delete = false AND t.is_unlisted = false)
Expand Down Expand Up @@ -204,18 +352,6 @@ WHERE
)
)
)
AND (
-- Initial load: bound to the last 90 days so heavy users don't fan out
-- over their entire notification history. Pagination (timestamp_offset > 0)
-- is unbounded so scrolling further back still works.
(@timestamp_offset = 0 AND @group_id_offset = '' AND n.timestamp > (now()::timestamp - interval '90 days')) OR
(@timestamp_offset = 0 AND @group_id_offset != '' AND n.group_id < @group_id_offset) OR
(@timestamp_offset > 0 AND n.timestamp < to_timestamp(@timestamp_offset)) OR
(
@group_id_offset != '' AND @timestamp_offset > 0 AND
(n.timestamp = to_timestamp(@timestamp_offset) AND n.group_id < @group_id_offset)
)
)
GROUP BY
n.type, n.group_id, user_seen.seen_at, user_seen.prev_seen_at,
CASE
Expand All @@ -234,15 +370,11 @@ limit @limit::int
;
`
userId := app.getUserId(c)
type GetNotifsRow struct {
Type string `json:"type"`
GroupID string `json:"group_id"`
Actions []json.RawMessage `json:"actions"`
IsSeen bool `json:"is_seen"`
SeenAt interface{} `json:"seen_at"`
}

rows, err := app.pool.Query(c.Context(), sql, pgx.NamedArgs{
ctx, cancel := context.WithTimeout(c.Context(), notificationReadTimeout)
defer cancel()

rows, err := app.pool.Query(ctx, sql, pgx.NamedArgs{
"user_id": userId,
"limit": params.Limit,
"types": params.Types,
Expand All @@ -251,11 +383,17 @@ limit @limit::int
"unsupported_types": unsupportedNotificationTypes,
})
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return fiber.NewError(fiber.StatusGatewayTimeout, "notifications query timed out")
}
return err
}

notifs, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByNameLax[GetNotifsRow])
notifs, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByNameLax[notificationRow])
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return fiber.NewError(fiber.StatusGatewayTimeout, "notifications query timed out")
}
return err
}

Expand Down
78 changes: 78 additions & 0 deletions api/v1_notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,84 @@ func TestV1Notifications(t *testing.T) {
})
}

func TestV1Notifications_LimitZeroReturnsUnreadOnly(t *testing.T) {
app := emptyTestApp(t)

fixtures := database.FixtureMap{
"notification": []map[string]any{
{
"id": 1,
"specifier": "111",
"group_id": "tip_send:user_id:111:signature:eee",
"type": "tip_send",
"user_ids": []int{1},
"data": []byte(`{"amount": 100000000, "tx_signature": "asdf", "sender_user_id": 111, "receiver_user_id": 222}`),
},
},
}

database.Seed(app.pool.Replicas[0], fixtures)

status, body := testGet(t, app, "/v1/notifications/"+trashid.MustEncodeHashID(1)+"?limit=0")
assert.Equal(t, 200, status)

jsonAssert(t, body, map[string]any{
"data.notifications.#": 0,
"data.unread_count": 1,
"related.users.#": 0,
"related.tracks.#": 0,
"related.playlists.#": 0,
})
}

func TestV1Notifications_ReturnsMultiRecipientRows(t *testing.T) {
app := emptyTestApp(t)

fixtures := database.FixtureMap{
"notification": []map[string]any{
{
"id": 1,
"specifier": "single",
"group_id": "single:1",
"type": "milestone",
"user_ids": []int{1},
"timestamp": time.Now().Add(-1 * time.Minute),
"data": []byte(`{"type": "TRACK_REPOST_COUNT", "threshold": 10, "track_id": 101}`),
},
{
"id": 2,
"specifier": "multi",
"group_id": "multi:1",
"type": "milestone",
"user_ids": []int{1, 2},
"timestamp": time.Now(),
"data": []byte(`{"type": "TRACK_SAVE_COUNT", "threshold": 10, "track_id": 102}`),
},
},
}

database.Seed(app.pool.Replicas[0], fixtures)

status, body := testGet(t, app, "/v1/notifications/"+trashid.MustEncodeHashID(1))
assert.Equal(t, 200, status)

jsonAssert(t, body, map[string]any{
"data.notifications.#": 2,
"data.notifications.0.type": "milestone",
"data.notifications.0.group_id": "multi:1",
"data.notifications.1.type": "milestone",
"data.notifications.1.group_id": "single:1",
})

status, body = testGet(t, app, "/v1/notifications/"+trashid.MustEncodeHashID(1)+"?limit=0")
assert.Equal(t, 200, status)

jsonAssert(t, body, map[string]any{
"data.notifications.#": 0,
"data.unread_count": 2,
})
}

func TestV1Notifications_NotDeletedTrack(t *testing.T) {
app := emptyTestApp(t)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- Production stats show notification rows are overwhelmingly single-recipient:
-- user_ids usually contains exactly one recipient. Querying those common rows
-- through the broad GIN array index is expensive for high-fanout users, and the
-- planner can fall back to full table scans. Give the API a btree path keyed by
-- recipient and recency.
--
-- NOTE: intentionally NOT wrapped in BEGIN/COMMIT so CREATE INDEX
-- CONCURRENTLY can run without holding an ACCESS EXCLUSIVE lock on notification.
-- IF NOT EXISTS makes the migration idempotent.

CREATE INDEX CONCURRENTLY IF NOT EXISTS notification_single_recipient_user_timestamp_idx
ON public.notification ((user_ids[1]), "timestamp" DESC, group_id DESC, type)
WHERE array_length(user_ids, 1) = 1;

COMMENT ON INDEX public.notification_single_recipient_user_timestamp_idx IS
'Covers notification reads for the common single-recipient user_ids array path.';
Loading