Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/contracts/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,21 @@ export interface Adapter {
maxStalledCount: number
): Promise<number>

/**
* Renew the acquired timestamp of in-flight jobs (heartbeat).
*
* A worker calls this periodically for the jobs it is actively processing
* so that long-running handlers are not mistaken for stalled jobs and
* re-delivered while they are still running. Only jobs that are still
* active are renewed; jobs that have already been recovered/completed are
* skipped (so a job is never resurrected by a late heartbeat).
*
* @param queue - The queue the jobs belong to
* @param jobIds - The ids of the jobs currently being processed
* @returns Number of jobs whose timestamp was renewed
*/
renewJobs(queue: string, jobIds: string[]): Promise<number>

/**
* Mark a job as completed and remove it from the queue.
*
Expand Down
15 changes: 15 additions & 0 deletions src/drivers/fake_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,21 @@ export class FakeAdapter implements Adapter {
return recovered
}

async renewJobs(queue: string, jobIds: string[]): Promise<number> {
const now = Date.now()
let renewed = 0

for (const jobId of jobIds) {
const active = this.#activeJobs.get(jobId)
if (active && active.queue === queue) {
active.acquiredAt = now
renewed++
}
}

return renewed
}

async getJob(jobId: string, queue: string): Promise<JobRecord | null> {
const active = this.#activeJobs.get(jobId)
if (active && active.queue === queue) {
Expand Down
18 changes: 18 additions & 0 deletions src/drivers/knex_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,24 @@ export class KnexAdapter implements Adapter {
})
}

async renewJobs(queue: string, jobIds: string[]): Promise<number> {
if (jobIds.length === 0) {
return 0
}

const now = Date.now()

// Only renew jobs that are still active; a job that was already recovered
// or finalized will not match and is therefore never resurrected.
const renewed = await this.#connection(this.#jobsTable)
.where('queue', queue)
.where('status', 'active')
.whereIn('id', jobIds)
.update({ acquired_at: now })

return renewed
}

async upsertSchedule(config: ScheduleConfig): Promise<string> {
const id = config.id ?? randomUUID()

Expand Down
20 changes: 20 additions & 0 deletions src/drivers/redis_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
PUSH_JOB_SCRIPT,
RECOVER_STALLED_JOBS_SCRIPT,
REMOVE_JOB_SCRIPT,
RENEW_JOBS_SCRIPT,
RETRY_JOB_SCRIPT,
} from './redis_scripts.js'

Expand Down Expand Up @@ -404,6 +405,25 @@ export class RedisAdapter implements Adapter {
return recovered as number
}

async renewJobs(queue: string, jobIds: string[]): Promise<number> {
if (jobIds.length === 0) {
return 0
}

const keys = this.#getKeys(queue)
const now = Date.now()

const renewed = await this.#connection.eval(
RENEW_JOBS_SCRIPT,
1,
keys.active,
now.toString(),
...jobIds
)

return renewed as number
}

async upsertSchedule(config: ScheduleConfig): Promise<string> {
const id = config.id ?? randomUUID()
const now = Date.now()
Expand Down
26 changes: 26 additions & 0 deletions src/drivers/redis_scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,32 @@ ${REDIS_JOB_STORAGE_LUA}
return recovered
`

/**
* Lua script for renewing the acquired timestamp of in-flight jobs (heartbeat).
* Only entries still present in the active hash are renewed, so a job that was
* already recovered or finalized is never resurrected by a late heartbeat.
* Preserves the existing worker info, updating only acquiredAt.
* Returns the number of jobs renewed.
*/
export const RENEW_JOBS_SCRIPT = `
local active_key = KEYS[1]
local now = tonumber(ARGV[1])

local renewed = 0
for i = 2, #ARGV do
local job_id = ARGV[i]
local active_data = redis.call('HGET', active_key, job_id)
if active_data then
local active = cjson.decode(active_data)
active.acquiredAt = now
redis.call('HSET', active_key, job_id, cjson.encode(active))
renewed = renewed + 1
end
end

return renewed
`

/**
* Lua script for getting a job record with its status.
*/
Expand Down
5 changes: 5 additions & 0 deletions src/drivers/sync_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ export class SyncAdapter implements Adapter {
return Promise.resolve(0)
}

renewJobs(_queue: string, _jobIds: string[]): Promise<number> {
// SyncAdapter executes jobs immediately - there is nothing to renew
return Promise.resolve(0)
}

getJob(_jobId: string, _queue: string): Promise<null> {
return Promise.resolve(null)
}
Expand Down
24 changes: 24 additions & 0 deletions src/job_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,30 @@ export class JobPool {
this.#activeJobs.set(job.id, { promise, job, queue })
}

/**
* Get the ids of all currently running jobs, grouped by the queue they
* came from.
*
* Used by the worker heartbeat to renew the acquired timestamp of in-flight
* jobs so long-running handlers are not mistaken for stalled jobs.
*
* @returns A map of queue name to the job ids running for that queue
*/
activeJobIdsByQueue(): Map<string, string[]> {
const byQueue = new Map<string, string[]>()

for (const { job, queue } of this.#activeJobs.values()) {
const ids = byQueue.get(queue)
if (ids) {
ids.push(job.id)
} else {
byQueue.set(queue, [job.id])
}
}

return byQueue
}

/**
* Wait for the next job to complete and return it.
*
Expand Down
162 changes: 123 additions & 39 deletions src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ export class Worker {
#pool?: JobPool
#lastStalledCheck = 0
#shutdownHandler?: () => Promise<void>
#heartbeatTimer?: NodeJS.Timeout
#renewingJobs = false

/** Unique identifier for this worker instance */
get id() {
Expand Down Expand Up @@ -193,6 +195,13 @@ export class Worker {
await this.#pool.drain()
}

// Stop the heartbeat only after draining, so jobs that are still finishing
// keep being renewed until they actually complete. The process() generator
// also clears it in its `finally`, but clearing here guarantees prompt and
// deterministic cleanup regardless of how the worker was driven (start() vs
// processCycle()).
this.#stopHeartbeat()

this.#removeShutdownHandlers()
}

Expand Down Expand Up @@ -267,48 +276,53 @@ export class Worker {
*/
async *process(queues: string[]): AsyncGenerator<WorkerCycle, void, unknown> {
this.#pool = new JobPool()
this.#startHeartbeat(queues)

while (this.#running) {
try {
// Check for stalled jobs periodically
await this.#checkStalledJobs(queues)

// Dispatch any due scheduled jobs
await this.#dispatchDueSchedules()

yield* this.#fillPool(queues)

if (this.#pool.isEmpty()) {
yield { type: 'idle', suggestedDelay: this.#idleDelay }
continue
}

const hasCapacity = this.#pool.hasCapacity(this.#concurrency)

// If we have capacity, don't block indefinitely waiting for a completion;
// wake up periodically to try to acquire newly enqueued jobs.
const result = await Promise.race([
this.#pool
.waitForNextCompletion()
.then((completed) => ({ kind: 'completed' as const, completed })),
...(hasCapacity
? [setTimeout(this.#idleDelay).then(() => ({ kind: 'tick' as const }))]
: []),
])

if (result.kind === 'tick') {
// No completion yet, but we woke up to check the queue again
continue
}

yield { type: 'completed', queue: result.completed.queue, job: result.completed.job }
} catch (error) {
yield {
type: 'error',
error: error as Error,
suggestedDelay: parse(DEFAULT_ERROR_RETRY_DELAY),
try {
while (this.#running) {
try {
// Check for stalled jobs periodically
await this.#checkStalledJobs(queues)

// Dispatch any due scheduled jobs
await this.#dispatchDueSchedules()

yield* this.#fillPool(queues)

if (this.#pool.isEmpty()) {
yield { type: 'idle', suggestedDelay: this.#idleDelay }
continue
}

const hasCapacity = this.#pool.hasCapacity(this.#concurrency)

// If we have capacity, don't block indefinitely waiting for a completion;
// wake up periodically to try to acquire newly enqueued jobs.
const result = await Promise.race([
this.#pool
.waitForNextCompletion()
.then((completed) => ({ kind: 'completed' as const, completed })),
...(hasCapacity
? [setTimeout(this.#idleDelay).then(() => ({ kind: 'tick' as const }))]
: []),
])

if (result.kind === 'tick') {
// No completion yet, but we woke up to check the queue again
continue
}

yield { type: 'completed', queue: result.completed.queue, job: result.completed.job }
} catch (error) {
yield {
type: 'error',
error: error as Error,
suggestedDelay: parse(DEFAULT_ERROR_RETRY_DELAY),
}
}
}
} finally {
this.#stopHeartbeat()
}
}

Expand Down Expand Up @@ -502,6 +516,76 @@ export class Worker {
}
}

/**
* Start the heartbeat timer that periodically renews the acquired timestamp
* of in-flight jobs.
*
* Renewal cannot piggyback on the main process loop: at full concurrency the
* loop blocks on `waitForNextCompletion()` with no idle tick, so exactly when
* long-running jobs are in flight the loop is not cycling. A dedicated timer
* guarantees healthy jobs are refreshed before `recoverStalledJobs` would
* consider them stalled and re-deliver them.
*
* The interval is half the stalled threshold so a job is renewed at least
* once within every stalled window.
*/
#startHeartbeat(queues: string[]) {
// Never leave a previous timer running if the loop is somehow re-entered.
this.#stopHeartbeat()

const interval = Math.max(Math.floor(this.#stalledThreshold / 2), 1)

this.#heartbeatTimer = setInterval(() => {
void this.#renewActiveJobs(queues)
}, interval)

// Don't let the heartbeat keep the event loop alive on its own.
this.#heartbeatTimer.unref?.()
}

#stopHeartbeat() {
if (this.#heartbeatTimer) {
clearInterval(this.#heartbeatTimer)
this.#heartbeatTimer = undefined
}
}

/**
* Renew the acquired timestamp of the jobs currently in the pool so that
* long-running handlers are not treated as stalled while they are still
* running. Only jobs still active in the adapter are renewed.
*/
async #renewActiveJobs(queues: string[]): Promise<void> {
// Guard against overlapping runs if a renewal takes longer than the interval.
if (this.#renewingJobs || !this.#pool || this.#pool.isEmpty()) {
return
}

this.#renewingJobs = true

try {
const jobIdsByQueue = this.#pool.activeJobIdsByQueue()

for (const queue of queues) {
const jobIds = jobIdsByQueue.get(queue)

if (!jobIds || jobIds.length === 0) {
continue
}

try {
await this.#wrapInternal(() => this.#adapter.renewJobs(queue, jobIds))
} catch (error) {
// A failed heartbeat must never crash the worker; the job will simply
// be considered stalled if renewals keep failing.
debug('worker %s: failed to renew jobs on queue %s: %O', this.#id, queue, error)
}
}
} finally {
this.#renewingJobs = false
}
}

#setupGracefulShutdown() {
if (!this.#gracefulShutdown) {
return
Expand Down
15 changes: 15 additions & 0 deletions tests/_mocks/memory_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,21 @@ export class MemoryAdapter implements Adapter {
return recovered
}

async renewJobs(queue: string, jobIds: string[]): Promise<number> {
const now = Date.now()
let renewed = 0

for (const jobId of jobIds) {
const active = this.#activeJobs.get(jobId)
if (active && active.queue === queue) {
active.acquiredAt = now
renewed++
}
}

return renewed
}

async getJob(jobId: string, queue: string): Promise<JobRecord | null> {
const active = this.#activeJobs.get(jobId)
if (active && active.queue === queue) {
Expand Down
Loading