前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何实现漏桶算法与令牌桶算法

如何实现漏桶算法与令牌桶算法

作者头像
Bug开发工程师
发布2019-07-13 12:48:35
1.6K0
发布2019-07-13 12:48:35
举报
文章被收录于专栏:码农沉思录
代码语言:javascript
复制
作者:大数据孟小鹏(Java架构沉思录做了部分修改)
原文:https://blog.csdn.net/mengxpFighting/article/details/79117934

Java中对于生产者消费者模型,或者小米手机营销(1分钟卖多少台手机)等都存在限流的思想在里面。

关于限流目前存在两大类:从线程并发数角度(jdk1.5 Semaphore)限流和从速率限流(guava)。

Semaphore:从线程并发数限流。

RateLimiter:从速率限流。目前常见的算法是漏桶算法和令牌算法。

令牌桶算法。相比漏桶算法而言区别在于,令牌桶是会去匀速的生成令牌,拿到令牌才能够进行处理,类似于匀速往桶里放令牌。

漏桶算法是:生产者消费者模型,生产者往木桶里生产数据,消费者按照预先定义的速度去消费数据。

应用场景:

漏桶算法:必须读写分离的情况下,限制读取的速度。

令牌桶算法:必须读写分离的情况下,限制写的速率。

实现的方法都是一样的,通过RateLimiter来实现。

对于多线程场景下,很多时候使用的类都是原子性的,但是由于代码逻辑的原因,也可能发生线程安全问题。

1. 关于RateLimter和Semphore简单用法

代码语言:javascript
复制
package concurrent;import com.google.common.util.concurrent.RateLimiter;import java.util.concurrent.*;
import java.util.stream.IntStream;import static java.lang.Thread.currentThread;/**
* ${DESCRIPTION}
* 关于限流 目前存在两大类,从线程个数(jdk1.5 Semaphore)和RateLimiter速率(guava)
* Semaphore:从线程个数限流
* RateLimiter:从速率限流  目前常见的算法是漏桶算法和令牌算法,下面会具体介绍
*
* @author mengxp
* @version 1.0
* @create 2018-01-15 22:44
**/
public class RateLimiterExample {  //Guava  0.5的意思是 1秒中0.5次的操作,2秒1次的操作  从速度来限流,从每秒中能够执行的次数来
   private final static RateLimiter limiter=RateLimiter.create(0.5d);   //同时只能有三个线程工作 Java1.5  从同时处理的线程个数来限流
   private final static Semaphore sem=new Semaphore(3);
   private static void testSemaphore(){
       try {
           sem.acquire();
           System.out.println(currentThread().getName()+" is doing work...");
           TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
       } catch (InterruptedException e) {
           e.printStackTrace();
       }finally {
           sem.release();
           System.out.println(currentThread().getName()+" release the semephore..other thread can get and do job");
       }
   }   public static void runTestSemaphore(){
       ExecutorService service = Executors.newFixedThreadPool(10);
       IntStream.range(0,10).forEach((i)->{
           //RateLimiterExample::testLimiter 这种写法是创建一个线程
           service.submit(RateLimiterExample::testSemaphore);
       });
   }   /**
    * Guava的RateLimiter
    */
   private static void testLimiter(){
       System.out.println(currentThread().getName()+" waiting  " +limiter.acquire());
   }   //Guava的RateLimiter
   public static void runTestLimiter(){
       ExecutorService service = Executors.newFixedThreadPool(10);
       IntStream.range(0,10).forEach((i)->{
           //RateLimiterExample::testLimiter 这种写法是创建一个线程
           service.submit(RateLimiterExample::testLimiter);
       });
   }   public static void main(String[] args) {
       IntStream.range(0,10).forEach((a)-> System.out.println(a));//从0-9
       //runTestLimiter();
       runTestSemaphore();
   }
}

2. 实现漏桶算法

代码语言:javascript
复制
package concurrent.BucketAl;import com.google.common.util.concurrent.Monitor;
import com.google.common.util.concurrent.RateLimiter;import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;import static java.lang.Thread.currentThread;/**
* ${DESCRIPTION}
*
* @author mengxp
* @version 1.0
* @create 2018-01-20 22:42
* 实现漏桶算法 实现多线程生产者消费者模型 限流
**/
public class Bucket {
   //定义桶的大小
   private final ConcurrentLinkedQueue<Integer> container=new ConcurrentLinkedQueue<>();   private final static int  BUCKET_LIMIT=1000;   //消费者 不论多少个线程,每秒最大的处理能力是1秒中执行10次
   private final RateLimiter consumerRate=RateLimiter.create(10d);   //往桶里面放数据时,确认没有超过桶的最大的容量
   private Monitor offerMonitor=new Monitor();   //从桶里消费数据时,桶里必须存在数据
   private Monitor consumerMonitor=new Monitor();   /**
    * 往桶里面写数据
    * @param data
    */
   public void submit(Integer data){
       if (offerMonitor.enterIf(offerMonitor.newGuard(()->container.size()<BUCKET_LIMIT))){
           try {
               container.offer(data);
               System.out.println(currentThread()+" submit.."+data+" container size is :["+container.size()+"]");
           } finally {
               offerMonitor.leave();
           }
       }else {
           //这里时候采用降级策略了。消费速度跟不上产生速度时,而且桶满了,抛出异常
           //或者存入MQ DB等后续处理
           throw new IllegalStateException(currentThread().getName()+"The bucket is ful..Pls latter can try...");
       }
   }   /**
    * 从桶里面消费数据
    * @param consumer
    */
   public void takeThenConsumer(Consumer<Integer> consumer){
       if (consumerMonitor.enterIf(consumerMonitor.newGuard(()->!container.isEmpty()))){
           try {
               //不打印时 写 consumerRate.acquire();
               System.out.println(currentThread()+"  waiting"+consumerRate.acquire());
               Integer data = container.poll();
               //container.peek() 只是去取出来不会删掉
               consumer.accept(data);
           }finally {
               consumerMonitor.leave();
           }
       }else {
           //当木桶的消费完后,可以消费那些降级存入MQ或者DB里面的数据
           System.out.println("will consumer Data from MQ...");
           try {
               TimeUnit.SECONDS.sleep(10);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
   }}

2.1 漏桶算法测试类

代码语言:javascript
复制
package concurrent.BucketAl;import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;import static java.lang.Thread.currentThread;/**
* ${DESCRIPTION}
*
* @author mengxp
* @version 1.0
* @create 2018-01-20 23:11
* 漏桶算法测试
* 实现漏桶算法 实现多线程生产者消费者模型 限流
**/
public class BuckerTest {   public static void main(String[] args) {
       final Bucket bucket = new Bucket();
       final AtomicInteger DATA_CREATOR = new AtomicInteger(0);       //生产线程 10个线程 每秒提交 50个数据  1/0.2s*10=50个
       IntStream.range(0, 10).forEach(i -> {
           new Thread(() -> {
               for (; ; ) {
                   int data = DATA_CREATOR.incrementAndGet();
                   try {
                       bucket.submit(data);
                       TimeUnit.MILLISECONDS.sleep(200);
                   } catch (Exception e) {
                       //对submit时,如果桶满了可能会抛出异常
                       if (e instanceof IllegalStateException) {
                           System.out.println(e.getMessage());
                           //当满了后,生产线程就休眠1分钟
                           try {
                               TimeUnit.SECONDS.sleep(60);
                           } catch (InterruptedException e1) {
                               e1.printStackTrace();
                           }
                       }
                   }
               }
           }).start();
       });       //消费线程  采用RateLimiter每秒处理10个  综合的比率是5:1
       IntStream.range(0, 10).forEach(i -> {
           new Thread(
                   () -> {
                       for (; ; ) {
                           bucket.takeThenConsumer(x -> {
                               System.out.println(currentThread()+"C.." + x);
                           });
                       }
                   }
           ).start();
       });   }
}

3. 令牌桶算法

代码语言:javascript
复制
package concurrent.TokenBucket;import com.google.common.util.concurrent.RateLimiter;import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;import static java.lang.Thread.currentThread;
import static java.lang.Thread.interrupted;/**
* ${DESCRIPTION}
*
* @author mengxp
* @version 1.0
* @create 2018-01-21 0:18
* 令牌桶算法。相比漏桶算法而言区别在于,令牌桶是会去匀速的生成令牌,拿到令牌才能够进行处理,类似于匀速往桶里放令牌
* 漏桶算法是:生产者消费者模型,生产者往木桶里生产数据,消费者按照定义的速度去消费数据
*
* 应用场景:
* 漏桶算法:必须读写分流的情况下,限制读取的速度
* 令牌桶算法:必须读写分离的情况下,限制写的速率或者小米手机饥饿营销的场景  只卖1分种抢购1000
*
* 实现的方法都是一样。RateLimiter来实现
* 对于多线程问题查找时,很多时候可能使用的类都是原子性的,但是由于代码逻辑的问题,也可能发生线程安全问题
**/
public class TokenBuck {   //可以使用 AtomicInteger+容量  可以不用Queue实现
  private AtomicInteger phoneNumbers=new AtomicInteger(0);
  private RateLimiter rateLimiter=RateLimiter.create(20d);//一秒只能执行五次
  //默认销售500台
  private final static int DEFALUT_LIMIT=500;
  private final int saleLimit;   public TokenBuck(int saleLimit) {
       this.saleLimit = saleLimit;
   }   public TokenBuck() {
       this(DEFALUT_LIMIT);
   }   public int buy(){
       //这个check 必须放在success里面做判断,不然会产生线程安全问题(业务引起)
       //原因当phoneNumbers=99 时 同时存在三个线程进来。虽然phoneNumbers原子性,但是也会发生。如果必须写在这里,在success
       //里面也需要加上double check
      /* if (phoneNumbers.get()>=saleLimit){
           throw new IllegalStateException("Phone has been sale "+saleLimit+" can not  buy more...")
       }*/       //目前设置超时时间,10秒内没有抢到就抛出异常
       //这里的TimeOut*Ratelimiter=总数  这里的超时就是让别人抢几秒,所以设置总数也可以由这里的超时和RateLimiter来计算
        boolean success = rateLimiter.tryAcquire(10, TimeUnit.SECONDS);
        if (success){
            if (phoneNumbers.get()>=saleLimit){
                throw new IllegalStateException("Phone has been sale "+saleLimit+" can not  buy more...");
            }
            int phoneNo = phoneNumbers.getAndIncrement();
            System.out.println(currentThread()+" user has get :["+phoneNo+"]");
            return phoneNo;
        }else {
            //超时后 同一时间,很大的流量来强时,超时快速失败。
            throw new RuntimeException(currentThread()+"has timeOut can try again...");
        }   }
}

3.1 令牌桶算法的测试类

代码语言:javascript
复制
package concurrent.TokenBucket;import java.util.stream.IntStream;/**
* ${DESCRIPTION}
*
* @author mengxp
* @version 1.0
* @create 2018-01-21 0:40
**/
public class TokenBuckTest {
   public static void main(String[] args) {
       final TokenBuck tokenBuck=new TokenBuck(200);       IntStream.range(0,300).forEach(i->{
           //目前测试时,让一个线程抢一次,不用循环抢
           //tokenBuck::buy 这种方式 产生一个Runnable
           new Thread(tokenBuck::buy).start();
       });
   }
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-10-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码农沉思录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档