Funnel Rate Limiting
Funnel rate limiting is one of the most commonly used limiting methods. As the name suggests, this algorithm is inspired by the structure of a funnel.
The capacity of a funnel is limited. If you block the spout and keep pouring water in, it fills up until no more fits. If you open the spout, water flows out, and once some has drained you can pour again. If the flow rate from the spout exceeds the pouring rate, the funnel never fills up. If the flow rate is lower than the pouring rate, the funnel eventually fills, and pouring must pause until some space has drained free.
Thus, the remaining space in the funnel represents the number of actions that can still be performed right now, and the flow rate from the spout represents the maximum sustained frequency the system allows. Below is code that describes a single-machine funnel algorithm.
python
# coding: utf8
import time


class Funnel(object):
    def __init__(self, capacity, leaking_rate):
        self.capacity = capacity          # Funnel capacity
        self.leaking_rate = leaking_rate  # Spout flow rate
        self.left_quota = capacity        # Remaining space in the funnel
        self.leaking_ts = time.time()     # Last leak time

    def make_space(self):
        now_ts = time.time()
        delta_ts = now_ts - self.leaking_ts         # Time since last leak
        delta_quota = delta_ts * self.leaking_rate  # Space made available
        if delta_quota < 1:  # Insufficient space, wait for next time
            return
        self.left_quota += delta_quota  # Increase remaining space
        self.leaking_ts = now_ts        # Update leak time
        if self.left_quota > self.capacity:  # Remaining space cannot exceed capacity
            self.left_quota = self.capacity

    def watering(self, quota):
        self.make_space()
        if self.left_quota >= quota:  # Check if remaining space is sufficient
            self.left_quota -= quota
            return True
        return False


funnels = {}  # All funnels


# capacity: funnel capacity
# leaking_rate: spout flow rate quota/s
def is_action_allowed(user_id, action_key, capacity, leaking_rate):
    key = '%s:%s' % (user_id, action_key)
    funnel = funnels.get(key)
    if not funnel:
        funnel = Funnel(capacity, leaking_rate)
        funnels[key] = funnel
    return funnel.watering(1)


for i in range(20):
    print(is_action_allowed('laoqian', 'reply', 15, 0.5))
Here’s a Java version:
java
import java.util.HashMap;
import java.util.Map;

public class FunnelRateLimiter {

    static class Funnel {
        int capacity;
        float leakingRate; // Quota per millisecond, since timestamps are in ms
        int leftQuota;
        long leakingTs;

        public Funnel(int capacity, float leakingRate) {
            this.capacity = capacity;
            this.leakingRate = leakingRate;
            this.leftQuota = capacity;
            this.leakingTs = System.currentTimeMillis();
        }

        void makeSpace() {
            long nowTs = System.currentTimeMillis();
            long deltaTs = nowTs - leakingTs;
            int deltaQuota = (int) (deltaTs * leakingRate);
            // Guard against a negative delta (clock moved backwards) and against
            // int overflow in the addition below: either way, reset to full space.
            if (deltaQuota < 0 || deltaQuota >= capacity) {
                this.leftQuota = capacity;
                this.leakingTs = nowTs;
                return;
            }
            if (deltaQuota < 1) { // Minimum space unit
                return;
            }
            this.leftQuota += deltaQuota;
            this.leakingTs = nowTs;
            if (this.leftQuota > this.capacity) {
                this.leftQuota = this.capacity;
            }
        }

        boolean watering(int quota) {
            makeSpace();
            if (this.leftQuota >= quota) {
                this.leftQuota -= quota;
                return true;
            }
            return false;
        }
    }

    private Map<String, Funnel> funnels = new HashMap<>();

    public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
        String key = String.format("%s:%s", userId, actionKey);
        Funnel funnel = funnels.get(key);
        if (funnel == null) {
            funnel = new Funnel(capacity, leakingRate);
            funnels.put(key, funnel);
        }
        return funnel.watering(1); // Needs 1 quota
    }
}
The make_space method of the Funnel object is the core of the funnel algorithm. It is called before each pouring to trigger leaking and make space. The amount of space made available depends on the time elapsed and the flow rate. The memory occupied by a Funnel object is constant and does not grow with the frequency of actions.
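To watch the leak happen without waiting on the wall clock, the sketch below re-declares a compact funnel with an injectable time source. The `clock` parameter and the `FakeClock` helper are assumptions added for illustration; they are not part of the code above.

```python
class FakeClock(object):
    """Hypothetical manual clock so the example is deterministic."""
    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


class Funnel(object):
    def __init__(self, capacity, leaking_rate, clock):
        self.capacity = capacity
        self.leaking_rate = leaking_rate  # quota per second
        self.left_quota = capacity
        self.clock = clock                # assumption: injected time source
        self.leaking_ts = clock()

    def make_space(self):
        now_ts = self.clock()
        delta_quota = (now_ts - self.leaking_ts) * self.leaking_rate
        if delta_quota < 1:               # less than one unit has leaked
            return
        self.left_quota = min(self.capacity, self.left_quota + delta_quota)
        self.leaking_ts = now_ts

    def watering(self, quota):
        self.make_space()
        if self.left_quota >= quota:
            self.left_quota -= quota
            return True
        return False


clock = FakeClock()
funnel = Funnel(capacity=3, leaking_rate=0.5, clock=clock)

burst = [funnel.watering(1) for _ in range(4)]  # 3 fit, the 4th is rejected
clock.now += 1.0                                # only 0.5 quota leaked so far
after_1s = funnel.watering(1)
clock.now += 1.0                                # 2 s since last leak: 1 quota freed
after_2s = funnel.watering(1)
print(burst, after_1s, after_2s)
# [True, True, True, False] False True
```

Because `leaking_ts` is only updated when at least one whole unit has leaked, the half unit from the first second is not lost; it is counted again when the second call computes the elapsed time.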
The question arises: how can we implement a distributed funnel algorithm? Can we use Redis data structures to solve this?
Observing the fields of the Funnel object, we can store its contents in a hash structure. When pouring, we retrieve the fields from the hash for logical operations and then update the hash, completing the action frequency check.
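The read, compute, write sequence described above can be sketched as follows. `FakeHashStore` is a hypothetical in-memory stand-in that only mimics the shape of `hgetall(name)` and `hset(name, mapping=...)`. With a real client such as redis-py the calls should look similar, assuming the client is created with `decode_responses=True` so that field names come back as strings.

```python
import time


class FakeHashStore(object):
    """Hypothetical in-memory stand-in for a Redis hash client."""
    def __init__(self):
        self.data = {}

    def hgetall(self, name):
        return dict(self.data.get(name, {}))

    def hset(self, name, mapping):
        self.data.setdefault(name, {}).update(mapping)


def is_action_allowed(store, key, capacity, leaking_rate, now=None):
    now = time.time() if now is None else now
    # Step 1: read the funnel fields from the hash (one round trip).
    state = store.hgetall(key)
    left_quota = float(state.get('left_quota', capacity))
    leaking_ts = float(state.get('leaking_ts', now))
    # Step 2: run the funnel logic in application memory.
    delta_quota = (now - leaking_ts) * leaking_rate
    if delta_quota >= 1:
        left_quota = min(capacity, left_quota + delta_quota)
        leaking_ts = now
    allowed = left_quota >= 1
    if allowed:
        left_quota -= 1
    # Step 3: write the fields back (a second round trip). Nothing here
    # stops another client from reading the same state between steps 1 and 3.
    store.hset(key, mapping={'left_quota': left_quota, 'leaking_ts': leaking_ts})
    return allowed


store = FakeHashStore()
results = [is_action_allowed(store, 'laoqian:reply', 2, 0.5, now=100.0)
           for _ in range(3)]
print(results)
# [True, True, False]
```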
However, there’s an issue: we cannot guarantee the atomicity of the entire process. Reading from the hash, performing calculations in memory, and writing back to the hash are three separate steps that cannot be executed atomically, so appropriate locking is required. Once locking is involved, acquiring the lock can fail, and a failure means either retrying or abandoning the action.
Retrying degrades performance, abandoning hurts the user experience, and either choice makes the code more complex. How can we resolve this issue? Redis-Cell comes to the rescue!
Redis-Cell
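Redis 4.0 introduced support for loadable modules, and redis-cell is a rate limiting module built on that support. It is distributed separately from Redis and has to be loaded into the server. It also uses the funnel algorithm and provides an atomic rate limiting command, which simplifies the rate limiting problem significantly.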
The module has a single command, cl.throttle, which has somewhat complex parameters and return values. Let's see how to use this command.
> cl.throttle laoqian:reply 15 30 60 1
                    ▲       ▲  ▲  ▲  ▲
                    |       |  |  |  └───── need 1 quota (optional, default is 1)
                    |       |  └──┴─────── 30 operations / 60 seconds (leaking rate)
                    |       └───────────── 15 capacity (funnel capacity)
                    └─────────────────── key laoqian:reply
This command means that the "reply" action of user "laoqian" is allowed at a maximum frequency of 30 times every 60 seconds (the leaking rate), with an initial funnel capacity of 15. The user can therefore reply to 15 posts in a row before being affected by the leaking rate. The leaking rate is now expressed with two parameters, a count and a period, which is more intuitive than the single floating-point number used earlier.
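The count and period map directly onto the single `leaking_rate` number used in the single-machine code. A small sketch of that arithmetic, with a hypothetical helper name:

```python
def leaking_rate(count, period_seconds):
    """Quota released per second for `count` operations per `period_seconds`."""
    return float(count) / period_seconds


rate = leaking_rate(30, 60)    # the "30 60" pair from the command
seconds_per_quota = 1 / rate   # time for one unit of space to drain
print(rate, seconds_per_quota)
# 0.5 2.0
```

So `30 60` corresponds to the `0.5` passed to `is_action_allowed` in the Python example, and one unit of space frees up every 2 seconds.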
> cl.throttle laoqian:reply 15 30 60
1) (integer) 0 # 0 indicates allowed, 1 indicates denied
2) (integer) 15 # Funnel capacity
3) (integer) 14 # Remaining space in the funnel
4) (integer) -1 # If denied, how long to wait before retrying (in seconds)
5) (integer) 2 # Time until the funnel is completely empty (in seconds)
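Five positional integers are easy to mix up in application code. One option is to give them names, as in the minimal sketch below; the `ThrottleReply` type and its field names are assumptions chosen for illustration, and the input list is simply the reply shown above.

```python
from collections import namedtuple

ThrottleReply = namedtuple(
    'ThrottleReply',
    ['allowed', 'capacity', 'remaining', 'retry_after', 'reset_after'])


def parse_throttle_reply(reply):
    """Turn the five-integer reply into named fields."""
    limited, capacity, remaining, retry_after, reset_after = reply
    return ThrottleReply(
        allowed=(limited == 0),   # 0 means allowed, 1 means denied
        capacity=capacity,
        remaining=remaining,
        retry_after=retry_after,  # seconds; -1 in the allowed reply above
        reset_after=reset_after)  # seconds until the funnel is empty again


parsed = parse_throttle_reply([0, 15, 14, -1, 2])
print(parsed.allowed, parsed.remaining, parsed.reset_after)
# True 14 2
```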
When executing the rate limiting command, if denied, the action must be discarded or retried. The cl.throttle command thoughtfully calculates the retry time, allowing you to use the fourth return value for a sleep operation. If you prefer not to block the thread, you can set up an asynchronous task for retrying.
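A blocking retry loop built on the fourth return value might look like the sketch below. `run_with_retry` and its parameters are hypothetical names, and the limiter is passed in as a plain callable so the example runs without a server. The replies used here are made-up stand-ins, not captured output. With redis-py and the redis-cell module loaded, the callable could plausibly be `lambda: client.execute_command('CL.THROTTLE', 'laoqian:reply', 15, 30, 60, 1)`.

```python
import time


def run_with_retry(throttle, action, max_attempts=3, sleep=time.sleep):
    """Run `action` once the limiter allows it, sleeping between attempts.

    `throttle` is any zero-argument callable returning the five-integer
    cl.throttle reply. Returns True if the action ran, False if we gave up.
    """
    for _ in range(max_attempts):
        reply = throttle()
        if reply[0] == 0:   # first value: 0 means allowed
            action()
            return True
        sleep(reply[3])     # fourth value: seconds to wait before retrying
    return False


# Stand-in limiter for illustration: denied once, then allowed.
replies = iter([[1, 15, 0, 2, 30], [0, 15, 0, -1, 30]])
slept, done = [], []
ok = run_with_retry(lambda: next(replies), lambda: done.append('reply'),
                    sleep=slept.append)
print(ok, slept, done)
# True [2] ['reply']
```

Capping the attempts keeps a persistently denied action from blocking the thread forever; the caller can then fall back to discarding the action.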
Thoughts
In addition to user-generated content (UGC), where else can the funnel rate limiting module be applied?