
Stream

Redis 5.0 was unexpectedly released by its author, Antirez, and added many new features. The highlight of the release is a new data structure, Stream: a powerful, persistent message queue that supports multicast. The author acknowledges that Redis Stream borrows heavily from Kafka's design.

[Figure: structure of a Redis Stream (stream1.webp)]

The structure of a Redis Stream is shown above. It is a linked list that chains all messages together; each message has a unique ID and its own content. Messages are persisted and survive a Redis restart.

Each Stream has a unique name, which is its Redis key. The Stream is created automatically the first time the xadd command appends a message to it.

Each Stream can have multiple consumer groups. Each group has a cursor, last_delivered_id, that moves forward along the message list and marks the last message delivered to that group. Each consumer group has a name that is unique within the Stream and must be created explicitly with the xgroup create command, which takes the starting message ID used to initialize last_delivered_id.

The state of each Consumer Group is independent of the others. This means that every consumer group can consume all the messages in the same Stream.

A single Consumer Group can have multiple Consumers attached, and they compete with each other: whenever any Consumer reads a message, the group's cursor last_delivered_id advances. Each Consumer has a name that is unique within the group.

Each Consumer has an internal state variable, pending_ids, which records the IDs of messages that have been delivered to the client but not yet acknowledged (acked). Every unacked message adds an ID to this variable, and every ack removes one. Redis officially calls pending_ids the PEL (Pending Entries List). It is the key data structure for at-least-once delivery: a message that is lost in network transmission stays in the PEL and can be delivered again.
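
The relationship between the group cursor and the per-consumer PEL can be sketched with a small in-memory model. This is only an illustration, not how Redis implements it: the class name ConsumerGroup and its methods are hypothetical, and the cursor is simplified to a list index instead of a message ID.

```python
class ConsumerGroup:
    """Toy model of one consumer group over a shared list of (id, fields) entries."""

    def __init__(self, entries, start_index=0):
        self.entries = entries      # shared, append-only list of messages
        self.cursor = start_index   # stands in for last_delivered_id (simplified to an index)
        self.pel = {}               # consumer name -> set of delivered but unacked IDs

    def read(self, consumer, count=1):
        """Deliver up to `count` new entries to `consumer` and advance the group cursor."""
        batch = self.entries[self.cursor:self.cursor + count]
        self.cursor += len(batch)
        self.pel.setdefault(consumer, set()).update(mid for mid, _ in batch)
        return batch

    def ack(self, *ids):
        """Drop acknowledged IDs from whichever PEL holds them; return how many were pending."""
        acked = 0
        for pending in self.pel.values():
            hit = pending.intersection(ids)
            pending.difference_update(hit)
            acked += len(hit)
        return acked
```

Two groups built over the same entries list each see every message, while two consumers inside one group split the messages between them because they share a single cursor.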

Message ID

The message ID has the form timestampInMillis-sequence. For example, 1527846880572-5 means the message was generated at millisecond timestamp 1527846880572 with sequence number 5 within that millisecond; sequence numbers start at 0, as the -0 IDs in the examples below show. IDs can be generated by the server or supplied by the client, but they must have the form integer-integer, and each new message's ID must be greater than the IDs of all earlier messages.
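
The ID rules above can be illustrated with a short sketch. The helper names parse_id and next_id are hypothetical, and next_id only approximates how a server might choose an ID for *: it is meant to show the "strictly increasing" rule, not to reproduce Redis internals.

```python
import time


def parse_id(message_id):
    """Split 'millis-seq' into a tuple of ints so that IDs compare numerically."""
    millis, seq = message_id.split("-")
    return int(millis), int(seq)


def next_id(last_id, now_millis=None):
    """Return an ID strictly greater than last_id, based on the current time."""
    if now_millis is None:
        now_millis = int(time.time() * 1000)
    last_millis, last_seq = parse_id(last_id)
    if now_millis > last_millis:
        return f"{now_millis}-0"
    # Same millisecond (or a clock that moved backwards): bump the sequence instead.
    return f"{last_millis}-{last_seq + 1}"
```

Comparing the parsed tuples rather than the raw strings matters: as strings, "9-0" sorts after "10-0", while (9, 0) < (10, 0) gives the intended order.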

Message Content

Message content is a set of key-value pairs, much like a hash, so it needs little explanation.
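
In raw command replies the pairs arrive as a flat list (field, value, field, value, ...). A minimal sketch of turning such a list into a dictionary follows; the helper name fields_to_dict is hypothetical, and all values stay strings.

```python
def fields_to_dict(flat):
    """Turn ['name', 'laoqian', 'age', '30'] into {'name': 'laoqian', 'age': '30'}."""
    if len(flat) % 2 != 0:
        raise ValueError("expected an even number of items (field, value, ...)")
    # Even positions hold field names, odd positions hold the matching values.
    return dict(zip(flat[0::2], flat[1::2]))
```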

CRUD Operations

  • xadd: Append a message.
  • xdel: Delete a message (the entry is flagged as deleted internally; its memory may not be reclaimed right away).
  • xrange: Retrieve a range of messages, automatically skipping deleted ones.
  • xlen: Get the number of messages in the Stream.
  • del: Delete the entire Stream.

# * asks the server to generate the ID; the field/value pairs follow
# name: laoqian, age: 30
127.0.0.1:6379> xadd codehole * name laoqian age 30  
1527849609889-0  # Generated message ID
127.0.0.1:6379> xadd codehole * name xiaoyu age 29
1527849629172-0
127.0.0.1:6379> xadd codehole * name xiaoqian age 1
1527849637634-0
127.0.0.1:6379> xlen codehole
(integer) 3
# - indicates minimum value, + indicates maximum value
127.0.0.1:6379> xrange codehole - +
1) 1) 1527849609889-0
   2) 1) "name"
      2) "laoqian"
      3) "age"
      4) "30"
2) 1) 1527849629172-0
   2) 1) "name"
      2) "xiaoyu"
      3) "age"
      4) "29"
3) 1) 1527849637634-0
   2) 1) "name"
      2) "xiaoqian"
      3) "age"
      4) "1"
# Specify minimum message ID
127.0.0.1:6379> xrange codehole 1527849629172-0 +  
1) 1) 1527849629172-0
   2) 1) "name"
      2) "xiaoyu"
      3) "age"
      4) "29"
2) 1) 1527849637634-0
   2) 1) "name"
      2) "xiaoqian"
      3) "age"
      4) "1"
# Specify maximum message ID
127.0.0.1:6379> xrange codehole - 1527849629172-0
1) 1) 1527849609889-0
   2) 1) "name"
      2) "laoqian"
      3) "age"
      4) "30"
2) 1) 1527849629172-0
   2) 1) "name"
      2) "xiaoyu"
      3) "age"
      4) "29"
127.0.0.1:6379> xdel codehole 1527849609889-0
(integer) 1
# xlen counts only the remaining messages (behavior of released Redis versions)
127.0.0.1:6379> xlen codehole
(integer) 2
# Deleted message is gone
127.0.0.1:6379> xrange codehole - +
1) 1) 1527849629172-0
   2) 1) "name"
      2) "xiaoyu"
      3) "age"
      4) "29"
2) 1) 1527849637634-0
   2) 1) "name"
      2) "xiaoqian"
      3) "age"
      4) "1"
# Delete entire Stream
127.0.0.1:6379> del codehole
(integer) 1

Independent Consumption

We can consume Stream messages independently, without defining any consumer group, and even block while waiting for new messages. Redis provides a separate command for this, xread, which lets a Stream be used like an ordinary message queue (list) while ignoring consumer groups entirely.

# Read two messages from the head of the Stream
127.0.0.1:6379> xread count 2 streams codehole 0-0
1) 1) "codehole"
   2) 1) 1) 1527851486781-0
         2) 1) "name"
            2) "laoqian"
            3) "age"
            4) "30"
      2) 1) 1527851493405-0
         2) 1) "name"
            2) "yurui"
            3) "age"
            4) "29"
# $ means the current tail: only newer messages qualify, so a non-blocking read returns nothing
127.0.0.1:6379> xread count 1 streams codehole $
(nil)
# Block waiting for new messages to arrive at the tail
127.0.0.1:6379> xread block 0 count 1 streams codehole $
# In another window, we add a message to the Stream
127.0.0.1:6379> xadd codehole * name youming age 60
1527852774092-0
# Back in the previous window, we see the block is lifted, returning new message content
# It also shows a wait time of 93s
127.0.0.1:6379> xread block 0 count 1 streams codehole $
1) 1) "codehole"
   2) 1) 1) 1527852774092-0
         2) 1) "name"
            2) "youming"
            3) "age"
            4) "60"
(93.11s)

A client that consumes sequentially with xread must remember how far it has read, that is, the ID of the last message returned. On the next call to xread it passes that ID as the starting point to continue with the following messages.

block 0 blocks indefinitely until a message arrives, while block 1000 blocks for at most 1 second (1000 ms) and returns nil if no message arrives in that time.
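
The bookkeeping described above can be sketched as a small loop. To keep the example runnable without a server, it uses a hypothetical in-memory stand-in, FakeStreamClient, whose xread(streams, count) method is assumed to mimic the reply shape of a typical client library (a list of [stream name, entries] pairs); a real client may shape its replies differently.

```python
def parse_id(message_id):
    """Split 'millis-seq' into ints so IDs compare numerically."""
    millis, seq = message_id.split("-")
    return int(millis), int(seq)


class FakeStreamClient:
    """In-memory stand-in with an xread-like method (hypothetical helper)."""

    def __init__(self, entries):
        self.entries = entries  # list of (id, fields), oldest first

    def xread(self, streams, count=None):
        reply = []
        for name, last_id in streams.items():
            # Only entries strictly newer than the ID the caller passed in.
            newer = [e for e in self.entries if parse_id(e[0]) > parse_id(last_id)]
            if newer:
                reply.append([name, newer[:count]])
        return reply


def consume(client, stream, last_id="0-0", batch=2):
    """Yield entries in order, passing the last seen ID back on every call."""
    while True:
        reply = client.xread({stream: last_id}, count=batch)
        if not reply:
            return  # nothing newer than last_id
        for _name, entries in reply:
            for entry_id, fields in entries:
                last_id = entry_id  # remember the position for the next call
                yield entry_id, fields
```

In a real consumer, last_id would usually be stored somewhere durable so that a restarted process can resume from the same position.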

127.0.0.1:6379> xread block 1000 count 1 streams codehole $
(nil)
(1.07s)

Creating Consumer Groups

[Figure: stream2.webp]

Consumer groups are created with the xgroup create command, which takes a starting message ID used to initialize the last_delivered_id variable.

# 0-0 means start consuming from the head of the Stream
127.0.0.1:6379> xgroup create codehole cg1 0-0
OK
# $ indicates starting from the tail, only accepting new messages; current Stream messages will be ignored
127.0.0.1:6379> xgroup create codehole cg2 $
OK
# Get Stream information
127.0.0.1:6379> xinfo stream codehole
 1) length
 2) (integer) 3  # Total of 3 messages
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2  # Two consumer groups
 9) first-entry  # First message
10) 1) 1527851486781-0
    2) 1) "name"
       2) "laoqian"
       3) "age"
       4) "30"
11) last-entry  # Last message
12) 1) 1527851498956-0
    2) 1) "name"
       2) "xiaoqian"
       3) "age"
       4) "1"

# Get information about Stream's consumer groups
127.0.0.1:6379> xinfo groups codehole
1) 1) name
   2) "cg1"
   3) consumers
   4) (integer) 0  # No consumers in this group yet
   5) pending
   6) (integer) 0  # No messages being processed by this group
2) 1) name
   2) "cg2"
   3) consumers
   4) (integer) 0  # No consumers in this group yet
   5) pending
   6) (integer) 0  # No messages being processed by this group

Consumption

Stream provides the xreadgroup command for consuming within a consumer group. It takes the group name, the consumer name, and a starting message ID. Like xread, it can block while waiting for new messages. When a message is read, its ID enters the consumer's Pending Entries List (PEL). After processing it, the client calls xack to tell the server that the message has been handled, and the ID is removed from the PEL.

# > means: read the messages after the group's last_delivered_id, i.e. those not yet delivered to any consumer in the group
# Each time a consumer reads a message, the last_delivered_id variable advances
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
1) 1) "codehole"
   2) 1) 1) 1527851486781-0
         2) 1) "name"
            2) "laoqian"
            3) "age"
            4) "30"
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
1) 1) "codehole"
   2) 1) 1) 1527851493405-0
         2) 1) "name"
            2) "yurui"
            3) "age"
            4) "29"
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 2 streams codehole >
1) 1) "codehole"
   2) 1) 1) 1527851498956-0
         2) 1) "name"
            2) "xiaoqian"
            3) "age"
            4) "1"
      2) 1) 1527852774092-0
         2) 1) "name"
            2) "youming"
            3) "age"
            4) "60"
# Continue reading, but no new messages
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
(nil)
# Then block and wait
127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole >
# Open another window and add messages
127.0.0.1:6379> xadd codehole * name lanying age 61
1527854062442-0
# Go back to the previous window and see that the block is lifted, receiving a new message
127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole >
1) 1) "codehole"
   2) 1) 1) 1527854062442-0
         2) 1) "name"
            2) "lanying"
            3) "age"
            4) "61"
(36.54s)
# Check consumer group information
127.0.0.1:6379> xinfo groups codehole
1) 1) name
   2) "cg1"
   3) consumers
   4) (integer) 1  # One consumer
   5) pending
   6) (integer) 5  # Total of 5 messages pending ack
2) 1) name
   2) "cg2"
   3) consumers
   4) (integer) 0  # No changes in consumer group cg2, as we have been manipulating cg1
   5) pending
   6) (integer) 0
# If there are multiple consumers in the same group, we can observe the status of each consumer with xinfo consumers
127.0.0.1:6379> xinfo consumers codehole cg1  # Currently, there is 1 consumer
1) 1) name
   2) "c1"
   3) pending
   4) (integer) 5  # Total of 5 messages pending
   5) idle
   6) (integer) 418715  # Milliseconds since this consumer last read a message
# Next, we ack one message
127.0.0.1:6379> xack codehole cg1 1527851486781-0
(integer) 1
127.0.0.1:6379> xinfo consumers codehole cg1
1) 1) name
   2) "c1"
   3) pending
   4) (integer) 4  # Changed to 4 messages pending
   5) idle
   6) (integer) 668504
# Now ack all messages
127.0.0.1:6379> xack codehole cg1 1527851493405-0 1527851498956-0 1527852774092-0 1527854062442-0
(integer) 4
127.0.0.1:6379> xinfo consumers codehole cg1
1) 1) name
   2) "c1"
   3) pending
   4) (integer) 0  # PEL is empty
   5) idle
   6) (integer) 745505

What if there are too many messages in Stream?

Readers may wonder: if messages keep accumulating, won't the Stream's linked list grow very long? xdel does not help here, because it only flags individual messages as deleted. Redis addresses this with a fixed-length Stream: passing a maximum length (maxlen) to xadd makes Redis discard the oldest messages so that the Stream does not exceed that length.

127.0.0.1:6379> xlen codehole
(integer) 5
127.0.0.1:6379> xadd codehole maxlen 3 * name xiaorui age 1
1527855160273-0
127.0.0.1:6379> xlen codehole
(integer) 3
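
The effect of maxlen in the transcript above can be modeled in a few lines. The function name xadd_capped is hypothetical and the list is only a stand-in for the Stream; the sketch mirrors exact trimming, where the oldest entries are evicted from the head after each append.

```python
def xadd_capped(entries, entry, maxlen):
    """Append `entry`, then evict from the head until at most `maxlen` entries remain."""
    entries.append(entry)
    overflow = max(0, len(entries) - maxlen)
    del entries[:overflow]  # the oldest entries sit at the front of the list
    return entries
```

Starting from five entries, one capped append with maxlen 3 leaves only the two newest old entries plus the new one, which matches the xlen result of 3 shown above.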

What happens if you forget to ACK a message?

The Stream keeps the IDs of messages that are being processed in a list inside each consumer structure (the PEL). If a consumer receives and processes messages but never acks them, its PEL keeps growing. With many consumer groups, the memory held by these PELs adds up significantly.

[Figure: stream3.webp]

How does PEL avoid message loss?

If a consumer's connection drops while the Redis server is sending it a reply, the messages in that reply never reach the client. Their IDs, however, are already recorded in the PEL. After reconnecting, the client can fetch those messages again by calling xreadgroup with a concrete message ID instead of >, typically 0-0. With a concrete ID, xreadgroup returns only the messages in this consumer's PEL whose IDs are greater than the given one; it does not deliver new messages. Once the pending messages have been processed and acked, the client switches back to > to receive new messages.
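
A minimal sketch of this recovery sequence follows, using a toy in-memory group rather than a real server. ToyGroup, its read and ack methods, and resume are hypothetical names; read only imitates the two modes of xreadgroup described above (a concrete ID replays the consumer's own PEL, > delivers new messages).

```python
def parse_id(message_id):
    """Split 'millis-seq' into ints so IDs compare numerically."""
    millis, seq = message_id.split("-")
    return int(millis), int(seq)


class ToyGroup:
    """Toy consumer group: a cursor over shared entries plus one PEL per consumer."""

    def __init__(self, entries):
        self.entries = entries  # list of (id, fields), oldest first
        self.cursor = 0         # index of the next never-delivered entry
        self.pel = {}           # consumer name -> delivered but unacked entries

    def read(self, consumer, start_id, count=10):
        pending = self.pel.setdefault(consumer, [])
        if start_id == ">":
            batch = self.entries[self.cursor:self.cursor + count]
            self.cursor += len(batch)
            pending.extend(batch)
            return batch
        # Concrete ID: replay this consumer's own pending entries after that ID.
        return [e for e in pending if parse_id(e[0]) > parse_id(start_id)][:count]

    def ack(self, consumer, entry_id):
        self.pel[consumer] = [e for e in self.pel[consumer] if e[0] != entry_id]


def resume(group, consumer, handle):
    """After a reconnect: drain the PEL starting at 0-0, then continue with '>'."""
    for start_id in ("0-0", ">"):
        while True:
            batch = group.read(consumer, start_id)
            if not batch:
                break
            for entry_id, fields in batch:
                handle(entry_id, fields)
                group.ack(consumer, entry_id)  # ack only after handling succeeded
```

Because a replayed message may already have been processed before the connection dropped, the handler should tolerate seeing the same message twice.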

High Availability of Stream

High availability for Stream relies on master-slave replication, which works the same way as for other data structures, so Stream is highly available in Sentinel and Cluster deployments. However, because Redis replicates commands asynchronously, a small amount of data may be lost during failover, as with every other Redis data structure.

Partitioning

Redis Stream does not natively support partitioning. To partition, you allocate multiple Streams and apply a client-side strategy that decides which Stream each message is produced to. You might think Kafka is more advanced because it supports partitioning natively, but I disagree: Kafka also chooses the partition on the client side, where the producer hashes the message key to select a partition.

Kafka also supports increasing the number of partitions dynamically, but the adjustment is rather crude: existing content is not rehashed and historical data is not redistributed. Redis Stream can achieve the same simple kind of dynamic adjustment by adding new Streams.
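
A client-side strategy of this kind might look like the sketch below. The function name pick_stream and the base:index naming scheme are assumptions made for illustration. zlib.crc32 is used because Python's built-in hash() for strings varies between processes, which would send the same key to different Streams after a restart.

```python
import zlib


def pick_stream(base_name, key, partitions):
    """Map a message key to one of `partitions` Stream names via a stable hash."""
    index = zlib.crc32(key.encode("utf-8")) % partitions
    return f"{base_name}:{index}"
```

A producer would then append to the Stream named by pick_stream("codehole", key, 4). As with Kafka, changing the partition count changes the mapping only for messages produced afterwards; existing messages stay where they are.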

Summary

Stream's consumption model borrows the concept of consumer groups from Kafka, and it overcomes a limitation of Redis Pub/Sub, which cannot persist messages. Unlike Kafka, however, Stream does not support partitioning. If partitioning is needed, it must be implemented on the client side: use several Stream names and pick the target Stream by hashing the message and taking the result modulo the number of Streams.
