redis实现限流的核心思路是利用redis提供的key过期时间作为限流窗口期,key的值记录该窗口期内已经产生的访问资源次数,key本身记录限流的资源范围。
具体步骤如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
spring:
  redis:
    host: xxx
    port: 6379
    password: xxx
    lettuce:
      #只有引入连接池依赖(commons-pool2)后,连接池配置才会生效
      pool:
        max-active: 8 #最大连接
        max-idle: 8 #最大空闲连接
        min-idle: 0 #最小空闲连接
        max-wait: 100 #连接等待时间
/**
 * Builds a {@code RedisTemplate<String, Object>} on top of the supplied connection factory.
 * Keys and hash keys are written with the String serializer; values and hash values
 * are written with a generic Jackson JSON serializer.
 *
 * @param redisConnectionFactory connection factory the template is bound to
 * @return the configured template
 */
@ConditionalOnMissingBean
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory)
{
    // Prepare the two serializers up front so each is shared by its plain and hash variant.
    GenericJackson2JsonRedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer();

    RedisTemplate<String, Object> template = new RedisTemplate<>();
    template.setConnectionFactory(redisConnectionFactory);

    // String serialization for key and hashKey
    template.setKeySerializer(RedisSerializer.string());
    template.setHashKeySerializer(RedisSerializer.string());

    // JSON serialization for value and hashValue
    template.setValueSerializer(valueSerializer);
    template.setHashValueSerializer(valueSerializer);
    return template;
}
/**
 * Flow-control (rate limiting) contract.
 * @author 大忽悠
 * @create 2023/2/6 10:50
 */
public interface RateLimiter {
/**
 * Decides whether the described request may proceed.
 * @param requestInfo information about the current request
 * @return whether the current request is allowed to pass
 */
boolean pass(RequestInformation requestInfo);
}
/**
 * Mutable holder for everything the rate limiter needs to know about one request.
 * Accessors are generated by Lombok's {@code @Data}.
 * @author 大忽悠
 * @create 2023/2/6 10:55
 */
@Data
public class RequestInformation {
/**
 * Rate-limit key
 */
private String key;
/**
 * Rate-limit window length
 */
private int time;
/**
 * Maximum number of resource requests within the time window
 */
private int count;
/**
 * Rate-limit type
 */
private int limitType;
/**
 * The invoked method
 */
private Method method;
/**
 * The arguments of the invoked method
 */
private Object[] arguments;
/**
 * Client IP address
 */
private String ip;
/**
 * Current servlet request; may be null when no servlet request is available
 */
private HttpServletRequest httpServletRequest;
/**
 * Current servlet response; may be null when no servlet request is available
 */
private HttpServletResponse httpServletResponse;
/**
 * Creates an empty instance; fields are filled in through the setters.
 */
public RequestInformation() {
}
}
/**
 * Rate-limit annotation; may be placed on a method or on a type.
 * @author 大忽悠
 * @create 2023/2/6 10:39
 */
@Target({ElementType.METHOD,ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Limiter {
/**
 * @return custom rate-limit key segment; empty by default.
 * NOTE(review): the earlier comment described the final key as
 * rate_limit:business:class.method (with an ip segment for the IP type), whereas
 * RedisRateLimiterImpl in this file uses the prefix "rate_limiter" and joins the
 * segments with ':' -- confirm against the limiter implementation actually in use.
 */
String key() default "";
/**
 * @return length of the rate-limit window, in seconds
 */
int time() default 60;
/**
 * @return intended number of resource requests permitted within the window
 */
int count() default 100;
/**
 * @return rate-limit type (one of the LimitType constants)
 */
int limitType() default LimitType.DEFAULT;
}
/**
 * Redis-backed {@link RateLimiter}. A Redis key represents one limited resource, the
 * key's value counts the requests already admitted and the key's expiry is the window.
 * The check-and-increment runs as a single Lua script on the Redis server.
 */
public class RedisRateLimiterImpl implements RateLimiter{
    private static final String RATE_LIMITER_KEY_PREFIX="rate_limiter";
    /**
     * Lua script used for the rate-limit decision.
     * KEYS[1] = limiter key, ARGV[1] = max request count, ARGV[2] = window in seconds.
     * Returns 1 when the request must be rejected and 0 when it is admitted.
     * The comparison is {@code >=} so that exactly {@code count} requests are admitted
     * per window (with {@code >} one extra request slipped through).
     */
    private static final String LIMITER_LUA=
            "local key = KEYS[1]\n" +
            "local count = tonumber(ARGV[1])\n" +
            "local time = tonumber(ARGV[2])\n" +
            "local current = redis.call('get', key)\n" +
            "if current and tonumber(current) >= count then\n" +
            " return 1\n" +
            "end\n" +
            "current = redis.call('incr', key)\n" +
            "if tonumber(current) == 1 then\n" +
            " redis.call('expire', key, time)\n" +
            "end\n" +
            "return 0\n";
    /**
     * Script object built once and reused, instead of being re-created on every call.
     */
    private static final RedisScript<Long> LIMITER_SCRIPT = RedisScript.of(LIMITER_LUA, Long.class);

    private final RedisTemplate<String, Object> redisTemplate;

    /**
     * @param redisTemplate template used to execute the limiter script
     */
    public RedisRateLimiterImpl(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * @param requestInfo request information
     * @return whether the current request is allowed to pass; {@code false} as well when
     *         the script produced no result
     */
    @Override
    public boolean pass(RequestInformation requestInfo) {
        String limiterKey = getRateLimiterKey(requestInfo);
        Long limiterRes = redisTemplate.execute(LIMITER_SCRIPT, List.of(limiterKey), requestInfo.getCount(), requestInfo.getTime());
        // Null-safe comparison: unboxing a null result would throw a NullPointerException.
        return limiterRes != null && limiterRes == 0L;
    }

    /**
     * Builds the Redis key: prefix, custom key, client IP, declaring class name and method
     * name, joined with ':' (empty segments are skipped).
     */
    private String getRateLimiterKey(RequestInformation requestInfo) {
        // getDeclaringClass(), not getClass(): the latter is always java.lang.reflect.Method,
        // which made same-named methods of different classes share one counter.
        return combineKey(RATE_LIMITER_KEY_PREFIX,
                requestInfo.getKey(),
                requestInfo.getIp(),
                requestInfo.getMethod().getDeclaringClass().getName(),
                requestInfo.getMethod().getName());
    }

    /**
     * Joins the non-empty segments with ':' without leaving a leading or trailing separator.
     */
    private String combineKey(String ... keys) {
        StringBuilder keyBuilder = new StringBuilder();
        for (String segment : keys) {
            if (StringUtils.isEmpty(segment)) {
                continue;
            }
            if (keyBuilder.length() > 0) {
                keyBuilder.append(":");
            }
            keyBuilder.append(segment);
        }
        return keyBuilder.toString();
    }
}
lua脚本解释:
KEYS 和 ARGV 都是调用脚本时传进来的参数,tonumber 用于把字符串转为数字,redis.call 用于执行具体的 redis 指令,具体流程是这样:
注意:lua脚本也可以定义在文件中,然后通过加载文件获取
/**
 * Exposes the rate-limit Lua script stored on the classpath at {@code lua/limit.lua}
 * as a script bean whose result type is {@code Long}.
 *
 * @return the script bean
 */
@Bean
public DefaultRedisScript<Long> limitScript() {
    ClassPathResource luaFile = new ClassPathResource("lua/limit.lua");
    DefaultRedisScript<Long> script = new DefaultRedisScript<>();
    script.setScriptSource(new ResourceScriptSource(luaFile));
    script.setResultType(Long.class);
    return script;
}
或者在 Redis 服务端定义好 Lua 脚本,然后计算出来一个散列值,在 Java 代码中,通过这个散列值锁定要执行哪个 Lua 脚本
我们需要将限流逻辑在需要流量管控的方法执行前先执行,因此需要拦截目标方法,有两个思路:
这里我采用的是手动编写advisor的方式进行实现,下面演示具体步骤:
/**
* 限流方法拦截器
* @author 大忽悠
* @create 2023/2/6 11:08
*/
@Slf4j
public class RateLimiterMethodInterceptor implements MethodInterceptor {
private final RateLimiter rateLimiter;
public RateLimiterMethodInterceptor(RateLimiter rateLimiter) {
this.rateLimiter=rateLimiter;
}
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
try{
RequestInformation requestInformation = new RequestInformation();
buildMethodInfo(requestInformation,invocation);
buildLimitInfo(requestInformation);
buildRequestInfo(requestInformation);
if (rateLimiter.pass(requestInformation)){
return invocation.proceed();
}
logWarn(requestInformation);
}catch (Exception e){
e.printStackTrace();
throw e;
}
throw new RateLimiterException("访问过于频繁,请稍后再试!");
}
private void logWarn(RequestInformation requestInformation) {
if(requestInformation.getHttpServletRequest()!=null){
log.warn("rateLimiter拦截了一个请求,该请求信息如下: URI: {} ,IP: {} ,方法名: {} ,方法参数信息: {} ",
requestInformation.getHttpServletRequest().getRequestURI(),requestInformation.getIp(),requestInformation.getMethod().getName(),
Arrays.toString(requestInformation.getArguments()));
}else {
log.warn("rateLimiter拦截了一个请求,该请求信息如下: 方法名: {} ,方法参数信息: {} ",
requestInformation.getMethod().getName(), Arrays.toString(requestInformation.getArguments()));
}
}
private void buildLimitInfo(RequestInformation requestInformation) throws RateLimiterException {
Method method = requestInformation.getMethod();
Limiter limiter;
if(method.isAnnotationPresent(Limiter.class)){
limiter = method.getAnnotation(Limiter.class);
}else {
limiter=method.getClass().getAnnotation(Limiter.class);
}
if(limiter==null){
throw new RateLimiterException("无法在当前方法"+method.getName()+"或者类"+method.getClass().getName()+"上寻找到@Limiter注解");
}
requestInformation.setKey(limiter.key());
requestInformation.setCount(limiter.count());
requestInformation.setTime(limiter.time());
requestInformation.setLimitType(limiter.limitType());
}
private void buildMethodInfo(RequestInformation requestInformation, MethodInvocation invocation) {
requestInformation.setMethod(invocation.getMethod());
if(invocation instanceof ReflectiveMethodInvocation){
ReflectiveMethodInvocation reflectiveMethodInvocation = (ReflectiveMethodInvocation) invocation;
requestInformation.setArguments(reflectiveMethodInvocation.getArguments());
}
}
/**
* 从线程上下文中取出请求和响应相关信息
*/
private void buildRequestInfo(RequestInformation requestInformation) {
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
if(requestAttributes instanceof ServletRequestAttributes){
ServletRequestAttributes sra = (ServletRequestAttributes) requestAttributes;
requestInformation.setHttpServletRequest(sra.getRequest());
requestInformation.setHttpServletResponse(sra.getResponse());
}
if(requestInformation.getHttpServletRequest()!=null && requestInformation.getLimitType()==LimitType.IP){
requestInformation.setIp(IPUtils.getIpAddress(requestInformation.getHttpServletRequest()));
}
}
}
/**
 * Advisor that pairs the {@code @Limiter} pointcut with the rate-limiting interceptor.
 *
 * @author 大忽悠
 * @create 2023/2/6 10:57
 */
public class RateLimiterAdvisor extends AbstractPointcutAdvisor {
    private Pointcut pointcut;
    private RateLimiterMethodInterceptor rateLimiterMethodInterceptor;

    /**
     * @param rateLimiter limiter handed to the interceptor this advisor applies
     */
    public RateLimiterAdvisor(RateLimiter rateLimiter) {
        this.pointcut = buildPointCut();
        this.rateLimiterMethodInterceptor = new RateLimiterMethodInterceptor(rateLimiter);
    }

    @Override
    public Pointcut getPointcut() {
        return this.pointcut;
    }

    @Override
    public Advice getAdvice() {
        return this.rateLimiterMethodInterceptor;
    }

    /**
     * Creates a pointcut that accepts candidate classes for {@code @Limiter} and matches
     * methods where the method or the target class carries the annotation.
     */
    private Pointcut buildPointCut() {
        return new Pointcut() {
            @Override
            public ClassFilter getClassFilter() {
                return (candidate) -> AnnotationUtils.isCandidateClass(candidate, Limiter.class);
            }

            @Override
            public MethodMatcher getMethodMatcher() {
                return new StaticMethodMatcher() {
                    @Override
                    public boolean matches(Method method, Class<?> targetClass) {
                        // A method-level annotation wins; otherwise fall back to the class.
                        if (method.isAnnotationPresent(Limiter.class)) {
                            return true;
                        }
                        return targetClass.isAnnotationPresent(Limiter.class);
                    }
                };
            }
        };
    }
}
/**
 * Configuration class registering the rate-limiter beans. Each bean is only created when
 * no bean of that kind has been defined already (see {@code @ConditionalOnMissingBean}).
 * The listing is abbreviated here; the trailing "..." stands for content omitted by the author.
 * @author 大忽悠
 * @create 2023/2/6 11:14
 */
@Configuration
public class RateLimiterAutoConfiguration {
/**
 * @param rateLimiter limiter passed on to the advisor
 * @return the advisor that applies rate limiting
 */
@Bean
@ConditionalOnMissingBean
public RateLimiterAdvisor rateLimiterAdvisor(RateLimiter rateLimiter) {
return new RateLimiterAdvisor(rateLimiter);
}
/**
 * @param redisTemplate template used by the Redis-based limiter
 * @return the Redis-based limiter implementation
 */
@Bean
@ConditionalOnMissingBean
public RateLimiter rateLimiter(RedisTemplate<String, Object> redisTemplate) {
return new RedisRateLimiterImpl(redisTemplate);
}
...
}
采用切面进行实现,可以参考江南一点雨大佬给出的实现:
/**
 * Aspect-based variant of the rate limiter: runs the limit script before every method
 * annotated with the rate-limit annotation.
 * NOTE(review): in this snippet {@code RateLimiter} is used as an annotation type
 * (key()/time()/count()/limitType()), unlike the RateLimiter interface shown elsewhere in
 * this document -- the two come from different code bases.
 */
@Aspect
@Component
public class RateLimiterAspect {
    private static final Logger log = LoggerFactory.getLogger(RateLimiterAspect.class);

    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;

    @Autowired
    private RedisScript<Long> limitScript;

    /**
     * Executes the limit script for the annotated method and rejects the call when the
     * script yields no result or a number greater than the configured count.
     *
     * @param point       the intercepted join point
     * @param rateLimiter the annotation instance found on the method
     * @throws ServiceException when the request is rejected
     * @throws RuntimeException when executing the script fails; the original failure is
     *                          attached as the cause
     */
    @Before("@annotation(rateLimiter)")
    public void doBefore(JoinPoint point, RateLimiter rateLimiter) throws Throwable {
        int time = rateLimiter.time();
        int count = rateLimiter.count();
        String combineKey = getCombineKey(rateLimiter, point);
        List<Object> keys = Collections.singletonList(combineKey);
        try {
            Long number = redisTemplate.execute(limitScript, keys, count, time);
            if (number==null || number.intValue() > count) {
                throw new ServiceException("访问过于频繁,请稍候再试");
            }
            // Log the key that was actually sent to Redis, not just the annotation's key().
            log.info("限制请求'{}',当前请求'{}',缓存key'{}'", count, number.intValue(), combineKey);
        } catch (ServiceException e) {
            throw e;
        } catch (Exception e) {
            // Keep the root cause so the underlying Redis/script failure stays diagnosable.
            throw new RuntimeException("服务器限流异常,请稍候再试", e);
        }
    }

    /**
     * Builds the cache key: annotation key, then (for the IP limit type) the client IP
     * followed by "-", then declaring class name, "-" and method name.
     *
     * @param rateLimiter the annotation instance
     * @param point       the intercepted join point
     * @return the combined key
     */
    public String getCombineKey(RateLimiter rateLimiter, JoinPoint point) {
        // Local, single-threaded use: StringBuilder instead of the synchronized StringBuffer.
        StringBuilder keyBuilder = new StringBuilder(rateLimiter.key());
        if (rateLimiter.limitType() == LimitType.IP) {
            keyBuilder.append(IpUtils.getIpAddr(((ServletRequestAttributes) RequestContextHolder.currentRequestAttributes()).getRequest())).append("-");
        }
        MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();
        Class<?> targetClass = method.getDeclaringClass();
        keyBuilder.append(targetClass.getName()).append("-").append(method.getName());
        return keyBuilder.toString();
    }
}
/**
 * Handles {@link UndeclaredThrowableException} and reports the message of the wrapped
 * exception (for example a RateLimiterException) to the client.
 * NOTE(review): {@code @ResponseStatus()} without arguments is kept as written; confirm
 * which HTTP status is intended for rejected requests.
 *
 * @param e the wrapper exception
 * @return an error result carrying the message of the wrapped exception, or of the
 *         wrapper itself when it has no cause
 */
@ResponseStatus()
@ExceptionHandler(UndeclaredThrowableException.class)
public Result exception(UndeclaredThrowableException e) {
    log.error("错误类型为RateLimiterException : "+e);
    // The wrapper may carry no cause; fall back to the wrapper instead of dereferencing null.
    Throwable cause = e.getCause() != null ? e.getCause() : e;
    // The result is the same whether or not the cause is a RateLimiterException,
    // so no type check is needed.
    String message = cause.getMessage();
    return Result.error(message, message);
}
/**
 * Demo controller showing {@code @Limiter} on an endpoint.
 */
@RestController
@RequestMapping("order")
public class OrderController {
    /**
     * Demo endpoint limited per IP with time = 20 and count = 5. It always returns the
     * same fixed order; the path variable is not used to look anything up.
     *
     * @param orderId order id taken from the path
     * @return a fixed sample order
     */
    @Limiter(time = 20,count = 5,limitType = LimitType.IP)
    @GetMapping("/{orderId}")
    public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) {
        Order sampleOrder = Order.builder()
                .id(1L)
                .name("大忽悠")
                .userId(3L)
                .price(10L)
                .build();
        return sampleOrder;
    }
}
每一个 IP 地址,在 20 秒内只能访问5次,大家可以手动测试。
/**
 * Rate-limit annotation; may be placed on a method or on a type.
 * @author 大忽悠
 * @create 2023/2/6 10:39
 */
@Target({ElementType.METHOD,ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Limiter {
/**
 * @return custom rate-limit key segment; empty by default.
 * NOTE(review): the earlier comment described the final key as
 * rate_limit:business:class.method (with an ip segment for the IP type), whereas
 * RedisRateLimiterImpl in this file uses the prefix "rate_limiter" and joins the
 * segments with ':' -- confirm against the limiter implementation actually in use.
 */
String key() default "";
/**
 * @return length of the rate-limit window, in seconds
 */
int time() default 60;
/**
 * @return intended number of resource requests permitted within the window
 */
int count() default 100;
/**
 * @return rate-limit type (one of the LimitType constants)
 */
int limitType() default LimitType.DEFAULT;
}
/**
 * Rate-limit type constants, used as the value of {@code Limiter#limitType()}.
 * @author 大忽悠
 * @create 2023/2/6 10:43
 */
public interface LimitType {
/**
 * Default rate-limit type
 */
int DEFAULT=0;
/**
 * Limit per client IP
 */
int IP=1;
}
/**
 * Flow-control (rate limiting) contract.
 * @author 大忽悠
 * @create 2023/2/6 10:50
 */
public interface RateLimiter {
/**
 * Decides whether the described request may proceed.
 * @param requestInfo information about the current request
 * @return whether the current request is allowed to pass
 */
boolean pass(RequestInformation requestInfo);
}
/**
 * Advisor that pairs the {@code @Limiter} pointcut with the rate-limiting interceptor.
 *
 * @author 大忽悠
 * @create 2023/2/6 10:57
 */
public class RateLimiterAdvisor extends AbstractPointcutAdvisor {
    private Pointcut pointcut;
    private RateLimiterMethodInterceptor rateLimiterMethodInterceptor;

    /**
     * @param rateLimiter limiter handed to the interceptor this advisor applies
     */
    public RateLimiterAdvisor(RateLimiter rateLimiter) {
        this.pointcut = buildPointCut();
        this.rateLimiterMethodInterceptor = new RateLimiterMethodInterceptor(rateLimiter);
    }

    @Override
    public Pointcut getPointcut() {
        return this.pointcut;
    }

    @Override
    public Advice getAdvice() {
        return this.rateLimiterMethodInterceptor;
    }

    /**
     * Creates a pointcut that accepts candidate classes for {@code @Limiter} and matches
     * methods where the method or the target class carries the annotation.
     */
    private Pointcut buildPointCut() {
        return new Pointcut() {
            @Override
            public ClassFilter getClassFilter() {
                return (candidate) -> AnnotationUtils.isCandidateClass(candidate, Limiter.class);
            }

            @Override
            public MethodMatcher getMethodMatcher() {
                return new StaticMethodMatcher() {
                    @Override
                    public boolean matches(Method method, Class<?> targetClass) {
                        // A method-level annotation wins; otherwise fall back to the class.
                        if (method.isAnnotationPresent(Limiter.class)) {
                            return true;
                        }
                        return targetClass.isAnnotationPresent(Limiter.class);
                    }
                };
            }
        };
    }
}
/**
 * Configuration class registering the rate-limiter beans: the advisor, the Redis-based
 * limiter and a RedisTemplate. Each bean is declared with {@code @ConditionalOnMissingBean}.
 * @author 大忽悠
 * @create 2023/2/6 11:14
 */
@Configuration
public class RateLimiterAutoConfiguration {
    /**
     * @param rateLimiter limiter passed on to the advisor
     * @return the advisor that applies rate limiting
     */
    @Bean
    @ConditionalOnMissingBean
    public RateLimiterAdvisor rateLimiterAdvisor(RateLimiter rateLimiter) {
        RateLimiterAdvisor advisor = new RateLimiterAdvisor(rateLimiter);
        return advisor;
    }

    /**
     * @param redisTemplate template used by the Redis-based limiter
     * @return the Redis-based limiter implementation
     */
    @Bean
    @ConditionalOnMissingBean
    public RateLimiter rateLimiter(RedisTemplate<String, Object> redisTemplate) {
        RateLimiter limiter = new RedisRateLimiterImpl(redisTemplate);
        return limiter;
    }

    /**
     * Builds a template whose keys and hash keys use the String serializer and whose values
     * and hash values use a generic Jackson JSON serializer.
     *
     * @param redisConnectionFactory connection factory the template is bound to
     * @return the configured template
     */
    @ConditionalOnMissingBean
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory)
    {
        GenericJackson2JsonRedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer();

        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);

        // String serialization for key and hashKey
        template.setKeySerializer(RedisSerializer.string());
        template.setHashKeySerializer(RedisSerializer.string());

        // JSON serialization for value and hashValue
        template.setValueSerializer(valueSerializer);
        template.setHashValueSerializer(valueSerializer);
        return template;
    }
}
/**
 * Rate-limit exception. This is a checked exception (it extends {@link Exception}).
 * NOTE(review): presumably this is why the handler in this document catches
 * UndeclaredThrowableException and unwraps its cause -- confirm before changing the
 * superclass.
 * @author 大忽悠
 * @create 2023/2/6 11:39
 */
public class RateLimiterException extends Exception {
/**
 * @param ex the detail message
 */
public RateLimiterException(String ex) {
super(ex);
}
}
/**
* 限流方法拦截器
* @author 大忽悠
* @create 2023/2/6 11:08
*/
@Slf4j
public class RateLimiterMethodInterceptor implements MethodInterceptor {
private final RateLimiter rateLimiter;
public RateLimiterMethodInterceptor(RateLimiter rateLimiter) {
this.rateLimiter=rateLimiter;
}
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
try{
RequestInformation requestInformation = new RequestInformation();
buildMethodInfo(requestInformation,invocation);
buildLimitInfo(requestInformation);
buildRequestInfo(requestInformation);
if (rateLimiter.pass(requestInformation)){
return invocation.proceed();
}
logWarn(requestInformation);
}catch (Exception e){
e.printStackTrace();
throw e;
}
throw new RateLimiterException("访问过于频繁,请稍后再试!");
}
private void logWarn(RequestInformation requestInformation) {
if(requestInformation.getHttpServletRequest()!=null){
log.warn("rateLimiter拦截了一个请求,该请求信息如下: URI: {} ,IP: {} ,方法名: {} ,方法参数信息: {} ",
requestInformation.getHttpServletRequest().getRequestURI(),requestInformation.getIp(),requestInformation.getMethod().getName(),
Arrays.toString(requestInformation.getArguments()));
}else {
log.warn("rateLimiter拦截了一个请求,该请求信息如下: 方法名: {} ,方法参数信息: {} ",
requestInformation.getMethod().getName(), Arrays.toString(requestInformation.getArguments()));
}
}
private void buildLimitInfo(RequestInformation requestInformation) throws RateLimiterException {
Method method = requestInformation.getMethod();
Limiter limiter;
if(method.isAnnotationPresent(Limiter.class)){
limiter = method.getAnnotation(Limiter.class);
}else {
limiter=method.getClass().getAnnotation(Limiter.class);
}
if(limiter==null){
throw new RateLimiterException("无法在当前方法"+method.getName()+"或者类"+method.getClass().getName()+"上寻找到@Limiter注解");
}
requestInformation.setKey(limiter.key());
requestInformation.setCount(limiter.count());
requestInformation.setTime(limiter.time());
requestInformation.setLimitType(limiter.limitType());
}
private void buildMethodInfo(RequestInformation requestInformation, MethodInvocation invocation) {
requestInformation.setMethod(invocation.getMethod());
if(invocation instanceof ReflectiveMethodInvocation){
ReflectiveMethodInvocation reflectiveMethodInvocation = (ReflectiveMethodInvocation) invocation;
requestInformation.setArguments(reflectiveMethodInvocation.getArguments());
}
}
/**
* 从线程上下文中取出请求和响应相关信息
*/
private void buildRequestInfo(RequestInformation requestInformation) {
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
if(requestAttributes instanceof ServletRequestAttributes){
ServletRequestAttributes sra = (ServletRequestAttributes) requestAttributes;
requestInformation.setHttpServletRequest(sra.getRequest());
requestInformation.setHttpServletResponse(sra.getResponse());
}
if(requestInformation.getHttpServletRequest()!=null && requestInformation.getLimitType()==LimitType.IP){
requestInformation.setIp(IPUtils.getIpAddress(requestInformation.getHttpServletRequest()));
}
}
}
/**
 * Redis-backed {@link RateLimiter}. A Redis key represents one limited resource, the
 * key's value counts the requests already admitted and the key's expiry is the window.
 * The check-and-increment runs as a single Lua script on the Redis server.
 */
public class RedisRateLimiterImpl implements RateLimiter{
    private static final String RATE_LIMITER_KEY_PREFIX="rate_limiter";
    /**
     * Lua script used for the rate-limit decision.
     * KEYS[1] = limiter key, ARGV[1] = max request count, ARGV[2] = window in seconds.
     * Returns 1 when the request must be rejected and 0 when it is admitted.
     * The comparison is {@code >=} so that exactly {@code count} requests are admitted
     * per window (with {@code >} one extra request slipped through).
     */
    private static final String LIMITER_LUA=
            "local key = KEYS[1]\n" +
            "local count = tonumber(ARGV[1])\n" +
            "local time = tonumber(ARGV[2])\n" +
            "local current = redis.call('get', key)\n" +
            "if current and tonumber(current) >= count then\n" +
            " return 1\n" +
            "end\n" +
            "current = redis.call('incr', key)\n" +
            "if tonumber(current) == 1 then\n" +
            " redis.call('expire', key, time)\n" +
            "end\n" +
            "return 0\n";
    /**
     * Script object built once and reused, instead of being re-created on every call.
     */
    private static final RedisScript<Long> LIMITER_SCRIPT = RedisScript.of(LIMITER_LUA, Long.class);

    private final RedisTemplate<String, Object> redisTemplate;

    /**
     * @param redisTemplate template used to execute the limiter script
     */
    public RedisRateLimiterImpl(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * @param requestInfo request information
     * @return whether the current request is allowed to pass; {@code false} as well when
     *         the script produced no result
     */
    @Override
    public boolean pass(RequestInformation requestInfo) {
        String limiterKey = getRateLimiterKey(requestInfo);
        Long limiterRes = redisTemplate.execute(LIMITER_SCRIPT, List.of(limiterKey), requestInfo.getCount(), requestInfo.getTime());
        // Null-safe comparison: unboxing a null result would throw a NullPointerException.
        return limiterRes != null && limiterRes == 0L;
    }

    /**
     * Builds the Redis key: prefix, custom key, client IP, declaring class name and method
     * name, joined with ':' (empty segments are skipped).
     */
    private String getRateLimiterKey(RequestInformation requestInfo) {
        // getDeclaringClass(), not getClass(): the latter is always java.lang.reflect.Method,
        // which made same-named methods of different classes share one counter.
        return combineKey(RATE_LIMITER_KEY_PREFIX,
                requestInfo.getKey(),
                requestInfo.getIp(),
                requestInfo.getMethod().getDeclaringClass().getName(),
                requestInfo.getMethod().getName());
    }

    /**
     * Joins the non-empty segments with ':' without leaving a leading or trailing separator.
     */
    private String combineKey(String ... keys) {
        StringBuilder keyBuilder = new StringBuilder();
        for (String segment : keys) {
            if (StringUtils.isEmpty(segment)) {
                continue;
            }
            if (keyBuilder.length() > 0) {
                keyBuilder.append(":");
            }
            keyBuilder.append(segment);
        }
        return keyBuilder.toString();
    }
}
/**
 * Mutable holder for everything the rate limiter needs to know about one request.
 * Accessors are generated by Lombok's {@code @Data}.
 * @author 大忽悠
 * @create 2023/2/6 10:55
 */
@Data
public class RequestInformation {
/**
 * Rate-limit key
 */
private String key;
/**
 * Rate-limit window length
 */
private int time;
/**
 * Maximum number of resource requests within the time window
 */
private int count;
/**
 * Rate-limit type
 */
private int limitType;
/**
 * The invoked method
 */
private Method method;
/**
 * The arguments of the invoked method
 */
private Object[] arguments;
/**
 * Client IP address
 */
private String ip;
/**
 * Current servlet request; may be null when no servlet request is available
 */
private HttpServletRequest httpServletRequest;
/**
 * Current servlet response; may be null when no servlet request is available
 */
private HttpServletResponse httpServletResponse;
/**
 * Creates an empty instance; fields are filled in through the setters.
 */
public RequestInformation() {
}
}
/**
 * Helper for resolving the client IP address of a servlet request.
 * @author 大忽悠
 * @create 2023/2/6 12:51
 */
public class IPUtils {
    private static final String UNKNOWN = "unknown";

    /** Proxy headers consulted in order before falling back to the remote address. */
    private static final String[] IP_HEADERS = {
            "x-forwarded-for",
            "Proxy-Client-IP",
            "WL-Proxy-Client-IP",
            "HTTP_CLIENT_IP",
            "HTTP_X_FORWARDED_FOR"
    };

    /**
     * Resolves the client IP address. Proxy headers are consulted first because
     * {@code request.getRemoteAddr()} returns the proxy's address when the request passed
     * through a proxy.
     *
     * <p>After several reverse proxies a header such as X-Forwarded-For holds a list, e.g.
     * {@code 192.168.1.110, 192.168.1.120, 192.168.1.130}; the first entry that is not
     * blank and not "unknown" is used (192.168.1.110 in the example). The whole header
     * value is no longer returned as-is.
     *
     * <p>NOTE(review): these headers are supplied by the client and can be forged, so a
     * per-IP limit based on them can be evaded unless a trusted proxy overwrites them.
     *
     * @param request the current servlet request
     * @return the first usable address from the proxy headers, otherwise the remote address
     */
    public static String getIpAddress(HttpServletRequest request) {
        for (String header : IP_HEADERS) {
            String ip = firstValidIp(request.getHeader(header));
            if (ip != null) {
                return ip;
            }
        }
        return request.getRemoteAddr();
    }

    /**
     * Returns the first comma-separated entry that is neither blank nor "unknown",
     * or null when the header is absent or holds no usable entry.
     */
    private static String firstValidIp(String headerValue) {
        if (headerValue == null) {
            return null;
        }
        for (String candidate : headerValue.split(",")) {
            String trimmed = candidate.trim();
            if (!trimmed.isEmpty() && !UNKNOWN.equalsIgnoreCase(trimmed)) {
                return trimmed;
            }
        }
        return null;
    }
}