前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >「高并发通信框架Netty4 源码解读(六)」NIO通道之Socket通道

「高并发通信框架Netty4 源码解读(六)」NIO通道之Socket通道

作者头像
源码之路
发布2020-09-04 10:21:17
6340
发布2020-09-04 10:21:17
举报
文章被收录于专栏:源码之路源码之路

socket网络通信太重要了。也是本专题的重中之重,所以小编单独写一篇文章来介绍Socket通道。Socket 通道有与文件通道不同的特征。新的 socket 通道类可以运行非阻塞模式并且是可选择的。这两个性能可以激活程序(如网络服务器和中间件组件)巨大的可伸缩性和灵活性。本节中我们会看到,再也没有为每个 socket 连接使用一个线程的必要了,也避免了管理大量线程所需的上下文交换总开销。借助新的 NIO 类,一个或几个线程就可以管理成百上千的活动 socket 连接了并且只有很少甚至可能没有性能损失。这一点相当相当的重要!!!

全部 socket 通道类(DatagramChannel、 SocketChannel 和ServerSocketChannel)都是由位于 java.nio.channels.spi 包中的 AbstractSelectableChannel 引申而来。这意味着我们可以用一个 Selector 对象来执行 socket 通道的有条件的选择。选择器下一篇再讲。

请注意 DatagramChannel 和 SocketChannel 实现定义读和写功能的接口而 ServerSocketChannel不实现。ServerSocketChannel 负责监听传入的连接和创建新的 SocketChannel 对象,它本身从不传输数据。

在我们具体讨论每一种 socket 通道前,您应该了解 socket 和 socket 通道之间的关系。之前的博文有写道,通道是一个连接 I/O 服务导管并提供与该服务交互的方法。就某个 socket 而言,它不会再次实现与之对应的 socket 通道类中的 socket 协议 API,而 java.net 中已经存在的 socket 通道都可以被大多数协议操作重复使用。

全部 socket 通道类(DatagramChannel、 SocketChannel 和 ServerSocketChannel)在被实例化时都会创建一个对等 socket 对象。这些是我们所熟悉的来自 java.net 的类(Socket、 ServerSocket和 DatagramSocket),它们已经被更新以识别通道。对等 socket 可以通过调用 socket( )方法从一个通道上获取。此外,这三个 java.net 类现在都有 getChannel( )方法。

虽然每个 socket 通道(在 java.nio.channels 包中)都有一个关联的 java.net socket 对象,却并非所有的 socket 都有一个关联的通道。如果您用传统方式(直接实例化)创建了一个Socket 对象,它就不会有关联的 SocketChannel 并且它的 getChannel( )方法将总是返回 null。

Socket 通道委派协议操作给对等 socket 对象。如果在通道类中存在似乎重复的 socket 方法,那么将有某个新的或者不同的行为同通道类上的这个方法相关联。

非阻塞模式

Socket 通道可以在非阻塞模式下运行。这个陈述虽然简单却有着深远的含义。传统 Java socket的阻塞性质曾经是 Java 程序可伸缩性的最重要制约之一。非阻塞 I/O 是许多复杂的、高性能的程序构建的基础。

要把一个 socket 通道置于非阻塞模式,我们要依靠所有 socket 通道类的公有超级类:SelectableChannel。下面的方法就是关于通道的阻塞模式的:

代码语言:javascript
复制
public abstract class SelectableChannel
extends AbstractChannel
implements Channel
{
  // This is a partial API listing
  public abstract void configureBlocking (boolean block) throws IOException;
  public abstract boolean isBlocking( );
  public abstract Object blockingLock( );
}

有条件的选择(readiness selection)是一种可以用来查询通道的机制,该查询可以判断通道是否准备好执行一个目标操作,如读或写。非阻塞 I/O 和可选择性是紧密相连的,那也正是管理阻塞模式的 API 代码要在 SelectableChannel 超级类中定义的原因。 SelectableChannel 的剩余 API 将在下篇讨论。 设置或重新设置一个通道的阻塞模式是很简单的,只要调用 configureBlocking( )方法即可,传递参数值为 true 则设为阻塞模式,参数值为 false 值设为非阻塞模式。真的,就这么简单!您可以通过调用 isBlocking( )方法来判断某个 socket 通道当前处于哪种模式:

代码语言:javascript
复制
SocketChannel sc = SocketChannel.open( );
sc.configureBlocking (false); // nonblocking
...
if ( ! sc.isBlocking( )) {
  doSomething (cs);
}

服务器端的使用经常会考虑到非阻塞 socket 通道,因为它们使同时管理很多 socket 通道变得更容易。但是,在客户端使用一个或几个非阻塞模式的 socket 通道也是有益处的,例如,借助非阻塞socket 通道, GUI 程序可以专注于用户请求并且同时维护与一个或多个服务器的会话。在很多程序上,非阻塞模式都是有用的。

偶尔地,我们也会需要防止 socket 通道的阻塞模式被更改。 API 中有一个 blockingLock( )方法,该方法会返回一个非透明的对象引用。返回的对象是通道实现修改阻塞模式时内部使用的。只有拥有此对象的锁的线程才能更改通道的阻塞模式(对象的锁是用同步的 Java 密码获取的。对于确保在执行代码的关键部分时 socket 通道的阻塞模式不会改变以及在不影响其他线程的前提下暂时改变阻塞模式来说,这个方法都是非常方便的。

代码语言:javascript
复制
Socket socket = null;
Object lockObj = serverChannel.blockingLock( );
// have a handle to the lock object, but haven't locked it yet
// may block here until lock is acquired
synchronize (lockObj)
{
// This thread now owns the lock; mode can't be changed
  boolean prevState = serverChannel.isBlocking( );
  serverChannel.configureBlocking (false);
  socket = serverChannel.accept( );
  serverChannel.configureBlocking (prevState);
}
// lock is now released, mode is allowed to change
if (socket != null) {
  doSomethingWithTheSocket (socket);
}

.

ServerSocketChannel

让我们从最简单的 ServerSocketChannel 来开始对 socket 通道类的讨论。以下是ServerSocketChannel 的完整 API:

代码语言:javascript
复制
public abstract class ServerSocketChannel extends AbstractSelectableChannel
{
  public static ServerSocketChannel open( ) throws IOException
  public abstract ServerSocket socket( );
  public abstract ServerSocket accept( ) throws IOException;
  public final int validOps( )
}

ServerSocketChannel 是一个基于通道的 socket 监听器。它同我们所熟悉的 java.net.ServerSocket执行相同的基本任务,不过它增加了通道语义,因此能够在非阻塞模式下运行。

用静态的 open( )工厂方法创建一个新的 ServerSocketChannel 对象,将会返回同一个未绑定的java.net.ServerSocket 关联的通道。该对等 ServerSocket 可以通过在返回的 ServerSocketChannel 上调用 socket( )方法来获取。作为 ServerSocketChannel 的对等体被创建的 ServerSocket 对象依赖通道实现。这些 socket 关联的 SocketImpl 能识别通道。通道不能被封装在随意的 socket 对象外面。

由于 ServerSocketChannel 没有 bind( )方法,因此有必要取出对等的 socket 并使用它来绑定到一个端口以开始监听连接。我们也是使用对等 ServerSocket 的 API 来根据需要设置其他的 socket 选项。

代码语言:javascript
复制
ServerSocketChannel ssc = ServerSocketChannel.open( );
ServerSocket serverSocket = ssc.socket( );
// Listen on port 1234
serverSocket.bind (new InetSocketAddress (1234));

同它的对等体 java.net.ServerSocket 一样, ServerSocketChannel 也有 accept( )方法。一旦您创建了一个 ServerSocketChannel 并用对等 socket 绑定了它,然后您就可以在其中一个上调用 accept( )。

如果您选择在 ServerSocket 上调用 accept( )方法,那么它会同任何其他的 ServerSocket 表现一样的行为:总是阻塞并返回一个 java.net.Socket 对象。如果您选择在 ServerSocketChannel 上调用 accept( )方法则会返回 SocketChannel 类型的对象,返回的对象能够在非阻塞模式下运行。

假设系统已经有一个安全管理器(security manager),两种形式的方法调用都执行相同的安全检查。如果以非阻塞模式被调用,当没有传入连接在等待时, ServerSocketChannel.accept( )会立即返回 null。正是这种检查连接而不阻塞的能力实现了可伸缩性并降低了复杂性。可选择性也因此得到实现。我们可以使用一个选择器实例来注册一个ServerSocketChannel 对象以实现新连接到达时自动通知的功能。下例演示了如何使用一个非阻塞的 accept( )方法。

代码语言:javascript
复制
public class ChannelAccept {
    public static final String GREETING = "Hello I must be going.\r\n";

    public static void main(String[] argv)
            throws Exception {
        int port = 1234; // default
        ByteBuffer buffer = ByteBuffer.wrap(GREETING.getBytes());
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.socket().bind(new InetSocketAddress(port));
        ssc  .configureBlocking(false);
        while (true) {
            System.out.println("Waiting for connections");
            SocketChannel sc = ssc.accept();
            if (sc == null) {
// no connections, snooze a while
                Thread.sleep(2000);
            } else {
                System.out.println("Incoming connection from: "
                        + sc.socket().getRemoteSocketAddress());
                buffer.rewind();
                sc.write(buffer);
                sc.close();
            }
        }
    }
}

SocketChannel

下面开始学习 SocketChannel,它是使用最多的 socket 通道类:

代码语言:javascript
复制
public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel
{
  // This is a partial API listing
  public static SocketChannel open( ) throws IOException
  public static SocketChannel open (InetSocketAddress remote) throws IOException
  public abstract Socket socket( );
  public abstract boolean connect (SocketAddress remote) throws IOException;
  public abstract boolean isConnectionPending( );
  public abstract boolean finishConnect( ) throws IOException;
  public abstract boolean isConnected( );
  public final int validOps( )
}

Socket 和 SocketChannel 类封装点对点、有序的网络连接,类似于我们所熟知并喜爱的 TCP/IP网络连接SocketChannel 扮演客户端发起同一个监听服务器的连接。直到连接成功,它才能收到数据并且只会从连接到的地址接收。每个 SocketChannel 对象创建时都是同一个对等的 java.net.Socket 对象串联的。静态的 open( )方法可以创建一个新的 SocketChannel 对象,而在新创建的 SocketChannel 上调用 socket( )方法能返回它对等的 Socket 对象;在该 Socket 上调用 getChannel( )方法则能返回最初的那个 SocketChannel。

虽然每个 SocketChannel 对象都会创建一个对等的 Socket 对象,反过来却不成立。直接创建的 Socket 对象不会关联 SocketChannel 对象,它们的getChannel( )方法只返回 null。

新创建的 SocketChannel 虽已打开却是未连接的。在一个未连接的 SocketChannel 对象上尝试一个 I/O 操作会导致 NotYetConnectedException 异常。我们可以通过在通道上直接调用 connect( )方法或在通道关联的 Socket 对象上调用 connect( )来将该 socket 通道连接。一旦一个 socket 通道被连接,它将保持连接状态直到被关闭。您可以通过调用布尔型的 isConnected( )方法来测试某个SocketChannel 当前是否已连接。

第二种带 InetSocketAddress 参数形式的 open( )是在返回之前进行连接的便捷方法。这段代码:

代码语言:javascript
复制
SocketChannel socketChannel = SocketChannel.open (new InetSocketAddress ("somehost", somePort));

等价于下面这段代码:

代码语言:javascript
复制
SocketChannel socketChannel = SocketChannel.open( );
socketChannel.connect (new InetSocketAddress ("somehost", somePort));

如果您选择使用传统方式进行连接——通过在对等 Socket 对象上调用 connect( )方法,那么传统的连接语义将适用于此。线程在连接建立好或超时过期之前都将保持阻塞。如果您选择通过在通上直接调用 connect( )方法来建立连接并且通道处于阻塞模式(默认模式),那么连接过程实际上是一样的。

在 SocketChannel 上并没有一种 connect( )方法可以让您指定超时(timeout)值,当 connect( )方法在非阻塞模式下被调用时 SocketChannel 提供并发连接:它发起对请求地址的连接并且立即返回值。如果返回值是 true,说明连接立即建立了(这可能是本地环回连接);如果连接不能立即建立, connect( )方法会返回 false 且并发地继续连接建立过程。

面向流的的 socket 建立连接状态需要一定的时间,因为两个待连接系统之间必须进行包对话以建立维护流 socket 所需的状态信息。跨越开放互联网连接到远程系统会特别耗时。假如某个SocketChannel 上当前正由一个并发连接,isConnectPending( )方法就会返回 true 值。调用 finishConnect( )方法来完成连接过程,该方法任何时候都可以安全地进行调用。假如在一个非阻塞模式的 SocketChannel 对象上调用 finishConnect( )方法,将可能出现下列情形之一:

  • connect( )方法尚未被调用。那么将产生 NoConnectionPendingException 异常。
  • 连接建立过程正在进行,尚未完成。那么什么都不会发生, finishConnect( )方法会立即返回false 值。
  • 在非阻塞模式下调用 connect( )方法之后, SocketChannel 又被切换回了阻塞模式。那么如果有必要的话,调用线程会阻塞直到连接建立完成, finishConnect( )方法接着就会返回 true值。
  • 在初次调用 connect( )或最后一次调用 finishConnect( )之后,连接建立过程已经完成。那么SocketChannel 对象的内部状态将被更新到已连接状态, finishConnect( )方法会返回 true值,然后 SocketChannel 对象就可以被用来传输数据了。
  • 连接已经建立。那么什么都不会发生, finishConnect( )方法会返回 true 值。

当通道处于中间的连接等待(connection-pending)状态时,您只可以调用 finishConnect( )、isConnectPending( )或 isConnected( )方法。一旦连接建立过程成功完成, isConnected( )将返回 true值。

代码语言:javascript
复制
InetSocketAddress addr = new InetSocketAddress (host, port);
SocketChannel sc = SocketChannel.open( );
sc.configureBlocking (false);
sc.connect (addr);
while ( ! sc.finishConnect( )) {
  doSomethingElse( );
}
doSomethingWithChannel (sc);
sc.close( );

一段用来管理异步连接的可用代码

代码语言:javascript
复制
public class ConnectAsync {
    public static void main(String[] argv) throws Exception {
        String host = "localhost";
        int port = 80;
        if (argv.length == 2) {
            host = argv[0];
            port = Integer.parseInt(argv[1]);
        }
        InetSocketAddress addr = new InetSocketAddress(host, port);
        SocketChannel sc = SocketChannel.open();
        sc.configureBlocking(false);
        System.out.println("initiating connection");
        sc.connect(addr);
        while (!sc.finishConnect()) {
            doSomethingUseful();
        }
        System.out.println("connection established");
// Do something with the connected socket
// The SocketChannel is still nonblocking
        sc.close();
    }

    private static void doSomethingUseful() {
        System.out.println("doing something useless");
    }
}

如果尝试异步连接失败,那么下次调用 finishConnect( )方法会产生一个适当的经检查的异常以指出问题的性质。通道然后就会被关闭并将不能被连接或再次使用。与连接相关的方法使得我们可以对一个通道进行轮询并在连接进行过程中判断通道所处的状态。

Socket 通道是线程安全的。并发访问时无需特别措施来保护发起访问的多个线程,不过任何时候都只有一个读操作和一个写操作在进行中。请记住, sockets 是面向流的而非包导向的。它们可以保证发送的字节会按照顺序到达但无法承诺维持字节分组。某个发送器可能给一个 socket 写入了20 个字节而接收器调用 read( )方法时却只收到了其中的 3 个字节。剩下的 17 个字节还是传输中。由于这个原因,让多个不配合的线程共享某个流 socket 的同一侧绝非一个好的设计选择。connect( )和 finishConnect( )方法是互相同步的,并且只要其中一个操作正在进行,任何读或写的方法调用都会阻塞,即使是在非阻塞模式下。如果此情形下您有疑问或不能承受一个读或写操作在某个通道上阻塞,请用 isConnected( )方法测试一下连接状态。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 非阻塞模式
  • ServerSocketChannel
  • SocketChannel
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档