
PubSub

We previously covered using Redis as a message queue, but left out one of its shortcomings: it does not support message multicast.

pubsub1.webp

Message Multicast

Message multicast lets a producer send a message once, while the middleware copies it into multiple queues, each consumed by its own consumer group. This is a common decoupling technique in distributed systems: the logic of each consumer group can live in a separate subsystem. Without multicast support, the logic of the different consumer groups would have to be chained together inside a single subsystem that processes each message in sequence.
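The fan-out idea can be illustrated without Redis at all. The following is a minimal in-memory sketch, not how Redis implements it; the `MulticastBroker` class and the channel and group names are hypothetical and exist only for this illustration.

```python
from collections import deque


class MulticastBroker:
    """Toy in-memory broker: one publish is copied to every group's queue."""

    def __init__(self):
        # channel name -> {group name -> queue of pending messages}
        self._queues = {}

    def subscribe(self, channel, group):
        """Create (or return) the queue that `group` reads for `channel`."""
        return self._queues.setdefault(channel, {}).setdefault(group, deque())

    def publish(self, channel, message):
        """Copy `message` into every group's queue; return the number of copies."""
        groups = self._queues.get(channel, {})
        for queue in groups.values():
            queue.append(message)
        return len(groups)


broker = MulticastBroker()
billing = broker.subscribe("orders", "billing")
shipping = broker.subscribe("orders", "shipping")

# The producer sends once; the broker fans the message out to both groups.
copies = broker.publish("orders", "order-1001")
print(copies, list(billing), list(shipping))
```

Each group drains its own queue independently, so the billing and shipping logic never has to know about each other.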

pubsub2.webp

PubSub

To support message multicast, Redis cannot rely solely on its five basic data types. It employs a separate module called PubSub, which stands for Publisher-Subscriber. Let's demonstrate how to use PubSub in Python.

python
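# -*- coding: utf-8 -*-
import time
import redis

# decode_responses=True makes redis-py return str rather than bytes on Python 3.
client = redis.StrictRedis(decode_responses=True)
p = client.pubsub()
p.subscribe("codehole")
time.sleep(1)  # give the subscription confirmation time to arrive
print(p.get_message())
client.publish("codehole", "java comes")
time.sleep(1)  # give the published message time to arrive
print(p.get_message())
client.publish("codehole", "python comes")
time.sleep(1)
print(p.get_message())
print(p.get_message())  # nothing left to read, so this prints None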

Output:

{'pattern': None, 'type': 'subscribe', 'channel': 'codehole', 'data': 1}
{'pattern': None, 'type': 'message', 'channel': 'codehole', 'data': 'java comes'}
{'pattern': None, 'type': 'message', 'channel': 'codehole', 'data': 'python comes'}
None

pubsub3.webp

After the client issues a subscribe command, Redis immediately sends back a feedback message confirming the subscription. Because of network latency, the client has to wait briefly before get_message() can retrieve that feedback. The same applies after publishing: the client must wait a moment before get_message() can return the published message. get_message() is non-blocking, so when no message is available it returns None right away.
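Instead of a fixed sleep before every read, redis-py's get_message() also accepts a timeout argument in seconds, so the call itself can wait for data. The sketch below is one way to use it; the `drain` helper and the `demo` function are hypothetical names for this illustration, and `demo` assumes a Redis server listening on localhost:6379.

```python
def drain(pubsub, timeout=1.0):
    """Collect messages until none arrives within `timeout` seconds.

    `pubsub` is expected to behave like redis-py's PubSub object, whose
    get_message() accepts a `timeout` argument.
    """
    messages = []
    while True:
        msg = pubsub.get_message(timeout=timeout)
        if msg is None:
            return messages
        messages.append(msg)


def demo():
    """Assumes a Redis server on localhost:6379; not called automatically."""
    import redis

    client = redis.Redis(decode_responses=True)
    p = client.pubsub()
    p.subscribe("codehole")
    client.publish("codehole", "java comes")
    client.publish("codehole", "python comes")
    for msg in drain(p):
        print(msg)
```

The trade-off is that `drain` always spends one full timeout at the end waiting for a message that never comes, so the timeout should be chosen with that cost in mind.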

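The producer and the consumer in Redis PubSub use different connections. The example above involves two Redis connections: the pubsub object holds its own connection for receiving messages, and publish goes out over another. This is necessary because a connection that has subscribed is dedicated to waiting for messages and cannot run ordinary commands such as publish in the meantime.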

In production environments, we rarely run the producer and consumer in the same thread. If they were to run in the same thread, we could directly call functions instead of using middleware for message passing. Thus, we should separate the producer and consumer. Let’s see how to implement this separation.

Consumer

python
# -*- coding: utf-8 -*-
import time
import redis

client = redis.StrictRedis(decode_responses=True)  # return str rather than bytes on Python 3
p = client.pubsub()
p.subscribe("codehole")
while True:
    msg = p.get_message()
    if not msg:
        time.sleep(1)
        continue
    print(msg)

Producer

python
# -*- coding: utf-8 -*-
import redis

client = redis.StrictRedis()
client.publish("codehole", "python comes")
client.publish("codehole", "java comes")
client.publish("codehole", "golang comes")

The consumer must be started first, followed by the producer, because messages published before a consumer has subscribed are never delivered to it. We can start multiple consumers, and PubSub will ensure they all receive the same sequence of messages.

Output in the consumer console:

{'pattern': None, 'type': 'subscribe', 'channel': 'codehole', 'data': 1}
{'pattern': None, 'type': 'message', 'channel': 'codehole', 'data': 'python comes'}
{'pattern': None, 'type': 'message', 'channel': 'codehole', 'data': 'java comes'}
{'pattern': None, 'type': 'message', 'channel': 'codehole', 'data': 'golang comes'}

The output shows that each consumer window displays the same messages. The first line is the successful subscription message, followed quickly by the next three lines when the producer process executes. The consumer retrieves messages by polling get_message(), sleeping for 1 second if no messages are received. This brings to mind the message queue model we discussed earlier, where we could use blpop to improve message processing timeliness.

Polling with sleep delays message processing by up to the sleep interval. Instead, we can use listen(), which blocks until a message arrives, much like blpop does for a list. Here’s how we can modify the consumer:

Blocking Consumer

python
# -*- coding: utf-8 -*-
import redis

client = redis.StrictRedis(decode_responses=True)  # return str rather than bytes on Python 3
p = client.pubsub()
p.subscribe("codehole")
for msg in p.listen():
    print(msg)

This code is much shorter, removing the need for sleep and allowing for timely message processing.
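Note that listen() yields the subscription feedback as well as the published messages. A consumer that only cares about payloads can filter on the type field. The following is a small sketch of that idea; `payloads` and `demo` are hypothetical names, and `demo` assumes a Redis server on localhost:6379.

```python
def payloads(messages):
    """Yield (channel, data) for published messages, skipping control messages."""
    for msg in messages:
        if msg["type"] in ("message", "pmessage"):
            yield msg["channel"], msg["data"]


def demo():
    """Assumes a Redis server on localhost:6379; blocks forever once called."""
    import redis

    client = redis.Redis(decode_responses=True)
    p = client.pubsub()
    p.subscribe("codehole")
    for channel, data in payloads(p.listen()):
        print(channel, data)
```

redis-py can also drop the feedback messages itself if the pubsub object is created with `client.pubsub(ignore_subscribe_messages=True)`.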

Pattern Subscription

The subscriptions so far name their topics explicitly. To subscribe to several topics, we can pass several topic names to a single subscribe command (or issue several subscribe commands):

plaintext
> subscribe codehole.image codehole.text codehole.blog

This results in three successful subscription feedback messages:

1) "subscribe"
2) "codehole.image"
3) (integer) 1
1) "subscribe"
2) "codehole.text"
3) (integer) 2
1) "subscribe"
2) "codehole.blog"
3) (integer) 3

When the producer publishes messages to these three topics, the consumer receives them all:

plaintext
> publish codehole.image https://www.google.com/dudo.png
(integer) 1
> publish codehole.text "你好,欢迎加入码洞"
(integer) 1
> publish codehole.blog '{"content": "hello, everyone", "title": "welcome"}'
(integer) 1

If a new topic, codehole.group, is added, the client must issue another subscription command to receive messages from this new topic.

To simplify subscriptions, Redis offers a pattern subscription feature, allowing clients to subscribe to multiple topics at once. Even if the producer adds new topics that match the pattern, the consumer will immediately receive messages.

plaintext
> psubscribe codehole.*

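This single command subscribes to every topic whose name matches the pattern, so the consumer receives messages published to any topic whose name begins with the prefix codehole. (trailing dot included), such as codehole.image or a newly added codehole.group.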
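To get a feel for which topic names such a pattern covers, the matching can be approximated locally with Python's fnmatch module. This is only an approximation for illustration: Redis uses its own glob-style matcher, and the two differ in some details (for example, how character classes are negated). The channel list below is made up for the example, and `demo` assumes a Redis server on localhost:6379.

```python
from fnmatch import fnmatchcase

PATTERN = "codehole.*"
channels = ["codehole.image", "codehole.group", "codehole", "other.text"]

# Keep only the channel names the pattern would cover.
matched = [name for name in channels if fnmatchcase(name, PATTERN)]
print(matched)  # ['codehole.image', 'codehole.group']


def demo():
    """Assumes a Redis server on localhost:6379; blocks forever once called."""
    import redis

    client = redis.Redis(decode_responses=True)
    p = client.pubsub()
    p.psubscribe(PATTERN)
    for msg in p.listen():
        # Messages delivered through a pattern have type 'pmessage'.
        print(msg)
```

The bare name codehole does not match because the pattern requires the dot after the prefix.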

Message Structure

Each message the consumer prints is a dictionary of the following form:

plaintext
{'pattern': None, 'type': 'subscribe', 'channel': 'codehole', 'data': 1}
{'pattern': None, 'type': 'message', 'channel': 'codehole', 'data': 'python comes'}
{'pattern': None, 'type': 'message', 'channel': 'codehole', 'data': 'java comes'}
{'pattern': None, 'type': 'message', 'channel': 'codehole', 'data': 'golang comes'}

What do these fields mean?

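  • data: The message content. For a regular message it is the published string; for subscription feedback it is an integer giving the number of subscriptions currently active on the connection.
  • channel: The name of the topic the message was published to. For subscription feedback, it is the topic (or pattern) that was subscribed.
  • type: The type of the message. For a regular message the type is message, or pmessage when it was delivered through a pattern subscription. For control messages such as subscription feedback, the type is subscribe. For pattern subscription feedback it is psubscribe, and for unsubscribe feedback it is unsubscribe or punsubscribe.
  • pattern: The pattern that matched, for messages delivered through a pattern subscription. For messages received through a plain subscribe command, this field is None.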
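The fields above can be put to work in a small dispatcher that turns each message dictionary into a readable line. This is a sketch for illustration; `describe` is a hypothetical helper, and the dictionaries it expects follow the shape shown above.

```python
def describe(msg):
    """Return a one-line, human-readable summary of a pubsub message dict."""
    kind = msg["type"]
    if kind in ("message", "pmessage"):
        # Only pattern deliveries carry a non-empty 'pattern' field.
        via = f" via pattern {msg['pattern']}" if msg["pattern"] else ""
        return f"payload {msg['data']!r} on {msg['channel']}{via}"
    if kind in ("subscribe", "psubscribe", "unsubscribe", "punsubscribe"):
        return (
            f"{kind} acknowledged for {msg['channel']}; "
            f"{msg['data']} subscription(s) active"
        )
    return f"unhandled message type {kind!r}"


print(describe({"pattern": None, "type": "subscribe",
                "channel": "codehole", "data": 1}))
print(describe({"pattern": None, "type": "message",
                "channel": "codehole", "data": "java comes"}))
```

Branching on type first keeps the handling of the integer data in feedback messages separate from the string data in regular messages.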

Drawbacks of PubSub

When a producer publishes a message, Redis forwards it directly to the consumers that are currently subscribed. If no consumer is subscribed, the message is simply discarded. Suppose there are three consumers and one of them drops its connection: the producer keeps publishing and the two remaining consumers keep receiving, but when the disconnected consumer reconnects, every message sent while it was away is lost to it for good.

PubSub messages are not persisted either. If Redis goes down, it is as if there were no consumers at all: every message is discarded, and nothing can be recovered after the restart.
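A producer can at least notice when a message reached nobody, because the publish command returns the number of clients that received it (the `(integer) 1` replies in the redis-cli session earlier). The helper below is a minimal sketch of that check; `publish_or_report` is a hypothetical name, and `client` is assumed to behave like a redis-py client. The count only says how many subscribers were connected at that instant, not that they processed the message.

```python
def publish_or_report(client, channel, message):
    """Publish `message`; return True if at least one subscriber received it."""
    receivers = client.publish(channel, message)
    if receivers == 0:
        # Nobody was subscribed, so Redis has already discarded the message.
        print(f"no subscribers on {channel!r}; message dropped: {message!r}")
    return receivers > 0


def demo():
    """Assumes a Redis server on localhost:6379; not called automatically."""
    import redis

    client = redis.Redis(decode_responses=True)
    publish_or_report(client, "codehole", "python comes")
```

What to do with a dropped message (retry later, log it, write it to a list) is an application decision that PubSub itself does not help with.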

Because of these limitations, PubSub has few suitable application scenarios. This led the creator of Redis to start a separate project, Disque, dedicated to multicast message queues. Disque has been in beta for a long time and has not matured, although its client SDKs are fairly rich, and an official release is still awaited. Interested readers can consult the Disque documentation for details.

Supplement

With the recent introduction of the Stream data structure in Redis 5.0, persistent message queue capabilities have been added, potentially making PubSub obsolete. This may mean that Disque will never release its final version.
