
Delay Queue

We often use RabbitMQ or Kafka as message queue middleware to add asynchronous messaging between applications. Both are dedicated message queues whose feature sets go well beyond what most people need, or fully understand.

Anyone who has used RabbitMQ knows how complex it can be. Before sending a message, you must create an Exchange and a Queue, then bind them together with routing rules. When sending, you must specify a routing key and set the message headers. Consumers go through a similarly tedious setup before they can consume anything. Yet in most cases, even when there is only a single group of consumers, we still have to go through all of these steps.

Redis can make this much simpler for queues that have only one group of consumers. It is not a dedicated message queue, though: it lacks many advanced features and has no acknowledgment (ack) mechanism. If your messages must be delivered with very high reliability, Redis may not be the right choice.

Asynchronous Message Queue

Redis's list data structure is commonly used as an asynchronous message queue, utilizing rpush/lpush for enqueue operations and lpop/rpop for dequeue operations.


plaintext
> rpush notify-queue apple banana pear
(integer) 3
> llen notify-queue
(integer) 3
> lpop notify-queue
"apple"
> llen notify-queue
(integer) 2
> lpop notify-queue
"banana"
> llen notify-queue
(integer) 1
> lpop notify-queue
"pear"
> llen notify-queue
(integer) 0
> lpop notify-queue
(nil)

The example above pairs rpush with lpop. Pairing lpush with rpop gives the same first-in, first-out behavior, so we won't walk through it separately.
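
To make the pairing rule concrete, here is a small in-memory model in which a Python `deque` stands in for a Redis list. It is an illustration only, not a Redis client, and `ListModel` and `drain` are hypothetical names. Pushing and popping at opposite ends gives first-in, first-out order; pushing and popping at the same end turns the list into a stack.

```python
from collections import deque


class ListModel:
    """Toy stand-in for a Redis list (illustration only, not a Redis client)."""

    def __init__(self):
        self.items = deque()

    def rpush(self, *values):
        self.items.extend(values)  # append each value at the tail
        return len(self.items)     # Redis also replies with the new length

    def lpush(self, *values):
        for v in values:
            self.items.appendleft(v)  # like Redis, each value goes to the head in turn
        return len(self.items)

    def lpop(self):
        return self.items.popleft() if self.items else None  # None plays the role of (nil)

    def rpop(self):
        return self.items.pop() if self.items else None


def drain(pop):
    """Pop until the model returns None, collecting items in pop order."""
    out = []
    while (item := pop()) is not None:
        out.append(item)
    return out
```

With `rpush apple banana pear` followed by repeated `lpop`, `drain` returns the items in insertion order, matching the redis-cli session above.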

What if the Queue is Empty?

A consumer pops a message from the queue, processes it, and then pops the next one. This loop is the whole lifecycle of a queue consumer.

However, if the queue is empty, the client falls into an endless loop of empty pops: it keeps calling pop and getting nothing back. This empty polling wastes CPU on the client and drives up Redis's QPS for no benefit. With many clients polling like this, Redis may also see a noticeable rise in slow queries.

The usual fix is to sleep: when a pop returns nothing, the thread pauses briefly, say for 1 second, before trying again. This lowers both the client's CPU usage and Redis's QPS.

python
time.sleep(1)  # Python: sleep for 1 s

java
Thread.sleep(1000);  // Java: sleep for 1000 ms, i.e. 1 s

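
A minimal sketch of this polling consumer, assuming a `pop` callable that returns the next message or `None` when the queue is empty. With redis-py that could be `lambda: r.lpop("notify-queue")`, where `r` is a hypothetical `redis.Redis()` instance. The `sleep` and `max_empty_polls` parameters are illustrative additions so the loop can be exercised and stopped in a demo.

```python
import time
from typing import Callable, Optional


def poll_consumer(pop: Callable[[], Optional[bytes]],
                  handle: Callable[[bytes], None],
                  idle_delay: float = 1.0,
                  max_empty_polls: Optional[int] = None,
                  sleep: Callable[[float], None] = time.sleep) -> int:
    """Consume with a non-blocking pop, sleeping whenever the queue is empty.

    max_empty_polls=None loops forever; a number stops the loop after that
    many empty polls (demo only). Returns the number of messages handled.
    """
    handled = 0
    empty_polls = 0
    while max_empty_polls is None or empty_polls < max_empty_polls:
        item = pop()
        if item is None:
            empty_polls += 1
            sleep(idle_delay)  # Queue empty: pause instead of spinning on pop
            continue
        handle(item)
        handled += 1
    return handled
```

Note that the sleep only happens on an empty pop, so a busy queue is drained at full speed; the cost is that a message arriving just after a miss waits up to `idle_delay` seconds.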

Queue Latency

Sleeping solves the problem but introduces a new one: higher message latency. With a single consumer, a message can wait up to 1 second before it is picked up. With multiple consumers the latency drops, because their sleep periods are staggered.

Can we cut the latency significantly? Shortening the sleep time would work, but it brings back part of the empty polling we were trying to avoid. A better solution exists: blpop/brpop.

The leading b in these command names stands for blocking. When the queue is empty, a blocking pop puts the connection to sleep and wakes it as soon as data arrives, so message latency is almost zero. Replacing lpop/rpop with blpop/brpop neatly solves both problems above.
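
A sketch of the blocking variant. It assumes a `bpop` callable shaped like redis-py's `blpop`, which blocks for up to `timeout` seconds and returns a `(key, value)` pair or `None`; for example `lambda t: r.blpop("notify-queue", timeout=t)` with a hypothetical client `r`. Unlike the polling loop, there is no client-side sleep at all.

```python
from typing import Callable, Optional, Tuple

# Shape of redis-py's Redis.blpop(keys, timeout): blocks for up to `timeout`
# seconds and returns a (key, value) pair, or None if nothing arrived.
BlockingPop = Callable[[float], Optional[Tuple[bytes, bytes]]]


def blocking_consumer(bpop: BlockingPop,
                      handle: Callable[[bytes], None],
                      timeout: float = 5,
                      max_timeouts: Optional[int] = None) -> int:
    """Consume with a blocking pop; no client-side sleep is needed.

    max_timeouts=None loops forever; a number stops after that many empty
    waits (demo only). Returns the number of messages handled.
    """
    handled = 0
    timeouts = 0
    while max_timeouts is None or timeouts < max_timeouts:
        result = bpop(timeout)  # Redis holds the call until data arrives or the timeout expires
        if result is None:
            timeouts += 1  # Timed out with an empty queue: just block again
            continue
        _key, value = result
        handle(value)
        handled += 1
    return handled
```

Using a finite timeout rather than 0 (which blocks indefinitely) is a design choice: the loop wakes up periodically, which leaves room to check a shutdown flag between waits.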

Idle Connection Auto-Disconnection

Do you think the above solution is perfect? Not so fast; there's still an issue to address.

What issue? Idle connections.

If the consumer stays blocked for a long time, its Redis connection sits idle. Idle connections may be closed to free resources, whether by the server or by a proxy or firewall along the way, and when that happens blpop/brpop will throw an exception.

Therefore, when writing the consumer, be sure to catch these exceptions and retry.
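
A sketch of a blocking consumer that survives dropped connections, under the same `bpop` assumption as before. One caveat worth knowing: redis-py raises its own `redis.exceptions.ConnectionError` and `redis.exceptions.TimeoutError`, which are not subclasses of Python's built-in `ConnectionError`, so with redis-py you would pass those classes as `retry_on`. A pooled client such as redis-py normally opens a new connection on the next command, so simply retrying is usually enough; treat that as an assumption to verify for your client.

```python
import time
from typing import Callable, Optional, Tuple, Type


def resilient_consumer(bpop: Callable[[float], Optional[Tuple[bytes, bytes]]],
                       handle: Callable[[bytes], None],
                       retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
                       backoff: float = 1.0,
                       max_rounds: Optional[int] = None,
                       sleep: Callable[[float], None] = time.sleep) -> Tuple[int, int]:
    """Blocking consumer that catches connection errors and keeps going.

    max_rounds=None runs forever; a number bounds the loop (demo only).
    Returns (messages handled, connection errors seen).
    """
    handled = errors = rounds = 0
    while max_rounds is None or rounds < max_rounds:
        rounds += 1
        try:
            result = bpop(5)
        except retry_on:
            errors += 1
            sleep(backoff)  # Brief pause, then retry; the client is expected to reconnect
            continue
        if result is not None:
            handle(result[1])
            handled += 1
    return handled, errors
```

The backoff keeps a consumer from hammering Redis in a tight loop while the server or network is actually down.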

Lock Conflict Handling

In the previous lesson, we discussed distributed locks but didn't mention how to handle lock acquisition failures. Typically, there are three strategies for handling failed lock attempts:

  1. Directly throw an exception, informing the user to retry later.
  2. Sleep for a moment before retrying.
  3. Move the request to a delay queue and try again later.

The first approach suits user-initiated requests: the user reads the error message and clicks retry, which naturally introduces a manual delay. For a better user experience, the front-end code can perform the delayed retry automatically instead. In essence, this approach abandons the current request and leaves it to the user to decide whether to issue a new one.

Sleep

Sleeping blocks the current message-processing thread, which delays every message behind it. If lock conflicts are frequent or the queue holds many messages, sleeping is a poor fit. And if a lock on some key is never released (a deadlock), the thread can get stuck retrying indefinitely, and subsequent messages are not processed in time.
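
A sketch of strategy 2, assuming a `try_acquire` callable that attempts the lock once and returns `True` on success. With redis-py this might be `lambda: bool(r.set("lock:order-42", token, nx=True, ex=5))`, where the key name and `token` are hypothetical. The sketch makes the cost explicit: every failed attempt except the last blocks the consumer thread for `delay` seconds.

```python
import time
from typing import Callable


def lock_with_retry(try_acquire: Callable[[], bool],
                    attempts: int = 3,
                    delay: float = 0.5,
                    sleep: Callable[[float], None] = time.sleep) -> bool:
    """Strategy 2: retry a failed lock after sleeping.

    The calling thread can be blocked for up to (attempts - 1) * delay
    seconds, and it cannot process any other message in the meantime.
    """
    for attempt in range(attempts):
        if try_acquire():
            return True
        if attempt < attempts - 1:
            sleep(delay)  # Blocks this consumer thread
    return False
```

With the defaults, a lock that never frees up costs 1 second of blocked time per message, which is exactly why this strategy degrades badly when conflicts are frequent.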

Delayed Queue

This approach suits asynchronous message processing: the conflicting request is moved to a separate queue and retried later, sidestepping the conflict for now.
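
Strategy 3 can be sketched as follows. All callables are hypothetical: `try_acquire` attempts the lock once, `release` is assumed to be a safe, token-checking unlock, `handle` processes the message, and `defer` enqueues it for later, for example with a `delay()` function like the one in the implementation that follows.

```python
from typing import Any, Callable, Dict

Msg = Dict[str, Any]


def process_or_defer(msg: Msg,
                     try_acquire: Callable[[], bool],
                     release: Callable[[], None],
                     handle: Callable[[Msg], None],
                     defer: Callable[[Msg], None]) -> str:
    """Strategy 3: on a lock conflict, push the message to a delay queue and move on."""
    if not try_acquire():
        defer(msg)          # Retry later via the delay queue
        return "deferred"   # The consumer thread is immediately free for the next message
    try:
        handle(msg)
    finally:
        release()           # Always release, even if handle() raises
    return "handled"
```

Compared with sleeping, the thread never waits here; the waiting is moved into the delay queue, where it costs nothing but a zset entry.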

Implementation of Delayed Queue

A delay queue can be implemented with a Redis sorted set (zset). Each message is serialized into a string and stored as a zset member, with the time at which it should be processed as its score. Multiple threads poll the zset for tasks that have come due, so processing continues even if one thread dies. Because several threads compete for the same tasks, we must make sure no task is processed more than once.

python
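import json
import logging
import time
import uuid

import redis

r = redis.Redis()  # Assumes a local Redis on the default port 6379

def delay(msg):
    msg = dict(msg, id=str(uuid.uuid4()))  # msg is a dict; the unique id keeps zset members distinct
    value = json.dumps(msg)
    retry_ts = time.time() + 5  # Retry after 5 seconds
    r.zadd("delay-queue", {value: retry_ts})  # redis-py 3+ signature: zadd(name, {member: score})

def handle_msg(msg):
    print(msg)  # Placeholder for real processing

def loop():
    while True:
        # Fetch at most one task whose due time (score) has passed
        values = r.zrangebyscore("delay-queue", 0, time.time(), start=0, num=1)
        if not values:
            time.sleep(1)  # Nothing due yet; rest for 1 s
            continue
        value = values[0]  # Only one item was requested
        # Several consumers may fetch the same item; only one zrem call actually removes it
        if r.zrem("delay-queue", value):
            msg = json.loads(value)
            try:
                handle_msg(msg)
            except Exception:
                logging.exception("failed to handle %r", msg)  # Keep the loop alive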

Redis's zrem is the key to handling contention between threads and processes: its return value tells the current instance whether it actually claimed the task. Because loop may run in several threads or processes at once, only the one whose zrem succeeds goes on to process the task.
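
The zrem arbitration can be seen in a toy, in-memory model. `InMemoryZSet` (hypothetical) guards each method with a lock, mimicking the way Redis executes each command atomically, and several threads race to claim the same due tasks. Threads frequently fetch the same member, but because only one `zrem` can actually remove it, every task ends up handled once in this model.

```python
import threading
import time


class InMemoryZSet:
    """Toy stand-in for one Redis sorted set; each method is atomic via a lock,
    the way Redis executes each command atomically (model only, not Redis)."""

    def __init__(self):
        self._scores = {}
        self._lock = threading.Lock()

    def zadd(self, member, score):
        with self._lock:
            self._scores[member] = score

    def first_due(self, max_score):
        """Like ZRANGEBYSCORE key -inf max LIMIT 0 1: earliest due member or None."""
        with self._lock:
            due = [m for m, s in self._scores.items() if s <= max_score]
            return min(due, key=self._scores.__getitem__) if due else None

    def zrem(self, member):
        """Return 1 if this call removed the member, 0 if someone else already did."""
        with self._lock:
            if member in self._scores:
                del self._scores[member]
                return 1
            return 0


def worker(zs, handled, handled_lock):
    while True:
        member = zs.first_due(time.time())
        if member is None:
            return  # Demo: exit once nothing is due
        if zs.zrem(member):  # Only the thread whose zrem succeeds owns the task
            with handled_lock:
                handled.append(member)


def run_workers(tasks, n_threads=4):
    zs = InMemoryZSet()
    for t in tasks:
        zs.zadd(t, 0)  # Score 0: already due
    handled, handled_lock = [], threading.Lock()
    threads = [threading.Thread(target=worker, args=(zs, handled, handled_lock))
               for _ in range(n_threads)]
    for th in threads:
        th.start()
    for th in threads:
        th.join()
    return handled
```

The losing threads in this model still pay for a wasted fetch, which is the inefficiency the Lua optimization further down removes.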

Also be sure to catch exceptions in handle_msg, so that one failing task cannot make loop exit. Below is the Java version of the delay queue, which uses the FastJSON library for JSON serialization.

java
import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import redis.clients.jedis.Jedis;

public class RedisDelayingQueue<T> {

  static class TaskItem<T> {
    public String id;
    public T msg;
  }

  // TypeReference is needed for FastJSON serialization with generic types
  private Type TaskType = new TypeReference<TaskItem<T>>() {}.getType();

  private Jedis jedis;
  private String queueKey;

  public RedisDelayingQueue(Jedis jedis, String queueKey) {
    this.jedis = jedis;
    this.queueKey = queueKey;
  }

  public void delay(T msg) {
    TaskItem<T> task = new TaskItem<T>();
    task.id = UUID.randomUUID().toString(); // Assign a unique UUID
    task.msg = msg;
    String s = JSON.toJSONString(task); // FastJSON serialization
    jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // Add to delayed queue, retry after 5s
  }

  public void loop() {
    while (!Thread.interrupted()) {
      // Only retrieve one item
      Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
      if (values.isEmpty()) {
        try {
          Thread.sleep(500); // Pause and continue
        } catch (InterruptedException e) {
          break;
        }
        continue;
      }
      String s = values.iterator().next();
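      if (jedis.zrem(queueKey, s) > 0) { // Only the consumer whose zrem succeeds owns the task
        TaskItem<T> task = JSON.parseObject(s, TaskType); // FastJSON deserialization
        try {
          this.handleMsg(task.msg);
        } catch (RuntimeException e) {
          e.printStackTrace(); // Don't let one bad task stop the loop
        }
      }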
    }
  }

  public void handleMsg(T msg) {
    System.out.println(msg);
  }

  public static void main(String[] args) {
    // Jedis instances are not thread-safe, so each thread gets its own connection
    RedisDelayingQueue<String> producerQueue = new RedisDelayingQueue<>(new Jedis(), "q-demo");
    RedisDelayingQueue<String> consumerQueue = new RedisDelayingQueue<>(new Jedis(), "q-demo");
    Thread producer = new Thread() {
      public void run() {
        for (int i = 0; i < 10; i++) {
          producerQueue.delay("codehole" + i);
        }
      }
    };
    Thread consumer = new Thread() {
      public void run() {
        consumerQueue.loop();
      }
    };
    producer.start();
    consumer.start();
    try {
      producer.join();
      Thread.sleep(6000); // Tasks become due after 5 s; give the consumer time to drain them
      consumer.interrupt();
      consumer.join();
    } catch (InterruptedException e) {}
  }
}

Further Optimization

In the algorithm above, several processes may fetch the same task, but only one of them succeeds with zrem; the others have wasted a round trip. To optimize this, use a Lua script to perform zrangebyscore and zrem atomically on the server side. Processes competing for tasks then never fetch a task they cannot claim.
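
A minimal sketch of that optimization, assuming redis-py. The Lua script fetches the earliest due member and removes it in one server-side step; because Redis runs a script atomically, no other client can claim the same member in between. `POP_DUE_LUA` and `pop_due` are hypothetical names, and `client` is assumed to expose redis-py's `eval(script, numkeys, *keys_and_args)`.

```python
import time

# Runs atomically inside Redis: no other command can slip in between the
# ZRANGEBYSCORE and the ZREM, so each due task is returned to one caller only.
POP_DUE_LUA = """
local items = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, 1)
if #items == 0 then
  return false
end
redis.call('ZREM', KEYS[1], items[1])
return items[1]
"""


def pop_due(client, key, now=None):
    """Atomically claim one due task and return it, or None if nothing is due.

    Lua `false` is converted to a nil reply, which redis-py returns as None.
    """
    if now is None:
        now = time.time()
    return client.eval(POP_DUE_LUA, 1, key, now)
```

In the Python `loop()` shown earlier, the `zrangebyscore`/`zrem` pair could then be replaced by a single `value = pop_due(client, "delay-queue")`, sleeping whenever it returns `None`. Note that a task is still lost if the consumer crashes after claiming it, exactly as in the original design.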

Reflection

  • Why can't Redis guarantee 100% reliability as a message queue?
  • How would you use a Lua script to optimize the delay queue logic?