前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊项目中如何实现请求聚合

聊聊项目中如何实现请求聚合

原创
作者头像
lyb-geek
发布2024-08-06 09:48:22
930
发布2024-08-06 09:48:22
举报
文章被收录于专栏:Linyb极客之路

前言

什么是请求聚合

见名之意就是将多次的请求整合为一个请求处理

如何实现请求聚合

有个快手大佬开源了一个工具类:buffer-trigger,这玩意就可以用来做请求聚合。

buffer-trigger适用场景

  1. 高吞吐量消息处理: 当系统需要处理大量快速产生的数据或消息时,如日志记录、事件追踪、实时交易数据等,单条消息的即时处理可能会导致过多的系统开销(如网络通信、数据库操作等)。通过使用BufferTrigger,可以将这些消息暂时缓存在阻塞队列中,累积到一定数量后一次性进行批量处理。这样既能减少系统调用次数,提升整体处理效率,又能降低对下游系统的瞬时压力。
  2. 延迟敏感但允许适度延后处理: 在某些业务场景中,数据或消息的处理虽有一定的时效性要求,但并不严格到需要立即响应。例如,用户行为分析、运营统计报表生成等任务,可以在容忍的时间窗口内完成。BufferTrigger通过设置批处理阈值和延迟等待时间,允许在满足一定积累量或等待时间后才触发消费,从而实现数据的“准实时”处理,兼顾了处理效率与延迟需求。
  3. 资源优化与成本控制: 对于依赖付费服务(如云存储、API调用)或者计算资源有限的情况,批量处理能够显著减少对外部服务的调用量或内部计算资源的占用。例如,定期向云存储批量上传日志文件、批量发送电子邮件通知、批量查询外部API并聚合结果等。BufferTrigger通过合并多个小任务为一个大任务,有助于降低单位数据处理的成本。
  4. 避免频繁IO操作: 若消息的消费涉及大量的磁盘IO、网络IO或其他昂贵的系统资源操作,如数据库写入、文件写入、跨网络的数据同步等,频繁的单个操作可能导致性能瓶颈。BufferTrigger通过批量处理,能够减少这类操作的次数,从而提高系统整体性能。
  5. 微服务间解耦与流量控制: 在分布式微服务架构中,不同服务之间可能存在强依赖关系。使用BufferTrigger可以在服务间引入一层缓冲,避免下游服务瞬时过载或临时不可用导致整个系统崩溃。同时,批量处理能平滑消费端的请求流量,减轻对上游服务的压力,增强系统的稳定性和容错能力。

如何使用buffer-trigger

1、在项目的pom中引入buffer-trigger GAV

代码语言:java
复制
<dependency>
  <groupId>com.github.phantomthief</groupId>
  <artifactId>buffer-trigger</artifactId>
  <version>0.2.9</version>
</dependency>

2、使用案例一:使用SimpleBufferTrigger

代码语言:java
复制
/**
 * {@link BufferTrigger}的通用实现,适合大多数业务场景
 * <p>
 * 消费触发策略会考虑消费回调函数的执行时间,实际执行间隔 = 理论执行间隔 - 消费回调函数执行时间;
 * 如回调函数执行时间已超过理论执行间隔,将立即执行下一次消费任务.
 *
 * @author w.vela
 */

示例:

代码语言:java
复制
public class BufferTriggerDemo {
     BufferTrigger<Long> bufferTrigger = BufferTrigger.<Long, Map<Long, AtomicInteger>> simple()
            .maxBufferCount(10)
            .interval(4, TimeUnit.SECONDS)
            .setContainer(ConcurrentHashMap::new, (map, uid) -> {
                map.computeIfAbsent(uid, key -> new AtomicInteger()).addAndGet(1);
                return true;
            })
            .consumer(this::consumer)
            .build();



    public void consumer(Map<Long, AtomicInteger> map) {
        System.out.println(map);
    }

    public void test() throws InterruptedException {
        // 进程退出时手动消费一次
        Runtime.getRuntime().addShutdownHook(new Thread(() -> bufferTrigger.manuallyDoTrigger()));
        // 最大容量是10,这里尝试添加11个元素0-10
        for (int i = 0; i < 5; i ++) {
            for (long j = 0; j < 11; j ++) {
                bufferTrigger.enqueue(j);
            }
        }

        Thread.sleep(7000);
    }

参数描述

  • maxBuffeCount(long count): 指定容器最大容量,比如这里指定了10,当在下次聚合前容器元素数量达到10就无法添加了,-1表示无限制;
  • internal(longinterval, TimeUnit unit) :表示多久聚合一次,如果没达到时间那么consumer是不会输出的,聚合后容器就空了。
  • setContainer(Supplier<? extends C> factory, BiPredicate<? super C, ? super E> queueAdder): 第一个变量为factory,是个Supplier,获取容器用的,要求线程安全;第二个变量是缓存更新的方法BiPredicate<? super C, ? super E> queueAdder C为容器类型,E为元素类型
  • consumer(ThrowableConsumer<? super C, Throwable> consumer): 表示如何消费聚合后的数据,标识我们如何去消费聚合后的数据,我这里就是简单打印。 enqueue(E element): 添加元素;
  • manuallyDoTrigger: 主动触发一次消费,通常在java进程关闭的时候调用

2、使用案例二:使用BatchConsumeBlockingQueueTrigger

代码语言:java
复制
/**
 * {@link BufferTrigger}基于阻塞队列的批量消费触发器实现.
 * <p>
 * 该触发器适合生产者-消费者场景,缓存容器基于{@link LinkedBlockingQueue}队列实现.
 * <p>
 * 触发策略类似Kafka linger,批处理阈值与延迟等待时间满足其一即触发消费回调.
 * @author w.vela
 */

示例:

代码语言:java
复制
public class BufferTriggerDemo2 {
     BufferTrigger<Long> bufferTrigger = BufferTrigger.<Long>batchBlocking()
             .bufferSize(50)
             .batchSize(10)
             .linger(Duration.ofSeconds(1))
             .setConsumerEx(this::consume)
             .build();

    private void consume(List<Long> nums) {
        System.out.println(nums);
    }

    public void test() throws InterruptedException {
        // 进程退出时手动消费一次
        Runtime.getRuntime().addShutdownHook(new Thread(() -> bufferTrigger.manuallyDoTrigger()));
        for (long j = 0; j < 60; j ++) {
            bufferTrigger.enqueue(j);
        }

        Thread.sleep(7000);
    }
  • batchBlocking():提供自带背压(back-pressure)的简单批量归并消费能力;
  • bufferSize(intbufferSize): 缓存队列的最大容量; batchSize(int size): 批处理元素的数量阈值,达到这个数量后也会进行消费
  • linger(Duration duration): 多久消费一次
  • setConsumerEx(ThrowableConsumer<? super List, Exception> consumer): 消费函数,注入的对象为缓存队列中尚存的所有元素,非逐个元素消费;

3、两种实现方式在使用上的区别

BatchConsumeBlockingQueueTrigger每次将元素原封不动保存下来,然后一次性消费一整个列表元素。而SimpleBufferTrigger,每次添加元素都会进行计算。

以上示例摘抄该博文https://juejin.cn/post/7160569936576774181

这篇文章比较详细对请求聚合以及buffer-trigger进行了介绍

更多buffer-trigger内容可以看官方源码注释以及相应的单元测试案例

https://github.com/PhantomThief/buffer-trigger

以上就是buffer-trigger的使用教程,不过如果只是写到这边,就没啥意思了,下面就以一个实战的例子,来演示下如何实现请求聚合

案例

注: 以一个批量注册用户为例子,来演示请求聚合。案例将buffer-trigger与springboot做了一个整合。案例只列出核心代码,完整示例查看文末demo链接

1、项目中引入buffer-trigger GAV

代码语言:java
复制
 <dependency>
            <groupId>com.github.phantomthief</groupId>
            <artifactId>buffer-trigger</artifactId>
            <version>${buffer.trigger.version}</version>
        </dependency>

2、封装请求参数类

代码语言:java
复制
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class DataExchange<T,R> {

    private String bizNo;
    private T request;
    private CompletableFuture<Result<R>> response;
}

3、封装buffer-trigger处理类

代码语言:java
复制
@RequiredArgsConstructor
public class DelegateBatchConsumerTriggerHandler<T, R> implements BatchConsumerTriggerHandler<T, R>{


    private final BufferTrigger<DataExchange<T, R>> bufferTrigger;



    @SneakyThrows
    @Override
    public Result<R> handle(T request, String bizNo) {
        DataExchange dataExchange = new DataExchange<>();
        dataExchange.setBizNo(bizNo);
        dataExchange.setRequest(request);
        CompletableFuture<Result> response = new CompletableFuture<>();
        dataExchange.setResponse(response);
        bufferTrigger.enqueue(dataExchange);
        return response.get();
    }


    @Override
    public void closeBufferTrigger() {
        // 触发该事件,关闭BufferTrigger,并将未消费的数据消费
       if(bufferTrigger != null){
           bufferTrigger.close();
       }
    }
}

4、封装buffer-trigger创建工厂

代码语言:java
复制
public interface BatchConsumerTriggerFactory {

    default <T,R> BatchConsumerTriggerBuilder<DataExchange<T,R>> builder(){
        return null;
    }

    default <T,R> BufferTrigger<DataExchange<T,R>> getTrigger(ThrowableConsumer<List<DataExchange<T,R>>, Exception> consumer, String bufferTriggerBizType){
        if(!support(bufferTriggerBizType)){
           return null;
        }
       return builder().setConsumerEx(consumer).build();
    }

    boolean support(String bufferTriggerBizType);

    default <T,R> BatchConsumerTriggerHandler<T,R> getTriggerHandler(ThrowableConsumer<List<DataExchange<T,R>>, Exception> consumer, String bufferTriggerBizType){
        BufferTrigger<DataExchange<T, R>> trigger = getTrigger(consumer, bufferTriggerBizType);
        return new DelegateBatchConsumerTriggerHandler<>(trigger);
    }


}

5、模拟用户注册dao

代码语言:java
复制
@Repository
public class UserDao {
    private final Map<Long, User> userMap = new ConcurrentHashMap<>();
    private final ThreadLocalRandom random = ThreadLocalRandom.current();
    private final LongAdder idAdder = new LongAdder();

    public User register(UserDTO userDTO){
        mockExecuteCostTime();
        return getUser(userDTO);
    }


    public List<User> batchRegister(List<UserDTO> userDTOs){
        mockExecuteCostTime();
        List<User> users = new ArrayList<>();
        userDTOs.forEach(userDTO -> users.add(getUser(userDTO)));
        return users;
    }

6、模拟用户注册service

a、 常规方式

代码语言:java
复制
@Service
@RequiredArgsConstructor
public class UserServiceImpl implements UserService {

    private final UserDao userDao;
    private final LongAdder count = new LongAdder();
    @Override
    public Result<User> register(UserDTO user) {
        count.increment();
        System.out.println("执行次数:" + count.sum());

        return Result.success(userDao.register(user));
    }
    }

b、 请求聚合方式

前置条件: 需在yml指定相关队列、定时器配置以及业务类别

代码语言:yaml
复制
lybgeek:
  buffer:
    trigger:
      consume-queue-trigger-properties:
        - bufferTriggerBizType: userReisgeter
          config:
            batchSize: 100
            bufferSize: 1000
            batchConsumeIntervalMills: 1000
代码语言:java
复制
@Service
@RequiredArgsConstructor
public class UserServiceBufferTriggerImpl implements UserService, InitializingBean, DisposableBean {
    public static final String BUFFER_TRIGGER_BIZ_TYPE = "userReisgeter";

    private final UserDao userDao;

    private final BatchConsumerTriggerFactory batchConsumerTriggerFactory;

    private BatchConsumerTriggerHandler<UserDTO,User> batchConsumerTriggerHandler;


    private final LongAdder count = new LongAdder();

    @SneakyThrows
    @Override
    public Result<User> register(UserDTO user) {
       return batchConsumerTriggerHandler.handle(user,BUFFER_TRIGGER_BIZ_TYPE + "-" + UUID.randomUUID());
    }

 
    @Override
    public void afterPropertiesSet() throws Exception {
        // key为业务属性唯一键,如果不存在业务属性唯一键,则可以取bizNo作为key,示例以username作为唯一键
        Map<String, CompletableFuture<Result<User>>> completableFutureMap = new HashMap<>();
        batchConsumerTriggerHandler = batchConsumerTriggerFactory.getTriggerHandler((ThrowableConsumer<List<DataExchange<UserDTO, User>>, Exception>) dataExchanges -> {
            List<UserDTO> userDTOs = new ArrayList<>();
            for (DataExchange<UserDTO, User> dataExchange : dataExchanges) {
                UserDTO userDTO = dataExchange.getRequest();
                completableFutureMap.put(userDTO.getUsername(),dataExchange.getResponse());
                userDTOs.add(userDTO);
            }
            count.increment();
            System.out.println("执行次数:" + count.sum());
            List<User> users = userDao.batchRegister(userDTOs);
            if(CollectionUtil.isNotEmpty(users)){
                for (User user : users) {
                    CompletableFuture<Result<User>> completableFuture = completableFutureMap.remove(user.getUsername());
                    if(completableFuture != null){
                        completableFuture.complete(Result.success(user));
                    }
                }
            }

        },BUFFER_TRIGGER_BIZ_TYPE);


    }

    @Override
    public void destroy() throws Exception {
        // 触发该事件,关闭BufferTrigger,并将未消费的数据消费
        batchConsumerTriggerHandler.closeBufferTrigger();
    }
}

7、分别开启20个线程,对常规方式以及聚合方式的service进行测试

a、 常规方式

代码语言:java
复制
  @Test
    public void testRegisterUserByCommon() throws IOException {
      new ConcurrentCall(20).run(()->{
          UserDTO user = UserUtil.generateUser();
          return userServiceImpl.register(user);
      });
    }

控制台输出

代码语言:java
复制
执行次数:1
执行次数:2
执行次数:7
执行次数:6
执行次数:10
执行次数:9
执行次数:5
执行次数:4
执行次数:11
执行次数:12
执行次数:3
执行次数:8
执行次数:17
执行次数:16
执行次数:15
执行次数:18
执行次数:14
执行次数:20
执行次数:13
执行次数:19
Result(code=200, msg=success, data=User(id=1, username=yangweize, fullname=杨伟泽, age=12, email=yangweize@qq.com, mobile=64294835455))
Result(code=200, msg=success, data=User(id=3, username=yaojinpeng, fullname=姚晋鹏, age=13, email=yaojinpeng@qq.com, mobile=5381-03836251))
Result(code=200, msg=success, data=User(id=9, username=pengxiaoran, fullname=彭潇然, age=25, email=pengxiaoran@qq.com, mobile=903-85787160))
Result(code=200, msg=success, data=User(id=9, username=guoweize, fullname=郭伟泽, age=9, email=guoweize@qq.com, mobile=57105382845))
Result(code=200, msg=success, data=User(id=8, username=huangjinyu, fullname=黄瑾瑜, age=29, email=huangjinyu@qq.com, mobile=449-27085386))
Result(code=200, msg=success, data=User(id=6, username=renkairui, fullname=任楷瑞, age=3, email=renkairui@qq.com, mobile=2777-67842072))
Result(code=200, msg=success, data=User(id=2, username=fuhaoran, fullname=傅昊然, age=15, email=fuhaoran@qq.com, mobile=332-47390793))
Result(code=200, msg=success, data=User(id=5, username=linmingxuan, fullname=林明轩, age=27, email=linmingxuan@qq.com, mobile=116-31209336))
Result(code=200, msg=success, data=User(id=5, username=shensicong, fullname=沈思聪, age=6, email=shensicong@qq.com, mobile=0532-05033168))
Result(code=200, msg=success, data=User(id=11, username=gongtianyu, fullname=龚天宇, age=4, email=gongtianyu@qq.com, mobile=9752-26976731))
Result(code=200, msg=success, data=User(id=13, username=xiongminghui, fullname=熊明辉, age=23, email=xiongminghui@qq.com, mobile=0049-21709250))
Result(code=200, msg=success, data=User(id=17, username=huzhize, fullname=胡志泽, age=0, email=huzhize@qq.com, mobile=760-85426527))
Result(code=200, msg=success, data=User(id=16, username=gaosiyuan, fullname=高思源, age=5, email=gaosiyuan@qq.com, mobile=42452304656))
Result(code=200, msg=success, data=User(id=13, username=mojiaxi, fullname=莫嘉熙, age=2, email=mojiaxi@qq.com, mobile=7264-82263592))
Result(code=200, msg=success, data=User(id=18, username=caizimo, fullname=蔡子默, age=12, email=caizimo@qq.com, mobile=2653-82403850))
Result(code=200, msg=success, data=User(id=10, username=wancongjian, fullname=万聪健, age=10, email=wancongjian@qq.com, mobile=954-37654583))
Result(code=200, msg=success, data=User(id=14, username=gongyuebin, fullname=龚越彬, age=0, email=gongyuebin@qq.com, mobile=77884047173))
Result(code=200, msg=success, data=User(id=15, username=fenghongtao, fullname=冯鸿涛, age=2, email=fenghongtao@qq.com, mobile=8832-09658213))
Result(code=200, msg=success, data=User(id=19, username=jiangyuanbo, fullname=江苑博, age=12, email=jiangyuanbo@qq.com, mobile=2132-90700641))
Result(code=200, msg=success, data=User(id=20, username=xiaoxinlei, fullname=萧鑫磊, age=13, email=xiaoxinlei@qq.com, mobile=02196775183))

b、 聚合请求方式

代码语言:java
复制
  @Test
    public void testRegisterUserByBufferTrigger() throws IOException {
        new ConcurrentCall(20).run(()->{
            UserDTO user = UserUtil.generateUser();
            return userServiceBufferTriggerImpl.register(user);
        });
    }

控制台输出

代码语言:java
复制
执行次数:1
Result(code=200, msg=success, data=User(id=1, username=heguo, fullname=何果, age=10, email=heguo@qq.com, mobile=5725-06130005))
Result(code=200, msg=success, data=User(id=7, username=houwen, fullname=侯文, age=9, email=houwen@qq.com, mobile=85830365362))
Result(code=200, msg=success, data=User(id=11, username=yangxiaoyu, fullname=杨笑愚, age=5, email=yangxiaoyu@qq.com, mobile=13776594491))
Result(code=200, msg=success, data=User(id=3, username=yusimiao, fullname=余思淼, age=5, email=yusimiao@qq.com, mobile=070-18231344))
Result(code=200, msg=success, data=User(id=12, username=haotianyu, fullname=郝天宇, age=10, email=haotianyu@qq.com, mobile=42693432247))
Result(code=200, msg=success, data=User(id=14, username=wangxinpeng, fullname=汪鑫鹏, age=1, email=wangxinpeng@qq.com, mobile=59660609063))
Result(code=200, msg=success, data=User(id=15, username=tanzhichen, fullname=覃智宸, age=25, email=tanzhichen@qq.com, mobile=075-00624335))
Result(code=200, msg=success, data=User(id=4, username=lu:haoxuan, fullname=吕皓轩, age=14, email=lu:haoxuan@qq.com, mobile=9548-30583153))
Result(code=200, msg=success, data=User(id=2, username=qiuyinxiang, fullname=邱胤祥, age=18, email=qiuyinxiang@qq.com, mobile=04148786960))
Result(code=200, msg=success, data=User(id=5, username=weiweicheng, fullname=魏伟诚, age=25, email=weiweicheng@qq.com, mobile=0960-77489940))
Result(code=200, msg=success, data=User(id=20, username=tanbin, fullname=谭彬, age=27, email=tanbin@qq.com, mobile=297-57401738))
Result(code=200, msg=success, data=User(id=18, username=husiyuan, fullname=胡思远, age=24, email=husiyuan@qq.com, mobile=0809-08658163))
Result(code=200, msg=success, data=User(id=16, username=shishengrui, fullname=石晟睿, age=26, email=shishengrui@qq.com, mobile=8205-70004359))
Result(code=200, msg=success, data=User(id=17, username=lu:zihan, fullname=吕子涵, age=0, email=lu:zihan@qq.com, mobile=162-35081974))
Result(code=200, msg=success, data=User(id=19, username=xionghaoran, fullname=熊昊然, age=19, email=xionghaoran@qq.com, mobile=588-09693393))
Result(code=200, msg=success, data=User(id=13, username=jiangyuebin, fullname=姜越彬, age=19, email=jiangyuebin@qq.com, mobile=472-74492380))
Result(code=200, msg=success, data=User(id=8, username=haoweicheng, fullname=郝伟诚, age=26, email=haoweicheng@qq.com, mobile=73205366322))
Result(code=200, msg=success, data=User(id=10, username=tanhongxuan, fullname=谭鸿煊, age=18, email=tanhongxuan@qq.com, mobile=78536254981))
Result(code=200, msg=success, data=User(id=9, username=xielicheng, fullname=谢立诚, age=18, email=xielicheng@qq.com, mobile=4364-05053591))
Result(code=200, msg=success, data=User(id=6, username=weiluyang, fullname=韦鹭洋, age=28, email=weiluyang@qq.com, mobile=92876761170))

c、 结果分析

常规方式需要调用20次,将结果返回。聚合方式仅需调用一次,就将结果返回

总结

本文主要讲解如何进行请求聚合,请求聚合主要适用于那些需要高效、批量处理数据或消息,并且对处理延迟有一定容忍度的场景。

我们在使用请求聚合时,相关的下游最好能提供批量接口

其次BufferTrigger是单线程消费,在并发很高的场景下可能会出现消费速度跟不上生产速度,这很容易导致full gc问题。所以如果有必要的话需要使用线程池来提升消费速度

demo链接

https://github.com/lyb-geek/springboot-learning/tree/master/springboot-buffer-trigger

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 如何使用buffer-trigger
  • 案例
  • 总结
  • demo链接
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档