Postgres SQS Lane
Reference production lane composed from Postgres storage and SQS wakeups.
@runlane/lane-postgres-sqs is the reference production lane. It composes postgresStorage() from @runlane/postgres-storage, sqsTransport() from @runlane/transport-sqs, and createLane() from @runlane/contracts.
The package does not own task execution, retries, releases, schedules, worker loops, Lambda parsing, or outbox semantics. Core owns behavior. Postgres owns durable truth. SQS carries wakeups.
```shell
pnpm add @runlane/core @runlane/lane-postgres-sqs @runlane/postgres-storage @runlane/transport-sqs @aws-sdk/client-sqs
```

When To Use This Lane
Use this lane when one Runlane runtime should share durable state through Postgres and wake workers through SQS:
| Use it for | Why |
|---|---|
| Multi-process production runtimes | Producers, Lambda consumers, long-running workers, maintenance, and CLI commands all see the same Postgres state. |
| SQS-based acquisition | Each durable outbox row becomes one SQS wakeup, and consumers call runlane.executeDelivery() for the delivered run. |
| Operator history and retention | Postgres serves run lists, event history, idempotency ownership, singleton ownership, queue capacity, and pruning. |
| AWS deployments | The lane fits RDS Postgres plus SQS, Lambda event source mappings, EventBridge maintenance, ECS/Fargate workers, and AWS SDK credentials. |
Use createLocalLane() instead when you want an in-memory local backend without Postgres or SQS. Use separate lanes only when the infrastructure boundary is actually different: another database or schema, AWS account, region, tenant boundary, compliance boundary, or storage/transport pair. Use queues for ordinary workload routing inside the same Postgres/SQS boundary.
Create The Lane
```typescript
import { SQSClient } from '@aws-sdk/client-sqs'
import { queue } from '@runlane/core'
import { postgresSqsLane } from '@runlane/lane-postgres-sqs'
import { sqsQueue } from '@runlane/transport-sqs'

// Queue definitions shared by the lane and the runtime.
export const defaultQueue = queue({ name: 'default', default: true })
export const emailsQueue = queue({ name: 'emails' })
export const queues = [defaultQueue, emailsQueue]

export const lane = postgresSqsLane({
  postgres: {
    connectionString: process.env.DATABASE_URL,
    schema: 'runlane',
  },
  sqs: {
    client: new SQSClient({ region: 'us-east-1' }),
    queues: [
      // Bind by queue URL...
      sqsQueue(defaultQueue, {
        queueUrl: process.env.RUNLANE_DEFAULT_QUEUE_URL,
      }),
      // ...or by queue name, which the transport resolves to a URL on first publish.
      sqsQueue(emailsQueue, {
        queueName: 'runlane-emails-production',
      }),
    ],
  },
})
```

postgres accepts the same options as postgresStorage(). sqs accepts the same options as sqsTransport(). The lane package validates only its own top-level shape and delegates nested adapter validation to the packages that own those contracts.
| Option | Required | Default | What it controls |
|---|---|---|---|
| postgres | Yes | None | Options forwarded to postgresStorage(). |
| sqs | Yes | None | Options forwarded to sqsTransport(). |
| name | No | postgres-sqs | Lane diagnostic name. |
Unsupported top-level keys fail before adapters are constructed. Unsupported nested postgres or sqs keys fail in the owning adapter constructor. Because sqsTransport() exposes its bound queue definitions, createRunlane({ queues }) also verifies that every runtime queue has a matching SQS provider binding with the same queue policy.
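The split between top-level and nested validation can be pictured with a small sketch. This is not the package's implementation: assertTopLevelShape and its error messages are hypothetical names introduced only for illustration. It shows the idea of rejecting unknown top-level keys up front while leaving the contents of postgres and sqs untouched for the owning adapters to validate.

```typescript
// Hypothetical sketch: none of these names come from @runlane/lane-postgres-sqs.
type LaneOptionsInput = Record<string, unknown>

// The three top-level options listed in the table above.
const allowedTopLevelKeys = new Set(['name', 'postgres', 'sqs'])

function assertTopLevelShape(options: LaneOptionsInput): void {
  // Reject unknown top-level keys before any adapter is constructed.
  for (const key of Object.keys(options)) {
    if (!allowedTopLevelKeys.has(key)) {
      throw new TypeError(`Unsupported lane option: ${key}`)
    }
  }
  // Required keys must be present; their contents are not inspected here.
  for (const key of ['postgres', 'sqs']) {
    if (options[key] === undefined) {
      throw new TypeError(`Missing required lane option: ${key}`)
    }
  }
}
```

Under this sketch, a misspelled top-level key fails immediately, while a misspelled key inside postgres or sqs would surface later from the adapter that owns that contract.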
By default the lane is named postgres-sqs. Pass name only when you need a different diagnostic label:
```typescript
// Assumes connectionString, client, and the SQS queue bindings are already in scope.
postgresSqsLane({
  name: 'orders-production',
  postgres: { connectionString, schema: 'runlane' },
  sqs: { client, queues },
})
```

Run getPostgresStorageMigrationSql() from @runlane/postgres-storage in your normal migration system before starting the lane. lane.start() probes the migrated Postgres tables and fails fast when the database, schema, or migration is not ready.
Capabilities And Lifecycle
postgresSqsLane() returns a standard Lane from createLane():
| Boundary | Reported behavior |
|---|---|
| Lane name | name ?? 'postgres-sqs' |
| lane.capabilities.operatorReads | true, inherited from createLane() defaults and backed by Postgres operator reads |
| lane.capabilities.productionDurable | true |
| lane.capabilities.storage | Exact postgresStorageCapabilities: durable state, process-shared state, leases, idempotency, singleton, queue concurrency, schedule occurrence claims, persisted outbox, history reads, and pruning |
| lane.capabilities.transport | Exact SQS transport capabilities; durableDelivery: true, nativeDelay: false, and adapter-wide FIFO grouping/ordering flags only when every configured queue is FIFO |
Lifecycle order is the createLane() order:
- lane.start() starts Postgres storage first, then SQS transport.
- If transport startup fails, storage is closed.
- lane.close() closes transport first, then Postgres storage.
The current SQS transport has no startup probe, so the meaningful production probe is Postgres storage startup. Queue URL resolution for queueName bindings happens on first publish and is cached by the transport.
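The start and close ordering described above can be sketched as a small composition of two parts. This is an illustration under assumptions, not the createLane() source: LifecyclePart and composeLifecycle are hypothetical names, and the sketch ignores everything except ordering and the close-on-failed-start rule.

```typescript
// Hypothetical sketch of the start/close ordering; not the createLane() source.
interface LifecyclePart {
  start(): Promise<void>
  close(): Promise<void>
}

function composeLifecycle(storage: LifecyclePart, transport: LifecyclePart): LifecyclePart {
  return {
    async start() {
      // Storage starts first so the durable probe runs before transport.
      await storage.start()
      try {
        await transport.start()
      } catch (error) {
        // Transport failed after storage started, so release storage again.
        await storage.close()
        throw error
      }
    },
    async close() {
      // Reverse of startup order: transport first, then storage.
      await transport.close()
      await storage.close()
    },
  }
}
```

Closing in reverse order means nothing can publish through the transport after the storage that backs it has been released.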
Dispatch And Recovery
trigger() validates the task payload and creates or returns a durable run in Postgres. What happens next depends on queue policy and dispatch mode:
| Case | Durable effect |
|---|---|
| Duplicate idempotency owner | Returns the existing owner run; no new delivery request or SQS wakeup is created. |
| Unbounded queue | Appends run.created and run.delivery_requested together, and Postgres writes the outbox row in the same transaction. |
| Bounded queue with concurrencyLimit | Appends run.created only. A later tick() reserves queue capacity through Postgres, appends run.delivery_requested, and writes the outbox row. |
With the default dispatch.onTrigger: TriggerDispatchMode.Eager, core claims and publishes only the outbox rows returned by that creation call. That means eager trigger dispatch can publish the initial wakeup for unbounded queues, but it does not bypass bounded queue capacity. Bounded queues still need maintenance to reserve capacity and publish wakeups.
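The three cases in the dispatch table can be restated as a small decision function. This is a hypothetical model for reasoning about the table, not a Runlane API: TriggerCase, TriggerEffect, and durableEffectOfTrigger are names invented for this sketch, and only the event names come from the text above.

```typescript
// Hypothetical model of the dispatch table; names are illustrative, not Runlane APIs.
type TriggerCase =
  | { kind: 'duplicate-idempotency-owner' }
  | { kind: 'new-run'; concurrencyLimit?: number }

interface TriggerEffect {
  events: string[]
  outboxRowWritten: boolean
  needsTickToReserveCapacity: boolean
}

function durableEffectOfTrigger(input: TriggerCase): TriggerEffect {
  if (input.kind === 'duplicate-idempotency-owner') {
    // The existing owner run is returned; nothing new is appended.
    return { events: [], outboxRowWritten: false, needsTickToReserveCapacity: false }
  }
  if (input.concurrencyLimit === undefined) {
    // Unbounded queue: both events and the outbox row land in one transaction.
    return {
      events: ['run.created', 'run.delivery_requested'],
      outboxRowWritten: true,
      needsTickToReserveCapacity: false,
    }
  }
  // Bounded queue: delivery waits for a later tick() to reserve capacity.
  return { events: ['run.created'], outboxRowWritten: false, needsTickToReserveCapacity: true }
}
```

Read this way, eager dispatch has something to publish only in the unbounded case, because that is the only case where the creation call itself writes an outbox row.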
tick() is the recovery and maintenance path. It reserves bounded-queue dispatch capacity, publishes deferred outbox rows, retries failed eager publish attempts after the outbox retry delay, materializes due schedules, wakes due released or retried runs, recovers expired leases, and finalizes eligible cancellations.
Run tick() from separate maintenance infrastructure:
- EventBridge scheduled Lambda
- cron process
- container sidecar
- operator command
Do not call tick() at the end of the SQS delivery Lambda batch. Delivery Lambdas should stay focused on the SQS records AWS already delivered; maintenance is a bounded global pass that belongs on its own scheduler and can run concurrently with delivery.
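A maintenance entry point, such as the handler behind an EventBridge scheduled Lambda, might look like the following sketch. TickRuntime and createTickHandler are hypothetical names; the interface mirrors only the start() and tick() calls used in this document and makes no claim about what tick() returns.

```typescript
// Hypothetical sketch: TickRuntime mirrors only the start() and tick() calls used in this document.
interface TickRuntime {
  start(): Promise<void>
  tick(): Promise<unknown>
}

function createTickHandler(runtime: TickRuntime): () => Promise<void> {
  let startup: Promise<void> | undefined
  return async function handler(): Promise<void> {
    // Start once per process, then reuse the same promise on warm invocations.
    startup ??= runtime.start()
    await startup
    // One bounded maintenance pass per scheduled invocation.
    await runtime.tick()
  }
}
```

In an application, the runtime argument would be the object returned by the shared runtime factory, and the returned function would be exported as the scheduled handler.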
```typescript
import { createRunlane } from '@runlane/core'

// lane, queues, and sendEmail are defined elsewhere in the application.
const runlane = createRunlane({
  lane,
  queues,
  tasks: { sendEmail },
})

await runlane.trigger(runlane.tasks.sendEmail, { userId: 'user_123' })
await runlane.tick()
```

Use dispatch: { onTrigger: TriggerDispatchMode.Deferred } when the producer process should persist work only and let a maintenance process publish SQS wakeups for rows that are already dispatchable:
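```typescript
import { createRunlane, TriggerDispatchMode } from '@runlane/core'

const runlane = createRunlane({
  // Persist only; a maintenance process publishes the SQS wakeups.
  dispatch: { onTrigger: TriggerDispatchMode.Deferred },
  lane,
  queues,
  tasks: { sendEmail },
})
```

Deployment Shape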
A production deployment usually has four process roles sharing the same runtime factory:
| Role | What it does |
|---|---|
| Producer | Calls runlane.start() once per process, then runlane.trigger() from HTTP handlers, jobs, scripts, or app code. |
| SQS consumer | Drains exactly one SQS acquisition path per logical queue, then calls runlane.executeDelivery(message, options). |
| Maintenance | Runs runlane.tick() on a schedule such as EventBridge, cron, sidecar, or operator command. |
| Operator/CLI | Uses the same runtime to list, inspect, cancel, retry, rerun, prune, or trigger runs. |
Keep lane composition and task registration in one shared factory so every role registers the same queues and tasks. The AWS example uses this shape: an HTTP trigger Lambda, two SQS-Lambda consumers, one Fargate SQS consumer, and an EventBridge tick Lambda all import the same runtime factory.
For example:
```typescript
// src/runtime/runlane.ts
import { SQSClient } from '@aws-sdk/client-sqs'
import { createRunlane, queue, task } from '@runlane/core'
import { postgresSqsLane } from '@runlane/lane-postgres-sqs'
import { sqsQueue } from '@runlane/transport-sqs'
import * as z from 'zod'

import { emailProvider } from '../email-provider.js'

const emailsQueue = queue({ name: 'emails', default: true })

const sendEmail = task({
  id: 'emails.send',
  queue: emailsQueue,
  schema: z.object({ userId: z.string().min(1) }),
  idempotencyKey: (payload) => `emails.send.${payload.userId}`,
  async run(payload, context) {
    await emailProvider.send(payload.userId, { signal: context.signal })
  },
})

// Shared factory imported by every process role.
export function createAppRunlane() {
  // Optional endpoint override for LocalStack or Ministack.
  const sqsEndpoint = process.env.RUNLANE_SQS_ENDPOINT
  const sqsClient = new SQSClient({
    region: process.env.AWS_REGION ?? 'us-east-1',
    ...(sqsEndpoint === undefined ? {} : { endpoint: sqsEndpoint }),
  })

  const lane = postgresSqsLane({
    postgres: {
      connectionString: process.env.DATABASE_URL,
      schema: process.env.RUNLANE_POSTGRES_SCHEMA ?? 'public',
    },
    sqs: {
      client: sqsClient,
      queues: [
        sqsQueue(emailsQueue, {
          queueUrl: process.env.RUNLANE_QUEUE_EMAILS_URL,
        }),
      ],
    },
  })

  return createRunlane({
    environment: { name: process.env.RUNLANE_ENVIRONMENT ?? 'production' },
    lane,
    queues: [emailsQueue],
    tasks: { sendEmail },
  })
}
```

Lambda SQS Handler
The lane package intentionally does not wrap Lambda. Use the SQS transport helper and pass core's executeDelivery() callback:
```typescript
import { createSqsLambdaHandler } from '@runlane/transport-sqs'

import { createAppRunlane } from '../runtime/runlane.js'

const runlane = createAppRunlane()

// Cache the startup promise so warm invocations reuse one start() call.
let startup: Promise<void> | undefined

async function ensureStarted() {
  startup ??= runlane.start()
  await startup
}

export const handler = createSqsLambdaHandler({
  deliveryOptions: {
    leaseDuration: '2m',
  },
  async executeDelivery(message, options) {
    await ensureStarted()
    return runlane.executeDelivery(message, options)
  },
})
```

Enable Lambda partial batch responses with ReportBatchItemFailures on the SQS event source mapping.
The handler acknowledges a provider message once Runlane delivery handling has completed, even if the durable task attempt records run.failed, run.retry_scheduled, run.released, or run.cancelled. Framework, storage, registration, projection, parser, or persistence failures leave the SQS record unacknowledged so AWS can retry or redrive it.
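The acknowledgement rule maps onto the Lambda partial batch response shape, where only the records listed in batchItemFailures are returned to the queue. The sketch below illustrates that mapping and is not the createSqsLambdaHandler() source: SqsRecordLike, BatchResponse, and handleBatch are hypothetical names, and the deliver callback stands in for delivery handling.

```typescript
// Hypothetical sketch of partial batch reporting; the real helper owns this logic.
interface SqsRecordLike {
  messageId: string
  body: string
}

interface BatchResponse {
  batchItemFailures: { itemIdentifier: string }[]
}

async function handleBatch(
  records: SqsRecordLike[],
  deliver: (record: SqsRecordLike) => Promise<void>,
): Promise<BatchResponse> {
  const batchItemFailures: { itemIdentifier: string }[] = []
  for (const record of records) {
    try {
      // Resolving means delivery handling completed, whatever the task outcome was.
      await deliver(record)
    } catch {
      // Rejecting means handling itself failed, so this record should be redelivered.
      batchItemFailures.push({ itemIdentifier: record.messageId })
    }
  }
  return { batchItemFailures }
}
```

A task attempt that fails durably still resolves in this model, so its record is acknowledged; only a failure of the handling path puts the messageId into batchItemFailures.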
Use one production acquisition path per logical queue. For a Postgres+SQS lane, that path is an SQS consumer: Lambda through createSqsLambdaHandler() or a long-running process through createSqsDeliveryConsumer(). runlane dev and runlane.worker({ mode: WorkerMode.Poll }) are storage-polling workers; they can execute durable runs from Postgres, but they will not receive or delete provider messages from SQS. Use polling workers only with a polling lane/config that does not publish SQS wakeups for that queue. For local testing of SQS delivery behavior, run the SQS Lambda helper or long-running SQS consumer against LocalStack, Ministack, or AWS.
Local SQS Path
Use a real SQS consumer when the local test is meant to prove the Postgres+SQS lane. Do not run runlane dev for the same queue during this test: the dev worker polls Postgres directly and can claim the run before the SQS consumer handles the wakeup.
The generic Runlane path is a small Node process using createSqsDeliveryConsumer(). Framework-specific queue subscribers are valid when the app already uses that framework, but they are not required.
| Path | What it proves | What runs locally |
|---|---|---|
| createSqsDeliveryConsumer() | SQS receive/delete behavior and runlane.executeDelivery() without a framework | A long-running Node consumer receives from AWS SQS, LocalStack, or Ministack. |
| Framework queue subscriber | Lambda event-source wiring, partial batch response handling, and createSqsLambdaHandler() | The framework runs or proxies the subscriber handler while SQS remains the provider queue. |
For a fully local Ministack path, point both the lane transport and the consumer at the same local queue URL. createAppRunlane() below is an application-owned runtime factory; Runlane exports createRunlane(), not your app factory.
```typescript
// scripts/run-sqs-consumer.ts
import { SQSClient } from '@aws-sdk/client-sqs'
import { createSqsDeliveryConsumer } from '@runlane/transport-sqs'

import { createAppRunlane } from '../src/runtime/runlane.js'

const runlane = createAppRunlane()
await runlane.start()

// Placeholder credentials are enough for a local SQS emulator.
const sqs = new SQSClient({
  credentials: {
    accessKeyId: process.env.AWS_ACCESS_KEY_ID ?? 'test',
    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY ?? 'test',
  },
  endpoint: process.env.RUNLANE_SQS_ENDPOINT,
  region: process.env.AWS_REGION ?? 'us-east-1',
})

const consumer = createSqsDeliveryConsumer({
  client: sqs,
  deliveryOptions: {
    leaseDuration: '2m',
  },
  executeDelivery: (message, options) => runlane.executeDelivery(message, options),
  queueUrl: process.env.RUNLANE_QUEUE_EMAILS_URL,
  visibilityTimeoutSeconds: 120,
  waitTimeSeconds: 2,
})

// Stop receiving on shutdown signals, then let the script fall through to close().
process.once('SIGINT', () => {
  void consumer.stop()
})
process.once('SIGTERM', () => {
  void consumer.stop()
})

await consumer.start()
await consumer.closed
await runlane.close()
```

Run the consumer beside Ministack and the producer command:
```shell
# terminal 1
ministack

# terminal 2
RUNLANE_SQS_ENDPOINT=http://127.0.0.1:4566 pnpm tsx scripts/run-sqs-consumer.ts

# terminal 3
RUNLANE_SQS_ENDPOINT=http://127.0.0.1:4566 runlane trigger emails.send '{"userId":"usr_demo_alice"}'
```

For an unbounded queue with eager dispatch, trigger() should persist the run in Postgres and publish the wakeup to the configured SQS queue, and the consumer should delete the message only after executeDelivery() resolves. Run runlane tick separately when testing bounded queue dispatch, schedules, releases, retries, lease recovery, or deferred outbox publishing.