Caching, degradation, and rate limiting are the three standard safeguards of a large distributed system. For rate limiting, two algorithms dominate today: the leaky bucket and the token bucket.
The leaky bucket algorithm is illustrated by the diagram below:
The advantage of the token bucket algorithm over the leaky bucket is that it can absorb bursts of traffic. Its diagram is shown below:
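Expressed in code rather than a picture, the token-bucket idea can be sketched roughly as follows. This is a deliberately simplified, hypothetical SimpleTokenBucket for illustration only; Guava's RateLimiter discussed below is considerably more refined:
public class SimpleTokenBucket {
    private final double maxTokens;       // bucket capacity, i.e. the maximum burst size
    private final double refillPerMillis; // tokens added per millisecond
    private double tokens;                // tokens currently in the bucket
    private long lastRefillMillis;

    public SimpleTokenBucket(double qps, double maxTokens) {
        this.maxTokens = maxTokens;
        this.refillPerMillis = qps / 1000.0;
        this.tokens = maxTokens;
        this.lastRefillMillis = System.currentTimeMillis();
    }

    /** Returns true if a token could be taken, false if the caller should be throttled. */
    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        // Refill tokens according to the elapsed time, capped at the bucket size.
        tokens = Math.min(maxTokens, tokens + (now - lastRefillMillis) * refillPerMillis);
        lastRefillMillis = now;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
Because unused tokens accumulate up to maxTokens, a short burst can pass immediately, which is exactly the advantage over the leaky bucket.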
Guava RateLimiter is a rate-limiting utility provided by Google. It is based on the token bucket algorithm and can effectively limit the traffic to an interface within a single JVM instance. Here is a simple usage example:
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RateLimiterExample {
    public static void main(String[] args) throws InterruptedException {
        // qps is set to 5: only five requests are allowed through per second
        RateLimiter rateLimiter = RateLimiter.create(5);
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        int nTasks = 10;
        CountDownLatch countDownLatch = new CountDownLatch(nTasks);
        long start = System.currentTimeMillis();
        for (int i = 0; i < nTasks; i++) {
            final int j = i;
            executorService.submit(() -> {
                rateLimiter.acquire(1);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println(Thread.currentThread().getName() + " gets job " + j + " done");
                countDownLatch.countDown();
            });
        }
        executorService.shutdown();
        countDownLatch.await();
        long end = System.currentTimeMillis();
        System.out.println("10 jobs gets done by 5 threads concurrently in " + (end - start) + " milliseconds");
    }
}
Running it produces output like the following:
pool-1-thread-1 gets job 0 done
pool-1-thread-2 gets job 1 done
pool-1-thread-3 gets job 2 done
pool-1-thread-4 gets job 3 done
pool-1-thread-5 gets job 4 done
pool-1-thread-6 gets job 5 done
pool-1-thread-7 gets job 6 done
pool-1-thread-8 gets job 7 done
pool-1-thread-9 gets job 8 done
pool-1-thread-10 gets job 9 done
10 jobs gets done by 5 threads concurrently in 2805 milliseconds
In the example above we submit 10 tasks, each of which takes roughly 1000 ms, and we set the RateLimiter's qps to 5, so only five requests may pass per second. With permits handed out every 200 ms, the last task obtains its permit at roughly 1.8 s and finishes its 1 s of work at roughly 2.8 s; the measured 2805 ms matches that expectation.
RateLimiter is based on the token bucket algorithm, and its core ideas are:
Tokens are not refilled by a background thread; instead, on every request the limiter lazily computes how many tokens should have accumulated since the last request (see resync below).
Requests may "pre-consume" tokens: a large request such as acquire(20) does not itself wait for all 20 tokens to be generated; it only pays the debt left by earlier requests, and the cost it incurs is pushed onto the next request by moving nextFreeTicketMicros into the future.
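The pre-consumption behavior can be observed directly through RateLimiter's public API. The following small sketch (timings are approximate and depend on the machine and Guava version) uses a limiter created with create(2):
import com.google.common.util.concurrent.RateLimiter;

public class PreConsumeExample {
    public static void main(String[] args) {
        RateLimiter limiter = RateLimiter.create(2); // 2 permits per second
        // The first, very large request returns almost immediately ("pre-consumption"):
        System.out.println("acquire(10) waited " + limiter.acquire(10) + " s"); // ~0 s
        // ...but the next request has to pay for it and waits roughly 10 / 2 = 5 seconds:
        System.out.println("acquire(1)  waited " + limiter.acquire(1) + " s");  // ~5 s
    }
}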
The main classes of RateLimiter are shown in the class diagram below:
RateLimiter is an abstract class. SmoothRateLimiter extends RateLimiter but is still abstract; SmoothBursty and SmoothWarmingUp are the concrete implementations.
SmoothRateLimiter is the abstract class that defines several key fields. Let's look at these first:
/** The currently stored permits. */
double storedPermits;

/** The maximum number of stored permits. */
double maxPermits;

/**
 * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
 * per second has a stable interval of 200ms.
 */
double stableIntervalMicros;

/**
 * The time when the next request (no matter its size) will be granted. After granting a request,
 * this is pushed further in the future. Large requests push this further than small requests.
 */
private long nextFreeTicketMicros = 0L; // could be either in the past or future
storedPermits is the number of tokens currently in the bucket. maxPermits is the maximum number of tokens the bucket can hold, so storedPermits always lies in [0, maxPermits]. stableIntervalMicros equals 1/qps expressed in microseconds; it is the interval between two requests while the system is in its stable state. For example, with qps = 5 the stable interval is 200 ms, i.e. stableIntervalMicros = 200,000. nextFreeTicketMicros is the earliest time at which the next request (after the current one has been handled) can be granted; a request arriving before that time has to wait.
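To make these values concrete, here is a worked example based on the definitions above for the default SmoothBursty returned by RateLimiter.create(5): stableIntervalMicros = 1,000,000 / 5 = 200,000; maxPermits = 5, because maxBurstSeconds defaults to 1 (covered in the next section); storedPermits grows from 0 toward 5 while the limiter sits idle; and nextFreeTicketMicros starts at the creation time and is pushed further into the future each time a request is granted.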
RateLimiter provides a factory method that creates a SmoothBursty:
public static RateLimiter create(double permitsPerSecond) {
  return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}

@VisibleForTesting
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
  RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); // maxBurstSeconds is used to compute maxPermits
  rateLimiter.setRate(permitsPerSecond); // set the rate at which permits are generated
  return rateLimiter;
}
The maxBurstSeconds constructor argument of SmoothBursty is used to compute maxPermits: maxPermits = maxBurstSeconds * permitsPerSecond. With the default maxBurstSeconds of 1, a limiter created with create(5) can therefore store at most 5 unused permits, i.e. one second's worth of burst.
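The burst allowance can be seen in a small experiment. This is only a sketch; the exact timings vary slightly from run to run:
import com.google.common.util.concurrent.RateLimiter;

public class BurstExample {
    public static void main(String[] args) throws InterruptedException {
        RateLimiter limiter = RateLimiter.create(5); // maxBurstSeconds = 1, so maxPermits = 5
        Thread.sleep(2000); // stay idle for 2 s; storedPermits is capped at 5
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10; i++) {
            limiter.acquire(); // stored permits are handed out almost for free
            System.out.println("permit " + i + " after " + (System.currentTimeMillis() - start) + " ms");
        }
        // Roughly the first 5-6 acquisitions return almost immediately (the stored burst plus one
        // pre-consumed permit); the remaining ones are spaced about 200 ms apart.
    }
}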
Now let's look at setRate. RateLimiter's setRate eventually calls doSetRate, which is declared abstract in RateLimiter; the abstract class SmoothRateLimiter overrides it:
// SmoothRateLimiter overrides RateLimiter's doSetRate and then delegates to the abstract doSetRate below.
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
  resync(nowMicros);
  double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
  this.stableIntervalMicros = stableIntervalMicros;
  doSetRate(permitsPerSecond, stableIntervalMicros);
}

// overridden by SmoothBursty and SmoothWarmingUp
abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros);

// SmoothBursty's implementation of doSetRate
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = this.maxPermits;
  maxPermits = maxBurstSeconds * permitsPerSecond;
  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    // if we don't special-case this, we would get storedPermits == NaN, below
    storedPermits = maxPermits;
  } else {
    storedPermits =
        (oldMaxPermits == 0.0)
            ? 0.0 // initial state
            : storedPermits * maxPermits / oldMaxPermits;
  }
}
Within SmoothRateLimiter's doSetRate, the method worth a closer look is resync:
void resync(long nowMicros) {
  // if nextFreeTicket is in the past, resync to now
  if (nowMicros > nextFreeTicketMicros) {
    double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
    storedPermits = min(maxPermits, storedPermits + newPermits);
    nextFreeTicketMicros = nowMicros;
  }
}
resync is RateLimiter's lazy computation of storedPermits; it is called for every incoming request. Roughly, it works as follows: if nextFreeTicketMicros lies in the past, compute how many new permits have been generated since then, i.e. (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(), add them to storedPermits (capped at maxPermits), and move nextFreeTicketMicros up to now; otherwise do nothing. coolDownIntervalMicros() in SmoothBursty simply returns stableIntervalMicros, i.e. 1 / QPS; in SmoothWarmingUp it is computed as warmupPeriodMicros / maxPermits, where warmupPeriodMicros is SmoothWarmingUp's warmup period.
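For example (numbers chosen purely for illustration): with qps = 5, coolDownIntervalMicros() is 200,000 μs; if the limiter has been idle for 600,000 μs since nextFreeTicketMicros, resync adds 600,000 / 200,000 = 3 permits to storedPermits (still capped at maxPermits) and sets nextFreeTicketMicros to the current time.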
The tryAcquire method attempts to acquire a number of permits; unlike acquire it will not block indefinitely: if the request cannot be satisfied within the given timeout it returns false right away. The canAcquire method decides whether the current request can pass:
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
  long timeoutMicros = max(unit.toMicros(timeout), 0);
  checkPermits(permits);
  long microsToWait;
  synchronized (mutex()) {
    long nowMicros = stopwatch.readMicros();
    if (!canAcquire(nowMicros, timeoutMicros)) { // first check whether the request can be satisfied within the timeout; if not, fail immediately
      return false;
    } else {
      microsToWait = reserveAndGetWaitLength(permits, nowMicros); // compute how long this request has to wait; this is the core step
    }
  }
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return true;
}

final long reserveAndGetWaitLength(int permits, long nowMicros) {
  long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
  return max(momentAvailable - nowMicros, 0);
}

private boolean canAcquire(long nowMicros, long timeoutMicros) {
  return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}

final long queryEarliestAvailable(long nowMicros) {
  return nextFreeTicketMicros;
}
The logic of canAcquire is straightforward: it checks whether nextFreeTicketMicros minus timeoutMicros is less than or equal to nowMicros, i.e. whether the permit will become available before the timeout expires. For example, if nextFreeTicketMicros is 1,500,000 and nowMicros is 1,000,000, a tryAcquire with a 600 ms (600,000 μs) timeout passes the check, while one with a 400 ms timeout fails. If the check passes, execution continues.
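In application code, tryAcquire is the variant you typically use to shed load instead of queueing it. A minimal sketch (the handler and the reply strings are made up for illustration):
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;

public class TryAcquireExample {
    private static final RateLimiter LIMITER = RateLimiter.create(5);

    // Hypothetical request handler: wait at most 100 ms for a permit, otherwise reject.
    static String handleRequest(String payload) {
        if (!LIMITER.tryAcquire(1, 100, TimeUnit.MILLISECONDS)) {
            return "429 Too Many Requests";
        }
        return "200 OK: processed " + payload;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 20; i++) {
            System.out.println(handleRequest("req-" + i));
        }
    }
}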
Next, SmoothRateLimiter's reserveEarliestAvailable method is called. It returns the earliest moment at which the current request can be served, and reserveAndGetWaitLength turns that moment into a wait time. The same method is also used by acquire, so let's analyse it carefully.
// Reserve the permits for this request and return the earliest time at which it can be served.
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  resync(nowMicros); // lazily add any tokens generated since the last request and update storedPermits
  long returnValue = nextFreeTicketMicros;
  // The requiredPermits of this request come from two sources: storedPermits (tokens already in
  // the bucket) and freshPermits (tokens that still have to be generated).
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
  double freshPermits = requiredPermits - storedPermitsToSpend;
  // Compute the waiting time contributed by each part; their sum is the cost of this request.
  // In SmoothBursty the stored part costs nothing (storedPermitsToWaitTime returns 0).
  long waitMicros =
      storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
          + (long) (freshPermits * stableIntervalMicros);
  // Push nextFreeTicketMicros into the future: the cost is charged to the *next* request ("pre-consumption").
  this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
  // Deduct the spent tokens from the bucket.
  this.storedPermits -= storedPermitsToSpend;
  return returnValue;
}
/**
* Translates a specified portion of our currently stored permits which we want to spend/acquire,
* into a throttling time. Conceptually, this evaluates the integral of the underlying function we
* use, for the range of [(storedPermits - permitsToTake), storedPermits].
*
* <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}
*/
abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);
The code above is SmoothRateLimiter's concrete implementation. Its main steps are:
1. Call resync to lazily top up storedPermits based on the time elapsed since the last request.
2. Record the current nextFreeTicketMicros as the return value: this is the moment at which the present request may proceed.
3. Split requiredPermits into storedPermitsToSpend (taken from the bucket) and freshPermits (still to be generated).
4. Compute waitMicros as the throttling time of the stored part (storedPermitsToWaitTime) plus freshPermits * stableIntervalMicros.
5. Push nextFreeTicketMicros forward by waitMicros; this cost is paid by the next request (the pre-consumption mentioned earlier).
6. Deduct storedPermitsToSpend from storedPermits.
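A small worked trace, with numbers chosen purely for illustration (qps = 5, so stableIntervalMicros = 200,000): suppose storedPermits is 3 and a request asks for 5 permits. Then storedPermitsToSpend = 3, freshPermits = 2, and (for SmoothBursty, where stored permits are free) waitMicros = 0 + 2 * 200,000 = 400,000. The request itself returns the old nextFreeTicketMicros, so it does not wait for those 400 ms; instead nextFreeTicketMicros is moved 400 ms into the future and the next request pays the price.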
The acquire method has no notion of a timeout; it blocks until the request can be satisfied:
public double acquire(int permits) {
  long microsToWait = reserve(permits);
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

final long reserve(int permits) {
  checkPermits(permits);
  synchronized (mutex()) {
    return reserveAndGetWaitLength(permits, stopwatch.readMicros());
  }
}

final long reserveAndGetWaitLength(int permits, long nowMicros) {
  long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
  return max(momentAvailable - nowMicros, 0);
}

abstract long reserveEarliestAvailable(int permits, long nowMicros);
So acquire also ends up in reserveEarliestAvailable to compute how long the request must wait, and returns that wait (in seconds) to the caller. Since the method was analysed above, we will not repeat it here.
Compared with SmoothBursty, the main difference in SmoothWarmingUp is its storedPermitsToWaitTime method; the rest works along the same lines.
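A SmoothWarmingUp limiter is created through the overload RateLimiter.create(permitsPerSecond, warmupPeriod, unit). The following sketch shows the warmup effect; exact timings vary, and the printed wait times are only indicative:
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;

public class WarmupExample {
    public static void main(String[] args) {
        // 5 permits/s with a 2 s warmup period: right after creation the limiter is "cold"
        RateLimiter limiter = RateLimiter.create(5, 2, TimeUnit.SECONDS);
        for (int i = 0; i < 10; i++) {
            // The first permits are spaced much wider than 200 ms; the interval then
            // shrinks toward the stable 200 ms as the limiter warms up.
            System.out.println("permit " + i + " waited " + limiter.acquire() + " s");
        }
    }
}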
SmoothWarmingUp is a subclass of SmoothRateLimiter and adds a few fields of its own:
static final class SmoothWarmingUp extends SmoothRateLimiter {
  private final long warmupPeriodMicros;
  /**
   * The slope of the line from the stable interval (when permits == 0), to the cold interval
   * (when permits == maxPermits)
   */
  private double slope;
  private double thresholdPermits;
  private double coldFactor;
  ...
}
All four fields are tied to SmoothWarmingUp's warmup mechanism, which the following figure from the Guava source comments illustrates:
          ^ throttling
          |
    cold  +                  /
 interval |                 /.
          |                / .
          |               /  .   ← "warmup period" is the area of the trapezoid
          |              /   .     between thresholdPermits and maxPermits
          |             /    .
          |            /     .
          |           /      .
   stable +----------/  WARM .
 interval |          .   UP  .
          |          . PERIOD.
          |          .       .
        0 +----------+-------+--------------→ storedPermits
          0   thresholdPermits   maxPermits
In the figure, the x-axis is storedPermits, the number of tokens currently in the bucket. SmoothWarmingUp divides storedPermits into two intervals: [0, thresholdPermits) and [thresholdPermits, maxPermits]. The y-axis is the interval between requests: stableInterval is 1 / QPS (for example, with a QPS of 5 the stableInterval is 200 ms), and coldInterval = stableInterval * coldFactor, where coldFactor is hard-coded to 3.
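To put numbers on this, here is a worked example using the formulas in SmoothWarmingUp.doSetRate of recent Guava versions (the exact code may differ slightly between releases). For RateLimiter.create(5, 2, TimeUnit.SECONDS): stableInterval = 200 ms and coldInterval = 3 * 200 = 600 ms; thresholdPermits = 0.5 * warmupPeriod / stableInterval = 0.5 * 2000 / 200 = 5; maxPermits = thresholdPermits + 2 * warmupPeriod / (stableInterval + coldInterval) = 5 + 2 * 2000 / 800 = 10; and slope = (coldInterval - stableInterval) / (maxPermits - thresholdPermits) = 400 / 5 = 80 ms per permit.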
Note that the area under this curve is waitMicros, i.e. how long the current request has to wait. The computation lives in storedPermitsToWaitTime, which SmoothWarmingUp overrides:
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
  long micros = 0;
  // measuring the integral on the right part of the function (the climbing line)
  if (availablePermitsAboveThreshold > 0.0) { // if storedPermits exceeds thresholdPermits, first take permits from the part above the threshold (the WARM UP PERIOD in the figure)
    double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
    // The WARM UP PERIOD part is a trapezoid whose area is (top + bottom) * height / 2
    // TODO(cpovirk): Figure out a good name for this variable.
    double length = permitsToTime(availablePermitsAboveThreshold)
        + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
    micros = (long) (permitsAboveThresholdToTake * length / 2.0); // time spent taking permits from the WARM UP PERIOD
    permitsToTake -= permitsAboveThresholdToTake; // the remaining permits come from the stable part
  }
  // measuring the integral on the left part of the function (the horizontal line)
  micros += (stableIntervalMicros * permitsToTake); // time spent taking permits from the stable part
  return micros;
}

// maps a position in the WARM UP PERIOD region to the per-permit interval at that point
private double permitsToTime(double permits) {
  return stableIntervalMicros + permits * slope;
}
So storedPermitsToWaitTime in SmoothWarmingUp splits permitsToTake into two parts: the part taken from the WARM UP PERIOD region, a trapezoid whose area is (top + bottom) * height / 2, and the part taken from the stable region, a rectangle whose area is width * height. The method returns the sum of the two times.
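Continuing the numeric example from above (qps = 5, warmup = 2 s, thresholdPermits = 5, maxPermits = 10, slope = 80 ms per permit): with a completely cold limiter (storedPermits = 10) and permitsToTake = 8, the part above the threshold is 5 permits, length = permitsToTime(5) + permitsToTime(0) = 600 ms + 200 ms, so the trapezoid contributes 5 * 800 / 2 = 2000 ms, the whole warmup period as expected; the remaining 3 permits come from the stable part and cost 3 * 200 = 600 ms, for a total of 2600 ms.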