专栏首页码农知识点zookeeper源码分析(7)-服务器请求处理链的初始化

zookeeper源码分析(7)-服务器请求处理链的初始化

在zookeeper集群中,分为Leader,Follewer,Observer三种类型的服务器角色,请求是通过各自的请求处理链来处理,所有的请求处理器均实现了RequestProcessor接口,通过处理链的上一个请求处理器调用该处理器的processRequest方法将请求传递过来,这个请求的传递过程是由一个线程完成的。

public interface RequestProcessor {
    public static class RequestProcessorException extends Exception {
        public RequestProcessorException(String msg, Throwable t) {
            super(msg, t);
        }
    }
//请求处理方法
    void processRequest(Request request) throws RequestProcessorException;

    void shutdown();
}

下面分别看下不同角色的服务器启动时的请求处理链初始化过程。

Leader请求处理链初始化

Leader的主要工作如下:

  • 事务请求的唯一调度和处理者,保证集群事务处理的顺序性。
  • 集群内部各服务器的调度者。 当leader完成集群间数据的同步时,会启动LeaderZooKeeperServer,初始化请求链。 LeaderZooKeeperServer.setupRequestProcessors
protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
        commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                Long.toString(getServerId()), false,
                getZooKeeperServerListener());
        commitProcessor.start();
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                commitProcessor);
        proposalProcessor.initialize();
        prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
        prepRequestProcessor.start();
//设置LeaderZooKeeperServer.firstProcessor
        firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

        setupContainerManager();
    }

请求处理器都持有下一个请求处理器的引用private final RequestProcessor nextProcessor;,在上面的构造方法中会设置各自的nextProcessor,并启动处理器。最后会设置LeaderZooKeeperServer.firstProcessorLeaderRequestProcessor,这个处理器主要是对本地session创建临时节点时的请求预处理,将在=======介绍,它的nextProcessorPrepRequestProcessor

可大体认为Leader的请求处理链如下:

PrepRequestProcessor

Leader服务器的请求预处理器,进行一些创建请求事务头,事务体,ACL检查和版本检查等的预处理操作。初始化方法为:

public PrepRequestProcessor(ZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        super("ProcessThread(sid:" + zks.getServerId() + " cport:"
                + zks.getClientPort() + "):", zks.getZooKeeperServerListener());
        this.nextProcessor = nextProcessor;
        this.zks = zks;
    }

主要属性为:

//请求存储队列,该线程启动后会不断从队列中获取请求进行预处理
LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
//设置为ProposalRequestProcessor
    private final RequestProcessor nextProcessor;
//当前zkServer实例
    ZooKeeperServer zks;

ProposalRequestProcessor Leader服务器的事务投票处理器,也是事务处理流程的发起者。对于非事务请求,它会直接将请求流转到 CommitProcessor处理器。对于事务请求,除了将请求交给CommitProcessor处理器外,还会根据请求类型创建对应的Proposal提议,并发送给所有的Follewer服务器来发起一次集群内的事务投票。同时,它还会将事务请求交给SyncRequestProcessor处理器进行事务日志的记录。 初始化过程为:

public ProposalRequestProcessor(LeaderZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        this.zks = zks;
        this.nextProcessor = nextProcessor;
//初始化SyncRequestProcessor
        AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
        syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
    }
//启动syncProcessor
 public void initialize() {
        syncProcessor.start();
    }

主要属性为:

 LeaderZooKeeperServer zks;
//设置为CommitProcessor
    RequestProcessor nextProcessor;
    SyncRequestProcessor syncProcessor;

SyncRequestProcessor 是事务日志记录处理器,主要用来将事务请求记录到事务日志文件中,同时会根据条件触发zookeeper进行数据快照。 并在数据同步完成后将请求传递给nextProcessor, 初始化过程为:

public SyncRequestProcessor(ZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        super("SyncThread:" + zks.getServerId(), zks
                .getZooKeeperServerListener());
        this.zks = zks;
        this.nextProcessor = nextProcessor;
        running = true;
    }

主要属性为:

private final ZooKeeperServer zks;
//请求存储队列
    private final LinkedBlockingQueue<Request> queuedRequests =
        new LinkedBlockingQueue<Request>();
//leader服务器中中会设置为AckRequestProcessor
    private final RequestProcessor nextProcessor;
//用于异步快照的线程
    private Thread snapInProcess = null;
//线程运行状态标志
    volatile private boolean running;

AckRequestProcessor 负责在SyncRequestProcessor处理器完成事务日志记录后,向Proposal投票收集器发送ACK反馈,表示当前leader服务器已经完成了对该Proposal的事务日志记录。初始化过程为:

//持有leader引用,调用leader.processAck发送反馈
 Leader leader;

    AckRequestProcessor(Leader leader) {
        this.leader = leader;
    }

CommitProcessor 事务提交处理器,对于非事务请求,该处理器会直接将请求交给nextProcessor处理;对于事务请求,它会等待集群内针对Proposal的投票直到Proposal可被提交,它保证了事务请求的顺序处理。初始化过程为:

//LeaderZooKeeperServer.setupRequestProcessors
 commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                Long.toString(getServerId()), false,
                getZooKeeperServerListener());
        commitProcessor.start();

public CommitProcessor(RequestProcessor nextProcessor, String id,
                           boolean matchSyncs, ZooKeeperServerListener listener) {
        super("CommitProcessor:" + id, listener);
        this.nextProcessor = nextProcessor;
        this.matchSyncs = matchSyncs;
    }

主要属性为:

 /** Default: numCores */
    public static final String ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS =
        "zookeeper.commitProcessor.numWorkerThreads";
    /** Default worker pool shutdown timeout in ms: 5000 (5s) */
    public static final String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT =
        "zookeeper.commitProcessor.shutdownTimeout";

//刚进来的请求存储队列
    protected LinkedBlockingQueue<Request> queuedRequests =
        new LinkedBlockingQueue<Request>();

   //已经提交的请求队列
    protected final LinkedBlockingQueue<Request> committedRequests =
        new LinkedBlockingQueue<Request>();

//等待Proposal可被提交的请求,key为session id,value为所关联的session's requests.
    protected final Map<Long, LinkedList<Request>> pendingRequests =
            new HashMap<Long, LinkedList<Request>>(10000);

    /** The number of requests currently being processed */
    protected final AtomicInteger numRequestsProcessing = new AtomicInteger(0);

//设置为ToBeAppliedProcessor
    RequestProcessor nextProcessor;

    /** For testing purposes, we use a separated stopping condition for the
     * outer loop.*/
    protected volatile boolean stoppedMainLoop = true; 
    protected volatile boolean stopped = true;

    private long workerShutdownTimeoutMS;
//处理已提交的请求
    protected WorkerService workerPool;
    private Object emptyPoolSync = new Object();

    /**
     * This flag indicates whether we need to wait for a response to come back from the
     * leader or we just let the sync operation flow through like a read. The flag will
     * be false if the CommitProcessor is in a Leader pipeline.
     */
//leader服务器默认为false
    boolean matchSyncs;

start方法主要是初始化workerPool并启动线程

public void start() {
        int numCores = Runtime.getRuntime().availableProcessors();
        int numWorkerThreads = Integer.getInteger(
            ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS, numCores);
        workerShutdownTimeoutMS = Long.getLong(
            ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT, 5000);

        LOG.info("Configuring CommitProcessor with "
                 + (numWorkerThreads > 0 ? numWorkerThreads : "no")
                 + " worker threads.");
        if (workerPool == null) {
            workerPool = new WorkerService(
                "CommitProcWork", numWorkerThreads, true);
        }
        stopped = false;
        stoppedMainLoop = false;
        super.start();
    }

ToBeAppliedProcessor 维护了那些已被CommitProcessor处理过的可被提交的Proposal事务请求的队列leader.toBeApplied,会将请求交给下一个处理器next处理,如果是事务请求,next处理完之后需要将请求从toBeApplied中移除。初始化方法为:

/**
         * This request processor simply maintains the toBeApplied list. For
         * this to work next must be a FinalRequestProcessor and
         * FinalRequestProcessor.processRequest MUST process the request
         * synchronously!
         *
         * @param next
         *                a reference to the FinalRequestProcessor
         */
        ToBeAppliedRequestProcessor(RequestProcessor next, Leader leader) {
            if (!(next instanceof FinalRequestProcessor)) {
                throw new RuntimeException(ToBeAppliedRequestProcessor.class
                        .getName()
                        + " must be connected to "
                        + FinalRequestProcessor.class.getName()
                        + " not "
                        + next.getClass().getName());
            }
            this.leader = leader;
            this.next = next;
        }

主要属性为:

//设置为FinalRequestProcessor
private final RequestProcessor next;

        private final Leader leader;

FinalRequestProcessor 最后一个请求处理器,处理请求并构造客户端请求的响应。针对事务事情,会负责将事务应用到内存数据库中。仅有ZooKeeperServer zks 成员变量。

Follewer请求处理链初始化

Follewer服务器的主要工作如下:

  • 处理客户端非事务请求,转发事务请求给Leader服务器
  • 参与事务请求Proposal的投票
  • 参与Leader选举投票 当Follewer完成集群间数据的同步时,会启动FollowerZooKeeperServer,初始化请求链。 FollowerZooKeeperServer.setupRequestProcessors
protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true, getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        syncProcessor = new SyncRequestProcessor(this,
                new SendAckRequestProcessor((Learner)getFollower()));
        syncProcessor.start();
    }

可以看出第一个请求处理器firstProcessor=new FollowerRequestProcessor(this, commitProcessor); 同leader服务器请求处理链的初始化过程,会初始化每个请求处理器的nextProcessor,并启动处理器。 Follewer的请求处理链如下:

FollowerRequestProcessor Follewer服务器的第一个请求处理器,识别当前请求是否是事务请求。如果是事务请求,不仅将请求交给nextProcessor,还会将事务请求转发给Leader服务器。初始化过程为:

public FollowerRequestProcessor(FollowerZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        super("FollowerRequestProcessor:" + zks.getServerId(), zks
                .getZooKeeperServerListener());
        this.zks = zks;
        this.nextProcessor = nextProcessor;
    }

主要属性为:

FollowerZooKeeperServer zks;
//设置为CommitProcessor
    RequestProcessor nextProcessor;

    LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();

    boolean finished = false;

CommitProcessor 在Follewer服务器中,初始化过程为:

RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true, getZooKeeperServerListener());
        commitProcessor.start();

此时,matchSyncs=true,表示对于OpCode.sync同步请求,需要等待leader的响应。 SyncRequestProcessor 在Follewer服务器中,nextProcessor为SendAckRequestProcessor,会接收leader服务器的请求,并将数据强刷到磁盘上,并将请求交给SendAckRequestProcessor处理 SendAckRequestProcessor 承担事务日志记录反馈的角色,在完成强刷事务日志记录后,会向leader服务器发送ACK消息表明自身完成了事务日志的记录工作。

Observer请求处理链初始化

Observer作用是观察zookeeper集群的最新状态并将变更同步过来,原理同Follewer,对于非事务请求,可进行独立处理。对于事务请求,会转发给Leader服务器处理。但是不参与任何形式的投票。 当Observer完成集群间数据的同步时,会启动ObserverZooKeeperServer,初始化请求链。 ObserverZooKeeperServer.setupRequestProcessors

protected void setupRequestProcessors() {      
        // We might consider changing the processor behaviour of 
        // Observers to, for example, remove the disk sync requirements.
        // Currently, they behave almost exactly the same as followers.
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true,
                getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
        ((ObserverRequestProcessor) firstProcessor).start();

        /*
         * Observer should write to disk, so that the it won't request
         * too old txn from the leader which may lead to getting an entire
         * snapshot.
         *
         * However, this may degrade performance as it has to write to disk
         * and do periodic snapshot which may double the memory requirements
         */
        if (syncRequestProcessorEnabled) {
            syncProcessor = new SyncRequestProcessor(this, null);
            syncProcessor.start();
        }
    }

可以看出在初始化请求处理链的阶段会将SyncRequestProcessor启动,为了能记录事务日志和定期同步快照信息。但是nextProcessor为null,不需要参与事务请求的ACK回复。第一个请求处理器firstProcessor = new ObserverRequestProcessor(this, commitProcessor);同FollowerRequestProcessor的作用。 Observer请求处理链为:

感谢您的阅读,我是Monica23334 || Monica2333 。立下每周写一篇原创文章flag的小姐姐,关注我并期待打脸吧~

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • zookeeper源码分析(8)-会话管理

    zookeeper客户端和服务端维持一个TCP长连接,它们之间任何正常的通信都需要一个正常的会话。本文主要分析会话生命周期中会话状态的变化过程和客户端服务端如何...

    Monica2333
  • servlet/tomcat等容器/springMVC之间的关系

    Servlet是JavaEE规范的一种,主要是为了扩展Java作为Web服务的功能,统一接口。由其他内部厂商如tomcat,jetty内部实现web的功能。如一...

    Monica2333
  • zookeeper源码分析(3)— 一次会话的创建过程

    在一次会话的创建过程中,需要客户端首先发送创建会话请求,服务端集群创建会话成功后会将响应发送给客户端。

    Monica2333
  • Android 保持同一Session网络请求

    手机注册获取验证码的时候,总是说验证码过期,明明刚获取的验证码,还是提示验证码过期。这种情况就是多次网络请求不在同一个Session,很可能就是用了不同的请求方...

    三哥
  • 小程序成为金融机构科技创新突破口?

    不知你是否意识到,当你走进一些银行营业网点办理业务时,先下载手机银行APP,成了你办理业务的前提条件。如果碰到有不情愿的客户,大堂经理还会试图说服你:“您先下载...

    亚里士多的去了
  • 打通小程序和移动应用APP,实现一云多端

    Flutter 是当前最火热的跨端开发框架,可以快速开发出界面优雅、性能卓越的跨端应用,并且同时支持 AOT 和 JIT 两种运行时,兼顾研发效率和应用性能。

    腾讯云开发TCB
  • 嵌入式程序员也能开发微信小程序

    对于广大的嵌入式程序员来说,可能觉得小程序开发离自己可能很远,其实随着现在技术的进步,物联网的发展,我们需要掌握的东西越来越多,包括智能互联,从微信小程序还没有...

    用户1605515
  • 小程序实战1-项目总览

    通过3周业余时间的开发,完成了我的车管家小程序开发,并提交审核发布出来了。希望通过接下来的系列博客,带领大家完成一个这样小程序开发,并提交微信审核发布出来。

    八哥
  • php设计模式

    什么是设计模式 设计模式,是一种解决问题的思维,而并非某种特定的方法。是前人给我们总结的宝贵经验。学习设计模式是为了编写可复用、可拓展、高性能软件。学习设计模式...

    joshua317
  • 小程序+Saas平台:新一代的低成本流量入口

    导读:从微信公众号上线以来,目前已经超过了2000万个,通过公众号低成本获取流量的时代已经过去,2018年,新一代的流量入口会在哪里呢?

    微购儿小程序

扫码关注云+社区

领取腾讯云代金券