首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >小白需懂的异步请求的处理

小白需懂的异步请求的处理

作者头像
用户7386338
发布2020-05-29 16:10:16
1.8K0
发布2020-05-29 16:10:16
举报
文章被收录于专栏:Java患者Java患者Java患者

同步的处理模式

在我们传统的服务中,当一个HTTP请求过来时,tomcat或者是其他的中间件都会有一个主线程来处理请求,所有的业务逻辑都会在这个线程里面处理完,最后会给出一个响应。由于我们的tomcat所管理的线程数是有限的,当线程到达一定程度后,再有请求过来将会无法去处理了。

异步处理模式

而异步处理是,当一个HTTP请求过来时,我们tomcat的主线程会去调起其他的副线程来执行我们的业务逻辑,当副线程执行完后再由我们的主线程把结果响应回去。在副线程处理业务逻辑中,我们的主线程是可以空闲出来的,然后去处理其他的请求的,也就是说采用这种模式去处理请求的话,我们的服务器的吞吐量会有一个明显的提升。

同步和异步的图解

同步处理的代码

下面我们用线程等待模拟我们的实际业务,通过访问这个接口,得到的结果显然是开始与结束相差1秒。

    @RequestMapping("/order")
    public String order() throws Exception{
        logger.info("主线程开始");
        Thread.sleep(1000);
        logger.info("主线程结束");
        return "success";
    }

使用Runnable异步处理

首先我们能想到的是使用Callable来实现我们的异步处理。

  @RequestMapping("/order")
  public Callable<String> order() throws Exception {
      logger.info("主线程开始");
      Callable<String> result = new Callable<String>() {
          @Override
          public String call() throws Exception {
              logger.info("副线程开始");
              Thread.sleep(1000);
              logger.info("副线程返回");
              return "success";
          }
      };
      logger.info("主线程结束");
      return result;
  }

经过这样的改动后,我们的业务处理放到副线程里面。当我们调用这个接口,控制台是:

并且我们通过浏览器的开发工具知道,这个请求一个耗时1秒多,也就是我们的业务消耗的时间。

为什么要写DeferredResult异步处理

虽然我们已经学会使用Callable去异步处理我们的请求,但是因为Runnable这种形式不能满足我们所有的场景。在使用Runnable处理时,如上面代码案例所示,我们的副线程必须由我们的主线程来调起的。

在我们正真的企业开发中,场景是比较复杂的,我们就以订单下单通常为例:

如上图所示,我们可以知道,接收下单的请求和真正处理下单的业务逻辑并不是在同台服务器上,当HTTP请求进到应用1里面,应用1会把他放到消息队列中,然后应用2去监听这个消息队列,当监听到这个消息队列中有下单的请求后,应用2开始处理业务逻辑。应用2处理完毕后,会把这个消息结果放进消息队列中,同时应用1里面有个一线程2监听消息队列,当它监听到有请求处理完毕后,它会根据消息的结果去返回一个HTTP响应。

在这个场景里面,线程1和线程2是完全隔离的。在这种相似的场景下面Runnable显然不能满足我们的需求了,这时就要用到我们的DeferredResult。下面我们来写一个段代码来看如何用DeferredResult来实现我们的异步请求。

使用DeferredResul异步处理

由于篇幅字数不宜过长,我们也不可能搭建一个消息队列的服务处理。我们把上面业务图分成4块内容。

我们使用下面这个类来进行模拟消息队列就好:

@Component
public class MockQueue {

    /**
     * 下单的的消息
     * 当这个属性有值的时候就认为有下单消息
     */
  private String placeOrder;

    /**
     * 订单完成的消息
     * 当这个属性有值的时候就认为有订单完成的消息
     */
  private String completeOrder;

  private Logger logger = LoggerFactory.getLogger(getClass());

  public String getPlaceOrder() {
    return placeOrder;
  }

  public void setPlaceOrder(String placeOrder) throws Exception {
    // 假设这是应用2单独的线程
    new Thread(() -> {
      logger.info("接到下单请求, " + placeOrder);
      try {
          // 模拟下单的过程
        Thread.sleep(1000);
      } catch (Exception e) {
        e.printStackTrace();
      }
      // 收到下单完成消息
      this.completeOrder = placeOrder;
      logger.info("下单请求处理完毕," + placeOrder);
    }).start();
  }

  public String getCompleteOrder() {
    return completeOrder;
  }

  public void setCompleteOrder(String completeOrder) {
    this.completeOrder = completeOrder;
  }

}

我们使用下面这块代码来模拟我们的线程1和线程2间发送DefferredResult:

@Component
public class DeferredResultHolder {

  private Map<String, DeferredResult<String>> map = new HashMap<String, DeferredResult<String>>();
  public Map<String, DeferredResult<String>> getMap() {
    return map;
  }

  public void setMap(Map<String, DeferredResult<String>> map) {
    this.map = map;
  }
}

再来看下我们主线程的代码,在这段代码中我们是看不到有其他的线程来参与的。

@RestController
public class AsyncController {

  @Autowired
  private MockQueue mockQueue;
  @Autowired
  private DeferredResultHolder deferredResultHolder;
  private Logger logger = LoggerFactory.getLogger(getClass());

  @RequestMapping("/order")
  public DeferredResult<String> order() throws Exception {
    logger.info("主线程开始");
        // 8为随机数作为我们的订单号
    String orderNumber = RandomStringUtils.randomNumeric(8);
    mockQueue.setPlaceOrder(orderNumber);

    DeferredResult<String> result = new DeferredResult<>();
    deferredResultHolder.getMap().put(orderNumber, result);
    return result;
  }

我们最后来实现我们线程2监听我们的消息队列的代码,这个类需要实现ApplicationListener。接口泛型是ContextRefreshedEvent,这个事件是Spring容器初始化完毕的事件。

@Component
public class QueueListener implements ApplicationListener<ContextRefreshedEvent> {

  @Autowired
  private MockQueue mockQueue;
  @Autowired
  private DeferredResultHolder deferredResultHolder;
  private Logger logger = LoggerFactory.getLogger(getClass());

  @Override
  public void onApplicationEvent(ContextRefreshedEvent event) {
    new Thread(() -> {
        // 模拟监听completeOrder有值
      while (true) {
                // 有值就处理
        if (StringUtils.isNotBlank(mockQueue.getCompleteOrder())) {

          String orderNumber = mockQueue.getCompleteOrder();
          logger.info("返回订单处理结果:"+orderNumber);
          // setResult设置最终要返回的信息
          deferredResultHolder.getMap().get(orderNumber).setResult("place order success");
          mockQueue.setCompleteOrder(null);
        }else{
            // 没值就睡1秒
          try {
            Thread.sleep(100);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    }).start();
  }
}

DeferredResult处理结果

实际上我们有3个线程,第一个是主线程负责接收HTTP请求,第二个线程是真正处理下单的逻辑,第三个线程把处理结果返回给客户端。而第一个线程和最后一个线程会用DefferredResult实现交互。

处理结果如下:

对于前台来说是毫无感知的,但我们的后台一共有三个线程处理这个业务。

异步相关的配置

@Configuration
public class WebConfig extends WebMvcConfigurerAdapter {

    /**
     * 同步的拦截器注册
     * @param registry
     */
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(timeInterceptor);
    }

    /**
     * 异步的拦截器注册
     * @param configurer
     */
    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        // 注册异步的拦截器
        configurer.registerCallableInterceptors();
        // 设置异步请求默认此时时间
        configurer.setDefaultTimeout();
        // 设置异步的线程池
        // 在默认的情况下。就像我们Runnable
        // spring副线程是不会去复用的,而是开启新的线程
        // 那么我们可以通过这个方法设置可从用的线程池
        configurer.setTaskExecutor();
    }
}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-03-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Java患者 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 异步处理模式
  • 同步和异步的图解
  • 同步处理的代码
  • 使用Runnable异步处理
  • 为什么要写DeferredResult异步处理
  • 使用DeferredResul异步处理
  • DeferredResult处理结果
  • 异步相关的配置
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档