SQS Transport
Wakeup-only Amazon SQS transport adapter.
@runlane/transport-sqs publishes Runlane wakeups to Amazon SQS. It is a production transport adapter, not a storage layer and not a compute integration.
SQS carries only enough data to find a durable run again: environment, run id, logical queue, requested time, and optional trace context. Storage still owns payloads, run state, leases, retries, releases, schedules, idempotency, singleton enforcement, and operator truth.
Use it when your production lane should wake Lambda, EC2, ECS, Fargate, or another Node process through SQS. Do not use it to store task payloads or to model AWS Batch, browser workers, retries, or schedules.
```sh
pnpm add @runlane/transport-sqs @aws-sdk/client-sqs
```
Producer Transport
Create one transport and bind registered Runlane queue definitions to AWS SQS resources:
```ts
import { SQSClient } from '@aws-sdk/client-sqs'
import { queue } from '@runlane/core'
import { sqsQueue, sqsTransport } from '@runlane/transport-sqs'

const defaultQueue = queue({ name: 'default', default: true })
const emailsQueue = queue({ name: 'emails' })
const mediaQueue = queue({ name: 'media' })

const transport = sqsTransport({
  client: new SQSClient({ region: 'us-east-1' }),
  queues: [
    sqsQueue(defaultQueue, {
      queueUrl: process.env.RUNLANE_DEFAULT_QUEUE_URL,
    }),
    sqsQueue(emailsQueue, {
      queueName: 'runlane-emails-production',
    }),
    sqsQueue(mediaQueue, {
      queueUrl: process.env.RUNLANE_MEDIA_QUEUE_URL,
    }),
  ],
})
```
sqsQueue(queueDefinition, options) is the provider binding. The queue definition owns Runlane routing and durable capacity policy; the SQS options own provider URL/name and FIFO behavior.
Use queues for workload routing such as default, emails, billing, media, and imports. Use separate lanes only for real infrastructure boundaries such as a different database, AWS account, region, tenant boundary, compliance boundary, or durability policy.
Each SQS queue binding accepts exactly one of queueUrl or queueName. queueUrl skips provider lookup. queueName calls AWS GetQueueUrlCommand once and caches the resolved URL inside the adapter; add queueOwnerAWSAccountId when resolving a queue owned by another AWS account. If the configured queue name cannot be resolved, publishWakeups() rejects with RunlaneError and ErrorCode.ConfigurationInvalid.
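A minimal sketch of the binding rules above, assuming a hypothetical QueueBinding shape and an injected lookup function in place of a real GetQueueUrlCommand call. It rejects bindings with both or neither source, rejects queueOwnerAWSAccountId without queueName, and caches one lookup per owner and name. The adapter itself raises RunlaneError with ErrorCode.ConfigurationInvalid; this sketch throws plain Error values.

```typescript
// Hypothetical binding shape mirroring the documented options, not the package's real types.
type QueueBinding = {
  queueUrl?: string
  queueName?: string
  queueOwnerAWSAccountId?: string
}

// Stand-in for a GetQueueUrlCommand call: resolves a queue name to its URL.
type LookupQueueUrl = (queueName: string, ownerAccountId?: string) => Promise<string>

function assertSingleSource(binding: QueueBinding): void {
  const hasUrl = binding.queueUrl !== undefined
  const hasName = binding.queueName !== undefined
  if (hasUrl === hasName) {
    throw new Error('Provide exactly one of queueUrl or queueName')
  }
  if (binding.queueOwnerAWSAccountId !== undefined && !hasName) {
    throw new Error('queueOwnerAWSAccountId is only valid with queueName')
  }
}

function createQueueUrlResolver(lookup: LookupQueueUrl) {
  // One cached promise per owner/name pair, so concurrent publishes share one lookup.
  const cache = new Map<string, Promise<string>>()
  return (binding: QueueBinding): Promise<string> => {
    assertSingleSource(binding)
    if (binding.queueUrl !== undefined) return Promise.resolve(binding.queueUrl) // no lookup
    const queueName = binding.queueName as string
    const key = `${binding.queueOwnerAWSAccountId ?? ''}/${queueName}`
    let pending = cache.get(key)
    if (pending === undefined) {
      pending = lookup(queueName, binding.queueOwnerAWSAccountId)
      cache.set(key, pending)
      // Evict failed lookups so a later publish retries instead of reusing the rejection.
      void pending.catch(() => cache.delete(key))
    }
    return pending
  }
}
```

Caching the promise rather than the resolved string means concurrent publishes during startup share a single provider call.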
Unsupported options are rejected when the transport or queue binding is constructed. The SQS transport exposes its bound queue definitions, so createRunlane({ queues }) rejects missing provider bindings or queue-policy drift before the first wakeup publish.
| Option | Default | Notes |
|---|---|---|
| batchSize | 10 | SendMessageBatch chunk size. Must be between 1 and the SQS maximum of 10. |
| client | Required | AWS SDK v3 SQS client or compatible send() surface. |
| name | 'sqs' | Adapter name reported through the transport contract. |
| queues | Required | Array of sqsQueue(queueDefinition, options) bindings. Must contain at least one entry. |
| queues[].queueUrl | None | Direct SQS queue URL. Skips lookup. Must be an http:// or https:// URL. |
| queues[].queueName | None | SQS queue name. Resolved with GetQueueUrlCommand and cached. Queue names must satisfy SQS naming rules. |
| queues[].queueOwnerAWSAccountId | None | Only valid with queueName. |
| queues[].fifo.messageGroup | SqsFifoMessageGroup.Run ('run') | FIFO only. Run isolates groups per run; Queue groups by logical queue. |
| queues[].fifo.deduplication | SqsFifoDeduplication.Outbox ('outbox') | FIFO only. Outbox sends a dedupe id derived from the durable outbox row; ContentBased omits MessageDeduplicationId. |
The package exports enums for the option and failure discriminants consumers may branch on:
| Enum | Values | Where it appears |
|---|---|---|
| SqsFifoMessageGroup | Run, Queue | queues[].fifo.messageGroup |
| SqsFifoDeduplication | Outbox, ContentBased | queues[].fifo.deduplication |
| SqsQueueSourceType | QueueUrl, QueueName | Parsed SqsTransportConfig.queues[].source.type from resolveSqsTransportConfig() |
| SqsDeliveryFailurePhase | Message, Parse, Execute, Delete | SqsDeliveryFailure.phase from Lambda and long-running consumer failure observers |
Wakeup Body
SQS MessageBody contains a versioned envelope:
```ts
type SqsWakeupEnvelopeV1 = {
  type: 'runlane.wakeup'
  version: 1
  message: {
    environment: { name: string }
    runId: string
    queue: string
    requestedAt?: string
    traceCarrier?: Record<string, string>
  }
}
```
One durable outbox row becomes one SQS message. The adapter batches provider calls with SendMessageBatch, but it does not bundle multiple run ids into one JSON body. That keeps SQS visibility, Lambda partial batch failure, FIFO dedupe, and Runlane leases aligned to the same unit: one run wakeup.
parseSqsWakeupBody(body) validates the envelope with Zod, converts requestedAt back to a Date, and validates the resulting value with the contract-owned deliveryMessageSchema.
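The envelope checks can be approximated without Zod. This sketch is a dependency-free stand-in for parseSqsWakeupBody, not its implementation; parseWakeupBody and the local DeliveryMessage type are hypothetical mirrors of the documented shapes, and the real contract schema may enforce more than these checks.

```typescript
// Hypothetical mirror of the documented delivery message; the real schema is contract-owned.
type DeliveryMessage = {
  environment: { name: string }
  runId: string
  queue: string
  requestedAt?: Date
  traceCarrier?: Record<string, string>
}

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value)
}

function parseWakeupBody(body: string): DeliveryMessage {
  const envelope: unknown = JSON.parse(body) // malformed JSON throws a SyntaxError
  if (!isRecord(envelope) || envelope.type !== 'runlane.wakeup' || envelope.version !== 1) {
    throw new Error('Body is not a runlane.wakeup version 1 envelope')
  }
  const message = envelope.message
  if (!isRecord(message)) throw new Error('Envelope has no message object')
  const { environment, runId, queue, requestedAt, traceCarrier } = message
  if (!isRecord(environment) || typeof environment.name !== 'string') {
    throw new Error('message.environment.name must be a string')
  }
  if (typeof runId !== 'string' || typeof queue !== 'string') {
    throw new Error('message.runId and message.queue must be strings')
  }
  const parsed: DeliveryMessage = { environment: { name: environment.name }, runId, queue }
  if (requestedAt !== undefined) {
    // Convert the ISO string back into a Date, rejecting unparseable values.
    const date = new Date(typeof requestedAt === 'string' ? requestedAt : Number.NaN)
    if (Number.isNaN(date.getTime())) throw new Error('message.requestedAt must be an ISO timestamp')
    parsed.requestedAt = date
  }
  if (traceCarrier !== undefined) {
    if (!isRecord(traceCarrier) || !Object.values(traceCarrier).every((v) => typeof v === 'string')) {
      throw new Error('message.traceCarrier must map strings to strings')
    }
    parsed.traceCarrier = traceCarrier as Record<string, string>
  }
  return parsed
}
```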
Publish Behavior
sqsTransport().publishWakeups(command) validates PublishWakeupsCommand with publishWakeupsCommandSchema, groups attempts by resolved SQS queue URL, and sends SendMessageBatch chunks using batchSize, capped by the SQS maximum of 10 entries. outcomes[index] always describes command.attempts[index], even when AWS returns a mixed success/failure batch response.
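A sketch of the grouping and chunking step, assuming a hypothetical WakeupAttempt shape that already carries its resolved queue URL. Using the original attempt index as the SendMessageBatch entry Id is an assumption made here so any response entry maps straight back to command.attempts[index]; the adapter may choose ids differently.

```typescript
// Hypothetical attempt shape: only the fields this sketch needs.
type WakeupAttempt = { queueUrl: string; body: string }

type PlannedBatch = {
  queueUrl: string
  // Entry Id = original attempt index, so response entries map back to attempts[index].
  entries: { Id: string; MessageBody: string }[]
}

const SQS_MAX_BATCH_ENTRIES = 10

function planPublishBatches(attempts: WakeupAttempt[], batchSize = SQS_MAX_BATCH_ENTRIES): PlannedBatch[] {
  if (!Number.isInteger(batchSize) || batchSize < 1 || batchSize > SQS_MAX_BATCH_ENTRIES) {
    throw new RangeError(`batchSize must be an integer between 1 and ${SQS_MAX_BATCH_ENTRIES}`)
  }
  // Group attempt indexes by queue URL, keeping their original order.
  const byQueue = new Map<string, number[]>()
  attempts.forEach((attempt, index) => {
    const indexes = byQueue.get(attempt.queueUrl) ?? []
    indexes.push(index)
    byQueue.set(attempt.queueUrl, indexes)
  })
  // Split each queue's indexes into chunks of at most batchSize entries.
  const batches: PlannedBatch[] = []
  for (const [queueUrl, indexes] of byQueue) {
    for (let start = 0; start < indexes.length; start += batchSize) {
      const chunk = indexes.slice(start, start + batchSize)
      batches.push({
        queueUrl,
        entries: chunk.map((index) => ({ Id: String(index), MessageBody: attempts[index].body })),
      })
    }
  }
  return batches
}
```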
AWS Successful[] entries become WakeupPublishOutcomeType.Published. AWS Failed[] entries become WakeupPublishOutcomeType.Failed with ErrorCode.TransportPublishFailed; provider codes, sender-fault flags, request ids, and queue details go in structured meta. Raw AWS error text is not copied into public failure messages.
If an AWS response omits an entry, duplicates an entry id, or contains an unrecognized entry id, the adapter throws an operation-level RunlaneError because indexed outcomes are no longer trustworthy. If the SDK call rejects before per-entry outcomes exist, the adapter throws a RunlaneError with ErrorCode.TransportUnavailable for retryable provider or network failures, or ErrorCode.TransportPublishFailed for non-retryable provider failures.
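Response reconciliation can be sketched the same way. BatchSuccess and BatchFailure are simplified slices of the SendMessageBatch response, and reconcileBatchResponse is a hypothetical helper: it throws on missing, duplicated, or unknown ids and otherwise returns one outcome per sent id, in send order.

```typescript
// Simplified slices of the SendMessageBatch response entries.
type BatchSuccess = { Id?: string; MessageId?: string }
type BatchFailure = { Id?: string; Code?: string; SenderFault?: boolean }

type EntryOutcome =
  | { id: string; status: 'published'; messageId?: string }
  | { id: string; status: 'failed'; code?: string; senderFault?: boolean }

function reconcileBatchResponse(
  sentIds: string[],
  successful: BatchSuccess[] = [],
  failed: BatchFailure[] = [],
): EntryOutcome[] {
  const pending = new Set(sentIds)
  const outcomes = new Map<string, EntryOutcome>()
  // Each sent id may be claimed exactly once across Successful and Failed.
  const claim = (id: string | undefined): string => {
    if (id === undefined || !pending.has(id)) {
      throw new Error(`Unexpected or duplicate batch entry id: ${String(id)}`)
    }
    pending.delete(id)
    return id
  }
  for (const entry of successful) {
    const id = claim(entry.Id)
    outcomes.set(id, { id, status: 'published', messageId: entry.MessageId })
  }
  for (const entry of failed) {
    const id = claim(entry.Id)
    outcomes.set(id, { id, status: 'failed', code: entry.Code, senderFault: entry.SenderFault })
  }
  if (pending.size > 0) {
    // An omitted entry means some attempt has no trustworthy outcome.
    throw new Error(`Batch response omitted entries: ${[...pending].join(', ')}`)
  }
  return sentIds.map((id) => outcomes.get(id) as EntryOutcome)
}
```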
Capabilities
The SQS adapter reports:
- durableDelivery: true
- nativeDelay: false
- messageGrouping: true only when every configured queue is FIFO
- orderedDelivery: true only when every configured queue is FIFO
sqsTransport() derives the configured messageGrouping and orderedDelivery flags from the parsed queue map so mixed standard/FIFO adapters do not overstate adapter-wide ordering guarantees.
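The capability derivation reduces to an every() check over the parsed queues. ParsedQueue and deriveSqsCapabilities are hypothetical names for this sketch.

```typescript
// Hypothetical parsed binding: only whether the queue is FIFO matters here.
type ParsedQueue = { queueUrl: string; fifo: boolean }

type TransportCapabilities = {
  durableDelivery: boolean
  nativeDelay: boolean
  messageGrouping: boolean
  orderedDelivery: boolean
}

function deriveSqsCapabilities(queues: ParsedQueue[]): TransportCapabilities {
  // Adapter-wide guarantees hold only when every bound queue provides them.
  const everyQueueFifo = queues.length > 0 && queues.every((queue) => queue.fifo)
  return {
    durableDelivery: true,
    nativeDelay: false, // storage, not SQS delay, decides when work is due
    messageGrouping: everyQueueFifo,
    orderedDelivery: everyQueueFifo,
  }
}
```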
Runlane does not use SQS native delay for schedules, releases, retries, or outbox recovery. Storage availability and outbox state decide when work is due.
Standard Queues
Standard SQS queues are the default recommendation. Runlane correctness does not require provider ordering because every wakeup re-reads durable storage and tries to claim a lease before executing. Duplicate, delayed, stale, wrong-queue, already-leased, not-due, and terminal wakeups are ack-safe ignored results from executeDelivery(message, options).
Create a standard queue and optional dead-letter queue through your normal IaC system. A minimal AWS CLI sketch:
```sh
aws sqs create-queue --queue-name runlane-default-production
aws sqs create-queue --queue-name runlane-default-production-dlq
```
Attach a redrive policy to the source queue so poison messages land in the DLQ after a bounded number of receives.
Standard source queues can share a standard DLQ. FIFO source queues need a FIFO DLQ; the AWS example provisions Dlq for Emails and Heavy, and DlqFifo for Ordered.
Set the SQS visibility timeout longer than the expected executeDelivery() handling window for that consumer. For Lambda event source mappings, AWS recommends a queue visibility timeout of at least six times the Lambda function timeout plus MaximumBatchingWindowInSeconds.
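As a worked example of that guidance, a 60-second Lambda with a 5-second batching window needs a queue visibility timeout of at least 6 * 60 + 5 = 365 seconds. The helper below is a hypothetical name; it also rejects results above the SQS ceiling of 43200 seconds.

```typescript
const SQS_MAX_VISIBILITY_TIMEOUT_SECONDS = 43200 // 12 hours

// Smallest queue visibility timeout that follows the AWS guidance for Lambda
// event source mappings: six times the function timeout plus the batching window.
function minLambdaVisibilityTimeoutSeconds(functionTimeoutSeconds: number, maximumBatchingWindowInSeconds = 0): number {
  const seconds = 6 * functionTimeoutSeconds + maximumBatchingWindowInSeconds
  if (seconds > SQS_MAX_VISIBILITY_TIMEOUT_SECONDS) {
    throw new RangeError(`Needs ${seconds}s of visibility, above the SQS maximum of 43200s`)
  }
  return seconds
}
```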
Do not share a Runlane SQS queue with non-Runlane messages unless your consumer deliberately parses and failure-handles those messages. The default parser treats malformed bodies as delivery failures so the provider can retry and eventually redrive them.
Choosing A Consumer
SQS is a transport choice. After SQS wakeups are published, exactly one SQS-consuming deployment path should drain each SQS queue:
| Deployment | Uses SQS? | Acquisition path | Use when |
|---|---|---|---|
| Lambda SQS consumer | Yes | AWS invokes Lambda with SQS records. | Lambda can run the lightweight orchestration work within its timeout and runtime limits. |
| Long-running SQS consumer | Yes | EC2, ECS, Fargate, or another Node process calls SQS ReceiveMessage. | You want SQS wakeups with long-running process control. |
| Core storage-polling worker | No | runlane.worker({ mode: WorkerMode.Poll }) scans storage. | You intentionally do not want SQS as the acquisition path. |
Do not publish SQS wakeups for a queue and then only run a storage-polling worker for that same queue. The polling worker can execute the durable runs, but it does not receive or delete SQS messages, so those provider messages become stale and eventually redrive or move to the DLQ.
Running a polling worker and SQS consumers against the same queue is usually wasteful. Storage leases prevent double execution, but two acquisition paths race for the same durable runs and make provider behavior harder to reason about. Use both only as a deliberate temporary migration or emergency backstop.
FIFO Queues
FIFO support is a queue-level provider option, not a separate Runlane abstraction:
```ts
import { SQSClient } from '@aws-sdk/client-sqs'
import { queue } from '@runlane/core'
import { SqsFifoDeduplication, SqsFifoMessageGroup, sqsQueue, sqsTransport } from '@runlane/transport-sqs'

const client = new SQSClient({ region: 'us-east-1' })
const emailsQueue = queue({ name: 'emails' })

const transport = sqsTransport({
  client,
  queues: [
    sqsQueue(emailsQueue, {
      // Must point at a FIFO queue (URL ending in .fifo) because fifo options are set.
      queueUrl: process.env.RUNLANE_EMAILS_FIFO_QUEUE_URL,
      fifo: {
        messageGroup: SqsFifoMessageGroup.Run,
        deduplication: SqsFifoDeduplication.Outbox,
      },
    }),
  ],
})
```
Default messageGroup: SqsFifoMessageGroup.Run prevents one stuck run from blocking the whole logical queue. Use messageGroup: SqsFifoMessageGroup.Queue only when you intentionally want one ordered provider group for the whole logical queue.
Default deduplication: SqsFifoDeduplication.Outbox sends a provider-safe MessageDeduplicationId derived from the durable outbox message id. Use deduplication: SqsFifoDeduplication.ContentBased only when the FIFO queue has content-based deduplication enabled; the adapter will omit MessageDeduplicationId.
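A sketch of provider-safe FIFO ids, assuming hex SHA-256 hashing as the fallback for values SQS would reject. SQS allows 1 to 128 ASCII alphanumeric and punctuation characters in MessageGroupId and MessageDeduplicationId. How the adapter actually derives its ids is not specified here, and the 'queue' discriminant value is assumed; only 'run' appears in the option table.

```typescript
import { createHash } from 'node:crypto'

// Assumed string values of the documented SqsFifoMessageGroup discriminants.
type FifoMessageGroup = 'run' | 'queue'

// ASCII 0x21-0x7E is exactly alphanumerics plus punctuation, 1 to 128 characters.
const SQS_FIFO_ID = /^[\x21-\x7e]{1,128}$/

function toProviderSafeId(value: string): string {
  // Keep safe ids readable; hash anything else into a stable 64-char hex digest.
  return SQS_FIFO_ID.test(value) ? value : createHash('sha256').update(value).digest('hex')
}

// Hypothetical helper: per-message FIFO attributes for one wakeup.
function fifoMessageAttributes(
  wakeup: { outboxMessageId: string; runId: string; queue: string },
  messageGroup: FifoMessageGroup,
) {
  return {
    MessageGroupId: toProviderSafeId(messageGroup === 'run' ? wakeup.runId : wakeup.queue),
    MessageDeduplicationId: toProviderSafeId(wakeup.outboxMessageId),
  }
}
```

Hashing keeps the id deterministic, so a retried publish of the same outbox row produces the same MessageDeduplicationId.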
FIFO does not mean exactly-once task execution. Storage leases, durable run state, idempotency keys, singleton keys, and idempotent user code remain the correctness boundary.
If a configured queue URL or name ends in .fifo, include fifo options. If fifo options are present, the queue URL or name must point at a FIFO queue.
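The suffix rule can be checked with one endsWith call, since an SQS FIFO queue URL ends with the queue name. FifoCheckBinding and assertFifoConsistency are hypothetical names for this sketch.

```typescript
// Hypothetical binding shape; only the source and the presence of fifo options matter.
type FifoCheckBinding = {
  queueUrl?: string
  queueName?: string
  fifo?: { messageGroup?: string; deduplication?: string }
}

function assertFifoConsistency(binding: FifoCheckBinding): void {
  // A FIFO queue URL ends with its queue name, so one suffix check covers both forms.
  const source = binding.queueUrl ?? binding.queueName ?? ''
  const isFifoQueue = source.endsWith('.fifo')
  if (isFifoQueue && binding.fifo === undefined) {
    throw new Error(`${source} is a FIFO queue; add fifo options`)
  }
  if (!isFifoQueue && binding.fifo !== undefined) {
    throw new Error(`fifo options need a .fifo queue, got ${source}`)
  }
}
```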
Lambda Consumer
Use the Lambda helper when AWS invokes a function with SQS records:
```ts
import { createRunlane } from '@runlane/core'
import { createSqsLambdaHandler } from '@runlane/transport-sqs'

// lane, queues, and syncQuickBooksInvoices come from your application's Runlane setup.
const runlane = createRunlane({
  lane,
  queues,
  tasks: { syncQuickBooksInvoices },
})

export const handler = createSqsLambdaHandler({
  executeDelivery: (message, options) => runlane.executeDelivery(message, options),
})
```
The helper does not import core. It parses each SQS record body, calls the supplied executeDelivery(message, options) callback, and returns Lambda's partial batch failure shape:
```ts
{
  batchItemFailures: [{ itemIdentifier: record.messageId }]
}
```
createSqsLambdaHandler() options:
| Option | Required | Default | What it controls |
|---|---|---|---|
| executeDelivery | Yes | None | Callback that should call runlane.executeDelivery(message, options) for the runtime handling this queue. |
| deliveryOptions | No | {} | leaseDuration, heartbeatInterval, workerId, and signal forwarded into every delivery attempt. |
| fifo | No | false | Stops processing after the first failed record and returns that record plus later unprocessed records as batch failures. Required for FIFO ordering semantics. |
| onDeliveryFailure | No | None | Observer called before the helper reports a failed record. Rejections from the observer are treated as handler failures. |
Do not run runlane.tick() at the end of this handler. The Lambda helper is the SQS record acknowledgment boundary; adding a global maintenance pass there couples provider batch latency to schedules, outbox recovery, lease recovery, and cancellation cleanup. Run maintenance from a separate scheduled function, cron process, container sidecar, or explicit CLI command.
Enable ReportBatchItemFailures on the Lambda event source mapping. Without it, Lambda ignores the returned batchItemFailures list: a handler that returns normally acknowledges the whole provider batch, including the records it reported as failed, while a thrown handler failure retries the whole batch.
```sh
aws lambda create-event-source-mapping \
  --function-name runlane-worker \
  --event-source-arn arn:aws:sqs:us-east-1:123456789012:runlane-default-production \
  --batch-size 10 \
  --function-response-types ReportBatchItemFailures
```
For standard queues, the helper continues after a failed record and reports only failed records. For FIFO queues, pass fifo: true; the helper stops after the first failure and returns that failed record plus every unprocessed later record so provider ordering is preserved.
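The standard and FIFO failure reporting just described can be sketched sequentially. SqsRecord and SqsBatchResponse are trimmed to the fields used here, handle is a hypothetical stand-in for parse plus executeDelivery, and the helper's real concurrency and onDeliveryFailure calls are not modeled.

```typescript
// Minimal slices of the Lambda SQS event and partial batch response shapes.
type SqsRecord = { messageId: string; body: string }
type SqsBatchResponse = { batchItemFailures: { itemIdentifier: string }[] }

async function collectBatchItemFailures(
  records: SqsRecord[],
  handle: (record: SqsRecord) => Promise<void>,
  fifo: boolean,
): Promise<SqsBatchResponse> {
  const batchItemFailures: { itemIdentifier: string }[] = []
  for (let index = 0; index < records.length; index += 1) {
    const record = records[index]
    try {
      await handle(record)
    } catch {
      if (fifo) {
        // Report the failed record and every later record so SQS keeps group order.
        for (const rest of records.slice(index)) batchItemFailures.push({ itemIdentifier: rest.messageId })
        break
      }
      // Standard queues: report only this record and keep going.
      batchItemFailures.push({ itemIdentifier: record.messageId })
    }
  }
  return { batchItemFailures }
}
```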
```ts
export const handler = createSqsLambdaHandler({
  fifo: true,
  deliveryOptions: {
    leaseDuration: '2m',
  },
  executeDelivery: (message, options) => runlane.executeDelivery(message, options),
})
```
deliveryOptions.leaseDuration is not an SQS visibility timeout. It is the Runlane lease written to storage when runlane.executeDelivery(message, options) claims the run.
In a Postgres lane, the lease means "this run is held by worker invocation X until time T." If the Lambda crashes or times out before persisting a task outcome, the lease expires and a later tick() can request delivery again for another consumer.
Use a lease at least as long as the Lambda timeout. A 1-minute Lambda commonly uses leaseDuration: '2m' to leave a recovery buffer.
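To make the sizing rule concrete, this sketch compares a lease string with a Lambda timeout. The duration parser is hypothetical and accepts only ms, s, m, and h suffixes; Runlane's own duration format may differ.

```typescript
// Hypothetical duration parser covering strings such as '500ms', '30s', '2m', and '1h'.
const UNIT_MS = { ms: 1, s: 1000, m: 60000, h: 3600000 } as const

function durationToMs(duration: string): number {
  const match = /^(\d+)(ms|s|m|h)$/.exec(duration)
  if (match === null) throw new Error(`Unsupported duration: ${duration}`)
  return Number(match[1]) * UNIT_MS[match[2] as keyof typeof UNIT_MS]
}

// Compares a Runlane lease with a Lambda timeout given in seconds.
function checkLeaseForLambda(leaseDuration: string, lambdaTimeoutSeconds: number) {
  const leaseMs = durationToMs(leaseDuration)
  const timeoutMs = lambdaTimeoutSeconds * 1000
  return {
    leaseMs,
    coversTimeout: leaseMs >= timeoutMs,
    recoveryBufferMs: leaseMs - timeoutMs,
    defaultHeartbeatMs: leaseMs / 2, // documented default: half the lease duration
  }
}
```

For the 1-minute Lambda above, '2m' covers the timeout with a 60-second recovery buffer, while '30s' would let the lease expire while the invocation is still running.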
Persisted user task outcomes are not SQS failures. If core records run.succeeded, run.failed, run.retry_scheduled, run.released, or run.cancelled, the SQS record is acknowledged because delivery handling completed. Framework, storage, projection, registration, or persistence failures reject so Lambda can retry the SQS record.
onDeliveryFailure is optional. Use it to log parse or execution failures before the helper reports partial batch failure. It receives:
```ts
type SqsDeliveryFailure = {
  phase: SqsDeliveryFailurePhase
  providerMessageId?: string
  deliveryMessage?: DeliveryMessage
  queueUrl?: string
  cause: unknown
}
```
Lambda failures use SqsDeliveryFailurePhase.Message, SqsDeliveryFailurePhase.Parse, or SqsDeliveryFailurePhase.Execute. If onDeliveryFailure rejects, the helper rejects too; observer failures are treated as application failures, not silently swallowed.
Long-Running Consumer
Use the long-running consumer on EC2, ECS, Fargate, or another Node process when SQS is the work acquisition path. Its start() and stop() methods are only the lifecycle for this optional SQS receive loop; the core integration remains the supplied executeDelivery(message, options) callback.
```ts
import { SQSClient } from '@aws-sdk/client-sqs'
import { createRunlane } from '@runlane/core'
import { createSqsDeliveryConsumer } from '@runlane/transport-sqs'

// lane, queues, and syncQuickBooksInvoices come from your application's Runlane setup.
const runlane = createRunlane({
  lane,
  queues,
  tasks: { syncQuickBooksInvoices },
})

const consumer = createSqsDeliveryConsumer({
  client: new SQSClient({ region: 'us-east-1' }),
  queueUrl: process.env.RUNLANE_QUICKBOOKS_QUEUE_URL,
  deliveryOptions: {
    leaseDuration: '2m',
  },
  executeDelivery: (message, options) => runlane.executeDelivery(message, options),
  maxNumberOfMessages: 5,
  waitTimeSeconds: 10,
  // Longer than the expected handling window for one delivery.
  visibilityTimeoutSeconds: 120,
})

await consumer.start()

process.once('SIGTERM', () => {
  void consumer.stop()
})
```
Treat a consumer instance as single-start. start() is idempotent while the loop is already running; after stop() or a fatal closed rejection, create a new consumer instance to resume receiving.
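A minimal sketch of the single-start contract, using a hypothetical SingleStartLoop class with an injected receiveOnce pass. It is not the package's consumer; it only illustrates an idempotent start(), a one-way stop(), and refusing restarts.

```typescript
type LoopState = 'idle' | 'running' | 'stopped'

// receiveOnce is a hypothetical stand-in for one receive/execute/delete pass.
class SingleStartLoop {
  private state: LoopState = 'idle'
  private readonly controller = new AbortController()
  private loop: Promise<void> = Promise.resolve()
  private readonly receiveOnce: (signal: AbortSignal) => Promise<void>

  constructor(receiveOnce: (signal: AbortSignal) => Promise<void>) {
    this.receiveOnce = receiveOnce
  }

  start(): Promise<void> {
    if (this.state === 'running') return Promise.resolve() // idempotent while running
    if (this.state === 'stopped') {
      return Promise.reject(new Error('Loop is closed; create a new instance to resume'))
    }
    this.state = 'running'
    this.loop = this.run()
    // A fatal pass closes the loop for good, like a rejected closed promise.
    void this.loop.catch(() => {
      this.state = 'stopped'
    })
    return Promise.resolve()
  }

  async stop(): Promise<void> {
    this.state = 'stopped'
    this.controller.abort()
    await this.loop // wait for the in-flight pass to finish
  }

  private async run(): Promise<void> {
    while (!this.controller.signal.aborted) {
      await this.receiveOnce(this.controller.signal)
    }
  }
}
```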
The consumer loop:
- Calls SQS ReceiveMessage.
- Parses each wakeup body.
- Calls executeDelivery(message, options).
- Deletes only messages whose delivery resolves.
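One pass of that loop can be sketched as a function over already-received messages. ReceivedMessage is a trimmed slice of the SDK's message type, and the 'message', 'parse', and 'execute' strings are hypothetical stand-ins for SqsDeliveryFailurePhase values. The returned receipt handles are what a caller would pass to DeleteMessageBatch; everything else stays in SQS until its visibility timeout expires.

```typescript
// Trimmed slice of an SQS ReceiveMessage result entry.
type ReceivedMessage = { MessageId?: string; ReceiptHandle?: string; Body?: string }

type FailurePhase = 'message' | 'parse' | 'execute'

type LoopDeps<M> = {
  parse: (body: string) => M
  executeDelivery: (message: M) => Promise<unknown>
  onFailure?: (phase: FailurePhase, messageId: string | undefined, cause: unknown) => Promise<void> | void
}

// Returns only the receipt handles whose delivery resolved, so only those get deleted.
async function processReceivedBatch<M>(messages: ReceivedMessage[], deps: LoopDeps<M>): Promise<string[]> {
  const deletable: string[] = []
  for (const received of messages) {
    if (received.Body === undefined || received.ReceiptHandle === undefined) {
      await deps.onFailure?.('message', received.MessageId, new Error('Missing body or receipt handle'))
      continue
    }
    let message: M
    try {
      message = deps.parse(received.Body)
    } catch (cause) {
      await deps.onFailure?.('parse', received.MessageId, cause)
      continue // left undeleted: SQS redelivers or redrives it
    }
    try {
      await deps.executeDelivery(message)
      deletable.push(received.ReceiptHandle)
    } catch (cause) {
      await deps.onFailure?.('execute', received.MessageId, cause)
    }
  }
  return deletable
}
```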
deliveryOptions supports the same execution controls as core's executeDelivery() options: leaseDuration, heartbeatInterval, signal, and diagnostic workerId. Long-running consumers link forwarded signals to stop() and to the consumer-level signal, so cooperative task code can stop through context.signal.
Messages whose provider shape, wakeup body, or execution path rejects are not deleted. SQS visibility timeout and redrive policy handle retry and DLQ movement.
onDeliveryFailure receives the same SqsDeliveryFailure shape as the Lambda helper. Besides the Message, Parse, and Execute phases, consumer failures can use SqsDeliveryFailurePhase.Delete and include queueUrl.
createSqsDeliveryConsumer() options:
| Option | Required | Default | What it controls |
|---|---|---|---|
| client | Yes | None | AWS SDK v3 SQS client or compatible send() surface. |
| queueUrl | Yes | None | SQS queue URL this consumer receives from. Start one consumer per provider queue URL. |
| executeDelivery | Yes | None | Callback that should call runlane.executeDelivery(message, options). |
| deliveryOptions | No | {} | Delivery lease, heartbeat, signal, and worker id controls forwarded into each run attempt. |
| fifo | No | false | Must be true when reading a FIFO queue URL ending in .fifo. |
| maxNumberOfMessages | No | 10 | SQS receive batch size, bounded by the provider maximum. |
| waitTimeSeconds | No | 20 | SQS long-poll wait time per receive call. |
| visibilityTimeoutSeconds | No | Queue default | Per-receive visibility override. Keep it longer than the delivery handling window. |
| signal | No | None | Stops the receive loop cooperatively and links into forwarded delivery signals. |
| onDeliveryFailure | No | None | Observer for parse, execute, message-shape, or delete failures. |
If DeleteMessageBatch returns per-entry failures, the consumer reports each failed delete through onDeliveryFailure with SqsDeliveryFailurePhase.Delete when that observer is configured, and keeps the receive loop alive.
SQS will make undeleted messages visible again according to the queue visibility timeout and redrive policy. SDK-level receive and delete call failures reject consumer.closed because the loop cannot prove provider receive or acknowledgement state for that batch.
One consumer loop reads one SQS queue URL. Start one consumer per SQS queue that the process should handle:
```ts
const emailsConsumer = createSqsDeliveryConsumer({
  client,
  queueUrl: process.env.RUNLANE_EMAILS_QUEUE_URL,
  executeDelivery: (message, options) => runlane.executeDelivery(message, options),
})

const mediaConsumer = createSqsDeliveryConsumer({
  client,
  queueUrl: process.env.RUNLANE_MEDIA_QUEUE_URL,
  executeDelivery: (message, options) => runlane.executeDelivery(message, options),
})

await emailsConsumer.start()
await mediaConsumer.start()
```
This is different from runlane.worker({ mode: WorkerMode.Poll }). A polling worker scans storage and ignores SQS messages. An SQS consumer waits on SQS receive and handles delivered wakeups.
IAM
Producer permissions when every queue uses queueUrl:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["sqs:SendMessage"],
      "Resource": [
        "arn:aws:sqs:us-east-1:123456789012:runlane-default-production",
        "arn:aws:sqs:us-east-1:123456789012:runlane-emails-production"
      ]
    }
  ]
}
```
Add sqs:GetQueueUrl when using queueName resolution:
```json
{
  "Effect": "Allow",
  "Action": ["sqs:GetQueueUrl"],
  "Resource": "*"
}
```
Consumers need receive and delete permissions for the queues they drain. The long-running helper sends ReceiveMessage and DeleteMessageBatch, which are authorized by sqs:ReceiveMessage and sqs:DeleteMessage. Lambda event source mappings also require sqs:GetQueueAttributes on the source queue.
```json
{
  "Effect": "Allow",
  "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes"],
  "Resource": "arn:aws:sqs:us-east-1:123456789012:runlane-default-production"
}
```
Add the required KMS permissions if your queues use a customer-managed KMS key.
AWS Batch And External Compute
Do not put AWS Batch support in the SQS transport. SQS should wake the lightweight Runlane delivery process; AWS Batch can own heavyweight compute such as headless Chrome.
Model external compute as ordinary task code that submits or checks a provider job, stores or rediscovers the provider job id durably, and releases the Runlane run while the external job is still running:
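```ts
// renderPdfSchema, getOrSubmitBatchJob, and saveRenderedPdf are your own application code.
const renderPdf = task({
  id: 'pdf.render',
  schema: renderPdfSchema,
  async run(payload, context) {
    // Reuse the durably stored AWS Batch job id, or submit a new job.
    const job = await getOrSubmitBatchJob(payload)
    if (job.status !== 'SUCCEEDED') {
      // Release the run so storage makes it due again after 30 seconds.
      return context.release('30s', { reason: 'batch_not_finished' })
    }
    await saveRenderedPdf(job)
  },
})
```
That keeps transport, durable run lifecycle, and compute orchestration in the right layers.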
Provider Constraints
The adapter encodes AWS hard ceilings as provider constraints and exposes caller-tunable options inside those bounds. Defaults favor efficient provider usage, but production consumers can reduce batch and long-poll sizes for latency, shutdown behavior, cost, or deployment-specific fairness.
| Option | Default | Valid range | Applies to |
|---|---|---|---|
| batchSize | 10 | 1..10 | sqsTransport() publish batching. |
| maxNumberOfMessages | 10 | 1..10 | createSqsDeliveryConsumer() receive size. |
| waitTimeSeconds | 20 | 0..20 | createSqsDeliveryConsumer() long polling. |
| visibilityTimeoutSeconds | Provider queue default | 0..43200 | createSqsDeliveryConsumer() per-receive visibility override. |
| deliveryOptions.leaseDuration | Core lease default (5m) | Positive duration string | Durable Runlane ownership window written by executeDelivery(). Set it greater than or equal to the consumer timeout. |
| deliveryOptions.heartbeatInterval | Half of lease duration | Positive duration string shorter than lease duration | How often executeDelivery() extends the Runlane lease while task code runs. |
| deliveryOptions.workerId | Runtime default worker id | Runlane worker id | Diagnostic identity recorded on leases; queues route work, worker ids do not. |
| deliveryOptions.signal | None | AbortSignal | Cooperative abort signal forwarded into TaskContext.signal; long-running consumers also link it to consumer stop. |
- SendMessageBatch sends at most 10 entries per request.
- ReceiveMessage receives at most 10 messages per request.
- SQS long polling waits at most 20 seconds per receive request.
- SQS bounds message visibility timeout to 12 hours.
- MessageGroupId is required for FIFO queues.
- MessageDeduplicationId is required for FIFO queues unless content-based deduplication is enabled on the queue and configured in Runlane as deduplication: SqsFifoDeduplication.ContentBased.
- Lambda partial batch failure responses use batchItemFailures[].itemIdentifier.
AWS references: SendMessageBatch, ReceiveMessage, Lambda with SQS, Lambda SQS event source configuration, and SQS partial batch failures.
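The ceilings in the table above can be enforced up front with a small range check. SQS_LIMITS and assertWithinSqsLimits are hypothetical names; the adapter's own validation and error types may differ.

```typescript
// Documented provider ceilings for the caller-tunable numeric options.
const SQS_LIMITS = {
  batchSize: { min: 1, max: 10 },
  maxNumberOfMessages: { min: 1, max: 10 },
  waitTimeSeconds: { min: 0, max: 20 },
  visibilityTimeoutSeconds: { min: 0, max: 43200 }, // 12 hours
} as const

type LimitedOption = keyof typeof SQS_LIMITS

function assertWithinSqsLimits(options: Partial<Record<LimitedOption, number>>): void {
  for (const [name, value] of Object.entries(options) as [LimitedOption, number | undefined][]) {
    if (value === undefined) continue // omitted options fall back to their defaults
    const { min, max } = SQS_LIMITS[name]
    if (!Number.isInteger(value) || value < min || value > max) {
      throw new RangeError(`${name} must be an integer in ${min}..${max}, got ${value}`)
    }
  }
}
```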
Local SQS Gate
The package includes an opt-in Ministack integration suite for local SQS-compatible behavior. Start Ministack on the default edge port:
```sh
docker run --rm --name runlane-ministack-test -p 4566:4566 ministackorg/ministack
```
Then run the transport package's live SQS gate:
```sh
pnpm --filter @runlane/transport-sqs test:sqs:ministack
```
Stop the local container when the gate finishes:
```sh
docker stop runlane-ministack-test
```
The suite creates ephemeral standard and FIFO queues, publishes through sqsTransport(), receives through the real AWS SDK SQS client, and exercises the long-running consumer delete-after-resolve path. Set RUNLANE_SQS_MINISTACK_ENDPOINT or RUNLANE_SQS_MINISTACK_REGION when Ministack is not running at http://localhost:4566 in us-east-1.
The package-local wrapper lives at packages/transport-sqs/scripts/test-sqs-ministack-local.ts; keep local SQS test setup changes there so the entry point remains discoverable from the package.