Hey! Thanks for the package!
We have a scheduled job that takes many minutes to run. In production we noticed it was getting executed twice, concurrently, from a single enqueue, which caused duplicate side effects on our end. We traced it to the stalled-job recovery path: there doesn't seem to be any lock renewal / heartbeat for in-flight jobs, so `stalledThreshold` effectively becomes a hard cap on how long a job is allowed to run before it's re-delivered.
What's happening
When a job is claimed, `ACQUIRE_JOB_SCRIPT` (`src/drivers/redis_adapter.ts`) records it in the active hash with a single timestamp:

```lua
redis.call('HSET', active_key, job_id, cjson.encode({
  workerId = worker_id,
  acquiredAt = now,
}))
```

`RECOVER_STALLED_JOBS_SCRIPT` later decides a job is stalled purely from that timestamp:

```lua
if active.acquiredAt < (now - stalled_threshold) then
  -- remove from active, then (until maxStalledCount) re-queue to pending
end
```
But nothing ever updates `acquiredAt` between claim and `completeJob` / `failJob`. The worker runs handlers as non-awaited promises in the pool (`#fillPool` → `#execute`), so a long-running handler never refreshes its own entry. With the defaults (`stalledThreshold: 30s`, `stalledInterval: 30s`, `maxStalledCount: 1`) and `concurrency > 1`, any job still running ~`stalledThreshold` after it started is treated as stalled at the next recovery sweep, re-queued, and picked up by a free slot: a second concurrent execution of the same job id.
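To make the failure mode concrete, here is a minimal in-memory model of the two scripts, assuming the active hash behaves like a `Map` from job id to `{ workerId, acquiredAt }`. The names `ActiveEntry`, `acquire` and `recoverStalled` are hypothetical, and `maxStalledCount` plus the re-queue to pending are left out; the point is only that the recovery predicate never looks at whether the handler is still running.

```typescript
// Hypothetical in-memory model of the active hash (job id -> claim record).
// Loosely mirrors ACQUIRE_JOB_SCRIPT / RECOVER_STALLED_JOBS_SCRIPT; no maxStalledCount.
interface ActiveEntry {
  workerId: string
  acquiredAt: number // ms, written once at claim time and never refreshed
}

type ActiveHash = Map<string, ActiveEntry>

// Claim: like the HSET in ACQUIRE_JOB_SCRIPT, store a single timestamp.
function acquire(active: ActiveHash, jobId: string, workerId: string, now: number): void {
  active.set(jobId, { workerId, acquiredAt: now })
}

// Recovery sweep: drop every entry older than the threshold and return its id
// so the caller can re-queue it. Handler liveness is never consulted.
function recoverStalled(active: ActiveHash, now: number, stalledThreshold: number): string[] {
  const recovered: string[] = []
  active.forEach((entry, jobId) => {
    if (entry.acquiredAt < now - stalledThreshold) recovered.push(jobId)
  })
  recovered.forEach((jobId) => active.delete(jobId)) // delete after iterating
  return recovered
}

// A 90s handler claimed at t = 0 with the default 30s threshold:
const demo: ActiveHash = new Map()
acquire(demo, 'job-1', 'worker-a', 0)
console.log(recoverStalled(demo, 29_000, 30_000)) // []
console.log(recoverStalled(demo, 31_000, 30_000)) // [ 'job-1' ], handler still has ~59s to go
```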
Minimal repro
One job, one enqueue, `concurrency: 2`. The handler runs longer than `stalledThreshold`:
```typescript
class SlowJob extends Job {
  async execute() {
    console.log('START', this.context.jobId, Date.now())
    await new Promise((r) => setTimeout(r, 90_000)) // > stalledThreshold (default 30s)
    console.log('END ', this.context.jobId, Date.now())
  }
}

Locator.register('SlowJob', SlowJob)

await QueueManager.init({
  default: 'redis',
  adapters: { redis: redis({ /* host, port, … */ }) },
  worker: { concurrency: 2 }, // stalled* left at defaults
})

const worker = new Worker(/* same config */)
worker.start(['default'])

await SlowJob.dispatch({}).run() // dispatch exactly ONE
```
Observed at the library defaults (`stalledThreshold: 30s`) with a 90s handler; note that the same `jobId` runs twice:
```text
[  2.0s] START 145d701e-… (first execution)
[ 60.1s] START 145d701e-… (re-delivered as "stalled", ≈ stalledThreshold + stalledInterval later)
[ 92.0s] END   145d701e-…
[150.1s] END   145d701e-…  ← single enqueue executed twice, concurrently
```

| Scenario | Executions per single enqueue |
| --- | --- |
| `stalledThreshold` 30s (default), handler runs 90s | 2 (2nd starts ~58s after the 1st) |
| `stalledThreshold` raised above the handler's runtime | 1 |

(Lowering `stalledThreshold`/`stalledInterval` reproduces it proportionally faster: e.g. `5s`/`5s` with a 15s handler gives the 2nd start ~10s after the 1st. In these runs the gap was ≈ `stalledThreshold + stalledInterval`.)
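The gap comes from the sweep schedule rather than from the handler. As a rough sketch, assuming the recovery sweep fires every `stalledInterval` ms starting one interval after the worker starts (the library's exact scheduling may differ), a job that is never renewed is recovered at the first sweep strictly later than `acquiredAt + stalledThreshold`. That puts the second start somewhere between `stalledThreshold` and `stalledThreshold + stalledInterval` after the first; a claim shortly after worker start lands near the upper end, matching the ~58s in the log. `firstRedeliveryAt` is a hypothetical name.

```typescript
// Hypothetical helper: when is a never-renewed job first picked up by recovery?
// Assumption: sweeps run at stalledInterval, 2*stalledInterval, ... ms after worker start.
// A sweep at time t recovers the job once acquiredAt < t - stalledThreshold,
// i.e. at the smallest sweep time strictly greater than acquiredAt + stalledThreshold.
function firstRedeliveryAt(acquiredAt: number, stalledThreshold: number, stalledInterval: number): number {
  const k = Math.floor((acquiredAt + stalledThreshold) / stalledInterval) + 1
  return k * stalledInterval
}

// Defaults, job claimed 2s after the worker started (as in the log above):
const claimedAt = 2_000
const redeliveredAt = firstRedeliveryAt(claimedAt, 30_000, 30_000)
console.log(redeliveredAt, redeliveredAt - claimedAt) // 60000 58000
```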
The workaround (setting `stalledThreshold` greater than the longest job's runtime) does stop it, but it trades away fast crash recovery: jobs held by a worker that really died now wait at least that long before being re-delivered. That's why a proper heartbeat feels like the right fix.
Why this can't be fixed by renewing inside the worker loop
Renewal can't piggyback on the `process()` loop: at full `concurrency` the loop blocks on `pool.waitForNextCompletion()` with no idle tick (the `setTimeout(idleDelay)` branch only runs when there's spare capacity). So exactly when several long jobs are in flight, the loop isn't cycling, which is why renewal needs to be a dedicated timer.
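A quick way to see why a dedicated timer works where the loop does not: a `setInterval` callback keeps firing on the event loop while an `async` function is suspended on a long `await`. The sketch below stands in for `pool.waitForNextCompletion()` with a plain timeout; `startHeartbeat`, `blockedLoop` and the `renew` callback are hypothetical names, not the library's API.

```typescript
// Hypothetical heartbeat: a dedicated timer that renews whatever is in flight,
// independent of the worker loop. `renew` stands in for an adapter call.
function startHeartbeat(
  inFlight: Set<string>,
  renew: (jobIds: string[]) => void,
  everyMs: number,
): () => void {
  const timer = setInterval(() => {
    if (inFlight.size > 0) renew(Array.from(inFlight))
  }, everyMs)
  return () => clearInterval(timer)
}

// Stand-in for process() at full concurrency: suspended on one long await,
// the way it would be on pool.waitForNextCompletion().
async function blockedLoop(): Promise<number> {
  const inFlight = new Set<string>(['job-1', 'job-2'])
  let renewals = 0
  const stop = startHeartbeat(inFlight, () => { renewals++ }, 20)
  await new Promise<void>((resolve) => setTimeout(() => resolve(), 110)) // loop makes no progress here
  stop()
  return renewals
}

blockedLoop().then((n) => console.log('renewals while the loop was blocked:', n))
```

In the real worker the interval would be around `stalledThreshold / 2`, and the timer should be cleared on shutdown; in Node, calling `unref()` on the interval's `Timeout` also keeps it from holding the process open.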
Proposed fix
Add a heartbeat that periodically renews the in-flight jobs' timestamps while their handlers run:
- a small `renewJobs(queue, jobIds)` adapter method: a Lua `HSET` that only updates entries still present in the active hash (so a job that was already recovered isn't resurrected);
- a worker-side `setInterval` (~`stalledThreshold / 2`) that renews the job ids currently in the pool;
- net effect: "stalled" goes back to meaning "the worker died", so `stalledThreshold` can stay small without re-delivering healthy long-running jobs.
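For the adapter side, here is a minimal sketch of what the renewal script could look like, written as a TypeScript string constant in the way the adapter presumably embeds its other Lua scripts. The names `RENEW_JOBS_SCRIPT` and `renewJobsInMemory`, and the `KEYS`/`ARGV` layout (active-hash key in `KEYS[1]`, `now` in `ARGV[1]`, job ids after it), are assumptions for illustration. The property that matters is that it only rewrites entries that are still present; the in-memory function applies the same rule so it can be checked without Redis.

```typescript
// Hypothetical Lua for a renewJobs(queue, jobIds) adapter method.
// Assumed layout: KEYS[1] = active hash key, ARGV[1] = now (ms), ARGV[2..n] = job ids.
const RENEW_JOBS_SCRIPT = `
local active_key = KEYS[1]
local now = tonumber(ARGV[1])
local renewed = 0
for i = 2, #ARGV do
  local raw = redis.call('HGET', active_key, ARGV[i])
  -- HGET yields false for a missing field: skip it instead of re-creating it
  if raw then
    local entry = cjson.decode(raw)
    entry.acquiredAt = now
    redis.call('HSET', active_key, ARGV[i], cjson.encode(entry))
    renewed = renewed + 1
  end
end
return renewed
`

interface ActiveEntry {
  workerId: string
  acquiredAt: number
}

// Same rule as the Lua above, on an in-memory Map: renew only what is still active.
function renewJobsInMemory(active: Map<string, ActiveEntry>, jobIds: string[], now: number): number {
  let renewed = 0
  for (const jobId of jobIds) {
    const entry = active.get(jobId)
    if (entry === undefined) continue // already recovered or completed: do not resurrect
    active.set(jobId, { ...entry, acquiredAt: now })
    renewed++
  }
  return renewed
}

const active = new Map<string, ActiveEntry>([['job-1', { workerId: 'w1', acquiredAt: 0 }]])
console.log(renewJobsInMemory(active, ['job-1', 'job-2'], 15_000)) // 1
console.log(active.get('job-1')?.acquiredAt, active.has('job-2')) // 15000 false
```

Returning the renewed count is a design choice: it would let the worker notice (e.g. log) when one of its in-flight jobs had already been recovered before the heartbeat reached it.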
The `knex_adapter` recovery path looks like it has the same shape, so the fix would want to cover both adapters.

Happy to open a PR with the Lua script + worker timer + a test (concurrency 2, handler longer than `stalledThreshold`, assert it runs exactly once) if you're open to the approach.