前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >高性能队列Disruptor在测试中应用

高性能队列Disruptor在测试中应用

作者头像
FunTester
发布于 2022-02-08 05:16:01
发布于 2022-02-08 05:16:01
83400
代码可运行
举报
文章被收录于专栏:FunTesterFunTester
运行总次数:0
代码可运行

最近在研究goreplay的源码的过程中,感觉有些思路还是很值得借鉴。所以自己立了一个flag,实现一个千万级日志回放功能。但是在这个实现的过程中遇到一个棘手的问题:Java自带的LinkedBlockingQueue比较难以直接满足需求场景和性能要求。

熟悉goreplay的测友应该清楚Go语言chanel在goreplay这个框架中应用是十分广泛的,加上Go语言自身较高的性能,可以说双剑合并。所以我也想照葫芦画瓢写一个类似思路的实现。这个后面会有专题讲这个。

基于此,我搜到了Disruptor这个高性能队列。Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单。

测试使用Disruptor时候不用像Springboot框架中那样,创建各类对象,抽象各种对象方法,我的原则就是怎么简单怎么来,下面分享一下Disruptor在测试中的基础实践和简单案例演示。

依赖

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// https://mvnrepository.com/artifact/com.lmax/disruptor
implementation group: 'com.lmax', name: 'disruptor', version: '3.4.2'

只列出了Gradle的,版本建议使用3+,有些Lambda语法支持需要。

Event对象

首先我们要定义一个Event类型,当然也可以直接使用类似java.lang.String使用已经存在的类,但是在设置Event对象时候,需要使用new关键字以及构造新的Event时,使用set方法总比直接=赋值的一致性更好一些。单独写一个Event类,可以更加简单,是的代码逻辑性更强,不用收到其他类的影响。

这里我定义了简单的Event类:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    public static class FunEvent {

        String id;

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

    }

Disruptor创建

Disruptor对象创建首先需要我们定义一个Event类型,调用构造方法,参数有五个。1.Event初始化方法;2.ringbuffsize,这里设置需要是2的整数倍;3.threadfactory,创建线程的工厂类,这里我用了com.funtester.frame.execute.ThreadPoolUtil#getFactory();4.生产者模式,分成单生产者和多生产者枚举类;5.等待策略,官方提供了一些实现类供选择,我使用了com.lmax.disruptor.YieldingWaitStrategy

创建方法如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
        Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>(
                FunEvent::new,
                1024 * 1024,
                ThreadPoolUtil.getFactory(),
                ProducerType.MULTI,
                new YieldingWaitStrategy()
        );

生产者

对于消息队列来讲,需要两个重要的角色,生产者和消费者。这里先将一下Disruptor生产者,我搜到不少资料,都是需要创建一个生产者的类,然后实现一个方法,这个方法内容基本一致的,内容如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
           long sequence = ringBuffer.next();
            try {
                FunEvent funEvent = ringBuffer.get(sequence);
                funEvent.setId(orderId);
            } finally {
                ringBuffer.publish(sequence);
            }

然后使用生产者对象调用这个方法,我觉得有点多此一举了,幸好有一篇文章介绍了Disruptor一些新特性的时候提到支持了Lambda语法,这下就可以不用创建生产者对象了。语法如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
                ringBuffer.publishEvent((Event, sequence) -> Event.setId(StringUtil.getString(10)));

消费者

消费者创建需要实现两个接口com.lmax.disruptor.EventHandlercom.lmax.disruptor.WorkHandler,这俩一个是处理单消费者模式,另外一个多消费者模式。

创建方法如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    /**
     * 消费者
     */
    private static class FunEventHandler implements EventHandler<FunEvent>, WorkHandler<FunEvent> {

        public void onEvent(FunEvent Event, long sequence, boolean endOfBatch) {
            output("消费消息:" + Event.getId() + TAB + sequence);
        }

        public void onEvent(FunEvent Event) {
            output("消费消息:" + Event.getId());
        }

    }

配置handler

这里分两类:配置单个消费者和配置多个消费者。

单个消费者:

disruptor.handleEventsWith(new FunEventHandler());

多个消费者:

disruptor.handleEventsWithWorkerPool(new FunEventHandler(), new FunEventHandler());

不管是单个还是多个,每次调用都会产生一个com.lmax.disruptor.dsl.EventHandlerGroup,每个com.lmax.disruptor.dsl.EventHandlerGroup都会完整消费每一个生产者产生的Event,如果设置了5次,那么一个Event就会被消费5次,每个com.lmax.disruptor.dsl.EventHandlerGroup消费一次,而且是阻塞的,加入某一个com.lmax.disruptor.dsl.EventHandlerGroup对象消费慢了,会阻塞其他消费者消费下一个Event。

启动

组装完成之后就可以启动了Disruptor了,语法如下:disruptor.start();,关闭语法disruptor.shutdown();,此处的关闭不会清空getRingBuffer已经存在的Event,看官方文档应该是停止生产,然后等待消费。

演示Demo

Java版本

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    public static void main(String[] args) {
        Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>(
                FunEvent::new,
                1024 * 1024,
                ThreadPoolUtil.getFactory(),
                ProducerType.MULTI,
                new YieldingWaitStrategy()
        );
        disruptor.handleEventsWithWorkerPool(new FunEventHandler(), new FunEventHandler());
        disruptor.handleEventsWith(new FunEventHandler());
        disruptor.start();
        RingBuffer<FunEvent> ringBuffer = disruptor.getRingBuffer();
        for (int i = 0; i < 3; i++) {
            ringBuffer.publishEvent((event, sequence) -> event.setId(StringUtil.getString(10)));
        }
        sleep(5.0);
        disruptor.shutdown();

    }

控制台输出:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
INFO-> main 当前用户:oker,工作目录:/Users/oker/IdeaProjects/funtester/,系统编码格式:UTF-8,系统Mac OS X版本:10.16
INFO-> main 
  ###### #     #  #    # ####### ######  #####  ####### ######  #####
  #      #     #  ##   #    #    #       #         #    #       #    #
  ####   #     #  # #  #    #    ####    #####     #    ####    #####
  #      #     #  #  # #    #    #            #    #    #       #   #
  #       #####   #    #    #    ######  #####     #    ######  #    #

INFO-> F-3  消费消息:i3OrH2ZnxD 0
INFO-> F-1  消费消息:i3OrH2ZnxD
INFO-> F-2  消费消息:whhoxoMxmR
INFO-> F-3  消费消息:whhoxoMxmR 1
INFO-> F-2  消费消息:IeP9fIRpKp
INFO-> F-3  消费消息:IeP9fIRpKp 2

Process finished with exit code 0

可以看到,每个消息会消费了两次。其中F-3线程消费量=F-1和F-2线程消费量总和,这就跟家理解了com.lmax.disruptor.dsl.EventHandlerGroup的功能。

Groovy+异步版本

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    public static void main(String[] args) {
        Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>(
                FunEvent::new,
                1024 * 1024,
                ThreadPoolUtil.getFactory(),
                ProducerType.MULTI,
                new YieldingWaitStrategy()
        )
        disruptor.handleEventsWithWorkerPool(new FunEventHandler(), new FunEventHandler())
        disruptor.handleEventsWith(new FunEventHandler())
        disruptor.start()
        RingBuffer<FunEvent> ringBuffer = disruptor.getRingBuffer();
        def funtester = {
            fun {
                100.times {ringBuffer.publishEvent((event, sequence) -> event.setId(StringUtil.getString(10)));}
            }
        }
        10.times {funtester()}
        sleep(5.0)
        disruptor.shutdown()
    }
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-12-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 FunTester 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
使用Disruptor完成多个消费者不重复消费消息
上一篇https://blog.csdn.net/tianyaleixiaowu/article/details/79787377里讲了Disruptor完成多个消费者并行、顺序重复消费Event。重复消费类似于kafka中,同一个topic被不同的group的消费者消费。这样的场景比较常见。当然更常见的场景是不重复消费,也就是一个消息只能被消费一次。
天涯泪小武
2019/01/17
3.8K0
深入java多线程与高并发:JMH与Disruptor,确定能学会?
今天我们讲两个内容,第一个是JMH,第二个是Disruptor。这两个内容是给大家做更进一步的这种多线程和高并发的一些专业上的处理。生产环境之中我们很可能不自己定义消息队列,而是使用
愿天堂没有BUG
2022/10/28
7080
深入java多线程与高并发:JMH与Disruptor,确定能学会?
Disruptor
GitHub - LMAX-Exchange/disruptor: High Performance Inter-Thread Messaging Library
阿超
2024/09/01
1020
从构建分布式秒杀系统聊聊Disruptor高性能队列
前言 秒杀架构持续优化中,基于自身认知不足之处在所难免,也请大家指正,共同进步。文章标题来自码友<tukangzheng>的建议,希望可以把阻塞队列ArrayBlockingQueue这个队列替换成Disruptor,由于之前曾接触过这个东西,听说很不错,正好借此机会整合进来。 简介 LMAX Disruptor是一个高性能的线程间消息库。它源于LMAX对并发性,性能和非阻塞算法的研究,如今构成了Exchange基础架构的核心部分。 Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程
小柒2012
2018/06/07
1.4K0
Java&Go高性能队列之Disruptor性能测试
之前写过Java&Go高性能队列之LinkedBlockingQueue性能测试之后,就一直准备这这篇文章,作为准备内容的过程中也写过一些Disruptor高性能消息队列的应用文章:高性能队列Disruptor在测试中应用和千万级日志回放引擎设计稿。
FunTester
2022/04/01
8910
disruptor (史上最全)[通俗易懂]
文章很长,而且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录 语雀版 | 总目录 码云版| 总目录 博客园版 为您奉上珍贵的学习资源 :
全栈程序员站长
2022/09/07
1.5K0
disruptor (史上最全)[通俗易懂]
性能测试中Disruptor框架shutdown失效的问题分享
在基于Disruptor开发新的性能测试QPS模型时候,中间遇到了很多问题,踩了很多坑。今天就分享一个比较典型的问题:shutdown失效。
FunTester
2022/04/01
5070
高性能队列Disruptor的使用
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/nuaazhaofeng/article/details/72918467
天涯泪小武
2019/07/01
1.7K0
高性能队列Disruptor的使用
并发编程之Disruptor
一、Disruptor是什么 Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式实现,或者事件-监听模式的实现,直接称disruptor模式。 Disruptor最大特点是高性能,它被设计用于在生产者—消费者问题(producer-consumer problem,简称PCP)上获得尽量高的吞吐量(TPS,Transaction Per Second))和尽量低的延迟。Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使
lyb-geek
2018/03/27
2.4K1
并发编程之Disruptor
两个例子带你入门 Disruptor
Disruptor 是英国外汇交易公司 LMAX 开发的一个高性能队列。很多知名开源项目里,比如 canal 、log4j2、 storm 都是用了 Disruptor 以提升系统性能 。
勇哥java实战
2023/09/19
3870
高性能无锁并发框架 Disruptor,太强了!
Disruptor是一个开源框架,研发的初衷是为了解决高并发下队列锁的问题,最早由LMAX提出并使用,能够在无锁的情况下实现队列的并发操作,并号称能够在一个线程里每秒处理6百万笔订单
Java技术栈
2020/09/22
3.4K0
高性能无锁并发框架 Disruptor,太强了!
千万级日志回放引擎设计稿
现在压测系统一直用的方案是goreplay进行二次开发完成的。因为整体是Java技术栈的,使用goreplay有存在两方面问题:一是兼容性,语言和开发框架上,增加了用例创建执行的复杂度;二是维护成本,goreplay二次开发方案已经无法满足现在的性能测试需求。如果维护两套压测引擎会带来更多工作量。
FunTester
2022/02/08
6041
千万级日志回放引擎设计稿
并发框架disruptor_ringbuffer的常规用法
指向了位置4,然后返回3。这样,线程D就获得了位置3的操作权限。 * 接着,另一个线程E做类似以上的操作 * 提交写入 * 以上,线程D和线程E都可以同时线程安全的往各自负责的区块(或位置,slots)写入数据。但是,我们可以讨论一下线程E先完成任务的场景…
全栈程序员站长
2022/09/30
5010
并发框架disruptor_ringbuffer的常规用法
高吞吐框架Disruptor应用场景
多年前在并发编程网http://ifeve.com/disruptor了解到了自认为是黑科技的并发框架DISRUPTOR, 我当时在想NETTY为什么没有和它整合。后来了解过的log4j2, jstorm也慢慢有用到, 而一直以来也并没有机会去使用和了解细节, 大多时候觉得Doug Lea的JDK并发包也足够使用。而近期业务需要基于NETTY简单裹了一个类似vertx的luoying-server https://github.com/zealzeng/luoying-server 业务处理线程不适合在event loop中处理, 简单用有界的ThreadPoolExecutor作为worker pool, 想考虑把disruptor整合进来, 看了两天发觉对disruptor的使用场景产生了误解。
Zeal
2020/11/11
5K0
Disruptor高性能缓存队列入门指导
Disruptor是什么,怎么使用,网上有很多教材,但有些过于复杂,剖析了Disruptor的方方面面,实际上对应普通的开发人员,使用这个工具,只需要指导知道大概原理和使用方法,并不需要知道非常深入的原理。
源哥
2018/08/28
7220
Disruptor高性能缓存队列入门指导
秒级达百万高并发框架-Disruptor
Disruptor是一个高性能的并发框架,主要应用于创建具有高吞吐量、低延迟、无锁(lock-free)的数据结构和事件处理系统。它最初由LMAX公司开发的,已经成为了业界广泛使用的高性能并发框架。
逍遥壮士
2023/09/01
1.5K0
秒级达百万高并发框架-Disruptor
Disruptor—核心概念及体验
https://github.com/LMAX-Exchange/disruptor/wiki/Introduction
luozhiyun
2019/10/07
1.2K0
disruptor笔记之五:事件消费实战
本篇是《disruptor笔记》的第五篇,前文《disruptor笔记之四:事件消费知识点小结》从理论上梳理分析了独立消费和共同消费,留下了三个任务,今天就来成这些任务,即编码实现以下三个场景:
程序员欣宸
2021/11/22
3160
disruptor笔记之五:事件消费实战
Disruptor简单使用
  Disruptor从功能上来说,可以实现队列的功能,也可以把它当成单机版的JMS来看待。从性能上来说,它比ArrayBlockingQueue有更好的性能表现,对于生产者消费者模型的业务,Disruptor是一个更好的选择可以很好的实现业务的分离。
良辰美景TT
2019/03/29
8570
Disruptor简单使用
Disruptor详解
LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易。这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单。业务逻辑处理器完全是运行在内存中,使用事件源驱动方式。业务逻辑处理器的核心是Disruptor。 Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。 Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。 在Disruptor中,我们想实现hello world 需要如下几步骤: 第一:建立一个Event类 第二:建立一个工厂Event类,用于创建Event类实例对象 第三:需要有一个监听事件类,用于处理数据(Event类) 第四:我们需要进行测试代码编写。实例化Disruptor实例,配置一系列参数。然后我们对Disruptor实例绑定监听事件类,接受并处理数据。 第五:在Disruptor中,真正存储数据的核心叫做RingBuffer,我们通过Disruptor实例拿到它,然后把数据生产出来,把数据加入到RingBuffer的实例对象中即可。 Event类:数据封装类
JavaEdge
2021/02/22
1.6K0
相关推荐
使用Disruptor完成多个消费者不重复消费消息
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文