
Funnel Rate Limiting

Funnel rate limiting is one of the most commonly used rate limiting methods. As the name suggests, the algorithm is inspired by the structure of a funnel.

The capacity of a funnel is limited. If you block the spout and continue to pour water in, it will fill up until no more can fit. If you open the spout, water will flow out, and after some has drained, you can continue pouring in. If the flow rate from the spout exceeds the pouring rate, the funnel will never fill up. If the flow rate is lower, once the funnel is full, pouring must pause until it drains.

Thus, the remaining space in the funnel represents the number of actions that can still be performed right now, and the flow rate from the spout represents the maximum sustained frequency the system allows. Below is code that describes a single-machine funnel algorithm.

python
# coding: utf8

import time

class Funnel(object):
    def __init__(self, capacity, leaking_rate):
        self.capacity = capacity  # Funnel capacity
        self.leaking_rate = leaking_rate  # Spout flow rate
        self.left_quota = capacity  # Remaining space in the funnel
        self.leaking_ts = time.time()  # Last leak time

    def make_space(self):
        now_ts = time.time()
        delta_ts = now_ts - self.leaking_ts  # Time since last leak
        delta_quota = delta_ts * self.leaking_rate  # Space made available
        if delta_quota < 1:  # Insufficient space, wait for next time
            return
        self.left_quota += delta_quota  # Increase remaining space
        self.leaking_ts = now_ts  # Update leak time
        if self.left_quota > self.capacity:  # Remaining space cannot exceed capacity
            self.left_quota = self.capacity

    def watering(self, quota):
        self.make_space()
        if self.left_quota >= quota:  # Check if remaining space is sufficient
            self.left_quota -= quota
            return True
        return False


funnels = {}  # All funnels

# capacity: funnel capacity
# leaking_rate: spout flow rate quota/s
def is_action_allowed(user_id, action_key, capacity, leaking_rate):
    key = '%s:%s' % (user_id, action_key)
    funnel = funnels.get(key)
    if not funnel:
        funnel = Funnel(capacity, leaking_rate)
        funnels[key] = funnel
    return funnel.watering(1)


for i in range(20):
    print(is_action_allowed('laoqian', 'reply', 15, 0.5))

Here’s a Java version:

java
import java.util.HashMap;
import java.util.Map;

public class FunnelRateLimiter {

    static class Funnel {
        int capacity;
        float leakingRate;
        int leftQuota;
        long leakingTs;

        public Funnel(int capacity, float leakingRate) {
            this.capacity = capacity;
            this.leakingRate = leakingRate;
            this.leftQuota = capacity;
            this.leakingTs = System.currentTimeMillis();
        }

        void makeSpace() {
            long nowTs = System.currentTimeMillis();
            long deltaTs = nowTs - leakingTs; // Milliseconds since last leak
            if (deltaTs < 0) { // Clock moved backwards: reset the funnel
                this.leftQuota = capacity;
                this.leakingTs = nowTs;
                return;
            }
            // leakingRate is in quota per second, so convert milliseconds to seconds
            double deltaQuota = deltaTs / 1000.0 * leakingRate;
            if (deltaQuota < 1) { // Minimum space unit
                return;
            }
            // Cap at capacity before narrowing to int, so a long idle period cannot overflow
            this.leftQuota = (int) Math.min((double) capacity, this.leftQuota + deltaQuota);
            this.leakingTs = nowTs;
        }

        boolean watering(int quota) {
            makeSpace();
            if (this.leftQuota >= quota) {
                this.leftQuota -= quota;
                return true;
            }
            return false;
        }
    }

    private Map<String, Funnel> funnels = new HashMap<>();

    public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
        String key = String.format("%s:%s", userId, actionKey);
        Funnel funnel = funnels.get(key);
        if (funnel == null) {
            funnel = new Funnel(capacity, leakingRate);
            funnels.put(key, funnel);
        }
        return funnel.watering(1); // Needs 1 quota
    }
}

The make_space method of the Funnel object (makeSpace in the Java version) is the core of the funnel algorithm. It is called before each pour to trigger leaking and free up space. The amount of space freed depends on the time elapsed and the flow rate. A Funnel object occupies a constant amount of memory, regardless of how frequently the action occurs.
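
To make the leak arithmetic concrete, here is a minimal sketch that replaces time.time() with a hand-advanced clock. The FakeClock class and the clock parameter are assumptions added for illustration and are not part of the code above; the leak logic itself is the same.

```python
class FakeClock:
    """Hypothetical manual clock so the example is deterministic."""
    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


class Funnel:
    def __init__(self, capacity, leaking_rate, clock):
        self.capacity = capacity
        self.leaking_rate = leaking_rate  # quota per second
        self.left_quota = capacity
        self.clock = clock
        self.leaking_ts = clock()

    def make_space(self):
        now_ts = self.clock()
        delta_quota = (now_ts - self.leaking_ts) * self.leaking_rate
        if delta_quota < 1:  # Less than one unit has leaked so far
            return
        self.left_quota = min(self.capacity, self.left_quota + delta_quota)
        self.leaking_ts = now_ts

    def watering(self, quota):
        self.make_space()
        if self.left_quota >= quota:
            self.left_quota -= quota
            return True
        return False


clock = FakeClock()
funnel = Funnel(capacity=15, leaking_rate=0.5, clock=clock)

burst = [funnel.watering(1) for _ in range(20)]  # no time passes
clock.now += 1.0                                 # 1 s * 0.5 = 0.5 quota: not enough
after_1s = funnel.watering(1)
clock.now += 1.0                                 # 2 s * 0.5 = 1 quota leaked
after_2s = funnel.watering(1)
print(burst.count(True), after_1s, after_2s)     # 15 False True
```

The first 15 pours succeed, the rest of the burst is rejected, and a new pour only succeeds once a full unit of space has leaked out.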

The question arises: how can we implement a distributed funnel algorithm? Can we use Redis data structures to solve this?

Observing the fields of the Funnel object, we can store its contents in a hash structure. When pouring, we retrieve the fields from the hash for logical operations and then update the hash, completing the action frequency check.
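
A rough sketch of that read, compute, and write-back cycle is shown here. To keep it runnable without a server, FakeRedis is a hypothetical in-memory stand-in that only offers hgetall and hset (named after the Redis HGETALL and HSET commands). The watering function, the field names, and the explicit now argument are also assumptions made for illustration.

```python
class FakeRedis:
    """Hypothetical in-memory stand-in for a Redis client (hashes only)."""
    def __init__(self):
        self.hashes = {}

    def hgetall(self, name):
        return dict(self.hashes.get(name, {}))

    def hset(self, name, mapping):
        self.hashes.setdefault(name, {}).update(mapping)


def watering(client, key, capacity, leaking_rate, now, quota=1):
    # Step 1: read the funnel fields from the hash (or start with a full funnel).
    state = client.hgetall(key)
    left_quota = float(state.get('left_quota', capacity))
    leaking_ts = float(state.get('leaking_ts', now))

    # Step 2: leak in memory, using the same logic as make_space.
    delta_quota = (now - leaking_ts) * leaking_rate
    if delta_quota >= 1:
        left_quota = min(capacity, left_quota + delta_quota)
        leaking_ts = now

    # Step 3: decide, then write the fields back to the hash.
    allowed = left_quota >= quota
    if allowed:
        left_quota -= quota
    client.hset(key, mapping={'left_quota': left_quota, 'leaking_ts': leaking_ts})
    return allowed


client = FakeRedis()
results = [watering(client, 'laoqian:reply', 15, 0.5, now=100.0) for _ in range(16)]
print(results.count(True), results[-1])  # 15 False
```

The three steps are separate operations against the store, which matters as soon as more than one client runs them at the same time.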

However, there is an issue: we cannot guarantee the atomicity of the entire process. Reading from the hash, performing calculations in memory, and writing the hash back are three separate steps, so they must be protected by an appropriate lock. Once a lock is involved, acquiring it can fail, which means either retrying or abandoning the action.
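
The effect of the missing atomicity can be shown with a deliberately interleaved sequence. In this sketch a plain dict stands in for the hash stored in Redis, and read_state and write_state are hypothetical helpers; the two "clients" are simply run by hand in the problematic order.

```python
# A plain dict stands in for the hash stored in Redis (assumption for illustration).
stored = {'left_quota': 1}


def read_state():
    return dict(stored)       # like reading the hash fields


def write_state(state):
    stored.update(state)      # like writing the hash fields back


# Two clients interleave: both read before either one writes.
state_a = read_state()
state_b = read_state()

allowed_a = state_a['left_quota'] >= 1
allowed_b = state_b['left_quota'] >= 1
if allowed_a:
    state_a['left_quota'] -= 1
if allowed_b:
    state_b['left_quota'] -= 1

write_state(state_a)
write_state(state_b)

print(allowed_a, allowed_b, stored['left_quota'])  # True True 0
```

Only one unit of space was left, yet both actions were allowed, because the second write silently overwrote the first.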

Retrying degrades performance, abandoning the action hurts the user experience, and either choice makes the code more complex. How can we resolve this issue? Redis-Cell comes to the rescue!

Redis-Cell

Redis 4.0 introduced a module system, and redis-cell is a rate limiting module built on it. It must be installed and loaded separately. It also uses a funnel-style algorithm and provides an atomic rate limiting command, which simplifies the rate limiting problem significantly.

The module has a single command: cl.throttle, which has somewhat complex parameters and return values. Let's see how to use this command.

> cl.throttle laoqian:reply 15 30 60 1
                      ▲     ▲  ▲  ▲  ▲
                      |     |  |  |  └───── need 1 quota (optional, default is 1)
                      |     |  └──┴─────── 30 operations / 60 seconds (leaking rate)
                      |     └───────────── 15 capacity (funnel capacity)
                      └─────────────────── key laoqian:reply

This command means the "reply" action of user "laoqian" is allowed at a maximum frequency of 30 times every 60 seconds (the leaking rate), with an initial funnel capacity of 15. The user can therefore post 15 replies in a burst before the leaking rate starts to limit them. The leaking rate is now expressed with two parameters, a count and a period, which is more intuitive than a single floating-point number.
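
The two rate parameters map back to the single leaking_rate used in the Funnel class by a simple division. The helper name below is hypothetical.

```python
def leaking_rate(count, period_seconds):
    """Quota leaked per second for `count` operations per `period_seconds`."""
    return count / period_seconds


rate = leaking_rate(30, 60)
seconds_per_quota = 1 / rate
print(rate, seconds_per_quota)  # 0.5 2.0
```

So 30 operations per 60 seconds is the same 0.5 quota/s used in the single-machine example, which means one unit of space is freed every 2 seconds.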

> cl.throttle laoqian:reply 15 30 60
1) (integer) 0   # 0 indicates allowed, 1 indicates denied
2) (integer) 15  # Funnel capacity
3) (integer) 14  # Remaining space in the funnel
4) (integer) -1  # If denied, how long to wait before retrying (in seconds)
5) (integer) 2   # Time until the funnel is completely empty (in seconds)
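
Because the reply is a bare list of five integers, it can help to give the positions names on the client side. The sketch below is one way to do that. ThrottleResult and parse_throttle_reply are hypothetical names, and the commented-out call assumes the redis-py client and a server with the redis-cell module loaded.

```python
from typing import NamedTuple, Sequence


class ThrottleResult(NamedTuple):
    """Hypothetical wrapper for the five-element cl.throttle reply."""
    allowed: bool
    limit: int
    remaining: int
    retry_after: int   # seconds; -1 when the action was allowed
    reset_after: int   # seconds until the funnel is completely empty


def parse_throttle_reply(reply: Sequence[int]) -> ThrottleResult:
    denied, limit, remaining, retry_after, reset_after = reply
    return ThrottleResult(denied == 0, limit, remaining, retry_after, reset_after)


# With redis-py the reply could be obtained roughly like this:
#   reply = redis.Redis().execute_command('CL.THROTTLE', 'laoqian:reply', 15, 30, 60)
result = parse_throttle_reply([0, 15, 14, -1, 2])
print(result.allowed, result.remaining, result.retry_after)  # True 14 -1
```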

When executing the rate limiting command, if denied, the action must be discarded or retried. The cl.throttle command thoughtfully calculates the retry time, allowing you to use the fourth return value for a sleep operation. If you prefer not to block the thread, you can set up an asynchronous task for retrying.
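
A blocking retry loop built on the fourth return value might look like the sketch below. Here run_with_retry and its parameters are hypothetical: throttle is any callable that returns the five-element cl.throttle reply, and sleep is injectable so the example runs without waiting. The replies are made up for illustration (denied once, then allowed).

```python
import time


def run_with_retry(throttle, action, max_attempts=3, sleep=time.sleep):
    """Call `action` once the limiter allows it, sleeping between attempts."""
    for _attempt in range(max_attempts):
        denied, _limit, _remaining, retry_after, _reset_after = throttle()
        if denied == 0:
            return action()
        sleep(retry_after)  # fourth return value: seconds to wait before retrying
    return None  # give up after max_attempts denials


# Made-up replies for illustration only.
replies = iter([[1, 15, 0, 2, 30], [0, 15, 0, -1, 30]])
slept = []
outcome = run_with_retry(lambda: next(replies), lambda: 'replied', sleep=slept.append)
print(outcome, slept)  # replied [2]
```

Capping the number of attempts keeps a persistently throttled caller from blocking forever; the caller can treat None as "abandon the action".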

Thoughts

In addition to user-generated content (UGC), where else can the funnel rate limiting module be applied?
