快捷搜索:  汽车  科技

redis 自带事务不需要lua脚本(Lua实现分布式限流组件)

redis 自带事务不需要lua脚本(Lua实现分布式限流组件)具体实现见 实施步骤 一节,由于实施步骤较多,故放在后面章节,我们先来看看如何集成及使用。组件基于springboot starter机制,结合SPI 接口设计思想(内部集成:Redis Lua实现限流算法(令牌桶 固定窗口 滑动窗口)以及限流回退默认实现),支持注解方式/配置文件方式接入限流,主要分为以下部分:2、尽量少改或者不改造已有功能:少侵入或者0侵入式开发。3、扩展方便,集成简单,开发速率高,使用简单。在实际应用时也不要太纠结算法问题,因为一些限流算法实现是一样的只是描述不一样;具体使用哪种限流技术还是要根据实际场景来选择,不要一味去找最佳模式,白猫黑猫能解决问题的就是好猫 :)

limit-Spring-boot-starter

limit-spring-boot-starter是一个基于springboot starter机制,结合SPI 接口设计思想(内部集成:Redis Lua实现限流算法(令牌桶 固定窗口 滑动窗口)以及限流回退默认实现),支持注解方式/配置文件方式接入限流,扩展方便,集成使用简单的分布式限流组件。
开源地址:https://gitee.com/javacoo/limit-spring-boot-starter

背景介绍业务背景

1、随着业务的快速发展,对接的第三方合作机构越来越多,对外提供服务API访问量成倍增加,导致服务器压力也不断增加,而服务器资源是有限的,当请求量达到设计的极限时,如果不采取措施,轻则导致服务响应时间变长,重则可能造成整个系统瘫痪。

生产环境背景

1、账单日批量业务接口访问量暴增,特别是某个时间段
2、业务方调用接口的速度未知,QPS可能达到400/s,600/s,或者更高
3、对外服务API性能上限是 QPS 300/s
4、已经出现服务不可用,应用崩溃的事故

需求分析

1、鉴于业务方对接口的调用频率未知,而我方的接口服务有上限,为保证服务的可用性,业务层需要对接口调用方的流量进行限制—–接口限流。

2、尽量少改或者不改造已有功能:少侵入或者0侵入式开发。

3、扩展方便,集成简单,开发速率高,使用简单。

设计思路主流思路
  • 在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流。缓存的目的是提升系统访问速度和增大系统能处理的容量,可谓是抗高并发流量的银弹;而降级是当服务出问题或者影响到核心流程的性能则需要暂时屏蔽掉,待高峰或者问题解决后再打开;而有些场景并不能用缓存和降级来解决,比如稀缺资源(秒杀、抢购)、写服务(如评论、下单)、频繁的复杂查询(评论的最后几页),因此需有一种手段来限制这些场景的并发/请求量,即限流。
  • 常见的限流有:限制总并发数(比如数据库连接池、线程池)、限制瞬时并发数(如nginx的limit_conn模块,用来限制瞬时并发连接数)、限制时间窗口内的平均速率(如Guava的RateLimiter、nginx的limit_req模块,限制每秒的平均速率);其他还有如限制远程接口调用速率、限制MQ的消费速率。另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。
  • 如果是单节点我们可以使用google为我们提供的guava包下的RateLimiter进行限流,它使用的是令牌桶算法,分布式场景下也可以使用网关进行限流,如Spring Clound Gateway,其实还有很多开源的限流框架如阿里的Sentinel,甚至我们可以利用redis lua脚本自己来实现限流。
前面的话

在实际应用时也不要太纠结算法问题,因为一些限流算法实现是一样的只是描述不一样;具体使用哪种限流技术还是要根据实际场景来选择,不要一味去找最佳模式,白猫黑猫能解决问题的就是好猫 :)

我的思路

组件基于springboot starter机制,结合SPI 接口设计思想(内部集成:Redis Lua实现限流算法(令牌桶 固定窗口 滑动窗口)以及限流回退默认实现),支持注解方式/配置文件方式接入限流,主要分为以下部分:

  • 新建 springboot starter工程
  • 基于SPI思想设计扩展接口
  • redis lua实现分布式限流
  • 支持接口添加注解方式限流
  • 支持配置文件方式限流
  • 支持限流回退
  • ...

具体实现见 实施步骤 一节,由于实施步骤较多,故放在后面章节,我们先来看看如何集成及使用。

集成及使用集成

<dependency> <groupId>com.javacoo</groupId> <artifactId>limit-spring-boot-starter</artifactId> <version>1.0.0</version> </dependency>使用

  • 配置文件接入限流 主要配置说明,详见 LimitConfig 限流配置,覆盖原则:有方法级独立配置则使用独立配置,否则使用全局配置,当配置了 limit.expression限流表达式则激活了配置文件接入限流,注解方式失效。

#全局配置 #限流表达式:拦截 com.javacoo.service.example.service 包及子包下所有方法 limit.expression=execution(* com.javacoo.service.example.service..*.*(..)) #全局限流回退实现名称 limit.limit-rule.fallback-impl=globalFallback #方法级配置:针对getExampleInfo方法 配置独立的限流规则 #给定的时间范围 单位(秒) limit.limit-method-map[getExampleInfo].period = 60 #一定时间内最多访问次数 limit.limit-method-map[getExampleInfo].count = 5 #限流类型 limit.limit-method-map[getExampleInfo].limitType = CUSTOMER #降级策略 limit.limit-method-map[getExampleInfo].fallbackStrategy = FALLBACK #当降级策略为:回退 时回退处理接口实现名称 limit.limit-method-map[getExampleInfo].fallbackImpl = getExampleInfoFallback

  • 使用注解方式接入限流:默认配置情况

//60秒内,允许访问1次 @Limit(period = 60 count = 1) public Optional<ExampleDto> getExampleInfo(String id) { AbstractAssert.isNotBlank(id ErrorCodeConstants.SERVICE_GET_EXAMPLE_INFO_ID); ExampleDto exampleDto = new ExampleDto(); exampleDto.setId("1"); exampleDto.setData("正常数据"); return Optional.ofNullable(exampleDto); }

限流效果:正常访问

2021-08-11 15:13:38.553 INFO 16452 --- [ main] c.j.l.c.i.redis.RedisLuaRateLimiter : [限流交易请求] key:[COM.JAVACOO.SERVICE.EXAMPLE.SERVICE.IMPL.EXAMPLESERVICEIMPL.GETEXAMPLEINFO] 60秒内 已访问次数:1 60秒内 限制次数:1 2021-08-11 15:13:38.553 INFO 16452 --- [ main] c.j.l.c.handler.AbstractLimitHandler : [限流交易请求] 尝试获取执行权限成功 开始执行目标方法

限流效果:降级策略:FAIL_FAST

2021-08-11 15:14:14.413 INFO 16192 --- [ main] c.j.l.c.i.redis.RedisLuaRateLimiter : [限流交易请求] key:[COM.JAVACOO.SERVICE.EXAMPLE.SERVICE.IMPL.EXAMPLESERVICEIMPL.GETEXAMPLEINFO] 60秒内 已访问次数:2 60秒内 限制次数:1 2021-08-11 15:14:14.414 INFO 16192 --- [ main] c.j.l.c.handler.AbstractLimitHandler : [限流交易请求] 尝试获取执行权限失败 服务降级处理 降级策略:FAIL_FAST 2021-08-11 15:14:14.416 ERROR 16192 --- [ main] c.j.l.c.handler.AbstractLimitHandler : [限流服务执行异常] com.javacoo.limit.client.exception.LimitException: 访问过于频繁 超出访问限制 ...

使用注解接入限流:回退策略情况

//60秒内,允许访问1次,回退策略,指定回退处理类 @Limit(period = 60 count = 1 fallbackStrategy = FallbackStrategy.FALLBACK fallbackImpl = "getExampleInfoFallback") public Optional<ExampleDto> getExampleInfo(String id) { AbstractAssert.isNotBlank(id ErrorCodeConstants.SERVICE_GET_EXAMPLE_INFO_ID); ExampleDto exampleDto = new ExampleDto(); exampleDto.setId("1"); exampleDto.setData("正常数据"); return Optional.ofNullable(exampleDto); }

限流效果:正常访问

2021-08-11 15:22:44.613 INFO 16720 --- [ main] c.j.l.c.i.redis.RedisLuaRateLimiter : [限流交易请求] key:[COM.JAVACOO.SERVICE.EXAMPLE.SERVICE.IMPL.EXAMPLESERVICEIMPL.GETEXAMPLEINFO] 60秒内 已访问次数:1 60秒内 限制次数:1 2021-08-11 15:22:44.614 INFO 16720 --- [ main] c.j.l.c.handler.AbstractLimitHandler : [限流交易请求] 尝试获取执行权限成功 开始执行目标方法 ...

限流效果:降级策略:FALLBACK

2021-08-11 15:23:09.497 INFO 8592 --- [ main] c.j.l.c.handler.AnnotationLimitHandler : [AnnotationLimitHandler限流交易请求] 尝试获取方法:com.javacoo.service.example.service.impl.ExampleServiceImpl.getExampleInfo 执行权限 2021-08-11 15:23:09.666 INFO 8592 --- [ main] c.j.l.c.i.redis.RedisLuaRateLimiter : [限流交易请求] key:[COM.JAVACOO.SERVICE.EXAMPLE.SERVICE.IMPL.EXAMPLESERVICEIMPL.GETEXAMPLEINFO] 60秒内 已访问次数:2 60秒内 限制次数:1 2021-08-11 15:23:09.666 INFO 8592 --- [ main] c.j.l.c.handler.AbstractLimitHandler : [限流交易请求] 尝试获取执行权限失败 服务降级处理 降级策略:FALLBACK 2021-08-11 15:23:09.668 INFO 8592 --- [ main] c.j.s.e.fallback.GetExampleInfoFallback : getExampleInfo方法降级处理 ...

限流回退扩展实现

基于xkernel 提供的SPI机制(详见:https://gitee.com/javacoo/xkernel
),扩展非常方便,大致步骤如下:
1、 实现限流回退接口:如 com.javacoo.service.example.fallback.GetExampleInfoFallback com.javacoo.service.example.fallback.GlobalFallback

2、配置限流回退接口:

  • 在项目resource目录新建包->META-INF->services
  • 创建com.javacoo.limit.client.api.Fallback文件,文件内容:实现类的全局限定名,如:

globalFallback=com.javacoo.service.example.fallback.GlobalFallback getExampleInfoFallback=com.javacoo.service.example.fallback.GetExampleInfoFallback

redis 自带事务不需要lua脚本(Lua实现分布式限流组件)(1)

- 修改配置文件,添加如下内容: ```properties #全局配置 limit.limit-rule.fallback-impl=globalFallback #方法级配置 limit.limit-method-map[getExampleInfo].fallbackImpl = getExampleInfoFallback ```

全局实现

/** * 全局方法降级处理接口实现 * <li></li> */ @Slf4j public class GlobalFallback implements Fallback<Object> { /** * 服务降级处理 * <li></li> * @return: R 返回对象 */ @Override public Object getFallback() { log.info("全局降级处理"); return null; } }

指定方法实现

/** * getExampleInfo方法降级处理接口实现 * <li></li> */ @Slf4j public class GetExampleInfoFallback implements Fallback<Optional<ExampleDto>> { /** * 服务降级处理 * <li></li> * * @return: java.lang.Object 返回对象 */ @Override public Optional<ExampleDto> getFallback() { log.info("getExampleInfo方法降级处理"); ExampleDto exampleDto = new ExampleDto(); exampleDto.setData("请求过多 请稍后再试"); return Optional.ofNullable(exampleDto); } }实施步骤1,新建limit-spring-boot-starter工程

  • 工程结构

redis 自带事务不需要lua脚本(Lua实现分布式限流组件)(2)

limit.png

  • 类结构图

redis 自带事务不需要lua脚本(Lua实现分布式限流组件)(3)

RateLimiter.png

  • 项目结构

limit-spring-boot-starter └── src ├── main │ ├── java │ │ └── com.javacoo │ │ ├────── limit │ │ │ ├──────client │ │ │ │ ├── api │ │ │ │ │ ├── Fallback 服务降级回退处理接口 │ │ │ │ │ └── RateLimiter 限流接口 │ │ │ │ ├── annotation │ │ │ │ │ └── Limit 限流注解 │ │ │ │ ├── config │ │ │ │ │ ├── LimitRule 限流规则配置 │ │ │ │ │ └── LimitConfig 限流配置 │ │ │ │ ├── enums │ │ │ │ │ ├── FallbackStrategy 降级策略 │ │ │ │ │ └── LimitType 限流类型 │ │ │ │ ├── exception │ │ │ │ │ └── LimitException 限流异常 │ │ │ │ ├── util │ │ │ │ │ └── WebUtil 工具类 │ │ │ │ ├── handler │ │ │ │ │ ├── AbstractLimitHandler 抽象限流处理器 │ │ │ │ │ ├── AnnotationLimitHandler 限流注解处理器 │ │ │ │ │ ├── ConfigLimitHandler 限流配置处理器 │ │ │ │ │ └── LimitPointcutAdvisor 限流切面Advisor │ │ │ │ └── internal 接口内部实现 │ │ │ │ ├── redis │ │ │ │ │ ├── DefaultFallback 服务降级回退处理接口默认实现 │ │ │ │ └── redis │ │ │ │ ├── LimitRedisConfig RedisTemplate配置类 │ │ │ │ ├── AbstractRateLimiter 抽象限流接口实现 │ │ │ │ ├── FixedWindowRateLimiter 固定窗口算法实现类 │ │ │ │ ├── LeakyBucketRateLimiter 漏桶算法实现类 │ │ │ │ ├── SlidingWindowRateLimiter 滑动窗口算法实现类 │ │ │ │ └── TokenBucketRateLimiter 令牌桶算法实现类 │ │ │ └──────starter │ │ │ ├── LimitAutoConfiguration 自动配置类 │ │ │ └── RateLimiterHolder 分布式限流接口对象持有者 │ └── resource │ ├── META-INF │ │ ├── spring.factories │ │ └── ext │ │ └── internal │ │ ├── com.javacoo.limit.client.api.Fallback │ │ └── com.javacoo.limit.client.api.RateLimiter │ └── script │ ├── FixedWindow.lua │ ├── LeakyBucket.lua │ ├── SlidingWindow.lua │ └── TokenBucket.lua └── test 测试2,基于SPI思想设计扩展接口

限流接口->com.javacoo.limit.client.api.RateLimiter

/** * 限流接口 * <p>说明:</p> * <li></li> */ @Spi(LimitConfig.DEFAULT_IMPL) public interface RateLimiter { /** 默认时间单位:秒 */ TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS; /** 默认限流时间范围:1 */ int DEFAULT_PERIOD = 1; /** 默认限流数量: 10 */ int DEFAULT_LIMIT_COUNT = 10; /** * 尝试获取 * <p>说明:</p> * <li></li> * @param keys key * @param count 限制数量 * @return boolean 是否成功 */ boolean tryAcquire(ImmutableList<String> keys int count); /** * 尝试获取 * <p>说明:</p> * <li></li> * @param keys key * @param count 限制数量 * @param period 时间周期 * @return boolean 是否成功 */ boolean tryAcquire(ImmutableList<String> keys int count int period); /** * 尝试获取 * <p>说明:</p> * <li></li> * @param keys key * @param count 限制数量 * @param period 时间周期 * @param timeUnit 时间周期单位 * @return boolean 是否成功 */ boolean tryAcquire(ImmutableList<String> keys int count int period TimeUnit timeUnit); }

服务降级回退处理接口->com.javacoo.limit.client.api.Fallback

/** * 服务降级回退处理接口 * <li></li> */ @Spi(LimitConfig.DEFAULT_IMPL) public interface Fallback<R> { /** * 服务降级处理 * <li></li> * @return: R 返回对象 */ default R getFallback(){ return null;} }redis lua实现分布式限流(内部扩展实现)

RedisTemplate配置类

/** * RedisTemplate配置类 * <li></li> */ @Slf4j @Configuration public class LimitRedisConfig { private static final String DATA_FORMAT = "yyyy-MM-dd HH:mm:ss"; @Bean public RedisTemplate<String Serializable> limitRedisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String Serializable> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); FastJsonRedisSerializer<Serializable> fastJsonRedisSerializer = new FastJsonRedisSerializer(Serializable.class); FastJsonConfig fastJsonConfig = new FastJsonConfig(); fastJsonConfig.setDateFormat(DATA_FORMAT); fastJsonRedisSerializer.setFastJsonConfig(fastJsonConfig); redisTemplate.setValueSerializer(fastJsonRedisSerializer); redisTemplate.setHashValueSerializer(fastJsonRedisSerializer); // 设置键(key)的序列化采用StringRedisSerializer。 redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setHashKeySerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; } }

抽象限流接口实现

/** * 抽象限流接口实现 * <li></li> */ @Slf4j public abstract class AbstractRateLimiter implements RateLimiter { /** * RedisScript */ protected RedisScript<Long> redisLuaScript; /** * RedisTemplate */ @Autowired protected RedisTemplate<String Serializable> redisTemplate; @PostConstruct public void initLUA() { redisLuaScript = new DefaultRedisScript<>(buildLuaScript() Long.class); } /** * 尝试获取 * <p>说明:</p> * <li></li> * * @param keys key * @param count 限制数量 * @return boolean 是否成功 */ @Override public boolean tryAcquire(ImmutableList<String> keys int count) { return tryAcquire(keys count RateLimiter.DEFAULT_PERIOD); } /** * 尝试获取 * <p>说明:</p> * <li></li> * * @param keys key * @param count 限制数量 * @param period 时间周期 * @return boolean 是否成功 */ @Override public boolean tryAcquire(ImmutableList<String> keys int count int period) { return tryAcquire(keys count period RateLimiter.DEFAULT_TIME_UNIT); } /** * 尝试获取 * <p>说明:</p> * <li></li> * * @param keys key * @param count 限制数量 * @param period 时间周期 * @param timeUnit 时间周期单位 * @return boolean 是否成功 */ @Override public boolean tryAcquire(ImmutableList<String> keys int count int period TimeUnit timeUnit) { return acquire(keys count period timeUnit); } /** * 构建lua脚本 * <li></li> * @return: java.lang.String */ protected abstract String buildLuaScript(); /** * 尝试获取 * <p>说明:</p> * <li></li> * @param keys key * @param limitCount 限制数量 * @param limitPeriod 时间周期 * @param timeUnit 时间周期单位 * @return boolean 是否成功 */ protected abstract boolean acquire(ImmutableList<String> keys int limitCount int limitPeriod TimeUnit timeUnit); /** * 加载lua脚本 * <li></li> * @param path: * @return: java.lang.String */ protected String loadLuaScript(String path){ try { PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(this.getClass().getClassLoader()); Resource[] resource = resolver.getResources(path); String luaScriptContent = StreamUtils.copyToString(resource[0].getInputStream() StandardCharsets.UTF_8); log.info("完成加载Lua脚本:{}" luaScriptContent); return luaScriptContent; } catch (IOException ioException) { ioException.printStackTrace(); throw new LimitException("加载Lua脚本异常" ioException); } } }

固定窗口算法实现类

** * 分布式限流 固定窗口算法实现类 * <p>说明:</p> * <li>固定窗口,一般来说,如非时间紧迫,不建议选择这个方案,太过生硬。但是,为了能快速止损眼前的问题可以作为临时应急的方案</li> */ @Slf4j public class FixedWindowRateLimiter extends AbstractRateLimiter { /** * 构建lua脚本 * <li></li> * @return: java.lang.String */ @Override protected String buildLuaScript(){ return loadLuaScript( "classpath:script/FixedWindow.lua"); } /** * 尝试获取 * <p>说明:</p> * <li></li> * * @param keys key * @param limitCount 限制数量 * @param limitPeriod 时间周期 * @param timeUnit 时间周期单位 * @return boolean 是否成功 */ @Override protected boolean acquire(ImmutableList<String> keys int limitCount int limitPeriod TimeUnit timeUnit){ Long count = redisTemplate.execute(redisLuaScript keys limitCount limitPeriod); log.info("[固定窗口限流交易请求] key:{} 返回:{} {}秒内 限制次数:{}" keys count limitPeriod limitCount); if (count != null && count.intValue() == 1) { return true; } else { return false; } } }

FixedWindow.lua

--限流KEY local key = KEYS[1] --限流大小 local limit = tonumber(ARGV[1]) --时间周期 local period = tonumber(ARGV[2]) local current = tonumber(redis.call('get' key) or "0") --如果超出限流大小 if current 1 > limit then return 0 else redis.call("incr" key) if current == 1 then redis.call("expire" key period) end return 1 end

  • 漏桶算法实现类

滑动窗口算法实现类

/** * 分布式限流 滑动窗口算法实现类 * <p>说明:</p> * <li>滑动窗口。这个方案适用于对异常结果「高容忍」的场景,毕竟相比“两窗”少了一个缓冲区。但是,胜在实现简单</li> */ @Slf4j public class SlidingWindowRateLimiter extends AbstractRateLimiter { /** * 构建lua脚本 * <li></li> * @return: java.lang.String */ @Override protected String buildLuaScript(){ return loadLuaScript( "classpath:script/SlidingWindow.lua"); } /** * 尝试获取 * <p>说明:</p> * <li></li> * * @param keys key * @param limitCount 限制数量 * @param limitPeriod 时间周期 * @param timeUnit 时间周期单位 * @return boolean 是否成功 */ @Override protected boolean acquire(ImmutableList<String> keys int limitCount int limitPeriod TimeUnit timeUnit){ Long count = redisTemplate.execute(redisLuaScript keys "1" limitCount limitPeriod); log.info("[滑动窗口限流交易请求] key:{} {}秒内 返回数量:{} {}秒内 限制次数:{}" keys limitPeriod count limitPeriod limitCount); if (count != null && count.intValue() > 0) { return true; } else { return false; } } }

SlidingWindow.lua

local function addToQueue(x time) local count=0 for i=1 x 1 do redis.call('lpush' KEYS[1] time) count=count 1 end return count end --返回 local result=0 --限流KEY local key = KEYS[1] --申请数 local applyCount = tonumber(ARGV[1]) --阀值数量 local limit = tonumber(ARGV[2]) --阀值时间 local period = tonumber(ARGV[3]) redis.replicate_commands() local now = redis.call('time')[1] redis.call('SET' 'now' now); --当前时间 local current_time = now local timeBase = redis.call('lindex' key limit - applyCount) if (timeBase == false) or (tonumber(current_time) - tonumber(timeBase) > period) then result = result addToQueue(applyCount tonumber(current_time)) end if (timeBase ~= false) then redis.call('ltrim' key 0 limit) end return result

令牌桶算法实现类

/** * 分布式限流 令牌桶算法实现类 * <p>说明:</p> * <li>令牌桶。当你需要尽可能的压榨程序的性能(此时桶的最大容量必然会大于等于程序的最大并发能力),并且所处的场景流量进入波动不是很大(不至于一瞬间取完令牌,压垮后端系统)</li> */ @Slf4j public class TokenBucketRateLimiter extends AbstractRateLimiter { /** * 构建lua脚本 * <li></li> */ @Override protected String buildLuaScript(){ return loadLuaScript( "classpath:script/TokenBucket.lua"); } /** * 尝试获取 * <p>说明:</p> * <li></li> * * @param keys key * @param limitCount 限制数量 * @param limitPeriod 时间周期 * @param timeUnit 时间周期单位 * @return boolean 是否成功 */ @Override protected boolean acquire(ImmutableList<String> keys int limitCount int limitPeriod TimeUnit timeUnit){ Long count = redisTemplate.execute(redisLuaScript keys limitCount limitPeriod); log.info("[令牌桶限流交易请求] key:{} 返回:{} {}秒内 限制次数:{}" keys count limitPeriod limitCount); if (count != null && count.intValue() == 1) { return true; } else { return false; } } }

TokenBucket.lua

--利用redis的hash结构,存储key所对应令牌桶的上次获取时间和上次获取后桶中令牌数量 local ratelimit_info = redis.pcall('HMGET' KEYS[1] 'last_time' 'current_token_num') local last_time = ratelimit_info[1] local current_token_num = tonumber(ratelimit_info[2]) redis.replicate_commands() local now = redis.call('time')[1] redis.call('SET' 'now' now); --tonumber是将value转换为数字,此步是取出桶中最大令牌数、生成令牌的速率(每秒生成多少个)、当前时间 local max_token_num = tonumber(ARGV[1]) local token_rate = tonumber(ARGV[2]) --local current_time = tonumber(ARGV[3]) local current_time = now --reverse_time 即多少毫秒生成一个令牌 local reverse_time = 1000/token_rate --如果current_token_num不存在则说明令牌桶首次获取或已过期,即说明它是满的 if current_token_num == nil then current_token_num = max_token_num last_time = current_time else --计算出距上次获取已过去多长时间 local past_time = current_time-last_time --在这一段时间内可产生多少令牌 local reverse_token_num = math.floor(past_time/reverse_time) current_token_num = current_token_num reverse_token_num last_time = reverse_time * reverse_token_num last_time if current_token_num > max_token_num then current_token_num = max_token_num end end local result = 0 if(current_token_num > 0) then result = 1 current_token_num = current_token_num - 1 end --将最新得出的令牌获取时间和当前令牌数量进行存储 并设置过期时间 redis.call('HMSET' KEYS[1] 'last_time' last_time 'current_token_num' current_token_num) redis.call('pexpire' KEYS[1] math.ceil(reverse_time*(max_token_num - current_token_num) (current_time-last_time))) return result4,支持接口添加注解方式限流

自定义限流注解

/** * 自定义限流注解 * <p>说明:</p> * <li></li> */ @Target({ElementType.METHOD ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public @interface Limit { /** * 名字 */ String name() default ""; /** * key */ String key() default ""; /** * Key的前缀 */ String prefix() default ""; /** * 给定的时间范围 单位(秒) */ int period(); /** * 一定时间内最多访问次数 */ int count(); /** * 限流的类型(用户自定义key 或者 请求ip) */ LimitType limitType() default LimitType.CUSTOMER; /** * 降级策略:默认快速失败 */ FallbackStrategy fallbackStrategy() default FallbackStrategy.FAIL_FAST; /** * 当降级策略为:回退 时回退处理接口实现名称 */ String fallbackImpl() default ""; }

限流注解处理器

/** * 限流注解处理器 * <li>基于aspectj</li> */ @Slf4j @Aspect @Component public class AnnotationLimitHandler extends AbstractLimitHandler<Object>{ @Around("@annotation(limit)") public Object around(ProceedingJoinPoint joinPoint Limit limit) throws Throwable { MethodSignature signature = (MethodSignature) joinPoint.getSignature(); // 获取被拦截的方法 Method method = signature.getMethod(); // 获取被拦截的类名 String className = signature.getDeclaringType().getName(); // 获取被拦截的方法名 String methodName = method.getName(); String defaultKey = StringUtils.join(className "." methodName); log.info("[AnnotationLimitHandler限流交易请求] 尝试获取方法:{} 执行权限" defaultKey); //组装限流规则 LimitRule limitRule = LimitRule.builder() .defaultKey(defaultKey) .count(limit.count()) .key(limit.key()) .limitType(limit.limitType()) .period(limit.period()) .prefix(limit.prefix()) .fallbackStrategy(limit.fallbackStrategy()) .fallbackImpl(limit.fallbackImpl()) .build(); return handle(limitRule () -> { try { return joinPoint.proceed(); } catch (Throwable throwable) { log.error("[AnnotationLimitHandler限流交易请求] 方法执行异常" throwable); } return null; }); } }5,支持配置文件方式限流

接口限流配置

/** * 接口限流配置 */ @ConfigurationProperties(prefix = LimitConfig.PREFIX) public class LimitConfig { /** 前缀 */ public static final String PREFIX = "limit"; /** 前缀 */ public static final String EXP = "expression"; /** limit是否可用 默认值*/ public static final String ENABLED = "enabled"; /** 默认实现 默认值*/ public static final String DEFAULT_IMPL= "default"; /** PointCut表达式 默认值*/ public static final String DEFAULT_EXP= ""; /** limit是否可用*/ private String enabled = ENABLED; /**实现*/ private String impl = DEFAULT_IMPL; /**PointCut表达式*/ private String expression = DEFAULT_EXP; /**全局限流规则配置*/ @NestedConfigurationProperty private LimitRule limitRule = new LimitRule().init(); /**特殊接口限流规则配置Map*/ private Map<String LimitRule> limitMethodMap = new HashMap<>(5); public String getEnabled() { return enabled; } public void setEnabled(String enabled) { this.enabled = enabled; } public String getImpl() { return impl; } public void setImpl(String impl) { this.impl = impl; } public String getExpression() { return expression; } public void setExpression(String expression) { this.expression = expression; } public LimitRule getLimitRule() { return limitRule; } public void setLimitRule(LimitRule limitRule) { this.limitRule = limitRule; } public Map<String LimitRule> getLimitMethodMap() { return limitMethodMap; } public void setLimitMethodMap(Map<String LimitRule> limitMethodMap) { this.limitMethodMap = limitMethodMap; } }

限流规则配置

/** * 限流规则配置 * <li>单个接口方法限流规则配置</li> */ @Data @Builder @AllArgsConstructor @NoArgsConstructor public class LimitRule { /** 给定的时间范围 单位(秒) 默认值*/ public static final int DEFAULT_PERIOD = 1; /** 一定时间内最多访问次数 默认值*/ public static final int DEFAULT_COUNT = 50; /** * defaultKey */ private String defaultKey = ""; /** * key */ private String key; /** * Key的前缀 */ private String prefix; /** * 给定的时间范围 单位(秒) */ private int period; /** * 一定时间内最多访问次数 */ private int count; /** * 限流类型 */ private LimitType limitType; /** * 降级策略:默认快速失败 */ private FallbackStrategy fallbackStrategy; /** * 当降级策略为:回退 时回退处理接口实现名称 */ private String fallbackImpl; /** * 初始化 * <li></li> * @return: void */ public LimitRule init(){ this.setPeriod(DEFAULT_PERIOD); this.setCount(DEFAULT_COUNT); this.setLimitType(LimitType.CUSTOMER); this.setFallbackStrategy(FallbackStrategy.FAIL_FAST); this.setFallbackImpl(LimitConfig.DEFAULT_IMPL); return this; } }

限流配置处理器

/** * 限流配置处理器 * <li>处理配置方式限流</li> */ @Slf4j public class ConfigLimitHandler extends AbstractLimitHandler<Object> implements MethodInterceptor { @Nullable @Override public Object invoke(@Nonnull MethodInvocation methodInvocation) throws Throwable { // 获取被拦截的类名 String className = methodInvocation.getClass().getName(); // 获取被拦截的方法名 String methodName = methodInvocation.getMethod().getName(); String defaultKey = StringUtils.join(className "." methodName); //获取配置文件中的限流规则 LimitRule limitRule = lockConfig.getLimitRule(); //如果需要特殊处理 if(lockConfig.getLimitMethodMap().containsKey(methodName)){ limitRule = lockConfig.getLimitMethodMap().get(methodName); } limitRule.setDefaultKey(defaultKey); log.info("[ConfigLimitHandler限流交易请求] 尝试获取方法:{} 执行权限" defaultKey); return handle(limitRule () -> { try { return methodInvocation.proceed(); } catch (Throwable throwable) { log.error("[ConfigLimitHandler限流交易请求] 方法执行异常" throwable); } return null; }); } }

限流切面Advisor

/** * 限流切面Advisor * <li></li> */ @Slf4j public class LimitPointcutAdvisor extends AbstractBeanFactoryPointcutAdvisor { @Autowired private LimitConfig lockConfig; @Override public Pointcut getPointcut() { AspectJExpressionPointcut adapterPointcut = new AspectJExpressionPointcut(); //从配置文件中获取PointCut表达式 adapterPointcut.setExpression(lockConfig.getExpression()); return adapterPointcut; } }6,支持限流回退

  • 服务降级回退处理接口默认实现

/** * 服务降级回退处理接口默认实现 * <li></li> */ public class DefaultFallback implements Fallback<Object> { /** * 服务降级处理 * <li></li> * @return: java.lang.Object 返回对象 */ @Override public Object getFallback() { return null; } }7,其他辅助类

  • 降级策略枚举类

/** * 降级策略 * <li></li> */ public enum FallbackStrategy { /** * 快速失败 */ FAIL_FAST /** * 回退 */ FALLBACK; }

  • 限流类型枚举类

/** * 限流类型 * <p>说明:</p> * <li></li> */ public enum LimitType { /** * 自定义key */ CUSTOMER /** * 请求者IP */ IP; }

  • 限流异常类

/** * 限流异常 * <li></li> */ public class LimitException extends RuntimeException{ /**错误码*/ protected String code; public LimitException() { } public LimitException(Throwable ex) { super(ex); } public LimitException(String message) { super(message); } public LimitException(String code String message) { super(message); this.code = code; } public LimitException(String message Throwable ex) { super(message ex); } }

  • 抽象限流处理器

/** * 抽象限流处理器 * <p>说明:</p> * <li></li> */ @Slf4j public abstract class AbstractLimitHandler<R> { @Autowired protected LimitConfig lockConfig; @Autowired private RateLimiter rateLimiter; /** * 限流处理 * <li></li> * @param limitRule: 限流规则 * @param function: 结果提供函数 * @return: R */ protected R handle(LimitRule limitRule Supplier<R> function){ //规则整理 handleLimitRule(limitRule); ImmutableList<String> keys = ImmutableList.of(StringUtils.join(limitRule.getPrefix() limitRule.getKey())); try { if (rateLimiter.tryAcquire(keys limitRule.getCount() limitRule.getPeriod())) { log.info("[限流交易请求] 尝试获取执行权限成功 开始执行目标方法"); return function.get(); }else{ log.info("[限流交易请求] 尝试获取执行权限失败 服务降级处理 降级策略:{}" limitRule.getFallbackStrategy()); if(FallbackStrategy.FAIL_FAST.equals(limitRule.getFallbackStrategy())){ throw new LimitException("访问过于频繁 超出访问限制"); } //获取回退处理接口实现:如果为空则使用默认实现 Fallback<R> fallback = ExtensionLoader.getExtensionLoader(Fallback.class).getExtension(limitRule.getFallbackImpl()); fallback = fallback != null ? fallback : ExtensionLoader.getExtensionLoader(Fallback.class).getDefaultExtension(); return fallback.getFallback(); } } catch (Throwable e) { log.error("[限流服务执行异常]" e); if (e instanceof LimitException) { throw e; } throw new LimitException("限流服务执行异常" e); } } /** * 限流规则整理 * <li></li> * @param limitRule: 限流规则 * @return: void */ private void handleLimitRule(LimitRule limitRule){ //限流规则整理:如果为空则使用全局配置 limitRule.setKey(StringUtils.isBlank(limitRule.getKey()) ? lockConfig.getLimitRule().getKey() : limitRule.getKey()); limitRule.setCount(limitRule.getCount() == 0 ? lockConfig.getLimitRule().getCount() : limitRule.getCount()); limitRule.setFallbackImpl(StringUtils.isBlank(limitRule.getFallbackImpl()) ? lockConfig.getLimitRule().getFallbackImpl() : limitRule.getFallbackImpl()); limitRule.setFallbackStrategy(limitRule.getFallbackStrategy() == null ? lockConfig.getLimitRule().getFallbackStrategy() : limitRule.getFallbackStrategy()); limitRule.setLimitType(limitRule.getLimitType() == null ? lockConfig.getLimitRule().getLimitType() : limitRule.getLimitType()); limitRule.setPeriod(limitRule.getPeriod() == 0 ? lockConfig.getLimitRule().getPeriod() : limitRule.getPeriod()); limitRule.setPrefix(StringUtils.isBlank(limitRule.getPrefix()) ? lockConfig.getLimitRule().getPrefix() : limitRule.getPrefix()); String key = limitRule.getKey(); //根据限流类型获取不同的key 如果不传我们会以方法名作为key switch (limitRule.getLimitType()) { case IP: key = WebUtil.getIpAddress(); break; case CUSTOMER: key = StringUtils.isBlank(key) ? StringUtils.upperCase(limitRule.getDefaultKey()) : key; break; default: key = StringUtils.upperCase(limitRule.getDefaultKey()); } limitRule.setKey(key); } }

  • 工具类

/** * 工具类 * <li></li> */ public class WebUtil { private static final String UNKNOWN = "unknown"; /** * 获取IP * <p>说明:</p> * <li></li> */ public static String getIpAddress() { HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest(); String ip = request.getHeader("x-forwarded-for"); if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) { ip = request.getHeader("Proxy-Client-IP"); } if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) { ip = request.getHeader("WL-Proxy-Client-IP"); } if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) { ip = request.getRemoteAddr(); } return ip; } }

  • 自动配置类

/** * 自动配置类 * <li></li> */ @Slf4j @Configuration @EnableConfigurationProperties(value = LimitConfig.class) @ConditionalOnClass(RateLimiter.class) @ConditionalOnProperty(prefix = LimitConfig.PREFIX value = LimitConfig.ENABLED matchIfMissing = true) public class LimitAutoConfiguration { @Autowired private LimitConfig lockConfig; @Bean @ConditionalOnMissingBean(RateLimiter.class) public RateLimiter createRateLimiter() { log.info("初始化分布式限流对象 实现类名称:{}" lockConfig.getImpl()); RateLimiterHolder.rateLimiter = ExtensionLoader.getExtensionLoader(RateLimiter.class).getExtension(lockConfig.getImpl()); log.info("初始化分布式限流对象成功 实现类:{}" RateLimiterHolder.rateLimiter); return RateLimiterHolder.rateLimiter; } @Bean @ConditionalOnProperty(prefix = LimitConfig.PREFIX value = LimitConfig.EXP) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public LimitPointcutAdvisor adapterServiceAdvisor() { LimitPointcutAdvisor advisor = new LimitPointcutAdvisor(); advisor.setAdviceBeanName("limitPointcutAdvisor"); advisor.setAdvice(createConfigLimitHandler()); advisor.setOrder(Ordered.HIGHEST_PRECEDENCE); return advisor; } @Bean public ConfigLimitHandler createConfigLimitHandler() { return new ConfigLimitHandler(); } @Bean @ConditionalOnMissingBean(LimitPointcutAdvisor.class) public AnnotationLimitHandler createAnnotationLimitHandler() { return new AnnotationLimitHandler(); } }

  • 限流接口对象持有者

/** * 限流接口对象持有者 * <p>说明:</p> * <li></li> */ public class RateLimiterHolder { /** 限流对象*/ static RateLimiter rateLimiter; public static Optional<RateLimiter> getRateLimiter() { return Optional.ofNullable(rateLimiter); } }8,资源文件

  • spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.javacoo.limit.client.internal.redis.LimitRedisConfig \ com.javacoo.limit.starter.LimitAutoConfiguration

com.javacoo.limit.client.api.Fallback

default=com.javacoo.limit.client.internal.fallback.DefaultFallback

com.javacoo.limit.client.api.RateLimiter

default=com.javacoo.limit.client.internal.redis.TokenBucketRateLimiter fixedWindow=com.javacoo.limit.client.internal.redis.FixedWindowRateLimiter slidingWindow=com.javacoo.limit.client.internal.redis.SlidingWindowRateLimiter问题及局限性问题

  • 紧经过小规模生产验证,虽满足业务需求,但是还不算成熟,仍需更多测试验证。
  • 仅供学习参考,如需用于生产,请谨慎,并多测试。
  • 推荐使用Spring Clound Gateway,Sentinel等专业流量防护组件 局限性
  • 限流组件保证了高可用,牺牲了性能,增加了一层 IO 环节的开销,单机限流在本地,分布式限流还要通过网络协议。
  • 限流组件保证了高可用,牺牲了一致性,在大流量的情况下,请求的处理会出现延迟的情况,这种场景便无法保证强一致性。特殊情况下,还无法保证最终一致性,部分请求直接被抛弃。
  • 限流组件拥有流控权,若限流组件挂了,会引起雪崩效应,导致请求与业务的大批量失败。
  • 引入限流组件,增加系统的复杂程度,开发难度增加,限流中间件的设计本身就是一个复杂的体系,需要综合业务与技术去思考与权衡,同时还要确保限流组件本身的高可用与性能,极大增加工作量,甚至需要一个团队去专门开发。

猜您喜欢: