Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >千万级日志回放引擎设计稿

千万级日志回放引擎设计稿

作者头像
FunTester
发布于 2022-02-08 05:16:37
发布于 2022-02-08 05:16:37
60810
代码可运行
举报
文章被收录于专栏:FunTesterFunTester
运行总次数:0
代码可运行

现在压测系统一直用的方案是goreplay进行二次开发完成的。因为整体是Java技术栈的,使用goreplay有存在两方面问题:一是兼容性,语言和开发框架上,增加了用例创建执行的复杂度;二是维护成本,goreplay二次开发方案已经无法满足现在的性能测试需求。如果维护两套压测引擎会带来更多工作量。

所以为了尽可能解决这两方面问题,接到了一个活儿,调研一下Java实现日志回放功能。主要就是读了goreplay的源码以及它设计思路,用Java重现实现一遍。

这里用到了前两天分享的Disruptor高性能队列常用API演示高性能队列Disruptor在测试中应用,有兴趣的可以再翻一翻。另视频版还在制作中,年后会和大家相见。

思路

总体设计思路如下:

千万级日志回放设计

PS:流量递增和动态增减尚未实现,还在研究goreplay的源码。

日志拉取和解析

日志的拉取和初步解析依旧采取原来项目中的逻辑,通过SQL语句网关日志中拉取日志,并对日志内容进行初步解析,放入云OSS中,并将链接存入数据库(此步骤放在录制流量成功之后)。

PS:目前日志解析保留的有用信息只有URL

日志格式如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-

实现步骤

  • 首先将日志中有用信息(URL)以及token放到内存中
  • 通过配置host,读取URL,以及响应header(token,压测标识,常用header,模拟盘标识)组装HTTP请求。
  • 创建Disruptor对象,使用异步创建生产者
  • 通过消费者消费(发出请求)消息(HTTP请求对象),达到HTTP接口日志流量回复功能。

性能指标

  • 本机6C16G配置测试数据
  • 实测1千万URL读取速度约为9s ~ 13s,内存无压力,如果后续更大日志量需求,可以通过stream方式异步读取日志,实测日志读取速度在80万/s以上,满足目前需求。
  • 单生产者速度25万QPS
  • 单机测试QPS 8.8万,CPU跑满,触及物理极限,此数据与之前工具对比压测差异不大。

风险

  • 消费者异步对消息进行存储,超过一定数量将会丢弃消息。这个问题在消费者速度小于生产者速度时会触发。
  • 消费者数量需要在启动前设定,如果参数设置不合理,会导致消费者压力瓶颈,无法动态增加消费者。

PS:这些风险后续会逐个解决。

代码实现

生产者Demo:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def ft = {
    output("创建线程")
    fun {
        int i = 0
        while (key) {
            def url = logs.get(i % logs.size())
            def get = getHttpGet(HOST + url)
            get.addHeader("token", tokens.get(i % tokens.size()))
            get.addHeader(HttpClientConstant.USER_AGENT)
            ringBuffer.publishEvent {e, s ->
                e.setRequest(get)
            }
            i++
        }
    }
}
ft()

读取文件代码

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * 通过闭包传入方法读取超大文件部分内容
 *
 * @param filePath
 * @param function
 * @return
 */
public static List<String> readByLine(String filePath, Function<String, String> function) {
    if (StringUtils.isEmpty(filePath) || !new File(filePath).exists() || new File(filePath).isDirectory())
        ParamException.fail("文件信息错误!" + filePath);
    logger.debug("读取文件名:{}", filePath);
    List<String> lines = new ArrayList<>();
    File file = new File(filePath);
    if (file.isFile() && file.exists()) { // 判断文件是否存在
        try (FileInputStream fileInputStream = new FileInputStream(file);
             InputStreamReader read = new InputStreamReader(fileInputStream, DEFAULT_CHARSET);
             BufferedReader bufferedReader = new BufferedReader(read, 3 * 1024 * 1024);) {
            String line = null;
            while ((line = bufferedReader.readLine()) != null) {
                String apply = function.apply(line);
                if (StringUtils.isNotBlank(apply)) lines.add(apply);
            }
        } catch (Exception e) {
            logger.warn("读取文件内容出错", e);
        }
    } else {
        logger.warn("找不到指定的文件:{}", filePath);
    }
    return lines;
}

演示Demo

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.funtest.groovytest

import com.funtester.base.constaint.FixedThread
import com.funtester.config.HttpClientConstant
import com.funtester.frame.execute.Concurrent
import com.funtester.frame.execute.ThreadPoolUtil
import com.funtester.httpclient.ClientManage
import com.funtester.httpclient.FunLibrary
import com.funtester.utils.ArgsUtil
import com.funtester.utils.RWUtil
import com.lmax.disruptor.EventHandler
import com.lmax.disruptor.RingBuffer
import com.lmax.disruptor.WorkHandler
import com.lmax.disruptor.YieldingWaitStrategy
import com.lmax.disruptor.dsl.Disruptor
import com.lmax.disruptor.dsl.ProducerType
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase
import org.junit.platform.commons.util.StringUtils

import java.util.concurrent.LinkedBlockingDeque
import java.util.function.Function

class ReplayTest extends FunLibrary {

    static String url = "http://localhost:12345/test";

    static HttpGet httpGet = getHttpGet(url);

    //    static LinkedBlockingQueue<HttpRequestBase> requests = new LinkedBlockingQueue<>()

    static def HOST = "http://localhost:12345"

    static def key = true

    static Disruptor<RequestEvent> disruptor

    public static void main(String[] args) {
        def logfile = "/Users/oker/Desktop/log.csv"
        //        def logfile = "/Users/oker/Desktop/fun.csv"
        //1千万日志
        def tokenfile = "/Users/oker/Desktop/token.csv"
        //2万用户token
        List<String> logs = RWUtil.readByLine(logfile, new Function<String, String>() {

            @Override
            String apply(String s) {
                return StringUtils.isNotBlank(s) && s.startsWith("/") ? s.split(COMMA)[0] : null
            }
        });
        List<String> tokens = RWUtil.readByLine(tokenfile, new Function<String, String>() {

            @Override
            String apply(String s) {
                return StringUtils.isNotBlank(s) ? s.split(COMMA)[4] : null
            }
        });

        output("总计 ${formatLong(logs.size())} 条日志")
        disruptor = new Disruptor<RequestEvent>(
                RequestEvent::new,
                512 * 512,
                ThreadPoolUtil.getFactory(),
                ProducerType.MULTI,
                new YieldingWaitStrategy()
        );
        RingBuffer<RequestEvent> ringBuffer = disruptor.getRingBuffer();

        def ft = {
            output("创建线程")
            fun {
                int i = 0
                while (key) {
                    def url = logs.get(i % logs.size())
                    def get = getHttpGet(HOST + url)
                    get.addHeader("token", tokens.get(i % tokens.size()))
                    get.addHeader(HttpClientConstant.USER_AGENT)
                    ringBuffer.publishEvent {e, s ->
                        e.setRequest(get)
                    }
                    i++
                }
            }
        }
        ft()
        disruptor.handleEventsWith(new FunTester(10))
        //        5.times {ft()}

        //下面开始测试
        ClientManage.init(10, 5, 0, "", 0)
        def util = new ArgsUtil(args)
        def thread = util.getIntOrdefault(0, 20)
        def times = util.getIntOrdefault(1, 60000)
        RUNUP_TIME = util.getIntOrdefault(2, 0)
        def tasks = []
        thread.times {
            def tester = new FunTester(times)
            disruptor.handleEventsWith(tester);
            tasks << tester
        }
        disruptor.start();
        new Concurrent(tasks, "这是千万级日志回放演示Demo").start()

    }


    private static class FunTester extends FixedThread implements EventHandler<RequestEvent>, WorkHandler<RequestEvent> {

        LinkedBlockingDeque<HttpRequestBase> reqs = new LinkedBlockingDeque<HttpRequestBase>()

        FunTester(int limit) {
            super(null, limit, true)
        }

        @Override
        protected void doing() throws Exception {
            FunLibrary.executeOnly(reqs.take())
        }

        @Override
        FixedThread clone() {
            return new FunTester(limit)
        }

        @Override
        protected void after() {
            super.after()
            key = false
            disruptor.shutdown()
        }

        @Override
        void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception {
            if (reqs.size() < 100000) reqs.add(event.getRequest())
        }

        @Override
        void onEvent(RequestEvent event) throws Exception {
            if (reqs.size() < 100000) reqs.add(event.getRequest())
        }
    }


    private static class RequestEvent {

        HttpRequestBase request;

        public HttpRequestBase getRequest() {
            return request;
        }

        public void setRequest(HttpRequestBase request) {
            this.request = request;
        }

    }


}

PS:这里用到了多个group,原因在设计稿中标记了。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-12-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 FunTester 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
1 条评论
热度
最新
1
1
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
Java&Go高性能队列之Disruptor性能测试
之前写过Java&Go高性能队列之LinkedBlockingQueue性能测试之后,就一直准备这这篇文章,作为准备内容的过程中也写过一些Disruptor高性能消息队列的应用文章:高性能队列Disruptor在测试中应用和千万级日志回放引擎设计稿。
FunTester
2022/04/01
8950
Java&Go高性能队列之LinkedBlockingQueue性能测试
在写完高性能队列Disruptor在测试中应用和千万级日志回放引擎设计稿视频版之后,我就一直在准备Java & Go 语言几种高性能消息队列的性能测试,其中选取了几种基准测试场景以及在性能测试中的应用场景。
FunTester
2022/02/08
1.4K3
基于docker的分布式性能测试框架功能验证(三)
本文是DCS_FunTester测试框架分布式性能测试功能拓展实践,是一种比较粗略的技术验证实践,技术方案采用
FunTester
2021/07/23
3510
基于docker的分布式性能测试框架功能验证(三)
分布式性能测试框架用例方案设想(一)
在近期工作规划中,分布式压测框架提上日程,目前「FunTester」已经具备了一些分布式压测中用到的功能。
FunTester
2021/06/23
7150
分布式性能测试框架用例方案设想(三)
性能测试脚本基于FunTester性能测试框架,在之前的方案二中,我们需要将用例写进去基础的jar包中,然后通过反射调用,灵活之处就是可以将用例的主要变量参数化,但是死板之处就是用例整体的设计已经完成了,参数化能力有限。如果用例场景需要增添,这种方式也就无能为力了,又时候甚至一点点的用例改动都会造成用例失效的情况。
FunTester
2021/07/23
5220
自动化项目基类实践--视频演示
接口测试中业务验证 http://mpvideo.qpic.cn/0bf2raaagaaaauaafxudrvpfbcgdaoeaaaya.f10002.mp4?dis_k=c999f00b893c9
FunTester
2020/04/08
3110
基于时间戳的日志回放引擎
之前写过一个日志回放引擎的第一代千万级日志回放引擎设计稿,当时理解的日志回放就是把日志记录的请求重新发出去,这就是回放线上用户的流量了。可是在我最近看goreplay的过程中,重新刷新了我的认知。
FunTester
2022/12/09
3120
高性能队列Disruptor在测试中应用
最近在研究goreplay的源码的过程中,感觉有些思路还是很值得借鉴。所以自己立了一个flag,实现一个千万级日志回放功能。但是在这个实现的过程中遇到一个棘手的问题:Java自带的LinkedBlockingQueue比较难以直接满足需求场景和性能要求。
FunTester
2022/02/08
8350
Java&Go三种HTTP客户端性能测试
在学完Golang语言HTTP客户端实践、Go语言HTTPServer开发的六种实现之后,我自然开始了Java&Go两种语言的HTTP客户端性能测试。
FunTester
2021/12/02
1K0
单机12万QPS——FunTester复仇记
在文章10万QPS,K6、Gatling和FunTester终极对决!中,最后测试结果FunTester除了在在CPU方面有一丁点优势以外,内存和QPS均略逊一筹,特别是内存方面劣势尤为明显。当时立了一个flag:
FunTester
2021/08/18
4150
性能测试中记录每一个耗时请求
在之前的文章性能测试中标记每个请求中提到,把每一个接口都进行requestID的标记,接下来的工作就简单了,就是设置各种超时配置,然后进行压测,会记录超时的请求ID和响应时间(采取“响应时间_requestID”形式),结果如图:
FunTester
2020/02/17
4640
基于docker的分布式性能测试框架功能验证(二)
本文是FunTester测试框架分布式性能测试功能拓展实践,是一种比较粗略的技术验证实践,技术方案采用分布式性能测试框架用例方案设想(二)中所设想场景,基于jar包内函数的,这个方案需要将测试用例写到更新到jar包中或者classpath目录下。
FunTester
2021/06/23
3270
下单延迟10s撤单性能测试
研发提出了一个下单之后延迟10s撤单的压测需求,着实让我迷糊了一会儿,最后参考了Java的延迟队列java.util.concurrent.DelayQueue实现了这个需求。
FunTester
2021/12/02
4290
下单延迟10s撤单性能测试
单点登录性能测试方案
项目登录系统升级,改为单点登录:英文全称Single Sign On。SSO是在多个应用系统中,用户只需要登录一次就可以访问所有相互信任的应用系统。 之前有的统一登录方式被废弃,由于单点登录比较之前的登录系统复杂很多。之前的方案请求一个接口即可获得用户校验令牌。 先分享一下单点登录的技术方案的时序图:
FunTester
2019/10/08
1.6K0
单点登录性能测试方案
链路压测中如何记录每一个耗时的请求
前文回顾:性能测试中记录每一个耗时请求,做完了单接口耗时请求的记录功能,近期又迎来了一批多接口链路压测的需求。刚好趁着这个机会,多实现一些不同场景的链路压测需求,锻炼一波,也能提高自己写的「FunTester」测试框架的兼容性,可谓一石多鸟,何乐而不为。
FunTester
2020/12/24
8510
DCS_FunTester分布式压测框架更新(一)
在经过了一些人生的思考和积累,终于开始第一轮的更新,除了修复BUG以外,还进行不少功能性更新。另外本地部署已经完成了。有兴趣的可以自己多体验一下。分布式性能测试框架单节点内测
FunTester
2021/07/23
3980
性能测试中标记每个请求
在做性能测试过程中,遇到一个棘手的问题,开发让我们复现几个请求时间较长的请求,他们看日志进行链路追踪,查找瓶颈所在。
FunTester
2020/01/17
4140
拷贝HttpRequestBase对象
在实践性能测试框架第二版的过程中,我实现了一个单个HttpRequestBase对象的concurrent对象创建,单之前都是用使用唯一的HttpRequestBase对象进行多线程请求,目前来看是没有问题的,但为了防止以后出现意外BUG和统一concurrent的构造方法使用,故尝试拷贝了一个HttpRequestBase对象。原因是因为之前封装的深拷贝方法对于HttpRequestBase对象的实现类如:httpget和httppost并不适用,因为没有实现Serializable接口。所以单独写了一个HttpRequestBase对象的拷贝方法,供大家参考。
FunTester
2019/12/31
5740
HTTP接口测试基础【FunTester框架教程】
今天继续编写FunTester测试框架的教程,主要内容是HTTP接口测试基础,分为请求(GET、POST、PUT等)、请求头、cookie、响应、JSON以及资源释放。
FunTester
2021/09/14
5560
基于docker的分布式性能测试框架功能验证(一)
本文是「FunTester」测试框架分布式性能测试功能拓展实践,是一种比较粗略的技术验证实践,技术方案采用分布式性能测试框架用例方案设想(一)中所设想场景。
FunTester
2021/06/23
4270
推荐阅读
相关推荐
Java&Go高性能队列之Disruptor性能测试
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验