How to Process Jobs Exactly Once in Redis
Processing a job more than once can cause real problems. Charging a customer twice, sending duplicate emails, or creating duplicate records all erode trust and create support tickets. The challenge is that distributed systems fail in messy ways: workers crash mid-job, networks partition, and processes restart. Ensuring a job runs exactly once requires careful coordination.
Redis provides the primitives needed to build exactly-once job processing. The core insight is that "exactly once" is really "at least once delivery" plus "idempotent processing." You cannot prevent a job from being delivered multiple times in a distributed system, but you can ensure that processing it multiple times has the same effect as processing it once. This guide covers three approaches: deduplication with unique job IDs to reject duplicates at submission time, atomic claim and process to prevent concurrent execution, and idempotency keys to make repeated processing safe.
Which Redis data types we will use
Set tracks which job IDs have been seen or processed. The SADD command returns whether the member was newly added, giving you an atomic check-and-add operation. If SADD returns 0, the job ID already exists and you can skip it. Sets provide O(1) membership checks and naturally deduplicate.
String is used in two ways in this implementation:
- As a lock to claim exclusive ownership of a job. The SET command with NX (only if not exists) atomically creates a claim that other workers will see.
- As an idempotency record to store the result of a completed operation. Before processing, check if a result already exists. If so, return the cached result instead of reprocessing.
Hash stores job metadata and processing state. Fields can track the current status, the worker that claimed it, timestamps, and the final result. Hashes let you update individual fields atomically without rewriting the entire job record.
List serves as the job queue itself. Workers pop jobs from the list for processing. Redis lists support atomic pop operations that remove and return an item in one step, preventing two workers from receiving the same job.
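To make the hash usage concrete, here is a minimal redis-py sketch of a job record. The job:<id> key and the field names are illustrative choices for this example, not fixed conventions, and it assumes a local Redis with decode_responses=True.
import time
import redis

client = redis.Redis(decode_responses=True)

# Illustrative job record; the key and field names are example choices
client.hset('job:job-abc-123', mapping={
    'status': 'pending',
    'submitted_at': int(time.time()),
})

# A worker later updates individual fields atomically,
# without rewriting the whole record
client.hset('job:job-abc-123', mapping={
    'status': 'processing',
    'worker': 'worker-7',
})
print(client.hgetall('job:job-abc-123'))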
Preventing duplicate jobs with unique IDs
The first layer of defense is rejecting duplicate jobs before they enter the queue. Every job gets a unique ID, and you track which IDs you have seen. When a new job arrives, check if its ID exists in the seen set. If it does, reject the duplicate. If not, add the ID and enqueue the job.
This approach catches duplicates caused by client retries, network issues that cause resubmission, or application bugs that submit the same job twice. The deduplication window depends on how long you keep IDs in the set. For jobs that should never repeat, keep IDs forever. For jobs where duplicates only matter within a time window, expire old IDs periodically.
The tradeoff is memory usage. Every job ID consumes space in the set. For high-volume systems, you may need to expire old IDs or use a probabilistic data structure like a Bloom filter (a sketch follows the examples). The examples below show both a plain set and, for bounded memory, per-job tracking keys with expiration.
- Redis
- Python
- TypeScript
- Go
# Check if job ID already exists and add it atomically
# Returns 1 if added (new job), 0 if already existed (duplicate)
SADD jobs:seen job-abc-123
> 1 (new job, proceed to enqueue)
# Another attempt to submit the same job
SADD jobs:seen job-abc-123
> 0 (duplicate, reject it)
# Only enqueue if SADD returned 1
RPUSH jobs:pending job-abc-123
> 1
# For time-bounded deduplication, use a string with expiration instead
SET jobs:seen:job-xyz-456 1 NX EX 3600
> OK (new job, expires in 1 hour)
SET jobs:seen:job-xyz-456 1 NX EX 3600
> (nil) (duplicate within the hour)
def submit_job(client, job_id: str, job_data: str) -> bool:
    """Submit a job, rejecting duplicates."""
    # SADD returns 1 if the member was added, 0 if it already existed
    is_new = client.sadd('jobs:seen', job_id)
    if not is_new:
        return False  # Duplicate job, reject it
    # Job is new, add to the queue
    client.rpush('jobs:pending', job_data)
    return True

def submit_job_with_expiry(client, job_id: str, job_data: str, dedup_seconds: int = 3600) -> bool:
    """Submit a job with time-bounded deduplication."""
    # SET NX returns True only if the key was created
    is_new = client.set(f'jobs:seen:{job_id}', '1', nx=True, ex=dedup_seconds)
    if not is_new:
        return False  # Duplicate within the deduplication window
    client.rpush('jobs:pending', job_data)
    return True
async function submitJob(client: Redis, jobId: string, jobData: string): Promise<boolean> {
  // SADD returns 1 if the member was added, 0 if it already existed
  const isNew = await client.sadd('jobs:seen', jobId);
  if (isNew === 0) {
    return false; // Duplicate job, reject it
  }
  // Job is new, add to the queue
  await client.rpush('jobs:pending', jobData);
  return true;
}

async function submitJobWithExpiry(
  client: Redis, jobId: string, jobData: string, dedupSeconds: number = 3600
): Promise<boolean> {
  // SET with EX and NX returns OK only if the key was created
  const result = await client.set(`jobs:seen:${jobId}`, '1', 'EX', dedupSeconds, 'NX');
  if (!result) {
    return false; // Duplicate within the deduplication window
  }
  await client.rpush('jobs:pending', jobData);
  return true;
}
func submitJob(ctx context.Context, client *redis.Client, jobID, jobData string) (bool, error) {
    // SADD returns the number of elements added (1 if new, 0 if duplicate)
    added, err := client.SAdd(ctx, "jobs:seen", jobID).Result()
    if err != nil {
        return false, err
    }
    if added == 0 {
        return false, nil // Duplicate job, reject it
    }
    // Job is new, add to the queue
    if err := client.RPush(ctx, "jobs:pending", jobData).Err(); err != nil {
        return false, err
    }
    return true, nil
}

func submitJobWithExpiry(ctx context.Context, client *redis.Client, jobID, jobData string, dedupSeconds int) (bool, error) {
    // SET NX returns true only if the key was created
    set, err := client.SetNX(ctx, "jobs:seen:"+jobID, "1", time.Duration(dedupSeconds)*time.Second).Result()
    if err != nil {
        return false, err
    }
    if !set {
        return false, nil // Duplicate within the deduplication window
    }
    if err := client.RPush(ctx, "jobs:pending", jobData).Err(); err != nil {
        return false, err
    }
    return true, nil
}
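If the deduplication set grows too large, the Bloom-filter route mentioned above trades exactness for bounded memory. A minimal Python sketch, assuming the RedisBloom module is loaded on the server and a redis-py version (4.x or later) that exposes its commands via client.bf(); note that Bloom filters allow false positives, so a genuinely new job can occasionally be rejected.
def submit_job_bloom(client, job_id: str, job_data: str) -> bool:
    """Probabilistic deduplication with bounded memory (requires RedisBloom)."""
    # BF.ADD returns 1 if the item was (probably) new, 0 if it may already exist
    is_new = client.bf().add('jobs:seen:bloom', job_id)
    if not is_new:
        return False  # Likely a duplicate; false positives are possible
    client.rpush('jobs:pending', job_data)
    return True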
Claiming jobs atomically to prevent double processing
Deduplication at submission time does not prevent the same job from being processed twice if a worker crashes mid-job and the job gets requeued. To handle this, workers must atomically claim a job before processing it. If two workers try to claim the same job, only one succeeds.
The pattern uses BLMOVE (the successor to the now-deprecated BRPOPLPUSH) to atomically move a job from the pending queue to a processing queue. This single command removes the job from pending and adds it to processing, so no other worker can grab it. After successful processing, remove the job from the processing queue. If a worker crashes, a recovery process can move stale jobs from processing back to pending.
This approach handles worker crashes gracefully. The processing queue acts as a record of in-flight jobs. A background process periodically checks for jobs that have been in processing too long and moves them back to pending for retry. The job will run again, so your processing logic must be idempotent or you need the idempotency key pattern described in the next section.
- Redis
- Python
- TypeScript
- Go
# Worker atomically moves job from pending to processing
# BLMOVE blocks until a job is available, then moves it atomically
BLMOVE jobs:pending jobs:processing RIGHT LEFT 30
> "job-abc-123"
# The job is now in processing, not pending
# Only this worker has it
# After successful processing, remove from processing queue
LREM jobs:processing 1 "job-abc-123"
> 1 (removed)
# Recovery: find jobs stuck in processing (check timestamps separately)
LRANGE jobs:processing 0 -1
> ["job-xyz-old", "job-abc-stuck"]
# Move stuck job back to pending for retry
LREM jobs:processing 1 "job-xyz-old"
RPUSH jobs:pending "job-xyz-old"
def process_jobs(client):
    """Worker loop that claims and processes jobs atomically."""
    while True:
        # Atomically move a job from pending to processing
        # Blocks for up to 30 seconds waiting for a job
        job = client.blmove('jobs:pending', 'jobs:processing', 30, 'RIGHT', 'LEFT')
        if job is None:
            continue  # Timeout, no jobs available
        try:
            # Process the job (your business logic here)
            process(job)
            # Success: remove from processing queue
            client.lrem('jobs:processing', 1, job)
        except Exception:
            # Move back to pending immediately for retry; alternatively,
            # leave it in processing for the recovery process to handle
            client.lrem('jobs:processing', 1, job)
            client.rpush('jobs:pending', job)
            raise

def recover_stuck_jobs(client):
    """Move jobs stuck in processing back to pending."""
    # Simplified: moves every job in processing. In production, track claim
    # timestamps so only truly stuck jobs move back; a timestamp-based
    # sketch follows these examples.
    stuck_jobs = client.lrange('jobs:processing', 0, -1)
    for job in stuck_jobs:
        client.lrem('jobs:processing', 1, job)
        client.rpush('jobs:pending', job)
async function processJobs(client: Redis) {
  while (true) {
    // Atomically move a job from pending to processing
    const job = await client.blmove('jobs:pending', 'jobs:processing', 'RIGHT', 'LEFT', 30);
    if (!job) continue; // Timeout, no jobs available
    try {
      // Process the job (your business logic here)
      await process(job);
      // Success: remove from processing queue
      await client.lrem('jobs:processing', 1, job);
    } catch (err) {
      // Move back to pending for retry
      await client.lrem('jobs:processing', 1, job);
      await client.rpush('jobs:pending', job);
      throw err;
    }
  }
}

async function recoverStuckJobs(client: Redis) {
  const stuckJobs = await client.lrange('jobs:processing', 0, -1);
  for (const job of stuckJobs) {
    await client.lrem('jobs:processing', 1, job);
    await client.rpush('jobs:pending', job);
  }
}
func processJobs(ctx context.Context, client *redis.Client) error {
    for {
        // Atomically move a job from pending to processing
        job, err := client.BLMove(ctx, "jobs:pending", "jobs:processing", "RIGHT", "LEFT", 30*time.Second).Result()
        if err == redis.Nil {
            continue // Timeout, no jobs available
        }
        if err != nil {
            return err
        }
        if err := process(job); err != nil {
            // Move back to pending for retry
            client.LRem(ctx, "jobs:processing", 1, job)
            client.RPush(ctx, "jobs:pending", job)
            continue
        }
        // Success: remove from processing queue
        client.LRem(ctx, "jobs:processing", 1, job)
    }
}

func recoverStuckJobs(ctx context.Context, client *redis.Client) {
    stuckJobs, _ := client.LRange(ctx, "jobs:processing", 0, -1).Result()
    for _, job := range stuckJobs {
        client.LRem(ctx, "jobs:processing", 1, job)
        client.RPush(ctx, "jobs:pending", job)
    }
}
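The recovery functions above move every in-flight job back to pending, which would also retry jobs that are still running. Here is a sketch of the timestamp-based variant in Python, recording claim times in a hash; the jobs:claims key name is a choice made for this example.
import time

def claim_job(client, timeout: int = 30):
    """Claim a job and record when it was claimed."""
    job = client.blmove('jobs:pending', 'jobs:processing', timeout, 'RIGHT', 'LEFT')
    if job is not None:
        # Record the claim time so recovery can tell stale jobs from live ones
        client.hset('jobs:claims', job, int(time.time()))
    return job

def recover_stale_jobs(client, max_processing_seconds: int = 300):
    """Move only jobs that have been in processing too long back to pending."""
    now = int(time.time())
    for job in client.lrange('jobs:processing', 0, -1):
        claimed_at = client.hget('jobs:claims', job)
        if claimed_at is None or now - int(claimed_at) > max_processing_seconds:
            # Stale or unclaimed: clear the claim record and requeue
            client.lrem('jobs:processing', 1, job)
            client.hdel('jobs:claims', job)
            client.rpush('jobs:pending', job)
On success, the worker should remove the claim record with HDEL alongside the LREM shown earlier, so completed jobs do not accumulate in the hash.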
Using idempotency keys for safe retries
Even with atomic claiming, jobs may run multiple times due to crashes, timeouts, or recovery processes. The final layer of defense is making the job processing itself idempotent. Before doing work, check if you have already done it. Store the result of completed work and return the cached result on subsequent attempts.
An idempotency key is a unique identifier for an operation. Before processing, check if a result exists for that key. If it does, return the cached result without reprocessing. If not, do the work and store the result. This pattern is especially important for operations with external side effects like payment processing or sending notifications.
The tradeoff is storage and complexity. You need to store results for as long as retries might occur, and you need to handle the case where a crash happens after the work is done but before the result is stored. For critical operations, wrap the work and the result storage in a transaction or use a two-phase approach where you mark the operation as "in progress" before starting (a sketch follows the examples below).
- Redis
- Python
- TypeScript
- Go
# Check if this operation was already completed
GET idempotency:payment:order-123
> (nil) (not processed yet)
# Do the work, then store the result
# SET NX ensures only one worker can store the result
SET idempotency:payment:order-123 '{"status":"success","charge_id":"ch_xxx"}' NX EX 86400
> OK (result stored, expires in 24 hours)
# Subsequent attempts find the existing result
GET idempotency:payment:order-123
> '{"status":"success","charge_id":"ch_xxx"}'
# Return cached result, do not reprocess
# Another worker trying to store a result fails
SET idempotency:payment:order-123 '{"status":"success","charge_id":"ch_yyy"}' NX EX 86400
> (nil) (already exists, this is a duplicate)
import json

def process_payment_idempotently(client, order_id: str, amount: int) -> dict:
    """Process a payment exactly once using an idempotency key."""
    idempotency_key = f'idempotency:payment:{order_id}'
    # Check for existing result
    existing = client.get(idempotency_key)
    if existing:
        return json.loads(existing)  # Return cached result
    # Do the actual work
    result = charge_payment(order_id, amount)
    # Store the result atomically
    # NX ensures we only store if no result exists yet
    stored = client.set(idempotency_key, json.dumps(result), nx=True, ex=86400)
    if not stored:
        # Another worker completed this while we were processing
        # Return their result instead
        existing = client.get(idempotency_key)
        return json.loads(existing)
    return result

def charge_payment(order_id: str, amount: int) -> dict:
    """Your actual payment processing logic."""
    # Call payment provider, etc.
    return {'status': 'success', 'charge_id': 'ch_xxx', 'amount': amount}
async function processPaymentIdempotently(
  client: Redis, orderId: string, amount: number
): Promise<object> {
  const idempotencyKey = `idempotency:payment:${orderId}`;
  // Check for existing result
  const existing = await client.get(idempotencyKey);
  if (existing) {
    return JSON.parse(existing); // Return cached result
  }
  // Do the actual work
  const result = await chargePayment(orderId, amount);
  // Store the result atomically; NX ensures we only store if no result exists yet
  const stored = await client.set(idempotencyKey, JSON.stringify(result), 'EX', 86400, 'NX');
  if (!stored) {
    // Another worker completed this while we were processing
    const otherResult = await client.get(idempotencyKey);
    return JSON.parse(otherResult!);
  }
  return result;
}

async function chargePayment(orderId: string, amount: number): Promise<object> {
  // Your actual payment processing logic
  return { status: 'success', chargeId: 'ch_xxx', amount };
}
func processPaymentIdempotently(ctx context.Context, client *redis.Client, orderID string, amount int) (map[string]interface{}, error) {
    idempotencyKey := "idempotency:payment:" + orderID
    // Check for existing result
    existing, err := client.Get(ctx, idempotencyKey).Result()
    if err == nil {
        var result map[string]interface{}
        if err := json.Unmarshal([]byte(existing), &result); err != nil {
            return nil, err
        }
        return result, nil // Return cached result
    }
    if err != redis.Nil {
        return nil, err // A real Redis error, not just a missing key
    }
    // Do the actual work
    result := chargePayment(orderID, amount)
    // Store the result atomically; NX ensures we only store if no result exists yet
    resultJSON, err := json.Marshal(result)
    if err != nil {
        return nil, err
    }
    stored, err := client.SetNX(ctx, idempotencyKey, resultJSON, 24*time.Hour).Result()
    if err != nil {
        return nil, err
    }
    if !stored {
        // Another worker completed this while we were processing
        existing, err := client.Get(ctx, idempotencyKey).Result()
        if err != nil {
            return nil, err
        }
        var otherResult map[string]interface{}
        if err := json.Unmarshal([]byte(existing), &otherResult); err != nil {
            return nil, err
        }
        return otherResult, nil
    }
    return result, nil
}

func chargePayment(orderID string, amount int) map[string]interface{} {
    return map[string]interface{}{
        "status":    "success",
        "charge_id": "ch_xxx",
        "amount":    amount,
    }
}
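For the crash window between doing the work and storing the result, here is a sketch of the two-phase variant in Python. The "in_progress" sentinel value is a convention invented for this example, and the snippet assumes a client created with decode_responses=True so GET returns strings.
import json

def process_payment_two_phase(client, order_id: str, amount: int) -> dict:
    """Mark the operation in progress before doing the work."""
    key = f'idempotency:payment:{order_id}'
    # Phase 1: atomically claim the operation with a sentinel value
    claimed = client.set(key, 'in_progress', nx=True, ex=86400)
    if not claimed:
        existing = client.get(key)
        if existing == 'in_progress':
            # Another attempt started but has not finished (or crashed);
            # back off and retry later rather than risk a double charge
            raise RuntimeError(f'Operation {order_id} already in progress')
        return json.loads(existing)  # Completed earlier: return cached result
    # Phase 2: do the work, then overwrite the sentinel with the result
    result = charge_payment(order_id, amount)
    client.set(key, json.dumps(result), ex=86400)
    return result
If the worker holding the sentinel crashes, the key blocks retries until it expires, so in practice you would give the in-progress marker a much shorter TTL than the stored result.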
Choosing an approach
Use deduplication with unique job IDs when duplicate submissions are your main concern. This is the simplest layer and catches most duplicates at the door. It works well when clients might retry requests or when upstream systems might send the same event multiple times.
Use atomic claiming when you have multiple workers and need to ensure only one processes each job. The BLMOVE pattern provides clean handoff between queues and makes recovery straightforward. Combine this with a background process that monitors for stuck jobs.
Use idempotency keys when your job processing has side effects that cannot be safely repeated. Payment processing, sending emails, and creating external resources all benefit from this pattern. The cached result also improves performance for legitimate retries.
In practice, production systems often use all three approaches together. Deduplicate at submission to reduce load, claim atomically to prevent concurrent processing, and use idempotency keys as the final safety net for operations that must not repeat. Each layer catches failures that slip through the others, giving you true exactly-once semantics even in a distributed system that can fail in countless ways.
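As a rough illustration of how the layers compose in a single worker, here is a Python sketch reusing the helpers defined above. It assumes the queue stores order IDs, a fixed amount purely for illustration, and a client created with decode_responses=True; submission-time deduplication happens separately via submit_job_with_expiry before jobs reach this queue.
def worker_loop(client):
    """Layer 1 deduplicated submissions feed this queue; layers 2 and 3 run here."""
    while True:
        # Layer 2: atomic claim via BLMOVE
        job_id = client.blmove('jobs:pending', 'jobs:processing', 30, 'RIGHT', 'LEFT')
        if job_id is None:
            continue  # Timeout, no jobs available
        try:
            # Layer 3: safe even if this job was claimed and retried before
            process_payment_idempotently(client, job_id, amount=1000)
            client.lrem('jobs:processing', 1, job_id)
        except Exception:
            client.lrem('jobs:processing', 1, job_id)
            client.rpush('jobs:pending', job_id)
            raise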