feat: renew in-flight job timestamps via a worker heartbeat #17
Open
mattstrayer wants to merge 1 commit into
Long-running jobs could be executed twice from a single enqueue. When a handler runs longer than `stalledThreshold`, the stalled-recovery path re-delivers it to a free slot because nothing ever refreshes `acquiredAt` between claim and completion, so the threshold acts as a hard cap on job runtime rather than a crash-detection window.

Add a heartbeat that periodically renews the acquired timestamp of jobs currently in the pool:

- `renewJobs(queue, jobIds)` on the Adapter contract, implemented for the Redis (Lua `HSET` over the active hash), Knex (`UPDATE ... WHERE status = 'active'`), Fake and Sync adapters. Only entries still active are renewed, so a job that was already recovered or finalized is never resurrected by a late heartbeat.
- A dedicated worker `setInterval` (~`stalledThreshold / 2`) that renews the in-flight job ids. It must be a separate timer: at full concurrency the process loop blocks on `waitForNextCompletion()` with no idle tick, so the loop is not cycling exactly when long jobs are in flight.
- The heartbeat is cleared in `stop()` (after draining, so jobs that are still finishing keep being renewed until they complete) as well as in the `process()` generator's `finally`, guaranteeing deterministic cleanup whether the worker was driven via `start()` or `processCycle()`. `#startHeartbeat` is idempotent so the timer can never leak if the loop is re-entered.

"Stalled" once again means the worker actually died, so `stalledThreshold` can stay small without re-delivering healthy long-running jobs.

Tests cover `renewJobs` across all adapters (renewal keeps an active job from recovery, never resurrects an already-recovered job, and is queue-scoped) and two worker-level tests: a long-running job at full capacity is renewed by the heartbeat and executes exactly once, and the heartbeat stops firing once the worker is stopped.
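The renewal rule in the commit message (refresh only entries that are still active, scoped to one queue) can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the adapter code from this PR: `ActiveSetSketch`, `claim`, `recoverStalled`, the explicit `now` parameter, and the numeric return value of `renewJobs` are all hypothetical and exist only to make the behavior observable.

```typescript
// Hypothetical in-memory model of the active set; names are illustrative only.
interface ActiveEntry {
  workerId: string;
  acquiredAt: number; // ms timestamp, refreshed by renewJobs
}

class ActiveSetSketch {
  // queue name -> job id -> active entry
  private active = new Map<string, Map<string, ActiveEntry>>();

  // Mark a job as active for a worker at time `now`.
  claim(queue: string, jobId: string, workerId: string, now: number): void {
    let jobs = this.active.get(queue);
    if (!jobs) {
      jobs = new Map<string, ActiveEntry>();
      this.active.set(queue, jobs);
    }
    jobs.set(jobId, { workerId, acquiredAt: now });
  }

  // Refresh acquiredAt only for ids still active in this queue.
  // Returns how many entries were renewed (an assumption of this sketch).
  renewJobs(queue: string, jobIds: string[], now: number): number {
    const jobs = this.active.get(queue);
    if (!jobs) return 0;
    let renewed = 0;
    for (const id of jobIds) {
      const entry = jobs.get(id);
      if (!entry) continue; // already recovered or finalized: never resurrect
      entry.acquiredAt = now; // workerId is left untouched
      renewed++;
    }
    return renewed;
  }

  // Remove and return ids whose acquiredAt is older than the threshold.
  recoverStalled(queue: string, now: number, stalledThreshold: number): string[] {
    const jobs = this.active.get(queue);
    if (!jobs) return [];
    const recovered: string[] = [];
    jobs.forEach((entry, id) => {
      if (now - entry.acquiredAt > stalledThreshold) {
        recovered.push(id);
        jobs.delete(id);
      }
    });
    return recovered;
  }
}
```

In this model, a job claimed at `t = 0` with a threshold of 1000 ms survives a recovery pass at `t = 1500` only if it was renewed in between; once a recovery pass has removed it, a late `renewJobs` call finds no entry and changes nothing.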
Closes #16.

Companion PR for the long-running-job double-execution issue. As described there, when a handler runs longer than `stalledThreshold`, nothing ever refreshes `acquiredAt` between claim and completion, so the stalled-recovery path re-delivers the job to a free slot and it runs a second time, concurrently. `stalledThreshold` effectively becomes a hard cap on how long a job may run rather than a crash-detection window.

What this does
Adds a heartbeat that periodically renews the acquired timestamp of the jobs a worker is actively processing.

- `renewJobs(queue, jobIds)` on the `Adapter` contract, implemented for every backend:
  - Redis: a `RENEW_JOBS_SCRIPT` Lua script that `HSET`s `acquiredAt` on the active hash, preserving `workerId`.
  - Knex: `UPDATE ... WHERE status = 'active' AND id IN (...)`.
- A dedicated worker `setInterval` (~`stalledThreshold / 2`) that renews the in-flight job ids. It has to be a separate timer rather than piggybacking on the process loop: at full concurrency the loop blocks on `waitForNextCompletion()` with no idle tick, so the loop is not cycling exactly when long jobs are in flight.
- The heartbeat is cleared in `stop()` (after draining, so jobs still finishing keep being renewed until they complete) as well as in the `process()` generator's `finally`, giving deterministic cleanup whether the worker is driven via `start()` or `processCycle()`. `#startHeartbeat` is idempotent so the timer can't leak if the loop is re-entered.

Net effect: "stalled" once again means the worker actually died, so `stalledThreshold` can stay small without re-delivering healthy long-running jobs.

Tests
`renewJobs` behavior across all adapters via the shared driver suite: renewal keeps an active job from being recovered, never resurrects an already-recovered job, is queue-scoped, and is a no-op for an empty id list.

All 647 tests pass locally across Memory, Redis, Knex (SQLite) and Knex (PostgreSQL); `tsc`, `oxlint` and `oxfmt` are clean.
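For readers who want the shape of the heartbeat timer described above, here is a minimal sketch of an idempotent start, a `stalledThreshold / 2` interval, and an explicit stop. It is not the worker code from this PR: `HeartbeatSketch`, `RenewFn`, the `inFlight` callback, the `running` getter and the public `tick()` method are hypothetical names, and swallowing a failed renewal until the next tick is an assumption of this sketch rather than documented behavior.

```typescript
// Hypothetical renew callback; in the PR this role is played by the adapter's renewJobs.
type RenewFn = (jobIds: string[]) => Promise<void>;

class HeartbeatSketch {
  private timer: ReturnType<typeof setInterval> | undefined;
  private readonly inFlight: () => string[];
  private readonly renew: RenewFn;
  readonly intervalMs: number;

  constructor(inFlight: () => string[], renew: RenewFn, stalledThreshold: number) {
    this.inFlight = inFlight;
    this.renew = renew;
    // Renew at roughly half the threshold so one delayed tick still lands in time.
    this.intervalMs = Math.max(1, Math.floor(stalledThreshold / 2));
  }

  get running(): boolean {
    return this.timer !== undefined;
  }

  // Idempotent: calling start() again while running does not create a second timer.
  start(): void {
    if (this.timer !== undefined) return;
    this.timer = setInterval(() => {
      void this.tick();
    }, this.intervalMs);
  }

  // One heartbeat: renew whatever is in flight right now.
  async tick(): Promise<void> {
    const ids = this.inFlight();
    if (ids.length === 0) return;
    try {
      await this.renew(ids);
    } catch {
      // Assumption of this sketch: a failed renewal is simply retried on the next tick.
    }
  }

  // Safe to call more than once, e.g. from both stop() and a finally block.
  stop(): void {
    if (this.timer === undefined) return;
    clearInterval(this.timer);
    this.timer = undefined;
  }
}
```

Because `stop()` is safe to call repeatedly, the same object can be cleared from a shutdown path and from a `finally` block around the processing loop without coordinating which one runs first.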