How to Build a Scheduled Queue in Redis
Many applications need to process tasks at a specific time in the future rather than immediately: email reminders that go out 24 hours before an event, retry logic that waits before trying again, subscription renewals that trigger on a billing date. All of these need a scheduled queue, a way to store tasks now and process them later when their time comes.
Redis sorted sets are a natural fit for this problem. The score represents when the task should run, and the member contains the task payload. Tasks sort themselves by execution time, and you can efficiently query for everything that is ready to process. This post covers two approaches: a simple polling approach for basic setups, and a reliable atomic approach for production systems with multiple workers.
Which Redis data types we will use
Sorted Set is the core data structure for a scheduled queue. Unlike a regular list or set, a sorted set associates each member with a numeric score and keeps members ordered by that score. For scheduling, the score is the Unix timestamp when the task should run, and the member is the task payload (or task ID). This gives us two key abilities: tasks automatically stay sorted by execution time, and we can efficiently query for "all tasks with a score less than or equal to now" to find what is ready to process.
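The score only has to sort in execution order, but every producer and worker must agree on its unit. The examples in this post use Unix time in seconds (`time.time()` in Python, `Date.now() / 1000` in TypeScript, `time.Now().Unix()` in Go); a score written in milliseconds would look due far in the future and never run. Redis stores scores as double-precision floats, so fractional seconds are fine. A minimal sketch, assuming timezone-aware datetimes; `score_for` is a hypothetical helper name:

```python
from datetime import datetime, timedelta, timezone


def score_for(run_at: datetime) -> float:
    """Hypothetical helper: convert an aware datetime to a score in Unix seconds."""
    if run_at.tzinfo is None:
        # A naive datetime would be interpreted in the machine's local time zone
        raise ValueError("run_at must be timezone-aware")
    return run_at.timestamp()


# A reminder that should go out 24 hours before an event
event_start = datetime(2023, 11, 14, 18, 0, tzinfo=timezone.utc)
reminder_score = score_for(event_start - timedelta(hours=24))
```

The resulting `reminder_score` is what you would pass as the score to ZADD.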
Hash is used optionally for retry tracking. When a task fails, we need to know how many times it has been retried. A hash maps task IDs to retry counts, letting us implement exponential backoff and dead letter queues.
The sorted set handles the scheduling and ordering. When you add a task with ZADD, Redis inserts it in the right position based on its timestamp. When you query with ZRANGEBYSCORE, you get all tasks up to a certain time, oldest first. When you remove with ZREM, the task is gone. This combination of ordered insertion, range queries, and removal is exactly what a scheduled queue needs.
The simple approach: poll and process
The simplest approach uses a sorted set where each task's score is its scheduled Unix timestamp. A worker polls periodically, fetches all tasks with scores less than or equal to the current time, processes them, and removes them from the set. This uses ZRANGEBYSCORE to find ready tasks and ZREM to delete them after processing.
This approach is easy to understand and debug. You can inspect the queue at any time with ZRANGE to see pending tasks, ZCARD to check how many are waiting, or ZCOUNT to count how many fall within a time range (for example, how many are already due). The sorted set keeps everything ordered by execution time automatically, so the earliest-scheduled task is always first.
The downside is the gap between fetching and removing. If your worker crashes after ZRANGEBYSCORE but before ZREM, the task stays in the queue and gets processed again on the next poll. Multiple workers add a race condition: two workers can both fetch the same task before either removes it, so it runs twice. For idempotent tasks this is fine. For tasks that should not run twice, you need the atomic approach described in the next section.
Examples as Redis commands and in Python, TypeScript, and Go:
# Add a task scheduled for 60 seconds from now
# Score is the Unix timestamp when it should run
ZADD scheduled:tasks 1699900060 "send-email:user123"
> 1 (task added)
# Add another task for 5 minutes from now
ZADD scheduled:tasks 1699900300 "expire-trial:account456"
> 1 (task added)
# Worker polls at 1699900060: get all tasks ready to run (score <= now)
# Returns tasks ordered by scheduled time, oldest first
ZRANGEBYSCORE scheduled:tasks 0 1699900060
> ["send-email:user123"]
# After processing, remove the task
ZREM scheduled:tasks "send-email:user123"
> 1 (removed successfully)
# Check how many tasks are waiting
ZCARD scheduled:tasks
> 1 (one task remaining)
import time
import redis
client = redis.Redis(decode_responses=True)
# Add a task scheduled for 60 seconds from now
scheduled_time = time.time() + 60
client.zadd('scheduled:tasks', {'send-email:user123': scheduled_time})
# Worker loop: poll for ready tasks once per second
while True:
    now = time.time()
    ready_tasks = client.zrangebyscore('scheduled:tasks', 0, now)
    for task in ready_tasks:
        # Process the task (process() is your own handler)
        process(task)
        # Remove only after processing succeeds
        client.zrem('scheduled:tasks', task)
    time.sleep(1)
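import Redis from 'ioredis';
const client = new Redis();
// Add a task scheduled for 60 seconds from now
const scheduledTime = Date.now() / 1000 + 60;
await client.zadd('scheduled:tasks', scheduledTime, 'send-email:user123');
// Worker loop: poll for ready tasks once per second
while (true) {
  const now = Date.now() / 1000;
  const readyTasks = await client.zrangebyscore('scheduled:tasks', 0, now);
  for (const task of readyTasks) {
    // processTask is your own handler (named to avoid shadowing Node's global `process`)
    await processTask(task);
    // Remove only after processing succeeds
    await client.zrem('scheduled:tasks', task);
  }
  await new Promise((resolve) => setTimeout(resolve, 1000));
}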
// Add a task scheduled for 60 seconds from now
scheduledTime := float64(time.Now().Unix() + 60)
client.ZAdd(ctx, "scheduled:tasks", redis.Z{
	Score:  scheduledTime,
	Member: "send-email:user123",
})
// Worker loop: poll for ready tasks once per second
for {
	now := float64(time.Now().Unix())
	readyTasks, err := client.ZRangeByScore(ctx, "scheduled:tasks", &redis.ZRangeBy{
		Min: "0",
		Max: fmt.Sprintf("%f", now),
	}).Result()
	if err != nil {
		log.Printf("poll failed: %v", err)
	}
	for _, task := range readyTasks {
		// Process the task; on failure, leave it in the queue for the next poll
		if err := process(task); err != nil {
			continue
		}
		// Remove after processing
		client.ZRem(ctx, "scheduled:tasks", task)
	}
	time.Sleep(time.Second)
}
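To see the fetch/remove gap of the simple approach in action, the sketch below simulates one worker that crashes after processing a task but before removing it. A plain dict stands in for the sorted set and `poll_once` is a hypothetical function; no Redis server is involved.

```python
from typing import Callable, Dict, List


def poll_once(queue: Dict[str, float], now: float,
              handle: Callable[[str], None],
              crash_before_remove: bool = False) -> None:
    """Hypothetical single poll of the simple approach; a dict stands in for the sorted set."""
    # Like ZRANGEBYSCORE 0 now: due tasks, earliest first
    ready = sorted((t for t, score in queue.items() if score <= now), key=queue.__getitem__)
    for task in ready:
        handle(task)
        if crash_before_remove:
            # Simulate the worker dying after processing but before ZREM
            raise RuntimeError("worker crashed")
        del queue[task]  # like ZREM


queue = {"send-email:user123": 1699900060.0}
handled: List[str] = []
try:
    poll_once(queue, 1699900060.0, handled.append, crash_before_remove=True)
except RuntimeError:
    pass  # the task ran, but it is still in the queue
poll_once(queue, 1699900061.0, handled.append)
print(handled)  # ['send-email:user123', 'send-email:user123']
```

The task is handled twice, which is harmless only if the handler is idempotent.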
Making it reliable: atomic claim and process
For production systems with multiple workers, fetching a task and moving it to an in-progress state must happen as a single atomic step. ZPOPMIN alone is not enough: it pops the lowest-scored task whether or not that task is due yet, and if your worker pops a task and crashes before recording it anywhere, that task is gone forever.
The solution is a small Lua script that checks whether a task is ready, removes it from the scheduled queue, and adds it to an in-progress set. Redis executes a script atomically, so no other worker can claim the same task in between. The in-progress set uses the claim timestamp as the score, so a separate recovery process can find stuck tasks and move them back to the main queue.
Examples as Redis commands and in Python, TypeScript, and Go:
# Lua script: atomically pop ready task and move to in-progress
# KEYS[1] = scheduled:tasks, KEYS[2] = in-progress:tasks
# ARGV[1] = current timestamp
EVAL "
local task = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, 1)
if #task == 0 then return nil end
redis.call('ZREM', KEYS[1], task[1])
redis.call('ZADD', KEYS[2], ARGV[1], task[1])
return task[1]
" 2 scheduled:tasks in-progress:tasks 1699900060
> "send-email:user123"
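# After successful processing, remove from in-progress
ZREM in-progress:tasks "send-email:user123"
# If the worker crashed instead, the task stays in in-progress:tasks.
# Recovery at 1699900365: find tasks claimed more than 5 minutes ago
# (cutoff = now - 300 = 1699900065)
ZRANGEBYSCORE in-progress:tasks 0 1699900065
> ["send-email:user123"]
# Move stuck tasks back to the scheduled queue; re-add only if ZREM
# returns 1 (0 means a worker finished the task in the meantime)
ZREM in-progress:tasks "send-email:user123"
> 1
ZADD scheduled:tasks 1699900365 "send-email:user123"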
import logging
import time
PROCESSING_TIMEOUT = 300  # 5 minutes
# Lua script: atomically pop ready task and move to in-progress
CLAIM_SCRIPT = """
local task = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, 1)
if #task == 0 then return nil end
redis.call('ZREM', KEYS[1], task[1])
redis.call('ZADD', KEYS[2], ARGV[1], task[1])
return task[1]
"""
def process_scheduled_tasks(client):
    claim = client.register_script(CLAIM_SCRIPT)
    while True:
        now = time.time()
        task = claim(keys=['scheduled:tasks', 'in-progress:tasks'], args=[now])
        if not task:
            time.sleep(1)
            continue
        try:
            process(task)
        except Exception:
            # Leave the task in in-progress:tasks instead of dropping it;
            # recover_stuck_tasks() requeues it after PROCESSING_TIMEOUT
            logging.exception('task %r failed', task)
            continue
        client.zrem('in-progress:tasks', task)
def recover_stuck_tasks(client):
    cutoff = time.time() - PROCESSING_TIMEOUT
    stuck = client.zrangebyscore('in-progress:tasks', 0, cutoff)
    for task in stuck:
        # ZREM returns 0 if a worker finished the task after the range query;
        # requeue only tasks this call actually removed
        if client.zrem('in-progress:tasks', task):
            client.zadd('scheduled:tasks', {task: time.time()})
import Redis from 'ioredis';
const PROCESSING_TIMEOUT = 300; // 5 minutes
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
// Lua script: atomically pop ready task and move to in-progress
const CLAIM_SCRIPT = `
local task = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, 1)
if #task == 0 then return nil end
redis.call('ZREM', KEYS[1], task[1])
redis.call('ZADD', KEYS[2], ARGV[1], task[1])
return task[1]
`;
async function processScheduledTasks(client: Redis) {
  while (true) {
    const now = Date.now() / 1000;
    const task = (await client.eval(
      CLAIM_SCRIPT, 2, 'scheduled:tasks', 'in-progress:tasks', now
    )) as string | null;
    if (!task) {
      await sleep(1000);
      continue;
    }
    try {
      // processTask is your own handler (named to avoid shadowing Node's global `process`)
      await processTask(task);
    } catch (err) {
      // Leave the task in in-progress:tasks; recoverStuckTasks() requeues it
      console.error(`task ${task} failed`, err);
      continue;
    }
    await client.zrem('in-progress:tasks', task);
  }
}
async function recoverStuckTasks(client: Redis) {
  const cutoff = Date.now() / 1000 - PROCESSING_TIMEOUT;
  const stuck = await client.zrangebyscore('in-progress:tasks', 0, cutoff);
  for (const task of stuck) {
    // Requeue only if this call removed it (0 means a worker already finished it)
    if ((await client.zrem('in-progress:tasks', task)) === 1) {
      await client.zadd('scheduled:tasks', Date.now() / 1000, task);
    }
  }
}
const processingTimeout = 5 * time.Minute
// Lua script: atomically pop ready task and move to in-progress
var claimScript = redis.NewScript(`
local task = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, 1)
if #task == 0 then return nil end
redis.call('ZREM', KEYS[1], task[1])
redis.call('ZADD', KEYS[2], ARGV[1], task[1])
return task[1]
`)
func processScheduledTasks(ctx context.Context, client *redis.Client) {
	for {
		now := float64(time.Now().Unix())
		result, err := claimScript.Run(ctx, client,
			[]string{"scheduled:tasks", "in-progress:tasks"}, now).Result()
		if err == redis.Nil {
			// No task is due yet
			time.Sleep(time.Second)
			continue
		}
		if err != nil {
			log.Printf("claim failed: %v", err)
			time.Sleep(time.Second)
			continue
		}
		task, ok := result.(string)
		if !ok {
			continue
		}
		if err := process(task); err != nil {
			// Leave the task in in-progress:tasks; recoverStuckTasks requeues it
			log.Printf("task %s failed: %v", task, err)
			continue
		}
		client.ZRem(ctx, "in-progress:tasks", task)
	}
}
func recoverStuckTasks(ctx context.Context, client *redis.Client) {
	cutoff := float64(time.Now().Add(-processingTimeout).Unix())
	stuck, err := client.ZRangeByScore(ctx, "in-progress:tasks", &redis.ZRangeBy{
		Min: "0", Max: fmt.Sprintf("%f", cutoff),
	}).Result()
	if err != nil {
		log.Printf("recovery scan failed: %v", err)
		return
	}
	for _, task := range stuck {
		// Requeue only tasks this call removed; 0 means a worker already finished it
		if n, _ := client.ZRem(ctx, "in-progress:tasks", task).Result(); n == 1 {
			client.ZAdd(ctx, "scheduled:tasks", redis.Z{
				Score: float64(time.Now().Unix()), Member: task,
			})
		}
	}
}
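The recovery functions above issue ZREM and ZADD as separate commands, so a recovery process that dies between them can drop a task. One option is to do the move in a Lua script as well, so it is all-or-nothing. A sketch assuming a redis-py client; `RECOVER_SCRIPT` and `recover_stuck_tasks_atomic` are hypothetical names, and the `batch` limit exists because Redis blocks other clients while a script runs.

```python
import time
from typing import Any

# Hypothetical recovery script: moves stuck tasks back in one atomic step.
# KEYS[1] = in-progress set, KEYS[2] = scheduled set
# ARGV[1] = cutoff (tasks claimed at or before this are stuck)
# ARGV[2] = score to requeue them with, ARGV[3] = max tasks to move per call
RECOVER_SCRIPT = """
local stuck = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, ARGV[3])
for _, task in ipairs(stuck) do
  redis.call('ZREM', KEYS[1], task)
  redis.call('ZADD', KEYS[2], ARGV[2], task)
end
return #stuck
"""


def recover_stuck_tasks_atomic(client: Any, timeout: float = 300, batch: int = 100) -> int:
    """Hypothetical helper: requeue up to `batch` tasks claimed more than `timeout` seconds ago."""
    recover = client.register_script(RECOVER_SCRIPT)
    now = time.time()
    return recover(keys=["in-progress:tasks", "scheduled:tasks"],
                   args=[now - timeout, now, batch])
```

Run it on a timer, and call it again right away whenever it returns `batch`, since more stuck tasks may remain.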
Handling failures and retries
Tasks can fail for many reasons: network issues, bugs, external service outages. A robust scheduled queue needs a retry strategy. The simplest approach is to re-add failed tasks with a new scheduled time in the future, implementing exponential backoff by increasing the delay on each retry.
Track retry counts by encoding them in the task payload or using a separate hash. After a maximum number of retries, move the task to a dead letter queue for manual inspection. This prevents poison messages from clogging your queue forever.
Examples as Redis commands and in Python, TypeScript, and Go:
# Task failed: schedule a retry in 60 seconds, encoding the retry count in the payload
ZADD scheduled:tasks 1699900120 "send-email:user123:retry=1"
> 1 (rescheduled)
# Alternatively, keep the payload unchanged and track the count in a hash
HINCRBY task:retries "send-email:user123" 1
> 2 (second retry)
# Check if max retries exceeded
HGET task:retries "send-email:user123"
> "5"
# Max retries reached: move the task to the dead letter queue,
# remove any pending copy, and clear its retry counter
ZADD dead-letter:tasks 1699900060 "send-email:user123"
ZREM scheduled:tasks "send-email:user123"
HDEL task:retries "send-email:user123"
import time
MAX_RETRIES = 5
def handle_failure(client, task, retry_count):
    # retry_count is the number of retries so far, for example
    # int(client.hget('task:retries', task) or 0)
    if retry_count >= MAX_RETRIES:
        # Move to dead letter queue
        client.zadd('dead-letter:tasks', {task: time.time()})
        client.hdel('task:retries', task)
    else:
        # Exponential backoff: 60s, 120s, 240s, 480s, 960s
        delay = 60 * (2 ** retry_count)
        client.zadd('scheduled:tasks', {task: time.time() + delay})
        client.hincrby('task:retries', task, 1)
const MAX_RETRIES = 5;
async function handleFailure(client: Redis, task: string, retryCount: number) {
if (retryCount >= MAX_RETRIES) {
// Move to dead letter queue
await client.zadd('dead-letter:tasks', Date.now() / 1000, task);
await client.hdel('task:retries', task);
} else {
// Exponential backoff: 60s, 120s, 240s, 480s, 960s
const delay = 60 * Math.pow(2, retryCount);
await client.zadd('scheduled:tasks', Date.now() / 1000 + delay, task);
await client.hincrby('task:retries', task, 1);
}
}
const maxRetries = 5
func handleFailure(ctx context.Context, client *redis.Client, task string, retryCount int) {
if retryCount >= maxRetries {
// Move to dead letter queue
client.ZAdd(ctx, "dead-letter:tasks", redis.Z{
Score: float64(time.Now().Unix()),
Member: task,
})
client.HDel(ctx, "task:retries", task)
} else {
// Exponential backoff: 60s, 120s, 240s, 480s, 960s
delay := 60 * (1 << retryCount)
client.ZAdd(ctx, "scheduled:tasks", redis.Z{
Score: float64(time.Now().Unix() + int64(delay)),
Member: task,
})
client.HIncrBy(ctx, "task:retries", task, 1)
}
}
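The Redis example above encodes the retry count in the payload as a `:retry=N` suffix. If you take that route instead of the hash, a pair of small helpers keeps parsing and backoff in one place. A sketch assuming that suffix format and the same limits as `handle_failure` (5 retries, 60-second base delay); `split_retry` and `next_attempt` are hypothetical names. Because the member string changes on every retry, ZREM must use the exact payload that was claimed.

```python
import re
from typing import Optional, Tuple

MAX_RETRIES = 5
BASE_DELAY = 60  # seconds; doubles on every retry
# Hypothetical payload format: "<task>:retry=<n>"; no suffix means no retries yet
_RETRY_SUFFIX = re.compile(r"^(?P<task>.*):retry=(?P<n>\d+)$")


def split_retry(payload: str) -> Tuple[str, int]:
    """'send-email:user123:retry=2' -> ('send-email:user123', 2)."""
    match = _RETRY_SUFFIX.match(payload)
    if match is None:
        return payload, 0
    return match.group("task"), int(match.group("n"))


def next_attempt(payload: str, now: float) -> Tuple[str, str, Optional[float]]:
    """Return ('retry', new_payload, run_at) or ('dead-letter', task, None)."""
    task, retries = split_retry(payload)
    if retries >= MAX_RETRIES:
        return "dead-letter", task, None
    delay = BASE_DELAY * (2 ** retries)  # 60, 120, 240, 480, 960
    return "retry", f"{task}:retry={retries + 1}", now + delay
```

For a first failure at 1699900060, `next_attempt("send-email:user123", 1699900060)` yields the same retry shown in the Redis example: payload `send-email:user123:retry=1` scheduled at 1699900120.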
Choosing an approach
Use polling with ZRANGEBYSCORE when you have a single worker and tasks are idempotent. It is simpler to implement and debug, and accidental double-processing will not cause problems.
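Use the Lua script approach when you have multiple workers competing for tasks or when tasks should not run twice. The atomic claim-and-move guarantees that only one worker claims a given task and that a crashed worker's task is recovered rather than lost, at the cost of slightly more complex worker logic. Delivery is still at-least-once: if the recovery process requeues a task whose worker was merely slow, that task runs again, so set the processing timeout well above your longest expected task.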
Both approaches work well at scale. Redis sorted sets handle millions of members efficiently: ZADD and ZREM are O(log N) per member, and ZRANGEBYSCORE is O(log N + M), where M is the number of members returned. Add monitoring for queue depth and processing lag to catch issues early. A scheduled queue that falls behind is often the first sign of a downstream problem.
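Depth and lag can both be read straight from the sorted set: ZCARD for the total, ZCOUNT up to now for tasks already due, and the earliest score (ZRANGE with WITHSCORES on index 0) for how overdue the queue is. A sketch assuming a redis-py client; `queue_health` is a hypothetical name.

```python
import time
from typing import Any, Optional, Tuple


def queue_health(client: Any, key: str = "scheduled:tasks",
                 now: Optional[float] = None) -> Tuple[int, int, float]:
    """Hypothetical helper returning (depth, due, lag_seconds) for a scheduled queue.

    depth: every task in the set, including future ones (ZCARD)
    due:   tasks whose scheduled time has passed (ZCOUNT -inf now)
    lag:   how overdue the earliest task is; 0.0 when nothing is overdue
    """
    now = time.time() if now is None else now
    depth = client.zcard(key)
    due = client.zcount(key, "-inf", now)
    earliest = client.zrange(key, 0, 0, withscores=True)  # [(member, score)] or []
    lag = max(0.0, now - earliest[0][1]) if earliest else 0.0
    return depth, due, lag
```

Exported to a metrics system on a timer, a `lag` that keeps growing usually means workers are not keeping up, even when `depth` looks normal.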