diff --git a/src/contracts/adapter.ts b/src/contracts/adapter.ts index 8bd8e12..c0ae4d5 100644 --- a/src/contracts/adapter.ts +++ b/src/contracts/adapter.ts @@ -90,6 +90,21 @@ export interface Adapter { maxStalledCount: number ): Promise + /** + * Renew the acquired timestamp of in-flight jobs (heartbeat). + * + * A worker calls this periodically for the jobs it is actively processing + * so that long-running handlers are not mistaken for stalled jobs and + * re-delivered while they are still running. Only jobs that are still + * active are renewed; jobs that have already been recovered/completed are + * skipped (so a job is never resurrected by a late heartbeat). + * + * @param queue - The queue the jobs belong to + * @param jobIds - The ids of the jobs currently being processed + * @returns Number of jobs whose timestamp was renewed + */ + renewJobs(queue: string, jobIds: string[]): Promise + /** * Mark a job as completed and remove it from the queue. * diff --git a/src/drivers/fake_adapter.ts b/src/drivers/fake_adapter.ts index 3671cc4..7eaef7a 100644 --- a/src/drivers/fake_adapter.ts +++ b/src/drivers/fake_adapter.ts @@ -345,6 +345,21 @@ export class FakeAdapter implements Adapter { return recovered } + async renewJobs(queue: string, jobIds: string[]): Promise { + const now = Date.now() + let renewed = 0 + + for (const jobId of jobIds) { + const active = this.#activeJobs.get(jobId) + if (active && active.queue === queue) { + active.acquiredAt = now + renewed++ + } + } + + return renewed + } + async getJob(jobId: string, queue: string): Promise { const active = this.#activeJobs.get(jobId) if (active && active.queue === queue) { diff --git a/src/drivers/knex_adapter.ts b/src/drivers/knex_adapter.ts index adeb610..a83a935 100644 --- a/src/drivers/knex_adapter.ts +++ b/src/drivers/knex_adapter.ts @@ -630,6 +630,24 @@ export class KnexAdapter implements Adapter { }) } + async renewJobs(queue: string, jobIds: string[]): Promise { + if (jobIds.length === 0) { + return 0 + } + + const now = Date.now() + + // Only renew jobs that are still active; a job that was already recovered + // or finalized will not match and is therefore never resurrected. + const renewed = await this.#connection(this.#jobsTable) + .where('queue', queue) + .where('status', 'active') + .whereIn('id', jobIds) + .update({ acquired_at: now }) + + return renewed + } + async upsertSchedule(config: ScheduleConfig): Promise { const id = config.id ?? randomUUID() diff --git a/src/drivers/redis_adapter.ts b/src/drivers/redis_adapter.ts index ad3626e..50a9239 100644 --- a/src/drivers/redis_adapter.ts +++ b/src/drivers/redis_adapter.ts @@ -24,6 +24,7 @@ import { PUSH_JOB_SCRIPT, RECOVER_STALLED_JOBS_SCRIPT, REMOVE_JOB_SCRIPT, + RENEW_JOBS_SCRIPT, RETRY_JOB_SCRIPT, } from './redis_scripts.js' @@ -404,6 +405,25 @@ export class RedisAdapter implements Adapter { return recovered as number } + async renewJobs(queue: string, jobIds: string[]): Promise { + if (jobIds.length === 0) { + return 0 + } + + const keys = this.#getKeys(queue) + const now = Date.now() + + const renewed = await this.#connection.eval( + RENEW_JOBS_SCRIPT, + 1, + keys.active, + now.toString(), + ...jobIds + ) + + return renewed as number + } + async upsertSchedule(config: ScheduleConfig): Promise { const id = config.id ?? randomUUID() const now = Date.now() diff --git a/src/drivers/redis_scripts.ts b/src/drivers/redis_scripts.ts index e0ff6dd..5d22cff 100644 --- a/src/drivers/redis_scripts.ts +++ b/src/drivers/redis_scripts.ts @@ -401,6 +401,32 @@ ${REDIS_JOB_STORAGE_LUA} return recovered ` +/** + * Lua script for renewing the acquired timestamp of in-flight jobs (heartbeat). + * Only entries still present in the active hash are renewed, so a job that was + * already recovered or finalized is never resurrected by a late heartbeat. + * Preserves the existing worker info, updating only acquiredAt. + * Returns the number of jobs renewed. + */ +export const RENEW_JOBS_SCRIPT = ` + local active_key = KEYS[1] + local now = tonumber(ARGV[1]) + + local renewed = 0 + for i = 2, #ARGV do + local job_id = ARGV[i] + local active_data = redis.call('HGET', active_key, job_id) + if active_data then + local active = cjson.decode(active_data) + active.acquiredAt = now + redis.call('HSET', active_key, job_id, cjson.encode(active)) + renewed = renewed + 1 + end + end + + return renewed +` + /** * Lua script for getting a job record with its status. */ diff --git a/src/drivers/sync_adapter.ts b/src/drivers/sync_adapter.ts index d97fa55..91d8468 100644 --- a/src/drivers/sync_adapter.ts +++ b/src/drivers/sync_adapter.ts @@ -110,6 +110,11 @@ export class SyncAdapter implements Adapter { return Promise.resolve(0) } + renewJobs(_queue: string, _jobIds: string[]): Promise { + // SyncAdapter executes jobs immediately - there is nothing to renew + return Promise.resolve(0) + } + getJob(_jobId: string, _queue: string): Promise { return Promise.resolve(null) } diff --git a/src/job_pool.ts b/src/job_pool.ts index 2225ccf..2ffcf5b 100644 --- a/src/job_pool.ts +++ b/src/job_pool.ts @@ -67,6 +67,30 @@ export class JobPool { this.#activeJobs.set(job.id, { promise, job, queue }) } + /** + * Get the ids of all currently running jobs, grouped by the queue they + * came from. + * + * Used by the worker heartbeat to renew the acquired timestamp of in-flight + * jobs so long-running handlers are not mistaken for stalled jobs. + * + * @returns A map of queue name to the job ids running for that queue + */ + activeJobIdsByQueue(): Map { + const byQueue = new Map() + + for (const { job, queue } of this.#activeJobs.values()) { + const ids = byQueue.get(queue) + if (ids) { + ids.push(job.id) + } else { + byQueue.set(queue, [job.id]) + } + } + + return byQueue + } + /** * Wait for the next job to complete and return it. * diff --git a/src/worker.ts b/src/worker.ts index 83cec94..c8d680c 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -75,6 +75,8 @@ export class Worker { #pool?: JobPool #lastStalledCheck = 0 #shutdownHandler?: () => Promise + #heartbeatTimer?: NodeJS.Timeout + #renewingJobs = false /** Unique identifier for this worker instance */ get id() { @@ -193,6 +195,13 @@ export class Worker { await this.#pool.drain() } + // Stop the heartbeat only after draining, so jobs that are still finishing + // keep being renewed until they actually complete. The process() generator + // also clears it in its `finally`, but clearing here guarantees prompt and + // deterministic cleanup regardless of how the worker was driven (start() vs + // processCycle()). + this.#stopHeartbeat() + this.#removeShutdownHandlers() } @@ -267,48 +276,53 @@ export class Worker { */ async *process(queues: string[]): AsyncGenerator { this.#pool = new JobPool() + this.#startHeartbeat(queues) - while (this.#running) { - try { - // Check for stalled jobs periodically - await this.#checkStalledJobs(queues) - - // Dispatch any due scheduled jobs - await this.#dispatchDueSchedules() - - yield* this.#fillPool(queues) - - if (this.#pool.isEmpty()) { - yield { type: 'idle', suggestedDelay: this.#idleDelay } - continue - } - - const hasCapacity = this.#pool.hasCapacity(this.#concurrency) - - // If we have capacity, don't block indefinitely waiting for a completion; - // wake up periodically to try to acquire newly enqueued jobs. - const result = await Promise.race([ - this.#pool - .waitForNextCompletion() - .then((completed) => ({ kind: 'completed' as const, completed })), - ...(hasCapacity - ? [setTimeout(this.#idleDelay).then(() => ({ kind: 'tick' as const }))] - : []), - ]) - - if (result.kind === 'tick') { - // No completion yet, but we woke up to check the queue again - continue - } - - yield { type: 'completed', queue: result.completed.queue, job: result.completed.job } - } catch (error) { - yield { - type: 'error', - error: error as Error, - suggestedDelay: parse(DEFAULT_ERROR_RETRY_DELAY), + try { + while (this.#running) { + try { + // Check for stalled jobs periodically + await this.#checkStalledJobs(queues) + + // Dispatch any due scheduled jobs + await this.#dispatchDueSchedules() + + yield* this.#fillPool(queues) + + if (this.#pool.isEmpty()) { + yield { type: 'idle', suggestedDelay: this.#idleDelay } + continue + } + + const hasCapacity = this.#pool.hasCapacity(this.#concurrency) + + // If we have capacity, don't block indefinitely waiting for a completion; + // wake up periodically to try to acquire newly enqueued jobs. + const result = await Promise.race([ + this.#pool + .waitForNextCompletion() + .then((completed) => ({ kind: 'completed' as const, completed })), + ...(hasCapacity + ? [setTimeout(this.#idleDelay).then(() => ({ kind: 'tick' as const }))] + : []), + ]) + + if (result.kind === 'tick') { + // No completion yet, but we woke up to check the queue again + continue + } + + yield { type: 'completed', queue: result.completed.queue, job: result.completed.job } + } catch (error) { + yield { + type: 'error', + error: error as Error, + suggestedDelay: parse(DEFAULT_ERROR_RETRY_DELAY), + } } } + } finally { + this.#stopHeartbeat() } } @@ -502,6 +516,76 @@ export class Worker { } } + /** + * Start the heartbeat timer that periodically renews the acquired timestamp + * of in-flight jobs. + * + * Renewal cannot piggyback on the main process loop: at full concurrency the + * loop blocks on `waitForNextCompletion()` with no idle tick, so exactly when + * long-running jobs are in flight the loop is not cycling. A dedicated timer + * guarantees healthy jobs are refreshed before `recoverStalledJobs` would + * consider them stalled and re-deliver them. + * + * The interval is half the stalled threshold so a job is renewed at least + * once within every stalled window. + */ + #startHeartbeat(queues: string[]) { + // Never leave a previous timer running if the loop is somehow re-entered. + this.#stopHeartbeat() + + const interval = Math.max(Math.floor(this.#stalledThreshold / 2), 1) + + this.#heartbeatTimer = setInterval(() => { + void this.#renewActiveJobs(queues) + }, interval) + + // Don't let the heartbeat keep the event loop alive on its own. + this.#heartbeatTimer.unref?.() + } + + #stopHeartbeat() { + if (this.#heartbeatTimer) { + clearInterval(this.#heartbeatTimer) + this.#heartbeatTimer = undefined + } + } + + /** + * Renew the acquired timestamp of the jobs currently in the pool so that + * long-running handlers are not treated as stalled while they are still + * running. Only jobs still active in the adapter are renewed. + */ + async #renewActiveJobs(queues: string[]): Promise { + // Guard against overlapping runs if a renewal takes longer than the interval. + if (this.#renewingJobs || !this.#pool || this.#pool.isEmpty()) { + return + } + + this.#renewingJobs = true + + try { + const jobIdsByQueue = this.#pool.activeJobIdsByQueue() + + for (const queue of queues) { + const jobIds = jobIdsByQueue.get(queue) + + if (!jobIds || jobIds.length === 0) { + continue + } + + try { + await this.#wrapInternal(() => this.#adapter.renewJobs(queue, jobIds)) + } catch (error) { + // A failed heartbeat must never crash the worker; the job will simply + // be considered stalled if renewals keep failing. + debug('worker %s: failed to renew jobs on queue %s: %O', this.#id, queue, error) + } + } + } finally { + this.#renewingJobs = false + } + } + #setupGracefulShutdown() { if (!this.#gracefulShutdown) { return diff --git a/tests/_mocks/memory_adapter.ts b/tests/_mocks/memory_adapter.ts index fa64e31..0f940e8 100644 --- a/tests/_mocks/memory_adapter.ts +++ b/tests/_mocks/memory_adapter.ts @@ -253,6 +253,21 @@ export class MemoryAdapter implements Adapter { return recovered } + async renewJobs(queue: string, jobIds: string[]): Promise { + const now = Date.now() + let renewed = 0 + + for (const jobId of jobIds) { + const active = this.#activeJobs.get(jobId) + if (active && active.queue === queue) { + active.acquiredAt = now + renewed++ + } + } + + return renewed + } + async getJob(jobId: string, queue: string): Promise { const active = this.#activeJobs.get(jobId) if (active && active.queue === queue) { diff --git a/tests/_utils/register_driver_test_suite.ts b/tests/_utils/register_driver_test_suite.ts index e1a6a99..0b4ff3f 100644 --- a/tests/_utils/register_driver_test_suite.ts +++ b/tests/_utils/register_driver_test_suite.ts @@ -691,6 +691,91 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { assert.equal(recoveredJobB!.id, 'job-stalled-b') }) + test('renewJobs should keep an active job from being recovered as stalled', async ({ + assert, + }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('test-queue', { + id: 'long-running', + name: 'TestJob', + payload: {}, + attempts: 0, + }) + + const job = await adapter.popFrom('test-queue') + assert.isNotNull(job) + + // Keep renewing the job while it "runs" longer than the stalled threshold. + for (let i = 0; i < 5; i++) { + await new Promise((resolve) => setTimeout(resolve, 20)) + const renewed = await adapter.renewJobs('test-queue', ['long-running']) + assert.equal(renewed, 1) + + // Even though more than 30ms has elapsed in total, the job is never + // stalled because each renewal refreshes its acquired timestamp. + const recovered = await adapter.recoverStalledJobs('test-queue', 30, 1) + assert.equal(recovered, 0) + } + + // Still active, not back in pending. + const pending = await adapter.popFrom('test-queue') + assert.isNull(pending) + }) + + test('renewJobs should only renew jobs that are still active', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('test-queue', { + id: 'job-1', + name: 'TestJob', + payload: {}, + attempts: 0, + }) + + const job = await adapter.popFrom('test-queue') + assert.isNotNull(job) + + // Let it stall and recover it back to pending. + await new Promise((resolve) => setTimeout(resolve, 30)) + const recovered = await adapter.recoverStalledJobs('test-queue', 10, 1) + assert.equal(recovered, 1) + + // A late heartbeat for the (no longer active) job must not resurrect it. + const renewed = await adapter.renewJobs('test-queue', ['job-1']) + assert.equal(renewed, 0) + + // The recovered job is still pending and can be acquired exactly once. + const reacquired = await adapter.popFrom('test-queue') + assert.isNotNull(reacquired) + assert.equal(reacquired!.id, 'job-1') + }) + + test('renewJobs should only renew jobs on the targeted queue', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('queue-a', { id: 'job-a', name: 'TestJob', payload: null, attempts: 0 }) + await adapter.pushOn('queue-b', { id: 'job-b', name: 'TestJob', payload: null, attempts: 0 }) + + await adapter.popFrom('queue-a') + await adapter.popFrom('queue-b') + + // job-b is active on queue-b, so renewing it on queue-a renews nothing. + const renewed = await adapter.renewJobs('queue-a', ['job-b']) + assert.equal(renewed, 0) + }) + + test('renewJobs should return 0 when given no job ids', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + const renewed = await adapter.renewJobs('test-queue', []) + assert.equal(renewed, 0) + }) + test('completeJob with undefined retention should remove job (default behavior)', async ({ assert, }) => { diff --git a/tests/queue_manager.spec.ts b/tests/queue_manager.spec.ts index c92d5d5..46dde6d 100644 --- a/tests/queue_manager.spec.ts +++ b/tests/queue_manager.spec.ts @@ -247,6 +247,7 @@ test.group('QueueManager', () => { pop: async () => null, popFrom: async () => null, recoverStalledJobs: async () => 0, + renewJobs: async () => 0, completeJob: async () => {}, failJob: async () => {}, retryJob: async () => {}, @@ -315,6 +316,7 @@ test.group('QueueManager', () => { pop: async () => null, popFrom: async () => null, recoverStalledJobs: async () => 0, + renewJobs: async () => 0, completeJob: async () => {}, failJob: async () => {}, retryJob: async () => {}, diff --git a/tests/worker.spec.ts b/tests/worker.spec.ts index fbe5b65..b61cbdf 100644 --- a/tests/worker.spec.ts +++ b/tests/worker.spec.ts @@ -1022,6 +1022,155 @@ test.group('Worker', () => { assert.equal(executionCount, 1, 'Job should have been executed once') }) + test('heartbeat keeps a long-running job from being recovered as stalled', async ({ + assert, + cleanup, + }) => { + let executionCount = 0 + let started: () => void = () => {} + const jobStarted = new Promise((resolve) => { + started = resolve + }) + + class LongJob extends Job { + async execute() { + executionCount++ + started() + // Run well beyond the stalled threshold. While this handler runs the + // worker is at full capacity, so its main loop is blocked on the single + // in-flight job and only the dedicated heartbeat timer keeps refreshing + // the job's acquired timestamp. + await setTimeout(400) + } + } + + const sharedAdapter = memory()() + + const localConfig = { + default: 'memory', + adapters: { memory: () => sharedAdapter }, + worker: { + concurrency: 1, + idleDelay: 20, + stalledThreshold: 100, + // Large so the worker's own periodic check never fires during the test; + // we drive recovery manually to simulate another worker (or the checker) + // looking for stalled jobs while this one is still running. + stalledInterval: 10_000, + maxStalledCount: 1, + gracefulShutdown: false, + }, + } + + Locator.register('LongJob', LongJob) + + const worker = new Worker(localConfig) + + cleanup(async () => { + Locator.clear() + await worker.stop() + await QueueManager.destroy() + }) + + await sharedAdapter.pushOn('default', { + id: 'long-running-job', + name: 'LongJob', + payload: {}, + attempts: 0, + }) + + // Run the worker in the background; start() blocks until the worker stops. + const running = worker.start(['default']) + await jobStarted + + // Repeatedly run a stalled check while the handler is still executing past + // the stalled threshold. The heartbeat keeps acquiredAt fresh, so there is + // never anything to recover. Without the heartbeat this would recover the + // job back to pending and it would run a second time. + for (let i = 0; i < 3; i++) { + await setTimeout(80) + const recovered = await sharedAdapter.recoverStalledJobs('default', 100, 1) + assert.equal(recovered, 0, 'Heartbeat should keep the running job from being recovered') + } + + // Let the job finish and the worker go idle before stopping, so stop() does + // not race with an in-flight job. + await setTimeout(300) + await worker.stop() + await running + + assert.equal(executionCount, 1, 'Long-running job should execute exactly once') + }) + + test('stops the heartbeat once the worker is stopped', async ({ assert, cleanup }) => { + let renewCalls = 0 + let started: () => void = () => {} + const jobStarted = new Promise((resolve) => { + started = resolve + }) + + class LongJob extends Job { + async execute() { + started() + await setTimeout(200) + } + } + + const sharedAdapter = memory()() + const originalRenew = sharedAdapter.renewJobs.bind(sharedAdapter) + sharedAdapter.renewJobs = async (queue: string, jobIds: string[]) => { + renewCalls++ + return originalRenew(queue, jobIds) + } + + const localConfig = { + default: 'memory', + adapters: { memory: () => sharedAdapter }, + worker: { + concurrency: 1, + idleDelay: 20, + // Small threshold -> ~20ms heartbeat interval so it fires several times + // within the test window. + stalledThreshold: 40, + stalledInterval: 10_000, + maxStalledCount: 1, + gracefulShutdown: false, + }, + } + + Locator.register('LongJob', LongJob) + + const worker = new Worker(localConfig) + + cleanup(async () => { + Locator.clear() + await worker.stop() + await QueueManager.destroy() + }) + + await sharedAdapter.pushOn('default', { + id: 'hb-job', + name: 'LongJob', + payload: {}, + attempts: 0, + }) + + const running = worker.start(['default']) + await jobStarted + + // The heartbeat should renew the in-flight job while it runs. + await setTimeout(120) + assert.isAbove(renewCalls, 0, 'Heartbeat should renew while a job is in flight') + + await worker.stop() + await running + + // Once stopped, the heartbeat timer must be cleared: no further renewals. + const callsAtStop = renewCalls + await setTimeout(150) + assert.equal(renewCalls, callsAtStop, 'Heartbeat must not fire after stop()') + }) + test('should fail stalled job permanently after maxStalledCount exceeded', async ({ assert, cleanup,