
ZooKeeper Source Code Analysis, Part 5: Server-Side (Cluster Leader) Request Processing Flow

The leader's implementation class is LeaderZooKeeperServer, which indirectly extends the standard ZooKeeperServer. It defines the path a request travels once it reaches the leader (strictly speaking, a LeaderRequestProcessor is installed in front of this chain as firstProcessor, as the code below shows):

PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor

The details can be seen in LeaderZooKeeperServer.setupRequestProcessors(), which builds the chain from the tail forward:

@Override
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
    commitProcessor = new CommitProcessor(toBeAppliedProcessor,
            Long.toString(getServerId()), false,
            getZooKeeperServerListener());
    commitProcessor.start();
    ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
            commitProcessor);
    proposalProcessor.initialize();
    prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
    prepRequestProcessor.start();
    firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

    setupContainerManager();
}

Let's analyze step by step what each of these request processors (RPs) does. PrepRequestProcessor and FinalRequestProcessor were already analyzed in the previous article:

ZooKeeper Source Code Analysis, Part 4: Server-Side (Standalone) Request Processing Flow

So let's start with the remaining RPs.

1. ProposalRequestProcessor

This RP forwards each request to its downstream processor (CommitProcessor) and, for transactional requests, additionally proposes the change to the quorum and hands the request to SyncRequestProcessor, whose downstream is AckRequestProcessor. See the code:

public void processRequest(Request request) throws RequestProcessorException {
    // LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
    // request.type + " id = " + request.sessionId);
    // request.addRQRec(">prop");

    /* In the following IF-THEN-ELSE block, we process syncs on the leader.
     * If the sync is coming from a follower, then the follower
     * handler adds it to syncHandler. Otherwise, if it is a client of
     * the leader that issued the sync command, then syncHandler won't
     * contain the handler. In this case, we add it to syncHandler, and
     * call processRequest on the next processor.
     */
    if (request instanceof LearnerSyncRequest){
        zks.getLeader().processSync((LearnerSyncRequest)request);
    } else {
        nextProcessor.processRequest(request);
        if (request.getHdr() != null) {
            // We need to sync and get consensus on any transactions
            try {
                zks.getLeader().propose(request);
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            syncProcessor.processRequest(request);
        }
    }
}
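The article never shows Leader.propose itself. For context, here is a paraphrased sketch of what it does in the 3.5.x code base (the xid-rollover check, statistics, and quorum-verifier bookkeeping are omitted; this is not the verbatim source). It wraps the serialized transaction in a PROPOSAL packet, registers the proposal in outstandingProposals, and fans it out to the followers:

// Paraphrased sketch of Leader.propose(Request), not the verbatim source.
public Proposal propose(Request request) throws XidRolloverException {
    byte[] data = SerializeUtils.serializeRequest(request);   // header + txn
    QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);

    Proposal p = new Proposal();
    p.packet = pp;
    p.request = request;

    synchronized (this) {
        lastProposed = p.packet.getZxid();
        outstandingProposals.put(lastProposed, p);  // awaits acks in processAck
        sendPacket(pp);                             // queue to all forwarding followers
    }
    return p;
}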

SyncRequestProcessor was analyzed in the previous article, so it is not repeated here; on the leader it persists the proposal to the local transaction log, after which its downstream AckRequestProcessor acknowledges the proposal on the leader's own behalf. So what exactly does AckRequestProcessor do?

AckRequestProcessor simply forwards the incoming request to the leader as an ACK. See the code for details:

/**
 * Forward the request as an ACK to the leader
 */
public void processRequest(Request request) {
    QuorumPeer self = leader.self;
    if (self != null)
        leader.processAck(self.getId(), request.zxid, null);
    else
        LOG.error("Null QuorumPeer");
}

The leader counts the ACK in processAck, shown below:

/**
 * Keep a count of acks that are received by the leader for a particular
 * proposal
 *
 * @param zxid, the zxid of the proposal sent out
 * @param sid, the id of the server that sent the ack
 * @param followerAddr
 */
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
    if (!allowedToCommit) return; // last op committed was a leader change - from now on
                                  // the new leader should commit
    if (LOG.isTraceEnabled()) {
        LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
        for (Proposal p : outstandingProposals.values()) {
            long packetZxid = p.packet.getZxid();
            LOG.trace("outstanding proposal: 0x{}",
                    Long.toHexString(packetZxid));
        }
        LOG.trace("outstanding proposals all");
    }

    if ((zxid & 0xffffffffL) == 0) {
        /*
         * We no longer process NEWLEADER ack with this method. However,
         * the learner sends an ack back to the leader after it gets
         * UPTODATE, so we just ignore the message.
         */
        return;
    }

    if (outstandingProposals.size() == 0) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("outstanding is 0");
        }
        return;
    }
    if (lastCommitted >= zxid) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
                    Long.toHexString(lastCommitted), Long.toHexString(zxid));
        }
        // The proposal has already been committed
        return;
    }
    Proposal p = outstandingProposals.get(zxid);
    if (p == null) {
        LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
                Long.toHexString(zxid), followerAddr);
        return;
    }

    p.addAck(sid);
    /*if (LOG.isDebugEnabled()) {
        LOG.debug("Count for zxid: 0x{} is {}",
                Long.toHexString(zxid), p.ackSet.size());
    }*/

    boolean hasCommitted = tryToCommit(p, zxid, followerAddr);

    // If p is a reconfiguration, multiple other operations may be ready to be committed,
    // since operations wait for different sets of acks.
    // Currently we only permit one outstanding reconfiguration at a time
    // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
    // pending all wait for a quorum of old and new config, so its not possible to get enough acks
    // for an operation without getting enough acks for preceding ops. But in the future if multiple
    // concurrent reconfigs are allowed, this can happen and then we need to check whether some pending
    // ops may already have enough acks and can be committed, which is what this code does.
    if (hasCommitted && p.request != null && p.request.getHdr().getType() == OpCode.reconfig) {
        long curZxid = zxid;
        while (allowedToCommit && hasCommitted && p != null) {
            curZxid++;
            p = outstandingProposals.get(curZxid);
            if (p != null) hasCommitted = tryToCommit(p, curZxid, null);
        }
    }
}
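To make the ack counting concrete: in the common, non-reconfig case, p.addAck(sid) and the p.hasAllQuorums() check used by tryToCommit reduce to a strict-majority test over the ensemble. The following stand-alone sketch is illustrative only (AckCounter is not a real ZooKeeper class; the real Proposal tracks one ack set per active QuorumVerifier):

import java.util.HashSet;
import java.util.Set;

// Illustrative majority-quorum ack counter, mirroring what addAck/hasAllQuorums
// boil down to when only one (majority) quorum verifier is in play.
class AckCounter {
    private final Set<Long> ackSet = new HashSet<>();
    private final int ensembleSize;

    AckCounter(int ensembleSize) {
        this.ensembleSize = ensembleSize;
    }

    void addAck(long sid) {
        ackSet.add(sid);                            // duplicate acks count once per server
    }

    boolean hasQuorum() {
        return ackSet.size() > ensembleSize / 2;    // strict majority
    }

    public static void main(String[] args) {
        AckCounter p = new AckCounter(5);           // 5-node ensemble, quorum = 3
        p.addAck(1);                                // the leader acks its own proposal
        p.addAck(2);
        System.out.println(p.hasQuorum());          // false: only 2 of 5
        p.addAck(3);
        System.out.println(p.hasQuorum());          // true: 3 of 5
    }
}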

processAck calls tryToCommit, which commits the proposal once it has acks from a quorum and every earlier zxid has been committed; the committed request is then handed to CommitProcessor:

/**
 * @return True if committed, otherwise false.
 * @param a proposal p
 **/
synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {
    // make sure that ops are committed in order. With reconfigurations it is now possible
    // that different operations wait for different sets of acks, and we still want to enforce
    // that they are committed in order. Currently we only permit one outstanding reconfiguration
    // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
    // pending all wait for a quorum of old and new config, so its not possible to get enough acks
    // for an operation without getting enough acks for preceding ops. But in the future if multiple
    // concurrent reconfigs are allowed, this can happen.
    if (outstandingProposals.containsKey(zxid - 1)) return false;

    // getting a quorum from all necessary configurations
    if (!p.hasAllQuorums()) {
        return false;
    }

    // commit proposals in order
    if (zxid != lastCommitted + 1) {
        LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid)
                + " from " + followerAddr + " not first!");
        LOG.warn("First is "
                + (lastCommitted + 1));
    }

    // in order to be committed, a proposal must be accepted by a quorum
    outstandingProposals.remove(zxid);

    if (p.request != null) {
        toBeApplied.add(p);
    }

    if (p.request == null) {
        LOG.warn("Going to commmit null: " + p);
    } else if (p.request.getHdr().getType() == OpCode.reconfig) {
        LOG.debug("Committing a reconfiguration! " + outstandingProposals.size());

        //if this server is voter in new config with the same quorum address,
        //then it will remain the leader
        //otherwise an up-to-date follower will be designated as leader. This saves
        //leader election time, unless the designated leader fails
        Long designatedLeader = getDesignatedLeader(p, zxid);
        //LOG.warn("designated leader is: " + designatedLeader);

        QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size() - 1).getQuorumVerifier();

        self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);

        if (designatedLeader != self.getId()) {
            allowedToCommit = false;
        }

        // we're sending the designated leader, and if the leader is changing the followers are
        // responsible for closing the connection - this way we are sure that at least a majority of them
        // receive the commit message.
        commitAndActivate(zxid, designatedLeader);
        informAndActivate(p, designatedLeader);
        //turnOffFollowers();
    } else {
        commit(zxid);
        inform(p);
    }
    zk.commitProcessor.commit(p.request);
    if (pendingSyncs.containsKey(zxid)) {
        for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
            sendSync(r);
        }
    }
    return true;
}

The first step of this method, for an ordinary transaction, is to send a COMMIT packet to all members of the quorum:

/**
 * Create a commit packet and send it to all the members of the quorum
 *
 * @param zxid
 */
public void commit(long zxid) {
    synchronized (this) {
        lastCommitted = zxid;
    }
    QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
    sendPacket(qp);
}

The packet fan-out looks like this:

/**
 * send a packet to all the followers ready to follow
 *
 * @param qp
 *                the packet to be sent
 */
void sendPacket(QuorumPacket qp) {
    synchronized (forwardingFollowers) {
        for (LearnerHandler f : forwardingFollowers) {
            f.queuePacket(qp);
        }
    }
}

The second step is to inform the observers. Observers do not take part in voting, so the INFORM packet carries the full proposal payload rather than just the zxid:

/**
 * Create an inform packet and send it to all observers.
 * @param zxid
 * @param proposal
 */
public void inform(Proposal proposal) {
    QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid,
            proposal.packet.getData(), null);
    sendObserverPacket(qp);
}

The observer fan-out routine is:

/**
 * send a packet to all observers
 */
void sendObserverPacket(QuorumPacket qp) {
    for (LearnerHandler f : getObservingLearners()) {
        f.queuePacket(qp);
    }
}

The third step is the hand-off to CommitProcessor:

 zk.commitProcessor.commit(p.request);

2. CommitProcessor

CommitProcessor is multi-threaded. Threads communicate through queues, atomics, and wait/notifyAll synchronization. CommitProcessor acts as a gateway that admits requests into the remainder of the processing pipeline: at any given moment it allows many read requests to be processed concurrently but at most one write request, which preserves the order of writes in the transaction stream.

There is one main commit-processing thread, which watches the request queue and dispatches requests to worker threads. Dispatch is keyed on sessionId, so the read and write requests of a particular session normally land on the same worker thread and therefore execute in order.

There are 0 to N worker threads, which run the remainder of the request pipeline. If configured with 0 worker threads, the main commit thread runs the pipeline directly.

The typical (default) thread count is, on a 32-core machine, one commit-processing thread and 32 worker threads.
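As an aside (an assumption based on the 3.5.x CommitProcessor, which reads the zookeeper.commitProcessor.numWorkerThreads system property and defaults it to the number of available cores), the worker count can be overridden when launching the server JVM; a value of 0 makes the main commit thread run the pipeline itself, the "0 worker threads" case described above:

java -Dzookeeper.commitProcessor.numWorkerThreads=16 org.apache.zookeeper.server.quorum.QuorumPeerMain zoo.cfg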

The multi-threading must respect the following constraints:

1. Each session's requests must be processed in order.

2. Write requests must be processed in zxid order.

3. There must be no race between writes within one session that could trigger a watch set by a read request in another session.

The current implementation satisfies the third constraint simply by not processing any read requests while a write request is in flight, as the sketch below illustrates.
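Here is a minimal sketch of that admission rule. CommitGate is not the real CommitProcessor, which achieves the same effect with its nextPending/currentlyCommitting references and request counters; this just distills the "many reads, at most one write" gate:

import java.util.concurrent.atomic.AtomicReference;

// Illustrative admission gate: reads pass only while no write is pending,
// and at most one write may hold the gate at a time.
class CommitGate<R> {
    private final AtomicReference<R> pendingWrite = new AtomicReference<>();

    boolean tryAdmitRead() {
        // reads are blocked whenever a write is waiting for or undergoing commit
        return pendingWrite.get() == null;
    }

    boolean tryAdmitWrite(R write) {
        // succeeds only if no other write currently holds the gate
        return pendingWrite.compareAndSet(null, write);
    }

    void writeCommitted(R write) {
        // reopen the gate once the write has gone through the pipeline
        pendingWrite.compareAndSet(write, null);
    }
}

With that rule in mind, here is the actual run() loop of CommitProcessor: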

@Override
public void run() {
    Request request;
    try {
        while (!stopped) {
            synchronized (this) {
                while (
                    !stopped &&
                    ((queuedRequests.isEmpty() || isWaitingForCommit() || isProcessingCommit()) &&
                     (committedRequests.isEmpty() || isProcessingRequest()))) {
                    wait();
                }
            }

            /*
             * Processing queuedRequests: Process the next requests until we
             * find one for which we need to wait for a commit. We cannot
             * process a read request while we are processing write request.
             */
            while (!stopped && !isWaitingForCommit() &&
                   !isProcessingCommit() &&
                   (request = queuedRequests.poll()) != null) {
                if (needCommit(request)) {
                    nextPending.set(request);
                } else {
                    sendToNextProcessor(request);
                }
            }

            /*
             * Processing committedRequests: check and see if the commit
             * came in for the pending request. We can only commit a
             * request when there is no other request being processed.
             */
            processCommitted();
        }
    } catch (Throwable e) {
        handleException(this.getName(), e);
    }
    LOG.info("CommitProcessor exited loop!");
}
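For completeness, this is how requests enter committedRequests in the first place: the commit(Request) method that the leader invokes from tryToCommit. The following is a paraphrase of the 3.5.x implementation (debug logging omitted), not the verbatim source:

// Paraphrased sketch of CommitProcessor.commit(Request): enqueue the committed
// request and wake the main commit thread if it is not already busy committing.
public void commit(Request request) {
    if (stopped || request == null) {
        return;
    }
    committedRequests.add(request);   // LinkedBlockingQueue<Request>
    if (!isProcessingCommit()) {
        wakeup();                     // synchronized { notifyAll(); }
    }
}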

The main commit-matching logic is in processCommitted:

/*
 * Separated this method from the main run loop
 * for test purposes (ZOOKEEPER-1863)
 */
protected void processCommitted() {
    Request request;

    if (!stopped && !isProcessingRequest() &&
            (committedRequests.peek() != null)) {

        /*
         * ZOOKEEPER-1863: continue only if there is no new request
         * waiting in queuedRequests or it is waiting for a
         * commit.
         */
        if (!isWaitingForCommit() && !queuedRequests.isEmpty()) {
            return;
        }
        request = committedRequests.poll();

        /*
         * We match with nextPending so that we can move to the
         * next request when it is committed. We also want to
         * use nextPending because it has the cnxn member set
         * properly.
         */
        Request pending = nextPending.get();
        if (pending != null &&
                pending.sessionId == request.sessionId &&
                pending.cxid == request.cxid) {
            // we want to send our version of the request.
            // the pointer to the connection in the request
            pending.setHdr(request.getHdr());
            pending.setTxn(request.getTxn());
            pending.zxid = request.zxid;
            // Set currentlyCommitting so we will block until this
            // completes. Cleared by CommitWorkRequest after
            // nextProcessor returns.
            currentlyCommitting.set(pending);
            nextPending.set(null);
            sendToNextProcessor(pending);
        } else {
            // this request came from someone else so just
            // send the commit packet
            currentlyCommitting.set(request);
            sendToNextProcessor(request);
        }
    }
}

Dispatching the request to the worker pool:

/**
 * Schedule final request processing; if a worker thread pool is not being
 * used, processing is done directly by this thread.
 */
private void sendToNextProcessor(Request request) {
    numRequestsProcessing.incrementAndGet();
    workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
}

The real scheduling logic lives in WorkerService.schedule:

/**
 * Schedule work to be done by the thread assigned to this id. Thread
 * assignment is a single mod operation on the number of threads.  If a
 * worker thread pool is not being used, work is done directly by
 * this thread.
 */
public void schedule(WorkRequest workRequest, long id) {
    if (stopped) {
        workRequest.cleanup();
        return;
    }

    ScheduledWorkRequest scheduledWorkRequest =
        new ScheduledWorkRequest(workRequest);

    // If we have a worker thread pool, use that; otherwise, do the work
    // directly.
    int size = workers.size();
    if (size > 0) {
        try {
            // make sure to map negative ids as well to [0, size-1]
            int workerNum = ((int) (id % size) + size) % size;
            ExecutorService worker = workers.get(workerNum);
            worker.execute(scheduledWorkRequest);
        } catch (RejectedExecutionException e) {
            LOG.warn("ExecutorService rejected execution", e);
            workRequest.cleanup();
        }
    } else {
        // When there is no worker thread pool, do the work directly
        // and wait for its completion
        scheduledWorkRequest.start();
        try {
            scheduledWorkRequest.join();
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
            Thread.currentThread().interrupt();
        }
    }
}
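One detail worth pausing on is the double mod in the worker assignment. Session ids can be negative, and Java's % operator preserves the sign of the dividend, so a plain id % size could produce a negative index. A tiny worked example:

long id = -7;                                    // a negative session id
int size = 4;                                    // number of worker threads
int naive  = (int) (id % size);                  // -3: not a valid list index
int worker = ((int) (id % size) + size) % size;  // 1: always within [0, size-1]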

The run method of the scheduled work request:

@Override
public void run() {
    try {
        // Check if stopped while request was on queue
        if (stopped) {
            workRequest.cleanup();
            return;
        }
        workRequest.doWork();
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
        workRequest.cleanup();
    }
}

which calls the doWork method of CommitProcessor's CommitWorkRequest:

public void doWork() throws RequestProcessorException {
    try {
        nextProcessor.processRequest(request);
    } finally {
        // If this request is the commit request that was blocking
        // the processor, clear.
        currentlyCommitting.compareAndSet(request, null);

        /*
         * Decrement outstanding request count. The processor may be
         * blocked at the moment because it is waiting for the pipeline
         * to drain. In that case, wake it up if there are pending
         * requests.
         */
        if (numRequestsProcessing.decrementAndGet() == 0) {
            if (!queuedRequests.isEmpty() ||
                !committedRequests.isEmpty()) {
                wakeup();
            }
        }
    }
}

This passes the request on to the next RP: Leader.ToBeAppliedRequestProcessor.

3. Leader.ToBeAppliedRequestProcessor

Leader.ToBeAppliedRequestProcessor merely maintains the toBeApplied list. Its downstream processor must be a FinalRequestProcessor that processes requests synchronously, so that each committed proposal can be removed from the list as soon as it has been applied.

/**
 * This request processor simply maintains the toBeApplied list. For
 * this to work next must be a FinalRequestProcessor and
 * FinalRequestProcessor.processRequest MUST process the request
 * synchronously!
 *
 * @param next
 *    a reference to the FinalRequestProcessor
 */
ToBeAppliedRequestProcessor(RequestProcessor next, Leader leader) {
    if (!(next instanceof FinalRequestProcessor)) {
        throw new RuntimeException(ToBeAppliedRequestProcessor.class
                .getName()
                + " must be connected to "
                + FinalRequestProcessor.class.getName()
                + " not "
                + next.getClass().getName());
    }
    this.leader = leader;
    this.next = next;
}

/*
 * (non-Javadoc)
 *
 * @see org.apache.zookeeper.server.RequestProcessor#processRequest(org.apache.zookeeper.server.Request)
 */
public void processRequest(Request request) throws RequestProcessorException {
    next.processRequest(request);

    // The only requests that should be on toBeApplied are write
    // requests, for which we will have a hdr. We can't simply use
    // request.zxid here because that is set on read requests to equal
    // the zxid of the last write op.
    if (request.getHdr() != null) {
        long zxid = request.getHdr().getZxid();
        Iterator<Proposal> iter = leader.toBeApplied.iterator();
        if (iter.hasNext()) {
            Proposal p = iter.next();
            if (p.request != null && p.request.zxid == zxid) {
                iter.remove();
                return;
            }
        }
        LOG.error("Committed request not found on toBeApplied: "
                + request);
    }
}

4. FinalRequestProcessor

FinalRequestProcessor was already covered in the previous article, so it is not repeated here.

To sum up: from the analysis above, the leader processes requests through the chain PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor (fronted by LeaderRequestProcessor as firstProcessor).

A request first passes through PrepRequestProcessor, which receives it, wraps it as a transaction, and sets up shared state according to the request type. ProposalRequestProcessor is chiefly responsible for proposing the change and notifying all followers and observers. CommitProcessor dispatches committed requests across its worker threads. Leader.ToBeAppliedRequestProcessor merely maintains the toBeApplied list.

FinalRequestProcessor terminates the processing chain: it sends the response message and triggers the watcher handlers.
