基于Redis实现分布式应用限流的方法

 更新时间:2017年12月15日 10:30:49   作者:冷冷gg  
本篇文章主要介绍了基于 Redis 实现分布式应用限流的方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务。

前几天在DD的公众号,看了一篇关于使用 瓜娃 实现单应用限流的方案 --》原文,参考《redis in action》 实现了一个jedis版本的,都属于业务层次限制。 实际场景中常用的限流策略:

Nginx接入层限流

按照一定的规则如帐号、IP、系统调用逻辑等在Nginx层面做限流

业务应用系统限流

通过业务代码控制流量这个流量可以被称为信号量,可以理解成是一种锁,它可以限制一项资源最多能同时被多少进程访问。

代码实现

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.ZParams;
import java.util.List;
import java.util.UUID;

/**
 * @email wangiegie@gmail.com
 * @data 2017-08
 */
public class RedisRateLimiter {
  private static final String BUCKET = "BUCKET";
  private static final String BUCKET_COUNT = "BUCKET_COUNT";
  private static final String BUCKET_MONITOR = "BUCKET_MONITOR";

  static String acquireTokenFromBucket(
      Jedis jedis, int limit, long timeout) {
    String identifier = UUID.randomUUID().toString();
    long now = System.currentTimeMillis();
    Transaction transaction = jedis.multi();

    //删除信号量
    transaction.zremrangeByScore(BUCKET_MONITOR.getBytes(), "-inf".getBytes(), String.valueOf(now - timeout).getBytes());
    ZParams params = new ZParams();
    params.weightsByDouble(1.0,0.0);
    transaction.zinterstore(BUCKET, params, BUCKET, BUCKET_MONITOR);

    //计数器自增
    transaction.incr(BUCKET_COUNT);
    List<Object> results = transaction.exec();
    long counter = (Long) results.get(results.size() - 1);

    transaction = jedis.multi();
    transaction.zadd(BUCKET_MONITOR, now, identifier);
    transaction.zadd(BUCKET, counter, identifier);
    transaction.zrank(BUCKET, identifier);
    results = transaction.exec();
    //获取排名,判断请求是否取得了信号量
    long rank = (Long) results.get(results.size() - 1);
    if (rank < limit) {
      return identifier;
    } else {//没有获取到信号量,清理之前放入redis 中垃圾数据
      transaction = jedis.multi();
      transaction.zrem(BUCKET_MONITOR, identifier);
      transaction.zrem(BUCKET, identifier);
      transaction.exec();
    }
    return null;
  }
}

调用

测试接口调用

@GetMapping("/")
public void index(HttpServletResponse response) throws IOException {
  Jedis jedis = jedisPool.getResource();
  String token = RedisRateLimiter.acquireTokenFromBucket(jedis, LIMIT, TIMEOUT);
  if (token == null) {
    response.sendError(500);
  }else{
    //TODO 你的业务逻辑
  }
  jedisPool.returnResource(jedis);
}

优化

使用拦截器 + 注解优化代码

拦截器

@Configuration
static class WebMvcConfigurer extends WebMvcConfigurerAdapter {
  private Logger logger = LoggerFactory.getLogger(WebMvcConfigurer.class);
  @Autowired
  private JedisPool jedisPool;

  public void addInterceptors(InterceptorRegistry registry) {
    registry.addInterceptor(new HandlerInterceptorAdapter() {
      public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
                   Object handler) throws Exception {
        HandlerMethod handlerMethod = (HandlerMethod) handler;
        Method method = handlerMethod.getMethod();
        RateLimiter rateLimiter = method.getAnnotation(RateLimiter.class);
        if (rateLimiter != null){
          int limit = rateLimiter.limit();
          int timeout = rateLimiter.timeout();
          Jedis jedis = jedisPool.getResource();
          String token = RedisRateLimiter.acquireTokenFromBucket(jedis, limit, timeout);
          if (token == null) {
            response.sendError(500);
            return false;
          }
          logger.debug("token -> {}",token);
          jedis.close();
        }
        return true;
      }
    }).addPathPatterns("/*");
  }
}

定义注解

/**
 * @email wangiegie@gmail.com
 * @data 2017-08
 * 限流注解
 */

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RateLimiter {
  int limit() default 5;
  int timeout() default 1000;
}

使用

@RateLimiter(limit = 2, timeout = 5000)
@GetMapping("/test")
public void test() {
}

并发测试

工具:apache-jmeter-3.2

说明: 没有获取到信号量的接口返回500,status是红色,获取到信号量的接口返回200,status是绿色。

当限制请求信号量为2,并发5个线程:

image

当限制请求信号量为5,并发10个线程:

image

资料

基于reids + lua的实现

总结

  1. 对于信号量的操作,使用事务操作。
  2. 不要使用时间戳作为信号量的排序分数,因为在分布式环境中,各个节点的时间差的原因,会出现不公平信号量的现象。
  3. 可以使用把这块代码抽成@rateLimiter注解,然后再方法上使用就会很方便啦
  4. 不同接口的流控,可以参考源码的里面RedisRateLimiterPlus,无非是每个接口生成一个监控参数
  5. 源码:boding1-pig-cloud-jb51.rar

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • SpringBoot集成Aviator实现参数校验的示例代码

    SpringBoot集成Aviator实现参数校验的示例代码

    在实际开发中,参数校验是保障系统稳定和数据可靠性的重要措施,Aviator 是一个高性能的表达式引擎,它能够简化复杂的逻辑判断并提升参数校验的灵活性,本文将介绍如何在 Spring Boot 中集成 Aviator,并利用它来实现灵活的参数校验,需要的朋友可以参考下
    2025-02-02
  • Spring Boot 整合 Druid过程解析

    Spring Boot 整合 Druid过程解析

    这篇文章主要介绍了Spring Boot 整合 Druid过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-11-11
  • maven 打包时间戳问题

    maven 打包时间戳问题

    这篇文章主要介绍了maven 打包时间戳问题,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-01-01
  • 解决IDEA中快捷键Alt+Enter不能使用的问题

    解决IDEA中快捷键Alt+Enter不能使用的问题

    这篇文章主要介绍了解决IDEA中快捷键Alt+Enter不能使用的问题,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-12-12
  • 详解Java类库的概念以及import的使用方法

    详解Java类库的概念以及import的使用方法

    这篇文章主要介绍了详解Java类库的概念以及import的使用方法,是Java入门学习中的基础知识,需要的朋友可以参考下
    2015-09-09
  • 如何在Java中使用WebSocket协议

    如何在Java中使用WebSocket协议

    WebSocket是一种基于 TCP 协议的全双工通信协议,可以在浏览器和服务器之间建立实时、双向的数据通信,下面这篇文章主要给大家介绍了关于如何在Java中使用WebSocket协议的相关资料,需要的朋友可以参考下
    2024-02-02
  • SpringBoot使用Cache集成Redis做缓存的保姆级教程

    SpringBoot使用Cache集成Redis做缓存的保姆级教程

    Spring Cache是Spring框架提供的一个缓存抽象层,它简化了缓存的使用和管理,Spring Cache默认使用服务器内存,并无法控制缓存时长,查找缓存中的数据比较麻烦,本文已常用的Redis作为缓存中间件作为示例,详细讲解项目中如何使用Cache提高系统性能,需要的朋友可以参考下
    2025-01-01
  • 详解Java字符型常量和字符串常量的区别

    详解Java字符型常量和字符串常量的区别

    Java 中的字符型常量和字符串常量是两种不同的数据类型,本文将给大家详细介绍一下Java字符型常量和字符串常量的区别,文中通过代码讲解的非常详细,需要的朋友可以参考下
    2023-10-10
  • SpringMVC框架中使用Filter实现请求日志打印方式

    SpringMVC框架中使用Filter实现请求日志打印方式

    这篇文章主要介绍了SpringMVC框架中使用Filter实现请求日志打印方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-10-10
  • es创建索引和mapping的实例

    es创建索引和mapping的实例

    这篇文章主要介绍了es创建索引和mapping的实例,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-02-02

最新评论