前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式系统模式9-Single Socket Channel

分布式系统模式9-Single Socket Channel

作者头像
java达人
发布2021-01-05 15:02:54
5070
发布2021-01-05 15:02:54
举报
文章被收录于专栏:java达人java达人

作者: Unmesh Joshi

译者: java达人

来源: https://martinfowler.com/articles/patterns-of-distributed-systems/

通过使用一个TCP连接来维护发送到服务器的请求的顺序。

问题

当我们使用领导者和追随者模式时,我们需要确保领导者和每个追随者之间的信息保持有序,并对丢失的信息进行重试。与此同时保持较低的新连接成本,这样打开新连接不会增加系统的延迟。

解决方案

幸运的是,长期广泛使用的TCP机制提供了所有这些必要的特征。因此,我们可以通过保证一个follower和leader之间的所有通信都通过一个套接字通道来实现我们需要的通信。然后追随者使用一个Singular Update Queue序列化来自leader的更新

节点在连接打开后永远不会关闭它,并持续读取新请求。节点对每个连接使用一个专用线程来读写请求。如果使用了非阻塞io,则不需要每个连接一个线程。一个简单的基于线程的实现如下:

代码语言:javascript
复制
class SocketHandlerThread…

  @Override
  public void run() {
      try {
          //Continues to read/write to the socket connection till it is closed.
          while (true) {
              handleRequest();
          }
      } catch (Exception e) {
          getLogger().debug(e);
      }
  }

   private void handleRequest() {
      RequestOrResponse request = readRequestFrom(clientSocket);
      RequestId requestId = RequestId.valueOf(request.getRequestId());
      requestConsumer.accept(new Message<>(request, requestId, clientSocket));
    }

节点读取请求并将它们提交给一个单一的更新队列进行处理。一旦节点处理了请求,它就将响应写回套接字。

每当节点建立通信时,它就会打开一个套接字连接,用于与另一方的所有请求。

代码语言:javascript
复制
class SingleSocketChannel…

  public class SingleSocketChannel implements Closeable {
      final InetAddressAndPort address;
      final int heartbeatIntervalMs;
      private Socket clientSocket;
      private final OutputStream socketOutputStream;
      private final InputStream inputStream;

      public SingleSocketChannel(InetAddressAndPort address, int heartbeatIntervalMs) throws IOException {
          this.address = address;
          this.heartbeatIntervalMs = heartbeatIntervalMs;
          clientSocket = new Socket();
          clientSocket.connect(new InetSocketAddress(address.getAddress(), address.getPort()), heartbeatIntervalMs);
          clientSocket.setSoTimeout(heartbeatIntervalMs * 10); //set socket read timeout to be more than heartbeat.
          socketOutputStream = clientSocket.getOutputStream();
          inputStream = clientSocket.getInputStream();
      }

      public synchronized RequestOrResponse blockingSend(RequestOrResponse request) throws IOException {
          writeRequest(request);
          byte[] responseBytes = readResponse();
          return deserialize(responseBytes);
      }

      private void writeRequest(RequestOrResponse request) throws IOException {
          var dataStream = new DataOutputStream(socketOutputStream);
          byte[] messageBytes = serialize(request);
          dataStream.writeInt(messageBytes.length);
          dataStream.write(messageBytes);
      }

在连接上保持一个超时是很重要的,这样在出现错误时它就不会无限期地阻塞。我们使用HeartBeat机制,定期通过套接字通道发送请求,以使其保持活动状态。这个超时时间通常为心跳间隔的倍数,包含网络往返时间和一些可能的网络延迟。将连接超时设置为心跳间隔的10倍是合理的。

代码语言:javascript
复制
class SocketListener…

  private void setReadTimeout(Socket clientSocket) throws SocketException {
      clientSocket.setSoTimeout(config.getHeartBeatIntervalMs() * 10);
  }

通过单个通道发送请求可能会产生头部阻塞问题。为了避免这些问题,我们可以使用Request Pipeline。

例子

•Zookeeper使用一个套接字通道和每个追随者一个线程来完成所有的通信。•Kafka在follower和leader分区之间使用单个套接字通道来复制消息。•参考Raft共识算法的实现,LogCabin使用单套接字通道在领导者和追随者之间进行通信

java达人

ID:drjava

(长按或扫码识别)

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 java达人 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 问题
  • 解决方案
  • 例子
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档