Workers
Workers claim queued runs and execute task attempts.
A worker is the runtime loop that turns queued durable runs into task attempts. Core has two storage-acquisition modes, poll and drain, both built on executeNext(options), plus a separate transport-driven executeDelivery(message, options) path.
runNow() is the current-process exception: it creates and leases one new run atomically, then reuses the same post-lease execution path workers use. It is covered separately because it must not scan storage or execute an unrelated queued run. See Current-Process Execution.
Use polling workers in application processes that should execute background work continuously:
import { WorkerMode } from '@runlane/core'
import { emailQueue } from './queues'
const worker = runlane.worker({
  mode: WorkerMode.Poll,
  queues: [emailQueue],
  concurrency: 4,
})
// During process shutdown:
await worker.stop()

runlane.worker() starts immediately and returns a handle. Omitting mode preserves the default polling behavior.
Worker Handle
The worker handle separates passive state from actions:
const worker = runlane.worker({ mode: WorkerMode.Poll })
await worker.closed

worker.closed is a Promise<void> property, not a function. It is the same terminal promise every time it is read, and it settles when every worker slot has exited. Await it when the process should stay alive until the worker exits naturally or fails.
worker.stop() is the action. It aborts the worker signal, waits for active attempts to finish cooperatively, and then returns the same terminal result represented by worker.closed.
process.once('SIGTERM', () => {
  void worker.stop()
})

Await worker.closed or worker.stop() to observe worker loop failures. The handle attaches an internal rejection handler only to prevent unhandled promise warnings.
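The split between passive state and action can be sketched in a few lines. This is a hypothetical stand-in, not Runlane's implementation: `createHandleSketch`, `WorkerHandleSketch`, and the `runSlots` parameter are names assumed here for illustration only.

```typescript
// Hypothetical sketch of a handle shaped like the one described above.
// None of these names are Runlane APIs.
interface WorkerHandleSketch {
  readonly closed: Promise<void>
  stop(): Promise<void>
}

function createHandleSketch(
  runSlots: (shouldStop: () => boolean) => Promise<void>,
): WorkerHandleSketch {
  let stopRequested = false
  // Created once, so every read of `closed` observes the same promise.
  const closed = runSlots(() => stopRequested)
  // This handler only silences unhandled-rejection warnings. Callers still
  // observe a failure when they await `closed` or `stop()`.
  closed.catch(() => {})
  return {
    closed,
    stop() {
      stopRequested = true
      // The action resolves to the same terminal result as the property.
      return closed
    },
  }
}
```

In this sketch `stop()` returns the very same promise as `closed`, which is one simple way to guarantee both report an identical terminal result.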
Execution Options
runlane.worker() options:
| Option | Default | Valid with | What it controls |
|---|---|---|---|
| mode | WorkerMode.Poll | worker | Poll keeps scanning until stopped; Drain exits when current due work is gone or maxRuns is reached. |
| queues | All registered runtime queues | worker, executeNext() | Queue definitions this storage-acquisition path may claim from. |
| concurrency | 1 | worker | Local process slots. This is not durable queue capacity. |
| emptyPollDelay | 100ms | poll worker only | Sleep after an empty storage scan. |
| maxRuns | No limit | drain worker only | Maximum successfully executed runs before drain exits. Lost claims and abandoned attempts do not count. |
| leaseDuration | contractDefaults.lease.duration (5m) | worker, executeNext() | Durable ownership window written when a run is claimed and on each heartbeat. |
| heartbeatInterval | Half of leaseDuration | worker, executeNext() | How often core writes run.lease_heartbeat while user code is still running. Must be shorter than leaseDuration. |
| onRunExecuted | None | worker | Observer called after a worker slot persists the run's final state for one attempt. Use it for logs or metrics, not run control flow; observer failures surface through worker.closed after the run state is already durable. |
| signal | None | worker, executeNext() | Cooperative stop/cancel signal linked into TaskContext.signal. |
| workerId | Runtime worker id | worker, executeNext() | Diagnostic owner recorded on leases. It does not route work. |
runlane.executeNext(options) accepts the shared storage-acquisition options without worker loop controls:
| Option | Default | What it controls |
|---|---|---|
| queues | All registered runtime queues | Queue definitions this one storage scan may claim from. |
| leaseDuration | contractDefaults.lease.duration (5m) | Durable ownership window written when a run is claimed and on each heartbeat. |
| heartbeatInterval | Half of leaseDuration | How often core writes run.lease_heartbeat while user code is still running. Must be shorter than leaseDuration. |
| signal | None | Cooperative cancellation signal linked into TaskContext.signal. If already aborted before the scan, executeNext() returns undefined. |
| workerId | Runtime worker id | Diagnostic owner recorded on leases. |
runlane.executeDelivery(message, options) options:
| Option | Default | What it controls |
|---|---|---|
| leaseDuration | contractDefaults.lease.duration (5m) | Durable ownership window for this delivered attempt. |
| heartbeatInterval | Half of leaseDuration | Heartbeat cadence for this attempt. Must be shorter than leaseDuration. |
| signal | None | Abort signal linked into the task context. If already aborted before claim, delivery rejects so the transport does not acknowledge unhandled work. |
| workerId | Runtime worker id | Diagnostic owner recorded on the run lease. |
Acquisition Paths
Plain language:
poll means: "Keep looking for work until I'm told to stop."
- Good for EC2, containers, local dev.
- If storage has work, execute it.
- If storage has no work, wait emptyPollDelay, then check again.
- It is a long-running loop.
drain means: "Process whatever is available right now, then exit."
- Good for cron/serverless/bounded jobs.
- If storage has 10 due runs and concurrency is 2, process them until none are left.
- If storage is empty, return immediately.
- No sleeping, no idle loop.
executeNext(options) performs one storage scan, tries to claim one currently due run, executes at most one attempt, and returns the final RunRecord for that attempt. If storage has no due work, the signal is already aborted, the candidate run disappears, another worker wins the claim, or the attempt is abandoned after losing lease ownership, it returns undefined. Worker loops keep the reason internally so drain workers can distinguish idle storage from races that should not spend maxRuns.
executeDelivery is a different path. A transport-driven consumer should call it instead of running a drain worker.
drain asks storage: "What work is due?"
executeDelivery(message) says: "A transport woke me for this specific runId; try to execute exactly this run if storage says it is still valid."
poll
-> storage scan
-> execute next due run
-> no work? sleep, repeat
drain
-> storage scan
-> execute due runs until empty or maxRuns reached
-> exit
executeDelivery
-> use delivered runId
-> read that run
-> ignore if terminal/not due/wrong queue/already leased
-> otherwise claim and execute it
-> exit

SQS/Lambda uses executeDelivery, not drain, because Lambda already received specific SQS messages. Scanning storage again would ignore the transport's work acquisition and can cause bad behavior around ack/retry.
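The executeDelivery checks listed in the flow above can be sketched as a pure decision function. This is only an illustration under stated assumptions: `RunSnapshot`, `WakeupSketch`, their field names, and the order of the checks are hypothetical and are not taken from Runlane's types.

```typescript
// Hypothetical shapes; the real RunRecord and DeliveryMessage fields may differ.
interface RunSnapshot {
  queue: string
  terminal: boolean
  dueAt: number // epoch milliseconds
  leasedUntil?: number // epoch milliseconds, present while a lease is held
}

interface WakeupSketch {
  runId: string
  queue: string
}

type DeliveryDecision =
  | { action: 'claim' }
  | { action: 'ignore'; reason: 'terminal' | 'not-due' | 'wrong-queue' | 'already-leased' }

// Decide what to do with one delivered wakeup after reading the run from storage.
// The check order is an assumption made for this sketch.
function decideDelivery(run: RunSnapshot, message: WakeupSketch, now: number): DeliveryDecision {
  if (run.terminal) return { action: 'ignore', reason: 'terminal' }
  if (run.queue !== message.queue) return { action: 'ignore', reason: 'wrong-queue' }
  if (run.dueAt > now) return { action: 'ignore', reason: 'not-due' }
  if (run.leasedUntil !== undefined && run.leasedUntil > now) {
    return { action: 'ignore', reason: 'already-leased' }
  }
  return { action: 'claim' }
}
```

Note that the function never looks for other due runs: the delivered runId is the only candidate, which is the difference from poll and drain.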
Queues
Workers only claim runs from their configured queues. Omitting queues means the worker can process any registered runtime queue. Queue filters accept queue definitions from the runtime queue catalog; CLI text input such as --queue emails is resolved against runtime.queues before calling the runtime API.
Queue filters are routing, not ownership. Storage still owns run truth, and every worker reads the latest RunRecord before executing.
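As a rough illustration of resolving CLI text against a queue catalog, the sketch below maps names to definitions and fails on unknown names. `QueueDefinitionSketch` and `resolveQueueNames` are hypothetical; the source only states that text input is resolved against runtime.queues, not how.

```typescript
// Hypothetical shape: the real queue definition type may carry more fields.
interface QueueDefinitionSketch {
  readonly name: string
}

// Resolve CLI text such as `--queue emails` against a catalog of definitions.
// Throwing on unknown names is a design choice of this sketch.
function resolveQueueNames(
  names: readonly string[],
  catalog: readonly QueueDefinitionSketch[],
): QueueDefinitionSketch[] {
  const byName = new Map<string, QueueDefinitionSketch>()
  for (const queue of catalog) byName.set(queue.name, queue)
  return names.map((name) => {
    const queue = byName.get(name)
    if (queue === undefined) {
      throw new Error(`Unknown queue: ${name}`)
    }
    return queue
  })
}
```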
Concurrency
concurrency controls how many attempts one storage-acquisition worker process may run at the same time. Omitting it starts one slot.
Each polling concurrency slot runs the same storage scan, claim, and execute loop. If storage has no due run, the slot waits for emptyPollDelay before scanning again. Omitting emptyPollDelay uses 100ms.
Worker concurrency and emptyPollDelay are local process controls. Queue concurrencyLimit is durable storage policy enforced across workers for bounded queues. Transport-driven SQS consumers use provider message delivery plus executeDelivery(message) and do not use the storage-polling worker's empty-poll loop.
Use the core polling worker for local development loops and deployments that intentionally poll storage, such as a simple EC2 or container process. Event-driven transports should use executeDelivery(message).
Drain Workers
Drain mode is still storage acquisition. It is useful when the process itself owns the decision to scan storage, but should not stay alive after the current backlog is gone:
const worker = runlane.worker({
  mode: WorkerMode.Drain,
  queues: [emailQueue],
  concurrency: 2,
  maxRuns: 100,
})
await worker.closed

maxRuns is optional. When omitted, drain mode keeps scanning until storage returns no due work. emptyPollDelay is not valid in drain mode because drain workers do not sleep on empty storage.
Lost lease-claim races and abandoned attempts do not count against maxRuns. In both cases storage had due work, but this worker did not execute a run, so drain workers keep scanning and exit only when storage is idle or the executed-run budget is reached.
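The maxRuns accounting can be modeled with a single-slot loop over an injected scan function. This is a simplified sketch, not core's loop: `ScanOutcome`, `DrainSummary`, and `drainSketch` are hypothetical names, and the real worker is asynchronous and may run several slots.

```typescript
// Hypothetical scan outcomes standing in for the reasons worker loops keep internally.
type ScanOutcome = 'executed' | 'idle' | 'claim-lost' | 'abandoned'

interface DrainSummary {
  executed: number
  scans: number
  exitReason: 'idle' | 'max-runs'
}

// Single-slot drain loop. `scan` performs one storage scan and reports what happened.
function drainSketch(scan: () => ScanOutcome, maxRuns = Infinity): DrainSummary {
  let executed = 0
  let scans = 0
  while (executed < maxRuns) {
    scans += 1
    const outcome = scan()
    if (outcome === 'idle') {
      // Storage has no due work: drain exits without sleeping.
      return { executed, scans, exitReason: 'idle' }
    }
    // Lost claims and abandoned attempts mean storage had due work, so the
    // loop keeps scanning without spending the executed-run budget.
    if (outcome === 'executed') executed += 1
  }
  return { executed, scans, exitReason: 'max-runs' }
}
```

With a scripted sequence of claim-lost, executed, abandoned, executed and maxRuns set to 2, the sketch performs four scans but counts only two runs.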
Delivered Wakeups
executeDelivery(message) is the transport-driven execution primitive. It receives a DeliveryMessage, reads storage by message.environment + message.runId, verifies the queue, and then either claims and executes that run or returns an ignored result.
import { ExecuteDeliveryStatus } from '@runlane/core'
const result = await runlane.executeDelivery(message)
if (result.status === ExecuteDeliveryStatus.Ignored) {
  // Ack-safe non-execution: terminal, not due, wrong queue, already leased, claim lost, or abandoned.
}

Ignored delivery results are safe for transport adapters to acknowledge.
| Result | Transport behavior |
|---|---|
| Persisted task outcome: run.succeeded, run.failed, run.retry_scheduled, run.released, or run.cancelled | Acknowledge the provider message. Delivery handling completed. |
| Missing registered task or queue | Core persists a non-retryable run.failed outcome, then returns Executed to avoid poison-message redelivery loops. |
| Storage, projection, cancellation-before-claim, or persistence failure | Reject so the transport integration can retry or report a provider batch failure. |
| Already-aborted delivered signal | Reject with OperationCancelled before claim. A delivered provider message should not be acknowledged without handling. |
This is stricter than storage polling: executeNext({ signal }) returns undefined when the signal is already aborted, because no provider message has been acquired.
For SQS-to-Lambda, @runlane/transport-sqs:
- parses SQS records into DeliveryMessage values
- calls the supplied executeDelivery() callback for each parsed record
- returns batch item failures only for rejected framework or infrastructure failures
For standard SQS queues, the Lambda helper continues after a failed record and reports only that record as a partial batch failure. For FIFO queues, pass fifo: true; the helper stops after the first failure and returns that failed record plus every later unprocessed record so provider ordering is preserved.
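The difference between standard and FIFO reporting can be sketched as below. This is not the @runlane/transport-sqs helper: `processBatchSketch` and its synchronous `handle` callback are hypothetical simplifications (a real handler is asynchronous). The `batchItemFailures` and `itemIdentifier` fields follow the AWS Lambda partial batch response shape.

```typescript
// Response shape used by Lambda for SQS partial batch failures.
interface BatchResponse {
  batchItemFailures: { itemIdentifier: string }[]
}

// `handle` returns true when delivery resolved and false when it rejected.
// It is synchronous here only to keep the sketch short.
function processBatchSketch(
  messageIds: readonly string[],
  handle: (messageId: string) => boolean,
  fifo = false,
): BatchResponse {
  const batchItemFailures: { itemIdentifier: string }[] = []
  let stopped = false
  for (const messageId of messageIds) {
    // Once stopped, later records are reported without being handled.
    if (stopped || !handle(messageId)) {
      batchItemFailures.push({ itemIdentifier: messageId })
      // FIFO: stop after the first failure so provider ordering is preserved.
      if (fifo) stopped = true
    }
  }
  return { batchItemFailures }
}
```

For records a, b, c, d where only b fails, the standard path reports b alone, while the FIFO path reports b, c, and d and never invokes the callback for c or d.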
For a long-running SQS consumer, createSqsDeliveryConsumer() blocks on SQS ReceiveMessage, calls executeDelivery() for each received message, and deletes only messages whose delivery resolves. It does not scan storage on emptyPollDelay. Messages whose provider shape, wakeup body, or execution path rejects are left undeleted so SQS visibility timeout and redrive policy can handle retry or DLQ movement.
| Path | Acquisition owner | Acknowledges provider messages | Use when |
|---|---|---|---|
| worker({ mode: WorkerMode.Poll }) | Runlane storage scan | No | A long-running process intentionally polls storage. |
| worker({ mode: WorkerMode.Drain }) | Runlane storage scan | No | A bounded process should execute due storage work and exit. |
| executeNext(options) | One Runlane storage scan | No | Tests, CLIs, or custom loops need one storage-acquired attempt. |
| createSqsLambdaHandler() | AWS Lambda SQS event source | Yes, through partial batch failures | Lambda receives SQS records and should invoke executeDelivery(). |
| createSqsDeliveryConsumer() | SQS ReceiveMessage loop | Yes, through DeleteMessageBatch | EC2, ECS, Fargate, or another Node process should consume SQS directly. |
Pick one production acquisition path per queue. For SQS-backed queues on EC2/ECS/Fargate, use createSqsDeliveryConsumer(). For polling deployments, use runlane.worker({ mode: WorkerMode.Poll }) with a lane/config that does not publish SQS wakeups for that queue. A storage-polling worker can execute due runs from storage, but it will not receive or delete SQS records.
Leases And Heartbeats
Every attempt starts by claiming a run lease. The lease prevents two workers from executing the same run at the same time. While task code is running, core records run.lease_heartbeat events so storage can extend worker ownership.
leaseDuration controls how long each claim or heartbeat keeps ownership alive. In Lambda, runlane.executeDelivery(message, { leaseDuration: '2m' }) writes the durable marker that means "this run is held by this worker invocation until time T."
If the Lambda crashes or times out before persisting an outcome, the lease eventually expires and tick() can request delivery again for another consumer. That lease is the durable safety net behind the provider timeout.
Set leaseDuration longer than the consumer's maximum execution window. For example, a Lambda with a 1-minute timeout commonly uses a 2-minute Runlane lease, which leaves headroom for provider shutdown, storage writes, and recovery races.
Set the SQS visibility timeout longer than the same handling window so the provider does not redeliver before Runlane's own lease can protect the run.
heartbeatInterval controls how often core heartbeats during an active attempt. When omitted, core heartbeats halfway through the lease duration. A custom heartbeatInterval must be shorter than leaseDuration.
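The timing rules in this section can be collected into one check. The sketch below is an assumption-laden illustration: `planLeaseTiming` and its millisecond fields are hypothetical, and core's own validation may be stricter or report problems differently.

```typescript
// Hypothetical input: all durations are plain milliseconds for this sketch.
interface TimingInput {
  handlerTimeoutMs: number // the consumer's maximum execution window
  leaseDurationMs: number
  visibilityTimeoutMs?: number // only relevant for SQS-backed delivery
  heartbeatIntervalMs?: number
}

interface TimingPlan {
  heartbeatIntervalMs: number
  problems: string[]
}

function planLeaseTiming(input: TimingInput): TimingPlan {
  // When omitted, the heartbeat interval defaults to half of the lease duration.
  const heartbeatIntervalMs = input.heartbeatIntervalMs ?? input.leaseDurationMs / 2
  const problems: string[] = []
  if (heartbeatIntervalMs >= input.leaseDurationMs) {
    problems.push('heartbeatInterval must be shorter than leaseDuration')
  }
  if (input.leaseDurationMs <= input.handlerTimeoutMs) {
    problems.push('leaseDuration should be longer than the execution window')
  }
  if (input.visibilityTimeoutMs !== undefined && input.visibilityTimeoutMs <= input.handlerTimeoutMs) {
    problems.push('visibility timeout should be longer than the execution window')
  }
  return { heartbeatIntervalMs, problems }
}
```

For the Lambda example above (1-minute timeout, 2-minute lease), the sketch yields a 1-minute heartbeat interval and no problems.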
If a heartbeat observes CancellationRequested, core aborts the task context signal and stops scheduling further heartbeats for that attempt. The handler still exits cooperatively; core then persists the handler's outcome against the current owned run if ownership is still valid.
If a heartbeat fails with StorageConflict, the attempt has lost ownership. Core aborts the task context signal, abandons that attempt result, and lets the run become recoverable through normal lease expiry. Other heartbeat or persistence failures, such as StorageUnavailable, reject the worker loop or delivery call because the runtime cannot prove ownership or persist the outcome.
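The heartbeat outcomes described above amount to a small decision table. The sketch below encodes it as a function. `HeartbeatResult`, `HeartbeatReaction`, and `reactToHeartbeat` are hypothetical names, and how core actually surfaces CancellationRequested or StorageConflict (error codes, result values) is not specified here.

```typescript
// Result names mirror the ones used in the text; their transport is an assumption.
type HeartbeatResult = 'ok' | 'CancellationRequested' | 'StorageConflict' | 'StorageUnavailable'

type HeartbeatReaction =
  // Keep running and schedule the next heartbeat.
  | { kind: 'continue' }
  // Abort the task signal, stop heartbeating, persist the handler's outcome
  // if ownership is still valid.
  | { kind: 'abort-and-persist' }
  // Abort the task signal, drop the attempt result, rely on lease expiry.
  | { kind: 'abort-and-abandon' }
  // Reject the worker loop or delivery call.
  | { kind: 'reject' }

function reactToHeartbeat(result: HeartbeatResult): HeartbeatReaction {
  switch (result) {
    case 'ok':
      return { kind: 'continue' }
    case 'CancellationRequested':
      return { kind: 'abort-and-persist' }
    case 'StorageConflict':
      return { kind: 'abort-and-abandon' }
    case 'StorageUnavailable':
      // Stands in for any other heartbeat or persistence failure.
      return { kind: 'reject' }
  }
}
```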
Cooperative Stop
Runlane does not hard-kill user code. Worker stop and external worker signals abort the TaskContext.signal passed to the handler and make context.isCancellationRequested() return true.
Task code should check the signal at safe points and finish in a way that matches the domain: return successfully after cleanup, throw a structured non-retryable error, or release the run to continue later. Core waits for the handler to cooperate before the worker closes.
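A handler that checks the signal at safe points might look like the sketch below. It is illustrative only: `sendBatch`, `StopSignal`, and `BatchProgress` are hypothetical, and the loop is synchronous for brevity. A standard AbortSignal satisfies `StopSignal` structurally, so a signal such as TaskContext.signal could be passed in if it is one.

```typescript
// Minimal structural view of an abort signal; AbortSignal fits this shape.
interface StopSignal {
  readonly aborted: boolean
}

interface BatchProgress {
  processed: number
  remaining: number
}

// Process items one at a time and stop between items once a stop is requested.
function sendBatch(
  recipients: readonly string[],
  signal: StopSignal,
  send: (recipient: string) => void,
): BatchProgress {
  let processed = 0
  for (const recipient of recipients) {
    // Safe point: checked between items, never in the middle of a send.
    if (signal.aborted) break
    send(recipient)
    processed += 1
  }
  return { processed, remaining: recipients.length - processed }
}
```

The returned `remaining` count lets the caller pick the domain-appropriate finish, for example returning successfully when it is zero or releasing the run to continue later when it is not.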
Cancellation before acquisition differs by path:
| Path | Already-aborted signal before claim |
|---|---|
| executeNext({ signal }) | Returns undefined; no provider message has been acquired. |
| worker({ signal }) or worker.stop() | Stops the local worker loop and waits for active attempts to cooperate. |
| executeDelivery(message, { signal }) | Rejects with OperationCancelled so the transport does not acknowledge an unhandled provider message. |