
Long-running jobs get executed twice: acquiredAt is never renewed (no heartbeat) #16

Description

@mattstrayer

Hey! Thanks for the package 👋

We have a scheduled job that takes many minutes to run. In production we noticed it was getting executed twice, concurrently, from a single enqueue, which caused duplicate side effects on our end. We traced it to the stalled-job recovery path: there doesn't seem to be any lock renewal / heartbeat for in-flight jobs, so stalledThreshold effectively becomes a hard cap on how long a job is allowed to run before it's re-delivered.

What's happening

When a job is claimed, ACQUIRE_JOB_SCRIPT (src/drivers/redis_adapter.ts) records it in the active hash with a single timestamp:

```lua
redis.call('HSET', active_key, job_id, cjson.encode({
  workerId = worker_id,
  acquiredAt = now,
}))
```

RECOVER_STALLED_JOBS_SCRIPT later decides a job is stalled purely from that timestamp:

```lua
if active.acquiredAt < (now - stalled_threshold) then
  -- remove from active, then (until maxStalledCount) re-queue to pending
end
```

But nothing ever updates acquiredAt between claim and completeJob / failJob. The worker runs handlers as non-awaited promises in the pool (#fillPool → #execute), and nothing in that path refreshes the job's active entry while the handler runs. With the defaults (stalledThreshold: 30s, stalledInterval: 30s, maxStalledCount: 1) and concurrency > 1, any job that is still running at the first recovery sweep after it passes the stalledThreshold age is treated as stalled, re-queued, and picked up by a free slot. The result is a second concurrent execution of the same job id.
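Reduced to its timing, the recovery rule can be modeled in a few lines of TypeScript. This is a sketch, not the library's code: `isStalled` and `firstRedeliveryAt` are hypothetical helpers, and the sweep schedule (`firstSweep`, then every `stalledInterval`) is an assumption about when RECOVER_STALLED_JOBS_SCRIPT runs.

```typescript
// Sketch of the stalled-job rule described above (not the library's code).
// All times are in milliseconds.
interface ActiveEntry {
  workerId: string
  acquiredAt: number // written once by the acquire script, never renewed
}

// Same comparison as RECOVER_STALLED_JOBS_SCRIPT: strictly older than the threshold.
function isStalled(entry: ActiveEntry, now: number, stalledThreshold: number): boolean {
  return entry.acquiredAt < now - stalledThreshold
}

// Time of the first recovery sweep that would re-queue a job whose handler is
// still running, or null if the handler finishes first. Assumes sweeps fire at
// firstSweep, firstSweep + stalledInterval, ... (hypothetical schedule).
function firstRedeliveryAt(
  entry: ActiveEntry,
  stalledThreshold: number,
  stalledInterval: number,
  firstSweep: number,
  handlerEndsAt: number,
): number | null {
  if (stalledInterval <= 0) throw new RangeError('stalledInterval must be positive')
  for (let t = firstSweep; t < handlerEndsAt; t += stalledInterval) {
    if (isStalled(entry, t, stalledThreshold)) return t
  }
  // completeJob would remove the entry before any sweep saw it as stale.
  return null
}
```

Plugging in the repro's numbers (job claimed at 2s, first sweep assumed at 30s, 30s/30s, handler ending at 92s) gives 60000, in line with the second START at 60.1s in the log below; raising stalledThreshold to 120s gives null.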

Minimal repro

One job, one enqueue, concurrency: 2. The handler runs longer than stalledThreshold:

```ts
// Job, Locator, QueueManager, Worker and redis come from the queue package (imports omitted)
class SlowJob extends Job {
  async execute() {
    console.log('START', this.context.jobId, Date.now())
    await new Promise((r) => setTimeout(r, 90_000)) // > stalledThreshold (default 30s)
    console.log('END  ', this.context.jobId, Date.now())
  }
}
Locator.register('SlowJob', SlowJob)

await QueueManager.init({
  default: 'redis',
  adapters: { redis: redis({ /* host, port, … */ }) },
  worker: { concurrency: 2 }, // stalled* left at defaults
})
const worker = new Worker(/* same config */)
worker.start(['default'])
await SlowJob.dispatch({}).run() // dispatch exactly ONE
```

Observed at the library defaults (stalledThreshold: 30s) with a 90s handler. Note that the same jobId runs twice:

```text
[ 2.0s] START  145d701e-…  (first execution)
[60.1s] START  145d701e-…  (re-delivered as "stalled" ≈ stalledThreshold + stalledInterval later)
[92.0s] END    145d701e-…
[150.1s] END   145d701e-…   → single enqueue executed twice, concurrently
```

| Scenario | Executions per single enqueue |
| --- | --- |
| stalledThreshold 30s (default), handler runs 90s | 2 (2nd starts ~58s after the 1st) |
| stalledThreshold raised above the handler's runtime | 1 |

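(Lowering stalledThreshold/stalledInterval reproduces it proportionally faster; e.g. 5s/5s with a 15s handler gives the 2nd start ~10s after the 1st. In these runs the gap was always ≈ stalledThreshold + stalledInterval, which is the upper bound: the job becomes eligible once it is older than stalledThreshold, and the next recovery sweep can land up to stalledInterval after that.)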

The workaround (setting stalledThreshold greater than the longest job's runtime) does stop it, but it trades away fast crash recovery: a job whose worker really did die now also waits at least that long before it's re-delivered. That's why a proper heartbeat feels like the right fix.

Why this can't be fixed by renewing inside the worker loop

Renewal can't piggyback on the process() loop: at full concurrency the loop blocks on pool.waitForNextCompletion() with no idle tick (the setTimeout(idleDelay) branch only runs when there's spare capacity). So exactly when several long jobs are in flight, the loop isn't cycling, and renewal needs to be a dedicated timer.
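A dedicated timer could look roughly like the TypeScript sketch below. `JobHeartbeat`, `track`/`untrack`, and the `RenewFn` callback are hypothetical names, not part of the package; the callback stands in for whatever adapter call performs the renewal. Because `setInterval` callbacks run from the event loop regardless of which promise `process()` is awaiting, renewals keep firing while the loop is parked on `pool.waitForNextCompletion()`.

```typescript
// Hypothetical heartbeat that renews in-flight jobs on its own timer,
// independent of the worker's process() loop.
type RenewFn = (jobIds: string[]) => Promise<unknown>

class JobHeartbeat {
  private readonly inFlight = new Set<string>()
  private readonly renew: RenewFn
  private timer: ReturnType<typeof setInterval> | undefined
  private renewing = false
  readonly intervalMs: number

  constructor(renew: RenewFn, stalledThresholdMs: number) {
    if (stalledThresholdMs <= 0) throw new RangeError('stalledThresholdMs must be positive')
    this.renew = renew
    // Renew at about half the threshold, leaving slack for a delayed tick
    // before an entry crosses stalledThreshold.
    this.intervalMs = Math.max(1, Math.floor(stalledThresholdMs / 2))
  }

  // Called when a job enters the pool.
  track(jobId: string): void {
    this.inFlight.add(jobId)
  }

  // Called wherever the worker calls completeJob / failJob.
  untrack(jobId: string): void {
    this.inFlight.delete(jobId)
  }

  start(): void {
    if (this.timer !== undefined) return
    // Fires from the event loop even while process() awaits the pool.
    this.timer = setInterval(() => void this.tick(), this.intervalMs)
  }

  stop(): void {
    if (this.timer !== undefined) clearInterval(this.timer)
    this.timer = undefined
  }

  async tick(): Promise<void> {
    // Nothing to renew, or the previous renewal has not finished yet.
    if (this.renewing || this.inFlight.size === 0) return
    this.renewing = true
    try {
      await this.renew(Array.from(this.inFlight))
    } catch (error) {
      // A failed renewal is not fatal; the next tick retries.
      console.error('[heartbeat] renewal failed', error)
    } finally {
      this.renewing = false
    }
  }
}
```

Skipping a tick while the previous renewal is still pending keeps a slow Redis from accumulating overlapping calls; `start()`/`stop()` would be tied to the worker's own lifecycle.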

Proposed fix

Add a heartbeat that periodically renews the in-flight jobs' timestamps while their handlers run:

  • a small renewJobs(queue, jobIds) adapter method: a Lua HSET that only updates entries still present in the active hash (so a job that was already recovered isn't resurrected);
  • a worker-side setInterval (~stalledThreshold / 2) that renews the job ids currently in the pool;
  • net effect: "stalled" once again means "the worker died", so stalledThreshold can stay small without re-delivering healthy long-running jobs.
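A sketch of that renewal script, assuming the active entries keep the `{ workerId, acquiredAt }` shape written by ACQUIRE_JOB_SCRIPT. `RENEW_JOBS_SCRIPT`, `renewActiveJobs`, the `EvalClient` interface, and the KEYS/ARGV layout are hypothetical. The workerId ownership check goes one step beyond the proposal, so a worker can't refresh an entry that was already recovered and handed to another worker. An adapter's `renewJobs(queue, jobIds)` would resolve the queue's active-hash key and call something like this.

```typescript
// Hypothetical renewal script (Lua, run via EVAL). KEYS[1] is the queue's
// active hash; ARGV[1] is the worker id, ARGV[2] the current time, and the
// remaining ARGV entries are job ids. Only entries that are still present
// AND still owned by this worker are touched.
const RENEW_JOBS_SCRIPT = `
local active_key = KEYS[1]
local worker_id = ARGV[1]
local now = tonumber(ARGV[2])
local renewed = 0
for i = 3, #ARGV do
  local job_id = ARGV[i]
  local raw = redis.call('HGET', active_key, job_id)
  if raw then
    local entry = cjson.decode(raw)
    if entry.workerId == worker_id then
      entry.acquiredAt = now
      redis.call('HSET', active_key, job_id, cjson.encode(entry))
      renewed = renewed + 1
    end
  end
end
return renewed
`

// Minimal client shape, modeled on ioredis's eval(script, numKeys, ...args).
interface EvalClient {
  eval(script: string, numKeys: number, ...args: (string | number)[]): Promise<unknown>
}

// Renews the given job ids and resolves to how many entries were renewed.
async function renewActiveJobs(
  client: EvalClient,
  activeKey: string,
  workerId: string,
  jobIds: string[],
  now: number = Date.now(),
): Promise<number> {
  if (jobIds.length === 0) return 0
  const renewed = await client.eval(RENEW_JOBS_SCRIPT, 1, activeKey, workerId, now, ...jobIds)
  return Number(renewed)
}
```

Overwriting acquiredAt keeps RECOVER_STALLED_JOBS_SCRIPT unchanged; a separate field such as `renewedAt` would read more clearly but would also mean touching the recovery check.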

The knex_adapter recovery path looks like it has the same shape, so the fix would want to cover both adapters.

Happy to open a PR with the Lua script + worker timer + a test (concurrency 2, handler longer than stalledThreshold, assert it runs exactly once) if you're open to the approach.
