来源 | OSCHINA 社区
作者 | 华为云开发者联盟——xindoo
原文链接:https://my.oschina.net/u/4526289/blog/7787170
摘要:本文将详细介绍下 RRateLimiter 的具体使用方式、实现原理还有一些注意事项。我们目前在工作中遇到一个性能问题,我们有个定时任务需要处理大量的数据,为了提升吞吐量,所以部署了很多台机器,但这个任务在运行前需要从别的服务那拉取大量的数据,随着数据量的增大,如果同时多台机器并发拉取数据,会对下游服务产生非常大的压力。之前已经增加了单机限流,但无法解决问题,因为这个数据任务运行中只有不到 10% 的时间拉取数据,如果单机限流限制太狠,虽然集群总的请求量控制住了,但任务吞吐量又降下来。如果限流阈值太高,多机并发的时候,还是有可能压垮下游。所以目前唯一可行的解决方案就是分布式限流。我目前是选择直接使用 Redisson 库中的 RRateLimiter 实现了分布式限流,关于 Redission 可能很多人都有所耳闻,它其实是在 Redis 能力上构建的开发库,除了支持 Redis 的基础操作外,还封装了布隆过滤器、分布式锁、限流器…… 等工具。今天要说的 RRateLimiter 及时其实现的限流器。接下来本文将详细介绍下 RRateLimiter 的具体使用方式、实现原理还有一些注意事项,最后简单谈谈我对分布式限流底层原理的理解。
RedissonClient redissonClient = Redisson.create();
RRateLimiter rateLimiter = redissonClient.getRateLimiter("xindoo.limiter");
rateLimiter.trySetRate(RateType.OVERALL, 100, 1, RateIntervalUnit.HOURS);
rateLimiter.trySetRate 就是设置限流参数,RateType 有两种,OVERALL 是全局限流 ,PER_CLIENT 是单 Client 限流(可以认为就是单机限流),这里我们只讨论全局模式。而后面三个参数的作用就是设置在多长时间窗口内(rateInterval+IntervalUnit),许可总量不超过多少(rate),上面代码中我设置的值就是 1 小时内总许可数不超过 100 个。然后调用 rateLimiter 的 tryAcquire () 或者 acquire () 方法即可获取许可。
rateLimiter.acquire(1); // 申请1份许可,直到成功
boolean res = rateLimiter.tryAcquire(1, 5, TimeUnit.SECONDS); // 申请1份许可,如果5s内未申请到就放弃
使用起来还是很简单的嘛,以上代码中的两种方式都是同步调用,但 Redisson 还同样提供了异步方法 acquireAsync () 和 tryAcquireAsync (),使用其返回的 RFuture 就可以异步获取许可。
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
byte[] random = new byte[8];
ThreadLocalRandom.current().nextBytes(random);
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"——————————————————————————————————————"
+ "这里是一大段lua代码"
+ "____________________________________",
Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),
value, System.currentTimeMillis(), random);
}
映入眼帘的就是一大段 lua 代码,其实这段 Lua 代码就是限流实现的核心,我把这段 lua 代码摘出来,并加了一些注释,我们来详细看下。
local rate = redis.call("hget", KEYS[1], "rate") # 100
local interval = redis.call("hget", KEYS[1], "interval") # 3600000
local type = redis.call("hget", KEYS[1], "type") # 0
assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized")
local valueName = KEYS[2] # {xindoo.limiter}:value 用来存储剩余许可数量
local permitsName = KEYS[4] # {xindoo.limiter}:permits 记录了所有许可发出的时间戳
# 如果是单实例模式,name信息后面就需要拼接上clientId来区分出来了
if type == "1" then
valueName = KEYS[3] # {xindoo.limiter}:value:b474c7d5-862c-4be2-9656-f4011c269d54
permitsName = KEYS[5] # {xindoo.limiter}:permits:b474c7d5-862c-4be2-9656-f4011c269d54
end
# 对参数校验
assert(tonumber(rate) >= tonumber(ARGV[1]), "Requested permits amount could not exceed defined rate")
# 获取当前还有多少许可
local currentValue = redis.call("get", valueName)
local res
# 如果有记录当前还剩余多少许可
if currentValue ~= false then
# 回收已过期的许可数量
local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)
local released = 0
for i, v in ipairs(expiredValues) do
local random, permits = struct.unpack("Bc0I", v)
released = released + permits
end
# 清理已过期的许可记录
if released > 0 then
redis.call("zremrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)
if tonumber(currentValue) + released > tonumber(rate) then
currentValue = tonumber(rate) - redis.call("zcard", permitsName)
else
currentValue = tonumber(currentValue) + released
end
redis.call("set", valueName, currentValue)
end
# ARGV permit timestamp random, random是一个随机的8字节
# 如果剩余许可不够,需要在res中返回下个许可需要等待多长时间
if tonumber(currentValue) < tonumber(ARGV[1]) then
local firstValue = redis.call("zrange", permitsName, 0, 0, "withscores")
res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]))
else
redis.call("zadd", permitsName, ARGV[2], struct.pack("Bc0I", string.len(ARGV[3]), ARGV[3], ARGV[1]))
# 减小可用许可量
redis.call("decrby", valueName, ARGV[1])
res = nil
end
else # 反之,记录到还有多少许可,说明是初次使用或者之前已记录的信息已经过期了,就将配置rate写进去,并减少许可数
redis.call("set", valueName, rate)
redis.call("zadd", permitsName, ARGV[2], struct.pack("Bc0I", string.len(ARGV[3]), ARGV[3], ARGV[1]))
redis.call("decrby", valueName, ARGV[1])
res = nil
end
local ttl = redis.call("pttl", KEYS[1])
# 重置
if ttl > 0 then
redis.call("pexpire", valueName, ttl)
redis.call("pexpire", permitsName, ttl)
end
return res
即便是加了注释,相信你还是很难一下子看懂这段代码的,接下来我就以其在 Redis 中的数据存储形式,然辅以流程图让大家彻底了解其实现实现原理。首先用 RRateLimiter 有个 name,在我代码中就是 xindoo.limiter,用这个作为 KEY 你就可以在 Redis 中找到一个 map,里面存储了 limiter 的工作模式 (type)、可数量 (rate)、时间窗口大小 (interval),这些都是在 limiter 创建时写入到的 redis 中的,在上面的 lua 代码中也使用到了。其次还俩很重要的 key,valueName 和 permitsName,其中在我的代码实现中 valueName 是 {xindoo.limiter}:value ,它存储的是当前可用的许可数量。我代码中 permitsName 的具体值是 {xindoo.limiter}:permits,它是一个 zset,其中存储了当前所有的许可授权记录(含有许可授权时间戳),其中 SCORE 直接使用了时间戳,而 VALUE 中包含了 8 字节的随机值和许可的数量,如下图:往期推荐
开源方案低成本复现ChatGPT流程,仅需1.6GB显存即可体验
点这里 ↓↓↓ 记得 关注✔ 标星⭐ 哦