从源码中分析 Hadoop 的 RPC 机制

RPC是Remote Procedure Call(远程过程调用)的简称,这一机制都要面对两个问题

  • 对象调用方式;
  • 序列/反序列化机制

在此之前,我们有必要了解什么是架构层次的协议。通俗一点说,就是我把某些接口和接口中的方法称为协议,客户端和服务端只要实现这些接口中的方法就可以进行通信了,从这个角度来说,架构层次协议的说法就可以成立了。Hadoop的RPC机制正是采用了这种“架构层次的协议”,有一整套作为协议的接口,如下图:

Hadoop的RPC组件,依赖于Hadoop Writable接口类型的支持,要求每个实现类都要确保将本类的对象正确序列化与反序列化。因此RPC使用Java动态代理与反射实现对象调用方式,客户端到服务器数据的序列化与反序列化由Hadoop框架或用户自己来实现,也就是数据组装时定制的。RPC架构图如下:

动态代理

主要用来做方法的增强,让你可以在不修改源码的情况下,增强一些方法,在方法执行前后做任何你想做的事情(甚至根本不去执行这个方法),因为在InvocationHandler的invoke方法中,你可以直接获取正在调用方法对应的Method对象,具体应用的话,比如可以添加调用日志,做事务控制等。

这个接口的实现部署在其它服务器上,在编写客户端代码的时候,没办法直接调用接口方法,因为接口是不能直接生成对象的,这个时候就可以考虑代理模式(动态代理)了,通过Proxy.newProxyInstance代理一个该接口对应的InvocationHandler对象,然后在InvocationHandler的invoke方法内封装通讯细节就可以了。具体的应用,最经典的当然是Java标准库的RMI,其它比如hessian,各种webservice框架中的远程调用,大致都是这么实现的。

VersionedProtocol接口

VersionedProtocol是所有RPC协议接口的父接口,其中只有一个方法:getProtocolVersion()

HDFS相关

  • ClientDatanodeProtocol:一个客户端和datanode之间的协议接口,用于数据块恢复。
  • ClientProtocol:client与Namenode交互的接口,所有控制流的请求均在这里,如:创建文件、删除文件等;
  • DatanodeProtocol : Datanode与Namenode交互的接口,如心跳、blockreport等; NamenodeProtocol:SecondaryNode与Namenode交互的接口。

Mapreduce相关

  • InterDatanodeProtocol:Datanode内部交互的接口,用来更新block的元数据;
  • InnerTrackerProtocol:TaskTracker与JobTracker交互的接口,功能与DatanodeProtocol相似;
  • JobSubmissionProtocol:JobClient与JobTracker交互的接口,用来提交Job、获得Job等与Job相关的操作;
  • TaskUmbilicalProtocol:Task中子进程与母进程交互的接口,子进程即map、reduce等操作,母进程即TaskTracker,该接口可以回报子进程的运行状态(词汇扫盲: umbilical 脐带的, 关系亲密的) 。

RPC实现流程

简单来说,Hadoop RPC=动态代理+定制的二进制流。分布式对象一般都会要求根据接口生成存根和框架。如 CORBA,可以通过 IDL,生成存根和框架。在ipc.RPC类中有一些内部类,下边简单介绍下

  • Invocation:用于封装方法名和参数,作为数据传输层,相当于VO吧。
  • ClientCache:用于存储client对象,用socket factory作为hash key,存储结构为hashMap <SocketFactory, Client>
  • Invoker:是动态代理中的调用实现类,继承了InvocationHandler.
  • Server:是ipc.Server的实现类。我们就需要这样的步骤了。

上类图

从以上的分析可以知道,Invocation类仅作为VO,ClientCache类只是作为缓存,而Server类用于服务端的处理,他们都和客户端的数据流和业务逻辑没有关系。为了分析 Invoker,我们需要介绍一些 Java 反射实现 Dynamic Proxy 的背景。

Dynamic Proxy 是由两个 class 实现的:java.lang.reflect.Proxyjava.lang.reflect.InvocationHandler,后者是一个接口。

所谓 Dynamic Proxy 是这样一种 class:它是在运行时生成的 class,在生成它时你必须提供一组 interface 给它,然后该 class就宣称它实现了这些 interface。

这个 Dynamic Proxy 其实就是一个典型的 Proxy 模式,它丌会替你作实质性的工作,在生成它的实例时你必须提供一个handler,由它接管实际的工作。

这个 handler,在 Hadoop 的 RPC 中,就是 Invoker 对象。 我们可以简单地理解:就是你可以通过一个接口来生成一个类,这个类上的所有方法调用,都会传递到你生成类时传递的 InvocationHandler 实现中。

在 Hadoop 的 RPC 中,Invoker 实现了 InvocationHandler 的 invoke 方法(invoke 方法也是 InvocationHandler 的唯一方法)。 Invoker 会把所有跟这次调用相关的调用方法名,参数类型列表,参数列表打包,然后利用前面我们分析过的 Client,通过 socket 传递到服务器端。就是说,你在 proxy 类上的任何调用,都通过 Client 发送到远方的服务器上。

Invoker 使用 Invocation。 Invocation 封装了一个过程调用的所有相关信息,它的主要属性有: methodName,调用方法名,parameterClasses,调用方法参数的类型列表和 parameters,调用方法参数。注意,它实现了 Writable 接口,可以串行化。

RPC.Server 实现了 org.apache.hadoop.ipc.Server,你可以把一个对象,通过 RPC,升级成为一个服务器。服务器接收到的请求(通过 Invocation),解串行化以后,就发成了方法名,方法参数列表和参数列表。调用 Java 反射,我们就可以调用对应的对象的方法。调用的结果再通过 socket,迒回给客户端,客户端把结果解包后,就可以返回给Dynamic Proxy 的使用者了。

我们接下来去研究的就是RPC.Invoker类中的invoke()方法了,代码如下

public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable
{
    ……
    ObjectWritable value = (ObjectWritable)
                           client.call(new Invocation(method, args), remoteId);
    ……
    return value.get();
}

一般我们看到的动态代理的invoke()方法中总会有 method.invoke(ac, arg); 这句代码。而上面代码中却没有。其实使用 method.invoke(ac, arg); 是在本地JVM中调用;而在hadoop中,是将数据发送给服务端,服务端将处理的结果再返回给客户端,所以这里的invoke()方法必然需要进行网络通信。而网络通信就是下面的这段代码实现的:

ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), remoteId);

Invocation类在这里封装了方法名和参数,充当VO。其实这里网络通信只是调用了Client类的call()方法。

ipc.Client源码

接下来分析一下ipc.Client源码,在此之前我们得明确下我们的目标,总结出了以下几个问题

  • 客户端和服务端的连接是怎样建立的?
  • 客户端是怎样给服务端发送数据的?
  • 客户端是怎样获取服务端的返回数据的?

基于这三个问题,我们开始分析ipc.Client源码,主要包含以下几个类

  • Call:用于封装Invocation对象,作为VO,写到服务端,同时也用于存储从服务端返回的数据。
  • Connection:用以处理远程连接对象。继承了Thread
  • ConnectionId:唯一确定一个连接

Question1:客户端和服务端的连接是怎样建立的?

Client类中的cal()方法如下

public Writable call(Writable param, ConnectionId remoteId)
throws InterruptedException, IOException
{
    Call call = new Call(param);       //将传入的数据封装成call对象
    Connection connection = getConnection(remoteId, call);   //获得一个连接
    connection.sendParam(call);     // 向服务端发送call对象
    boolean interrupted = false;
    synchronized (call)
    {
        while (!call.done)
        {
            try
            {
                call.wait(); // 等待结果的返回,在Call类的callComplete()方法里有notify()方法用于唤醒线程
            }
            catch (InterruptedException ie)
            {
                // 因中断异常而终止,设置标志interrupted为true
                interrupted = true;
            }
        }
        if (interrupted)
        {
            Thread.currentThread().interrupt();
        }

        if (call.error != null)
        {
            if (call.error instanceof RemoteException)
            {
                call.error.fillInStackTrace();
                throw call.error;
            }
            else     // 本地异常
            {
                throw wrapException(remoteId.getAddress(), call.error);
            }
        }
        else
        {
            return call.value; //返回结果数据
        }
    }
}

具体代码的作用我已做了注释,所以这里不再赘述。分析代码后,我们会发现和网络通信有关的代码只会是下面的两句了:

  Connection connection = getConnection(remoteId, call);   //获得一个连接
  connection.sendParam(call);      // 向服务端发送call对象

先看看是怎么获得一个到服务端的连接吧,下面贴出ipc.Client类中的getConnection()方法。

private Connection getConnection(ConnectionId remoteId,
                                 Call call)
throws IOException, InterruptedException
{
    if (!running.get())
    {
        // 如果client关闭了
        throw new IOException("The client is stopped");
    }
    Connection connection;
    //如果connections连接池中有对应的连接对象,就不需重新创建了;如果没有就需重新创建一个连接对象。
    //但请注意,该//连接对象只是存储了remoteId的信息,其实还并没有和服务端建立连接。
    do
    {
        synchronized (connections)
        {
            connection = connections.get(remoteId);
            if (connection == null)
            {
                connection = new Connection(remoteId);
                connections.put(remoteId, connection);
            }
        }
    }
    while (!connection.addCall(call));   //将call对象放入对应连接中的calls池,就不贴出源码了
    //这句代码才是真正的完成了和服务端建立连接哦~
    connection.setupIOstreams();
    return connection;
}

Client.Connection类中的setupIOstreams()方法如下:

private synchronized void setupIOstreams() throws InterruptedException
{
    ……
    try
    {
        ……
        while (true)
        {
            setupConnection();  //建立连接
            InputStream inStream = NetUtils.getInputStream(socket);     //获得输入流
            OutputStream outStream = NetUtils.getOutputStream(socket);  //获得输出流
            writeRpcHeader(outStream);
            ……
            this.in = new DataInputStream(new BufferedInputStream
                                          (new PingInputStream(inStream)));   //将输入流装饰成DataInputStream
            this.out = new DataOutputStream
            (new BufferedOutputStream(outStream));   //将输出流装饰成DataOutputStream
            writeHeader();
            // 跟新活动时间
            touch();
            //当连接建立时,启动接受线程等待服务端传回数据,注意:Connection继承了Tread
            start();
            return;
        }
    }
    catch (IOException e)
    {
        markClosed(e);
        close();
    }
}

再有一步我们就知道客户端的连接是怎么建立的啦,下面贴出Client.Connection类中的setupConnection()方法

  private synchronized void setupConnection() throws IOException {
      short ioFailures = 0;
      short timeoutFailures = 0;
      while (true) {
        try {
          this.socket = socketFactory.createSocket(); //终于看到创建socket的方法了
          this.socket.setTcpNoDelay(tcpNoDelay);
         ……
          // 设置连接超时为20s
          NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
          this.socket.setSoTimeout(pingInterval);
          return;
        } catch (SocketTimeoutException toe) {
          /* 设置最多连接重试为45次。
           * 总共有20s*45 = 15 分钟的重试时间。
           */
          handleConnectionFailure(timeoutFailures++, 45, toe);
        } catch (IOException ie) {
          handleConnectionFailure(ioFailures++, maxRetries, ie);
        }
      }
    }

不难看出客户端的连接是创建一个普通的socket进行通信的。

Question2:客户端是怎样给服务端发送数据的?

Client.Connection类的sendParam()方法如下

public void sendParam(Call call) {
      if (shouldCloseConnection.get()) {
        return;
      }
      DataOutputBuffer d=null;
      try {
        synchronized (this.out) {
          if (LOG.isDebugEnabled())
            LOG.debug(getName() + " sending #" + call.id);
          //创建一个缓冲区
          d = new DataOutputBuffer();
          d.writeInt(call.id);
          call.param.write(d);
          byte[] data = d.getData();
          int dataLength = d.getLength();
          out.writeInt(dataLength);        //首先写出数据的长度
          out.write(data, 0, dataLength); //向服务端写数据
          out.flush();
        }
      } catch(IOException e) {
        markClosed(e);
      } finally {
        IOUtils.closeStream(d);
      }
    }

Question3:客户端是怎样获取服务端的返回数据的?

Client.Connection类和Client.Call类中的相关方法如下

Method1:


  public void run() {
      ……
      while (waitForWork()) {
        receiveResponse();  //具体的处理方法
      }
      close();
     ……
}

Method2:

private void receiveResponse() {
      if (shouldCloseConnection.get()) {
        return;
      }
      touch();
      try {
        int id = in.readInt();                    // 阻塞读取id
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + " got value #" + id);
          Call call = calls.get(id);    //在calls池中找到发送时的那个对象
        int state = in.readInt();     // 阻塞读取call对象的状态
        if (state == Status.SUCCESS.state) {
          Writable value = ReflectionUtils.newInstance(valueClass, conf);
          value.readFields(in);           // 读取数据
        //将读取到的值赋给call对象,同时唤醒Client等待线程,贴出setValue()代码Method3
          call.setValue(value);              
          calls.remove(id);               //删除已处理的call    
        } else if (state == Status.ERROR.state) {
        ……
        } else if (state == Status.FATAL.state) {
        ……
        }
      } catch (IOException e) {
        markClosed(e);
      }
}

Method3:

public synchronized void setValue(Writable value) {
      this.value = value;
      callComplete();   //具体实现
}
protected synchronized void callComplete() {
      this.done = true;
      notify();         // 唤醒client等待线程
    }

启动一个处理线程,读取从服务端传来的call对象,将call对象读取完毕后,唤醒client处理线程。就这么简单,客户端就获取了服务端返回的数据。客户端的源码分析暂时到这,下面我们来分析Server端的源码

ipc.Server源码分析

内部类如下

  • Call :用于存储客户端发来的请求
  • Listener : 监听类,用于监听客户端发来的请求,同时Listener内部还有一个静态类,Listener.Reader,当监听器监听到用户请求,便让Reader读取用户请求。
  • Responder :响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。
  • Connection :连接类,真正的客户端请求读取逻辑在这个类中。 Handler :请求处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。

初始化Server

hadoop是怎样初始化RPC的Server端的呢?

Namenode初始化时一定初始化了RPC的Sever端,那我们去看看Namenode的初始化源码

private void initialize(Configuration conf) throws IOException {
   ……
    // 创建 rpc server
    InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
    if (dnSocketAddr != null) {
      int serviceHandlerCount =
        conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                    DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
      //获得serviceRpcServer
      this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(), 
          dnSocketAddr.getPort(), serviceHandlerCount,
          false, conf, namesystem.getDelegationTokenSecretManager());
      this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
      setRpcServiceServerAddress(conf);
}
//获得server
    this.server = RPC.getServer(this, socAddr.getHostName(),
        socAddr.getPort(), handlerCount, false, conf, namesystem
        .getDelegationTokenSecretManager());

   ……
    this.server.start();  //启动 RPC server   Clients只允许连接该server
    if (serviceRpcServer != null) {
      serviceRpcServer.start();  //启动 RPC serviceRpcServer 为HDFS服务的server
    }
    startTrashEmptier(conf);
  }

RPC的server对象是通过ipc.RPC类的getServer()方法获得的。ipc.RPC类中的getServer()源码如下

public static Server getServer(final Object instance, final String bindAddress, final int port,
                               final int numHandlers,
                               final boolean verbose, Configuration conf,
                               SecretManager <? extends TokenIdentifier > secretManager)
throws IOException
{
    return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
}

getServer()是一个创建Server对象的工厂方法,但创建的却是RPC.Server类的对象。

运行Server

初始化Server后,Server端就运行起来了,看看ipc.Server的start()源码

 /** 启动服务 */
public synchronized void start()
{
    responder.start();  //启动responder
    listener.start();   //启动listener
    handlers = new Handler[handlerCount];

    for (int i = 0; i < handlerCount; i++)
    {
        handlers[i] = new Handler(i);
        handlers[i].start();   //逐个启动Handler
    }
}

Server处理请求

  • 分析源码得知,Server端采用Listener监听客户端的连接,下面先分析一下Listener的构造函数
    public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // 创建ServerSocketChannel,并设置成非阻塞式
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // 将server socket绑定到本地端口
      bind(acceptChannel.socket(), address, backlogLength);
      port = acceptChannel.socket().getLocalPort(); 
      // 获得一个selector
      selector= Selector.open();
      readers = new Reader[readThreads];
      readPool = Executors.newFixedThreadPool(readThreads);
      //启动多个reader线程,为了防止请求多时服务端响应延时的问题
      for (int i = 0; i < readThreads; i++) {       
        Selector readSelector = Selector.open();
        Reader reader = new Reader(readSelector);
        readers[i] = reader;
        readPool.execute(reader);
      }
      // 注册连接事件
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }

在启动Listener线程时,服务端会一直等待客户端的连接,下面贴出Server.Listener类的run()方法

public void run()
{
    ……
    while (running)
    {
        SelectionKey key = null;
        try
        {
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext())
            {
                key = iter.next();
                iter.remove();
                try
                {
                    if (key.isValid())
                    {
                        if (key.isAcceptable())
                            doAccept(key);     //具体的连接方法
                    }
                }
                catch (IOException e)
                {
                }
                key = null;
            }
        }
        catch (OutOfMemoryError e)
        {
            ……
        }

Server.Listener类中doAccept ()方法中的关键源码如下:

    void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
      Connection c = null;
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) { //建立连接
        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        Reader reader = getReader();  //从readers池中获得一个reader
        try {
          reader.startAdd(); // 激活readSelector,设置adding为true
          SelectionKey readKey = reader.registerChannel(channel);//将读事件设置成兴趣事件
          c = new Connection(readKey, channel, System.currentTimeMillis());//创建一个连接对象
          readKey.attach(c);   //将connection对象注入readKey
          synchronized (connectionList) {
            connectionList.add(numConnections, c);
            numConnections++;
          }
        …… 
        } finally {
//设置adding为false,采用notify()唤醒一个reader,其实代码十三中启动的每个reader都使
//用了wait()方法等待。因篇幅有限,就不贴出源码了。
          reader.finishAdd();
        }
      }
    }

当reader被唤醒,reader接着执行doRead()方法。

  • 接收请求 Server.Listener.Reader类中的doRead()方法和Server.Connection类中的readAndProcess()方法源码如下:

Method1:

 void doRead(SelectionKey key) throws InterruptedException {
      int count = 0;
      Connection c = (Connection)key.attachment();  //获得connection对象
      if (c == null) {
        return;  
      }
      c.setLastContact(System.currentTimeMillis());
      try {
        count = c.readAndProcess();    // 接受并处理请求  
      } catch (InterruptedException ieo) {
       ……
      }
     ……    
}

Method2:

public int readAndProcess() throws IOException, InterruptedException {
      while (true) {
        ……
        if (!rpcHeaderRead) {
          if (rpcHeaderBuffer == null) {
            rpcHeaderBuffer = ByteBuffer.allocate(2);
          }
         //读取请求头
          count = channelRead(channel, rpcHeaderBuffer);
          if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
            return count;
          }
        // 读取请求版本号  
          int version = rpcHeaderBuffer.get(0);
          byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
        ……  

          data = ByteBuffer.allocate(dataLength);
        }
        // 读取请求  
        count = channelRead(channel, data);

        if (data.remaining() == 0) {
         ……
          if (useSasl) {
         ……
          } else {
            processOneRpc(data.array());//处理请求
          }
        ……
          }
        } 
        return count;
      }
    }
  • 获得call对象

Method1:

 private void processOneRpc(byte[] buf) throws IOException,
        InterruptedException {
      if (headerRead) {
        processData(buf);
      } else {
        processHeader(buf);
        headerRead = true;
        if (!authorizeConnection()) {
          throw new AccessControlException("Connection from " + this
              + " for protocol " + header.getProtocol()
              + " is unauthorized for user " + user);
        }
      }
}

Method2:

    private void processData(byte[] buf) throws  IOException, InterruptedException {
      DataInputStream dis =
        new DataInputStream(new ByteArrayInputStream(buf));
      int id = dis.readInt();      // 尝试读取id
      Writable param = ReflectionUtils.newInstance(paramClass, conf);//读取参数
      param.readFields(dis);        

      Call call = new Call(id, param, this);  //封装成call
      callQueue.put(call);   // 将call存入callQueue
      incRpcCount();  // 增加rpc请求的计数
    }
  • 处理call对象 对call对象的处理是Server类中的Handler内部类来处理的。Server.Handler类中run()方法中的关键代码如下:
  while (running) {
        try {
          final Call call = callQueue.take(); //弹出call,可能会阻塞
          ……
          //调用ipc.Server类中的call()方法,但该call()方法是抽象方法,具体实现在RPC.Server类中
          value = call(call.connection.protocol, call.param, call.timestamp);
          synchronized (call.connection.responseQueue) {
            setupResponse(buf, call, 
                        (error == null) ? Status.SUCCESS : Status.ERROR, 
                        value, errorClass, error);
             ……
            //给客户端响应请求
            responder.doRespond(call);
          }
  }
  • 返回请求 Server.Responder类中的doRespond()方法源码如下:
void doRespond(Call call) throws IOException
{
    synchronized (call.connection.responseQueue)
    {
        call.connection.responseQueue.addLast(call);
        if (call.connection.responseQueue.size() == 1)
        {
            // 返回响应结果,并激活writeSelector
            processResponse(call.connection.responseQueue, true);
        }
    }
}

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏developerHaoz 的安卓之旅

Android Volley 源码解析(一),网络请求的执行流程

花了好几天,重新研究了 Volley 的源码实现,比起之前又有了一番新的体会,啃源码真的是一件让人纠结的事情,阅读优秀的源码,特别是难度相对较大的源码,一旦陷入...

1054
来自专栏Java架构

Java 8简明教程

1815
来自专栏大内老A

ASP.NET路由系统实现原理:HttpHandler的动态映射

我们知道一个请求最终通过一个具体的HttpHandler进行处理,而我们熟悉的用于表示一个Web页面的Page对象就是一个HttpHandler,被用于处理基于...

2006
来自专栏大内老A

通过一个模拟程序让你明白WCF大致的执行流程

在《通过一个模拟程序让你明白ASP.NET MVC是如何运行的》一文中我通过一个普通的ASP.NET Web程序模拟了ASP.NET MVC的执行流程,现在我们...

1946
来自专栏Java Web

Java 面试知识点解析(四)——版本特性篇(2)

在遨游了一番 Java Web 的世界之后,发现了自己的一些缺失,所以就着一篇深度好文:知名互联网公司校招 Java 开发岗面试知识点解析 ,来好好的对 Jav...

3568
来自专栏坚毅的PHP

redis学习笔记

摘录些nosqlfans上看的资源(http://blog.nosqlfan.com/html/3537.html),用了一年了,只会安装、启动和get set...

35710
来自专栏Code_iOS

Objective-C 内存管理(上)学习笔记

这里的“计数”表明必然会有一个东西(变量)来记录引用的变化,而在OC里这个变量就是retainCount;那么还有一个问题就是通过什么方式来操作这个变量,OC里...

502
来自专栏情情说

RabbitMQ实战:运行和管理RabbitMQ

上一篇 介绍了AMQP消息通信,包括队列、交换器和绑定,通过虚拟主机还可以隔离数据和权限,消息持久化和发送方确认模式确保了消息不丢失。

4496
来自专栏Spark生态圈

[spark] 从spark-submit开始解析整个任务调度流程

spark应用程序可以以Client模式和Cluster启动,区别在于Client模式下的Driver是在执行spark-submit命令节点上启动的,而Clu...

1702
来自专栏Adamshuang 技术文章

Zookeeper 通知更新可靠吗? 解读源码找答案!

遇到Keepper通知更新无法收到的问题,思考节点变更通知的可靠性,通过阅读源码解析了解到zk Watch的注册以及触发的机制,本地调试运行模拟zk更新的不可靠...

6868

扫码关注云+社区