Adapter Authoring
Implement storage and transport adapters from contracts alone.
Runlane lanes are composed from one storage adapter and one transport adapter. Storage owns durable truth. Transport wakes workers. A lane package wires compatible adapters together, drives optional lifecycle hooks, and reports the combined capabilities.
interface Lane {
  readonly name: string
  readonly capabilities: LaneCapabilities
  readonly storage: StorageAdapter
  readonly transport: TransportAdapter
  start?(): Promise<void>
  close?(): Promise<void>
}
Storage
Implement StorageAdapter when you are building a reusable durable backend such as Postgres, MySQL, DynamoDB, SQLite, or Redis.
import type { StorageCapabilities } from '@runlane/contracts'
export const postgresStorageCapabilities = {
  claimsScheduleOccurrences: true,
  durableState: true,
  enforcesIdempotency: true,
  enforcesQueueConcurrency: true,
  enforcesSingleton: true,
  leasesRuns: true,
  persistsOutbox: true,
  processLocalState: false,
  prunesRuns: true,
  readsRunHistory: true,
} satisfies StorageCapabilities
The important rule is the append boundary: core owns reducer semantics; storage owns atomic persistence.
durableState and processLocalState answer different questions. durableState: false means process loss can lose committed state. processLocalState: true means another process cannot construct an equivalent adapter and reach the same state. Redis without persistence can be non-durable but shared across processes; in-memory local storage is both non-durable and process-local.
Report capability flags as public promises, not implementation wishes. Core and operator APIs use these flags to decide whether runtime construction, maintenance, and operator commands are valid.
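As a rough illustration of how the two state flags combine, the sketch below uses a local StateFlags type and a describeState() helper. Both are hypothetical stand-ins that cover only these two fields; they are not the real StorageCapabilities type.

```typescript
// Hypothetical stand-in covering only the two flags discussed above.
// The real StorageCapabilities type in '@runlane/contracts' has more fields.
interface StateFlags {
  readonly durableState: boolean
  readonly processLocalState: boolean
}

// Redis without persistence: shared across processes, but state can be lost.
const redisWithoutPersistence: StateFlags = { durableState: false, processLocalState: false }

// In-memory local storage: non-durable and invisible to other processes.
const inMemoryLocal: StateFlags = { durableState: false, processLocalState: true }

// A networked database such as Postgres: durable and reachable from any process.
const networkedDatabase: StateFlags = { durableState: true, processLocalState: false }

// Hypothetical helper that spells out what each combination promises.
function describeState(flags: StateFlags): string {
  const durability = flags.durableState ? 'survives process loss' : 'may be lost on process loss'
  const reach = flags.processLocalState
    ? 'visible only to the owning process'
    : 'reachable from other processes'
  return `${durability}; ${reach}`
}
```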
Storage capability flags:
| Capability | Meaning |
|---|---|
| durableState | Committed storage state survives adapter process loss. |
| processLocalState | Storage state is only reachable from the process that owns this adapter instance. |
| readsRunHistory | Operator run and event reads are supported. |
| prunesRuns | Terminal run pruning is supported. |
| leasesRuns | Workers can claim and heartbeat run leases through storage. |
| claimsScheduleOccurrences | Maintenance can claim schedule occurrences safely. |
| persistsOutbox | Delivery requests create storage-backed outbox rows. |
| enforcesIdempotency | Storage enforces task-scoped idempotency ownership. |
| enforcesSingleton | Storage enforces singleton ownership. |
| enforcesQueueConcurrency | Storage enforces bounded queue capacity. |
appendRunEvents() receives append-only events and the core-projected RunRecord. It verifies environment, runId, and expectedSequence, persists the events and supplied projection, and creates outbox rows for any run.delivery_requested events in the same transaction.
Append commands include a required expectedSequence; callers use the current RunRecord.eventSequence or contractDefaults.run.newEventSequence for a new run.
Storage adapters may expose start() and close() hooks for pools, embedded engines, or external clients. Lane/runtime code starts storage before transport and closes transport before storage.
StorageAdapter.start() and StorageAdapter.close() are optional lifecycle hooks. Every other StorageAdapter method below is required by the contract and must exist on the adapter instance. When a required method is guarded by an unsupported capability, it should fail fast with RunlaneError and ErrorCode.CapabilityUnsupported; it must not silently no-op or return an empty success value.
| Method | Contract |
|---|---|
| appendRunEvents() | Atomically persist supplied run events, the supplied projected run, and any outbox rows implied by run.delivery_requested events after verifying environment, run id, and expected sequence. |
| getRun() | Return the current materialized run for one environment/run id, or undefined when it is absent. |
| getRuns() | Batch-read current materialized runs for one environment in caller-requested order while omitting missing runs. |
| getRunByIdempotencyKey() | Return the active or retained terminal run owning one task-scoped idempotency key, or undefined when no retained owner exists. |
| resetIdempotencyKey() | Clear one retained terminal idempotency-key owner, rejecting active owners. |
| listRuns() | Page operator run summaries with opaque cursor semantics, stable ordering, and filters scoped to the requested environment. |
| listRunEvents() | Page durable run history records without losing event order; sequence sorting is meaningful only inside one run. |
| listRunnableRuns() | Return compact due-work candidates for storage polling; do not return payload-bearing run records. |
| listRunsNeedingDispatch() | Return bounded-queue runs that are due and need a dispatch reservation. |
| listRunsNeedingCancellationFinalization() | Return cancellation-requested runs whose current lease has expired so maintenance can append terminal cancellation. |
| listRunsNeedingDelivery() | Return runs that need fresh delivery recovery, excluding queued runs that already have outbox-backed delivery intent. |
| reserveRunDispatch() | Atomically reserve bounded queue capacity, append the supplied run.delivery_requested event, and create the outbox row. |
| claimRunLease() | Atomically claim an executable run attempt using the core-projected lease events and return the updated run only when ownership was acquired. |
| heartbeatRunLease() | Atomically append the core-projected heartbeat event for the current lease owner and return the updated run. |
| releaseRunLease() | Clear the current owner's lease after appendRunEvents() has already persisted the terminal or waiting lifecycle event and projected inactive run. |
| claimScheduleOccurrence() | Claim one schedule occurrence by deterministic occurrence id and fire time, returning undefined when another owner already holds or completed it. |
| completeScheduleOccurrence() | Complete a claimed schedule occurrence with the runs it materialized, enforcing the claim token. |
| claimOutboxMessages() | Claim due outbox rows for publishing, optionally restricted to specific message ids, and return only rows owned by the new claim. |
| markOutboxMessagesPublished() | Batch-ack successfully published claimed outbox rows. |
| markOutboxMessagesFailed() | Batch-record retryable publish failures and next availability for claimed outbox rows. |
| markOutboxMessagesDeadLettered() | Batch-record terminal publish failures for claimed outbox rows. |
| pruneRuns() | Remove terminal runs selected by the prune command and return bounded progress with an opaque continuation cursor when more work remains. |
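The fail-fast rule for capability-guarded methods could look roughly like the sketch below. The RunlaneError class, the ErrorCode object and its string value, and the assertCapability() helper are local, hypothetical stand-ins written for this example; a real adapter would import RunlaneError and ErrorCode from @runlane/contracts.

```typescript
// Hypothetical stand-ins: the real RunlaneError and ErrorCode come from
// '@runlane/contracts', and the string value below is assumed for illustration.
const ErrorCode = { CapabilityUnsupported: 'capability_unsupported' } as const

class RunlaneError extends Error {
  readonly code: string
  constructor(code: string, message: string) {
    super(message)
    this.name = 'RunlaneError'
    this.code = code
  }
}

// Hypothetical guard: throws when the adapter's reported flag is false.
function assertCapability(supported: boolean, method: string): void {
  if (!supported) {
    throw new RunlaneError(ErrorCode.CapabilityUnsupported, `${method} is not supported by this adapter`)
  }
}

// The method still exists on the adapter, but it rejects instead of
// silently doing nothing when the capability flag is false.
const storageWithoutPruning = {
  capabilities: { prunesRuns: false },
  async pruneRuns(): Promise<number> {
    assertCapability(this.capabilities.prunesRuns, 'pruneRuns()')
    return 0 // not reached while prunesRuns is false; a real adapter would prune here
  },
}
```

Because pruneRuns() is declared async, the guard's throw surfaces as a rejected promise rather than a synchronous exception.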
Storage concurrency guarantees
Adapter correctness depends on the guarantees below, not on any particular database primitive. Postgres uses advisory locks, row locks, generated columns, and ON CONFLICT, but another backend can satisfy the same contract with conditional writes, compare-and-swap transactions, serializable partitions, leases, or single-writer streams. The important rule is the observable behavior under competing callers.
appendRunEvents() must compare the current persisted event sequence with expectedSequence before validating projected run invariants. If it does not match, reject with RunlaneError, ErrorCode.StorageConflict, and StorageConflictKind.EventSequence. This preserves optimistic-concurrency recovery: a stale append is a lost race, not an adapter contract violation.
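The ordering of the two checks can be sketched as a pure function. The command shape, the result type, and the specific projection invariant below are assumptions made for illustration; they are not the real contract types.

```typescript
// Hypothetical command and result shapes; the real types live in '@runlane/contracts'.
interface AppendCommand {
  readonly expectedSequence: number
  readonly eventCount: number
  readonly projectedSequence: number
}

type AppendCheck =
  | { readonly ok: true }
  | { readonly ok: false; readonly code: 'StorageConflict'; readonly kind: 'EventSequence' }
  | { readonly ok: false; readonly code: 'AdapterContractViolation' }

function checkAppend(persistedSequence: number, command: AppendCommand): AppendCheck {
  // 1. Sequence first: a stale caller lost a race and can recover by re-reading.
  if (persistedSequence !== command.expectedSequence) {
    return { ok: false, code: 'StorageConflict', kind: 'EventSequence' }
  }
  // 2. Only then validate the projection. The invariant used here (projected
  //    sequence equals expected sequence plus appended events) is an assumption.
  if (command.projectedSequence !== command.expectedSequence + command.eventCount) {
    return { ok: false, code: 'AdapterContractViolation' }
  }
  return { ok: true }
}
```

With this order, a stale command whose projection also looks inconsistent is still reported as an EventSequence conflict, so the caller treats it as a lost race.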
When an append succeeds, these records commit atomically:
- event-history rows
- materialized projection
- storage-owned idempotency or singleton ownership rows
- derived outbox rows
The RunEventRecord values returned from appendRunEvents(), claimRunLease(), and heartbeatRunLease() must be the exact records persisted to history, including ids and sequence numbers. Do not generate one event id for storage and a second event id for the method result.
Idempotency owner creation is a serialized ownership decision for (environment, taskId, idempotencyKey).
| Race outcome | Required behavior |
|---|---|
| One run creates the live owner | That owner wins. |
| Another run attempts the same live owner | Reject with StorageConflictKind.IdempotencyKey so core can read and return the original run. |
| Adapter uses upsert | It must not overwrite a concurrent owner. |
A valid implementation may lock the partition, conditionally insert then read back the owner, or use a transaction primitive that guarantees one owner.
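The "conditionally insert then read back" option can be sketched with an in-memory map. The OwnerTable, OwnerScope, and OwnerDecision types and the claimIdempotencyOwner() function are hypothetical names for this example. A real backend would need a unique constraint or conditional write to get the same guarantee under concurrent callers.

```typescript
// Hypothetical in-memory owner table keyed by (environment, taskId, idempotencyKey).
type OwnerTable = Map<string, string>

interface OwnerScope {
  readonly environmentKey: string
  readonly taskId: string
  readonly idempotencyKey: string
}

type OwnerDecision =
  | { readonly won: true }
  | { readonly won: false; readonly kind: 'IdempotencyKey'; readonly ownerRunId: string }

function claimIdempotencyOwner(owners: OwnerTable, scope: OwnerScope, runId: string): OwnerDecision {
  // JSON-encoding the tuple avoids separator collisions between the three parts.
  const key = JSON.stringify([scope.environmentKey, scope.taskId, scope.idempotencyKey])
  // Conditional insert: never overwrite an existing owner.
  if (!owners.has(key)) owners.set(key, runId)
  // Read back whoever owns the key now.
  const owner = owners.get(key) ?? runId
  if (owner === runId) return { won: true }
  return { won: false, kind: 'IdempotencyKey', ownerRunId: owner }
}
```

The losing caller receives the winning run id, which is what lets core read and return the original run.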
Idempotency retention uses the policy captured at owner creation:
- active owners never expire while active
- with a finite TTL, successful and cancelled terminal owners remain readable until their captured TTL expires or resetIdempotencyKey() clears them
- with ttl: "active", ownership is released when the run reaches any terminal state
- failed owners are released
getRunByIdempotencyKey() is side-effect free: an expired owner returns undefined; cleanup can happen on a later write, reset, or prune path.
Bounded queue capacity is partitioned by (environment, queue, concurrencyKey). Absent concurrency keys share one partition.
listRunnableRuns() and listRunsNeedingDispatch() should apply capacity predicates before returning candidates so a capacity-blocked early row does not starve runnable work in another partition.
claimRunLease() and reserveRunDispatch() must enforce the same capacity partition inside the write operation. A scan-time check alone is not enough, and row-locking only the candidate run is not enough when two different runs in the same partition can be claimed or reserved concurrently.
reserveRunDispatch() is the atomic bounded-queue delivery path. In one operation, it:
- Checks the current sequence.
- Confirms the run is still dispatchable.
- Verifies partition capacity.
- Appends the supplied run.delivery_requested event.
- Writes the projected reservation.
- Creates the outbox row.
A stale sequence, expired/ineligible dispatch state, or full capacity returns undefined; malformed command data remains an adapter contract violation.
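The reservation steps above can be sketched against an in-memory store. Every shape here (RunRow, Store, ReserveCommand, the needsDispatch flag, and the per-partition counter) is a hypothetical simplification. In single-threaded synchronous code the steps cannot interleave; a real backend needs a transaction or equivalent primitive to get the same atomicity.

```typescript
// Hypothetical in-memory shapes; real commands and records come from '@runlane/contracts'.
interface RunRow {
  eventSequence: number
  needsDispatch: boolean
  readonly partition: string
}

interface Store {
  readonly runs: Map<string, RunRow>
  readonly history: string[] // appended event types, in order
  readonly outbox: string[] // run ids that have an outbox row
  readonly inFlight: Map<string, number> // running plus reserved, per partition
  readonly capacity: number
}

interface ReserveCommand {
  readonly runId: string
  readonly expectedSequence: number
}

function reserveRunDispatch(store: Store, command: ReserveCommand): RunRow | undefined {
  const run = store.runs.get(command.runId)
  if (run === undefined) return undefined // assumption: this sketch treats a missing run as ineligible
  if (run.eventSequence !== command.expectedSequence) return undefined // stale sequence
  if (!run.needsDispatch) return undefined // no longer dispatchable
  const used = store.inFlight.get(run.partition) ?? 0
  if (used >= store.capacity) return undefined // partition is full
  // All checks passed: apply every write together.
  store.history.push('run.delivery_requested')
  run.eventSequence += 1
  run.needsDispatch = false
  store.inFlight.set(run.partition, used + 1)
  store.outbox.push(command.runId)
  return { ...run } // return a copy, not the stored row
}
```

No write happens until every check has passed, so a rejected reservation leaves history, capacity, and the outbox untouched.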
claimRunLease() is a race-tolerant ownership operation. A stale sequence, expired dispatch reservation, or full bounded queue returns undefined so competing workers can keep polling without treating normal contention as a framework error. heartbeatRunLease() is stricter because the caller believes it already owns the run: stale sequence is StorageConflictKind.EventSequence, and lost lease ownership is StorageConflictKind.LeaseOwnership.
Outbox mutation methods are batch-only by contract.
claimOutboxMessages() atomically transitions only due rows whose status is pending, failed, or claimed with an expired claim into a new claim, and returns the exact rows now owned by that claim. When outboxMessageIds is provided, do not widen the claim into unrelated backlog rows.
markOutboxMessagesPublished(), markOutboxMessagesFailed(), and markOutboxMessagesDeadLettered() must only mutate messages whose current claim token still matches the command. A stale or missing claim rejects with StorageConflictKind.OutboxClaim.
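A minimal in-memory sketch of the claim and acknowledge rules follows. The OutboxRow fields, the counter-based token, the lease duration parameter, and the all-or-nothing batch check are assumptions for this example. A real adapter would raise the conflict through createStorageConflictError({ kind: StorageConflictKind.OutboxClaim }) rather than a plain Error.

```typescript
type OutboxStatus = 'pending' | 'failed' | 'claimed' | 'published'

// Hypothetical row shape; times are epoch milliseconds.
interface OutboxRow {
  readonly id: string
  status: OutboxStatus
  availableAt: number
  claimToken: string | undefined
  claimExpiresAt: number | undefined
}

let claimCounter = 0 // hypothetical token source; a real adapter would use a random id

function claimOutboxMessages(rows: OutboxRow[], now: number, leaseMs: number, ids?: readonly string[]): OutboxRow[] {
  claimCounter += 1
  const token = `claim-${claimCounter}`
  const claimed: OutboxRow[] = []
  for (const row of rows) {
    if (ids !== undefined && !ids.includes(row.id)) continue // never widen past the requested ids
    const claimExpired = row.status === 'claimed' && (row.claimExpiresAt ?? 0) <= now
    const claimable = row.status === 'pending' || row.status === 'failed' || claimExpired
    if (!claimable || row.availableAt > now) continue // skip rows that are owned or not yet due
    row.status = 'claimed'
    row.claimToken = token
    row.claimExpiresAt = now + leaseMs
    claimed.push({ ...row }) // return exactly the rows this claim now owns
  }
  return claimed
}

function markOutboxMessagesPublished(rows: OutboxRow[], ids: readonly string[], claimToken: string): void {
  const targets = rows.filter((row) => ids.includes(row.id))
  const allOwned = targets.length === ids.length &&
    targets.every((row) => row.status === 'claimed' && row.claimToken === claimToken)
  // Assumption: validate the whole batch before mutating any row.
  if (!allOwned) throw new Error('outbox claim is stale or missing')
  for (const row of targets) {
    row.status = 'published'
    row.claimToken = undefined
    row.claimExpiresAt = undefined
  }
}
```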
Promise-returning adapter methods must report validation, contract, conflict, and backend failures as rejected promises. If an adapter implementation performs synchronous validation, wrap it so callers never observe a synchronous throw from a Promise-typed method.
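One simple way to satisfy this rule in TypeScript is to declare the method async, since an async function converts any throw in its body into a rejection. The validateLimit() helper and both listRuns variants below are hypothetical names used only to contrast the two behaviors.

```typescript
// Hypothetical synchronous validation helper.
function validateLimit(limit: number): void {
  if (!Number.isInteger(limit) || limit <= 0) {
    throw new RangeError('limit must be a positive integer')
  }
}

// Risky: validation runs before any promise exists, so a bad limit throws
// synchronously out of a Promise-typed method.
function listRunsUnsafe(limit: number): Promise<string[]> {
  validateLimit(limit)
  return Promise.resolve([])
}

// Safer: the async keyword turns the same throw into a rejected promise.
async function listRunsSafe(limit: number): Promise<string[]> {
  validateLimit(limit)
  return []
}
```

Callers of listRunsSafe() can rely on a single error path (await or .catch()) for validation and backend failures alike.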
Records returned from storage must be defensive copies. Mutable Date instances and nested objects must not share references with adapter-owned state or command input.
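A small sketch of copy-on-write and copy-on-read for an in-memory adapter follows. The StoredRun shape, copyRun(), and the InMemoryRuns class are hypothetical; real run records carry more fields, and each mutable field would need the same treatment.

```typescript
// Hypothetical record shape; real RunRecord fields come from '@runlane/contracts'.
interface StoredRun {
  readonly runId: string
  readonly createdAt: Date
  readonly tags: string[]
}

function copyRun(run: StoredRun): StoredRun {
  return {
    runId: run.runId,
    createdAt: new Date(run.createdAt.getTime()), // fresh Date, not a shared reference
    tags: [...run.tags], // fresh array
  }
}

class InMemoryRuns {
  private readonly runs = new Map<string, StoredRun>()

  put(run: StoredRun): void {
    this.runs.set(run.runId, copyRun(run)) // copy on write: detach from command input
  }

  get(runId: string): StoredRun | undefined {
    const run = this.runs.get(runId)
    return run === undefined ? undefined : copyRun(run) // copy on read: detach from adapter state
  }
}
```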
listRunnableRuns() returns compact RunnableRunRef candidates for worker acquisition. It must not return payload-bearing RunRecord values.
Workers use the refs to pick candidates, read the full run, and then supply core-projected lease events to claimRunLease(). Use getRunRunnableAvailableAt() for the due-work predicate and ordering timestamp, and use runStatusValues.isRunnableCandidate() when adapter code needs status narrowing.
listRunsNeedingDelivery() is the maintenance query for fresh delivery requests.
| Must include | Must exclude | Predicate helper |
|---|---|---|
| Due scheduled, released, retrying, plus expired-lease running runs | queued runs, because they already have delivery intent and outbox state | getRunDeliveryRecoveryAvailableAt() |
Use runStatusValues.isDeliveryRecoveryCandidate() when adapter code needs to narrow the compact ref status. Maintenance then calls getRuns() once for the selected ids, appends run.delivery_requested, and lets storage create a new outbox row atomically with that event append.
listRunsNeedingDispatch() and reserveRunDispatch() are the bounded-queue dispatch path. Queue capacity must be enforced inside the storage transaction because multiple schedulers and workers can race.
Capacity counts currently running runs plus unexpired dispatch reservations in the same environment, queue, and concurrencyKey partition. When no concurrencyKey is present, the whole queue is one partition.
Dispatch reservations use dispatchExpiresAt; if no worker claims the run before that time, maintenance may reserve and publish it again.
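The capacity count described above can be sketched as a pure function. The PartitionedRun shape, the 'running' status label, and the partitionKey() and usedCapacity() helpers are assumptions for this example; a real adapter would evaluate the same predicate inside its storage transaction.

```typescript
// Hypothetical compact row shape for capacity counting.
interface PartitionedRun {
  readonly environmentKey: string
  readonly queue: string
  readonly concurrencyKey?: string
  readonly status: string
  readonly dispatchExpiresAt?: Date
}

type PartitionScope = Pick<PartitionedRun, 'environmentKey' | 'queue' | 'concurrencyKey'>

function partitionKey(scope: PartitionScope): string {
  // A missing concurrencyKey maps to null, so the whole queue shares one partition.
  return JSON.stringify([scope.environmentKey, scope.queue, scope.concurrencyKey ?? null])
}

function usedCapacity(runs: readonly PartitionedRun[], partition: string, now: Date): number {
  return runs.filter((run) => {
    if (partitionKey(run) !== partition) return false
    const running = run.status === 'running'
    const reserved = run.dispatchExpiresAt !== undefined &&
      run.dispatchExpiresAt.getTime() > now.getTime()
    return running || reserved // each run counts at most once
  }).length
}
```

An expired reservation stops counting as soon as dispatchExpiresAt passes, which is what frees the slot for maintenance to reserve the run again.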
listRunsNeedingCancellationFinalization() is the cancellation maintenance query. It returns only cancellation_requested runs whose current lease has expired.
Use getRunCancellationFinalizationAvailableAt() for the predicate and runStatusValues.isCancellationFinalizationCandidate() for status-only narrowing. Maintenance then reads current projections with getRuns() and appends run.cancelled; it does not wake workers for cancellation finalization.
getRunByIdempotencyKey() is the recovery read behind retained idempotent trigger() semantics. Idempotency ownership is scoped by environment, task id, and key.
Retention rules:
- active owners never expire while active
- successful and cancelled terminal owners with a finite captured idempotencyKeyTTL remain until that TTL expires or resetIdempotencyKey() clears them
- owners captured with idempotencyKeyTTL: "active" are released on terminal state
- failed owners are cleared automatically
Storage must use its authoritative owner table for this lookup, not operator pagination.
pruneRuns() receives terminal statuses and a concrete olderThan: Date from core and must never delete active runs. Core resolves public duration strings before calling storage and freezes that cutoff into public continuation cursors.
When the command omits limit, storage uses contractDefaults.pruning.batchLimit. When storage cannot finish in one bounded call, it returns nextCursor; core passes the adapter cursor back on the next prune command with the same retention filter.
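A bounded prune call with a continuation cursor might be sketched as follows. The status labels, the record and command shapes, the id-ordered cursor, and the DEFAULT_BATCH_LIMIT value are all assumptions; a real adapter would read its default from contractDefaults.pruning.batchLimit and could encode the cursor differently.

```typescript
const DEFAULT_BATCH_LIMIT = 100 // assumed stand-in for contractDefaults.pruning.batchLimit

// Hypothetical terminal status labels.
const TERMINAL_STATUSES: ReadonlySet<string> = new Set(['succeeded', 'failed', 'cancelled'])

interface PrunableRun {
  readonly runId: string
  readonly status: string
  readonly updatedAt: Date
}

interface PruneCommand {
  readonly statuses: readonly string[]
  readonly olderThan: Date
  readonly limit?: number
  readonly cursor?: string
}

interface PruneResult {
  readonly prunedCount: number
  readonly nextCursor: string | undefined
}

function pruneRuns(store: Map<string, PrunableRun>, command: PruneCommand): PruneResult {
  const limit = command.limit ?? DEFAULT_BATCH_LIMIT
  const cursor = command.cursor
  const candidates = Array.from(store.values())
    .filter((run) =>
      TERMINAL_STATUSES.has(run.status) && // active runs are never eligible
      command.statuses.includes(run.status) &&
      run.updatedAt.getTime() < command.olderThan.getTime() &&
      (cursor === undefined || run.runId > cursor))
    .sort((a, b) => (a.runId < b.runId ? -1 : a.runId > b.runId ? 1 : 0))
  const batch = candidates.slice(0, limit)
  for (const run of batch) store.delete(run.runId)
  const last = batch[batch.length - 1]
  const hasMore = candidates.length > batch.length
  return { prunedCount: batch.length, nextCursor: hasMore && last !== undefined ? last.runId : undefined }
}
```

The terminal-status check is applied independently of the requested statuses, so a malformed command cannot delete an active run.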
Ownership tokens are part of the contract. Lease tokens are supplied in the core-projected lease events passed to claimRunLease() and heartbeatRunLease() so storage does not compute run projections. Schedule occurrence and outbox claim tokens are storage-generated, and callers return those tokens for schedule completion, publish, failure, or dead-letter updates.
Adapter indexes should treat Runlane ids as opaque strings. Public Runlane ids are non-empty and reserve : for backend-internal key composition; adapters should still avoid parsing prefixes or separators for behavior.
Schedule occurrence ids and generated schedule run ids use deterministic hashed scope data and may be longer than random run ids. Do not parse them to recover environment, schedule, or fire time.
Outbox mutations are batch-only so networked adapters can persist publish results without one round-trip per row.
| Method | Required behavior |
|---|---|
| markOutboxMessagesPublished() | Acknowledge successful publishes. |
| markOutboxMessagesFailed() | Record retryable failed attempts and optional nextAvailableAt. |
| markOutboxMessagesDeadLettered() | Move poisoned rows to OutboxMessageStatus.DeadLettered with final failure records. |
| claimOutboxMessages() with outboxMessageIds | Claim only those rows; do not widen into an unrelated backlog sweep. |
OutboxFailureRecord.code must be an ErrorCode. Provider-specific response codes belong in meta so operator tooling can branch on Runlane's stable vocabulary.
Run events are durable replay records. Existing event types stay backward compatible: add optional fields with reducer defaults, or introduce a new RunEventType when new required data is needed.
Lease heartbeats are real events in v1. Adapters must preserve observable sequence and history semantics for listRunEvents(). Physical compaction is only valid if callers still see the same ordered event records, or if a future compaction contract explicitly changes that behavior.
Use environmentKey(environment) for durable scoping and uniqueness indexes. Do not build parallel environment key logic from environment.name; future environment identity fields must widen the same contract everywhere.
Operator list APIs use opaque cursors. Storage should apply contractDefaults.pagination, and include filter and sort state in cursor semantics so a cursor cannot silently resume a different ordering. Event sequence sorting is only valid inside one run, so adapters should require runId when callers request RunEventSortField.Sequence.
Transport
Implement TransportAdapter when you are building a reusable wakeup backend such as SQS, Redis, RabbitMQ, Pub/Sub, Kafka, or HTTP push.
Transport should carry the minimum needed to wake a worker. It does not own payloads or materialized run state.
import { asId, type DeliveryMessage } from '@runlane/contracts'
const wakeup = {
  environment: { name: 'production' },
  queue: asId<'queue'>('emails'),
  requestedAt: new Date(),
  runId: asId<'run'>('run_123'),
} satisfies DeliveryMessage
Workers use the wakeup environment and runId to read current durable state from storage before executing. Duplicate or delayed transport messages are safe only when storage remains the durable truth boundary.
Transport capability flags:
| Capability | Meaning |
|---|---|
| durableDelivery | Provider-accepted wakeups survive adapter process loss until the provider delivers or redrives them. |
| messageGrouping | The adapter can group wakeups for provider-level FIFO or partition semantics. |
| nativeDelay | The provider can hold future wakeups natively; Runlane's first-party adapters currently use storage due times and outbox recovery instead. |
| orderedDelivery | The adapter can preserve provider-level order for the configured queue set. |
If a transport exposes queues, runtime construction verifies that every runtime queue has a matching provider binding and policy. Omit queues only for transports whose binding cannot be described as static Runlane queue definitions.
Transport-driven execution uses runlane.executeDelivery(message), not a drain worker. Drain asks storage what work is due; delivery execution uses the transport-acquired runId, verifies the stored run is still runnable for that queue, and returns an ignored result for ack-safe stale wakeups such as terminal, not-due, wrong-queue, already-leased, claim-lost, or abandoned attempts.
Reusable transport adapters should stay contract-only and should not import core. Provider-specific runtime helpers, such as an SQS/Lambda bridge, can parse provider records into DeliveryMessage values and then call executeDelivery().
Persisted user task outcomes are not transport failures. Storage, projection, registration, and persistence failures should propagate so the provider integration can retry or report batch item failure.
Transport publish commands receive claimed outbox attempts keyed by outboxMessageId and claimToken. A successful publishWakeups() return must include outcomes[index] for command.attempts[index], with each outcome tagged by WakeupPublishOutcomeType.Published or WakeupPublishOutcomeType.Failed.
Throw TransportUnavailable or operation-level TransportPublishFailed when the adapter cannot produce trustworthy per-attempt outcomes; core records that operation-level failure against every claimed attempt. Do not echo ids or split successful and failed rows into parallel arrays.
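The index alignment rule can be sketched for a provider whose batch response is keyed by message id. The tag string values, the PublishAttempt and PublishOutcome shapes (including the message field), the ProviderResults map, and alignOutcomes() are hypothetical. A real adapter would throw a RunlaneError with TransportPublishFailed instead of a plain Error.

```typescript
// Hypothetical stand-ins; the tag values are assumed and the real types come
// from '@runlane/contracts'.
const WakeupPublishOutcomeType = { Published: 'published', Failed: 'failed' } as const

interface PublishAttempt {
  readonly outboxMessageId: string
  readonly claimToken: string
}

type PublishOutcome =
  | { readonly type: typeof WakeupPublishOutcomeType.Published }
  | { readonly type: typeof WakeupPublishOutcomeType.Failed; readonly message: string }

// Hypothetical provider batch response keyed by the id sent with each entry.
type ProviderResults = ReadonlyMap<string, { readonly ok: boolean; readonly detail?: string }>

function alignOutcomes(attempts: readonly PublishAttempt[], results: ProviderResults): PublishOutcome[] {
  // Mapping over attempts guarantees outcomes[index] describes attempts[index].
  return attempts.map((attempt): PublishOutcome => {
    const result = results.get(attempt.outboxMessageId)
    if (result === undefined) {
      // No trustworthy per-attempt answer: fail the whole operation instead of guessing.
      throw new Error(`provider returned no result for ${attempt.outboxMessageId}`)
    }
    return result.ok
      ? { type: WakeupPublishOutcomeType.Published }
      : { type: WakeupPublishOutcomeType.Failed, message: result.detail ?? 'provider rejected the message' }
  })
}
```

Building the array by mapping over command.attempts keeps the positions aligned even when the provider returns its results in a different order.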
Use publishWakeupsCommandSchema when an adapter or adapter test intentionally validates a publish command. Do not use provider serialization or clone heuristics as a substitute for the Runlane command contract.
Transport adapters may expose start() and close() hooks for clients, sockets, or subscriptions. They must not own durable run state even when the transport backend is itself durable.
Lane Packages
A lane package composes storage and transport with createLane() from @runlane/contracts. The helper validates the supplied adapters, reports combined capabilities, and uses the standard lifecycle order: start storage before transport, close transport before storage.
createLane() is a composition boundary, not a capability shim. It keeps the original adapter instances, copies storage.capabilities and transport.capabilities into the lane capabilities, and adds only lane-level metadata and lifecycle ordering. It does not make an adapter durable, add operator reads, or emulate unsupported storage/transport behavior.
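The lifecycle ordering can be pictured with the short sketch below. It is not the createLane() implementation; HasLifecycle, startLane(), and closeLane() are hypothetical names that only illustrate the documented order with optional hooks.

```typescript
// Hypothetical minimal shape: only the optional lifecycle hooks matter here.
interface HasLifecycle {
  start?(): Promise<void>
  close?(): Promise<void>
}

async function startLane(storage: HasLifecycle, transport: HasLifecycle): Promise<void> {
  await storage.start?.() // storage first
  await transport.start?.() // transport second
}

async function closeLane(storage: HasLifecycle, transport: HasLifecycle): Promise<void> {
  await transport.close?.() // transport first
  await storage.close?.() // storage last
}
```

Optional chaining on the call lets adapters that omit a hook participate without a no-op stub.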
| Option | Required | Default | Meaning |
|---|---|---|---|
| storage | Yes | None | Complete StorageAdapter instance used as the lane's durable truth boundary. |
| transport | Yes | None | Complete TransportAdapter instance used to publish wakeups. |
| name | No | "lane" | Human-readable lane name for diagnostics and operator surfaces. |
| operatorReads | No | true | Whether the composed lane exposes operator-facing reads through storage. Set this honestly for the packaged lane. |
| productionDurable | No | false | Whether the composed lane is safe as a production persistence and delivery boundary. Set true only when both adapters actually provide that guarantee. |
Malformed options or incomplete adapters fail fast with RunlaneError and ErrorCode.ConfigurationInvalid. Unsupported option names are rejected instead of ignored so lane packages do not grow accidental shadow configuration.
import { createLane, type Lane, type StorageAdapter, type TransportAdapter } from '@runlane/contracts'
export interface MyLaneOptions {
  readonly name?: string
  readonly operatorReads?: boolean
  readonly productionDurable?: boolean
  readonly storage: StorageAdapter
  readonly transport: TransportAdapter
}
export function myLane(options: MyLaneOptions): Lane {
  return createLane({
    name: options.name ?? 'my-lane',
    operatorReads: options.operatorReads ?? options.storage.capabilities.readsRunHistory,
    productionDurable: options.productionDurable ?? false,
    storage: options.storage,
    transport: options.transport,
  })
}
Errors
Adapters throw RunlaneError with stable ErrorCode values. Raw driver errors can be attached as cause for logs, but public callers should branch on error.code.
Use the narrowest code that describes the boundary that failed:
| Code | Use |
|---|---|
| CapabilityUnsupported | A method is not supported by the adapter's reported capabilities. |
| ValidationFailed | Public query input is invalid, such as malformed cursors, non-positive limits, or sequence sorting without a runId filter. |
| StorageConflict | Optimistic concurrency, idempotency ownership, singleton ownership, lease ownership, outbox claim ownership, or schedule occurrence ownership is stale. |
| AdapterContractViolation | The caller supplied an internally inconsistent command, or the adapter returned data that violates its contract. |
| RunNotFound / ScheduleNotFound | A command requires an existing record that is missing. |
| StorageUnavailable / TransportUnavailable | A backend outage prevents the operation. Attach the raw driver failure as cause for server-side logs. |
| TransportPublishFailed | The provider rejected a publish attempt but the transport backend is reachable. Include provider request ids or provider error codes in metadata. |
Create storage conflicts with createStorageConflictError({ kind: StorageConflictKind.* }). Do not hand-roll bare RunlaneError storage conflicts; that drops the storageConflictKind metadata core and tooling use to distinguish expected races from real storage failures.
Do not expose raw driver messages as the public error contract. Callers should be able to handle adapter failures by code, retryability, and structured metadata without parsing provider text.
Conformance
Adapter packages should import the shared conformance suites from @runlane/testing and provide a fresh adapter or lane resource per test. Vitest packages can use @runlane/testing/vitest; other runners can import defineStorageConformanceSuite(), defineTransportConformanceSuite(), or defineLaneCompositionConformanceSuite() from @runlane/testing and pass the resulting suite to runConformanceSuite() with a runner object that supplies describe and test.
Conformance covers one primitive boundary at a time: storage conformance for durable run truth, transport conformance for wakeup publishing, and lane composition conformance for wiring compatible adapters into a Lane. Keep backend-specific tests beside the adapter for migrations, SQLSTATE or SDK error mapping, connection lifecycle, and operational behavior the generic contract cannot observe.