Skip to content

feat: renew in-flight job timestamps via a worker heartbeat#17

Open
mattstrayer wants to merge 1 commit into
boringnode:mainfrom
mattstrayer:feat/renew-jobs-heartbeat
Open

feat: renew in-flight job timestamps via a worker heartbeat#17
mattstrayer wants to merge 1 commit into
boringnode:mainfrom
mattstrayer:feat/renew-jobs-heartbeat

Conversation

@mattstrayer

Copy link
Copy Markdown

Closes #16.

Companion PR for the long-running-job double-execution issue. As described there, when a handler runs longer than stalledThreshold, nothing ever refreshes acquiredAt between claim and completion, so the stalled-recovery path re-delivers the job to a free slot and it runs a second time, concurrently. stalledThreshold effectively becomes a hard cap on how long a job may run rather than a crash-detection window.

What this does

Adds a heartbeat that periodically renews the acquired timestamp of the jobs a worker is actively processing.

  • renewJobs(queue, jobIds) on the Adapter contract, implemented for every backend:
    • Redis — new RENEW_JOBS_SCRIPT Lua that HSETs acquiredAt on the active hash, preserving workerId.
    • KnexUPDATE ... WHERE status = 'active' AND id IN (...).
    • Fake / Sync adapters (and the in-memory test mock).
    • Only entries still active are renewed, so a job that was already recovered or finalized is never resurrected by a late heartbeat.
  • A dedicated worker setInterval (~stalledThreshold / 2) that renews the in-flight job ids. It has to be a separate timer rather than piggybacking on the process loop: at full concurrency the loop blocks on waitForNextCompletion() with no idle tick, so the loop is not cycling exactly when long jobs are in flight.
  • The heartbeat is cleared in stop() (after draining, so jobs still finishing keep being renewed until they complete) as well as in the process() generator's finally, giving deterministic cleanup whether the worker is driven via start() or processCycle(). #startHeartbeat is idempotent so the timer can't leak if the loop is re-entered.

Net effect: "stalled" once again means the worker actually died, so stalledThreshold can stay small without re-delivering healthy long-running jobs.

Tests

  • renewJobs behavior across all adapters via the shared driver suite: renewal keeps an active job from being recovered, never resurrects an already-recovered job, is queue-scoped, and is a no-op for an empty id list.
  • Worker-level: a long-running job at full capacity is kept alive by the heartbeat and executes exactly once (verified against a negative control where the double-execution reproduces without renewal), and the heartbeat stops firing once the worker is stopped.

All 647 tests pass locally across Memory, Redis, Knex (SQLite) and Knex (PostgreSQL); tsc, oxlint and oxfmt are clean.

Long-running jobs could be executed twice from a single enqueue. When a
handler runs longer than `stalledThreshold`, the stalled-recovery path
re-delivers it to a free slot because nothing ever refreshes `acquiredAt`
between claim and completion — so the threshold acts as a hard cap on job
runtime rather than a crash-detection window.

Add a heartbeat that periodically renews the acquired timestamp of jobs
currently in the pool:

- `renewJobs(queue, jobIds)` on the Adapter contract, implemented for the
  Redis (Lua `HSET` over the active hash), Knex (UPDATE ... WHERE status =
  'active'), Fake and Sync adapters. Only entries still active are renewed,
  so a job that was already recovered or finalized is never resurrected by
  a late heartbeat.
- A dedicated worker `setInterval` (~`stalledThreshold / 2`) that renews the
  in-flight job ids. It must be a separate timer: at full concurrency the
  process loop blocks on `waitForNextCompletion()` with no idle tick, so the
  loop is not cycling exactly when long jobs are in flight.
- The heartbeat is cleared in `stop()` (after draining, so jobs that are
  still finishing keep being renewed until they complete) as well as in the
  process() generator's `finally`, guaranteeing deterministic cleanup whether
  the worker was driven via start() or processCycle(). `#startHeartbeat` is
  idempotent so the timer can never leak if the loop is re-entered.

"Stalled" now means the worker actually died again, so `stalledThreshold`
can stay small without re-delivering healthy long-running jobs.

Tests cover renewJobs across all adapters (renew keeps an active job from
recovery, never resurrects an already-recovered job, is queue-scoped) and
two worker-level tests: a long-running job at full capacity is renewed by
the heartbeat and executes exactly once, and the heartbeat stops firing once
the worker is stopped.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Long-running jobs get executed twice — acquiredAt is never renewed (no heartbeat)

1 participant