Job Queue
topics gives you two ways to run a durable job queue, and they share the same persistent log underneath. The cursor queue (Bull-style) is a durable unbounded topic where each worker owns its own cursor and acks by advancing it — simple, stateless on the server, perfect for a known partition of workers. The lease queue (type: "queue") adds claim/ack/nack/extend with per-job visibility-timeout leases, redelivery, and dead-lettering — what you reach for when many workers compete for one pool of jobs and you need stalled-job recovery.
Both are at-least-once. Both keep jobs durable across a crash. The difference is who tracks delivery: in the cursor model the client owns its position; in the lease model the server leases jobs and reclaims them on timeout.
Pick the cursor queue when workers can be statically partitioned (one cursor per
worker, or per shard) and you want zero server-side per-job state. Pick the lease
queue when an arbitrary fleet competes for a shared backlog and you need per-job acks,
redelivery on stall, and dead-lettering. You can start with the cursor queue and migrate
later — the lease queue is the same topic with type: "queue" set.
Model A — Bull-style cursor queue
A durable, unbounded topic. Producers append jobs; each worker reads from its own from_seq, processes the batch, then persists next_from_seq as its ack. The server keeps no per-consumer state on this path — the cursor is the ack.
Create the queue topic
Durable so an acked job survives a crash; unbounded (cap_records: 0) so nothing is ever lost to eviction — replay is just reading from an earlier from_seq.
Examples use $TOPICS as the base URL — see the Quickstart.
curl -X PUT $TOPICS/v0/topics/transcode \
-H 'content-type: application/json' \
-d '{ "durable": true, "cap_records": 0, "ttl_ms": 0 }'
# →
{ "topic": "transcode", "created": true,
"config": { "ttl_ms": 0, "cap_records": 0, "cap_bytes": 0, "discard": "old",
"durable": true, "durability": "fsync", "auto_create": true,
"idempotency_window_ms": 120000, "dedupe_node": true },
"performance": { "server_total_ms": 0.22 } }
durable: true resolves to the fsync durability class: an
acked write is fsync-gated and recovered by WAL replay on restart. durable: false
(class disk) is faster but can lose the un-fsynced tail on a crash. That is fine for jobs you
can regenerate, wrong for jobs you cannot.
Produce jobs
A producer appends with a normal write. Tag each job so you can cancel it later. Set node only if a producer also consumes (so loop-prevention filters its own jobs back out).
curl -X POST $TOPICS/v0/topics/transcode \
-H 'content-type: application/json' \
-d '{ "records": [
{ "data": { "src": "s3://uploads/clip-9001.mov", "preset": "1080p" }, "tag": "tenant42:clip-9001" },
{ "data": { "src": "s3://uploads/clip-9002.mov", "preset": "720p" }, "tag": "tenant42:clip-9002" }
] }'
# →
{ "topic": "transcode", "first_seq": 1, "last_seq": 2, "seqs": [1, 2],
"head_seq": 2, "count": 2, "created": false, "deduped": false,
"performance": { "server_total_ms": 0.62, "fsync_ms": 0.39 } }
For safe retries, send an idempotency_key on the write. A retried POST with the same
key within idempotency_window_ms (default 120000) returns the original seqs with
"deduped": true and appends nothing, so a producer that times out and retries does not
double-enqueue. See Idempotency.
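The retry behaviour can be pictured with a small in-memory model. This is only a sketch of the semantics described above, not the server's implementation: the class name DedupeLog, its write method, and the choice to treat the window boundary as inclusive are all assumptions made for illustration.

```python
from typing import Dict, List, Tuple


class DedupeLog:
    """Toy in-memory log that dedupes writes by idempotency key within a window."""

    def __init__(self, window_ms: int = 120_000) -> None:
        self.window_ms = window_ms
        self.head_seq = 0
        # key -> (timestamp of the original write, seqs it was assigned)
        self._seen: Dict[str, Tuple[int, List[int]]] = {}

    def write(self, records: List[dict], key: str, now_ms: int) -> dict:
        prior = self._seen.get(key)
        # Assumption: a retry exactly at the window edge still counts as inside it.
        if prior is not None and now_ms - prior[0] <= self.window_ms:
            # Retry inside the window: return the original seqs, append nothing.
            return {"seqs": prior[1], "head_seq": self.head_seq, "deduped": True}
        # First write (or window elapsed): assign fresh seqs and remember the key.
        seqs = [self.head_seq + i + 1 for i in range(len(records))]
        self.head_seq += len(records)
        self._seen[key] = (now_ms, seqs)
        return {"seqs": seqs, "head_seq": self.head_seq, "deduped": False}
```

In this model a producer that times out and retries five seconds later gets the same seqs back and the head does not move; a retry after the window has elapsed is treated as a new write.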
Consume: each worker owns a cursor
Each worker reads from its stored from_seq, processes the batch, then persists next_from_seq. Pass the worker’s own node so it never receives jobs it produced.
curl -X POST $TOPICS/v0/topics/transcode/diff \
-H 'content-type: application/json' \
-d '{ "from_seq": 0, "limit": 50, "node": "transcode-1" }'
# →
{ "topic": "transcode",
"records": [
{ "$seq": 1, "$ts": 1748470000123, "data": { "src": "s3://uploads/clip-9001.mov", "preset": "1080p" } },
{ "$seq": 2, "$ts": 1748470000140, "data": { "src": "s3://uploads/clip-9002.mov", "preset": "720p" } }
],
"next_from_seq": 2, "head_seq": 2, "earliest_seq": 1,
"caught_up": true, "tombstone": null, "lag": 0,
"performance": { "server_total_ms": 0.30 } }
The loop is: read → process the batch → persist next_from_seq → repeat. Persisting next_from_seq past seq N acks 1..N at once (cursor-advance = ack-all). Because the server holds no consumer state, where you persist the cursor is your choice: a row in your DB, a key in another topic, a file. A crash mid-batch just re-reads from the last persisted cursor; jobs are reprocessed, so workers must be idempotent.
Use caught_up, not records.length, as the “no more work right now” signal.
Node-filtered, deleted, and TTL-expired records are omitted from records but still
advance next_from_seq, so a batch can legitimately return zero records while the cursor
moves forward. caught_up == true means next_from_seq == head_seq. See
Ordering & Cursors.
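The read → process → persist loop might look like the following sketch. The function drain and its four callables are hypothetical names; diff stands in for whatever HTTP client issues the /diff request, and the cursor store is whatever you chose to persist to. Only the response fields records, next_from_seq, and caught_up come from the example above.

```python
from typing import Callable


def drain(diff: Callable[[int, int], dict],
          handle: Callable[[dict], None],
          load_cursor: Callable[[], int],
          save_cursor: Callable[[int], None],
          limit: int = 50) -> int:
    """Read, process, persist next_from_seq, until caught_up. Returns jobs handled."""
    handled = 0
    while True:
        batch = diff(load_cursor(), limit)
        for record in batch["records"]:
            # Must be idempotent: a crash before save_cursor re-reads this batch.
            handle(record)
            handled += 1
        # Persisting the cursor is the ack for everything up to next_from_seq.
        save_cursor(batch["next_from_seq"])
        # Stop on caught_up, not on an empty batch: filtered or deleted records
        # can yield zero records while the cursor still moves forward.
        if batch["caught_up"]:
            return handled
```

A real worker would long-poll or sleep instead of returning once caught up; the sketch returns so the loop can be exercised with a fake diff function.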
To wake on new work instead of polling, set wait_ms on the diff (long-poll, clamped to 30000 ms) or — for true streaming — open an SSE watch on transcode and advance your cursor from the frames.
Cancel a job (tag-delete)
Cancellation is a permanent delete by tag. It is silent (never a tombstone), effective immediately for every reader, and point-in-time — a job enqueued a moment later with the same tag is not affected.
One job
# Cancel one job by exact tag match.
curl -X POST $TOPICS/v0/topics/transcode/delete \
-H 'content-type: application/json' \
-d '{ "match": ["tag", "Eq", "tenant42:clip-9001"] }'
# →
{ "topic": "transcode", "deleted": 1, "earliest_seq": 2, "head_seq": 2, "count": 1,
"performance": { "server_total_ms": 0.12 } }
A consumer whose cursor was behind the deleted seq simply advances past it. There is no tombstone, because a delete is voluntary removal, not capacity loss. To compact already-processed jobs, delete by before_seq after a checkpoint:
curl -X POST $TOPICS/v0/topics/transcode/delete -d '{ "before_seq": 480000 }'
When the cursor queue fits
- Statically partitioned workers: one cursor per worker, or per shard key, so two workers never claim the same job. (Two workers sharing one cursor would each process every job; there is no per-job lease to prevent it.)
- Replay is free: durable + unbounded means a worker can reset to any earlier from_seq and reprocess. No special “replay” mode.
- Zero server state: nothing to fsync per ack beyond the records themselves; the throughput ceiling is the topic’s write/read path, not a lease table.
If instead you have an arbitrary fleet competing for one backlog and need stalled-job recovery, use the lease queue.
Model B — lease queue (type: "queue")
Set type: "queue" and the topic layers a lease lifecycle on the same log: workers claim jobs (leased with a visibility timeout), ack to complete (ack is the permanent delete), nack to release early, and extend to keep a long job alive. A lease that expires unacked makes the job claimable again — redelivery with no per-job timers. After too many redeliveries a job is dead-lettered.
Internally a queue is two logs: the jobs log (the topic you produce to, durable according to its durable setting) and a leases log of lifecycle events. The live who-holds-what is a projection of the leases log, rebuilt on restart. See Queues and DESIGN §10 for the full model.
Create the queue
discard: "reject" so a full durable queue refuses new jobs loudly instead of dropping unconsumed work. dead_letter + max_deliveries send poison jobs aside after N failed deliveries.
curl -X PUT $TOPICS/v0/topics/transcode \
-H 'content-type: application/json' \
-d '{ "type": "queue", "durable": true, "discard": "reject",
"lease_ms": 30000, "claim_jitter_ms": 0,
"max_deliveries": 5, "dead_letter": "transcode.dlq" }'
# →
{ "topic": "transcode", "created": true,
"config": { "type": "queue", "durable": true, "durability": "fsync",
"discard": "reject", "lease_ms": 30000, "claim_jitter_ms": 0,
"max_deliveries": 5, "dead_letter": "transcode.dlq", "leases_durable": false,
"...": "..." },
"performance": { "server_total_ms": 0.24 } }
type is the one immutable field. A PUT that flips an existing topic between log and
queue returns 409 topic_exists_incompatible. Decide the type at creation.
Produce jobs with the same normal write as Model A (POST /v0/topics/transcode). A non-queue topic rejects every claim/ack/nack/extend/work endpoint with 409 not_a_queue.
Claim → process → ack
Claim jobs
Lease up to max claimable jobs to a worker node. A job is claimable iff it is in the jobs log, not acked, and has no active lease.
curl -X POST $TOPICS/v0/topics/transcode/claim \
-H 'content-type: application/json' \
-d '{ "node": "transcode-1", "max": 8, "lease_ms": 30000 }'
# →
{ "topic": "transcode",
"claimed": [
{ "$seq": 1, "lease_id": "lease_7f3a9c", "deadline": 1748470030000,
"$ts": 1748470000123, "$tag": "tenant42:clip-9001", "deliveries": 1,
"data": { "src": "s3://uploads/clip-9001.mov", "preset": "1080p" } }
],
"count": 1, "ready": 0,
"performance": { "server_total_ms": 0.42, "throttle_wait_ms": 0.0 } }
count < max (here count: 1) is the reliable “queue near-empty” signal, never an error. Each claim increments the job’s deliveries counter and sets deadline = claim_ts + lease_ms.
Process and ack
Ack the seqs you completed. The ack is the delete — the server records the completion and permanently removes the job from the jobs log. Ack is idempotent: seqs not currently leased to node are silently skipped and reported in skipped. To fence a stale worker, optionally echo the claim’s lease_id tokens in a lease_ids array (validate-when-supplied — a superseded token’s seq is skipped); omit it for the plain node + seqs match.
curl -X POST $TOPICS/v0/topics/transcode/ack \
-H 'content-type: application/json' \
-d '{ "node": "transcode-1", "seqs": [1] }'
# →
{ "topic": "transcode", "acked": 1, "skipped": [], "ready": 0, "in_flight": 0,
"performance": { "server_total_ms": 0.30, "fsync_ms": 0.21 } }
Ack durability follows the topic’s durable setting: on an fsync queue the delete is fsynced before the ack returns (fsync_ms > 0), so an acked-and-deleted job stays gone across a crash.
Release, retry, and extend
# Nack: release a job back to the queue, optionally delayed (backoff).
curl -X POST $TOPICS/v0/topics/transcode/nack \
-d '{ "node": "transcode-1", "seqs": [2], "delay_ms": 5000 }'
# Extend: heartbeat a long-running job to push its deadline out.
curl -X POST $TOPICS/v0/topics/transcode/extend \
-d '{ "node": "transcode-1", "seqs": [3], "lease_ms": 30000 }'
A nack drops the lease and makes the job claimable again at now + delay_ms, semantically identical to letting the lease expire, just sooner. An extend sets a new deadline = now + lease_ms (it sets, not adds: “I need this much more time from now”) and does not touch the delivery counter. A seq whose lease already expired and was reclaimed cannot be extended; it is skipped and the worker should re-claim.
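Put together, one pass of a polling worker could be sketched as follows. The name work_once and its callables are hypothetical; claim, ack, and nack stand in for thin wrappers around the POST endpoints shown above, and the retry delay of 5000 ms simply mirrors the nack example.

```python
from typing import Callable, List


def work_once(claim: Callable[[int], List[dict]],
              ack: Callable[[List[int]], None],
              nack: Callable[[List[int], int], None],
              handle: Callable[[dict], None],
              max_jobs: int = 8,
              retry_delay_ms: int = 5000) -> bool:
    """Claim one batch, ack successes, nack failures.

    Returns True when the batch was full, i.e. more work may be waiting.
    """
    jobs = claim(max_jobs)
    done: List[int] = []
    failed: List[int] = []
    for job in jobs:
        try:
            handle(job)
            done.append(job["$seq"])
        except Exception:
            # Release failed jobs early instead of waiting for the lease to expire.
            failed.append(job["$seq"])
    if done:
        ack(done)
    if failed:
        nack(failed, retry_delay_ms)
    # count < max means the queue is nearly drained; the caller can back off.
    return len(jobs) == max_jobs
```

A long-running handler would additionally call extend on a timer shorter than lease_ms; that heartbeat is left out to keep the sketch small.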
Visibility timeout & redelivery
A lease carries an absolute deadline. Once now > deadline the job is claimable again — the visibility timeout. There are no per-job timers: expired-lease seqs land on a reclaim freelist that the next claim pass drains first, so reclaimed work jumps ahead of never-delivered work. Each reclaim increments the delivery counter.
The lease queue is at-least-once. A slow-but-alive worker whose lease expires mid-job
has that job reclaimed and possibly run by another worker; if the slow worker then acks
past its deadline, the ack is skipped but the job already ran twice. Consumers must be
idempotent (dedupe on $seq or a key in data/meta). Per-job FIFO is not guaranteed
across parallel workers — a single max:1 worker sees near-FIFO; a fleet does not.
Because the leases log defaults non-durable (leases_durable: false), losing it on a crash is self-healing: on restart any job with no replayed active lease is immediately claimable — exactly as if every lease had expired. No job is lost (the jobs log is durable per config); at worst an in-flight job is redelivered, which at-least-once already permits.
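The visibility timeout can be modelled without any timers, as the text describes. The following is a toy model only; LeaseTable and its fields are hypothetical, time is passed in as now so the behaviour is easy to follow, and the real server's data structures may differ.

```python
from collections import deque
from typing import Deque, Dict, List, Tuple


class LeaseTable:
    """Toy model of visibility-timeout leases over a list of job seqs."""

    def __init__(self, seqs: List[int]) -> None:
        self.fresh: Deque[int] = deque(seqs)            # never-delivered jobs
        self.reclaimed: Deque[int] = deque()            # expired leases, drained first
        self.leases: Dict[int, Tuple[str, int]] = {}    # seq -> (node, deadline)
        self.deliveries: Dict[int, int] = {}

    def _expire(self, now: int) -> None:
        # No per-job timers: expiry is discovered lazily on the next call.
        expired = sorted(s for s, (_, deadline) in self.leases.items() if now > deadline)
        for seq in expired:
            del self.leases[seq]
            self.reclaimed.append(seq)

    def claim(self, node: str, max_jobs: int, lease_ms: int, now: int) -> List[int]:
        self._expire(now)
        out: List[int] = []
        while len(out) < max_jobs and (self.reclaimed or self.fresh):
            seq = self.reclaimed.popleft() if self.reclaimed else self.fresh.popleft()
            self.leases[seq] = (node, now + lease_ms)
            self.deliveries[seq] = self.deliveries.get(seq, 0) + 1
            out.append(seq)
        return out

    def ack(self, node: str, seqs: List[int], now: int) -> List[int]:
        """Complete jobs; returns the seqs skipped because node no longer holds them."""
        self._expire(now)
        skipped: List[int] = []
        for seq in seqs:
            held = self.leases.get(seq)
            if held is not None and held[0] == node:
                del self.leases[seq]
            else:
                skipped.append(seq)
        return skipped
```

In this model a worker that acks after its lease has expired gets the seq back in the skipped list, which matches the at-least-once caveat above: the job may already have been handed to another worker.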
Dead-lettering
Each job carries a delivery counter incremented on every claim (including reclaims). When a job would be delivered for the (max_deliveries + 1)-th time and dead_letter is non-null, the server moves it to the dead-letter topic — appending the record there (stamping meta.$dead_letter_from / $dead_letter_deliveries / $dead_letter_src_seq) and deleting it from the jobs log — instead of redelivering. With max_deliveries: 0 or dead_letter: null, jobs are reclaimed forever.
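The dead-letter rule reduces to a small predicate. This sketch restates the rule above in code; the function name next_delivery and its string return values are assumptions for illustration, not part of the API.

```python
from typing import Optional


def next_delivery(deliveries: int, max_deliveries: int, dead_letter: Optional[str]) -> str:
    """Decide what happens to a job that is about to be claimed again.

    deliveries is how many times the job has already been delivered.
    Returns "dead_letter" or "redeliver".
    """
    # With max_deliveries: 0 or no dead_letter topic, jobs are reclaimed forever.
    limited = max_deliveries > 0 and dead_letter is not None
    # The (max_deliveries + 1)-th delivery is the one that is diverted.
    if limited and deliveries + 1 > max_deliveries:
        return "dead_letter"
    return "redeliver"
```

With max_deliveries: 5, a job that has been delivered four times is still redelivered once more; after the fifth delivery the next attempt moves it to the dead-letter topic.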
The dead-letter topic is an ordinary topic, so inspect poison jobs with a normal read:
curl -X POST $TOPICS/v0/topics/transcode.dlq/diff -d '{ "from_seq": 0, "include_meta": true }'
To re-drive, read the dead-letter topic and re-produce the jobs into the source queue.
PUSH mode — auto-claim over SSE
Instead of a claim→ack→claim poll loop, open the /work SSE stream. The server keeps up to max jobs leased-and-pushed to that one connection, claiming replacements as you ack, applying backpressure at max in-flight.
curl -N "$TOPICS/v0/topics/transcode/work?node=transcode-1&max=8" \
-H 'accept: text/event-stream'
retry: 2000
id: 1
event: job
data: {"topic":"transcode","$seq":1,"lease_id":"lease_7f3a9c","deadline":1748470030000,"$ts":1748470000123,"$tag":"tenant42:clip-9001","deliveries":1,"data":{"src":"s3://uploads/clip-9001.mov","preset":"1080p"}}
: hb
The stream only delivers; you still ack (and may nack/extend) via the separate POST endpoints. The big win: on disconnect the server instantly releases every lease delivered on that connection (recording released events), so a crashed or restarted worker’s jobs are claimable again immediately rather than after lease expiry. Hard crashes where the disconnect isn’t observed are still covered by the visibility timeout.
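A client has to turn the raw stream into job objects before it can ack anything. The sketch below parses standard Server-Sent Events framing, in which a blank line ends each event and lines starting with a colon are comments (such as the ": hb" heartbeat). The function name iter_events is hypothetical, and it assumes every data payload is JSON, as in the frame shown above.

```python
import json
from typing import Iterable, Iterator, List, Tuple


def iter_events(lines: Iterable[str]) -> Iterator[Tuple[str, str, dict]]:
    """Yield (event, id, payload) for each complete SSE frame."""
    event, event_id = "message", ""
    data: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line dispatches the frame, if it carried any data.
            if data:
                yield event, event_id, json.loads("\n".join(data))
            event, data = "message", []
        elif line.startswith(":"):
            continue  # comment line, e.g. the heartbeat
        else:
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]
            if field == "event":
                event = value
            elif field == "id":
                event_id = value
            elif field == "data":
                data.append(value)
            # Other fields such as retry are ignored in this sketch.
```

Each yielded job payload carries the $seq (and lease_id) needed for the follow-up ack, nack, or extend request.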
When the lease queue fits
- An arbitrary fleet competes for one backlog: no static partition; the server hands out non-overlapping jobs.
- Per-job acks + stalled-job recovery: a worker that dies mid-job has only that job redelivered, not the whole tail.
- Dead-lettering: poison jobs are isolated automatically after max_deliveries.
- Push delivery with instant failover: /work keeps a worker fed and reclaims its leases the instant it disconnects.
Cursor queue vs lease queue
Cursor queue fits when
- Workers are statically partitioned (one cursor each / per shard).
- You want zero server-side per-job state and the simplest possible loop.
- Free replay matters: reset from_seq and reprocess.
- Ordered processing within a partition (a single cursor reads in seq order).
Lease queue fits when
- An arbitrary fleet competes for one shared backlog.
- You need per-job acks and redelivery on stall (visibility timeout).
- You want dead-lettering of poison jobs after N attempts.
- You want push delivery (/work) with instant lease release on disconnect.
Both are durable, both are at-least-once, both demand idempotent consumers. The lease queue costs a little more per job (the lease lifecycle) in exchange for shared-pool semantics and stalled-job recovery.
See also
- Queues API: full claim/ack/nack/extend/work reference, field tables, and errors.
- Reading: the /diff cursor model used by the Bull-style queue.
- Deletion: tag/seq deletes for cancellation and compaction.
- Durable Streams & Replay: the discard:"reject" no-loss configuration in depth.
- Idempotency: safe-retry writes and why consumers must be idempotent.