专栏首页Java架构沉思录如何实现漏桶算法与令牌桶算法

如何实现漏桶算法与令牌桶算法

作者:大数据孟小鹏(Java架构沉思录做了部分修改)
原文:https://blog.csdn.net/mengxpFighting/article/details/79117934

Java中对于生产者消费者模型,或者小米手机营销(1分钟卖多少台手机)等都存在限流的思想在里面。

关于限流目前存在两大类:从线程并发数角度(jdk1.5 Semaphore)限流和从速率限流(guava)。

Semaphore:从线程并发数限流。

RateLimiter:从速率限流。目前常见的算法是漏桶算法和令牌算法。

令牌桶算法。相比漏桶算法而言区别在于,令牌桶是会去匀速的生成令牌,拿到令牌才能够进行处理,类似于匀速往桶里放令牌。

漏桶算法是:生产者消费者模型,生产者往木桶里生产数据,消费者按照预先定义的速度去消费数据。

应用场景:

漏桶算法:必须读写分离的情况下,限制读取的速度。

令牌桶算法:必须读写分离的情况下,限制写的速率。

实现的方法都是一样的,通过RateLimiter来实现。

对于多线程场景下,很多时候使用的类都是原子性的,但是由于代码逻辑的原因,也可能发生线程安全问题。

1. 关于RateLimter和Semphore简单用法

package concurrent;import com.google.common.util.concurrent.RateLimiter;import java.util.concurrent.*;
import java.util.stream.IntStream;import static java.lang.Thread.currentThread;/**
* ${DESCRIPTION}
* 关于限流 目前存在两大类,从线程个数(jdk1.5 Semaphore)和RateLimiter速率(guava)
* Semaphore:从线程个数限流
* RateLimiter:从速率限流  目前常见的算法是漏桶算法和令牌算法,下面会具体介绍
*
* @author mengxp
* @version 1.0
* @create 2018-01-15 22:44
**/
public class RateLimiterExample {  //Guava  0.5的意思是 1秒中0.5次的操作,2秒1次的操作  从速度来限流,从每秒中能够执行的次数来
   private final static RateLimiter limiter=RateLimiter.create(0.5d);   //同时只能有三个线程工作 Java1.5  从同时处理的线程个数来限流
   private final static Semaphore sem=new Semaphore(3);
   private static void testSemaphore(){
       try {
           sem.acquire();
           System.out.println(currentThread().getName()+" is doing work...");
           TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
       } catch (InterruptedException e) {
           e.printStackTrace();
       }finally {
           sem.release();
           System.out.println(currentThread().getName()+" release the semephore..other thread can get and do job");
       }
   }   public static void runTestSemaphore(){
       ExecutorService service = Executors.newFixedThreadPool(10);
       IntStream.range(0,10).forEach((i)->{
           //RateLimiterExample::testLimiter 这种写法是创建一个线程
           service.submit(RateLimiterExample::testSemaphore);
       });
   }   /**
    * Guava的RateLimiter
    */
   private static void testLimiter(){
       System.out.println(currentThread().getName()+" waiting  " +limiter.acquire());
   }   //Guava的RateLimiter
   public static void runTestLimiter(){
       ExecutorService service = Executors.newFixedThreadPool(10);
       IntStream.range(0,10).forEach((i)->{
           //RateLimiterExample::testLimiter 这种写法是创建一个线程
           service.submit(RateLimiterExample::testLimiter);
       });
   }   public static void main(String[] args) {
       IntStream.range(0,10).forEach((a)-> System.out.println(a));//从0-9
       //runTestLimiter();
       runTestSemaphore();
   }
}

2. 实现漏桶算法

package concurrent.BucketAl;import com.google.common.util.concurrent.Monitor;
import com.google.common.util.concurrent.RateLimiter;import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;import static java.lang.Thread.currentThread;/**
* ${DESCRIPTION}
*
* @author mengxp
* @version 1.0
* @create 2018-01-20 22:42
* 实现漏桶算法 实现多线程生产者消费者模型 限流
**/
public class Bucket {
   //定义桶的大小
   private final ConcurrentLinkedQueue<Integer> container=new ConcurrentLinkedQueue<>();   private final static int  BUCKET_LIMIT=1000;   //消费者 不论多少个线程,每秒最大的处理能力是1秒中执行10次
   private final RateLimiter consumerRate=RateLimiter.create(10d);   //往桶里面放数据时,确认没有超过桶的最大的容量
   private Monitor offerMonitor=new Monitor();   //从桶里消费数据时,桶里必须存在数据
   private Monitor consumerMonitor=new Monitor();   /**
    * 往桶里面写数据
    * @param data
    */
   public void submit(Integer data){
       if (offerMonitor.enterIf(offerMonitor.newGuard(()->container.size()<BUCKET_LIMIT))){
           try {
               container.offer(data);
               System.out.println(currentThread()+" submit.."+data+" container size is :["+container.size()+"]");
           } finally {
               offerMonitor.leave();
           }
       }else {
           //这里时候采用降级策略了。消费速度跟不上产生速度时,而且桶满了,抛出异常
           //或者存入MQ DB等后续处理
           throw new IllegalStateException(currentThread().getName()+"The bucket is ful..Pls latter can try...");
       }
   }   /**
    * 从桶里面消费数据
    * @param consumer
    */
   public void takeThenConsumer(Consumer<Integer> consumer){
       if (consumerMonitor.enterIf(consumerMonitor.newGuard(()->!container.isEmpty()))){
           try {
               //不打印时 写 consumerRate.acquire();
               System.out.println(currentThread()+"  waiting"+consumerRate.acquire());
               Integer data = container.poll();
               //container.peek() 只是去取出来不会删掉
               consumer.accept(data);
           }finally {
               consumerMonitor.leave();
           }
       }else {
           //当木桶的消费完后,可以消费那些降级存入MQ或者DB里面的数据
           System.out.println("will consumer Data from MQ...");
           try {
               TimeUnit.SECONDS.sleep(10);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
   }}

2.1 漏桶算法测试类

package concurrent.BucketAl;import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;import static java.lang.Thread.currentThread;/**
* ${DESCRIPTION}
*
* @author mengxp
* @version 1.0
* @create 2018-01-20 23:11
* 漏桶算法测试
* 实现漏桶算法 实现多线程生产者消费者模型 限流
**/
public class BuckerTest {   public static void main(String[] args) {
       final Bucket bucket = new Bucket();
       final AtomicInteger DATA_CREATOR = new AtomicInteger(0);       //生产线程 10个线程 每秒提交 50个数据  1/0.2s*10=50个
       IntStream.range(0, 10).forEach(i -> {
           new Thread(() -> {
               for (; ; ) {
                   int data = DATA_CREATOR.incrementAndGet();
                   try {
                       bucket.submit(data);
                       TimeUnit.MILLISECONDS.sleep(200);
                   } catch (Exception e) {
                       //对submit时,如果桶满了可能会抛出异常
                       if (e instanceof IllegalStateException) {
                           System.out.println(e.getMessage());
                           //当满了后,生产线程就休眠1分钟
                           try {
                               TimeUnit.SECONDS.sleep(60);
                           } catch (InterruptedException e1) {
                               e1.printStackTrace();
                           }
                       }
                   }
               }
           }).start();
       });       //消费线程  采用RateLimiter每秒处理10个  综合的比率是5:1
       IntStream.range(0, 10).forEach(i -> {
           new Thread(
                   () -> {
                       for (; ; ) {
                           bucket.takeThenConsumer(x -> {
                               System.out.println(currentThread()+"C.." + x);
                           });
                       }
                   }
           ).start();
       });   }
}

3. 令牌桶算法

package concurrent.TokenBucket;import com.google.common.util.concurrent.RateLimiter;import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;import static java.lang.Thread.currentThread;
import static java.lang.Thread.interrupted;/**
* ${DESCRIPTION}
*
* @author mengxp
* @version 1.0
* @create 2018-01-21 0:18
* 令牌桶算法。相比漏桶算法而言区别在于,令牌桶是会去匀速的生成令牌,拿到令牌才能够进行处理,类似于匀速往桶里放令牌
* 漏桶算法是:生产者消费者模型,生产者往木桶里生产数据,消费者按照定义的速度去消费数据
*
* 应用场景:
* 漏桶算法:必须读写分流的情况下,限制读取的速度
* 令牌桶算法:必须读写分离的情况下,限制写的速率或者小米手机饥饿营销的场景  只卖1分种抢购1000
*
* 实现的方法都是一样。RateLimiter来实现
* 对于多线程问题查找时,很多时候可能使用的类都是原子性的,但是由于代码逻辑的问题,也可能发生线程安全问题
**/
public class TokenBuck {   //可以使用 AtomicInteger+容量  可以不用Queue实现
  private AtomicInteger phoneNumbers=new AtomicInteger(0);
  private RateLimiter rateLimiter=RateLimiter.create(20d);//一秒只能执行五次
  //默认销售500台
  private final static int DEFALUT_LIMIT=500;
  private final int saleLimit;   public TokenBuck(int saleLimit) {
       this.saleLimit = saleLimit;
   }   public TokenBuck() {
       this(DEFALUT_LIMIT);
   }   public int buy(){
       //这个check 必须放在success里面做判断,不然会产生线程安全问题(业务引起)
       //原因当phoneNumbers=99 时 同时存在三个线程进来。虽然phoneNumbers原子性,但是也会发生。如果必须写在这里,在success
       //里面也需要加上double check
      /* if (phoneNumbers.get()>=saleLimit){
           throw new IllegalStateException("Phone has been sale "+saleLimit+" can not  buy more...")
       }*/       //目前设置超时时间,10秒内没有抢到就抛出异常
       //这里的TimeOut*Ratelimiter=总数  这里的超时就是让别人抢几秒,所以设置总数也可以由这里的超时和RateLimiter来计算
        boolean success = rateLimiter.tryAcquire(10, TimeUnit.SECONDS);
        if (success){
            if (phoneNumbers.get()>=saleLimit){
                throw new IllegalStateException("Phone has been sale "+saleLimit+" can not  buy more...");
            }
            int phoneNo = phoneNumbers.getAndIncrement();
            System.out.println(currentThread()+" user has get :["+phoneNo+"]");
            return phoneNo;
        }else {
            //超时后 同一时间,很大的流量来强时,超时快速失败。
            throw new RuntimeException(currentThread()+"has timeOut can try again...");
        }   }
}

3.1 令牌桶算法的测试类

package concurrent.TokenBucket;import java.util.stream.IntStream;/**
* ${DESCRIPTION}
*
* @author mengxp
* @version 1.0
* @create 2018-01-21 0:40
**/
public class TokenBuckTest {
   public static void main(String[] args) {
       final TokenBuck tokenBuck=new TokenBuck(200);       IntStream.range(0,300).forEach(i->{
           //目前测试时,让一个线程抢一次,不用循环抢
           //tokenBuck::buy 这种方式 产生一个Runnable
           new Thread(tokenBuck::buy).start();
       });
   }
}

本文分享自微信公众号 - Java架构沉思录(code-thinker),作者:大数据孟小鹏

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-10-03

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Java效率工具Lombok使用及原理

    在过往的Java项目中,充斥着太多不友好的代码:POJO的getter/setter/toString;异常处理;I/O流的关闭操作等等,这些样板代码既没有技术...

    黄泽杰
  • 还在用JDK8?我都开始上手JDK 13了!

    目标:提高应用程序类 - 数据共享(AppCDS)的可用性。消除了用户进行试运行以创建每个应用程序的类列表的需要。

    黄泽杰
  • 分库分表就能无限扩容吗

    像我这样的菜鸟,总会有各种疑问,刚开始是对 JDK API 的疑问,对 NIO 的疑问,对 JVM 的疑问,当工作几年后,对服务的可用性,可扩展性也有了新的疑问...

    黄泽杰
  • Python实现固定效应回归模型实现因果关系推断

    众所周知,“相关并不意味着因果关系”。我要告诉你,相关可以表示因果关系,但需要一定条件。这些条件已在计量经济学文献中被广泛讨论。在本文中,我将以一种易于理解的方...

    deephub
  • java读取excel

    week
  • Java基础-HelloWorld

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 ...

    cwl_java
  • Win7 Eclipse 搭建spark java1.8编译环境,JavaRDD的helloworld例子

    Win7 Eclipse 搭建spark java1.8编译环境,JavaRDD的helloworld例子:

    马克java社区
  • 用Python围观垃圾分类是什么回事

    纸巾再湿也是干垃圾?瓜子皮再干也是湿垃圾??最近大家都被垃圾分类折磨的不行,傻傻的你是否拎得清????自2019.07.01开始,上海已率先实施垃圾分类制度,违...

    叫我龙总
  • Python知识点总结篇(五)

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

    村雨遥
  • 业界 | 英特尔9代酷睿CPU正式发布:制程不变,超线程被砍

    在英特尔今天发布的众多公告中,最重要的一条就是该公司发布了第九代酷睿处理器,在英特尔主流消费平台上提供多达 8 个内核。该处理器与当前的 Coffee Lake...

    机器之心

扫码关注云+社区

领取腾讯云代金券