Distributed task queue for Go, backed by Postgres.
Consumers pull tasks concurrently without contention – Postgres hands each worker its own batch and skips rows already claimed by others, so throughput scales with workers. Producers can enqueue inside the same transaction as your business logic, so a job appears if and only if your write commits.
Built for production task queues.
No magic. Every feature maps to one or two columns in asynqpg_tasks or a small SQL pattern.
Postgres-native
Tasks live in a single table. No Redis, no Kafka, no new datastore to operate.
Transactional enqueue
Pass an *sqlx.Tx to EnqueueTx. The job is committed atomically with your business logic.
Flexible retries
Exponential or constant backoff, snooze without consuming an attempt, skip-retry for permanent errors.
Delayed & scheduled
Run tasks at a future time: use WithDelay, or set ProcessAt directly on the task.
Per-type worker pools
Independent concurrency, timeout, and middleware per task type. email:send can have 5 workers, report:generate can have 2.
Leader-elected maintenance
One node holds a lease. Stuck-task rescue and old-task cleanup run there. The other nodes just fetch.
Idempotency tokens
EnqueueMany deduplicates by (type, idempotency_token) at the DB layer, not in code.
OpenTelemetry built in
Counters, histograms, and traces for every enqueue and handler call. SpanKind set correctly for each.
Web dashboard
Embedded React SPA with Overview, Tasks, Workers, and Maintenance pages. Saved views, filter pills, bulk retry/cancel/delete, and a task drawer with payload, attempts, timing, and the raw row.
Command palette & keyboard nav
⌘K jumps to any task, type, or page. j/k walks the table, x selects, Esc closes the drawer – no mouse required.
Pluggable auth
Basic Auth out of the box, or wire up any number of OAuth providers (GitHub ships in-tree). Per-request user context flows into audit logs.
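The "Flexible retries" card above mentions exponential and constant backoff. As a rough illustration of how the two policies differ, here is a minimal sketch in plain Go. The function names, the base delay, and the cap are hypothetical and chosen for the demo; they are not asynqpg's API or defaults.

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical demo parameters; not asynqpg defaults.
const (
	demoBase = 2 * time.Second
	demoMax  = time.Minute
)

// constantBackoff waits the same delay before every retry.
func constantBackoff(base time.Duration, attempt int) time.Duration {
	return base
}

// exponentialBackoff doubles the delay on each attempt (attempts start at 1)
// and caps it at maxDelay so the wait cannot grow without bound.
func exponentialBackoff(base, maxDelay time.Duration, attempt int) time.Duration {
	delay := base
	for i := 1; i < attempt; i++ {
		delay *= 2
		if delay >= maxDelay {
			return maxDelay
		}
	}
	if delay > maxDelay {
		return maxDelay
	}
	return delay
}

func main() {
	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Printf("attempt %d: constant=%v exponential=%v\n",
			attempt,
			constantBackoff(demoBase, attempt),
			exponentialBackoff(demoBase, demoMax, attempt))
	}
}
```

Capping the exponential delay keeps a long-failing task from being pushed hours into the future, while the constant policy suits errors that are expected to clear after a fixed interval.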
Five minutes from zero to a working task queue.
Three steps: install the module, apply one migration, write a producer and a consumer. No infra to provision.
Install the module
Adds asynqpg to your module's go.mod.
Apply the migration
One SQL file. Creates asynqpg_tasks + indexes.
Wire a producer
Pass a *sqlx.DB. Call Enqueue. Done.
Wire a consumer
Register a handler per task type, then call Start().
$ go get github.com/yakser/asynqpg

-- 001_initial.sql · creates the tasks table + indexes
CREATE TYPE asynqpg_task_status AS ENUM (
'pending', 'running', 'completed', 'failed', 'cancelled'
);
CREATE TABLE asynqpg_tasks (
id BIGSERIAL PRIMARY KEY,
type TEXT NOT NULL,
payload BYTEA NOT NULL,
status asynqpg_task_status NOT NULL DEFAULT 'pending',
attempts_left SMALLINT NOT NULL,
blocked_till TIMESTAMPTZ NOT NULL
-- ... 5 indexes, idempotency, lifecycle constraints
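) WITH (fillfactor = 90);

package main

import (
	"context"
	"log"
	"time"

	"github.com/jmoiron/sqlx"
	_ "github.com/lib/pq" // assumed driver: registers the "postgres" name used below
	"github.com/yakser/asynqpg"
	"github.com/yakser/asynqpg/producer"
)

func main() {
	ctx := context.Background()
	dsn := "postgres://localhost:5432/app?sslmode=disable" // placeholder DSN
	payload := []byte(`{"to":"user@example.com"}`)          // placeholder; []byte assumed from the BYTEA column
	db := sqlx.MustConnect("postgres", dsn)
	p, err := producer.New(producer.Config{Pool: db})
	if err != nil {
		log.Fatal(err)
	}
	// Enqueue inside an existing transaction.
	tx := db.MustBeginTx(ctx, nil)
	_, err = p.EnqueueTx(ctx, tx, asynqpg.NewTask("email:send", payload,
		asynqpg.WithMaxRetry(5),
		asynqpg.WithDelay(10*time.Second),
		asynqpg.WithIdempotencyToken("order-1234"),
	))
	if err != nil {
		tx.Rollback() // discard the task together with any business writes
		log.Fatal(err)
	}
	if err := tx.Commit(); err != nil {
		log.Fatal(err)
	}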
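}

package main

import (
	"context"
	"log"
	"time"

	"github.com/jmoiron/sqlx"
	_ "github.com/lib/pq" // assumed driver: registers the "postgres" name used below
	"github.com/yakser/asynqpg"
	"github.com/yakser/asynqpg/consumer"
)

func main() {
	// Placeholder DSN; Pool is assumed to take the same *sqlx.DB the producer uses.
	db := sqlx.MustConnect("postgres", "postgres://localhost:5432/app?sslmode=disable")
	c, err := consumer.New(consumer.Config{Pool: db})
	if err != nil {
		log.Fatal(err)
	}
	c.RegisterTaskHandler("email:send",
		consumer.TaskHandlerFunc(func(ctx context.Context, t *asynqpg.TaskInfo) error {
			// process task...
			return nil
		}),
		consumer.WithWorkersCount(5),
		consumer.WithTimeout(30*time.Second),
	)
	c.Start()
	// graceful shutdown on SIGTERM…
	c.Stop()
}

EnqueueTx takes the same tx as your business logic. The queued job commits if (and only if) your transaction commits. Try doing that with Redis.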
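The "if and only if" guarantee comes from ending the transaction exactly once, on a single code path. A minimal sketch of that pattern in plain Go follows; the txn interface, the finish helper, and fakeTx are hypothetical names used for illustration and are not part of asynqpg. Both *sql.Tx and *sqlx.Tx provide the two methods the interface asks for.

```go
package main

import (
	"errors"
	"fmt"
)

// txn is the subset of *sql.Tx (and *sqlx.Tx) this sketch needs.
type txn interface {
	Commit() error
	Rollback() error
}

// errDemo stands in for a failed business write or a failed enqueue.
var errDemo = errors.New("enqueue failed")

// finish commits tx when the work succeeded and rolls it back otherwise,
// so the business write and the enqueued task share one outcome.
func finish(tx txn, workErr error) error {
	if workErr != nil {
		if rbErr := tx.Rollback(); rbErr != nil {
			return errors.Join(workErr, rbErr)
		}
		return workErr
	}
	return tx.Commit()
}

// fakeTx records which method was called, standing in for a real transaction.
type fakeTx struct{ committed, rolledBack bool }

func (f *fakeTx) Commit() error   { f.committed = true; return nil }
func (f *fakeTx) Rollback() error { f.rolledBack = true; return nil }

func main() {
	ok := &fakeTx{}
	_ = finish(ok, nil)
	fmt.Println("success path committed:", ok.committed)

	failed := &fakeTx{}
	_ = finish(failed, errDemo)
	fmt.Println("failure path rolled back:", failed.rolledBack)
}
```

In a real producer, workErr would be the combined result of the business statements and the EnqueueTx call made on the same transaction.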
A queue is a table.
Five lifecycle states. One row per task. Workers pull jobs concurrently without stepping on each other – Postgres locks the rows it hands out and lets the rest move on, so no two consumers ever grab the same task and idle workers are never blocked by busy ones.
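The behaviour described above (lock the rows handed out, let other workers move past them) is what Postgres's FOR UPDATE SKIP LOCKED clause provides. The sketch below shows one way a claim step could look using only database/sql and the columns from the migration excerpt. The statement, the claimBatch helper, and the reading of blocked_till as "not runnable before" are assumptions for illustration; the query asynqpg actually issues may differ.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// claimQuery is a hypothetical claim statement. The inner SELECT locks up to
// $2 due rows and silently skips rows another worker has already locked.
const claimQuery = `
UPDATE asynqpg_tasks
SET status = 'running'
WHERE id IN (
    SELECT id
    FROM asynqpg_tasks
    WHERE status = 'pending'
      AND type = $1
      AND blocked_till <= now()
    ORDER BY blocked_till
    LIMIT $2
    FOR UPDATE SKIP LOCKED
)
RETURNING id`

// claimBatch marks up to limit due tasks of one type as running and
// returns their IDs.
func claimBatch(ctx context.Context, db *sql.DB, taskType string, limit int) ([]int64, error) {
	rows, err := db.QueryContext(ctx, claimQuery, taskType, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var ids []int64
	for rows.Next() {
		var id int64
		if err := rows.Scan(&id); err != nil {
			return nil, err
		}
		ids = append(ids, id)
	}
	return ids, rows.Err()
}

func main() {
	// Printing the statement keeps the sketch runnable without a database.
	fmt.Println(claimQuery)
	_ = claimBatch // wire this to a real *sql.DB and driver in an actual program
}
```

Because skipped rows are never waited on, adding workers adds throughput instead of lock contention, which is the scaling property the section describes.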
Packages
| Package | Purpose |
|---|---|
| producer/ | Enqueue tasks. Enqueue, EnqueueTx, EnqueueMany, EnqueueManyTx. |
| consumer/ | Fetch and process tasks with configurable per-type worker pools. Middleware. Snooze, skip-retry, custom retry policies. |
| client/ | Inspect and manage tasks. GetTask, ListTasks, CancelTask, RetryTask, DeleteTask. All have *Tx variants. |
| ui/ | HTTP handler serving REST API + embedded React SPA dashboard. Basic Auth or pluggable OAuth providers. |
| internal/leadership/ | Postgres-leased leader election. The leader runs maintenance. |
| internal/maintenance/ | Stuck-task rescuer + finalized-task cleaner. Leader-only. Off by default. |
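To make the lease idea behind internal/leadership/ concrete, here is a minimal in-memory sketch of lease semantics: one holder at a time, renewable by the holder, and open to takeover once it expires. The lease type, tryAcquire, and the demo TTL are hypothetical; the real package keeps its lease in Postgres, and its schema and timings are not shown in this document.

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical demo values; not asynqpg settings.
const demoTTL = 10 * time.Second

var demoStart = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)

// lease models a single leadership record: who holds it and until when.
type lease struct {
	holder    string
	expiresAt time.Time
}

// tryAcquire gives the lease to node if it is free, expired, or already held
// by node (a renewal). It reports whether node is the leader afterwards.
func (l *lease) tryAcquire(node string, now time.Time, ttl time.Duration) bool {
	if l.holder == "" || l.holder == node || !now.Before(l.expiresAt) {
		l.holder = node
		l.expiresAt = now.Add(ttl)
		return true
	}
	return false
}

func main() {
	l := &lease{}
	fmt.Println("a acquires:", l.tryAcquire("a", demoStart, demoTTL))
	fmt.Println("b while a holds:", l.tryAcquire("b", demoStart.Add(5*time.Second), demoTTL))
	fmt.Println("a renews:", l.tryAcquire("a", demoStart.Add(8*time.Second), demoTTL))
	fmt.Println("b after expiry:", l.tryAcquire("b", demoStart.Add(30*time.Second), demoTTL))
}
```

In a database-backed version the same check-and-set would need to happen in one atomic statement, so that two nodes cannot both see an expired lease and claim it.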
A dashboard built for operators.
An embedded React SPA. Mount it as an HTTP handler under any prefix, or run the standalone binary. Basic Auth or pluggable OAuth providers, your choice.
Overview
Five live KPIs across all task types. Per-type cards show pending/running/done/failed/cancelled counts and link straight into the filtered Tasks list. Auto-refresh every 5s.
Tasks
Saved views (All · Pending · Running · Failed · Needs retry · Dead-letter), filter pills for type/status/idempotency, full-text search, and j/k keyboard navigation. Bulk-select rows to retry, cancel, or delete.
Task drawer
Side drawer with Payload, Attempts (every error trace + duration), Timing breakdown, Related tasks, and the Raw row. Copy curl, retry, or delete from the footer – no SQL required.
Workers
The elected leader, lease TTL, and a live snapshot of every task currently in the running state, with attempt counts and seconds-since-pickup. Click any row to jump to its detail.
Maintenance
Leader-elected background tasks (Rescuer, Cleaner, Leader election) with their lease state, dead-letter count, and the most recent failures. This is the page operators look at first when something is off.
OpenTelemetry, end to end.
All public components accept optional MeterProvider and TracerProvider. A pre-built Grafana dashboard ships in deploy/.
| Metric | Type | Description |
|---|---|---|
| asynqpg.tasks.enqueued | counter | Tasks enqueued, tagged by task_type. |
| asynqpg.tasks.processed | counter | Tasks finished, tagged by task_type and status. |
| asynqpg.tasks.errors | counter | Processing or enqueue errors. |
| asynqpg.task.duration | histogram | Handler execution latency (seconds). p50/p95/p99 in the Grafana panel. |
| asynqpg.task.enqueue_duration | histogram | Enqueue latency (seconds). |
| asynqpg.tasks.in_flight | gauge | Currently executing tasks. |