How to Build a Reliable Queue in Redis
The simplest Redis queue uses LPUSH to add jobs and RPOP to fetch them. This works until a worker crashes after popping a job but before finishing it. The job vanishes, lost forever. For background tasks that matter, you need a queue where jobs survive worker failures.
A reliable queue keeps jobs visible until a worker explicitly acknowledges completion. If a worker crashes, the job remains in a "processing" state where a recovery process can find it and return it to the queue. This pattern gives you at-least-once delivery without the complexity of Redis Streams.
Which Redis data types we will use
List is used in two ways in this implementation:
- As the pending queue where new jobs wait to be processed. Producers push jobs with LPUSH, and workers claim them from the other end.
- As the processing list where jobs live while being worked on. The LMOVE command atomically transfers a job from pending to processing in one step.
Hash stores per-job metadata. In this implementation, one hash tracks how many times each job has been attempted and another records the error message from its most recent failure; the same approach can be extended to record which worker claimed a job. This metadata drives the retry logic and the dead letter queue.
Sorted Set tracks when each job entered the processing state. The score is the claim timestamp, letting you efficiently query for jobs that have been processing longer than a timeout threshold. This powers the recovery process that rescues stuck jobs.
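Concretely, the examples in this guide use a handful of keys built from these types:
- jobs:pending (list): serialized jobs waiting to be claimed
- jobs:processing (list): jobs a worker is currently working on
- jobs:processing:times (sorted set): job IDs scored by the timestamp they were claimed
- jobs:attempts (hash): job ID to number of attempts so far
- jobs:errors (hash): job ID to the error from its most recent failure
- jobs:dead (list): jobs that exhausted their retries, the dead letter queue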
The basic pattern and why jobs get lost
The naive queue pattern pushes jobs to a list and pops them for processing. A producer calls LPUSH to add a job, and a worker calls BRPOP to wait for and retrieve a job. This is simple and fast, but the moment BRPOP returns, the job exists only in the worker's memory.
If the worker crashes, loses network connectivity, or gets killed before completing the job, that job disappears. There is no record that it ever existed. For jobs like sending emails, processing payments, or updating critical data, this is unacceptable.
- Redis
- Python
- TypeScript
- Go
# Producer adds a job
LPUSH jobs:pending '{"id":"job_123","task":"send_email"}'
> 1
# Worker fetches a job (blocks until one is available)
BRPOP jobs:pending 30
> ["jobs:pending", '{"id":"job_123","task":"send_email"}']
# If the worker crashes here, the job is gone forever
import json

def enqueue_job(client, job: dict):
"""Add a job to the queue."""
client.lpush('jobs:pending', json.dumps(job))
def process_jobs_unsafe(client):
"""Process jobs with the naive pattern. Jobs can be lost!"""
while True:
result = client.brpop('jobs:pending', timeout=30)
if result:
_, job_data = result
job = json.loads(job_data)
process(job) # If we crash here, job is lost
import Redis from 'ioredis'; // client library assumed here; the calls below match ioredis signatures

async function enqueueJob(client: Redis, job: object) {
await client.lpush('jobs:pending', JSON.stringify(job));
}
async function processJobsUnsafe(client: Redis) {
while (true) {
const result = await client.brpop('jobs:pending', 30);
if (result) {
const job = JSON.parse(result[1]);
await process(job); // If we crash here, job is lost
}
}
}
func enqueueJob(ctx context.Context, client *redis.Client, job map[string]interface{}) {
data, _ := json.Marshal(job)
client.LPush(ctx, "jobs:pending", data)
}
func processJobsUnsafe(ctx context.Context, client *redis.Client) {
for {
result, err := client.BRPop(ctx, 30*time.Second, "jobs:pending").Result()
if err != nil {
continue
}
var job map[string]interface{}
json.Unmarshal([]byte(result[1]), &job)
process(job) // If we crash here, job is lost
}
}
Claiming jobs atomically
The fix is to never let a job exist in only one place. Instead of popping a job and hoping you finish it, atomically move it from the pending queue to a processing list. The LMOVE command does this in one step: it removes an element from one list and pushes it to another atomically.
After LMOVE, the job is in the processing list. If the worker crashes, the job is still there. A separate recovery process can find it and move it back to pending. The job is always either pending, processing, or done. It never disappears.
Use BLMOVE for the blocking variant that waits when the pending queue is empty. This is more efficient than polling with LMOVE in a tight loop.
- Redis
- Python
- TypeScript
- Go
# Worker atomically moves job from pending to processing
BLMOVE jobs:pending jobs:processing RIGHT LEFT 30
> '{"id":"job_123","task":"send_email"}'
# Job is now in the processing list, not pending
LRANGE jobs:processing 0 -1
> ['{"id":"job_123","task":"send_email"}']
# Record when we started processing (for timeout detection)
ZADD jobs:processing:times 1699900060 "job_123"
> 1
import json
import time
def claim_job(client) -> dict | None:
"""Claim a job atomically, moving it to the processing list."""
job_data = client.blmove('jobs:pending', 'jobs:processing', 30, 'RIGHT', 'LEFT')
if not job_data:
return None
job = json.loads(job_data)
# Track when we started processing this job
client.zadd('jobs:processing:times', {job['id']: time.time()})
return job
async function claimJob(client: Redis): Promise<object | null> {
const jobData = await client.blmove('jobs:pending', 'jobs:processing', 'RIGHT', 'LEFT', 30);
if (!jobData) return null;
const job = JSON.parse(jobData);
// Track when we started processing this job
await client.zadd('jobs:processing:times', Date.now() / 1000, job.id);
return job;
}
func claimJob(ctx context.Context, client *redis.Client) (map[string]interface{}, error) {
jobData, err := client.BLMove(ctx, "jobs:pending", "jobs:processing", "RIGHT", "LEFT", 30*time.Second).Result()
if err != nil {
return nil, err
}
var job map[string]interface{}
json.Unmarshal([]byte(jobData), &job)
// Track when we started processing this job
client.ZAdd(ctx, "jobs:processing:times", redis.Z{
Score: float64(time.Now().Unix()),
Member: job["id"],
})
return job, nil
}
Acknowledging completed jobs
When a worker finishes a job successfully, it must acknowledge completion by removing the job from the processing list. This is the ACK operation. Use LREM to remove the specific job from the processing list, and clean up the timestamp tracking.
After acknowledgment, the job is completely gone from the queue system. If you need to keep a record of completed jobs, add them to a separate completed list or log them to your database before acknowledging.
- Redis
- Python
- TypeScript
- Go
# Job completed successfully, remove from processing
LREM jobs:processing 1 '{"id":"job_123","task":"send_email"}'
> 1 (removed)
# Clean up the timestamp tracking
ZREM jobs:processing:times "job_123"
> 1
def ack_job(client, job: dict, job_data: str):
"""Acknowledge successful job completion."""
# Remove from processing list
client.lrem('jobs:processing', 1, job_data)
# Clean up timestamp tracking
client.zrem('jobs:processing:times', job['id'])
def process_jobs(client):
"""Process jobs reliably with acknowledgment."""
while True:
job_data = client.blmove('jobs:pending', 'jobs:processing', 30, 'RIGHT', 'LEFT')
if not job_data:
continue
job = json.loads(job_data)
client.zadd('jobs:processing:times', {job['id']: time.time()})
try:
process(job)
ack_job(client, job, job_data) # Success!
except Exception as e:
nack_job(client, job, job_data, str(e)) # Failed, handle retry
async function ackJob(client: Redis, job: any, jobData: string) {
// Remove from processing list
await client.lrem('jobs:processing', 1, jobData);
// Clean up timestamp tracking
await client.zrem('jobs:processing:times', job.id);
}
async function processJobs(client: Redis) {
while (true) {
const jobData = await client.blmove('jobs:pending', 'jobs:processing', 'RIGHT', 'LEFT', 30);
if (!jobData) continue;
const job = JSON.parse(jobData);
await client.zadd('jobs:processing:times', Date.now() / 1000, job.id);
try {
await process(job);
await ackJob(client, job, jobData); // Success!
} catch (err) {
await nackJob(client, job, jobData, String(err)); // Failed, handle retry
}
}
}
func ackJob(ctx context.Context, client *redis.Client, job map[string]interface{}, jobData string) {
// Remove from processing list
client.LRem(ctx, "jobs:processing", 1, jobData)
// Clean up timestamp tracking
client.ZRem(ctx, "jobs:processing:times", job["id"])
}
func processJobs(ctx context.Context, client *redis.Client) {
for {
jobData, err := client.BLMove(ctx, "jobs:pending", "jobs:processing", "RIGHT", "LEFT", 30*time.Second).Result()
if err != nil {
continue
}
var job map[string]interface{}
json.Unmarshal([]byte(jobData), &job)
client.ZAdd(ctx, "jobs:processing:times", redis.Z{Score: float64(time.Now().Unix()), Member: job["id"]})
if err := process(job); err != nil {
nackJob(ctx, client, job, jobData, err.Error()) // Failed
} else {
ackJob(ctx, client, job, jobData) // Success
}
}
}
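If you do want a durable record of finished work, one option noted above is a separate completed list that you write to before acknowledging. A minimal sketch in Python, assuming a jobs:completed list that is not used elsewhere in this guide:
def ack_job_with_record(client, job: dict, job_data: str):
    """Record completion, then acknowledge the job."""
    # Write the record first; if we crash before the LREM below, the job is
    # recovered and reprocessed rather than lost (at-least-once delivery).
    client.lpush('jobs:completed', job_data)
    client.lrem('jobs:processing', 1, job_data)
    client.zrem('jobs:processing:times', job['id'])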
Returning failed jobs to the queue
When processing fails, you have choices: retry immediately, retry with a delay, or give up after too many attempts. The NACK operation removes the job from processing and either returns it to the pending queue for retry or moves it to a dead letter queue.
Track retry attempts in a hash. Each time a job fails, increment its attempt counter. If the counter exceeds your maximum, move the job to a dead letter list for manual inspection instead of retrying forever.
- Redis
- Python
- TypeScript
- Go
# Job failed, check how many attempts so far
HINCRBY jobs:attempts job_123 1
> 2 (second attempt)
# Under max retries? Return to pending queue
LREM jobs:processing 1 '{"id":"job_123","task":"send_email"}'
RPUSH jobs:pending '{"id":"job_123","task":"send_email"}'
ZREM jobs:processing:times "job_123"
# Over max retries? Move to dead letter queue
LREM jobs:processing 1 '{"id":"job_123","task":"send_email"}'
RPUSH jobs:dead '{"id":"job_123","task":"send_email"}'
ZREM jobs:processing:times "job_123"
MAX_RETRIES = 3
def nack_job(client, job: dict, job_data: str, error: str):
"""Handle a failed job with retry logic."""
job_id = job['id']
# Increment attempt counter
attempts = client.hincrby('jobs:attempts', job_id, 1)
# Remove from processing
client.lrem('jobs:processing', 1, job_data)
client.zrem('jobs:processing:times', job_id)
if attempts >= MAX_RETRIES:
# Too many failures, move to dead letter queue
client.rpush('jobs:dead', job_data)
client.hset('jobs:errors', job_id, error)
else:
# Retry: put back in pending queue
client.rpush('jobs:pending', job_data)
const MAX_RETRIES = 3;
async function nackJob(client: Redis, job: any, jobData: string, error: string) {
const jobId = job.id;
// Increment attempt counter
const attempts = await client.hincrby('jobs:attempts', jobId, 1);
// Remove from processing
await client.lrem('jobs:processing', 1, jobData);
await client.zrem('jobs:processing:times', jobId);
if (attempts >= MAX_RETRIES) {
// Too many failures, move to dead letter queue
await client.rpush('jobs:dead', jobData);
await client.hset('jobs:errors', jobId, error);
} else {
// Retry: put back in pending queue
await client.rpush('jobs:pending', jobData);
}
}
const maxRetries = 3
func nackJob(ctx context.Context, client *redis.Client, job map[string]interface{}, jobData, errMsg string) {
jobID := job["id"].(string)
// Increment attempt counter
attempts, _ := client.HIncrBy(ctx, "jobs:attempts", jobID, 1).Result()
// Remove from processing
client.LRem(ctx, "jobs:processing", 1, jobData)
client.ZRem(ctx, "jobs:processing:times", jobID)
if attempts >= maxRetries {
// Too many failures, move to dead letter queue
client.RPush(ctx, "jobs:dead", jobData)
client.HSet(ctx, "jobs:errors", jobID, errMsg)
} else {
// Retry: put back in pending queue
client.RPush(ctx, "jobs:pending", jobData)
}
}
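The nack above retries failed jobs immediately. To get the "retry with a delay" option, one approach is a second sorted set that holds failed jobs until their retry time, with a periodic task promoting due jobs back to pending. A minimal Python sketch, assuming a jobs:delayed sorted set and a RETRY_DELAY constant that are not part of the core pattern above:
import time

RETRY_DELAY = 60  # base delay in seconds before a failed job is retried

def nack_job_delayed(client, job: dict, job_data: str, error: str):
    """Like nack_job, but schedules the retry instead of requeueing immediately."""
    attempts = client.hincrby('jobs:attempts', job['id'], 1)
    client.lrem('jobs:processing', 1, job_data)
    client.zrem('jobs:processing:times', job['id'])
    if attempts >= MAX_RETRIES:
        # Too many failures, move to dead letter queue
        client.rpush('jobs:dead', job_data)
        client.hset('jobs:errors', job['id'], error)
    else:
        # Exponential backoff: wait longer after each failure
        ready_at = time.time() + RETRY_DELAY * (2 ** (attempts - 1))
        client.zadd('jobs:delayed', {job_data: ready_at})

def promote_due_jobs(client):
    """Move delayed jobs whose retry time has arrived back to pending."""
    for job_data in client.zrangebyscore('jobs:delayed', '-inf', time.time()):
        # ZREM first so two promoters cannot both requeue the same job
        if client.zrem('jobs:delayed', job_data):
            client.rpush('jobs:pending', job_data)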
Recovering jobs from crashed workers
Workers can crash without calling ACK or NACK. A background recovery process handles this by finding jobs that have been in the processing state too long. Query the sorted set for jobs with timestamps older than your timeout threshold, then move them back to the pending queue.
Run this recovery process periodically, perhaps every minute. It catches jobs abandoned by crashed workers and returns them to the queue for another attempt. The retry counter ensures jobs do not retry forever even if workers keep crashing.
- Redis
- Python
- TypeScript
- Go
# Find jobs processing for more than 5 minutes (300 seconds)
# Current time is 1699900360, so cutoff is 1699900060
ZRANGEBYSCORE jobs:processing:times -inf 1699900060
> ["job_123", "job_456"]
# For each stuck job, find it in the processing list and requeue
# This requires knowing the full job data, so store job_id -> job_data mapping
# or scan the processing list
PROCESSING_TIMEOUT = 300 # 5 minutes
def recover_stuck_jobs(client):
"""Find and requeue jobs stuck in processing."""
cutoff = time.time() - PROCESSING_TIMEOUT
# Find jobs that have been processing too long
stuck_job_ids = client.zrangebyscore('jobs:processing:times', '-inf', cutoff)
for job_id in stuck_job_ids:
# Find the job in the processing list
processing_jobs = client.lrange('jobs:processing', 0, -1)
for job_data in processing_jobs:
job = json.loads(job_data)
if job['id'] == job_id:
# Treat as a failure and use nack logic
nack_job(client, job, job_data, 'Processing timeout')
break
const PROCESSING_TIMEOUT = 300; // 5 minutes
async function recoverStuckJobs(client: Redis) {
const cutoff = Date.now() / 1000 - PROCESSING_TIMEOUT;
// Find jobs that have been processing too long
const stuckJobIds = await client.zrangebyscore('jobs:processing:times', '-inf', cutoff);
for (const jobId of stuckJobIds) {
// Find the job in the processing list
const processingJobs = await client.lrange('jobs:processing', 0, -1);
for (const jobData of processingJobs) {
const job = JSON.parse(jobData);
if (job.id === jobId) {
// Treat as a failure and use nack logic
await nackJob(client, job, jobData, 'Processing timeout');
break;
}
}
}
}
const processingTimeout = 5 * time.Minute
func recoverStuckJobs(ctx context.Context, client *redis.Client) {
cutoff := float64(time.Now().Add(-processingTimeout).Unix())
// Find jobs that have been processing too long
stuckJobIDs, _ := client.ZRangeByScore(ctx, "jobs:processing:times", &redis.ZRangeBy{
Min: "-inf",
Max: fmt.Sprintf("%f", cutoff),
}).Result()
for _, jobID := range stuckJobIDs {
// Find the job in the processing list
processingJobs, _ := client.LRange(ctx, "jobs:processing", 0, -1).Result()
for _, jobData := range processingJobs {
var job map[string]interface{}
json.Unmarshal([]byte(jobData), &job)
if job["id"] == jobID {
// Treat as a failure and use nack logic
nackJob(ctx, client, job, jobData, "Processing timeout")
break
}
}
}
}
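The recovery function needs something to run it. A minimal sketch of a dedicated recovery process that sweeps roughly once a minute, as suggested above:
import time

RECOVERY_INTERVAL = 60  # seconds between recovery sweeps

def run_recovery_loop(client):
    """Periodically rescue jobs abandoned by crashed workers."""
    while True:
        recover_stuck_jobs(client)
        time.sleep(RECOVERY_INTERVAL)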
When to use this pattern
This reliable queue pattern works well for moderate workloads where you need job persistence without external dependencies. It handles worker crashes gracefully, supports retries with backoff, and provides visibility into stuck jobs. The implementation uses only basic Redis commands and is easy to understand and debug.
For high-throughput scenarios or when you need consumer groups, message IDs, and built-in acknowledgment tracking, consider Redis Streams. Streams provide these features natively with better performance for complex use cases. But for many applications, this simple list-based pattern provides enough reliability without the added complexity.