在zookeeper集群中,分为Leader,Follewer,Observer三种类型的服务器角色,请求是通过各自的请求处理链来处理,所有的请求处理器均实现了RequestProcessor
接口,通过处理链的上一个请求处理器调用该处理器的processRequest
方法将请求传递过来,这个请求的传递过程是由一个线程完成的。
public interface RequestProcessor { public static class RequestProcessorException extends Exception { public RequestProcessorException(String msg, Throwable t) { super(msg, t); } } //请求处理方法 void processRequest(Request request) throws RequestProcessorException; void shutdown(); }
下面分别看下不同角色的服务器启动时的请求处理链初始化过程。
Leader的主要工作如下:
LeaderZooKeeperServer.setupRequestProcessors
protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader()); commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener()); commitProcessor.start(); ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor); proposalProcessor.initialize(); prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor); prepRequestProcessor.start(); //设置LeaderZooKeeperServer.firstProcessor firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor); setupContainerManager(); }
请求处理器都持有下一个请求处理器的引用private final RequestProcessor nextProcessor;
,在上面的构造方法中会设置各自的nextProcessor
,并启动处理器。最后会设置LeaderZooKeeperServer.firstProcessor
为LeaderRequestProcessor
,这个处理器主要是对本地session创建临时节点时的请求预处理,将在=======介绍,它的nextProcessor
为PrepRequestProcessor
。
可大体认为Leader的请求处理链如下:
PrepRequestProcessor
Leader服务器的请求预处理器,进行一些创建请求事务头,事务体,ACL检查和版本检查等的预处理操作。初始化方法为:
public PrepRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) { super("ProcessThread(sid:" + zks.getServerId() + " cport:" + zks.getClientPort() + "):", zks.getZooKeeperServerListener()); this.nextProcessor = nextProcessor; this.zks = zks; }
主要属性为:
//请求存储队列,该线程启动后会不断从队列中获取请求进行预处理 LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>(); //设置为ProposalRequestProcessor private final RequestProcessor nextProcessor; //当前zkServer实例 ZooKeeperServer zks;
ProposalRequestProcessor
Leader服务器的事务投票处理器,也是事务处理流程的发起者。对于非事务请求,它会直接将请求流转到 CommitProcessor
处理器。对于事务请求,除了将请求交给CommitProcessor
处理器外,还会根据请求类型创建对应的Proposal提议,并发送给所有的Follewer服务器来发起一次集群内的事务投票。同时,它还会将事务请求交给SyncRequestProcessor
处理器进行事务日志的记录。
初始化过程为:
public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) { this.zks = zks; this.nextProcessor = nextProcessor; //初始化SyncRequestProcessor AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader()); syncProcessor = new SyncRequestProcessor(zks, ackProcessor); } //启动syncProcessor public void initialize() { syncProcessor.start(); }
主要属性为:
LeaderZooKeeperServer zks; //设置为CommitProcessor RequestProcessor nextProcessor; SyncRequestProcessor syncProcessor;
SyncRequestProcessor
是事务日志记录处理器,主要用来将事务请求记录到事务日志文件中,同时会根据条件触发zookeeper进行数据快照。
并在数据同步完成后将请求传递给nextProcessor
,
初始化过程为:
public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) { super("SyncThread:" + zks.getServerId(), zks .getZooKeeperServerListener()); this.zks = zks; this.nextProcessor = nextProcessor; running = true; }
主要属性为:
private final ZooKeeperServer zks; //请求存储队列 private final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>(); //leader服务器中中会设置为AckRequestProcessor private final RequestProcessor nextProcessor; //用于异步快照的线程 private Thread snapInProcess = null; //线程运行状态标志 volatile private boolean running;
AckRequestProcessor
负责在SyncRequestProcessor处理器完成事务日志记录后,向Proposal投票收集器发送ACK反馈,表示当前leader服务器已经完成了对该Proposal的事务日志记录。初始化过程为:
//持有leader引用,调用leader.processAck发送反馈 Leader leader; AckRequestProcessor(Leader leader) { this.leader = leader; }
CommitProcessor
事务提交处理器,对于非事务请求,该处理器会直接将请求交给nextProcessor
处理;对于事务请求,它会等待集群内针对Proposal的投票直到Proposal可被提交,它保证了事务请求的顺序处理。初始化过程为:
//LeaderZooKeeperServer.setupRequestProcessors commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener()); commitProcessor.start(); public CommitProcessor(RequestProcessor nextProcessor, String id, boolean matchSyncs, ZooKeeperServerListener listener) { super("CommitProcessor:" + id, listener); this.nextProcessor = nextProcessor; this.matchSyncs = matchSyncs; }
主要属性为:
/** Default: numCores */ public static final String ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS = "zookeeper.commitProcessor.numWorkerThreads"; /** Default worker pool shutdown timeout in ms: 5000 (5s) */ public static final String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT = "zookeeper.commitProcessor.shutdownTimeout"; //刚进来的请求存储队列 protected LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>(); //已经提交的请求队列 protected final LinkedBlockingQueue<Request> committedRequests = new LinkedBlockingQueue<Request>(); //等待Proposal可被提交的请求,key为session id,value为所关联的session's requests. protected final Map<Long, LinkedList<Request>> pendingRequests = new HashMap<Long, LinkedList<Request>>(10000); /** The number of requests currently being processed */ protected final AtomicInteger numRequestsProcessing = new AtomicInteger(0); //设置为ToBeAppliedProcessor RequestProcessor nextProcessor; /** For testing purposes, we use a separated stopping condition for the * outer loop.*/ protected volatile boolean stoppedMainLoop = true; protected volatile boolean stopped = true; private long workerShutdownTimeoutMS; //处理已提交的请求 protected WorkerService workerPool; private Object emptyPoolSync = new Object(); /** * This flag indicates whether we need to wait for a response to come back from the * leader or we just let the sync operation flow through like a read. The flag will * be false if the CommitProcessor is in a Leader pipeline. */ //leader服务器默认为false boolean matchSyncs;
start方法主要是初始化workerPool
并启动线程
public void start() { int numCores = Runtime.getRuntime().availableProcessors(); int numWorkerThreads = Integer.getInteger( ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS, numCores); workerShutdownTimeoutMS = Long.getLong( ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT, 5000); LOG.info("Configuring CommitProcessor with " + (numWorkerThreads > 0 ? numWorkerThreads : "no") + " worker threads."); if (workerPool == null) { workerPool = new WorkerService( "CommitProcWork", numWorkerThreads, true); } stopped = false; stoppedMainLoop = false; super.start(); }
ToBeAppliedProcessor
维护了那些已被CommitProcessor处理过的可被提交的Proposal事务请求的队列leader.toBeApplied
,会将请求交给下一个处理器next
处理,如果是事务请求,next处理完之后需要将请求从toBeApplied
中移除。初始化方法为:
/** * This request processor simply maintains the toBeApplied list. For * this to work next must be a FinalRequestProcessor and * FinalRequestProcessor.processRequest MUST process the request * synchronously! * * @param next * a reference to the FinalRequestProcessor */ ToBeAppliedRequestProcessor(RequestProcessor next, Leader leader) { if (!(next instanceof FinalRequestProcessor)) { throw new RuntimeException(ToBeAppliedRequestProcessor.class .getName() + " must be connected to " + FinalRequestProcessor.class.getName() + " not " + next.getClass().getName()); } this.leader = leader; this.next = next; }
主要属性为:
//设置为FinalRequestProcessor private final RequestProcessor next; private final Leader leader;
FinalRequestProcessor
最后一个请求处理器,处理请求并构造客户端请求的响应。针对事务事情,会负责将事务应用到内存数据库中。仅有ZooKeeperServer zks
成员变量。
Follewer服务器的主要工作如下:
FollowerZooKeeperServer.setupRequestProcessors
protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); commitProcessor.start(); firstProcessor = new FollowerRequestProcessor(this, commitProcessor); ((FollowerRequestProcessor) firstProcessor).start(); syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor((Learner)getFollower())); syncProcessor.start(); }
可以看出第一个请求处理器firstProcessor=new FollowerRequestProcessor(this, commitProcessor);
同leader服务器请求处理链的初始化过程,会初始化每个请求处理器的nextProcessor
,并启动处理器。
Follewer的请求处理链如下:
FollowerRequestProcessor
Follewer服务器的第一个请求处理器,识别当前请求是否是事务请求。如果是事务请求,不仅将请求交给nextProcessor,还会将事务请求转发给Leader服务器。初始化过程为:
public FollowerRequestProcessor(FollowerZooKeeperServer zks, RequestProcessor nextProcessor) { super("FollowerRequestProcessor:" + zks.getServerId(), zks .getZooKeeperServerListener()); this.zks = zks; this.nextProcessor = nextProcessor; }
主要属性为:
FollowerZooKeeperServer zks; //设置为CommitProcessor RequestProcessor nextProcessor; LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>(); boolean finished = false;
CommitProcessor
在Follewer服务器中,初始化过程为:
RequestProcessor finalProcessor = new FinalRequestProcessor(this); commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); commitProcessor.start();
此时,matchSyncs=true
,表示对于OpCode.sync
同步请求,需要等待leader的响应。
SyncRequestProcessor
在Follewer服务器中,nextProcessor
为SendAckRequestProcessor,会接收leader服务器的请求,并将数据强刷到磁盘上,并将请求交给SendAckRequestProcessor处理
SendAckRequestProcessor
承担事务日志记录反馈的角色,在完成强刷事务日志记录后,会向leader服务器发送ACK消息表明自身完成了事务日志的记录工作。
Observer作用是观察zookeeper集群的最新状态并将变更同步过来,原理同Follewer,对于非事务请求,可进行独立处理。对于事务请求,会转发给Leader服务器处理。但是不参与任何形式的投票。
当Observer完成集群间数据的同步时,会启动ObserverZooKeeperServer,初始化请求链。
ObserverZooKeeperServer.setupRequestProcessors
protected void setupRequestProcessors() { // We might consider changing the processor behaviour of // Observers to, for example, remove the disk sync requirements. // Currently, they behave almost exactly the same as followers. RequestProcessor finalProcessor = new FinalRequestProcessor(this); commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); commitProcessor.start(); firstProcessor = new ObserverRequestProcessor(this, commitProcessor); ((ObserverRequestProcessor) firstProcessor).start(); /* * Observer should write to disk, so that the it won't request * too old txn from the leader which may lead to getting an entire * snapshot. * * However, this may degrade performance as it has to write to disk * and do periodic snapshot which may double the memory requirements */ if (syncRequestProcessorEnabled) { syncProcessor = new SyncRequestProcessor(this, null); syncProcessor.start(); } }
可以看出在初始化请求处理链的阶段会将SyncRequestProcessor启动,为了能记录事务日志和定期同步快照信息。但是nextProcessor
为null,不需要参与事务请求的ACK回复。第一个请求处理器firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
同FollowerRequestProcessor的作用。
Observer请求处理链为:
感谢您的阅读,我是Monica23334 || Monica2333 。立下每周写一篇原创文章flag的小姐姐,关注我并期待打脸吧~
本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。
我来说两句