How to Build Typing Indicators in Redis
The "someone is typing..." indicator is a small detail that makes chat feel alive. When you see those three dots or a friend's name appear, you know a message is coming. Building this feature seems simple at first, but the edge cases add up quickly. What happens when someone starts typing, gets distracted, and never sends? What if multiple people type at once? What if a user closes their browser mid-sentence?
Redis handles these problems elegantly because of its built-in key expiration. You can set a typing indicator that automatically disappears after a few seconds of inactivity, without needing cleanup jobs or garbage collection. This guide covers three approaches: simple expiring keys for basic one-on-one chat, sorted sets for group chats where several people might type at once, and Pub/Sub for real-time updates to notify clients instantly when typing status changes.
Which Redis data types we will use
String with expiration is the simplest way to track typing status. Set a key when a user starts typing and give it a short TTL, say 3 seconds. If the user keeps typing, refresh the TTL. If they stop, the key expires automatically. Other clients check for the key's existence to know if someone is typing.
Sorted Set tracks multiple typers in a single room efficiently. Each typing user becomes a member with their last keystroke timestamp as the score. To find who is currently typing, query for members with recent timestamps. This avoids creating many separate keys for busy group chats and makes cleanup straightforward.
Pub/Sub pushes typing notifications to connected clients in real time. Instead of clients polling to check if someone is typing, the server publishes typing events and clients receive them instantly. This reduces latency and Redis load, but clients that disconnect briefly might miss updates.
The simple approach: one key per typing user
For one-on-one conversations or simple chat rooms, an expiring string key works perfectly. When a user types, set a key like typing:room:123:user:456 with a 3-second expiration. Each keystroke refreshes the TTL. When the user stops typing for 3 seconds, the key vanishes on its own.
The receiving client checks for typing indicators by looking up the key. If it exists, show "User is typing...". If not, hide the indicator. This polling approach is simple but adds latency since you only discover typing status when you check.
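Rate-limiting on the client side prevents flooding Redis with updates. Instead of writing to Redis on every keystroke, throttle updates to at most once per second while the user is typing, which stays comfortably inside the 3-second TTL so the key never lapses mid-sentence. Debouncing (waiting until the user pauses for 300 milliseconds) also cuts writes, but on its own it delays the indicator until the first pause and lets the key expire during a long uninterrupted burst, so pair it with the throttle. Either way, this dramatically reduces write volume while keeping the indicator responsive.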
- Redis
- Python
- TypeScript
- Go
# User starts typing: create the marker key with a 3-second TTL (EX 3)
SET typing:room:123:user:456 1 EX 3
> OK
# User keeps typing: the same SET overwrites the key and restarts the 3-second TTL
SET typing:room:123:user:456 1 EX 3
> OK
# Another user checks whether user 456 is typing (key present = typing)
GET typing:room:123:user:456
> "1" (user 456 is typing)
# After 3 seconds without a refresh the key has expired, so GET finds nothing
GET typing:room:123:user:456
> (nil) (user stopped typing)
# User sends a message: delete the marker right away instead of waiting for the TTL
# (DEL replies with the number of keys removed: 1 while the key still exists, 0 if it already expired)
DEL typing:room:123:user:456
> 1
# Seconds a typing marker survives without being refreshed.
TYPING_TTL = 3


def set_typing(client, room_id: str, user_id: str):
    """Flag ``user_id`` as typing in ``room_id``.

    Writes the marker key with a ``TYPING_TTL``-second expiry, so every call
    restarts the countdown and the flag disappears once the calls stop.
    """
    marker = 'typing:room:{}:user:{}'.format(room_id, user_id)
    client.set(marker, '1', ex=TYPING_TTL)
def clear_typing(client, room_id: str, user_id: str):
    """Delete the typing marker of ``user_id`` in ``room_id`` (e.g. once the message is sent)."""
    client.delete('typing:room:{}:user:{}'.format(room_id, user_id))
def is_user_typing(client, room_id: str, user_id: str) -> bool:
    """Report whether the typing marker of ``user_id`` in ``room_id`` currently exists."""
    marker = 'typing:room:{}:user:{}'.format(room_id, user_id)
    hits = client.exists(marker)
    return hits == 1
def get_typing_users(client, room_id: str, user_ids: list[str]) -> list[str]:
    """Return the subset of ``user_ids`` that are currently typing in ``room_id``.

    All marker keys are fetched with a single MGET; a user counts as typing
    when their key still holds a value. The result keeps the order of
    ``user_ids``. An empty ``user_ids`` yields an empty list without
    contacting Redis.
    """
    # Materialize once so a one-shot iterable is not exhausted before the zip below.
    user_ids = list(user_ids)
    # MGET needs at least one key; Redis rejects the command otherwise.
    if not user_ids:
        return []
    keys = [f'typing:room:{room_id}:user:{uid}' for uid in user_ids]
    results = client.mget(keys)
    return [uid for uid, val in zip(user_ids, results) if val is not None]
/** Seconds a typing marker survives without being refreshed. */
const TYPING_TTL = 3;

/**
 * Flags `userId` as typing in `roomId`.
 *
 * Writes the marker key with a `TYPING_TTL`-second expiry, so every call
 * restarts the countdown and the flag disappears once the calls stop.
 */
async function setTyping(client: Redis, roomId: string, userId: string): Promise<void> {
  const markerKey = 'typing:room:' + roomId + ':user:' + userId;
  await client.set(markerKey, '1', 'EX', TYPING_TTL);
}
/**
 * Deletes the typing marker of `userId` in `roomId`, e.g. once the message
 * has been sent and the indicator should vanish immediately.
 */
async function clearTyping(client: Redis, roomId: string, userId: string): Promise<void> {
  const markerKey = 'typing:room:' + roomId + ':user:' + userId;
  await client.del(markerKey);
}
/**
 * Reports whether the typing marker of `userId` in `roomId` currently exists.
 *
 * @returns `true` when EXISTS reports exactly one matching key.
 */
async function isUserTyping(client: Redis, roomId: string, userId: string): Promise<boolean> {
  const markerKey = 'typing:room:' + roomId + ':user:' + userId;
  const matches = await client.exists(markerKey);
  return matches === 1;
}
/**
 * Returns the subset of `userIds` that are currently typing in `roomId`.
 *
 * All marker keys are fetched with a single MGET; a user counts as typing
 * when their key still holds a value. The result keeps the order of `userIds`.
 *
 * @returns The typing users, or an empty array (without contacting Redis)
 *          when `userIds` is empty.
 */
async function getTypingUsers(client: Redis, roomId: string, userIds: string[]): Promise<string[]> {
  // MGET needs at least one key; Redis rejects the command otherwise.
  if (userIds.length === 0) {
    return [];
  }
  const keys = userIds.map((uid) => `typing:room:${roomId}:user:${uid}`);
  const values = await client.mget(keys);
  // `!= null` rejects both null and undefined, so a short reply can never
  // mark a user as typing.
  return userIds.filter((_, i) => values[i] != null);
}
// typingTTL is how long a typing marker survives without being refreshed.
const typingTTL = 3 * time.Second

// setTyping flags userID as typing in roomID by writing the marker key with a
// typingTTL expiry, so every call restarts the countdown. The outcome of the
// SET command is not inspected.
func setTyping(ctx context.Context, client *redis.Client, roomID, userID string) {
	const markerFormat = "typing:room:%s:user:%s"
	client.Set(ctx, fmt.Sprintf(markerFormat, roomID, userID), "1", typingTTL)
}
// clearTyping deletes the typing marker of userID in roomID, e.g. once the
// message has been sent. The outcome of the DEL command is not inspected.
func clearTyping(ctx context.Context, client *redis.Client, roomID, userID string) {
	const markerFormat = "typing:room:%s:user:%s"
	client.Del(ctx, fmt.Sprintf(markerFormat, roomID, userID))
}
// isUserTyping reports whether the typing marker of userID in roomID
// currently exists, i.e. whether EXISTS counted exactly one key.
func isUserTyping(ctx context.Context, client *redis.Client, roomID, userID string) bool {
	const markerFormat = "typing:room:%s:user:%s"
	// The error is discarded; only the returned count is examined.
	count, _ := client.Exists(ctx, fmt.Sprintf(markerFormat, roomID, userID)).Result()
	return count == 1
}
// getTypingUsers returns the subset of userIDs that are currently typing in
// roomID, in the order given. All marker keys are fetched with one MGET; a
// user counts as typing when their key still holds a value.
//
// It returns nil when userIDs is empty, when nobody is typing, or when the
// lookup fails (the signature has no error return, so a failure reads as
// "nobody typing").
func getTypingUsers(ctx context.Context, client *redis.Client, roomID string, userIDs []string) []string {
	// MGET needs at least one key; skip the round trip Redis would reject.
	if len(userIDs) == 0 {
		return nil
	}

	keys := make([]string, len(userIDs))
	for i, uid := range userIDs {
		keys[i] = fmt.Sprintf("typing:room:%s:user:%s", roomID, uid)
	}

	results, err := client.MGet(ctx, keys...).Result()
	if err != nil {
		return nil
	}

	var typing []string
	for i, val := range results {
		// A nil entry means the key does not exist (expired or never set).
		if val != nil {
			typing = append(typing, userIDs[i])
		}
	}
	return typing
}
Tracking multiple typers in group chats
When many users might type simultaneously, checking individual keys becomes inefficient. A sorted set stores all typing users for a room in a single key, with each user's last activity timestamp as their score. To find current typers, query for users with timestamps within the last few seconds.
This approach scales better for active group chats. One ZRANGEBYSCORE returns all typing users instead of checking many individual keys. The results also come back ordered by last-keystroke timestamp, oldest first (use ZREVRANGEBYSCORE to list the most recently active typers first), which is useful for displaying "Alice, Bob, and 3 others are typing."
The tradeoff is that sorted sets do not expire individual members. You need to clean up stale entries by removing users with old timestamps. Do this when checking for typers, or run a periodic cleanup. The query itself filters out old entries, so stale data only wastes a small amount of memory until cleanup runs.
- Redis
- Python
- TypeScript
- Go
# User starts typing, add to sorted set with current timestamp as the score
ZADD typing:room:123 1699900060 user:456
> 1
# Another user starts typing
ZADD typing:room:123 1699900062 user:789
> 1
# First user keeps typing, update their timestamp
ZADD typing:room:123 1699900065 user:456
> 0 (updated existing member)
# Get users typing in the last 3 seconds (current time is 1699900066, so the cutoff is 1699900063)
ZRANGEBYSCORE typing:room:123 1699900063 +inf
> ["user:456"] (user:789 last typed at 1699900062, before the cutoff)
# Clean up users who stopped typing (older than 3 seconds)
ZREMRANGEBYSCORE typing:room:123 -inf 1699900063
> 1 (removed stale entry user:789)
# User sends message, remove from typing set
ZREM typing:room:123 user:456
> 1
import time
# Seconds after a user's last recorded keystroke during which they still count as typing.
TYPING_WINDOW = 3 # seconds
def set_typing_group(client, room_id: str, user_id: str):
    """Record a keystroke by ``user_id`` in the room's typing sorted set.

    The user is stored as a member whose score is the current Unix time in
    (fractional) seconds; calling again simply moves the score forward.
    """
    stamped_member = {user_id: time.time()}
    client.zadd('typing:room:{}'.format(room_id), stamped_member)
def clear_typing_group(client, room_id: str, user_id: str):
    """Take ``user_id`` out of the room's typing sorted set."""
    client.zrem('typing:room:{}'.format(room_id), user_id)
def get_typing_users_group(client, room_id: str) -> list[str]:
    """List the users whose last keystroke in ``room_id`` is recent enough.

    "Recent" means a score at or after ``now - TYPING_WINDOW``. The same
    pipeline that reads the recent members also deletes every member at or
    before that cutoff, so stale entries are pruned on each lookup.
    """
    room_key = 'typing:room:{}'.format(room_id)
    oldest_allowed = time.time() - TYPING_WINDOW

    # Queue the read first, then the cleanup; both are sent by execute().
    batch = client.pipeline()
    batch.zrangebyscore(room_key, oldest_allowed, '+inf')
    batch.zremrangebyscore(room_key, '-inf', oldest_allowed)

    # Only the first reply (the range query) is of interest to callers.
    return batch.execute()[0]
/** Seconds after a user's last recorded keystroke during which they still count as typing. */
const TYPING_WINDOW = 3; // seconds
/**
 * Records a keystroke by `userId` in the room's typing sorted set.
 *
 * The user is stored as a member whose score is the current Unix time in
 * (fractional) seconds; calling again simply moves the score forward.
 */
async function setTypingGroup(client: Redis, roomId: string, userId: string): Promise<void> {
  const lastKeystroke = Date.now() / 1000;
  await client.zadd('typing:room:' + roomId, lastKeystroke, userId);
}
/** Takes `userId` out of the room's typing sorted set. */
async function clearTypingGroup(client: Redis, roomId: string, userId: string): Promise<void> {
  const roomKey = 'typing:room:' + roomId;
  await client.zrem(roomKey, userId);
}
/**
 * Lists the users whose last keystroke in `roomId` is recent enough.
 *
 * "Recent" means a score at or after `now - TYPING_WINDOW`. The same
 * pipeline that reads the recent members also deletes every member at or
 * before that cutoff, so stale entries are pruned on each lookup.
 *
 * @returns The members returned by the range query.
 * @throws If the pipeline yields no results, or if the range query itself
 *         failed (its error is rethrown). A failure of the cleanup command
 *         alone is not treated as fatal.
 */
async function getTypingUsersGroup(client: Redis, roomId: string): Promise<string[]> {
  const key = `typing:room:${roomId}`;
  const cutoff = Date.now() / 1000 - TYPING_WINDOW;

  // Queue the read first, then the cleanup; exec() sends both.
  const pipeline = client.pipeline();
  pipeline.zrangebyscore(key, cutoff, '+inf');
  pipeline.zremrangebyscore(key, '-inf', cutoff);
  const results = await pipeline.exec();

  // exec() may resolve to null, and each entry is an [error, reply] pair;
  // indexing blindly would crash or return a non-array as string[].
  const rangeReply = results?.[0];
  if (rangeReply == null) {
    throw new Error(`Typing lookup for ${key} returned no pipeline results`);
  }
  const [rangeError, members] = rangeReply;
  if (rangeError) {
    throw rangeError;
  }
  return members as string[];
}
// typingWindow is the number of seconds after a user's last recorded
// keystroke during which they still count as typing.
const typingWindow = 3 // seconds
// setTypingGroup records a keystroke by userID in the room's typing sorted
// set. The member's score is the current Unix time in whole seconds; calling
// again moves the score forward. The outcome of ZADD is not inspected.
func setTypingGroup(ctx context.Context, client *redis.Client, roomID, userID string) {
	entry := redis.Z{
		Score:  float64(time.Now().Unix()),
		Member: userID,
	}
	client.ZAdd(ctx, "typing:room:"+roomID, entry)
}
// clearTypingGroup takes userID out of the room's typing sorted set. The
// outcome of ZREM is not inspected.
func clearTypingGroup(ctx context.Context, client *redis.Client, roomID, userID string) {
	roomKey := "typing:room:" + roomID
	client.ZRem(ctx, roomKey, userID)
}
// getTypingUsersGroup lists the users whose last keystroke in roomID has a
// score at or after now - typingWindow. The same pipeline that reads those
// members also deletes every member at or before the cutoff, so stale entries
// are pruned on each lookup. Errors from the pipeline and from the range
// query are discarded; only the returned members are handed back.
func getTypingUsersGroup(ctx context.Context, client *redis.Client, roomID string) []string {
	roomKey := "typing:room:" + roomID
	// Both commands take the same bound as a decimal string.
	oldest := fmt.Sprintf("%f", float64(time.Now().Unix()-typingWindow))

	// Queue the read first, then the cleanup; Exec sends both.
	batch := client.Pipeline()
	recent := batch.ZRangeByScore(ctx, roomKey, &redis.ZRangeBy{Min: oldest, Max: "+inf"})
	batch.ZRemRangeByScore(ctx, roomKey, "-inf", oldest)
	batch.Exec(ctx)

	members, _ := recent.Result()
	return members
}
Pushing updates with Pub/Sub
Polling works but adds latency. The receiving user only sees "typing" when their next poll happens. For real-time feel, use Redis Pub/Sub to push typing events to connected clients immediately. When a user starts or stops typing, publish an event to the room's channel. All subscribed clients receive it instantly.
Combine Pub/Sub with the storage approaches above. The sorted set or string keys remain the source of truth for who is typing. Pub/Sub just notifies clients that something changed. When a client reconnects after a brief disconnect, they can check the stored state to catch up on any events they missed.
The tradeoff is complexity. Your server needs to maintain WebSocket connections to clients and bridge them to Redis Pub/Sub subscriptions. Clients that disconnect and reconnect might miss typing events during the gap. For most chat applications, this brief inconsistency is acceptable since typing indicators are ephemeral anyway.
- Redis
- Python
- TypeScript
- Go
# When a user starts typing, publish a JSON event on the room's channel
PUBLISH typing:room:123 '{"user":"456","typing":true}'
> 2 (number of subscribers who received the message)
# When the user stops typing or sends the message, publish the matching "typing": false event
PUBLISH typing:room:123 '{"user":"456","typing":false}'
> 2
# Clients subscribe to the room's channel to receive typing events
# (only events published while the subscription is active are delivered; Pub/Sub keeps no history)
SUBSCRIBE typing:room:123
> Reading messages... (push "typing:room:123" '{"user":"456","typing":true}')
import json
def publish_typing_start(client, room_id: str, user_id: str):
    """Broadcast on the room's channel that ``user_id`` began typing.

    The payload is a JSON object with ``user`` and ``typing`` (``true``).
    """
    event = {'user': user_id, 'typing': True}
    client.publish('typing:room:{}'.format(room_id), json.dumps(event))
def publish_typing_stop(client, room_id: str, user_id: str):
    """Broadcast on the room's channel that ``user_id`` is no longer typing.

    The payload is a JSON object with ``user`` and ``typing`` (``false``).
    """
    event = {'user': user_id, 'typing': False}
    client.publish('typing:room:{}'.format(room_id), json.dumps(event))
def subscribe_to_typing(client, room_id: str, callback):
    """Listen on the room's typing channel and forward each event to ``callback``.

    ``callback`` is invoked as ``callback(user, typing)`` with the ``user``
    and ``typing`` fields of every published JSON event. Entries whose type
    is not ``'message'`` (such as the subscription confirmation) are skipped.

    The call blocks for as long as the listen loop runs. Exceptions raised by
    JSON decoding or by ``callback`` propagate to the caller; in every case
    the pub/sub object is closed before the function returns or raises.
    """
    pubsub = client.pubsub()
    try:
        pubsub.subscribe(f'typing:room:{room_id}')
        for message in pubsub.listen():
            if message['type'] != 'message':
                continue
            data = json.loads(message['data'])
            callback(data['user'], data['typing'])
    finally:
        # Release the pub/sub connection instead of leaking it when the loop
        # ends or an exception escapes.
        pubsub.close()
/**
 * Broadcasts on the room's channel that `userId` began typing.
 *
 * The payload is the JSON object `{"user": userId, "typing": true}`.
 *
 * @returns A promise that settles once the PUBLISH has completed, so callers
 *          can await it or handle a failed publish. Callers that ignore the
 *          return value behave as before.
 */
async function publishTypingStart(client: Redis, roomId: string, userId: string): Promise<void> {
  const channel = `typing:room:${roomId}`;
  const message = JSON.stringify({ user: userId, typing: true });
  // Awaited so a rejected publish surfaces through the returned promise
  // instead of floating as an unobserved rejection inside this function.
  await client.publish(channel, message);
}
/**
 * Broadcasts on the room's channel that `userId` is no longer typing.
 *
 * The payload is the JSON object `{"user": userId, "typing": false}`.
 *
 * @returns A promise that settles once the PUBLISH has completed, so callers
 *          can await it or handle a failed publish. Callers that ignore the
 *          return value behave as before.
 */
async function publishTypingStop(client: Redis, roomId: string, userId: string): Promise<void> {
  const channel = `typing:room:${roomId}`;
  const message = JSON.stringify({ user: userId, typing: false });
  // Awaited so a rejected publish surfaces through the returned promise
  // instead of floating as an unobserved rejection inside this function.
  await client.publish(channel, message);
}
/**
 * Subscribes `subscriber` to the room's typing channel and forwards every
 * well-formed event to `callback`.
 *
 * Messages arriving on other channels are ignored. A payload that is not
 * valid JSON, or that lacks a string `user` and a boolean `typing`, is
 * dropped rather than thrown from inside the event listener, where an
 * exception would surface as an uncaught error. A failed SUBSCRIBE is logged.
 *
 * @param subscriber Connection used for the subscription and its `message` events.
 * @param roomId     Room whose typing events should be received.
 * @param callback   Invoked as `callback(userId, isTyping)` for each valid event.
 */
function subscribeToTyping(
  subscriber: Redis,
  roomId: string,
  callback: (userId: string, isTyping: boolean) => void
) {
  const channel = `typing:room:${roomId}`;

  // Fire-and-forget, but never leave the rejection unobserved.
  void Promise.resolve(subscriber.subscribe(channel)).catch((err: unknown) => {
    console.error(`Failed to subscribe to ${channel}`, err);
  });

  subscriber.on('message', (ch: string, message: string) => {
    if (ch !== channel) {
      return;
    }

    let data: unknown;
    try {
      data = JSON.parse(message);
    } catch {
      return; // not JSON: ignore this event, keep the listener alive
    }

    if (typeof data !== 'object' || data === null) {
      return;
    }
    const { user, typing } = data as { user?: unknown; typing?: unknown };
    if (typeof user !== 'string' || typeof typing !== 'boolean') {
      return;
    }
    callback(user, typing);
  });
}
// publishTypingStart broadcasts on the room's channel that userID began
// typing. The payload is a JSON object with "user" and "typing" (true).
// Marshalling and publish errors are not inspected.
func publishTypingStart(ctx context.Context, client *redis.Client, roomID, userID string) {
	event := map[string]interface{}{"user": userID, "typing": true}
	payload, _ := json.Marshal(event)
	client.Publish(ctx, "typing:room:"+roomID, payload)
}
// publishTypingStop broadcasts on the room's channel that userID is no longer
// typing. The payload is a JSON object with "user" and "typing" (false).
// Marshalling and publish errors are not inspected.
func publishTypingStop(ctx context.Context, client *redis.Client, roomID, userID string) {
	event := map[string]interface{}{"user": userID, "typing": false}
	payload, _ := json.Marshal(event)
	client.Publish(ctx, "typing:room:"+roomID, payload)
}
// subscribeToTyping subscribes to the room's typing channel and calls
// callback(user, typing) for every well-formed event. It blocks, ranging over
// the subscription's message channel, until that channel is closed.
//
// A payload that is not valid JSON, or that lacks a string "user" and a
// boolean "typing", is skipped instead of panicking on a failed type
// assertion. The subscription is closed when the function exits, including
// when callback panics.
func subscribeToTyping(ctx context.Context, client *redis.Client, roomID string, callback func(string, bool)) {
	channel := "typing:room:" + roomID
	pubsub := client.Subscribe(ctx, channel)
	// Release the subscription instead of leaking it on exit.
	defer pubsub.Close()

	for msg := range pubsub.Channel() {
		var data map[string]interface{}
		if err := json.Unmarshal([]byte(msg.Payload), &data); err != nil {
			continue // not JSON: ignore this event, keep listening
		}

		// Comma-ok assertions: a missing or wrongly typed field skips the
		// event rather than crashing the listener.
		user, okUser := data["user"].(string)
		typing, okTyping := data["typing"].(bool)
		if !okUser || !okTyping {
			continue
		}
		callback(user, typing)
	}
}
Choosing an approach
Use expiring string keys for simple one-on-one chats or small rooms where you know the participant list. The automatic expiration handles cleanup perfectly, and checking a few keys is cheap enough.
Use sorted sets for group chats where many users might type simultaneously. The single key per room scales better than many individual keys, and you get all typers in one query. Remember to clean up stale entries periodically.
Use Pub/Sub when you need real-time updates and already have WebSocket infrastructure. Combine it with one of the storage approaches so clients can recover state after reconnecting. The added complexity is worth it for chat applications where instant feedback matters.
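All three approaches can handle the hard part, users who abandon their typing without sending, but not all in the same way. With string keys, the key expires on its own. With sorted sets, the timestamp ages out of the query window. Pub/Sub alone is the exception: a stop event only fires if something publishes it, and a closed browser publishes nothing, so pair it with one of the stored approaches or have receivers hide an indicator that has not been refreshed within a few seconds. Do that, and there are no zombie typing indicators haunting your chat rooms.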