聊聊限流
By 王宁
为什么要限流
高并发
缓存
降级
限流
限流的目的是为了保护系统不被大量请求冲垮,通过限制请求的速度来保护系统
限流的目的是为了保护系统不被大量请求冲垮,通过限制请求的速度来保护系统
高可用
如何做限流
控制并发数量
控制同一时刻有多少请求占用资源
public class XXXService {
private final Semaphore permit = new Semaphore(10, true);
public void process(){
try{
permit.acquire();
//业务逻辑处理
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
permit.release();
}
}
}
控制访问速率
控制单位时间内有多少请求可以进入
计数器
漏桶
令牌桶
计数器
单位时间内,通过简单计数的方式限制流量。如果一个流量进来,计数器加一,如果超过最大阈值则拒绝请求,当时间过渡时,将计数器还原为0。

问题:计数器算法有什么漏洞?
滑动窗口
滑动窗口,又称rolling window。将单位时间划分为多个小格子,用一个滑动的窗口计算流量。

漏桶
水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。

令牌桶
系统会以一个恒定的速度往桶里放入令牌,如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务,令牌桶算法通过发放令牌,根据令牌的rate频率做请求频率限制,容量限制等。

RateLimiter --- Guava
RateLimiter基于令牌桶算法,以设定的恒定速率向令牌桶内放置令牌,请求到达时,只有拿到令牌才能执行;
RateLimiter支持预消费
RateLimiter有两种限流模式
- 稳定模式(SmoothBursty::令牌生成速度恒定,平滑突发限流)
RateLimiter limiter = RateLimiter.create(doublepermitsPerSecond)
RateLimiter.create(5)表示桶容量为5且每秒新增5个令牌,即每隔200毫秒新增一个令牌;limiter.acquire()表示消费一个令牌,如果当前桶中有足够令牌则成功(返回值为0),如果桶中没有令牌则暂停一段时间,比如发令牌间隔是200毫秒,则等待200毫秒后再去消费令牌,这种实现将突发请求速率平均为了固定请求速率。
- 渐进模式(SmoothWarmingUp::令牌生成速度缓慢提升直到维持在一个稳定值,平滑预热限流)
RateLimiter limiter = RateLimiter.create(doublepermitsPerSecond, long warmupPeriod, TimeUnit unit);
permitsPerSecond表示每秒新增的令牌数,warmupPeriod表示在从冷启动速率过渡到平均速率的时间间隔。
限流组件的设计
资源
限流
配置
限流
public interface Limiter {
/**
* 限流的qps
*
* @return
*/
double getQPS();
/**
* 尝试获取通过的许可,如果获取不到直接返回false
*
* @return
*/
boolean tryAcquire();
}
配置
public class LimiterRule implements Serializable {
/**
* 资源名
*/
private String resource;
/**
* 限制qps
*/
private double qps = -1;
/**
* 是否启用
*/
private boolean enable = true;
}
资源 =====> 限流
public interface LimiterFactory {
/**
* 根据资源获取限流组件
*
* @param resource 资源字符串
* @return
*/
Limiter getLimiterByResource(String resource);
}
限流实现方式
RateLimiter
。。。
限流 <=====> 配置
初始化
变更
配置方式
字典配置
Apollo
zookeeper
...
/**
* 限流配置源
*/
public interface LimiterConfigurationSource {
/**
* 注册配置更新监听器
*
* @param listener
*/
void addUpdateListener(ConfigurationUpdateListener listener);
/**
* 删除配置更新监听器
*
* @param listener
*/
void removeUpdateListener(ConfigurationUpdateListener listener);
/**
* 配置监听器。配置更新时会进行回调
*/
interface ConfigurationUpdateListener {
/**
* 更新配置的回调函数
*
* @param configuration
*/
void updateConfiguration(UpdateConfiguration configuration);
}
/**
* 更新的配置信息
*/
@Data
class UpdateConfiguration {
/**
* 新增的限流规则
*/
private Collection<LimiterRule> added;
/**
* 更新的限流规则
*/
private Collection<LimiterRule> changed;
/**
* 删除的限流规则
*/
private Collection<LimiterRule> deleted;
}
}