前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于时间戳的日志回放引擎

基于时间戳的日志回放引擎

作者头像
FunTester
发布2022-12-09 14:01:06
2310
发布2022-12-09 14:01:06
举报
文章被收录于专栏:FunTesterFunTester

之前写过一个日志回放引擎的第一代千万级日志回放引擎设计稿,当时理解的日志回放就是把日志记录的请求重新发出去,这就是回放线上用户的流量了。可是在我最近看goreplay的过程中,重新刷新了我的认知。

查阅了一些资料,终于算是了解了一些基于时间戳的方案和思路。大体如下:通过工具把线上某段时间的流量记录下来,其中包含时间戳等信息,然后通过回放引擎把流量回放出去。

解决思路

目前流量回放集中于HTTP流量,所以之前写过的引擎的发压部分还是可以继续使用。所以我也有了自己的解决思路:

  1. 日志清洗,其实就是把规范化的日志解析成引擎框架可以使用的对象,通常包含HTTP请求的组成部分。
  2. 按照时间戳排序,通常使用现成的工具这一步是可以省略,但是由于日志记录是已经存在的组件,这里需要做一些兼容性工作
  3. 日志回放,通过线程池和连接池两个池化技术可以解决性能方面的问题。再结合当前的分布式方案做一些兼容功能即可。

其中最最核心的应该就是队列的选择,这里我用看java的java.util.concurrent.DelayQueue,也没找到其他更好的框架了。其实在一开始我想复用自己写之前写的日志回放框架的队列,也尝试对集中常用队列进行了性能测试:

本来想是用多线程去读取日志的过程中,通过判断每一条日志是否到时间点,然后丢到一个线程安全的队列中,后面用线程池取队列中的对象,发送请求的。但是仔细想来太复杂了,流量过了好几手,不利于实现和拓展功能。

然后我重新对java.util.concurrent.DelayQueue进行了性能测试延迟队列DelayQueue性能测试,有了测试结果之后,就可以放心大胆地干了。关于延迟队列的基本使用可参考下单延迟10s撤单性能测试

实现

总体来说实现起来思路比较清晰,我分成三部分分享。

属性定义

  1. 我首先定义了一个com.funtester.frame.execute.ReplayConcurrent.ReplayLog日志对象,用于存储每一个请求日志
  2. 然后定义一个com.funtester.frame.execute.ReplayConcurrent#logs用来存储日志,这里旧事重提一下,千万级别的日志对象,存储在内存里面是OK的,所以我才会采用这种方式。为什么要从日志文件中转一手呢?因为日志是不按照时间戳排序的。
  3. 再定义com.funtester.frame.execute.ReplayConcurrent#logDelayQueue用来当作回放请求队列,也就是流量中转站,生产者从com.funtester.frame.execute.ReplayConcurrent#logs中取,clone之后丢到队列中;消费者从队列中取对象,丢给线程池。
  4. 定义com.funtester.frame.execute.ReplayConcurrent#handle当作是处理流量的方法,就是把流量对象包装成HttpRequestBase对象然后发送出去

生产者

  1. 确定使用异步线程完成,使用Java自定义异步功能实践
  2. 根据com.funtester.frame.execute.ReplayConcurrent#logDelayQueue性能测试数据,添加com.funtester.frame.execute.ReplayConcurrent#threadNum参数来控制。
  3. 多线程取com.funtester.frame.execute.ReplayConcurrent#logs对象,用到了几个线程安全类,用于保障多线程是顺序读取,避免了在延迟队列中进行排序操作。
  4. 使用了com.funtester.frame.execute.ReplayConcurrent#getMAX_LENGTH控制队列的长度。貌似没找到限制延迟队列长度的API。只能自己实现了,思路当添加日志数量超过最大值,存储当前队列长度。当长度大于最大长度,则在下一次添加对象前,休眠1s,然后在重置本地存储的队列长度。这样可以解决这个问题。当然最大值设置足够高,避免1s中内队列变成空。回放引擎设计50万QPS,所以我就先设置了80万的最大长度。后续可以根据实际情况调整。

消费者

  1. 依旧使用异步,生产者
  2. 使用API时java.util.concurrent.DelayQueue#poll(long, java.util.concurrent.TimeUnit),避免阻塞导致线程无法终止。
  3. 引入com.funtester.frame.execute.ReplayConcurrent#getMultiple控制流量回放的倍数。
  4. 使用com.funtester.frame.execute.ReplayConcurrent#getTotal记录回放的日志数量。
  5. 使用com.funtester.frame.execute.ReplayConcurrent#getHandle处理日志对象。

代码如下:

代码语言:javascript
复制
package com.funtester.frame.execute

import com.funtester.base.bean.AbstractBean
import com.funtester.frame.SourceCode
import com.funtester.utils.LogUtil
import com.funtester.utils.RWUtil
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger

import java.util.concurrent.DelayQueue
import java.util.concurrent.Delayed
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.LongAdder

/**
 * 回放功能执行类*/
class ReplayConcurrent extends SourceCode {

    private static Logger logger = LogManager.getLogger(ReplayConcurrent.class);

    static ThreadPoolExecutor executor

    static boolean key = true

    static int MAX_LENGTH = 800000

    int threadNum = 2

    String name

    String fileName

    int multiple

    Closure handle

    List<ReplayLog> logs

    DelayQueue<ReplayLog> logDelayQueue = new DelayQueue<ReplayLog>()

    LongAdder total = new LongAdder()

    ReplayConcurrent(String name, String fileName, int multiple, Closure handle) {
        this.name = name
        this.fileName = fileName
        this.multiple = multiple
        this.handle = handle

    }

    void start() {
        if (executor == null) executor = ThreadPoolUtil.createCachePool(THREADPOOL_MAX, "R")
        time({
            RWUtil.readFile(fileName, {
                def delay = new ReplayLog(it)
                if (delay.getTimestamp() != 0) logDelayQueue.add(delay)
            })
        }, 1, "读取日志$fileName")
        logs = logDelayQueue.toList()
        def timestamp = logs.get(0).getTimestamp()
        logDelayQueue.clear()
        AtomicInteger index = new AtomicInteger()
        AtomicInteger size = new AtomicInteger()
        def LogSize = logs.size()
        AtomicInteger diff = new AtomicInteger()
        threadNum.times {
            fun {
                while (key) {
                    if (index.get() % LogSize == 0) diff.set(getMark() - timestamp)
                    if (index.get() % MAX_LENGTH == 0) size.set(logDelayQueue.size())
                    if (size.get() > MAX_LENGTH) {
                        sleep(1.0)
                        size.set(logDelayQueue.size())
                    }
                    def replay = logs.get(index.getAndIncrement() % LogSize)
                    logDelayQueue.add(replay.clone(replay.timestamp + diff.get()))
                }
            }
        }
        threadNum.times {
            fun {
                while (key) {
                    def poll = logDelayQueue.poll(1, TimeUnit.SECONDS)
                    if (poll != null) {
                        executor.execute {
                            multiple.times {
                                handle(poll.getUrl())
                                total.add(1)
                            }
                        }

                    }
                }
            }
        }
        fun {
            while (key) {
                sleep(COUNT_INTERVAL as double)
                int real = total.sumThenReset() / COUNT_INTERVAL as int
                def active = executor.getActiveCount()
                def count = active == 0 ? 1 : active
                logger.info("{} ,实际QPS:{} 活跃线程数:{} 单线程效率:{}", name, real, active, real / count as int)
            }
        }

    }

    /**
     * 中止
     * @return
     */
    def stop() {
        key = false
        executor.shutdown()
        logger.info("replay压测关闭了!")
    }

    /**
     * 日志对象*/
    static class ReplayLog extends AbstractBean implements Delayed {

        int timestamp

        String url

        ReplayLog(String logLine) {
            def log = LogUtil.getLog(logLine)
            this.url = log.getUrl()
            this.timestamp = log.getTime()
        }

        ReplayLog(int timestamp, String url) {
            this.timestamp = timestamp
            this.url = url
        }

        @Override
        long getDelay(TimeUnit unit) {
            return this.timestamp - getMark()
        }

        @Override
        int compareTo(Delayed o) {
            return this.timestamp - o.timestamp
        }

        protected Object clone(int timestamp) {
            return new ReplayLog(timestamp, this.url)
        }
    }
}

自测

下面是我的测试用例:

代码语言:javascript
复制
package com.okcoin.hickwall.presses

import com.okcoin.hickwall.presses.funtester.frame.execute.ReplayConcurrent
import com.okcoin.hickwall.presses.funtester.httpclient.FunHttp

class RplayT extends FunHttp {

    static String HOST = "http://localhost:12345"

    public static void main(String[] args) {
        def fileName = "api.log"
        new ReplayConcurrent("测试回放功能", fileName, 1, {String url ->
            getHttpResponse(getHttpGet(HOST + url))
        }).start()

    }
}

测试结果如下:

代码语言:javascript
复制
22:45:43.510 main 
  ###### #     #  #    # ####### ######  #####  ####### ######  #####
  #      #     #  ##   #    #    #       #         #    #       #    #
  ####   #     #  # #  #    #    ####    #####     #    ####    #####
  #      #     #  #  # #    #    #            #    #    #       #   #
  #       #####   #    #    #    ######  #####     #    ######  #    #
  
10:56:18 F-5 测试回放功能, 实际QPS:23162 活跃线程数:0单线程效率:23162
10:56:23 F-5 测试回放功能, 实际QPS:36575 活跃线程数:6单线程效率:6095
10:56:28 F-5 测试回放功能, 实际QPS:38974 活跃线程数:21单线程效率:1855 
10:56:33 F-5 测试回放功能, 实际QPS:32798 活跃线程数:8单线程效率:4099
10:56:38 F-5 测试回放功能,实际QPS:35224 活跃线程数:4单线程效率:8806
10:56:43 F-5 测试回放功能,实际QPS:28426 活跃线程数:0单线程效率:28426
10:56:48 F-5 测试回放功能, 实际QPS:33607 活跃线程数:6单线程效率:5601
10:56:53 F-5 测试回放功能,实际QPS:34392 活跃线程数:0单线程效率:34392

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-08-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 FunTester 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 解决思路
  • 实现
    • 属性定义
      • 生产者
        • 消费者
        • 自测
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档