前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式系统模式8-Singular Update Queue

分布式系统模式8-Singular Update Queue

作者头像
java达人
发布2021-01-05 15:02:36
6100
发布2021-01-05 15:02:36
举报
文章被收录于专栏:java达人java达人

作者: Unmesh Joshi

译者: java达人

来源: https://martinfowler.com/articles/patterns-of-distributed-systems/

使用单个线程异步处理请求以维持顺序,而不会阻塞调用方。

问题

当状态需要由多个并发客户端更新时,我们需要安全更新,每次更新一个。考虑Write-Ahead Log 模式的示例。我们需要一次处理一个条目,即使有几个并发客户端试图写入。锁通常用于防止并发修改。但是,如果正在执行的任务很耗时,比如写入文件,那么阻塞所有其他调用线程直到任务完成,可能会严重影响整个系统吞吐量和延迟时间。有效地使用计算资源是很重要的,同时仍然保证每次执行一个。

解决方案

实现一个工作队列和一个在该队列上工作的线程。多个并发客户端可以向队列提交状态更改。但只有一个线程处理状态更改。这可以通过像golang语言中的goroutine和channel自然地实现。

典型的java实现如下:

SingularUpdateQueue有一个队列和一个要应用于队列元素的函数。它是从java.lang.Thread继承而来的,以确保它的单线程执行。

代码语言:javascript
复制

public class SingularUpdateQueue<Req, Res> extends Thread implements Logging {
    private ArrayBlockingQueue<RequestWrapper<Req, Res>> workQueue
            = new ArrayBlockingQueue<RequestWrapper<Req, Res>>(100);
    private Function<Req, Res> handler;
    private volatile boolean isRunning = false;

这里显示的实现使用Java的Thread类,只是为了演示基本的代码结构。在单线程中使用Java的ExecutorService也可以实现同样的效果。您可以参考书籍《Java并发实战》来了解更多关于使用ExecutorService的知识。

客户端在自己的线程上向队列提交请求。队列将每个请求包装在一个简单的包装器中,将其与future组合在一起,将future返回给客户端,以便在请求最终完成后客户端能够做出响应。

代码语言:javascript
复制
class SingularUpdateQueue… 
 public CompletableFuture<Res> submit(Req request) {
      try {
          var requestWrapper = new RequestWrapper<Req, Res>(request);
          workQueue.put(requestWrapper);
          return requestWrapper.getFuture();
      }
      catch (InterruptedException e) {
          throw new RuntimeException(e);
      }
  }
  
  class RequestWrapper<Req, Res> {
    private final CompletableFuture<Res> future;
    private final Req request;

    public RequestWrapper(Req request) {
        this.request = request;
        this.future = new CompletableFuture<Res>();
    }
    public CompletableFuture<Res> getFuture() { 
    return future; 
    }
    public Req getRequest() { return request; }

队列中的元素由SingularUpdateQueue从thread继承的单个专用线程处理。队列允许多个并发生成器添加要执行的任务。队列实现应该是线程安全的,并且不应该在竞争下增加太多开销。执行线程从队列中获取请求并一次处理一个请求。CompletableFuture随着任务执行的响应而完成。

代码语言:javascript
复制
class SingularUpdateQueue…  
@Override
  public void run() {
       isRunning = true;
       while(isRunning) {
           Optional<RequestWrapper<Req, Res>> item = take();
           item.ifPresent(requestWrapper -> {
               try {
                   Res response = handler.apply(requestWrapper.getRequest());
                   requestWrapper.complete(response);

               } catch (Exception e) {
                   requestWrapper.completeExceptionally(e);
               }
           });
      }
  }
  
  class RequestWrapper…  
  public void complete(Res response) {
      future.complete(response);
  }
  public void completeExceptionally(Exception e) {
      getFuture().completeExceptionally(e);
  }

值得注意的是,我们可以在从队列读取元素时设置一个超时,而不是无限期地阻塞。它允许我们在需要时退出线程,将isRunning设置为false,并且队列不会在为空时无限期阻塞而阻塞执行线程。因此,我们使用带有超时的poll方法,而不是无限期阻塞的take方法。这使我们能够完全地关闭执行线程。

代码语言:javascript
复制
class SingularUpdateQueue…  
private Optional<RequestWrapper<Req, Res>> take() {
      try {
          return Optional.ofNullable(workQueue.poll(300, TimeUnit.MILLISECONDS));

      } catch (InterruptedException e) {
          return Optional.empty();
      }
  }


  public void shutdown() {
      this.isRunning = false;
  }

例如,一个处理来自多个客户端的请求并更新预写日志的服务器可以使用SingularUpdateQueue,如下所示。

SingularUpdateQueue的客户端通过指定参数化类型和处理来自队列的消息时要运行的函数来设置它。在本例中,我们使用预写日志请求的消费者。这个使用者只有一个实例,它将控制对日志数据结构的访问。使用者需要将每个请求放入日志,然后返回响应。只有在将消息放入日志之后才能发送响应消息。我们使用SingularUpdateQueue来确保这些操作有一个可靠的顺序。

代码语言:javascript
复制
public class WalRequestConsumer implements Consumer<Message<RequestOrResponse>> {

    private final SingularUpdateQueue<Message<RequestOrResponse>, Message<RequestOrResponse>> walWriterQueue;
    private final WriteAheadLog wal;
    public WalRequestConsumer(Config config) {
        this.wal = WriteAheadLog.openWAL(config);
        walWriterQueue = new SingularUpdateQueue<>((message) -> {
            wal.writeEntry(serialize(message));
            return responseMessage(message);
        });
        startHandling();
    }

    private void startHandling() { this.walWriterQueue.start(); }

consumer的accept方法接收消息,将其放在队列中,在处理每个消息后发送响应。此方法在调用者的线程上运行,允许多个调用者同时调用accept。

代码语言:javascript
复制
@Override
public void accept(Message message) {
    CompletableFuture<Message<RequestOrResponse>> future = walWriterQueue.submit(message);
    future.whenComplete((responseMessage, error) -> {
        sendResponse(responseMessage);
    });
}

队列的选择

队列数据结构的选择是一个重要的问题。在JVM上,有各种数据结构可供选择:

• ArrayBlockingQueue(用于Kafka请求队列)

顾名思义,这是一个数组支持的阻塞队列。当需要创建一个固定的有界队列时,将使用此方法。一旦队列满了,生产者将阻塞。这提供了背压机制,在消费者速度慢而生产者速度快的情况下非常有用

• ConcurrentLinkedQueue和ForkJoinPool(在Akka actor mailbox实现中使用)

当我们没有消费者等待生产者时,可以使用ConcurrentLinkedQueue,但是有一些协调器只在任务被排队到ConcurrentLinkedQueue之后才调度消费者。

• LinkedBlockingDeque (Zookeeper和Kafka响应队列使用)

这主要用于需要在不阻塞生产者的情况下使用无界限队列时。我们需要小心这种选择,因为如果没有实现backpressure技术,队列可能很快就会被填满,并继续消耗所有内存

• RingBuffer(用于LMAX Disruptor)

正如LMAX Disruptor中所讨论的,有时候,任务处理是对延迟敏感的。因此,使用ArrayBlockingQueue在各处理阶段之间复制任务可能会增加不可接受的延迟。在这些情况下,可以使用RingBuffer在阶段之间传递任务。

使用Channel和轻量级线程。

这对于支持轻量级线程以及channel概念的语言或库(例如golang, kotlin)来说是很自然的选择。所有请求都被传递到一个要处理的channel。有一个单独的goroutine来处理所有消息更新状态。然后将响应写入单独的channel,并由单独的goroutine进行处理,然后将其发送回客户端。如以下代码所示,更新key的请求被传递到单个共享请求channel。

代码语言:javascript
复制
func (s *server) putKv(w http.ResponseWriter, r *http.Request)  {
  kv, err := s.readRequest(r, w)
  if err != nil {
    log.Panic(err)
    return
  }

  request := &requestResponse{
    request:         kv,
    responseChannel: make(chan string),
  }

  s.requestChannel <- request
  response := s.waitForResponse(request)
  w.Write([]byte(response))
}

请求在单个goroutine中处理,以更新所有状态。

代码语言:javascript
复制
func (s* server) Start() error {
  go s.serveHttp()

  go s.singularUpdateQueue()

  return nil
}

func (s *server) singularUpdateQueue() {
  for {
    select {
    case e := <-s.requestChannel:
      s.updateState(e)
      e.responseChannel <- buildResponse(e);
    }
  }
}

背压

当使用工作队列在线程之间进行通信时,背压可能是一个重要的问题。如果消费者速度慢而生产者速度快,队列可能很快就会被填满。除非采取一些预防措施,否则它可能会在大量任务填满队列时耗尽内存。通常,如果队列已满,则通过发送方阻塞来保持队列的边界。例如,java.util.concurrent.ArrayBlockingQueue 有两个添加元素的方法。如果数组已满,put方法将阻塞生产者。如果队列已满,add方法会抛出IllegalStateException,但不会阻塞生产者。了解可用于向队列添加任务的方法的语义是很重要的。在使用ArrayBlockingQueue时,应该使用put方法来阻塞发送方,提供背压机制。像reactive-stream这样的框架可以帮助实现从消费者到生产者的更复杂的背压机制。

其他注意点

• 任务链 大多数时候,任务处理需要通过任务链来完成。SingularUpdateQueue执行的结果需要传递给其他阶段。如上面的WalRequestConsumer所示,在记录被写入预写日志之后,需要通过套接字连接发送响应。这可以通过在单独的线程上执行由SingularUpdateQueue返回的future来完成。它也可以将任务提交给其他SingularUpdateQueue。• 外部服务调用。有时,作为SingularUpdateQueue中任务执行的一部分,需 要进行外部服务调用,并且SingularUpdateQueue的状态由服务调用的响应进行更新。在这个场景中,重要的是不要进行网络阻塞调用,否则它会阻塞正在处理所有任务的唯一线程。调用是异步进行的。必须注意,在异步服务调用的future callback中不要访问SingularUpdateQueue状态,因为这会在单独的线程中执行,这违背了单线程对SingularUpdateQueue中所有状态更改的目的。调用的结果应该添加到工作队列中,类似于其他事件或请求。

例子

所有的共识实现,如Zookeeper(ZAB)或etcd (RAFT),都需要严格按顺序处理请求,一次一个。它们使用类似的代码结构。

• Zookeeper的请求处理管道实现是由单线程请求处理器完成的

• Apache Kafka中的控制器需要从zookeeper更新多个并发事件的状态,它在一 个单独线程中处理这些事件,所有事件处理程序在一个队列中提交事件 • Cassandra使用SEDA架构,使用单线程阶段更新其Gossip状态。

• Etcd和其他基于go-lang的实现有一个单独的goroutine工作的请求通道,以 更新其状态

• LMAX-Diruptor结构遵循单一写入者原则,在更新本地状态时避免互斥。

java达人

ID:drjava

(长按或扫码识别)

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 java达人 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 问题
  • 解决方案
  • 队列的选择
  • 使用Channel和轻量级线程。
  • 背压
  • 其他注意点
  • 例子
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档